From d259b950179714fe7f98d3a2b44459f5c5d06895 Mon Sep 17 00:00:00 2001 From: simon987 Date: Sun, 26 Feb 2023 10:42:20 -0500 Subject: [PATCH] Update sist2-admin database schema, fix thumbnail-size --- sist2-admin/sist2_admin/app.py | 85 +++++++++++++++----------------- sist2-admin/sist2_admin/cron.py | 2 +- sist2-admin/sist2_admin/jobs.py | 16 +++--- sist2-admin/sist2_admin/sist2.py | 5 +- sist2-admin/sist2_admin/state.py | 35 ++++++++++++- 5 files changed, 85 insertions(+), 58 deletions(-) diff --git a/sist2-admin/sist2_admin/app.py b/sist2-admin/sist2_admin/app.py index f949a7c..ef254d6 100644 --- a/sist2-admin/sist2_admin/app.py +++ b/sist2-admin/sist2_admin/app.py @@ -21,13 +21,11 @@ from config import LOG_FOLDER, logger, WEBSERVER_PORT, DATA_FOLDER, SIST2_BINARY from jobs import Sist2Job, Sist2ScanTask, TaskQueue, Sist2IndexTask, JobStatus from notifications import Subscribe, Notifications from sist2 import Sist2 -from state import PickleTable, RUNNING_FRONTENDS, TESSERACT_LANGS, DB_SCHEMA_VERSION +from state import migrate_v1_to_v2, RUNNING_FRONTENDS, TESSERACT_LANGS, DB_SCHEMA_VERSION from web import Sist2Frontend -VERSION = "1.0" - sist2 = Sist2(SIST2_BINARY, DATA_FOLDER) -db = PersistentState(table_factory=PickleTable, dbfile=os.path.join(DATA_FOLDER, "state.db")) +db = PersistentState(dbfile=os.path.join(DATA_FOLDER, "state.db")) notifications = Notifications() task_queue = TaskQueue(sist2, db, notifications) @@ -52,7 +50,6 @@ async def home(): @app.get("/api") async def api(): return { - "version": VERSION, "tesseract_langs": TESSERACT_LANGS, "logs_folder": LOG_FOLDER } @@ -60,18 +57,17 @@ async def api(): @app.get("/api/job/{name:str}") async def get_job(name: str): - row = db["jobs"][name] - if row: - return row["job"] - raise HTTPException(status_code=404) + job = db["jobs"][name] + if not job: + raise HTTPException(status_code=404) + return job @app.get("/api/frontend/{name:str}") async def get_frontend(name: str): - row = db["frontends"][name] - if row: - frontend = row["frontend"] - frontend: Sist2Frontend + frontend = db["frontends"][name] + frontend: Sist2Frontend + if frontend: frontend.running = frontend.name in RUNNING_FRONTENDS return frontend raise HTTPException(status_code=404) @@ -79,16 +75,16 @@ async def get_frontend(name: str): @app.get("/api/job/") async def get_jobs(): - return [row["job"] for row in db["jobs"]] + return list(db["jobs"]) @app.put("/api/job/{name:str}") -async def update_job(name: str, job: Sist2Job): +async def update_job(name: str, new_job: Sist2Job): # TODO: Check etag - job.last_modified = datetime.now() - row = db["jobs"][name] - if not row: + new_job.last_modified = datetime.now() + job = db["jobs"][name] + if not job: raise HTTPException(status_code=404) args_that_trigger_full_scan = [ @@ -108,15 +104,15 @@ async def update_job(name: str, job: Sist2Job): "read_subtitles", ] for arg in args_that_trigger_full_scan: - if getattr(row["job"].scan_options, arg) != getattr(job.scan_options, arg): - job.do_full_scan = True + if getattr(new_job.scan_options, arg) != getattr(job.scan_options, arg): + new_job.do_full_scan = True - db["jobs"][name] = {"job": job} + db["jobs"][name] = new_job @app.put("/api/frontend/{name:str}") async def update_frontend(name: str, frontend: Sist2Frontend): - db["frontends"][name] = {"frontend": frontend} + db["frontends"][name] = frontend # TODO: Check etag @@ -142,7 +138,7 @@ def _run_job(job: Sist2Job): job.last_modified = datetime.now() if job.status == JobStatus("created"): job.status = JobStatus("started") - db["jobs"][job.name] = {"job": job} + db["jobs"][job.name] = job scan_task = Sist2ScanTask(job, f"Scan [{job.name}]") index_task = Sist2IndexTask(job, f"Index [{job.name}]", depends_on=scan_task) @@ -153,19 +149,19 @@ def _run_job(job: Sist2Job): @app.get("/api/job/{name:str}/run") async def run_job(name: str): - row = db["jobs"][name] - if not row: + job = db["jobs"][name] + if not job: raise HTTPException(status_code=404) - _run_job(row["job"]) + _run_job(job) return "ok" @app.delete("/api/job/{name:str}") async def delete_job(name: str): - row = db["jobs"][name] - if row: + job = db["jobs"][name] + if job: del db["jobs"][name] else: raise HTTPException(status_code=404) @@ -177,8 +173,8 @@ async def delete_frontend(name: str): os.kill(RUNNING_FRONTENDS[name], signal.SIGTERM) del RUNNING_FRONTENDS[name] - row = db["frontends"][name] - if row: + frontend = db["frontends"][name] + if frontend: del db["frontends"][name] else: raise HTTPException(status_code=404) @@ -190,18 +186,18 @@ async def create_job(name: str): raise ValueError("Job with the same name already exists") job = Sist2Job.create_default(name) - db["jobs"][name] = {"job": job} + db["jobs"][name] = job return job @app.post("/api/frontend/{name:str}") async def create_frontend(name: str): - if db["frontend"][name]: + if db["frontends"][name]: raise ValueError("Frontend with the same name already exists") frontend = Sist2Frontend.create_default(name) - db["frontends"][name] = {"frontend": frontend} + db["frontends"][name] = frontend return frontend @@ -255,7 +251,7 @@ def check_es_version(es_url: str, insecure: bool): def start_frontend_(frontend: Sist2Frontend): - frontend.web_options.indices = list(map(lambda j: db["jobs"][j]["job"].last_index, frontend.jobs)) + frontend.web_options.indices = list(map(lambda j: db["jobs"][j].last_index, frontend.jobs)) pid = sist2.web(frontend.web_options, frontend.name) RUNNING_FRONTENDS[frontend.name] = pid @@ -263,11 +259,11 @@ def start_frontend_(frontend: Sist2Frontend): @app.post("/api/frontend/{name:str}/start") async def start_frontend(name: str): - row = db["frontends"][name] - if not row: + frontend = db["frontends"][name] + if not frontend: raise HTTPException(status_code=404) - start_frontend_(row["frontend"]) + start_frontend_(frontend) @app.post("/api/frontend/{name:str}/stop") @@ -280,8 +276,7 @@ async def stop_frontend(name: str): @app.get("/api/frontend/") async def get_frontends(): res = [] - for row in db["frontends"]: - frontend = row["frontend"] + for frontend in db["frontends"]: frontend: Sist2Frontend frontend.running = frontend.name in RUNNING_FRONTENDS res.append(frontend) @@ -364,14 +359,14 @@ def initialize_db(): db["sist2_admin"]["info"] = {"version": DB_SCHEMA_VERSION} frontend = Sist2Frontend.create_default("default") - db["frontends"]["default"] = {"frontend": frontend} + db["frontends"]["default"] = frontend logger.info("Initialized database.") def start_frontends(): - for row in db["frontends"]: - frontend: Sist2Frontend = row["frontend"] + for frontend in db["frontends"]: + frontend: Sist2Frontend if frontend.auto_start and len(frontend.jobs) > 0: start_frontend_(frontend) @@ -380,9 +375,9 @@ if __name__ == '__main__': if not db["sist2_admin"]["info"]: initialize_db() - elif db["sist2_admin"]["info"]["version"] != DB_SCHEMA_VERSION: - print("Database has incompatible schema version! Delete state.db to continue.") - exit(-1) + if db["sist2_admin"]["info"]["version"] == "1": + logger.info("Migrating to v2 database schema") + migrate_v1_to_v2(db) start_frontends() cron.initialize(db, _run_job) diff --git a/sist2-admin/sist2_admin/cron.py b/sist2-admin/sist2_admin/cron.py index c055592..7ee0a1d 100644 --- a/sist2-admin/sist2_admin/cron.py +++ b/sist2-admin/sist2_admin/cron.py @@ -10,7 +10,7 @@ from jobs import Sist2Job def _check_schedule(db: PersistentState, run_job): - for job in (row["job"] for row in db["jobs"]): + for job in db["jobs"]: job: Sist2Job if job.schedule_enabled: diff --git a/sist2-admin/sist2_admin/jobs.py b/sist2-admin/sist2_admin/jobs.py index 955e8f8..cbf29b1 100644 --- a/sist2-admin/sist2_admin/jobs.py +++ b/sist2-admin/sist2_admin/jobs.py @@ -58,10 +58,10 @@ class Sist2Job(BaseModel): cron_expression="0 0 * * *" ) - @validator("etag", always=True) - def validate_etag(cls, value, values): - s = values["name"] + values["scan_options"].json() + values["index_options"].json() + values["cron_expression"] - return md5(s.encode()).hexdigest() + # @validator("etag", always=True) + # def validate_etag(cls, value, values): + # s = values["name"] + values["scan_options"].json() + values["index_options"].json() + values["cron_expression"] + # return md5(s.encode()).hexdigest() class Sist2TaskProgress: @@ -147,7 +147,7 @@ class Sist2ScanTask(Sist2Task): self.job.last_index = index.path self.job.last_index_date = datetime.now() self.job.do_full_scan = False - db["jobs"][self.job.name] = {"job": self.job} + db["jobs"][self.job.name] = self.job self._logger.info(json.dumps({"sist2-admin": f"Save last_index={self.job.last_index}"})) logger.info(f"Completed {self.display_name} ({return_code=})") @@ -185,7 +185,7 @@ class Sist2IndexTask(Sist2Task): # Update status self.job.status = JobStatus("indexed") if ok else JobStatus("failed") - db["jobs"][self.job.name] = {"job": self.job} + db["jobs"][self.job.name] = self.job self._logger.info(json.dumps({"sist2-admin": f"Sist2Scan task finished {return_code=}, {duration=}"})) @@ -195,7 +195,7 @@ class Sist2IndexTask(Sist2Task): def restart_running_frontends(self, db: PersistentState, sist2: Sist2): for frontend_name, pid in RUNNING_FRONTENDS.items(): - frontend = db["frontends"][frontend_name]["frontend"] + frontend = db["frontends"][frontend_name] frontend: Sist2Frontend os.kill(pid, signal.SIGTERM) @@ -204,7 +204,7 @@ class Sist2IndexTask(Sist2Task): except ChildProcessError: pass - frontend.web_options.indices = map(lambda j: db["jobs"][j]["job"].last_index, frontend.jobs) + frontend.web_options.indices = map(lambda j: db["jobs"][j].last_index, frontend.jobs) pid = sist2.web(frontend.web_options, frontend.name) RUNNING_FRONTENDS[frontend_name] = pid diff --git a/sist2-admin/sist2_admin/sist2.py b/sist2-admin/sist2_admin/sist2.py index 8f35866..d5eebde 100644 --- a/sist2-admin/sist2_admin/sist2.py +++ b/sist2-admin/sist2_admin/sist2.py @@ -140,8 +140,9 @@ class ScanOptions(BaseModel): def args(self): args = ["scan", self.path, f"--threads={self.threads}", f"--mem-throttle={self.mem_throttle}", f"--thumbnail-quality={self.thumbnail_quality}", f"--thumbnail-count={self.thumbnail_count}", - f"--content-size={self.content_size}", f"--output={self.output}", f"--depth={self.depth}", - f"--archive={self.archive}", f"--mem-buffer={self.mem_buffer}"] + f"--thumbnail-size={self.thumbnail_size}", f"--content-size={self.content_size}", + f"--output={self.output}", f"--depth={self.depth}", f"--archive={self.archive}", + f"--mem-buffer={self.mem_buffer}"] if self.incremental: args.append(f"--incremental={self.incremental}") diff --git a/sist2-admin/sist2_admin/state.py b/sist2-admin/sist2_admin/state.py index f147892..1d1dcd4 100644 --- a/sist2-admin/sist2_admin/state.py +++ b/sist2-admin/sist2_admin/state.py @@ -1,6 +1,8 @@ from typing import Dict +import shutil -from hexlib.db import Table +from deprecated import deprecated +from hexlib.db import Table, PersistentState import pickle from tesseract import get_tesseract_langs @@ -9,7 +11,7 @@ RUNNING_FRONTENDS: Dict[str, int] = {} TESSERACT_LANGS = get_tesseract_langs() -DB_SCHEMA_VERSION = "1" +DB_SCHEMA_VERSION = "2" from pydantic import BaseModel @@ -28,6 +30,7 @@ def _deserialize(item): return item +@deprecated("Use default table factory in hexlib 1.83+") class PickleTable(Table): def __getitem__(self, item): @@ -48,3 +51,31 @@ class PickleTable(Table): for row in super().sql(where_clause, *params): yield dict((k, _deserialize(v)) for k, v in row.items()) + +def migrate_v1_to_v2(db: PersistentState): + + shutil.copy(db.dbfile, db.dbfile + "-before-migrate-v2.bak") + + # Frontends + db._table_factory = PickleTable + frontends = [row["frontend"] for row in db["frontends"]] + del db["frontends"] + + db._table_factory = Table + for frontend in frontends: + db["frontends"][frontend.name] = frontend + list(db["frontends"]) + + # Jobs + db._table_factory = PickleTable + jobs = [row["job"] for row in db["jobs"]] + del db["jobs"] + + db._table_factory = Table + for job in jobs: + db["jobs"][job.name] = job + list(db["jobs"]) + + db["sist2_admin"]["info"] = { + "version": "2" + } \ No newline at end of file