diff --git a/requirements.txt b/requirements.txt
index d8aa751..f7b79fa 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
 praw
-pika
+redis
 influxdb
 psaw
 git+git://github.com/simon987/hexlib.git
diff --git a/retropublish.py b/retropublish.py
index 433d133..8baf806 100644
--- a/retropublish.py
+++ b/retropublish.py
@@ -9,6 +9,7 @@ from run import publish, logger
 
 if len(sys.argv) != 3:
     print("Usage: ./retropublish.py post|comment subreddit")
+    quit(0)
 
 item_type = sys.argv[1]
 subreddit = sys.argv[2]
@@ -23,6 +24,5 @@ else:
 for item in gen:
     try:
         publish(item)
-        time.sleep(0.2)
     except Exception as e:
         logger.error(str(e) + ": " + traceback.format_exc())
diff --git a/run.py b/run.py
index e242035..d257776 100755
--- a/run.py
+++ b/run.py
@@ -12,8 +12,8 @@ from datetime import datetime, timedelta
 from itertools import islice
 from logging import FileHandler, StreamHandler
 from queue import Queue
+import redis
 
-import pika
 import praw
 from hexlib.misc import buffered
 from hexlib.monitoring import Monitoring
@@ -34,16 +34,14 @@ logger.setLevel(logging.DEBUG)
 
 
 def connect():
-    global reddit_channel
-    rabbit = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
-    reddit_channel = rabbit.channel()
-    reddit_channel.exchange_declare(exchange="reddit", exchange_type="topic")
+    global r
+    r = redis.Redis(host='localhost', port=6379, db=0)
 
 
 connect()
 
 REALTIME_DELAY = timedelta(seconds=60)
-MONITORING = True
+MONITORING = False
 
 if MONITORING:
     monitoring = Monitoring("reddit_feed", logger=logger, batch_size=50, flush_on_exit=True)
@@ -105,36 +103,22 @@ def serialize(thing):
     }
 
 
-@buffered(batch_size=5000, flush_on_exit=True)
-def _publish_buffered(items):
-    buckets = defaultdict(list)
-    for item in items:
-        buckets[item[0]].append(item)
-
-    for bucket in buckets.values():
-        routing_key, _ = bucket[0]
-        body = [item[1] for item in bucket]
-
-        while True:
-            try:
-                reddit_channel.basic_publish(
-                    exchange='reddit',
-                    routing_key=routing_key,
-                    body=json.dumps(body, separators=(',', ':'), ensure_ascii=False, sort_keys=True)
-                )
-                logger.debug("RabbitMQ: published %d items (%s)" % (len(body), routing_key))
-                break
-            except Exception as e:
-                logger.error(str(e))
-                time.sleep(0.5)
-
-
 def publish(thing):
     thing_type = type(thing).__name__.lower()
     j = serialize(thing)
     post_process(j)
+    routing_key = "%s.%s" % (thing_type, str(thing.subreddit).lower())
 
-    _publish_buffered([("%s.%s" % (thing_type, str(thing.subreddit).lower()), j)])
+    while True:
+        try:
+            r.rpush(
+                "q.reddit." + routing_key,
+                json.dumps(j, separators=(',', ':'), ensure_ascii=False, sort_keys=True)
+            )
+            break
+        except Exception as e:
+            logger.error(str(e))
+            time.sleep(0.5)
 
 
 def publish_worker(q: Queue):