# od-database/crawl_server/task_manager.py
# 2018-07-14 17:36:16 -04:00
#
# 122 lines
# 3.5 KiB
# Python

from crawl_server import logger
from tasks import TaskResult, Task
import config
import requests
import json
from multiprocessing import Manager, Pool
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
from crawl_server.crawler import RemoteDirectoryCrawler
class TaskManager:
    """Fetch crawl tasks from the central server and run them in a process pool.

    A background scheduler calls :meth:`execute_queued_task` once per second.
    Each fetched task is submitted to a ``multiprocessing.Pool`` worker; when
    the worker is done, the result and the crawled file list are uploaded to
    the server and the task is dropped from ``current_tasks``.
    """

    def __init__(self, max_processes=2):
        """Create the worker pool and start polling the server for tasks.

        :param max_processes: number of worker processes, and the maximum
            number of tasks tracked in ``current_tasks`` at the same time.
        """
        # maxtasksperchild=1: every worker process is replaced by a fresh one
        # after it has completed a single task.
        self.pool = Pool(maxtasksperchild=1, processes=max_processes)
        self.max_processes = max_processes

        # A managed list is used because it travels to the worker process as
        # an argument of run_task and comes back with the result.
        manager = Manager()
        self.current_tasks = manager.list()

        scheduler = BackgroundScheduler()
        scheduler.add_job(self.execute_queued_task, "interval", seconds=1)
        scheduler.start()

    def fetch_task(self):
        """Ask the server for the next task to crawl.

        :return: a ``Task`` built from the ``website_id`` and ``url`` fields
            of the server's JSON reply, or ``None`` when the server does not
            answer with HTTP 200.
        :raises Exception: any error raised by the HTTP request or by the
            JSON decoding is propagated unchanged.
        """
        payload = {
            "token": config.API_TOKEN
        }

        r = requests.post(config.SERVER_URL + "/task/get", data=payload)
        if r.status_code != 200:
            return None

        text = r.text
        logger.info("Fetched task from server : " + text)
        task_json = json.loads(text)
        return Task(task_json["website_id"], task_json["url"])

    @staticmethod
    def push_result(task_result: "TaskResult"):
        """Upload a task result and its crawled file list to the server.

        :param task_result: result to report; its ``website_id`` selects the
            ``./crawled/<website_id>.json`` file that is uploaded with it.
        :raises Exception: any error raised while opening the file or
            performing the HTTP request is propagated unchanged.
        """
        payload = {
            "token": config.API_TOKEN,
            "result": json.dumps(task_result.to_json())
        }
        file_list_path = "./crawled/" + str(task_result.website_id) + ".json"

        # Binary mode is what requests recommends for uploads, and the context
        # manager guarantees the handle is closed even if the request fails.
        with open(file_list_path, "rb") as file_list:
            r = requests.post(
                config.SERVER_URL + "/task/complete",
                data=payload,
                files={"file_list": file_list}
            )

        logger.info("RESPONSE: " + r.text)

    def execute_queued_task(self):
        """Fetch one task and submit it to the pool if a slot is free.

        Does nothing when ``max_processes`` tasks are already tracked or when
        the server has no task to hand out.
        """
        # Strict limit: never track more tasks than there are worker processes.
        if len(self.current_tasks) >= self.max_processes:
            return

        task = self.fetch_task()
        if not task:
            return

        logger.info("Submitted " + task.url + " to process pool")
        self.current_tasks.append(task)

        running_tasks = self.current_tasks

        def on_error(error):
            # The error callback only receives the exception, so the task is
            # captured here to release its slot after a failure.
            TaskManager.task_error(error)
            TaskManager._remove_task(running_tasks, task.website_id)

        self.pool.apply_async(
            TaskManager.run_task,
            args=(task, self.current_tasks),
            callback=TaskManager.task_complete,
            error_callback=on_error
        )

    @staticmethod
    def run_task(task, current_tasks):
        """Crawl ``task.url`` inside a worker process.

        :param task: task to execute (``website_id`` and ``url`` are read).
        :param current_tasks: list of running tasks; it is not modified here,
            only returned so that :meth:`task_complete` receives it.
        :return: ``(TaskResult, current_tasks)`` tuple.
        """
        result = TaskResult()
        # NOTE(review): utcnow() returns a naive datetime and timestamp()
        # interprets naive datetimes as local time, so these values are
        # shifted from the real epoch time unless the host runs in UTC.
        # Left unchanged in case the server relies on it - confirm.
        result.start_time = datetime.utcnow().timestamp()
        result.website_id = task.website_id

        logger.info("Starting task " + task.url)

        crawler = RemoteDirectoryCrawler(task.url, config.CRAWL_SERVER_THREADS)
        crawl_result = crawler.crawl_directory("./crawled/" + str(task.website_id) + ".json")
        del crawler

        result.file_count = crawl_result.file_count
        result.status_code = crawl_result.status_code

        result.end_time = datetime.utcnow().timestamp()
        logger.info("End task " + task.url)

        return result, current_tasks

    @staticmethod
    def task_error(result):
        """Log an exception that escaped from a worker process.

        The exception is logged instead of re-raised: pool callbacks run on
        the pool's result-handling thread, and an exception raised there
        would stop that thread and stall every later task.

        :param result: the exception raised by the task.
        """
        import traceback

        details = "".join(
            traceback.format_exception(type(result), result, result.__traceback__)
        )
        logger.error("Uncaught exception during a task: " + details)

    @staticmethod
    def task_complete(result):
        """Report a finished task to the server and release its slot.

        :param result: the ``(TaskResult, current_tasks)`` tuple returned by
            :meth:`run_task`.
        """
        task_result, current_tasks = result

        logger.info("Task completed, sending result to server")
        # str(): the status code is not guaranteed to be a string.
        logger.info("Status code: " + str(task_result.status_code))
        logger.info("File count: " + str(task_result.file_count))

        try:
            TaskManager.push_result(task_result)
        except Exception as e:
            # Must not propagate: this runs as a pool callback (see task_error).
            logger.error("Could not send task result to server: " + repr(e))
        finally:
            # Release the slot even when the upload failed.
            TaskManager._remove_task(current_tasks, task_result.website_id)

    @staticmethod
    def _remove_task(current_tasks, website_id):
        """Remove the first task with the given ``website_id`` from the list.

        One finished task releases exactly one slot. The search stops right
        after the deletion so the list is never iterated after being mutated.
        """
        for i, task in enumerate(current_tasks):
            if task.website_id == website_id:
                del current_tasks[i]
                return