From d84922779879a9c178888976e79f6d87e74b9a33 Mon Sep 17 00:00:00 2001 From: Simon Date: Mon, 11 Jun 2018 19:00:43 -0400 Subject: [PATCH] barebones crawl_server microservice --- crawl_server/__init__.py | 0 crawl_server/database.py | 73 ++++++++++++++ crawl_server/server.py | 40 ++++++++ crawl_server/task_manager.py | 73 ++++++++++++++ crawler/crawler.py | 2 +- crawler/http.py | 2 +- debug_put.py | 14 +++ requirements.txt | 1 + task.py | 2 - task_db_init.sql | 8 ++ test/test_Database.py | 190 ----------------------------------- test/test_crawl_server.py | 53 ++++++++++ test/test_input.py | 22 ---- test/test_scan1.json | 4 - 14 files changed, 264 insertions(+), 220 deletions(-) create mode 100644 crawl_server/__init__.py create mode 100644 crawl_server/database.py create mode 100644 crawl_server/server.py create mode 100644 crawl_server/task_manager.py create mode 100644 debug_put.py create mode 100644 task_db_init.sql delete mode 100644 test/test_Database.py create mode 100644 test/test_crawl_server.py delete mode 100644 test/test_input.py delete mode 100644 test/test_scan1.json diff --git a/crawl_server/__init__.py b/crawl_server/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/crawl_server/database.py b/crawl_server/database.py new file mode 100644 index 0000000..4966e6d --- /dev/null +++ b/crawl_server/database.py @@ -0,0 +1,73 @@ +import os +import json +import sqlite3 + + +class Task: + + def __init__(self, url: str, priority: int = 1, callback_type: str = None, callback_args: str = None): + self.url = url + self.priority = priority + self.callback_type = callback_type + self.callback_args = json.loads(callback_args) if callback_args else {} + + def to_json(self): + return ({ + "url": self.url, + "priority": self.priority, + "callback_type": self.callback_type, + "callback_args": json.dumps(self.callback_args) + }) + + +class TaskManagerDatabase: + + def __init__(self, db_path): + self.db_path = db_path + + if not os.path.exists(db_path): + self.init_database() + + def init_database(self): + + with open("task_db_init.sql", "r") as f: + init_script = f.read() + + with sqlite3.connect(self.db_path) as conn: + conn.executescript(init_script) + conn.commit() + + def pop_task(self): + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + cursor.execute("SELECT id, url, priority, callback_type, callback_args" + " FROM Queue ORDER BY priority DESC, Queue.id ASC LIMIT 1") + task = cursor.fetchone() + + if task: + cursor.execute("DELETE FROM Queue WHERE id=?", (task[0],)) + conn.commit() + return Task(task[1], task[2], task[3], task[4]) + else: + return None + + def put_task(self, task: Task): + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + cursor.execute("INSERT INTO Queue (url, priority, callback_type, callback_args) VALUES (?,?,?,?)", + (task.url, task.priority, task.callback_type, json.dumps(task.callback_args))) + conn.commit() + + def get_tasks(self): + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + cursor.execute("SELECT * FROM Queue") + tasks = cursor.fetchall() + + return [Task(t[1], t[2], t[3], t[4]) for t in tasks] diff --git a/crawl_server/server.py b/crawl_server/server.py new file mode 100644 index 0000000..b1aa1a9 --- /dev/null +++ b/crawl_server/server.py @@ -0,0 +1,40 @@ +from flask import Flask, request, abort, Response +import json +from crawl_server.task_manager import TaskManager, Task +app = Flask(__name__) + +tm = TaskManager("tm_db.sqlite3") + + +@app.route("/") +def 
hello(): + return "Hello World!" + + +@app.route("/task/") +def get_tasks(): + json_str = json.dumps([task.to_json() for task in tm.get_tasks()]) + return Response(json_str, mimetype="application/json") + + +@app.route("/task/put", methods=["POST"]) +def task_put(): + + if request.json: + try: + url = request.json["url"] + priority = request.json["priority"] + callback_type = request.json["callback_type"] + callback_args = request.json["callback_args"] + except KeyError: + return abort(400) + + task = Task(url, priority, callback_type, callback_args) + tm.put_task(task) + return '{"ok": "true"}' + + return abort(400) + + +if __name__ == "__main__": + app.run() diff --git a/crawl_server/task_manager.py b/crawl_server/task_manager.py new file mode 100644 index 0000000..12216aa --- /dev/null +++ b/crawl_server/task_manager.py @@ -0,0 +1,73 @@ +from crawl_server.database import TaskManagerDatabase, Task +from multiprocessing import Pool +from apscheduler.schedulers.background import BackgroundScheduler +from enum import Enum +from datetime import datetime +from crawler.crawler import RemoteDirectoryCrawler + + +class TaskResultStatus(Enum): + SUCCESS = 0 + FAILURE = 1 + + +class TaskResult: + + def __init__(self): + self.status_code: TaskResultStatus = None + self.file_count = 0 + self.start_time = None + self.end_time = None + self.website_id = None + + +class TaskManager: + + def __init__(self, db_path, max_processes=8): + self.db = TaskManagerDatabase(db_path) + self.pool = Pool(processes=max_processes) + + scheduler = BackgroundScheduler() + scheduler.add_job(self.execute_queued_task, "interval", seconds=1) + scheduler.start() + + def put_task(self, task: Task): + self.db.put_task(task) + + def get_tasks(self): + return self.db.get_tasks() + + def execute_queued_task(self): + + task = self.db.pop_task() + if task: + print("pooled " + task.url) + self.pool.apply_async( + TaskManager.run_task, + args=(task, ), + callback=TaskManager.task_complete + ) + + @staticmethod + def run_task(task): + result = TaskResult() + result.start_time = datetime.utcnow() + + print("Starting task " + task.url) + + crawler = RemoteDirectoryCrawler(task.url, 10) + crawler.crawl_directory() + + print("End task " + task.url) + + result.end_time = datetime.utcnow() + + return result + + @staticmethod + def task_complete(result: TaskResult): + print("Task done " + str(result)) + # todo save in db + + + diff --git a/crawler/crawler.py b/crawler/crawler.py index 7268ae8..73968a7 100644 --- a/crawler/crawler.py +++ b/crawler/crawler.py @@ -71,7 +71,7 @@ class RemoteDirectoryCrawler: try: directory = RemoteDirectoryFactory.get_directory(self.url) - root_listing = directory.list_dir("/dl2/") # todo get path + root_listing = directory.list_dir("/") directory.close() except TimeoutError: return diff --git a/crawler/http.py b/crawler/http.py index bf81103..3f09df7 100644 --- a/crawler/http.py +++ b/crawler/http.py @@ -54,7 +54,7 @@ class HttpDirectory(RemoteDirectory): if self._should_ignore(link): continue - file_url = urljoin(path_url, link[1]) + file_url = urljoin(path_url, link.url) path, file_name = os.path.split(file_url[len(self.base_url) - 1:]) if self._isdir(link): diff --git a/debug_put.py b/debug_put.py new file mode 100644 index 0000000..416a368 --- /dev/null +++ b/debug_put.py @@ -0,0 +1,14 @@ +import requests +import json + + +payload = json.dumps({ + "url": "http://124.158.108.137/ebooks/", + "priority": 2, + "callback_type": "", + "callback_args": "{}" +}) + +r = 
requests.post("http://localhost:5000/task/put", + headers={"Content-Type": "application/json"}, + data=payload) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index a001e40..5758590 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ flask +flask_testing requests bs4 validators diff --git a/task.py b/task.py index bb80cd7..8031d14 100644 --- a/task.py +++ b/task.py @@ -2,8 +2,6 @@ from apscheduler.schedulers.background import BackgroundScheduler import os from database import Website from multiprocessing import Value, Process -from scrapy.crawler import CrawlerProcess -from scrapy.utils.project import get_project_settings from database import Database from reddit_bot import RedditBot import praw diff --git a/task_db_init.sql b/task_db_init.sql new file mode 100644 index 0000000..f532463 --- /dev/null +++ b/task_db_init.sql @@ -0,0 +1,8 @@ + +CREATE TABLE Queue ( + id INTEGER PRIMARY KEY, + url TEXT, + priority INTEGER, + callback_type TEXT, + callback_args TEXT +); \ No newline at end of file diff --git a/test/test_Database.py b/test/test_Database.py deleted file mode 100644 index 6ecdfff..0000000 --- a/test/test_Database.py +++ /dev/null @@ -1,190 +0,0 @@ -from unittest import TestCase -import sqlite3 -from database import Database, File, Website, InvalidQueryException -import os - - -class DatabaseTest(TestCase): - - def tearDown(self): - if os.path.exists("test.sqlite3"): - os.remove("test.sqlite3") - - def test_init_database_existing(self): - - with open("test.sqlite3", "w"): - pass - - Database("test.sqlite3") - - self.assertEqual(os.path.getsize("test.sqlite3"), 0) - - def test_init_database_new(self): - - Database("test.sqlite3") - - conn = sqlite3.connect("test.sqlite3") - cur = conn.cursor() - - self.assertTrue(cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='Website'")) - - conn.close() - - def test_insert_website(self): - - db = Database("test.sqlite3") - website_id = db.insert_website(Website("https://google.ca", "127.0.0.1", "firefox")) - - conn = sqlite3.connect("test.sqlite3") - cursor = conn.cursor() - cursor.execute("SELECT * FROM Website WHERE id=?", (website_id, )) - - db_website = cursor.fetchone() - - self.assertEqual(db_website[0], 1) - self.assertEqual(db_website[1], "https://google.ca") - self.assertEqual(db_website[2], "127.0.0.1") - self.assertEqual(db_website[3], "firefox") - self.assertIsNotNone(db_website[4]) - - def test_insert_files(self): - - db = Database("test.sqlite3") - website_id = db.insert_website(Website("", "", "")) - db.insert_files([File(website_id, "/some/dir/", "text/plain", "file.txt", 1234)]) - - conn = sqlite3.connect("test.sqlite3") - cursor = conn.cursor() - - cursor.execute("SELECT * FROM File WHERE id=?", (1, )) - db_file = cursor.fetchone() - - cursor.execute("SELECT * FROM WebsitePath WHERE id=?", (db_file[1], )) - db_path = cursor.fetchone() - - self.assertEqual(db_file[0], 1) - self.assertEqual(db_file[1], db_path[0]) - self.assertEqual(db_file[3], "file.txt") - self.assertEqual(db_file[4], 1234) - self.assertEqual(db_path[1], website_id) - self.assertEqual(db_path[2], "/some/dir/") - - def test_import_json(self): - - db = Database("test.sqlite3") - - website_url = "http://google.ca/" - logged_ip = "127.0.0.1" - logged_useragent = "firefox" - - db.import_json("test/test_scan1.json", Website(website_url, logged_ip, logged_useragent)) - - with sqlite3.connect("test.sqlite3") as conn: - cursor = conn.cursor() - - cursor.execute("SELECT * FROM File WHERE name='Bleach - 
Chapter 001.cbz'") - db_file1 = cursor.fetchone() - - self.assertEqual(db_file1[4], 8770750) - - cursor.execute("SELECT * FROM File WHERE name='Bleach - Chapter 007.cbz'") - db_file2 = cursor.fetchone() - - self.assertEqual(db_file2[4], 3443820) - - def test_select_website(self): - - db = Database("test.sqlite3") - - website_id = db.insert_website(Website("https://simon987.net/", "127.0.0.1", "firefox")) - - website = db.get_website_by_url("https://simon987.net/") - - self.assertEqual(website.url, "https://simon987.net/") - self.assertEqual(website.logged_ip, "127.0.0.1") - self.assertEqual(website.logged_useragent, "firefox") - self.assertEqual(website.id, website_id) - self.assertIsNotNone(website.last_modified) - - self.assertIsNone(db.get_website_by_url("does not exist")) - - def test_enqueue(self): - - db = Database("test.sqlite3") - - web_id = db.insert_website(Website("https://simon987.net", "127.0.0.1", "firefox")) - - db.enqueue(web_id) - db.enqueue(web_id) - - with sqlite3.connect("test.sqlite3") as conn: - cursor = conn.cursor() - - cursor.execute("SELECT * FROM Queue") - db_queued_website = cursor.fetchone() - - self.assertEqual(db_queued_website[0], 1) - self.assertEqual(db_queued_website[1], web_id) - self.assertIsNone(cursor.fetchone()) - - def test_dequeue(self): - - db = Database("test.sqlite3") - - web_id_1 = db.insert_website(Website("", "", "")) - web_id_2 = db.insert_website(Website("", "", "")) - - db.enqueue(web_id_1) - db.enqueue(web_id_2, "postid") - - self.assertEqual(db.dequeue()[0], web_id_1) - self.assertEqual(db.dequeue()[1], "postid") - self.assertEqual(db.dequeue(), None) - self.assertEqual(db.dequeue(), None) - - def test_queue(self): - - db = Database("test.sqlite3") - - db.enqueue(db.insert_website(Website("w1", "i1", "a1"))) - db.enqueue(db.insert_website(Website("w2", "i2", "a2"))) - db.enqueue(db.insert_website(Website("w3", "i3", "a3"))) - - queue = db.queue() - - self.assertEqual(queue[0].url, "w1") - self.assertEqual(queue[1].logged_ip, "i2") - self.assertEqual(queue[2].logged_useragent, "a3") - self.assertIsNotNone(queue[2].last_modified) - self.assertEqual(len(queue), 3) - - def test_get_website_by_id(self): - - db = Database("test.sqlite3") - - website_id = db.insert_website(Website("a", "b", "c")) - - website = db.get_website_by_id(website_id) - - self.assertEqual(website.id, website_id) - self.assertEqual(website.url, "a") - self.assertEqual(website.logged_ip, "b") - self.assertEqual(website.logged_useragent, "c") - self.assertIsNone(db.get_website_by_id(999)) - - def test_search_handle_invalid_query(self): - - db = Database("test.sqlite3") - - with self.assertRaises(InvalidQueryException): - db.search(";DROP DATABASE;") - with self.assertRaises(InvalidQueryException): - db.search("invalidCol:") - with self.assertRaises(InvalidQueryException): - db.search("*test*") - - def test_stats(self): - - db = Database("test.sqlite3") - - db.get_stats() # todo test \ No newline at end of file diff --git a/test/test_crawl_server.py b/test/test_crawl_server.py new file mode 100644 index 0000000..57cc23d --- /dev/null +++ b/test/test_crawl_server.py @@ -0,0 +1,53 @@ +from flask_testing import LiveServerTestCase +import os +import json +import requests +from crawl_server.server import app +from crawl_server.task_manager import TaskManager + + +class CrawlServerTest(LiveServerTestCase): + + headers = { + "Content-Type": "application/json" + } + + HOST = "http://localhost:9999" + + def create_app(self): + + self.app = app + app.config['LIVESERVER_PORT'] = 9999 
+ return app + + def test_put_only_accepts_json(self): + + payload = json.dumps({"url": "", "priority": 1, "callback_type": "", "callback_args": "{}"}) + r = requests.post(self.HOST + "/task/put", data=payload) + self.assertEqual(400, r.status_code) + + r2 = requests.post(self.HOST + "/task/put", headers=self.headers, data=payload) + self.assertEqual(200, r2.status_code) + + def test_put_task(self): + + payload = json.dumps({ + "url": "a", + "priority": 2, + "callback_type": "c", + "callback_args": '{"d": 4}' + }) + + requests.post(self.HOST + "/task/put", data=payload, headers=self.headers) + + r = requests.get(self.HOST + "/task") + self.assertEqual(200, r.status_code) + + print(r.text) + result = json.loads(r.text)[0] + self.assertEqual(result["url"], "a") + self.assertEqual(result["priority"], 2) + self.assertEqual(result["callback_type"], "c") + self.assertEqual(result["callback_args"], '{"d": 4}') + + diff --git a/test/test_input.py b/test/test_input.py deleted file mode 100644 index b7b26b0..0000000 --- a/test/test_input.py +++ /dev/null @@ -1,22 +0,0 @@ -from unittest import TestCase -from od_util import is_valid_url - - -class InputValidationTest(TestCase): - - def test_valid_url(self): - self.assertTrue(is_valid_url("https://google.ca/")) - self.assertTrue(is_valid_url("http://google.ca/")) - self.assertTrue(is_valid_url("http://www.google.ca/")) - self.assertTrue(is_valid_url("http://www.subdomain.google.ca/")) - self.assertTrue(is_valid_url("http://mộtsốkýtựngẫunhiên.whatever/")) - self.assertTrue(is_valid_url("http://simon987.net:1234/")) - self.assertTrue(is_valid_url("http://simon987.net:12345/")) - - def test_invalid_url(self): - - self.assertFalse(is_valid_url("ftp://simon987.net")) - self.assertFalse(is_valid_url("git://simon987.net")) - self.assertFalse(is_valid_url("simon987.net")) - self.assertFalse(is_valid_url("http://simon987.net:8080")) - self.assertFalse(is_valid_url("http://simon987/")) diff --git a/test/test_scan1.json b/test/test_scan1.json deleted file mode 100644 index 243ebef..0000000 --- a/test/test_scan1.json +++ /dev/null @@ -1,4 +0,0 @@ -[ -{"path": "/", "name": "Bleach - Chapter 001.cbz", "size": 8770750, "mime": "application/x-cbr"}, -{"path": "/", "name": "Bleach - Chapter 007.cbz", "size": 3443820, "mime": "application/x-cbr"} -] \ No newline at end of file
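
Usage sketch: the queue endpoints added in crawl_server/server.py can be
exercised end to end with a short client script, in the spirit of
debug_put.py. This is a minimal sketch assuming the service is running
locally on Flask's default port 5000; the target URL is a placeholder.

    import json
    import requests

    BASE = "http://localhost:5000"  # assumed local dev server (Flask default port)

    # Enqueue a task. Note that callback_args is itself a JSON-encoded
    # string, matching the Task constructor in crawl_server/database.py.
    payload = json.dumps({
        "url": "http://example.com/files/",  # placeholder URL
        "priority": 2,
        "callback_type": "",
        "callback_args": "{}",
    })
    r = requests.post(BASE + "/task/put",
                      headers={"Content-Type": "application/json"},
                      data=payload)
    assert r.status_code == 200  # the server answers 400 if any field is missing

    # List the queued tasks. The TaskManager pops one task per second in
    # the background, so a freshly queued task may already have been taken.
    for task in requests.get(BASE + "/task/").json():
        print(task["priority"], task["url"])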