From adb94cf32624f191ea1a662471dba232f9f8f0b4 Mon Sep 17 00:00:00 2001
From: Simon
Date: Thu, 14 Jun 2018 23:36:54 -0400
Subject: [PATCH] Should fix memory usage problem when crawling

---
 crawl_server/crawled/README.md |  1 +
 crawl_server/crawler.py        | 43 ++++++++++++++++++++++++++++------
 crawl_server/remote_http.py    |  4 ++--
 crawl_server/server.py         |  2 +-
 crawl_server/task_manager.py   |  5 +++-
 search/search.py               |  3 ++-
 6 files changed, 46 insertions(+), 12 deletions(-)
 create mode 100644 crawl_server/crawled/README.md

diff --git a/crawl_server/crawled/README.md b/crawl_server/crawled/README.md
new file mode 100644
index 0000000..495afc7
--- /dev/null
+++ b/crawl_server/crawled/README.md
@@ -0,0 +1 @@
+Crawled directories are temporarily stored here
\ No newline at end of file
diff --git a/crawl_server/crawler.py b/crawl_server/crawler.py
index 6913b18..25e9fe5 100644
--- a/crawl_server/crawler.py
+++ b/crawl_server/crawler.py
@@ -1,5 +1,7 @@
 import os
-import json
+import logging
+import ujson
+import logging
 from urllib.parse import urlparse
 from timeout_decorator.timeout_decorator import TimeoutError
 from threading import Thread
@@ -23,7 +25,7 @@ class File:
         return ("DIR " if self.is_dir else "FILE ") + self.path + "/" + self.name
 
     def to_json(self):
-        return json.dumps({
+        return ujson.dumps({
             "name": self.name,
             "size": self.size,
             "mtime": self.mtime,
@@ -117,19 +119,23 @@ class RemoteDirectoryCrawler:
             threads.append(worker)
             worker.start()
 
-        in_q.join()
-        print("Done")
+        files_written = []  # Pass array to worker to get result
+        file_writer_thread = Thread(target=RemoteDirectoryCrawler._log_to_file, args=(files_q, out_file, files_written))
+        file_writer_thread.start()
 
-        exported_count = export_to_json(files_q, out_file)
-        print("exported to " + out_file)
+        in_q.join()
+        files_q.join()
+        print("Done")
 
         # Kill threads
         for _ in threads:
             in_q.put(None)
         for t in threads:
             t.join()
+        files_q.put(None)
+        file_writer_thread.join()
 
-        return CrawlResult(exported_count, "success")
+        return CrawlResult(files_written[0], "success")
 
     def _process_listings(self, url: str, in_q: Queue, files_q: Queue):
@@ -175,4 +181,27 @@ class RemoteDirectoryCrawler:
             finally:
                 in_q.task_done()
 
+    @staticmethod
+    def _log_to_file(files_q: Queue, out_file: str, files_written: list):
+
+        counter = 0
+
+        with open(out_file, "w") as f:
+            while True:
+
+                try:
+                    file = files_q.get(timeout=30)
+                except Empty:
+                    break
+
+                if file is None:
+                    break
+
+                f.write(file.to_json() + "\n")
+                counter += 1
+                files_q.task_done()
+
+        files_written.append(counter)
+
+
diff --git a/crawl_server/remote_http.py b/crawl_server/remote_http.py
index d8d2e24..141b618 100644
--- a/crawl_server/remote_http.py
+++ b/crawl_server/remote_http.py
@@ -36,7 +36,7 @@ class HttpDirectory(RemoteDirectory):
 
     def __init__(self, url):
         super().__init__(url)
-        self.parser = etree.HTMLParser(collect_ids=False)
+        self.parser = etree.HTMLParser(collect_ids=False, encoding='utf-8')
 
     def list_dir(self, path) -> list:
         results = []
@@ -103,7 +103,7 @@ class HttpDirectory(RemoteDirectory):
         while retries > 0:
             try:
                 r = requests.get(url, headers=HttpDirectory.HEADERS)
-                return r.content
+                return r.text
             except RequestException:
                 retries -= 1
 
diff --git a/crawl_server/server.py b/crawl_server/server.py
index 40622fd..35d9d20 100644
--- a/crawl_server/server.py
+++ b/crawl_server/server.py
@@ -9,7 +9,7 @@ auth = HTTPTokenAuth(scheme="Token")
 
 tokens = [config.CRAWL_SERVER_TOKEN]
 
-tm = TaskManager("tm_db.sqlite3", 64)
+tm = TaskManager("tm_db.sqlite3", 32)
 
 
 @auth.verify_token
diff --git a/crawl_server/task_manager.py b/crawl_server/task_manager.py
index ac4710d..47ac3cb 100644
--- a/crawl_server/task_manager.py
+++ b/crawl_server/task_manager.py
@@ -73,7 +73,10 @@ class TaskManager:
 
     @staticmethod
     def task_complete(result):
-        task_result, db_path, current_tasks = result.result()
+        try:
+            task_result, db_path, current_tasks = result.result()
+        except Exception as e:
+            print("Exception during task " + str(e))
 
         print(task_result.status_code)
         print(task_result.file_count)
diff --git a/search/search.py b/search/search.py
index c7dec54..c0eee63 100644
--- a/search/search.py
+++ b/search/search.py
@@ -107,7 +107,8 @@ class ElasticSearchEngine(SearchEngine):
             if len(docs) >= import_every:
                 self._index(docs)
                 docs.clear()
-        self._index(docs)
+        if docs:
+            self._index(docs)
 
     def _index(self, docs):
         print("Indexing " + str(len(docs)) + " docs")
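
Note on the crawler.py change (not part of the commit itself): the memory fix rests on a
producer/consumer pattern. Crawl workers keep pushing File records onto files_q while a single
writer thread streams each record to disk as it arrives, so results no longer pile up in the
queue before being exported, and the record count is handed back through the files_written list.
Below is a minimal, self-contained sketch of that pattern; the names write_records and out.ndjson
are illustrative only, and it uses the standard json module instead of ujson so it runs with no
extra dependencies.

    import json
    from queue import Queue, Empty
    from threading import Thread

    def write_records(q: Queue, out_file: str, written: list):
        # Stream records to disk as they arrive instead of buffering them in memory.
        count = 0
        with open(out_file, "w") as f:
            while True:
                try:
                    record = q.get(timeout=30)   # give up if producers stall
                except Empty:
                    break
                if record is None:               # sentinel: producers are done
                    break
                f.write(json.dumps(record) + "\n")
                count += 1
                q.task_done()
        written.append(count)                    # report the count back to the caller

    if __name__ == "__main__":
        q = Queue(maxsize=1000)                  # bounded queue keeps memory flat
        written = []
        writer = Thread(target=write_records, args=(q, "out.ndjson", written))
        writer.start()

        for i in range(10000):                   # stand-in for the crawl worker threads
            q.put({"name": "file" + str(i), "size": i})

        q.join()                                 # wait until every record has been written
        q.put(None)                              # then tell the writer to exit
        writer.join()
        print(str(written[0]) + " records written")

The sentinel is only enqueued after q.join() returns, mirroring the order in the crawler.py hunk
above: files_q.join(), worker shutdown, then files_q.put(None) and file_writer_thread.join().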
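
A second note, on the task_manager.py hunk: if result.result() raises, the except branch prints the
error but task_result is never bound, so the print(task_result.status_code) that follows would fail
with a NameError. A defensive variant (a sketch only, reusing the hunk's names rather than code taken
from the repository) would return early from the error path:

    @staticmethod
    def task_complete(result):
        try:
            task_result, db_path, current_tasks = result.result()
        except Exception as e:
            print("Exception during task " + str(e))
            return  # task_result was never assigned; nothing further to report

        print(task_result.status_code)
        print(task_result.file_count)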