import json
import os
import sys
import traceback
from queue import Queue
from threading import Thread
from time import sleep
from urllib.parse import urlencode

import redis
import requests
from parler import Parler

from items import ParlerProfile, ParlerPost, ParlerPostRef, ParlerUrl, ParlerComment, ParlerItem, ParlerFollower, \
    ParlerFollowee
from state import ParlerState
from util import logger


def _env_tokens(name):
    """Return the comma-separated environment variable *name* as a list without blank entries."""
    # "".split(",") yields [""], which is truthy; dropping blanks lets an unset
    # or empty variable be detected as "no credentials".
    return [token for token in os.environ.get(name, "").split(",") if token]


ARC_LISTS = os.environ.get("PAF_ARC_LISTS", "arc").split(",")
REDIS_HOST = os.environ.get("PAF_REDIS_HOST", "localhost")
# Environment values are strings; normalise so the port has one type either way.
REDIS_PORT = int(os.environ.get("PAF_REDIS_PORT", 6379))

# One worker thread is started per MST entry; worker i uses MST[i] and JST[i].
MST = _env_tokens("PAF_MST")
JST = _env_tokens("PAF_JST")
THREADS = len(MST)

# Every MST entry needs a JST entry at the same index, otherwise indexing
# JST[worker_id] would fail later inside a worker thread.
if not MST or len(JST) < len(MST):
    print("MST & JST MISSING!")
    sys.exit(-1)


class SessionDebugWrapper(requests.Session):
    """``requests.Session`` whose ``get`` logs each request and retries on failure."""

    @staticmethod
    def _describe(url, kwargs):
        """Return *url* followed by ``?`` and the url-encoded ``params`` (if any), for log messages."""
        return url + "?" + (urlencode(kwargs["params"]) if "params" in kwargs else "")

    def get(self, url, **kwargs):
        """Perform a GET request, retrying up to four times.

        A 45 second timeout is applied unless the caller supplies its own.
        HTTP 429 and 502 responses are treated as failures and retried (429
        additionally waits 15 seconds first). Any other response is returned
        as-is, whatever its status code.

        :param url: URL to fetch.
        :param kwargs: forwarded to ``requests.Session.get``.
        :return: the response object of the first attempt that is not a 429/502.
        :raises Exception: when all attempts failed.
        """
        # setdefault instead of an extra keyword: passing timeout= twice would
        # raise TypeError whenever a caller provides its own timeout.
        kwargs.setdefault("timeout", 45)

        for _ in range(4):
            try:
                r = super().get(url, **kwargs)
                logger.debug("GET %s <%d>" % (self._describe(url, kwargs), r.status_code))

                if r.status_code == 429:
                    sleep(15)
                    raise Exception("rate limited")
                if r.status_code == 502:
                    raise Exception("Server error")
                return r
            except KeyboardInterrupt as e:
                raise e
            except Exception as e:
                logger.warning("%s: %s" % (self._describe(url, kwargs), str(e)))
                sleep(10)

        raise Exception("Gave up request after maximum number of retries")


def tmp_patch_api(api, mst, jst):
    """Replace the HTTP session of *api*'s sub-APIs with a ``SessionDebugWrapper``.

    A single new session carrying a fixed User-Agent header and the ``mst`` /
    ``jst`` cookies is assigned to ``api.user_api.s``, ``api.feed_api.s`` and
    ``api.comments_api.s``.
    """
    s = SessionDebugWrapper()
    s.headers['User-Agent'] = 'Parler%20Staging/545 CFNetwork/978.0.7 Darwin 18.7.0'
    s.cookies['mst'] = mst
    s.cookies['jst'] = jst

    for sub_api in (api.user_api, api.feed_api, api.comments_api):
        sub_api.s = s


