diff --git a/README.md b/README.md
index a60098a..e9d9aeb 100644
--- a/README.md
+++ b/README.md
@@ -18,6 +18,7 @@ Assuming you have Python 3 and git installed:
 sudo apt install libssl-dev libcurl4-openssl-dev
 git clone https://github.com/simon987/od-database
 cd od-database
+git submodule update --init --recursive
 sudo pip3 install -r requirements.txt
 ```
 Create `/config.py` and fill out the parameters. Sample config:
@@ -34,12 +35,6 @@ CAPTCHA_S_SECRET_KEY = ""
 # Flask secret key for sessions
 FLASK_SECRET = ""
 RESULTS_PER_PAGE = (25, 50, 100, 250, 500, 1000)
-# Headers for http crawler
-HEADERS = {}
-# Number of crawler instances (one per task)
-CRAWL_SERVER_PROCESSES = 3
-# Number of threads per crawler instance
-CRAWL_SERVER_THREADS = 20
 # Allow ftp websites in /submit
 SUBMIT_FTP = False
 # Allow http(s) websites in /submit
@@ -50,19 +45,16 @@ API_TOKEN = "5817926d-f2f9-4422-a411-a98f1bfe4b6c"
 ```
 
 ## Running the crawl server
-```bash
-cd od-database
-export PYTHONPATH=$(pwd)
-cd crawl_server
-python3 run.py
-```
-## Running the web server (development)
+The python crawler that was a part of this project is discontinued,
+[the go implementation](https://github.com/terorie/od-database-crawler) is currently in use.
+
+## Running the web server (debug)
 ```bash
 cd od-database
 python3 app.py
 ```
 
-## Running the web server with nginx (production)
+## Running the web server with Nginx (production)
 * Install dependencies:
 ```bash
 sudo apt install build-essential python-dev
diff --git a/crawl_server/__init__.py b/crawl_server/__init__.py
deleted file mode 100644
index fc75507..0000000
--- a/crawl_server/__init__.py
+++ /dev/null
@@ -1,12 +0,0 @@
-import logging
-import sys
-from logging import FileHandler, StreamHandler
-
-logger = logging.getLogger("default")
-logger.setLevel(logging.DEBUG)
-
-formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
-file_handler = FileHandler("crawl_server.log")
-file_handler.setFormatter(formatter)
-logger.addHandler(file_handler)
-logger.addHandler(StreamHandler(sys.stdout))
diff --git a/crawl_server/crawled/README.md b/crawl_server/crawled/README.md
deleted file mode 100644
index 495afc7..0000000
--- a/crawl_server/crawled/README.md
+++ /dev/null
@@ -1 +0,0 @@
-Crawled directories are temporarily stored here
\ No newline at end of file
diff --git a/crawl_server/crawler.py b/crawl_server/crawler.py
deleted file mode 100644
index 6e8b8d4..0000000
--- a/crawl_server/crawler.py
+++ /dev/null
@@ -1,219 +0,0 @@
-import os
-import ujson
-from urllib.parse import urlparse, urljoin
-from threading import Thread
-from queue import Queue, Empty
-from crawl_server import logger
-from pybloom_live import ScalableBloomFilter
-
-
-class TooManyConnectionsError(Exception):
-    pass
-
-
-class File:
-    __slots__ = "name", "size", "mtime", "path", "is_dir"
-
-    def __init__(self, name: str, size: int, mtime: int, path: str, is_dir: bool):
-        self.name = name
-        self.size = size
-        self.mtime = mtime
-        self.path = path
-        self.is_dir = is_dir
-
-    def __bytes__(self):
-        return b"".join([
-            self.name.encode(),
-            b"D" if self.is_dir else b"F",
-            abs(self.size).to_bytes(6, byteorder="little"),
-            abs(self.mtime).to_bytes(6, byteorder="little"),
-        ])
-
-    def to_json(self):
-        return ujson.dumps({
-            "name": self.name,
-            "size": self.size,
-            "mtime": self.mtime,
-            "path": self.path,
-        })
-
-
-class RemoteDirectory:
-
-    SCHEMES = ()
-
-    def __init__(self, base_url):
-        self.base_url = base_url
-
-    def list_dir(self, path: str):
-        raise NotImplementedError
-
-    def close(self):
-        pass
-
-
-class RemoteDirectoryFactory:
-
-    from crawl_server.remote_ftp import FtpDirectory
-    from crawl_server.remote_http import HttpDirectory
-    DIR_ENGINES = (FtpDirectory, HttpDirectory)
-
-    @staticmethod
-    def get_directory(url) -> RemoteDirectory:
-
-        parsed_url = urlparse(url)
-
-        for dir_engine in RemoteDirectoryFactory.DIR_ENGINES:
-            if parsed_url.scheme in dir_engine.SCHEMES:
-                return dir_engine(url)
-
-
-class CrawlResult:
-
-    def __init__(self, file_count: int, status_code: str):
-        self.file_count = file_count
-        self.status_code = status_code
-
-
-class RemoteDirectoryCrawler:
-
-    MAX_TIMEOUT_RETRIES = 3
-
-    def __init__(self, url, max_threads: int):
-        self.url = url
-        self.max_threads = max_threads
-        self.crawled_paths = ScalableBloomFilter(error_rate=0.0001)
-        self.status_code = "success"
-
-    def crawl_directory(self, out_file: str) -> CrawlResult:
-        try:
-            try:
-                directory = RemoteDirectoryFactory.get_directory(self.url)
-                logger.info("Crawling directory " + self.url + " with " + str(type(directory)))
-                path_id, root_listing = directory.list_dir(urlparse(self.url).path)
-                if root_listing:
-                    self.crawled_paths.add(path_id)
-                else:
-                    logger.info("No files in root listing for " + self.url)
-                    return CrawlResult(0, "empty")
-                directory.close()
-            except TimeoutError:
-                return CrawlResult(0, "Timeout during initial request")
-
-            in_q = Queue(maxsize=0)
-            files_q = Queue(maxsize=0)
-            for f in root_listing:
-                if f.is_dir:
-                    in_q.put(os.path.join(f.path, f.name, ""))
-                else:
-                    files_q.put(f)
-
-            threads = []
-            for i in range(self.max_threads):
-                worker = Thread(target=RemoteDirectoryCrawler._process_listings, args=(self, self.url, in_q, files_q))
-                threads.append(worker)
-                worker.start()
-
-            files_written = []  # Pass array to worker to get result
-            file_writer_thread = Thread(target=RemoteDirectoryCrawler._log_to_file, args=(files_q, out_file, files_written))
-            file_writer_thread.start()
-
-            in_q.join()
-            files_q.join()
-            logger.info("Crawling for " + self.url + " done, waiting for threads to terminate...")
-
-            # Kill threads
-            for _ in threads:
-                in_q.put(None)
-            for t in threads:
-                t.join()
-            files_q.put(None)
-            file_writer_thread.join()
-
-            return CrawlResult(files_written[0], self.status_code)
-        except Exception as e:
-            return CrawlResult(0, str(e) + " \nType:" + str(type(e)))
-
-    def _process_listings(self, url: str, in_q: Queue, files_q: Queue):
-
-        directory = RemoteDirectoryFactory.get_directory(url)
-        timeout_retries = 20  # If any worker threads reaches 20 retries, the whole queue is emptied
-
-        while directory:
-            try:
-                path = in_q.get(timeout=2000)
-            except Empty:
-                logger.debug("in_q is Empty")
-                directory.close()
-                break
-
-            if path is None:
-                break
-
-            try:
-                path_id, listing = directory.list_dir(path)
-                if len(listing) > 0 and path_id not in self.crawled_paths:
-                    self.crawled_paths.add(path_id)
-
-                    for f in listing:
-                        if f.is_dir:
-                            in_q.put(urljoin(f.path, f.name))
-                        else:
-                            files_q.put(f)
-                    logger.debug("LISTED " + urljoin(self.url, path))
-            except TooManyConnectionsError:
-                logger.debug("Too many connections, this thread will be killed and path resubmitted")
-                # Kill worker and resubmit listing task
-                directory.close()
-                in_q.put(path)
-                # TODO: If all workers are killed the queue will never get processed and
-                # TODO: the crawler will be stuck forever
-                break
-            except TimeoutError:
-                logger.error("Directory listing timed out, " + str(timeout_retries) + " retries left")
-                if timeout_retries > 0:
-                    timeout_retries -= 1
-                    in_q.put(path)
-                else:
-                    logger.error("Dropping website " + url)
-                    self.status_code = "Timeout during website listing"
-                    directory.close()
-
-                    logger.debug("Emptying queue")
-                    while True:
-                        try:
-                            in_q.get_nowait()
-                            in_q.task_done()
-                        except Empty:
-                            break
-                    logger.debug("Emptied queue")
-                    break
-            finally:
-                in_q.task_done()
-
-    @staticmethod
-    def _log_to_file(files_q: Queue, out_file: str, files_written: list):
-
-        counter = 0
-
-        with open(out_file, "w") as f:
-            while True:
-
-                try:
-                    file = files_q.get(timeout=2000)
-                except Empty:
-                    logger.error("File writer thread timed out")
-                    break
-
-                if file is None:
-                    break
-
-                f.write(file.to_json() + "\n")
-                counter += 1
-                files_q.task_done()
-
-        files_written.append(counter)
-        logger.info("File writer thread done")
-
-
-
diff --git a/crawl_server/remote_ftp.py b/crawl_server/remote_ftp.py
deleted file mode 100644
index 6259b83..0000000
--- a/crawl_server/remote_ftp.py
+++ /dev/null
@@ -1,118 +0,0 @@
-#! /usr/bin/env python
-from crawl_server import logger
-from urllib.parse import urlparse
-import os
-import time
-import ftputil
-import ftputil.error
-from ftputil.session import session_factory
-from crawl_server.crawler import RemoteDirectory, File, TooManyConnectionsError
-
-
-class FtpDirectory(RemoteDirectory):
-
-    SCHEMES = ("ftp", )
-
-    CANCEL_LISTING_CODE = (
-        550,  # Forbidden
-    )
-
-    def __init__(self, url):
-
-        host = urlparse(url).netloc
-        super().__init__(host)
-        self.max_attempts = 3
-        self.ftp = None
-        self.stop_when_connected()
-
-    def _connect(self):
-        self.ftp = ftputil.FTPHost(self.base_url, "anonymous", "od-database", session_factory=session_factory(
-            use_passive_mode=True
-        ))
-        self.ftp._session.timeout = 30
-
-    def stop_when_connected(self):
-        failed_attempts = 0
-        while failed_attempts < self.max_attempts:
-            try:
-                self._connect()
-                logger.debug("New FTP connection @ " + self.base_url)
-                return True
-            except ftputil.error.FTPError as e:
-
-                if e.errno == 530 or e.errno == 421:
-                    break
-
-                failed_attempts += 1
-                print("Connection error; reconnecting..." + e.strerror + " " + str(e.errno))
-                time.sleep(2)
-        return False
-
-    def list_dir(self, path):
-        if not self.ftp:
-            # No connection - assuming that connection was dropped because too many
-            raise TooManyConnectionsError()
-        results = []
-        failed_attempts = 0
-        while failed_attempts < self.max_attempts:
-            try:
-                file_names = self.ftp.listdir(path)
-
-                for file_name in file_names:
-                    file_path = os.path.join(path, file_name)
-                    stat = self.try_stat(file_path)
-                    is_dir = self.ftp.path.isdir(file_path)
-
-                    results.append(File(
-                        name=os.path.join(file_name, "") if is_dir else file_name,
-                        mtime=stat.st_mtime,
-                        size=-1 if is_dir else stat.st_size,
-                        is_dir=is_dir,
-                        path=path.strip("/") if not is_dir else path
-                    ))
-                return path, results
-            except ftputil.error.ParserError as e:
-                logger.error("TODO: fix parsing error: " + e.strerror + " @ " + str(e.file_name))
-                break
-            except ftputil.error.FTPError as e:
-                if e.errno in FtpDirectory.CANCEL_LISTING_CODE:
-                    break
-                failed_attempts += 1
-                self.reconnect()
-            except ftputil.error.PermanentError as e:
-                if e.errno == 530:
-                    raise TooManyConnectionsError()
-                if e.errno is None:
-                    failed_attempts += 1
-                    self.reconnect()
-                else:
-                    print(str(e.strerror) + " errno:" + str(e.errno))
-                    break
-            except Exception as e:
-                failed_attempts += 1
-                self.reconnect()
-                logger.error("Exception while processing FTP listing for " + self.base_url + ": " + str(e))
-
-        return path, []
-
-    def reconnect(self):
-        if self.ftp:
-            self.ftp.close()
-            success = self.stop_when_connected()
-            logger.debug("Reconnecting to FTP server " + self.base_url + (" (OK)" if success else " (ERR)"))
-
-    def try_stat(self, path):
-
-        try:
-            return self.ftp.stat(path)
-        except ftputil.error.ParserError as e:
-            # TODO: Try to parse it ourselves?
-            logger.error("Exception while parsing FTP listing for " + self.base_url + path + " " + e.strerror)
-            return None
-
-    def close(self):
-        if self.ftp:
-            self.ftp.close()
-            self.ftp = None
-        logger.debug("Closing FtpRemoteDirectory for " + self.base_url)
-
diff --git a/crawl_server/remote_http.py b/crawl_server/remote_http.py
deleted file mode 100644
index 7f527f9..0000000
--- a/crawl_server/remote_http.py
+++ /dev/null
@@ -1,269 +0,0 @@
-import pycurl
-from io import BytesIO
-
-from crawl_server import logger
-from urllib.parse import unquote, urljoin
-import os
-from html.parser import HTMLParser
-from itertools import repeat
-from crawl_server.crawler import RemoteDirectory, File
-from multiprocessing.pool import ThreadPool
-import config
-from dateutil.parser import parse as parse_date
-from pycurl import Curl
-import hashlib
-
-import urllib3
-urllib3.disable_warnings()
-
-
-class Anchor:
-    def __init__(self):
-        self.text = None
-        self.href = None
-
-    def __str__(self):
-        return "<" + self.href + ", " + str(self.text).strip() + ">"
-
-
-class HTMLAnchorParser(HTMLParser):
-
-    def __init__(self):
-        super().__init__()
-        self.anchors = []
-        self.current_anchor = None
-
-    def handle_starttag(self, tag, attrs):
-        if tag == "a":
-            for attr in attrs:
-                if attr[0] == "href":
-                    self.current_anchor = Anchor()
-                    self.current_anchor.href = attr[1]
-                    break
-
-    def handle_data(self, data):
-        if self.current_anchor:
-            self.current_anchor.text = data
-
-    def handle_endtag(self, tag):
-        if tag == "a":
-            if self.current_anchor:
-                self.anchors.append(self.current_anchor)
-                self.current_anchor = None
-
-    def error(self, message):
-        logger.debug("HTML Parser error: " + message)
-
-    def feed(self, data):
-        self.anchors.clear()
-        super().feed(data)
-
-
-class HttpDirectory(RemoteDirectory):
-
-    SCHEMES = ("http", "https",)
-    HEADERS = config.HEADERS
-    BLACK_LIST = (
-        "?C=N&O=D",
-        "?C=M&O=A",
-        "?C=S&O=A",
-        "?C=D&O=A",
-        "?C=N;O=D",
-        "?C=M;O=A",
-        "?C=M&O=D",
-        "?C=S;O=A",
-        "?C=S&O=D",
-        "?C=D;O=A",
-        "?MA",
-        "?SA",
-        "?DA",
-        "?ND",
-        "?C=N&O=A",
-        "?C=N&O=A",
-        "?M=A",
-        "?N=D",
-        "?S=A",
-        "?D=A",
-    )
-    FILE_NAME_BLACKLIST = (
-        "Parent Directory",
-        " Parent Directory"
-        "../",
-
-    )
-    MAX_RETRIES = 2
-    TIMEOUT = 25
-
-    def __init__(self, url):
-        super().__init__(url)
-        self.curl = None
-        self.curl_head = None
-        self.init_curl()
-
-    def init_curl(self):
-
-        self.curl = Curl()
-        self.curl.setopt(self.curl.SSL_VERIFYPEER, 0)
-        self.curl.setopt(self.curl.SSL_VERIFYHOST, 0)
-        self.curl.setopt(pycurl.TIMEOUT, HttpDirectory.TIMEOUT)
-        self.curl.setopt(pycurl.USERAGENT, config.HEADERS["User-Agent"])
-
-        self.curl_head = self._curl_handle()
-
-    @staticmethod
-    def _curl_handle():
-
-        curl_head = Curl()
-        curl_head.setopt(pycurl.SSL_VERIFYPEER, 0)
-        curl_head.setopt(pycurl.SSL_VERIFYHOST, 0)
-        curl_head.setopt(pycurl.NOBODY, 1)
-        curl_head.setopt(pycurl.TIMEOUT, HttpDirectory.TIMEOUT)
-        curl_head.setopt(pycurl.USERAGENT, config.HEADERS["User-Agent"])
-
-        return curl_head
-
-    def list_dir(self, path):
-
-        current_dir_name = path[path.rstrip("/").rfind("/") + 1: -1]
-        path_identifier = hashlib.md5(current_dir_name.encode())
-        path_url = urljoin(self.base_url, path, "")
-        body = self._fetch_body(path_url)
-        anchors = self._parse_links(body)
-
-        urls_to_request = []
-        files = []
-
-        for anchor in anchors:
-            if self._should_ignore(self.base_url, path, anchor):
-                continue
-
-            if self._isdir(anchor):
-
-                directory = File(
-                    name=anchor.href,  # todo handle external links here
-                    mtime=0,
-                    size=0,
-                    path=path,
-                    is_dir=True
-                )
-                path_identifier.update(bytes(directory))
-                files.append(directory)
-            else:
-                urls_to_request.append(urljoin(path_url, anchor.href))
-
-        for file in self.request_files(urls_to_request):
-            path_identifier.update(bytes(file))
-            files.append(file)
-
-        return path_identifier.hexdigest(), files
-
-    def request_files(self, urls_to_request: list) -> list:
-
-        if len(urls_to_request) > 150:
-            # Many urls, use multi-threaded solution
-            pool = ThreadPool(processes=10)
-            files = pool.starmap(self._request_file, zip(urls_to_request, repeat(self.base_url)))
-            pool.close()
-            for file in files:
-                if file:
-                    yield file
-        else:
-            # Too few urls to create thread pool
-            for url in urls_to_request:
-                file = self._request_file(url, self.base_url)
-                if file:
-                    yield file
-
-    @staticmethod
-    def _request_file(url, base_url):
-
-        retries = HttpDirectory.MAX_RETRIES
-        while retries > 0:
-            try:
-                curl = HttpDirectory._curl_handle()
-                raw_headers = BytesIO()
-                curl.setopt(pycurl.URL, url.encode("utf-8", errors="ignore"))
-                curl.setopt(pycurl.HEADERFUNCTION, raw_headers.write)
-                curl.perform()
-
-                stripped_url = url[len(base_url) - 1:]
-                headers = HttpDirectory._parse_dict_header(raw_headers.getvalue().decode("utf-8", errors="ignore"))
-                raw_headers.close()
-
-                path, name = os.path.split(stripped_url)
-                date = headers.get("Last-Modified", "1970-01-01")
-                curl.close()
-                return File(
-                    path=unquote(path).strip("/"),
-                    name=unquote(name),
-                    size=int(headers.get("Content-Length", -1)),
-                    mtime=int(parse_date(date).timestamp()),
-                    is_dir=False
-                )
-            except pycurl.error:
-                retries -= 1
-
-        logger.debug("TimeoutError - _request_file")
-        raise TimeoutError
-
-    def _fetch_body(self, url: str):
-        retries = HttpDirectory.MAX_RETRIES
-        while retries > 0:
-            try:
-                content = BytesIO()
-                self.curl.setopt(pycurl.URL, url.encode("utf-8", errors="ignore"))
-                self.curl.setopt(pycurl.WRITEDATA, content)
-                self.curl.perform()
-
-                return content.getvalue().decode("utf-8", errors="ignore")
-            except pycurl.error:
-                self.close()
-                retries -= 1
-
-        logger.debug("TimeoutError - _fetch_body")
-        raise TimeoutError
-
-    @staticmethod
-    def _parse_links(body):
-
-        parser = HTMLAnchorParser()
-        parser.feed(body)
-        return parser.anchors
-
-    @staticmethod
-    def _isdir(link: Anchor):
-        return link.href.endswith("/")
-
-    @staticmethod
-    def _should_ignore(base_url, current_path, link: Anchor):
-
-        full_url = urljoin(base_url, link.href)
-        if full_url == urljoin(urljoin(base_url, current_path), "../") or full_url == base_url:
-            return True
-
-        if link.href.endswith(HttpDirectory.BLACK_LIST):
-            return True
-
-        # Ignore external links
-        if not full_url.startswith(base_url):
-            return True
-
-        # Ignore parameters in url
-        if "?" in link.href:
-            return True
-
-    @staticmethod
-    def _parse_dict_header(raw):
-        headers = dict()
-        for line in raw.split("\r\n")[1:]:  # Ignore first 'HTTP/1.0 200 OK' line
-            if line:
-                k, v = line.split(":", maxsplit=1)
-                headers[k.strip()] = v.strip()
-
-        return headers
-
-    def close(self):
-        self.curl.close()
-        self.init_curl()
-
-
diff --git a/crawl_server/run.py b/crawl_server/run.py
deleted file mode 100644
index ec27d71..0000000
--- a/crawl_server/run.py
+++ /dev/null
@@ -1,9 +0,0 @@
-from crawl_server.task_manager import TaskManager
-import time
-import config
-
-tm = TaskManager(config.CRAWL_SERVER_PROCESSES)
-# TODO: On start, indicate that all tasks assigned to this crawler have been dropped
-
-while True:
-    time.sleep(1)
diff --git a/crawl_server/task_manager.py b/crawl_server/task_manager.py
deleted file mode 100644
index cee1e17..0000000
--- a/crawl_server/task_manager.py
+++ /dev/null
@@ -1,143 +0,0 @@
-from crawl_server import logger
-import os
-from tasks import TaskResult, Task
-import config
-import requests
-import json
-from multiprocessing import Manager, Pool
-from apscheduler.schedulers.background import BackgroundScheduler
-from datetime import datetime
-from crawl_server.crawler import RemoteDirectoryCrawler
-
-
-class TaskManager:
-
-    def __init__(self, max_processes=2):
-        self.pool = Pool(maxtasksperchild=1, processes=max_processes)
-        self.max_processes = max_processes
-        manager = Manager()
-        self.current_tasks = manager.list()
-
-        scheduler = BackgroundScheduler()
-        scheduler.add_job(self.execute_queued_task, "interval", seconds=1)
-        scheduler.start()
-
-    def fetch_task(self):
-        try:
-            payload = {
-                "token": config.API_TOKEN
-            }
-            r = requests.post(config.SERVER_URL + "/task/get", data=payload)
-
-            if r.status_code == 200:
-                text = r.text
-                logger.info("Fetched task from server : " + text)
-                task_json = json.loads(text)
-                return Task(task_json["website_id"], task_json["url"])
-
-            return None
-
-        except Exception as e:
-            raise e
-
-    @staticmethod
-    def push_result(task_result: TaskResult):
-
-        try:
-
-            logger.info("Uploading file list in small chunks")
-            filename = "./crawled/" + str(task_result.website_id) + ".json"
-            CHUNK_SIZE = 500000 * 10  # 5Mb
-            if os.path.exists(filename):
-                with open(filename) as f:
-                    chunk = f.read(CHUNK_SIZE)
-                    while chunk:
-                        try:
-                            payload = {
-                                "token": config.API_TOKEN,
-                                "website_id": task_result.website_id
-                            }
-
-                            files = {
-                                "file_list": chunk
-                            }
-
-                            r = requests.post(config.SERVER_URL + "/task/upload", data=payload, files=files)
-                            logger.info("RESPONSE: " + r.text + "<" + str(r.status_code) + ">")
-                        except Exception as e:
-                            logger.error("Exception while sending file_list chunk: " + str(e))
-                            pass
-                        chunk = f.read(CHUNK_SIZE)
-
-            payload = {
-                "token": config.API_TOKEN,
-                "result": json.dumps(task_result.to_json())
-            }
-
-            r = requests.post(config.SERVER_URL + "/task/complete", data=payload)
-            logger.info("RESPONSE: " + r.text + "<" + str(r.status_code) + ">")
-
-            if os.path.exists(filename):
-                os.remove(filename)
-
-        except Exception as e:
-            logger.error("Error during push_result: " + str(e))
-
-    def execute_queued_task(self):
-
-        if len(self.current_tasks) <= self.max_processes:
-
-            task = self.fetch_task()
-
-            if task:
-                logger.info("Submitted " + task.url + " to process pool")
-                self.current_tasks.append(task)
-
-                self.pool.apply_async(
-                    TaskManager.run_task,
-                    args=(task, self.current_tasks),
-                    callback=TaskManager.task_complete,
-                    error_callback=TaskManager.task_error
-                )
-
-    @staticmethod
-    def run_task(task, current_tasks):
-
-        result = TaskResult()
-        result.start_time = datetime.utcnow().timestamp()
-        result.website_id = task.website_id
-
-        logger.info("Starting task " + task.url)
-
-        crawler = RemoteDirectoryCrawler(task.url, config.CRAWL_SERVER_THREADS)
-        crawl_result = crawler.crawl_directory("./crawled/" + str(task.website_id) + ".json")
-
-        result.file_count = crawl_result.file_count
-        result.status_code = crawl_result.status_code
-
-        result.end_time = datetime.utcnow().timestamp()
-        logger.info("End task " + task.url)
-
-        return result, current_tasks
-
-    @staticmethod
-    def task_error(result):
-        logger.error("Uncaught exception during a task: ")
-        raise result
-
-    @staticmethod
-    def task_complete(result):
-
-        task_result, current_tasks = result
-
-        logger.info("Task completed, sending result to server")
-        logger.info("Status code: " + task_result.status_code)
-        logger.info("File count: " + str(task_result.file_count))
-
-        TaskManager.push_result(task_result)
-
-        for i, task in enumerate(current_tasks):
-            if task.website_id == task_result.website_id:
-                del current_tasks[i]
-