mirror of
https://github.com/simon987/od-database.git
synced 2025-04-19 18:36:44 +00:00
Cleanup of custom crawler
This commit is contained in:
parent
f2d914060b
commit
b649b82854
@ -5,6 +5,7 @@ from queue import Queue
|
|||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import ftputil
|
import ftputil
|
||||||
|
import ftputil.error
|
||||||
import random
|
import random
|
||||||
|
|
||||||
|
|
||||||
@ -16,6 +17,7 @@ class File:
|
|||||||
self.mtime = mtime
|
self.mtime = mtime
|
||||||
self.path = path
|
self.path = path
|
||||||
self.is_dir = is_dir
|
self.is_dir = is_dir
|
||||||
|
self.ftp = None
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return ("DIR " if self.is_dir else "FILE ") + self.path + "/" + self.name
|
return ("DIR " if self.is_dir else "FILE ") + self.path + "/" + self.name
|
||||||
@ -25,29 +27,37 @@ class FTPConnection(object):
|
|||||||
def __init__(self, host):
|
def __init__(self, host):
|
||||||
self.host = host
|
self.host = host
|
||||||
self.failed_attempts = 0
|
self.failed_attempts = 0
|
||||||
self.max_attempts = 5
|
self.max_attempts = 2
|
||||||
|
self.ftp = None
|
||||||
self.stop_when_connected()
|
self.stop_when_connected()
|
||||||
self._list_fn = None
|
|
||||||
|
|
||||||
def _connect(self):
|
def _connect(self):
|
||||||
# attempt an anonymous FTP connection
|
print("Connecting to " + self.host)
|
||||||
print("CONNECT %s ATTEMPT", self.host)
|
|
||||||
self.ftp = ftputil.FTPHost(self.host, "anonymous", "od-database")
|
self.ftp = ftputil.FTPHost(self.host, "anonymous", "od-database")
|
||||||
print("CONNECT %s SUCCESS", self.host)
|
|
||||||
|
|
||||||
def stop_when_connected(self):
|
def stop_when_connected(self):
|
||||||
# continually tries to reconnect ad infinitum
|
while self.failed_attempts < self.max_attempts:
|
||||||
# TODO: Max retries
|
|
||||||
try:
|
try:
|
||||||
self._connect()
|
self._connect()
|
||||||
except Exception:
|
self.failed_attempts = 0
|
||||||
print("CONNECT %s FAILED; trying again...", self.host)
|
break
|
||||||
time.sleep(5 * random.uniform(0.5, 1.5))
|
except ftputil.error.FTPError as e:
|
||||||
|
|
||||||
|
if e.errno == 530:
|
||||||
|
print("Cancel connection - too many connections")
|
||||||
|
break
|
||||||
|
|
||||||
|
self.failed_attempts += 1
|
||||||
|
print("LIST FAILED; reconnecting...")
|
||||||
|
time.sleep(2 * random.uniform(0.5, 1.5))
|
||||||
self.stop_when_connected()
|
self.stop_when_connected()
|
||||||
|
|
||||||
def list(self, path) -> list:
|
def list_dir(self, path) -> list:
|
||||||
|
if not self.ftp:
|
||||||
|
return []
|
||||||
results = []
|
results = []
|
||||||
self.ftp.chdir(path)
|
self.ftp.chdir(path)
|
||||||
|
try:
|
||||||
file_names = self.ftp.listdir(path)
|
file_names = self.ftp.listdir(path)
|
||||||
|
|
||||||
for file_name in file_names:
|
for file_name in file_names:
|
||||||
@ -61,58 +71,46 @@ class FTPConnection(object):
|
|||||||
is_dir=is_dir,
|
is_dir=is_dir,
|
||||||
path=path
|
path=path
|
||||||
))
|
))
|
||||||
|
except ftputil.error.FTPError:
|
||||||
|
print("ERROR parsing " + path)
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
def process_path(self, path):
|
|
||||||
while self.failed_attempts < self.max_attempts:
|
|
||||||
try:
|
|
||||||
results = self.list(path)
|
|
||||||
self.failed_attempts = 0
|
|
||||||
return results
|
|
||||||
except Exception as e:
|
|
||||||
print(e)
|
|
||||||
self.failed_attempts += 1
|
|
||||||
self.ftp.close()
|
|
||||||
print("LIST FAILED; reconnecting...")
|
|
||||||
time.sleep(2 * random.uniform(0.5, 1.5))
|
|
||||||
self.stop_when_connected()
|
|
||||||
|
|
||||||
# if I get here, I never succeeded in getting the data
|
|
||||||
print("LIST ABANDONED %s", path)
|
|
||||||
self.failed_attempts = 0
|
|
||||||
return []
|
|
||||||
|
|
||||||
|
|
||||||
def process_and_queue(host, q: Queue):
|
def process_and_queue(host, q: Queue):
|
||||||
|
|
||||||
ftp = FTPConnection(host)
|
ftp = FTPConnection(host)
|
||||||
|
|
||||||
while True:
|
while ftp.ftp:
|
||||||
file = q.get()
|
file = q.get()
|
||||||
|
|
||||||
if file.is_dir:
|
if file.is_dir:
|
||||||
print(file)
|
print(file)
|
||||||
listing = ftp.process_path(os.path.join(file.path, file.name))
|
try:
|
||||||
|
listing = ftp.list_dir(os.path.join(file.path, file.name))
|
||||||
for f in listing:
|
for f in listing:
|
||||||
q.put(f)
|
q.put(f)
|
||||||
|
except ftputil.error.PermanentError as e:
|
||||||
|
if e.errno == 530:
|
||||||
|
# Too many connections, retry this dir but kill this thread
|
||||||
|
q.put(file)
|
||||||
|
ftp.ftp.close()
|
||||||
|
print("Dropping connection because too many")
|
||||||
else:
|
else:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
q.task_done()
|
q.task_done()
|
||||||
|
|
||||||
|
|
||||||
def do_the_thing():
|
def crawl_ftp_server(host: str, max_threads: int) -> list:
|
||||||
|
|
||||||
host = "80.252.155.68"
|
|
||||||
ftp = FTPConnection(host)
|
ftp = FTPConnection(host)
|
||||||
root_listing = ftp.process_path("/")
|
root_listing = ftp.list_dir("/")
|
||||||
|
if ftp.ftp:
|
||||||
ftp.ftp.close()
|
ftp.ftp.close()
|
||||||
|
|
||||||
q = Queue(maxsize=0)
|
q = Queue(maxsize=0)
|
||||||
num_threads = 10
|
for i in range(max_threads):
|
||||||
|
|
||||||
for i in range(num_threads):
|
|
||||||
worker = Thread(target=process_and_queue, args=(host, q,))
|
worker = Thread(target=process_and_queue, args=(host, q,))
|
||||||
worker.setDaemon(True)
|
worker.setDaemon(True)
|
||||||
worker.start()
|
worker.start()
|
||||||
@ -121,7 +119,9 @@ def do_the_thing():
|
|||||||
q.put(file)
|
q.put(file)
|
||||||
|
|
||||||
q.join()
|
q.join()
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
do_the_thing()
|
import sys
|
||||||
|
crawl_ftp_server(sys.argv[1], 50)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user