mirror of
https://github.com/simon987/gabtv_feed.git
synced 2025-04-10 14:16:46 +00:00
Update hexlib version, refactor
This commit is contained in:
parent
3dda97e180
commit
3fa2d90486
@ -5,7 +5,7 @@ services:
|
||||
image: simon987/gabtv_feed
|
||||
restart: always
|
||||
environment:
|
||||
- "GTV_REDIS_HOST="
|
||||
- "GTV_RPS=0.10"
|
||||
- "REDIS_HOST="
|
||||
- "RPS=0.10"
|
||||
- "GTV_MAX_PAGES=9999999"
|
||||
- "GTV_RECRAWL_HOURS=8"
|
||||
|
8
gabtv.py
8
gabtv.py
@ -3,8 +3,10 @@ import time
|
||||
import os
|
||||
from time import sleep
|
||||
|
||||
from hexlib.env import get_web
|
||||
from hexlib.log import logger
|
||||
|
||||
from state import GabTvState
|
||||
from util import Web, logger
|
||||
from bs4 import BeautifulSoup
|
||||
import json
|
||||
|
||||
@ -73,9 +75,9 @@ def parse_channel_episode_list(channel_id, r):
|
||||
|
||||
class GabTvScanner:
|
||||
|
||||
def __init__(self, state: GabTvState, rps):
|
||||
def __init__(self, state: GabTvState):
|
||||
self._state = state
|
||||
self._web = Web(rps)
|
||||
self._web = get_web()
|
||||
|
||||
def episodes_of_channel(self, channel_id):
|
||||
if not self._state.has_visited_channel(channel_id):
|
||||
|
26
run.py
26
run.py
@ -1,22 +1,14 @@
|
||||
import json
|
||||
import os
|
||||
import traceback
|
||||
from queue import Queue
|
||||
from threading import Thread
|
||||
|
||||
import redis
|
||||
from hexlib.env import get_redis
|
||||
from hexlib.log import logger
|
||||
|
||||
from gabtv import GabTvScanner, item_type
|
||||
from post_process import post_process
|
||||
from state import GabTvState
|
||||
from util import logger
|
||||
|
||||
REDIS_HOST = os.environ.get("GTV_REDIS_HOST", "localhost")
|
||||
REDIS_PORT = os.environ.get("GTV_REDIS_PORT", 6379)
|
||||
PF_PUBLISH = os.environ.get("GTV_PUBLISH", False)
|
||||
GTV_RPS = os.environ.get("GTV_RPS", 1)
|
||||
|
||||
ARC_LISTS = os.environ.get("GTV_ARC_LISTS", "arc").split(",")
|
||||
|
||||
|
||||
def publish_worker(queue: Queue):
|
||||
@ -50,23 +42,20 @@ def publish(item):
|
||||
routing_key = "%s.x" % (itm_type,)
|
||||
|
||||
message = json.dumps(item, separators=(',', ':'), ensure_ascii=False, sort_keys=True)
|
||||
if PF_PUBLISH:
|
||||
rdb.publish("gtv." + routing_key, message)
|
||||
for arc in ARC_LISTS:
|
||||
rdb.lpush(arc + ".gtv." + routing_key, message)
|
||||
rdb.lpush("arc.gtv." + routing_key, message)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
state = GabTvState("gtv", REDIS_HOST, REDIS_PORT)
|
||||
rdb = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
|
||||
state = GabTvState("gtv")
|
||||
rdb = get_redis()
|
||||
|
||||
publish_q = Queue()
|
||||
publish_thread = Thread(target=publish_worker, args=(publish_q,))
|
||||
publish_thread.setDaemon(True)
|
||||
publish_thread.start()
|
||||
|
||||
s = GabTvScanner(state, GTV_RPS)
|
||||
s = GabTvScanner(state)
|
||||
|
||||
while True:
|
||||
try:
|
||||
@ -74,6 +63,5 @@ if __name__ == "__main__":
|
||||
publish_q.put(item)
|
||||
except KeyboardInterrupt as e:
|
||||
print("cleanup..")
|
||||
for _ in range(3):
|
||||
publish_q.put(None)
|
||||
publish_q.put(None)
|
||||
break
|
||||
|
8
state.py
8
state.py
@ -3,15 +3,13 @@ import os
|
||||
|
||||
from hexlib.db import VolatileState, VolatileBooleanState
|
||||
|
||||
from util import logger
|
||||
|
||||
RECRAWL_HOURS = int(os.environ.get("GTV_RECRAWL_HOURS", 8))
|
||||
|
||||
|
||||
class GabTvState:
|
||||
def __init__(self, prefix, host, port):
|
||||
self._episodes = VolatileState(prefix, host=host, port=port)
|
||||
self._visited = VolatileBooleanState(prefix, host=host, port=port)
|
||||
def __init__(self, prefix):
|
||||
self._episodes = VolatileState(prefix)
|
||||
self._visited = VolatileBooleanState(prefix)
|
||||
|
||||
def has_visited(self, item_id):
|
||||
return self._visited["byid"][item_id]
|
||||
|
49
util.py
49
util.py
@ -1,49 +0,0 @@
|
||||
import logging
|
||||
import sys
|
||||
import traceback
|
||||
from logging import StreamHandler
|
||||
|
||||
import requests
|
||||
from hexlib.misc import rate_limit
|
||||
|
||||
logger = logging.getLogger("default")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
|
||||
for h in logger.handlers:
|
||||
logger.removeHandler(h)
|
||||
logger.addHandler(StreamHandler(sys.stdout))
|
||||
|
||||
|
||||
class Web:
|
||||
def __init__(self, rps):
|
||||
self.session = requests.Session()
|
||||
self._rps = rps
|
||||
|
||||
@rate_limit(self._rps)
|
||||
def _get(url, **kwargs):
|
||||
retries = 3
|
||||
|
||||
while retries > 0:
|
||||
retries -= 1
|
||||
try:
|
||||
return self.session.get(url, **kwargs)
|
||||
except KeyboardInterrupt as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
logger.warning("Error with request %s: %s" % (url, str(e)))
|
||||
raise Exception("Gave up request after maximum number of retries")
|
||||
|
||||
self._get = _get
|
||||
|
||||
def get(self, url, **kwargs):
|
||||
try:
|
||||
r = self._get(url, **kwargs)
|
||||
|
||||
logger.debug("GET %s <%d>" % (url, r.status_code))
|
||||
return r
|
||||
except KeyboardInterrupt as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
logger.error(str(e) + traceback.format_exc())
|
||||
return None
|
Loading…
x
Reference in New Issue
Block a user