Task crawl result now logged in a database

This commit is contained in:
Simon 2018-06-12 11:03:45 -04:00
parent 011b8455a7
commit 6d48f1f780
6 changed files with 70 additions and 29 deletions

View File

@ -3,9 +3,21 @@ import json
import sqlite3 import sqlite3
class TaskResult:
    """Outcome of one crawl task; persisted by TaskManagerDatabase.log_result.

    Plain value object: all fields start empty and are filled in by the
    task runner as the crawl progresses.
    """

    def __init__(self):
        # Status of the crawl; stays None until the task has run.
        # (Annotation fixed: the original said `str` yet defaulted to None.)
        self.status_code: "str | None" = None
        # Number of files discovered by the crawler.
        self.file_count = 0
        # Start/end timestamps set by the task runner; None until set.
        self.start_time = None
        self.end_time = None
        # Identifier of the website this task crawled; None until assigned.
        self.website_id = None
class Task: class Task:
def __init__(self, url: str, priority: int = 1, callback_type: str = None, callback_args: str = None): def __init__(self, website_id: int, url: str, priority: int = 1,
callback_type: str = None, callback_args: str = None):
self.website_id = website_id
self.url = url self.url = url
self.priority = priority self.priority = priority
self.callback_type = callback_type self.callback_type = callback_type
@ -13,6 +25,7 @@ class Task:
def to_json(self): def to_json(self):
return ({ return ({
"website_id": self.website_id,
"url": self.url, "url": self.url,
"priority": self.priority, "priority": self.priority,
"callback_type": self.callback_type, "callback_type": self.callback_type,
@ -42,14 +55,14 @@ class TaskManagerDatabase:
with sqlite3.connect(self.db_path) as conn: with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT id, url, priority, callback_type, callback_args" cursor.execute("SELECT id, website_id, url, priority, callback_type, callback_args"
" FROM Queue ORDER BY priority DESC, Queue.id ASC LIMIT 1") " FROM Queue ORDER BY priority DESC, Queue.id ASC LIMIT 1")
task = cursor.fetchone() task = cursor.fetchone()
if task: if task:
cursor.execute("DELETE FROM Queue WHERE id=?", (task[0],)) cursor.execute("DELETE FROM Queue WHERE id=?", (task[0],))
conn.commit() conn.commit()
return Task(task[1], task[2], task[3], task[4]) return Task(task[1], task[2], task[3], task[4], task[5])
else: else:
return None return None
@ -58,8 +71,10 @@ class TaskManagerDatabase:
with sqlite3.connect(self.db_path) as conn: with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("INSERT INTO Queue (url, priority, callback_type, callback_args) VALUES (?,?,?,?)", cursor.execute("INSERT INTO Queue (website_id, url, priority, callback_type, callback_args) "
(task.url, task.priority, task.callback_type, json.dumps(task.callback_args))) "VALUES (?,?,?,?,?)",
(task.website_id, task.url, task.priority,
task.callback_type, json.dumps(task.callback_args)))
conn.commit() conn.commit()
def get_tasks(self): def get_tasks(self):
@ -67,7 +82,17 @@ class TaskManagerDatabase:
with sqlite3.connect(self.db_path) as conn: with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT * FROM Queue") cursor.execute("SELECT website_id, url, priority, callback_type, callback_args FROM Queue")
tasks = cursor.fetchall() tasks = cursor.fetchall()
return [Task(t[1], t[2], t[3], t[4]) for t in tasks] return [Task(t[0], t[1], t[2], t[3], t[4]) for t in tasks]
def log_result(self, result: "TaskResult"):
    """Persist a finished task's outcome into the TaskResult table.

    :param result: TaskResult carrying website_id, status_code,
        file_count and the start/end timestamps to record.
    """
    insert_sql = ("INSERT INTO TaskResult (website_id, status_code, "
                  "file_count, start_time, end_time) VALUES (?,?,?,?,?)")
    row = (result.website_id, result.status_code, result.file_count,
           result.start_time, result.end_time)
    with sqlite3.connect(self.db_path) as conn:
        conn.cursor().execute(insert_sql, row)
        conn.commit()

View File

@ -22,6 +22,7 @@ def task_put():
if request.json: if request.json:
try: try:
website_id = request.json["website_id"]
url = request.json["url"] url = request.json["url"]
priority = request.json["priority"] priority = request.json["priority"]
callback_type = request.json["callback_type"] callback_type = request.json["callback_type"]
@ -29,7 +30,7 @@ def task_put():
except KeyError: except KeyError:
return abort(400) return abort(400)
task = Task(url, priority, callback_type, callback_args) task = Task(website_id, url, priority, callback_type, callback_args)
tm.put_task(task) tm.put_task(task)
return '{"ok": "true"}' return '{"ok": "true"}'

View File

@ -1,23 +1,14 @@
from crawl_server.database import TaskManagerDatabase, Task from crawl_server.database import TaskManagerDatabase, Task, TaskResult
from multiprocessing import Pool from multiprocessing import Pool
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime from datetime import datetime
from crawler.crawler import RemoteDirectoryCrawler from crawler.crawler import RemoteDirectoryCrawler
class TaskResult:
def __init__(self):
self.status_code: str = None
self.file_count = 0
self.start_time = None
self.end_time = None
self.website_id = None
class TaskManager: class TaskManager:
def __init__(self, db_path, max_processes=8): def __init__(self, db_path, max_processes=8):
self.db_path = db_path
self.db = TaskManagerDatabase(db_path) self.db = TaskManagerDatabase(db_path)
self.pool = Pool(processes=max_processes) self.pool = Pool(processes=max_processes)
@ -38,19 +29,21 @@ class TaskManager:
print("pooled " + task.url) print("pooled " + task.url)
self.pool.apply_async( self.pool.apply_async(
TaskManager.run_task, TaskManager.run_task,
args=(task, ), args=(task, self.db_path),
callback=TaskManager.task_complete callback=TaskManager.task_complete,
error_callback=TaskManager.task_error
) )
@staticmethod @staticmethod
def run_task(task): def run_task(task, db_path):
result = TaskResult() result = TaskResult()
result.start_time = datetime.utcnow() result.start_time = datetime.utcnow()
result.website_id = task.website_id
print("Starting task " + task.url) print("Starting task " + task.url)
crawler = RemoteDirectoryCrawler(task.url, 100) crawler = RemoteDirectoryCrawler(task.url, 100)
crawl_result = crawler.crawl_directory("12345.json") crawl_result = crawler.crawl_directory("crawled/" + str(task.website_id) + ".json")
result.file_count = crawl_result.file_count result.file_count = crawl_result.file_count
result.status_code = crawl_result.status_code result.status_code = crawl_result.status_code
@ -59,15 +52,24 @@ class TaskManager:
result.end_time = datetime.utcnow() result.end_time = datetime.utcnow()
return result return dict(result=result, db_path=db_path)
@staticmethod @staticmethod
def task_complete(result: TaskResult): def task_complete(kwargs):
result = kwargs["result"]
db_path = kwargs["db_path"]
print(result.status_code) print(result.status_code)
print(result.file_count) print(result.file_count)
print(result.start_time) print(result.start_time)
print(result.end_time) print(result.end_time)
# todo save in db
db = TaskManagerDatabase(db_path)
db.log_result(result)
@staticmethod
def task_error(err):
print("ERROR")
print(err)

View File

@ -3,7 +3,8 @@ import json
payload = json.dumps({ payload = json.dumps({
"url": "http://138.197.215.189/", "website_id": 123,
"url": "http://124.158.108.137/ebooks/",
"priority": 2, "priority": 2,
"callback_type": "", "callback_type": "",
"callback_args": "{}" "callback_args": "{}"

View File

@ -1,8 +1,18 @@
CREATE TABLE Queue ( CREATE TABLE Queue (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
website_id INTEGER,
url TEXT, url TEXT,
priority INTEGER, priority INTEGER,
callback_type TEXT, callback_type TEXT,
callback_args TEXT callback_args TEXT
); );
-- One row per completed crawl task; written by
-- TaskManagerDatabase.log_result.
CREATE TABLE TaskResult (
    id INTEGER PRIMARY KEY,
    website_id INT,      -- id of the crawled website
    status_code TEXT,    -- crawl status reported by the crawler
    file_count INT,      -- number of files found
    -- NOTE(review): start/end are declared INT but the Python side passes
    -- datetime objects — confirm how sqlite should store these.
    start_time INT,
    end_time INT
);

View File

@ -21,6 +21,7 @@ class CrawlServerTest(LiveServerTestCase):
def test_put_task(self): def test_put_task(self):
payload = json.dumps({ payload = json.dumps({
"website_id": 123,
"url": "a", "url": "a",
"priority": 2, "priority": 2,
"callback_type": "c", "callback_type": "c",
@ -33,12 +34,13 @@ class CrawlServerTest(LiveServerTestCase):
self.assertEqual(200, r.status_code) self.assertEqual(200, r.status_code)
result = json.loads(r.text)[0] result = json.loads(r.text)[0]
self.assertEqual(result["website_id"], 123)
self.assertEqual(result["url"], "a") self.assertEqual(result["url"], "a")
self.assertEqual(result["priority"], 2) self.assertEqual(result["priority"], 2)
self.assertEqual(result["callback_type"], "c") self.assertEqual(result["callback_type"], "c")
self.assertEqual(result["callback_args"], '{"d": 4}') self.assertEqual(result["callback_args"], '{"d": 4}')
payload = json.dumps({"url": "", "priority": 1, "callback_type": "", "callback_args": "{}"}) payload = json.dumps({"website_id": 1, "url": "", "priority": 1, "callback_type": "", "callback_args": "{}"})
r = requests.post(self.HOST + "/task/put", data=payload) r = requests.post(self.HOST + "/task/put", data=payload)
self.assertEqual(400, r.status_code) self.assertEqual(400, r.status_code)