diff --git a/requirements.txt b/requirements.txt
index 40e6508..75ee2ac 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
 praw
 pika
 influxdb
+psaw
\ No newline at end of file
diff --git a/retropublish.py b/retropublish.py
new file mode 100644
index 0000000..01dd2cb
--- /dev/null
+++ b/retropublish.py
@@ -0,0 +1,31 @@
+# Script to retroactively publish reddit items from pushshift for a specific subreddit
+import sys
+import traceback
+
+import psaw
+
+from run import publish, logger
+
+# Stop on a malformed command line: without the exit, the sys.argv reads
+# below raise IndexError, and a mistyped item type silently meant "comment".
+if len(sys.argv) != 3 or sys.argv[1] not in ("post", "comment"):
+    print("Usage: ./retropublish.py post|comment subreddit")
+    sys.exit(1)
+
+item_type = sys.argv[1]
+subreddit = sys.argv[2]
+
+p = psaw.PushshiftAPI()
+
+# NOTE(review): the huge limits are presumably meant as "fetch everything".
+if item_type == "post":
+    gen = p.search_submissions(subreddit=subreddit, limit=100000000)
+else:
+    gen = p.search_comments(subreddit=subreddit, limit=1000000000)
+
+for item in gen:
+    try:
+        publish(item)
+    except Exception as e:
+        # Log and continue so one failing item does not abort the backfill.
+        logger.error(str(e) + ": " + traceback.format_exc())
diff --git a/run.py b/run.py
index 6dc7443..dc99cc9 100755
--- a/run.py
+++ b/run.py
@@ -30,6 +30,11 @@ reddit._core._rate_limiter = GoodRateLimiter()
 logger = logging.getLogger("default")
 logger.setLevel(logging.DEBUG)
 
+rabbit = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
+reddit_channel = rabbit.channel()
+reddit_channel.exchange_declare(exchange="reddit", exchange_type="topic")
+
+
 REALTIME_DELAY = timedelta(seconds=60)
 
 MONITORING = True
@@ -48,14 +53,11 @@ def serialize(thing):
             "author": str(thing.author),
             "author_flair_text": thing.author_flair_text,
             "body": thing.body,
-            "body_html": thing.body_html,
+            "body_html": thing.body_html if hasattr(thing, "body_html") else None,
             "controversiality": thing.controversiality,
             "created": int(thing.created),
             "created_utc": int(thing.created_utc),
-            "downs": thing.downs,
-            "edited": thing.edited,
-            "gilded": thing.gilded,
-            "gildings": thing.gildings,
+            "downs": thing.downs if hasattr(thing, "downs") else 0,
             "id": thing.id,
             "link_id": thing.link_id,
             "name": thing.name,
@@ -69,19 +71,11 @@ def serialize(thing):
         }
     else:
         return {
-            "archived": thing.archived,
             "author": str(thing.author),
             "author_flair_text": thing.author_flair_text,
-            "category": thing.category,
-            "comment_limit": thing.comment_limit,
             "created": int(thing.created),
             "created_utc": int(thing.created_utc),
-            "discussion_type": thing.discussion_type,
             "domain": thing.domain,
-            "downs": thing.downs,
-            "edited": int(thing.edited),
-            "gilded": thing.gilded,
-            "gildings": thing.gildings,
             "id": thing.id,
             "is_crosspostable": thing.is_crosspostable,
             "is_meta": thing.is_meta,
@@ -90,37 +84,26 @@ def serialize(thing):
             "is_robot_indexable": thing.is_robot_indexable,
             "is_self": thing.is_self,
             "is_video": thing.is_video,
-            "likes": thing.likes,
-            "link_flair_text": thing.link_flair_text,
+            "link_flair_text": thing.link_flair_text if hasattr(thing, "link_flair_text") else None,
             "locked": thing.locked,
-            "media": thing.media,
-            "media_embed": thing.media_embed,
-            "media_only": thing.media_only,
-            "name": thing.name,
             "num_comments": thing.num_comments,
             "num_crossposts": thing.num_crossposts,
-            "num_reports": thing.num_reports,
             "over_18": thing.over_18,
             "parent_whitelist_status": thing.parent_whitelist_status,
             "permalink": thing.permalink,
             "pinned": thing.pinned,
-            "quarantine": thing.quarantine,
             "score": thing.score,
-            "secure_media": thing.secure_media,
-            "secure_media_embed": thing.secure_media_embed,
             "selftext": thing.selftext,
-            "selftext_html": thing.selftext_html,
+            "selftext_html": thing.selftext_html if hasattr(thing, "selftext_html") else None,
             "spoiler": thing.spoiler,
             "stickied": thing.stickied,
             "subreddit": str(thing.subreddit),
             "subreddit_id": thing.subreddit_id,
             "subreddit_subscribers": thing.subreddit_subscribers,
             "subreddit_type": thing.subreddit_type,
-            "thumbnail": thing.thumbnail,
-            "thumbnail_height": thing.thumbnail_height,
-            "thumbnail_width": thing.thumbnail_width,
             "title": thing.title,
-            "ups": thing.ups,
+            "ups": thing.ups if hasattr(thing, "ups") else 0,
+            "downs": thing.downs if hasattr(thing, "downs") else 0,
             "url": thing.url,
         }
 
@@ -216,10 +199,6 @@ if __name__ == "__main__":
     monitoring.init()
     pub_queue = Queue()
 
-    rabbit = pika.BlockingConnection(pika.ConnectionParameters(host=sys.argv[1]))
-    reddit_channel = rabbit.channel()
-    reddit_channel.exchange_declare(exchange="reddit", exchange_type="topic")
-
     while True:
         try:
             publish_thread = threading.Thread(target=publish_worker, args=(pub_queue,))