Removed crawl_server module

This commit is contained in:
simon 2019-01-05 09:10:44 -05:00
parent 38dfb657ed
commit d905c3efd5
8 changed files with 6 additions and 785 deletions

README.md

@@ -18,6 +18,7 @@ Assuming you have Python 3 and git installed:
sudo apt install libssl-dev libcurl4-openssl-dev
git clone https://github.com/simon987/od-database
cd od-database
git submodule update --init --recursive
sudo pip3 install -r requirements.txt
```
Create `/config.py` and fill out the parameters. Sample config:
@@ -34,12 +35,6 @@ CAPTCHA_S_SECRET_KEY = ""
# Flask secret key for sessions
FLASK_SECRET = ""
RESULTS_PER_PAGE = (25, 50, 100, 250, 500, 1000)
# Headers for http crawler
HEADERS = {}
# Number of crawler instances (one per task)
CRAWL_SERVER_PROCESSES = 3
# Number of threads per crawler instance
CRAWL_SERVER_THREADS = 20
# Allow ftp websites in /submit
SUBMIT_FTP = False
# Allow http(s) websites in /submit
@@ -50,19 +45,16 @@ API_TOKEN = "5817926d-f2f9-4422-a411-a98f1bfe4b6c"
```
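For reference, a minimal `config.py` consistent with the sample above might look like the sketch below. It is hypothetical: only keys visible in this diff are included, all values are placeholders, and the keys removed by this commit (`HEADERS`, `CRAWL_SERVER_PROCESSES`, `CRAWL_SERVER_THREADS`) are left out.
```python
# Hypothetical minimal config.py sketch based on the sample shown in the README diff.
# Only keys visible in this diff are included; real deployments may need more.

# Captcha secret key (placeholder)
CAPTCHA_S_SECRET_KEY = ""
# Flask secret key for sessions (placeholder)
FLASK_SECRET = ""
RESULTS_PER_PAGE = (25, 50, 100, 250, 500, 1000)
# Allow ftp websites in /submit
SUBMIT_FTP = False
# API token used by the crawler to authenticate with the web server (sample value from the README)
API_TOKEN = "5817926d-f2f9-4422-a411-a98f1bfe4b6c"
```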
## Running the crawl server
```bash
cd od-database
export PYTHONPATH=$(pwd)
cd crawl_server
python3 run.py
```
## Running the web server (development)
The Python crawler that was part of this project has been discontinued;
[the Go implementation](https://github.com/terorie/od-database-crawler) is currently in use.
## Running the web server (debug)
```bash
cd od-database
python3 app.py
```
## Running the web server with nginx (production)
## Running the web server with Nginx (production)
* Install dependencies:
```bash
sudo apt install build-essential python-dev

crawl_server/__init__.py

@@ -1,12 +0,0 @@
import logging
import sys
from logging import FileHandler, StreamHandler
logger = logging.getLogger("default")
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
file_handler = FileHandler("crawl_server.log")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(StreamHandler(sys.stdout))

crawl_server/crawled/README.md

@@ -1 +0,0 @@
Crawled directories are temporarily stored here

crawl_server/crawler.py

@@ -1,219 +0,0 @@
import os
import ujson
from urllib.parse import urlparse, urljoin
from threading import Thread
from queue import Queue, Empty
from crawl_server import logger
from pybloom_live import ScalableBloomFilter


class TooManyConnectionsError(Exception):
    pass


class File:
    __slots__ = "name", "size", "mtime", "path", "is_dir"

    def __init__(self, name: str, size: int, mtime: int, path: str, is_dir: bool):
        self.name = name
        self.size = size
        self.mtime = mtime
        self.path = path
        self.is_dir = is_dir

    def __bytes__(self):
        return b"".join([
            self.name.encode(),
            b"D" if self.is_dir else b"F",
            abs(self.size).to_bytes(6, byteorder="little"),
            abs(self.mtime).to_bytes(6, byteorder="little"),
        ])

    def to_json(self):
        return ujson.dumps({
            "name": self.name,
            "size": self.size,
            "mtime": self.mtime,
            "path": self.path,
        })


class RemoteDirectory:

    SCHEMES = ()

    def __init__(self, base_url):
        self.base_url = base_url

    def list_dir(self, path: str):
        raise NotImplementedError

    def close(self):
        pass


class RemoteDirectoryFactory:

    from crawl_server.remote_ftp import FtpDirectory
    from crawl_server.remote_http import HttpDirectory
    DIR_ENGINES = (FtpDirectory, HttpDirectory)

    @staticmethod
    def get_directory(url) -> RemoteDirectory:
        parsed_url = urlparse(url)
        for dir_engine in RemoteDirectoryFactory.DIR_ENGINES:
            if parsed_url.scheme in dir_engine.SCHEMES:
                return dir_engine(url)


class CrawlResult:

    def __init__(self, file_count: int, status_code: str):
        self.file_count = file_count
        self.status_code = status_code


class RemoteDirectoryCrawler:

    MAX_TIMEOUT_RETRIES = 3

    def __init__(self, url, max_threads: int):
        self.url = url
        self.max_threads = max_threads
        self.crawled_paths = ScalableBloomFilter(error_rate=0.0001)
        self.status_code = "success"

    def crawl_directory(self, out_file: str) -> CrawlResult:
        try:
            try:
                directory = RemoteDirectoryFactory.get_directory(self.url)
                logger.info("Crawling directory " + self.url + " with " + str(type(directory)))
                path_id, root_listing = directory.list_dir(urlparse(self.url).path)
                if root_listing:
                    self.crawled_paths.add(path_id)
                else:
                    logger.info("No files in root listing for " + self.url)
                    return CrawlResult(0, "empty")
                directory.close()
            except TimeoutError:
                return CrawlResult(0, "Timeout during initial request")

            in_q = Queue(maxsize=0)
            files_q = Queue(maxsize=0)
            for f in root_listing:
                if f.is_dir:
                    in_q.put(os.path.join(f.path, f.name, ""))
                else:
                    files_q.put(f)

            threads = []
            for i in range(self.max_threads):
                worker = Thread(target=RemoteDirectoryCrawler._process_listings, args=(self, self.url, in_q, files_q))
                threads.append(worker)
                worker.start()

            files_written = []  # Pass array to worker to get result
            file_writer_thread = Thread(target=RemoteDirectoryCrawler._log_to_file, args=(files_q, out_file, files_written))
            file_writer_thread.start()

            in_q.join()
            files_q.join()
            logger.info("Crawling for " + self.url + " done, waiting for threads to terminate...")

            # Kill threads
            for _ in threads:
                in_q.put(None)
            for t in threads:
                t.join()

            files_q.put(None)
            file_writer_thread.join()

            return CrawlResult(files_written[0], self.status_code)
        except Exception as e:
            return CrawlResult(0, str(e) + " \nType:" + str(type(e)))

    def _process_listings(self, url: str, in_q: Queue, files_q: Queue):

        directory = RemoteDirectoryFactory.get_directory(url)
        timeout_retries = 20  # If any worker thread reaches 20 retries, the whole queue is emptied

        while directory:
            try:
                path = in_q.get(timeout=2000)
            except Empty:
                logger.debug("in_q is Empty")
                directory.close()
                break

            if path is None:
                break

            try:
                path_id, listing = directory.list_dir(path)
                if len(listing) > 0 and path_id not in self.crawled_paths:
                    self.crawled_paths.add(path_id)

                    for f in listing:
                        if f.is_dir:
                            in_q.put(urljoin(f.path, f.name))
                        else:
                            files_q.put(f)
                    logger.debug("LISTED " + urljoin(self.url, path))
            except TooManyConnectionsError:
                logger.debug("Too many connections, this thread will be killed and path resubmitted")
                # Kill worker and resubmit listing task
                directory.close()
                in_q.put(path)
                # TODO: If all workers are killed the queue will never get processed and
                # TODO: the crawler will be stuck forever
                break
            except TimeoutError:
                logger.error("Directory listing timed out, " + str(timeout_retries) + " retries left")
                if timeout_retries > 0:
                    timeout_retries -= 1
                    in_q.put(path)
                else:
                    logger.error("Dropping website " + url)
                    self.status_code = "Timeout during website listing"
                    directory.close()

                    logger.debug("Emptying queue")
                    while True:
                        try:
                            in_q.get_nowait()
                            in_q.task_done()
                        except Empty:
                            break
                    logger.debug("Emptied queue")
                    break
            finally:
                in_q.task_done()

    @staticmethod
    def _log_to_file(files_q: Queue, out_file: str, files_written: list):

        counter = 0

        with open(out_file, "w") as f:
            while True:
                try:
                    file = files_q.get(timeout=2000)
                except Empty:
                    logger.error("File writer thread timed out")
                    break

                if file is None:
                    break

                f.write(file.to_json() + "\n")
                counter += 1
                files_q.task_done()

        files_written.append(counter)
        logger.info("File writer thread done")

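For context, the removed `task_manager.py` shown later in this commit drove this class roughly as in the sketch below; the URL, thread count, and output path are illustrative placeholders.
```python
# Minimal usage sketch of the removed crawler, mirroring TaskManager.run_task below.
# The URL, thread count and output path are placeholders.
from crawl_server.crawler import RemoteDirectoryCrawler

crawler = RemoteDirectoryCrawler("http://example.com/pub/", max_threads=20)
result = crawler.crawl_directory("./crawled/1.json")  # writes one JSON object per file, one per line
print(result.file_count, result.status_code)
```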
crawl_server/remote_ftp.py

@@ -1,118 +0,0 @@
#! /usr/bin/env python
from crawl_server import logger
from urllib.parse import urlparse
import os
import time
import ftputil
import ftputil.error
from ftputil.session import session_factory
from crawl_server.crawler import RemoteDirectory, File, TooManyConnectionsError


class FtpDirectory(RemoteDirectory):

    SCHEMES = ("ftp", )

    CANCEL_LISTING_CODE = (
        550,  # Forbidden
    )

    def __init__(self, url):
        host = urlparse(url).netloc
        super().__init__(host)
        self.max_attempts = 3
        self.ftp = None
        self.stop_when_connected()

    def _connect(self):
        self.ftp = ftputil.FTPHost(self.base_url, "anonymous", "od-database", session_factory=session_factory(
            use_passive_mode=True
        ))
        self.ftp._session.timeout = 30

    def stop_when_connected(self):
        failed_attempts = 0
        while failed_attempts < self.max_attempts:
            try:
                self._connect()
                logger.debug("New FTP connection @ " + self.base_url)
                return True
            except ftputil.error.FTPError as e:

                if e.errno == 530 or e.errno == 421:
                    break

                failed_attempts += 1
                print("Connection error; reconnecting..." + e.strerror + " " + str(e.errno))
                time.sleep(2)
        return False

    def list_dir(self, path):
        if not self.ftp:
            # No connection, assuming it was dropped because of too many connections
            raise TooManyConnectionsError()

        results = []
        failed_attempts = 0
        while failed_attempts < self.max_attempts:
            try:
                file_names = self.ftp.listdir(path)

                for file_name in file_names:
                    file_path = os.path.join(path, file_name)
                    stat = self.try_stat(file_path)
                    is_dir = self.ftp.path.isdir(file_path)

                    results.append(File(
                        name=os.path.join(file_name, "") if is_dir else file_name,
                        mtime=stat.st_mtime,
                        size=-1 if is_dir else stat.st_size,
                        is_dir=is_dir,
                        path=path.strip("/") if not is_dir else path
                    ))
                return path, results
            except ftputil.error.ParserError as e:
                logger.error("TODO: fix parsing error: " + e.strerror + " @ " + str(e.file_name))
                break
            except ftputil.error.FTPError as e:
                if e.errno in FtpDirectory.CANCEL_LISTING_CODE:
                    break
                failed_attempts += 1
                self.reconnect()
            except ftputil.error.PermanentError as e:
                if e.errno == 530:
                    raise TooManyConnectionsError()
                if e.errno is None:
                    failed_attempts += 1
                    self.reconnect()
                else:
                    print(str(e.strerror) + " errno:" + str(e.errno))
                    break
            except Exception as e:
                failed_attempts += 1
                self.reconnect()
                logger.error("Exception while processing FTP listing for " + self.base_url + ": " + str(e))

        return path, []

    def reconnect(self):
        if self.ftp:
            self.ftp.close()
            success = self.stop_when_connected()
            logger.debug("Reconnecting to FTP server " + self.base_url + (" (OK)" if success else " (ERR)"))

    def try_stat(self, path):
        try:
            return self.ftp.stat(path)
        except ftputil.error.ParserError as e:
            # TODO: Try to parse it ourselves?
            logger.error("Exception while parsing FTP listing for " + self.base_url + path + " " + e.strerror)
            return None

    def close(self):
        if self.ftp:
            self.ftp.close()
            self.ftp = None
        logger.debug("Closing FtpRemoteDirectory for " + self.base_url)

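For context, `RemoteDirectoryFactory.get_directory` in `crawler.py` above selected this class whenever a URL's scheme matched its `SCHEMES` tuple; a minimal sketch with a placeholder URL:
```python
# Sketch of the scheme-based dispatch from the removed crawler.py: the factory
# returns the first engine whose SCHEMES tuple contains the URL's scheme.
from crawl_server.crawler import RemoteDirectoryFactory

directory = RemoteDirectoryFactory.get_directory("ftp://example.com/pub/")  # placeholder URL
print(type(directory).__name__)  # FtpDirectory, since FtpDirectory.SCHEMES == ("ftp",)
```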
crawl_server/remote_http.py

@@ -1,269 +0,0 @@
import pycurl
from io import BytesIO

from crawl_server import logger
from urllib.parse import unquote, urljoin
import os
from html.parser import HTMLParser
from itertools import repeat
from crawl_server.crawler import RemoteDirectory, File
from multiprocessing.pool import ThreadPool
import config
from dateutil.parser import parse as parse_date
from pycurl import Curl
import hashlib

import urllib3
urllib3.disable_warnings()


class Anchor:
    def __init__(self):
        self.text = None
        self.href = None

    def __str__(self):
        return "<" + self.href + ", " + str(self.text).strip() + ">"


class HTMLAnchorParser(HTMLParser):

    def __init__(self):
        super().__init__()
        self.anchors = []
        self.current_anchor = None

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            for attr in attrs:
                if attr[0] == "href":
                    self.current_anchor = Anchor()
                    self.current_anchor.href = attr[1]
                    break

    def handle_data(self, data):
        if self.current_anchor:
            self.current_anchor.text = data

    def handle_endtag(self, tag):
        if tag == "a":
            if self.current_anchor:
                self.anchors.append(self.current_anchor)
                self.current_anchor = None

    def error(self, message):
        logger.debug("HTML Parser error: " + message)

    def feed(self, data):
        self.anchors.clear()
        super().feed(data)


class HttpDirectory(RemoteDirectory):

    SCHEMES = ("http", "https",)
    HEADERS = config.HEADERS
    BLACK_LIST = (
        "?C=N&O=D",
        "?C=M&O=A",
        "?C=S&O=A",
        "?C=D&O=A",
        "?C=N;O=D",
        "?C=M;O=A",
        "?C=M&O=D",
        "?C=S;O=A",
        "?C=S&O=D",
        "?C=D;O=A",
        "?MA",
        "?SA",
        "?DA",
        "?ND",
        "?C=N&O=A",
        "?C=N&O=A",
        "?M=A",
        "?N=D",
        "?S=A",
        "?D=A",
    )
    FILE_NAME_BLACKLIST = (
        "Parent Directory",
        " Parent Directory",
        "../",
    )
    MAX_RETRIES = 2
    TIMEOUT = 25

    def __init__(self, url):
        super().__init__(url)
        self.curl = None
        self.curl_head = None
        self.init_curl()

    def init_curl(self):
        self.curl = Curl()
        self.curl.setopt(self.curl.SSL_VERIFYPEER, 0)
        self.curl.setopt(self.curl.SSL_VERIFYHOST, 0)
        self.curl.setopt(pycurl.TIMEOUT, HttpDirectory.TIMEOUT)
        self.curl.setopt(pycurl.USERAGENT, config.HEADERS["User-Agent"])
        self.curl_head = self._curl_handle()

    @staticmethod
    def _curl_handle():
        curl_head = Curl()
        curl_head.setopt(pycurl.SSL_VERIFYPEER, 0)
        curl_head.setopt(pycurl.SSL_VERIFYHOST, 0)
        curl_head.setopt(pycurl.NOBODY, 1)
        curl_head.setopt(pycurl.TIMEOUT, HttpDirectory.TIMEOUT)
        curl_head.setopt(pycurl.USERAGENT, config.HEADERS["User-Agent"])
        return curl_head

    def list_dir(self, path):
        current_dir_name = path[path.rstrip("/").rfind("/") + 1: -1]
        path_identifier = hashlib.md5(current_dir_name.encode())
        path_url = urljoin(self.base_url, path, "")
        body = self._fetch_body(path_url)
        anchors = self._parse_links(body)

        urls_to_request = []
        files = []

        for anchor in anchors:
            if self._should_ignore(self.base_url, path, anchor):
                continue

            if self._isdir(anchor):
                directory = File(
                    name=anchor.href,  # todo handle external links here
                    mtime=0,
                    size=0,
                    path=path,
                    is_dir=True
                )
                path_identifier.update(bytes(directory))
                files.append(directory)
            else:
                urls_to_request.append(urljoin(path_url, anchor.href))

        for file in self.request_files(urls_to_request):
            path_identifier.update(bytes(file))
            files.append(file)

        return path_identifier.hexdigest(), files

    def request_files(self, urls_to_request: list) -> list:
        if len(urls_to_request) > 150:
            # Many urls, use multi-threaded solution
            pool = ThreadPool(processes=10)
            files = pool.starmap(self._request_file, zip(urls_to_request, repeat(self.base_url)))
            pool.close()
            for file in files:
                if file:
                    yield file
        else:
            # Too few urls to create thread pool
            for url in urls_to_request:
                file = self._request_file(url, self.base_url)
                if file:
                    yield file

    @staticmethod
    def _request_file(url, base_url):
        retries = HttpDirectory.MAX_RETRIES
        while retries > 0:
            try:
                curl = HttpDirectory._curl_handle()
                raw_headers = BytesIO()
                curl.setopt(pycurl.URL, url.encode("utf-8", errors="ignore"))
                curl.setopt(pycurl.HEADERFUNCTION, raw_headers.write)
                curl.perform()

                stripped_url = url[len(base_url) - 1:]
                headers = HttpDirectory._parse_dict_header(raw_headers.getvalue().decode("utf-8", errors="ignore"))
                raw_headers.close()

                path, name = os.path.split(stripped_url)
                date = headers.get("Last-Modified", "1970-01-01")
                curl.close()
                return File(
                    path=unquote(path).strip("/"),
                    name=unquote(name),
                    size=int(headers.get("Content-Length", -1)),
                    mtime=int(parse_date(date).timestamp()),
                    is_dir=False
                )
            except pycurl.error:
                retries -= 1

        logger.debug("TimeoutError - _request_file")
        raise TimeoutError

    def _fetch_body(self, url: str):
        retries = HttpDirectory.MAX_RETRIES
        while retries > 0:
            try:
                content = BytesIO()
                self.curl.setopt(pycurl.URL, url.encode("utf-8", errors="ignore"))
                self.curl.setopt(pycurl.WRITEDATA, content)
                self.curl.perform()

                return content.getvalue().decode("utf-8", errors="ignore")
            except pycurl.error:
                self.close()
                retries -= 1

        logger.debug("TimeoutError - _fetch_body")
        raise TimeoutError

    @staticmethod
    def _parse_links(body):
        parser = HTMLAnchorParser()
        parser.feed(body)
        return parser.anchors

    @staticmethod
    def _isdir(link: Anchor):
        return link.href.endswith("/")

    @staticmethod
    def _should_ignore(base_url, current_path, link: Anchor):
        full_url = urljoin(base_url, link.href)
        if full_url == urljoin(urljoin(base_url, current_path), "../") or full_url == base_url:
            return True

        if link.href.endswith(HttpDirectory.BLACK_LIST):
            return True

        # Ignore external links
        if not full_url.startswith(base_url):
            return True

        # Ignore parameters in url
        if "?" in link.href:
            return True

    @staticmethod
    def _parse_dict_header(raw):
        headers = dict()
        for line in raw.split("\r\n")[1:]:  # Ignore first 'HTTP/1.0 200 OK' line
            if line:
                k, v = line.split(":", maxsplit=1)
                headers[k.strip()] = v.strip()

        return headers

    def close(self):
        self.curl.close()
        self.init_curl()

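The hex digest returned by `list_dir` above is the path identifier that `RemoteDirectoryCrawler` (earlier in this commit) checks against its `ScalableBloomFilter`, so identical listings are only crawled once. A minimal, standalone sketch of that dedup step; the function and variable names here are illustrative:
```python
# Sketch of the listing dedup used by the removed crawler: hash a listing into an
# identifier and skip identifiers that were already seen.
import hashlib
from pybloom_live import ScalableBloomFilter

crawled_paths = ScalableBloomFilter(error_rate=0.0001)

def seen_before(listing_fingerprint: bytes) -> bool:
    path_id = hashlib.md5(listing_fingerprint).hexdigest()
    if path_id in crawled_paths:
        return True
    crawled_paths.add(path_id)
    return False
```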
crawl_server/run.py

@@ -1,9 +0,0 @@
from crawl_server.task_manager import TaskManager
import time
import config
tm = TaskManager(config.CRAWL_SERVER_PROCESSES)
# TODO: On start, indicate that all tasks assigned to this crawler have been dropped
while True:
    time.sleep(1)

crawl_server/task_manager.py

@@ -1,143 +0,0 @@
from crawl_server import logger
import os
from tasks import TaskResult, Task
import config
import requests
import json
from multiprocessing import Manager, Pool
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
from crawl_server.crawler import RemoteDirectoryCrawler


class TaskManager:

    def __init__(self, max_processes=2):
        self.pool = Pool(maxtasksperchild=1, processes=max_processes)
        self.max_processes = max_processes

        manager = Manager()
        self.current_tasks = manager.list()

        scheduler = BackgroundScheduler()
        scheduler.add_job(self.execute_queued_task, "interval", seconds=1)
        scheduler.start()

    def fetch_task(self):
        try:
            payload = {
                "token": config.API_TOKEN
            }
            r = requests.post(config.SERVER_URL + "/task/get", data=payload)

            if r.status_code == 200:
                text = r.text
                logger.info("Fetched task from server : " + text)
                task_json = json.loads(text)
                return Task(task_json["website_id"], task_json["url"])
            return None
        except Exception as e:
            raise e

    @staticmethod
    def push_result(task_result: TaskResult):
        try:
            logger.info("Uploading file list in small chunks")
            filename = "./crawled/" + str(task_result.website_id) + ".json"
            CHUNK_SIZE = 500000 * 10  # 5Mb
            if os.path.exists(filename):
                with open(filename) as f:
                    chunk = f.read(CHUNK_SIZE)
                    while chunk:
                        try:
                            payload = {
                                "token": config.API_TOKEN,
                                "website_id": task_result.website_id
                            }

                            files = {
                                "file_list": chunk
                            }

                            r = requests.post(config.SERVER_URL + "/task/upload", data=payload, files=files)
                            logger.info("RESPONSE: " + r.text + "<" + str(r.status_code) + ">")
                        except Exception as e:
                            logger.error("Exception while sending file_list chunk: " + str(e))
                            pass
                        chunk = f.read(CHUNK_SIZE)

            payload = {
                "token": config.API_TOKEN,
                "result": json.dumps(task_result.to_json())
            }

            r = requests.post(config.SERVER_URL + "/task/complete", data=payload)
            logger.info("RESPONSE: " + r.text + "<" + str(r.status_code) + ">")

            if os.path.exists(filename):
                os.remove(filename)

        except Exception as e:
            logger.error("Error during push_result: " + str(e))

    def execute_queued_task(self):

        if len(self.current_tasks) <= self.max_processes:
            task = self.fetch_task()

            if task:
                logger.info("Submitted " + task.url + " to process pool")
                self.current_tasks.append(task)

                self.pool.apply_async(
                    TaskManager.run_task,
                    args=(task, self.current_tasks),
                    callback=TaskManager.task_complete,
                    error_callback=TaskManager.task_error
                )

    @staticmethod
    def run_task(task, current_tasks):

        result = TaskResult()
        result.start_time = datetime.utcnow().timestamp()
        result.website_id = task.website_id

        logger.info("Starting task " + task.url)

        crawler = RemoteDirectoryCrawler(task.url, config.CRAWL_SERVER_THREADS)
        crawl_result = crawler.crawl_directory("./crawled/" + str(task.website_id) + ".json")

        result.file_count = crawl_result.file_count
        result.status_code = crawl_result.status_code

        result.end_time = datetime.utcnow().timestamp()
        logger.info("End task " + task.url)

        return result, current_tasks

    @staticmethod
    def task_error(result):
        logger.error("Uncaught exception during a task: ")
        raise result

    @staticmethod
    def task_complete(result):

        task_result, current_tasks = result

        logger.info("Task completed, sending result to server")
        logger.info("Status code: " + task_result.status_code)
        logger.info("File count: " + str(task_result.file_count))

        TaskManager.push_result(task_result)

        for i, task in enumerate(current_tasks):
            if task.website_id == task_result.website_id:
                del current_tasks[i]