Bug fixes

simon987 2019-08-26 21:36:21 -04:00
parent 37cbd70877
commit 6424506e06
3 changed files with 28 additions and 14 deletions


@@ -6,7 +6,7 @@ INTERNAL_RE = re.compile(r"^https?://(reddit.com|redd.it|old.reddit.com|www.redd
def post_process(thing):
    thing["v"] = 1.0
    thing["v"] = 1.1
    thing["urls"] = []


@@ -1,4 +1,5 @@
# Script to retroactively publish reddit items from pushshift for a specific subreddit
import time
import traceback
import psaw
@@ -22,5 +23,6 @@ else:
for item in gen:
    try:
        publish(item)
        time.sleep(0.2)
    except Exception as e:
        logger.error(str(e) + ": " + traceback.format_exc())
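
A note on the hunk above: the backfill loop now sleeps briefly after each successful publish so the queue is not flooded, and logs failures without aborting the run. A minimal standalone sketch of that throttle-and-log pattern (drain is a hypothetical name; gen and publish stand in for the script's own objects, and the 0.2 s delay matches the diff):

import logging
import time
import traceback

logger = logging.getLogger("default")

def drain(gen, publish, delay=0.2):
    # Publish every item from the generator, pausing after each success so
    # downstream consumers are not flooded; on failure, log the traceback
    # and move on to the next item instead of stopping the backfill.
    for item in gen:
        try:
            publish(item)
            time.sleep(delay)
        except Exception as e:
            logger.error(str(e) + ": " + traceback.format_exc())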

run.py

@@ -14,7 +14,6 @@ from queue import Queue
import pika
import praw
import psaw
from praw.endpoints import API_PATH
from praw.models import Comment
@@ -31,13 +30,18 @@ reddit._core._rate_limiter = GoodRateLimiter()
logger = logging.getLogger("default")
logger.setLevel(logging.DEBUG)
rabbit = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
reddit_channel = rabbit.channel()
reddit_channel.exchange_declare(exchange="reddit", exchange_type="topic")
def connect():
    global reddit_channel
    rabbit = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    reddit_channel = rabbit.channel()
    reddit_channel.exchange_declare(exchange="reddit", exchange_type="topic")
connect()
REALTIME_DELAY = timedelta(seconds=60)
MONITORING = True
MONITORING = False
formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
file_handler = FileHandler("reddit_feed.log")
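
The hunk above moves the one-shot RabbitMQ setup into a connect() helper so the channel can be re-established after the broker drops the connection. A minimal sketch of that pattern, assuming a broker on localhost (the host parameter is an addition for the sketch):

import pika

reddit_channel = None

def connect(host="localhost"):
    # (Re)open the blocking connection and re-declare the topic exchange.
    # The channel lives in a module-level global so publishers pick up the
    # fresh channel after a reconnect.
    global reddit_channel
    rabbit = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    reddit_channel = rabbit.channel()
    reddit_channel.exchange_declare(exchange="reddit", exchange_type="topic")

connect()

Re-declaring an exchange with identical parameters is a no-op on the broker, so calling connect() repeatedly after failures is safe.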
@@ -51,7 +55,8 @@ logger.addHandler(StreamHandler(sys.stdout))
def serialize(thing):
    if isinstance(thing, Comment) or type(thing).__name__ == "comment":
        return {
            "author": str(thing.author),
            "_id": int(thing.id, 36),
            "author": str(thing.author) if thing.author is not None else None,
            "author_flair_text": thing.author_flair_text,
            "body": thing.body,
            "selftext_html": thing.body_html if hasattr(thing, "body_html") else None,
@@ -71,6 +76,7 @@ def serialize(thing):
        }
    else:
        return {
            "_id": int(thing.id, 36),
            "author": str(thing.author),
            "author_flair_text": thing.author_flair_text,
            "created": int(thing.created),
@@ -85,7 +91,6 @@ def serialize(thing):
            "score": thing.score,
            "selftext": thing.selftext if hasattr(thing, "selftext") else None,
            "selftext_html": thing.selftext_html if hasattr(thing, "selftext_html") else None,
            "stickied": thing.stickied,
            "subreddit": str(thing.subreddit),
            "subreddit_id": thing.subreddit_id,
            "title": thing.title,
@@ -110,12 +115,15 @@ def publish(thing):
def publish_worker(q: Queue):
    logger.info("Started publish thread")
    while True:
        thing = q.get()
        try:
            thing = q.get()
            publish(thing)
        except Exception as e:
            logger.error(str(e) + ": " + traceback.format_exc())
            while True:
                try:
                    publish(thing)
                    break
                except Exception as e:
                    connect()
                    logger.error(str(e) + ": " + traceback.format_exc())
        finally:
            q.task_done()
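
The reworked worker above takes one item off the queue, then retries the publish in an inner loop, reconnecting after every failure, so a dropped RabbitMQ connection no longer loses the message, and task_done() still runs in the finally block. A minimal sketch of that retry-with-reconnect pattern in isolation (publish, reconnect and logger are injected here, which the real worker does not need to do):

import traceback
from queue import Queue

def worker(q: Queue, publish, reconnect, logger):
    # Block on the queue, then retry each item until it goes through;
    # task_done() always runs so q.join() can still complete.
    while True:
        thing = q.get()
        try:
            while True:
                try:
                    publish(thing)
                    break
                except Exception as e:
                    reconnect()
                    logger.error(str(e) + ": " + traceback.format_exc())
        finally:
            q.task_done()

As in the diff, the retry is immediate and unbounded; a short sleep or a retry cap would be a reasonable refinement.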
@@ -146,6 +154,9 @@ def stream_thing(prefix, publish_queue, mon_queue=None, start_id=None):
        params = {"id": ",".join(chunk)}
        results = reddit.get(API_PATH["info"], params=params)
        if not results:
            logger.warning("No results!")
            continue
        post_time = datetime.utcfromtimestamp(results[0].created_utc)
        now = datetime.utcnow()
        distance = now - post_time
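
The guard added above handles an empty /api/info response; without it, results[0] on an empty listing raises IndexError. A small sketch of the same guard around praw's low-level get (the helper name is hypothetical, and chunk is assumed to be a list of fullnames such as "t3_..."):

import logging
from datetime import datetime
from praw.endpoints import API_PATH

logger = logging.getLogger("default")

def first_created(reddit, chunk):
    # Batch-fetch things by fullname; /api/info simply omits ids it cannot
    # resolve, so the whole listing can come back empty and must be checked
    # before indexing into it.
    results = reddit.get(API_PATH["info"], params={"id": ",".join(chunk)})
    if not results:
        logger.warning("No results!")
        return None
    return datetime.utcfromtimestamp(results[0].created_utc)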
@@ -184,7 +195,8 @@ if __name__ == "__main__":
    logger.info("Starting app @%s" % sys.argv[1])
    monitoring.init()
    if MONITORING:
        monitoring.init()
    pub_queue = Queue()
    while True:
        try: