Overwrite document on re-index, update website last_modified on task complete, delete website files on index complete

Simon 2018-06-19 11:24:28 -04:00
parent 8486555426
commit e54609972c
5 changed files with 59 additions and 4 deletions

View File

@@ -72,6 +72,17 @@ def get_file_list(website_id):
         return abort(404)
 
 
+@app.route("/file_list/<int:website_id>/free")
+@auth.login_required
+def free_file_list(website_id):
+    file_name = "./crawled/" + str(website_id) + ".json"
+    if os.path.exists(file_name):
+        os.remove(file_name)
+        return '{"ok": "true"}'
+    else:
+        return abort(404)
+
+
 @app.route("/task/logs/")
 @auth.login_required
 def get_task_logs():
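A minimal sketch of calling the new endpoint from the dispatcher side, assuming the crawl server listens on localhost:5000 and that the credentials checked by auth.login_required are sent as HTTP basic auth (host, port, credentials and the website id 123 are all placeholders):

import requests

# /file_list/<website_id>/free asks the crawl server to delete its cached
# ./crawled/<website_id>.json
r = requests.get("http://localhost:5000/file_list/123/free",
                 auth=("username", "password"))
print(r.status_code)  # 200 if the file list was removed, 404 if it was absent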

View File

@@ -45,6 +45,14 @@ class Database:
             conn.executescript(init_script)
             conn.commit()
 
+    def update_website_date_if_exists(self, website_id):
+
+        with sqlite3.connect(self.db_path) as conn:
+            cursor = conn.cursor()
+            cursor.execute("UPDATE Website SET last_modified=CURRENT_TIMESTAMP WHERE id=?", (website_id, ))
+            conn.commit()
+
+
     def insert_website(self, website: Website):
 
         with sqlite3.connect(self.db_path) as conn:
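The "if_exists" behaviour needs no separate existence check: when no row matches the given id, the UPDATE simply affects zero rows. A hypothetical call, using the same db.sqlite3 path the dispatcher uses below:

from database import Database

db = Database("db.sqlite3")
db.update_website_date_if_exists(123)  # silently a no-op if website 123 is gone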

View File

@@ -3,9 +3,9 @@ import json
 
 payload = json.dumps({
-    "website_id": 123,
-    "url": "ftp://132.249.213.137",
-    # "url": "http://localhost:8000/",
+    "website_id": 3,
+    # "url": "ftp://132.249.213.137",
+    "url": "http://localhost:8000/",
     # "url": "http://ubuntu.mirrorservice.org/",
     "priority": 2,
     "callback_type": "",

View File

@@ -91,6 +91,22 @@ class ElasticSearchEngine(SearchEngine):
     def ping(self):
         return self.es.ping()
 
+    def delete_docs(self, website_id):
+
+        try:
+            print("Deleting docs of " + str(website_id))
+            self.es.delete_by_query(body={
+                "query": {
+                    "constant_score": {
+                        "filter": {
+                            "term": {"website_id": website_id}
+                        }
+                    }
+                }
+            }, index=self.index_name)
+        except elasticsearch.exceptions.ConflictError:
+            print("Error: multiple delete tasks at the same time")
+
     def import_json(self, in_lines, website_id: int):
 
         import_every = 5000
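The except branch matters because delete_by_query raises a version conflict when another process modifies the same documents mid-delete. As a read-only illustration of the same constant_score/term query, here is a sketch that counts a website's documents instead of deleting them (the client setup, index name and website id are assumptions):

import elasticsearch

es = elasticsearch.Elasticsearch()
query = {"query": {"constant_score": {"filter": {"term": {"website_id": 123}}}}}
# count() takes the same query body as delete_by_query, without the side effects
print(es.count(index="od-database", body=query)["count"])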
@@ -270,7 +286,8 @@ class ElasticSearchEngine(SearchEngine):
         stats["es_index_size"] = es_stats["indices"][self.index_name]["total"]["store"]["size_in_bytes"]
         stats["es_search_count"] = es_stats["indices"][self.index_name]["total"]["search"]["query_total"]
         stats["es_search_time"] = es_stats["indices"][self.index_name]["total"]["search"]["query_time_in_millis"]
-        stats["es_search_time_avg"] = stats["es_search_time"] / (stats["es_search_count"] if stats["es_search_count"] != 0 else 1)
+        stats["es_search_time_avg"] = stats["es_search_time"] / (
+            stats["es_search_count"] if stats["es_search_count"] != 0 else 1)
         stats["total_count"] = es_stats["indices"][self.index_name]["total"]["indexing"]["index_total"]
         stats["total_count_nonzero"] = total_stats["hits"]["total"]
         stats["total_size"] = total_stats["aggregations"]["file_stats"]["sum"]

task.py
View File

@@ -5,6 +5,7 @@ import requests
 from requests.exceptions import ConnectionError
 import json
 import config
+from database import Database
 
 
 class CrawlServer:
@@ -71,6 +72,15 @@ class CrawlServer:
         except ConnectionError:
             return ""
 
+    def free_website_files(self, website_id) -> bool:
+
+        try:
+            r = requests.get(self.url + "/file_list/" + str(website_id) + "/free", headers=CrawlServer.headers)
+            return r.status_code == 200
+        except ConnectionError as e:
+            print(e)
+            return False
+
     def fetch_crawl_logs(self):
 
         try:
@@ -97,6 +107,7 @@ class TaskDispatcher:
         scheduler.start()
 
         self.search = ElasticSearchEngine("od-database")
+        self.db = Database("db.sqlite3")
 
         # TODO load from config
         self.crawl_servers = [
@@ -108,10 +119,18 @@
         for server in self.crawl_servers:
             for task in server.fetch_completed_tasks():
                 print("Completed task")
+                # All files are overwritten
+                self.search.delete_docs(task.website_id)
                 file_list = server.fetch_website_files(task.website_id)
                 if file_list:
                     self.search.import_json(file_list, task.website_id)
 
+                # Update last_modified date for website
+                self.db.update_website_date_if_exists(task.website_id)
+
+                # File list is safe to delete once indexed
+                server.free_website_files(task.website_id)
+
     def dispatch_task(self, task: Task):
         self._get_available_crawl_server().queue_task(task)
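Taken together, the completed-task path now runs in a deliberate order: stale documents are deleted from the index before the fresh file list is imported, so a re-index overwrites instead of duplicating; the website's last_modified timestamp is updated once the task completes; and the crawl server's cached JSON is freed last, when it can no longer be needed.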