parler_feed/scanner.py
2020-12-27 14:25:27 -05:00

226 lines
7.4 KiB
Python

import json
import os
import traceback
from queue import Queue
from threading import Thread
from time import sleep
from urllib.parse import urlencode
import redis
import requests
from parler import Parler
from items import ParlerProfile, ParlerPost, ParlerPostRef, ParlerUrl, ParlerComment, ParlerItem, ParlerFollower, \
ParlerFollowee
from state import ParlerState
from util import logger
# Runtime configuration, read once at import time from PAF_* environment
# variables.

# Comma-separated prefixes of the Redis lists every item is pushed to.
ARC_LISTS = os.environ.get("PAF_ARC_LISTS", "arc").split(",")
REDIS_HOST = os.environ.get("PAF_REDIS_HOST", "localhost")
# Environment values are always strings while the default is an int; convert
# so the port has one type regardless of where it came from.
REDIS_PORT = int(os.environ.get("PAF_REDIS_PORT", 6379))
# Values installed as the "mst" and "jst" cookies on the API session.
MST = os.environ.get("PAF_MST", None)
JST = os.environ.get("PAF_JST", None)

# The message below and the cookie setup treat the two values as a pair, so
# refuse to start when either one is absent (previously only MST was tested).
if not MST or not JST:
    print("MST & JST MISSING!")
    # SystemExit is what the site-provided exit() raises; raising it directly
    # does not depend on the interactive helpers being installed.
    raise SystemExit(-1)
class SessionDebugWrapper(requests.Session):
    """A ``requests.Session`` whose GET requests are logged and retried.

    Every attempt is logged at debug level together with its status code.
    An HTTP 429 answer or any exception raised while performing the request
    counts as a failed attempt; after ``MAX_ATTEMPTS`` failures the request
    is abandoned with an exception.
    """

    # Number of attempts made before a GET is abandoned.
    MAX_ATTEMPTS = 3
    # Timeout in seconds used when the caller does not pass ``timeout``.
    DEFAULT_TIMEOUT = 15
    # Pause in seconds after an HTTP 429 answer before the next attempt.
    RATE_LIMIT_DELAY = 1

    def get(self, url, **kwargs):
        """Send a GET request, retrying on errors and on HTTP 429.

        :param url: URL to fetch.
        :param kwargs: forwarded to ``requests.Session.get``; ``timeout``
            defaults to ``DEFAULT_TIMEOUT`` when not supplied.
        :return: the response of the first attempt that neither raised nor
            answered with status 429.
        :raises Exception: when every attempt failed; the last failure is
            attached as ``__cause__``.
        """
        # Passing ``timeout=15`` next to ``**kwargs`` raised TypeError as soon
        # as a caller supplied its own timeout; a default avoids the clash.
        kwargs.setdefault("timeout", self.DEFAULT_TIMEOUT)
        # ``params`` may be missing or explicitly None; both mean "no query".
        params = kwargs.get("params")
        query = urlencode(params) if params else ""

        last_error = None
        for _ in range(self.MAX_ATTEMPTS):
            # KeyboardInterrupt is not an Exception subclass, so it passes
            # straight through this handler and still aborts the retry loop.
            try:
                r = super().get(url, **kwargs)
                logger.debug("GET %s <%d>" % (url + "?" + query, r.status_code))
                if r.status_code == 429:
                    # Back off briefly, then let the handler below count this
                    # as a failed attempt.
                    sleep(self.RATE_LIMIT_DELAY)
                    raise Exception("rate limited")
                return r
            except Exception as e:
                last_error = e
                logger.warning("Error with request %s: %s" % (url, str(e)))
        raise Exception("Gave up request after maximum number of retries") from last_error
def tmp_patch_api(api):
    """Make every sub-API of *api* use one shared, pre-configured session.

    A single ``SessionDebugWrapper`` is created, given a fixed User-Agent
    header and the "mst"/"jst" cookies taken from the module constants, and
    then stored in the ``s`` attribute of ``api.user_api``, ``api.feed_api``
    and ``api.comments_api``.
    """
    session = SessionDebugWrapper()
    session.headers.update({
        'User-Agent': 'Parler%20Staging/545 CFNetwork/978.0.7 Darwin 18.7.0',
    })
    for cookie_name, cookie_value in (('mst', MST), ('jst', JST)):
        session.cookies[cookie_name] = cookie_value

    # Same session object for all three, in the original assignment order.
    for sub_api_name in ("user_api", "feed_api", "comments_api"):
        getattr(api, sub_api_name).s = session
