Very slow websites are skipped. Should fix infinite waiting bug

This commit is contained in:
Simon 2018-07-20 13:34:40 -04:00
parent 004ade8935
commit 1df5d194d2
2 changed files with 39 additions and 17 deletions

View File

@ -82,6 +82,7 @@ class RemoteDirectoryCrawler:
self.url = url
self.max_threads = max_threads
self.crawled_paths = list()
self.status_code = "success"
def crawl_directory(self, out_file: str) -> CrawlResult:
try:
@ -96,7 +97,7 @@ class RemoteDirectoryCrawler:
return CrawlResult(0, "empty")
directory.close()
except TimeoutError:
return CrawlResult(0, "timeout")
return CrawlResult(0, "Timeout during initial request")
in_q = Queue(maxsize=0)
files_q = Queue(maxsize=0)
@ -128,18 +129,20 @@ class RemoteDirectoryCrawler:
files_q.put(None)
file_writer_thread.join()
return CrawlResult(files_written[0], "success")
return CrawlResult(files_written[0], self.status_code)
except Exception as e:
return CrawlResult(0, str(e) + " \nType:" + str(type(e)))
def _process_listings(self, url: str, in_q: Queue, files_q: Queue):
directory = RemoteDirectoryFactory.get_directory(url)
timeout_retries = 20 # If any worker threads reaches 20 retries, the whole queue is emptied
while directory:
try:
path = in_q.get(timeout=150)
except Empty:
logger.debug("in_q is Empty")
directory.close()
break
@ -156,15 +159,34 @@ class RemoteDirectoryCrawler:
in_q.put(urljoin(f.path, f.name))
else:
files_q.put(f)
logger.debug("LISTED " + self.url + path)
else:
logger.debug("Dropped " + self.url + path + " (was empty or already crawled)")
logger.debug("LISTED " + urljoin(self.url, path))
except TooManyConnectionsError:
logger.debug("Too many connections, this thread will be killed and path resubmitted")
# Kill worker and resubmit listing task
directory.close()
in_q.put(path)
# TODO: If all workers are killed the queue will never get processed and
# TODO: the crawler will be stuck forever
break
except TimeoutError:
logger.error("Directory listing timed out, " + str(timeout_retries) + " retries left")
if timeout_retries > 0:
timeout_retries -= 1
in_q.put(path)
else:
logger.error("Dropping website " + url)
self.status_code = "Timeout during website listing"
directory.close()
logger.debug("Emptying queue")
while True:
try:
in_q.get_nowait()
in_q.task_done()
except Empty:
break
logger.debug("Emptied queue")
break
finally:
in_q.task_done()
@ -177,7 +199,7 @@ class RemoteDirectoryCrawler:
while True:
try:
file = files_q.get(timeout=800)
file = files_q.get(timeout=2000)
except Empty:
logger.error("File writer thread timed out")
break

View File

@ -90,6 +90,7 @@ class HttpDirectory(RemoteDirectory):
)
MAX_RETRIES = 2
TIMEOUT = 1
def __init__(self, url):
super().__init__(url)
@ -104,9 +105,6 @@ class HttpDirectory(RemoteDirectory):
path_identifier = hashlib.sha1(current_dir_name.encode())
path_url = urljoin(self.base_url, path, "")
body = self._stream_body(path_url)
if not body:
logger.info("No body returned @ " + path_url)
return None, None
anchors = self._parse_links(body)
urls_to_request = []
@ -158,16 +156,16 @@ class HttpDirectory(RemoteDirectory):
retries = HttpDirectory.MAX_RETRIES
while retries > 0:
try:
r = self.session.head(url, allow_redirects=False, timeout=40)
r = self.session.head(url, allow_redirects=False, timeout=HttpDirectory.TIMEOUT)
stripped_url = url[len(self.base_url) - 1:]
path, name = os.path.split(stripped_url)
date = r.headers["Last-Modified"] if "Last-Modified" in r.headers else "1970-01-01"
date = r.headers.get("Last-Modified", "1970-01-01")
return File(
path=unquote(path).strip("/"),
name=unquote(name),
size=int(r.headers["Content-Length"]) if "Content-Length" in r.headers else -1,
size=int(r.headers.get("Content-Length", -1)),
mtime=int(parse_date(date).timestamp()),
is_dir=False
)
@ -175,26 +173,28 @@ class HttpDirectory(RemoteDirectory):
self.session.close()
retries -= 1
return None
logger.debug("TimeoutError - _request_file")
raise TimeoutError
def _stream_body(self, url: str):
retries = HttpDirectory.MAX_RETRIES
while retries > 0:
try:
r = self.session.get(url, stream=True, timeout=40)
for chunk in r.iter_content(chunk_size=4096):
r = self.session.get(url, stream=True, timeout=HttpDirectory.TIMEOUT)
for chunk in r.iter_content(chunk_size=8192):
try:
yield chunk.decode(r.encoding if r.encoding else "utf-8", errors="ignore")
except LookupError:
# Unsupported encoding
yield chunk.decode("utf-8", errors="ignore")
r.close()
break
return
except RequestException:
self.session.close()
retries -= 1
return None
logger.debug("TimeoutError - _stream_body")
raise TimeoutError
@staticmethod
def _parse_links(body):