Add monitoring, bug fixes

Simon Fortier 2019-08-09 09:21:09 -04:00
parent dee01d13ee
commit f664a6f0df
4 changed files with 126 additions and 10 deletions

20
monitoring.py Normal file
View File

@@ -0,0 +1,20 @@
from influxdb import InfluxDBClient

client = InfluxDBClient('localhost', 8086, 'root', 'root', 'reddit_feed')


def init():
    # Create the database on first run if it does not exist yet
    db_exists = False
    for db in client.get_list_database():
        if db["name"] == "reddit_feed":
            db_exists = True
            break
    if not db_exists:
        client.create_database("reddit_feed")


def log(event):
    client.write_points(event)
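
As a quick sanity check, the module could be exercised like this; a minimal sketch, assuming a local InfluxDB listening on 8086 and using the same point format that run.py builds below:

import monitoring
from datetime import datetime

# Hypothetical event; measurement, tag, and field names mirror what
# run.py writes below, the values here are made up.
monitoring.init()
monitoring.log([{
    "measurement": "reddit",
    "time": str(datetime.utcnow()),
    "tags": {"item_type": "t1_"},
    "fields": {"item_count": 100, "distance": 42.0}
}])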

56
rate_limiter.py Normal file
View File

@@ -0,0 +1,56 @@
"""Modified version of praw's rate limiter that can reliably use all of the 60 allowed requests per minute."""
import time


class GoodRateLimiter(object):
    def __init__(self):
        self.remaining = None
        self.next_request_timestamp = None
        self.reset_timestamp = None
        self.used = None

    def call(self, request_function, set_header_callback, *args, **kwargs):
        # Sleep if needed, perform the request, then learn from its headers
        self.delay()
        kwargs['headers'] = set_header_callback()
        response = request_function(*args, **kwargs)
        self.update(response.headers)
        return response

    def delay(self):
        if self.next_request_timestamp is None:
            return
        sleep_seconds = self.next_request_timestamp - time.time()
        if sleep_seconds <= 0:
            return
        time.sleep(sleep_seconds)

    def update(self, response_headers):
        if 'x-ratelimit-remaining' not in response_headers:
            if self.remaining is not None:
                self.remaining -= 1
                self.used += 1
            return
        now = time.time()
        prev_remaining = self.remaining
        seconds_to_reset = int(response_headers['x-ratelimit-reset'])
        self.remaining = float(response_headers['x-ratelimit-remaining'])
        self.used = int(response_headers['x-ratelimit-used'])
        self.reset_timestamp = now + seconds_to_reset

        if self.remaining <= 0:
            self.next_request_timestamp = self.reset_timestamp
            return

        # If other clients share this quota, the remaining count drops by
        # more than one per request; account for that when pacing.
        if prev_remaining is not None and prev_remaining > self.remaining:
            estimated_clients = prev_remaining - self.remaining
        else:
            estimated_clients = 1.0

        # Spread the remaining requests over the window only when quota is
        # scarce; otherwise issue the next request immediately.
        self.next_request_timestamp = min(
            self.reset_timestamp,
            now + (estimated_clients * seconds_to_reset / self.remaining)
            if (seconds_to_reset > self.remaining) and self.remaining > 5 else 0
        )
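
To see how the pacing responds to contention, a hedged smoke test with fabricated header values (the header names are the ones the limiter actually reads; the numbers are invented):

import time
from rate_limiter import GoodRateLimiter

limiter = GoodRateLimiter()
limiter.update({'x-ratelimit-remaining': '32', 'x-ratelimit-used': '568', 'x-ratelimit-reset': '300'})
limiter.update({'x-ratelimit-remaining': '30', 'x-ratelimit-used': '570', 'x-ratelimit-reset': '300'})

# The remaining count dropped by 2 across a single observed request, so the
# limiter budgets for 2 clients and schedules the next call roughly
# 2 * 300 / 30 = 20 seconds out (capped at the reset timestamp).
print(limiter.next_request_timestamp - time.time())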

3
requirements.txt
View File

@@ -1,2 +1,3 @@
praw
pika
pika
influxdb

57
run.py
View File

@@ -15,15 +15,20 @@ import praw
from praw.endpoints import API_PATH
from praw.models import Comment
import monitoring
from rate_limiter import GoodRateLimiter
from util import update_cursor, read_cursor, reddit_ids
reddit_t3 = praw.Reddit('archivist-bot')
reddit_t1 = praw.Reddit('archivist-bot')
reddit = praw.Reddit('archivist-bot')
# Fix praw's default rate limiter
reddit._core._rate_limiter = GoodRateLimiter()
logger = logging.getLogger("default")
logger.setLevel(logging.DEBUG)
REALTIME_DELAY = timedelta(seconds=60)
MONITORING = True
formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
file_handler = FileHandler("reddit_feed.log")
@@ -39,7 +44,6 @@ def serialize(thing):
    return json.dumps({
        "author": str(thing.author),
        "author_flair_text": thing.author_flair_text,
        "author_fullname": thing.author_fullname if hasattr(thing, "author_fullname") else None,
        "body": thing.body,
        "body_html": thing.body_html,
        "controversiality": thing.controversiality,
@@ -65,7 +69,6 @@ def serialize(thing):
        "archived": thing.archived,
        "author": str(thing.author),
        "author_flair_text": thing.author_flair_text,
        "author_fullname": thing.author_fullname if hasattr(thing, "author_fullname") else None,
        "category": thing.category,
        "comment_limit": thing.comment_limit,
        "created": int(thing.created),
@@ -143,7 +146,20 @@ def publish_worker(q: Queue):
        q.task_done()

def stream_thing(prefix, publish_queue, reddit, start_id=None):

def mon_worker(q: Queue):
    logger.info("Started monitoring thread")
    while True:
        try:
            event = q.get()
            monitoring.log(event)
        except Exception as e:
            logger.error(str(e) + ": " + traceback.format_exc())
        finally:
            q.task_done()

def stream_thing(prefix, publish_queue, mon_queue=None, start_id=None):
    if start_id is None:
        start_id = read_cursor(prefix)
@@ -172,24 +188,47 @@ def stream_thing(prefix, publish_queue, reddit, start_id=None):
        for result in results:
            publish_queue.put(result)

        if MONITORING:
            mon_queue.put([{
                "measurement": "reddit",
                "time": str(datetime.utcnow()),
                "tags": {
                    "item_type": prefix,
                },
                "fields": {
                    "item_count": len(results),
                    "distance": distance.total_seconds()
                }
            }])
if __name__ == "__main__":
    queue = Queue()
    if len(sys.argv) < 2:
        print("You must specify RabbitMQ host!")
        quit(1)

    logger.info("Starting app @%s" % sys.argv[1])
    monitoring.init()

    pub_queue = Queue()
    rabbit = pika.BlockingConnection(pika.ConnectionParameters(host=sys.argv[1]))
    reddit_channel = rabbit.channel()
    reddit_channel.exchange_declare(exchange="reddit", exchange_type="topic")

    while True:
        try:
            comment_thread = threading.Thread(target=stream_thing, args=("t1_", queue, reddit_t1))
            post_thread = threading.Thread(target=stream_thing, args=("t3_", queue, reddit_t3))
            publish_thread = threading.Thread(target=publish_worker, args=(queue,))
            publish_thread = threading.Thread(target=publish_worker, args=(pub_queue,))

            if MONITORING:
                monitoring_queue = Queue()
                log_thread = threading.Thread(target=mon_worker, args=(monitoring_queue,))
                log_thread.start()

                comment_thread = threading.Thread(target=stream_thing, args=("t1_", pub_queue, monitoring_queue))
                post_thread = threading.Thread(target=stream_thing, args=("t3_", pub_queue, monitoring_queue))
            else:
                comment_thread = threading.Thread(target=stream_thing, args=("t1_", pub_queue))
                post_thread = threading.Thread(target=stream_thing, args=("t3_", pub_queue))

            comment_thread.start()
            post_thread.start()
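
Once the feed is running, the ingested points could be spot-checked from the same database; a sketch assuming the credentials hard-coded in monitoring.py above:

from influxdb import InfluxDBClient

client = InfluxDBClient('localhost', 8086, 'root', 'root', 'reddit_feed')

# Pull the last few minutes of points written by mon_worker; the
# measurement and field names match what stream_thing emits.
for point in client.query('SELECT item_count, distance FROM reddit WHERE time > now() - 5m').get_points():
    print(point)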