Tasks can now be queued from the web interface. Tasks are dispatched to the crawl server(s)

This commit is contained in:
Simon
2018-06-12 13:44:03 -04:00
parent 6d48f1f780
commit d61fd75890
14 changed files with 169 additions and 409 deletions

164
crawl_server/crawler.py Normal file
View File

@@ -0,0 +1,164 @@
import os
import json
from urllib.parse import urlparse
from timeout_decorator.timeout_decorator import TimeoutError
from threading import Thread
from queue import Queue, Empty
class TooManyConnectionsError(Exception):
pass
class File:
def __init__(self, name: str, size: int, mtime: int, path: str, is_dir: bool):
self.name = name
self.size = size
self.mtime = mtime
self.path = path
self.is_dir = is_dir
def __str__(self):
return ("DIR " if self.is_dir else "FILE ") + self.path + "/" + self.name
def to_json(self):
return json.dumps({
"name": self.name,
"size": self.size,
"mtime": self.mtime,
"path": self.path,
})
class RemoteDirectory:
SCHEMES = ()
def __init__(self, base_url):
self.base_url = base_url
def list_dir(self, path: str) -> list:
raise NotImplementedError
def close(self):
pass
class RemoteDirectoryFactory:
from crawl_server.remote_ftp import FtpDirectory
from crawl_server.remote_http import HttpDirectory
DIR_ENGINES = (FtpDirectory, HttpDirectory)
@staticmethod
def get_directory(url) -> RemoteDirectory:
parsed_url = urlparse(url)
for dir_engine in RemoteDirectoryFactory.DIR_ENGINES:
if parsed_url.scheme in dir_engine.SCHEMES:
return dir_engine(url)
def export_to_json(q: Queue, out_file: str) -> int:
counter = 0
with open(out_file, "w") as f:
while True:
try:
next_file: File = q.get_nowait()
f.write(next_file.to_json() + "\n")
counter += 1
except Empty:
break
return counter
class CrawlResult:
def __init__(self, file_count: int, status_code: str):
self.file_count = file_count
self.status_code = status_code
class RemoteDirectoryCrawler:
def __init__(self, url, max_threads: int):
self.url = url
self.max_threads = max_threads
self.crawled_paths = set()
def crawl_directory(self, out_file: str) -> CrawlResult:
try:
directory = RemoteDirectoryFactory.get_directory(self.url)
root_listing = directory.list_dir("")
self.crawled_paths.add("")
directory.close()
except TimeoutError:
return CrawlResult(0, "timeout")
in_q = Queue(maxsize=0)
files_q = Queue(maxsize=0)
for f in root_listing:
if f.is_dir:
in_q.put(f)
else:
files_q.put(f)
threads = []
for i in range(self.max_threads):
worker = Thread(target=RemoteDirectoryCrawler._process_listings, args=(self, self.url, in_q, files_q))
threads.append(worker)
worker.start()
in_q.join()
print("Done")
exported_count = export_to_json(files_q, out_file)
print("exported to " + out_file)
# Kill threads
for _ in threads:
in_q.put(None)
for t in threads:
t.join()
return CrawlResult(exported_count, "success")
def _process_listings(self, url: str, in_q: Queue, files_q: Queue):
directory = RemoteDirectoryFactory.get_directory(url)
while directory:
try:
file = in_q.get(timeout=60)
except Empty:
break
if file is None:
break
try:
path = os.path.join(file.path, file.name, "")
if path not in self.crawled_paths:
listing = directory.list_dir(path)
self.crawled_paths.add(path)
for f in listing:
if f.is_dir:
in_q.put(f)
else:
files_q.put(f)
except TooManyConnectionsError:
print("Too many connections")
except TimeoutError:
pass
finally:
in_q.task_done()

View File

