commit eb40a9de689081632be0bbbb2182a8b08a4704fb
Author: simon987
Date:   Wed Dec 23 19:30:31 2020 -0500

    Initial commit

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..ee64372
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.idea/
+*.pyc
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..bf21a04
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.8
+
+ADD requirements.txt /requirements.txt
+RUN pip install -r requirements.txt
+
+COPY . /app
+
+RUN chmod 777 -R /app
+
+WORKDIR /app
+ENTRYPOINT ["python", "run.py"]
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..afc75fb
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,9 @@
+version: "3"
+
+services:
+  scraper:
+    image: simon987/poal_feed
+    restart: always
+    environment:
+      - "PF_REDIS_HOST="
+      - "PF_RPS=1"
diff --git a/poal.py b/poal.py
new file mode 100644
index 0000000..82e1dc1
--- /dev/null
+++ b/poal.py
@@ -0,0 +1,204 @@
+import json
+from json import JSONDecodeError
+from urllib.parse import urljoin
+
+from post_process import get_links_from_body
+from state import PoalState
+from util import Web, logger
+
+
+class PoalHelper:
+    """Builds poal API URLs and interprets the JSON items they return."""
+
+    def __init__(self, url, rps, boards):
+        self.rps = rps
+        self._boards = boards
+        self._url = url
+
+    def item_unique_id(self, item):
+        """Return the "pid", "cid" or "uid" of item, depending on its type."""
+        item_type = self.item_type(item)
+        if item_type == "post":
+            return item["pid"]
+        if item_type == "comment":
+            return item["cid"]
+
+        return item["uid"]
+
+    def item_urls(self, item):
+        """Return the URLs found in a post or comment."""
+        item_type = self.item_type(item)
+        if item_type == "post":
+            urls = [
+                item["link"],
+                *(get_links_from_body(item["content"]) if item["content"] else [])
+            ]
+            if item["thumbnail"]:
+                urls.append(urljoin("https://poal.co/static/thumbs/", item["thumbnail"]))
+            return urls
+        return get_links_from_body(item["content"]) if item["content"] else []
+
+    def item_type(self, item):
+        """Classify item as "comment", "post" or "user" from its keys."""
+        # "cid" is checked first: an item with both "cid" and "pid" is a comment.
+        if "cid" in item:
+            return "comment"
+        if "pid" in item:
+            return "post"
+        return "user"
+
+    def item_user(self, item):
+        """Return (user, uid) of item, or (None, None) if absent or deleted."""
+        return (item["user"], item["uid"]) if "uid" in item and item["user"] != "[Deleted]" else (None, None)
+
+    def boards(self):
+        """Return the configured boards, skipping names starting with "_"."""
+        return [b.replace("\\_", "_") for b in self._boards if not b.startswith("_")]
+
+    def posts_url(self, board, page=1):
+        return "%s/api/getPostList/%s/new/%d" % (self._url, board, page)
+
+    def comments_url(self, post_id, page=1):
+        return "%s/api/getPostComments/%s/%d" % (self._url, str(post_id), page)
+
+    def user_url(self, username):
+        return "%s/api/getUser/%s" % (self._url, username)
+
+    def parse_posts_list(self, r, board):
+        """Return (posts, next_page_url); next_page_url is None on the last page."""
+        try:
+            j = json.loads(r.content.decode('utf-8', 'ignore'))
+            if "posts" not in j:
+                logger.warning("No posts in response for %s: %s" % (r.url, r.text,))
+                return [], None
+        except JSONDecodeError:
+            logger.warning("JSONDecodeError for %s:" % (r.url,))
+            logger.warning(r.text)
+            return [], None
+
+        posts = j["posts"]
+        # A page of exactly 25 posts is treated as full, so a next page is tried.
+        if len(posts) == 25:
+            if len(r.history):
+                # Redirected response: assume page 1 instead of parsing the URL.
+                page = 1
+            else:
+                page = int(r.url.split("/")[-1])
+            return posts, self.posts_url(board, page=page + 1)
+        return posts, None
+
+    def parse_comments(self, r):
+        """Return (comments, next_page_url); next_page_url is None on the last page.
+
+        Always returns a 2-tuple, because callers unpack the result.
+        """
+        try:
+            j = json.loads(r.content.decode('utf-8', 'ignore'))
+            if "comments" not in j:
+                logger.warning("No comments in response for %s: %s" % (r.url, r.text,))
+                return [], None
+        except JSONDecodeError:
+            logger.warning("JSONDecodeError for %s:" % (r.url,))
+            logger.warning(r.text)
+            return [], None
+
+        comments = j["comments"]
+        if len(comments) == 50:
+            if len(r.history):
+                pid = int(r.url.split("/")[-1])
+                page = 1
+            else:
+                pid = int(r.url.split("/")[-2])
+                page = int(r.url.split("/")[-1])
+            return comments, self.comments_url(pid, page=page + 1)
+        return comments, None
+
+
+class PoalScanner:
+    """Walks boards, posts, comments and users, yielding (item, board) pairs."""
+
+    def __init__(self, state: PoalState, helper: PoalHelper):
+        self._state = state
+        self._helper = helper
+        self._web = Web(rps=helper.rps)
+
+    def _posts(self, board):
+        """Yield every post of board, following pagination."""
+        r = self._web.get(self._helper.posts_url(board))
+        if not r or r.status_code != 200:
+            return
+
+        while True:
+            threads, next_url = self._helper.parse_posts_list(r, board)
+            for thread in threads:
+                yield thread
+            if not next_url:
+                break
+            r = self._web.get(next_url)
+            if not r or r.status_code != 200:
+                break
+
+    def _fetch_comments(self, post):
+        """Yield every comment of post, following pagination."""
+        r = self._web.get(self._helper.comments_url(post["pid"]))
+        if not r or r.status_code != 200:
+            return
+
+        while True:
+            comments, next_url = self._helper.parse_comments(r)
+            for comment in comments:
+                yield comment
+            if not next_url:
+                break
+            r = self._web.get(next_url)
+            if not r or r.status_code != 200:
+                break
+
+    def _fetch_user(self, username):
+        r = self._web.get(self._helper.user_url(username))
+        if not r or r.status_code != 200:
+            return None
+        return self.parse_user(r)
+
+    def parse_user(self, r):
+        """Return the decoded user response, or None on an error or bad JSON."""
+        try:
+            j = json.loads(r.content.decode('utf-8', 'ignore'))
+        except JSONDecodeError:
+            logger.warning("JSONDecodeError for %s:" % (r.url,))
+            logger.warning(r.text)
+            return None
+        if "error" in j:
+            return None
+        return j
+
+    def _fetch_user_from_item(self, item):
+        """Return the user dict of item's author if not yet visited, else None."""
+        user, uid = self._helper.item_user(item)
+        if user and not self._state.has_visited_user(uid):
+            j = self._fetch_user(user)
+            if j and "user" in j:
+                j["user"]["uid"] = uid
+                self._state.mark_user_as_visited(uid)
+                return j["user"]
+        return None
+
+    def all_items(self):
+        """Yield (item, board) for each post, new comment and unseen author."""
+        for board in self._helper.boards():
+            for post in self._posts(board):
+                cur_board = post["sub"]
+                yield post, cur_board
+
+                user = self._fetch_user_from_item(post)
+                if user:
+                    yield user, cur_board
+
+                if self._state.has_new_comments(post, self._helper):
+                    for comment in self._fetch_comments(post):
+                        yield comment, cur_board
+                        # Look up the comment's author, not the post's.
+                        user = self._fetch_user_from_item(comment)
+                        if user:
+                            yield user, cur_board
+                    self._state.mark_post_as_visited(post, self._helper)
diff --git a/post_process.py b/post_process.py
new file mode 100644
index 0000000..03205c3
--- /dev/null
+++ b/post_process.py
@@ -0,0 +1,20 @@
+from hexlib.regex import LINK_RE
+
+
+def post_process(item, board, helper):
+    """Add "_v" and "_id" (plus "_urls"/"_sub" where applicable) to item in place."""
+    item["_v"] = 1.0
+    item["_id"] = helper.item_unique_id(item)
+
+    item_type = helper.item_type(item)
+    if item_type != "user":
+        item["_urls"] = helper.item_urls(item)
+    if item_type == "comment":
+        item["_sub"] = board
+
+    return item
+
+
+def get_links_from_body(body):
+    """Return group 1 of every LINK_RE match in body."""
+    return [match.group(1) for match in LINK_RE.finditer(body)]
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..86f3a42
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,4 @@
+requests
+urllib3
+git+https://github.com/simon987/hexlib.git
+redis
diff --git a/run.py b/run.py
new file mode 100644
index 0000000..630d7d4
--- /dev/null
+++ b/run.py
@@ -0,0 +1,106 @@
+import json
+import os
+import traceback
+from functools import wraps
+from queue import Queue
+from threading import Thread
+
+import redis
+
+from poal import PoalScanner, PoalHelper
+from post_process import post_process
+from state import PoalState
+from util import logger
+
+
+def env_flag(name, default=False):
+    """Read a boolean environment variable; "", "0", "false", "no", "off" are False."""
+    value = os.environ.get(name)
+    if value is None:
+        return default
+    return value.strip().lower() not in ("", "0", "false", "no", "off")
+
+
+# Environment variables are always strings, so numeric settings are converted.
+REDIS_HOST = os.environ.get("PF_REDIS_HOST", "localhost")
+REDIS_PORT = int(os.environ.get("PF_REDIS_PORT", 6379))
+PF_PUBLISH = env_flag("PF_PUBLISH")
+PF_RPS = float(os.environ.get("PF_RPS", 1))
+
+ARC_LISTS = os.environ.get("PF_ARC_LISTS", "arc").split(",")
+
+NUM_PUBLISH_WORKERS = 3
+
+
+def publish_worker(queue: Queue, helper):
+    """Publish queued (item, board) pairs until a (None, None) sentinel arrives."""
+    while True:
+        try:
+            item, board = queue.get()
+            if item is None:
+                break
+            publish(item, board, helper)
+
+        except Exception as e:
+            logger.error(str(e) + ": " + traceback.format_exc())
+        finally:
+            queue.task_done()
+
+
+def once(func):
+    """Wrap func(item, board, helper) so that each item id is handled only once."""
+    @wraps(func)
+    def wrapper(item, board, helper):
+        # "state" is the module-level PoalState created in the __main__ block.
+        item_id = helper.item_unique_id(item)
+        if not state.has_visited(item_id):
+            func(item, board, helper)
+            state.mark_visited(item_id)
+
+    return wrapper
+
+
+@once
+def publish(item, board, helper):
+    """Post-process item and push it to Redis as compact JSON."""
+    post_process(item, board, helper)
+
+    item_type = helper.item_type(item)
+    routing_key = "%s.%s" % (item_type, board)
+
+    message = json.dumps(item, separators=(',', ':'), ensure_ascii=False, sort_keys=True)
+    if PF_PUBLISH:
+        rdb.publish("poal." + routing_key, message)
+    for arc in ARC_LISTS:
+        rdb.lpush(arc + ".poal." + routing_key, message)
+
+
+HELPER = PoalHelper(
+    boards=(
+        "all",
+        # TODO: Are there hidden boards that do not show up in /all ?
+    ),
+    rps=PF_RPS,
+    url="https://poal.co"
+)
+
+if __name__ == "__main__":
+
+    state = PoalState("poal", REDIS_HOST, REDIS_PORT)
+    rdb = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
+
+    publish_q = Queue()
+    for _ in range(NUM_PUBLISH_WORKERS):
+        publish_thread = Thread(target=publish_worker, args=(publish_q, HELPER), daemon=True)
+        publish_thread.start()
+
+    s = PoalScanner(state, HELPER)
+    while True:
+        try:
+            for item, board in s.all_items():
+                publish_q.put((item, board))
+        except KeyboardInterrupt:
+            print("cleanup..")
+            for _ in range(NUM_PUBLISH_WORKERS):
+                publish_q.put((None, None))
+            break
diff --git a/state.py b/state.py
new file mode 100644
index 0000000..8f08cc8
--- /dev/null
+++ b/state.py
@@ -0,0 +1,33 @@
+from hexlib.db import VolatileState, VolatileBooleanState
+
+
+class PoalState:
+    """Remembers which posts, comments and users were already processed."""
+
+    def __init__(self, prefix, host, port):
+        self._posts = VolatileState(prefix, host=host, port=port)
+        self._comments = VolatileBooleanState(prefix, host=host, port=port)
+        self._users = VolatileBooleanState(prefix, host=host, port=port)
+
+    def has_visited(self, item_id):
+        # Every item type (post, comment, user) is tracked under the "comments" key.
+        return self._comments["comments"][item_id]
+
+    def mark_visited(self, item_id):
+        self._comments["comments"][item_id] = True
+
+    def mark_post_as_visited(self, post, helper):
+        """Store the post's current comment count under its unique id."""
+        self._posts["posts"][helper.item_unique_id(post)] = post["comments"]
+
+    def has_new_comments(self, post, helper):
+        """Return True if no count is stored for post or its count has grown."""
+        comment_count = self._posts["posts"][helper.item_unique_id(post)]
+        return comment_count is None or post["comments"] > comment_count
+
+    def has_visited_user(self, uid):
+        # Dashes are stripped from the uid before it is used as a key.
+        return self._users["users"][uid.replace("-", "")]
+
+    def mark_user_as_visited(self, uid):
+        self._users["users"][uid.replace("-", "")] = True
diff --git a/util.py b/util.py
new file mode 100644
index 0000000..addea9d
--- /dev/null
+++ b/util.py
@@ -0,0 +1,62 @@
+import logging
+import sys
+import traceback
+from logging import StreamHandler
+
+import requests
+from hexlib.misc import rate_limit
+
+logger = logging.getLogger("default")
+logger.setLevel(logging.DEBUG)
+
+formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
+# Iterate over a copy: removeHandler() mutates logger.handlers.
+for h in list(logger.handlers):
+    logger.removeHandler(h)
+handler = StreamHandler(sys.stdout)
+# The formatter only takes effect once it is attached to a handler.
+handler.setFormatter(formatter)
+logger.addHandler(handler)
+
+# Header value only; the "User-Agent" header name is supplied as the dict key.
+UA = "Mozilla/5.0 (X11; Linux x86_64; rv:83.0) Gecko/20100101 Firefox/83.0"
+
+
+class Web:
+    """Rate-limited requests session that retries failed GET requests."""
+
+    def __init__(self, rps):
+        self.session = requests.Session()
+        self._rps = rps
+
+        # Defined here so the decorator can use the per-instance rate.
+        @rate_limit(self._rps)
+        def _get(url, **kwargs):
+            # Copy the headers so the caller's dict is not modified.
+            kwargs["headers"] = {**(kwargs.get("headers") or {}), "User-Agent": UA}
+            retries = 3
+
+            while retries > 0:
+                retries -= 1
+                try:
+                    return self.session.get(url, **kwargs)
+                except KeyboardInterrupt:
+                    raise
+                except Exception as e:
+                    logger.warning("Error with request %s: %s" % (url, str(e)))
+            raise Exception("Gave up request after maximum number of retries")
+
+        self._get = _get
+
+    def get(self, url, **kwargs):
+        """GET url; return the response, or None if the request failed."""
+        try:
+            r = self._get(url, **kwargs)
+
+            logger.debug("GET %s <%d>" % (url, r.status_code))
+            return r
+        except KeyboardInterrupt:
+            raise
+        except Exception as e:
+            logger.error(str(e) + traceback.format_exc())
+            return None