mirror of
https://github.com/simon987/Simple-Incremental-Search-Tool.git
synced 2025-04-04 07:52:58 +00:00
270 lines
9.5 KiB
Python
270 lines
9.5 KiB
Python
import json
|
|
import os
|
|
import shutil
|
|
from multiprocessing import Process, Value
|
|
from queue import Queue, Empty, Full
|
|
from threading import Thread
|
|
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
|
|
import config
|
|
from indexer import Indexer
|
|
from parsing import GenericFileParser, Md5CheckSumCalculator, ExtensionMimeGuesser, MediaFileParser, TextFileParser, \
|
|
PictureFileParser, Sha1CheckSumCalculator, Sha256CheckSumCalculator, ContentMimeGuesser, MimeGuesser, FontParser, \
|
|
PdfFileParser, DocxParser, EbookParser, SpreadSheetParser
|
|
from search import Search
|
|
from storage import Directory
|
|
from storage import Task, LocalStorage
|
|
from thumbnail import ThumbnailGenerator
|
|
|
|
|
|
class RunningTask:
|
|
|
|
def __init__(self, task: Task):
|
|
self.total_files = Value("i", 0)
|
|
self.parsed_files = Value("i", 0)
|
|
self.task = task
|
|
self.done = Value("i", 0)
|
|
|
|
def to_json(self):
|
|
return json.dumps({"parsed": self.parsed_files.value, "total": self.total_files.value, "id": self.task.id})
|
|
|
|
|
|
class Crawler:
|
|
|
|
def __init__(self, enabled_parsers: list, mime_guesser: MimeGuesser = ExtensionMimeGuesser(), indexer=None,
|
|
dir_id=0,
|
|
root_dir="/"):
|
|
self.documents = []
|
|
self.enabled_parsers = enabled_parsers
|
|
self.indexer = indexer
|
|
self.dir_id = dir_id
|
|
self.root_dir = root_dir
|
|
|
|
for parser in self.enabled_parsers:
|
|
if parser.is_default:
|
|
self.default_parser = parser
|
|
|
|
self.ext_map = {}
|
|
|
|
for parser in self.enabled_parsers:
|
|
for ext in parser.mime_types:
|
|
self.ext_map[ext] = parser
|
|
|
|
self.mime_guesser = mime_guesser
|
|
|
|
def crawl(self, root_dir: str, counter: Value = None, total_files=None):
|
|
|
|
in_q = Queue(50000) # TODO: get from config?
|
|
out_q = Queue()
|
|
|
|
threads = []
|
|
print("Creating %d threads" % (config.parse_threads,))
|
|
for _ in range(config.parse_threads):
|
|
t = Thread(target=self.parse_file, args=[in_q, out_q, ])
|
|
threads.append(t)
|
|
t.start()
|
|
|
|
indexer_thread = Thread(target=self.index_file, args=[out_q, counter, ])
|
|
indexer_thread.start()
|
|
|
|
for root, dirs, files in os.walk(root_dir):
|
|
for filename in files:
|
|
while True:
|
|
try:
|
|
in_q.put(os.path.join(root, filename), timeout=10)
|
|
if total_files:
|
|
total_files.value += 1
|
|
break
|
|
except Full:
|
|
continue
|
|
|
|
in_q.join()
|
|
out_q.join()
|
|
|
|
for _ in threads:
|
|
in_q.put(None)
|
|
out_q.put(None)
|
|
|
|
indexer_thread.join()
|
|
for t in threads:
|
|
t.join()
|
|
|
|
def countFiles(self, root_dir: str):
|
|
count = 0
|
|
|
|
for root, dirs, files in os.walk(root_dir):
|
|
count += len(files)
|
|
|
|
return count
|
|
|
|
def parse_file(self, in_q: Queue, out_q: Queue):
|
|
|
|
while True:
|
|
try:
|
|
full_path = in_q.get(timeout=1)
|
|
if full_path is None:
|
|
break
|
|
except Empty:
|
|
break
|
|
|
|
try:
|
|
mime = self.mime_guesser.guess_mime(full_path)
|
|
parser = self.ext_map.get(mime, self.default_parser)
|
|
|
|
doc = parser.parse(full_path)
|
|
doc["mime"] = mime
|
|
out_q.put(doc)
|
|
except:
|
|
pass
|
|
finally:
|
|
in_q.task_done()
|
|
|
|
def index_file(self, out_q: Queue, count: Value):
|
|
|
|
if self.indexer is None:
|
|
while True:
|
|
try:
|
|
doc = out_q.get(timeout=120)
|
|
if doc is None:
|
|
break
|
|
except Empty:
|
|
break
|
|
self.documents.append(doc)
|
|
out_q.task_done()
|
|
return
|
|
|
|
while True:
|
|
try:
|
|
doc = out_q.get(timeout=600)
|
|
if doc is None:
|
|
break
|
|
except Empty:
|
|
print("outq empty")
|
|
break
|
|
|
|
try:
|
|
self.documents.append(doc)
|
|
count.value += 1
|
|
|
|
if count.value % config.index_every == 0:
|
|
self.indexer.index(self.documents, self.dir_id)
|
|
self.documents.clear()
|
|
except:
|
|
pass
|
|
finally:
|
|
out_q.task_done()
|
|
if self.documents:
|
|
self.indexer.index(self.documents, self.dir_id)
|
|
|
|
|
|
class TaskManager:
|
|
def __init__(self, storage: LocalStorage):
|
|
self.current_task = None
|
|
self.storage = storage
|
|
self.current_process = None
|
|
self.indexer = Indexer(config.elasticsearch_index)
|
|
|
|
scheduler = BackgroundScheduler()
|
|
scheduler.add_job(self.check_new_task, "interval", seconds=0.5)
|
|
scheduler.start()
|
|
|
|
def start_task(self, task: Task):
|
|
self.current_task = RunningTask(task)
|
|
|
|
directory = self.storage.dirs()[task.dir_id]
|
|
|
|
if task.type == Task.INDEX:
|
|
self.current_process = Process(target=self.execute_crawl, args=(directory,
|
|
self.current_task.parsed_files,
|
|
self.current_task.done,
|
|
self.current_task.total_files))
|
|
|
|
elif task.type == Task.GEN_THUMBNAIL:
|
|
self.current_process = Process(target=self.execute_thumbnails, args=(directory,
|
|
self.current_task.total_files,
|
|
self.current_task.parsed_files,
|
|
self.current_task.done))
|
|
self.current_process.start()
|
|
|
|
def execute_crawl(self, directory: Directory, counter: Value, done: Value, total_files: Value):
|
|
|
|
Search(config.elasticsearch_index).delete_directory(directory.id)
|
|
|
|
chksum_calcs = self.make_checksums_list(directory)
|
|
|
|
mime_guesser = ExtensionMimeGuesser() if directory.get_option("MimeGuesser") == "extension" \
|
|
else ContentMimeGuesser()
|
|
|
|
c = Crawler(self.make_parser_list(chksum_calcs, directory), mime_guesser, self.indexer, directory.id)
|
|
c.crawl(directory.path, counter, total_files)
|
|
|
|
done.value = 1
|
|
|
|
@staticmethod
|
|
def make_checksums_list(directory):
|
|
chksum_calcs = []
|
|
for arg in directory.get_option("CheckSumCalculators").split(","):
|
|
if arg.strip() == "md5":
|
|
chksum_calcs.append(Md5CheckSumCalculator())
|
|
elif arg.strip() == "sha1":
|
|
chksum_calcs.append(Sha1CheckSumCalculator())
|
|
elif arg.strip() == "sha256":
|
|
chksum_calcs.append(Sha256CheckSumCalculator())
|
|
return chksum_calcs
|
|
|
|
@staticmethod
|
|
def make_parser_list(chksum_calcs, directory):
|
|
p = [p.strip() for p in directory.get_option("FileParsers").split(",")]
|
|
parsers = [GenericFileParser(chksum_calcs, directory.path)]
|
|
if "media" in p:
|
|
parsers.append(MediaFileParser(chksum_calcs, directory.path))
|
|
if "text" in p:
|
|
parsers.append(
|
|
TextFileParser(chksum_calcs, int(directory.get_option("TextFileContentLength")), directory.path))
|
|
if "picture" in p:
|
|
parsers.append(PictureFileParser(chksum_calcs, directory.path))
|
|
if "font" in p:
|
|
parsers.append(FontParser(chksum_calcs, directory.path))
|
|
if "pdf" in p:
|
|
parsers.append(
|
|
PdfFileParser(chksum_calcs, int(directory.get_option("PdfFileContentLength")), directory.path))
|
|
if "docx" in p:
|
|
parsers.append(DocxParser(chksum_calcs, int(directory.get_option("DocxContentLength")), directory.path))
|
|
if "spreadsheet" in p:
|
|
parsers.append(
|
|
SpreadSheetParser(chksum_calcs, int(directory.get_option("SpreadSheetContentLength")), directory.path))
|
|
if "ebook" in p:
|
|
parsers.append(EbookParser(chksum_calcs, int(directory.get_option("EbookContentLength")), directory.path))
|
|
return parsers
|
|
|
|
def execute_thumbnails(self, directory: Directory, total_files: Value, counter: Value, done: Value):
|
|
|
|
dest_path = os.path.join("static/thumbnails", str(directory.id))
|
|
if os.path.exists(dest_path):
|
|
shutil.rmtree(dest_path)
|
|
|
|
docs = Search(config.elasticsearch_index).get_all_documents(directory.id)
|
|
|
|
tn_generator = ThumbnailGenerator(int(directory.get_option("ThumbnailSize")),
|
|
int(directory.get_option("ThumbnailQuality")),
|
|
directory.get_option("ThumbnailColor"))
|
|
tn_generator.generate_all(docs, dest_path, counter, directory, total_files)
|
|
|
|
done.value = 1
|
|
|
|
def cancel_task(self):
|
|
self.current_task.done.value = 1
|
|
|
|
def check_new_task(self):
|
|
|
|
if self.current_task is None:
|
|
tasks = self.storage.tasks()
|
|
if len(tasks) > 0:
|
|
self.start_task(tasks[sorted(tasks)[0]])
|
|
else:
|
|
if self.current_task.done.value == 1:
|
|
self.current_process.terminate()
|
|
self.storage.del_task(self.current_task.task.id)
|
|
self.current_task = None
|