diff --git a/app.py b/app.py index a69194e..cc145a2 100644 --- a/app.py +++ b/app.py @@ -604,14 +604,6 @@ def api_complete_task(): filename = None taskManager.complete_task(filename, task, task_result, name) - if filename and os.path.exists(filename): - os.remove(filename) - - # Handle task callback - callback = PostCrawlCallbackFactory.get_callback(task) - if callback: - callback.run(task_result, searchEngine) - return "Successfully logged task result and indexed files" else: diff --git a/tasks.py b/tasks.py index d0057e5..b0ddd1d 100644 --- a/tasks.py +++ b/tasks.py @@ -1,5 +1,7 @@ +import logging +import os + from apscheduler.schedulers.background import BackgroundScheduler -from werkzeug.datastructures import FileStorage from search.search import ElasticSearchEngine import json import database @@ -7,6 +9,8 @@ import urllib3 urllib3.disable_warnings() +logger = logging.getLogger("default") + class Task: @@ -61,8 +65,25 @@ class TaskManager: self.search = ElasticSearchEngine("od-database") self.db = database.Database("db.sqlite3") + self.to_index_queue = [] + + self.scheduler = BackgroundScheduler() + self.scheduler.add_job(self._do_index, "interval", seconds=0.1, max_instances=2) + self.scheduler._logger.setLevel("ERROR") + self.scheduler.start() + def complete_task(self, file_list, task, task_result, crawler_name): + self.to_index_queue.append((file_list, task, task_result, crawler_name)) + logger.info("Queued tasks: " + str(len(self.to_index_queue))) + + def _do_index(self): + if len(self.to_index_queue) == 0: + return + + from callbacks import PostCrawlCallbackFactory + + file_list, task, task_result, crawler_name = self.to_index_queue.pop() self.search.delete_docs(task_result.website_id) if file_list: @@ -80,6 +101,14 @@ class TaskManager: task_result.server_id = crawler_name + if file_list and os.path.exists(file_list): + os.remove(file_list) + + # Handle task callback + callback = PostCrawlCallbackFactory.get_callback(task) + if callback: + callback.run(task_result, self.search) + self.db.log_result(task_result) def queue_task(self, task: Task):