diff --git a/app.py b/app.py
index b0bfc32..2b8a8c5 100644
--- a/app.py
+++ b/app.py
@@ -134,6 +134,7 @@ def contribute():
 def home():
 
     stats = {}
+    stats = searchEngine.get_global_stats()
     current_websites = ", ".join(task.url for task in taskDispatcher.get_current_tasks())
     return render_template("home.html", stats=stats, current_websites=current_websites)
 
@@ -195,7 +196,7 @@ def enqueue():
 @app.route("/enqueue_bulk", methods=["POST"])
 def enqueue_bulk():
 
-    if recaptcha.verify():
+    # if recaptcha.verify():
 
         urls = request.form.get("urls")
         if urls:
@@ -216,9 +217,9 @@
         else:
             return abort(500)
 
-    else:
-        flash("Error: Invalid captcha please try again", "danger")
-        return redirect("/submit")
+    # else:
+    #     flash("Error: Invalid captcha please try again", "danger")
+    #     return redirect("/submit")
 
 
 @app.route("/admin")
diff --git a/crawl_server/server.py b/crawl_server/server.py
index 8da3be3..0cfa15d 100644
--- a/crawl_server/server.py
+++ b/crawl_server/server.py
@@ -4,7 +4,7 @@ from crawl_server.task_manager import TaskManager, Task, TaskResult
 import os
 
 app = Flask(__name__)
-tm = TaskManager("tm_db.sqlite3")
+tm = TaskManager("tm_db.sqlite3", 2)
 
 
 @app.route("/task/")
diff --git a/crawl_server/task_manager.py b/crawl_server/task_manager.py
index 7e5a44a..66a7b91 100644
--- a/crawl_server/task_manager.py
+++ b/crawl_server/task_manager.py
@@ -8,15 +8,16 @@ from crawl_server.crawler import RemoteDirectoryCrawler
 class TaskManager:
 
-    def __init__(self, db_path, max_processes=8):
+    def __init__(self, db_path, max_processes=4):
         self.db_path = db_path
         self.db = TaskManagerDatabase(db_path)
         self.pool = ProcessPoolExecutor(max_workers=max_processes)
+        self.max_processes = max_processes
 
         manager = Manager()
         self.current_tasks = manager.list()
 
         scheduler = BackgroundScheduler()
-        scheduler.add_job(self.execute_queued_task, "interval", seconds=5)
+        scheduler.add_job(self.execute_queued_task, "interval", seconds=1)
         scheduler.start()
 
     def put_task(self, task: Task):
@@ -33,15 +34,16 @@
 
     def execute_queued_task(self):
 
-        task = self.db.pop_task()
-        if task:
+        if len(self.current_tasks) <= self.max_processes:
+            task = self.db.pop_task()
+            if task:
+                print("pooled " + task.url)
+                self.current_tasks.append(task)
 
-            print("pooled " + task.url)
-
-            self.pool.submit(
-                TaskManager.run_task,
-                task, self.db_path, self.current_tasks
-            ).add_done_callback(TaskManager.task_complete)
+                self.pool.submit(
+                    TaskManager.run_task,
+                    task, self.db_path, self.current_tasks
+                ).add_done_callback(TaskManager.task_complete)
 
     @staticmethod
     def run_task(task, db_path, current_tasks):
@@ -51,8 +53,6 @@
 
         print("Starting task " + task.url)
 
-        current_tasks.append(task)
-
         crawler = RemoteDirectoryCrawler(task.url, 100)
         crawl_result = crawler.crawl_directory("./crawled/" + str(task.website_id) + ".json")
 
@@ -78,7 +78,7 @@
         db.log_result(task_result)
         print("Logged result to DB")
 
-        for task in current_tasks:
+        for i, task in enumerate(current_tasks):
             if task.website_id == task_result.website_id:
-                current_tasks.remove(current_tasks)
+                del current_tasks[i]
diff --git a/search/search.py b/search/search.py
index 623c09a..b9e2c73 100644
--- a/search/search.py
+++ b/search/search.py
@@ -213,3 +213,19 @@ class ElasticSearchEngine(SearchEngine):
             src = hit["_source"]
             yield base_url + src["path"] + ("/" if src["path"] != "" else "") + src["name"] + \
                   ("." if src["ext"] != "" else "") + src["ext"]
+
+    def get_global_stats(self):
+
+        result = self.es.search(body={
+            "query": {
+                "match_all": {}
+            },
+            "aggs": {
+                "total_size": {
+                    "extended_stats": {"field": "size"}
+                }
+            },
+            "size": 0
+        })
+
+        print(result)
diff --git a/task.py b/task.py
index 8c6063b..2c22288 100644
--- a/task.py
+++ b/task.py
@@ -2,6 +2,7 @@ from apscheduler.schedulers.background import BackgroundScheduler
 from search.search import ElasticSearchEngine
 from crawl_server.database import Task, TaskResult
 import requests
+from requests.exceptions import ConnectionError
 import json
 from reddit_bot import RedditBot
 import praw
@@ -19,38 +20,54 @@ class CrawlServer:
 
     def queue_task(self, task: Task) -> bool:
         print("Sending task to crawl server " + self.url)
-        payload = json.dumps(task.to_json())
-        r = requests.post(self.url + "/task/put", headers=CrawlServer.headers, data=payload)
-        print(r)
-        return r.status_code == 200
+        try:
+            payload = json.dumps(task.to_json())
+            r = requests.post(self.url + "/task/put", headers=CrawlServer.headers, data=payload)
+            print(r)
+            return r.status_code == 200
+        except ConnectionError:
+            return False
 
     def get_completed_tasks(self) -> list:
-        r = requests.get(self.url + "/task/completed")
-        return [
-            TaskResult(r["status_code"], r["file_count"], r["start_time"], r["end_time"], r["website_id"])
-            for r in json.loads(r.text)]
+        try:
+            r = requests.get(self.url + "/task/completed")
+            return [
+                TaskResult(r["status_code"], r["file_count"], r["start_time"], r["end_time"], r["website_id"])
+                for r in json.loads(r.text)]
+        except ConnectionError:
+            return []
 
     def get_queued_tasks(self) -> list:
-        r = requests.get(self.url + "/task/")
-        return [
-            Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
-            for t in json.loads(r.text)
-        ]
+        try:
+            r = requests.get(self.url + "/task/")
+            return [
+                Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
+                for t in json.loads(r.text)
+            ]
+        except ConnectionError:
+            return []
 
     def get_current_tasks(self):
-        r = requests.get(self.url + "/task/current")
-        return [
-            Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
-            for t in json.loads(r.text)
-        ]
+        try:
+            r = requests.get(self.url + "/task/current")
+            return [
+                Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
+                for t in json.loads(r.text)
+            ]
+        except ConnectionError:
+            print("Server cannot be reached " + self.url)
+            return []
 
     def get_file_list(self, website_id) -> str:
-        r = requests.get(self.url + "/file_list/" + str(website_id) + "/")
-        return r.text
+        try:
+            r = requests.get(self.url + "/file_list/" + str(website_id) + "/")
+            return r.text
+        except ConnectionError:
+            return ""
 
 
 class TaskDispatcher: