import functools
import json
import traceback
from queue import Queue
from threading import Thread

from hexlib.env import get_redis
from hexlib.log import logger

from gabtv import GabTvScanner, item_type
from post_process import post_process
from state import GabTvState


def publish_worker(queue: Queue):
    """Consume items from *queue* and publish each one until a ``None`` sentinel arrives.

    Any exception raised while handling an item is logged together with its
    traceback and the loop continues with the next item, so one bad item does
    not stop the worker.

    :param queue: queue of items to publish; ``None`` tells the worker to stop.
    """
    while True:
        try:
            item = queue.get()
            if item is None:
                break
            publish(item)
        except Exception as e:
            logger.error(str(e) + ": " + traceback.format_exc())
        finally:
            # Runs for every dequeued entry, including the sentinel, so the
            # queue's unfinished-task counter stays balanced.
            queue.task_done()


def once(func):
    """Decorate *func* so it is skipped for items whose ``_id`` was already visited.

    Relies on the module-level ``state`` object (``has_visited`` /
    ``mark_visited``), which is created in the ``__main__`` section below.
    The item is marked visited only after *func* returns without raising.
    """

    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(item):
        if not state.has_visited(item["_id"]):
            func(item)
            state.mark_visited(item["_id"])

    return wrapper


@once
def publish(item):
    """Post-process *item*, serialize it to compact JSON and push it to Redis.

    The message is pushed with ``LPUSH`` onto the key
    ``arc.gtv.<item type>.x``. Uses the module-level ``rdb`` client created
    in the ``__main__`` section below.
    """
    post_process(item)

    itm_type = item_type(item)
    routing_key = "%s.x" % (itm_type,)

    message = json.dumps(item, separators=(',', ':'), ensure_ascii=False, sort_keys=True)
    rdb.lpush("arc.gtv." + routing_key, message)


if __name__ == "__main__":
    # Module-level globals read by once() and publish().
    state = GabTvState("gtv")
    rdb = get_redis()

    publish_q = Queue()
    # daemon=True replaces the deprecated Thread.setDaemon(True).
    publish_thread = Thread(target=publish_worker, args=(publish_q,), daemon=True)
    publish_thread.start()

    s = GabTvScanner(state)

    # Scan repeatedly until interrupted with Ctrl-C.
    while True:
        try:
            for item in s.all_items():
                publish_q.put(item)
        except KeyboardInterrupt:
            print("cleanup..")
            publish_q.put(None)
            # Wait for the worker to reach the sentinel; without this the
            # daemon thread could be killed at interpreter exit while items
            # are still waiting in the queue.
            publish_thread.join()
            break