Should fix memory usage problem when crawling (part two)

This commit is contained in:
Simon 2018-06-16 14:53:48 -04:00
parent adb94cf326
commit 9d0a0a8b42
3 changed files with 47 additions and 49 deletions

View File

@ -1,7 +1,5 @@
import os
import logging
import ujson
import logging
from urllib.parse import urlparse
from timeout_decorator.timeout_decorator import TimeoutError
from threading import Thread
@ -93,14 +91,14 @@ class RemoteDirectoryCrawler:
def __init__(self, url, max_threads: int):
self.url = url
self.max_threads = max_threads
self.crawled_paths = set()
self.crawled_paths = list()
def crawl_directory(self, out_file: str) -> CrawlResult:
try:
directory = RemoteDirectoryFactory.get_directory(self.url)
root_listing = directory.list_dir("")
self.crawled_paths.add("")
self.crawled_paths.append("")
directory.close()
except TimeoutError:
return CrawlResult(0, "timeout")
@ -109,7 +107,7 @@ class RemoteDirectoryCrawler:
files_q = Queue(maxsize=0)
for f in root_listing:
if f.is_dir:
in_q.put(f)
in_q.put(os.path.join(f.path, f.name, ""))
else:
files_q.put(f)
@ -143,41 +141,41 @@ class RemoteDirectoryCrawler:
timeout_retries = RemoteDirectoryCrawler.MAX_TIMEOUT_RETRIES
while directory:
try:
file = in_q.get(timeout=60)
path = in_q.get(timeout=60)
except Empty:
directory.close()
break
if file is None:
if path is None:
break
try:
path = os.path.join(file.path, file.name, "")
if path not in self.crawled_paths:
self.crawled_paths.add(path)
self.crawled_paths.append(path)
listing = directory.list_dir(path)
timeout_retries = RemoteDirectoryCrawler.MAX_TIMEOUT_RETRIES
for f in listing:
if f.is_dir:
in_q.put(f)
in_q.put(os.path.join(f.path, f.name, ""))
else:
files_q.put(f)
print("LISTED " + repr(path) + "dirs:" + str(in_q.qsize()))
except TooManyConnectionsError:
print("Too many connections")
# Kill worker and resubmit listing task
directory.close()
in_q.put(file)
in_q.put(path)
break
except TimeoutError:
if timeout_retries > 0:
timeout_retries -= 1
# TODO: Remove debug info
print("TIMEOUT, " + str(timeout_retries) + " retries left")
in_q.put(file)
in_q.put(path)
else:
print("Dropping listing for " + os.path.join(file.path, file.name, ""))
print("Dropping listing for " + path)
finally:
in_q.task_done()
@ -190,7 +188,7 @@ class RemoteDirectoryCrawler:
while True:
try:
file = files_q.get(timeout=30)
file = files_q.get(timeout=240)
except Empty:
break
@ -202,6 +200,7 @@ class RemoteDirectoryCrawler:
files_q.task_done()
files_written.append(counter)
print("File writer done")

View File

@ -11,13 +11,6 @@ import config
from dateutil.parser import parse as parse_date
class Link:
def __init__(self, text: str, url: str):
self.text = text
self.url = url
class HttpDirectory(RemoteDirectory):
SCHEMES = ("http", "https",)
@ -37,6 +30,8 @@ class HttpDirectory(RemoteDirectory):
def __init__(self, url):
super().__init__(url)
self.parser = etree.HTMLParser(collect_ids=False, encoding='utf-8')
self.session = requests.Session()
self.session.headers = HttpDirectory.HEADERS
def list_dir(self, path) -> list:
results = []
@ -50,20 +45,19 @@ class HttpDirectory(RemoteDirectory):
urls_to_request = []
for link in links:
if self._should_ignore(link):
if self._should_ignore(self.base_url, link):
continue
file_url = urljoin(path_url, link.url)
file_url = urljoin(path_url, link[1])
path, file_name = os.path.split(file_url[len(self.base_url) - 1:])
if self._isdir(link):
results.append(File(
name=file_name,
mtime=0,
size=-1,
is_dir=True,
path=path
mtime=None,
size=None,
path=path,
is_dir=True
))
else:
urls_to_request.append(file_url)
@ -74,44 +68,40 @@ class HttpDirectory(RemoteDirectory):
def request_files(self, urls_to_request: list) -> list:
results = []
if len(urls_to_request) > 4:
if len(urls_to_request) > 30:
# Many urls, use multi-threaded solution
pool = ThreadPool(processes=10)
files = pool.starmap(HttpDirectory._request_file, zip(repeat(self), urls_to_request))
pool.close()
for file in files:
if file:
results.append(file)
return [f for f in files if f]
else:
# Too few urls to create thread pool
results = []
for url in urls_to_request:
file = self._request_file(url)
if file:
results.append(file)
return results
return results
def _get_url(self, path: str):
return urljoin(self.base_url, path)
@staticmethod
def _fetch_body(url: str):
def _fetch_body(self, url: str):
retries = HttpDirectory.MAX_RETRIES
while retries > 0:
try:
r = requests.get(url, headers=HttpDirectory.HEADERS)
return r.text
r = self.session.get(url)
return r.content
except RequestException:
retries -= 1
return None
def _parse_links(self, body: bytes) -> set:
def _parse_links(self, body: bytes) -> list:
result = set()
result = list()
tree = etree.HTML(body, parser=self.parser)
links = []
try:
@ -120,25 +110,25 @@ class HttpDirectory(RemoteDirectory):
pass
for link in links:
result.add(Link(link.text, link.get("href")))
result.append((link.text, link.get("href")))
return result
@staticmethod
def _isdir(link: Link):
return link.url.rsplit("?", maxsplit=1)[0].endswith("/")
def _isdir(link: tuple):
return link[1].rsplit("?", maxsplit=1)[0].endswith("/")
def _request_file(self, url):
retries = HttpDirectory.MAX_RETRIES
while retries > 0:
try:
r = requests.head(url, headers=HttpDirectory.HEADERS, allow_redirects=False, timeout=50)
r = self.session.head(url, allow_redirects=False, timeout=50)
stripped_url = url[len(self.base_url) - 1:]
path, name = os.path.split(stripped_url)
date = r.headers["Date"] if "Date" in r.headers else "1970-01-01"
date = r.headers["Last-Modified"] if "Last-Modified" in r.headers else "1970-01-01"
return File(
path=unquote(path).strip("/"),
name=unquote(name),
@ -152,6 +142,14 @@ class HttpDirectory(RemoteDirectory):
return None
@staticmethod
def _should_ignore(link: Link):
return link.text == "../" or link.url.endswith(HttpDirectory.BLACK_LIST)
def _should_ignore(base_url, link: tuple):
if link[0] == "../" or link[1].endswith(HttpDirectory.BLACK_LIST):
return True
# Ignore external links
if link[1].startswith("http") and not link[1].startswith(base_url):
return True
def close(self):
self.session.close()

View File

@ -77,6 +77,7 @@ class TaskManager:
task_result, db_path, current_tasks = result.result()
except Exception as e:
print("Exception during task " + str(e))
return
print(task_result.status_code)
print(task_result.file_count)