commit 880ae0014e5b5c6611a0d3a41e0ebce2a88b6df0
Author: simon987 <me@simon987.net>
Date:   Sun Dec 27 14:25:27 2020 -0500

    initial working version

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..ee64372
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.idea/
+*.pyc
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..a68487c
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,13 @@
+FROM python:3.8
+
+# Dependencies are installed before the sources are copied, in their own layers.
+COPY requirements.txt /requirements.txt
+RUN pip install --no-cache-dir -r /requirements.txt
+
+COPY . /app
+
+# NOTE(review): makes the sources world-writable; presumably so any runtime user can write there - confirm.
+RUN chmod 777 -R /app
+
+WORKDIR /app
+ENTRYPOINT ["python", "run.py"]
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..45a1306
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,10 @@
+version: "3"
+
+services:
+  scraper:
+    image: simon987/parler_feed
+    restart: always
+    environment:
+      - "PAF_REDIS_HOST="  # host of the Redis server used by scanner.py and state.py
+      - "PAF_MST="  # value of the "mst" session cookie; scanner.py exits at start-up when it is empty
+      - "PAF_JST="  # value of the "jst" session cookie
diff --git a/items.py b/items.py
new file mode 100644
index 0000000..c330537
--- /dev/null
+++ b/items.py
@@ -0,0 +1,113 @@
+import json
+
+
+class ParlerItem:
+    """Base class of the items the scraper publishes.
+
+    Subclasses keep their payload dict in ``self.data`` and implement ``item_type`` and ``item_id``.
+    """
+
+    def __init__(self):
+        self.data = None
+
+    def serialize(self):
+        """Stamp ``_v`` and ``_id`` into the payload and return it as compact JSON with sorted keys."""
+        payload = self.data
+        payload["_v"] = self.version()
+        payload["_id"] = self.item_id()
+        return json.dumps(payload, sort_keys=True, ensure_ascii=False, separators=(',', ':'))
+
+    def item_type(self) -> str:
+        """Return the short type name of the item."""
+        raise NotImplementedError
+
+    def item_id(self) -> str:
+        """Return the identifier that ``serialize`` stores under ``_id``."""
+        raise NotImplementedError
+
+    def version(self):
+        """Return the version string that ``serialize`` stores under ``_v``."""
+        return "1.0"
+
+
+class _PayloadItem(ParlerItem):
+    """Shared implementation for items that wrap a dict and read their id from one of its keys."""
+
+    _TYPE = None  # value returned by item_type()
+    _ID_KEY = "id"  # payload key returned by item_id()
+
+    def __init__(self, data):
+        super().__init__()
+        self.data = data
+
+    def item_type(self):
+        return self._TYPE
+
+    def item_id(self):
+        return self.data[self._ID_KEY]
+
+
+class ParlerProfile(_PayloadItem):
+    """A user profile; the integer user id is stored in the payload under ``_int_id``."""
+
+    _TYPE = "profile"
+
+    def __init__(self, data, int_id):
+        super().__init__(data)
+        self.data["_int_id"] = int_id
+
+
+class ParlerFollower(ParlerItem):
+    """One follower of a user; the item id is the user id followed by the follower id."""
+
+    def __init__(self, user_id, follower_id):
+        super().__init__()
+        self.data = {"user_id": user_id, "follower_id": follower_id}
+
+    def item_type(self):
+        return "follower"
+
+    def item_id(self):
+        edge = self.data
+        return edge["user_id"] + edge["follower_id"]
+
+
+class ParlerFollowee(ParlerItem):
+    """One account followed by a user; the item id is the user id followed by the followee id."""
+
+    def __init__(self, user_id, followee_id):
+        super().__init__()
+        self.data = {"user_id": user_id, "followee_id": followee_id}
+
+    def item_type(self):
+        return "followee"
+
+    def item_id(self):
+        edge = self.data
+        return edge["user_id"] + edge["followee_id"]
+
+
+class ParlerPost(_PayloadItem):
+    """A post, identified by the ``id`` key of its payload."""
+
+    _TYPE = "post"
+
+
+class ParlerPostRef(_PayloadItem):
+    """A post reference, identified by the ``id`` key of its payload."""
+
+    _TYPE = "postref"
+
+
+class ParlerUrl(_PayloadItem):
+    """A URL entry, identified by the ``_id`` key of its payload."""
+
+    _TYPE = "url"
+    _ID_KEY = "_id"
+
+
+class ParlerComment(_PayloadItem):
+    """A comment, identified by the ``_id`` key of its payload."""
+
+    _TYPE = "comment"
+    _ID_KEY = "_id"
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..bc4d1a8
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,5 @@
+requests
+# https instead of git://: GitHub no longer serves the unauthenticated git protocol.
+git+https://github.com/simon987/hexlib.git
+git+https://github.com/simon987/parler-tricks.git
+redis
diff --git a/run.py b/run.py
new file mode 100644
index 0000000..8d77087
--- /dev/null
+++ b/run.py
@@ -0,0 +1,12 @@
+from scanner import ParlerScanner
+from state import ParlerState
+
+
+def main():
+    """Create the state and a scanner with one worker thread, then scan all items."""
+    scanner = ParlerScanner(ParlerState(), threads=1)
+    scanner.scan_all_items()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scanner.py b/scanner.py
new file mode 100644
index 0000000..c730963
--- /dev/null
+++ b/scanner.py
@@ -0,0 +1,291 @@
+import json
+import os
+import traceback
+from queue import Queue
+from threading import Thread
+from time import sleep
+from urllib.parse import urlencode
+
+import redis
+import requests
+from parler import Parler
+
+from items import ParlerProfile, ParlerPost, ParlerPostRef, ParlerUrl, ParlerComment, ParlerItem, ParlerFollower, \
+    ParlerFollowee
+from state import ParlerState
+from util import logger
+
+# Comma-separated name prefixes of the Redis lists that items are pushed to.
+ARC_LISTS = os.environ.get("PAF_ARC_LISTS", "arc").split(",")
+REDIS_HOST = os.environ.get("PAF_REDIS_HOST", "localhost")
+# Environment variables are strings; convert so the port is always an int.
+REDIS_PORT = int(os.environ.get("PAF_REDIS_PORT", 6379))
+# Values installed as the "mst" and "jst" session cookies.
+MST = os.environ.get("PAF_MST", None)
+JST = os.environ.get("PAF_JST", None)
+
+# Both cookies are needed: the message always said so, but only MST was checked.
+if not MST or not JST:
+    print("MST & JST MISSING!")
+    raise SystemExit(-1)
+
+
+class SessionDebugWrapper(requests.Session):
+    """A requests session whose GET requests are logged, time-limited and retried."""
+
+    def get(self, url, **kwargs):
+        """Send a GET request, making up to three attempts.
+
+        A 15 second timeout applies unless the caller supplies its own. An HTTP 429
+        response is treated as a failed attempt and retried after a one second pause.
+
+        :param url: URL to request
+        :param kwargs: forwarded to ``requests.Session.get``
+        :return: the first response that is not an HTTP 429
+        :raises Exception: if all three attempts fail
+        """
+        # Passing timeout= next to **kwargs raised TypeError when a caller set its own.
+        kwargs.setdefault("timeout", 15)
+        # .get() tolerates an explicit params=None, which urlencode() cannot handle.
+        params = kwargs.get("params")
+
+        for _ in range(3):
+            try:
+                r = super().get(url, **kwargs)
+                logger.debug(
+                    "GET %s <%d>"
+                    % (url + "?" + (urlencode(params) if params else ""), r.status_code)
+                )
+                if r.status_code == 429:
+                    sleep(1)
+                    raise Exception("rate limited")
+                return r
+            except Exception as e:
+                # KeyboardInterrupt is not an Exception subclass, so it still propagates.
+                logger.warning("Error with request %s: %s" % (url, str(e)))
+        raise Exception("Gave up request after maximum number of retries")
+
+
+def tmp_patch_api(api):
+    """Make the user, feed and comments clients of *api* share one patched session.
+
+    The session is a SessionDebugWrapper carrying a fixed User-Agent header and
+    the ``mst``/``jst`` cookies.
+    """
+    s = SessionDebugWrapper()
+    s.headers['User-Agent'] = 'Parler%20Staging/545 CFNetwork/978.0.7 Darwin 18.7.0'
+    s.cookies['mst'] = MST
+    s.cookies['jst'] = JST
+    api.user_api.s = s
+    api.feed_api.s = s
+    api.comments_api.s = s
+
+
+class ParlerScanner:
+    """Walks integer user ids, scrapes each user and pushes the resulting items to Redis lists."""
+
+    def __init__(self, state: ParlerState, threads):
+        """Create the API client and start the worker threads.
+
+        :param state: store for resume keys and visited markers
+        :param threads: number of daemon worker threads to start
+        """
+        self._state = state
+        self.api: Parler = Parler(mst=MST, jst=JST)
+        self._threads = []
+        self._q = Queue(maxsize=threads + 1)
+        self.rdb = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
+
+        tmp_patch_api(self.api)
+
+        for _ in range(threads):
+            t = Thread(target=self._worker, daemon=True)
+            self._threads.append(t)
+            t.start()
+
+    def _worker(self):
+        """Consume ``(user_id, int_id)`` pairs from the queue until a None user id arrives.
+
+        Errors raised while processing a user are logged and the worker moves on.
+        """
+        logger.info("Starting worker thread")
+
+        while True:
+            try:
+                user_id, int_id = self._q.get()
+                if user_id is None:
+                    break
+                for item in self.process_userid(user_id, int_id):
+                    self.publish(item)
+
+            except Exception as e:
+                logger.error(str(e) + ": " + traceback.format_exc())
+            finally:
+                self._q.task_done()
+        logger.info("Worker thread finished")
+
+    def _iterate_endpoint(self, func, params, resume_endpoint, resume_id, items_keys):
+        """Yield ``(item, items_key)`` for every item of a paginated endpoint.
+
+        Paging starts at the stored resume key, if any. The stored key is cleared once
+        the response flagged as ``last`` has been consumed.
+
+        :param func: API method, called as ``func(params=params)``
+        :param params: request parameters; ``startkey`` is overwritten for each page
+        :param resume_endpoint: endpoint name the resume key is stored under
+        :param resume_id: id the resume key is stored under
+        :param items_keys: response keys whose lists are yielded
+        """
+        current_key = self._state.resume_key(resume_endpoint, resume_id) or ""
+
+        while True:
+            params["startkey"] = current_key
+            r = func(params=params)
+            if not r:
+                # Falsy response: stop here and leave the stored resume key untouched.
+                return
+
+            j = json.loads(r.content.decode('utf-8', 'ignore'))
+            for items_key in items_keys:
+                for item in j.get(items_key) or ():
+                    yield item, items_key
+
+            if j["last"]:
+                self._state.set_resume_key(resume_endpoint, resume_id, None)
+                break
+
+            # Persist the key of the page still to be fetched. Storing the key of the
+            # page just consumed made every resume fetch and publish that page again.
+            current_key = j["next"]
+            self._state.set_resume_key(resume_endpoint, resume_id, current_key)
+
+    def user_followers(self, user_id):
+        """Yield a ParlerFollower for every entry of the followers endpoint of *user_id*."""
+        for profile, key in self._iterate_endpoint(
+                func=self.api.user_api.get_followers_for_user_id,
+                params={"id": user_id},
+                resume_endpoint="followers",
+                resume_id=user_id,
+                items_keys=["followers"]
+        ):
+            yield ParlerFollower(user_id=user_id, follower_id=profile["id"])
+
+    def user_followees(self, user_id):
+        """Yield a ParlerFollowee for every entry of the following endpoint of *user_id*."""
+        for profile, key in self._iterate_endpoint(
+                func=self.api.user_api.get_following_for_user_id,
+                params={"id": user_id},
+                resume_endpoint="followees",
+                resume_id=user_id,
+                items_keys=["followees"]
+        ):
+            yield ParlerFollowee(user_id=user_id, followee_id=profile["id"])
+
+    def user_posts(self, user_id):
+        """Yield the posts, post references and URLs found in the feed of *user_id*."""
+        for item, key in self._iterate_endpoint(
+                func=self.api.feed_api.get_users_feed,
+                params={"id": user_id},
+                resume_endpoint="posts",
+                resume_id=user_id,
+                # Also available: 'users'
+                items_keys=["posts", "postRefs", "urls"]
+        ):
+            if key == "posts":
+                yield ParlerPost(data=item)
+            elif key == "postRefs":
+                yield ParlerPostRef(data=item)
+            elif key == "urls":
+                yield ParlerUrl(data=item)
+
+    def post_comments(self, post_id):
+        """Yield a ParlerComment for every comment returned for *post_id*."""
+        for item, key in self._iterate_endpoint(
+                func=self.api.comments_api.get_comments,
+                params={"id": post_id, "reverse": True},
+                resume_endpoint="comments",
+                resume_id=post_id,
+                # Also available: "users", "post", "postRefs"
+                items_keys=["comments"]
+        ):
+            yield ParlerComment(data=item)
+
+    def _get_user_id_hash(self, int_id):
+        """Return the stripped uuidConversion response for *int_id*, or None if no resource exists."""
+        s = self.api.user_api.s
+        r = s.get(f"https://api.parler.com/v3/uuidConversion/user/{int_id}")
+        if "No resource for URL" in r.text:
+            return None
+        return r.text.strip()
+
+    def fetch_profile(self, user_id, int_id):
+        """Return the ParlerProfile of *user_id*, or None when the response is falsy."""
+        r = self.api.user_api.get_profile_for_user(params={
+            "id": user_id
+        })
+
+        if not r:
+            return None
+
+        return ParlerProfile(data=json.loads(r.content.decode('utf-8', 'ignore')), int_id=int_id)
+
+    def user_ids(self):
+        """Yield ``(user_id, int_id)`` for integer ids whose user has not been visited yet.
+
+        Iteration starts at the stored ``users``/``it`` resume key, which is updated after each id.
+        """
+        current_key = self._state.resume_key("users", "it")
+        current_key = int(current_key) if current_key else 0
+
+        for int_id in range(current_key, 15000000):
+            user_id = self._get_user_id_hash(int_id)
+            if user_id and not self._state.has_visited_user(user_id):
+                yield user_id, int_id
+            self._state.set_resume_key("users", "it", int_id)
+
+    def process_userid(self, user_id, int_id):
+        """Yield every item scraped for one user: profile, follows, posts and their comments.
+
+        The user is marked as visited once all of its items have been yielded.
+        """
+        profile = self.fetch_profile(user_id, int_id)
+        if profile is None:
+            # Yielding None crashed publish() and aborted the user through the worker's
+            # generic error handler; skip the user explicitly (it is not marked visited).
+            logger.warning("Could not fetch profile of user %s (%s), skipping" % (user_id, int_id))
+            return
+
+        yield profile
+        yield from self.user_followers(user_id)
+        yield from self.user_followees(user_id)
+
+        for post in self.user_posts(user_id):
+            yield post
+
+            if post.item_type() in ("post", "postref") and not self._state.has_visited_post(post):
+                yield from self.post_comments(post.item_id())
+                self._state.mark_visited_post(post.item_id())
+        self._state.mark_visited_user(user_id)
+
+    def publish(self, item: ParlerItem):
+        """Push the serialized *item* onto ``<arc>.parler.<item type>.x`` for every configured list prefix."""
+        message = item.serialize()
+        item_type = item.item_type()
+
+        routing_key = "parler.%s.x" % (item_type,)
+
+        for arc in ARC_LISTS:
+            self.rdb.lpush(arc + "." + routing_key, message)
+
+    def scan_all_items(self):
+        """Queue every unvisited user for the workers and wait until all of them are processed."""
+        for user_id, int_id in self.user_ids():
+            self._q.put((user_id, int_id))
+
+        # Workers are daemon threads: without waiting here the process could exit while
+        # queued users are still being processed. One sentinel per worker then ends them.
+        self._q.join()
+        for _ in self._threads:
+            self._q.put((None, None))
+        for t in self._threads:
+            t.join()
diff --git a/state.py b/state.py
new file mode 100644
index 0000000..16254c9
--- /dev/null
+++ b/state.py
@@ -0,0 +1,51 @@
+import os
+
+from hexlib.db import VolatileBooleanState, VolatileState
+
+from items import ParlerItem
+
+REDIS_HOST = os.environ.get("PAF_REDIS_HOST", "localhost")
+# Environment variables are strings; convert so the port is always an int.
+REDIS_PORT = int(os.environ.get("PAF_REDIS_PORT", 6379))
+
+
+class ParlerState:
+    """Resume keys and visited markers, kept in hexlib state objects under the "parler" prefix."""
+
+    def __init__(self):
+        self._visited = VolatileBooleanState("parler", host=REDIS_HOST, port=REDIS_PORT)
+        self._resume = VolatileState("parler", host=REDIS_HOST, port=REDIS_PORT)
+
+    def resume_key(self, endpoint, user_id):
+        """Return the resume key stored for *endpoint* and *user_id*."""
+        return self._resume[endpoint][user_id]
+
+    def set_resume_key(self, endpoint, user_id, key):
+        """Store *key* for *endpoint* and *user_id*; a key of None deletes the entry."""
+        if key is None:
+            del self._resume[endpoint][user_id]
+        else:
+            self._resume[endpoint][user_id] = key
+
+    def has_visited_post(self, post: ParlerItem):
+        """Return whether the comments of *post* can be skipped.
+
+        A post reporting zero comments counts as visited.
+        """
+        # .get(): callers also pass post references, and a payload without a "comments"
+        # field must not abort the scan with a KeyError. str() accepts "0" as well as 0.
+        if str(post.data.get("comments")) == "0":
+            return True
+        return self._visited["post"][post.item_id()]
+
+    def mark_visited_post(self, post_id):
+        """Record *post_id* as visited."""
+        self._visited["post"][post_id] = True
+
+    def has_visited_user(self, user_id):
+        """Return the visited marker stored for *user_id*."""
+        return self._visited["user"][user_id]
+
+    def mark_visited_user(self, user_id):
+        """Record *user_id* as visited."""
+        self._visited["user"][user_id] = True
diff --git a/util.py b/util.py
new file mode 100644
index 0000000..80abb8e
--- /dev/null
+++ b/util.py
@@ -0,0 +1,17 @@
+import logging
+import sys
+from logging import StreamHandler
+
+logger = logging.getLogger("default")
+logger.setLevel(logging.DEBUG)
+
+formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
+# Iterate over a copy: removing handlers while looping over the live list skips entries.
+for h in list(logger.handlers):
+    logger.removeHandler(h)
+
+# The formatter used to be created but never attached, so records were printed
+# without timestamp and level.
+handler = StreamHandler(sys.stdout)
+handler.setFormatter(formatter)
+logger.addHandler(handler)