gabtv_feed/run.py

68 lines
1.5 KiB
Python

import functools
import json
import traceback
from queue import Queue
from threading import Thread

from hexlib.env import get_redis
from hexlib.log import logger

from gabtv import GabTvScanner, item_type
from post_process import post_process
from state import GabTvState
def publish_worker(queue: Queue):
    """Consume items from ``queue`` and hand each one to ``publish``.

    Blocks on ``queue.get()`` in a loop. A ``None`` entry is the shutdown
    sentinel and ends the loop. Any ``Exception`` raised while handling an
    entry is logged together with its traceback and the loop keeps going.
    ``queue.task_done()`` is called once for every entry taken off the queue,
    the sentinel included.

    :param queue: queue of items to publish, terminated by ``None``.
    """
    running = True
    while running:
        try:
            entry = queue.get()
            if entry is not None:
                publish(entry)
            else:
                running = False
        except Exception as exc:
            logger.error(str(exc) + ": " + traceback.format_exc())
        finally:
            # Runs for published, failed and sentinel entries alike.
            queue.task_done()
def once(func):
    """Decorate ``func`` so it is skipped for items that were already visited.

    The returned wrapper reads the module-level ``state`` object (assigned in
    the ``__main__`` section of this script). If
    ``state.has_visited(item["_id"])`` is falsy, it calls ``func(item)`` and
    then ``state.mark_visited(item["_id"])``. The id is recorded only after
    ``func`` returns, so an item whose processing raises is not marked.

    :param func: callable taking a single ``item`` that supports
        ``item["_id"]``.
    :return: a wrapper taking the same single argument; it always returns
        ``None``.
    """
    # wraps() keeps the decorated function's __name__/__doc__ and exposes the
    # original callable as __wrapped__.
    @functools.wraps(func)
    def wrapper(item):
        if not state.has_visited(item["_id"]):
            func(item)
            # "_id" is looked up again on purpose: func may have modified item.
            state.mark_visited(item["_id"])

    return wrapper
@once
def publish(item):
    """Post-process ``item`` and push it, serialized as JSON, via ``rdb.lpush``.

    The destination key is ``"arc.gtv."`` followed by ``"<type>.x"``, where
    ``<type>`` is the value returned by ``item_type(item)``. The item is
    serialized as compact JSON with sorted keys and non-ASCII characters left
    unescaped.

    ``rdb`` is a module-level global assigned from ``get_redis()`` in the
    ``__main__`` section of this script.

    :param item: item to publish; ``post_process`` is called on it first.
    """
    post_process(item)
    routing_key = "%s.x" % (item_type(item),)
    payload = json.dumps(
        item,
        separators=(',', ':'),
        ensure_ascii=False,
        sort_keys=True,
    )
    rdb.lpush("arc.gtv." + routing_key, payload)
if __name__ == "__main__":
    # Module-level globals read by `once` (state) and `publish` (rdb).
    state = GabTvState("gtv")
    rdb = get_redis()

    # Scanned items are handed to a background worker thread through this queue.
    publish_q = Queue()
    # daemon=True replaces Thread.setDaemon(), deprecated since Python 3.10.
    publish_thread = Thread(target=publish_worker, args=(publish_q,), daemon=True)
    publish_thread.start()

    s = GabTvScanner(state)
    # Scan repeatedly until interrupted; each pass enqueues every item yielded
    # by the scanner.
    while True:
        try:
            for item in s.all_items():
                publish_q.put(item)
        except KeyboardInterrupt:
            print("cleanup..")
            # None is the sentinel that makes publish_worker leave its loop.
            publish_q.put(None)
            # The worker is a daemon thread: without joining it the interpreter
            # would exit immediately and drop everything still queued. A second
            # Ctrl-C interrupts the join and exits right away.
            publish_thread.join()
            break