270 lines
9.5 KiB
Python

import json
import os
import shutil
from multiprocessing import Process, Value
from queue import Queue, Empty, Full
from threading import Thread
from apscheduler.schedulers.background import BackgroundScheduler
import config
from indexer import Indexer
from parsing import GenericFileParser, Md5CheckSumCalculator, ExtensionMimeGuesser, MediaFileParser, TextFileParser, \
PictureFileParser, Sha1CheckSumCalculator, Sha256CheckSumCalculator, ContentMimeGuesser, MimeGuesser, FontParser, \
PdfFileParser, DocxParser, EbookParser, SpreadSheetParser
from search import Search
from storage import Directory
from storage import Task, LocalStorage
from thumbnail import ThumbnailGenerator
class RunningTask:
    """Progress record for the task the TaskManager is currently executing.

    The counters are ``multiprocessing.Value("i", ...)`` integers so that the
    child process doing the work can update them while the parent reads them.
    """

    def __init__(self, task: Task):
        self.task = task
        # Shared integers, all starting at zero.
        self.parsed_files = Value("i", 0)  # files processed so far
        self.total_files = Value("i", 0)   # files discovered so far
        self.done = Value("i", 0)          # set to 1 when the task is finished/cancelled

    def to_json(self):
        """Serialize the progress snapshot (parsed/total counts and task id) as JSON."""
        snapshot = dict(
            parsed=self.parsed_files.value,
            total=self.total_files.value,
            id=self.task.id,
        )
        return json.dumps(snapshot)
class Crawler:
    """Walk a directory tree, parse every file and hand the documents to an indexer.

    Files are parsed by ``config.parse_threads`` worker threads; the resulting
    documents are consumed by a single indexing thread. When no ``indexer`` is
    given, parsed documents are only accumulated in ``self.documents``.
    """

    def __init__(self, enabled_parsers: list, mime_guesser: "MimeGuesser" = None, indexer=None,
                 dir_id=0,
                 root_dir="/"):
        """
        :param enabled_parsers: parser objects exposing ``mime_types``,
            ``is_default`` and ``parse(path)``. The last parser flagged
            ``is_default`` handles mime types no parser claims explicitly.
        :param mime_guesser: object whose ``guess_mime(path)`` selects the
            parser; a new ``ExtensionMimeGuesser`` is created when omitted.
        :param indexer: object whose ``index(documents, dir_id)`` receives
            batches of parsed documents, or ``None`` to only collect them.
        :param dir_id: passed to ``indexer.index`` with every batch.
        :param root_dir: stored as ``self.root_dir`` (``crawl`` takes its own root).
        """
        self.documents = []
        self.enabled_parsers = enabled_parsers
        self.indexer = indexer
        self.dir_id = dir_id
        self.root_dir = root_dir

        # Without a default parser, files whose mime type no parser claims are skipped.
        self.default_parser = None
        for parser in self.enabled_parsers:
            if parser.is_default:
                self.default_parser = parser

        # Later parsers win when several claim the same mime type.
        self.ext_map = {mime: parser for parser in self.enabled_parsers for mime in parser.mime_types}

        # Built per instance: a default-argument instance would be created once
        # at definition time and shared by every Crawler.
        self.mime_guesser = mime_guesser if mime_guesser is not None else ExtensionMimeGuesser()

    def crawl(self, root_dir: str, counter: Value = None, total_files=None):
        """Parse (and index) every file below ``root_dir``; return when all are processed.

        :param counter: optional shared ``Value``, incremented once per parsed
            document received by the indexing thread.
        :param total_files: optional shared ``Value``, incremented once per
            file queued for parsing.
        """
        in_q = Queue(50000)  # TODO: get from config?
        out_q = Queue()

        print("Creating %d threads" % (config.parse_threads,))
        workers = [Thread(target=self.parse_file, args=(in_q, out_q))
                   for _ in range(config.parse_threads)]
        for worker in workers:
            worker.start()
        indexer_thread = Thread(target=self.index_file, args=(out_q, counter))
        indexer_thread.start()

        try:
            for root, _dirs, files in os.walk(root_dir):
                for filename in files:
                    # Blocks while the bounded queue is full; workers keep
                    # draining it until they receive their ``None`` sentinel.
                    in_q.put(os.path.join(root, filename))
                    if total_files is not None:
                        total_files.value += 1
        finally:
            # Shut the pipeline down in order, even if the walk raised. Each
            # worker exits on one sentinel, which it only sees after every path
            # queued before it; once all workers are joined, every document is
            # on out_q, so the indexer's sentinel goes last.
            for _ in workers:
                in_q.put(None)
            for worker in workers:
                worker.join()
            out_q.put(None)
            indexer_thread.join()

    def countFiles(self, root_dir: str):
        """Return the number of files below ``root_dir`` (as listed by ``os.walk``)."""
        count = 0
        for root, dirs, files in os.walk(root_dir):
            count += len(files)
        return count

    def parse_file(self, in_q: Queue, out_q: Queue):
        """Worker loop: parse paths from ``in_q`` and put documents on ``out_q``.

        Runs until a ``None`` sentinel is received; waiting on an empty queue
        never ends the loop, since the producer may simply be slow. Files that
        fail to parse are reported and skipped.
        """
        while True:
            full_path = in_q.get()
            if full_path is None:
                in_q.task_done()
                return
            try:
                mime = self.mime_guesser.guess_mime(full_path)
                parser = self.ext_map.get(mime, self.default_parser)
                if parser is not None:
                    doc = parser.parse(full_path)
                    doc["mime"] = mime
                    out_q.put(doc)
            except Exception as e:
                # Best effort: one unreadable file must not stop the crawl.
                print("Could not parse %s: %s" % (full_path, e))
            finally:
                in_q.task_done()

    def index_file(self, out_q: Queue, count: Value):
        """Indexing loop: collect documents from ``out_q`` until a ``None`` sentinel.

        Every document is appended to ``self.documents`` and, when ``count`` is
        given, increments it. With an indexer, documents are sent in batches of
        ``config.index_every`` plus a final partial batch; a failed batch stays
        in ``self.documents`` and is retried with the next one.
        """
        processed = 0
        while True:
            doc = out_q.get()
            try:
                if doc is None:
                    break
                self.documents.append(doc)
                processed += 1
                if count is not None:
                    count.value += 1
                if self.indexer is not None and processed % config.index_every == 0:
                    self.indexer.index(self.documents, self.dir_id)
                    self.documents.clear()
            except Exception as e:
                print("Could not index documents: %s" % (e,))
            finally:
                out_q.task_done()

        if self.indexer is not None and self.documents:
            self.indexer.index(self.documents, self.dir_id)
            # Clear so a later crawl() on this instance does not re-index them.
            self.documents.clear()
