mirror of https://github.com/simon987/gabtv_feed.git — synced 2025-04-04 08:23:04 +00:00 (167 lines, 5.2 KiB, Python)
from json import JSONDecodeError
|
|
import time
|
|
import os
|
|
from time import sleep
|
|
|
|
from hexlib.env import get_web
|
|
from hexlib.log import logger
|
|
|
|
from state import GabTvState
|
|
from bs4 import BeautifulSoup
|
|
import json
|
|
|
|
# Number of episodes requested per listing page; parse_* helpers treat a
# short page (fewer than PER_PAGE items) as the last page of a feed.
PER_PAGE = 5

# Hard cap on how many feed pages are crawled; overridable via the
# GTV_MAX_PAGES environment variable (default is effectively unlimited).
MAX_PAGES = int(os.environ.get("GTV_MAX_PAGES", 99999999))
|
|
|
|
|
|
def item_type(item):
    """Classify a raw API object by its distinguishing key.

    Returns "comment", "episode" or "channel", or None when no marker
    key is present. Markers are checked in a fixed priority order.
    """
    for marker, kind in (
        ("author", "comment"),
        ("category", "episode"),
        ("moderators", "channel"),
    ):
        if marker in item:
            return kind
    return None
|
|
|
|
|
|
def episode_url(page, cpp=PER_PAGE):
    """Build the paginated global episode-listing API URL.

    cpp is the item-count-per-page query parameter; page is 1-based.
    """
    base = "https://tv.gab.com/api/v1/episode"
    return "%s?cpp=%d&p=%d" % (base, cpp, page)
|
|
|
|
|
|
def increment_episode_url(url):
    """Return *url* with the number after its final '=' incremented by one.

    Used to skip past a broken page in a paginated feed URL, where the
    page number is the last query-string value.
    """
    head, page = url.rsplit("=", 1)
    return "%s=%d" % (head, int(page) + 1)
|
|
|
|
|
|
def comments_url(episode):
    """URL of the HTML view page carrying an episode's comment section."""
    return "https://tv.gab.com/channel/{}/view/{}".format(
        episode["channel"]["slug"],
        episode["slug"],
    )
|
|
|
|
|
|
def channel_url(channel_id, page=1):
    """Build the paginated per-channel episode-listing API URL (page is 1-based)."""
    return "https://tv.gab.com/api/v1/channel/{}/episode?p={}".format(channel_id, page)
|
|
|
|
|
|
def parse_episode_list(r):
    """Extract episodes from a global-feed listing response.

    Returns a (episodes, next_url) pair. next_url is None when the page
    was not full (i.e. the last page), the MAX_PAGES cap was reached, or
    the response body was not valid JSON (logged, with [] for episodes).
    """
    body = r.content.decode("utf-8", "ignore")
    try:
        payload = json.loads(body)
    except JSONDecodeError:
        logger.warning("JSONDecodeError for %s:" % (r.url,))
        logger.warning(r.text)
        return [], None

    episodes = payload["episodes"]
    page = payload["pagination"]["p"]
    # A full page suggests more results; stop at the configured page cap.
    has_more = len(episodes) == PER_PAGE and page + 1 < MAX_PAGES
    return episodes, (episode_url(page=page + 1) if has_more else None)
|
|
|
|
|
|
def parse_channel_episode_list(channel_id, r):
    """Extract episodes from a per-channel listing response.

    Returns a (episodes, next_url) pair. next_url is None on a short
    (final) page or when the body was not valid JSON (logged, with []
    for episodes).
    """
    body = r.content.decode("utf-8", "ignore")
    try:
        payload = json.loads(body)
    except JSONDecodeError:
        logger.warning("JSONDecodeError for %s:" % (r.url,))
        logger.warning(r.text)
        return [], None

    episodes = payload["episodes"]
    if len(episodes) != PER_PAGE:
        # Short page: this is the last one.
        return episodes, None
    next_page = payload["pagination"]["p"] + 1
    return episodes, channel_url(channel_id, page=next_page)
