commit 210d0327039a5537b0d1a6b3291c70bd92fd770d Author: simon987 Date: Tue Aug 13 15:26:01 2019 -0400 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ac6df84 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.idea/ +__pychache__/ +*.pyc +*.iml +*.db +*.log \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..d41a47f --- /dev/null +++ b/README.md @@ -0,0 +1,13 @@ +### chan_feed + +Daemon that fetches posts from compatible *chan +image boards and publishes serialised JSON to RabbitMQ + for real-time ingest. + +Compatible image boards: 4chan, lainchan, uboachan, +22chan, wizchan, 1chan. + +Can optionally push monitoring data to InfluxDB. Below is an +example of Grafana being used to display it. + +![monitoring.png](monitoring.png) diff --git a/chan.py b/chan.py new file mode 100644 index 0000000..c2c8ba5 --- /dev/null +++ b/chan.py @@ -0,0 +1,92 @@ +class ChanHelper: + def __init__(self, db_id, base_url, image_url, thread_path, image_path, boards): + self.db_id = db_id + self._base_url = base_url + self._image_url = image_url + self._thread_path = thread_path + self._image_path = image_path + self.boards = boards + + def image_url(self, board, tim, extension): + return "%s%s%s%s%s" % (self._image_url, board, self._image_path, tim, extension) + + def threads_url(self, board): + return "%s%s/threads.json" % (self._base_url, board) + + def posts_url(self, board, thread): + return "%s%s%s%d.json" % (self._base_url, board, self._thread_path, thread) + + +CHANS = { + "4chan": ChanHelper( + 1, + "https://a.4cdn.org/", + "https://i.4cdn.org/", + "/thread/", + "/", + [ + "a", "b", "c", "d", "e", "f", "g", "gif", "h", "hr", + "k", "m", "o", "p", "r", "s", "t", "u", "v", "vg", + "vr", "w", "wg", "i", "ic", "r9k", "s4s", "vip", "qa", + "cm", "hm", "lgbt", "y", "3", "aco", "adv", "an", "asp", + "bant", "biz", "cgl", "ck", "co", "diy", "fa", "fit", + "gd", "hc", "his", "int", "jp", "lit", "mlp", 
"mu", "n", + "news", "out", "po", "pol", "qst", "sci", "soc", "sp", + "tg", "toy", "trv", "tv", "vp", "wsg", "wsr", "x" + ] + ), + "lainchan": ChanHelper( + 2, + "https://lainchan.org/", + "https://lainchan.org/", + "/res/", + "/src/", + [ + "λ", "diy", "sec", "tech", "inter", "lit", "music", "vis", + "hum", "drg", "zzz", "layer" "q", "r", "cult", "psy", + "mega", "random" + ] + ), + "uboachan": ChanHelper( + 3, + "https://uboachan.net/", + "https://uboachan.net/", + "/res/", + "/src/", + [ + "yn", "yndd", "fg", "yume", "o", "lit", "media", "og", + "ig", "2", "ot", "hikki", "cc", "x", "sugg" + ] + ), + "22chan": ChanHelper( + 4, + "https://22chan.org/", + "https://22chan.org/", + "/res/", + "/src/", + [ + "a", "b", "f", "feels", "i", "k", "mu", "pol", "sewers", + "sg", "t", "vg" + ] + ), + "wizchan": ChanHelper( + 5, + "https://wizchan.org/", + "https://wizchan.org/", + "/res/", + "/src/", + [ + "wiz", "dep", "hob", "lounge", "jp", "meta", "games", "music", + ] + ), + "1chan": ChanHelper( + 6, + "https://www.1chan.net/", + "https://www.1chan.net/", + "/res/", + "/src/", + [ + "rails" + ] + ) +} diff --git a/monitoring.png b/monitoring.png new file mode 100644 index 0000000..ae76b18 Binary files /dev/null and b/monitoring.png differ diff --git a/monitoring.py b/monitoring.py new file mode 100644 index 0000000..23f945c --- /dev/null +++ b/monitoring.py @@ -0,0 +1,18 @@ +from influxdb import InfluxDBClient + +client = InfluxDBClient("localhost", 8086, "root", "root", "chan_feed") + + +def init(): + db_exists = False + for db in client.get_list_database(): + if db["name"] == "chan_feed": + db_exists = True + break + + if not db_exists: + client.create_database("chan_feed") + + +def log(event): + client.write_points(event) diff --git a/post_process.py b/post_process.py new file mode 100644 index 0000000..764b3b0 --- /dev/null +++ b/post_process.py @@ -0,0 +1,45 @@ +import re + +LINK_RE = re.compile(r"(https?://[\w\-_.]+\.[a-z]{2,4}([^\s<'\"]*|$))") + + +def 
post_process(thing, board, helper):
+    # Normalise a raw imageboard post/thread dict in place: tag it with a
+    # schema version, its board name and chan id, then collect external URLs.
+    # Returns the same (mutated) dict.
+    thing["v"] = 1.0
+
+    thing["board"] = board
+    thing["chan"] = helper.db_id
+
+    # Prefer the comment body; fall back to the subject line for link scraping
+    if "com" in thing and thing["com"]:
+        thing["urls"] = get_links_from_body(thing["com"])
+    elif "sub" in thing and thing["sub"]:
+        thing["urls"] = get_links_from_body(thing["sub"])
+    # A post with an attached file ("fsize" set) also contributes its image URL
+    if "fsize" in thing and thing["fsize"]:
+        url = helper.image_url(board, thing["tim"], thing["ext"])
+        if "urls" in thing:
+            thing["urls"].append(url)
+        else:
+            thing["urls"] = [url]
+    # Guarantee the key always exists, even for posts with no links at all
+    if "urls" not in thing:
+        thing["urls"] = []
+
+    return thing
+
+
+def get_links_from_body(body):
+    # Extract the distinct external links found in an (HTML) post body.
+    result = set()
+
+    # NOTE(review): the first two .replace("", "") calls are no-ops — the
+    # original search tokens (presumably markup such as <wbr> line-break
+    # hints) appear to have been lost; confirm against the upstream source.
+    body = body \
+        .replace("", "") \
+        .replace("", "") \
+        .replace(" dot ", ".")
+
+    for match in LINK_RE.finditer(body):
+        url = match.group(1)
+        if is_external(url):
+            result.add(url)
+
+    return list(result)
+
+
+def is_external(url):
+    # Anchor ("#...") and site-relative ("/...") links are not external
+    return not url.startswith(("#", "/"))
diff --git a/run.py b/run.py
new file mode 100644
index 0000000..7f26786
--- /dev/null
+++ b/run.py
@@ -0,0 +1,195 @@
+import datetime
+import json
+import sqlite3
+import sys
+import traceback
+# NOTE(review): the next line shadows the "import datetime" module import
+# above with the datetime *class*; only the class is used in this file, so
+# the module import is redundant.
+from datetime import datetime
+from queue import Queue
+from threading import Thread
+
+import pika
+
+import monitoring
+from chan import CHANS
+from post_process import post_process
+from util import logger, Web
+
+MONITORING = True
+
+
+class ChanScanner:
+    # Iterates every post of every board of one imageboard, politely (the
+    # Web client is rate-limited) and incrementally (threads whose
+    # last_modified is unchanged are skipped via ChanState).
+    def __init__(self, helper):
+        self.web = Web(monitoring if MONITORING else None)
+        self.helper = helper
+        self.state = ChanState()
+
+    def _fetch_threads(self, board):
+        # Thread index of a board; empty list on any non-200 response
+        r = self.web.get(self.helper.threads_url(board))
+        if r.status_code == 200:
+            return r.json()
+        return []
+
+    def _fetch_posts(self, board, thread):
+        # Posts of one thread; empty post list on any non-200 response
+        r = self.web.get(self.helper.posts_url(board, thread))
+        if r.status_code == 200:
+            return r.json()
+        return {"posts": []}
+
+    def _threads(self, board):
+        # Flatten the paginated thread index into a stream of thread dicts
+        for page in self._fetch_threads(board):
+            for thread in page["threads"]:
+                yield thread
+
+    def _posts(self, board):
+        # Yield posts of changed threads, lowest thread number first, then
+        # lowest post number first
+        for thread in sorted(self._threads(board), key=lambda x: x["no"]):
+            if self.state.has_new_posts(thread, self.helper):
+                for post in sorted(self._fetch_posts(board, thread["no"])["posts"], key=lambda x: x["no"]):
+                    yield post
+                # Marked only after all posts were yielded, so an interruption
+                # mid-thread re-scans the whole thread on the next pass
+                self.state.mark_thread_as_visited(thread, self.helper)
+
+    def all_posts(self):
+        # (post, board) pairs across every board of this chan
+        for board in self.helper.boards:
+            for post in self._posts(board):
+                yield post, board
+
+
+def once(func):
+    # Decorator: run func for a given post number at most once per chan.
+    # Relies on the module-level "state" (a ChanState created in __main__).
+    def wrapper(item, board, helper):
+        if not state.has_visited(item["no"], helper):
+            func(item, board, helper)
+            state.mark_visited(item["no"], helper)
+
+    return wrapper
+
+
+class ChanState:
+    # Sqlite-backed memory of which posts were already published and of each
+    # thread's last seen last_modified timestamp.
+    # NOTE(review): a fresh connection is opened per call; the "with" block
+    # commits/rolls back but does not close it — relies on GC to close.
+    def __init__(self):
+        self._db = "state.db"
+
+        with sqlite3.connect(self._db) as conn:
+            conn.execute(
+                "CREATE TABLE IF NOT EXISTS posts "
+                "("
+                "   post INT,"
+                "   ts INT DEFAULT (strftime('%s','now')),"
+                "   chan INT,"
+                "   PRIMARY KEY(post, chan)"
+                ")"
+            )
+            conn.execute(
+                "CREATE TABLE IF NOT EXISTS threads "
+                "("
+                "   thread INT,"
+                "   last_modified INT,"
+                "   ts INT DEFAULT (strftime('%s','now')),"
+                "   chan INT,"
+                "   PRIMARY KEY(thread, chan)"
+                ")"
+            )
+            conn.execute("PRAGMA journal_mode=wal")
+            conn.commit()
+
+    def mark_visited(self, item: int, helper):
+        # Record a post number as published (commit happens on "with" exit)
+        with sqlite3.connect(self._db) as conn:
+            conn.execute(
+                "INSERT INTO posts (post, chan) VALUES (?,?)",
+                (item, helper.db_id)
+            )
+
+    def has_visited(self, item: int, helper):
+        # True if this post number was already published for this chan
+        with sqlite3.connect(self._db) as conn:
+            cur = conn.cursor()
+            cur.execute(
+                "SELECT post FROM posts WHERE post=? AND chan=?",
+                (item, helper.db_id)
+            )
+            return cur.fetchone() is not None
+
+    def has_new_posts(self, thread, helper):
+        # A thread is "new" if never seen or if its last_modified changed
+        with sqlite3.connect(self._db) as conn:
+            cur = conn.cursor()
+            cur.execute(
+                "SELECT last_modified FROM threads WHERE thread=? AND chan=?",
+                (thread["no"], helper.db_id)
+            )
+            row = cur.fetchone()
+            if not row or thread["last_modified"] != row[0]:
+                return True
+            return False
+
+    def mark_thread_as_visited(self, thread, helper):
+        # Upsert the thread's last_modified timestamp
+        with sqlite3.connect(self._db) as conn:
+            conn.execute(
+                "INSERT INTO threads (thread, last_modified, chan) "
+                "VALUES (?,?,?) 
"
+                "ON CONFLICT (thread, chan) "
+                "DO UPDATE SET last_modified=?",
+                (thread["no"], thread["last_modified"], helper.db_id,
+                 thread["last_modified"])
+            )
+            conn.commit()


