Should fix memory usage problem when crawling

This commit is contained in:
Simon 2018-06-14 23:36:54 -04:00
parent 9aed18c2d2
commit adb94cf326
6 changed files with 46 additions and 12 deletions

View File

@ -0,0 +1 @@
Crawled directories are temporarily stored here

View File

@ -1,5 +1,7 @@
import os import os
import json import logging
import ujson
import logging
from urllib.parse import urlparse from urllib.parse import urlparse
from timeout_decorator.timeout_decorator import TimeoutError from timeout_decorator.timeout_decorator import TimeoutError
from threading import Thread from threading import Thread
@ -23,7 +25,7 @@ class File:
return ("DIR " if self.is_dir else "FILE ") + self.path + "/" + self.name return ("DIR " if self.is_dir else "FILE ") + self.path + "/" + self.name
def to_json(self): def to_json(self):
return json.dumps({ return ujson.dumps({
"name": self.name, "name": self.name,
"size": self.size, "size": self.size,
"mtime": self.mtime, "mtime": self.mtime,
@ -117,19 +119,23 @@ class RemoteDirectoryCrawler:
threads.append(worker) threads.append(worker)
worker.start() worker.start()
in_q.join() files_written = [] # Pass array to worker to get result
print("Done") file_writer_thread = Thread(target=RemoteDirectoryCrawler._log_to_file, args=(files_q, out_file, files_written))
file_writer_thread.start()
exported_count = export_to_json(files_q, out_file) in_q.join()
print("exported to " + out_file) files_q.join()
print("Done")
# Kill threads # Kill threads
for _ in threads: for _ in threads:
in_q.put(None) in_q.put(None)
for t in threads: for t in threads:
t.join() t.join()
files_q.put(None)
file_writer_thread.join()
return CrawlResult(exported_count, "success") return CrawlResult(files_written[0], "success")
def _process_listings(self, url: str, in_q: Queue, files_q: Queue): def _process_listings(self, url: str, in_q: Queue, files_q: Queue):
@ -175,4 +181,27 @@ class RemoteDirectoryCrawler:
finally: finally:
in_q.task_done() in_q.task_done()
@staticmethod
def _log_to_file(files_q: Queue, out_file: str, files_written: list):
counter = 0
with open(out_file, "w") as f:
while True:
try:
file = files_q.get(timeout=30)
except Empty:
break
if file is None:
break
f.write(file.to_json() + "\n")
counter += 1
files_q.task_done()
files_written.append(counter)

View File

@ -36,7 +36,7 @@ class HttpDirectory(RemoteDirectory):
def __init__(self, url): def __init__(self, url):
super().__init__(url) super().__init__(url)
self.parser = etree.HTMLParser(collect_ids=False) self.parser = etree.HTMLParser(collect_ids=False, encoding='utf-8')
def list_dir(self, path) -> list: def list_dir(self, path) -> list:
results = [] results = []
@ -103,7 +103,7 @@ class HttpDirectory(RemoteDirectory):
while retries > 0: while retries > 0:
try: try:
r = requests.get(url, headers=HttpDirectory.HEADERS) r = requests.get(url, headers=HttpDirectory.HEADERS)
return r.content return r.text
except RequestException: except RequestException:
retries -= 1 retries -= 1

View File

@ -9,7 +9,7 @@ auth = HTTPTokenAuth(scheme="Token")
tokens = [config.CRAWL_SERVER_TOKEN] tokens = [config.CRAWL_SERVER_TOKEN]
tm = TaskManager("tm_db.sqlite3", 64) tm = TaskManager("tm_db.sqlite3", 32)
@auth.verify_token @auth.verify_token

View File

@ -73,7 +73,10 @@ class TaskManager:
@staticmethod @staticmethod
def task_complete(result): def task_complete(result):
task_result, db_path, current_tasks = result.result() try:
task_result, db_path, current_tasks = result.result()
except Exception as e:
print("Exception during task " + str(e))
print(task_result.status_code) print(task_result.status_code)
print(task_result.file_count) print(task_result.file_count)

View File

@ -107,7 +107,8 @@ class ElasticSearchEngine(SearchEngine):
if len(docs) >= import_every: if len(docs) >= import_every:
self._index(docs) self._index(docs)
docs.clear() docs.clear()
self._index(docs) if docs:
self._index(docs)
def _index(self, docs): def _index(self, docs):
print("Indexing " + str(len(docs)) + " docs") print("Indexing " + str(len(docs)) + " docs")