From 2aefd2c3a2b73d3ab38c5b6ba9642e6187aebea9 Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 19 Dec 2019 09:47:22 -0500 Subject: [PATCH] buffer publish --- run.py | 74 ++++++++++++++++++++++++++++++++++++--------------------- util.py | 2 ++ 2 files changed, 49 insertions(+), 27 deletions(-) diff --git a/run.py b/run.py index cd8dcf9..26a58c8 100644 --- a/run.py +++ b/run.py @@ -4,11 +4,13 @@ import sqlite3 import sys import time import traceback +from collections import defaultdict from datetime import datetime from queue import Queue from threading import Thread import pika +from hexlib.misc import buffered from hexlib.monitoring import Monitoring from chan.chan import CHANS @@ -18,7 +20,6 @@ from util import logger, Web MONITORING = True BYPASS_RPS = False - DBNAME = "chan_feed" if MONITORING: influxdb = Monitoring(DBNAME, logger=logger, batch_size=100, flush_on_exit=True) @@ -160,37 +161,56 @@ def publish_worker(queue: Queue, helper, p): queue.task_done() +@buffered(batch_size=300, flush_on_exit=True) +def _publish_buffered(items): + if not items: + return + + buckets = defaultdict(list) + for item in items: + buckets[item[1]].append(item) + + for bucket in buckets.values(): + channel, routing_key, _ = bucket[0] + body = [item[2] for item in bucket] + + while True: + try: + channel.basic_publish( + exchange='chan', + routing_key=routing_key, + body=json.dumps(body, separators=(',', ':'), ensure_ascii=False, sort_keys=True) + ) + logger.debug("RabbitMQ: published %d items" % len(body)) + break + except Exception as e: + # logger.debug(traceback.format_exc()) + logger.error(str(e)) + time.sleep(0.5) + channel = connect() + + @once def publish(item, board, helper, channel, web): - item_type = helper.item_type(item) post_process(item, board, helper, web) - while True: - try: - channel.basic_publish( - exchange='chan', - routing_key="%s.%s.%s" % (chan, item_type, board), - body=json.dumps(item, separators=(',', ':'), ensure_ascii=False, sort_keys=True) - ) + item_type = helper.item_type(item) + routing_key = "%s.%s.%s" % (chan, item_type, board) - if MONITORING: - distance = datetime.utcnow() - datetime.utcfromtimestamp(helper.item_mtime(item)) - influxdb.log([{ - "measurement": chan, - "time": str(datetime.utcnow()), - "tags": { - "board": board - }, - "fields": { - "distance": distance.total_seconds() - } - }]) - break - except Exception as e: - # logger.debug(traceback.format_exc()) - logger.error(str(e)) - time.sleep(0.5) - channel = connect() + _publish_buffered([(channel, routing_key, item)]) + + if MONITORING: + distance = datetime.utcnow() - datetime.utcfromtimestamp(helper.item_mtime(item)) + influxdb.log([{ + "measurement": chan, + "time": str(datetime.utcnow()), + "tags": { + "board": board + }, + "fields": { + "distance": distance.total_seconds() + } + }]) def connect(): diff --git a/util.py b/util.py index 72d6a2b..b501173 100644 --- a/util.py +++ b/util.py @@ -44,6 +44,8 @@ class Web: if self._get_method: return self._get_method(url, **kwargs) return self.session.get(url, **kwargs) + except KeyboardInterrupt as e: + raise e except Exception as e: logger.warning("Error with request %s: %s" % (url, str(e))) raise Exception("Gave up request after maximum number of retries")