diff --git a/crawl_server/server.py b/crawl_server/server.py
index 99d4527..652dd08 100644
--- a/crawl_server/server.py
+++ b/crawl_server/server.py
@@ -1,4 +1,4 @@
-from flask import Flask, request, abort, Response
+from flask import Flask, request, abort, Response, send_file
 from flask_httpauth import HTTPTokenAuth
 import json
 from crawl_server.task_manager import TaskManager, Task
@@ -14,7 +14,6 @@ tm = TaskManager("tm_db.sqlite3", 8)
 
 @auth.verify_token
 def verify_token(token):
-    print(token)
     if token in tokens:
         return True
 
@@ -68,15 +67,10 @@ def get_file_list(website_id):
     file_name = "./crawled/" + str(website_id) + ".json"
 
     if os.path.exists(file_name):
-        with open(file_name, "r") as f:
-            file_list = f.read()
-
-        os.remove(file_name)
-
-        return file_list
+        return send_file(file_name)
     else:
         return abort(404)
 
 
 if __name__ == "__main__":
-    app.run(port=5002)
+    app.run(port=5001)
diff --git a/requirements.txt b/requirements.txt
index d69162a..264574d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -13,4 +13,4 @@ ftputil
 lxml
 elasticsearch
 python-dateutil
-flask_httpauth
\ No newline at end of file
+flask_httpauth
diff --git a/search/search.py b/search/search.py
index 31b84e5..1a19e18 100644
--- a/search/search.py
+++ b/search/search.py
@@ -1,7 +1,7 @@
 import elasticsearch
 from elasticsearch import helpers
 import os
-import json
+import ujson
 
 
 class IndexingError(Exception):
@@ -90,17 +90,14 @@ class ElasticSearchEngine(SearchEngine):
     def ping(self):
         return self.es.ping()
 
-    def import_json(self, in_str: str, website_id: int):
+    def import_json(self, in_lines, website_id: int):
 
-        if not in_str:
-            return
-
-        import_every = 5000
+        import_every = 25000
 
         docs = []
 
-        for line in in_str.splitlines():
-            doc = json.loads(line)
+        for line in in_lines:
+            doc = ujson.loads(line)
             name, ext = os.path.splitext(doc["name"])
             doc["ext"] = ext[1:].lower() if ext and len(ext) > 1 else ""
             doc["name"] = name
@@ -125,7 +122,7 @@ class ElasticSearchEngine(SearchEngine):
     def create_bulk_index_string(docs: list):
 
         action_string = '{"index":{}}\n'
-        return "\n".join("".join([action_string, json.dumps(doc)]) for doc in docs)
+        return "\n".join("".join([action_string, ujson.dumps(doc)]) for doc in docs)
 
     def search(self, query, page, per_page, sort_order) -> {}:
 
diff --git a/stress_test.py b/stress_test.py
index 1d23edc..2fb10a5 100644
--- a/stress_test.py
+++ b/stress_test.py
@@ -51,8 +51,8 @@ def get_random_file():
     doc = dict()
     doc["name"] = random_file_name()
     doc["path"] = random_path()
-    doc["mtime"] = random.randint(0, 10000000)
-    doc["size"] = random.randint(-1, 100000000000000)
+    doc["mtime"] = random.randint(0, 1000000000000)
+    doc["size"] = random.randint(-1, 1000000000)
 
     return doc
 
@@ -80,8 +80,7 @@ def random_searches(count=10000000, max_workers=1000):
     pool.map(search, random.choices(terms, k=count))
 
 
-
 # dump_local_filesystem("/mnt/")
-# index_file_list("crawl_server/crawled/123.json", 10)
+# index_file_list("random_dump.json", 1000)
 # random_searches(100000)
-dump_random_files(20000 * 100000)
+# dump_random_files(20000 * 100000)
diff --git a/task.py b/task.py
index aa06b67..e559a15 100644
--- a/task.py
+++ b/task.py
@@ -31,7 +31,7 @@ class CrawlServer:
     def fetch_completed_tasks(self) -> list:
 
         try:
-            r = requests.get(self.url + "/task/completed")
+            r = requests.get(self.url + "/task/completed", headers=CrawlServer.headers)
             return [
                 TaskResult(r["status_code"], r["file_count"], r["start_time"], r["end_time"], r["website_id"])
                 for r in json.loads(r.text)]
@@ -42,7 +42,7 @@ class CrawlServer:
     def fetch_queued_tasks(self) -> list:
 
         try:
-            r = requests.get(self.url + "/task/")
+            r = requests.get(self.url + "/task/", headers=CrawlServer.headers)
             return [
                 Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
                 for t in json.loads(r.text)
@@ -53,7 +53,7 @@ class CrawlServer:
     def fetch_current_tasks(self):
 
         try:
-            r = requests.get(self.url + "/task/current")
+            r = requests.get(self.url + "/task/current", headers=CrawlServer.headers)
             return [
                 Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
                 for t in json.loads(r.text)
@@ -64,8 +64,9 @@ class CrawlServer:
     def fetch_website_files(self, website_id) -> str:
 
         try:
-            r = requests.get(self.url + "/file_list/" + str(website_id) + "/")
-            return r.text if r.status_code == 200 else ""
+            r = requests.get(self.url + "/file_list/" + str(website_id) + "/", stream=True, headers=CrawlServer.headers)
+            for line in r.iter_lines(chunk_size=1024 * 256):
+                yield line
         except ConnectionError:
             return ""
 
@@ -74,7 +75,7 @@ class TaskDispatcher:
     def __init__(self):
 
         scheduler = BackgroundScheduler()
-        scheduler.add_job(self.check_completed_tasks, "interval", seconds=1)
+        scheduler.add_job(self.check_completed_tasks, "interval", seconds=10)
         scheduler.start()
 
         self.search = ElasticSearchEngine("od-database")