mirror of
https://github.com/simon987/sist2.git
synced 2025-12-12 06:58:54 +00:00
SQLite backend support for sist2-admin #366
This commit is contained in:
@@ -20,9 +20,9 @@ import cron
|
||||
from config import LOG_FOLDER, logger, WEBSERVER_PORT, DATA_FOLDER, SIST2_BINARY
|
||||
from jobs import Sist2Job, Sist2ScanTask, TaskQueue, Sist2IndexTask, JobStatus
|
||||
from notifications import Subscribe, Notifications
|
||||
from sist2 import Sist2
|
||||
from sist2 import Sist2, Sist2SearchBackend
|
||||
from state import migrate_v1_to_v2, RUNNING_FRONTENDS, TESSERACT_LANGS, DB_SCHEMA_VERSION, migrate_v3_to_v4, \
|
||||
get_log_files_to_remove, delete_log_file
|
||||
get_log_files_to_remove, delete_log_file, create_default_search_backends
|
||||
from web import Sist2Frontend
|
||||
|
||||
sist2 = Sist2(SIST2_BINARY, DATA_FOLDER)
|
||||
@@ -174,12 +174,22 @@ async def task_history(n: int, name: str):
|
||||
|
||||
@app.delete("/api/job/{name:str}")
|
||||
async def delete_job(name: str):
|
||||
job = db["jobs"][name]
|
||||
if job:
|
||||
del db["jobs"][name]
|
||||
else:
|
||||
job: Sist2Job = db["jobs"][name]
|
||||
if not job:
|
||||
raise HTTPException(status_code=404)
|
||||
|
||||
if any(name in frontend.jobs for frontend in db["frontends"]):
|
||||
raise HTTPException(status_code=400, detail="in use (frontend)")
|
||||
|
||||
try:
|
||||
os.remove(job.previous_index)
|
||||
except:
|
||||
pass
|
||||
|
||||
del db["jobs"][name]
|
||||
|
||||
return "ok"
|
||||
|
||||
|
||||
@app.delete("/api/frontend/{name:str}")
|
||||
async def delete_frontend(name: str):
|
||||
@@ -267,7 +277,16 @@ def check_es_version(es_url: str, insecure: bool):
|
||||
def start_frontend_(frontend: Sist2Frontend):
|
||||
frontend.web_options.indices = list(map(lambda j: db["jobs"][j].index_path, frontend.jobs))
|
||||
|
||||
pid = sist2.web(frontend.web_options, frontend.name)
|
||||
backend_name = frontend.web_options.search_backend
|
||||
search_backend = db["search_backends"][backend_name]
|
||||
if search_backend is None:
|
||||
logger.error(
|
||||
f"Error while running task: search backend not found: {backend_name}")
|
||||
return -1
|
||||
|
||||
logger.debug(f"Fetched search backend options for {backend_name}")
|
||||
|
||||
pid = sist2.web(frontend.web_options, search_backend, frontend.name)
|
||||
RUNNING_FRONTENDS[frontend.name] = pid
|
||||
|
||||
|
||||
@@ -297,6 +316,62 @@ async def get_frontends():
|
||||
return res
|
||||
|
||||
|
||||
@app.get("/api/search_backend/")
|
||||
async def get_search_backends():
|
||||
return list(db["search_backends"])
|
||||
|
||||
|
||||
@app.put("/api/search_backend/{name:str}")
|
||||
async def update_search_backend(name: str, backend: Sist2SearchBackend):
|
||||
if not db["search_backends"][name]:
|
||||
raise HTTPException(status_code=404)
|
||||
|
||||
db["search_backends"][name] = backend
|
||||
return "ok"
|
||||
|
||||
|
||||
@app.get("/api/search_backend/{name:str}")
|
||||
def get_search_backend(name: str):
|
||||
backend = db["search_backends"][name]
|
||||
if not backend:
|
||||
raise HTTPException(status_code=404)
|
||||
|
||||
return backend
|
||||
|
||||
|
||||
@app.delete("/api/search_backend/{name:str}")
|
||||
def delete_search_backend(name: str):
|
||||
backend: Sist2SearchBackend = db["search_backends"][name]
|
||||
if not backend:
|
||||
raise HTTPException(status_code=404)
|
||||
|
||||
if any(frontend.web_options.search_backend == name for frontend in db["frontends"]):
|
||||
raise HTTPException(status_code=400, detail="in use (frontend)")
|
||||
|
||||
if any(job.index_options.search_backend == name for job in db["jobs"]):
|
||||
raise HTTPException(status_code=400, detail="in use (job)")
|
||||
|
||||
del db["search_backends"][name]
|
||||
|
||||
try:
|
||||
os.remove(backend.search_index)
|
||||
except:
|
||||
pass
|
||||
|
||||
return "ok"
|
||||
|
||||
|
||||
@app.post("/api/search_backend/{name:str}")
|
||||
def create_search_backend(name: str):
|
||||
if db["search_backends"][name] is not None:
|
||||
return HTTPException(status_code=400, detail="already exists")
|
||||
|
||||
backend = Sist2SearchBackend.create_default(name)
|
||||
db["search_backends"][name] = backend
|
||||
|
||||
return backend
|
||||
|
||||
|
||||
def tail(filepath: str, n: int):
|
||||
with open(filepath) as file:
|
||||
|
||||
@@ -374,6 +449,8 @@ def initialize_db():
|
||||
frontend = Sist2Frontend.create_default("default")
|
||||
db["frontends"]["default"] = frontend
|
||||
|
||||
create_default_search_backends(db)
|
||||
|
||||
logger.info("Initialized database.")
|
||||
|
||||
|
||||
@@ -398,6 +475,9 @@ if __name__ == '__main__':
|
||||
logger.info("Migrating to v4 database schema")
|
||||
migrate_v3_to_v4(db)
|
||||
|
||||
if db["sist2_admin"]["info"]["version"] != DB_SCHEMA_VERSION:
|
||||
raise Exception(f"Incompatible database version for {db.dbfile}")
|
||||
|
||||
start_frontends()
|
||||
cron.initialize(db, _run_job)
|
||||
|
||||
|
||||
@@ -59,11 +59,6 @@ class Sist2Job(BaseModel):
|
||||
cron_expression="0 0 * * *"
|
||||
)
|
||||
|
||||
# @validator("etag", always=True)
|
||||
# def validate_etag(cls, value, values):
|
||||
# s = values["name"] + values["scan_options"].json() + values["index_options"].json() + values["cron_expression"]
|
||||
# return md5(s.encode()).hexdigest()
|
||||
|
||||
|
||||
class Sist2TaskProgress:
|
||||
|
||||
@@ -173,7 +168,14 @@ class Sist2IndexTask(Sist2Task):
|
||||
|
||||
self.job.index_options.path = self.job.scan_options.output
|
||||
|
||||
return_code = sist2.index(self.job.index_options, logs_cb=self.log_callback)
|
||||
search_backend = db["search_backends"][self.job.index_options.search_backend]
|
||||
if search_backend is None:
|
||||
logger.error(f"Error while running task: search backend not found: {self.job.index_options.search_backend}")
|
||||
return -1
|
||||
|
||||
logger.debug(f"Fetched search backend options for {self.job.index_options.search_backend}")
|
||||
|
||||
return_code = sist2.index(self.job.index_options, search_backend, logs_cb=self.log_callback)
|
||||
self.ended = datetime.utcnow()
|
||||
|
||||
duration = self.ended - self.started
|
||||
@@ -208,9 +210,17 @@ class Sist2IndexTask(Sist2Task):
|
||||
except ChildProcessError:
|
||||
pass
|
||||
|
||||
backend_name = frontend.web_options.search_backend
|
||||
search_backend = db["search_backends"][backend_name]
|
||||
if search_backend is None:
|
||||
logger.error(f"Error while running task: search backend not found: {backend_name}")
|
||||
return -1
|
||||
|
||||
logger.debug(f"Fetched search backend options for {backend_name}")
|
||||
|
||||
frontend.web_options.indices = map(lambda j: db["jobs"][j].index_path, frontend.jobs)
|
||||
|
||||
pid = sist2.web(frontend.web_options, frontend.name)
|
||||
pid = sist2.web(frontend.web_options, search_backend, frontend.name)
|
||||
RUNNING_FRONTENDS[frontend_name] = pid
|
||||
|
||||
self._logger.info(json.dumps({"sist2-admin": f"Restart frontend {pid=} {frontend_name=}"}))
|
||||
|
||||
@@ -3,6 +3,7 @@ import json
|
||||
import logging
|
||||
import os.path
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from io import TextIOWrapper
|
||||
from logging import FileHandler
|
||||
from subprocess import Popen, PIPE
|
||||
@@ -12,7 +13,7 @@ from typing import List
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from config import logger, LOG_FOLDER
|
||||
from config import logger, LOG_FOLDER, DATA_FOLDER
|
||||
|
||||
|
||||
class Sist2Version:
|
||||
@@ -25,77 +26,57 @@ class Sist2Version:
|
||||
return f"{self.major}.{self.minor}.{self.patch}"
|
||||
|
||||
|
||||
class WebOptions(BaseModel):
|
||||
indices: List[str] = []
|
||||
class SearchBackendType(Enum):
|
||||
SQLITE = "sqlite"
|
||||
ELASTICSEARCH = "elasticsearch"
|
||||
|
||||
|
||||
class Sist2SearchBackend(BaseModel):
|
||||
backend_type: SearchBackendType = SearchBackendType("elasticsearch")
|
||||
name: str
|
||||
|
||||
search_index: str = ""
|
||||
|
||||
es_url: str = "http://elasticsearch:9200"
|
||||
es_insecure_ssl: bool = False
|
||||
es_index: str = "sist2"
|
||||
bind: str = "0.0.0.0:4090"
|
||||
auth: str = None
|
||||
tag_auth: str = None
|
||||
tagline: str = "Lightning-fast file system indexer and search tool"
|
||||
dev: bool = False
|
||||
lang: str = "en"
|
||||
auth0_audience: str = None
|
||||
auth0_domain: str = None
|
||||
auth0_client_id: str = None
|
||||
auth0_public_key: str = None
|
||||
auth0_public_key_file: str = None
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def args(self):
|
||||
args = ["web", f"--es-url={self.es_url}", f"--es-index={self.es_index}", f"--bind={self.bind}",
|
||||
f"--tagline={self.tagline}", f"--lang={self.lang}"]
|
||||
|
||||
if self.auth0_audience:
|
||||
args.append(f"--auth0-audience={self.auth0_audience}")
|
||||
if self.auth0_domain:
|
||||
args.append(f"--auth0-domain={self.auth0_domain}")
|
||||
if self.auth0_client_id:
|
||||
args.append(f"--auth0-client-id={self.auth0_client_id}")
|
||||
if self.auth0_public_key_file:
|
||||
args.append(f"--auth0-public-key-file={self.auth0_public_key_file}")
|
||||
if self.es_insecure_ssl:
|
||||
args.append(f"--es-insecure-ssl")
|
||||
if self.auth:
|
||||
args.append(f"--auth={self.auth}")
|
||||
if self.tag_auth:
|
||||
args.append(f"--tag-auth={self.tag_auth}")
|
||||
if self.dev:
|
||||
args.append(f"--dev")
|
||||
|
||||
args.extend(self.indices)
|
||||
|
||||
return args
|
||||
|
||||
|
||||
class IndexOptions(BaseModel):
|
||||
path: str = None
|
||||
threads: int = 1
|
||||
es_url: str = "http://elasticsearch:9200"
|
||||
es_insecure_ssl: bool = False
|
||||
es_index: str = "sist2"
|
||||
incremental_index: bool = True
|
||||
script: str = ""
|
||||
script_file: str = None
|
||||
batch_size: int = 70
|
||||
|
||||
@staticmethod
|
||||
def create_default(name: str, backend_type: SearchBackendType = SearchBackendType("elasticsearch")):
|
||||
return Sist2SearchBackend(
|
||||
name=name,
|
||||
search_index=os.path.join(DATA_FOLDER, f"search-index-{name.replace('/', '_')}.sist2"),
|
||||
backend_type=backend_type
|
||||
)
|
||||
|
||||
|
||||
class IndexOptions(BaseModel):
|
||||
path: str = None
|
||||
incremental_index: bool = True
|
||||
search_backend: str = None
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def args(self):
|
||||
def args(self, search_backend):
|
||||
if search_backend.backend_type == SearchBackendType("sqlite"):
|
||||
args = ["sqlite-index", self.path, "--search-index", search_backend.search_index]
|
||||
else:
|
||||
args = ["index", self.path, f"--threads={search_backend.threads}",
|
||||
f"--es-url={search_backend.es_url}",
|
||||
f"--es-index={search_backend.es_index}",
|
||||
f"--batch-size={search_backend.batch_size}"]
|
||||
|
||||
args = ["index", self.path, f"--threads={self.threads}", f"--es-url={self.es_url}",
|
||||
f"--es-index={self.es_index}", f"--batch-size={self.batch_size}"]
|
||||
|
||||
if self.script_file:
|
||||
args.append(f"--script-file={self.script_file}")
|
||||
if self.es_insecure_ssl:
|
||||
args.append(f"--es-insecure-ssl")
|
||||
if self.incremental_index:
|
||||
args.append(f"--incremental-index")
|
||||
if search_backend.script_file:
|
||||
args.append(f"--script-file={search_backend.script_file}")
|
||||
if search_backend.es_insecure_ssl:
|
||||
args.append(f"--es-insecure-ssl")
|
||||
if self.incremental_index:
|
||||
args.append(f"--incremental-index")
|
||||
|
||||
return args
|
||||
|
||||
@@ -200,6 +181,56 @@ class Sist2Index:
|
||||
def name(self) -> str:
|
||||
return self._descriptor["name"]
|
||||
|
||||
class WebOptions(BaseModel):
|
||||
indices: List[str] = []
|
||||
|
||||
search_backend: str = "elasticsearch"
|
||||
|
||||
bind: str = "0.0.0.0:4090"
|
||||
auth: str = None
|
||||
tag_auth: str = None
|
||||
tagline: str = "Lightning-fast file system indexer and search tool"
|
||||
dev: bool = False
|
||||
lang: str = "en"
|
||||
auth0_audience: str = None
|
||||
auth0_domain: str = None
|
||||
auth0_client_id: str = None
|
||||
auth0_public_key: str = None
|
||||
auth0_public_key_file: str = None
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def args(self, search_backend: Sist2SearchBackend):
|
||||
args = ["web", f"--bind={self.bind}", f"--tagline={self.tagline}",
|
||||
f"--lang={self.lang}"]
|
||||
|
||||
if search_backend.backend_type == SearchBackendType("sqlite"):
|
||||
args.append(f"--search-index={search_backend.search_index}")
|
||||
else:
|
||||
args.append(f"--es-url={search_backend.es_url}")
|
||||
args.append(f"--es-index={search_backend.es_index}")
|
||||
if search_backend.es_insecure_ssl:
|
||||
args.append(f"--es-insecure-ssl")
|
||||
|
||||
if self.auth0_audience:
|
||||
args.append(f"--auth0-audience={self.auth0_audience}")
|
||||
if self.auth0_domain:
|
||||
args.append(f"--auth0-domain={self.auth0_domain}")
|
||||
if self.auth0_client_id:
|
||||
args.append(f"--auth0-client-id={self.auth0_client_id}")
|
||||
if self.auth0_public_key_file:
|
||||
args.append(f"--auth0-public-key-file={self.auth0_public_key_file}")
|
||||
if self.auth:
|
||||
args.append(f"--auth={self.auth}")
|
||||
if self.tag_auth:
|
||||
args.append(f"--tag-auth={self.tag_auth}")
|
||||
if self.dev:
|
||||
args.append(f"--dev")
|
||||
|
||||
args.extend(self.indices)
|
||||
|
||||
return args
|
||||
|
||||
class Sist2:
|
||||
|
||||
@@ -207,21 +238,23 @@ class Sist2:
|
||||
self._bin_path = bin_path
|
||||
self._data_dir = data_directory
|
||||
|
||||
def index(self, options: IndexOptions, logs_cb):
|
||||
def index(self, options: IndexOptions, search_backend: Sist2SearchBackend, logs_cb):
|
||||
|
||||
if options.script:
|
||||
if search_backend.script and search_backend.backend_type == SearchBackendType("elasticsearch"):
|
||||
with NamedTemporaryFile("w", prefix="sist2-admin", suffix=".painless", delete=False) as f:
|
||||
f.write(options.script)
|
||||
options.script_file = f.name
|
||||
f.write(search_backend.script)
|
||||
search_backend.script_file = f.name
|
||||
else:
|
||||
options.script_file = None
|
||||
search_backend.script_file = None
|
||||
|
||||
args = [
|
||||
self._bin_path,
|
||||
*options.args(),
|
||||
*options.args(search_backend),
|
||||
"--json-logs",
|
||||
"--very-verbose"
|
||||
]
|
||||
|
||||
logs_cb({"sist2-admin": f"Starting sist2 command with args {args}"})
|
||||
proc = Popen(args, stdout=PIPE, stderr=PIPE)
|
||||
|
||||
t_stderr = Thread(target=self._consume_logs_stderr, args=(logs_cb, proc))
|
||||
@@ -290,7 +323,7 @@ class Sist2:
|
||||
except NameError:
|
||||
pass
|
||||
|
||||
def web(self, options: WebOptions, name: str):
|
||||
def web(self, options: WebOptions, search_backend: Sist2SearchBackend, name: str):
|
||||
|
||||
if options.auth0_public_key:
|
||||
with NamedTemporaryFile("w", prefix="sist2-admin", suffix=".txt", delete=False) as f:
|
||||
@@ -301,7 +334,7 @@ class Sist2:
|
||||
|
||||
args = [
|
||||
self._bin_path,
|
||||
*options.args()
|
||||
*options.args(search_backend)
|
||||
]
|
||||
|
||||
web_logger = logging.Logger(name=f"sist2-frontend-{name}")
|
||||
@@ -321,3 +354,5 @@ class Sist2:
|
||||
t_stdout.start()
|
||||
|
||||
return proc.pid
|
||||
|
||||
|
||||
|
||||
@@ -7,7 +7,8 @@ import pickle
|
||||
|
||||
from tesseract import get_tesseract_langs
|
||||
import sqlite3
|
||||
from config import LOG_FOLDER
|
||||
from config import LOG_FOLDER, logger
|
||||
from sist2 import SearchBackendType, Sist2SearchBackend
|
||||
|
||||
RUNNING_FRONTENDS: Dict[str, int] = {}
|
||||
|
||||
@@ -109,13 +110,26 @@ def migrate_v1_to_v2(db: PersistentState):
|
||||
}
|
||||
|
||||
|
||||
def create_default_search_backends(db: PersistentState):
|
||||
es_backend = Sist2SearchBackend.create_default(name="elasticsearch",
|
||||
backend_type=SearchBackendType("elasticsearch"))
|
||||
db["search_backends"]["elasticsearch"] = es_backend
|
||||
sqlite_backend = Sist2SearchBackend.create_default(name="sqlite", backend_type=SearchBackendType("sqlite"))
|
||||
db["search_backends"]["sqlite"] = sqlite_backend
|
||||
|
||||
|
||||
def migrate_v3_to_v4(db: PersistentState):
|
||||
shutil.copy(db.dbfile, db.dbfile + "-before-migrate-v4.bak")
|
||||
|
||||
conn = sqlite3.connect(db.dbfile)
|
||||
conn.execute("ALTER TABLE task_done ADD COLUMN has_logs INTEGER DEFAULT 1")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
create_default_search_backends(db)
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(db.dbfile)
|
||||
conn.execute("ALTER TABLE task_done ADD COLUMN has_logs INTEGER DEFAULT 1")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
|
||||
db["sist2_admin"]["info"] = {
|
||||
"version": "4"
|
||||
|
||||
Reference in New Issue
Block a user