Bug fixes + multithreading

This commit is contained in:
simon987 2019-03-10 20:15:53 -04:00
parent 36b9ed6cb7
commit 746ad25a4e
9 changed files with 157 additions and 50 deletions

View File

@ -29,6 +29,7 @@ Once the web server is running, you can connect to the search interface by typin
* Download and install [Elasticsearch](https://www.elastic.co/downloads/elasticsearch)
* Edit settings in [config.py](https://github.com/simon987/Simple-Incremental-Search-Tool/blob/master/config.py) (Default values are ok in most cases)
```bash
git clone https://github.com/simon987/Simple-Incremental-Search-Tool

View File

@ -26,7 +26,14 @@ bcrypt_rounds = 14
db_path = "./local_storage.db"
# Set to true to allow guests to search any directory
allow_guests = False
allow_guests = True
# Number of threads used for parsing
parse_threads = 8
# Number of threads used for thumbnail generation
tn_threads = 32
try:
import cairosvg
@ -34,4 +41,4 @@ try:
except:
cairosvg = False
VERSION = "1.0a"
VERSION = "1.1a"

View File

@ -2,6 +2,8 @@ import json
import os
import shutil
from multiprocessing import Process, Value
from queue import Queue, Empty, Full
from threading import Thread
from apscheduler.schedulers.background import BackgroundScheduler
@ -51,39 +53,42 @@ class Crawler:
self.mime_guesser = mime_guesser
def crawl(self, root_dir: str, counter: Value = None):
def crawl(self, root_dir: str, counter: Value = None, total_files = None):
document_counter = 0
in_q = Queue(50000) # TODO: get from config?
out_q = Queue()
threads = []
print("Creating %d threads" % (config.parse_threads,))
for _ in range(config.parse_threads):
t = Thread(target=self.parse_file, args=[in_q, out_q, ])
threads.append(t)
t.start()
indexer_thread = Thread(target=self.index_file, args=[out_q, counter, ])
indexer_thread.start()
for root, dirs, files in os.walk(root_dir):
for filename in files:
full_path = os.path.join(root, filename)
while True:
try:
in_q.put(os.path.join(root, filename), timeout=10)
if total_files:
total_files.value += 1
break
except Full:
continue
mime = self.mime_guesser.guess_mime(full_path)
in_q.join()
out_q.join()
parser = self.ext_map.get(mime, self.default_parser)
for _ in threads:
in_q.put(None)
out_q.put(None)
document_counter += 1
if document_counter >= config.index_every:
document_counter = 0
self.indexer.index(self.documents, self.dir_id)
self.documents.clear()
try:
if counter:
counter.value += 1
doc = parser.parse(full_path)
doc["mime"] = mime
self.documents.append(doc)
except FileNotFoundError:
continue # File was deleted
if self.indexer is not None and len(self.documents) > 0:
self.indexer.index(self.documents, self.dir_id)
indexer_thread.join()
for t in threads:
t.join()
def countFiles(self, root_dir: str):
count = 0
@ -93,6 +98,61 @@ class Crawler:
return count
def parse_file(self, in_q: Queue, out_q: Queue):
    """Parser-thread body: pull file paths from in_q, parse each one and
    push the resulting document dict onto out_q.

    Terminates on the None sentinel or when in_q stays empty past the
    timeout. Per-file parse errors are swallowed (best-effort crawl) so a
    single bad file cannot kill the worker thread — a dead worker would
    leave queued paths without a matching task_done() and dead-lock the
    producer's in_q.join().

    :param in_q: queue of absolute file paths (None = shutdown sentinel)
    :param out_q: queue of parsed document dicts consumed by index_file
    """
    while True:
        try:
            full_path = in_q.get(timeout=1)
        except Empty:
            break
        if full_path is None:
            # Acknowledge the sentinel so the queue's unfinished-task
            # count stays balanced.
            in_q.task_done()
            break
        try:
            mime = self.mime_guesser.guess_mime(full_path)
            parser = self.ext_map.get(mime, self.default_parser)
            doc = parser.parse(full_path)
            doc["mime"] = mime
            out_q.put(doc)
        except Exception:
            # Skip unparseable files; keep the worker alive.
            pass
        finally:
            in_q.task_done()
def index_file(self, out_q: Queue, count: Value):
    """Indexer-thread body: drain parsed documents from out_q into
    self.documents and periodically flush them to the indexer.

    Terminates on the None sentinel or when out_q stays empty past the
    timeout. When self.indexer is None the documents are only
    accumulated (the caller flushes them). Flushing every
    config.index_every documents bounds memory use during large crawls.

    :param count: shared progress counter updated per indexed document;
                  may be None (crawl()'s default), in which case a local
                  counter drives the periodic flush.
    """
    # Local tally so the flush interval works even without a shared counter.
    indexed = 0
    while True:
        try:
            doc = out_q.get(timeout=10)
        except Empty:
            break
        if doc is None:
            # Acknowledge the sentinel to keep task bookkeeping balanced.
            out_q.task_done()
            break
        try:
            self.documents.append(doc)
            if self.indexer is not None:
                indexed += 1
                if count is not None:
                    count.value += 1
                if indexed % config.index_every == 0:
                    self.indexer.index(self.documents, self.dir_id)
                    self.documents.clear()
        except Exception:
            # Best-effort: a failed index batch must not kill the thread,
            # or pending queue items would never get task_done().
            pass
        finally:
            out_q.task_done()
    # Flush whatever is left after the queue is drained.
    if self.indexer is not None and self.documents:
        self.indexer.index(self.documents, self.dir_id)
        self.documents.clear()
class TaskManager:
def __init__(self, storage: LocalStorage):
@ -112,10 +172,10 @@ class TaskManager:
if task.type == Task.INDEX:
c = Crawler([])
self.current_task.total_files.value = c.countFiles(directory.path)
self.current_process = Process(target=self.execute_crawl, args=(directory, self.current_task.parsed_files,
self.current_task.done))
self.current_process = Process(target=self.execute_crawl, args=(directory,
self.current_task.parsed_files,
self.current_task.done,
self.current_task.total_files))
elif task.type == Task.GEN_THUMBNAIL:
self.current_process = Process(target=self.execute_thumbnails, args=(directory,
@ -124,7 +184,7 @@ class TaskManager:
self.current_task.done))
self.current_process.start()
def execute_crawl(self, directory: Directory, counter: Value, done: Value):
def execute_crawl(self, directory: Directory, counter: Value, done: Value, total_files: Value):
Search("changeme").delete_directory(directory.id)
@ -151,7 +211,7 @@ class TaskManager:
DocxParser(chksum_calcs, int(directory.get_option("SpreadsheetContentLength")), directory.path),
EbookParser(chksum_calcs, int(directory.get_option("EbookContentLength")), directory.path)],
mime_guesser, self.indexer, directory.id)
c.crawl(directory.path, counter)
c.crawl(directory.path, counter, total_files)
done.value = 1
@ -161,14 +221,12 @@ class TaskManager:
if os.path.exists(dest_path):
shutil.rmtree(dest_path)
docs = list(Search("changeme").get_all_documents(directory.id))
total_files.value = len(docs)
docs = Search("changeme").get_all_documents(directory.id)
tn_generator = ThumbnailGenerator(int(directory.get_option("ThumbnailSize")),
int(directory.get_option("ThumbnailQuality")),
directory.get_option("ThumbnailColor"))
tn_generator.generate_all(docs, dest_path, counter, directory)
tn_generator.generate_all(docs, dest_path, counter, directory, total_files)
done.value = 1

View File

@ -143,7 +143,7 @@ class GenericFileParser(FileParser):
name, extension = os.path.splitext(name)
info["size"] = file_stat.st_size
info["path"] = path[self.root_dir_len:]
info["path"] = os.path.relpath(path, self.root_dir)
info["name"] = name
info["extension"] = extension[1:]
info["mtime"] = file_stat.st_mtime

2
run.py
View File

@ -241,7 +241,7 @@ def search_liste_page():
def get_allowed_dirs(username):
if config.allow_guests:
return [x for x in storage.dirs() if x.enabled]
return [x for x in storage.dirs() if storage.dirs()[x].enabled]
if username:
user = storage.users()[username]
return [x for x in storage.dirs() if storage.dirs()[x].enabled and x in user.readable_directories]

View File

@ -149,7 +149,7 @@ class Search:
"aggs": {
"total_size": {"sum": {"field": "size"}}
},
"size": 40}, index=self.index_name, scroll="3m")
"size": 40}, index=self.index_name, scroll="30m")
return page

View File

@ -267,7 +267,7 @@ function createDocCard(hit) {
}
thumbnailOverlay.appendChild(resolutionBadge);
var format = hit["_source"]["format"];
var format = hit["_source"]["format_name"];
//Hover
if(format === "GIF") {
@ -429,6 +429,8 @@ window.addEventListener("scroll", function () {
if (hits.length !== 0) {
coolingDown = false;
}
} else if (this.status === 500) {
window.location.reload()
}
};
xhttp.open("GET", "/scroll?scroll_id=" + scroll_id, true);

View File

@ -143,7 +143,7 @@
</div>
<div class="card">
<div class="card-header">Options <a href="#" style="float:right">Learn more <i
<div class="card-header">Options <a href="https://github.com/simon987/Simple-Incremental-Search-Tool/blob/master/config.py#L1-L13" style="float:right">Learn more <i
class="fas fa-external-link-alt"></i></a></div>
<div class="card-body">
<table class="info-table table-striped table-hover">

View File

@ -1,6 +1,10 @@
from queue import Full, Empty
from threading import Thread
from PIL import Image
import os
from multiprocessing import Value, Process
from queue import Queue
import ffmpeg
import config
@ -22,10 +26,11 @@ class ThumbnailGenerator:
if mime == "image/svg+xml" and config.cairosvg:
tmpfile = dest_path + "_tmp"
try:
p = Process(target=cairosvg.svg2png, kwargs={"url": path, "write_to": "tmp"})
p = Process(target=cairosvg.svg2png, kwargs={"url": path, "write_to": tmpfile})
p.start()
p.join(1)
p.join(5)
if p.is_alive():
p.terminate()
@ -35,8 +40,8 @@ class ThumbnailGenerator:
except Exception:
print("Couldn't make thumbnail for " + path)
if os.path.exists("tmp"):
os.remove("tmp")
if os.path.exists(tmpfile):
os.remove(tmpfile)
elif mime.startswith("image"):
@ -59,11 +64,16 @@ class ThumbnailGenerator:
if os.path.exists("tmp"):
os.remove("tmp")
def generate_all(self, docs, dest_path, counter: Value=None, directory=None):
def worker(self, in_q: Queue, counter: Value, dest_path, directory):
os.makedirs(dest_path, exist_ok=True)
while True:
try:
doc = in_q.get(timeout=1)
if doc is None:
break
except Empty:
break
for doc in docs:
extension = "" if doc["_source"]["extension"] == "" else "." + doc["_source"]["extension"]
full_path = os.path.join(directory.path, doc["_source"]["path"], doc["_source"]["name"] + extension)
@ -73,6 +83,35 @@ class ThumbnailGenerator:
if counter is not None:
counter.value += 1
in_q.task_done()
def generate_all(self, docs, dest_path, counter: Value = None, directory=None, total_count=None):
    """Generate thumbnails for every document using a pool of worker threads.

    Feeds each doc into a bounded work queue serviced by config.tn_threads
    workers, waits for the queue to drain, then shuts the pool down with
    one None sentinel per worker.

    :param docs: iterable of search hits to thumbnail
    :param dest_path: output directory (created if missing)
    :param counter: shared per-thumbnail progress counter (optional)
    :param total_count: shared total-work counter, bumped as docs are queued
    """
    os.makedirs(dest_path, exist_ok=True)
    work_queue = Queue(50000)  # TODO: load from config?
    pool = [Thread(target=self.worker, args=[work_queue, counter, dest_path, directory])
            for _ in range(config.tn_threads)]
    for thread in pool:
        thread.start()
    for doc in docs:
        # Retry until the bounded queue accepts the item.
        queued = False
        while not queued:
            try:
                work_queue.put(doc, timeout=10)
                if total_count:
                    total_count.value += 1
                queued = True
            except Full:
                pass
    work_queue.join()
    for _ in pool:
        work_queue.put(None)
    for thread in pool:
        thread.join()
def generate_image(self, path, dest_path):
with open(path, "rb") as image_file: