diff --git a/run.py b/run.py index 8d77087..00c5c85 100644 --- a/run.py +++ b/run.py @@ -3,6 +3,6 @@ from state import ParlerState if __name__ == "__main__": state = ParlerState() - scanner = ParlerScanner(state, threads=1) + scanner = ParlerScanner(state) scanner.scan_all_items() diff --git a/scanner.py b/scanner.py index c730963..927c620 100644 --- a/scanner.py +++ b/scanner.py @@ -18,8 +18,9 @@ from util import logger ARC_LISTS = os.environ.get("PAF_ARC_LISTS", "arc").split(",") REDIS_HOST = os.environ.get("PAF_REDIS_HOST", "localhost") REDIS_PORT = os.environ.get("PAF_REDIS_PORT", 6379) -MST = os.environ.get("PAF_MST", None) -JST = os.environ.get("PAF_JST", None) +MST = os.environ.get("PAF_MST", "").split(",") +JST = os.environ.get("PAF_JST", "").split(",") +THREADS = len(MST) if not MST: print("MST & JST MISSING!") @@ -49,11 +50,11 @@ class SessionDebugWrapper(requests.Session): raise Exception("Gave up request after maximum number of retries") -def tmp_patch_api(api): +def tmp_patch_api(api, mst, jst): s = SessionDebugWrapper() s.headers['User-Agent'] = 'Parler%20Staging/545 CFNetwork/978.0.7 Darwin 18.7.0' - s.cookies['mst'] = MST - s.cookies['jst'] = JST + s.cookies['mst'] = mst + s.cookies['jst'] = jst api.user_api.s = s api.feed_api.s = s api.comments_api.s = s @@ -61,29 +62,32 @@ def tmp_patch_api(api): class ParlerScanner: - def __init__(self, state: ParlerState, threads): + def __init__(self, state: ParlerState): self._state = state - self.api: Parler = Parler(mst=MST, jst=JST) + self.main_api = Parler(mst=MST[0], jst=JST[0]) + tmp_patch_api(self.main_api, MST[0], JST[0]) self._threads = [] - self._q = Queue(maxsize=threads + 1) + self._q = Queue(maxsize=THREADS + 1) self.rdb = redis.Redis(host=REDIS_HOST, port=REDIS_PORT) - tmp_patch_api(self.api) - - for _ in range(threads): - t = Thread(target=self._worker, daemon=True) + for i in range(THREADS): + t = Thread(target=self._worker, args=(i,), daemon=True) self._threads.append(t) + t.name = "wrk_%d" % (i,) t.start() - def _worker(self): - logger.info("Starting worker thread") + def _worker(self, worker_id): + logger.info("Starting worker thread (%d)" % (worker_id,)) + + api = Parler(mst=MST[worker_id], jst=JST[worker_id]) + tmp_patch_api(api, MST[worker_id], JST[worker_id]) while True: try: user_id, int_id = self._q.get() if user_id is None: break - for item in self.process_userid(user_id, int_id): + for item in self.process_userid(api, user_id, int_id): self.publish(item) except Exception as e: @@ -117,9 +121,9 @@ class ParlerScanner: self._state.set_resume_key(resume_endpoint, resume_id, current_key) current_key = j["next"] - def user_followers(self, user_id): + def user_followers(self, api, user_id): for profile, key in self._iterate_endpoint( - func=self.api.user_api.get_followers_for_user_id, + func=api.user_api.get_followers_for_user_id, params={"id": user_id}, resume_endpoint="followers", resume_id=user_id, @@ -127,9 +131,9 @@ class ParlerScanner: ): yield ParlerFollower(user_id=user_id, follower_id=profile["id"]) - def user_followees(self, user_id): + def user_followees(self, api, user_id): for profile, key in self._iterate_endpoint( - func=self.api.user_api.get_following_for_user_id, + func=api.user_api.get_following_for_user_id, params={"id": user_id}, resume_endpoint="followees", resume_id=user_id, @@ -137,9 +141,9 @@ class ParlerScanner: ): yield ParlerFollowee(user_id=user_id, followee_id=profile["id"]) - def user_posts(self, user_id): + def user_posts(self, api, user_id): for item, key in self._iterate_endpoint( - func=self.api.feed_api.get_users_feed, + func=api.feed_api.get_users_feed, params={"id": user_id}, resume_endpoint="posts", resume_id=user_id, @@ -153,9 +157,9 @@ class ParlerScanner: elif key == "urls": yield ParlerUrl(data=item) - def post_comments(self, post_id): + def post_comments(self, api, post_id): for item, key in self._iterate_endpoint( - func=self.api.comments_api.get_comments, + func=api.comments_api.get_comments, params={"id": post_id, "reverse": True}, resume_endpoint="comments", resume_id=post_id, @@ -164,15 +168,15 @@ class ParlerScanner: ): yield ParlerComment(data=item) - def _get_user_id_hash(self, int_id): - s = self.api.user_api.s + def _get_user_id_hash(self, api, int_id): + s = api.user_api.s r = s.get(f"https://api.parler.com/v3/uuidConversion/user/{int_id}") if "No resource for URL" in r.text: return None return r.text.strip() - def fetch_profile(self, user_id, int_id): - r = self.api.user_api.get_profile_for_user(params={ + def fetch_profile(self, api, user_id, int_id): + r = api.user_api.get_profile_for_user(params={ "id": user_id }) @@ -181,35 +185,34 @@ class ParlerScanner: return ParlerProfile(data=json.loads(r.content.decode('utf-8', 'ignore')), int_id=int_id) - def user_ids(self): - current_key = self._state.resume_key("users", "it") - current_key = int(current_key) if current_key else 0 + def user_ids(self, api): + current_key = 0 for int_id in range(current_key, 15000000): - user_id = self._get_user_id_hash(int_id) - if user_id and not self._state.has_visited_user(user_id): - yield user_id, int_id - self._state.set_resume_key("users", "it", int_id) + if not self._state.has_visited_user(int_id): + user_id = self._get_user_id_hash(api, int_id) + if user_id: + yield user_id, int_id - def process_userid(self, user_id, int_id): - profile = self.fetch_profile(user_id, int_id) + def process_userid(self, api, user_id, int_id): + profile = self.fetch_profile(api, user_id, int_id) yield profile - for follow in self.user_followers(user_id): + for follow in self.user_followers(api, user_id): yield follow - for follow in self.user_followees(user_id): + for follow in self.user_followees(api, user_id): yield follow - for post in self.user_posts(user_id): + for post in self.user_posts(api, user_id): yield post if (post.item_type() == "post" or post.item_type() == "postref") \ and not self._state.has_visited_post(post): - for comment in self.post_comments(post.item_id()): + for comment in self.post_comments(api, post.item_id()): yield comment self._state.mark_visited_post(post.item_id()) - self._state.mark_visited_user(user_id) + self._state.mark_visited_user(int_id) def publish(self, item: ParlerItem): message = item.serialize() @@ -221,5 +224,5 @@ class ParlerScanner: self.rdb.lpush(arc + "." + routing_key, message) def scan_all_items(self): - for user_id, int_id in self.user_ids(): + for user_id, int_id in self.user_ids(self.main_api): self._q.put((user_id, int_id)) diff --git a/util.py b/util.py index 80abb8e..a6a6c1b 100644 --- a/util.py +++ b/util.py @@ -5,7 +5,9 @@ from logging import StreamHandler logger = logging.getLogger("default") logger.setLevel(logging.DEBUG) -formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s') +formatter = logging.Formatter('%(asctime)s <%(threadName)s> %(levelname)-5s %(message)s') for h in logger.handlers: logger.removeHandler(h) -logger.addHandler(StreamHandler(sys.stdout)) +handler = StreamHandler(sys.stdout) +handler.formatter = formatter +logger.addHandler(handler)