From 11901b7d6c6bb7c07371390ce535e72ce99882cd Mon Sep 17 00:00:00 2001 From: simon987 Date: Thu, 31 Dec 2020 08:49:02 -0500 Subject: [PATCH] Fix postref, add approx date --- items.py | 16 +++++++++++----- scanner.py | 35 +++++++++++++++++++++-------------- util.py | 2 +- 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/items.py b/items.py index c330537..060ebd3 100644 --- a/items.py +++ b/items.py @@ -35,11 +35,12 @@ class ParlerProfile(ParlerItem): class ParlerFollower(ParlerItem): - def __init__(self, user_id, follower_id): + def __init__(self, user_id, follower_id, approx_date): super().__init__() self.data = { "user_id": user_id, "follower_id": follower_id, + "approx_date": approx_date, } def item_type(self): @@ -51,11 +52,12 @@ class ParlerFollower(ParlerItem): class ParlerFollowee(ParlerItem): - def __init__(self, user_id, followee_id): + def __init__(self, user_id, followee_id, approx_date): super().__init__() self.data = { "user_id": user_id, "followee_id": followee_id, + "approx_date": approx_date } def item_type(self): @@ -80,15 +82,19 @@ class ParlerPost(ParlerItem): class ParlerPostRef(ParlerItem): - def __init__(self, data): + def __init__(self, post_id, user_id, approx_date): super().__init__() - self.data = data + self.data = { + "post_id": post_id, + "user_id": user_id, + "approx_date": approx_date + } def item_type(self): return "postref" def item_id(self): - return self.data["id"] + return self.data["user_id"] + self.data["post_id"] class ParlerUrl(ParlerItem): diff --git a/scanner.py b/scanner.py index a96e219..fce5293 100644 --- a/scanner.py +++ b/scanner.py @@ -29,24 +29,30 @@ if not MST: class SessionDebugWrapper(requests.Session): def get(self, url, **kwargs): - retries = 3 + retries = 4 while retries > 0: retries -= 1 try: - r = super().get(url, **kwargs, timeout=15) + r = super().get(url, **kwargs, timeout=45) logger.debug( "GET %s <%d>" % (url + "?" + (urlencode(kwargs["params"]) if "params" in kwargs else ""), r.status_code) ) if r.status_code == 429: - sleep(1) + sleep(15) raise Exception("rate limited") + if r.status_code == 502: + raise Exception("Server error") return r except KeyboardInterrupt as e: raise e except Exception as e: - logger.warning("Error with request %s: %s" % (url, str(e))) + logger.warning( + "%s: %s" + % (url + "?" + (urlencode(kwargs["params"]) if "params" in kwargs else ""), str(e)) + ) + sleep(10) raise Exception("Gave up request after maximum number of retries") @@ -112,7 +118,7 @@ class ParlerScanner: for items_key in items_keys: if items_key in j and j[items_key]: for item in j[items_key]: - yield item, items_key + yield item, items_key, current_key self._state.set_resume_key(resume_endpoint, resume_id, current_key) @@ -122,27 +128,27 @@ class ParlerScanner: current_key = j["next"] def user_followers(self, api, user_id): - for profile, key in self._iterate_endpoint( + for profile, key, it_index in self._iterate_endpoint( func=api.user_api.get_followers_for_user_id, params={"id": user_id}, resume_endpoint="followers", resume_id=user_id, items_keys=["followers"] ): - yield ParlerFollower(user_id=user_id, follower_id=profile["id"]) + yield ParlerFollower(user_id=user_id, follower_id=profile["id"], approx_date=it_index) def user_followees(self, api, user_id): - for profile, key in self._iterate_endpoint( + for profile, key, it_index in self._iterate_endpoint( func=api.user_api.get_following_for_user_id, params={"id": user_id}, resume_endpoint="followees", resume_id=user_id, items_keys=["followees"] ): - yield ParlerFollowee(user_id=user_id, followee_id=profile["id"]) + yield ParlerFollowee(user_id=user_id, followee_id=profile["id"], approx_date=it_index) def user_posts(self, api, user_id): - for item, key in self._iterate_endpoint( + for item, key, it_index in self._iterate_endpoint( func=api.feed_api.get_users_feed, params={"id": user_id}, resume_endpoint="posts", @@ -153,12 +159,12 @@ class ParlerScanner: if key == "posts": yield ParlerPost(data=item) elif key == "postRefs": - yield ParlerPostRef(data=item) + yield ParlerPostRef(post_id=item["_id"], user_id=user_id, approx_date=it_index) elif key == "urls": yield ParlerUrl(data=item) def post_comments(self, api, post_id): - for item, key in self._iterate_endpoint( + for item, key, _ in self._iterate_endpoint( func=api.comments_api.get_comments, params={"id": post_id, "reverse": "true"}, resume_endpoint="comments", @@ -191,6 +197,8 @@ class ParlerScanner: user_id = self._get_user_id_hash(api, int_id) if user_id: yield user_id, int_id + else: + self._state.mark_visited_user(int_id) def process_userid(self, api, user_id, int_id): profile = self.fetch_profile(api, user_id, int_id) @@ -205,8 +213,7 @@ class ParlerScanner: for post in self.user_posts(api, user_id): yield post - if (post.item_type() == "post" or post.item_type() == "postref") \ - and not self._state.has_visited_post(post): + if post.item_type() == "post" and not self._state.has_visited_post(post): for comment in self.post_comments(api, post.item_id()): yield comment self._state.mark_visited_post(post.item_id()) diff --git a/util.py b/util.py index a6a6c1b..ec2ca69 100644 --- a/util.py +++ b/util.py @@ -3,7 +3,7 @@ import sys from logging import StreamHandler logger = logging.getLogger("default") -logger.setLevel(logging.DEBUG) +logger.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s <%(threadName)s> %(levelname)-5s %(message)s') for h in logger.handlers: