buffer rabbitMQ & influxdb writes

simon 2019-12-19 10:20:25 -05:00
parent cc4408dbbf
commit 3a3fe1f546
3 changed files with 36 additions and 40 deletions

monitoring.py
@@ -1,20 +0,0 @@
-from influxdb import InfluxDBClient
-
-client = InfluxDBClient("localhost", 8086, "root", "root", "reddit_feed")
-
-
-def init():
-    db_exists = False
-    for db in client.get_list_database():
-        if db["name"] == "reddit_feed":
-            db_exists = True
-            break
-
-    if not db_exists:
-        client.create_database("reddit_feed")
-
-
-def log(event):
-    client.write_points(event)
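The module deleted above pushed points to InfluxDB with an immediate write_points() call on every log() invocation. The commit replaces it with hexlib's Monitoring class, constructed in run.py below with batch_size=50 and flush_on_exit=True, which presumably accumulates points and writes them in batches. A minimal sketch of that batching idea under assumed hexlib-like semantics; BufferedMonitoring and its internals are hypothetical names, not hexlib's API:

    from influxdb import InfluxDBClient

    class BufferedMonitoring:
        def __init__(self, db, batch_size=50):
            self._client = InfluxDBClient("localhost", 8086, "root", "root", db)
            self._batch_size = batch_size
            self._points = []

        def log(self, points):
            # Accumulate points and write one batch per round-trip,
            # instead of one HTTP request per log() call.
            self._points.extend(points)
            if len(self._points) >= self._batch_size:
                self._client.write_points(self._points)
                self._points.clear()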

requirements.txt
@@ -2,3 +2,4 @@ praw
 pika
 influxdb
 psaw
+git+git://github.com/simon987/hexlib.git

run.py
@@ -7,6 +7,7 @@ import sys
 import threading
 import time
 import traceback
+from collections import defaultdict
 from datetime import datetime, timedelta
 from itertools import islice
 from logging import FileHandler, StreamHandler
@@ -14,10 +15,11 @@ from queue import Queue
 import pika
 import praw
+from hexlib.misc import buffered
+from hexlib.monitoring import Monitoring
 from praw.endpoints import API_PATH
 from praw.models import Comment
-import monitoring
 from post_process import post_process
 from rate_limiter import GoodRateLimiter
 from util import update_cursor, read_cursor, reddit_ids
@@ -41,7 +43,10 @@ def connect():
 connect()
 REALTIME_DELAY = timedelta(seconds=60)
-MONITORING = False
+MONITORING = True
+if MONITORING:
+    monitoring = Monitoring("reddit_feed", logger=logger, batch_size=50, flush_on_exit=True)
 formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
 file_handler = FileHandler("reddit_feed.log")
@@ -100,31 +105,43 @@ def serialize(thing):
     }
+
+
+@buffered(batch_size=10000, flush_on_exit=True)
+def _publish_buffered(items):
+    buckets = defaultdict(list)
+    for item in items:
+        buckets[item[0]].append(item)
+
+    for bucket in buckets.values():
+        routing_key, _ = bucket[0]
+        body = [item[1] for item in bucket]
+
+        while True:
+            try:
+                reddit_channel.basic_publish(
+                    exchange='reddit',
+                    routing_key=routing_key,
+                    body=json.dumps(body, separators=(',', ':'), ensure_ascii=False, sort_keys=True)
+                )
+                logger.debug("RabbitMQ: published %d items (%s)" % (len(body), routing_key))
+                break
+            except Exception as e:
+                logger.error(str(e))
+                time.sleep(0.5)
 def publish(thing):
     thing_type = type(thing).__name__.lower()
     j = serialize(thing)
     post_process(j)
-    reddit_channel.basic_publish(
-        exchange='reddit',
-        routing_key="%s.%s" % (thing_type, str(thing.subreddit).lower()),
-        body=json.dumps(j)
-    )
+    _publish_buffered([("%s.%s" % (thing_type, str(thing.subreddit).lower()), j)])
 def publish_worker(q: Queue):
     logger.info("Started publish thread")
     while True:
         thing = q.get()
-        try:
-            while True:
-                try:
-                    publish(thing)
-                    break
-                except Exception as e:
-                    connect()
-                    logger.error(str(e) + ": " + traceback.format_exc())
-        finally:
-            q.task_done()
+        publish(thing)
+        q.task_done()
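The @buffered decorator is imported from hexlib. Judging from its use here, where publish() hands _publish_buffered() a single-item list and batch_size=10000, it queues the items from each call and only invokes the wrapped function with the accumulated list once the batch fills (and once more at exit when flush_on_exit=True, which is why the retry loop moved inside _publish_buffered). A rough sketch of that assumed behaviour, not hexlib's actual implementation:

    import atexit
    from functools import wraps

    def buffered(batch_size, flush_on_exit=False):
        def decorator(func):
            buffer = []

            @wraps(func)
            def wrapper(items):
                # Queue the items; only call through once a full batch exists.
                buffer.extend(items)
                if len(buffer) >= batch_size:
                    func(list(buffer))
                    buffer.clear()

            if flush_on_exit:
                # Drain whatever is left when the interpreter shuts down.
                atexit.register(lambda: buffer and func(list(buffer)))
            return wrapper
        return decorator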
@@ -195,8 +212,6 @@ if __name__ == "__main__":
     logger.info("Starting app @%s" % sys.argv[1])
-    if MONITORING:
-        monitoring.init()
     pub_queue = Queue()
     try:
         publish_thread = threading.Thread(target=publish_worker, args=(pub_queue,))
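A consumer-facing consequence of the batching: each delivery on the 'reddit' exchange now carries a JSON array of serialized things, grouped per routing key, rather than one object per message, so callbacks have to iterate. A sketch of a consumer adjusted for that; the queue setup, the 'comment.#' binding key, and handle() are illustrative assumptions, not part of this repo:

    import json
    import pika

    def handle(thing):
        print(thing)  # placeholder for real processing

    def callback(ch, method, properties, body):
        for thing in json.loads(body):  # body is now a batch, not a single object
            handle(thing)

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    queue = channel.queue_declare(queue="", exclusive=True).method.queue
    channel.queue_bind(exchange="reddit", queue=queue, routing_key="comment.#")
    channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)
    channel.start_consuming()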