mirror of
https://github.com/simon987/chan_feed.git
synced 2025-04-04 08:23:05 +00:00
123 lines
3.6 KiB
Python
123 lines
3.6 KiB
Python
import json
|
|
import os
|
|
import traceback
|
|
from queue import Queue
|
|
from threading import Thread
|
|
|
|
from hexlib.concurrency import queue_iter
|
|
from hexlib.db import VolatileBooleanState, VolatileState
|
|
from hexlib.env import get_web, get_redis, redis_publish
|
|
from hexlib.log import logger
|
|
|
|
from chan.chan import CHANS
|
|
from post_process import post_process
|
|
|
|
# Name of the chan to scan, taken from the CF_CHAN environment variable
# (None when the variable is not set).
CHAN = os.environ.get("CF_CHAN")
|
|
|
|
|
|
class ChanScanner:
    """Walk every board of one chan and yield the posts of changed threads.

    HTTP requests go through the client returned by ``get_web()``; URL
    building and response parsing are delegated to ``helper``.
    """

    def __init__(self, helper, scan_state=None):
        """
        :param helper: chan helper; must provide ``boards``, ``threads_url``,
            ``posts_url``, ``parse_threads_list`` and ``parse_thread``.
        :param scan_state: object providing ``has_new_posts`` and
            ``mark_thread_as_visited``. When omitted, the module-level
            ``state`` created by the ``__main__`` section is used.
        """
        self.web = get_web()
        self.helper = helper
        # Falling back to the global keeps ``ChanScanner(helper)`` working
        # while letting callers (and tests) inject their own state.
        self.state = state if scan_state is None else scan_state

    def _threads(self, board):
        """Yield every thread of *board*, following pagination links.

        Iteration ends as soon as a page request fails (falsy response or a
        status code other than 200) or when there is no next page.
        """
        r = self.web.get(self.helper.threads_url(board))
        while r and r.status_code == 200:
            threads, next_url = self.helper.parse_threads_list(r)
            yield from threads
            if not next_url:
                break
            r = self.web.get(next_url)

    def _fetch_posts(self, board, thread):
        """Return the parsed posts of *thread*, or ``[]`` if the request failed."""
        r = self.web.get(self.helper.posts_url(board, thread))
        if r and r.status_code == 200:
            return self.helper.parse_thread(r)
        return []

    def _posts(self, board):
        """Yield the posts of every thread of *board* that has new posts.

        A thread is marked as visited only after all of its posts have been
        yielded (i.e. once the consumer has pulled the last one).
        """
        for thread in self._threads(board):
            if not self.state.has_new_posts(thread, self.helper, board):
                continue
            yield from self._fetch_posts(board, thread)
            self.state.mark_thread_as_visited(thread, self.helper, board)

    def all_posts(self):
        """Yield ``(post, board)`` tuples for every board the helper lists."""
        for board in self.helper.boards():
            for post in self._posts(board):
                yield post, board
|
|
|
|
|
|
class ChanState:
    """Remembers which posts were seen and the last known mtime of each thread.

    Backed by ``VolatileBooleanState`` (posts) and ``VolatileState`` (threads)
    from ``hexlib.db``, both created with the same *prefix*.
    NOTE(review): these containers presumably return a falsy value for keys
    that were never stored -- ``has_new_posts`` relies on that; confirm
    against hexlib.
    """

    def __init__(self, prefix):
        """
        :param prefix: prefix handed to both hexlib state containers.
        """
        self._posts = VolatileBooleanState(prefix)
        self._threads = VolatileState(prefix)

    def mark_visited(self, item: int):
        """Record post *item* as visited."""
        self._posts["posts"][item] = True

    def has_visited(self, item: int):
        """Return the stored "visited" value for post *item*."""
        return self._posts["posts"][item]

    def has_new_posts(self, thread, helper, board):
        """Tell whether *thread* changed since it was last marked as visited.

        :return: ``True`` when the helper reports an mtime of ``-1``, when no
            (truthy) mtime is stored for the thread, or when the stored mtime
            differs from the current one.
        """
        mtime = helper.thread_mtime(thread)
        if mtime == -1:
            return True

        last_seen = self._threads["threads"][helper.item_unique_id(thread, board)]

        # Reuse the mtime computed above instead of asking the helper again.
        return not last_seen or mtime != last_seen

    def mark_thread_as_visited(self, thread, helper, board):
        """Store the current mtime of *thread* under its unique id."""
        self._threads["threads"][helper.item_unique_id(thread, board)] = helper.thread_mtime(thread)
|
|
|
|
|
|
def publish_worker(queue: Queue, helper):
    """Consume ``(post, board)`` tuples from *queue* and publish them to redis.

    Each post is run through ``post_process`` and then sent with
    ``redis_publish`` as compact, key-sorted JSON using the module-level
    ``rdb`` connection. Any exception raised for a post is logged together
    with its traceback and the worker moves on to the next post.

    :param queue: queue fed by the scanner loop.
    :param helper: chan helper, used for post-processing and ``item_type``.
    """
    for post, board_name in queue_iter(queue):
        try:
            post_process(post, board_name, helper)

            redis_publish(
                rdb,
                item=json.dumps(post, separators=(',', ':'), ensure_ascii=False, sort_keys=True),
                item_project="chan",
                item_subproject=CHAN,
                item_type=helper.item_type(post),
                item_category=board_name
            )
        except Exception as err:
            logger.error(f"{err}: {traceback.format_exc()}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: scan the chan selected by CF_CHAN forever and hand
    # every post to a background publisher thread.
    chan_helper = CHANS[CHAN]
    save_folder = os.environ.get("CF_SAVE_FOLDER", "")

    if save_folder:
        chan_helper.save_folder = save_folder

    # Module-level globals read by ChanScanner (state) and publish_worker (rdb).
    state = ChanState(CHAN)
    rdb = get_redis()

    publish_q = Queue()
    # Thread.setDaemon() is deprecated since Python 3.10; pass daemon=True
    # to the constructor instead.
    publish_thread = Thread(target=publish_worker, args=(publish_q, chan_helper), daemon=True)
    publish_thread.start()

    s = ChanScanner(chan_helper)
    while True:
        try:
            for p, b in s.all_posts():
                publish_q.put((p, b))
        except KeyboardInterrupt:
            print("cleanup..")
            # None is enqueued as the shutdown signal for the worker.
            # NOTE(review): the worker is a daemon thread and is not joined,
            # so it may be stopped before the queue is drained -- confirm
            # whether a join() is wanted here.
            publish_q.put(None)
            break
|