diff --git a/crawl_server/crawler.py b/crawl_server/crawler.py
index 6a637a1..6854c7f 100644
--- a/crawl_server/crawler.py
+++ b/crawl_server/crawler.py
@@ -82,6 +82,7 @@ class RemoteDirectoryCrawler:
         self.url = url
         self.max_threads = max_threads
         self.crawled_paths = list()
+        self.status_code = "success"

     def crawl_directory(self, out_file: str) -> CrawlResult:
         try:
@@ -96,7 +97,7 @@ class RemoteDirectoryCrawler:
                     return CrawlResult(0, "empty")
                 directory.close()
             except TimeoutError:
-                return CrawlResult(0, "timeout")
+                return CrawlResult(0, "Timeout during initial request")

             in_q = Queue(maxsize=0)
             files_q = Queue(maxsize=0)
@@ -128,18 +129,20 @@ class RemoteDirectoryCrawler:
             files_q.put(None)
             file_writer_thread.join()

-            return CrawlResult(files_written[0], "success")
+            return CrawlResult(files_written[0], self.status_code)
         except Exception as e:
             return CrawlResult(0, str(e) + " \nType:" + str(type(e)))

     def _process_listings(self, url: str, in_q: Queue, files_q: Queue):

         directory = RemoteDirectoryFactory.get_directory(url)
+        timeout_retries = 20  # If any worker thread reaches 20 retries, the whole queue is emptied

         while directory:

             try:
                 path = in_q.get(timeout=150)
             except Empty:
+                logger.debug("in_q is Empty")
                 directory.close()
                 break
@@ -156,15 +159,34 @@ class RemoteDirectoryCrawler:
                             in_q.put(urljoin(f.path, f.name))
                         else:
                             files_q.put(f)
-                    logger.debug("LISTED " + self.url + path)
-                else:
-                    logger.debug("Dropped " + self.url + path + " (was empty or already crawled)")
+                    logger.debug("LISTED " + urljoin(self.url, path))
             except TooManyConnectionsError:
                 logger.debug("Too many connections, this thread will be killed and path resubmitted")
                 # Kill worker and resubmit listing task
                 directory.close()
                 in_q.put(path)
+                # TODO: If all workers are killed the queue will never get processed and
+                # TODO: the crawler will be stuck forever
                 break
+            except TimeoutError:
+                logger.error("Directory listing timed out, " + str(timeout_retries) + " retries left")
+                if timeout_retries > 0:
+                    timeout_retries -= 1
+                    in_q.put(path)
+                else:
+                    logger.error("Dropping website " + url)
+                    self.status_code = "Timeout during website listing"
+                    directory.close()
+
+                    logger.debug("Emptying queue")
+                    while True:
+                        try:
+                            in_q.get_nowait()
+                            in_q.task_done()
+                        except Empty:
+                            break
+                    logger.debug("Emptied queue")
+                    break
             finally:
                 in_q.task_done()

@@ -177,7 +199,7 @@ class RemoteDirectoryCrawler:
             while True:
                 try:
-                    file = files_q.get(timeout=800)
+                    file = files_q.get(timeout=2000)
                 except Empty:
                     logger.error("File writer thread timed out")
                     break

diff --git a/crawl_server/remote_http.py b/crawl_server/remote_http.py
index 5ee3e95..a9728c0 100644
--- a/crawl_server/remote_http.py
+++ b/crawl_server/remote_http.py
@@ -90,6 +90,7 @@ class HttpDirectory(RemoteDirectory):
     )

     MAX_RETRIES = 2
+    TIMEOUT = 1

     def __init__(self, url):
         super().__init__(url)
@@ -104,9 +105,6 @@ class HttpDirectory(RemoteDirectory):
         path_identifier = hashlib.sha1(current_dir_name.encode())
         path_url = urljoin(self.base_url, path, "")
         body = self._stream_body(path_url)
-        if not body:
-            logger.info("No body returned @ " + path_url)
-            return None, None
         anchors = self._parse_links(body)

         urls_to_request = []
@@ -158,16 +156,16 @@ class HttpDirectory(RemoteDirectory):
         retries = HttpDirectory.MAX_RETRIES
         while retries > 0:
             try:
-                r = self.session.head(url, allow_redirects=False, timeout=40)
+                r = self.session.head(url, allow_redirects=False, timeout=HttpDirectory.TIMEOUT)

                 stripped_url = url[len(self.base_url) - 1:]

                 path, name = os.path.split(stripped_url)
-                date = r.headers["Last-Modified"] if "Last-Modified" in r.headers else "1970-01-01"
+                date = r.headers.get("Last-Modified", "1970-01-01")
                 return File(
                     path=unquote(path).strip("/"),
                     name=unquote(name),
-                    size=int(r.headers["Content-Length"]) if "Content-Length" in r.headers else -1,
+                    size=int(r.headers.get("Content-Length", -1)),
                     mtime=int(parse_date(date).timestamp()),
                     is_dir=False
                 )
@@ -175,26 +173,28 @@ class HttpDirectory(RemoteDirectory):
                 self.session.close()
                 retries -= 1

-        return None
+        logger.debug("TimeoutError - _request_file")
+        raise TimeoutError

     def _stream_body(self, url: str):
         retries = HttpDirectory.MAX_RETRIES
         while retries > 0:
             try:
-                r = self.session.get(url, stream=True, timeout=40)
-                for chunk in r.iter_content(chunk_size=4096):
+                r = self.session.get(url, stream=True, timeout=HttpDirectory.TIMEOUT)
+                for chunk in r.iter_content(chunk_size=8192):
                     try:
                         yield chunk.decode(r.encoding if r.encoding else "utf-8", errors="ignore")
                     except LookupError:
                         # Unsupported encoding
                         yield chunk.decode("utf-8", errors="ignore")
                 r.close()
-                break
+                return
             except RequestException:
                 self.session.close()
                 retries -= 1

-        return None
+        logger.debug("TimeoutError - _stream_body")
+        raise TimeoutError

     @staticmethod
     def _parse_links(body):