Task logs now stored on main server

Simon 2018-06-24 20:32:02 -04:00
parent 059d9fd366
commit 5fd00f22af
10 changed files with 77 additions and 98 deletions

app.py

@@ -50,7 +50,7 @@ def downloads():
 @app.route("/stats")
 def stats_page():
-    crawl_server_stats = taskDispatcher.get_stats_by_server()
+    crawl_server_stats = db.get_stats_by_server()
     return render_template("stats.html", crawl_server_stats=crawl_server_stats)
@@ -444,7 +444,7 @@ def admin_del_token():
 def admin_crawl_logs():
     if "username" in session:
-        results = taskDispatcher.get_task_logs_by_server()
+        results = db.get_crawl_logs()
         return render_template("crawl_logs.html", logs=results)
     else:

crawl_server/database.py

@@ -5,13 +5,15 @@ import sqlite3
 class TaskResult:
 
-    def __init__(self, status_code=None, file_count=0, start_time=0, end_time=0, website_id=0, indexed_time=0):
+    def __init__(self, status_code=None, file_count=0, start_time=0,
+                 end_time=0, website_id=0, indexed_time=0, server_name=""):
         self.status_code = status_code
         self.file_count = file_count
         self.start_time = start_time
         self.end_time = end_time
         self.website_id = website_id
         self.indexed_time = indexed_time
+        self.server_name = server_name
 
     def to_json(self):
         return {
@@ -139,11 +141,3 @@ class TaskManagerDatabase:
         return [TaskResult(r[0], r[1], r[2], r[3], r[4]) for r in db_result]
-
-    def get_all_results(self):
-        with sqlite3.connect(self.db_path, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) as conn:
-            cursor = conn.cursor()
-            cursor.execute("SELECT website_id, status_code, file_count, start_time, end_time, indexed_time "
-                           "FROM TaskResult ORDER BY id ASC")
-            return [TaskResult(r[1], r[2], r[3].timestamp(), r[4].timestamp(), r[0], r[5].timestamp() if r[5] else None) for r in cursor.fetchall()]
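
The constructor change above threads a server_name through TaskResult so the main server can attribute each log line to a crawl server. A minimal sketch of constructing one (the sample values are hypothetical; the keyword names match the new signature):

from crawl_server.database import TaskResult

# Hypothetical sample values; keywords follow the new __init__ signature.
result = TaskResult(status_code="200", file_count=1337,
                    start_time=1529886000, end_time=1529886120,
                    website_id=42, indexed_time=1529886300,
                    server_name="crawler-1")
assert result.server_name == "crawler-1"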

crawl_server/server.py

@@ -98,12 +98,5 @@ def pop_queued_tasks():
     return Response(json_str, mimetype="application/json")
 
-
-@app.route("/stats/")
-@auth.login_required
-def get_stats():
-    json_str = json.dumps(tm.get_stats())
-    return Response(json_str, mimetype="application/json")
-
 
 if __name__ == "__main__":
     app.run(port=config.CRAWL_SERVER_PORT, host="0.0.0.0", ssl_context="adhoc")

crawl_server/task_manager.py

@@ -36,9 +36,6 @@ class TaskManager:
     def get_non_indexed_results(self):
         return self.db.get_non_indexed_results()
 
-    def get_all_results(self):
-        return self.db.get_all_results()
-
     def execute_queued_task(self):
         if len(self.current_tasks) <= self.max_processes:
@@ -103,20 +100,4 @@ class TaskManager:
                 if task.website_id == task_result.website_id:
                     del current_tasks[i]
-
-    def get_stats(self):
-        task_results = self.get_all_results()
-        stats = dict()
-
-        if len(task_results) > 0:
-            stats["task_count"] = len(task_results)
-            stats["task_time"] = sum((task.end_time - task.start_time) for task in task_results)
-            stats["task_time_avg"] = stats["task_time"] / len(task_results)
-            stats["task_file_count"] = sum(task.file_count for task in task_results)
-            stats["task_file_count_avg"] = stats["task_file_count"] / len(task_results)
-
-        return stats

database.py

@@ -1,10 +1,12 @@
 import sqlite3
 import datetime
+from collections import defaultdict
 from urllib.parse import urlparse
 import os
 import bcrypt
 import uuid
 import task
+from crawl_server.database import TaskResult
 
 
 class InvalidQueryException(Exception):
@@ -312,6 +314,46 @@ class Database:
         cursor.execute("UPDATE CrawlServer SET url=?, name=?, slots=? WHERE id=?", (url, name, slots, server_id))
         conn.commit()
 
+    def log_result(self, result: TaskResult):
+        with sqlite3.connect(self.db_path) as conn:
+            cursor = conn.cursor()
+            cursor.execute("INSERT INTO TaskResult "
+                           "(server, website_id, status_code, file_count, start_time, end_time) "
+                           "VALUES (?,?,?,?,?,?)",
+                           (result.server_id, result.website_id, result.status_code,
+                            result.file_count, result.start_time, result.end_time))
+            conn.commit()
+
+    def get_crawl_logs(self):
+        with sqlite3.connect(self.db_path, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) as conn:
+            cursor = conn.cursor()
+            cursor.execute("SELECT website_id, status_code, file_count, start_time, end_time, indexed_time, S.name "
+                           "FROM TaskResult INNER JOIN CrawlServer S on TaskResult.server = S.id "
+                           "ORDER BY end_time DESC")
+            return [TaskResult(r[1], r[2], r[3].timestamp(), r[4].timestamp(),
+                               r[0], r[5].timestamp() if r[5] else None, r[6]) for r in cursor.fetchall()]
+
+    def get_stats_by_server(self):
+        stats = dict()
+        task_results = self.get_crawl_logs()
+
+        for server in self.get_crawl_servers():
+            task_count = sum(1 for result in task_results if result.server_name == server.name)
+
+            if task_count > 0:
+                stats[server.name] = dict()
+                stats[server.name]["file_count"] = sum(result.file_count for result in task_results if result.server_name == server.name)
+                stats[server.name]["time"] = sum((result.end_time - result.start_time) for result in task_results if result.server_name == server.name)
+                stats[server.name]["task_count"] = task_count
+                stats[server.name]["time_avg"] = stats[server.name]["time"] / task_count
+                stats[server.name]["file_count_avg"] = stats[server.name]["file_count"] / task_count
+
+        return stats
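
The get_stats_by_server added above re-scans task_results once per server and once per summed field; since this commit also imports defaultdict at the top of the file, a single-pass variant is possible. A sketch, not the committed code:

from collections import defaultdict

def stats_by_server_single_pass(task_results):
    # Group and sum in one pass over the results (sketch; the committed
    # version filters the full result list repeatedly for each server).
    stats = defaultdict(lambda: {"task_count": 0, "file_count": 0, "time": 0})
    for result in task_results:
        s = stats[result.server_name]
        s["task_count"] += 1
        s["file_count"] += result.file_count
        s["time"] += result.end_time - result.start_time
    for s in stats.values():
        s["time_avg"] = s["time"] / s["task_count"]
        s["file_count_avg"] = s["file_count"] / s["task_count"]
    return dict(stats)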

init_script.sql

@@ -30,4 +30,17 @@ CREATE TABLE CrawlServer (
     name TEXT,
     token TEXT,
     slots INTEGER
-)
+);
+
+CREATE TABLE TaskResult (
+    id INTEGER PRIMARY KEY,
+    server INT,
+    website_id INT,
+    status_code TEXT,
+    file_count INT,
+    start_time TIMESTAMP,
+    end_time TIMESTAMP,
+    indexed_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    FOREIGN KEY (server) REFERENCES CrawlServer(id)
+);
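
Note that log_result in database.py never supplies indexed_time, so the DEFAULT CURRENT_TIMESTAMP above fills it in when the row is inserted. A self-contained sketch against an in-memory copy of the schema (the CrawlServer columns are abbreviated and the sample row values are hypothetical):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE CrawlServer (id INTEGER PRIMARY KEY, url TEXT, name TEXT, token TEXT, slots INTEGER);
CREATE TABLE TaskResult (
    id INTEGER PRIMARY KEY,
    server INT,
    website_id INT,
    status_code TEXT,
    file_count INT,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    indexed_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (server) REFERENCES CrawlServer(id)
);
""")
conn.execute("INSERT INTO CrawlServer (url, name, slots) VALUES (?,?,?)",
             ("https://crawler-1:5001", "crawler-1", 4))  # hypothetical server
conn.execute("INSERT INTO TaskResult (server, website_id, status_code, file_count, start_time, end_time) "
             "VALUES (?,?,?,?,?,?)", (1, 42, "200", 1337, 1529886000, 1529886120))
# indexed_time was omitted, so SQLite stamped the row with CURRENT_TIMESTAMP:
print(conn.execute("SELECT indexed_time FROM TaskResult").fetchone())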

task.py

@@ -38,7 +38,7 @@ class CrawlServer:
         except ConnectionError:
             return False
 
-    def fetch_completed_tasks(self) -> list:
+    def pop_completed_tasks(self) -> list:
         try:
             r = requests.get(self.url + "/task/completed", headers=self._generate_headers(), verify=False)
@@ -113,36 +113,6 @@ class CrawlServer:
             print(e)
             return False
 
-    def fetch_crawl_logs(self):
-        try:
-            r = requests.get(self.url + "/task/logs/", headers=self._generate_headers(), verify=False)
-            if r.status_code != 200:
-                print("Problem while fetching crawl logs for '" + self.name + "': " + str(r.status_code))
-                print(r.text)
-                return []
-            return [
-                TaskResult(r["status_code"], r["file_count"], r["start_time"],
-                           r["end_time"], r["website_id"], r["indexed_time"])
-                for r in json.loads(r.text)]
-        except ConnectionError:
-            return []
-
-    def fetch_stats(self):
-        try:
-            r = requests.get(self.url + "/stats/", headers=self._generate_headers(), verify=False)
-            if r.status_code != 200:
-                print("Problem while fetching stats for '" + self.name + "': " + str(r.status_code))
-                print(r.text)
-                return []
-            return json.loads(r.text)
-        except ConnectionError:
-            return {}
-
     def pop_queued_tasks(self):
         try:
             r = requests.get(self.url + "/task/pop_all", headers=self._generate_headers(), verify=False)
@@ -172,8 +142,11 @@ class TaskDispatcher:
     def check_completed_tasks(self):
         for server in self.db.get_crawl_servers():
-            for task in server.fetch_completed_tasks():
+            for task in server.pop_completed_tasks():
                 print("Completed task")
+                task.server_id = server.id
                 if task.file_count:
                     # All files are overwritten
                     self.search.delete_docs(task.website_id)
@@ -185,6 +158,8 @@ class TaskDispatcher:
                 # Update last_modified date for website
                 self.db.update_website_date_if_exists(task.website_id)
 
+                self.db.log_result(task)
+
     def dispatch_task(self, task: Task):
         self._get_available_crawl_server().queue_task(task)
@@ -248,26 +223,6 @@ class TaskDispatcher:
         return current_tasks
 
-    def get_task_logs_by_server(self) -> dict:
-        task_logs = dict()
-
-        for server in self.db.get_crawl_servers():
-            task_logs[server.name] = server.fetch_crawl_logs()
-
-        return task_logs
-
-    def get_stats_by_server(self) -> dict:
-        stats = dict()
-
-        for server in self.db.get_crawl_servers():
-            server_stats = server.fetch_stats()
-            if server_stats:
-                stats[server.name] = server_stats
-
-        return stats
-
     def redispatch_queued(self) -> int:
         counter = 0
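
Taken together, the task.py changes move result logging to write time: completed tasks are popped from each crawl server, tagged with the originating server's id, and persisted through Database.log_result instead of being fetched back later over HTTP. A condensed sketch of the resulting flow (method names from the diffs above; error handling and indexing details omitted):

def check_completed_tasks_sketch(db, search):
    # Condensed from TaskDispatcher.check_completed_tasks above.
    for server in db.get_crawl_servers():
        for task in server.pop_completed_tasks():
            task.server_id = server.id           # new in this commit
            if task.file_count:
                # All files are overwritten before re-indexing
                search.delete_docs(task.website_id)
            db.update_website_date_if_exists(task.website_id)
            db.log_result(task)                  # new: logged on the main server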

templates/crawl_logs.html

@@ -19,10 +19,9 @@
         </thead>
         <tbody>
-        {% for server in logs %}
-        {% for task_result in logs[server] %}
+        {% for task_result in logs %}
         <tr>
-            <td>{{ server }}</td>
+            <td>{{ task_result.server_name }}</td>
             <td><a href="/website/{{ task_result.website_id }}/">#{{ task_result.website_id }}</a></td>
             <td>{{ task_result.status_code }}</td>
             <td>{{ task_result.file_count }}</td>
@@ -32,7 +31,6 @@
             <td>{{ task_result.indexed_time | datetime_format }}</td>
         </tr>
         {% endfor %}
-        {% endfor %}
         </tbody>
     </table>
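
The template change above reflects the new shape of logs: db.get_crawl_logs returns one flat list of TaskResult rows, with server_name filled in by the SQL join, so a single loop replaces the nested pair. A standalone rendering sketch with a hypothetical row (Jinja2, which Flask bundles):

from types import SimpleNamespace
from jinja2 import Template

logs = [SimpleNamespace(server_name="crawler-1", website_id=42)]  # hypothetical row
t = Template("{% for task_result in logs %}"
             "{{ task_result.server_name }} -> #{{ task_result.website_id }}\n"
             "{% endfor %}")
print(t.render(logs=logs))  # crawler-1 -> #42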

templates/dashboard.html

@@ -7,6 +7,9 @@
         <div class="card-header">Dashboard</div>
         <div class="card-body">
+            <a href="/logs">Logs</a>
+            <br>
+            <hr>
             <h3>Crawl servers</h3>
             <table class="table table-striped">
                 <thead>

templates/stats.html

@@ -90,25 +90,25 @@
         <tr>
             <th>Crawl time</th>
             {% for server in crawl_server_stats %}
-                <td>{{ crawl_server_stats[server].task_time|round(2) }}s</td>
+                <td>{{ crawl_server_stats[server].time|round(2) }}s</td>
             {% endfor %}
         </tr>
         <tr>
             <th>Crawl time average</th>
             {% for server in crawl_server_stats %}
-                <td>{{ crawl_server_stats[server].task_time_avg|round(2) }}s per task</td>
+                <td>{{ crawl_server_stats[server].time_avg|round(2) }}s per task</td>
             {% endfor %}
         </tr>
         <tr>
             <th>File crawled</th>
             {% for server in crawl_server_stats %}
-                <td>{{ crawl_server_stats[server].task_file_count }}</td>
+                <td>{{ crawl_server_stats[server].file_count }}</td>
             {% endfor %}
         </tr>
         <tr>
             <th>File crawled average</th>
             {% for server in crawl_server_stats %}
-                <td>{{ crawl_server_stats[server].task_file_count_avg | round(2) }} per task</td>
+                <td>{{ crawl_server_stats[server].file_count_avg | round(2) }} per task</td>
             {% endfor %}
         </tr>
     </tbody>