From 6d48f1f7803ddb1996300befc45087129336d73b Mon Sep 17 00:00:00 2001
From: Simon
Date: Tue, 12 Jun 2018 11:03:45 -0400
Subject: [PATCH] Task crawl result now logged in a database

---
 crawl_server/database.py     | 39 +++++++++++++++++++++++++++++-------
 crawl_server/server.py       |  3 ++-
 crawl_server/task_manager.py | 38 ++++++++++++++++++-----------------
 debug_put.py                 |  5 +++--
 task_db_init.sql             | 10 +++++++++
 test/test_crawl_server.py    |  4 +++-
 6 files changed, 70 insertions(+), 29 deletions(-)

diff --git a/crawl_server/database.py b/crawl_server/database.py
index 4966e6d..e2b0704 100644
--- a/crawl_server/database.py
+++ b/crawl_server/database.py
@@ -3,9 +3,21 @@ import json
 import sqlite3
 
 
+class TaskResult:
+
+    def __init__(self):
+        self.status_code: str = None
+        self.file_count = 0
+        self.start_time = None
+        self.end_time = None
+        self.website_id = None
+
+
 class Task:
 
-    def __init__(self, url: str, priority: int = 1, callback_type: str = None, callback_args: str = None):
+    def __init__(self, website_id: int, url: str, priority: int = 1,
+                 callback_type: str = None, callback_args: str = None):
+        self.website_id = website_id
         self.url = url
         self.priority = priority
         self.callback_type = callback_type
@@ -13,6 +25,7 @@ class Task:
 
     def to_json(self):
         return ({
+            "website_id": self.website_id,
            "url": self.url,
            "priority": self.priority,
            "callback_type": self.callback_type,
@@ -42,14 +55,14 @@ class TaskManagerDatabase:
         with sqlite3.connect(self.db_path) as conn:
             cursor = conn.cursor()
 
-            cursor.execute("SELECT id, url, priority, callback_type, callback_args"
+            cursor.execute("SELECT id, website_id, url, priority, callback_type, callback_args"
                            " FROM Queue ORDER BY priority DESC, Queue.id ASC LIMIT 1")
             task = cursor.fetchone()
 
             if task:
                 cursor.execute("DELETE FROM Queue WHERE id=?", (task[0],))
                 conn.commit()
-                return Task(task[1], task[2], task[3], task[4])
+                return Task(task[1], task[2], task[3], task[4], task[5])
             else:
                 return None
 
@@ -58,8 +71,10 @@ class TaskManagerDatabase:
         with sqlite3.connect(self.db_path) as conn:
             cursor = conn.cursor()
 
-            cursor.execute("INSERT INTO Queue (url, priority, callback_type, callback_args) VALUES (?,?,?,?)",
-                           (task.url, task.priority, task.callback_type, json.dumps(task.callback_args)))
+            cursor.execute("INSERT INTO Queue (website_id, url, priority, callback_type, callback_args) "
+                           "VALUES (?,?,?,?,?)",
+                           (task.website_id, task.url, task.priority,
+                            task.callback_type, json.dumps(task.callback_args)))
             conn.commit()
 
     def get_tasks(self):
@@ -67,7 +82,17 @@ class TaskManagerDatabase:
         with sqlite3.connect(self.db_path) as conn:
             cursor = conn.cursor()
 
-            cursor.execute("SELECT * FROM Queue")
+            cursor.execute("SELECT website_id, url, priority, callback_type, callback_args FROM Queue")
             tasks = cursor.fetchall()
 
-            return [Task(t[1], t[2], t[3], t[4]) for t in tasks]
+            return [Task(t[0], t[1], t[2], t[3], t[4]) for t in tasks]
+
+    def log_result(self, result: TaskResult):
+
+        with sqlite3.connect(self.db_path) as conn:
+            cursor = conn.cursor()
+
+            cursor.execute("INSERT INTO TaskResult (website_id, status_code, file_count, start_time, end_time) "
+                           "VALUES (?,?,?,?,?)", (result.website_id, result.status_code, result.file_count,
+                                                  result.start_time, result.end_time))
+            conn.commit()
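Reviewer note: a minimal sketch of how the new TaskResult/log_result pair fits together, assuming a database file initialized with task_db_init.sql; the "tm_db.sqlite3" path and all field values below are placeholders, not part of this patch:

    from datetime import datetime
    from crawl_server.database import TaskManagerDatabase, TaskResult

    # Placeholder path; in practice this is the db_path the TaskManager was built with.
    db = TaskManagerDatabase("tm_db.sqlite3")

    result = TaskResult()
    result.website_id = 123                # placeholder id
    result.status_code = "success"         # assumed value; set from the crawler's result in practice
    result.file_count = 42                 # placeholder count
    result.start_time = datetime.utcnow()
    result.end_time = datetime.utcnow()

    db.log_result(result)                  # inserts one row into the TaskResult table
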
request.json["priority"] callback_type = request.json["callback_type"] @@ -29,7 +30,7 @@ def task_put(): except KeyError: return abort(400) - task = Task(url, priority, callback_type, callback_args) + task = Task(website_id, url, priority, callback_type, callback_args) tm.put_task(task) return '{"ok": "true"}' diff --git a/crawl_server/task_manager.py b/crawl_server/task_manager.py index 6daaefd..cfeebda 100644 --- a/crawl_server/task_manager.py +++ b/crawl_server/task_manager.py @@ -1,23 +1,14 @@ -from crawl_server.database import TaskManagerDatabase, Task +from crawl_server.database import TaskManagerDatabase, Task, TaskResult from multiprocessing import Pool from apscheduler.schedulers.background import BackgroundScheduler from datetime import datetime from crawler.crawler import RemoteDirectoryCrawler -class TaskResult: - - def __init__(self): - self.status_code: str = None - self.file_count = 0 - self.start_time = None - self.end_time = None - self.website_id = None - - class TaskManager: def __init__(self, db_path, max_processes=8): + self.db_path = db_path self.db = TaskManagerDatabase(db_path) self.pool = Pool(processes=max_processes) @@ -38,19 +29,21 @@ class TaskManager: print("pooled " + task.url) self.pool.apply_async( TaskManager.run_task, - args=(task, ), - callback=TaskManager.task_complete + args=(task, self.db_path), + callback=TaskManager.task_complete, + error_callback=TaskManager.task_error ) @staticmethod - def run_task(task): + def run_task(task, db_path): result = TaskResult() result.start_time = datetime.utcnow() + result.website_id = task.website_id print("Starting task " + task.url) crawler = RemoteDirectoryCrawler(task.url, 100) - crawl_result = crawler.crawl_directory("12345.json") + crawl_result = crawler.crawl_directory("crawled/" + str(task.website_id) + ".json") result.file_count = crawl_result.file_count result.status_code = crawl_result.status_code @@ -59,15 +52,24 @@ class TaskManager: result.end_time = datetime.utcnow() - return result + return dict(result=result, db_path=db_path) @staticmethod - def task_complete(result: TaskResult): + def task_complete(kwargs): + result = kwargs["result"] + db_path = kwargs["db_path"] print(result.status_code) print(result.file_count) print(result.start_time) print(result.end_time) - # todo save in db + + db = TaskManagerDatabase(db_path) + db.log_result(result) + + @staticmethod + def task_error(err): + print("ERROR") + print(err) diff --git a/debug_put.py b/debug_put.py index 3274d78..9b5cffa 100644 --- a/debug_put.py +++ b/debug_put.py @@ -3,7 +3,8 @@ import json payload = json.dumps({ - "url": "http://138.197.215.189/", + "website_id": 123, + "url": "http://124.158.108.137/ebooks/", "priority": 2, "callback_type": "", "callback_args": "{}" @@ -11,4 +12,4 @@ payload = json.dumps({ r = requests.post("http://localhost:5000/task/put", headers={"Content-Type": "application/json"}, - data=payload) \ No newline at end of file + data=payload) diff --git a/task_db_init.sql b/task_db_init.sql index f532463..809694f 100644 --- a/task_db_init.sql +++ b/task_db_init.sql @@ -1,8 +1,18 @@ CREATE TABLE Queue ( id INTEGER PRIMARY KEY, + website_id INTEGER, url TEXT, priority INTEGER, callback_type TEXT, callback_args TEXT +); + +CREATE TABLE TaskResult ( + id INTEGER PRIMARY KEY, + website_id INT, + status_code TEXT, + file_count INT, + start_time INT, + end_time INT ); \ No newline at end of file diff --git a/test/test_crawl_server.py b/test/test_crawl_server.py index bba031b..9c29740 100644 --- a/test/test_crawl_server.py +++ 
diff --git a/test/test_crawl_server.py b/test/test_crawl_server.py
index bba031b..9c29740 100644
--- a/test/test_crawl_server.py
+++ b/test/test_crawl_server.py
@@ -21,6 +21,7 @@ class CrawlServerTest(LiveServerTestCase):
     def test_put_task(self):
 
         payload = json.dumps({
+            "website_id": 123,
             "url": "a",
             "priority": 2,
             "callback_type": "c",
@@ -33,12 +34,13 @@ class CrawlServerTest(LiveServerTestCase):
         self.assertEqual(200, r.status_code)
 
         result = json.loads(r.text)[0]
+        self.assertEqual(result["website_id"], 123)
         self.assertEqual(result["url"], "a")
         self.assertEqual(result["priority"], 2)
         self.assertEqual(result["callback_type"], "c")
         self.assertEqual(result["callback_args"], '{"d": 4}')
 
-        payload = json.dumps({"url": "", "priority": 1, "callback_type": "", "callback_args": "{}"})
+        payload = json.dumps({"website_id": 1, "url": "", "priority": 1, "callback_type": "", "callback_args": "{}"})
         r = requests.post(self.HOST + "/task/put", data=payload)
         self.assertEqual(400, r.status_code)
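
Reviewer note: end to end, a task submission now looks like the sketch below (it mirrors debug_put.py; the website_id and url are example values). The website_id travels through the queue, names the crawl output file (crawled/<website_id>.json), and ends up in the logged TaskResult row:

    import json
    import requests

    payload = json.dumps({
        "website_id": 123,                    # example id; required since this change
        "url": "http://example.com/ebooks/",  # example url
        "priority": 2,
        "callback_type": "",
        "callback_args": "{}"
    })

    r = requests.post("http://localhost:5000/task/put",
                      headers={"Content-Type": "application/json"},
                      data=payload)
    print(r.text)  # '{"ok": "true"}' on success; HTTP 400 if any field is missing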