Files are indexed into ES when task is complete

Simon 2018-06-12 15:45:00 -04:00
parent 6c912ea8c5
commit 1718bb91ca
7 changed files with 41 additions and 38 deletions

View File

@@ -118,9 +118,8 @@ class TaskManagerDatabase:
cursor.execute("SELECT status_code, file_count, start_time, end_time, website_id"
" FROM TaskResult WHERE indexed_time IS NULL")
db_result = cursor.fetchall()
- print(len(db_result))
- cursor.execute("UPDATE TaskResult SET indexed_time = CURRENT_TIMESTAMP WHERE indexed_time IS NULL")
+ cursor.execute("UPDATE TaskResult SET indexed_time=CURRENT_TIMESTAMP WHERE indexed_time IS NULL")
conn.commit()
return [TaskResult(r[0], r[1], r[2], r[3], r[4]) for r in db_result]
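For context, the hunk above implements a claim-then-return pattern: select every result that has not been indexed yet, stamp it as indexed in the same transaction, and hand the rows back to the caller. A minimal standalone sketch of that pattern, assuming sqlite3 and the table and column names shown above (the function name pop_non_indexed_results and the import path for TaskResult are assumptions):

    import sqlite3
    from crawl_server.database import TaskResult  # assumed import path; constructor mirrors the hunk above

    def pop_non_indexed_results(db_path: str):
        """Return TaskResults that are not yet indexed and mark them as indexed."""
        with sqlite3.connect(db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT status_code, file_count, start_time, end_time, website_id"
                           " FROM TaskResult WHERE indexed_time IS NULL")
            db_result = cursor.fetchall()
            # Claim the rows so the next poll does not return them a second time
            cursor.execute("UPDATE TaskResult SET indexed_time=CURRENT_TIMESTAMP WHERE indexed_time IS NULL")
            conn.commit()
        return [TaskResult(r[0], r[1], r[2], r[3], r[4]) for r in db_result]

Rows inserted between the SELECT and the UPDATE would be stamped without being returned, a small window given the one-second polling interval configured further down.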

View File

@@ -1,4 +1,4 @@
- from flask import Flask, request, abort, Response
+ from flask import Flask, request, abort, Response, send_from_directory
import json
from crawl_server.task_manager import TaskManager, Task, TaskResult
app = Flask(__name__)
@@ -45,5 +45,10 @@ def get_current_tasks():
return current_tasks
+ @app.route("/file_list/<int:website_id>/")
+ def get_file_list(website_id):
+ return send_from_directory(directory="./crawled/", filename=str(website_id) + ".json")
if __name__ == "__main__":
app.run(port=5001)
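A quick way to exercise the new route, assuming the crawl server is running locally on port 5001 as configured in this commit (the website id 123 is hypothetical):

    import requests

    # The route serves ./crawled/<website_id>.json: one JSON document per line
    r = requests.get("http://localhost:5001/file_list/123/")
    for line in r.text.splitlines():
        print(line)

This is essentially what CrawlServer.get_file_list does in task.py further down.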

View File

@@ -55,14 +55,13 @@ class TaskManager:
print("Starting task " + task.url)
crawler = RemoteDirectoryCrawler(task.url, 100)
- crawl_result = crawler.crawl_directory("crawled/" + str(task.website_id) + ".json")
+ crawl_result = crawler.crawl_directory("./crawled/" + str(task.website_id) + ".json")
result.file_count = crawl_result.file_count
result.status_code = crawl_result.status_code
- print("End task " + task.url)
result.end_time = datetime.utcnow()
+ print("End task " + task.url)
return dict(result=result, db_path=db_path)
@@ -77,6 +76,7 @@ class TaskManager:
db = TaskManagerDatabase(db_path)
db.log_result(result)
+ print("Logged result to DB")
@staticmethod
def task_error(err):
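The second hunk sits inside the completion callback. The worker returns dict(result=result, db_path=db_path), presumably because it runs as a static worker without access to the manager's database, so the callback reopens the database itself before logging. A sketch of the surrounding callback under those assumptions (the name task_complete, the unpacking, and the import path are guesses; only the three lines above appear in the diff):

    from crawl_server.database import TaskManagerDatabase  # assumed import path

    def task_complete(return_value: dict):
        # return_value is the dict(result=result, db_path=db_path) produced by the task above
        result = return_value["result"]
        db_path = return_value["db_path"]
        db = TaskManagerDatabase(db_path)
        db.log_result(result)
        print("Logged result to DB")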

View File

@@ -11,7 +11,7 @@ class SearchEngine:
def __init__(self):
pass
- def import_json(self, in_file: str, website_id: int):
+ def import_json(self, in_str: str, website_id: int):
raise NotImplementedError
def search(self, query) -> {}:
@@ -79,21 +79,19 @@ class ElasticSearchEngine(SearchEngine):
def ping(self):
return self.es.ping()
- def import_json(self, in_file: str, website_id: int):
+ def import_json(self, in_str: str, website_id: int):
import_every = 1000
- with open(in_file, "r") as f:
- docs = []
+ print(in_str)
+ docs = []
- line = f.readline()
- while line:
- docs.append(line[:-1]) # Remove trailing new line
+ for line in in_str.splitlines():
+ docs.append(line)
- if len(docs) >= import_every:
- self._index(docs, website_id)
- docs.clear()
- line = f.readline()
- self._index(docs, website_id)
+ if len(docs) >= import_every:
+ self._index(docs, website_id)
+ docs.clear()
+ self._index(docs, website_id)
def _index(self, docs, website_id):
print("Indexing " + str(len(docs)) + " docs")
@@ -107,14 +105,10 @@ class ElasticSearchEngine(SearchEngine):
@staticmethod
def create_bulk_index_string(docs: list, website_id: int):
- result = ""
action_string = '{"index":{}}\n'
website_id_string = ',"website_id":' + str(website_id) + '}\n' # Add website_id param to each doc
- for doc in docs:
- result += action_string + doc[:-1] + website_id_string
- return result
+ return "\n".join("".join([action_string, doc[:-1], website_id_string]) for doc in docs)
def search(self, query) -> {}:
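To make the new one-liner concrete, a small worked example of the bulk body it builds; doc[:-1] drops each document's closing brace so that ,"website_id":N} can be spliced in (the sample document is hypothetical):

    website_id = 123
    docs = ['{"name": "Dead Racer", "size": 1000, "path": "Speed Machine [FLAC]", "mtime": 12345}']

    action_string = '{"index":{}}\n'
    website_id_string = ',"website_id":' + str(website_id) + '}\n'

    body = "\n".join("".join([action_string, doc[:-1], website_id_string]) for doc in docs)
    print(body)
    # {"index":{}}
    # {"name": "Dead Racer", "size": 1000, "path": "Speed Machine [FLAC]", "mtime": 12345,"website_id":123}

Each document becomes an action/metadata line followed by its source line, which is the newline-delimited body the Elasticsearch bulk API expects.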

task.py
View File

@@ -1,4 +1,5 @@
from apscheduler.schedulers.background import BackgroundScheduler
+ from search.search import ElasticSearchEngine
from crawl_server.database import Task, TaskResult
import requests
import json
@@ -46,6 +47,11 @@ class CrawlServer:
for t in json.loads(r.text)
]
+ def get_file_list(self, website_id) -> str:
+ r = requests.get(self.url + "/file_list/" + str(website_id) + "/")
+ return r.text
class TaskDispatcher:
@@ -58,19 +64,20 @@
scheduler.add_job(self.check_completed_tasks, "interval", seconds=1)
scheduler.start()
+ self.search = ElasticSearchEngine("od-database")
# TODO load from config
self.crawl_servers = [
CrawlServer("http://localhost:5001"),
]
def check_completed_tasks(self):
- completed_tasks = []
for server in self.crawl_servers:
- completed_tasks.extend(server.get_completed_tasks())
- if completed_tasks:
- print(str(len(completed_tasks)) + " completed tasks. Will index immediately")
+ for task in server.get_completed_tasks():
+ print("Completed task")
+ file_list = server.get_file_list(task.website_id)
+ self.search.import_json(file_list, task.website_id)
def dispatch_task(self, task: Task):
self._get_available_crawl_server().queue_task(task)
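Putting the pieces together, the indexing path added by this commit can be exercised by hand roughly as follows, assuming a local Elasticsearch instance and a crawl server listening on port 5001 (the website id is hypothetical):

    import requests
    from search.search import ElasticSearchEngine

    search = ElasticSearchEngine("od-database")

    website_id = 1
    file_list = requests.get("http://localhost:5001/file_list/" + str(website_id) + "/").text
    search.import_json(file_list, website_id)  # splits the lines and bulk-indexes them under website_id

This mirrors what check_completed_tasks now does on its one-second schedule.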

View File

@@ -24,11 +24,11 @@ class SearchTest(TestCase):
{"name": "Dead Racer", "size": 1000, "path": "Speed Machine [FLAC]", "mtime": 12345}
]
- with open("tmp.json", "w") as f:
- for file in files:
- f.write(json.dumps(file) + "\n")
+ in_str = ""
+ for file in files:
+ in_str += json.dumps(file) + "\n"
- self.search.import_json("tmp.json", 123)
+ self.search.import_json(in_str, 123)
time.sleep(2)
self.assertEqual(4, self.search.es.count(self.search.index_name, "file")["count"])
@@ -50,8 +50,6 @@
page = self.search.search("10'000")
self.assertEqual(1, page["hits"]["total"])
- os.remove("tmp.json")
def test_scroll(self):
files = [
@@ -61,11 +59,11 @@
{"name": "Dead Racer", "size": 1000, "path": "Speed Machine [FLAC]", "mtime": 12345}
]
- with open("tmp.json", "w") as f:
- for file in files:
- f.write(json.dumps(file) + "\n")
+ in_str = ""
+ for file in files:
+ in_str += json.dumps(file) + "\n"
- self.search.import_json("tmp.json", 123)
+ self.search.import_json(in_str, 123)
time.sleep(2)
page = self.search.search("")