From 344e7274d704f7b6855bc40804b1606caaabdead Mon Sep 17 00:00:00 2001 From: Simon Date: Sun, 17 Jun 2018 22:10:46 -0400 Subject: [PATCH] Simplified url joining and splitting, switched from lxml to html.parser, various memory usage optimizations --- crawl_server/crawler.py | 21 +---- crawl_server/remote_http.py | 171 ++++++++++++++++++++--------------- crawl_server/task_manager.py | 29 ++++-- debug_put.py | 3 +- requirements.txt | 1 - stress_test.py | 12 +++ 6 files changed, 136 insertions(+), 101 deletions(-) diff --git a/crawl_server/crawler.py b/crawl_server/crawler.py index fe2a841..0bea6fd 100644 --- a/crawl_server/crawler.py +++ b/crawl_server/crawler.py @@ -11,6 +11,7 @@ class TooManyConnectionsError(Exception): class File: + __slots__ = "name", "size", "mtime", "path", "is_dir" def __init__(self, name: str, size: int, mtime: int, path: str, is_dir: bool): self.name = name @@ -61,22 +62,6 @@ class RemoteDirectoryFactory: return dir_engine(url) -def export_to_json(q: Queue, out_file: str) -> int: - - counter = 0 - - with open(out_file, "w") as f: - while True: - try: - next_file = q.get_nowait() - f.write(next_file.to_json() + "\n") - counter += 1 - except Empty: - break - - return counter - - class CrawlResult: def __init__(self, file_count: int, status_code: str): @@ -95,6 +80,8 @@ class RemoteDirectoryCrawler: def crawl_directory(self, out_file: str) -> CrawlResult: + import gc + gc.set_debug(gc.DEBUG_LEAK) try: directory = RemoteDirectoryFactory.get_directory(self.url) root_listing = directory.list_dir("") @@ -133,6 +120,7 @@ class RemoteDirectoryCrawler: files_q.put(None) file_writer_thread.join() + return CrawlResult(files_written[0], "success") def _process_listings(self, url: str, in_q: Queue, files_q: Queue): @@ -161,6 +149,7 @@ class RemoteDirectoryCrawler: in_q.put(os.path.join(f.path, f.name, "")) else: files_q.put(f) + import sys print("LISTED " + repr(path) + "dirs:" + str(in_q.qsize())) except TooManyConnectionsError: print("Too many connections") diff --git a/crawl_server/remote_http.py b/crawl_server/remote_http.py index 7fdb78f..2775703 100644 --- a/crawl_server/remote_http.py +++ b/crawl_server/remote_http.py @@ -1,7 +1,7 @@ -from urllib.parse import urljoin, unquote +from urllib.parse import urljoin, unquote, quote import os -from lxml import etree +from html.parser import HTMLParser from itertools import repeat from crawl_server.crawler import RemoteDirectory, File import requests @@ -11,6 +11,45 @@ import config from dateutil.parser import parse as parse_date +class Anchor: + def __init__(self): + self.text = None + self.href = None + + +class HTMLAnchorParser(HTMLParser): + + def __init__(self): + super().__init__() + self.anchors = [] + self.current_anchor = None + + def handle_starttag(self, tag, attrs): + if tag == "a": + for attr in attrs: + if attr[0] == "href": + self.current_anchor = Anchor() + self.current_anchor.href = attr[1] + break + + def handle_data(self, data): + if self.current_anchor: + self.current_anchor.text = data + + def handle_endtag(self, tag): + if tag == "a": + if self.current_anchor: + self.anchors.append(self.current_anchor) + self.current_anchor = None + + def error(self, message): + pass + + def feed(self, data): + self.anchors.clear() + super().feed(data) + + class HttpDirectory(RemoteDirectory): SCHEMES = ("http", "https",) @@ -29,42 +68,39 @@ class HttpDirectory(RemoteDirectory): def __init__(self, url): super().__init__(url) - self.parser = etree.HTMLParser(collect_ids=False, encoding='utf-8') self.session = requests.Session() 
self.session.headers = HttpDirectory.HEADERS + self.session.verify = False + self.session.max_redirects = 1 - def list_dir(self, path) -> list: - results = [] + def list_dir(self, path): - path_url = os.path.join(self.base_url, path.strip("/"), "") - body, encoding = self._fetch_body(path_url) + path_url = self.base_url + path.strip("/") + "/" + body = self._stream_body(path_url) if not body: - return [] - links = self._parse_links(body, encoding) + return None + anchors = self._parse_links(body) urls_to_request = [] - for link in links: - if self._should_ignore(self.base_url, link): + for anchor in anchors: + if self._should_ignore(self.base_url, anchor): continue - file_url = urljoin(path_url, link[1]) - path, file_name = os.path.split(file_url[len(self.base_url) - 1:]) - - if self._isdir(link): - results.append(File( - name=file_name, + if self._isdir(anchor): + yield File( + name=anchor.href, mtime=None, size=None, path=path, is_dir=True - )) + ) else: - urls_to_request.append(file_url) + pass + urls_to_request.append(path_url + anchor.href) - results.extend(self.request_files(urls_to_request)) - - return results + for file in self.request_files(urls_to_request): + yield file def request_files(self, urls_to_request: list) -> list: @@ -73,61 +109,13 @@ class HttpDirectory(RemoteDirectory): pool = ThreadPool(processes=10) files = pool.starmap(HttpDirectory._request_file, zip(repeat(self), urls_to_request)) pool.close() - return [f for f in files if f] + return (f for f in files if f) else: # Too few urls to create thread pool - results = [] for url in urls_to_request: file = self._request_file(url) if file: - results.append(file) - - return results - - def _get_url(self, path: str): - return urljoin(self.base_url, path) - - def _fetch_body(self, url: str): - - retries = HttpDirectory.MAX_RETRIES - while retries > 0: - try: - r = self.session.get(url, timeout=40) - return r.content, r.encoding - except RequestException: - retries -= 1 - - return None - - def _parse_links(self, body: bytes, encoding) -> list: - - result = list() - try: - tree = etree.HTML(body, parser=self.parser) - links = [] - try: - links = tree.findall(".//a/[@href]") - except AttributeError: - pass - - for link in links: - result.append((link.text, link.get("href"))) - except UnicodeDecodeError: - tree = etree.HTML(body.decode(encoding, errors="ignore").encode("utf-8"), parser=self.parser) - links = [] - try: - links = tree.findall(".//a/[@href]") - except AttributeError: - pass - - for link in links: - result.append((link.text, link.get("href"))) - - return result - - @staticmethod - def _isdir(link: tuple): - return link[1].rsplit("?", maxsplit=1)[0].endswith("/") + yield file def _request_file(self, url): @@ -148,19 +136,52 @@ class HttpDirectory(RemoteDirectory): is_dir=False ) except RequestException: + self.session.close() + retries -= 1 + + return None + + def _stream_body(self, url: str): + + retries = HttpDirectory.MAX_RETRIES + while retries > 0: + try: + r = self.session.get(url, stream=True, timeout=40) + for chunk in r.iter_content(chunk_size=4096): + yield chunk + r.close() + del r + break + except RequestException: + self.session.close() retries -= 1 return None @staticmethod - def _should_ignore(base_url, link: tuple): - if link[0] == "../" or link[1].endswith(HttpDirectory.BLACK_LIST): + def _parse_links(body): + + parser = HTMLAnchorParser() + + for chunk in body: + parser.feed(chunk.decode("utf-8")) + for anchor in parser.anchors: + yield anchor + + @staticmethod + def _isdir(link: Anchor): + return 
link.href.endswith("/") + + @staticmethod + def _should_ignore(base_url, link: Anchor): + if link.text == "../" or link.href.endswith(HttpDirectory.BLACK_LIST): return True # Ignore external links - if link[1].startswith("http") and not link[1].startswith(base_url): + if link.href.startswith("http") and not link.href.startswith(base_url): return True def close(self): self.session.close() + diff --git a/crawl_server/task_manager.py b/crawl_server/task_manager.py index 13dad72..10729a9 100644 --- a/crawl_server/task_manager.py +++ b/crawl_server/task_manager.py @@ -1,6 +1,5 @@ from crawl_server.database import TaskManagerDatabase, Task, TaskResult -from concurrent.futures import ProcessPoolExecutor -from multiprocessing import Manager +from multiprocessing import Manager, Pool from apscheduler.schedulers.background import BackgroundScheduler from datetime import datetime from crawl_server.crawler import RemoteDirectoryCrawler @@ -12,7 +11,7 @@ class TaskManager: def __init__(self, db_path, max_processes=2): self.db_path = db_path self.db = TaskManagerDatabase(db_path) - self.pool = ProcessPoolExecutor(max_workers=max_processes) + self.pool = Pool(maxtasksperchild=1, processes=max_processes) self.max_processes = max_processes manager = Manager() self.current_tasks = manager.list() @@ -41,21 +40,28 @@ class TaskManager: print("pooled " + task.url) self.current_tasks.append(task) - self.pool.submit( + self.pool.apply_async( TaskManager.run_task, - task, self.db_path, self.current_tasks - ).add_done_callback(TaskManager.task_complete) + args=(task, self.db_path, self.current_tasks), + callback=TaskManager.task_complete, + error_callback=TaskManager.task_error + ) @staticmethod def run_task(task, db_path, current_tasks): + + # import gc + # gc.set_debug(gc.DEBUG_LEAK) + result = TaskResult() result.start_time = datetime.utcnow() result.website_id = task.website_id print("Starting task " + task.url) - crawler = RemoteDirectoryCrawler(task.url, 10) + crawler = RemoteDirectoryCrawler(task.url, 20) crawl_result = crawler.crawl_directory("./crawled/" + str(task.website_id) + ".json") + del crawler result.file_count = crawl_result.file_count result.status_code = crawl_result.status_code @@ -70,11 +76,16 @@ class TaskManager: return result, db_path, current_tasks + @staticmethod + def task_error(result): + print("TASK ERROR") + raise result + @staticmethod def task_complete(result): try: - task_result, db_path, current_tasks = result.result() + task_result, db_path, current_tasks = result except Exception as e: print("Exception during task " + str(e)) return @@ -92,3 +103,5 @@ class TaskManager: if task.website_id == task_result.website_id: del current_tasks[i] + + diff --git a/debug_put.py b/debug_put.py index 8dd789d..a586bed 100644 --- a/debug_put.py +++ b/debug_put.py @@ -4,8 +4,9 @@ import json payload = json.dumps({ "website_id": 123, - "url": "http://alphamediazone.com/data/Movies1/", + # "url": "http://alphamediazone.com/data/Movies1/", # "url": "http://localhost:8000/", + "url": "http://ubuntu.mirrorservice.org/", "priority": 2, "callback_type": "", "callback_args": "{}" diff --git a/requirements.txt b/requirements.txt index f0db9e3..2aebe99 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,7 +10,6 @@ humanfriendly apscheduler bcrypt ftputil -lxml elasticsearch python-dateutil flask_httpauth diff --git a/stress_test.py b/stress_test.py index 2fb10a5..2ce618c 100644 --- a/stress_test.py +++ b/stress_test.py @@ -1,5 +1,6 @@ import os import json +import shutil import sys from 
search.search import ElasticSearchEngine from concurrent.futures import ThreadPoolExecutor @@ -80,7 +81,18 @@ def random_searches(count=10000000, max_workers=1000): pool.map(search, random.choices(terms, k=count)) +def make_wide_filesystem(count=100000): + + shutil.rmtree("stress_test") + os.mkdir("stress_test") + for _ in range(count): + new_path = "stress_test/" + random.choice(terms) + if not os.path.exists(new_path): + os.mkdir(new_path) + + # dump_local_filesystem("/mnt/") # index_file_list("random_dump.json", 1000) # random_searches(100000) # dump_random_files(20000 * 100000) +make_wide_filesystem(10000)
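
Note (not part of the patch): the core memory optimization above is to stream the response body in fixed-size chunks and feed each chunk to an html.parser.HTMLParser subclass, instead of materializing the whole page and parsing it with lxml. Below is a minimal, self-contained sketch of that technique under the same assumptions (UTF-8 listings, anchors carrying relative hrefs). The class name AnchorHrefParser, the function iter_anchor_hrefs, and the example URL are illustrative only and do not appear in the patch; like the patch, the sketch does not special-case multi-byte characters or tags split across chunk boundaries.

    # Sketch only -- illustrates the streaming-parse approach the commit switches to.
    # Names here (AnchorHrefParser, iter_anchor_hrefs) are hypothetical, not from the patch.
    from html.parser import HTMLParser

    import requests


    class AnchorHrefParser(HTMLParser):
        """Collects href attributes of <a> tags as the document is fed chunk by chunk."""

        def __init__(self):
            super().__init__()
            self.hrefs = []

        def handle_starttag(self, tag, attrs):
            if tag == "a":
                for name, value in attrs:
                    if name == "href" and value:
                        self.hrefs.append(value)
                        break

        def error(self, message):
            # Overridden defensively, as the patch's HTMLAnchorParser does.
            pass


    def iter_anchor_hrefs(url, chunk_size=4096, timeout=40):
        """Yield anchor hrefs from a listing page without buffering the full body."""
        parser = AnchorHrefParser()
        with requests.get(url, stream=True, timeout=timeout) as r:
            for chunk in r.iter_content(chunk_size=chunk_size):
                # Feed each decoded chunk to the parser and drain any anchors found so far.
                parser.feed(chunk.decode("utf-8", errors="ignore"))
                while parser.hrefs:
                    yield parser.hrefs.pop(0)
        parser.close()


    if __name__ == "__main__":
        # Example only; substitute any open-directory listing URL.
        for href in iter_anchor_hrefs("http://ubuntu.mirrorservice.org/"):
            print(href)

The trade-off is the same one the patch makes: html.parser is slower per byte than lxml, but feeding chunks and yielding results as generators keeps peak memory per request roughly constant regardless of the size of the directory listing.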