@@ -5,12 +5,21 @@ import sqlite3
class TaskResult:
def __init__(self):
self.status_code: str = None
self.file_count = 0
self.start_time = None
self.end_time = None
self.website_id = None
def __init__(self, status_code=None, file_count=0, start_time=0, end_time=0, website_id=0):
self.status_code = status_code
self.file_count = file_count
self.start_time = start_time
self.end_time = end_time
self.website_id = website_id
def to_json(self):
return {
"status_code": self.status_code,
"file_count": self.file_count,
"start_time": self.start_time,
"end_time": self.end_time,
"website_id": self.website_id
}
class Task:
@@ -24,13 +33,16 @@ class Task:
self.callback_args = json.loads(callback_args) if callback_args else {}
def to_json(self):
return ({
return {
"website_id": self.website_id,
"url": self.url,
"priority": self.priority,
"callback_type": self.callback_type,
"callback_args": json.dumps(self.callback_args)
})
}
def __repr__(self):
return json.dumps(self.to_json())
class TaskManagerDatabase:
@@ -96,3 +108,17 @@ class TaskManagerDatabase:
"VALUES (?,?,?,?,?)", (result.website_id, result.status_code, result.file_count,
result.start_time, result.end_time))
conn.commit()
def get_non_indexed_results(self):
"""Get a list of new TaskResults since the last call of this method"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT status_code, file_count, start_time, end_time, website_id"
" FROM TaskResult WHERE indexed_time != NULL")
db_result = cursor.fetchall()
cursor.execute("UPDATE TaskResult SET indexed_time = CURRENT_TIMESTAMP")
return [TaskResult(r[0], r[1], r[2], r[3], r[4]) for r in db_result]

View File

@@ -0,0 +1,79 @@
#! /usr/bin/env python
from urllib.parse import urlparse
import os
import time
import ftputil
import ftputil.error
from ftputil.session import session_factory
import random
import timeout_decorator
from crawl_server.crawler import RemoteDirectory, File, TooManyConnectionsError
class FtpDirectory(RemoteDirectory):
SCHEMES = ("ftp", )
def __init__(self, url):
host = urlparse(url).netloc
super().__init__(host)
self.failed_attempts = 0
self.max_attempts = 2
self.ftp = None
self.stop_when_connected()
def _connect(self):
self.ftp = ftputil.FTPHost(self.base_url, "anonymous", "od-database", session_factory=session_factory(
use_passive_mode=False
))
def stop_when_connected(self):
while self.failed_attempts < self.max_attempts:
try:
self._connect()
self.failed_attempts = 0
break
except ftputil.error.FTPError as e:
if e.errno == 530:
print("Cancel connection - too many connections")
break
self.failed_attempts += 1
print("Connection error; reconnecting...")
time.sleep(2 * random.uniform(0.5, 1.5))
self.stop_when_connected()
@timeout_decorator.timeout(15, use_signals=False)
def list_dir(self, path) -> list:
if not self.ftp:
print("Conn closed")
return []
results = []
try:
self.ftp.chdir(path)
file_names = self.ftp.listdir(path)
for file_name in file_names:
stat = self.ftp.stat(file_name)
is_dir = self.ftp.path.isdir(os.path.join(path, file_name))
results.append(File(
name=file_name,
mtime=stat.st_mtime, # TODO: check
size=-1 if is_dir else stat.st_size,
is_dir=is_dir,
path=path
))
except ftputil.error.FTPError as e:
if e.errno == 530:
raise TooManyConnectionsError()
pass
return results
def close(self):
if self.ftp:
self.ftp.close()

153
crawl_server/remote_http.py Normal file
View File

@@ -0,0 +1,153 @@
from urllib.parse import urljoin, unquote
import os
from lxml import etree
from itertools import repeat
from crawl_server.crawler import RemoteDirectory, File
import requests
from requests.exceptions import RequestException
from multiprocessing.pool import ThreadPool
import config
from dateutil.parser import parse as parse_date
class Link:
def __init__(self, text: str, url: str):
self.text = text
self.url = url
class HttpDirectory(RemoteDirectory):
SCHEMES = ("http", "https",)
HEADERS = config.HEADERS
BLACK_LIST = (
"?C=N&O=D",
"?C=M&O=A",
"?C=S&O=A",
"?C=D&O=A",
"?C=N;O=D",
"?C=M;O=A",
"?C=S;O=A",
"?C=D;O=A"
)
MAX_RETRIES = 3
def __init__(self, url):
super().__init__(url)
self.parser = etree.HTMLParser(collect_ids=False)
def list_dir(self, path) -> list:
results = []
path_url = os.path.join(self.base_url, path.strip("/"), "")
body = self._fetch_body(path_url)
if not body:
return []
links = self._parse_links(body)
urls_to_request = []
for link in links:
if self._should_ignore(link):
continue
file_url = urljoin(path_url, link.url)
path, file_name = os.path.split(file_url[len(self.base_url) - 1:])
if self._isdir(link):
results.append(File(
name=file_name,
mtime=0,
size=-1,
is_dir=True,
path=path
))
else:
urls_to_request.append(file_url)
results.extend(self.request_files(urls_to_request))
return results
def request_files(self, urls_to_request: list) -> list:
results = []
if len(urls_to_request) > 4:
# Many urls, use multi-threaded solution
pool = ThreadPool(processes=10)
files = pool.starmap(HttpDirectory._request_file, zip(repeat(self), urls_to_request))
pool.close()
for file in files:
if file:
results.append(file)
else:
# Too few urls to create thread pool
for url in urls_to_request:
file = self._request_file(url)
if file:
results.append(file)
return results
def _get_url(self, path: str):
return urljoin(self.base_url, path)
@staticmethod
def _fetch_body(url: str):
retries = HttpDirectory.MAX_RETRIES
while retries > 0:
try:
r = requests.get(url, headers=HttpDirectory.HEADERS)
return r.text
except RequestException:
retries -= 1
return None
def _parse_links(self, body: str) -> set:
result = set()
tree = etree.HTML(body, parser=self.parser)
links = tree.findall(".//a/[@href]")
for link in links:
result.add(Link(link.text, link.get("href")))
return result
@staticmethod
def _isdir(link: Link):
return link.url.rsplit("?", maxsplit=1)[0].endswith("/")
def _request_file(self, url):
retries = HttpDirectory.MAX_RETRIES
while retries > 0:
try:
r = requests.head(url, headers=HttpDirectory.HEADERS, allow_redirects=False, timeout=50)
stripped_url = url[len(self.base_url) - 1:]
path, name = os.path.split(stripped_url)
date = r.headers["Date"] if "Date" in r.headers else "1970-01-01"
return File(
path=unquote(path).strip("/"),
name=unquote(name),
size=int(r.headers["Content-Length"]) if "Content-Length" in r.headers else -1,
mtime=int(parse_date(date).timestamp()),
is_dir=False
)
except RequestException:
retries -= 1
return None
@staticmethod
def _should_ignore(link: Link):
return link.text == "../" or link.url.endswith(HttpDirectory.BLACK_LIST)

View File

@@ -1,16 +1,11 @@
from flask import Flask, request, abort, Response
import json
from crawl_server.task_manager import TaskManager, Task
from crawl_server.task_manager import TaskManager, Task, TaskResult
app = Flask(__name__)
tm = TaskManager("tm_db.sqlite3")
@app.route("/")
def hello():
return "Hello World!"
@app.route("/task/")
def get_tasks():
json_str = json.dumps([task.to_json() for task in tm.get_tasks()])
@@ -37,5 +32,18 @@ def task_put():
return abort(400)
@app.route("/task/completed", methods=["GET"])
def get_completed_tasks():
json_str = json.dumps([result.to_json() for result in tm.get_non_indexed_results()])
return json_str
@app.route("/task/current", methods=["GET"])
def get_current_tasks():
current_tasks = tm.get_current_tasks()
return current_tasks
if __name__ == "__main__":
app.run()
app.run(port=5001)

View File

@@ -2,7 +2,7 @@ from crawl_server.database import TaskManagerDatabase, Task, TaskResult
from multiprocessing import Pool
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
from crawler.crawler import RemoteDirectoryCrawler
from crawl_server.crawler import RemoteDirectoryCrawler
class TaskManager:
@@ -12,8 +12,10 @@ class TaskManager:
self.db = TaskManagerDatabase(db_path)
self.pool = Pool(processes=max_processes)
self.current_tasks = []
scheduler = BackgroundScheduler()
scheduler.add_job(self.execute_queued_task, "interval", seconds=1)
scheduler.add_job(self.execute_queued_task, "interval", seconds=5)
scheduler.start()
def put_task(self, task: Task):
@@ -22,11 +24,21 @@ class TaskManager:
def get_tasks(self):
return self.db.get_tasks()
def get_current_tasks(self):
return self.current_tasks
def get_non_indexed_results(self):
return self.db.get_non_indexed_results()
def execute_queued_task(self):
task = self.db.pop_task()
if task:
self.current_tasks.append(task)
print("pooled " + task.url)
self.pool.apply_async(
TaskManager.run_task,
args=(task, self.db_path),
@@ -68,8 +80,9 @@ class TaskManager:
@staticmethod
def task_error(err):
print("ERROR")
print("FIXME: Task failed (This should not happen)")
print(err)
raise err