reddit_feed/run.py
2019-08-09 10:16:36 -04:00

243 lines
7.9 KiB
Python
Executable File

#!/bin/env python
import datetime
import json
import logging
import sys
import threading
import time
import traceback
from datetime import datetime, timedelta
from itertools import islice
from logging import FileHandler, StreamHandler
from queue import Queue
import pika
import praw
from praw.endpoints import API_PATH
from praw.models import Comment
import monitoring
from rate_limiter import GoodRateLimiter
from util import update_cursor, read_cursor, reddit_ids
reddit = praw.Reddit('archivist-bot')
# Fix praw's default rate limiter
reddit._core._rate_limiter = GoodRateLimiter()
logger = logging.getLogger("default")
logger.setLevel(logging.DEBUG)
REALTIME_DELAY = timedelta(seconds=60)
MONITORING = True
formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
file_handler = FileHandler("reddit_feed.log")
file_handler.setFormatter(formatter)
for h in logger.handlers:
logger.removeHandler(h)
logger.addHandler(file_handler)
logger.addHandler(StreamHandler(sys.stdout))
def serialize(thing):
if isinstance(thing, Comment):
return json.dumps({
"author": str(thing.author),
"author_flair_text": thing.author_flair_text,
"body": thing.body,
"body_html": thing.body_html,
"controversiality": thing.controversiality,
"created": int(thing.created),
"created_utc": int(thing.created_utc),
"downs": thing.downs,
"edited": thing.edited,
"gilded": thing.gilded,
"gildings": thing.gildings,
"id": thing.id,
"link_id": thing.link_id,
"name": thing.name,
"parent_id": thing.parent_id,
"permalink": thing.permalink,
"score": thing.score,
"subreddit": str(thing.subreddit),
"subreddit_id": thing.subreddit_id,
"subreddit_type": thing.subreddit_type,
"ups": thing.ups,
})
else:
return json.dumps({
"archived": thing.archived,
"author": str(thing.author),
"author_flair_text": thing.author_flair_text,
"category": thing.category,
"comment_limit": thing.comment_limit,
"created": int(thing.created),
"created_utc": int(thing.created_utc),
"discussion_type": thing.discussion_type,
"domain": thing.domain,
"downs": thing.downs,
"edited": int(thing.edited),
"gilded": thing.gilded,
"gildings": thing.gildings,
"id": thing.id,
"is_crosspostable": thing.is_crosspostable,
"is_meta": thing.is_meta,
"is_original_content": thing.is_original_content,
"is_reddit_media_domain": thing.is_reddit_media_domain,
"is_robot_indexable": thing.is_robot_indexable,
"is_self": thing.is_self,
"is_video": thing.is_video,
"likes": thing.likes,
"link_flair_text": thing.link_flair_text,
"locked": thing.locked,
"media": thing.media,
"media_embed": thing.media_embed,
"media_only": thing.media_only,
"name": thing.name,
"num_comments": thing.num_comments,
"num_crossposts": thing.num_crossposts,
"num_reports": thing.num_reports,
"over_18": thing.over_18,
"parent_whitelist_status": thing.parent_whitelist_status,
"permalink": thing.permalink,
"pinned": thing.pinned,
"quarantine": thing.quarantine,
"score": thing.score,
"secure_media": thing.secure_media,
"secure_media_embed": thing.secure_media_embed,
"selftext": thing.selftext,
"selftext_html": thing.selftext_html,
"spoiler": thing.spoiler,
"stickied": thing.stickied,
"subreddit": str(thing.subreddit),
"subreddit_id": thing.subreddit_id,
"subreddit_subscribers": thing.subreddit_subscribers,
"subreddit_type": thing.subreddit_type,
"thumbnail": thing.thumbnail,
"thumbnail_height": thing.thumbnail_height,
"thumbnail_width": thing.thumbnail_width,
"title": thing.title,
"ups": thing.ups,
"url": thing.url,
})
def publish(thing):
thing_type = type(thing).__name__.lower()
j = serialize(thing)
reddit_channel.basic_publish(
exchange='reddit',
routing_key="%s.%s" % (thing_type, str(thing.subreddit)),
body=j
)
def publish_worker(q: Queue):
logger.info("Started publish thread")
while True:
try:
thing = q.get()
publish(thing)
except Exception as e:
logger.error(str(e) + ": " + traceback.format_exc())
finally:
q.task_done()
def mon_worker(q: Queue):
logger.info("Started monitoring thread")
while True:
try:
event = q.get()
monitoring.log(event)
except Exception as e:
logger.error(str(e) + ": " + traceback.format_exc())
finally:
q.task_done()
def stream_thing(prefix, publish_queue, mon_queue=None, start_id=None):
if start_id is None:
start_id = read_cursor(prefix)
logger.info("Starting stream for %s at cursor %s" % (prefix, start_id))
iterable = reddit_ids(prefix, start_id)
while True:
chunk = list(islice(iterable, 100))
params = {"id": ",".join(chunk)}
results = reddit.get(API_PATH["info"], params=params)
post_time = datetime.utcfromtimestamp(results[0].created_utc)
now = datetime.utcnow()
distance = now - post_time
logger.debug("[+%d] (%s) We are %s away from realtime (%s)" %
(len(results), prefix, distance, datetime.fromtimestamp(results[0].created_utc)))
if distance < REALTIME_DELAY:
sleep_time = (REALTIME_DELAY - distance).total_seconds()
logger.info("Sleeping %s seconds" % sleep_time)
time.sleep(sleep_time)
update_cursor(results[0].id, prefix)
for result in results:
publish_queue.put(result)
if MONITORING:
mon_queue.put([{
"measurement": "reddit",
"time": str(datetime.utcnow()),
"tags": {
"item_type": prefix,
},
"fields": {
"item_count": len(results),
"distance": distance.total_seconds()
}
}])
if __name__ == "__main__":
if len(sys.argv) < 2:
print("You must specify RabbitMQ host!")
quit(1)
logger.info("Starting app @%s" % sys.argv[1])
monitoring.init()
pub_queue = Queue()
rabbit = pika.BlockingConnection(pika.ConnectionParameters(host=sys.argv[1]))
reddit_channel = rabbit.channel()
reddit_channel.exchange_declare(exchange="reddit", exchange_type="topic")
while True:
try:
publish_thread = threading.Thread(target=publish_worker, args=(pub_queue,))
if MONITORING:
monitoring_queue = Queue()
log_thread = threading.Thread(target=mon_worker, args=(monitoring_queue,))
log_thread.start()
comment_thread = threading.Thread(target=stream_thing, args=("t1_", pub_queue, monitoring_queue))
post_thread = threading.Thread(target=stream_thing, args=("t3_", pub_queue, monitoring_queue))
else:
comment_thread = threading.Thread(target=stream_thing, args=("t1_", pub_queue))
post_thread = threading.Thread(target=stream_thing, args=("t3_", pub_queue))
comment_thread.start()
post_thread.start()
publish_thread.start()
except Exception as e:
logger.error(str(e) + ": " + traceback.format_exc())
while True:
time.sleep(10)