mirror of
https://github.com/simon987/gabtv_feed.git
synced 2025-04-04 08:23:04 +00:00
68 lines
1.5 KiB
Python
68 lines
1.5 KiB
Python
import json
|
|
import traceback
|
|
from queue import Queue
|
|
from threading import Thread
|
|
|
|
from hexlib.env import get_redis
|
|
from hexlib.log import logger
|
|
|
|
from gabtv import GabTvScanner, item_type
|
|
from post_process import post_process
|
|
from state import GabTvState
|
|
|
|
|
|
def publish_worker(queue: Queue):
|
|
while True:
|
|
try:
|
|
item = queue.get()
|
|
if item is None:
|
|
break
|
|
publish(item)
|
|
|
|
except Exception as e:
|
|
logger.error(str(e) + ": " + traceback.format_exc())
|
|
finally:
|
|
queue.task_done()
|
|
|
|
|
|
def once(func):
|
|
def wrapper(item):
|
|
if not state.has_visited(item["_id"]):
|
|
func(item)
|
|
state.mark_visited(item["_id"])
|
|
|
|
return wrapper
|
|
|
|
|
|
@once
|
|
def publish(item):
|
|
post_process(item)
|
|
|
|
itm_type = item_type(item)
|
|
routing_key = "%s.x" % (itm_type,)
|
|
|
|
message = json.dumps(item, separators=(',', ':'), ensure_ascii=False, sort_keys=True)
|
|
rdb.lpush("arc.gtv." + routing_key, message)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
state = GabTvState("gtv")
|
|
rdb = get_redis()
|
|
|
|
publish_q = Queue()
|
|
publish_thread = Thread(target=publish_worker, args=(publish_q,))
|
|
publish_thread.setDaemon(True)
|
|
publish_thread.start()
|
|
|
|
s = GabTvScanner(state)
|
|
|
|
while True:
|
|
try:
|
|
for item in s.all_items():
|
|
publish_q.put(item)
|
|
except KeyboardInterrupt as e:
|
|
print("cleanup..")
|
|
publish_q.put(None)
|
|
break
|