From eb40a9de689081632be0bbbb2182a8b08a4704fb Mon Sep 17 00:00:00 2001
From: simon987 <me@simon987.net>
Date: Wed, 23 Dec 2020 19:30:31 -0500
Subject: [PATCH] Initial commit

---
 .gitignore         |   2 +
 Dockerfile         |  11 +++
 docker-compose.yml |   9 +++
 poal.py            | 179 +++++++++++++++++++++++++++++++++++++++++++++
 post_process.py    |  22 ++++++
 requirements.txt   |   4 +
 run.py             |  88 ++++++++++++++++++++++
 state.py           |  27 +++++++
 util.py            |  55 ++++++++++++++
 9 files changed, 397 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 Dockerfile
 create mode 100644 docker-compose.yml
 create mode 100644 poal.py
 create mode 100644 post_process.py
 create mode 100644 requirements.txt
 create mode 100644 run.py
 create mode 100644 state.py
 create mode 100644 util.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..ee64372
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.idea/
+*.pyc
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..bf21a04
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.8
+
+ADD requirements.txt /requirements.txt
+RUN pip install -r requirements.txt
+
+COPY . /app
+
+RUN chmod 777 -R /app
+
+WORKDIR /app
+ENTRYPOINT ["python", "run.py"]
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..afc75fb
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,9 @@
+version: "3"
+
+services:
+  scraper:
+    image: simon987/poal_feed
+    restart: always
+    environment:
+      - "PF_REDIS_HOST="
+      - "PF_RPS=1"
diff --git a/poal.py b/poal.py
new file mode 100644
index 0000000..82e1dc1
--- /dev/null
+++ b/poal.py
@@ -0,0 +1,179 @@
+import json
+from json import JSONDecodeError
+from urllib.parse import urljoin
+
+from post_process import get_links_from_body
+from state import PoalState
+from util import Web, logger
+
+
+class PoalHelper:
+
+    def __init__(self, url, rps, boards):
+        self.rps = rps
+        self._boards = boards
+        self._url = url
+
+    def item_unique_id(self, item):
+        item_type = self.item_type(item)
+        if item_type == "post":
+            return item["pid"]
+        if item_type == "comment":
+            return item["cid"]
+
+        return item["uid"]
+
+    def item_urls(self, item):
+        item_type = self.item_type(item)
+        if item_type == "post":
+            urls = [
+                item["link"],
+                *(get_links_from_body(item["content"]) if item["content"] else [])
+            ]
+            if item["thumbnail"]:
+                urls.append(urljoin("https://poal.co/static/thumbs/", item["thumbnail"]))
+            return urls
+        return get_links_from_body(item["content"]) if item["content"] else []
+
+    def item_type(self, item):
+        if "cid" in item:
+            return "comment"
+        if "pid" in item:
+            return "post"
+        return "user"
+
+    def item_user(self, item):
+        return (item["user"], item["uid"]) if "uid" in item and item["user"] != "[Deleted]" else (None, None)
+
+    def boards(self):
+        return [b.replace("\\_", "_") for b in self._boards if not b.startswith("_")]
+
+    def posts_url(self, board, page=1):
+        return "%s/api/getPostList/%s/new/%d" % (self._url, board, page)
+
+    def comments_url(self, post_id, page=1):
+        return "%s/api/getPostComments/%s/%d" % (self._url, str(post_id), page)
+
+    def user_url(self, username):
+        return "%s/api/getUser/%s" % (self._url, username)
+
+    def parse_posts_list(self, r, board):
+        try:
+            j = json.loads(r.content.decode('utf-8', 'ignore'))
+            if "posts" not in j:
+                logger.warning("No posts in response for %s: %s" % (r.url, r.text,))
+                return [], None
+        except JSONDecodeError:
+            logger.warning("JSONDecodeError for %s:" % (r.url,))
+            logger.warning(r.text)
+            return [], None
+
+        posts = j["posts"]
+        if len(posts) == 25:
+            if len(r.history):
+                page = 1
+            else:
+                page = int(r.url.split("/")[-1])
+            return posts, self.posts_url(board, page=page + 1)
+        return posts, None
+
+    def parse_comments(self, r):
+        try:
+            j = json.loads(r.content.decode('utf-8', 'ignore'))
+        except JSONDecodeError:
+            logger.warning("JSONDecodeError for %s:" % (r.url,))
+            logger.warning(r.text)
+            return []
+
+        comments = j["comments"]
+        if len(comments) == 50:
+            if len(r.history):
+                pid = int(r.url.split("/")[-1])
+                page = 1
+            else:
+                pid = int(r.url.split("/")[-2])
+                page = int(r.url.split("/")[-1])
+            return comments, self.comments_url(pid, page=page + 1)
+        return comments, None
+
+
+class PoalScanner:
+
+    def __init__(self, state: PoalState, helper: PoalHelper):
+        self._state = state
+        self._helper = helper
+        self._web = Web(rps=helper.rps)
+
+    def _posts(self, board):
+        r = self._web.get(self._helper.posts_url(board))
+        if not r or r.status_code != 200:
+            return []
+
+        while True:
+            threads, next_url = self._helper.parse_posts_list(r, board)
+            for thread in threads:
+                yield thread
+            if not next_url:
+                break
+            r = self._web.get(next_url)
+            if not r or r.status_code != 200:
+                break
+
+    def _fetch_comments(self, post):
+        r = self._web.get(self._helper.comments_url(post["pid"]))
+        if not r or r.status_code != 200:
+            return []
+
+        while True:
+            comments, next_url = self._helper.parse_comments(r)
+            for comment in comments:
+                yield comment
+            if not next_url:
+                break
+            r = self._web.get(next_url)
+            if not r or r.status_code != 200:
+                break
+
+    def _fetch_user(self, username):
+        r = self._web.get(self._helper.user_url(username))
+        if not r or r.status_code != 200:
+            return None
+        return self.parse_user(r)
+
+    def parse_user(self, r):
+        try:
+            j = json.loads(r.content.decode('utf-8', 'ignore'))
+            if "error" in j:
+                return None
+            return j
+        except JSONDecodeError:
+            logger.warning("JSONDecodeError for %s:" % (r.url,))
+            logger.warning(r.text)
+
+    def _fetch_user_from_item(self, item):
+        user, uid = self._helper.item_user(item)
+        if user and not self._state.has_visited_user(uid):
+            j = self._fetch_user(user)
+            if j and "user" in j:
+                j["user"]["uid"] = uid
+                self._state.mark_user_as_visited(uid)
+                return j["user"]
+        return None
+
+    def all_items(self):
+        for board in self._helper.boards():
+            for post in self._posts(board):
+                cur_board = post["sub"]
+                yield post, cur_board
+
+                user = self._fetch_user_from_item(post)
+                if user:
+                    yield user, cur_board
+
+                if self._state.has_new_comments(post, self._helper):
+                    for comment in self._fetch_comments(post):
+                        yield comment, cur_board
+                        user = self._fetch_user_from_item(post)
+                        if user:
+                            yield user, cur_board
+                    self._state.mark_post_as_visited(post, self._helper)
diff --git a/post_process.py b/post_process.py
new file mode 100644
index 0000000..03205c3
--- /dev/null
+++ b/post_process.py
@@ -0,0 +1,22 @@
+from hexlib.regex import LINK_RE
+
+
+def post_process(item, board, helper):
+    item["_v"] = 1.0
+    item["_id"] = helper.item_unique_id(item)
+
+    if helper.item_type(item) != "user":
+        item["_urls"] = helper.item_urls(item)
+    if helper.item_type(item) == "comment":
+        item["_sub"] = board
+
+    return item
+
+
+def get_links_from_body(body):
+    result = []
+
+    for match in LINK_RE.finditer(body):
+        url = match.group(1)
+        result.append(url)
+    return result
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..86f3a42
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,4 @@
+requests
+urllib3
+git+git://github.com/simon987/hexlib.git
+redis
diff --git a/run.py b/run.py
new file mode 100644
index 0000000..630d7d4
--- /dev/null
+++ b/run.py
@@ -0,0 +1,88 @@
+import json
+import os
+import traceback
+from queue import Queue
+from threading import Thread
+
+import redis
+
+from poal import PoalScanner, PoalHelper
+from post_process import post_process
+from state import PoalState
+from util import logger
+
+REDIS_HOST = os.environ.get("PF_REDIS_HOST", "localhost")
+REDIS_PORT = os.environ.get("PF_REDIS_PORT", 6379)
+PF_PUBLISH = os.environ.get("PF_PUBLISH", False)
+PF_RPS = os.environ.get("PF_RPS", 1)
+
+ARC_LISTS = os.environ.get("PF_ARC_LISTS", "arc").split(",")
+
+
+def publish_worker(queue: Queue, helper):
+    while True:
+        try:
+            item, board = queue.get()
+            if item is None:
+                break
+            publish(item, board, helper)
+
+        except Exception as e:
+            logger.error(str(e) + ": " + traceback.format_exc())
+        finally:
+            queue.task_done()
+
+
+def once(func):
+    def wrapper(item, board, helper):
+        if not state.has_visited(helper.item_unique_id(item)):
+            func(item, board, helper)
+            state.mark_visited(helper.item_unique_id(item))
+
+    return wrapper
+
+
+@once
+def publish(item, board, helper):
+    post_process(item, board, helper)
+
+    item_type = helper.item_type(item)
+    routing_key = "%s.%s" % (item_type, board)
+
+    message = json.dumps(item, separators=(',', ':'), ensure_ascii=False, sort_keys=True)
+    if PF_PUBLISH:
+        rdb.publish("poal." + routing_key, message)
+    for arc in ARC_LISTS:
+        rdb.lpush(arc + ".poal." + routing_key, message)
+
+
+HELPER = PoalHelper(
+    boards=(
+        "all",
+        # TODO: Are there hidden boards that do not show up in /all ?
+    ),
+    rps=PF_RPS,
+    url="https://poal.co"
+)
+
+if __name__ == "__main__":
+
+    state = PoalState("poal", REDIS_HOST, REDIS_PORT)
+    rdb = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
+
+    publish_q = Queue()
+    for _ in range(3):
+        publish_thread = Thread(target=publish_worker, args=(publish_q, HELPER))
+        publish_thread.setDaemon(True)
+        publish_thread.start()
+
+    s = PoalScanner(state, HELPER)
+    while True:
+        try:
+            for item, board in s.all_items():
+                publish_q.put((item, board))
+        except KeyboardInterrupt as e:
+            print("cleanup..")
+            for _ in range(3):
+                publish_q.put((None, None))
+            break
diff --git a/state.py b/state.py
new file mode 100644
index 0000000..8f08cc8
--- /dev/null
+++ b/state.py
@@ -0,0 +1,27 @@
+from hexlib.db import VolatileState, VolatileBooleanState
+
+
+class PoalState:
+    def __init__(self, prefix, host, port):
+        self._posts = VolatileState(prefix, host=host, port=port)
+        self._comments = VolatileBooleanState(prefix, host=host, port=port)
+        self._users = VolatileBooleanState(prefix, host=host, port=port)
+
+    def has_visited(self, item_id):
+        return self._comments["comments"][item_id]
+
+    def mark_visited(self, item_id):
+        self._comments["comments"][item_id] = True
+
+    def mark_post_as_visited(self, post, helper):
+        self._posts["posts"][helper.item_unique_id(post)] = post["comments"]
+
+    def has_new_comments(self, post, helper):
+        comment_count = self._posts["posts"][helper.item_unique_id(post)]
+        return comment_count is None or post["comments"] > comment_count
+
+    def has_visited_user(self, uid):
+        return self._users["users"][uid.replace("-", "")]
+
+    def mark_user_as_visited(self, uid):
+        self._users["users"][uid.replace("-", "")] = True
diff --git a/util.py b/util.py
new file mode 100644
index 0000000..addea9d
--- /dev/null
+++ b/util.py
@@ -0,0 +1,55 @@
+import logging
+import sys
+import traceback
+from logging import StreamHandler
+
+import requests
+from hexlib.misc import rate_limit
+
+logger = logging.getLogger("default")
+logger.setLevel(logging.DEBUG)
+
+formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
+for h in logger.handlers:
+    logger.removeHandler(h)
+logger.addHandler(StreamHandler(sys.stdout))
+
+UA = "User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:83.0) Gecko/20100101 Firefox/83.0"
+
+
+class Web:
+    def __init__(self, rps):
+        self.session = requests.Session()
+        self._rps = rps
+
+        @rate_limit(self._rps)
+        def _get(url, **kwargs):
+            if "headers" in kwargs:
+                kwargs["headers"]["User-Agent"] = UA
+            else:
+                kwargs["headers"] = {"User-Agent": UA}
+            retries = 3
+
+            while retries > 0:
+                retries -= 1
+                try:
+                    return self.session.get(url, **kwargs)
+                except KeyboardInterrupt as e:
+                    raise e
+                except Exception as e:
+                    logger.warning("Error with request %s: %s" % (url, str(e)))
+            raise Exception("Gave up request after maximum number of retries")
+
+        self._get = _get
+
+    def get(self, url, **kwargs):
+        try:
+            r = self._get(url, **kwargs)
+
+            logger.debug("GET %s <%d>" % (url, r.status_code))
+            return r
+        except KeyboardInterrupt as e:
+            raise e
+        except Exception as e:
+            logger.error(str(e) + traceback.format_exc())
+            return None