Crawl tasks are now fetched by the crawlers instead of pushed by the server
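Under the new pull model, each crawler authenticates with an API token, polls the od-database server for work, and reports the result back when the crawl finishes. The sketch below is illustrative only: the endpoints and form fields mirror the `/api/task/get` and `/api/task/complete` routes added to `app.py` in this commit, but the `crawl_once` helper, the loop structure, and the placeholder result values are not part of the codebase.

```python
# Illustrative sketch of the new pull-based flow (not part of this commit).
# SERVER_URL and API_TOKEN come from config.py; the endpoints and form fields
# mirror the /api/task/get and /api/task/complete routes added to app.py.
import json
import requests
import config


def crawl_once():
    # Ask the server for the highest-priority unassigned task.
    r = requests.post(config.SERVER_URL + "/task/get", data={"token": config.API_TOKEN})
    if r.status_code != 200:
        return  # 404: the queue is empty, 403: the token was rejected

    task = json.loads(r.text)  # e.g. {"website_id": ..., "url": ..., "priority": ...}

    # ... crawl task["url"] here and fill in a TaskResult-shaped dict ...
    result = {"status_code": "200", "file_count": 0, "start_time": 0,
              "end_time": 0, "website_id": task["website_id"]}

    # Report back; a "file_list" multipart upload may accompany the result so
    # the server can index the crawled files.
    requests.post(config.SERVER_URL + "/task/complete",
                  data={"token": config.API_TOKEN, "result": json.dumps(result)})
```

On the server side, `db.pop_task` marks the handed-out row with the new `assigned_crawler` column, so a task given to one crawler stays hidden from the others until `db.complete_task` removes it.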

Simon 2018-07-14 17:31:18 -04:00
parent d9e9f53f92
commit fe1d29aaea
20 changed files with 376 additions and 749 deletions

View File

@ -22,9 +22,6 @@ FLASK_SECRET = ""
RESULTS_PER_PAGE = (25, 50, 100, 250, 500, 1000)
# Headers for http crawler
HEADERS = {}
# Token for the crawl server, used by the server to communicate to the crawl server
CRAWL_SERVER_TOKEN = ""
CRAWL_SERVER_PORT = 5001
# Number of crawler instances (one per task)
CRAWL_SERVER_PROCESSES = 3
# Number of threads per crawler instance
@ -33,6 +30,9 @@ CRAWL_SERVER_THREADS = 20
SUBMIT_FTP = False
# Allow http(s) websites in /submit
SUBMIT_HTTP = True
SERVER_URL = "http://localhost/api"
API_TOKEN = "5817926d-f2f9-4422-a411-a98f1bfe4b6c"
```
## Running the crawl server

114
app.py
View File

@ -3,13 +3,14 @@ import json
from urllib.parse import urlparse
import os
import time
import datetime
import itertools
from database import Database, Website, InvalidQueryException
from flask_recaptcha import ReCaptcha
import od_util
import config
from flask_caching import Cache
from task import TaskDispatcher, Task, CrawlServer
from tasks import TaskManager, Task, TaskResult
from search.search import ElasticSearchEngine
app = Flask(__name__)
@ -26,12 +27,12 @@ app.jinja_env.globals.update(truncate_path=od_util.truncate_path)
app.jinja_env.globals.update(get_color=od_util.get_color)
app.jinja_env.globals.update(get_mime=od_util.get_category)
taskDispatcher = TaskDispatcher()
taskManager = TaskManager()
searchEngine = ElasticSearchEngine("od-database")
@app.template_filter("date_format")
def datetime_format(value, format='%Y-%m-%d'):
def date_format(value, format='%Y-%m-%d'):
return time.strftime(format, time.gmtime(value))
@ -40,6 +41,11 @@ def datetime_format(value, format='%Y-%m-%d %H:%M:%S'):
return time.strftime(format, time.gmtime(value))
@app.template_filter("from_timestamp")
def from_timestamp(value):
return datetime.datetime.fromtimestamp(value)
@app.route("/dl")
def downloads():
try:
@ -53,7 +59,7 @@ def downloads():
@app.route("/stats")
def stats_page():
crawl_server_stats = db.get_stats_by_server()
crawl_server_stats = db.get_stats_by_crawler()
return render_template("stats.html", crawl_server_stats=crawl_server_stats)
@ -136,7 +142,7 @@ def random_website():
def admin_redispatch_queued():
if "username" in session:
count = taskDispatcher.redispatch_queued()
count = taskManager.redispatch_queued()
flash("Re-dispatched " + str(count) + " tasks", "success")
return redirect("/dashboard")
@ -145,7 +151,7 @@ def admin_redispatch_queued():
def get_empty_websites():
current_tasks = itertools.chain(taskDispatcher.get_queued_tasks(), taskDispatcher.get_current_tasks())
current_tasks = taskManager.get_queued_tasks()
queued_websites = [task.website_id for task in current_tasks]
all_websites = db.get_all_websites()
@ -180,7 +186,7 @@ def admin_queue_empty_websites():
for website_id in get_empty_websites():
website = db.get_website_by_id(website_id)
task = Task(website.id, website.url, 1)
taskDispatcher.dispatch_task(task)
taskManager.queue_task(task)
flash("Dispatched empty websites", "success")
return redirect("/dashboard")
@ -221,7 +227,7 @@ def admin_rescan_website(website_id):
if website:
priority = request.args.get("priority") if "priority" in request.args else 1
task = Task(website_id, website.url, priority)
taskDispatcher.dispatch_task(task)
taskManager.queue_task(task)
flash("Enqueued rescan task", "success")
else:
@ -320,16 +326,14 @@ def home():
try:
stats = searchEngine.get_global_stats()
stats["website_count"] = len(db.get_all_websites())
current_websites = ", ".join(task.url for task in taskDispatcher.get_current_tasks())
except:
stats = {}
current_websites = None
return render_template("home.html", stats=stats, current_websites=current_websites)
return render_template("home.html", stats=stats)
@app.route("/submit")
def submit():
queued_websites = taskDispatcher.get_queued_tasks()
queued_websites = taskManager.get_queued_tasks()
return render_template("submit.html", queue=queued_websites, recaptcha=recaptcha, show_captcha=config.CAPTCHA_SUBMIT)
@ -362,7 +366,7 @@ def try_enqueue(url):
web_id = db.insert_website(Website(url, str(request.remote_addr), str(request.user_agent)))
task = Task(web_id, url, priority=1)
taskDispatcher.dispatch_task(task)
taskManager.queue_task(task)
return "The website has been added to the queue", "success"
@ -450,9 +454,8 @@ def admin_dashboard():
tokens = db.get_tokens()
blacklist = db.get_blacklist()
crawl_servers = db.get_crawl_servers()
return render_template("dashboard.html", api_tokens=tokens, blacklist=blacklist, crawl_servers=crawl_servers)
return render_template("dashboard.html", api_tokens=tokens, blacklist=blacklist)
else:
return abort(403)
@ -516,52 +519,59 @@ def admin_crawl_logs():
return abort(403)
@app.route("/crawl_server/add", methods=["POST"])
def admin_add_crawl_server():
if "username" in session:
@app.route("/api/task/get", methods=["POST"])
def api_get_task():
token = request.form.get("token")
name = db.check_api_token(token)
server = CrawlServer(
request.form.get("url"),
request.form.get("name"),
request.form.get("slots"),
request.form.get("token")
)
db.add_crawl_server(server)
flash("Added crawl server", "success")
return redirect("/dashboard")
if name:
task = db.pop_task(name)
if task:
print("Assigning task " + str(task.website_id) + " to " + name)
return Response(str(task), mimetype="application/json")
else:
return abort(404)
else:
return abort(403)
@app.route("/crawl_server/<int:server_id>/delete")
def admin_delete_crawl_server(server_id):
if "username" in session:
@app.route("/api/task/complete", methods=["POST"])
def api_complete_task():
token = request.form.get("token")
tr = json.loads(request.form.get("result"))
print(tr)
task_result = TaskResult(tr["status_code"], tr["file_count"], tr["start_time"], tr["end_time"], tr["website_id"])
db.remove_crawl_server(server_id)
flash("Deleted crawl server", "success")
return redirect("/dashboard")
name = db.check_api_token(token)
if name:
print("Task for " + str(task_result.website_id) + " completed by " + name)
task = db.complete_task(task_result.website_id, name)
if task:
if "file_list" in request.files:
file = request.files['file_list']
filename = "./tmp/" + str(task_result.website_id) + ".json"
print("Saving temp file " + filename + " ...")
file.save(filename)
print("Done")
else:
filename = None
taskManager.complete_task(filename, task, task_result, name)
if filename and os.path.exists(filename):
os.remove(filename)
# TODO: handle callback here
return "Successfully logged task result and indexed files"
else:
abort(403)
@app.route("/crawl_server/<int:server_id>/update", methods=["POST"])
def admin_update_crawl_server(server_id):
crawl_servers = db.get_crawl_servers()
for server in crawl_servers:
if server.id == server_id:
new_slots = request.form.get("slots") if "slots" in request.form else server.slots
new_name = request.form.get("name") if "name" in request.form else server.name
new_url = request.form.get("url") if "url" in request.form else server.url
db.update_crawl_server(server_id, new_url, new_name, new_slots)
flash("Updated crawl server", "success")
return redirect("/dashboard")
flash("Couldn't find crawl server with this id: " + str(server_id), "danger")
return redirect("/dashboard")
print("ERROR: " + name + " indicated that task for " + str(task_result.website_id) +
" was completed but there is no such task in the database.")
print("No such task")
if __name__ == '__main__':

View File

@ -1,4 +1,4 @@
from crawl_server.database import Task
from tasks import Task
from crawl_server.reddit_bot import RedditBot
import praw

View File

@ -1,5 +1,6 @@
import logging
from logging import FileHandler
import sys
from logging import FileHandler, StreamHandler
logger = logging.getLogger("default")
logger.setLevel(logging.DEBUG)
@ -8,3 +9,4 @@ formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
file_handler = FileHandler("crawl_server.log")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(StreamHandler(sys.stdout))

View File

@ -1,145 +0,0 @@
from crawl_server import logger
import os
import json
import sqlite3
class TaskResult:
def __init__(self, status_code=None, file_count=0, start_time=0,
end_time=0, website_id=0, indexed_time=0, server_name=""):
self.status_code = status_code
self.file_count = file_count
self.start_time = start_time
self.end_time = end_time
self.website_id = website_id
self.indexed_time = indexed_time
self.server_name = server_name
def to_json(self):
return {
"status_code": self.status_code,
"file_count": self.file_count,
"start_time": self.start_time,
"end_time": self.end_time,
"website_id": self.website_id,
"indexed_time": self.indexed_time
}
class Task:
def __init__(self, website_id: int, url: str, priority: int = 1,
callback_type: str = None, callback_args: str = None):
self.website_id = website_id
self.url = url
self.priority = priority
self.callback_type = callback_type
self.callback_args = json.loads(callback_args) if callback_args else {}
def to_json(self):
return {
"website_id": self.website_id,
"url": self.url,
"priority": self.priority,
"callback_type": self.callback_type,
"callback_args": json.dumps(self.callback_args)
}
def __str__(self):
return json.dumps(self.to_json())
def __repr__(self):
return self.__str__()
class TaskManagerDatabase:
def __init__(self, db_path):
self.db_path = db_path
if not os.path.exists(db_path):
self.init_database()
logger.info("Initialised database")
def init_database(self):
with open("task_db_init.sql", "r") as f:
init_script = f.read()
with sqlite3.connect(self.db_path) as conn:
conn.executescript(init_script)
conn.commit()
def pop_task(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT id, website_id, url, priority, callback_type, callback_args"
" FROM Queue ORDER BY priority DESC, Queue.id ASC LIMIT 1")
task = cursor.fetchone()
if task:
cursor.execute("DELETE FROM Queue WHERE id=?", (task[0],))
conn.commit()
return Task(task[1], task[2], task[3], task[4], task[5])
else:
return None
def pop_all_tasks(self):
tasks = self.get_tasks()
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM Queue")
return tasks
def put_task(self, task: Task):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("INSERT INTO Queue (website_id, url, priority, callback_type, callback_args) "
"VALUES (?,?,?,?,?)",
(task.website_id, task.url, task.priority,
task.callback_type, json.dumps(task.callback_args)))
conn.commit()
def get_tasks(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT website_id, url, priority, callback_type, callback_args FROM Queue")
tasks = cursor.fetchall()
return [Task(t[0], t[1], t[2], t[3], t[4]) for t in tasks]
def log_result(self, result: TaskResult):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("INSERT INTO TaskResult (website_id, status_code, file_count, start_time, end_time) "
"VALUES (?,?,?,?,?)", (result.website_id, result.status_code, result.file_count,
result.start_time, result.end_time))
conn.commit()
def get_non_indexed_results(self):
"""Get a list of new TaskResults since the last call of this method"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT status_code, file_count, start_time, end_time, website_id"
" FROM TaskResult WHERE indexed_time IS NULL")
db_result = cursor.fetchall()
cursor.execute("UPDATE TaskResult SET indexed_time=CURRENT_TIMESTAMP WHERE indexed_time IS NULL")
conn.commit()
return [TaskResult(r[0], r[1], r[2], r[3], r[4]) for r in db_result]

8
crawl_server/run.py Normal file
View File

@ -0,0 +1,8 @@
from crawl_server.task_manager import TaskManager
import time
import config
tm = TaskManager(config.CRAWL_SERVER_PROCESSES)
while True:
time.sleep(1)
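
The new crawl_server/run.py above reads its settings from config.py. For reference only, a minimal crawler-side configuration consistent with the keys visible in this diff might look like the sketch below; the values are placeholders, not project defaults.

```python
# Hypothetical minimal config.py for a crawler under the pull model.
# Only keys that appear elsewhere in this diff are listed; values are placeholders.
HEADERS = {}                    # Headers for the http crawler
CRAWL_SERVER_PROCESSES = 3      # Number of crawler instances (one per task)
CRAWL_SERVER_THREADS = 20       # Number of threads per crawler instance
SERVER_URL = "http://localhost/api"  # od-database API the crawler polls
API_TOKEN = ""                  # API token issued by the od-database server
```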

View File

@ -1,104 +0,0 @@
from flask import Flask, request, abort, Response, send_file
from flask_httpauth import HTTPTokenAuth
import json
from crawl_server import logger
from crawl_server.task_manager import TaskManager, Task
import os
import config
app = Flask(__name__)
auth = HTTPTokenAuth(scheme="Token")
token = config.CRAWL_SERVER_TOKEN
tm = TaskManager("tm_db.sqlite3", config.CRAWL_SERVER_PROCESSES)
@auth.verify_token
def verify_token(provided_token):
return token == provided_token
@app.route("/task/")
@auth.login_required
def get_tasks():
json_str = json.dumps([task.to_json() for task in tm.get_tasks()])
return Response(json_str, mimetype="application/json")
@app.route("/task/put", methods=["POST"])
@auth.login_required
def task_put():
if request.json:
try:
website_id = request.json["website_id"]
url = request.json["url"]
priority = request.json["priority"]
callback_type = request.json["callback_type"]
callback_args = request.json["callback_args"]
except KeyError as e:
logger.error("Invalid task put request from " + request.remote_addr + " missing key: " + str(e))
return abort(400)
task = Task(website_id, url, priority, callback_type, callback_args)
tm.put_task(task)
logger.info("Submitted new task to queue: " + str(task.to_json()))
return '{"ok": "true"}'
return abort(400)
@app.route("/task/completed", methods=["GET"])
@auth.login_required
def get_completed_tasks():
json_str = json.dumps([result.to_json() for result in tm.get_non_indexed_results()])
logger.debug("Webserver has requested list of newly completed tasks from " + request.remote_addr)
return Response(json_str, mimetype="application/json")
@app.route("/task/current", methods=["GET"])
@auth.login_required
def get_current_tasks():
current_tasks = tm.get_current_tasks()
logger.debug("Webserver has requested list of current tasks from " + request.remote_addr)
return json.dumps([t.to_json() for t in current_tasks])
@app.route("/file_list/<int:website_id>/")
@auth.login_required
def get_file_list(website_id):
file_name = "./crawled/" + str(website_id) + ".json"
if os.path.exists(file_name):
logger.info("Webserver requested file list of website with id" + str(website_id))
return send_file(file_name)
else:
logger.error("Webserver requested file list of non-existent or empty website with id: " + str(website_id))
return abort(404)
@app.route("/file_list/<int:website_id>/free")
@auth.login_required
def free_file_list(website_id):
file_name = "./crawled/" + str(website_id) + ".json"
if os.path.exists(file_name):
os.remove(file_name)
logger.debug("Webserver indicated that the files for the website with id " +
str(website_id) + " are safe to delete")
return '{"ok": "true"}'
else:
return abort(404)
@app.route("/task/pop_all")
@auth.login_required
def pop_queued_tasks():
json_str = json.dumps([task.to_json() for task in tm.pop_tasks()])
logger.info("Webserver poped all queued tasks")
return Response(json_str, mimetype="application/json")
if __name__ == "__main__":
app.run(port=config.CRAWL_SERVER_PORT, host="0.0.0.0", ssl_context="adhoc")

View File

@ -1,19 +0,0 @@
CREATE TABLE Queue (
id INTEGER PRIMARY KEY,
website_id INTEGER,
url TEXT,
priority INTEGER,
callback_type TEXT,
callback_args TEXT
);
CREATE TABLE TaskResult (
id INTEGER PRIMARY KEY,
website_id INT,
status_code TEXT,
file_count INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
indexed_time TIMESTAMP DEFAULT NULL
);

View File

@ -1,6 +1,8 @@
from crawl_server import logger
from tasks import TaskResult, Task
import config
from crawl_server.database import TaskManagerDatabase, Task, TaskResult
import requests
import json
from multiprocessing import Manager, Pool
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
@ -9,9 +11,7 @@ from crawl_server.crawler import RemoteDirectoryCrawler
class TaskManager:
def __init__(self, db_path, max_processes=2):
self.db_path = db_path
self.db = TaskManagerDatabase(db_path)
def __init__(self, max_processes=2):
self.pool = Pool(maxtasksperchild=1, processes=max_processes)
self.max_processes = max_processes
manager = Manager()
@ -21,41 +21,68 @@ class TaskManager:
scheduler.add_job(self.execute_queued_task, "interval", seconds=1)
scheduler.start()
def put_task(self, task: Task):
self.db.put_task(task)
def fetch_task(self):
try:
payload = {
"token": config.API_TOKEN
}
r = requests.post(config.SERVER_URL + "/task/get", data=payload)
def get_tasks(self):
return self.db.get_tasks()
if r.status_code == 200:
text = r.text
logger.info("Fetched task from server : " + text)
task_json = json.loads(text)
return Task(task_json["website_id"], task_json["url"])
def pop_tasks(self):
return self.db.pop_all_tasks()
return None
def get_current_tasks(self):
return self.current_tasks
except Exception as e:
raise e
def get_non_indexed_results(self):
return self.db.get_non_indexed_results()
@staticmethod
def push_result(task_result: TaskResult):
try:
payload = {
"token": config.API_TOKEN,
"result": json.dumps(task_result.to_json())
}
files = {
# "file_list": open("./crawled/" + str(task_result.website_id) + ".json")
"file_list": open("./local.json")
}
r = requests.post(config.SERVER_URL + "/task/complete", data=payload, files=files)
logger.info("RESPONSE: " + r.text)
except Exception as e:
raise e
def execute_queued_task(self):
if len(self.current_tasks) <= self.max_processes:
task = self.db.pop_task()
task = self.fetch_task()
if task:
logger.info("Submitted " + task.url + " to process pool")
self.current_tasks.append(task)
self.pool.apply_async(
TaskManager.run_task,
args=(task, self.db_path, self.current_tasks),
args=(task, self.current_tasks),
callback=TaskManager.task_complete,
error_callback=TaskManager.task_error
)
@staticmethod
def run_task(task, db_path, current_tasks):
def run_task(task, current_tasks):
result = TaskResult()
result.start_time = datetime.utcnow()
result.start_time = datetime.utcnow().timestamp()
result.website_id = task.website_id
logger.info("Starting task " + task.url)
@ -67,15 +94,10 @@ class TaskManager:
result.file_count = crawl_result.file_count
result.status_code = crawl_result.status_code
result.end_time = datetime.utcnow()
result.end_time = datetime.utcnow().timestamp()
logger.info("End task " + task.url)
# TODO: Figure out the callbacks
# callback = PostCrawlCallbackFactory.get_callback(task)
# if callback:
# callback.run()
return result, db_path, current_tasks
return result, current_tasks
@staticmethod
def task_error(result):
@ -85,14 +107,13 @@ class TaskManager:
@staticmethod
def task_complete(result):
task_result, db_path, current_tasks = result
task_result, current_tasks = result
logger.info("Task completed, logger result to database")
logger.info("Task completed, sending result to server")
logger.info("Status code: " + task_result.status_code)
logger.info("File count: " + str(task_result.file_count))
db = TaskManagerDatabase(db_path)
db.log_result(task_result)
TaskManager.push_result(task_result)
for i, task in enumerate(current_tasks):
if task.website_id == task_result.website_id:

View File

@ -1,13 +1,11 @@
import sqlite3
import json
import datetime
from collections import defaultdict
from urllib.parse import urlparse
import os
import bcrypt
import uuid
import task
from crawl_server.database import TaskResult
import tasks
class InvalidQueryException(Exception):
pass
@ -29,11 +27,37 @@ class Website:
self.id = website_id
class ApiToken:
class ApiClient:
def __init__(self, token, description):
def __init__(self, token, name):
self.token = token
self.description = description
self.name = name
class Task:
def __init__(self, website_id: int, url: str, priority: int = 1,
callback_type: str = None, callback_args: str = None):
self.website_id = website_id
self.url = url
self.priority = priority
self.callback_type = callback_type
self.callback_args = json.loads(callback_args) if callback_args else {}
def to_json(self):
return {
"website_id": self.website_id,
"url": self.url,
"priority": self.priority,
"callback_type": self.callback_type,
"callback_args": json.dumps(self.callback_args)
}
def __str__(self):
return json.dumps(self.to_json())
def __repr__(self):
return self.__str__()
class Database:
@ -171,21 +195,22 @@ class Database:
cursor.execute("INSERT INTO Admin (username, password) VALUES (?,?)", (username, hashed_pw))
conn.commit()
def check_api_token(self, token) -> bool:
def check_api_token(self, token) -> str:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT token FROM ApiToken WHERE token=?", (token, ))
return cursor.fetchone() is not None
cursor.execute("SELECT name FROM ApiClient WHERE token=?", (token, ))
result = cursor.fetchone()
return result[0] if result else None
def generate_api_token(self, description: str) -> str:
def generate_api_token(self, name: str) -> str:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
token = str(uuid.uuid4())
cursor.execute("INSERT INTO ApiToken (token, description) VALUES (?, ?)", (token, description))
cursor.execute("INSERT INTO ApiClient (token, name) VALUES (?, ?)", (token, name))
conn.commit()
return token
@ -195,16 +220,16 @@ class Database:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM ApiToken")
cursor.execute("SELECT token, name FROM ApiClient")
return [ApiToken(x[0], x[1]) for x in cursor.fetchall()]
return [ApiClient(x[0], x[1]) for x in cursor.fetchall()]
def delete_token(self, token: str) -> None:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM ApiToken WHERE token=?", (token, ))
cursor.execute("DELETE FROM ApiClient WHERE token=?", (token, ))
conn.commit()
def get_all_websites(self) -> dict:
@ -289,41 +314,7 @@ class Database:
cursor.execute("SELECT * FROM BlacklistedWebsite")
return [BlacklistedWebsite(r[0], r[1]) for r in cursor.fetchall()]
def add_crawl_server(self, server: task.CrawlServer):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("INSERT INTO CrawlServer (url, name, slots, token) VALUES (?,?,?,?)",
(server.url, server.name, server.slots, server.token))
conn.commit()
def remove_crawl_server(self, server_id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM CrawlServer WHERE id=?", (server_id, ))
conn.commit()
def get_crawl_servers(self) -> list:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT url, name, slots, token, id FROM CrawlServer")
return [task.CrawlServer(r[0], r[1], r[2], r[3], r[4]) for r in cursor.fetchall()]
def update_crawl_server(self, server_id, url, name, slots):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("UPDATE CrawlServer SET url=?, name=?, slots=? WHERE id=?", (url, name, slots, server_id))
conn.commit()
def log_result(self, result: TaskResult):
def log_result(self, result):
with sqlite3.connect(self.db_path) as conn:
@ -338,29 +329,27 @@ class Database:
def get_crawl_logs(self):
with sqlite3.connect(self.db_path, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) as conn:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT website_id, status_code, file_count, start_time, end_time, indexed_time, S.name "
"FROM TaskResult INNER JOIN CrawlServer S on TaskResult.server = S.id "
"ORDER BY end_time DESC")
return [TaskResult(r[1], r[2], r[3].timestamp(), r[4].timestamp(),
r[0], r[5].timestamp() if r[5] else None, r[6]) for r in cursor.fetchall()]
cursor.execute("SELECT website_id, status_code, file_count, start_time, end_time, server "
"FROM TaskResult ORDER BY end_time DESC")
return [tasks.TaskResult(r[1], r[2], r[3], r[4], r[0], r[5]) for r in cursor.fetchall()]
def get_stats_by_server(self):
def get_stats_by_crawler(self):
stats = dict()
task_results = self.get_crawl_logs()
for server in self.get_crawl_servers():
task_count = sum(1 for result in task_results if result.server_name == server.name)
for crawler in self.get_tokens():
task_count = sum(1 for result in task_results if result.server_name == crawler.name)
if task_count > 0:
stats[server.name] = dict()
stats[server.name]["file_count"] = sum(result.file_count for result in task_results if result.server_name == server.name)
stats[server.name]["time"] = sum((result.end_time - result.start_time) for result in task_results if result.server_name == server.name)
stats[server.name]["task_count"] = task_count
stats[server.name]["time_avg"] = stats[server.name]["time"] / task_count
stats[server.name]["file_count_avg"] = stats[server.name]["file_count"] / task_count
stats[crawler.name] = dict()
stats[crawler.name]["file_count"] = sum(result.file_count for result in task_results if result.server_name == crawler.name)
stats[crawler.name]["time"] = sum((result.end_time - result.start_time) for result in task_results if result.server_name == crawler.name)
stats[crawler.name]["task_count"] = task_count
stats[crawler.name]["time_avg"] = stats[crawler.name]["time"] / task_count
stats[crawler.name]["file_count_avg"] = stats[crawler.name]["file_count"] / task_count
return stats
@ -374,8 +363,61 @@ class Database:
conn.commit()
def put_task(self, task: Task) -> None:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("INSERT INTO Queue (website_id, url, priority, callback_type, callback_args) "
"VALUES (?,?,?,?,?)",
(task.website_id, task.url, task.priority,
task.callback_type, json.dumps(task.callback_args)))
conn.commit()
def get_tasks(self) -> list:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT website_id, url, priority, callback_type, callback_args FROM Queue "
"WHERE assigned_crawler is NULL ")
db_tasks = cursor.fetchall()
return [Task(t[0], t[1], t[2], t[3], t[4]) for t in db_tasks]
def pop_task(self, name) -> Task:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT id, website_id, url, priority, callback_type, callback_args "
"FROM Queue WHERE assigned_crawler is NULL "
"ORDER BY priority DESC, Queue.id ASC LIMIT 1")
task = cursor.fetchone()
if task:
cursor.execute("UPDATE Queue SET assigned_crawler=? WHERE id=?", (name, task[0],))
conn.commit()
return Task(task[1], task[2], task[3], task[4], task[5])
else:
return None
def complete_task(self, website_id: int, name: str) -> Task:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT id, website_id, url, priority, callback_type, callback_args FROM "
"Queue WHERE website_id=? AND assigned_crawler=?", (website_id, name))
task = cursor.fetchone()
if task:
cursor.execute("DELETE FROM Queue WHERE website_id=? AND assigned_crawler=?", (website_id, name))
conn.commit()
return Task(task[1], task[2], task[3], task[4], task[5])
else:
return None

View File

@ -14,27 +14,14 @@ CREATE TABLE Admin (
password TEXT
);
CREATE TABLE ApiToken (
token TEXT PRIMARY KEY NOT NULL,
description TEXT
);
CREATE TABLE BlacklistedWebsite (
id INTEGER PRIMARY KEY NOT NULL,
url TEXT
);
CREATE TABLE CrawlServer (
id INTEGER PRIMARY KEY NOT NULL,
url TEXT,
name TEXT,
token TEXT,
slots INTEGER
);
CREATE TABLE TaskResult (
id INTEGER PRIMARY KEY,
server INT,
server TEXT,
website_id INT,
status_code TEXT,
file_count INT,
@ -42,7 +29,12 @@ CREATE TABLE TaskResult (
end_time TIMESTAMP,
indexed_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (server) REFERENCES CrawlServer(id)
FOREIGN KEY (server) REFERENCES ApiClient(name)
);
CREATE TABLE ApiClient (
name TEXT PRIMARY KEY NOT NULL,
token TEXT NOT NULL
);
@ -55,3 +47,15 @@ CREATE TABLE SearchLogEntry (
extensions TEXT,
page INT
);
CREATE TABLE Queue (
id INTEGER PRIMARY KEY,
website_id INTEGER,
url TEXT,
priority INTEGER,
callback_type TEXT,
callback_args TEXT,
assigned_crawler TEXT NULL DEFAULT NULL,
FOREIGN KEY (assigned_crawler) REFERENCES ApiClient(name)
);

View File

@ -115,7 +115,7 @@ class ElasticSearchEngine(SearchEngine):
def import_json(self, in_lines, website_id: int):
import_every = 1000
cooldown_time = 0.5
cooldown_time = 1
docs = []

View File

@ -91,8 +91,8 @@ def make_wide_filesystem(count=100000):
os.mkdir(new_path)
# dump_local_filesystem("/mnt/")
index_file_list("local_filesystem.json", 4)
dump_local_filesystem("/mnt/")
# index_file_list("local_filesystem.json", 4)
# random_searches(100000)
# dump_random_files(20000 * 100000)
# make_wide_filesystem(10000)

237
task.py
View File

@ -1,237 +0,0 @@
from apscheduler.schedulers.background import BackgroundScheduler
from search.search import ElasticSearchEngine
from crawl_server.database import Task, TaskResult
import requests
from requests.exceptions import ConnectionError, ReadTimeout
import json
import database
from concurrent.futures import ThreadPoolExecutor
import urllib3
urllib3.disable_warnings()
class CrawlServer:
def __init__(self, url, name, slots, token, server_id=None):
self.url = url
self.name = name
self.slots = slots
self.used_slots = 0
self.token = token
self.id = server_id
def _generate_headers(self):
return {
"Content-Type": "application/json",
"Authorization": "Token " + self.token,
}
def queue_task(self, task: Task) -> bool:
print("Sending task to crawl server " + self.url)
try:
payload = json.dumps(task.to_json())
r = requests.post(self.url + "/task/put", headers=self._generate_headers(), data=payload, verify=False,
timeout=20)
print(r) # TODO: If the task could not be added, fallback to another server
return r.status_code == 200
except (ConnectionError, ReadTimeout):
return False
def pop_completed_tasks(self) -> list:
try:
r = requests.get(self.url + "/task/completed", headers=self._generate_headers(), verify=False, timeout=15)
if r.status_code != 200:
print("Problem while fetching completed tasks for '" + self.name + "': " + str(r.status_code))
print(r.text)
return []
return [
TaskResult(r["status_code"], r["file_count"], r["start_time"], r["end_time"], r["website_id"])
for r in json.loads(r.text)]
except (ConnectionError, ReadTimeout):
print("Crawl server cannot be reached @ " + self.url)
return []
def fetch_queued_tasks(self):
try:
r = requests.get(self.url + "/task/", headers=self._generate_headers(), verify=False, timeout=15)
if r.status_code != 200:
print("Problem while fetching queued tasks for '" + self.name + "' " + str(r.status_code))
print(r.text)
return None
return [
Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
for t in json.loads(r.text)
]
except (ConnectionError, ReadTimeout):
return None
def fetch_current_tasks(self):
try:
r = requests.get(self.url + "/task/current", headers=self._generate_headers(), verify=False, timeout=10)
if r.status_code != 200:
print("Problem while fetching current tasks for '" + self.name + "' " + str(r.status_code))
print(r.text)
return None
return [
Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
for t in json.loads(r.text)
]
except (ConnectionError, ReadTimeout):
return None
def fetch_website_files(self, website_id) -> str:
try:
r = requests.get(self.url + "/file_list/" + str(website_id) + "/", stream=True,
headers=self._generate_headers(), verify=False)
if r.status_code != 200:
print("Problem while fetching website files for '" + self.name + "': " + str(r.status_code))
print(r.text)
return ""
for line in r.iter_lines(chunk_size=1024 * 256):
yield line
except (ConnectionError, ReadTimeout):
return ""
def free_website_files(self, website_id) -> bool:
try:
r = requests.get(self.url + "/file_list/" + str(website_id) + "/free", headers=self._generate_headers(),
verify=False)
return r.status_code == 200
except (ConnectionError, ReadTimeout) as e:
print(e)
return False
def pop_queued_tasks(self):
try:
r = requests.get(self.url + "/task/pop_all", headers=self._generate_headers(), verify=False)
if r.status_code != 200:
print("Problem while popping tasks for '" + self.name + "': " + str(r.status_code))
print(r.text)
return [
Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
for t in json.loads(r.text)
]
except (ConnectionError, ReadTimeout):
return []
class TaskDispatcher:
def __init__(self):
scheduler = BackgroundScheduler()
scheduler.add_job(self.check_completed_tasks, "interval", seconds=10)
scheduler.start()
self.search = ElasticSearchEngine("od-database")
self.db = database.Database("db.sqlite3")
def check_completed_tasks(self):
for server in self.db.get_crawl_servers():
for task in server.pop_completed_tasks():
print("Completed task")
task.server_id = server.id
if task.file_count:
# All files are overwritten
self.search.delete_docs(task.website_id)
file_list = server.fetch_website_files(task.website_id)
self.search.import_json(file_list, task.website_id)
# File list is safe to delete once indexed
server.free_website_files(task.website_id)
# Update last_modified date for website
self.db.update_website_date_if_exists(task.website_id)
self.db.log_result(task)
def dispatch_task(self, task: Task):
self._get_available_crawl_server().queue_task(task)
def _get_available_crawl_server(self) -> CrawlServer:
queued_tasks_by_server = self._get_queued_tasks_by_server()
server_with_most_free_slots = None
most_free_slots = -10000
for server in queued_tasks_by_server:
free_slots = server.slots - len(queued_tasks_by_server[server])
if free_slots > most_free_slots:
server_with_most_free_slots = server
most_free_slots = free_slots
print("Dispatching task to '" +
server_with_most_free_slots.name + "' " +
str(most_free_slots) + " free out of " + str(server_with_most_free_slots.slots))
return server_with_most_free_slots
def get_queued_tasks(self):
queued_tasks_by_server = self._get_queued_tasks_by_server()
for queued_tasks in queued_tasks_by_server.values():
for task in queued_tasks:
yield task
def _get_queued_tasks_by_server(self) -> dict:
queued_tasks = dict()
pool = ThreadPoolExecutor(max_workers=10)
crawl_servers = self.db.get_crawl_servers()
responses = list(pool.map(lambda s: s.fetch_queued_tasks(), crawl_servers))
pool.shutdown()
for i, server in enumerate(crawl_servers):
if responses[i] is not None:
queued_tasks[server] = responses[i]
return queued_tasks
def get_current_tasks(self):
current_tasks_by_server = self._get_current_tasks_by_server()
for current_tasks in current_tasks_by_server.values():
for task in current_tasks:
yield task
def _get_current_tasks_by_server(self) -> dict:
current_tasks = dict()
pool = ThreadPoolExecutor(max_workers=10)
crawl_servers = self.db.get_crawl_servers()
responses = list(pool.map(lambda s: s.fetch_current_tasks(), crawl_servers))
pool.shutdown()
for i, server in enumerate(crawl_servers):
if responses[i] is not None:
current_tasks[server] = responses[i]
return current_tasks
def redispatch_queued(self) -> int:
counter = 0
for server in self.db.get_crawl_servers():
for task in server.pop_queued_tasks():
self.dispatch_task(task)
counter += 1
return counter

90
tasks.py Normal file
View File

@ -0,0 +1,90 @@
from apscheduler.schedulers.background import BackgroundScheduler
from werkzeug.datastructures import FileStorage
from search.search import ElasticSearchEngine
import json
import database
import urllib3
urllib3.disable_warnings()
class Task:
def __init__(self, website_id: int, url: str, priority: int = 1,
callback_type: str = None, callback_args: str = None):
self.website_id = website_id
self.url = url
self.priority = priority
self.callback_type = callback_type
self.callback_args = json.loads(callback_args) if callback_args else {}
def to_json(self):
return {
"website_id": self.website_id,
"url": self.url,
"priority": self.priority,
"callback_type": self.callback_type,
"callback_args": json.dumps(self.callback_args)
}
def __str__(self):
return json.dumps(self.to_json())
def __repr__(self):
return self.__str__()
class TaskResult:
def __init__(self, status_code=None, file_count=0, start_time=0,
end_time=0, website_id=0, server_name=""):
self.status_code = status_code
self.file_count = file_count
self.start_time = start_time
self.end_time = end_time
self.website_id = website_id
self.server_name = server_name
def to_json(self):
return {
"status_code": self.status_code,
"file_count": self.file_count,
"start_time": self.start_time,
"end_time": self.end_time,
"website_id": self.website_id
}
class TaskManager:
def __init__(self):
self.search = ElasticSearchEngine("od-database")
self.db = database.Database("db.sqlite3")
def complete_task(self, file_list, task, task_result, crawler_name):
if file_list:
self.search.delete_docs(task_result.website_id)
def iter_lines():
with open(file_list, "r") as f:
line = f.readline()
while line:
yield line
line = f.readline()
self.search.import_json(iter_lines(), task.website_id)
self.db.update_website_date_if_exists(task.website_id)
task_result.server_id = crawler_name
self.db.log_result(task_result)
def queue_task(self, task: Task):
self.db.put_task(task)
print("Queued task and made it available to crawlers: " + str(task.website_id))
def get_queued_tasks(self) -> list:
return self.db.get_tasks()

View File

@ -7,14 +7,13 @@
<table class="table table-striped">
<thead>
<tr>
<th>Server</th>
<th>Crawler</th>
<th>Website</th>
<th>Status code</th>
<th>File count</th>
<th>Start</th>
<th>End</th>
<th>Delta</th>
<th>Index</th>
</tr>
</thead>
@ -25,10 +24,9 @@
<td><a href="/website/{{ task_result.website_id }}/">#{{ task_result.website_id }}</a></td>
<td>{{ task_result.status_code }}</td>
<td>{{ task_result.file_count }}</td>
<td>{{ task_result.start_time | datetime_format }}</td>
<td>{{ task_result.end_time | datetime_format }}</td>
<td>{{ task_result.start_time | int | datetime_format }}</td>
<td>{{ task_result.end_time | int | datetime_format }}</td>
<td>{{ ((task_result.end_time - task_result.start_time)) | int }} sec</td>
<td>{{ task_result.indexed_time | datetime_format }}</td>
</tr>
{% endfor %}
</tbody>

View File

@ -9,55 +9,13 @@
<a href="/logs">Logs</a>
<br>
<hr>
<h3>Crawl servers</h3>
<table class="table table-striped">
<thead>
<tr>
<th>Url</th>
<th>Name</th>
<th>Slots</th>
<th>Action</th>
</tr>
</thead>
<tbody>
{% for server in crawl_servers %}
<tr>
<td>{{ server.url }}</td>
<td>{{ server.name }}</td>
<td id="slots-{{ server.id }}" onclick="changeSlots({{ server.id }})">{{ server.slots }}</td>
<td><a class="btn btn-danger" href="/crawl_server/{{ server.id }}/delete">Delete</a></td>
</tr>
{% endfor %}
</tbody>
</table>
<form action="/crawl_server/add" method="post">
<div class="form-row">
<div class="col col-md-3">
<input class="form-control" name="url" placeholder="Url">
</div>
<div class="col col-md-3">
<input class="form-control" name="name" placeholder="Name">
</div>
<div class="col col-md-2">
<input class="form-control" name="token" placeholder="Token">
</div>
<div class="col col-md-2">
<input class="form-control" name="slots" placeholder="Slots" type="number">
</div>
<div class="col col-md-2">
<input type="submit" class="form-control btn btn-primary" value="Add server">
</div>
</div>
</form>
<br>
<hr>
<h3>API Keys</h3>
<table class="table table-striped">
<thead>
<tr>
<th>Description</th>
<th>Key</th>
<th>Name</th>
<th>Token</th>
<th>Action</th>
</tr>
</thead>
@ -65,7 +23,7 @@
<tbody>
{% for token in api_tokens %}
<tr>
<td>{{ token.description }}</td>
<td>{{ token.name }}</td>
<td><code>{{ token.token }}</code></td>
<td>
<form action="/del_token" method="post">
@ -122,7 +80,8 @@
<hr>
<h3>Misc actions</h3>
<a class="btn btn-danger" href="/website/delete_empty">Delete websites with no associated files that are not queued</a>
<a class="btn btn-danger" href="/website/delete_empty">Delete websites with no associated files that are
not queued</a>
<a class="btn btn-danger" href="/website/redispatch_queued">Re-dispatch queued tasks</a>
<a class="btn btn-danger" href="/website/queue_empty">Re-queue websites with no associated files</a>

View File

@ -11,9 +11,6 @@
{% if stats and stats["total_size"] %}
<p class="lead">{{ stats["total_count"] }} files totalling
~{{ stats["total_size"] | filesizeformat }} from {{ stats["website_count"] }} websites</p>
{% if current_websites %}
<p>Currently indexing <code>{{ current_websites }}</code><span class="vim-caret">&nbsp;</span> </p>
{% endif %}
{% else %}
<p class="lead">We're currently experiencing a high volume of traffic. The search function
may be unresponsive.</p>

View File

@ -100,13 +100,13 @@
{% endfor %}
</tr>
<tr>
<th>File crawled</th>
<th>Files crawled</th>
{% for server in crawl_server_stats %}
<td>{{ crawl_server_stats[server].file_count }}</td>
{% endfor %}
</tr>
<tr>
<th>File crawled average</th>
<th>Files crawled average</th>
{% for server in crawl_server_stats %}
<td>{{ crawl_server_stats[server].file_count_avg | round(2) }} per task</td>
{% endfor %}

1
tmp/README.md Normal file
View File

@ -0,0 +1 @@
Files currently being indexed go here