from flask import Flask, render_template, request, redirect, flash, session, abort, send_file from storage import Directory, Option, Task, User from storage import LocalStorage, DuplicateDirectoryException, DuplicateUserException from crawler import RunningTask, TaskManager import json import os import shutil import bcrypt import config import humanfriendly from search import Search from PIL import Image from io import BytesIO app = Flask(__name__) app.secret_key = "A very secret key" storage = LocalStorage(config.db_path) tm = TaskManager(storage) search = Search("changeme") def get_dir_size(path): size = 0 for root, dirs, files in os.walk(path): for filename in files: full_path = os.path.join(root, filename) size += os.path.getsize(full_path) return size @app.route("/user/") def user_manage(user): if "admin" in session and session["admin"]: return render_template("user_manage.html", directories=storage.dirs(), user=storage.users()[user]) else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/logout") def logout(): session.pop("username") session.pop("admin") flash("Déconnection réussie", "success") return redirect("/") @app.route("/login", methods=['POST']) def login(): username = request.form["username"] password = request.form["password"] if storage.auth_user(username, password): session["username"] = username session["admin"] = storage.users()[username].admin flash("Connexion réussie", "success") else: flash("Nom d'utilisateur ou mot de passe invalide", "danger") return redirect("/") @app.route("/user") def user_page(): admin_account_present = False for user in storage.users(): if storage.users()[user].admin: admin_account_present = True break if not admin_account_present or ("admin" in session and session["admin"]): return render_template("user.html", users=storage.users(), admin_account_present=admin_account_present) else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/user//set_access") def user_set_access(username): if "admin" in session and session["admin"]: dir_id = request.args["dir_id"] user = storage.users()[username] if request.args["access"] == "1": user.readable_directories.add(int(dir_id)) else: if int(dir_id) in user.readable_directories: user.readable_directories.remove(int(dir_id)) storage.update_user(user) flash("Permissions mises à jour", "success") return redirect("/user/" + username) else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/user//set_admin") def user_set_admin(username): if "admin" in session and session["admin"]: user = storage.users()[username] if user.username == session["username"]: flash("Vous n'êtes pas autorisé à changer votre propre type de compte", "warning") else: user.admin = request.args["admin"] == "1" storage.update_user(user) flash("Permissions mises à jour", "success") return redirect("/user/" + username) @app.route("/user/add", methods=['POST']) def user_add(): admin_account_present = False for user in storage.users(): if storage.users()[user].admin: admin_account_present = True break if not admin_account_present or ("admin" in session and session["admin"]): username = request.form["username"] password = bcrypt.hashpw(request.form["password"].encode("utf-8"), bcrypt.gensalt(config.bcrypt_rounds)) is_admin = True if "is_admin" in request.form else False try: storage.save_user(User(username, password, is_admin)) flash("Nouvel utilisateur créé", "success") except DuplicateUserException: flash("L'utilisateur n'as pas pu être créé Assurez vous que le nom d'utilisateur est unique", "danger") return redirect("/user") else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/user//del") def user_del(username): if "admin" in session and session["admin"]: if session["username"] == username: flash("Vous ne pouvez pas supprimer votre propre compte", "warning") return redirect("/user/" + username) else: storage.remove_user(username) flash("Utilisateur supprimé", "success") return redirect("/user") else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/suggest") def suggest(): return json.dumps(search.suggest(request.args.get("prefix"))) @app.route("/document/") def document(doc_id): doc = search.get_doc(doc_id)["_source"] directory = storage.dirs()[doc["directory"]] del doc["directory"] return render_template("document.html", doc=doc, directory=directory, doc_id=doc_id) @app.route("/dl/") def file(doc_id): doc = search.get_doc(doc_id)["_source"] directory = storage.dirs()[doc["directory"]] extension = "" if doc["extension"] is None or doc["extension"] == "" else "." + doc["extension"] full_path = os.path.join(directory.path, doc["path"], doc["name"] + extension) if not os.path.exists(full_path): return abort(404) return send_file(full_path, mimetype=doc["mime"], as_attachment=True, attachment_filename=doc["name"] + extension) @app.route("/file/") def download(doc_id): doc = search.get_doc(doc_id)["_source"] directory = storage.dirs()[doc["directory"]] extension = "" if doc["extension"] is None or doc["extension"] == "" else "." + doc["extension"] full_path = os.path.join(directory.path, doc["path"], doc["name"] + extension) if not os.path.exists(full_path): return abort(404) return send_file(full_path, mimetype=doc["mime"], conditional=True) @app.route("/thumb/") def thumb(doc_id): doc = search.get_doc(doc_id) if doc is not None: tn_path = os.path.join("static/thumbnails/", str(doc["_source"]["directory"]), doc_id) print(tn_path) if os.path.isfile(tn_path): return send_file(tn_path) else: default_thumbnail = BytesIO() Image.new("RGB", (255, 128), (0, 0, 0)).save(default_thumbnail, "JPEG") default_thumbnail.seek(0) return send_file(default_thumbnail, "image/jpeg") else: default_thumbnail = BytesIO() Image.new("RGB", (255, 128), (0, 0, 0)).save(default_thumbnail, "JPEG") default_thumbnail.seek(0) return send_file(default_thumbnail, "image/jpeg") @app.route("/") def search_page(): mime_map = search.get_mime_map() mime_map.append({"id": "any", "text": "Tous"}) directories = [storage.dirs()[x] for x in get_allowed_dirs(session["username"] if "username" in session else None)] return render_template("search.html", directories=directories, mime_map=mime_map) @app.route("/list") def search_liste_page(): return render_template("searchList.html") def get_allowed_dirs(username): if config.allow_guests: return [x for x in storage.dirs() if x.enabled] else: if username: user = storage.users()[username] return [x for x in storage.dirs() if storage.dirs()[x].enabled and x in user.readable_directories] else: return list() @app.route("/search", methods=['POST']) def search_route(): query = request.json["q"] query = "" if query is None else query size_min = request.json["size_min"] size_max = request.json["size_max"] mime_types = request.json["mime_types"] must_match = request.json["must_match"] directories = request.json["directories"] # Remove disabled & non-existing directories allowed_dirs = get_allowed_dirs(session["username"] if "username" in session else None) directories = [x for x in directories if x in allowed_dirs] path = request.json["path"] page = search.search(query, size_min, size_max, mime_types, must_match, directories, path) return json.dumps(page) @app.route("/scroll") def scroll_route(): scroll_id = request.args.get("scroll_id") page = search.scroll(scroll_id) return json.dumps(page) @app.route("/directory") def dir_list(): if "admin" in session and session["admin"]: return render_template("directory.html", directories=storage.dirs()) else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/directory/add") def directory_add(): if "admin" in session and session["admin"]: path = request.args.get("path") name = request.args.get("name") if path is not None and name is not None: d = Directory(path, True, [], name) try: d.set_default_options() storage.save_directory(d) flash("Dossier créé", "success") except DuplicateDirectoryException: flash("Le dossier n'a pas pu être créé Assurer vous de choisir un nom unique", "danger") return redirect("/directory") else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/directory/") def directory_manage(dir_id): if "admin" in session and session["admin"]: directory = storage.dirs()[dir_id] tn_size = get_dir_size("static/thumbnails/" + str(dir_id)) tn_size_formatted = humanfriendly.format_size(tn_size) return render_template("directory_manage.html", directory=directory, tn_size=tn_size, tn_size_formatted=tn_size_formatted) else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/directory//update") def directory_update(dir_id): if "admin" in session and session["admin"]: directory = storage.dirs()[dir_id] name = request.args.get("name") name = directory.name if name is None else name enabled = request.args.get("enabled") enabled = directory.enabled if enabled is None else int(enabled) path = request.args.get("path") path = directory.path if path is None else path # Only name and enabled status can be updated updated_dir = Directory(path, enabled, directory.options, name) updated_dir.id = dir_id try: storage.update_directory(updated_dir) flash("Dossier mis à jour", "success") except DuplicateDirectoryException: flash("Le dossier n'a pas pu être mis à jour Assurez vous que le chemin est unique", "danger") return redirect("/directory/" + str(dir_id)) else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/directory//update_opt") def directory_update_opt(dir_id): if "admin" in session and session["admin"]: opt_id = request.args.get("id") opt_key = request.args.get("key") opt_value = request.args.get("value") storage.update_option(Option(opt_key, opt_value, dir_id, opt_id)) return redirect("/directory/" + str(dir_id)) else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/directory//del") def directory_del(dir_id): if "admin" in session and session["admin"]: search.delete_directory(dir_id) if os.path.exists("static/thumbnails/" + str(dir_id)): shutil.rmtree("static/thumbnails/" + str(dir_id)) storage.remove_directory(dir_id) flash("Dossier supprimé", "success") return redirect("/directory") else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/directory//reset") def directory_reset(dir_id): if "admin" in session and session["admin"]: directory = storage.dirs()[dir_id] for opt in directory.options: storage.del_option(opt.id) directory.set_default_options() for opt in directory.options: opt.dir_id = dir_id storage.save_option(opt) storage.dir_cache_outdated = True search.delete_directory(dir_id) flash("Options du dossier réinitialisés", "success") return redirect("directory/" + str(dir_id)) else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/task") def task(): if "admin" in session and session["admin"]: return render_template("task.html", tasks=storage.tasks(), directories=storage.dirs(), task_list=json.dumps(list(storage.tasks().keys()))) else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/task/current") def get_current_task(): if "admin" in session and session["admin"]: if tm and tm.current_task: return tm.current_task.to_json() else: return "" else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/task/add") def task_add(): if "admin" in session and session["admin"]: task_type = request.args.get("type") directory = request.args.get("directory") if task_type not in ("1", "2"): flash("Vous devez choisir un type de tâche", "danger") return redirect("/task") if directory.isdigit() and int(directory) in storage.dirs(): storage.save_task(Task(task_type, directory)) else: flash("Vous devez choisir un dossier", "danger") return redirect("/task") else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/task//del") def task_del(task_id): if "admin" in session and session["admin"]: storage.del_task(task_id) if tm.current_task is not None and task_id == tm.current_task.task.id: tm.cancel_task() return redirect("/task") else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/reset_es") def reset_es(): if "admin" in session and session["admin"]: flash("Elasticsearch a été réinitialisé, les changements dans config.py ont été appliqués", "success") tm.indexer.init() if os.path.exists("static/thumbnails"): shutil.rmtree("static/thumbnails") return redirect("/dashboard") else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") @app.route("/dashboard") def dashboard(): if "admin" in session and session["admin"]: tn_sizes = {} tn_size_total = 0 for directory in storage.dirs(): tn_size = get_dir_size("static/thumbnails/" + str(directory)) tn_size_formatted = humanfriendly.format_size(tn_size) tn_sizes[directory] = tn_size_formatted tn_size_total += tn_size tn_size_total_formatted = humanfriendly.format_size(tn_size_total) return render_template("dashboard.html", version=config.VERSION, tn_sizes=tn_sizes, tn_size_total=tn_size_total_formatted, doc_size=humanfriendly.format_size(search.get_doc_size()), doc_count=search.get_doc_count(), db_path=config.db_path, elasticsearch_url=config.elasticsearch_url, index_size=humanfriendly.format_size(search.get_index_size())) else: flash("Vous n'êtes pas autorisé à accéder à cette page", "warning") return redirect("/") if __name__ == "__main__": app.run("0.0.0.0", 8080, threaded=True)