Simplified url joining and splitting, switched from lxml to html.parser, various memory usage optimizations

Simon 2018-06-17 22:10:46 -04:00
parent 07d51a75cc
commit 344e7274d7
6 changed files with 136 additions and 101 deletions
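The headline change is the switch from lxml to the standard-library html.parser for pulling links out of directory listings. As a rough illustration of that approach (not the project's exact code; the sample HTML below is made up), an incremental parser only needs to watch for <a> tags and collect their href attributes:

    # Minimal sketch of anchor extraction with html.parser, mirroring the
    # HTMLAnchorParser added in this commit. The sample input is illustrative.
    from html.parser import HTMLParser

    class AnchorLister(HTMLParser):
        def __init__(self):
            super().__init__()
            self.hrefs = []

        def handle_starttag(self, tag, attrs):
            # Collect the href attribute of every <a> tag
            if tag == "a":
                for name, value in attrs:
                    if name == "href":
                        self.hrefs.append(value)
                        break

    parser = AnchorLister()
    parser.feed('<a href="subdir/">subdir/</a> <a href="file.iso">file.iso</a>')
    print(parser.hrefs)  # ['subdir/', 'file.iso']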

View File

@@ -11,6 +11,7 @@ class TooManyConnectionsError(Exception):
class File:
+    __slots__ = "name", "size", "mtime", "path", "is_dir"

    def __init__(self, name: str, size: int, mtime: int, path: str, is_dir: bool):
        self.name = name
@@ -61,22 +62,6 @@ class RemoteDirectoryFactory:
                return dir_engine(url)

-def export_to_json(q: Queue, out_file: str) -> int:
-    counter = 0
-    with open(out_file, "w") as f:
-        while True:
-            try:
-                next_file = q.get_nowait()
-                f.write(next_file.to_json() + "\n")
-                counter += 1
-            except Empty:
-                break
-    return counter
-

class CrawlResult:

    def __init__(self, file_count: int, status_code: str):
@@ -95,6 +80,8 @@ class RemoteDirectoryCrawler:
    def crawl_directory(self, out_file: str) -> CrawlResult:
+        import gc
+        gc.set_debug(gc.DEBUG_LEAK)
        try:
            directory = RemoteDirectoryFactory.get_directory(self.url)
            root_listing = directory.list_dir("")
@@ -133,6 +120,7 @@ class RemoteDirectoryCrawler:
            files_q.put(None)
            file_writer_thread.join()
            return CrawlResult(files_written[0], "success")

    def _process_listings(self, url: str, in_q: Queue, files_q: Queue):
@@ -161,6 +149,7 @@ class RemoteDirectoryCrawler:
                        in_q.put(os.path.join(f.path, f.name, ""))
                    else:
                        files_q.put(f)
+                import sys
                print("LISTED " + repr(path) + "dirs:" + str(in_q.qsize()))
            except TooManyConnectionsError:
                print("Too many connections")

View File

@ -1,7 +1,7 @@
from urllib.parse import urljoin, unquote from urllib.parse import urljoin, unquote, quote
import os import os
from lxml import etree from html.parser import HTMLParser
from itertools import repeat from itertools import repeat
from crawl_server.crawler import RemoteDirectory, File from crawl_server.crawler import RemoteDirectory, File
import requests import requests
@@ -11,6 +11,45 @@ import config
from dateutil.parser import parse as parse_date

+class Anchor:
+    def __init__(self):
+        self.text = None
+        self.href = None
+
+
+class HTMLAnchorParser(HTMLParser):
+
+    def __init__(self):
+        super().__init__()
+        self.anchors = []
+        self.current_anchor = None
+
+    def handle_starttag(self, tag, attrs):
+        if tag == "a":
+            for attr in attrs:
+                if attr[0] == "href":
+                    self.current_anchor = Anchor()
+                    self.current_anchor.href = attr[1]
+                    break
+
+    def handle_data(self, data):
+        if self.current_anchor:
+            self.current_anchor.text = data
+
+    def handle_endtag(self, tag):
+        if tag == "a":
+            if self.current_anchor:
+                self.anchors.append(self.current_anchor)
+                self.current_anchor = None
+
+    def error(self, message):
+        pass
+
+    def feed(self, data):
+        self.anchors.clear()
+        super().feed(data)
+
+
class HttpDirectory(RemoteDirectory):

    SCHEMES = ("http", "https",)
@@ -29,42 +68,39 @@ class HttpDirectory(RemoteDirectory):
    def __init__(self, url):
        super().__init__(url)
-        self.parser = etree.HTMLParser(collect_ids=False, encoding='utf-8')
        self.session = requests.Session()
        self.session.headers = HttpDirectory.HEADERS
+        self.session.verify = False
+        self.session.max_redirects = 1
-    def list_dir(self, path) -> list:
-        results = []
-        path_url = os.path.join(self.base_url, path.strip("/"), "")
-        body, encoding = self._fetch_body(path_url)
+    def list_dir(self, path):
+        path_url = self.base_url + path.strip("/") + "/"
+        body = self._stream_body(path_url)
        if not body:
-            return []
-        links = self._parse_links(body, encoding)
+            return None
+        anchors = self._parse_links(body)
        urls_to_request = []
-        for link in links:
-            if self._should_ignore(self.base_url, link):
+        for anchor in anchors:
+            if self._should_ignore(self.base_url, anchor):
                continue
-            file_url = urljoin(path_url, link[1])
-            path, file_name = os.path.split(file_url[len(self.base_url) - 1:])
-            if self._isdir(link):
-                results.append(File(
-                    name=file_name,
+            if self._isdir(anchor):
+                yield File(
+                    name=anchor.href,
                    mtime=None,
                    size=None,
                    path=path,
                    is_dir=True
-                ))
+                )
            else:
-                urls_to_request.append(file_url)
+                pass
+                urls_to_request.append(path_url + anchor.href)
-        results.extend(self.request_files(urls_to_request))
-        return results
+        for file in self.request_files(urls_to_request):
+            yield file

    def request_files(self, urls_to_request: list) -> list:
@@ -73,61 +109,13 @@ class HttpDirectory(RemoteDirectory):
            pool = ThreadPool(processes=10)
            files = pool.starmap(HttpDirectory._request_file, zip(repeat(self), urls_to_request))
            pool.close()
-            return [f for f in files if f]
+            return (f for f in files if f)
        else:
            # Too few urls to create thread pool
-            results = []
            for url in urls_to_request:
                file = self._request_file(url)
                if file:
-                    results.append(file)
-            return results
-
-    def _get_url(self, path: str):
-        return urljoin(self.base_url, path)
-
-    def _fetch_body(self, url: str):
-        retries = HttpDirectory.MAX_RETRIES
-        while retries > 0:
-            try:
-                r = self.session.get(url, timeout=40)
-                return r.content, r.encoding
-            except RequestException:
-                retries -= 1
-        return None
-
-    def _parse_links(self, body: bytes, encoding) -> list:
-        result = list()
-        try:
-            tree = etree.HTML(body, parser=self.parser)
-            links = []
-            try:
-                links = tree.findall(".//a/[@href]")
-            except AttributeError:
-                pass
-            for link in links:
-                result.append((link.text, link.get("href")))
-        except UnicodeDecodeError:
-            tree = etree.HTML(body.decode(encoding, errors="ignore").encode("utf-8"), parser=self.parser)
-            links = []
-            try:
-                links = tree.findall(".//a/[@href]")
-            except AttributeError:
-                pass
-            for link in links:
-                result.append((link.text, link.get("href")))
-        return result
-
-    @staticmethod
-    def _isdir(link: tuple):
-        return link[1].rsplit("?", maxsplit=1)[0].endswith("/")
+                    yield file

    def _request_file(self, url):
@@ -148,19 +136,52 @@ class HttpDirectory(RemoteDirectory):
                    is_dir=False
                )
            except RequestException:
+                self.session.close()
+                retries -= 1
+
+        return None
+
+    def _stream_body(self, url: str):
+        retries = HttpDirectory.MAX_RETRIES
+        while retries > 0:
+            try:
+                r = self.session.get(url, stream=True, timeout=40)
+                for chunk in r.iter_content(chunk_size=4096):
+                    yield chunk
+                r.close()
+                del r
+                break
+            except RequestException:
+                self.session.close()
                retries -= 1

        return None

    @staticmethod
-    def _should_ignore(base_url, link: tuple):
-        if link[0] == "../" or link[1].endswith(HttpDirectory.BLACK_LIST):
+    def _parse_links(body):
+        parser = HTMLAnchorParser()
+        for chunk in body:
+            parser.feed(chunk.decode("utf-8"))
+            for anchor in parser.anchors:
+                yield anchor
+
+    @staticmethod
+    def _isdir(link: Anchor):
+        return link.href.endswith("/")
+
+    @staticmethod
+    def _should_ignore(base_url, link: Anchor):
+        if link.text == "../" or link.href.endswith(HttpDirectory.BLACK_LIST):
            return True

        # Ignore external links
-        if link[1].startswith("http") and not link[1].startswith(base_url):
+        if link.href.startswith("http") and not link.href.startswith(base_url):
            return True

    def close(self):
        self.session.close()
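Taken together, the new _stream_body and _parse_links form a generator pipeline: the response is read in 4 KiB chunks and fed to the incremental parser, so a large directory listing is never buffered whole. A standalone sketch of the same idea, assuming a placeholder URL and using errors="ignore" to paper over multibyte characters split across chunk boundaries (the project's own code decodes chunks without that guard):

    # Stream a listing page and yield hrefs as they are parsed, chunk by chunk.
    import requests
    from html.parser import HTMLParser

    class HrefParser(HTMLParser):
        def __init__(self):
            super().__init__()
            self.hrefs = []

        def handle_starttag(self, tag, attrs):
            if tag == "a":
                self.hrefs.extend(v for k, v in attrs if k == "href")

    def iter_hrefs(url):
        parser = HrefParser()
        with requests.get(url, stream=True, timeout=40) as r:
            for chunk in r.iter_content(chunk_size=4096):
                parser.feed(chunk.decode("utf-8", errors="ignore"))
                yield from parser.hrefs
                parser.hrefs.clear()

    # for href in iter_hrefs("http://example.com/pub/"):  # placeholder URL
    #     print(href)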

View File

@ -1,6 +1,5 @@
from crawl_server.database import TaskManagerDatabase, Task, TaskResult from crawl_server.database import TaskManagerDatabase, Task, TaskResult
from concurrent.futures import ProcessPoolExecutor from multiprocessing import Manager, Pool
from multiprocessing import Manager
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime from datetime import datetime
from crawl_server.crawler import RemoteDirectoryCrawler from crawl_server.crawler import RemoteDirectoryCrawler
@@ -12,7 +11,7 @@ class TaskManager:
    def __init__(self, db_path, max_processes=2):
        self.db_path = db_path
        self.db = TaskManagerDatabase(db_path)
-        self.pool = ProcessPoolExecutor(max_workers=max_processes)
+        self.pool = Pool(maxtasksperchild=1, processes=max_processes)
        self.max_processes = max_processes
        manager = Manager()
        self.current_tasks = manager.list()
@@ -41,21 +40,28 @@ class TaskManager:
        print("pooled " + task.url)
        self.current_tasks.append(task)

-        self.pool.submit(
+        self.pool.apply_async(
            TaskManager.run_task,
-            task, self.db_path, self.current_tasks
-        ).add_done_callback(TaskManager.task_complete)
+            args=(task, self.db_path, self.current_tasks),
+            callback=TaskManager.task_complete,
+            error_callback=TaskManager.task_error
+        )
    @staticmethod
    def run_task(task, db_path, current_tasks):
+        # import gc
+        # gc.set_debug(gc.DEBUG_LEAK)
        result = TaskResult()
        result.start_time = datetime.utcnow()
        result.website_id = task.website_id
        print("Starting task " + task.url)
-        crawler = RemoteDirectoryCrawler(task.url, 10)
+        crawler = RemoteDirectoryCrawler(task.url, 20)
        crawl_result = crawler.crawl_directory("./crawled/" + str(task.website_id) + ".json")
+        del crawler
        result.file_count = crawl_result.file_count
        result.status_code = crawl_result.status_code
@@ -70,11 +76,16 @@ class TaskManager:
        return result, db_path, current_tasks

+    @staticmethod
+    def task_error(result):
+        print("TASK ERROR")
+        raise result
+
    @staticmethod
    def task_complete(result):
        try:
-            task_result, db_path, current_tasks = result.result()
+            task_result, db_path, current_tasks = result
        except Exception as e:
            print("Exception during task " + str(e))
            return
@@ -92,3 +103,5 @@ class TaskManager:
            if task.website_id == task_result.website_id:
                del current_tasks[i]
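Replacing ProcessPoolExecutor with multiprocessing.Pool and maxtasksperchild=1 means each worker process is discarded after a single crawl, so whatever memory the task accumulated is returned to the OS when the child exits, and apply_async gains an error_callback. A minimal sketch of that pattern (the task function and callbacks below are stand-ins, not the project's real ones):

    # Each worker handles exactly one task before being replaced.
    from multiprocessing import Pool
    import os

    def run_task(url):
        print("crawling %s in pid %d" % (url, os.getpid()))
        return url, "success"

    def on_done(result):
        print("done:", result)

    def on_error(exc):
        print("TASK ERROR:", exc)

    if __name__ == "__main__":
        pool = Pool(processes=2, maxtasksperchild=1)
        for u in ("http://example.com/a/", "http://example.com/b/"):
            pool.apply_async(run_task, args=(u,), callback=on_done, error_callback=on_error)
        pool.close()
        pool.join()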

View File

@@ -4,8 +4,9 @@ import json
payload = json.dumps({
    "website_id": 123,
-    "url": "http://alphamediazone.com/data/Movies1/",
+    # "url": "http://alphamediazone.com/data/Movies1/",
    # "url": "http://localhost:8000/",
+    "url": "http://ubuntu.mirrorservice.org/",
    "priority": 2,
    "callback_type": "",
    "callback_args": "{}"

View File

@@ -10,7 +10,6 @@ humanfriendly
apscheduler
bcrypt
ftputil
-lxml
elasticsearch
python-dateutil
flask_httpauth

View File

@@ -1,5 +1,6 @@
import os
import json
+import shutil
import sys
from search.search import ElasticSearchEngine
from concurrent.futures import ThreadPoolExecutor
@@ -80,7 +81,18 @@ def random_searches(count=10000000, max_workers=1000):
    pool.map(search, random.choices(terms, k=count))

+def make_wide_filesystem(count=100000):
+    shutil.rmtree("stress_test")
+    os.mkdir("stress_test")
+    for _ in range(count):
+        new_path = "stress_test/" + random.choice(terms)
+        if not os.path.exists(new_path):
+            os.mkdir(new_path)
+

# dump_local_filesystem("/mnt/")
# index_file_list("random_dump.json", 1000)
# random_searches(100000)
# dump_random_files(20000 * 100000)
+make_wide_filesystem(10000)