od-database/crawl_server/task_manager.py
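
"""Task manager for the od-database crawl server.

Periodically asks the central server for queued crawl tasks, runs each crawl
in a separate worker process, and uploads the resulting file list and task
summary back to the server.
"""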

from crawl_server import logger
import os
from tasks import TaskResult, Task
import config
import requests
import json
from multiprocessing import Manager, Pool
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
from crawl_server.crawler import RemoteDirectoryCrawler


class TaskManager:
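    """Fetches crawl tasks from the central od-database server, runs them in a
    local process pool, and reports each result back when the crawl finishes."""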

    def __init__(self, max_processes=2):
        # maxtasksperchild=1 replaces each worker process after a single task,
        # so every crawl starts in a fresh process.
        self.pool = Pool(maxtasksperchild=1, processes=max_processes)
        self.max_processes = max_processes

        # Process-safe list of the tasks currently being crawled.
        manager = Manager()
        self.current_tasks = manager.list()

        # Check the server for queued work once per second.
        scheduler = BackgroundScheduler()
        scheduler.add_job(self.execute_queued_task, "interval", seconds=1)
        scheduler.start()

    def fetch_task(self):
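        """Request the next queued task from the server; return a Task, or
        None if the server did not return one (non-200 response)."""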
        try:
            payload = {
                "token": config.API_TOKEN
            }
            r = requests.post(config.SERVER_URL + "/task/get", data=payload)

            if r.status_code == 200:
                text = r.text
                logger.info("Fetched task from server : " + text)
                task_json = json.loads(text)
                return Task(task_json["website_id"], task_json["url"])
            return None
        except Exception as e:
            raise e

    @staticmethod
    def push_result(task_result: TaskResult):
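        """Upload the crawled file list to the server in chunks, post the task
        summary to /task/complete, then delete the local file list."""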
        try:
            logger.info("Uploading file list in small chunks")
            filename = "./crawled/" + str(task_result.website_id) + ".json"
            CHUNK_SIZE = 1000000 * 10  # ~10 MB per upload request

            if os.path.exists(filename):
                with open(filename) as f:
                    chunk = f.read(CHUNK_SIZE)
                    while chunk:
                        payload = {
                            "token": config.API_TOKEN,
                            "website_id": task_result.website_id
                        }
                        files = {
                            "file_list": chunk
                        }
                        r = requests.post(config.SERVER_URL + "/task/upload", data=payload, files=files)
                        logger.info("RESPONSE: " + r.text)
                        chunk = f.read(CHUNK_SIZE)

            # Report the task summary once the file list has been uploaded.
            payload = {
                "token": config.API_TOKEN,
                "result": json.dumps(task_result.to_json())
            }
            r = requests.post(config.SERVER_URL + "/task/complete", data=payload)
            logger.info("RESPONSE: " + r.text)

            # The local copy of the file list is no longer needed.
            if os.path.exists(filename):
                os.remove(filename)
        except Exception as e:
            raise e

    def execute_queued_task(self):
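        """Scheduler callback: when there is spare capacity, fetch one task
        from the server and submit it to the process pool."""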
        if len(self.current_tasks) <= self.max_processes:
            task = self.fetch_task()
            if task:
                logger.info("Submitted " + task.url + " to process pool")
                self.current_tasks.append(task)

                self.pool.apply_async(
                    TaskManager.run_task,
                    args=(task, self.current_tasks),
                    callback=TaskManager.task_complete,
                    error_callback=TaskManager.task_error
                )

    @staticmethod
    def run_task(task, current_tasks):
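        """Run a crawl in a worker process and return the TaskResult along
        with the shared list of in-progress tasks."""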
        result = TaskResult()
        result.start_time = datetime.utcnow().timestamp()
        result.website_id = task.website_id

        logger.info("Starting task " + task.url)

        crawler = RemoteDirectoryCrawler(task.url, config.CRAWL_SERVER_THREADS)
        # The file list is written to disk so push_result can upload it in chunks.
        crawl_result = crawler.crawl_directory("./crawled/" + str(task.website_id) + ".json")

        result.file_count = crawl_result.file_count
        result.status_code = crawl_result.status_code
        result.end_time = datetime.utcnow().timestamp()

        logger.info("End task " + task.url)

        return result, current_tasks

    @staticmethod
    def task_error(result):
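        """Pool error callback: log the failure and re-raise the exception
        raised by run_task."""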
        logger.error("Uncaught exception during a task: " + str(result))
        raise result

    @staticmethod
    def task_complete(result):
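        """Pool success callback: push the result to the server and remove the
        finished task from the shared in-progress list."""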
        task_result, current_tasks = result

        logger.info("Task completed, sending result to server")
        logger.info("Status code: " + task_result.status_code)
        logger.info("File count: " + str(task_result.file_count))

        TaskManager.push_result(task_result)

        # Remove the finished task from the shared list; stop at the first
        # match so the list is not mutated while still being iterated.
        for i, task in enumerate(current_tasks):
            if task.website_id == task_result.website_id:
                del current_tasks[i]
                break
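

# Illustrative only: in od-database this class is normally instantiated by the
# crawl server's entry point rather than run directly, and the process count
# below is an assumed value for this sketch, not the project's configuration.
if __name__ == "__main__":
    import time

    manager = TaskManager(max_processes=2)

    # The BackgroundScheduler polls for queued tasks once per second in a
    # daemon thread, so keep the main thread alive.
    while True:
        time.sleep(60)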