diff --git a/monitoring.py b/monitoring.py
new file mode 100644
index 0000000..f21647e
--- /dev/null
+++ b/monitoring.py
@@ -0,0 +1,20 @@
+from influxdb import InfluxDBClient
+
+client = InfluxDBClient('localhost', 8086, 'root', 'root', 'reddit_feed')
+
+
+def init():
+    db_exists = False
+    for db in client.get_list_database():
+        if db["name"] == "reddit_feed":
+            db_exists = True
+            break
+
+    if not db_exists:
+        client.create_database("reddit_feed")
+
+
+def log(event):
+    client.write_points(event)
+
+
diff --git a/rate_limiter.py b/rate_limiter.py
new file mode 100644
index 0000000..44a57b1
--- /dev/null
+++ b/rate_limiter.py
@@ -0,0 +1,56 @@
+"""Modified version of praw's rate limiter that can reliably use all of the 60 allowed requests per minute."""
+import time
+
+
+class GoodRateLimiter(object):
+
+    def __init__(self):
+        self.remaining = None
+        self.next_request_timestamp = None
+        self.reset_timestamp = None
+        self.used = None
+
+    def call(self, request_function, set_header_callback, *args, **kwargs):
+        self.delay()
+        kwargs['headers'] = set_header_callback()
+        response = request_function(*args, **kwargs)
+        self.update(response.headers)
+        return response
+
+    def delay(self):
+        if self.next_request_timestamp is None:
+            return
+        sleep_seconds = self.next_request_timestamp - time.time()
+        if sleep_seconds <= 0:
+            return
+        time.sleep(sleep_seconds)
+
+    def update(self, response_headers):
+        if 'x-ratelimit-remaining' not in response_headers:
+            if self.remaining is not None:
+                self.remaining -= 1
+                self.used += 1
+            return
+
+        now = time.time()
+        prev_remaining = self.remaining
+
+        seconds_to_reset = int(response_headers['x-ratelimit-reset'])
+        self.remaining = float(response_headers['x-ratelimit-remaining'])
+        self.used = int(response_headers['x-ratelimit-used'])
+        self.reset_timestamp = now + seconds_to_reset
+
+        if self.remaining <= 0:
+            self.next_request_timestamp = self.reset_timestamp
+            return
+
+        if prev_remaining is not None and prev_remaining > self.remaining:
+            estimated_clients = prev_remaining - self.remaining
+        else:
+            estimated_clients = 1.0
+
+        self.next_request_timestamp = min(
+            self.reset_timestamp,
+            now + (estimated_clients * seconds_to_reset / self.remaining)
+            if (seconds_to_reset > self.remaining) and self.remaining > 5 else 0
+        )
diff --git a/requirements.txt b/requirements.txt
index e6599bb..40e6508 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,3 @@
 praw
-pika
\ No newline at end of file
+pika
+influxdb
diff --git a/run.py b/run.py
index b9f2b20..62358d7 100644
--- a/run.py
+++ b/run.py
@@ -15,15 +15,20 @@
 import praw
 from praw.endpoints import API_PATH
 from praw.models import Comment
+import monitoring
+from rate_limiter import GoodRateLimiter
 from util import update_cursor, read_cursor, reddit_ids
 
-reddit_t3 = praw.Reddit('archivist-bot')
-reddit_t1 = praw.Reddit('archivist-bot')
+reddit = praw.Reddit('archivist-bot')
+
+# Fix praw's default rate limiter
+reddit._core._rate_limiter = GoodRateLimiter()
 
 logger = logging.getLogger("default")
 logger.setLevel(logging.DEBUG)
 
 REALTIME_DELAY = timedelta(seconds=60)
+MONITORING = True
 
 formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
 file_handler = FileHandler("reddit_feed.log")
@@ -39,7 +44,6 @@ def serialize(thing):
     return json.dumps({
         "author": str(thing.author),
         "author_flair_text": thing.author_flair_text,
-        "author_fullname": thing.author_fullname if hasattr(thing, "author_fullname") else None,
         "body": thing.body,
         "body_html": thing.body_html,
         "controversiality": thing.controversiality,
@@ -65,7 +69,6 @@ def serialize(thing):
         "archived": thing.archived,
         "author": str(thing.author),
         "author_flair_text": thing.author_flair_text,
-        "author_fullname": thing.author_fullname if hasattr(thing, "author_fullname") else None,
         "category": thing.category,
         "comment_limit": thing.comment_limit,
         "created": int(thing.created),
@@ -143,7 +146,20 @@
             q.task_done()
 
 
-def stream_thing(prefix, publish_queue, reddit, start_id=None):
+def mon_worker(q: Queue):
+    logger.info("Started monitoring thread")
+    while True:
+        try:
+            event = q.get()
+            monitoring.log(event)
+
+        except Exception as e:
+            logger.error(str(e) + ": " + traceback.format_exc())
+        finally:
+            q.task_done()
+
+
+def stream_thing(prefix, publish_queue, mon_queue=None, start_id=None):
     if start_id is None:
         start_id = read_cursor(prefix)
 
@@ -172,24 +188,47 @@
         for result in results:
             publish_queue.put(result)
 
+        if MONITORING:
+            mon_queue.put([{
+                "measurement": "reddit",
+                "time": str(datetime.utcnow()),
+                "tags": {
+                    "item_type": prefix,
+                },
+                "fields": {
+                    "item_count": len(results),
+                    "distance": distance.total_seconds()
+                }
+            }])
+
 
 if __name__ == "__main__":
-    queue = Queue()
     if len(sys.argv) < 2:
         print("You must specify RabbitMQ host!")
         quit(1)
 
     logger.info("Starting app @%s" % sys.argv[1])
+
+    monitoring.init()
+    pub_queue = Queue()
     rabbit = pika.BlockingConnection(pika.ConnectionParameters(host=sys.argv[1]))
     reddit_channel = rabbit.channel()
     reddit_channel.exchange_declare(exchange="reddit", exchange_type="topic")
 
     while True:
         try:
-            comment_thread = threading.Thread(target=stream_thing, args=("t1_", queue, reddit_t1))
-            post_thread = threading.Thread(target=stream_thing, args=("t3_", queue, reddit_t3))
-            publish_thread = threading.Thread(target=publish_worker, args=(queue,))
+            publish_thread = threading.Thread(target=publish_worker, args=(pub_queue,))
+            if MONITORING:
+                monitoring_queue = Queue()
+                log_thread = threading.Thread(target=mon_worker, args=(monitoring_queue,))
+                log_thread.start()
+
+                comment_thread = threading.Thread(target=stream_thing, args=("t1_", pub_queue, monitoring_queue))
+                post_thread = threading.Thread(target=stream_thing, args=("t3_", pub_queue, monitoring_queue))
+            else:
+                comment_thread = threading.Thread(target=stream_thing, args=("t1_", pub_queue))
+                post_thread = threading.Thread(target=stream_thing, args=("t3_", pub_queue))
 
             comment_thread.start()
             post_thread.start()
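
The pacing math in GoodRateLimiter.update() is easiest to see with concrete numbers. Below is a minimal sketch (not part of the patch) that drives the limiter with a fabricated response object; the header values are made up, assuming the 60-requests-per-minute window the docstring refers to. With 30 requests remaining and 60 seconds until reset, a single client gets spaced at 60 / 30 = 2 seconds per request.

import time
from types import SimpleNamespace

from rate_limiter import GoodRateLimiter


def fake_request(*args, **kwargs):
    # Stand-in for the underlying HTTP call; header values are illustrative.
    return SimpleNamespace(headers={
        'x-ratelimit-remaining': '30.0',
        'x-ratelimit-used': '30',
        'x-ratelimit-reset': '60',
    })


limiter = GoodRateLimiter()
start = time.time()
limiter.call(fake_request, lambda: {})  # no delay: next_request_timestamp is None
# update() saw seconds_to_reset (60) > remaining (30) and remaining > 5, so the
# next request is scheduled at now + 1.0 * 60 / 30 = now + 2 seconds.
limiter.call(fake_request, lambda: {})  # delay() sleeps ~2 seconds
print("elapsed: %.1fs" % (time.time() - start))  # ~2.0

If several processes share the same credentials, remaining drops by more than one between two of this client's requests, and estimated_clients stretches the spacing proportionally.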
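
Each batch logged by stream_thing becomes a point in the "reddit" measurement with item_type as a tag, so ingest rates can be pulled straight from InfluxDB. A sketch of such a query, assuming the connection parameters hard-coded in monitoring.py:

from influxdb import InfluxDBClient

client = InfluxDBClient('localhost', 8086, 'root', 'root', 'reddit_feed')

# Items ingested per minute over the last hour, split by comment (t1_)
# and submission (t3_) streams.
result = client.query(
    'SELECT sum("item_count") FROM "reddit" '
    'WHERE time > now() - 1h GROUP BY time(1m), "item_type"'
)
for (measurement, tags), points in result.items():
    print(tags['item_type'], list(points)[:3])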
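
For spot-checking what publish_worker pushes through the "reddit" topic exchange, a throwaway consumer is enough. A sketch assuming pika 1.x; it binds routing key "#" to catch everything, since the actual routing-key scheme lives outside this diff, and the host is a placeholder:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='reddit', exchange_type='topic')

# Anonymous, exclusive queue bound to every routing key on the exchange.
queue = channel.queue_declare(queue='', exclusive=True).method.queue
channel.queue_bind(exchange='reddit', queue=queue, routing_key='#')


def on_message(ch, method, properties, body):
    print(method.routing_key, body[:80])


channel.basic_consume(queue=queue, on_message_callback=on_message, auto_ack=True)
channel.start_consuming()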