From fe1d29aaeaeb3d0a50995181270795cc589f4b86 Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 14 Jul 2018 17:31:18 -0400 Subject: [PATCH] Crawl tasks are now fetched by the crawlers instead of pushed by the server --- README.md | 6 +- app.py | 108 +++++----- crawl_server/callbacks.py => callbacks.py | 2 +- crawl_server/__init__.py | 4 +- crawl_server/database.py | 145 ------------- crawl_server/run.py | 8 + crawl_server/server.py | 104 ---------- crawl_server/task_db_init.sql | 19 -- crawl_server/task_manager.py | 79 +++++--- database.py | 176 ++++++++++------ init_script.sql | 34 ++-- search/search.py | 2 +- stress_test.py | 4 +- task.py | 237 ---------------------- tasks.py | 90 ++++++++ templates/crawl_logs.html | 8 +- templates/dashboard.html | 91 +++------ templates/home.html | 3 - templates/stats.html | 4 +- tmp/README.md | 1 + 20 files changed, 376 insertions(+), 749 deletions(-) rename crawl_server/callbacks.py => callbacks.py (97%) delete mode 100644 crawl_server/database.py create mode 100644 crawl_server/run.py delete mode 100644 crawl_server/server.py delete mode 100644 crawl_server/task_db_init.sql delete mode 100644 task.py create mode 100644 tasks.py create mode 100644 tmp/README.md diff --git a/README.md b/README.md index 1205dbd..885a20d 100644 --- a/README.md +++ b/README.md @@ -22,9 +22,6 @@ FLASK_SECRET = "" RESULTS_PER_PAGE = (25, 50, 100, 250, 500, 1000) # Headers for http crawler HEADERS = {} -# Token for the crawl server, used by the server to communicate to the crawl server -CRAWL_SERVER_TOKEN = "" -CRAWL_SERVER_PORT = 5001 # Number of crawler instances (one per task) CRAWL_SERVER_PROCESSES = 3 # Number of threads per crawler instance @@ -33,6 +30,9 @@ CRAWL_SERVER_THREADS = 20 SUBMIT_FTP = False # Allow http(s) websites in /submit SUBMIT_HTTP = True + +SERVER_URL = "http://localhost/api" +API_TOKEN = "5817926d-f2f9-4422-a411-a98f1bfe4b6c" ``` ## Running the crawl server diff --git a/app.py b/app.py index 355ec35..a898063 100644 --- a/app.py +++ b/app.py @@ -3,13 +3,14 @@ import json from urllib.parse import urlparse import os import time +import datetime import itertools from database import Database, Website, InvalidQueryException from flask_recaptcha import ReCaptcha import od_util import config from flask_caching import Cache -from task import TaskDispatcher, Task, CrawlServer +from tasks import TaskManager, Task, TaskResult from search.search import ElasticSearchEngine app = Flask(__name__) @@ -26,12 +27,12 @@ app.jinja_env.globals.update(truncate_path=od_util.truncate_path) app.jinja_env.globals.update(get_color=od_util.get_color) app.jinja_env.globals.update(get_mime=od_util.get_category) -taskDispatcher = TaskDispatcher() +taskManager = TaskManager() searchEngine = ElasticSearchEngine("od-database") @app.template_filter("date_format") -def datetime_format(value, format='%Y-%m-%d'): +def date_format(value, format='%Y-%m-%d'): return time.strftime(format, time.gmtime(value)) @@ -40,6 +41,11 @@ def datetime_format(value, format='%Y-%m-%d %H:%M:%S'): return time.strftime(format, time.gmtime(value)) +@app.template_filter("from_timestamp") +def from_timestamp(value): + return datetime.datetime.fromtimestamp(value) + + @app.route("/dl") def downloads(): try: @@ -53,7 +59,7 @@ def downloads(): @app.route("/stats") def stats_page(): - crawl_server_stats = db.get_stats_by_server() + crawl_server_stats = db.get_stats_by_crawler() return render_template("stats.html", crawl_server_stats=crawl_server_stats) @@ -136,7 +142,7 @@ def random_website(): def 
admin_redispatch_queued(): if "username" in session: - count = taskDispatcher.redispatch_queued() + count = taskManager.redispatch_queued() flash("Re-dispatched " + str(count) + " tasks", "success") return redirect("/dashboard") @@ -145,7 +151,7 @@ def admin_redispatch_queued(): def get_empty_websites(): - current_tasks = itertools.chain(taskDispatcher.get_queued_tasks(), taskDispatcher.get_current_tasks()) + current_tasks = taskManager.get_queued_tasks() queued_websites = [task.website_id for task in current_tasks] all_websites = db.get_all_websites() @@ -180,7 +186,7 @@ def admin_queue_empty_websites(): for website_id in get_empty_websites(): website = db.get_website_by_id(website_id) task = Task(website.id, website.url, 1) - taskDispatcher.dispatch_task(task) + taskManager.queue_task(task) flash("Dispatched empty websites", "success") return redirect("/dashboard") @@ -221,7 +227,7 @@ def admin_rescan_website(website_id): if website: priority = request.args.get("priority") if "priority" in request.args else 1 task = Task(website_id, website.url, priority) - taskDispatcher.dispatch_task(task) + taskManager.queue_task(task) flash("Enqueued rescan task", "success") else: @@ -320,16 +326,14 @@ def home(): try: stats = searchEngine.get_global_stats() stats["website_count"] = len(db.get_all_websites()) - current_websites = ", ".join(task.url for task in taskDispatcher.get_current_tasks()) except: stats = {} - current_websites = None - return render_template("home.html", stats=stats, current_websites=current_websites) + return render_template("home.html", stats=stats) @app.route("/submit") def submit(): - queued_websites = taskDispatcher.get_queued_tasks() + queued_websites = taskManager.get_queued_tasks() return render_template("submit.html", queue=queued_websites, recaptcha=recaptcha, show_captcha=config.CAPTCHA_SUBMIT) @@ -362,7 +366,7 @@ def try_enqueue(url): web_id = db.insert_website(Website(url, str(request.remote_addr), str(request.user_agent))) task = Task(web_id, url, priority=1) - taskDispatcher.dispatch_task(task) + taskManager.queue_task(task) return "The website has been added to the queue", "success" @@ -450,9 +454,8 @@ def admin_dashboard(): tokens = db.get_tokens() blacklist = db.get_blacklist() - crawl_servers = db.get_crawl_servers() - return render_template("dashboard.html", api_tokens=tokens, blacklist=blacklist, crawl_servers=crawl_servers) + return render_template("dashboard.html", api_tokens=tokens, blacklist=blacklist) else: return abort(403) @@ -516,52 +519,59 @@ def admin_crawl_logs(): return abort(403) -@app.route("/crawl_server/add", methods=["POST"]) -def admin_add_crawl_server(): - if "username" in session: +@app.route("/api/task/get", methods=["POST"]) +def api_get_task(): + token = request.form.get("token") + name = db.check_api_token(token) - server = CrawlServer( - request.form.get("url"), - request.form.get("name"), - request.form.get("slots"), - request.form.get("token") - ) - - db.add_crawl_server(server) - flash("Added crawl server", "success") - return redirect("/dashboard") + if name: + task = db.pop_task(name) + if task: + print("Assigning task " + str(task.website_id) + " to " + name) + return Response(str(task), mimetype="application/json") + else: + return abort(404) else: return abort(403) -@app.route("/crawl_server//delete") -def admin_delete_crawl_server(server_id): - if "username" in session: +@app.route("/api/task/complete", methods=["POST"]) +def api_complete_task(): + token = request.form.get("token") + tr = 
json.loads(request.form.get("result")) + print(tr) + task_result = TaskResult(tr["status_code"], tr["file_count"], tr["start_time"], tr["end_time"], tr["website_id"]) - db.remove_crawl_server(server_id) - flash("Deleted crawl server", "success") - return redirect("/dashboard") + name = db.check_api_token(token) - else: - abort(403) + if name: + print("Task for " + str(task_result.website_id) + " completed by " + name) + task = db.complete_task(task_result.website_id, name) + if task: -@app.route("/crawl_server//update", methods=["POST"]) -def admin_update_crawl_server(server_id): - crawl_servers = db.get_crawl_servers() - for server in crawl_servers: - if server.id == server_id: - new_slots = request.form.get("slots") if "slots" in request.form else server.slots - new_name = request.form.get("name") if "name" in request.form else server.name - new_url = request.form.get("url") if "url" in request.form else server.url + if "file_list" in request.files: + file = request.files['file_list'] + filename = "./tmp/" + str(task_result.website_id) + ".json" + print("Saving temp file " + filename + " ...") + file.save(filename) + print("Done") + else: + filename = None - db.update_crawl_server(server_id, new_url, new_name, new_slots) - flash("Updated crawl server", "success") - return redirect("/dashboard") + taskManager.complete_task(filename, task, task_result, name) - flash("Couldn't find crawl server with this id: " + str(server_id), "danger") - return redirect("/dashboard") + if os.path.exists(filename): + os.remove(filename) + + # TODO: handle callback here + return "Successfully logged task result and indexed files" + + else: + print("ERROR: " + name + " indicated that task for " + str(task_result.website_id) + + " was completed but there is no such task in the database.") + print("No such task") if __name__ == '__main__': diff --git a/crawl_server/callbacks.py b/callbacks.py similarity index 97% rename from crawl_server/callbacks.py rename to callbacks.py index ae0faee..89bda6c 100644 --- a/crawl_server/callbacks.py +++ b/callbacks.py @@ -1,4 +1,4 @@ -from crawl_server.database import Task +from tasks import Task from crawl_server.reddit_bot import RedditBot import praw diff --git a/crawl_server/__init__.py b/crawl_server/__init__.py index 2623852..fc75507 100644 --- a/crawl_server/__init__.py +++ b/crawl_server/__init__.py @@ -1,5 +1,6 @@ import logging -from logging import FileHandler +import sys +from logging import FileHandler, StreamHandler logger = logging.getLogger("default") logger.setLevel(logging.DEBUG) @@ -8,3 +9,4 @@ formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s') file_handler = FileHandler("crawl_server.log") file_handler.setFormatter(formatter) logger.addHandler(file_handler) +logger.addHandler(StreamHandler(sys.stdout)) diff --git a/crawl_server/database.py b/crawl_server/database.py deleted file mode 100644 index 1493efc..0000000 --- a/crawl_server/database.py +++ /dev/null @@ -1,145 +0,0 @@ -from crawl_server import logger -import os -import json -import sqlite3 - - -class TaskResult: - - def __init__(self, status_code=None, file_count=0, start_time=0, - end_time=0, website_id=0, indexed_time=0, server_name=""): - self.status_code = status_code - self.file_count = file_count - self.start_time = start_time - self.end_time = end_time - self.website_id = website_id - self.indexed_time = indexed_time - self.server_name = server_name - - def to_json(self): - return { - "status_code": self.status_code, - "file_count": self.file_count, - "start_time": 
self.start_time, - "end_time": self.end_time, - "website_id": self.website_id, - "indexed_time": self.indexed_time - } - - -class Task: - - def __init__(self, website_id: int, url: str, priority: int = 1, - callback_type: str = None, callback_args: str = None): - self.website_id = website_id - self.url = url - self.priority = priority - self.callback_type = callback_type - self.callback_args = json.loads(callback_args) if callback_args else {} - - def to_json(self): - return { - "website_id": self.website_id, - "url": self.url, - "priority": self.priority, - "callback_type": self.callback_type, - "callback_args": json.dumps(self.callback_args) - } - - def __str__(self): - return json.dumps(self.to_json()) - - def __repr__(self): - return self.__str__() - - -class TaskManagerDatabase: - - def __init__(self, db_path): - self.db_path = db_path - - if not os.path.exists(db_path): - self.init_database() - logger.info("Initialised database") - - def init_database(self): - - with open("task_db_init.sql", "r") as f: - init_script = f.read() - - with sqlite3.connect(self.db_path) as conn: - conn.executescript(init_script) - conn.commit() - - def pop_task(self): - - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - - cursor.execute("SELECT id, website_id, url, priority, callback_type, callback_args" - " FROM Queue ORDER BY priority DESC, Queue.id ASC LIMIT 1") - task = cursor.fetchone() - - if task: - cursor.execute("DELETE FROM Queue WHERE id=?", (task[0],)) - conn.commit() - return Task(task[1], task[2], task[3], task[4], task[5]) - else: - return None - - def pop_all_tasks(self): - - tasks = self.get_tasks() - - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - - cursor.execute("DELETE FROM Queue") - return tasks - - def put_task(self, task: Task): - - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - - cursor.execute("INSERT INTO Queue (website_id, url, priority, callback_type, callback_args) " - "VALUES (?,?,?,?,?)", - (task.website_id, task.url, task.priority, - task.callback_type, json.dumps(task.callback_args))) - conn.commit() - - def get_tasks(self): - - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - - cursor.execute("SELECT website_id, url, priority, callback_type, callback_args FROM Queue") - tasks = cursor.fetchall() - - return [Task(t[0], t[1], t[2], t[3], t[4]) for t in tasks] - - def log_result(self, result: TaskResult): - - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - - cursor.execute("INSERT INTO TaskResult (website_id, status_code, file_count, start_time, end_time) " - "VALUES (?,?,?,?,?)", (result.website_id, result.status_code, result.file_count, - result.start_time, result.end_time)) - conn.commit() - - def get_non_indexed_results(self): - """Get a list of new TaskResults since the last call of this method""" - - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - - cursor.execute("SELECT status_code, file_count, start_time, end_time, website_id" - " FROM TaskResult WHERE indexed_time IS NULL") - db_result = cursor.fetchall() - - cursor.execute("UPDATE TaskResult SET indexed_time=CURRENT_TIMESTAMP WHERE indexed_time IS NULL") - conn.commit() - - return [TaskResult(r[0], r[1], r[2], r[3], r[4]) for r in db_result] - diff --git a/crawl_server/run.py b/crawl_server/run.py new file mode 100644 index 0000000..18b190d --- /dev/null +++ b/crawl_server/run.py @@ -0,0 +1,8 @@ +from crawl_server.task_manager import TaskManager +import time +import config + 
+tm = TaskManager(config.CRAWL_SERVER_PROCESSES) + +while True: + time.sleep(1) diff --git a/crawl_server/server.py b/crawl_server/server.py deleted file mode 100644 index f5d4d61..0000000 --- a/crawl_server/server.py +++ /dev/null @@ -1,104 +0,0 @@ -from flask import Flask, request, abort, Response, send_file -from flask_httpauth import HTTPTokenAuth -import json -from crawl_server import logger -from crawl_server.task_manager import TaskManager, Task -import os -import config -app = Flask(__name__) -auth = HTTPTokenAuth(scheme="Token") - -token = config.CRAWL_SERVER_TOKEN - -tm = TaskManager("tm_db.sqlite3", config.CRAWL_SERVER_PROCESSES) - - -@auth.verify_token -def verify_token(provided_token): - return token == provided_token - - -@app.route("/task/") -@auth.login_required -def get_tasks(): - json_str = json.dumps([task.to_json() for task in tm.get_tasks()]) - return Response(json_str, mimetype="application/json") - - -@app.route("/task/put", methods=["POST"]) -@auth.login_required -def task_put(): - - if request.json: - try: - website_id = request.json["website_id"] - url = request.json["url"] - priority = request.json["priority"] - callback_type = request.json["callback_type"] - callback_args = request.json["callback_args"] - except KeyError as e: - logger.error("Invalid task put request from " + request.remote_addr + " missing key: " + str(e)) - return abort(400) - - task = Task(website_id, url, priority, callback_type, callback_args) - tm.put_task(task) - logger.info("Submitted new task to queue: " + str(task.to_json())) - return '{"ok": "true"}' - - return abort(400) - - -@app.route("/task/completed", methods=["GET"]) -@auth.login_required -def get_completed_tasks(): - json_str = json.dumps([result.to_json() for result in tm.get_non_indexed_results()]) - logger.debug("Webserver has requested list of newly completed tasks from " + request.remote_addr) - return Response(json_str, mimetype="application/json") - - -@app.route("/task/current", methods=["GET"]) -@auth.login_required -def get_current_tasks(): - - current_tasks = tm.get_current_tasks() - logger.debug("Webserver has requested list of current tasks from " + request.remote_addr) - return json.dumps([t.to_json() for t in current_tasks]) - - -@app.route("/file_list//") -@auth.login_required -def get_file_list(website_id): - - file_name = "./crawled/" + str(website_id) + ".json" - if os.path.exists(file_name): - logger.info("Webserver requested file list of website with id" + str(website_id)) - return send_file(file_name) - else: - logger.error("Webserver requested file list of non-existent or empty website with id: " + str(website_id)) - return abort(404) - - -@app.route("/file_list//free") -@auth.login_required -def free_file_list(website_id): - file_name = "./crawled/" + str(website_id) + ".json" - if os.path.exists(file_name): - os.remove(file_name) - logger.debug("Webserver indicated that the files for the website with id " + - str(website_id) + " are safe to delete") - return '{"ok": "true"}' - else: - return abort(404) - - -@app.route("/task/pop_all") -@auth.login_required -def pop_queued_tasks(): - - json_str = json.dumps([task.to_json() for task in tm.pop_tasks()]) - logger.info("Webserver poped all queued tasks") - return Response(json_str, mimetype="application/json") - - -if __name__ == "__main__": - app.run(port=config.CRAWL_SERVER_PORT, host="0.0.0.0", ssl_context="adhoc") diff --git a/crawl_server/task_db_init.sql b/crawl_server/task_db_init.sql deleted file mode 100644 index bc6440c..0000000 --- 
a/crawl_server/task_db_init.sql +++ /dev/null @@ -1,19 +0,0 @@ - -CREATE TABLE Queue ( - id INTEGER PRIMARY KEY, - website_id INTEGER, - url TEXT, - priority INTEGER, - callback_type TEXT, - callback_args TEXT -); - -CREATE TABLE TaskResult ( - id INTEGER PRIMARY KEY, - website_id INT, - status_code TEXT, - file_count INT, - start_time TIMESTAMP, - end_time TIMESTAMP, - indexed_time TIMESTAMP DEFAULT NULL -); \ No newline at end of file diff --git a/crawl_server/task_manager.py b/crawl_server/task_manager.py index 47a6e11..0ade1ae 100644 --- a/crawl_server/task_manager.py +++ b/crawl_server/task_manager.py @@ -1,6 +1,8 @@ from crawl_server import logger +from tasks import TaskResult, Task import config -from crawl_server.database import TaskManagerDatabase, Task, TaskResult +import requests +import json from multiprocessing import Manager, Pool from apscheduler.schedulers.background import BackgroundScheduler from datetime import datetime @@ -9,9 +11,7 @@ from crawl_server.crawler import RemoteDirectoryCrawler class TaskManager: - def __init__(self, db_path, max_processes=2): - self.db_path = db_path - self.db = TaskManagerDatabase(db_path) + def __init__(self, max_processes=2): self.pool = Pool(maxtasksperchild=1, processes=max_processes) self.max_processes = max_processes manager = Manager() @@ -21,41 +21,68 @@ class TaskManager: scheduler.add_job(self.execute_queued_task, "interval", seconds=1) scheduler.start() - def put_task(self, task: Task): - self.db.put_task(task) + def fetch_task(self): + try: + payload = { + "token": config.API_TOKEN + } + r = requests.post(config.SERVER_URL + "/task/get", data=payload) - def get_tasks(self): - return self.db.get_tasks() + if r.status_code == 200: + text = r.text + logger.info("Fetched task from server : " + text) + task_json = json.loads(text) + return Task(task_json["website_id"], task_json["url"]) - def pop_tasks(self): - return self.db.pop_all_tasks() + return None - def get_current_tasks(self): - return self.current_tasks + except Exception as e: + raise e - def get_non_indexed_results(self): - return self.db.get_non_indexed_results() + @staticmethod + def push_result(task_result: TaskResult): + + try: + + payload = { + "token": config.API_TOKEN, + "result": json.dumps(task_result.to_json()) + } + + files = { + # "file_list": open("./crawled/" + str(task_result.website_id) + ".json") + "file_list": open("./local.json") + } + + r = requests.post(config.SERVER_URL + "/task/complete", data=payload, files=files) + + logger.info("RESPONSE: " + r.text) + + except Exception as e: + raise e def execute_queued_task(self): if len(self.current_tasks) <= self.max_processes: - task = self.db.pop_task() + + task = self.fetch_task() + if task: logger.info("Submitted " + task.url + " to process pool") self.current_tasks.append(task) self.pool.apply_async( TaskManager.run_task, - args=(task, self.db_path, self.current_tasks), + args=(task, self.current_tasks), callback=TaskManager.task_complete, error_callback=TaskManager.task_error ) @staticmethod - def run_task(task, db_path, current_tasks): + def run_task(task, current_tasks): result = TaskResult() - result.start_time = datetime.utcnow() + result.start_time = datetime.utcnow().timestamp() result.website_id = task.website_id logger.info("Starting task " + task.url) @@ -67,15 +94,10 @@ class TaskManager: result.file_count = crawl_result.file_count result.status_code = crawl_result.status_code - result.end_time = datetime.utcnow() + result.end_time = datetime.utcnow().timestamp() logger.info("End task " + 
task.url) - # TODO: Figure out the callbacks - # callback = PostCrawlCallbackFactory.get_callback(task) - # if callback: - # callback.run() - - return result, db_path, current_tasks + return result, current_tasks @staticmethod def task_error(result): @@ -85,14 +107,13 @@ class TaskManager: @staticmethod def task_complete(result): - task_result, db_path, current_tasks = result + task_result, current_tasks = result - logger.info("Task completed, logger result to database") + logger.info("Task completed, sending result to server") logger.info("Status code: " + task_result.status_code) logger.info("File count: " + str(task_result.file_count)) - db = TaskManagerDatabase(db_path) - db.log_result(task_result) + TaskManager.push_result(task_result) for i, task in enumerate(current_tasks): if task.website_id == task_result.website_id: diff --git a/database.py b/database.py index b53e158..ec3d157 100644 --- a/database.py +++ b/database.py @@ -1,13 +1,11 @@ import sqlite3 +import json import datetime -from collections import defaultdict from urllib.parse import urlparse import os import bcrypt import uuid -import task -from crawl_server.database import TaskResult - +import tasks class InvalidQueryException(Exception): pass @@ -29,11 +27,37 @@ class Website: self.id = website_id -class ApiToken: +class ApiClient: - def __init__(self, token, description): + def __init__(self, token, name): self.token = token - self.description = description + self.name = name + + +class Task: + + def __init__(self, website_id: int, url: str, priority: int = 1, + callback_type: str = None, callback_args: str = None): + self.website_id = website_id + self.url = url + self.priority = priority + self.callback_type = callback_type + self.callback_args = json.loads(callback_args) if callback_args else {} + + def to_json(self): + return { + "website_id": self.website_id, + "url": self.url, + "priority": self.priority, + "callback_type": self.callback_type, + "callback_args": json.dumps(self.callback_args) + } + + def __str__(self): + return json.dumps(self.to_json()) + + def __repr__(self): + return self.__str__() class Database: @@ -171,21 +195,22 @@ class Database: cursor.execute("INSERT INTO Admin (username, password) VALUES (?,?)", (username, hashed_pw)) conn.commit() - def check_api_token(self, token) -> bool: + def check_api_token(self, token) -> str: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute("SELECT token FROM ApiToken WHERE token=?", (token, )) - return cursor.fetchone() is not None + cursor.execute("SELECT name FROM ApiClient WHERE token=?", (token, )) + result = cursor.fetchone() + return result[0] if result else None - def generate_api_token(self, description: str) -> str: + def generate_api_token(self, name: str) -> str: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() token = str(uuid.uuid4()) - cursor.execute("INSERT INTO ApiToken (token, description) VALUES (?, ?)", (token, description)) + cursor.execute("INSERT INTO ApiClient (token, name) VALUES (?, ?)", (token, name)) conn.commit() return token @@ -195,16 +220,16 @@ class Database: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute("SELECT * FROM ApiToken") + cursor.execute("SELECT token, name FROM ApiClient") - return [ApiToken(x[0], x[1]) for x in cursor.fetchall()] + return [ApiClient(x[0], x[1]) for x in cursor.fetchall()] def delete_token(self, token: str) -> None: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute("DELETE FROM 
ApiToken WHERE token=?", (token, )) + cursor.execute("DELETE FROM ApiClient WHERE token=?", (token, )) conn.commit() def get_all_websites(self) -> dict: @@ -289,41 +314,7 @@ class Database: cursor.execute("SELECT * FROM BlacklistedWebsite") return [BlacklistedWebsite(r[0], r[1]) for r in cursor.fetchall()] - def add_crawl_server(self, server: task.CrawlServer): - - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - - cursor.execute("INSERT INTO CrawlServer (url, name, slots, token) VALUES (?,?,?,?)", - (server.url, server.name, server.slots, server.token)) - conn.commit() - - def remove_crawl_server(self, server_id): - - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - - cursor.execute("DELETE FROM CrawlServer WHERE id=?", (server_id, )) - conn.commit() - - def get_crawl_servers(self) -> list: - - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - - cursor.execute("SELECT url, name, slots, token, id FROM CrawlServer") - - return [task.CrawlServer(r[0], r[1], r[2], r[3], r[4]) for r in cursor.fetchall()] - - def update_crawl_server(self, server_id, url, name, slots): - - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - - cursor.execute("UPDATE CrawlServer SET url=?, name=?, slots=? WHERE id=?", (url, name, slots, server_id)) - conn.commit() - - def log_result(self, result: TaskResult): + def log_result(self, result): with sqlite3.connect(self.db_path) as conn: @@ -338,29 +329,27 @@ class Database: def get_crawl_logs(self): - with sqlite3.connect(self.db_path, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) as conn: + with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute("SELECT website_id, status_code, file_count, start_time, end_time, indexed_time, S.name " - "FROM TaskResult INNER JOIN CrawlServer S on TaskResult.server = S.id " - "ORDER BY end_time DESC") - return [TaskResult(r[1], r[2], r[3].timestamp(), r[4].timestamp(), - r[0], r[5].timestamp() if r[5] else None, r[6]) for r in cursor.fetchall()] + cursor.execute("SELECT website_id, status_code, file_count, start_time, end_time, server " + "FROM TaskResult ORDER BY end_time DESC") + return [tasks.TaskResult(r[1], r[2], r[3], r[4], r[0], r[5]) for r in cursor.fetchall()] - def get_stats_by_server(self): + def get_stats_by_crawler(self): stats = dict() task_results = self.get_crawl_logs() - for server in self.get_crawl_servers(): - task_count = sum(1 for result in task_results if result.server_name == server.name) + for crawler in self.get_tokens(): + task_count = sum(1 for result in task_results if result.server_name == crawler.name) if task_count > 0: - stats[server.name] = dict() - stats[server.name]["file_count"] = sum(result.file_count for result in task_results if result.server_name == server.name) - stats[server.name]["time"] = sum((result.end_time - result.start_time) for result in task_results if result.server_name == server.name) - stats[server.name]["task_count"] = task_count - stats[server.name]["time_avg"] = stats[server.name]["time"] / task_count - stats[server.name]["file_count_avg"] = stats[server.name]["file_count"] / task_count + stats[crawler.name] = dict() + stats[crawler.name]["file_count"] = sum(result.file_count for result in task_results if result.server_name == crawler.name) + stats[crawler.name]["time"] = sum((result.end_time - result.start_time) for result in task_results if result.server_name == crawler.name) + stats[crawler.name]["task_count"] = task_count + 
stats[crawler.name]["time_avg"] = stats[crawler.name]["time"] / task_count + stats[crawler.name]["file_count_avg"] = stats[crawler.name]["file_count"] / task_count return stats @@ -374,8 +363,61 @@ class Database: conn.commit() - - + def put_task(self, task: Task) -> None: + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + cursor.execute("INSERT INTO Queue (website_id, url, priority, callback_type, callback_args) " + "VALUES (?,?,?,?,?)", + (task.website_id, task.url, task.priority, + task.callback_type, json.dumps(task.callback_args))) + conn.commit() + + def get_tasks(self) -> list: + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + cursor.execute("SELECT website_id, url, priority, callback_type, callback_args FROM Queue " + "WHERE assigned_crawler is NULL ") + db_tasks = cursor.fetchall() + + return [Task(t[0], t[1], t[2], t[3], t[4]) for t in db_tasks] + + def pop_task(self, name) -> Task: + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + cursor.execute("SELECT id, website_id, url, priority, callback_type, callback_args " + "FROM Queue WHERE assigned_crawler is NULL " + "ORDER BY priority DESC, Queue.id ASC LIMIT 1") + task = cursor.fetchone() + + if task: + cursor.execute("UPDATE Queue SET assigned_crawler=? WHERE id=?", (name, task[0],)) + conn.commit() + return Task(task[1], task[2], task[3], task[4], task[5]) + else: + return None + + def complete_task(self, website_id: int, name: str) -> Task: + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + cursor.execute("SELECT id, website_id, url, priority, callback_type, callback_args FROM " + "Queue WHERE website_id=? AND assigned_crawler=?", (website_id, name)) + + task = cursor.fetchone() + + if task: + cursor.execute("DELETE FROM Queue WHERE website_id=? 
AND assigned_crawler=?", (website_id, name)) + conn.commit() + return Task(task[1], task[2], task[3], task[4], task[5]) + else: + return None diff --git a/init_script.sql b/init_script.sql index 56c2cfd..87e214c 100644 --- a/init_script.sql +++ b/init_script.sql @@ -14,27 +14,14 @@ CREATE TABLE Admin ( password TEXT ); -CREATE TABLE ApiToken ( - token TEXT PRIMARY KEY NOT NULL, - description TEXT -); - CREATE TABLE BlacklistedWebsite ( id INTEGER PRIMARY KEY NOT NULL, url TEXT ); -CREATE TABLE CrawlServer ( - id INTEGER PRIMARY KEY NOT NULL, - url TEXT, - name TEXT, - token TEXT, - slots INTEGER -); - CREATE TABLE TaskResult ( id INTEGER PRIMARY KEY, - server INT, + server TEXT, website_id INT, status_code TEXT, file_count INT, @@ -42,7 +29,12 @@ CREATE TABLE TaskResult ( end_time TIMESTAMP, indexed_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (server) REFERENCES CrawlServer(id) + FOREIGN KEY (server) REFERENCES ApiClient(name) +); + +CREATE TABLE ApiClient ( + name TEXT PRIMARY KEY NOT NULL, + token TEXT NOT NULL ); @@ -55,3 +47,15 @@ CREATE TABLE SearchLogEntry ( extensions TEXT, page INT ); + +CREATE TABLE Queue ( + id INTEGER PRIMARY KEY, + website_id INTEGER, + url TEXT, + priority INTEGER, + callback_type TEXT, + callback_args TEXT, + assigned_crawler TEXT NULL DEFAULT NULL, + + FOREIGN KEY (assigned_crawler) REFERENCES ApiClient(name) +); diff --git a/search/search.py b/search/search.py index 10b39eb..f3e19d0 100644 --- a/search/search.py +++ b/search/search.py @@ -115,7 +115,7 @@ class ElasticSearchEngine(SearchEngine): def import_json(self, in_lines, website_id: int): import_every = 1000 - cooldown_time = 0.5 + cooldown_time = 1 docs = [] diff --git a/stress_test.py b/stress_test.py index 6c8e39c..aff5d18 100644 --- a/stress_test.py +++ b/stress_test.py @@ -91,8 +91,8 @@ def make_wide_filesystem(count=100000): os.mkdir(new_path) -# dump_local_filesystem("/mnt/") -index_file_list("local_filesystem.json", 4) +dump_local_filesystem("/mnt/") +# index_file_list("local_filesystem.json", 4) # random_searches(100000) # dump_random_files(20000 * 100000) # make_wide_filesystem(10000) diff --git a/task.py b/task.py deleted file mode 100644 index 59059e4..0000000 --- a/task.py +++ /dev/null @@ -1,237 +0,0 @@ -from apscheduler.schedulers.background import BackgroundScheduler -from search.search import ElasticSearchEngine -from crawl_server.database import Task, TaskResult -import requests -from requests.exceptions import ConnectionError, ReadTimeout -import json -import database -from concurrent.futures import ThreadPoolExecutor -import urllib3 - -urllib3.disable_warnings() - - -class CrawlServer: - - def __init__(self, url, name, slots, token, server_id=None): - self.url = url - self.name = name - self.slots = slots - self.used_slots = 0 - self.token = token - self.id = server_id - - def _generate_headers(self): - return { - "Content-Type": "application/json", - "Authorization": "Token " + self.token, - } - - def queue_task(self, task: Task) -> bool: - - print("Sending task to crawl server " + self.url) - try: - payload = json.dumps(task.to_json()) - r = requests.post(self.url + "/task/put", headers=self._generate_headers(), data=payload, verify=False, - timeout=20) - print(r) # TODO: If the task could not be added, fallback to another server - return r.status_code == 200 - except (ConnectionError, ReadTimeout): - return False - - def pop_completed_tasks(self) -> list: - - try: - r = requests.get(self.url + "/task/completed", headers=self._generate_headers(), verify=False, 
timeout=15) - if r.status_code != 200: - print("Problem while fetching completed tasks for '" + self.name + "': " + str(r.status_code)) - print(r.text) - return [] - return [ - TaskResult(r["status_code"], r["file_count"], r["start_time"], r["end_time"], r["website_id"]) - for r in json.loads(r.text)] - except (ConnectionError, ReadTimeout): - print("Crawl server cannot be reached @ " + self.url) - return [] - - def fetch_queued_tasks(self): - - try: - r = requests.get(self.url + "/task/", headers=self._generate_headers(), verify=False, timeout=15) - - if r.status_code != 200: - print("Problem while fetching queued tasks for '" + self.name + "' " + str(r.status_code)) - print(r.text) - return None - - return [ - Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"]) - for t in json.loads(r.text) - ] - except (ConnectionError, ReadTimeout): - return None - - def fetch_current_tasks(self): - - try: - r = requests.get(self.url + "/task/current", headers=self._generate_headers(), verify=False, timeout=10) - - if r.status_code != 200: - print("Problem while fetching current tasks for '" + self.name + "' " + str(r.status_code)) - print(r.text) - return None - - return [ - Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"]) - for t in json.loads(r.text) - ] - except (ConnectionError, ReadTimeout): - return None - - def fetch_website_files(self, website_id) -> str: - - try: - r = requests.get(self.url + "/file_list/" + str(website_id) + "/", stream=True, - headers=self._generate_headers(), verify=False) - - if r.status_code != 200: - print("Problem while fetching website files for '" + self.name + "': " + str(r.status_code)) - print(r.text) - return "" - - for line in r.iter_lines(chunk_size=1024 * 256): - yield line - except (ConnectionError, ReadTimeout): - return "" - - def free_website_files(self, website_id) -> bool: - - try: - r = requests.get(self.url + "/file_list/" + str(website_id) + "/free", headers=self._generate_headers(), - verify=False) - return r.status_code == 200 - except (ConnectionError, ReadTimeout) as e: - print(e) - return False - - def pop_queued_tasks(self): - try: - r = requests.get(self.url + "/task/pop_all", headers=self._generate_headers(), verify=False) - - if r.status_code != 200: - print("Problem while popping tasks for '" + self.name + "': " + str(r.status_code)) - print(r.text) - - return [ - Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"]) - for t in json.loads(r.text) - ] - except (ConnectionError, ReadTimeout): - return [] - - -class TaskDispatcher: - - def __init__(self): - scheduler = BackgroundScheduler() - scheduler.add_job(self.check_completed_tasks, "interval", seconds=10) - scheduler.start() - - self.search = ElasticSearchEngine("od-database") - self.db = database.Database("db.sqlite3") - - def check_completed_tasks(self): - - for server in self.db.get_crawl_servers(): - for task in server.pop_completed_tasks(): - print("Completed task") - - task.server_id = server.id - - if task.file_count: - # All files are overwritten - self.search.delete_docs(task.website_id) - file_list = server.fetch_website_files(task.website_id) - self.search.import_json(file_list, task.website_id) - # File list is safe to delete once indexed - server.free_website_files(task.website_id) - - # Update last_modified date for website - self.db.update_website_date_if_exists(task.website_id) - - self.db.log_result(task) - - def dispatch_task(self, task: Task): - 
self._get_available_crawl_server().queue_task(task) - - def _get_available_crawl_server(self) -> CrawlServer: - - queued_tasks_by_server = self._get_queued_tasks_by_server() - server_with_most_free_slots = None - most_free_slots = -10000 - - for server in queued_tasks_by_server: - free_slots = server.slots - len(queued_tasks_by_server[server]) - if free_slots > most_free_slots: - server_with_most_free_slots = server - most_free_slots = free_slots - - print("Dispatching task to '" + - server_with_most_free_slots.name + "' " + - str(most_free_slots) + " free out of " + str(server_with_most_free_slots.slots)) - - return server_with_most_free_slots - - def get_queued_tasks(self): - - queued_tasks_by_server = self._get_queued_tasks_by_server() - for queued_tasks in queued_tasks_by_server.values(): - for task in queued_tasks: - yield task - - def _get_queued_tasks_by_server(self) -> dict: - - queued_tasks = dict() - pool = ThreadPoolExecutor(max_workers=10) - crawl_servers = self.db.get_crawl_servers() - responses = list(pool.map(lambda s: s.fetch_queued_tasks(), crawl_servers)) - pool.shutdown() - - for i, server in enumerate(crawl_servers): - if responses[i] is not None: - queued_tasks[server] = responses[i] - - return queued_tasks - - def get_current_tasks(self): - - current_tasks_by_server = self._get_current_tasks_by_server() - for current_tasks in current_tasks_by_server.values(): - for task in current_tasks: - yield task - - def _get_current_tasks_by_server(self) -> dict: - - current_tasks = dict() - pool = ThreadPoolExecutor(max_workers=10) - crawl_servers = self.db.get_crawl_servers() - responses = list(pool.map(lambda s: s.fetch_current_tasks(), crawl_servers)) - pool.shutdown() - - for i, server in enumerate(crawl_servers): - if responses[i] is not None: - current_tasks[server] = responses[i] - - return current_tasks - - def redispatch_queued(self) -> int: - - counter = 0 - for server in self.db.get_crawl_servers(): - for task in server.pop_queued_tasks(): - self.dispatch_task(task) - counter += 1 - - return counter - - diff --git a/tasks.py b/tasks.py new file mode 100644 index 0000000..e940404 --- /dev/null +++ b/tasks.py @@ -0,0 +1,90 @@ +from apscheduler.schedulers.background import BackgroundScheduler +from werkzeug.datastructures import FileStorage +from search.search import ElasticSearchEngine +import json +import database +import urllib3 + +urllib3.disable_warnings() + + +class Task: + + def __init__(self, website_id: int, url: str, priority: int = 1, + callback_type: str = None, callback_args: str = None): + self.website_id = website_id + self.url = url + self.priority = priority + self.callback_type = callback_type + self.callback_args = json.loads(callback_args) if callback_args else {} + + def to_json(self): + return { + "website_id": self.website_id, + "url": self.url, + "priority": self.priority, + "callback_type": self.callback_type, + "callback_args": json.dumps(self.callback_args) + } + + def __str__(self): + return json.dumps(self.to_json()) + + def __repr__(self): + return self.__str__() + + +class TaskResult: + + def __init__(self, status_code=None, file_count=0, start_time=0, + end_time=0, website_id=0, server_name=""): + self.status_code = status_code + self.file_count = file_count + self.start_time = start_time + self.end_time = end_time + self.website_id = website_id + self.server_name = server_name + + def to_json(self): + return { + "status_code": self.status_code, + "file_count": self.file_count, + "start_time": self.start_time, + "end_time": self.end_time, 
+ "website_id": self.website_id + } + + +class TaskManager: + + def __init__(self): + self.search = ElasticSearchEngine("od-database") + self.db = database.Database("db.sqlite3") + + def complete_task(self, file_list, task, task_result, crawler_name): + + if file_list: + self.search.delete_docs(task_result.website_id) + + def iter_lines(): + + with open(file_list, "r") as f: + line = f.readline() + while line: + yield line + line = f.readline() + + self.search.import_json(iter_lines(), task.website_id) + + self.db.update_website_date_if_exists(task.website_id) + + task_result.server_id = crawler_name + + self.db.log_result(task_result) + + def queue_task(self, task: Task): + self.db.put_task(task) + print("Queued task and made it available to crawlers: " + str(task.website_id)) + + def get_queued_tasks(self) -> list: + return self.db.get_tasks() diff --git a/templates/crawl_logs.html b/templates/crawl_logs.html index a48a04b..6422df4 100644 --- a/templates/crawl_logs.html +++ b/templates/crawl_logs.html @@ -7,14 +7,13 @@ - + - @@ -25,10 +24,9 @@ - - + + - {% endfor %} diff --git a/templates/dashboard.html b/templates/dashboard.html index 7cc74c1..a1b8457 100644 --- a/templates/dashboard.html +++ b/templates/dashboard.html @@ -7,57 +7,15 @@
Dashboard
- Logs -
-
-

Crawl servers

-
-            <th>Server</th>
+            <th>Crawler</th>
             <th>Website</th>
             <th>Status code</th>
             <th>File count</th>
             <th>Start</th>
             <th>End</th>
             <th>Delta</th>
-            <th>Index</th>
             <td>#{{ task_result.website_id }}</td>
             <td>{{ task_result.status_code }}</td>
             <td>{{ task_result.file_count }}</td>
-            <td>{{ task_result.start_time | datetime_format }}</td>
+            <td>{{ task_result.start_time | int | datetime_format }}</td>
-            <td>{{ task_result.end_time | datetime_format }}</td>
+            <td>{{ task_result.end_time | int | datetime_format }}</td>
             <td>{{ ((task_result.end_time - task_result.start_time)) | int }} sec</td>
-            <td>{{ task_result.indexed_time | datetime_format }}</td>
- - - - - - - - - - {% for server in crawl_servers %} - - - - - - - {% endfor %} - -
-            <th>Url</th>
-            <th>Name</th>
-            <th>Slots</th>
-            <th>Action</th>
-            <td>{{ server.url }}</td>
-            <td>{{ server.name }}</td>
-            <td>{{ server.slots }}</td>
-            <td>Delete</td>
-
-
-
- -
-
- -
-
- -
-
- -
-
- -
-
-
+ Logs

API Keys

- - + + @@ -65,7 +23,7 @@ {% for token in api_tokens %} - + - + {% for server in crawl_server_stats %} {% endfor %} - + {% for server in crawl_server_stats %} {% endfor %} diff --git a/tmp/README.md b/tmp/README.md new file mode 100644 index 0000000..a9a812e --- /dev/null +++ b/tmp/README.md @@ -0,0 +1 @@ +Files currently being indexing goes here \ No newline at end of file
-            <th>Description</th>
-            <th>Key</th>
+            <th>Name</th>
+            <th>Token</th>
             <th>Action</th>
-            <td>{{ token.description }}</td>
+            <td>{{ token.name }}</td>
             <td>{{ token.token }}</td>
@@ -122,7 +80,8 @@

Misc actions

- Delete websites with no associated files that are not queued + Delete websites with no associated files that are + not queued Re-dispatch queued tasks Re-queue websites with no associated files @@ -133,30 +92,30 @@ {% endblock body %} diff --git a/templates/home.html b/templates/home.html index f011158..df2a5fa 100644 --- a/templates/home.html +++ b/templates/home.html @@ -11,9 +11,6 @@ {% if stats and stats["total_size"] %}

{{ stats["total_count"] }} files totalling ~{{ stats["total_size"] | filesizeformat }} from {{ stats["website_count"] }} websites

- {% if current_websites %} -

Currently indexing {{ current_websites }} 

- {% endif %} {% else %}

We're currently experiencing a high volume of traffic. The search function may be unresponsive.

diff --git a/templates/stats.html b/templates/stats.html index 45ead7e..b90f0c1 100644 --- a/templates/stats.html +++ b/templates/stats.html @@ -100,13 +100,13 @@ {% endfor %}
-                    <td>File crawled</td>
+                    <td>Files crawled</td>
                     <td>{{ crawl_server_stats[server].file_count }}</td>
                 {% endfor %}
                 {% for server in crawl_server_stats %}
-                    <td>File crawled average</td>
+                    <td>Files crawled average</td>
                     <td>{{ crawl_server_stats[server].file_count_avg | round(2) }} per task</td>
                 {% endfor %}
diff --git a/tmp/README.md b/tmp/README.md
new file mode 100644
index 0000000..a9a812e
--- /dev/null
+++ b/tmp/README.md
@@ -0,0 +1 @@
+Files currently being indexed go here
\ No newline at end of file