From 3a3fe1f5466c5a8b5447eeac7f6be52589c25b0b Mon Sep 17 00:00:00 2001
From: simon
Date: Thu, 19 Dec 2019 10:20:25 -0500
Subject: [PATCH] Buffer RabbitMQ & InfluxDB writes

---
 monitoring.py    | 20 --------------------
 requirements.txt |  3 ++-
 run.py           | 54 +++++++++++++++++++++++++++++++++++-------------------
 3 files changed, 37 insertions(+), 40 deletions(-)
 delete mode 100644 monitoring.py

diff --git a/monitoring.py b/monitoring.py
deleted file mode 100644
index 7325d93..0000000
--- a/monitoring.py
+++ /dev/null
@@ -1,20 +0,0 @@
-from influxdb import InfluxDBClient
-
-client = InfluxDBClient("localhost", 8086, "root", "root", "reddit_feed")
-
-
-def init():
-    db_exists = False
-    for db in client.get_list_database():
-        if db["name"] == "reddit_feed":
-            db_exists = True
-            break
-
-    if not db_exists:
-        client.create_database("reddit_feed")
-
-
-def log(event):
-    client.write_points(event)
-
-
diff --git a/requirements.txt b/requirements.txt
index 75ee2ac..d8aa751 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
 praw
 pika
 influxdb
-psaw
\ No newline at end of file
+psaw
+git+git://github.com/simon987/hexlib.git
diff --git a/run.py b/run.py
index 5216dce..40dfb97 100755
--- a/run.py
+++ b/run.py
@@ -7,6 +7,7 @@ import sys
 import threading
 import time
 import traceback
+from collections import defaultdict
 from datetime import datetime, timedelta
 from itertools import islice
 from logging import FileHandler, StreamHandler
@@ -14,10 +15,11 @@ from queue import Queue
 
 import pika
 import praw
+from hexlib.misc import buffered
+from hexlib.monitoring import Monitoring
 from praw.endpoints import API_PATH
 from praw.models import Comment
 
-import monitoring
 from post_process import post_process
 from rate_limiter import GoodRateLimiter
 from util import update_cursor, read_cursor, reddit_ids
@@ -41,7 +43,10 @@ def connect():
 connect()
 
 REALTIME_DELAY = timedelta(seconds=60)
-MONITORING = False
+MONITORING = True
+
+if MONITORING:
+    monitoring = Monitoring("reddit_feed", logger=logger, batch_size=50, flush_on_exit=True)
 
 formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
 file_handler = FileHandler("reddit_feed.log")
@@ -100,32 +105,45 @@ def serialize(thing):
     }
 
 
+@buffered(batch_size=10000, flush_on_exit=True)
+def _publish_buffered(items):
+    buckets = defaultdict(list)  # group queued (routing_key, item) tuples by routing key
+    for item in items:
+        buckets[item[0]].append(item)
+
+    for bucket in buckets.values():
+        routing_key, _ = bucket[0]
+        body = [item[1] for item in bucket]
+
+        while True:  # retry until the whole batch is published
+            try:
+                reddit_channel.basic_publish(
+                    exchange='reddit',
+                    routing_key=routing_key,
+                    body=json.dumps(body, separators=(',', ':'), ensure_ascii=False, sort_keys=True)
+                )
+                logger.debug("RabbitMQ: published %d items (%s)" % (len(body), routing_key))
+                break
+            except Exception as e:
+                logger.error(str(e))
+                connect()  # the channel may be dead; reconnect before retrying
+                time.sleep(0.5)
+
+
 def publish(thing):
     thing_type = type(thing).__name__.lower()
     j = serialize(thing)
     post_process(j)
 
-    reddit_channel.basic_publish(
-        exchange='reddit',
-        routing_key="%s.%s" % (thing_type, str(thing.subreddit).lower()),
-        body=json.dumps(j)
-    )
+    _publish_buffered([("%s.%s" % (thing_type, str(thing.subreddit).lower()), j)])
 
 
 def publish_worker(q: Queue):
     logger.info("Started publish thread")
     while True:
         thing = q.get()
-        try:
-            while True:
-                try:
-                    publish(thing)
-                    break
-                except Exception as e:
-                    connect()
-                    logger.error(str(e) + ": " + traceback.format_exc())
-        finally:
-            q.task_done()
+        publish(thing)
+        q.task_done()
 
 
 def mon_worker(q: Queue):
@@ -195,8 +213,6 @@ if __name__ == "__main__":
     logger.info("Starting app @%s"
                % sys.argv[1])
-    if MONITORING:
-        monitoring.init()
 
     pub_queue = Queue()
     try:
         publish_thread = threading.Thread(target=publish_worker, args=(pub_queue,))
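
Note for reviewers: both batching helpers come from hexlib
(git+git://github.com/simon987/hexlib.git), whose source is not included in
this patch. The two sketches below are assumptions about its behavior, not
hexlib's actual implementation.

hexlib.misc.buffered is assumed to collect the items passed across calls and
to invoke the wrapped function once batch_size items have accumulated,
registering an atexit flush for the remainder when flush_on_exit=True. A
minimal sketch of that contract:

    import atexit
    from functools import wraps


    def buffered(batch_size, flush_on_exit=False):
        def decorate(func):
            buffer = []

            @wraps(func)
            def wrapper(items):
                buffer.extend(items)
                # call through on every full batch; keep the remainder queued
                while len(buffer) >= batch_size:
                    batch = buffer[:batch_size]
                    del buffer[:batch_size]
                    func(batch)

            if flush_on_exit:
                # write out whatever is still queued when the process exits
                atexit.register(lambda: buffer and func(list(buffer)))

            return wrapper

        return decorate

Under this reading, publish() only touches RabbitMQ once every 10000
serialized things (or at shutdown), and the per-item retry logic removed
from publish_worker now lives inside _publish_buffered.

hexlib.monitoring.Monitoring replaces the deleted monitoring.py. Judging by
the constructor call in run.py, it presumably owns the InfluxDB client,
creates the database on first use (as init() did), and buffers points until
batch_size of them are queued. A hypothetical sketch along the same lines,
reusing the connection parameters from the deleted module:

    import atexit

    from influxdb import InfluxDBClient


    class Monitoring:
        def __init__(self, db, logger=None, batch_size=50, flush_on_exit=False):
            # same connection parameters as the deleted monitoring.py
            self._client = InfluxDBClient("localhost", 8086, "root", "root", db)
            self._logger = logger
            self._batch_size = batch_size
            self._points = []

            # create the database if it does not exist yet (replaces init())
            if db not in (row["name"] for row in self._client.get_list_database()):
                self._client.create_database(db)

            if flush_on_exit:
                atexit.register(self._flush)

        def log(self, points):
            self._points.extend(points)
            if len(self._points) >= self._batch_size:
                self._flush()

        def _flush(self):
            if self._points:
                self._client.write_points(self._points)
                self._points = []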