mirror of
https://github.com/simon987/Simple-Incremental-Search-Tool.git
synced 2025-04-04 07:52:58 +00:00
518 lines
17 KiB
Python
518 lines
17 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
from io import BytesIO
|
|
|
|
import bcrypt
|
|
import humanfriendly
|
|
from PIL import Image
|
|
from flask import Flask, render_template, request, redirect, flash, session, abort, send_file
|
|
|
|
import config
|
|
from crawler import TaskManager
|
|
from search import Search
|
|
from storage import Directory, Option, Task, User
|
|
from storage import LocalStorage, DuplicateDirectoryException, DuplicateUserException
|
|
from thumbnail import ThumbnailGenerator
|
|
|
|
app = Flask(__name__)
|
|
app.secret_key = "A very secret key"
|
|
storage = LocalStorage(config.db_path)
|
|
|
|
# Disable flask logging
|
|
flaskLogger = logging.getLogger('werkzeug')
|
|
flaskLogger.setLevel(logging.ERROR)
|
|
|
|
tm = TaskManager(storage)
|
|
search = Search(config.elasticsearch_index)
|
|
|
|
|
|
def get_dir_size(path):
|
|
size = 0
|
|
|
|
for root, dirs, files in os.walk(path):
|
|
for filename in files:
|
|
full_path = os.path.join(root, filename)
|
|
size += os.path.getsize(full_path)
|
|
|
|
return size
|
|
|
|
|
|
@app.route("/user/<user>")
|
|
def user_manage(user):
|
|
if "admin" in session and session["admin"]:
|
|
return render_template("user_manage.html", directories=storage.dirs(), user=storage.users()[user])
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/logout")
|
|
def logout():
|
|
session.pop("username")
|
|
session.pop("admin")
|
|
flash("Successfully logged out", "success")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/login", methods=['POST'])
|
|
def login():
|
|
username = request.form["username"]
|
|
password = request.form["password"]
|
|
|
|
if storage.auth_user(username, password):
|
|
session["username"] = username
|
|
session["admin"] = storage.users()[username].admin
|
|
|
|
flash("Successfully logged in", "success")
|
|
else:
|
|
flash("Invalid username or password", "danger")
|
|
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/user")
|
|
def user_page():
|
|
admin_account_present = False
|
|
|
|
for user in storage.users():
|
|
if storage.users()[user].admin:
|
|
admin_account_present = True
|
|
break
|
|
|
|
if not admin_account_present or ("admin" in session and session["admin"]):
|
|
return render_template("user.html", users=storage.users(), admin_account_present=admin_account_present)
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/user/<username>/set_access")
|
|
def user_set_access(username):
|
|
if "admin" in session and session["admin"]:
|
|
dir_id = request.args["dir_id"]
|
|
user = storage.users()[username]
|
|
|
|
if request.args["access"] == "1":
|
|
user.readable_directories.add(int(dir_id))
|
|
else:
|
|
if int(dir_id) in user.readable_directories:
|
|
user.readable_directories.remove(int(dir_id))
|
|
|
|
storage.update_user(user)
|
|
|
|
flash("Permissions mises à jour", "success")
|
|
return redirect("/user/" + username)
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/user/<username>/set_admin")
|
|
def user_set_admin(username):
|
|
if "admin" in session and session["admin"]:
|
|
|
|
user = storage.users()[username]
|
|
|
|
if user.username == session["username"]:
|
|
flash("You cannot modifiy your own account", "warning")
|
|
else:
|
|
user.admin = request.args["admin"] == "1"
|
|
storage.update_user(user)
|
|
|
|
flash("Permissions updated", "success")
|
|
|
|
return redirect("/user/" + username)
|
|
|
|
|
|
@app.route("/user/add", methods=['POST'])
|
|
def user_add():
|
|
admin_account_present = False
|
|
|
|
for user in storage.users():
|
|
if storage.users()[user].admin:
|
|
admin_account_present = True
|
|
break
|
|
|
|
if not admin_account_present or ("admin" in session and session["admin"]):
|
|
username = request.form["username"]
|
|
password = bcrypt.hashpw(request.form["password"].encode("utf-8"), bcrypt.gensalt(config.bcrypt_rounds))
|
|
is_admin = True if "is_admin" in request.form else False
|
|
|
|
try:
|
|
storage.save_user(User(username, password, is_admin))
|
|
flash("Created new user", "success")
|
|
except DuplicateUserException:
|
|
flash("<strong>Couldn't create user</strong> "
|
|
"Make sure that the username is unique", "danger")
|
|
|
|
return redirect("/user")
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/user/<username>/del")
|
|
def user_del(username):
|
|
if "admin" in session and session["admin"]:
|
|
|
|
if session["username"] == username:
|
|
flash("You cannot delete your own account", "warning")
|
|
return redirect("/user/" + username)
|
|
storage.remove_user(username)
|
|
flash("User deleted", "success")
|
|
return redirect("/user")
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/suggest")
|
|
def suggest():
|
|
return json.dumps(search.suggest(request.args.get("prefix")))
|
|
|
|
|
|
@app.route("/document/<doc_id>")
|
|
def document(doc_id):
|
|
doc = search.get_doc(doc_id)["_source"]
|
|
directory = storage.dirs()[doc["directory"]]
|
|
|
|
del doc["directory"]
|
|
|
|
return render_template("document.html", doc=doc, directory=directory, doc_id=doc_id)
|
|
|
|
|
|
@app.route("/dl/<doc_id>")
|
|
def serve_file(doc_id):
|
|
doc = search.get_doc(doc_id)["_source"]
|
|
directory = storage.dirs()[doc["directory"]]
|
|
|
|
extension = "" if doc["extension"] is None or doc["extension"] == "" else "." + doc["extension"]
|
|
full_path = os.path.join(directory.path, doc["path"], doc["name"] + extension)
|
|
|
|
if not os.path.exists(full_path):
|
|
return abort(404)
|
|
|
|
return send_file(full_path, mimetype=doc["mime"], as_attachment=True, attachment_filename=doc["name"] + extension)
|
|
|
|
|
|
@app.route("/file/<doc_id>")
|
|
def download(doc_id):
|
|
doc = search.get_doc(doc_id)["_source"]
|
|
directory = storage.dirs()[doc["directory"]]
|
|
extension = "" if doc["extension"] is None or doc["extension"] == "" else "." + doc["extension"]
|
|
full_path = os.path.join(directory.path, doc["path"], doc["name"] + extension)
|
|
|
|
if not os.path.exists(full_path):
|
|
return abort(404)
|
|
|
|
return send_file(full_path, mimetype=doc["mime"], conditional=True)
|
|
|
|
|
|
@app.route("/thumb/<doc_id>")
|
|
def thumb(doc_id):
|
|
doc = search.get_doc(doc_id)
|
|
|
|
if doc is not None:
|
|
dest_path = "static/thumbnails/" + str(doc["_source"]["directory"])
|
|
os.makedirs(dest_path, exist_ok=True)
|
|
tn_path = os.path.join(dest_path, doc_id)
|
|
if os.path.isfile(tn_path):
|
|
return send_file(tn_path)
|
|
|
|
directory = storage.dirs()[doc["_source"]["directory"]]
|
|
extension = "" if doc["_source"]["extension"] is None or doc["_source"]["extension"] == "" else \
|
|
"." + doc["_source"]["extension"]
|
|
full_path = os.path.join(directory.path,
|
|
doc["_source"]["path"],
|
|
doc["_source"]["name"] + extension)
|
|
tn_generator = ThumbnailGenerator(int(directory.get_option("ThumbnailSize")),
|
|
int(directory.get_option("ThumbnailQuality")),
|
|
directory.get_option("ThumbnailColor"))
|
|
tn_generator.generate(full_path, tn_path, doc["_source"]["mime"])
|
|
if os.path.isfile(tn_path):
|
|
return send_file(tn_path)
|
|
|
|
default_thumbnail = BytesIO()
|
|
Image.new("RGB", (255, 128), (0, 0, 0)).save(default_thumbnail, "JPEG")
|
|
default_thumbnail.seek(0)
|
|
return send_file(default_thumbnail, "image/jpeg")
|
|
|
|
|
|
@app.route("/")
|
|
def search_page():
|
|
mime_map = search.get_mime_map()
|
|
mime_map.append({"id": "any", "text": "All"})
|
|
|
|
directories = [storage.dirs()[x] for x in get_allowed_dirs(session["username"] if "username" in session else None)]
|
|
|
|
return render_template("search.html",
|
|
directories=directories,
|
|
mime_map=mime_map)
|
|
|
|
|
|
@app.route("/list")
|
|
def search_liste_page():
|
|
return render_template("searchList.html")
|
|
|
|
|
|
def get_allowed_dirs(username):
|
|
if config.allow_guests:
|
|
return [x for x in storage.dirs() if storage.dirs()[x].enabled]
|
|
if username:
|
|
user = storage.users()[username]
|
|
return [x for x in storage.dirs() if storage.dirs()[x].enabled and x in user.readable_directories]
|
|
return list()
|
|
|
|
|
|
@app.route("/search", methods=['POST'])
|
|
def search_route():
|
|
query = request.json["q"]
|
|
query = "" if query is None else query
|
|
|
|
size_min = request.json["size_min"]
|
|
size_max = request.json["size_max"]
|
|
mime_types = request.json["mime_types"]
|
|
must_match = request.json["must_match"]
|
|
directories = request.json["directories"]
|
|
|
|
# Remove disabled & non-existing directories
|
|
allowed_dirs = get_allowed_dirs(session["username"] if "username" in session else None)
|
|
directories = [x for x in directories if x in allowed_dirs]
|
|
|
|
path = request.json["path"]
|
|
|
|
page = search.search(query, size_min, size_max, mime_types, must_match, directories, path)
|
|
|
|
return json.dumps(page)
|
|
|
|
|
|
@app.route("/scroll")
|
|
def scroll_route():
|
|
scroll_id = request.args.get("scroll_id")
|
|
|
|
page = search.scroll(scroll_id)
|
|
|
|
return json.dumps(page)
|
|
|
|
|
|
@app.route("/directory")
|
|
def dir_list():
|
|
if "admin" in session and session["admin"]:
|
|
return render_template("directory.html", directories=storage.dirs())
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/directory/add")
|
|
def directory_add():
|
|
if "admin" in session and session["admin"]:
|
|
path = request.args.get("path")
|
|
name = request.args.get("name")
|
|
|
|
if path is not None and name is not None:
|
|
d = Directory(path, True, [], name)
|
|
|
|
try:
|
|
d.set_default_options()
|
|
storage.save_directory(d)
|
|
flash("<strong>Directory created</strong>", "success")
|
|
except DuplicateDirectoryException:
|
|
flash("<strong>The directory couldn't be created</strong> Make sure to chose a unique name",
|
|
"danger")
|
|
|
|
return redirect("/directory")
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/directory/<int:dir_id>")
|
|
def directory_manage(dir_id):
|
|
if "admin" in session and session["admin"]:
|
|
directory = storage.dirs()[dir_id]
|
|
tn_size = get_dir_size("static/thumbnails/" + str(dir_id))
|
|
tn_size_formatted = humanfriendly.format_size(tn_size)
|
|
|
|
return render_template("directory_manage.html", directory=directory, tn_size=tn_size,
|
|
tn_size_formatted=tn_size_formatted)
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/directory/<int:dir_id>/update")
|
|
def directory_update(dir_id):
|
|
if "admin" in session and session["admin"]:
|
|
directory = storage.dirs()[dir_id]
|
|
|
|
name = request.args.get("name")
|
|
name = directory.name if name is None else name
|
|
|
|
enabled = request.args.get("enabled")
|
|
enabled = directory.enabled if enabled is None else int(enabled)
|
|
|
|
path = request.args.get("path")
|
|
path = directory.path if path is None else path
|
|
|
|
# Only name and enabled status can be updated
|
|
updated_dir = Directory(path, enabled, directory.options, name)
|
|
updated_dir.id = dir_id
|
|
|
|
try:
|
|
storage.update_directory(updated_dir)
|
|
flash("<strong>Updated directory</strong>", "success")
|
|
|
|
except DuplicateDirectoryException:
|
|
flash("<strong>The directory couldn't be updated</strong> Make the that the path is unique",
|
|
"danger")
|
|
|
|
return redirect("/directory/" + str(dir_id))
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/directory/<int:dir_id>/update_opt")
|
|
def directory_update_opt(dir_id):
|
|
if "admin" in session and session["admin"]:
|
|
opt_id = request.args.get("id")
|
|
opt_key = request.args.get("key")
|
|
opt_value = request.args.get("value")
|
|
|
|
storage.update_option(Option(opt_key, opt_value, dir_id, opt_id))
|
|
|
|
return redirect("/directory/" + str(dir_id))
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/directory/<int:dir_id>/del")
|
|
def directory_del(dir_id):
|
|
if "admin" in session and session["admin"]:
|
|
search.delete_directory(dir_id)
|
|
if os.path.exists("static/thumbnails/" + str(dir_id)):
|
|
shutil.rmtree("static/thumbnails/" + str(dir_id))
|
|
|
|
storage.remove_directory(dir_id)
|
|
flash("<strong>Deleted folder</strong>", "success")
|
|
|
|
return redirect("/directory")
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/directory/<int:dir_id>/reset")
|
|
def directory_reset(dir_id):
|
|
if "admin" in session and session["admin"]:
|
|
directory = storage.dirs()[dir_id]
|
|
|
|
for opt in directory.options:
|
|
storage.del_option(opt.id)
|
|
|
|
directory.set_default_options()
|
|
|
|
for opt in directory.options:
|
|
opt.dir_id = dir_id
|
|
storage.save_option(opt)
|
|
|
|
storage.dir_cache_outdated = True
|
|
|
|
search.delete_directory(dir_id)
|
|
|
|
flash("<strong>Reset directory options</strong>", "success")
|
|
return redirect("directory/" + str(dir_id))
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/task")
|
|
def task():
|
|
if "admin" in session and session["admin"]:
|
|
return render_template("task.html", tasks=storage.tasks(), directories=storage.dirs(),
|
|
task_list=json.dumps(list(storage.tasks().keys())))
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/task/current")
|
|
def get_current_task():
|
|
if "admin" in session and session["admin"]:
|
|
|
|
if tm and tm.current_task:
|
|
return tm.current_task.to_json()
|
|
return ""
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/task/add")
|
|
def task_add():
|
|
if "admin" in session and session["admin"]:
|
|
task_type = request.args.get("type")
|
|
directory = request.args.get("directory")
|
|
|
|
if task_type not in ("1", "2"):
|
|
flash("Please choose a task type", "danger")
|
|
return redirect("/task")
|
|
|
|
if directory.isdigit() and int(directory) in storage.dirs():
|
|
storage.save_task(Task(task_type, directory))
|
|
else:
|
|
flash("You must choose a directory", "danger")
|
|
|
|
return redirect("/task")
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/task/<int:task_id>/del")
|
|
def task_del(task_id):
|
|
if "admin" in session and session["admin"]:
|
|
storage.del_task(task_id)
|
|
|
|
if tm.current_task is not None and task_id == tm.current_task.task.id:
|
|
tm.cancel_task()
|
|
|
|
return redirect("/task")
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/reset_es")
|
|
def reset_es():
|
|
if "admin" in session and session["admin"]:
|
|
flash("Elasticsearch index has been reset. Modifications made in <b>config.py</b> have been applied.",
|
|
"success")
|
|
|
|
tm.indexer.init()
|
|
if os.path.exists("static/thumbnails"):
|
|
shutil.rmtree("static/thumbnails")
|
|
|
|
return redirect("/dashboard")
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/dashboard")
|
|
def dashboard():
|
|
if "admin" in session and session["admin"]:
|
|
tn_sizes = {}
|
|
tn_size_total = 0
|
|
for directory in storage.dirs():
|
|
tn_size = get_dir_size("static/thumbnails/" + str(directory))
|
|
tn_size_formatted = humanfriendly.format_size(tn_size)
|
|
|
|
tn_sizes[directory] = tn_size_formatted
|
|
tn_size_total += tn_size
|
|
|
|
tn_size_total_formatted = humanfriendly.format_size(tn_size_total)
|
|
|
|
return render_template("dashboard.html", version=config.VERSION, tn_sizes=tn_sizes,
|
|
tn_size_total=tn_size_total_formatted,
|
|
doc_size=humanfriendly.format_size(search.get_doc_size()),
|
|
doc_count=search.get_doc_count(),
|
|
db_path=config.db_path,
|
|
elasticsearch_url=config.elasticsearch_url,
|
|
index_size=humanfriendly.format_size(search.get_index_size()))
|
|
|
|
flash("You are not authorized to access this page", "warning")
|
|
return redirect("/")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app.run("0.0.0.0", 8080, threaded=True)
|