Merge remote-tracking branch 'origin/master'

commit 6e491513bf
Author: Simon
Date:   2018-11-18 11:07:45 -05:00

2 changed files with 30 additions and 9 deletions

app.py

@@ -604,14 +604,6 @@ def api_complete_task():
             filename = None
 
         taskManager.complete_task(filename, task, task_result, name)
-
-        if filename and os.path.exists(filename):
-            os.remove(filename)
-
-        # Handle task callback
-        callback = PostCrawlCallbackFactory.get_callback(task)
-        if callback:
-            callback.run(task_result, searchEngine)
 
         return "Successfully logged task result and indexed files"
     else:

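The deleted block did file cleanup and the post-crawl callback inside the request handler; both now run later in TaskManager._do_index (second file below). A minimal sketch of the trimmed handler shape, assuming a Flask app: the route, the stub manager, and the request parsing are hypothetical, and only the complete_task call and the return string come from the diff.

    from flask import Flask, request

    app = Flask(__name__)


    class _StubTaskManager:
        # stand-in for the real TaskManager, only so the sketch runs
        def complete_task(self, filename, task, task_result, name):
            print("queued:", filename, task, task_result, name)


    taskManager = _StubTaskManager()


    @app.route("/api/complete_task", methods=["POST"])  # hypothetical route
    def api_complete_task():
        # hypothetical request parsing
        task = request.form.get("task")
        task_result = request.form.get("task_result")
        name = request.form.get("name")
        filename = None  # would point at an uploaded file list, when present

        # hand-off only: cleanup and callbacks happen later in TaskManager._do_index
        taskManager.complete_task(filename, task, task_result, name)
        return "Successfully logged task result and indexed files"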
(second changed file; filename not shown in the source)

@@ -1,5 +1,7 @@
+import logging
+import os
+
 from apscheduler.schedulers.background import BackgroundScheduler
-from werkzeug.datastructures import FileStorage
 from search.search import ElasticSearchEngine
 import json
 import database
@@ -7,6 +9,8 @@ import urllib3
 
 urllib3.disable_warnings()
+logger = logging.getLogger("default")
 
 
 class Task:
@@ -61,8 +65,25 @@ class TaskManager:
         self.search = ElasticSearchEngine("od-database")
         self.db = database.Database("db.sqlite3")
+        self.to_index_queue = []
+
+        self.scheduler = BackgroundScheduler()
+        self.scheduler.add_job(self._do_index, "interval", seconds=0.1, max_instances=2)
+        self.scheduler._logger.setLevel("ERROR")
+        self.scheduler.start()
 
     def complete_task(self, file_list, task, task_result, crawler_name):
+        self.to_index_queue.append((file_list, task, task_result, crawler_name))
+        logger.info("Queued tasks: " + str(len(self.to_index_queue)))
+
+    def _do_index(self):
+        if len(self.to_index_queue) == 0:
+            return
+
+        from callbacks import PostCrawlCallbackFactory
+
+        file_list, task, task_result, crawler_name = self.to_index_queue.pop()
+
         self.search.delete_docs(task_result.website_id)
 
         if file_list:
@@ -80,6 +101,14 @@ class TaskManager:
         task_result.server_id = crawler_name
 
+        if file_list and os.path.exists(file_list):
+            os.remove(file_list)
+
+        # Handle task callback
+        callback = PostCrawlCallbackFactory.get_callback(task)
+        if callback:
+            callback.run(task_result, self.search)
+
         self.db.log_result(task_result)
 
     def queue_task(self, task: Task):
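What the merge introduces is a small in-process work queue: complete_task only appends a tuple to a list, and an APScheduler interval job drains the list in the background every 100 ms. A self-contained sketch of the same pattern, runnable as-is; the class and item names are illustrative and only the queue/scheduler mechanics mirror the diff.

    import logging
    import time

    from apscheduler.schedulers.background import BackgroundScheduler

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("default")


    class Indexer:
        def __init__(self):
            self.to_index_queue = []
            self.scheduler = BackgroundScheduler()
            # poll every 100 ms; max_instances=2 lets a second run start
            # if the previous one is still busy
            self.scheduler.add_job(self._do_index, "interval", seconds=0.1, max_instances=2)
            self.scheduler.start()

        def complete_task(self, item):
            self.to_index_queue.append(item)
            logger.info("Queued tasks: " + str(len(self.to_index_queue)))

        def _do_index(self):
            if len(self.to_index_queue) == 0:
                return
            # list.pop() takes the newest entry, so the queue drains LIFO;
            # collections.deque with popleft() would drain FIFO instead
            item = self.to_index_queue.pop()
            logger.info("Indexing: " + str(item))


    if __name__ == "__main__":
        indexer = Indexer()
        for i in range(3):
            indexer.complete_task(i)
        time.sleep(1)  # give the background job time to drain the queue

Two details in the diff itself are worth noting: scheduler._logger is a private APScheduler attribute, so setting it to ERROR quiets the scheduler's own log output at the cost of relying on an internal, and the deferred "from callbacks import PostCrawlCallbackFactory" inside _do_index is presumably there to avoid a circular import at module load time.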