diff --git a/crawl_server/server.py b/crawl_server/server.py
index 16889b7..b7b65fc 100644
--- a/crawl_server/server.py
+++ b/crawl_server/server.py
@@ -72,6 +72,17 @@ def get_file_list(website_id):
         return abort(404)
 
 
+@app.route("/file_list/<int:website_id>/free")
+@auth.login_required
+def free_file_list(website_id):
+    file_name = "./crawled/" + str(website_id) + ".json"
+    if os.path.exists(file_name):
+        os.remove(file_name)
+        return '{"ok": "true"}'
+    else:
+        return abort(404)
+
+
 @app.route("/task/logs/")
 @auth.login_required
 def get_task_logs():
diff --git a/database.py b/database.py
index 2d2e3a6..7b8249a 100644
--- a/database.py
+++ b/database.py
@@ -45,6 +45,14 @@ class Database:
             conn.executescript(init_script)
             conn.commit()
 
+    def update_website_date_if_exists(self, website_id):
+
+        with sqlite3.connect(self.db_path) as conn:
+
+            cursor = conn.cursor()
+            cursor.execute("UPDATE Website SET last_modified=CURRENT_TIMESTAMP WHERE id=?", (website_id, ))
+            conn.commit()
+
     def insert_website(self, website: Website):
 
         with sqlite3.connect(self.db_path) as conn:
diff --git a/debug_put.py b/debug_put.py
index 2b27f3c..3629b2d 100644
--- a/debug_put.py
+++ b/debug_put.py
@@ -3,9 +3,9 @@ import json
 
 
 payload = json.dumps({
-    "website_id": 123,
-    "url": "ftp://132.249.213.137",
-    # "url": "http://localhost:8000/",
+    "website_id": 3,
+    # "url": "ftp://132.249.213.137",
+    "url": "http://localhost:8000/",
     # "url": "http://ubuntu.mirrorservice.org/",
     "priority": 2,
     "callback_type": "",
diff --git a/search/search.py b/search/search.py
index fe0ff2d..58fe940 100644
--- a/search/search.py
+++ b/search/search.py
@@ -91,6 +91,22 @@ class ElasticSearchEngine(SearchEngine):
     def ping(self):
         return self.es.ping()
 
+    def delete_docs(self, website_id):
+
+        try:
+            print("Deleting docs of " + str(website_id))
+            self.es.delete_by_query(body={
+                "query": {
+                    "constant_score": {
+                        "filter": {
+                            "term": {"website_id": website_id}
+                        }
+                    }
+                }
+            }, index=self.index_name)
+        except elasticsearch.exceptions.ConflictError:
+            print("Error: multiple delete tasks at the same time")
+
     def import_json(self, in_lines, website_id: int):
 
         import_every = 5000
@@ -270,7 +286,8 @@ class ElasticSearchEngine(SearchEngine):
         stats["es_index_size"] = es_stats["indices"][self.index_name]["total"]["store"]["size_in_bytes"]
         stats["es_search_count"] = es_stats["indices"][self.index_name]["total"]["search"]["query_total"]
         stats["es_search_time"] = es_stats["indices"][self.index_name]["total"]["search"]["query_time_in_millis"]
-        stats["es_search_time_avg"] = stats["es_search_time"] / (stats["es_search_count"] if stats["es_search_count"] != 0 else 1)
+        stats["es_search_time_avg"] = stats["es_search_time"] / (
+            stats["es_search_count"] if stats["es_search_count"] != 0 else 1)
         stats["total_count"] = es_stats["indices"][self.index_name]["total"]["indexing"]["index_total"]
         stats["total_count_nonzero"] = total_stats["hits"]["total"]
         stats["total_size"] = total_stats["aggregations"]["file_stats"]["sum"]
diff --git a/task.py b/task.py
index fc72160..5366acc 100644
--- a/task.py
+++ b/task.py
@@ -5,6 +5,7 @@ import requests
 from requests.exceptions import ConnectionError
 import json
 import config
+from database import Database
 
 
 class CrawlServer:
@@ -71,6 +72,15 @@ class CrawlServer:
         except ConnectionError:
             return ""
 
+    def free_website_files(self, website_id) -> bool:
+
+        try:
+            r = requests.get(self.url + "/file_list/" + str(website_id) + "/free", headers=CrawlServer.headers)
+            return r.status_code == 200
+        except ConnectionError as e:
+            print(e)
+            return False
+
     def fetch_crawl_logs(self):
 
         try:
@@ -97,6 +107,7 @@ class TaskDispatcher:
         scheduler.start()
 
         self.search = ElasticSearchEngine("od-database")
+        self.db = Database("db.sqlite3")
 
         # TODO load from config
         self.crawl_servers = [
@@ -108,10 +119,18 @@
         for server in self.crawl_servers:
             for task in server.fetch_completed_tasks():
                 print("Completed task")
+                # All files are overwritten
+                self.search.delete_docs(task.website_id)
 
                 file_list = server.fetch_website_files(task.website_id)
                 if file_list:
                     self.search.import_json(file_list, task.website_id)
 
+                # Update last_modified date for website
+                self.db.update_website_date_if_exists(task.website_id)
+
+                # File list is safe to delete once indexed
+                server.free_website_files(task.website_id)
+
     def dispatch_task(self, task: Task):
         self._get_available_crawl_server().queue_task(task)