From 9447463e566a5b2caf9ac87ddbe6d7434d76c311 Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 5 Sep 2019 12:59:08 -0400 Subject: [PATCH] rename meta attributes, add 2ch.hk support, version bump --- .gitignore | 4 +- README.md | 2 +- chan.py | 105 ++++++++++++++++++++++++++++++++++++++++++++++- post_process.py | 31 +++++--------- requirements.txt | 5 +++ run.py | 35 +++++++--------- 6 files changed, 137 insertions(+), 45 deletions(-) create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore index ac6df84..bfccfda 100644 --- a/.gitignore +++ b/.gitignore @@ -2,5 +2,5 @@ __pychache__/ *.pyc *.iml -*.db -*.log \ No newline at end of file +*.log +state.db* diff --git a/README.md b/README.md index d41a47f..9f40c5e 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ image boards and publishes serialised JSON to RabbitMQ for real-time ingest. Compatible image boards: 4chan, lainchan, uboachan, -22chan, wizchan, 1chan. +22chan, wizchan, 1chan, 2ch.hk. Can optionally push monitoring data to InfluxDB. Below is an example of Grafana being used to display it. diff --git a/chan.py b/chan.py index c2c8ba5..b4c61b9 100644 --- a/chan.py +++ b/chan.py @@ -1,3 +1,8 @@ +import json + +from post_process import get_links_from_body + + class ChanHelper: def __init__(self, db_id, base_url, image_url, thread_path, image_path, boards): self.db_id = db_id @@ -16,6 +21,85 @@ class ChanHelper: def posts_url(self, board, thread): return "%s%s%s%d.json" % (self._base_url, board, self._thread_path, thread) + @staticmethod + def item_id(item): + return item["no"] + + def item_urls(self, item, board): + urls = set() + + if "com" in item and item["com"]: + urls.update(get_links_from_body(item["com"])) + elif "sub" in item and item["sub"]: + urls.update(get_links_from_body(item["sub"])) + if "fsize" in item and item["fsize"]: + urls.add(self.image_url(board, item["tim"], item["ext"])) + + return list(urls) + + @staticmethod + def item_type(item): + return "thread" if "sub" in item else "post" + + @staticmethod + def thread_mtime(thread): + return thread["last_modified"] + + @staticmethod + def parse_threads_list(content): + j = json.loads(content) + for page in j: + for thread in page["threads"]: + yield thread + + @staticmethod + def parse_thread(content): + j = json.loads(content) + return j["posts"] + + +class RussianChanHelper(ChanHelper): + + @staticmethod + def item_id(item): + return int(item["num"]) + + @staticmethod + def parse_threads_list(content): + j = json.loads(content) + return j["threads"] + + @staticmethod + def parse_thread(content): + j = json.loads(content) + for thread in j["threads"]: + for post in thread["posts"]: + yield post + + @staticmethod + def thread_mtime(thread): + return thread["posts_count"] + + @staticmethod + def item_type(item): + return "thread" if "subject" in item and item["subject"] != "" else "post" + + def item_urls(self, item, board): + urls = set() + + if "comment" in item and item["comment"]: + urls.update(get_links_from_body(item["comment"])) + elif "subject" in item and item["subject"]: + urls.update(get_links_from_body(item["subject"])) + + if urls: + print(list(urls)) + + for file in item["files"]: + urls.add(self._base_url + file["path"]) + + return list(urls) + CHANS = { "4chan": ChanHelper( @@ -87,6 +171,25 @@ CHANS = { "/src/", [ "rails" - ] + ], + ), + "2chhk": RussianChanHelper( + 7, + "https://2ch.hk/", + "https://2ch.hk/", + "/res/", + "/src/", + [ + "d", "b", "o", "soc", "media", "r", "api", "rf", "int", + "po", "news", "hry", "au", "bi", "biz", "bo", "c", "em", + "fa", "fiz", "fl", "ftb", "hh", "hi", "me", "mg", "mlp", + "mo", "mov", "mu", "ne", "psy", "re", + "sci", "sf", "sn", "sp", "spc", "tv", "un", "w", "wh", + "wm", "wp", "zog", "de", "di", "diy", "mus", "pa", "p", + "wrk", "trv", "gd", "hw", "mobi", "pr", "ra", "s", "t", + "web", "bg", "cg", "gsg", "ruvn", "tes", "v", "vg", "wr", + "a", "fd", "ja", "ma", "vn", "fg", "fur", "gg", "ga", + "vape", "h", "ho", "hc", "e", "fet", "sex", "fag" + ], ) } diff --git a/post_process.py b/post_process.py index 3450f18..b0d9349 100644 --- a/post_process.py +++ b/post_process.py @@ -3,31 +3,20 @@ import re LINK_RE = re.compile(r"(https?://[\w\-_.]+\.[a-z]{2,4}([^\s<'\"]*|$))") -def post_process(thing, board, helper): - thing["v"] = 1.1 - thing["_id"] = int(thing["no"]) +def post_process(item, board, helper): + item["_v"] = 1.2 + item["_id"] = helper.item_id(item) - thing["board"] = board - thing["chan"] = helper.db_id + item["_board"] = board + item["_chan"] = helper.db_id - if "com" in thing and thing["com"]: - thing["urls"] = get_links_from_body(thing["com"]) - elif "sub" in thing and thing["sub"]: - thing["urls"] = get_links_from_body(thing["sub"]) - if "fsize" in thing and thing["fsize"]: - url = helper.image_url(board, thing["tim"], thing["ext"]) - if "urls" in thing: - thing["urls"].append(url) - else: - thing["urls"] = [url] - if "urls" not in thing: - thing["urls"] = [] + item["_urls"] = helper.item_urls(item, board) - return thing + return item def get_links_from_body(body): - result = set() + result = [] body = body \ .replace("", "") \ @@ -37,9 +26,9 @@ def get_links_from_body(body): for match in LINK_RE.finditer(body): url = match.group(1) if is_external(url): - result.add(url) + result.append(url) - return list(result) + return result def is_external(url): diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..dbf739d --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +requests +requests[socks] +stem +influxdb +pika diff --git a/run.py b/run.py index ada9639..ff52dc7 100644 --- a/run.py +++ b/run.py @@ -23,27 +23,22 @@ class ChanScanner: self.helper = helper self.state = ChanState() - def _fetch_threads(self, board): + def _threads(self, board): r = self.web.get(self.helper.threads_url(board)) if r.status_code == 200: - return r.json() + return self.helper.parse_threads_list(r.text) return [] def _fetch_posts(self, board, thread): r = self.web.get(self.helper.posts_url(board, thread)) if r.status_code == 200: - return r.json() - return {"posts": []} - - def _threads(self, board): - for page in self._fetch_threads(board): - for thread in page["threads"]: - yield thread + return self.helper.parse_thread(r.text) + return [] def _posts(self, board): - for thread in sorted(self._threads(board), key=lambda x: x["no"]): + for thread in self._threads(board): if self.state.has_new_posts(thread, self.helper): - for post in sorted(self._fetch_posts(board, thread["no"])["posts"], key=lambda x: x["no"]): + for post in self._fetch_posts(board, self.helper.item_id(thread)): yield post self.state.mark_thread_as_visited(thread, self.helper) @@ -55,9 +50,9 @@ class ChanScanner: def once(func): def wrapper(item, board, helper): - if not state.has_visited(item["no"], helper): + if not state.has_visited(helper.item_id(item), helper): func(item, board, helper) - state.mark_visited(item["no"], helper) + state.mark_visited(helper.item_id(item), helper) return wrapper @@ -110,10 +105,10 @@ class ChanState: cur = conn.cursor() cur.execute( "SELECT last_modified FROM threads WHERE thread=? AND chan=?", - (thread["no"], helper.db_id) + (helper.item_id(thread), helper.db_id) ) row = cur.fetchone() - if not row or thread["last_modified"] != row[0]: + if not row or helper.thread_mtime(thread) != row[0]: return True return False @@ -124,8 +119,8 @@ class ChanState: "VALUES (?,?,?) " "ON CONFLICT (thread, chan) " "DO UPDATE SET last_modified=?", - (thread["no"], thread["last_modified"], helper.db_id, - thread["last_modified"]) + (helper.item_id(thread), helper.thread_mtime(thread), helper.db_id, + helper.thread_mtime(thread)) ) conn.commit() @@ -144,7 +139,7 @@ def publish_worker(queue: Queue, helper): @once def publish(item, board, helper): - item_type = "thread" if "sub" in item else "post" + item_type = helper.item_type(item) post_process(item, board, helper) chan_channel.basic_publish( @@ -154,9 +149,9 @@ def publish(item, board, helper): ) if MONITORING: - distance = datetime.utcnow() - datetime.fromtimestamp(item["time"]) + distance = datetime.utcnow() - datetime.fromtimestamp(helper.item_mtime(item)) monitoring.log([{ - "measurement": helper.db_id, + "measurement": chan, "time": str(datetime.utcnow()), "tags": { "board": board