"""Scan Gab TV items and push them to Redis.

Items produced by ``GabTvScanner.all_items()`` are handed to a background
worker thread through a queue.  The worker post-processes each item that has
not been visited yet, serialises it to JSON and pushes it onto one Redis list
per configured archive name (and optionally publishes it on a pub/sub channel).

Configuration is read from environment variables:

* ``GTV_REDIS_HOST`` / ``GTV_REDIS_PORT`` -- Redis endpoint.
* ``GTV_PUBLISH`` -- when set to a non-false value, also ``PUBLISH`` each item.
* ``GTV_RPS`` -- forwarded unchanged to ``GabTvScanner``.
* ``GTV_ARC_LISTS`` -- comma-separated prefixes of the Redis lists to push to.
"""
import functools
import json
import os
import traceback
from queue import Queue
from threading import Thread

import redis

from gabtv import GabTvScanner, item_type
from post_process import post_process
from state import GabTvState
from util import logger

# Environment values that switch a boolean flag off (compared case-insensitively).
_FALSY_ENV_VALUES = frozenset({"", "0", "false", "no", "off"})


def _env_flag(name, default=False):
    """Return the boolean value of environment variable *name*.

    Environment variables are always strings, so ``"0"`` or ``"false"`` would
    be truthy if used directly.  Any value not listed in ``_FALSY_ENV_VALUES``
    counts as true, which keeps every previously-enabling value enabled.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() not in _FALSY_ENV_VALUES


REDIS_HOST = os.environ.get("GTV_REDIS_HOST", "localhost")
# Normalised to int so the value has the same type whether or not it is set.
REDIS_PORT = int(os.environ.get("GTV_REDIS_PORT", 6379))
PF_PUBLISH = _env_flag("GTV_PUBLISH", False)
# NOTE(review): this is a str when taken from the environment but an int by
# default; confirm which type GabTvScanner expects before converting here.
GTV_RPS = os.environ.get("GTV_RPS", 1)
ARC_LISTS = os.environ.get("GTV_ARC_LISTS", "arc").split(",")


def publish_worker(queue: Queue):
    """Consume items from *queue* and publish them until ``None`` is received.

    Exceptions raised while publishing an item are logged and the loop
    continues with the next item.  ``task_done()`` is called exactly once for
    every item successfully taken from the queue, including the sentinel.
    """
    while True:
        # Kept outside the try block so task_done() is only ever paired with
        # a get() that actually returned an item.
        item = queue.get()
        try:
            if item is None:
                break
            publish(item)
        except Exception as e:
            logger.error(str(e) + ": " + traceback.format_exc())
        finally:
            queue.task_done()


def once(func):
    """Decorate *func* so it runs at most once per ``item["_id"]``.

    The item is marked as visited in the module-level ``state`` only after
    *func* returns; if *func* raises, the item stays unvisited.
    """

    @functools.wraps(func)
    def wrapper(item):
        if not state.has_visited(item["_id"]):
            func(item)
            state.mark_visited(item["_id"])

    return wrapper


@once
def publish(item):
    """Post-process *item* and push its JSON form to Redis.

    The message is LPUSHed to ``<arc>.gtv.<type>.x`` for every name in
    ``ARC_LISTS`` and, when ``PF_PUBLISH`` is set, first published on the
    ``gtv.<type>.x`` channel.  Uses the module-level ``rdb`` client.
    """
    post_process(item)

    itm_type = item_type(item)
    routing_key = "%s.x" % (itm_type,)

    # Compact separators and sorted keys give a stable serialisation.
    message = json.dumps(item, separators=(',', ':'), ensure_ascii=False, sort_keys=True)
    if PF_PUBLISH:
        rdb.publish("gtv." + routing_key, message)
    for arc in ARC_LISTS:
        rdb.lpush(arc + ".gtv." + routing_key, message)


if __name__ == "__main__":
    # `state` and `rdb` are module globals read by once() and publish().
    state = GabTvState("gtv", REDIS_HOST, REDIS_PORT)
    rdb = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)

    publish_q = Queue()
    publish_thread = Thread(target=publish_worker, args=(publish_q,), daemon=True)
    publish_thread.start()

    s = GabTvScanner(state, GTV_RPS)

    # Rescan indefinitely until interrupted.
    while True:
        try:
            for item in s.all_items():
                publish_q.put(item)
        except KeyboardInterrupt:
            print("cleanup..")
            # One sentinel for the single worker, then wait for it to drain
            # the queue; being a daemon thread it would otherwise be killed
            # at interpreter exit with items still pending.
            publish_q.put(None)
            publish_thread.join()
            break