bulk indexing

This commit is contained in:
simon 2018-03-13 12:22:00 -04:00
parent e79a68ebe6
commit 9d75fc4d59
4 changed files with 54 additions and 16 deletions

View File

@ -4,7 +4,7 @@ import json
from multiprocessing import Process, Value from multiprocessing import Process, Value
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from parsing import GenericFileParser, Md5CheckSumCalculator, ExtensionMimeGuesser from parsing import GenericFileParser, Md5CheckSumCalculator, ExtensionMimeGuesser
import time from indexer import Indexer
class RunningTask: class RunningTask:
@ -13,6 +13,7 @@ class RunningTask:
self.total_files = 0 self.total_files = 0
self.parsed_files = Value("i", 0) self.parsed_files = Value("i", 0)
self.task = task self.task = task
self.done = Value("i", 0)
def to_json(self): def to_json(self):
return json.dumps({"parsed": self.parsed_files.value, "total": self.total_files, "id": self.task.id}) return json.dumps({"parsed": self.parsed_files.value, "total": self.total_files, "id": self.task.id})
@ -67,6 +68,7 @@ class TaskManager:
self.current_task = None self.current_task = None
self.storage = storage self.storage = storage
self.current_process = None self.current_process = None
self.indexer = Indexer("changeme")
scheduler = BackgroundScheduler() scheduler = BackgroundScheduler()
scheduler.add_job(self.check_new_task, "interval", seconds=0.5) scheduler.add_job(self.check_new_task, "interval", seconds=0.5)
@ -75,20 +77,25 @@ class TaskManager:
def start_task(self, task: Task): def start_task(self, task: Task):
self.current_task = RunningTask(task) self.current_task = RunningTask(task)
c = Crawler([GenericFileParser([Md5CheckSumCalculator()], ExtensionMimeGuesser())]) c = Crawler([])
path = self.storage.dirs()[task.dir_id].path path = self.storage.dirs()[task.dir_id].path
self.current_task.total_files = c.countFiles(path) self.current_task.total_files = c.countFiles(path)
print("Started task - " + str(self.current_task.total_files) + " files") print("Started task - " + str(self.current_task.total_files) + " files")
print(path) print(path)
self.current_process = Process(target=self.execute_crawl, args=(c, path, self.current_task.parsed_files)) self.current_process = Process(target=self.execute_crawl, args=(path, self.current_task.parsed_files, self.current_task.done))
self.current_process.daemon = True # self.current_process.daemon = True
self.current_process.start() self.current_process.start()
def execute_crawl(self, c: Crawler, path: str, counter: Value): def execute_crawl(self, path: str, counter: Value, done: Value):
c = Crawler([GenericFileParser([Md5CheckSumCalculator()], ExtensionMimeGuesser())])
c.crawl(path, counter) c.crawl(path, counter)
Indexer("changeme").index(c.documents)
print("Done") print("Done")
done.value = 1
def cancel_task(self): def cancel_task(self):
self.current_task = None self.current_task = None
@ -101,9 +108,7 @@ class TaskManager:
if not self.storage.tasks()[i].completed: if not self.storage.tasks()[i].completed:
self.start_task(self.storage.tasks()[i]) self.start_task(self.storage.tasks()[i])
else: else:
print(self.current_task.parsed_files.value) if self.current_task.done.value == 1:
if self.current_task.parsed_files.value == self.current_task.total_files:
self.current_process.terminate() self.current_process.terminate()
self.storage.del_task(self.current_task.task.id) self.storage.del_task(self.current_task.task.id)

View File

@ -22,34 +22,60 @@ class Indexer:
t.daemon = True t.daemon = True
t.start() t.start()
time.sleep(5) time.sleep(10)
self.init()
@staticmethod @staticmethod
def run_elasticsearch(): def run_elasticsearch():
subprocess.Popen(["elasticsearch/bin/elasticsearch"]) subprocess.Popen(["elasticsearch/bin/elasticsearch"])
@staticmethod @staticmethod
def create_bulk_index_string(docs: list, index_name: str): def create_bulk_index_string(docs: list):
""" """
Creates a insert string for sending to elasticsearch Creates a insert string for sending to elasticsearch
""" """
print("Creating bulk index string...")
result = "" result = ""
action_string = '{"index":{"_index":"' + index_name + '","_type":"file"}}\n' action_string = '{"index":{}}\n'
for doc in docs: for doc in docs:
result += action_string result += action_string
result += json.dumps(doc) + "\n" result += json.dumps(doc) + "\n"
print(result)
return result return result
def index(self, docs: list): def index(self, docs: list):
print("Indexing " + str(len(docs)) + " docs")
index_string = self.create_bulk_index_string(docs, self.index_name) index_string = Indexer.create_bulk_index_string(docs)
self.es.bulk(index_string) print("bulk-start")
self.es.bulk(body=index_string, index=self.index_name, doc_type="file")
print("bulk-done")
def clear(self): def clear(self):
self.es.indices.delete(self.index_name) self.es.indices.delete(self.index_name)
self.es.indices.create(self.index_name) self.es.indices.create(self.index_name)
def init(self):
self.es.indices.delete(index=self.index_name)
self.es.indices.create(index=self.index_name)
self.es.indices.close(index=self.index_name)
self.es.indices.put_settings(body='{"analysis": {"analyzer": {"path_analyser": {'
'"tokenizer": "path_tokenizer"}}, "tokenizer": {"path_tokenizer": {'
'"type": "path_hierarchy"}}}}', index=self.index_name)
self.es.indices.put_mapping(body='{"properties": {'
'"name": {"type": "text", "analyzer": "path_analyser", "copy_to": "suggest-path"},'
'"suggest-path": {"type": "completion", "analyzer": "keyword"},'
'"mime": {"type": "keyword"}'
'}}', doc_type="file", index=self.index_name)
self.es.indices.open(index=self.index_name)
print("Initialised elesticsearch")

View File

@ -42,7 +42,7 @@
{# todo: box-shadow 0 1px 10px 1px #1AC8DE#} {# todo: box-shadow 0 1px 10px 1px #1AC8DE#}
</style> </style>
</head> </head>
<body class="keen-dashboard" style="padding-top: 80px;"> <body>
<div> <div>
<span>Navbar1</span> <span>Navbar1</span>

View File

@ -75,8 +75,15 @@
var percent = currentTask.parsed / currentTask.total * 100; var percent = currentTask.parsed / currentTask.total * 100;
try { try {
document.getElementById("task-bar-" + currentTask.id).setAttribute("style", "width: " + percent + "%;");
var bar = document.getElementById("task-bar-" + currentTask.id);
bar.setAttribute("style", "width: " + percent + "%;");
document.getElementById("task-label-" + currentTask.id).innerHTML = currentTask.parsed + " / " + currentTask.total + " (" + percent.toFixed(2) + "%)"; document.getElementById("task-label-" + currentTask.id).innerHTML = currentTask.parsed + " / " + currentTask.total + " (" + percent.toFixed(2) + "%)";
if (percent === 100) {
bar.classList.add("bg-success")
}
} catch (e) { } catch (e) {
window.reload(); window.reload();
} }