"""Scrape the image boards defined in CHANS and publish new posts to a RabbitMQ topic exchange."""

import json
import sqlite3
import sys
import time
import traceback
from datetime import datetime
from queue import Queue
from threading import Thread

import pika

from hexlib.monitoring import Monitoring
from chan.chan import CHANS
from post_process import post_process
from util import logger, Web

MONITORING = True
BYPASS_RPS = False

DBNAME = "chan_feed"
if MONITORING:
    influxdb = Monitoring(DBNAME, logger=logger, batch_size=100, flush_on_exit=True)


class ChanScanner:
    def __init__(self, helper, proxy):
        self.web = Web(influxdb if MONITORING else None, rps=helper.rps, get_method=helper.get_method, proxy=proxy)
        self.helper = helper
        self.state = ChanState()

    def _threads(self, board):
        """Yield every thread of a board, following pagination until there is no next page."""
        r = self.web.get(self.helper.threads_url(board))
        if not r or r.status_code != 200:
            return []
        while True:
            threads, next_url = self.helper.parse_threads_list(r)
            for thread in threads:
                yield thread
            if not next_url:
                break
            r = self.web.get(next_url)
            if not r or r.status_code != 200:
                break

    def _fetch_posts(self, board, thread):
        r = self.web.get(self.helper.posts_url(board, thread))
        if r and r.status_code == 200:
            return self.helper.parse_thread(r)
        return []

    def _posts(self, board):
        """Yield the posts of every thread that changed since the last visit."""
        for thread in self._threads(board):
            if self.state.has_new_posts(thread, self.helper, board):
                for post in self._fetch_posts(board, thread):
                    yield post
                self.state.mark_thread_as_visited(thread, self.helper, board)

    def all_posts(self):
        for board in self.helper.boards():
            for post in self._posts(board):
                yield post, board


def once(func):
    """Only call func for items that have not been published before (tracked in the global ChanState)."""

    def wrapper(item, board, helper, channel, web):
        if not state.has_visited(helper.item_unique_id(item, board), helper):
            func(item, board, helper, channel, web)
            state.mark_visited(helper.item_unique_id(item, board), helper)

    return wrapper


class ChanState:
    """SQLite-backed record of visited posts and thread modification times."""

    def __init__(self):
        self._db = "state.db"

        with sqlite3.connect(self._db) as conn:
            conn.execute(
                "CREATE TABLE IF NOT EXISTS posts "
                "("
                "   post INT,"
                "   ts INT DEFAULT (strftime('%s','now')),"
                "   chan INT,"
                "   PRIMARY KEY(post, chan)"
                ")"
            )
            conn.execute(
                "CREATE TABLE IF NOT EXISTS threads "
                "("
                "   thread INT,"
                "   last_modified INT,"
                "   ts INT DEFAULT (strftime('%s','now')),"
                "   chan INT,"
                "   PRIMARY KEY(thread, chan)"
                ")"
            )
            conn.execute("PRAGMA journal_mode=wal")
            conn.commit()

    def mark_visited(self, item: int, helper):
        with sqlite3.connect(self._db) as conn:
            conn.execute(
                "INSERT INTO posts (post, chan) VALUES (?,?)",
                (item, helper.db_id)
            )

    def has_visited(self, item: int, helper):
        with sqlite3.connect(self._db) as conn:
            cur = conn.cursor()
            cur.execute(
                "SELECT post FROM posts WHERE post=? AND chan=?",
                (item, helper.db_id)
            )
            return cur.fetchone() is not None

    def has_new_posts(self, thread, helper, board):
        mtime = helper.thread_mtime(thread)
        if mtime == -1:
            # The helper cannot tell when the thread was last modified: always re-fetch it.
            return True

        with sqlite3.connect(self._db, timeout=5000) as conn:
            cur = conn.cursor()
            cur.execute(
                "SELECT last_modified, ts FROM threads WHERE thread=? AND chan=?",
                (helper.item_unique_id(thread, board), helper.db_id)
            )
            row = cur.fetchone()
            # Re-fetch if the thread is unknown, its mtime changed, or it was last seen more than a day ago.
            return not row or mtime != row[0] or row[1] + 86400 < int(time.time())

    def mark_thread_as_visited(self, thread, helper, board):
        with sqlite3.connect(self._db, timeout=5000) as conn:
            conn.execute(
                "INSERT INTO threads (thread, last_modified, chan) "
                "VALUES (?,?,?) "
                "ON CONFLICT (thread, chan) "
                "DO UPDATE SET last_modified=?, ts=(strftime('%s','now'))",
                (helper.item_unique_id(thread, board), helper.thread_mtime(thread), helper.db_id,
                 helper.thread_mtime(thread))
            )
            conn.commit()


def publish_worker(queue: Queue, helper, p):
    channel = connect()
    web = Web(influxdb if MONITORING else None, rps=helper.rps, get_method=helper.get_method, proxy=p)

    while True:
        try:
            item, board = queue.get()
            publish(item, board, helper, channel, web)
        except Exception as e:
            logger.error(str(e) + ": " + traceback.format_exc())
        finally:
            queue.task_done()


@once
def publish(item, board, helper, channel, web):
    item_type = helper.item_type(item)
    post_process(item, board, helper, web)

    while True:
        try:
            channel.basic_publish(
                exchange='chan',
                routing_key="%s.%s.%s" % (chan, item_type, board),
                body=json.dumps(item, separators=(',', ':'), ensure_ascii=False, sort_keys=True)
            )

            if MONITORING:
                # Log how far behind real time the scraper is for this item.
                distance = datetime.utcnow() - datetime.utcfromtimestamp(helper.item_mtime(item))
                influxdb.log([{
                    "measurement": chan,
                    "time": str(datetime.utcnow()),
                    "tags": {
                        "board": board
                    },
                    "fields": {
                        "distance": distance.total_seconds()
                    }
                }])
            break
        except Exception as e:
            # logger.debug(traceback.format_exc())
            logger.error(str(e))
            time.sleep(0.5)
            # The channel is most likely dead; reconnect and retry.
            channel = connect()


def connect():
    rabbit = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host))
    channel = rabbit.channel()
    channel.exchange_declare(exchange="chan", exchange_type="topic")
    return channel


if __name__ == "__main__":
    if len(sys.argv) < 3:
        logger.error("You must specify RabbitMQ host & chan!")
        sys.exit(1)

    rabbitmq_host = sys.argv[1]
    chan = sys.argv[2]
    chan_helper = CHANS[chan]

    proxy = None
    if len(sys.argv) > 3:
        proxy = sys.argv[3]
        logger.info("Using proxy %s" % proxy)

    if BYPASS_RPS:
        chan_helper.rps = 10

    state = ChanState()

    publish_q = Queue()
    for _ in range(5):
        publish_thread = Thread(target=publish_worker, args=(publish_q, chan_helper, proxy))
        publish_thread.start()

    s = ChanScanner(chan_helper, proxy)
    while True:
        for p, b in s.all_posts():
            publish_q.put((p, b))