diff --git a/ftp_crawler.py b/ftp_crawler.py
index 1587d94..d49f0b7 100644
--- a/ftp_crawler.py
+++ b/ftp_crawler.py
@@ -5,6 +5,7 @@ from queue import Queue
 import os
 import time
 import ftputil
+import ftputil.error
 import random
 
 
@@ -16,6 +17,7 @@ class File:
         self.mtime = mtime
         self.path = path
         self.is_dir = is_dir
+        self.ftp = None
 
     def __str__(self):
         return ("DIR " if self.is_dir else "FILE ") + self.path + "/" + self.name
@@ -25,94 +27,90 @@ class FTPConnection(object):
     def __init__(self, host):
         self.host = host
         self.failed_attempts = 0
-        self.max_attempts = 5
+        self.max_attempts = 2
+        self.ftp = None
         self.stop_when_connected()
-        self._list_fn = None
 
     def _connect(self):
-        # attempt an anonymous FTP connection
-        print("CONNECT %s ATTEMPT", self.host)
+        print("Connecting to " + self.host)
         self.ftp = ftputil.FTPHost(self.host, "anonymous", "od-database")
-        print("CONNECT %s SUCCESS", self.host)
 
     def stop_when_connected(self):
-        # continually tries to reconnect ad infinitum
-        # TODO: Max retries
-        try:
-            self._connect()
-        except Exception:
-            print("CONNECT %s FAILED; trying again...", self.host)
-            time.sleep(5 * random.uniform(0.5, 1.5))
-            self.stop_when_connected()
-
-    def list(self, path) -> list:
-        results = []
-        self.ftp.chdir(path)
-        file_names = self.ftp.listdir(path)
-
-        for file_name in file_names:
-            stat = self.ftp.stat(file_name)
-            is_dir = self.ftp.path.isdir(os.path.join(path, file_name))
-
-            results.append(File(
-                name=file_name,
-                mtime=stat.st_mtime,
-                size=-1 if is_dir else stat.st_size,
-                is_dir=is_dir,
-                path=path
-            ))
-
-        return results
-
-    def process_path(self, path):
         while self.failed_attempts < self.max_attempts:
             try:
-                results = self.list(path)
+                self._connect()
                 self.failed_attempts = 0
-                return results
-            except Exception as e:
-                print(e)
+                break
+            except ftputil.error.FTPError as e:
+
+                if e.errno == 530:
+                    print("Cancel connection - too many connections")
+                    break
+
                 self.failed_attempts += 1
-                self.ftp.close()
                 print("LIST FAILED; reconnecting...")
                 time.sleep(2 * random.uniform(0.5, 1.5))
                 self.stop_when_connected()
 
-        # if I get here, I never succeeded in getting the data
-        print("LIST ABANDONED %s", path)
-        self.failed_attempts = 0
-        return []
+    def list_dir(self, path) -> list:
+        if not self.ftp:
+            return []
+        results = []
+        self.ftp.chdir(path)
+        try:
+            file_names = self.ftp.listdir(path)
+
+            for file_name in file_names:
+                stat = self.ftp.stat(file_name)
+                is_dir = self.ftp.path.isdir(os.path.join(path, file_name))
+
+                results.append(File(
+                    name=file_name,
+                    mtime=stat.st_mtime,
+                    size=-1 if is_dir else stat.st_size,
+                    is_dir=is_dir,
+                    path=path
+                ))
+        except ftputil.error.FTPError:
+            print("ERROR parsing " + path)
+
+        return results
 
 
 def process_and_queue(host, q: Queue):
     ftp = FTPConnection(host)
 
-    while True:
+    while ftp.ftp:
         file = q.get()
 
         if file.is_dir:
             print(file)
-            listing = ftp.process_path(os.path.join(file.path, file.name))
-            for f in listing:
-                q.put(f)
+            try:
+                listing = ftp.list_dir(os.path.join(file.path, file.name))
+                for f in listing:
+                    q.put(f)
+            except ftputil.error.PermanentError as e:
+                if e.errno == 530:
+                    # Too many connections, retry this dir but kill this thread
+                    q.put(file)
+                    ftp.ftp.close()
+                    print("Dropping connection because too many")
         else:
             pass
 
         q.task_done()
 
 
-def do_the_thing():
+def crawl_ftp_server(host: str, max_threads: int) -> list:
 
-    host = "80.252.155.68"
     ftp = FTPConnection(host)
-    root_listing = ftp.process_path("/")
-    ftp.ftp.close()
+    root_listing = ftp.list_dir("/")
+    if ftp.ftp:
+        ftp.ftp.close()
 
     q = Queue(maxsize=0)
-    num_threads = 10
-
-    for i in range(num_threads):
+    for i in range(max_threads):
         worker = Thread(target=process_and_queue, args=(host, q,))
         worker.setDaemon(True)
         worker.start()
@@ -121,7 +119,9 @@ def do_the_thing():
         q.put(file)
 
     q.join()
+    return []
 
 
 if __name__ == '__main__':
-    do_the_thing()
+    import sys
+    crawl_ftp_server(sys.argv[1], 50)
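
For reference, a minimal sketch of how the renamed entry point could be driven from another module instead of the __main__ block; the host value and thread count below are placeholders for illustration, not values taken from this change:

    # usage_sketch.py -- hypothetical caller, assumes ftp_crawler.py is importable
    from ftp_crawler import crawl_ftp_server

    if __name__ == '__main__':
        # crawl a single anonymous FTP host with a modest worker-thread pool
        crawl_ftp_server("ftp.example.com", max_threads=10)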