class TaskManager:
    """Run the tasks queued in ``storage`` one at a time, each in a child process.

    A background scheduler calls :meth:`check_new_task` every 0.5 s. When idle,
    it starts the task with the smallest key in ``storage.tasks()``. When the
    running task is done, cancelled, or its process has exited, it stops and
    reaps the child process and deletes the task from storage.
    """

    def __init__(self, storage: LocalStorage):
        self.current_task = None
        self.storage = storage
        self.current_process = None
        self.indexer = Indexer(config.elasticsearch_index)
        # Kept on the instance so callers can inspect or shut the scheduler down.
        self.scheduler = BackgroundScheduler()
        self.scheduler.add_job(self.check_new_task, "interval", seconds=0.5)
        self.scheduler.start()

    def start_task(self, task: Task):
        """Make ``task`` the current task and start its worker process.

        If the directory lookup fails or ``task.type`` is unknown
        (``ValueError``), the task is marked done before the error propagates.
        ``check_new_task`` then discards it instead of waiting on it forever.
        """
        running = RunningTask(task)
        self.current_task = running
        self.current_process = None
        try:
            directory = self.storage.dirs()[task.dir_id]
            if task.type == Task.INDEX:
                target = self.execute_crawl
                args = (directory, running.parsed_files, running.done, running.total_files)
            elif task.type == Task.GEN_THUMBNAIL:
                target = self.execute_thumbnails
                args = (directory, running.total_files, running.parsed_files, running.done)
            else:
                raise ValueError("Unknown task type: %r" % (task.type,))
            process = Process(target=target, args=args)
            process.start()
            # Only remember the process once it has actually started.
            self.current_process = process
        except Exception:
            running.done.value = 1
            raise

    def execute_crawl(self, directory: Directory, counter: Value, done: Value, total_files: Value):
        """Child-process body of an INDEX task: re-index every file of ``directory``.

        Previously indexed documents of the directory are removed first (via
        ``Search.delete_directory``); ``done`` is set to 1 when the crawl returns.
        """
        Search(config.elasticsearch_index).delete_directory(directory.id)
        chksum_calcs = self.make_checksums_list(directory)
        mime_guesser = ExtensionMimeGuesser() if directory.get_option("MimeGuesser") == "extension" \
            else ContentMimeGuesser()
        c = Crawler(self.make_parser_list(chksum_calcs, directory), mime_guesser, self.indexer, directory.id)
        c.crawl(directory.path, counter, total_files)
        done.value = 1

    @staticmethod
    def make_checksums_list(directory):
        """Instantiate the checksum calculators named in the directory's
        comma-separated ``CheckSumCalculators`` option; unknown names are ignored."""
        calculators = {
            "md5": Md5CheckSumCalculator,
            "sha1": Sha1CheckSumCalculator,
            "sha256": Sha256CheckSumCalculator,
        }
        names = (arg.strip() for arg in directory.get_option("CheckSumCalculators").split(","))
        return [calculators[name]() for name in names if name in calculators]

    @staticmethod
    def make_parser_list(chksum_calcs, directory):
        """Build the parsers enabled by the directory's ``FileParsers`` option.

        ``GenericFileParser`` is always included; content-extracting parsers
        read their maximum content length from the matching directory option.
        """
        p = [p.strip() for p in directory.get_option("FileParsers").split(",")]
        parsers = [GenericFileParser(chksum_calcs, directory.path)]
        if "media" in p:
            parsers.append(MediaFileParser(chksum_calcs, directory.path))
        if "text" in p:
            parsers.append(
                TextFileParser(chksum_calcs, int(directory.get_option("TextFileContentLength")), directory.path))
        if "picture" in p:
            parsers.append(PictureFileParser(chksum_calcs, directory.path))
        if "font" in p:
            parsers.append(FontParser(chksum_calcs, directory.path))
        if "pdf" in p:
            parsers.append(
                PdfFileParser(chksum_calcs, int(directory.get_option("PdfFileContentLength")), directory.path))
        if "docx" in p:
            parsers.append(DocxParser(chksum_calcs, int(directory.get_option("DocxContentLength")), directory.path))
        if "spreadsheet" in p:
            parsers.append(
                SpreadSheetParser(chksum_calcs, int(directory.get_option("SpreadSheetContentLength")), directory.path))
        if "ebook" in p:
            parsers.append(EbookParser(chksum_calcs, int(directory.get_option("EbookContentLength")), directory.path))
        return parsers

    def execute_thumbnails(self, directory: Directory, total_files: Value, counter: Value, done: Value):
        """Child-process body of a GEN_THUMBNAIL task.

        Deletes ``static/thumbnails/<directory id>`` and regenerates thumbnails
        for all indexed documents of the directory; sets ``done`` to 1 afterwards.
        """
        dest_path = os.path.join("static/thumbnails", str(directory.id))
        if os.path.exists(dest_path):
            shutil.rmtree(dest_path)
        docs = Search(config.elasticsearch_index).get_all_documents(directory.id)
        tn_generator = ThumbnailGenerator(int(directory.get_option("ThumbnailSize")),
                                          int(directory.get_option("ThumbnailQuality")),
                                          directory.get_option("ThumbnailColor"))
        tn_generator.generate_all(docs, dest_path, counter, directory, total_files)
        done.value = 1

    def cancel_task(self):
        """Request cancellation of the running task (no-op when idle).

        The next ``check_new_task`` run terminates its process.
        """
        task = self.current_task
        if task is not None:
            task.done.value = 1

    def check_new_task(self):
        """Scheduler job: start the next queued task, or retire the finished one."""
        if self.current_task is None:
            tasks = self.storage.tasks()
            if tasks:
                self.start_task(tasks[sorted(tasks)[0]])
            return

        process = self.current_process
        # A child that exited without setting ``done`` (uncaught exception,
        # killed by the OS) must not block the task queue forever.
        exited = process is None or not process.is_alive()
        if self.current_task.done.value == 1 or exited:
            if process is not None:
                process.terminate()
                process.join()  # reap the child so it does not linger as a zombie
            self.storage.del_task(self.current_task.task.id)
            self.current_task = None
            self.current_process = None