class ParlerScanner:
    """Enumerates users, fetches their data on worker threads and pushes it to redis lists."""

    def __init__(self, state: ParlerState):
        """Create the API client, the redis client, the work queue, and start the worker threads.

        :param state: used to read/store resume keys and visited users/posts.
        """
        self._state = state
        self.main_api = Parler(mst=MST[0], jst=JST[0])
        tmp_patch_api(self.main_api, MST[0], JST[0])

        self._threads = []
        # Bounded queue: put() blocks once THREADS + 1 users are waiting.
        self._q = Queue(maxsize=THREADS + 1)
        self.rdb = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)

        for i in range(THREADS):
            t = Thread(target=self._worker, args=(i,), daemon=True)
            self._threads.append(t)
            t.name = "wrk_%d" % (i,)
            t.start()

    def _worker(self, worker_id):
        """Thread body: process queued ``(user_id, int_id)`` pairs until ``user_id`` is None.

        Each worker builds its own API client from ``MST[worker_id]`` /
        ``JST[worker_id]``. Exceptions raised while handling a user are logged
        and the worker moves on to the next queue entry.
        """
        logger.info("Starting worker thread (%d)" % (worker_id,))
        api = Parler(mst=MST[worker_id], jst=JST[worker_id])
        tmp_patch_api(api, MST[worker_id], JST[worker_id])

        while True:
            try:
                user_id, int_id = self._q.get()
                if user_id is None:
                    break

                for item in self.process_userid(api, user_id, int_id):
                    self.publish(item)
            except Exception as e:
                logger.error(str(e) + ": " + traceback.format_exc())
            finally:
                # Runs on the break path as well, so every get() is matched by a task_done().
                self._q.task_done()

        logger.info("Worker thread finished")

    def _iterate_endpoint(self, func, params, resume_endpoint, resume_id, items_keys):
        """Page through an endpoint, yielding ``(item, items_key, current_key)`` tuples.

        Iteration starts at the resume key stored in the state for
        ``(resume_endpoint, resume_id)``, or at ``""`` when there is none. After
        all items of a page were yielded, that page's key is stored as the new
        resume key. Iteration ends when a response is falsy or when the decoded
        body has a truthy ``"last"`` entry.

        :param func: callable invoked as ``func(params=params)`` for every page.
        :param params: request parameters; ``"startkey"`` is overwritten in place.
        :param items_keys: keys of the decoded JSON body whose lists are yielded.
        """
        current_key = self._state.resume_key(resume_endpoint, resume_id) or ""

        while True:
            params["startkey"] = current_key

            r = func(params=params)
            if not r:
                return

            j = json.loads(r.content.decode('utf-8', 'ignore'))

            for items_key in items_keys:
                # Missing key and empty/None value are both skipped.
                for item in j.get(items_key) or ():
                    yield item, items_key, current_key

            self._state.set_resume_key(resume_endpoint, resume_id, current_key)

            if j["last"]:
                break
            current_key = j["next"]

    def user_followers(self, api, user_id):
        """Yield a ``ParlerFollower`` for every entry under ``"followers"`` of *user_id*."""
        pages = self._iterate_endpoint(
            func=api.user_api.get_followers_for_user_id,
            params={"id": user_id},
            resume_endpoint="followers",
            resume_id=user_id,
            items_keys=["followers"]
        )
        for profile, key, it_index in pages:
            # The page key is stored as approx_date.
            yield ParlerFollower(user_id=user_id, follower_id=profile["id"], approx_date=it_index)

    def user_followees(self, api, user_id):
        """Yield a ``ParlerFollowee`` for every entry under ``"followees"`` of *user_id*."""
        pages = self._iterate_endpoint(
            func=api.user_api.get_following_for_user_id,
            params={"id": user_id},
            resume_endpoint="followees",
            resume_id=user_id,
            items_keys=["followees"]
        )
        for profile, key, it_index in pages:
            # The page key is stored as approx_date.
            yield ParlerFollowee(user_id=user_id, followee_id=profile["id"], approx_date=it_index)

    def user_posts(self, api, user_id):
        """Yield ``ParlerPost``, ``ParlerPostRef`` and ``ParlerUrl`` items from the feed of *user_id*."""
        pages = self._iterate_endpoint(
            func=api.feed_api.get_users_feed,
            params={"id": user_id},
            resume_endpoint="posts",
            resume_id=user_id,
            # Also available: 'users'
            items_keys=["posts", "postRefs", "urls"]
        )
        for item, key, it_index in pages:
            if key == "posts":
                yield ParlerPost(data=item)
            elif key == "postRefs":
                yield ParlerPostRef(post_id=item["_id"], user_id=user_id, approx_date=it_index)
            elif key == "urls":
                yield ParlerUrl(data=item)

    def post_comments(self, api, post_id):
        """Yield a ``ParlerComment`` for every entry under ``"comments"`` of *post_id*."""
        pages = self._iterate_endpoint(
            func=api.comments_api.get_comments,
            params={"id": post_id, "reverse": "true"},
            resume_endpoint="comments",
            resume_id=post_id,
            # Also available: "users", "post", "postRefs"
            items_keys=["comments"]
        )
        for item, key, _ in pages:
            yield ParlerComment(data=item)

    def _get_user_id_hash(self, api, int_id):
        """Return the stripped body of the uuidConversion endpoint for *int_id*.

        Returns None when the body contains ``"No resource for URL"``.
        """
        s = api.user_api.s
        r = s.get(f"https://api.parler.com/v3/uuidConversion/user/{int_id}")
        body = r.text
        if "No resource for URL" in body:
            return None
        return body.strip()

    def fetch_profile(self, api, user_id, int_id):
        """Fetch the profile of *user_id* as a ``ParlerProfile``, or None when the response is falsy."""
        r = api.user_api.get_profile_for_user(params={"id": user_id})
        if not r:
            return None

        data = json.loads(r.content.decode('utf-8', 'ignore'))
        return ParlerProfile(data=data, int_id=int_id)

    def user_ids(self, api):
        """Yield ``(user_id, int_id)`` for every not-yet-visited integer id that resolves to a user.

        Integer ids that do not resolve are marked as visited so they are not
        looked up again.
        """
        for int_id in range(0, 15000000):
            if self._state.has_visited_user(int_id):
                continue

            user_id = self._get_user_id_hash(api, int_id)
            if user_id:
                yield user_id, int_id
            else:
                self._state.mark_visited_user(int_id)

    def process_userid(self, api, user_id, int_id):
        """Yield every item of one user: profile, followers, followees, posts and post comments.

        The user is marked as visited only after the generator was consumed to
        the end. If the profile cannot be fetched nothing is yielded and the
        user is left unvisited.
        """
        profile = self.fetch_profile(api, user_id, int_id)
        if profile is None:
            # Yielding None would fail in publish() with an AttributeError;
            # skip the user explicitly instead (it stays unvisited, as before).
            logger.warning("Could not fetch profile for user %s (%s), skipping" % (user_id, int_id))
            return
        yield profile

        yield from self.user_followers(api, user_id)
        yield from self.user_followees(api, user_id)

        for post in self.user_posts(api, user_id):
            yield post

            # NOTE(review): has_visited_post receives the item while
            # mark_visited_post receives its id - confirm against ParlerState.
            if post.item_type() == "post" and not self._state.has_visited_post(post):
                yield from self.post_comments(api, post.item_id())
                self._state.mark_visited_post(post.item_id())

        self._state.mark_visited_user(int_id)

    def publish(self, item: ParlerItem):
        """Push the serialized *item* onto the ``<arc>.parler.<item_type>.x`` redis list of every ARC_LISTS entry."""
        message = item.serialize()
        routing_key = "parler.%s.x" % (item.item_type(),)

        for arc in ARC_LISTS:
            self.rdb.lpush(arc + "." + routing_key, message)

    def scan_all_items(self):
        """Queue every user from ``user_ids`` for the workers and wait until all were processed."""
        for user_id, int_id in self.user_ids(self.main_api):
            self._q.put((user_id, int_id))

        # Workers are daemon threads: without waiting here, users still queued
        # or in flight would be dropped when the interpreter exits.
        self._q.join()