diff --git a/crawl_server/database.py b/crawl_server/database.py
index dfff4d1..c0acc25 100644
--- a/crawl_server/database.py
+++ b/crawl_server/database.py
@@ -118,9 +118,8 @@ class TaskManagerDatabase:
             cursor.execute("SELECT status_code, file_count, start_time, end_time, website_id"
                            " FROM TaskResult WHERE indexed_time IS NULL")
             db_result = cursor.fetchall()
-            print(len(db_result))
 
-            cursor.execute("UPDATE TaskResult SET indexed_time = CURRENT_TIMESTAMP WHERE indexed_time IS NULL")
+            cursor.execute("UPDATE TaskResult SET indexed_time=CURRENT_TIMESTAMP WHERE indexed_time IS NULL")
             conn.commit()
 
             return [TaskResult(r[0], r[1], r[2], r[3], r[4]) for r in db_result]
diff --git a/crawl_server/server.py b/crawl_server/server.py
index ac16364..078077d 100644
--- a/crawl_server/server.py
+++ b/crawl_server/server.py
@@ -1,4 +1,4 @@
-from flask import Flask, request, abort, Response
+from flask import Flask, request, abort, Response, send_from_directory
 import json
 from crawl_server.task_manager import TaskManager, Task, TaskResult
 app = Flask(__name__)
@@ -45,5 +45,10 @@ def get_current_tasks():
     return current_tasks
 
 
+@app.route("/file_list/<int:website_id>/")
+def get_file_list(website_id):
+    return send_from_directory(directory="./crawled/", filename=str(website_id) + ".json")
+
+
 if __name__ == "__main__":
     app.run(port=5001)
diff --git a/task_db_init.sql b/crawl_server/task_db_init.sql
similarity index 100%
rename from task_db_init.sql
rename to crawl_server/task_db_init.sql
diff --git a/crawl_server/task_manager.py b/crawl_server/task_manager.py
index 4957893..32c0711 100644
--- a/crawl_server/task_manager.py
+++ b/crawl_server/task_manager.py
@@ -55,14 +55,13 @@ class TaskManager:
         print("Starting task " + task.url)
 
         crawler = RemoteDirectoryCrawler(task.url, 100)
-        crawl_result = crawler.crawl_directory("crawled/" + str(task.website_id) + ".json")
+        crawl_result = crawler.crawl_directory("./crawled/" + str(task.website_id) + ".json")
 
         result.file_count = crawl_result.file_count
         result.status_code = crawl_result.status_code
 
-        print("End task " + task.url)
-
         result.end_time = datetime.utcnow()
+        print("End task " + task.url)
 
         return dict(result=result, db_path=db_path)
 
@@ -77,6 +76,7 @@ class TaskManager:
 
         db = TaskManagerDatabase(db_path)
         db.log_result(result)
+        print("Logged result to DB")
 
     @staticmethod
     def task_error(err):
diff --git a/search/search.py b/search/search.py
index 76386b8..101cab7 100644
--- a/search/search.py
+++ b/search/search.py
@@ -11,7 +11,7 @@ class SearchEngine:
     def __init__(self):
         pass
 
-    def import_json(self, in_file: str, website_id: int):
+    def import_json(self, in_str: str, website_id: int):
         raise NotImplementedError
 
     def search(self, query) -> {}:
@@ -79,21 +79,19 @@ class ElasticSearchEngine(SearchEngine):
     def ping(self):
         return self.es.ping()
 
-    def import_json(self, in_file: str, website_id: int):
+    def import_json(self, in_str: str, website_id: int):
         import_every = 1000
 
-        with open(in_file, "r") as f:
-            docs = []
+        print(in_str)
+        docs = []
 
-            line = f.readline()
-            while line:
-                docs.append(line[:-1])  # Remove trailing new line
+        for line in in_str.splitlines():
+            docs.append(line)
 
