import json import logging import os import shutil from io import BytesIO import bcrypt import humanfriendly from PIL import Image from flask import Flask, render_template, request, redirect, flash, session, abort, send_file import config from crawler import TaskManager from search import Search from storage import Directory, Option, Task, User from storage import LocalStorage, DuplicateDirectoryException, DuplicateUserException from thumbnail import ThumbnailGenerator app = Flask(__name__) app.secret_key = "A very secret key" storage = LocalStorage(config.db_path) # Disable flask logging flaskLogger = logging.getLogger('werkzeug') flaskLogger.setLevel(logging.ERROR) tm = TaskManager(storage) search = Search(config.elasticsearch_index) def get_dir_size(path): size = 0 for root, dirs, files in os.walk(path): for filename in files: full_path = os.path.join(root, filename) size += os.path.getsize(full_path) return size @app.route("/user/") def user_manage(user): if "admin" in session and session["admin"]: return render_template("user_manage.html", directories=storage.dirs(), user=storage.users()[user]) flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/logout") def logout(): session.pop("username") session.pop("admin") flash("Successfully logged out", "success") return redirect("/") @app.route("/login", methods=['POST']) def login(): username = request.form["username"] password = request.form["password"] if storage.auth_user(username, password): session["username"] = username session["admin"] = storage.users()[username].admin flash("Successfully logged in", "success") else: flash("Invalid username or password", "danger") return redirect("/") @app.route("/user") def user_page(): admin_account_present = False for user in storage.users(): if storage.users()[user].admin: admin_account_present = True break if not admin_account_present or ("admin" in session and session["admin"]): return render_template("user.html", users=storage.users(), admin_account_present=admin_account_present) flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/user//set_access") def user_set_access(username): if "admin" in session and session["admin"]: dir_id = request.args["dir_id"] user = storage.users()[username] if request.args["access"] == "1": user.readable_directories.add(int(dir_id)) else: if int(dir_id) in user.readable_directories: user.readable_directories.remove(int(dir_id)) storage.update_user(user) flash("Permissions mises à jour", "success") return redirect("/user/" + username) flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/user//set_admin") def user_set_admin(username): if "admin" in session and session["admin"]: user = storage.users()[username] if user.username == session["username"]: flash("You cannot modifiy your own account", "warning") else: user.admin = request.args["admin"] == "1" storage.update_user(user) flash("Permissions updated", "success") return redirect("/user/" + username) @app.route("/user/add", methods=['POST']) def user_add(): admin_account_present = False for user in storage.users(): if storage.users()[user].admin: admin_account_present = True break if not admin_account_present or ("admin" in session and session["admin"]): username = request.form["username"] password = bcrypt.hashpw(request.form["password"].encode("utf-8"), bcrypt.gensalt(config.bcrypt_rounds)) is_admin = True if "is_admin" in request.form else False try: storage.save_user(User(username, password, is_admin)) flash("Created new user", "success") except DuplicateUserException: flash("Couldn't create user " "Make sure that the username is unique", "danger") return redirect("/user") flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/user//del") def user_del(username): if "admin" in session and session["admin"]: if session["username"] == username: flash("You cannot delete your own account", "warning") return redirect("/user/" + username) storage.remove_user(username) flash("User deleted", "success") return redirect("/user") flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/suggest") def suggest(): return json.dumps(search.suggest(request.args.get("prefix"))) @app.route("/document/") def document(doc_id): doc = search.get_doc(doc_id)["_source"] directory = storage.dirs()[doc["directory"]] del doc["directory"] return render_template("document.html", doc=doc, directory=directory, doc_id=doc_id) @app.route("/dl/") def serve_file(doc_id): doc = search.get_doc(doc_id)["_source"] directory = storage.dirs()[doc["directory"]] extension = "" if doc["extension"] is None or doc["extension"] == "" else "." + doc["extension"] full_path = os.path.join(directory.path, doc["path"], doc["name"] + extension) if not os.path.exists(full_path): return abort(404) return send_file(full_path, mimetype=doc["mime"], as_attachment=True, attachment_filename=doc["name"] + extension) @app.route("/file/") def download(doc_id): doc = search.get_doc(doc_id)["_source"] directory = storage.dirs()[doc["directory"]] extension = "" if doc["extension"] is None or doc["extension"] == "" else "." + doc["extension"] full_path = os.path.join(directory.path, doc["path"], doc["name"] + extension) if not os.path.exists(full_path): return abort(404) return send_file(full_path, mimetype=doc["mime"], conditional=True) @app.route("/thumb/") def thumb(doc_id): doc = search.get_doc(doc_id) if doc is not None: dest_path = "static/thumbnails/" + str(doc["_source"]["directory"]) os.makedirs(dest_path, exist_ok=True) tn_path = os.path.join(dest_path, doc_id) if os.path.isfile(tn_path): return send_file(tn_path) directory = storage.dirs()[doc["_source"]["directory"]] extension = "" if doc["_source"]["extension"] is None or doc["_source"]["extension"] == "" else \ "." + doc["_source"]["extension"] full_path = os.path.join(directory.path, doc["_source"]["path"], doc["_source"]["name"] + extension) tn_generator = ThumbnailGenerator(int(directory.get_option("ThumbnailSize")), int(directory.get_option("ThumbnailQuality")), directory.get_option("ThumbnailColor")) tn_generator.generate(full_path, tn_path, doc["_source"]["mime"]) if os.path.isfile(tn_path): return send_file(tn_path) default_thumbnail = BytesIO() Image.new("RGB", (255, 128), (0, 0, 0)).save(default_thumbnail, "JPEG") default_thumbnail.seek(0) return send_file(default_thumbnail, "image/jpeg") @app.route("/") def search_page(): mime_map = search.get_mime_map() mime_map.append({"id": "any", "text": "All"}) directories = [storage.dirs()[x] for x in get_allowed_dirs(session["username"] if "username" in session else None)] return render_template("search.html", directories=directories, mime_map=mime_map) @app.route("/list") def search_liste_page(): return render_template("searchList.html") def get_allowed_dirs(username): if config.allow_guests: return [x for x in storage.dirs() if storage.dirs()[x].enabled] if username: user = storage.users()[username] return [x for x in storage.dirs() if storage.dirs()[x].enabled and x in user.readable_directories] return list() @app.route("/search", methods=['POST']) def search_route(): query = request.json["q"] query = "" if query is None else query size_min = request.json["size_min"] size_max = request.json["size_max"] mime_types = request.json["mime_types"] must_match = request.json["must_match"] directories = request.json["directories"] # Remove disabled & non-existing directories allowed_dirs = get_allowed_dirs(session["username"] if "username" in session else None) directories = [x for x in directories if x in allowed_dirs] path = request.json["path"] page = search.search(query, size_min, size_max, mime_types, must_match, directories, path) return json.dumps(page) @app.route("/scroll") def scroll_route(): scroll_id = request.args.get("scroll_id") page = search.scroll(scroll_id) return json.dumps(page) @app.route("/directory") def dir_list(): if "admin" in session and session["admin"]: return render_template("directory.html", directories=storage.dirs()) flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/directory/add") def directory_add(): if "admin" in session and session["admin"]: path = request.args.get("path") name = request.args.get("name") if path is not None and name is not None: d = Directory(path, True, [], name) try: d.set_default_options() storage.save_directory(d) flash("Directory created", "success") except DuplicateDirectoryException: flash("The directory couldn't be created Make sure to chose a unique name", "danger") return redirect("/directory") flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/directory/") def directory_manage(dir_id): if "admin" in session and session["admin"]: directory = storage.dirs()[dir_id] tn_size = get_dir_size("static/thumbnails/" + str(dir_id)) tn_size_formatted = humanfriendly.format_size(tn_size) return render_template("directory_manage.html", directory=directory, tn_size=tn_size, tn_size_formatted=tn_size_formatted) flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/directory//update") def directory_update(dir_id): if "admin" in session and session["admin"]: directory = storage.dirs()[dir_id] name = request.args.get("name") name = directory.name if name is None else name enabled = request.args.get("enabled") enabled = directory.enabled if enabled is None else int(enabled) path = request.args.get("path") path = directory.path if path is None else path # Only name and enabled status can be updated updated_dir = Directory(path, enabled, directory.options, name) updated_dir.id = dir_id try: storage.update_directory(updated_dir) flash("Updated directory", "success") except DuplicateDirectoryException: flash("The directory couldn't be updated Make the that the path is unique", "danger") return redirect("/directory/" + str(dir_id)) flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/directory//update_opt") def directory_update_opt(dir_id): if "admin" in session and session["admin"]: opt_id = request.args.get("id") opt_key = request.args.get("key") opt_value = request.args.get("value") storage.update_option(Option(opt_key, opt_value, dir_id, opt_id)) return redirect("/directory/" + str(dir_id)) flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/directory//del") def directory_del(dir_id): if "admin" in session and session["admin"]: search.delete_directory(dir_id) if os.path.exists("static/thumbnails/" + str(dir_id)): shutil.rmtree("static/thumbnails/" + str(dir_id)) storage.remove_directory(dir_id) flash("Deleted folder", "success") return redirect("/directory") flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/directory//reset") def directory_reset(dir_id): if "admin" in session and session["admin"]: directory = storage.dirs()[dir_id] for opt in directory.options: storage.del_option(opt.id) directory.set_default_options() for opt in directory.options: opt.dir_id = dir_id storage.save_option(opt) storage.dir_cache_outdated = True search.delete_directory(dir_id) flash("Reset directory options", "success") return redirect("directory/" + str(dir_id)) flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/task") def task(): if "admin" in session and session["admin"]: return render_template("task.html", tasks=storage.tasks(), directories=storage.dirs(), task_list=json.dumps(list(storage.tasks().keys()))) flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/task/current") def get_current_task(): if "admin" in session and session["admin"]: if tm and tm.current_task: return tm.current_task.to_json() return "" flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/task/add") def task_add(): if "admin" in session and session["admin"]: task_type = request.args.get("type") directory = request.args.get("directory") if task_type not in ("1", "2"): flash("Please choose a task type", "danger") return redirect("/task") if directory.isdigit() and int(directory) in storage.dirs(): storage.save_task(Task(task_type, directory)) else: flash("You must choose a directory", "danger") return redirect("/task") flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/task//del") def task_del(task_id): if "admin" in session and session["admin"]: storage.del_task(task_id) if tm.current_task is not None and task_id == tm.current_task.task.id: tm.cancel_task() return redirect("/task") flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/reset_es") def reset_es(): if "admin" in session and session["admin"]: flash("Elasticsearch index has been reset. Modifications made in config.py have been applied.", "success") tm.indexer.init() if os.path.exists("static/thumbnails"): shutil.rmtree("static/thumbnails") return redirect("/dashboard") flash("You are not authorized to access this page", "warning") return redirect("/") @app.route("/dashboard") def dashboard(): if "admin" in session and session["admin"]: tn_sizes = {} tn_size_total = 0 for directory in storage.dirs(): tn_size = get_dir_size("static/thumbnails/" + str(directory)) tn_size_formatted = humanfriendly.format_size(tn_size) tn_sizes[directory] = tn_size_formatted tn_size_total += tn_size tn_size_total_formatted = humanfriendly.format_size(tn_size_total) return render_template("dashboard.html", version=config.VERSION, tn_sizes=tn_sizes, tn_size_total=tn_size_total_formatted, doc_size=humanfriendly.format_size(search.get_doc_size()), doc_count=search.get_doc_count(), db_path=config.db_path, elasticsearch_url=config.elasticsearch_url, index_size=humanfriendly.format_size(search.get_index_size())) flash("You are not authorized to access this page", "warning") return redirect("/") if __name__ == "__main__": app.run("0.0.0.0", 8080, threaded=True)