class ParlerScanner:
    """Walk over users by integer id and push everything found to Redis.

    ``scan_all_items`` feeds ``(user_id, int_id)`` pairs into a bounded
    queue; worker threads started by the constructor take them off the
    queue, collect the user's profile, followers, followees, posts and
    comments, and publish each item to the configured Redis lists.
    Progress is recorded through the ``ParlerState`` object passed in.
    """

    # Exclusive upper bound of the integer ids probed by user_ids().
    MAX_INT_ID = 15000000

    def __init__(self, state: ParlerState, threads):
        """Set up the API client and Redis connection and start the workers.

        :param state: store for resume keys and visited users/posts.
        :param threads: number of daemon worker threads to start.
        """
        self._state = state
        self.api: Parler = Parler(mst=MST, jst=JST)
        self._threads = []
        # Bounded so the producer blocks instead of queueing ids without limit.
        self._q = Queue(maxsize=threads + 1)
        self.rdb = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
        tmp_patch_api(self.api)
        for _ in range(threads):
            t = Thread(target=self._worker, daemon=True)
            self._threads.append(t)
            t.start()

    def _worker(self):
        """Process queued ``(user_id, int_id)`` pairs until told to stop.

        An entry whose ``user_id`` is None ends the loop. Any exception
        raised while handling an entry is logged with its traceback and the
        worker moves on to the next entry.
        """
        logger.info("Starting worker thread")
        while True:
            try:
                user_id, int_id = self._q.get()
                if user_id is None:
                    break
                for item in self.process_userid(user_id, int_id):
                    self.publish(item)
            except Exception as e:
                logger.error(str(e) + ": " + traceback.format_exc())
            finally:
                # Runs for the stop entry as well, keeping Queue.join() exact.
                self._q.task_done()
        logger.info("Worker thread finished")

    def _iterate_endpoint(self, func, params, resume_endpoint, resume_id, items_keys):
        """Yield ``(item, items_key)`` for every item of a paginated endpoint.

        :param func: callable invoked as ``func(params=...)`` for each page;
            a falsy result ends the iteration.
        :param params: base request parameters; "startkey" is added per page.
        :param resume_endpoint: endpoint name used for the resume key.
        :param resume_id: id used together with *resume_endpoint*.
        :param items_keys: keys of the decoded JSON page whose lists are
            yielded, in this order.
        """
        # Work on a copy so the paging key never leaks into the caller's dict.
        params = dict(params)
        current_key = self._state.resume_key(resume_endpoint, resume_id) or ""
        while True:
            params["startkey"] = current_key
            r = func(params=params)
            if not r:
                # Bare return: a value returned from a generator never reaches
                # a for-loop consumer, so the former ``return []`` was misleading.
                return
            j = json.loads(r.content.decode('utf-8', 'ignore'))
            for items_key in items_keys:
                if items_key in j and j[items_key]:
                    for item in j[items_key]:
                        yield item, items_key
            if j["last"]:
                # Endpoint exhausted: clear the stored position.
                self._state.set_resume_key(resume_endpoint, resume_id, None)
                break
            # The key saved is that of the page just finished, not the next
            # one, so a run resuming from it requests this page again.
            self._state.set_resume_key(resume_endpoint, resume_id, current_key)
            current_key = j["next"]

    def user_followers(self, user_id):
        """Yield a ``ParlerFollower`` for every follower of *user_id*."""
        for profile, _ in self._iterate_endpoint(
                func=self.api.user_api.get_followers_for_user_id,
                params={"id": user_id},
                resume_endpoint="followers",
                resume_id=user_id,
                items_keys=["followers"]
        ):
            yield ParlerFollower(user_id=user_id, follower_id=profile["id"])

    def user_followees(self, user_id):
        """Yield a ``ParlerFollowee`` for every account *user_id* follows."""
        for profile, _ in self._iterate_endpoint(
                func=self.api.user_api.get_following_for_user_id,
                params={"id": user_id},
                resume_endpoint="followees",
                resume_id=user_id,
                items_keys=["followees"]
        ):
            yield ParlerFollowee(user_id=user_id, followee_id=profile["id"])

    def user_posts(self, user_id):
        """Yield the posts, post references and urls of *user_id*'s feed."""
        # Page key -> item class; the dict order is the order within a page.
        # Also available: 'users'
        wrappers = {
            "posts": ParlerPost,
            "postRefs": ParlerPostRef,
            "urls": ParlerUrl,
        }
        for item, key in self._iterate_endpoint(
                func=self.api.feed_api.get_users_feed,
                params={"id": user_id},
                resume_endpoint="posts",
                resume_id=user_id,
                items_keys=list(wrappers)
        ):
            yield wrappers[key](data=item)

    def post_comments(self, post_id):
        """Yield a ``ParlerComment`` for every comment of *post_id*."""
        for item, _ in self._iterate_endpoint(
                func=self.api.comments_api.get_comments,
                params={"id": post_id, "reverse": True},
                resume_endpoint="comments",
                resume_id=post_id,
                # Also available: "users", "post", "postRefs"
                items_keys=["comments"]
        ):
            yield ParlerComment(data=item)

    def _get_user_id_hash(self, int_id):
        """Look up the id the API uses for the integer user id *int_id*.

        :return: the stripped response body, or None when the body contains
            "No resource for URL".
        """
        s = self.api.user_api.s
        r = s.get(f"https://api.parler.com/v3/uuidConversion/user/{int_id}")
        if "No resource for URL" in r.text:
            return None
        return r.text.strip()

    def fetch_profile(self, user_id, int_id):
        """Return the ``ParlerProfile`` of *user_id*, or None on a falsy response."""
        r = self.api.user_api.get_profile_for_user(params={
            "id": user_id
        })
        if not r:
            return None
        return ParlerProfile(data=json.loads(r.content.decode('utf-8', 'ignore')), int_id=int_id)

    def user_ids(self):
        """Yield ``(user_id, int_id)`` for every resolvable, unvisited user.

        Integer ids are probed from the stored resume position (0 when none
        is stored) up to ``MAX_INT_ID``; the position is updated after each
        id has been handled.
        """
        current_key = self._state.resume_key("users", "it")
        current_key = int(current_key) if current_key else 0
        for int_id in range(current_key, self.MAX_INT_ID):
            user_id = self._get_user_id_hash(int_id)
            if user_id and not self._state.has_visited_user(user_id):
                yield user_id, int_id
            self._state.set_resume_key("users", "it", int_id)

    def process_userid(self, user_id, int_id):
        """Yield every item belonging to one user, then mark the user visited.

        Order: profile, followers, followees, then each post immediately
        followed by its comments. Nothing is yielded and the user stays
        unvisited when the profile cannot be fetched.
        """
        profile = self.fetch_profile(user_id, int_id)
        if profile is None:
            # Yielding None made publish() fail on ``None.serialize()``, which
            # aborted this user with a misleading AttributeError. The user is
            # still skipped, but the log now states the real reason.
            logger.warning("Could not fetch profile of user %s (%s), skipping" % (user_id, int_id))
            return
        yield profile

        yield from self.user_followers(user_id)
        yield from self.user_followees(user_id)

        for post in self.user_posts(user_id):
            yield post
            # NOTE(review): has_visited_post() receives the item itself while
            # mark_visited_post() receives its id -- confirm against ParlerState.
            if post.item_type() in ("post", "postref") \
                    and not self._state.has_visited_post(post):
                yield from self.post_comments(post.item_id())
                self._state.mark_visited_post(post.item_id())
        self._state.mark_visited_user(user_id)

    def publish(self, item: ParlerItem):
        """Push the serialized *item* onto one Redis list per ARC_LISTS entry.

        The list name is ``<arc>.parler.<item type>.x``.
        """
        message = item.serialize()
        item_type = item.item_type()
        routing_key = "parler.%s.x" % (item_type,)
        for arc in ARC_LISTS:
            self.rdb.lpush(arc + "." + routing_key, message)

    def scan_all_items(self):
        """Queue every user id for the workers and wait until all are done."""
        for user_id, int_id in self.user_ids():
            self._q.put((user_id, int_id))
        # The workers are daemon threads: returning right after the last put()
        # lets the interpreter exit while users are still queued or being
        # processed. Block until every queued entry has been marked done.
        self._q.join()