Add post-process pass

This commit is contained in:
simon987 2019-08-12 09:19:46 -04:00
parent 04befa5e0e
commit 2f08467a83
3 changed files with 54 additions and 7 deletions

View File

@ -1,6 +1,6 @@
from influxdb import InfluxDBClient from influxdb import InfluxDBClient
client = InfluxDBClient('localhost', 8086, 'root', 'root', 'reddit_feed') client = InfluxDBClient("localhost", 8086, "root", "root", "reddit_feed")
def init(): def init():

45
post_process.py Normal file
View File

@ -0,0 +1,45 @@
import html
import re
# Captures the value of every double-quoted href attribute in an HTML fragment.
LINK_RE = re.compile(r'href="([^"]+)"')

# Matches absolute http(s) URLs hosted on one of the listed Reddit domains.
# Group 1 is the host, group 2 is everything after the first "/".
# The dots are escaped on purpose: an unescaped "." matches any character, so
# look-alike hosts such as "www-reddit.com" would be classified as internal.
INTERNAL_RE = re.compile(
    r"^https?://"
    r"(reddit\.com|redd\.it|old\.reddit\.com|www\.reddit\.com|np\.reddit\.com)"
    r"/(.*)"
)
def post_process(thing):
    """Annotate a serialized thing, in place, with a version and its URLs.

    Sets ``thing["v"]`` to ``1.0`` and ``thing["urls"]`` to the external links
    found in ``body_html`` (or, when that is missing or empty, in
    ``selftext_html``), followed by ``thing["url"]`` when that value is itself
    external.

    :param thing: dict-like mapping to annotate
    :return: the same ``thing`` object that was passed in
    """
    thing["v"] = 1.0
    # Install the list first so the key exists even if link extraction raises.
    urls = thing["urls"] = []

    # body_html takes precedence; selftext_html is only the fallback.
    markup = thing.get("body_html") or thing.get("selftext_html")
    if markup:
        urls.extend(get_links_from_body(markup))

    link = thing.get("url")
    if link and is_external(link):
        urls.append(link)

    return thing
def get_links_from_body(body):
    """Return the distinct external URLs linked from an HTML fragment.

    Every double-quoted ``href`` value found by ``LINK_RE`` is HTML-unescaped
    (attribute values encode ``&`` as ``&amp;``, which would otherwise leave
    query strings corrupted) and kept only if ``is_external`` accepts it.

    :param body: HTML markup to scan
    :return: list of unique URLs, in order of first appearance
    """
    candidates = (
        html.unescape(match.group(1)) for match in LINK_RE.finditer(body)
    )
    # dict.fromkeys de-duplicates while keeping document order, so the result
    # is deterministic; iterating a set of strings is not stable across runs.
    return list(dict.fromkeys(url for url in candidates if is_external(url)))
def is_external(url):
    """Tell whether ``url`` should be kept as an outbound link.

    A URL is rejected when it matches ``INTERNAL_RE``, contains
    ``message/compose``, or starts with ``/``, ``#`` or ``</``.

    :param url: URL string to classify
    :return: ``False`` for any of the rejected forms above, ``True`` otherwise
    """
    # Checks run in the same order as three separate guard clauses would;
    # the first one that hits short-circuits the rest.
    return not (
        INTERNAL_RE.match(url)
        or "message/compose" in url
        or url.startswith(("/", "#", "</"))
    )

14
run.py
View File

@ -18,6 +18,7 @@ from praw.endpoints import API_PATH
from praw.models import Comment from praw.models import Comment
import monitoring import monitoring
from post_process import post_process
from rate_limiter import GoodRateLimiter from rate_limiter import GoodRateLimiter
from util import update_cursor, read_cursor, reddit_ids from util import update_cursor, read_cursor, reddit_ids
@ -43,7 +44,7 @@ logger.addHandler(StreamHandler(sys.stdout))
def serialize(thing): def serialize(thing):
if isinstance(thing, Comment): if isinstance(thing, Comment):
return json.dumps({ return {
"author": str(thing.author), "author": str(thing.author),
"author_flair_text": thing.author_flair_text, "author_flair_text": thing.author_flair_text,
"body": thing.body, "body": thing.body,
@ -65,9 +66,9 @@ def serialize(thing):
"subreddit_id": thing.subreddit_id, "subreddit_id": thing.subreddit_id,
"subreddit_type": thing.subreddit_type, "subreddit_type": thing.subreddit_type,
"ups": thing.ups, "ups": thing.ups,
}) }
else: else:
return json.dumps({ return {
"archived": thing.archived, "archived": thing.archived,
"author": str(thing.author), "author": str(thing.author),
"author_flair_text": thing.author_flair_text, "author_flair_text": thing.author_flair_text,
@ -121,17 +122,18 @@ def serialize(thing):
"title": thing.title, "title": thing.title,
"ups": thing.ups, "ups": thing.ups,
"url": thing.url, "url": thing.url,
}) }
def publish(thing): def publish(thing):
thing_type = type(thing).__name__.lower() thing_type = type(thing).__name__.lower()
j = serialize(thing) j = serialize(thing)
post_process(j)
reddit_channel.basic_publish( reddit_channel.basic_publish(
exchange='reddit', exchange='reddit',
routing_key="%s.%s" % (thing_type, str(thing.subreddit)), routing_key="%s.%s" % (thing_type, str(thing.subreddit).lower()),
body=j body=json.dumps(j)
) )