mirror of
https://github.com/simon987/od-database.git
synced 2025-04-19 18:36:44 +00:00
220 lines
6.8 KiB
Python
220 lines
6.8 KiB
Python
import os
|
|
import ujson
|
|
from urllib.parse import urlparse, urljoin
|
|
from threading import Thread
|
|
from queue import Queue, Empty
|
|
from crawl_server import logger
|
|
from pybloom_live import ScalableBloomFilter
|
|
|
|
|
|
class TooManyConnectionsError(Exception):
|
|
pass
|
|
|
|
|
|
class File:
|
|
__slots__ = "name", "size", "mtime", "path", "is_dir"
|
|
|
|
def __init__(self, name: str, size: int, mtime: int, path: str, is_dir: bool):
|
|
self.name = name
|
|
self.size = size
|
|
self.mtime = mtime
|
|
self.path = path
|
|
self.is_dir = is_dir
|
|
|
|
def __bytes__(self):
|
|
return b"".join([
|
|
self.name.encode(),
|
|
b"D" if self.is_dir else b"F",
|
|
abs(self.size).to_bytes(6, byteorder="little"),
|
|
abs(self.mtime).to_bytes(6, byteorder="little"),
|
|
])
|
|
|
|
def to_json(self):
|
|
return ujson.dumps({
|
|
"name": self.name,
|
|
"size": self.size,
|
|
"mtime": self.mtime,
|
|
"path": self.path,
|
|
})
|
|
|
|
|
|
class RemoteDirectory:
|
|
|
|
SCHEMES = ()
|
|
|
|
def __init__(self, base_url):
|
|
self.base_url = base_url
|
|
|
|
def list_dir(self, path: str):
|
|
raise NotImplementedError
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
|
|
class RemoteDirectoryFactory:
|
|
|
|
from crawl_server.remote_ftp import FtpDirectory
|
|
from crawl_server.remote_http import HttpDirectory
|
|
DIR_ENGINES = (FtpDirectory, HttpDirectory)
|
|
|
|
@staticmethod
|
|
def get_directory(url) -> RemoteDirectory:
|
|
|
|
parsed_url = urlparse(url)
|
|
|
|
for dir_engine in RemoteDirectoryFactory.DIR_ENGINES:
|
|
if parsed_url.scheme in dir_engine.SCHEMES:
|
|
return dir_engine(url)
|
|
|
|
|
|
class CrawlResult:
|
|
|
|
def __init__(self, file_count: int, status_code: str):
|
|
self.file_count = file_count
|
|
self.status_code = status_code
|
|
|
|
|
|
class RemoteDirectoryCrawler:
|
|
|
|
MAX_TIMEOUT_RETRIES = 3
|
|
|
|
def __init__(self, url, max_threads: int):
|
|
self.url = url
|
|
self.max_threads = max_threads
|
|
self.crawled_paths = ScalableBloomFilter(error_rate=0.0001)
|
|
self.status_code = "success"
|
|
|
|
def crawl_directory(self, out_file: str) -> CrawlResult:
|
|
try:
|
|
try:
|
|
directory = RemoteDirectoryFactory.get_directory(self.url)
|
|
logger.info("Crawling directory " + self.url + " with " + str(type(directory)))
|
|
path_id, root_listing = directory.list_dir(urlparse(self.url).path)
|
|
if root_listing:
|
|
self.crawled_paths.add(path_id)
|
|
else:
|
|
logger.info("No files in root listing for " + self.url)
|
|
return CrawlResult(0, "empty")
|
|
directory.close()
|
|
except TimeoutError:
|
|
return CrawlResult(0, "Timeout during initial request")
|
|
|
|
in_q = Queue(maxsize=0)
|
|
files_q = Queue(maxsize=0)
|
|
for f in root_listing:
|
|
if f.is_dir:
|
|
in_q.put(os.path.join(f.path, f.name, ""))
|
|
else:
|
|
files_q.put(f)
|
|
|
|
threads = []
|
|
for i in range(self.max_threads):
|
|
worker = Thread(target=RemoteDirectoryCrawler._process_listings, args=(self, self.url, in_q, files_q))
|
|
threads.append(worker)
|
|
worker.start()
|
|
|
|
files_written = [] # Pass array to worker to get result
|
|
file_writer_thread = Thread(target=RemoteDirectoryCrawler._log_to_file, args=(files_q, out_file, files_written))
|
|
file_writer_thread.start()
|
|
|
|
in_q.join()
|
|
files_q.join()
|
|
logger.info("Crawling for " + self.url + " done, waiting for threads to terminate...")
|
|
|
|
# Kill threads
|
|
for _ in threads:
|
|
in_q.put(None)
|
|
for t in threads:
|
|
t.join()
|
|
files_q.put(None)
|
|
file_writer_thread.join()
|
|
|
|
return CrawlResult(files_written[0], self.status_code)
|
|
except Exception as e:
|
|
return CrawlResult(0, str(e) + " \nType:" + str(type(e)))
|
|
|
|
def _process_listings(self, url: str, in_q: Queue, files_q: Queue):
|
|
|
|
directory = RemoteDirectoryFactory.get_directory(url)
|
|
timeout_retries = 20 # If any worker threads reaches 20 retries, the whole queue is emptied
|
|
|
|
while directory:
|
|
try:
|
|
path = in_q.get(timeout=2000)
|
|
except Empty:
|
|
logger.debug("in_q is Empty")
|
|
directory.close()
|
|
break
|
|
|
|
if path is None:
|
|
break
|
|
|
|
try:
|
|
path_id, listing = directory.list_dir(path)
|
|
if len(listing) > 0 and path_id not in self.crawled_paths:
|
|
self.crawled_paths.add(path_id)
|
|
|
|
for f in listing:
|
|
if f.is_dir:
|
|
in_q.put(urljoin(f.path, f.name))
|
|
else:
|
|
files_q.put(f)
|
|
logger.debug("LISTED " + urljoin(self.url, path))
|
|
except TooManyConnectionsError:
|
|
logger.debug("Too many connections, this thread will be killed and path resubmitted")
|
|
# Kill worker and resubmit listing task
|
|
directory.close()
|
|
in_q.put(path)
|
|
# TODO: If all workers are killed the queue will never get processed and
|
|
# TODO: the crawler will be stuck forever
|
|
break
|
|
except TimeoutError:
|
|
logger.error("Directory listing timed out, " + str(timeout_retries) + " retries left")
|
|
if timeout_retries > 0:
|
|
timeout_retries -= 1
|
|
in_q.put(path)
|
|
else:
|
|
logger.error("Dropping website " + url)
|
|
self.status_code = "Timeout during website listing"
|
|
directory.close()
|
|
|
|
logger.debug("Emptying queue")
|
|
while True:
|
|
try:
|
|
in_q.get_nowait()
|
|
in_q.task_done()
|
|
except Empty:
|
|
break
|
|
logger.debug("Emptied queue")
|
|
break
|
|
finally:
|
|
in_q.task_done()
|
|
|
|
@staticmethod
|
|
def _log_to_file(files_q: Queue, out_file: str, files_written: list):
|
|
|
|
counter = 0
|
|
|
|
with open(out_file, "w") as f:
|
|
while True:
|
|
|
|
try:
|
|
file = files_q.get(timeout=2000)
|
|
except Empty:
|
|
logger.error("File writer thread timed out")
|
|
break
|
|
|
|
if file is None:
|
|
break
|
|
|
|
f.write(file.to_json() + "\n")
|
|
counter += 1
|
|
files_q.task_done()
|
|
|
|
files_written.append(counter)
|
|
logger.info("File writer thread done")
|
|
|
|
|
|
|