+def publish_worker(queue: Queue, helper):
+    # Drain the publish queue forever; a failing item is logged and dropped
+    # so one bad post cannot stall the feed
+    while True:
+        try:
+            item, board = queue.get()
+            publish(item, board, helper)
+
+        except Exception as e:
+            logger.error(str(e) + ": " + traceback.format_exc())
+        finally:
+            queue.task_done()
+
+
+@once
+def publish(item, board, helper):
+    # "sub" (subject) is only present on thread OPs
+    item_type = "thread" if "sub" in item else "post"
+    post_process(item, board, helper)
+
+    # Routing key e.g. "1.post.g" — chan id, item type, board
+    chan_channel.basic_publish(
+        exchange='chan',
+        routing_key="%d.%s.%s" % (helper.db_id, item_type, board),
+        body=json.dumps(item)
+    )
+
+    if MONITORING:
+        # "distance": how far behind real time this post was picked up
+        # NOTE(review): measurement name is an int (helper.db_id) — verify
+        # InfluxDB client accepts non-string measurements
+        distance = datetime.utcnow() - datetime.fromtimestamp(item["time"])
+        monitoring.log([{
+            "measurement": helper.db_id,
+            "time": str(datetime.utcnow()),
+            "tags": {
+                "board": board
+            },
+            "fields": {
+                "distance": distance.total_seconds()
+            }
+        }])
+
+
+if __name__ == "__main__":
+
+    # Usage: run.py <rabbitmq_host> <chan>   (chan is a key of CHANS)
+    if len(sys.argv) < 3:
+        logger.error("You must specify RabbitMQ host & chan!")
+        quit(1)
+
+    rabbitmq_host = sys.argv[1]
+    chan = sys.argv[2]
+    chan_helper = CHANS[chan]
+
+    if MONITORING:
+        monitoring.init()
+    # Module-level: read by the once() decorator around publish()
+    state = ChanState()
+
+    # Publishing happens on a separate thread fed through this queue
+    publish_q = Queue()
+    publish_thread = Thread(target=publish_worker, args=(publish_q, chan_helper))
+    publish_thread.start()
+
+    rabbit = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host))
+    chan_channel = rabbit.channel()
+    chan_channel.exchange_declare(exchange="chan", exchange_type="topic")
+
+    # Scan forever; ChanState makes repeat passes incremental
+    s = ChanScanner(chan_helper)
+    while True:
+        for p, b in s.all_posts():
+            publish_q.put((p, b))
diff --git a/util.py b/util.py
new file mode 100644
index 0000000..0f074ba
--- /dev/null
+++ b/util.py
@@ -0,0 +1,60 @@
+import logging
+import sys
+import time
+from datetime import datetime
+from logging import FileHandler, StreamHandler
+
+import requests
+
+# func -> perf_counter() of its last call; used by rate_limit below
+last_time_called = dict()
+
+logger = logging.getLogger("default")
+logger.setLevel(logging.DEBUG)
+
+formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
+file_handler = FileHandler("chan_feed.log")
+file_handler.setFormatter(formatter)
+# Drop any handlers left from a previous configuration before adding ours,
+# so repeated imports don't duplicate log lines
+for h in logger.handlers:
+    logger.removeHandler(h)
+logger.addHandler(file_handler)
+logger.addHandler(StreamHandler(sys.stdout))
+
+
+def rate_limit(per_second):
+    # Decorator factory: sleep as needed so the wrapped function runs at
+    # most per_second times per second. Call times live in the module-level
+    # last_time_called dict, keyed by the undecorated function.
+    # NOTE(review): wrapper is not decorated with functools.wraps, so the
+    # wrapped function loses its __name__/__doc__.
+    min_interval = 1.0 / float(per_second)
+
+    def decorate(func):
+        # 0 sentinel: elapsed is huge on the first call, so no initial sleep
+        last_time_called[func] = 0
+
+        def wrapper(*args, **kwargs):
+            elapsed = time.perf_counter() - last_time_called[func]
+            wait_time = min_interval - elapsed
+            if wait_time > 0:
+                time.sleep(wait_time)
+
+            # Timestamp taken at call start, not after func returns
+            last_time_called[func] = time.perf_counter()
+            return func(*args, **kwargs)
+
+        return wrapper
+
+    return decorate
+
+
+class Web:
+    # Rate-limited wrapper around requests.Session with optional monitoring
+    # of response status codes (monitoring may be None to disable).
+    def __init__(self, monitoring):
+        self.session = requests.Session()
+        self.monitoring = monitoring
+
+    @rate_limit(1 / 2)  # TODO: per chan rate limit?
+    def get(self, url, **kwargs):
+        # GET with debug logging; never raises on HTTP error status —
+        # callers inspect r.status_code themselves
+        r = self.session.get(url, **kwargs)
+        logger.debug("GET %s <%d>" % (url, r.status_code))
+        if self.monitoring:
+            self.monitoring.log([{
+                "measurement": "web",
+                "time": str(datetime.utcnow()),
+                "fields": {
+                    "status_code": r.status_code
+                }
+            }])
+        return r