Initial commit

simon987 2021-02-07 16:00:54 -05:00
commit e4b94ce045
9 changed files with 332 additions and 0 deletions

.dockerignore (Normal file, +2)

@@ -0,0 +1,2 @@
.idea/
*.iml

.gitignore (vendored, Normal file, +2)

@@ -0,0 +1,2 @@
.idea/
*.iml

Dockerfile (Normal file, +11)

@@ -0,0 +1,11 @@
FROM python:3.8

ADD requirements.txt /requirements.txt
RUN pip install -r requirements.txt

COPY . /app
RUN chmod -R 777 /app
WORKDIR /app

ENTRYPOINT ["python", "run.py"]

docker-compose.yml (Normal file, +8)

@@ -0,0 +1,8 @@
version: "3"

services:
  scraper:
    image: simon987/lbry_feed
    restart: always
    environment:
      - "LF_REDIS_HOST="

lbry.py (Normal file, +240)

@@ -0,0 +1,240 @@
from time import time
import json

import requests

from state import LbryState
from util import logger

BASE_URL = "https://api.lbry.tv/api"
LIGHTHOUSE_URL = "https://lighthouse.lbry.com"


def now():
    return round(time() * 1000)


class LbryApi:
    def __init__(self):
        self._s = requests.session()

    def _post(self, url, **kwargs):
        r = self._s.post(url, **kwargs)
        logger.debug("POST %s <%d>" % (url, r.status_code))
        return r

    def _get(self, url, **kwargs):
        r = self._s.get(url, **kwargs)
        logger.debug("GET %s <%d>" % (url, r.status_code))
        return r

    def channel_videos(self, channel_id, size=30, page=1):
        j = self._post(
            f"{BASE_URL}/v1/proxy?m=claim_search",
            data=json.dumps({
                "id": now(),
                "jsonrpc": "2.0",
                "method": "claim_search",
                "params": {
                    "channel_ids": [channel_id],
                    "claim_type": ["channel", "repost", "stream"],
                    "fee_amount": ">=0",
                    "include_purchase_receipt": True,
                    "no_totals": True,
                    "not_channel_ids": [],
                    "not_tags": [],
                    "order_by": ["release_time"],
                    "page": page,
                    "page_size": size,
                }
            })
        ).json()

        if j["result"]["items"]:
            def next_page():
                return self.channel_videos(channel_id, size, page + 1)

            return j["result"]["items"], next_page
        return j["result"]["items"], None

    def comment_list(self, claim_id, page_size=99999, page=1):
        j = self._post(
            f"{BASE_URL}/v1/proxy?m=comment_list",
            data=json.dumps({
                "id": now(),
                "jsonrpc": "2.0",
                "method": "comment_list",
                "params": {
                    "claim_id": claim_id,
                    "include_replies": True,
                    "page": page,
                    "page_size": page_size,
                    "skip_validation": True
                }
            })
        ).json()

        return j["result"]["items"] if "items" in j["result"] else []

    def comment_react_list(self, comment_ids):
        j = self._post(
            f"{BASE_URL}/v1/proxy?m=comment_react_list",
            data=json.dumps({
                "id": now(),
                "jsonrpc": "2.0",
                "method": "comment_react_list",
                "params": {
                    "comment_ids": ",".join(comment_ids)
                }
            })
        ).json()

        if "error" in j["result"]:
            return {}
        return j["result"]["others_reactions"]

    def resolve(self, urls):
        j = self._post(
            f"{BASE_URL}/v1/proxy?m=resolve",
            data=json.dumps({
                "id": now(),
                "jsonrpc": "2.0",
                "method": "resolve",
                "params": {
                    "include_is_my_output": False,
                    "include_purchase_receipt": True,
                    "urls": urls
                }
            })
        ).json()

        return j["result"]

    def get_related_videos(self, s, related_to, size=1000, from_=0, nsfw=False):
        # Substitute a placeholder term for very short search queries
        if len(s) < 3:
            s = "aaa"

        return self._post(
            f"{LIGHTHOUSE_URL}/search",
            params={
                "s": s,
                "size": size,
                "from": from_,
                "related_to": related_to,
                # Note: I don't think there's a way to get both nsfw & sfw in the same query
                "nsfw": "true" if nsfw else "false"
            }
        ).json()


class LbryWrapper:
    def __init__(self):
        self._api = LbryApi()
        self._state = LbryState()

    def _iter(self, func, *args, **kwargs):
        items, next_page = func(*args, **kwargs)
        for item in items:
            yield item

        while next_page is not None:
            items, next_page = next_page()
            for item in items:
                yield item

    def _get_videos(self, channel_id):
        return self._iter(self._api.channel_videos, channel_id=channel_id)

    def _get_comments(self, claim_id, fetch_reactions=False):
        comments = self._api.comment_list(claim_id)
        comment_ids = [com["comment_id"] for com in comments]

        if fetch_reactions:
            reactions = self._api.comment_react_list(comment_ids)
        else:
            reactions = {}

        for k, v in reactions.items():
            for com in comments:
                if com["comment_id"] == k:
                    com["reactions"] = v
                    break

        return comments

    def _get_related(self, claim):
        j = self._api.get_related_videos(claim["name"], claim["claim_id"])
        return [(f"lbry://{c['name']}#{c['claimId']}", c["claimId"]) for c in j]

    def all_items(self):
        seed_list = [
            # Varg
            "d1bb8684d445e6dd397fc13bfbb14bbe194c7129",
            # Quartering
            "113515e893b8186595595e594ecc410bae50c026",
            # Liberty hangout
            "5499c784a960d96497151f5e0e8434b84ea5da24",
            # Alex Jones
            "cde3b125543e3e930ac2647df957a836e3da3816",
            # ancaps
            "0135b83c29aa82120401f3f9053bf5b0520529ed",
            "b89ed227c49e726fcccf913bdc9dec4c8fec99c2",
        ]

        for channel_id in seed_list:
            if not self._state.has_visited(channel_id):
                self._state.queue_channel(channel_id)

        while True:
            channel_id = self._state.pop_channel()
            if channel_id is None:
                break
            if self._state.has_visited(channel_id):
                continue

            # Re-queue immediately in case of a fault: if the channel is popped
            # again later, it is skipped only when it was already crawled
            # completely (marked visited)
            self._state.queue_channel(channel_id)

            published_channel_data = False
            for claim in self._get_videos(channel_id):
                channel_url = claim["signing_channel"]["short_url"]

                if not published_channel_data:
                    channel_data = self._api.resolve([channel_url])[channel_url]
                    yield channel_data, "channel"
                    published_channel_data = True

                if not self._state.has_visited(claim["claim_id"]):
                    yield claim, "video"

                    for comment in self._get_comments(claim["claim_id"]):
                        yield comment, "comment"

                    related_to_resolve = []
                    for rel_url, rel_id in self._get_related(claim):
                        if not self._state.has_visited(rel_id):
                            related_to_resolve.append(rel_url)

                    for rel_url, rel_claim in self._api.resolve(related_to_resolve).items():
                        if "error" in rel_claim:
                            continue

                        rel_claim["_related_to"] = claim["claim_id"]
                        yield rel_claim, "video"

                        for rel_comment in self._get_comments(rel_claim["claim_id"]):
                            yield rel_comment, "comment"

                        if "signing_channel" in rel_claim and "channel_id" in rel_claim["signing_channel"]:
                            rel_channel_id = rel_claim["signing_channel"]["channel_id"]
                            if not self._state.has_visited(rel_channel_id):
                                self._state.queue_channel(rel_channel_id)

                        self._state.mark_visited(rel_claim["claim_id"])

                    self._state.mark_visited(claim["claim_id"])

            self._state.mark_visited(channel_id)

        logger.warning("No more channels to crawl!")
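
channel_videos and _iter implement pagination with a closure: each call returns the current page of claims plus a next_page thunk that fetches the following one. A minimal sketch of consuming the API directly (assumes api.lbry.tv is reachable; the channel id is one of the seeds above):

    from lbry import LbryApi

    api = LbryApi()

    # First page, plus a closure that fetches the next one
    items, next_page = api.channel_videos("d1bb8684d445e6dd397fc13bfbb14bbe194c7129", size=10)
    while items:
        for claim in items:
            print(claim["claim_id"], claim.get("name"))
        if next_page is None:
            break
        items, next_page = next_page()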

requirements.txt (Normal file, +3)

@@ -0,0 +1,3 @@
requests
git+https://github.com/simon987/hexlib.git
redis

run.py (Normal file, +30)

@@ -0,0 +1,30 @@
import json
import os

import redis

from lbry import LbryWrapper

REDIS_HOST = os.environ.get("LF_REDIS_HOST", "localhost")


def publish(item, item_type):
    routing_key = f"arc.lbry.{item_type}.x"

    if item_type == "video":
        item["_id"] = item["claim_id"]
    elif item_type == "comment":
        item["_id"] = item["comment_id"]
    elif item_type == "channel":
        item["_id"] = item["claim_id"]

    message = json.dumps(item, separators=(',', ':'), ensure_ascii=False, sort_keys=True)
    rdb.lpush(routing_key, message)


if __name__ == '__main__':
    lbry = LbryWrapper()
    rdb = redis.Redis(host=REDIS_HOST)

    for item, item_type in lbry.all_items():
        publish(item, item_type)
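
publish pushes each item onto a Redis list named after its type (arc.lbry.video.x, arc.lbry.comment.x, arc.lbry.channel.x). A downstream consumer is not part of this commit; a sketch of one, assuming the same key names and a local Redis, could look like:

    import json

    import redis

    rdb = redis.Redis(host="localhost")

    # run.py LPUSHes; BRPOP takes from the opposite end, so items arrive FIFO
    while True:
        key, message = rdb.brpop(["arc.lbry.video.x", "arc.lbry.comment.x", "arc.lbry.channel.x"])
        item = json.loads(message)
        print(key.decode(), item["_id"])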

state.py (Normal file, +23)

@@ -0,0 +1,23 @@
import os

from hexlib.db import VolatileQueue, VolatileBooleanState

REDIS_HOST = os.environ.get("LF_REDIS_HOST", "localhost")


class LbryState:
    def __init__(self):
        self._visited = VolatileBooleanState(prefix="lbry", host=REDIS_HOST)
        self._channel_queue = VolatileQueue("lbry_channel_queue", host=REDIS_HOST)

    def has_visited(self, item_id):
        return self._visited["byid"][item_id]

    def mark_visited(self, item_id):
        self._visited["byid"][item_id] = True

    def queue_channel(self, channel_id):
        self._channel_queue.put(channel_id)

    def pop_channel(self):
        return self._channel_queue.get()
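
VolatileQueue and VolatileBooleanState come from hexlib and are Redis-backed. For illustration only, a plain redis-py stand-in with roughly the same semantics (the key names here are hypothetical, not hexlib's actual scheme) might look like:

    import redis


    class PlainLbryState:
        """Illustrative stand-in for the hexlib-backed LbryState (hypothetical)."""

        def __init__(self, host="localhost"):
            self._rdb = redis.Redis(host=host)

        def has_visited(self, item_id):
            # Hypothetical key scheme: one Redis key per visited id
            return self._rdb.exists(f"lbry.byid.{item_id}") == 1

        def mark_visited(self, item_id):
            self._rdb.set(f"lbry.byid.{item_id}", 1)

        def queue_channel(self, channel_id):
            self._rdb.lpush("lbry_channel_queue", channel_id)

        def pop_channel(self):
            item = self._rdb.rpop("lbry_channel_queue")
            return item.decode() if item is not None else None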

util.py (Normal file, +13)

@@ -0,0 +1,13 @@
import logging
import sys
from logging import StreamHandler

logger = logging.getLogger("default")
logger.setLevel(logging.DEBUG)

# Remove any previously attached handlers to avoid duplicate output
for h in logger.handlers:
    logger.removeHandler(h)

handler = StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)-5s %(message)s"))
logger.addHandler(handler)