From 7f496ce7a853c9e5be7dc69679dcb775fd2dca00 Mon Sep 17 00:00:00 2001 From: Simon Date: Mon, 11 Jun 2018 15:46:55 -0400 Subject: [PATCH] Slowly losing my sanity part 1: Removed scrapy dependency and moved to custom solution. Added multi-threaded ftp crawler --- {scrapy_od_database => crawler}/__init__.py | 0 crawler/crawler.py | 133 ++++++++++++++++++ crawler/ftp.py | 79 +++++++++++ crawler/http.py | 123 ++++++++++++++++ ftp_crawler.py | 127 ----------------- od_util.py | 2 +- requirements.txt | 4 +- scrapy_od_database/settings.py | 71 ---------- scrapy_od_database/spiders/__init__.py | 4 - scrapy_od_database/spiders/od_links_spider.py | 79 ----------- 10 files changed, 338 insertions(+), 284 deletions(-) rename {scrapy_od_database => crawler}/__init__.py (100%) create mode 100644 crawler/crawler.py create mode 100644 crawler/ftp.py create mode 100644 crawler/http.py delete mode 100644 ftp_crawler.py delete mode 100644 scrapy_od_database/settings.py delete mode 100644 scrapy_od_database/spiders/__init__.py delete mode 100644 scrapy_od_database/spiders/od_links_spider.py diff --git a/scrapy_od_database/__init__.py b/crawler/__init__.py similarity index 100% rename from scrapy_od_database/__init__.py rename to crawler/__init__.py diff --git a/crawler/crawler.py b/crawler/crawler.py new file mode 100644 index 0000000..08dc271 --- /dev/null +++ b/crawler/crawler.py @@ -0,0 +1,133 @@ +import os +import json +from urllib.parse import urlparse +from timeout_decorator.timeout_decorator import TimeoutError +from threading import Thread +from queue import Queue, Empty + + +class TooManyConnectionsError(Exception): + pass + + +class File: + + def __init__(self, name: str, size: int, mtime: str, path: str, is_dir: bool): + self.name = name + self.size = size + self.mtime = mtime + self.path = path + self.is_dir = is_dir + + def __str__(self): + return ("DIR " if self.is_dir else "FILE ") + self.path + "/" + self.name + + def to_json(self): + return json.dumps({ + "name": self.name, + "size": self.size, + "mtime": self.mtime, + "path": self.path, + }) + + +class RemoteDirectory: + + SCHEMES = () + + def __init__(self, base_url): + self.base_url = base_url + + def list_dir(self, path: str) -> list: + raise NotImplementedError + + def close(self): + pass + + +class RemoteDirectoryFactory: + + from crawler.ftp import FtpDirectory + from crawler.http import HttpDirectory + DIR_ENGINES = (FtpDirectory, HttpDirectory) + + @staticmethod + def get_directory(url) -> RemoteDirectory: + + parsed_url = urlparse(url) + + for dir_engine in RemoteDirectoryFactory.DIR_ENGINES: + if parsed_url.scheme in dir_engine.SCHEMES: + return dir_engine(url) + + +class RemoteDirectoryCrawler: + + def __init__(self, url, max_threads: int): + self.url = url + self.max_threads = max_threads + + def crawl_directory(self): + + try: + directory = RemoteDirectoryFactory.get_directory(self.url) + root_listing = directory.list_dir("/dl2/") # todo get path + directory.close() + except TimeoutError: + return + + in_q = Queue(maxsize=0) + files_q = Queue(maxsize=0) + for f in root_listing: + if f.is_dir: + in_q.put(f) + else: + files_q.put(f) + + threads = [] + for i in range(self.max_threads): + worker = Thread(target=RemoteDirectoryCrawler._process_listings, args=(self.url, in_q, files_q)) + threads.append(worker) + worker.start() + + in_q.join() + print("DONE") + + # Kill threads + for _ in threads: + in_q.put(None) + for t in threads: + t.join() + + print(files_q.qsize()) + return [] + + @staticmethod + def _process_listings(url: str, in_q: Queue, files_q: Queue): + + directory = RemoteDirectoryFactory.get_directory(url) + + while directory: + + try: + file = in_q.get(timeout=60) + except Empty: + break + + if file is None: + break + + try: + listing = directory.list_dir(os.path.join(file.path, file.name, "")) + + for f in listing: + if f.is_dir: + in_q.put(f) + else: + files_q.put(f) + except TooManyConnectionsError as e: + print("TOO MANY CONNNS") + except TimeoutError: + pass + finally: + in_q.task_done() diff --git a/crawler/ftp.py b/crawler/ftp.py new file mode 100644 index 0000000..4eb3436 --- /dev/null +++ b/crawler/ftp.py @@ -0,0 +1,79 @@ +#! /usr/bin/env python + +from urllib.parse import urlparse +import os +import time +import ftputil +import ftputil.error +from ftputil.session import session_factory +import random +import timeout_decorator +from crawler.crawler import RemoteDirectory, File, TooManyConnectionsError + + +class FtpDirectory(RemoteDirectory): + + SCHEMES = ("ftp", ) + + def __init__(self, url): + host = urlparse(url).netloc + super().__init__(host) + self.failed_attempts = 0 + self.max_attempts = 2 + self.ftp = None + self.stop_when_connected() + + def _connect(self): + self.ftp = ftputil.FTPHost(self.base_url, "anonymous", "od-database", session_factory=session_factory( + use_passive_mode=False + )) + + def stop_when_connected(self): + while self.failed_attempts < self.max_attempts: + try: + self._connect() + self.failed_attempts = 0 + break + except ftputil.error.FTPError as e: + + if e.errno == 530: + print("Cancel connection - too many connections") + break + + self.failed_attempts += 1 + print("Connection error; reconnecting...") + time.sleep(2 * random.uniform(0.5, 1.5)) + self.stop_when_connected() + + @timeout_decorator.timeout(15, use_signals=False) + def list_dir(self, path) -> list: + if not self.ftp: + print("Conn closed") + return [] + results = [] + try: + self.ftp.chdir(path) + file_names = self.ftp.listdir(path) + + for file_name in file_names: + stat = self.ftp.stat(file_name) + is_dir = self.ftp.path.isdir(os.path.join(path, file_name)) + + results.append(File( + name=file_name, + mtime=stat.st_mtime, + size=-1 if is_dir else stat.st_size, + is_dir=is_dir, + path=path + )) + except ftputil.error.FTPError as e: + if e.errno == 530: + raise TooManyConnectionsError() + pass + + return results + + def close(self): + if self.ftp: + self.ftp.close() + diff --git a/crawler/http.py b/crawler/http.py new file mode 100644 index 0000000..b092086 --- /dev/null +++ b/crawler/http.py @@ -0,0 +1,123 @@ +from urllib.parse import urlparse, urljoin, unquote + +import os +from lxml import etree +from itertools import repeat +from crawler.crawler import RemoteDirectory, File +import requests +from requests.exceptions import RequestException +from multiprocessing.pool import ThreadPool + + +class HttpDirectory(RemoteDirectory): + + SCHEMES = ("http", "https",) + HEADERS = {} + BLACK_LIST = ( + "?C=N&O=D", + "?C=M&O=A", + "?C=S&O=A", + "?C=D&O=A", + "?C=N;O=D", + "?C=M;O=A", + "?C=S;O=A", + "?C=D;O=A" + ) + + def __init__(self, url): + super().__init__(url) + self.parser = etree.HTMLParser(collect_ids=False) + + def list_dir(self, path) -> list: + + results = [] + + path_url = urljoin(self.base_url, path) + body = self._fetch_body(path_url) + links = self._parse_links(body) + + urls_to_request = [] + + for link in links: + + if self._should_ignore(link): + continue + file_url = urljoin(path_url, link[1]) + path, file_name = os.path.split(file_url[len(self.base_url) - 1:]) + + if self._isdir(link): + + results.append(File( + name=file_name, + mtime="", + size=-1, + is_dir=True, + path=path + )) + else: + urls_to_request.append(file_url) + + pool = ThreadPool(processes=10) + files = pool.starmap(HttpDirectory._request_file, zip(repeat(self), urls_to_request)) + for f in files: + if f: + results.append(f) + + return results + + def _get_url(self, path: str): + return urljoin(self.base_url, path) + + @staticmethod + def _fetch_body(url: str): + + # todo timeout + print("FETCH " + url) + r = requests.get(url, headers=HttpDirectory.HEADERS) + return r.text + + def _parse_links(self, body: str) -> set: + + result = set() + tree = etree.HTML(body, parser=self.parser) + links = tree.findall(".//a/[@href]") + + for link in links: + result.add((link.text, link.get("href"))) + + return result + + @staticmethod + def _isdir(url): + return url[1].rsplit("?", maxsplit=1)[0].endswith("/") + + def _request_file(self, url): + + # todo timeout + retries = 3 + while retries > 0: + try: + print("HEAD " + url) + r = requests.head(url, headers=HttpDirectory.HEADERS, allow_redirects=False, timeout=50) + + stripped_url = r.url[len(self.base_url) - 1:] + + path, name = os.path.split(stripped_url) + + return File( + path=unquote(path).strip("/"), + name=unquote(name), + size=int(r.headers["Content-Length"]) if "Content-Length" in r.headers else -1, + mtime=r.headers["Date"] if "Date" in r.headers else "?", + is_dir=False + ) + except RequestException: + retries -= 1 + + return None + + + @staticmethod + def _should_ignore(link): + return link[0] == "../" or link[1].endswith(HttpDirectory.BLACK_LIST) + diff --git a/ftp_crawler.py b/ftp_crawler.py deleted file mode 100644 index d49f0b7..0000000 --- a/ftp_crawler.py +++ /dev/null @@ -1,127 +0,0 @@ -#! /usr/bin/env python - -from threading import Thread -from queue import Queue -import os -import time -import ftputil -import ftputil.error -import random - - -class File: - - def __init__(self, name: str, size: int, mtime: str, path: str, is_dir: bool): - self.name = name - self.size = size - self.mtime = mtime - self.path = path - self.is_dir = is_dir - self.ftp = None - - def __str__(self): - return ("DIR " if self.is_dir else "FILE ") + self.path + "/" + self.name - - -class FTPConnection(object): - def __init__(self, host): - self.host = host - self.failed_attempts = 0 - self.max_attempts = 2 - self.ftp = None - self.stop_when_connected() - - def _connect(self): - print("Connecting to " + self.host) - self.ftp = ftputil.FTPHost(self.host, "anonymous", "od-database") - - def stop_when_connected(self): - while self.failed_attempts < self.max_attempts: - try: - self._connect() - self.failed_attempts = 0 - break - except ftputil.error.FTPError as e: - - if e.errno == 530: - print("Cancel connection - too many connections") - break - - self.failed_attempts += 1 - print("LIST FAILED; reconnecting...") - time.sleep(2 * random.uniform(0.5, 1.5)) - self.stop_when_connected() - - def list_dir(self, path) -> list: - if not self.ftp: - return [] - results = [] - self.ftp.chdir(path) - try: - file_names = self.ftp.listdir(path) - - for file_name in file_names: - stat = self.ftp.stat(file_name) - is_dir = self.ftp.path.isdir(os.path.join(path, file_name)) - - results.append(File( - name=file_name, - mtime=stat.st_mtime, - size=-1 if is_dir else stat.st_size, - is_dir=is_dir, - path=path - )) - except ftputil.error.FTPError: - print("ERROR parsing " + path) - - return results - - -def process_and_queue(host, q: Queue): - - ftp = FTPConnection(host) - - while ftp.ftp: - file = q.get() - - if file.is_dir: - print(file) - try: - listing = ftp.list_dir(os.path.join(file.path, file.name)) - for f in listing: - q.put(f) - except ftputil.error.PermanentError as e: - if e.errno == 530: - # Too many connections, retry this dir but kill this thread - q.put(file) - ftp.ftp.close() - print("Dropping connection because too many") - else: - pass - - q.task_done() - - -def crawl_ftp_server(host: str, max_threads: int) -> list: - - ftp = FTPConnection(host) - root_listing = ftp.list_dir("/") - if ftp.ftp: - ftp.ftp.close() - - q = Queue(maxsize=0) - for i in range(max_threads): - worker = Thread(target=process_and_queue, args=(host, q,)) - worker.setDaemon(True) - worker.start() - - for file in root_listing: - q.put(file) - - q.join() - return [] - - -if __name__ == '__main__': - import sys - crawl_ftp_server(sys.argv[1], 50) diff --git a/od_util.py b/od_util.py index f84bee7..2961d4b 100644 --- a/od_util.py +++ b/od_util.py @@ -75,7 +75,7 @@ def is_od(url): ftp.close() return True else: - r = requests.get(url, timeout=15, allow_redirects=False) + r = requests.get(url, timeout=30, allow_redirects=False) if r.status_code != 200: print("No redirects allowed!") return False diff --git a/requirements.txt b/requirements.txt index f6cb662..a001e40 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ -scrapy flask requests bs4 @@ -9,4 +8,5 @@ praw humanfriendly apscheduler bcrypt -ftputil \ No newline at end of file +ftputil +lxml \ No newline at end of file diff --git a/scrapy_od_database/settings.py b/scrapy_od_database/settings.py deleted file mode 100644 index 0e14ea1..0000000 --- a/scrapy_od_database/settings.py +++ /dev/null @@ -1,71 +0,0 @@ -# -*- coding: utf-8 -*- - -# For simplicity, this file contains only settings considered important or -# commonly used. You can find more settings consulting the documentation: -# -# https://doc.scrapy.org/en/latest/topics/settings.html -# https://doc.scrapy.org/en/latest/topics/downloader-middleware.html -# https://doc.scrapy.org/en/latest/topics/spider-middleware.html - -BOT_NAME = 'scrapy_od_database' - -SPIDER_MODULES = ['scrapy_od_database.spiders'] -NEWSPIDER_MODULE = 'scrapy_od_database.spiders' -DOWNLOAD_HANDLERS = {'ftp': 'scrapy_od_database.handlers.FtpListingHandler'} - -LOG_LEVEL = 'ERROR' -FEED_FORMAT = 'json' -FEED_URI = 'data.json' - -USER_AGENT = 'Mozilla/5.0 (X11; od-database.simon987.net) AppleWebKit/537.36 ' \ - '(KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36' - -# Obey robots.txt rules -ROBOTSTXT_OBEY = False - -# Configure maximum concurrent requests performed by Scrapy (default: 16) -CONCURRENT_REQUESTS = 40 -RETRY_TIMES = 6 -DOWNLOAD_TIMEOUT = 90 - -# Configure a delay for requests for the same website (default: 0) -# See https://doc.scrapy.org/en/latest/topics/settings.html#download-delay -# See also autothrottle settings and docs -#DOWNLOAD_DELAY = 3 -# The download delay setting will honor only one of: -CONCURRENT_REQUESTS_PER_DOMAIN = 40 -# CONCURRENT_REQUESTS_PER_IP = 16 - -# Disable cookies (enabled by default) -#COOKIES_ENABLED = False - -# Disable Telnet Console (enabled by default) -TELNETCONSOLE_ENABLED = False - -# Override the default request headers: -#DEFAULT_REQUEST_HEADERS = { -# 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', -# 'Accept-Language': 'en', -#} - -# Enable and configure the AutoThrottle extension (disabled by default) -# See https://doc.scrapy.org/en/latest/topics/autothrottle.html -#AUTOTHROTTLE_ENABLED = True -# The initial download delay -#AUTOTHROTTLE_START_DELAY = 5 -# The maximum download delay to be set in case of high latencies -#AUTOTHROTTLE_MAX_DELAY = 60 -# The average number of requests Scrapy should be sending in parallel to -# each remote server -#AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0 -# Enable showing throttling stats for every response received: -#AUTOTHROTTLE_DEBUG = False - -# Enable and configure HTTP caching (disabled by default) -# See https://doc.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings -#HTTPCACHE_ENABLED = True -#HTTPCACHE_EXPIRATION_SECS = 0 -#HTTPCACHE_DIR = 'httpcache' -#HTTPCACHE_IGNORE_HTTP_CODES = [] -#HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage' - diff --git a/scrapy_od_database/spiders/__init__.py b/scrapy_od_database/spiders/__init__.py deleted file mode 100644 index ebd689a..0000000 --- a/scrapy_od_database/spiders/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -# This package will contain the spiders of your Scrapy project -# -# Please refer to the documentation for information on how to create and manage -# your spiders. diff --git a/scrapy_od_database/spiders/od_links_spider.py b/scrapy_od_database/spiders/od_links_spider.py deleted file mode 100644 index fe80f19..0000000 --- a/scrapy_od_database/spiders/od_links_spider.py +++ /dev/null @@ -1,79 +0,0 @@ -import scrapy -import os -from urllib.parse import unquote - - -class LinksSpider(scrapy.Spider): - """Scrapy spider for open directories. Will gather all download links recursively""" - - name = "od_links" - - black_list = ( - "?C=N&O=D", - "?C=M&O=A", - "?C=S&O=A", - "?C=D&O=A", - "?C=N;O=D", - "?C=M;O=A", - "?C=S;O=A", - "?C=D;O=A" - ) - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.crawled_links = set() - - def __index__(self, **kw): - super(LinksSpider, self).__init__(**kw) - self.base_url = kw.get("base_url") - - def should_ask_headers(self, link): - """Whether or not to send HEAD request""" - return link not in self.crawled_links and not link.rsplit("?", maxsplit=1)[0].endswith("/") - - def should_crawl(self, link): - """Whether or not the link should be followed""" - if link in self.crawled_links: - return False - - if link.endswith(tuple(self.black_list)): - return False - - if not link.startswith(self.base_url): - return False - - return link.rsplit("?", maxsplit=1)[0].endswith("/") - - def start_requests(self): - yield scrapy.Request(url=self.base_url, callback=self.parse) - - def parse(self, response): - if response.status == 200: - links = response.xpath('//a/@href').extract() - for link in links: - full_link = response.urljoin(link) - - if self.should_ask_headers(full_link): - yield scrapy.Request(full_link, method="HEAD", callback=self.save_file) - elif self.should_crawl(full_link): - self.crawled_links.add(full_link) - yield scrapy.Request(full_link, callback=self.parse) - - def save_file(self, response): - - if response.status == 200: - # Save file information - stripped_url = response.url[len(self.base_url) - 1:] - self.crawled_links.add(response.url) - - path, name = os.path.split(stripped_url) - - yield { - "path": unquote(path).strip("/"), - "name": unquote(name), - "size": int(response.headers["Content-Length"].decode("utf-8")) if "Content-Length" in response.headers else -1, - "mime": response.headers["Content-Type"].decode("utf-8").split(";", maxsplit=1)[0] - if "Content-Type" in response.headers else "?", - "mtime": response.headers["Date"].decode("utf-8") if "Date" in response.headers else "?" - } -