-                if len(docs) >= import_every:
-                    self._index(docs, website_id)
-                    docs.clear()
-                line = f.readline()
-            self._index(docs, website_id)
+            if len(docs) >= import_every:
+                self._index(docs, website_id)
+                docs.clear()
+        self._index(docs, website_id)
 
     def _index(self, docs, website_id):
         print("Indexing " + str(len(docs)) + " docs")
@@ -107,14 +105,10 @@ class ElasticSearchEngine(SearchEngine):
 
     @staticmethod
     def create_bulk_index_string(docs: list, website_id: int):
-        result = ""
-
         action_string = '{"index":{}}\n'
         website_id_string = ',"website_id":' + str(website_id) + '}\n'  # Add website_id param to each doc
 
-        for doc in docs:
-            result += action_string + doc[:-1] + website_id_string
-        return result
+        return "\n".join("".join([action_string, doc[:-1], website_id_string]) for doc in docs)
 
     def search(self, query) -> {}:
diff --git a/task.py b/task.py
index 4f9fc5f..ae69dec 100644
--- a/task.py
+++ b/task.py
@@ -1,4 +1,5 @@
 from apscheduler.schedulers.background import BackgroundScheduler
+from search.search import ElasticSearchEngine
 from crawl_server.database import Task, TaskResult
 import requests
 import json
@@ -46,6 +47,11 @@ class CrawlServer:
             for t in json.loads(r.text)
         ]
 
+    def get_file_list(self, website_id) -> str:
+
+        r = requests.get(self.url + "/file_list/" + str(website_id) + "/")
+        return r.text
+
 
 class TaskDispatcher:
 
@@ -58,19 +64,20 @@ class TaskDispatcher:
         scheduler.add_job(self.check_completed_tasks, "interval", seconds=1)
         scheduler.start()
 
+        self.search = ElasticSearchEngine("od-database")
+
         # TODO load from config
         self.crawl_servers = [
             CrawlServer("http://localhost:5001"),
         ]
 
     def check_completed_tasks(self):
-        completed_tasks = []
 
         for server in self.crawl_servers:
-            completed_tasks.extend(server.get_completed_tasks())
-
-        if completed_tasks:
-            print(str(len(completed_tasks)) + " completed tasks. Will index immediately")
+            for task in server.get_completed_tasks():
+                print("Completed task")
+                file_list = server.get_file_list(task.website_id)
+                self.search.import_json(file_list, task.website_id)
 
     def dispatch_task(self, task: Task):
         self._get_available_crawl_server().queue_task(task)
diff --git a/test/test_search.py b/test/test_search.py
index 14c8cbb..f94c165 100644
--- a/test/test_search.py
+++ b/test/test_search.py
@@ -24,11 +24,11 @@ class SearchTest(TestCase):
             {"name": "Dead Racer", "size": 1000, "path": "Speed Machine [FLAC]", "mtime": 12345}
         ]
 
-        with open("tmp.json", "w") as f:
-            for file in files:
-                f.write(json.dumps(file) + "\n")
+        in_str = ""
+        for file in files:
+            in_str += json.dumps(file) + "\n"
 
-        self.search.import_json("tmp.json", 123)
+        self.search.import_json(in_str, 123)
         time.sleep(2)
 
         self.assertEqual(4, self.search.es.count(self.search.index_name, "file")["count"])
@@ -50,8 +50,6 @@ class SearchTest(TestCase):
         page = self.search.search("10'000")
         self.assertEqual(1, page["hits"]["total"])
 
-        os.remove("tmp.json")
-
     def test_scroll(self):
 
         files = [
@@ -61,11 +59,11 @@ class SearchTest(TestCase):
             {"name": "Dead Racer", "size": 1000, "path": "Speed Machine [FLAC]", "mtime": 12345}
         ]
 
-        with open("tmp.json", "w") as f:
-            for file in files:
-                f.write(json.dumps(file) + "\n")
+        in_str = ""
+        for file in files:
+            in_str += json.dumps(file) + "\n"
 
-        self.search.import_json("tmp.json", 123)
+        self.search.import_json(in_str, 123)
         time.sleep(2)
 
         page = self.search.search("")