import json
import logging
import os

import urllib3
from apscheduler.schedulers.background import BackgroundScheduler

import database
from search.search import ElasticSearchEngine

urllib3.disable_warnings()
logger = logging.getLogger("default")


class Task:
    """A crawl task handed out to crawlers, with an optional post-crawl callback."""

    def __init__(self, website_id: int, url: str, priority: int = 1,
                 callback_type: str = None, callback_args: str = None):
        self.website_id = website_id
        self.url = url
        self.priority = priority
        self.callback_type = callback_type
        # callback_args is stored as a JSON string; decode it into a dict here
        self.callback_args = json.loads(callback_args) if callback_args else {}

    def to_json(self):
        return {
            "website_id": self.website_id,
            "url": self.url,
            "priority": self.priority,
            "callback_type": self.callback_type,
            "callback_args": json.dumps(self.callback_args)
        }

    def __str__(self):
        return json.dumps(self.to_json())

    def __repr__(self):
        return self.__str__()


class TaskResult:
    """Outcome of a completed crawl task, as reported by a crawler."""

    def __init__(self, status_code=None, file_count=0, start_time=0,
                 end_time=0, website_id=0, server_name=""):
        self.status_code = status_code
        self.file_count = file_count
        self.start_time = start_time
        self.end_time = end_time
        self.website_id = website_id
        self.server_name = server_name

    def to_json(self):
        return {
            "status_code": self.status_code,
            "file_count": self.file_count,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "website_id": self.website_id
        }


class TaskManager:
    """Queues crawl tasks and indexes completed results into Elasticsearch."""

    def __init__(self):
        self.search = ElasticSearchEngine("od-database")
        self.db = database.Database("db.sqlite3")
        self.to_index_queue = []

        # Poll the in-memory queue in the background and index completed results
        self.scheduler = BackgroundScheduler()
        self.scheduler.add_job(self._do_index, "interval", seconds=0.1, max_instances=2)
        # Silence APScheduler's own job-execution logging
        self.scheduler._logger.setLevel("ERROR")
        self.scheduler.start()

    def complete_task(self, file_list, task, task_result, crawler_name):
        self.to_index_queue.append((file_list, task, task_result, crawler_name))
        logger.info("Queued tasks: " + str(len(self.to_index_queue)))

    def _do_index(self):
        if len(self.to_index_queue) == 0:
            return

        # Deferred import: callbacks is only needed once a completed task is indexed
        from callbacks import PostCrawlCallbackFactory

        file_list, task, task_result, crawler_name = self.to_index_queue.pop()

        # Replace any previously indexed documents for this website
        self.search.delete_docs(task_result.website_id)

        if file_list:
            def iter_lines():
                with open(file_list, "r") as f:
                    line = f.readline()
                    while line:
                        yield line
                        line = f.readline()

            self.search.import_json(iter_lines(), task.website_id)

        self.db.update_website_date_if_exists(task.website_id)

        # Record which crawler completed the task
        task_result.server_name = crawler_name

        if file_list and os.path.exists(file_list):
            os.remove(file_list)

        # Handle task callback
        callback = PostCrawlCallbackFactory.get_callback(task)
        if callback:
            callback.run(task_result, self.search)

        self.db.log_result(task_result)

    def queue_task(self, task: Task):
        self.db.put_task(task)
        print("Queued task and made it available to crawlers: " + str(task.website_id))

    def get_queued_tasks(self) -> list:
        return self.db.get_tasks()
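

# Usage sketch (illustrative, not part of the original module): shows how the
# classes above are typically wired together. It assumes the same external
# dependencies TaskManager.__init__ already requires (a reachable Elasticsearch
# instance and a db.sqlite3 database); the website_id, url, and priority values
# below are hypothetical example data.
if __name__ == "__main__":
    manager = TaskManager()
    example_task = Task(website_id=1, url="http://example.com/", priority=2)
    manager.queue_task(example_task)
    print(manager.get_queued_tasks())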