threading with one thread per account

This commit is contained in:
simon987 2020-12-27 16:01:47 -05:00
parent 880ae0014e
commit de97e7bc23
3 changed files with 50 additions and 45 deletions

2
run.py
View File

@@ -3,6 +3,6 @@ from state import ParlerState
if __name__ == "__main__":
state = ParlerState()
scanner = ParlerScanner(state, threads=1)
scanner = ParlerScanner(state)
scanner.scan_all_items()

View File

@@ -18,8 +18,9 @@ from util import logger
ARC_LISTS = os.environ.get("PAF_ARC_LISTS", "arc").split(",")
REDIS_HOST = os.environ.get("PAF_REDIS_HOST", "localhost")
REDIS_PORT = os.environ.get("PAF_REDIS_PORT", 6379)
MST = os.environ.get("PAF_MST", None)
JST = os.environ.get("PAF_JST", None)
MST = os.environ.get("PAF_MST", "").split(",")
JST = os.environ.get("PAF_JST", "").split(",")
THREADS = len(MST)
if not MST:
print("MST & JST MISSING!")
@@ -49,11 +50,11 @@ class SessionDebugWrapper(requests.Session):
raise Exception("Gave up request after maximum number of retries")
def tmp_patch_api(api):
def tmp_patch_api(api, mst, jst):
s = SessionDebugWrapper()
s.headers['User-Agent'] = 'Parler%20Staging/545 CFNetwork/978.0.7 Darwin 18.7.0'
s.cookies['mst'] = MST
s.cookies['jst'] = JST
s.cookies['mst'] = mst
s.cookies['jst'] = jst
api.user_api.s = s
api.feed_api.s = s
api.comments_api.s = s
@@ -61,29 +62,32 @@ def tmp_patch_api(api):
class ParlerScanner:
def __init__(self, state: ParlerState, threads):
def __init__(self, state: ParlerState):
self._state = state
self.api: Parler = Parler(mst=MST, jst=JST)
self.main_api = Parler(mst=MST[0], jst=JST[0])
tmp_patch_api(self.main_api, MST[0], JST[0])
self._threads = []
self._q = Queue(maxsize=threads + 1)
self._q = Queue(maxsize=THREADS + 1)
self.rdb = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
tmp_patch_api(self.api)
for _ in range(threads):
t = Thread(target=self._worker, daemon=True)
for i in range(THREADS):
t = Thread(target=self._worker, args=(i,), daemon=True)
self._threads.append(t)
t.name = "wrk_%d" % (i,)
t.start()
def _worker(self):
logger.info("Starting worker thread")
def _worker(self, worker_id):
logger.info("Starting worker thread (%d)" % (worker_id,))
api = Parler(mst=MST[worker_id], jst=JST[worker_id])
tmp_patch_api(api, MST[worker_id], JST[worker_id])
while True:
try:
user_id, int_id = self._q.get()
if user_id is None:
break
for item in self.process_userid(user_id, int_id):
for item in self.process_userid(api, user_id, int_id):
self.publish(item)
except Exception as e:
@@ -117,9 +121,9 @@ class ParlerScanner:
self._state.set_resume_key(resume_endpoint, resume_id, current_key)
current_key = j["next"]
def user_followers(self, user_id):
def user_followers(self, api, user_id):
for profile, key in self._iterate_endpoint(
func=self.api.user_api.get_followers_for_user_id,
func=api.user_api.get_followers_for_user_id,
params={"id": user_id},
resume_endpoint="followers",
resume_id=user_id,
@@ -127,9 +131,9 @@ class ParlerScanner:
):
yield ParlerFollower(user_id=user_id, follower_id=profile["id"])
def user_followees(self, user_id):
def user_followees(self, api, user_id):
for profile, key in self._iterate_endpoint(
func=self.api.user_api.get_following_for_user_id,
func=api.user_api.get_following_for_user_id,
params={"id": user_id},
resume_endpoint="followees",
resume_id=user_id,
@@ -137,9 +141,9 @@ class ParlerScanner:
):
yield ParlerFollowee(user_id=user_id, followee_id=profile["id"])
def user_posts(self, user_id):
def user_posts(self, api, user_id):
for item, key in self._iterate_endpoint(
func=self.api.feed_api.get_users_feed,
func=api.feed_api.get_users_feed,
params={"id": user_id},
resume_endpoint="posts",
resume_id=user_id,
@@ -153,9 +157,9 @@ class ParlerScanner:
elif key == "urls":
yield ParlerUrl(data=item)
def post_comments(self, post_id):
def post_comments(self, api, post_id):
for item, key in self._iterate_endpoint(
func=self.api.comments_api.get_comments,
func=api.comments_api.get_comments,
params={"id": post_id, "reverse": True},
resume_endpoint="comments",
resume_id=post_id,
@@ -164,15 +168,15 @@ class ParlerScanner:
):
yield ParlerComment(data=item)
def _get_user_id_hash(self, int_id):
s = self.api.user_api.s
def _get_user_id_hash(self, api, int_id):
s = api.user_api.s
r = s.get(f"https://api.parler.com/v3/uuidConversion/user/{int_id}")
if "No resource for URL" in r.text:
return None
return r.text.strip()
def fetch_profile(self, user_id, int_id):
r = self.api.user_api.get_profile_for_user(params={
def fetch_profile(self, api, user_id, int_id):
r = api.user_api.get_profile_for_user(params={
"id": user_id
})
@@ -181,35 +185,34 @@ class ParlerScanner:
return ParlerProfile(data=json.loads(r.content.decode('utf-8', 'ignore')), int_id=int_id)
def user_ids(self):
current_key = self._state.resume_key("users", "it")
current_key = int(current_key) if current_key else 0
def user_ids(self, api):
current_key = 0
for int_id in range(current_key, 15000000):
user_id = self._get_user_id_hash(int_id)
if user_id and not self._state.has_visited_user(user_id):
yield user_id, int_id
self._state.set_resume_key("users", "it", int_id)
if not self._state.has_visited_user(int_id):
user_id = self._get_user_id_hash(api, int_id)
if user_id:
yield user_id, int_id
def process_userid(self, user_id, int_id):
profile = self.fetch_profile(user_id, int_id)
def process_userid(self, api, user_id, int_id):
profile = self.fetch_profile(api, user_id, int_id)
yield profile
for follow in self.user_followers(user_id):
for follow in self.user_followers(api, user_id):
yield follow
for follow in self.user_followees(user_id):
for follow in self.user_followees(api, user_id):
yield follow
for post in self.user_posts(user_id):
for post in self.user_posts(api, user_id):
yield post
if (post.item_type() == "post" or post.item_type() == "postref") \
and not self._state.has_visited_post(post):
for comment in self.post_comments(post.item_id()):
for comment in self.post_comments(api, post.item_id()):
yield comment
self._state.mark_visited_post(post.item_id())
self._state.mark_visited_user(user_id)
self._state.mark_visited_user(int_id)
def publish(self, item: ParlerItem):
message = item.serialize()
@@ -221,5 +224,5 @@ class ParlerScanner:
self.rdb.lpush(arc + "." + routing_key, message)
def scan_all_items(self):
for user_id, int_id in self.user_ids():
for user_id, int_id in self.user_ids(self.main_api):
self._q.put((user_id, int_id))

View File

@@ -5,7 +5,9 @@ from logging import StreamHandler
logger = logging.getLogger("default")
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
formatter = logging.Formatter('%(asctime)s <%(threadName)s> %(levelname)-5s %(message)s')
for h in logger.handlers:
logger.removeHandler(h)
logger.addHandler(StreamHandler(sys.stdout))
handler = StreamHandler(sys.stdout)
handler.formatter = formatter
logger.addHandler(handler)