diff --git a/post_process.py b/post_process.py index ca6305b..c23ad93 100644 --- a/post_process.py +++ b/post_process.py @@ -6,7 +6,7 @@ INTERNAL_RE = re.compile(r"^https?://(reddit.com|redd.it|old.reddit.com|www.redd def post_process(thing): - thing["v"] = 1.0 + thing["v"] = 1.1 thing["urls"] = [] diff --git a/retropublish.py b/retropublish.py index 01dd2cb..433d133 100644 --- a/retropublish.py +++ b/retropublish.py @@ -1,4 +1,5 @@ # Script to retroactively publish reddit items from pushshift for a specific subreddit +import time import traceback import psaw @@ -22,5 +23,6 @@ else: for item in gen: try: publish(item) + time.sleep(0.2) except Exception as e: logger.error(str(e) + ": " + traceback.format_exc()) diff --git a/run.py b/run.py index c6028c4..1c2f091 100755 --- a/run.py +++ b/run.py @@ -14,7 +14,6 @@ from queue import Queue import pika import praw -import psaw from praw.endpoints import API_PATH from praw.models import Comment @@ -31,13 +30,18 @@ reddit._core._rate_limiter = GoodRateLimiter() logger = logging.getLogger("default") logger.setLevel(logging.DEBUG) -rabbit = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) -reddit_channel = rabbit.channel() -reddit_channel.exchange_declare(exchange="reddit", exchange_type="topic") +def connect(): + global reddit_channel + rabbit = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) + reddit_channel = rabbit.channel() + reddit_channel.exchange_declare(exchange="reddit", exchange_type="topic") + + +connect() REALTIME_DELAY = timedelta(seconds=60) -MONITORING = True +MONITORING = False formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s') file_handler = FileHandler("reddit_feed.log") @@ -51,7 +55,8 @@ logger.addHandler(StreamHandler(sys.stdout)) def serialize(thing): if isinstance(thing, Comment) or type(thing).__name__ == "comment": return { - "author": str(thing.author), + "_id": int(thing.id, 36), + "author": str(thing.author) if thing.author is not None else None, "author_flair_text": thing.author_flair_text, "body": thing.body, "selftext_html": thing.body_html if hasattr(thing, "body_html") else None, @@ -71,6 +76,7 @@ def serialize(thing): } else: return { + "_id": int(thing.id, 36), "author": str(thing.author), "author_flair_text": thing.author_flair_text, "created": int(thing.created), @@ -85,7 +91,6 @@ def serialize(thing): "score": thing.score, "selftext": thing.selftext if hasattr(thing, "selftext") else None, "selftext_html": thing.selftext_html if hasattr(thing, "selftext_html") else None, - "stickied": thing.stickied, "subreddit": str(thing.subreddit), "subreddit_id": thing.subreddit_id, "title": thing.title, @@ -110,12 +115,15 @@ def publish(thing): def publish_worker(q: Queue): logger.info("Started publish thread") while True: + thing = q.get() try: - thing = q.get() - publish(thing) - - except Exception as e: - logger.error(str(e) + ": " + traceback.format_exc()) + while True: + try: + publish(thing) + break + except Exception as e: + connect() + logger.error(str(e) + ": " + traceback.format_exc()) finally: q.task_done() @@ -146,6 +154,9 @@ def stream_thing(prefix, publish_queue, mon_queue=None, start_id=None): params = {"id": ",".join(chunk)} results = reddit.get(API_PATH["info"], params=params) + if not results: + logger.warning("No results!") + continue post_time = datetime.utcfromtimestamp(results[0].created_utc) now = datetime.utcnow() distance = now - post_time @@ -184,7 +195,8 @@ if __name__ == "__main__": logger.info("Starting app @%s" % sys.argv[1]) - monitoring.init() + if MONITORING: + monitoring.init() pub_queue = Queue() while True: try: