From 2f8cbca0b231dc7c087eb7e04791e3c24db4b425 Mon Sep 17 00:00:00 2001
From: simon987
Date: Fri, 25 Dec 2020 20:11:46 -0500
Subject: [PATCH] Initial commit

---
 .gitignore         |   2 +
 Dockerfile         |  11 +++
 docker-compose.yml |  10 +++
 gabtv.py           | 164 +++++++++++++++++++++++++++++++++++++++++++++
 post_process.py    |   4 ++
 requirements.txt   |   5 ++
 run.py             |  80 ++++++++++++++++++++++
 state.py           |  34 ++++++++++
 util.py            |  49 ++++++++++++++
 9 files changed, 359 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 Dockerfile
 create mode 100644 docker-compose.yml
 create mode 100644 gabtv.py
 create mode 100644 post_process.py
 create mode 100644 requirements.txt
 create mode 100644 run.py
 create mode 100644 state.py
 create mode 100644 util.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..ee64372
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.idea/
+*.pyc
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..a68487c
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.8
+
+ADD requirements.txt /requirements.txt
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY . /app
+
+RUN chmod 777 -R /app
+
+WORKDIR /app
+ENTRYPOINT ["python", "run.py"]
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..0268694
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,10 @@
+version: "3"
+
+services:
+  scraper:
+    image: simon987/gabtv_feed
+    restart: always
+    environment:
+      - "GTV_REDIS_HOST="
+      - "GTV_RPS=0.10"
+      - "GTV_MAX_PAGES=9999999"
diff --git a/gabtv.py b/gabtv.py
new file mode 100644
index 0000000..3e78463
--- /dev/null
+++ b/gabtv.py
@@ -0,0 +1,164 @@
+from json import JSONDecodeError
+import time
+import os
+from time import sleep
+
+from state import GabTvState
+from util import Web, logger
+from bs4 import BeautifulSoup
+import json
+
+PER_PAGE = 5
+MAX_PAGES = int(os.environ.get("GTV_MAX_PAGES", 99999999))
+
+
+def item_type(item):
+    if "author" in item:
+        return "comment"
+    if "category" in item:
+        return "episode"
+    if "moderators" in item:
+        return "channel"
+
+
+def episode_url(page, cpp=PER_PAGE):
+    return "https://tv.gab.com/api/v1/episode?cpp=%d&p=%d" % (cpp, page,)
+
+
+def increment_episode_url(url):
+    tokens = url.rsplit("=", 1)
+    return "=".join((
+        tokens[0],
+        str(int(tokens[1]) + 1)
+    ))
+
+
+def comments_url(episode):
+    return f"https://tv.gab.com/channel/{episode['channel']['slug']}/view/{episode['slug']}"
+
+
+def channel_url(channel_id, page=1):
+    return f"https://tv.gab.com/api/v1/channel/{channel_id}/episode?p={page}"
+
+
+def parse_episode_list(r):
+    try:
+        j = json.loads(r.content.decode('utf-8', 'ignore'))
+    except JSONDecodeError:
+        logger.warning("JSONDecodeError for %s:" % (r.url,))
+        logger.warning(r.text)
+        return [], None
+
+    episodes = j["episodes"]
+    page = j["pagination"]["p"]
+    if len(episodes) == PER_PAGE and page + 1 < MAX_PAGES:
+        return episodes, episode_url(page=page + 1)
+    return episodes, None
+
+
+def parse_channel_episode_list(channel_id, r):
+    try:
+        j = json.loads(r.content.decode('utf-8', 'ignore'))
+    except JSONDecodeError:
+        logger.warning("JSONDecodeError for %s:" % (r.url,))
+        logger.warning(r.text)
+        return [], None
+
+    episodes = j["episodes"]
+    if len(episodes) == PER_PAGE:
+        page = j["pagination"]["p"]
+        return episodes, channel_url(channel_id, page=page + 1)
+    return episodes, None
+
+
+class GabTvScanner:
+
+    def __init__(self, state: GabTvState, rps):
+        self._state = state
+        self._web = Web(rps)
+
+    def episodes_of_channel(self, channel_id):
+        if not self._state.has_visited_channel(channel_id):
+            r = self._web.get(channel_url(channel_id))
+            while True:
+                episodes, next_url = parse_channel_episode_list(channel_id, r)
+                for episode in episodes:
+                    yield episode
+                self._state.mark_visited_channel(channel_id)
+
+                if not next_url:
+                    break
+                r = self._web.get(next_url)
+                if not r or r.status_code != 200:
+                    break
+
+    def episodes(self):
+        r = self._web.get(episode_url(page=1))
+        # TODO: This is sometimes broken for no reason on page=1 (?!)
+        if r.status_code == 500:
+            sleep(30)
+            return []
+
+        skips = 15
+        while True:
+            episodes, next_url = parse_episode_list(r)
+            for episode in episodes:
+                yield episode
+
+                # Also crawl channel list
+                # Looks like only a
+                channel_id = episode["channel"]["_id"]
+                for channel_ep in self.episodes_of_channel(channel_id):
+                    yield channel_ep
+
+            if not next_url:
+                break
+            r = self._web.get(next_url)
+            # Some pages are broken, attempt to skip it once
+            while r.status_code == 500 and skips > 0:
+                logger.info("Skipped page!")
+                next_url = increment_episode_url(next_url)
+                r = self._web.get(next_url)
+                skips -= 1
+            if not r or r.status_code != 200:
+                break
+            skips = 15
+
+    def fetch_random_episode_ids(self):
+        r = self._web.get("https://tv.gab.com/search")
+        if not r or r.status_code != 200:
+            return []
+
+    def fetch_comments(self, episode):
+        r = self._web.get(comments_url(episode))
+        if not r or r.status_code != 200:
+            return []
+
+        soup = BeautifulSoup(r.content, "html.parser")
+
+        for com_el in soup.find_all("div", class_="tv-comment"):
+            yield {
+                "_id": com_el.find("div", class_="comment-content").get("data-comment-id"),
+                "author_display_name": com_el.find("div", class_="author-name").find("a").text,
+                "author": com_el.find("div", class_="author-name").find("a").get("href").split("/")[2],
+                "channel": episode["channel"]["_id"],
+                "episode": episode["_id"],
+                "_created_rel": int(time.time()),
+                "created": com_el.find("div", class_="created-date").text,
+                "content": com_el.find("div", class_="comment-content").text.strip(),
+                "upvotes": int(com_el.find("span", class_="upvote-label").text),
+                "downvotes": int(com_el.find("span", class_="downvote-label").text),
+                "replies": int(com_el.find_all("span", class_="icon-label")[-1].text),
+                "_raw": str(com_el)
+            }
+
+    def all_items(self):
+        for episode in self.episodes():
+            yield episode
+
+            yield episode["channel"]
+
+            if not self._state.has_visited_episode(episode):
+                for comment in self.fetch_comments(episode):
+                    yield comment
+                self._state.mark_visited_episode(episode)
diff --git a/post_process.py b/post_process.py
new file mode 100644
index 0000000..36cca48
--- /dev/null
+++ b/post_process.py
@@ -0,0 +1,4 @@
+def post_process(item):
+    item["_v"] = 1.0
+
+    return item
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..1961e9b
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,5 @@
+requests
+urllib3
+git+git://github.com/simon987/hexlib.git
+redis
+bs4
diff --git a/run.py b/run.py
new file mode 100644
index 0000000..6a1f93a
--- /dev/null
+++ b/run.py
@@ -0,0 +1,80 @@
+import json
+import os
+import traceback
+from queue import Queue
+from threading import Thread
+
+import redis
+
+from gabtv import GabTvScanner, item_type
+from post_process import post_process
+from state import GabTvState
+from util import logger
+
+REDIS_HOST = os.environ.get("GTV_REDIS_HOST", "localhost")
+REDIS_PORT = os.environ.get("GTV_REDIS_PORT", 6379)
+PF_PUBLISH = os.environ.get("GTV_PUBLISH", False)
+GTV_RPS = os.environ.get("GTV_RPS", 1)
+
+ARC_LISTS = os.environ.get("GTV_ARC_LISTS", "arc").split(",")
+
+
+def publish_worker(queue: Queue):
+    while True:
+        try:
+            item = queue.get()
+            if item is None:
+                break
+            publish(item)
+
+        except Exception as e:
+            logger.error(str(e) + ": " + traceback.format_exc())
+        finally:
+            queue.task_done()
+
+
+def once(func):
+    def wrapper(item):
+        if not state.has_visited(item["_id"]):
+            func(item)
+            state.mark_visited(item["_id"])
+
+    return wrapper
+
+
+@once
+def publish(item):
+    post_process(item)
+
+    itm_type = item_type(item)
+    routing_key = "%s.x" % (itm_type,)
+
+    message = json.dumps(item, separators=(',', ':'), ensure_ascii=False, sort_keys=True)
+    if PF_PUBLISH:
+        rdb.publish("gtv." + routing_key, message)
+    for arc in ARC_LISTS:
+        rdb.lpush(arc + ".gtv." + routing_key, message)
+
+
+if __name__ == "__main__":
+
+    state = GabTvState("gtv", REDIS_HOST, REDIS_PORT)
+    rdb = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
+
+    publish_q = Queue()
+    for _ in range(3):
+        publish_thread = Thread(target=publish_worker, args=(publish_q,))
+        publish_thread.setDaemon(True)
+        publish_thread.start()
+
+    s = GabTvScanner(state, GTV_RPS)
+
+    while True:
+        try:
+            for item in s.all_items():
+                publish_q.put(item)
+        except KeyboardInterrupt as e:
+            print("cleanup..")
+            for _ in range(3):
+                publish_q.put(None)
+            break
diff --git a/state.py b/state.py
new file mode 100644
index 0000000..a2091cd
--- /dev/null
+++ b/state.py
@@ -0,0 +1,34 @@
+from time import time
+
+from hexlib.db import VolatileState, VolatileBooleanState
+
+
+class GabTvState:
+    def __init__(self, prefix, host, port):
+        self._episodes = VolatileState(prefix, host=host, port=port)
+        self._visited = VolatileBooleanState(prefix, host=host, port=port)
+
+    def has_visited(self, item_id):
+        return self._visited["byid"][item_id]
+
+    def mark_visited(self, item_id):
+        self._visited["byid"][item_id] = True
+
+    def has_visited_episode(self, episode):
+        # TODO: This doesn't actually work because the 'stats' object never actually updates (?!?)
+        # if episode["stats"]["commentCount"] == 0:
+        #     # No comments, don't need to visit
+        #     return True
+        # com_count = self._episodes["episodes"][episode["_id"]]
+        # return not com_count or episode["stats"]["commentCount"] == com_count
+        last_visited = self._episodes["ep_ts"][episode["_id"]]
+        return last_visited and int(time()) - int(last_visited) <= 3600 * 24 * 3
+
+    def mark_visited_episode(self, episode):
+        self._episodes["ep_ts"][episode["_id"]] = int(time())
+
+    def has_visited_channel(self, channel_id):
+        return self._visited["channel"][channel_id]
+
+    def mark_visited_channel(self, item_id):
+        self._visited["channel"][item_id] = True
diff --git a/util.py b/util.py
new file mode 100644
index 0000000..02d5da2
--- /dev/null
+++ b/util.py
@@ -0,0 +1,49 @@
+import logging
+import sys
+import traceback
+from logging import StreamHandler
+
+import requests
+from hexlib.misc import rate_limit
+
+logger = logging.getLogger("default")
+logger.setLevel(logging.DEBUG)
+
+formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
+for h in logger.handlers:
+    logger.removeHandler(h)
+logger.addHandler(StreamHandler(sys.stdout))
+
+
+class Web:
+    def __init__(self, rps):
+        self.session = requests.Session()
+        self._rps = rps
+
+        @rate_limit(self._rps)
+        def _get(url, **kwargs):
+            retries = 3
+
+            while retries > 0:
+                retries -= 1
+                try:
+                    return self.session.get(url, **kwargs)
+                except KeyboardInterrupt as e:
+                    raise e
+                except Exception as e:
+                    logger.warning("Error with request %s: %s" % (url, str(e)))
+            raise Exception("Gave up request after maximum number of retries")
+
+        self._get = _get
+
+    def get(self, url, **kwargs):
+        try:
+            r = self._get(url, **kwargs)
+
+            logger.debug("GET %s <%d>" % (url, r.status_code))
+            return r
+        except KeyboardInterrupt as e:
+            raise e
+        except Exception as e:
+            logger.error(str(e) + traceback.format_exc())
+            return None
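
For reference, run.py's publish() LPUSHes every scraped item as a JSON document onto a list named "<arc>.gtv.<item type>.x" for each name in GTV_ARC_LISTS, and also PUBLISHes it on the "gtv.<item type>.x" channel when GTV_PUBLISH is set. The snippet below is a minimal consumer sketch under those defaults (GTV_ARC_LISTS="arc", item type "episode", Redis on localhost); it is illustrative only and not part of the commit above.

import json

import redis

rdb = redis.Redis(host="localhost", port=6379)

while True:
    # Block until run.py's publish() pushes the next item, consuming the
    # oldest entry first ("arc" is the default GTV_ARC_LISTS entry).
    _, message = rdb.brpop("arc.gtv.episode.x")
    item = json.loads(message)
    print(item["_id"])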