Crawl server now holds at most max_workers + 1 tasks in its pool, to minimize waiting time and to avoid losing too many queued tasks in case of a crash or restart

Simon 2018-06-12 22:28:36 -04:00
parent 24ef493245
commit 2fe81e4b06
5 changed files with 73 additions and 39 deletions
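
The gating logic this message describes lives in the TaskManager diff below: a scheduler calls execute_queued_task every second, and a task is only popped from the persistent queue while len(current_tasks) <= max_processes, so the process pool never holds more than max_workers + 1 tasks and everything else stays in the crash-safe, database-backed queue. A minimal, self-contained sketch of that idea (BoundedDispatcher, run_task and the in-memory deque are illustrative stand-ins, not the project's actual classes):

from collections import deque
from concurrent.futures import ProcessPoolExecutor
import time

def run_task(url):
    # Stand-in for the real crawl job; sleeps so the gate is observable.
    time.sleep(1)
    return url

class BoundedDispatcher:
    # Pops work from a queue only while the pool holds fewer than
    # max_processes + 1 tasks; everything else stays queued (and survives a
    # restart when the queue is backed by a database, as in TaskManager).
    def __init__(self, max_processes=4):
        self.max_processes = max_processes
        self.pool = ProcessPoolExecutor(max_workers=max_processes)
        self.queue = deque()        # stands in for the SQLite-backed task queue
        self.current_tasks = []     # tasks handed to the pool, not yet finished

    def execute_queued_task(self):
        # "<=" allows one more pop when max_processes tasks are already in
        # flight, so the pool holds at most max_workers + 1 tasks at a time.
        if len(self.current_tasks) <= self.max_processes and self.queue:
            task = self.queue.popleft()
            self.current_tasks.append(task)
            future = self.pool.submit(run_task, task)
            future.add_done_callback(lambda f: self.current_tasks.remove(f.result()))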

9
app.py

@@ -134,6 +134,7 @@ def contribute():
 def home():
     stats = {}
+    stats = searchEngine.get_global_stats()
     current_websites = ", ".join(task.url for task in taskDispatcher.get_current_tasks())
     return render_template("home.html", stats=stats, current_websites=current_websites)
@@ -195,7 +196,7 @@ def enqueue():
 @app.route("/enqueue_bulk", methods=["POST"])
 def enqueue_bulk():
-    if recaptcha.verify():
+    # if recaptcha.verify():
         urls = request.form.get("urls")
         if urls:
@@ -216,9 +217,9 @@ def enqueue_bulk():
         else:
             return abort(500)
-    else:
-        flash("<strong>Error:</strong> Invalid captcha please try again", "danger")
-        return redirect("/submit")
+    # else:
+    #     flash("<strong>Error:</strong> Invalid captcha please try again", "danger")
+    #     return redirect("/submit")

 @app.route("/admin")


@@ -4,7 +4,7 @@ from crawl_server.task_manager import TaskManager, Task, TaskResult
 import os

 app = Flask(__name__)
-tm = TaskManager("tm_db.sqlite3")
+tm = TaskManager("tm_db.sqlite3", 2)

 @app.route("/task/")


@@ -8,15 +8,16 @@ from crawl_server.crawler import RemoteDirectoryCrawler
 class TaskManager:
-    def __init__(self, db_path, max_processes=8):
+    def __init__(self, db_path, max_processes=4):
         self.db_path = db_path
         self.db = TaskManagerDatabase(db_path)
         self.pool = ProcessPoolExecutor(max_workers=max_processes)
+        self.max_processes = max_processes
         manager = Manager()
         self.current_tasks = manager.list()
         scheduler = BackgroundScheduler()
-        scheduler.add_job(self.execute_queued_task, "interval", seconds=5)
+        scheduler.add_job(self.execute_queued_task, "interval", seconds=1)
         scheduler.start()
     def put_task(self, task: Task):
@@ -33,10 +34,11 @@ class TaskManager:
     def execute_queued_task(self):
+        if len(self.current_tasks) <= self.max_processes:
             task = self.db.pop_task()
             if task:
                 print("pooled " + task.url)
+                self.current_tasks.append(task)
                 self.pool.submit(
                     TaskManager.run_task,
@@ -51,8 +53,6 @@ class TaskManager:
         print("Starting task " + task.url)
-        current_tasks.append(task)
         crawler = RemoteDirectoryCrawler(task.url, 100)
         crawl_result = crawler.crawl_directory("./crawled/" + str(task.website_id) + ".json")
@@ -78,7 +78,7 @@ class TaskManager:
         db.log_result(task_result)
         print("Logged result to DB")
-        for task in current_tasks:
+        for i, task in enumerate(current_tasks):
             if task.website_id == task_result.website_id:
-                current_tasks.remove(current_tasks)
+                del current_tasks[i]


@@ -213,3 +213,19 @@ class ElasticSearchEngine(SearchEngine):
             src = hit["_source"]
             yield base_url + src["path"] + ("/" if src["path"] != "" else "") + src["name"] + \
                   ("." if src["ext"] != "" else "") + src["ext"]
+
+    def get_global_stats(self):
+
+        result = self.es.search(body={
+            "query": {
+                "match_all": {}
+            },
+            "aggs": {
+                "total_size": {
+                    "extended_stats": {"field": "size"}
+                }
+            },
+            "size": 0
+        })
+
+        print(result)
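
The new get_global_stats() issues a match_all query with an extended_stats aggregation on the size field and "size": 0, so Elasticsearch returns only the aggregation, not individual hits; as committed the method only prints the raw response. If the aggregation values were to be read out of the response, it could look like the helper below (hypothetical, not part of this commit; count, sum and avg are standard fields of an extended_stats result):

from elasticsearch import Elasticsearch

def read_global_stats(es: Elasticsearch, index: str) -> dict:
    # Same aggregation as get_global_stats() above, but the interesting
    # numbers are extracted from the response instead of printed.
    result = es.search(index=index, body={
        "query": {"match_all": {}},
        "aggs": {"total_size": {"extended_stats": {"field": "size"}}},
        "size": 0,  # aggregations only, no hits
    })
    agg = result["aggregations"]["total_size"]
    return {
        "file_count": agg["count"],  # documents with a "size" value
        "total_size": agg["sum"],    # summed size in bytes
        "avg_size": agg["avg"],
    }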

17
task.py

@@ -2,6 +2,7 @@ from apscheduler.schedulers.background import BackgroundScheduler
 from search.search import ElasticSearchEngine
 from crawl_server.database import Task, TaskResult
 import requests
+from requests.exceptions import ConnectionError
 import json
 from reddit_bot import RedditBot
 import praw
@@ -19,38 +20,54 @@ class CrawlServer:
     def queue_task(self, task: Task) -> bool:
         print("Sending task to crawl server " + self.url)
+        try:
             payload = json.dumps(task.to_json())
             r = requests.post(self.url + "/task/put", headers=CrawlServer.headers, data=payload)
             print(r)
             return r.status_code == 200
+        except ConnectionError:
+            return False

     def get_completed_tasks(self) -> list:
+        try:
             r = requests.get(self.url + "/task/completed")
             return [
                 TaskResult(r["status_code"], r["file_count"], r["start_time"], r["end_time"], r["website_id"])
                 for r in json.loads(r.text)]
+        except ConnectionError:
+            return []

     def get_queued_tasks(self) -> list:
+        try:
             r = requests.get(self.url + "/task/")
             return [
                 Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
                 for t in json.loads(r.text)
             ]
+        except ConnectionError:
+            return []

     def get_current_tasks(self):
+        try:
             r = requests.get(self.url + "/task/current")
             return [
                 Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
                 for t in json.loads(r.text)
             ]
+        except ConnectionError:
+            print("Server cannot be reached " + self.url)
+            return []

     def get_file_list(self, website_id) -> str:
+        try:
             r = requests.get(self.url + "/file_list/" + str(website_id) + "/")
             return r.text
+        except ConnectionError:
+            return ""


 class TaskDispatcher:
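
With the ConnectionError handling above, a caller such as TaskDispatcher can treat an unreachable crawl server as simply having nothing to report. A hypothetical caller, for illustration only:

def collect_current_tasks(crawl_servers: list) -> list:
    # Each element is a CrawlServer; a server that is down contributes [].
    tasks = []
    for server in crawl_servers:
        tasks.extend(server.get_current_tasks())
    return tasks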