gabtv_feed/run.py
2020-12-25 20:11:46 -05:00

81 lines
1.9 KiB
Python

import functools
import json
import os
import traceback
from queue import Queue
from threading import Thread

import redis

from gabtv import GabTvScanner, item_type
from post_process import post_process
from state import GabTvState
from util import logger
def env_flag(name, default=False):
    """Read a boolean flag from the environment variable *name*.

    Returns *default* when the variable is unset. When it is set, the values
    "", "0", "false", "no" and "off" (case-insensitive, surrounding whitespace
    ignored) are False; any other value is True.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() not in ("", "0", "false", "no", "off")


def env_list(name, default):
    """Read a comma-separated list from the environment variable *name*.

    Falls back to the string *default* when unset. Entries are stripped and
    blank entries are dropped, so "" or "a,,b" do not yield empty names.
    """
    raw = os.environ.get(name, default)
    return [part.strip() for part in raw.split(",") if part.strip()]


REDIS_HOST = os.environ.get("GTV_REDIS_HOST", "localhost")
# Environment values are strings; convert so consumers receive a number.
REDIS_PORT = int(os.environ.get("GTV_REDIS_PORT", 6379))
# Parsed explicitly: a raw env string such as "false" or "0" would be truthy.
PF_PUBLISH = env_flag("GTV_PUBLISH", False)
# Rate passed to GabTvScanner; converted from the raw env string.
GTV_RPS = float(os.environ.get("GTV_RPS", 1))
# Prefixes of the Redis lists that every published item is LPUSHed onto.
ARC_LISTS = env_list("GTV_ARC_LISTS", "arc")
def publish_worker(queue: Queue):
    """Drain *queue*, publishing each item until a ``None`` sentinel is seen.

    Every item taken off the queue, the sentinel included, is acknowledged
    with ``task_done``. An exception raised while handling an item is logged
    together with its traceback, and the worker moves on to the next item.
    """
    running = True
    while running:
        try:
            job = queue.get()
            if job is None:
                running = False
            else:
                publish(job)
        except Exception as err:
            logger.error(str(err) + ": " + traceback.format_exc())
        finally:
            queue.task_done()
def once(func):
    """Wrap a single-argument handler so it runs at most once per ``item["_id"]``.

    Visited ids are tracked in the module-level ``state`` object via
    ``has_visited`` / ``mark_visited``. That object is only created in the
    ``__main__`` section, so a decorated function must not be called before it exists.
    An id is marked visited only after *func* returns. If *func* raises, the
    id stays unvisited and the exception propagates to the caller.

    NOTE(review): the visited check and the mark are two separate calls, so
    two publish workers handling the same ``_id`` at the same moment could
    both run *func*.
    """
    # Preserve the wrapped function's name/docstring for logs and debugging.
    @functools.wraps(func)
    def wrapper(item):
        if not state.has_visited(item["_id"]):
            func(item)
            state.mark_visited(item["_id"])
    return wrapper
@once
def publish(item):
    """Post-process *item* and push its JSON encoding to Redis.

    Deduplicated on ``item["_id"]`` by the ``once`` decorator. The compact,
    key-sorted JSON message is published on the ``gtv.<type>.x`` channel when
    PF_PUBLISH is truthy. It is always LPUSHed onto ``<arc>.gtv.<type>.x``
    for each prefix in ARC_LISTS.
    """
    post_process(item)
    key = str(item_type(item)) + ".x"
    payload = json.dumps(
        item,
        separators=(",", ":"),
        ensure_ascii=False,
        sort_keys=True,
    )
    if PF_PUBLISH:
        rdb.publish("gtv." + key, payload)
    for list_prefix in ARC_LISTS:
        rdb.lpush(list_prefix + ".gtv." + key, payload)
if __name__ == "__main__":
    # `state` and `rdb` must stay module globals: `once` and `publish` read them.
    state = GabTvState("gtv", REDIS_HOST, REDIS_PORT)
    rdb = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)

    n_workers = 3
    publish_q = Queue()
    workers = []
    for _ in range(n_workers):
        # `daemon=True` replaces the deprecated Thread.setDaemon(True).
        worker = Thread(target=publish_worker, args=(publish_q,), daemon=True)
        worker.start()
        workers.append(worker)

    s = GabTvScanner(state, GTV_RPS)
    while True:
        try:
            for item in s.all_items():
                publish_q.put(item)
        except KeyboardInterrupt:
            print("cleanup..")
            # One sentinel per worker; each worker exits on the first None it takes.
            for _ in range(n_workers):
                publish_q.put(None)
            # The workers are daemon threads and would be killed at interpreter
            # exit. Wait for them to finish the items queued before the sentinels.
            for worker in workers:
                worker.join()
            break