buffer publish

This commit is contained in:
simon 2019-12-19 09:47:22 -05:00
parent 7a360de2ab
commit 2aefd2c3a2
2 changed files with 49 additions and 27 deletions

74
run.py
View File

@ -4,11 +4,13 @@ import sqlite3
import sys
import time
import traceback
from collections import defaultdict
from datetime import datetime
from queue import Queue
from threading import Thread
import pika
from hexlib.misc import buffered
from hexlib.monitoring import Monitoring
from chan.chan import CHANS
@ -18,7 +20,6 @@ from util import logger, Web
MONITORING = True
BYPASS_RPS = False
DBNAME = "chan_feed"
if MONITORING:
influxdb = Monitoring(DBNAME, logger=logger, batch_size=100, flush_on_exit=True)
@ -160,37 +161,56 @@ def publish_worker(queue: Queue, helper, p):
queue.task_done()
@buffered(batch_size=300, flush_on_exit=True)
def _publish_buffered(items):
if not items:
return
buckets = defaultdict(list)
for item in items:
buckets[item[1]].append(item)
for bucket in buckets.values():
channel, routing_key, _ = bucket[0]
body = [item[2] for item in bucket]
while True:
try:
channel.basic_publish(
exchange='chan',
routing_key=routing_key,
body=json.dumps(body, separators=(',', ':'), ensure_ascii=False, sort_keys=True)
)
logger.debug("RabbitMQ: published %d items" % len(body))
break
except Exception as e:
# logger.debug(traceback.format_exc())
logger.error(str(e))
time.sleep(0.5)
channel = connect()
@once
def publish(item, board, helper, channel, web):
item_type = helper.item_type(item)
post_process(item, board, helper, web)
while True:
try:
channel.basic_publish(
exchange='chan',
routing_key="%s.%s.%s" % (chan, item_type, board),
body=json.dumps(item, separators=(',', ':'), ensure_ascii=False, sort_keys=True)
)
item_type = helper.item_type(item)
routing_key = "%s.%s.%s" % (chan, item_type, board)
if MONITORING:
distance = datetime.utcnow() - datetime.utcfromtimestamp(helper.item_mtime(item))
influxdb.log([{
"measurement": chan,
"time": str(datetime.utcnow()),
"tags": {
"board": board
},
"fields": {
"distance": distance.total_seconds()
}
}])
break
except Exception as e:
# logger.debug(traceback.format_exc())
logger.error(str(e))
time.sleep(0.5)
channel = connect()
_publish_buffered([(channel, routing_key, item)])
if MONITORING:
distance = datetime.utcnow() - datetime.utcfromtimestamp(helper.item_mtime(item))
influxdb.log([{
"measurement": chan,
"time": str(datetime.utcnow()),
"tags": {
"board": board
},
"fields": {
"distance": distance.total_seconds()
}
}])
def connect():

View File

@ -44,6 +44,8 @@ class Web:
if self._get_method:
return self._get_method(url, **kwargs)
return self.session.get(url, **kwargs)
except KeyboardInterrupt as e:
raise e
except Exception as e:
logger.warning("Error with request %s: %s" % (url, str(e)))
raise Exception("Gave up request after maximum number of retries")