mirror of https://github.com/simon987/od-database.git (synced 2025-04-19 18:36:44 +00:00)
from crawl_server import logger
import os
from tasks import TaskResult, Task
import config
import requests
import json
from multiprocessing import Manager, Pool
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
from crawl_server.crawler import RemoteDirectoryCrawler

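# Overview (summary inferred from the code below): a background scheduler polls the
# od-database server (config.SERVER_URL) for queued crawl tasks, each task is crawled
# in a separate pool process, and results are reported back over the /task/* endpoints.
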
class TaskManager:
    """Fetches crawl tasks from the od-database server and runs them in a process pool."""

    def __init__(self, max_processes=2):
        # maxtasksperchild=1 gives every crawl task a fresh worker process
        self.pool = Pool(maxtasksperchild=1, processes=max_processes)
        self.max_processes = max_processes

        # Shared (Manager) list so worker processes can see which tasks are in flight
        manager = Manager()
        self.current_tasks = manager.list()

        # Poll the server for new work every second
        scheduler = BackgroundScheduler()
        scheduler.add_job(self.execute_queued_task, "interval", seconds=1)
        scheduler.start()

    def fetch_task(self):
        try:
            payload = {
                "token": config.API_TOKEN
            }
            r = requests.post(config.SERVER_URL + "/task/get", data=payload)

            if r.status_code == 200:
                text = r.text
                logger.info("Fetched task from server: " + text)
                task_json = json.loads(text)
                return Task(task_json["website_id"], task_json["url"])

            # No queued task available (or a non-200 status); nothing to do this tick
            return None

        except Exception:
            raise

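    # Example /task/get response body, inferred from the parsing above; the exact
    # schema is defined server-side and may include additional fields:
    #
    #   {"website_id": 123, "url": "http://example.com/open_dir/"}
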
    @staticmethod
    def push_result(task_result: TaskResult):

        try:
            # The crawled file list can be very large, so it is uploaded in chunks
            logger.info("Uploading file list in small chunks")
            filename = "./crawled/" + str(task_result.website_id) + ".json"
            CHUNK_SIZE = 500000 * 10  # ~5 MB per chunk

            if os.path.exists(filename):
                with open(filename) as f:
                    chunk = f.read(CHUNK_SIZE)
                    while chunk:
                        try:
                            payload = {
                                "token": config.API_TOKEN,
                                "website_id": task_result.website_id
                            }

                            files = {
                                "file_list": chunk
                            }

                            r = requests.post(config.SERVER_URL + "/task/upload", data=payload, files=files)
                            logger.info("RESPONSE: " + r.text)
                        except Exception as e:
                            # Keep sending the remaining chunks even if one upload fails
                            logger.error("Exception while sending file_list chunk: " + str(e))

                        chunk = f.read(CHUNK_SIZE)

            # Report the task result itself once the file list has been uploaded
            payload = {
                "token": config.API_TOKEN,
                "result": json.dumps(task_result.to_json())
            }

            r = requests.post(config.SERVER_URL + "/task/complete", data=payload)
            logger.info("RESPONSE: " + r.text)

            if os.path.exists(filename):
                os.remove(filename)

        except Exception:
            raise

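    # NOTE (assumption): the /task/upload endpoint is expected to append successive
    # file_list chunks for the same website_id on the server side; this client only
    # splits the file and does not send any chunk index.
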
    def execute_queued_task(self):

        # Fetch more work only while the number of in-flight tasks is within the pool size
        if len(self.current_tasks) <= self.max_processes:

            task = self.fetch_task()

            if task:
                logger.info("Submitted " + task.url + " to process pool")
                self.current_tasks.append(task)

                self.pool.apply_async(
                    TaskManager.run_task,
                    args=(task, self.current_tasks),
                    callback=TaskManager.task_complete,
                    error_callback=TaskManager.task_error
                )

    @staticmethod
    def run_task(task, current_tasks):

        result = TaskResult()
        result.start_time = datetime.utcnow().timestamp()
        result.website_id = task.website_id

        logger.info("Starting task " + task.url)

        # Crawl the remote directory and write the file list to disk for later upload
        crawler = RemoteDirectoryCrawler(task.url, config.CRAWL_SERVER_THREADS)
        crawl_result = crawler.crawl_directory("./crawled/" + str(task.website_id) + ".json")

        result.file_count = crawl_result.file_count
        result.status_code = crawl_result.status_code

        result.end_time = datetime.utcnow().timestamp()
        logger.info("End task " + task.url)

        # The shared task list is passed through so the completion callback can update it
        return result, current_tasks

    @staticmethod
    def task_error(result):
        logger.error("Uncaught exception during a task: " + str(result))
        raise result

    @staticmethod
    def task_complete(result):

        task_result, current_tasks = result

        logger.info("Task completed, sending result to server")
        logger.info("Status code: " + str(task_result.status_code))
        logger.info("File count: " + str(task_result.file_count))

        TaskManager.push_result(task_result)

        # Remove the finished task from the shared in-flight list
        for i, task in enumerate(current_tasks):
            if task.website_id == task_result.website_id:
                del current_tasks[i]
                break
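
# Minimal usage sketch (illustrative only, not part of the original module): the crawl
# server is assumed to create a single TaskManager and keep the process alive while the
# background scheduler and pool do the work. config.API_TOKEN, config.SERVER_URL and
# config.CRAWL_SERVER_THREADS must be defined in config.py.
#
#   if __name__ == "__main__":
#       import time
#       tm = TaskManager(max_processes=2)  # starts polling for tasks once per second
#       while True:
#           time.sleep(60)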