gabtv_feed/gabtv.py

import json
import os
import time
from json import JSONDecodeError
from time import sleep

from bs4 import BeautifulSoup

from hexlib.env import get_web
from hexlib.log import logger
from state import GabTvState

PER_PAGE = 5
MAX_PAGES = int(os.environ.get("GTV_MAX_PAGES", 99999999))


def item_type(item):
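    # Tell raw feed items apart by a key unique to each type: comments carry
    # an "author", episodes a "category", channels a "moderators" list.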
if "author" in item:
return "comment"
if "category" in item:
return "episode"
if "moderators" in item:
return "channel"
def episode_url(page, cpp=PER_PAGE):
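    # Global episode listing; cpp is presumably the page size ("count per
    # page") and p the 1-based page index.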
return "https://tv.gab.com/api/v1/episode?cpp=%d&p=%d" % (cpp, page,)
def increment_episode_url(url):
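    # Bump the trailing "=<n>" query parameter by one; used by episodes() to
    # skip over pages that return HTTP 500.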
    tokens = url.rsplit("=", 1)
    return "=".join((
        tokens[0],
        str(int(tokens[1]) + 1)
    ))


def comments_url(episode):
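    # Comments are not exposed by the JSON API, so fetch_comments() scrapes
    # them from the episode's HTML page.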
return f"https://tv.gab.com/channel/{episode['channel']['slug']}/view/{episode['slug']}"
def channel_url(channel_id, page=1):
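    # JSON listing of a single channel's episodes.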
return f"https://tv.gab.com/api/v1/channel/{channel_id}/episode?p={page}"
def parse_episode_list(r):
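    # Returns (episodes, next_url); next_url is None once the listing is
    # exhausted or MAX_PAGES is reached.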
    try:
        j = json.loads(r.content.decode('utf-8', 'ignore'))
    except JSONDecodeError:
        logger.warning("JSONDecodeError for %s:" % (r.url,))
        logger.warning(r.text)
        return [], None

    episodes = j["episodes"]
    page = j["pagination"]["p"]
    if len(episodes) == PER_PAGE and page + 1 < MAX_PAGES:
        return episodes, episode_url(page=page + 1)
    return episodes, None


def parse_channel_episode_list(channel_id, r):
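    # Like parse_episode_list, but paginates within a single channel and has
    # no MAX_PAGES cutoff.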
    try:
        j = json.loads(r.content.decode('utf-8', 'ignore'))
    except JSONDecodeError:
        logger.warning("JSONDecodeError for %s:" % (r.url,))
        logger.warning(r.text)
        return [], None

    episodes = j["episodes"]
    if len(episodes) == PER_PAGE:
        page = j["pagination"]["p"]
        return episodes, channel_url(channel_id, page=page + 1)
    return episodes, None


class GabTvScanner:
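    # Crawls tv.gab.com for episodes, channels and comments, using GabTvState
    # to avoid re-visiting channels and episodes across runs.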
    def __init__(self, state: GabTvState):
        self._state = state
        self._web = get_web()
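
    # Yield every episode of a channel, page by page; channels already seen
    # in a previous run are skipped entirely.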
    def episodes_of_channel(self, channel_id):
        if self._state.has_visited_channel(channel_id):
            return
        r = self._web.get(channel_url(channel_id))
        while True:
            if not r or r.status_code != 200:
                break
            episodes, next_url = parse_channel_episode_list(channel_id, r)
            for episode in episodes:
                yield episode
            self._state.mark_visited_channel(channel_id)
            if not next_url:
                break
            r = self._web.get(next_url)
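
    # Yield episodes from the global listing, recursing into each episode's
    # channel as it is discovered.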
    def episodes(self):
        r = self._web.get(episode_url(page=1))
        # TODO: This is sometimes broken for no reason on page=1 (?!)
        if not r or r.status_code == 500:
            sleep(30)
            return
        skips = 15
        while True:
            episodes, next_url = parse_episode_list(r)
            for episode in episodes:
                yield episode
                # Also crawl the channel the episode belongs to
                channel_id = episode["channel"]["_id"]
                for channel_ep in self.episodes_of_channel(channel_id):
                    yield channel_ep
            if not next_url:
                break
            r = self._web.get(next_url)
            # Some pages are broken; try skipping ahead, at most `skips` times in a row
            while r and r.status_code == 500 and skips > 0:
                logger.info("Skipped page!")
                next_url = increment_episode_url(next_url)
                r = self._web.get(next_url)
                skips -= 1
            if not r or r.status_code != 200:
                break
            skips = 15
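
    # Harvest episode ids from the site-wide search page.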
    def fetch_random_episode_ids(self):
        r = self._web.get("https://tv.gab.com/search")
        if not r or r.status_code != 200:
            return []
        # Assumption: episode links look like /channel/<slug>/view/<episode-slug>
        soup = BeautifulSoup(r.content, "html.parser")
        return [a["href"].rsplit("/", 1)[-1]
                for a in soup.find_all("a", href=True) if "/view/" in a["href"]]
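
    # Comments come from the episode's HTML page (see comments_url); parse
    # them out of the rendered markup.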
    def fetch_comments(self, episode):
        r = self._web.get(comments_url(episode))
        if not r or r.status_code != 200:
            return
        soup = BeautifulSoup(r.content, "html.parser")
        for com_el in soup.find_all("div", class_="tv-comment"):
            yield {
                "_id": com_el.find("div", class_="comment-content").get("data-comment-id"),
                "author_display_name": com_el.find("div", class_="author-name").find("a").text,
                "author": com_el.find("div", class_="author-name").find("a").get("href").split("/")[2],
                "channel": episode["channel"]["_id"],
                "episode": episode["_id"],
                "_created_rel": int(time.time()),
                "created": com_el.find("div", class_="created-date").text,
                "content": com_el.find("div", class_="comment-content").text.strip(),
                "upvotes": int(com_el.find("span", class_="upvote-label").text),
                "downvotes": int(com_el.find("span", class_="downvote-label").text),
                "replies": int(com_el.find_all("span", class_="icon-label")[-1].text),
                "_raw": str(com_el)
            }
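
    # Main entry point: stream episodes, their channels, and (once per
    # episode) their comments.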
    def all_items(self):
        for episode in self.episodes():
            yield episode
            yield episode["channel"]
            if not self._state.has_visited_episode(episode):
                for comment in self.fetch_comments(episode):
                    yield comment
                self._state.mark_visited_episode(episode)
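

if __name__ == "__main__":
    # Minimal usage sketch. Assumes GabTvState() takes no arguments; check
    # state.py for the real constructor and adapt as needed.
    scanner = GabTvScanner(GabTvState())
    for item in scanner.all_items():
        print(item_type(item), item.get("_id"))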