commit e4b94ce045b8d112d0b1e7551480f25a18094d9a
Author: simon987
Date:   Sun Feb 7 16:00:54 2021 -0500

    Initial commit

diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..caa32e6
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,2 @@
+.idea/
+*.iml
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..caa32e6
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.idea/
+*.iml
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..bf21a04
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.8
+
+COPY requirements.txt /requirements.txt
+RUN pip install -r requirements.txt
+
+COPY . /app
+
+RUN chmod -R 777 /app
+
+WORKDIR /app
+ENTRYPOINT ["python", "run.py"]
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..2bd4888
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,8 @@
+version: "3"
+
+services:
+  scraper:
+    image: simon987/lbry_feed
+    restart: always
+    environment:
+      - "LF_REDIS_HOST="
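Everything below is driven through api.lbry.tv's JSON-RPC proxy. For orientation, here is a minimal standalone sketch of the claim_search request that lbry.py (next file) builds; the endpoint and field names are taken straight from the diff, while the placeholder channel id and the trimmed-down parameter set are illustrative only:

import json
from time import time

import requests

resp = requests.post(
    "https://api.lbry.tv/api/v1/proxy?m=claim_search",
    data=json.dumps({
        "id": round(time() * 1000),  # request id, same scheme as now() in lbry.py
        "jsonrpc": "2.0",
        "method": "claim_search",
        "params": {
            "channel_ids": ["<channel claim id>"],  # placeholder, not a real id
            "order_by": ["release_time"],
            "page": 1,
            "page_size": 30,
        },
    }),
)
for item in resp.json()["result"]["items"]:
    print(item["name"], item["claim_id"])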
diff --git a/lbry.py b/lbry.py
new file mode 100644
index 0000000..4035eed
--- /dev/null
+++ b/lbry.py
@@ -0,0 +1,241 @@
+from time import time
+import json
+
+import requests
+
+from state import LbryState
+from util import logger
+
+BASE_URL = "https://api.lbry.tv/api"
+LIGHTHOUSE_URL = "https://lighthouse.lbry.com"
+
+
+def now():
+    return round(time() * 1000)
+
+
+class LbryApi:
+    def __init__(self):
+        self._s = requests.Session()
+
+    def _post(self, url, **kwargs):
+        r = self._s.post(url, **kwargs)
+        logger.debug("POST %s <%d>" % (url, r.status_code))
+        return r
+
+    def _get(self, url, **kwargs):
+        r = self._s.get(url, **kwargs)
+        logger.debug("GET %s <%d>" % (url, r.status_code))
+        return r
+
+    def channel_videos(self, channel_id, size=30, page=1):
+        j = self._post(
+            f"{BASE_URL}/v1/proxy?m=claim_search",
+            data=json.dumps({
+                "id": now(),
+                "jsonrpc": "2.0",
+                "method": "claim_search",
+                "params": {
+                    "channel_ids": [channel_id],
+                    "claim_type": ["channel", "repost", "stream"],
+                    "fee_amount": ">=0",
+                    "include_purchase_receipt": True,
+                    "no_totals": True,
+                    "not_channel_ids": [],
+                    "not_tags": [],
+                    "order_by": ["release_time"],
+                    "page": page,
+                    "page_size": size,
+                }
+            })
+        ).json()
+
+        if j["result"]["items"]:
+            def next_page():
+                return self.channel_videos(channel_id, size, page + 1)
+
+            return j["result"]["items"], next_page
+        return j["result"]["items"], None
+
+    def comment_list(self, claim_id, page_size=99999, page=1):
+        j = self._post(
+            f"{BASE_URL}/v1/proxy?m=comment_list",
+            data=json.dumps({
+                "id": now(),
+                "jsonrpc": "2.0",
+                "method": "comment_list",
+                "params": {
+                    "claim_id": claim_id,
+                    "include_replies": True,
+                    "page": page,
+                    "page_size": page_size,
+                    "skip_validation": True
+                }
+            })
+        ).json()
+
+        return j["result"]["items"] if "items" in j["result"] else []
+
+    def comment_react_list(self, comment_ids):
+        j = self._post(
+            f"{BASE_URL}/v1/proxy?m=comment_react_list",
+            data=json.dumps({
+                "id": now(),
+                "jsonrpc": "2.0",
+                "method": "comment_react_list",
+                "params": {
+                    "comment_ids": ",".join(comment_ids)
+                }
+            })
+        ).json()
+
+        if "error" in j["result"]:
+            return {}
+
+        return j["result"]["others_reactions"]
+
+    def resolve(self, urls):
+        j = self._post(
+            f"{BASE_URL}/v1/proxy?m=resolve",
+            data=json.dumps({
+                "id": now(),
+                "jsonrpc": "2.0",
+                "method": "resolve",
+                "params": {
+                    "include_is_my_output": False,
+                    "include_purchase_receipt": True,
+                    "urls": urls
+                }
+            })
+        ).json()
+
+        return j["result"]
+
+    def get_related_videos(self, s, related_to, size=1000, from_=0, nsfw=False):
+        if len(s) < 3:
+            s = "aaa"
+        return self._post(
+            f"{LIGHTHOUSE_URL}/search",
+            params={
+                "s": s,
+                "size": size,
+                "from": from_,
+                "related_to": related_to,
+                # Note: I don't think there's a way to get both nsfw & sfw in the same query
+                "nsfw": "true" if nsfw else "false"
+            }
+        ).json()
+
+
+class LbryWrapper:
+    def __init__(self):
+        self._api = LbryApi()
+        self._state = LbryState()
+
+    def _iter(self, func, *args, **kwargs):
+        items, next_page = func(*args, **kwargs)
+        for item in items:
+            yield item
+        while next_page is not None:
+            items, next_page = next_page()
+            for item in items:
+                yield item
+
+    def _get_videos(self, channel_id):
+        return self._iter(self._api.channel_videos, channel_id=channel_id)
+
+    def _get_comments(self, claim_id, fetch_reactions=False):
+        comments = self._api.comment_list(claim_id)
+        comment_ids = [com["comment_id"] for com in comments]
+        if fetch_reactions:
+            reactions = self._api.comment_react_list(comment_ids)
+        else:
+            reactions = {}
+
+        for k, v in reactions.items():
+            for com in comments:
+                if com["comment_id"] == k:
+                    com["reactions"] = v
+                    break
+
+        return comments
+
+    def _get_related(self, claim):
+        j = self._api.get_related_videos(claim["name"], claim["claim_id"])
+        return [(f"lbry://{c['name']}#{c['claimId']}", c["claimId"]) for c in j]
+
+    def all_items(self):
+
+        seed_list = [
+            # Varg
+            "d1bb8684d445e6dd397fc13bfbb14bbe194c7129",
+            # Quartering
+            "113515e893b8186595595e594ecc410bae50c026",
+            # Liberty hangout
+            "5499c784a960d96497151f5e0e8434b84ea5da24",
+            # Alex Jones
+            "cde3b125543e3e930ac2647df957a836e3da3816",
+            # ancaps
+            "0135b83c29aa82120401f3f9053bf5b0520529ed",
+            "b89ed227c49e726fcccf913bdc9dec4c8fec99c2",
+        ]
+
+        for channel_id in seed_list:
+            if not self._state.has_visited(channel_id):
+                self._state.queue_channel(channel_id)
+
+        while True:
+            channel_id = self._state.pop_channel()
+            if channel_id is None:
+                break
+
+            if self._state.has_visited(channel_id):
+                continue
+
+            # Re-queue immediately so a crash doesn't lose this channel; the
+            # duplicate entry is skipped on later pops once it's marked visited
+            self._state.queue_channel(channel_id)
+
+            published_channel_data = False
+
+            for claim in self._get_videos(channel_id):
+
+                channel_url = claim["signing_channel"]["short_url"]
+
+                if not published_channel_data:
+                    channel_data = self._api.resolve([channel_url])[channel_url]
+                    yield channel_data, "channel"
+                    published_channel_data = True
+
+                if not self._state.has_visited(claim["claim_id"]):
+                    yield claim, "video"
+
+                    for comment in self._get_comments(claim["claim_id"]):
+                        yield comment, "comment"
+
+                    related_to_resolve = []
+                    for rel_url, rel_id in self._get_related(claim):
+                        if not self._state.has_visited(rel_id):
+                            related_to_resolve.append(rel_url)
+                    for rel_url, rel_claim in self._api.resolve(related_to_resolve).items():
+                        if "error" in rel_claim:
+                            continue
+
+                        rel_claim["_related_to"] = claim["claim_id"]
+                        yield rel_claim, "video"
+
+                        for rel_comment in self._get_comments(rel_claim["claim_id"]):
+                            yield rel_comment, "comment"
+
+                        if "signing_channel" in rel_claim and "channel_id" in rel_claim["signing_channel"]:
+                            rel_channel_id = rel_claim["signing_channel"]["channel_id"]
+                            if not self._state.has_visited(rel_channel_id):
+                                self._state.queue_channel(rel_channel_id)
+
+                        self._state.mark_visited(rel_claim["claim_id"])
+                self._state.mark_visited(claim["claim_id"])
+            self._state.mark_visited(channel_id)
+
+        logger.warning("No more channels to crawl!")
+
+
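A note on the pagination above: channel_videos returns an (items, next_page) pair, where next_page is either None or a zero-argument closure that fetches the following page, and LbryWrapper._iter flattens that chain into a single generator. A self-contained toy version of the same pattern, with fake data standing in for the API calls:

def fetch_page(page, pages=3):
    # Fake fetch: two items per page, three pages total
    items = [f"item-{page}-{i}" for i in range(2)]
    if page < pages:
        def next_page():
            return fetch_page(page + 1, pages)
        return items, next_page
    return items, None


def iterate(fetch, *args, **kwargs):
    # Same shape as LbryWrapper._iter: follow next_page closures until None
    items, next_page = fetch(*args, **kwargs)
    yield from items
    while next_page is not None:
        items, next_page = next_page()
        yield from items


print(list(iterate(fetch_page, 1)))  # item-1-0, item-1-1, ..., item-3-1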
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..4fd7d2c
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+requests
+git+https://github.com/simon987/hexlib.git
+redis
\ No newline at end of file
diff --git a/run.py b/run.py
new file mode 100644
index 0000000..aed4195
--- /dev/null
+++ b/run.py
@@ -0,0 +1,30 @@
+import json
+import redis
+import os
+
+from lbry import LbryWrapper
+
+REDIS_HOST = os.environ.get("LF_REDIS_HOST", "localhost")
+
+
+def publish(item, item_type):
+    routing_key = f"arc.lbry.{item_type}.x"
+
+    if item_type == "video":
+        item["_id"] = item["claim_id"]
+    elif item_type == "comment":
+        item["_id"] = item["comment_id"]
+    elif item_type == "channel":
+        item["_id"] = item["claim_id"]
+
+    message = json.dumps(item, separators=(',', ':'), ensure_ascii=False, sort_keys=True)
+    rdb.lpush(routing_key, message)
+
+
+if __name__ == '__main__':
+    lbry = LbryWrapper()
+
+    rdb = redis.Redis(host=REDIS_HOST)
+
+    for item, item_type in lbry.all_items():
+        publish(item, item_type)
diff --git a/state.py b/state.py
new file mode 100644
index 0000000..9bdfe36
--- /dev/null
+++ b/state.py
@@ -0,0 +1,23 @@
+from hexlib.db import VolatileQueue, VolatileBooleanState
+import os
+
+REDIS_HOST = os.environ.get("LF_REDIS_HOST", "localhost")
+
+
+class LbryState:
+
+    def __init__(self):
+        self._visited = VolatileBooleanState(prefix="lbry", host=REDIS_HOST)
+        self._channel_queue = VolatileQueue("lbry_channel_queue", host=REDIS_HOST)
+
+    def has_visited(self, item_id):
+        return self._visited["byid"][item_id]
+
+    def mark_visited(self, item_id):
+        self._visited["byid"][item_id] = True
+
+    def queue_channel(self, channel_id):
+        self._channel_queue.put(channel_id)
+
+    def pop_channel(self):
+        return self._channel_queue.get()
diff --git a/util.py b/util.py
new file mode 100644
index 0000000..cd834ed
--- /dev/null
+++ b/util.py
@@ -0,0 +1,13 @@
+import logging
+import sys
+from logging import StreamHandler
+
+logger = logging.getLogger("default")
+logger.setLevel(logging.DEBUG)
+
+for h in logger.handlers:
+    logger.removeHandler(h)
+
+handler = StreamHandler(sys.stdout)
+handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)-5s %(message)s"))
+logger.addHandler(handler)
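run.py LPUSHes each serialized item onto a Redis list named arc.lbry.<type>.x, so a downstream archiver can drain those lists at its own pace. A minimal sketch of such a consumer, assuming Redis on localhost; the key names come from publish() above, but the BRPOP loop and the print stub are only one way a consumer might look:

import json

import redis

rdb = redis.Redis(host="localhost")
keys = ["arc.lbry.video.x", "arc.lbry.comment.x", "arc.lbry.channel.x"]

while True:
    # BRPOP blocks until any of the lists has an item, oldest first
    key, message = rdb.brpop(keys)
    item = json.loads(message)
    print(key.decode(), item["_id"])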