# parler_feed/scanner.py
import json
import os
import traceback
from queue import Queue
from threading import Thread
from time import sleep
from urllib.parse import urlencode
import redis
import requests
from parler import Parler
from items import ParlerProfile, ParlerPost, ParlerPostRef, ParlerUrl, ParlerComment, ParlerItem, ParlerFollower, \
ParlerFollowee
from state import ParlerState
from util import logger
# Redis list-name prefixes; publish() LPUSHes to "<arc>.parler.<type>.x" for each.
ARC_LISTS = os.environ.get("PAF_ARC_LISTS", "arc").split(",")
REDIS_HOST = os.environ.get("PAF_REDIS_HOST", "localhost")
# NOTE(review): when PAF_REDIS_PORT is set this is a str while the default is an
# int — verify redis.Redis accepts both, or wrap in int(...).
REDIS_PORT = os.environ.get("PAF_REDIS_PORT", 6379)
# Parallel lists of Parler session tokens; worker i uses MST[i]/JST[i].
MST = os.environ.get("PAF_MST", "").split(",")
JST = os.environ.get("PAF_JST", "").split(",")
# One worker thread per credential pair.
THREADS = len(MST)
# NOTE(review): "".split(",") == [""], which is truthy, so this guard can never
# fire when PAF_MST is unset — and JST is never checked despite the message.
# Consider `if not any(MST) or len(JST) != len(MST):` instead.
if not MST:
    print("MST & JST MISSING!")
    exit(-1)
class SessionDebugWrapper(requests.Session):
    """A requests.Session that logs every GET and retries transient failures.

    Each GET is attempted up to ``_MAX_RETRIES`` times with a 45 s timeout.
    HTTP 429 (rate limit) triggers an extra 15 s back-off before the normal
    10 s retry sleep; HTTP 502 is retried as well. Any other status code is
    returned to the caller as-is. If every attempt fails, an Exception is
    raised with the last error attached as its cause.
    """

    _MAX_RETRIES = 4

    @staticmethod
    def _describe(url, kwargs):
        # "url?query" string for log lines (query empty when no params given).
        return url + "?" + (urlencode(kwargs["params"]) if "params" in kwargs else "")

    def get(self, url, **kwargs):
        """GET *url* with retries; returns the response or raises after giving up."""
        last_error = None
        for _ in range(self._MAX_RETRIES):
            try:
                r = super().get(url, **kwargs, timeout=45)
                logger.debug("GET %s <%d>" % (self._describe(url, kwargs), r.status_code))
                if r.status_code == 429:
                    # Rate limited: back off before the generic retry sleep below.
                    sleep(15)
                    raise Exception("rate limited")
                if r.status_code == 502:
                    raise Exception("Server error")
                return r
            except Exception as e:
                # KeyboardInterrupt derives from BaseException and is not caught
                # here, so the explicit re-raise clause of the original code was
                # redundant and has been dropped.
                last_error = e
                logger.warning("%s: %s" % (self._describe(url, kwargs), str(e)))
                sleep(10)
        raise Exception("Gave up request after maximum number of retries") from last_error
def tmp_patch_api(api, mst, jst):
    """Install a shared logging/retrying session on every sub-API of *api*.

    Builds one SessionDebugWrapper pre-loaded with the given session cookies
    and the mobile-app User-Agent, then assigns it to the user, feed and
    comments sub-APIs so all traffic goes through the wrapper.
    """
    session = SessionDebugWrapper()
    session.headers['User-Agent'] = 'Parler%20Staging/545 CFNetwork/978.0.7 Darwin 18.7.0'
    session.cookies['mst'] = mst
    session.cookies['jst'] = jst
    for sub_api in (api.user_api, api.feed_api, api.comments_api):
        sub_api.s = session
class ParlerScanner:
    """Scans the Parler integer user-id space, fanning work out to threads
    and publishing every scraped item onto Redis archive lists."""

    def __init__(self, state: ParlerState):
        """Create the scanner and start one daemon worker per credential pair.

        :param state: persistent scan/resume state shared by all workers
        """
        self._state = state
        self.main_api = Parler(mst=MST[0], jst=JST[0])
        tmp_patch_api(self.main_api, MST[0], JST[0])
        self._threads = []
        # Small bound keeps the id producer from racing far ahead of workers.
        self._q = Queue(maxsize=THREADS + 1)
        self.rdb = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
        for i in range(THREADS):
            # Name the thread in the constructor, not after start(): the
            # original assigned t.name after t.start(), so an already-running
            # thread could briefly appear under a default "Thread-N" name.
            t = Thread(target=self._worker, args=(i,), daemon=True, name="wrk_%d" % (i,))
            self._threads.append(t)
            t.start()
def _worker(self, worker_id):
    """Queue consumer loop: scrape each (user_id, int_id) pair and publish.

    Runs until a sentinel pair whose first element is None is received.
    Each worker owns its own authenticated API client, built from the
    credential pair at its index.
    """
    logger.info("Starting worker thread (%d)" % (worker_id,))
    api = Parler(mst=MST[worker_id], jst=JST[worker_id])
    tmp_patch_api(api, MST[worker_id], JST[worker_id])
    running = True
    while running:
        try:
            user_id, int_id = self._q.get()
            if user_id is None:
                # Sentinel: stop after acknowledging this queue entry.
                running = False
            else:
                for item in self.process_userid(api, user_id, int_id):
                    self.publish(item)
        except Exception as e:
            # Keep the worker alive on any per-user failure.
            logger.error(str(e) + ": " + traceback.format_exc())
        finally:
            self._q.task_done()
    logger.info("Worker thread finished")
def _iterate_endpoint(self, func, params, resume_endpoint, resume_id, items_keys):
    """Page through a paginated Parler list endpoint.

    Yields ``(item, items_key, page_start_key)`` for every item of every
    requested bucket on every page, resuming from the key persisted in
    state under ``(resume_endpoint, resume_id)``.

    :param func: bound API method called as ``func(params=...)``
    :param params: base query parameters; ``"startkey"`` is set per page
        (NOTE: the caller-supplied dict is mutated in place)
    :param resume_endpoint: state namespace for the resume key
    :param resume_id: identifier (user or post id) within that namespace
    :param items_keys: response-JSON bucket names to drain, in order
    """
    current_key = self._state.resume_key(resume_endpoint, resume_id)
    if not current_key:
        # No saved position: an empty startkey requests the first page.
        current_key = ""
    while True:
        params["startkey"] = current_key
        r = func(params=params)
        if not r:
            # Empty/failed response ends iteration. (In a generator the
            # returned [] is discarded; this is equivalent to bare return.)
            return []
        j = json.loads(r.content.decode('utf-8', 'ignore'))
        for items_key in items_keys:
            if items_key in j and j[items_key]:
                for item in j[items_key]:
                    # page_start_key doubles as an approximate date marker
                    # for item types that carry no timestamp of their own.
                    yield item, items_key, current_key
        # Persist only after the page is fully yielded, so an interrupted
        # scan re-reads the current page rather than skipping it.
        self._state.set_resume_key(resume_endpoint, resume_id, current_key)
        # NOTE(review): assumes every successful response carries "last"
        # and "next" keys — a malformed page raises KeyError here.
        if j["last"]:
            break
        current_key = j["next"]
def user_followers(self, api, user_id):
    """Yield a ParlerFollower edge for every account following *user_id*."""
    pages = self._iterate_endpoint(
        func=api.user_api.get_followers_for_user_id,
        params={"id": user_id},
        resume_endpoint="followers",
        resume_id=user_id,
        items_keys=["followers"],
    )
    for entry, _, page_key in pages:
        # page_key stands in as an approximate date for the edge.
        yield ParlerFollower(user_id=user_id, follower_id=entry["id"], approx_date=page_key)
def user_followees(self, api, user_id):
    """Yield a ParlerFollowee edge for every account *user_id* follows."""
    pages = self._iterate_endpoint(
        func=api.user_api.get_following_for_user_id,
        params={"id": user_id},
        resume_endpoint="followees",
        resume_id=user_id,
        items_keys=["followees"],
    )
    for entry, _, page_key in pages:
        # page_key stands in as an approximate date for the edge.
        yield ParlerFollowee(user_id=user_id, followee_id=entry["id"], approx_date=page_key)
def user_posts(self, api, user_id):
    """Yield ParlerPost / ParlerPostRef / ParlerUrl items from the user's feed."""
    feed = self._iterate_endpoint(
        func=api.feed_api.get_users_feed,
        params={"id": user_id},
        resume_endpoint="posts",
        resume_id=user_id,
        # The endpoint also returns a 'users' bucket, which is skipped here.
        items_keys=["posts", "postRefs", "urls"],
    )
    for entry, bucket, page_key in feed:
        if bucket == "urls":
            yield ParlerUrl(data=entry)
        elif bucket == "postRefs":
            # Post references carry no payload; record id + approximate date.
            yield ParlerPostRef(post_id=entry["_id"], user_id=user_id, approx_date=page_key)
        else:  # bucket == "posts"
            yield ParlerPost(data=entry)
def post_comments(self, api, post_id):
    """Yield a ParlerComment for every comment on *post_id* (reverse=true)."""
    comment_pages = self._iterate_endpoint(
        func=api.comments_api.get_comments,
        params={"id": post_id, "reverse": "true"},
        resume_endpoint="comments",
        resume_id=post_id,
        # Response also carries "users", "post", "postRefs" buckets; ignored.
        items_keys=["comments"],
    )
    yield from (ParlerComment(data=entry) for entry, _, _ in comment_pages)
def _get_user_id_hash(self, api, int_id):
s = api.user_api.s
r = s.get(f"https://api.parler.com/v3/uuidConversion/user/{int_id}")
if "No resource for URL" in r.text:
return None
return r.text.strip()
def fetch_profile(self, api, user_id, int_id):
    """Fetch the profile for *user_id*; returns a ParlerProfile or None.

    Returns None when the API yields a falsy (failed/empty) response.
    """
    response = api.user_api.get_profile_for_user(params={"id": user_id})
    if response:
        payload = json.loads(response.content.decode('utf-8', 'ignore'))
        return ParlerProfile(data=payload, int_id=int_id)
    return None
def user_ids(self, api):
for int_id in range(0, 15000000):
if not self._state.has_visited_user(int_id):
user_id = self._get_user_id_hash(api, int_id)
if user_id:
yield user_id, int_id
else:
self._state.mark_visited_user(int_id)
def process_userid(self, api, user_id, int_id):
profile = self.fetch_profile(api, user_id, int_id)
yield profile
for follow in self.user_followers(api, user_id):
yield follow
for follow in self.user_followees(api, user_id):
yield follow
for post in self.user_posts(api, user_id):
yield post
if post.item_type() == "post" and not self._state.has_visited_post(post):
for comment in self.post_comments(api, post.item_id()):
yield comment
self._state.mark_visited_post(post.item_id())
self._state.mark_visited_user(int_id)
def publish(self, item: ParlerItem):
    """Serialize *item* and LPUSH it onto every configured archive list."""
    payload = item.serialize()
    routing_key = "parler.%s.x" % (item.item_type(),)
    for arc_list in ARC_LISTS:
        self.rdb.lpush("%s.%s" % (arc_list, routing_key), payload)
def scan_all_items(self):
for user_id, int_id in self.user_ids(self.main_api):
self._q.put((user_id, int_id))