Bug fixes for FTP crawler

Simon 2018-06-13 15:54:45 -04:00
parent 9bde8cb629
commit 1bd58468eb
5 changed files with 45 additions and 12 deletions


@@ -86,6 +86,8 @@ class CrawlResult:
 class RemoteDirectoryCrawler:
+    MAX_TIMEOUT_RETRIES = 3
+
     def __init__(self, url, max_threads: int):
         self.url = url
         self.max_threads = max_threads
@@ -132,6 +134,7 @@ class RemoteDirectoryCrawler:
     def _process_listings(self, url: str, in_q: Queue, files_q: Queue):

         directory = RemoteDirectoryFactory.get_directory(url)
+        timeout_retries = RemoteDirectoryCrawler.MAX_TIMEOUT_RETRIES

         while directory:
@@ -148,6 +151,7 @@ class RemoteDirectoryCrawler:
                 if path not in self.crawled_paths:
                     self.crawled_paths.add(path)
                     listing = directory.list_dir(path)
+                    timeout_retries = RemoteDirectoryCrawler.MAX_TIMEOUT_RETRIES

                     for f in listing:
                         if f.is_dir:
@@ -156,8 +160,18 @@ class RemoteDirectoryCrawler:
                             files_q.put(f)
             except TooManyConnectionsError:
                 print("Too many connections")
+                # Kill worker and resubmit listing task
+                directory.close()
+                in_q.put(file)
+                break
             except TimeoutError:
-                pass
+                if timeout_retries > 0:
+                    timeout_retries -= 1
+                    # TODO: Remove debug info
+                    print("TIMEOUT, " + str(timeout_retries) + " retries left")
+                    in_q.put(file)
+                else:
+                    print("Dropping listing for " + os.path.join(file.path, file.name, ""))
             finally:
                 in_q.task_done()
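Note: the timeout handling added in _process_listings() boils down to a retry-then-drop loop. Below is a minimal, self-contained sketch of that pattern; the process() function, the plain string tasks, and the stub list_dir callable are illustrations only, not part of this commit.

# Sketch of the retry logic above: a listing task is re-queued on
# TimeoutError until the retry budget is exhausted, then dropped.
# The budget is reset after every successful listing.
from queue import Queue

MAX_TIMEOUT_RETRIES = 3

def process(in_q: Queue, list_dir) -> None:
    timeout_retries = MAX_TIMEOUT_RETRIES
    while True:
        path = in_q.get()
        if path is None:  # sentinel: stop the worker
            in_q.task_done()
            break
        try:
            list_dir(path)                         # may raise TimeoutError
            timeout_retries = MAX_TIMEOUT_RETRIES  # success: reset the budget
        except TimeoutError:
            if timeout_retries > 0:
                timeout_retries -= 1
                in_q.put(path)                     # resubmit the listing task
            else:
                print("Dropping listing for " + path)
        finally:
            in_q.task_done()

# Example usage:
#   q = Queue(); q.put("/pub/"); q.put(None); process(q, lambda p: None)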


@@ -41,39 +41,58 @@ class FtpDirectory(RemoteDirectory):
                     break

                 self.failed_attempts += 1
-                print("Connection error; reconnecting...")
+                print("Connection error; reconnecting..." + e.strerror + " " + str(e.errno))
                 time.sleep(2 * random.uniform(0.5, 1.5))
                 self.stop_when_connected()

-    @timeout_decorator.timeout(15, use_signals=False)
+    @timeout_decorator.timeout(60, use_signals=False)
     def list_dir(self, path) -> list:
         if not self.ftp:
-            print("Conn closed")
-            return []
+            # No connection - assuming that connection was dropped because too many
+            raise TooManyConnectionsError()
+        print("LIST " + path)
         results = []
         try:
-            self.ftp.chdir(path)
             file_names = self.ftp.listdir(path)
             for file_name in file_names:
-                stat = self.ftp.stat(file_name)
+                stat = self.try_stat(os.path.join(path, file_name))
                 is_dir = self.ftp.path.isdir(os.path.join(path, file_name))
                 results.append(File(
                     name=file_name,
-                    mtime=stat.st_mtime,  # TODO: check
+                    mtime=stat.st_mtime,
                     size=-1 if is_dir else stat.st_size,
                     is_dir=is_dir,
                     path=path
                 ))
+        except ftputil.error.ParserError as e:
+            print("TODO: fix parsing error: " + e.strerror + " @ " + e.file_name)
         except ftputil.error.FTPError as e:
             if e.errno == 530:
                 raise TooManyConnectionsError()
-            pass
+            print(e.strerror)
+        except Exception as e:
+            # TODO remove that debug info
+            print("ERROR:" + str(e))
+            print(type(e))
+            raise e

         return results

+    def try_stat(self, path):
+        try:
+            return self.ftp.stat(path)
+        except ftputil.error.ParserError as e:
+            # TODO: Try to parse it ourselves?
+            print("Could not parse " + path + " " + e.strerror)
+            return None
+
     def close(self):
         if self.ftp:
             self.ftp.close()
+            self.ftp = None
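Note: the new try_stat() helper exists because some FTP servers return listing lines that ftputil cannot parse. A standalone sketch of the same defensive pattern follows; the host, credentials, and the list_with_safe_stat() helper are placeholders, not part of this commit.

# Sketch of stat-ing FTP entries defensively with ftputil, mirroring
# try_stat() above: unparseable entries are reported and skipped instead
# of aborting the whole listing.
import os

import ftputil
import ftputil.error

def list_with_safe_stat(host: str, base_path: str) -> list:
    results = []
    with ftputil.FTPHost(host, "anonymous", "guest") as ftp:
        for name in ftp.listdir(base_path):
            full_path = os.path.join(base_path, name)
            try:
                stat = ftp.stat(full_path)
            except ftputil.error.ParserError as e:
                # Some servers produce listing lines ftputil cannot parse
                print("Could not parse " + full_path + " " + e.strerror)
                continue
            is_dir = ftp.path.isdir(full_path)
            results.append((name, is_dir, -1 if is_dir else stat.st_size, stat.st_mtime))
    return results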


@@ -53,7 +53,7 @@ class TaskManager:
             print("Starting task " + task.url)

-            crawler = RemoteDirectoryCrawler(task.url, 100)
+            crawler = RemoteDirectoryCrawler(task.url, 30)
             crawl_result = crawler.crawl_directory("./crawled/" + str(task.website_id) + ".json")

             result.file_count = crawl_result.file_count


@@ -4,7 +4,7 @@ import json
 payload = json.dumps({
     "website_id": 123,
-    "url": "http://liminaire.fr/TEXTES/",
+    "url": "ftp://ien11-3-88-183-194-246.fbx.proxad.net/",
     "priority": 2,
     "callback_type": "",
     "callback_args": "{}"


@@ -52,5 +52,5 @@ def random_searches(count=10000000, max_workers=1000):
 # dump_local_filesystem("/mnt/")
-# index_file_list("local_filesystem.json", 10)
+index_file_list("crawl_server/crawled/123.json", 10)
 # random_searches(100000)