From 880ae0014e5b5c6611a0d3a41e0ebce2a88b6df0 Mon Sep 17 00:00:00 2001 From: simon987 Date: Sun, 27 Dec 2020 14:25:27 -0500 Subject: [PATCH] initial working version --- .gitignore | 2 + Dockerfile | 11 +++ docker-compose.yml | 10 ++ items.py | 117 +++++++++++++++++++++++ requirements.txt | 4 + run.py | 8 ++ scanner.py | 225 +++++++++++++++++++++++++++++++++++++++++++++ state.py | 38 ++++++++ util.py | 11 +++ 9 files changed, 426 insertions(+) create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 docker-compose.yml create mode 100644 items.py create mode 100644 requirements.txt create mode 100644 run.py create mode 100644 scanner.py create mode 100644 state.py create mode 100644 util.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ee64372 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea/ +*.pyc diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..a68487c --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.8 + +ADD requirements.txt /requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +COPY . /app + +RUN chmod 777 -R /app + +WORKDIR /app +ENTRYPOINT ["python", "run.py"] diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..45a1306 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,10 @@ +version: "3" + +services: + scraper: + image: simon987/parler_feed + restart: always + environment: + - "PAF_REDIS_HOST=" + - "PAF_MST=" + - "PAF_JST=" diff --git a/items.py b/items.py new file mode 100644 index 0000000..c330537 --- /dev/null +++ b/items.py @@ -0,0 +1,117 @@ +import json + + +class ParlerItem: + def __init__(self): + self.data = None + + def serialize(self): + self.data["_v"] = self.version() + self.data["_id"] = self.item_id() + return json.dumps(self.data, separators=(',', ':'), ensure_ascii=False, sort_keys=True) + + def item_type(self) -> str: + raise NotImplementedError + + def item_id(self) -> str: + raise NotImplementedError + + def version(self): + return "1.0" + + +class ParlerProfile(ParlerItem): + def __init__(self, data, int_id): + super().__init__() + self.data = data + self.data["_int_id"] = int_id + + def item_type(self): + return "profile" + + def item_id(self): + return self.data["id"] + + +class ParlerFollower(ParlerItem): + + def __init__(self, user_id, follower_id): + super().__init__() + self.data = { + "user_id": user_id, + "follower_id": follower_id, + } + + def item_type(self): + return "follower" + + def item_id(self): + return self.data["user_id"] + self.data["follower_id"] + + +class ParlerFollowee(ParlerItem): + + def __init__(self, user_id, followee_id): + super().__init__() + self.data = { + "user_id": user_id, + "followee_id": followee_id, + } + + def item_type(self): + return "followee" + + def item_id(self): + return self.data["user_id"] + self.data["followee_id"] + + +class ParlerPost(ParlerItem): + + def __init__(self, data): + super().__init__() + self.data = data + + def item_type(self): + return "post" + + def item_id(self): + return self.data["id"] + + +class ParlerPostRef(ParlerItem): + + def __init__(self, data): + super().__init__() + self.data = data + + def item_type(self): + return "postref" + + def item_id(self): + return self.data["id"] + + +class ParlerUrl(ParlerItem): + + def __init__(self, data): + super().__init__() + self.data = data + + def item_type(self): + return "url" + + def item_id(self): + return self.data["_id"] + + +class ParlerComment(ParlerItem): + + def __init__(self, data): + super().__init__() + self.data = data + + def item_type(self): + return "comment" + + def item_id(self): + return self.data["_id"] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..bc4d1a8 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +requests +git+git://github.com/simon987/hexlib.git +git+git://github.com/simon987/parler-tricks.git +redis diff --git a/run.py b/run.py new file mode 100644 index 0000000..8d77087 --- /dev/null +++ b/run.py @@ -0,0 +1,8 @@ +from scanner import ParlerScanner +from state import ParlerState + +if __name__ == "__main__": + state = ParlerState() + scanner = ParlerScanner(state, threads=1) + + scanner.scan_all_items() diff --git a/scanner.py b/scanner.py new file mode 100644 index 0000000..c730963 --- /dev/null +++ b/scanner.py @@ -0,0 +1,225 @@ +import json +import os +import traceback +from queue import Queue +from threading import Thread +from time import sleep +from urllib.parse import urlencode + +import redis +import requests +from parler import Parler + +from items import ParlerProfile, ParlerPost, ParlerPostRef, ParlerUrl, ParlerComment, ParlerItem, ParlerFollower, \ + ParlerFollowee +from state import ParlerState +from util import logger + +ARC_LISTS = os.environ.get("PAF_ARC_LISTS", "arc").split(",") +REDIS_HOST = os.environ.get("PAF_REDIS_HOST", "localhost") +REDIS_PORT = os.environ.get("PAF_REDIS_PORT", 6379) +MST = os.environ.get("PAF_MST", None) +JST = os.environ.get("PAF_JST", None) + +if not MST: + print("MST & JST MISSING!") + exit(-1) + + +class SessionDebugWrapper(requests.Session): + def get(self, url, **kwargs): + retries = 3 + + while retries > 0: + retries -= 1 + try: + r = super().get(url, **kwargs, timeout=15) + logger.debug( + "GET %s <%d>" + % (url + "?" + (urlencode(kwargs["params"]) if "params" in kwargs else ""), r.status_code) + ) + if r.status_code == 429: + sleep(1) + raise Exception("rate limited") + return r + except KeyboardInterrupt as e: + raise e + except Exception as e: + logger.warning("Error with request %s: %s" % (url, str(e))) + raise Exception("Gave up request after maximum number of retries") + + +def tmp_patch_api(api): + s = SessionDebugWrapper() + s.headers['User-Agent'] = 'Parler%20Staging/545 CFNetwork/978.0.7 Darwin 18.7.0' + s.cookies['mst'] = MST + s.cookies['jst'] = JST + api.user_api.s = s + api.feed_api.s = s + api.comments_api.s = s + + +class ParlerScanner: + + def __init__(self, state: ParlerState, threads): + self._state = state + self.api: Parler = Parler(mst=MST, jst=JST) + self._threads = [] + self._q = Queue(maxsize=threads + 1) + self.rdb = redis.Redis(host=REDIS_HOST, port=REDIS_PORT) + + tmp_patch_api(self.api) + + for _ in range(threads): + t = Thread(target=self._worker, daemon=True) + self._threads.append(t) + t.start() + + def _worker(self): + logger.info("Starting worker thread") + + while True: + try: + user_id, int_id = self._q.get() + if user_id is None: + break + for item in self.process_userid(user_id, int_id): + self.publish(item) + + except Exception as e: + logger.error(str(e) + ": " + traceback.format_exc()) + finally: + self._q.task_done() + logger.info("Worker thread finished") + + def _iterate_endpoint(self, func, params, resume_endpoint, resume_id, items_keys): + + current_key = self._state.resume_key(resume_endpoint, resume_id) + if not current_key: + current_key = "" + + while True: + params["startkey"] = current_key + r = func(params=params) + if not r: + return [] + + j = json.loads(r.content.decode('utf-8', 'ignore')) + for items_key in items_keys: + if items_key in j and j[items_key]: + for item in j[items_key]: + yield item, items_key + + if j["last"]: + self._state.set_resume_key(resume_endpoint, resume_id, None) + break + + self._state.set_resume_key(resume_endpoint, resume_id, current_key) + current_key = j["next"] + + def user_followers(self, user_id): + for profile, key in self._iterate_endpoint( + func=self.api.user_api.get_followers_for_user_id, + params={"id": user_id}, + resume_endpoint="followers", + resume_id=user_id, + items_keys=["followers"] + ): + yield ParlerFollower(user_id=user_id, follower_id=profile["id"]) + + def user_followees(self, user_id): + for profile, key in self._iterate_endpoint( + func=self.api.user_api.get_following_for_user_id, + params={"id": user_id}, + resume_endpoint="followees", + resume_id=user_id, + items_keys=["followees"] + ): + yield ParlerFollowee(user_id=user_id, followee_id=profile["id"]) + + def user_posts(self, user_id): + for item, key in self._iterate_endpoint( + func=self.api.feed_api.get_users_feed, + params={"id": user_id}, + resume_endpoint="posts", + resume_id=user_id, + # Also available: 'users' + items_keys=["posts", "postRefs", "urls"] + ): + if key == "posts": + yield ParlerPost(data=item) + elif key == "postRefs": + yield ParlerPostRef(data=item) + elif key == "urls": + yield ParlerUrl(data=item) + + def post_comments(self, post_id): + for item, key in self._iterate_endpoint( + func=self.api.comments_api.get_comments, + params={"id": post_id, "reverse": True}, + resume_endpoint="comments", + resume_id=post_id, + # Also available: "users", "post", "postRefs" + items_keys=["comments"] + ): + yield ParlerComment(data=item) + + def _get_user_id_hash(self, int_id): + s = self.api.user_api.s + r = s.get(f"https://api.parler.com/v3/uuidConversion/user/{int_id}") + if "No resource for URL" in r.text: + return None + return r.text.strip() + + def fetch_profile(self, user_id, int_id): + r = self.api.user_api.get_profile_for_user(params={ + "id": user_id + }) + + if not r: + return None + + return ParlerProfile(data=json.loads(r.content.decode('utf-8', 'ignore')), int_id=int_id) + + def user_ids(self): + current_key = self._state.resume_key("users", "it") + current_key = int(current_key) if current_key else 0 + + for int_id in range(current_key, 15000000): + user_id = self._get_user_id_hash(int_id) + if user_id and not self._state.has_visited_user(user_id): + yield user_id, int_id + self._state.set_resume_key("users", "it", int_id) + + def process_userid(self, user_id, int_id): + profile = self.fetch_profile(user_id, int_id) + + yield profile + + for follow in self.user_followers(user_id): + yield follow + for follow in self.user_followees(user_id): + yield follow + + for post in self.user_posts(user_id): + yield post + + if (post.item_type() == "post" or post.item_type() == "postref") \ + and not self._state.has_visited_post(post): + for comment in self.post_comments(post.item_id()): + yield comment + self._state.mark_visited_post(post.item_id()) + self._state.mark_visited_user(user_id) + + def publish(self, item: ParlerItem): + message = item.serialize() + item_type = item.item_type() + + routing_key = "parler.%s.x" % (item_type,) + + for arc in ARC_LISTS: + self.rdb.lpush(arc + "." + routing_key, message) + + def scan_all_items(self): + for user_id, int_id in self.user_ids(): + self._q.put((user_id, int_id)) diff --git a/state.py b/state.py new file mode 100644 index 0000000..16254c9 --- /dev/null +++ b/state.py @@ -0,0 +1,38 @@ +import os + +from hexlib.db import VolatileBooleanState, VolatileState + +from items import ParlerItem + +REDIS_HOST = os.environ.get("PAF_REDIS_HOST", "localhost") +REDIS_PORT = os.environ.get("PAF_REDIS_PORT", 6379) + + +class ParlerState: + + def __init__(self): + self._visited = VolatileBooleanState("parler", host=REDIS_HOST, port=REDIS_PORT) + self._resume = VolatileState("parler", host=REDIS_HOST, port=REDIS_PORT) + + def resume_key(self, endpoint, user_id): + return self._resume[endpoint][user_id] + + def set_resume_key(self, endpoint, user_id, key): + if key is None: + del self._resume[endpoint][user_id] + else: + self._resume[endpoint][user_id] = key + + def has_visited_post(self, post: ParlerItem): + if post.data["comments"] == "0": + return True + return self._visited["post"][post.item_id()] + + def mark_visited_post(self, post_id): + self._visited["post"][post_id] = True + + def has_visited_user(self, user_id): + return self._visited["user"][user_id] + + def mark_visited_user(self, user_id): + self._visited["user"][user_id] = True diff --git a/util.py b/util.py new file mode 100644 index 0000000..80abb8e --- /dev/null +++ b/util.py @@ -0,0 +1,11 @@ +import logging +import sys +from logging import StreamHandler + +logger = logging.getLogger("default") +logger.setLevel(logging.DEBUG) + +formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s') +for h in logger.handlers: + logger.removeHandler(h) +logger.addHandler(StreamHandler(sys.stdout))