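"""Task dispatcher for od-database.

CrawlServer wraps the HTTP API of a single remote crawl server;
TaskDispatcher spreads crawl tasks across the registered servers and
periodically collects completed results into the search index.
"""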
from apscheduler.schedulers.background import BackgroundScheduler
from search.search import ElasticSearchEngine
from crawl_server.database import Task, TaskResult
import requests
from requests.exceptions import ConnectionError, ReadTimeout
import json
import database
from concurrent.futures import ThreadPoolExecutor
import urllib3

urllib3.disable_warnings()


class CrawlServer:
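    """HTTP client for a single remote crawl server.

    All requests disable certificate verification (verify=False), which
    presumably accommodates self-signed certificates on the crawl servers.
    """
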
    def __init__(self, url, name, slots, token, server_id=None):
        self.url = url
        self.name = name
        self.slots = slots
        self.used_slots = 0
        self.token = token
        self.id = server_id

    def _generate_headers(self):
        return {
            "Content-Type": "application/json",
            "Authorization": "Token " + self.token,
        }

    def queue_task(self, task: Task) -> bool:
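        """Send a task to this server's queue; return True if it was accepted.

        Returns False on a non-200 response or when the server is unreachable.
        """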
        print("Sending task to crawl server " + self.url)
        try:
            payload = json.dumps(task.to_json())
            r = requests.post(self.url + "/task/put", headers=self._generate_headers(), data=payload, verify=False,
                              timeout=5)
            print(r)  # TODO: If the task could not be added, fall back to another server
            return r.status_code == 200
        except (ConnectionError, ReadTimeout):
            return False

    def pop_completed_tasks(self) -> list:
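        """Fetch and consume the tasks this server has completed.

        Returns an empty list on error or when the server is unreachable.
        """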
        try:
            r = requests.get(self.url + "/task/completed", headers=self._generate_headers(), verify=False, timeout=5)
            if r.status_code != 200:
                print("Problem while fetching completed tasks for '" + self.name + "': " + str(r.status_code))
                print(r.text)
                return []
            return [
                TaskResult(t["status_code"], t["file_count"], t["start_time"], t["end_time"], t["website_id"])
                for t in json.loads(r.text)]
        except (ConnectionError, ReadTimeout):
            print("Crawl server cannot be reached @ " + self.url)
            return []

    def fetch_queued_tasks(self):
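        """Return the tasks queued on this server, or None if it cannot be reached."""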
        try:
            r = requests.get(self.url + "/task/", headers=self._generate_headers(), verify=False, timeout=5)

            if r.status_code != 200:
                print("Problem while fetching queued tasks for '" + self.name + "': " + str(r.status_code))
                print(r.text)
                return None

            return [
                Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
                for t in json.loads(r.text)
            ]
        except (ConnectionError, ReadTimeout):
            return None

    def fetch_current_tasks(self):
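        """Return the tasks this server is currently crawling, or None if it cannot be reached."""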
        try:
            r = requests.get(self.url + "/task/current", headers=self._generate_headers(), verify=False, timeout=5)

            if r.status_code != 200:
                print("Problem while fetching current tasks for '" + self.name + "': " + str(r.status_code))
                print(r.text)
                return None

            return [
                Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
                for t in json.loads(r.text)
            ]
        except (ConnectionError, ReadTimeout):
            return None

    def fetch_website_files(self, website_id):
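        """Stream the file list crawled for a website, one line at a time.

        This is a generator; it yields nothing when the request fails or the
        server is unreachable.
        """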
        try:
            r = requests.get(self.url + "/file_list/" + str(website_id) + "/", stream=True,
                             headers=self._generate_headers(), verify=False)

            if r.status_code != 200:
                print("Problem while fetching website files for '" + self.name + "': " + str(r.status_code))
                print(r.text)
                return

            for line in r.iter_lines(chunk_size=1024 * 256):
                yield line
        except (ConnectionError, ReadTimeout):
            return

    def free_website_files(self, website_id) -> bool:
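        """Ask the server to delete its local copy of a website's file list."""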
        try:
            r = requests.get(self.url + "/file_list/" + str(website_id) + "/free", headers=self._generate_headers(),
                             verify=False)
            return r.status_code == 200
        except (ConnectionError, ReadTimeout) as e:
            print(e)
            return False

    def pop_queued_tasks(self):
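        """Remove and return every queued task from this server, or [] on error."""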
        try:
            r = requests.get(self.url + "/task/pop_all", headers=self._generate_headers(), verify=False)

            if r.status_code != 200:
                print("Problem while popping tasks for '" + self.name + "': " + str(r.status_code))
                print(r.text)
                return []

            return [
                Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
                for t in json.loads(r.text)
            ]
        except (ConnectionError, ReadTimeout):
            return []


class TaskDispatcher:
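    """Spreads crawl tasks across crawl servers and collects their results.

    A background job polls all servers for completed tasks every 10 seconds,
    re-indexes the fetched file lists, and logs each result.
    """
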
    def __init__(self):
        scheduler = BackgroundScheduler()
        scheduler.add_job(self.check_completed_tasks, "interval", seconds=10)
        scheduler.start()

        self.search = ElasticSearchEngine("od-database")
        self.db = database.Database("db.sqlite3")

    def check_completed_tasks(self):
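        """Poll every crawl server for completed tasks and index the results."""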
        for server in self.db.get_crawl_servers():
            for task in server.pop_completed_tasks():
                print("Completed task")

                task.server_id = server.id

                if task.file_count:
                    # All files are overwritten
                    self.search.delete_docs(task.website_id)
                    file_list = server.fetch_website_files(task.website_id)
                    self.search.import_json(file_list, task.website_id)
                    # File list is safe to delete once indexed
                    server.free_website_files(task.website_id)

                # Update last_modified date for website
                self.db.update_website_date_if_exists(task.website_id)

                self.db.log_result(task)

    def dispatch_task(self, task: Task):
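        """Queue the task on the crawl server that has the most free slots."""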
        self._get_available_crawl_server().queue_task(task)

    def _get_available_crawl_server(self) -> CrawlServer:
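        """Pick the server with the most free slots (slots minus queued tasks).

        Servers that could not be polled are excluded from consideration.
        """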
        queued_tasks_by_server = self._get_queued_tasks_by_server()
        server_with_most_free_slots = None
        most_free_slots = -10000

        for server in queued_tasks_by_server:
            free_slots = server.slots - len(queued_tasks_by_server[server])
            if free_slots > most_free_slots:
                server_with_most_free_slots = server
                most_free_slots = free_slots

        print("Dispatching task to '" +
              server_with_most_free_slots.name + "' " +
              str(most_free_slots) + " free out of " + str(server_with_most_free_slots.slots))

        return server_with_most_free_slots

    def get_queued_tasks(self):
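        """Yield every queued task across all crawl servers."""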
        queued_tasks_by_server = self._get_queued_tasks_by_server()
        for queued_tasks in queued_tasks_by_server.values():
            for task in queued_tasks:
                yield task

    def _get_queued_tasks_by_server(self) -> dict:
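        """Poll all servers for their queues in parallel; unreachable servers are skipped."""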
        queued_tasks = dict()
        pool = ThreadPoolExecutor(max_workers=10)
        crawl_servers = self.db.get_crawl_servers()
        responses = list(pool.map(lambda s: s.fetch_queued_tasks(), crawl_servers))
        pool.shutdown()

        for i, server in enumerate(crawl_servers):
            if responses[i] is not None:
                queued_tasks[server] = responses[i]

        return queued_tasks

    def get_current_tasks(self):
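        """Yield every in-progress task across all crawl servers."""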
        current_tasks_by_server = self._get_current_tasks_by_server()
        for current_tasks in current_tasks_by_server.values():
            for task in current_tasks:
                yield task

    def _get_current_tasks_by_server(self) -> dict:
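        """Poll all servers for in-progress tasks in parallel; unreachable servers are skipped."""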
        current_tasks = dict()
        pool = ThreadPoolExecutor(max_workers=10)
        crawl_servers = self.db.get_crawl_servers()
        responses = list(pool.map(lambda s: s.fetch_current_tasks(), crawl_servers))
        pool.shutdown()

        for i, server in enumerate(crawl_servers):
            if responses[i] is not None:
                current_tasks[server] = responses[i]

        return current_tasks

    def redispatch_queued(self) -> int:
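        """Pop all queued tasks from every server and re-dispatch them; return how many were moved."""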
        counter = 0
        for server in self.db.get_crawl_servers():
            for task in server.pop_queued_tasks():
                self.dispatch_task(task)
                counter += 1

        return counter