From cfaa9d92c971df9c00506cdec16c361725717ab6 Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 5 Sep 2019 15:30:30 -0400 Subject: [PATCH] Bug fixes, HTML boards WIP --- chan.py | 159 ++++++++++++++++++++++++++++++++++++++++------- post_process.py | 4 +- requirements.txt | 1 + run.py | 77 ++++++++++++++--------- 4 files changed, 187 insertions(+), 54 deletions(-) diff --git a/chan.py b/chan.py index 4949917..6521b29 100644 --- a/chan.py +++ b/chan.py @@ -1,4 +1,7 @@ import json +from urllib.parse import urljoin + +from bs4 import BeautifulSoup from post_process import get_links_from_body @@ -21,6 +24,95 @@ class ChanHelper: def posts_url(self, board, thread): return "%s%s%s%d.json" % (self._base_url, board, self._thread_path, thread) + def board_hash(self, board): + return str((self.boards.index(board) + 1) * 10000) + + @staticmethod + def item_id(item): + raise NotImplementedError + + def item_unique_id(self, item, board): + return int(self.board_hash(board) + str(self.item_id(item))) + + @staticmethod + def thread_mtime(thread): + raise NotImplementedError + + def item_urls(self, item, board): + raise NotImplementedError + + @staticmethod + def item_type(item): + raise NotImplementedError + + @staticmethod + def parse_threads_list(content): + raise NotImplementedError + + @staticmethod + def parse_thread(r): + raise NotImplementedError + + +class HtmlChanHelper(ChanHelper): + + def threads_url(self, board): + return "%s%s/" % (self._base_url, board) + + def posts_url(self, board, thread): + return "%s%s%s%d.html" % (self._base_url, board, self._thread_path, thread) + + @staticmethod + def item_id(item): + return item["id"] + + def item_urls(self, item, board): + return [] + + @staticmethod + def item_type(item): + return item["type"] + + @staticmethod + def thread_mtime(thread): + return -1 + + def parse_threads_list(self, r): + soup = BeautifulSoup(r.text, "html.parser") + + threads = [] + + for threadEl in soup.find_all("div", attrs={"class": "opCell"}): + threads.append({ + "id": int(threadEl.get("id")), + }) + + next_url = soup.find("a", attrs={"id": "linkNext"}) + if next_url: + return threads, urljoin(r.url, next_url.get("href")) + return threads, None + + @staticmethod + def parse_thread(r): + soup = BeautifulSoup(r.text, "html.parser") + + op_el = soup.find("div", attrs={"class": "innerOP"}) + yield { + "id": int(soup.find("div", attrs={"class": "opCell"}).get("id")), + "type": "thread", + "html": str(op_el), + } + + for post_el in soup.find_all("div", attrs={"class": "postCell"}): + yield { + "id": int(post_el.get("id")), + "type": "post", + "html": str(post_el), + } + + +class JsonChanHelper(ChanHelper): + @staticmethod def item_id(item): return item["no"] @@ -46,32 +138,34 @@ class ChanHelper: return thread["last_modified"] @staticmethod - def parse_threads_list(content): - j = json.loads(content) + def parse_threads_list(r): + j = json.loads(r.text) + threads = [] for page in j: for thread in page["threads"]: - yield thread + threads.append(thread) + return threads, None @staticmethod - def parse_thread(content): - j = json.loads(content) + def parse_thread(r): + j = json.loads(r.text) return j["posts"] -class RussianChanHelper(ChanHelper): +class RussianJsonChanHelper(ChanHelper): @staticmethod def item_id(item): return int(item["num"]) @staticmethod - def parse_threads_list(content): - j = json.loads(content) - return j["threads"] + def parse_threads_list(r): + j = json.loads(r.text) + return j["threads"], None @staticmethod - def parse_thread(content): - j = json.loads(content) + def parse_thread(r): + j = json.loads(r.text) for thread in j["threads"]: for post in thread["posts"]: yield post @@ -92,9 +186,6 @@ class RussianChanHelper(ChanHelper): elif "subject" in item and item["subject"]: urls.update(get_links_from_body(item["subject"])) - if urls: - print(list(urls)) - for file in item["files"]: urls.add(self._base_url + file["path"]) @@ -102,7 +193,7 @@ class RussianChanHelper(ChanHelper): CHANS = { - "4chan": ChanHelper( + "4chan": JsonChanHelper( 1, "https://a.4cdn.org/", "https://i.4cdn.org/", @@ -119,7 +210,7 @@ CHANS = { "tg", "toy", "trv", "tv", "vp", "wsg", "wsr", "x" ] ), - "lainchan": ChanHelper( + "lainchan": JsonChanHelper( 2, "https://lainchan.org/", "https://lainchan.org/", @@ -127,11 +218,11 @@ CHANS = { "/src/", [ "λ", "diy", "sec", "tech", "inter", "lit", "music", "vis", - "hum", "drg", "zzz", "layer" "q", "r", "cult", "psy", - "mega", "random" + "hum", "drg", "zzz", "layer", "q", "r", "cult", "psy", + "mega", ] ), - "uboachan": ChanHelper( + "uboachan": JsonChanHelper( 3, "https://uboachan.net/", "https://uboachan.net/", @@ -142,7 +233,7 @@ CHANS = { "ig", "2", "ot", "hikki", "cc", "x", "sugg" ] ), - "22chan": ChanHelper( + "22chan": JsonChanHelper( 4, "https://22chan.org/", "https://22chan.org/", @@ -153,7 +244,7 @@ CHANS = { "sg", "t", "vg" ] ), - "wizchan": ChanHelper( + "wizchan": JsonChanHelper( 5, "https://wizchan.org/", "https://wizchan.org/", @@ -163,7 +254,18 @@ CHANS = { "wiz", "dep", "hob", "lounge", "jp", "meta", "games", "music", ] ), - "2chhk": RussianChanHelper( + # TODO + "1chan": ChanHelper( + 6, + "https://www.1chan.net/", + "https://www.1chan.net/", + "/res/", + "/src/", + [ + "rails" + ], + ), + "2chhk": RussianJsonChanHelper( 7, "https://2ch.hk/", "https://2ch.hk/", @@ -181,5 +283,16 @@ CHANS = { "a", "fd", "ja", "ma", "vn", "fg", "fur", "gg", "ga", "vape", "h", "ho", "hc", "e", "fet", "sex", "fag" ], - ) + ), + # TODO + "endchan": HtmlChanHelper( + 8, + "https://endchan.net/", + "https://endchan.net/", + "/res/", + "/.media/", + [ + "yuri" + ], + ), } diff --git a/post_process.py b/post_process.py index b0d9349..2ba662f 100644 --- a/post_process.py +++ b/post_process.py @@ -4,8 +4,8 @@ LINK_RE = re.compile(r"(https?://[\w\-_.]+\.[a-z]{2,4}([^\s<'\"]*|$))") def post_process(item, board, helper): - item["_v"] = 1.2 - item["_id"] = helper.item_id(item) + item["_v"] = 1.3 + item["_id"] = helper.item_unique_id(item, board) item["_board"] = board item["_chan"] = helper.db_id diff --git a/requirements.txt b/requirements.txt index dbf739d..0be0b80 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ requests[socks] stem influxdb pika +bs4 \ No newline at end of file diff --git a/run.py b/run.py index ff52dc7..8dc83b8 100644 --- a/run.py +++ b/run.py @@ -25,22 +25,31 @@ class ChanScanner: def _threads(self, board): r = self.web.get(self.helper.threads_url(board)) - if r.status_code == 200: - return self.helper.parse_threads_list(r.text) - return [] + if r.status_code != 200: + return [] + + while True: + threads, next_url = self.helper.parse_threads_list(r) + for thread in threads: + yield thread + if not next_url: + break + r = self.web.get(next_url) + if r.status_code != 200: + break def _fetch_posts(self, board, thread): r = self.web.get(self.helper.posts_url(board, thread)) if r.status_code == 200: - return self.helper.parse_thread(r.text) + return self.helper.parse_thread(r) return [] def _posts(self, board): for thread in self._threads(board): - if self.state.has_new_posts(thread, self.helper): + if self.state.has_new_posts(thread, self.helper, board): for post in self._fetch_posts(board, self.helper.item_id(thread)): yield post - self.state.mark_thread_as_visited(thread, self.helper) + self.state.mark_thread_as_visited(thread, self.helper, board) def all_posts(self): for board in self.helper.boards: @@ -50,9 +59,9 @@ class ChanScanner: def once(func): def wrapper(item, board, helper): - if not state.has_visited(helper.item_id(item), helper): + if not state.has_visited(helper.item_unique_id(item, board), helper): func(item, board, helper) - state.mark_visited(helper.item_id(item), helper) + state.mark_visited(helper.item_unique_id(item, board), helper) return wrapper @@ -100,26 +109,30 @@ class ChanState: ) return cur.fetchone() is not None - def has_new_posts(self, thread, helper): + def has_new_posts(self, thread, helper, board): + mtime = helper.thread_mtime(thread) + if mtime == -1: + return True + with sqlite3.connect(self._db, timeout=5000) as conn: cur = conn.cursor() cur.execute( "SELECT last_modified FROM threads WHERE thread=? AND chan=?", - (helper.item_id(thread), helper.db_id) + (helper.item_unique_id(thread, board), helper.db_id) ) row = cur.fetchone() if not row or helper.thread_mtime(thread) != row[0]: return True return False - def mark_thread_as_visited(self, thread, helper): + def mark_thread_as_visited(self, thread, helper, board): with sqlite3.connect(self._db, timeout=5000) as conn: conn.execute( "INSERT INTO threads (thread, last_modified, chan) " "VALUES (?,?,?) " "ON CONFLICT (thread, chan) " "DO UPDATE SET last_modified=?", - (helper.item_id(thread), helper.thread_mtime(thread), helper.db_id, + (helper.item_unique_id(thread, board), helper.thread_mtime(thread), helper.db_id, helper.thread_mtime(thread)) ) conn.commit() @@ -142,24 +155,30 @@ def publish(item, board, helper): item_type = helper.item_type(item) post_process(item, board, helper) - chan_channel.basic_publish( - exchange='chan', - routing_key="%s.%s.%s" % (chan, item_type, board), - body=json.dumps(item) - ) + while True: + try: + chan_channel.basic_publish( + exchange='chan', + routing_key="%s.%s.%s" % (chan, item_type, board), + body=json.dumps(item) + ) - if MONITORING: - distance = datetime.utcnow() - datetime.fromtimestamp(helper.item_mtime(item)) - monitoring.log([{ - "measurement": chan, - "time": str(datetime.utcnow()), - "tags": { - "board": board - }, - "fields": { - "distance": distance.total_seconds() - } - }]) + if MONITORING: + distance = datetime.utcnow() - datetime.fromtimestamp(helper.item_mtime(item)) + monitoring.log([{ + "measurement": chan, + "time": str(datetime.utcnow()), + "tags": { + "board": board + }, + "fields": { + "distance": distance.total_seconds() + } + }]) + break + except Exception as e: + logger.debug(traceback.format_exc()) + logger.error(str(e)) if __name__ == "__main__":