|
|
|
|
|
|
class GabTvScanner:
    """Crawler for tv.gab.com.

    Walks the global episode feed, per-channel feeds and per-episode
    comment pages, yielding raw items (dicts / API objects). Crawl
    progress (visited channels and episodes) is persisted via GabTvState.
    """

    def __init__(self, state: GabTvState):
        # Persistent crawl state shared across runs.
        self._state = state
        # HTTP client configured from the environment (hexlib get_web()).
        self._web = get_web()

    def episodes_of_channel(self, channel_id):
        """Yield every episode of *channel_id*, paginating until a short page.

        Yields nothing when the channel was already visited in a previous
        run. Stops on a missing/non-200 response for a follow-up page.
        """
        if not self._state.has_visited_channel(channel_id):
            r = self._web.get(channel_url(channel_id))
            while True:
                episodes, next_url = parse_channel_episode_list(channel_id, r)
                for episode in episodes:
                    yield episode
                    # NOTE(review): marked once per yielded episode; a single
                    # mark after the loop would suffice, and a channel with no
                    # episodes is never marked at all — confirm intent.
                    self._state.mark_visited_channel(channel_id)

                if not next_url:
                    break
                r = self._web.get(next_url)
                if not r or r.status_code != 200:
                    break

    def episodes(self):
        """Yield episodes from the global feed, recursing into each
        episode's channel feed via episodes_of_channel()."""
        r = self._web.get(episode_url(page=1))
        # TODO: This is sometimes broken for no reason on page=1 (?!)
        if r.status_code == 500:
            sleep(30)
            return []

        # Budget of consecutive broken (HTTP 500) pages we will skip over.
        skips = 15
        while True:
            episodes, next_url = parse_episode_list(r)
            for episode in episodes:
                yield episode

                # Also crawl channel list
                # Looks like only a
                channel_id = episode["channel"]["_id"]
                for channel_ep in self.episodes_of_channel(channel_id):
                    yield channel_ep

            if not next_url:
                break
            r = self._web.get(next_url)
            # Some pages are broken, attempt to skip it once
            while r.status_code == 500 and skips > 0:
                logger.info("Skipped page!")
                next_url = increment_episode_url(next_url)
                r = self._web.get(next_url)
                skips -= 1
            if not r or r.status_code != 200:
                break
            # Page fetched successfully: reset the skip budget.
            skips = 15

    def fetch_random_episode_ids(self):
        # NOTE(review): only the failure path is implemented — on a 200
        # response the method falls through and returns None. Presumably
        # unfinished; confirm before relying on it.
        r = self._web.get("https://tv.gab.com/search")
        if not r or r.status_code != 200:
            return []

    def fetch_comments(self, episode):
        """Yield comment dicts scraped from the episode's HTML view page.

        Returns early (no items) when the page cannot be fetched.
        """
        r = self._web.get(comments_url(episode))
        if not r or r.status_code != 200:
            return []

        soup = BeautifulSoup(r.content, "html.parser")

        for com_el in soup.find_all("div", class_="tv-comment"):
            yield {
                "_id": com_el.find("div", class_="comment-content").get("data-comment-id"),
                "author_display_name": com_el.find("div", class_="author-name").find("a").text,
                # Username taken from the profile link path: /channel/<user>/...
                "author": com_el.find("div", class_="author-name").find("a").get("href").split("/")[2],
                "channel": episode["channel"]["_id"],
                "episode": episode["_id"],
                # Crawl timestamp (seconds), not the comment's post time.
                "_created_rel": int(time.time()),
                # Human-readable date string as displayed on the page.
                "created": com_el.find("div", class_="created-date").text,
                "content": com_el.find("div", class_="comment-content").text.strip(),
                "upvotes": int(com_el.find("span", class_="upvote-label").text),
                "downvotes": int(com_el.find("span", class_="downvote-label").text),
                "replies": int(com_el.find_all("span", class_="icon-label")[-1].text),
                # Original HTML of the comment element, kept for reprocessing.
                "_raw": str(com_el)
            }

    def all_items(self):
        """Yield every crawlable item: each episode, its channel object,
        and — once per episode — its comments."""
        for episode in self.episodes():
            yield episode

            yield episode["channel"]

            if not self._state.has_visited_episode(episode):
                for comment in self.fetch_comments(episode):
                    yield comment
                self._state.mark_visited_episode(episode)