from flask import Flask, render_template, request, redirect, flash, session, abort, send_file from storage import Directory, Option, Task, User from storage import LocalStorage, DuplicateDirectoryException, DuplicateUserException from crawler import RunningTask, TaskManager import json import os import shutil import bcrypt import config import humanfriendly from search import Search from PIL import Image from io import BytesIO app = Flask(__name__) app.secret_key = "A very secret key" storage = LocalStorage(config.db_path) tm = TaskManager(storage) search = Search("changeme") def get_dir_size(path): size = 0 for root, dirs, files in os.walk(path): for filename in files: full_path = os.path.join(root, filename) size += os.path.getsize(full_path) return size @app.route("/user/") def user_manage(user): return user @app.route("/logout") def logout(): session.pop("username") session.pop("admin") flash("Successfully logged out", "success") return redirect("/") @app.route("/login", methods=['POST']) def login(): username = request.form["username"] password = request.form["password"] if storage.auth_user(username, password): session["username"] = username session["admin"] = storage.users()[username].admin print(session["admin"]) flash("Successfully logged in", "success") else: flash("Invalid username or password", "danger") return redirect("/") @app.route("/user") def user_page(): return render_template("user.html", users=storage.users()) @app.route("/user/add", methods=['POST']) def user_add(): username = request.form["username"] password = bcrypt.hashpw(request.form["password"].encode("utf-8"), bcrypt.gensalt(config.bcrypt_rounds)) is_admin = True if "is_admin" in request.form else False try: storage.save_user(User(username, password, is_admin)) flash("Created new user", "success") except DuplicateUserException: flash("Couldn't create user Make sure that the username is unique", "danger") return redirect("/user") @app.route("/suggest") def suggest(): return json.dumps(search.suggest(request.args.get("prefix"))) @app.route("/document/") def document(doc_id): doc = search.get_doc(doc_id)["_source"] directory = storage.dirs()[doc["directory"]] del doc["directory"] return render_template("document.html", doc=doc, directory=directory, doc_id=doc_id) @app.route("/dl/") def file(doc_id): doc = search.get_doc(doc_id)["_source"] directory = storage.dirs()[doc["directory"]] extension = "" if doc["extension"] is None or doc["extension"] == "" else "." + doc["extension"] full_path = os.path.join(directory.path, doc["path"], doc["name"] + extension) if not os.path.exists(full_path): return abort(404) return send_file(full_path, mimetype=doc["mime"], as_attachment=True, attachment_filename=doc["name"] + extension) @app.route("/file/") def download(doc_id): doc = search.get_doc(doc_id)["_source"] directory = storage.dirs()[doc["directory"]] extension = "" if doc["extension"] is None or doc["extension"] == "" else "." + doc["extension"] full_path = os.path.join(directory.path, doc["path"], doc["name"] + extension) if not os.path.exists(full_path): return abort(404) return send_file(full_path, mimetype=doc["mime"], conditional=True) @app.route("/thumb/") def thumb(doc_id): doc = search.get_doc(doc_id) if doc is not None: tn_path = os.path.join("static/thumbnails/", str(doc["_source"]["directory"]), doc_id) print(tn_path) if os.path.isfile(tn_path): return send_file(tn_path) else: print("tn not found") default_thumbnail = BytesIO() Image.new("RGB", (255, 128), (0, 0, 0)).save(default_thumbnail, "JPEG") default_thumbnail.seek(0) return send_file(default_thumbnail, "image/jpeg") else: print("doc is none") default_thumbnail = BytesIO() Image.new("RGB", (255, 128), (0, 0, 0)).save(default_thumbnail, "JPEG") default_thumbnail.seek(0) return send_file(default_thumbnail, "image/jpeg") @app.route("/") def search_page(): mime_map = search.get_mime_map() mime_map.append({"id": "any", "text": "Any"}) return render_template("search.html", directories=storage.dirs(), mime_map=mime_map) @app.route("/list") def search_liste_page(): return render_template("searchList.html") @app.route("/search", methods=['POST']) def search_route(): query = request.json["q"] query = "" if query is None else query size_min = request.json["size_min"] size_max = request.json["size_max"] mime_types = request.json["mime_types"] must_match = request.json["must_match"] directories = request.json["directories"] # Remove disabled & non-existing directories for search_directory in directories: directory_exists = False for dir_id in storage.dirs(): if search_directory == dir_id: directory_exists = True if not storage.dirs()[dir_id].enabled: directories.remove(search_directory) break if not directory_exists: directories.remove(search_directory) path = request.json["path"] page = search.search(query, size_min, size_max, mime_types, must_match, directories, path) return json.dumps(page) @app.route("/scroll") def scroll_route(): scroll_id = request.args.get("scroll_id") page = search.scroll(scroll_id) return json.dumps(page) @app.route("/directory") def dir_list(): return render_template("directory.html", directories=storage.dirs()) @app.route("/directory/add") def directory_add(): path = request.args.get("path") name = request.args.get("name") if path is not None and name is not None: d = Directory(path, True, [], name) try: d.set_default_options() storage.save_directory(d) flash("Created directory", "success") except DuplicateDirectoryException: flash("Couldn't create directory Make sure that the path is unique", "danger") return redirect("/directory") @app.route("/directory/") def directory_manage(dir_id): directory = storage.dirs()[dir_id] tn_size = get_dir_size("static/thumbnails/" + str(dir_id)) tn_size_formatted = humanfriendly.format_size(tn_size) return render_template("directory_manage.html", directory=directory, tn_size=tn_size, tn_size_formatted=tn_size_formatted) @app.route("/directory//update") def directory_update(dir_id): directory = storage.dirs()[dir_id] name = request.args.get("name") name = directory.name if name is None else name enabled = request.args.get("enabled") enabled = directory.enabled if enabled is None else int(enabled) path = request.args.get("path") path = directory.path if path is None else path # Only name and enabled status can be updated updated_dir = Directory(path, enabled, directory.options, name) updated_dir.id = dir_id try: storage.update_directory(updated_dir) flash("Updated directory", "success") except DuplicateDirectoryException: flash("Couldn't update directory Make sure that the path is unique", "danger") return redirect("/directory/" + str(dir_id)) @app.route("/directory//update_opt") def directory_update_opt(dir_id): opt_id = request.args.get("id") opt_key = request.args.get("key") opt_value = request.args.get("value") storage.update_option(Option(opt_key, opt_value, dir_id, opt_id)) return redirect("/directory/" + str(dir_id)) @app.route("/directory//del") def directory_del(dir_id): search.delete_directory(dir_id) if os.path.exists("static/thumbnails/" + str(dir_id)): shutil.rmtree("static/thumbnails/" + str(dir_id)) storage.remove_directory(dir_id) flash("Deleted directory", "success") return redirect("/directory") @app.route("/directory//reset") def directory_reset(dir_id): directory = storage.dirs()[dir_id] for opt in directory.options: storage.del_option(opt.id) directory.set_default_options() for opt in directory.options: opt.dir_id = dir_id storage.save_option(opt) storage.dir_cache_outdated = True search.delete_directory(dir_id) flash("Reset directory options to default settings", "success") return redirect("directory/" + str(dir_id)) @app.route("/task") def task(): return render_template("task.html", tasks=storage.tasks(), directories=storage.dirs(), task_list=json.dumps(list(storage.tasks().keys()))) # return render_template("task.html", tasks=storage.tasks(), directories=storage.dirs()) @app.route("/task/current") def get_current_task(): if tm and tm.current_task: return tm.current_task.to_json() else: return "" @app.route("/task/add") def task_add(): type = request.args.get("type") directory = request.args.get("directory") storage.save_task(Task(type, directory)) return redirect("/task") @app.route("/task//del") def task_del(task_id): storage.del_task(task_id) if tm.current_task is not None and task_id == tm.current_task.task.id: tm.cancel_task() return redirect("/task") @app.route("/reset_es") def reset_es(): flash("Elasticsearch index has been reset. Modifications made in config.py have been applied.", "success") tm.indexer.init() if os.path.exists("static/thumbnails"): shutil.rmtree("static/thumbnails") return redirect("/dashboard") @app.route("/dashboard") def dashboard(): tn_sizes = {} tn_size_total = 0 for directory in storage.dirs(): tn_size = get_dir_size("static/thumbnails/" + str(directory)) tn_size_formatted = humanfriendly.format_size(tn_size) tn_sizes[directory] = tn_size_formatted tn_size_total += tn_size tn_size_total_formatted = humanfriendly.format_size(tn_size_total) return render_template("dashboard.html", version=config.VERSION, tn_sizes=tn_sizes, tn_size_total=tn_size_total_formatted, doc_size=humanfriendly.format_size(search.get_doc_size()), doc_count=search.get_doc_count(), db_path=config.db_path, elasticsearch_url=config.elasticsearch_url, index_size=humanfriendly.format_size(search.get_index_size())) if __name__ == "__main__": app.run("0.0.0.0", 8080, threaded=True)