From 291d30768929d19aad3d8528b860980459d3f0a5 Mon Sep 17 00:00:00 2001 From: Yatao Li Date: Wed, 19 Jan 2022 13:17:51 +0800 Subject: [PATCH] index: incremental indexing, add stub for index entries removal --- src/cli.h | 1 + src/index/elastic.c | 15 ++++++++++----- src/index/elastic.h | 2 ++ src/io/serialize.c | 18 ++++++++++++++---- src/io/serialize.h | 7 +++++++ src/main.c | 35 ++++++++++++++++++++++++++--------- 6 files changed, 60 insertions(+), 18 deletions(-) diff --git a/src/cli.h b/src/cli.h index 61fac9f..8cd4176 100644 --- a/src/cli.h +++ b/src/cli.h @@ -56,6 +56,7 @@ typedef struct index_args { int async_script; int force_reset; int threads; + int incremental; } index_args_t; typedef struct web_args { diff --git a/src/index/elastic.c b/src/index/elastic.c index 612158d..6da4079 100644 --- a/src/index/elastic.c +++ b/src/index/elastic.c @@ -17,7 +17,7 @@ typedef struct es_indexer { static __thread es_indexer_t *Indexer; -void delete_queue(int max); +void free_queue(int max); void elastic_flush(); @@ -67,6 +67,11 @@ void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]) { tpool_add_work(IndexCtx.pool, index_json_func, bulk_line); } +void delete_document(const char* document_id_str, void* data) { + const char* index_id_str = data; + // TODO bulk delete +} + void execute_update_script(const char *script, int async, const char index_id[MD5_STR_LENGTH]) { if (Indexer == NULL) { @@ -223,7 +228,7 @@ void _elastic_flush(int max) { LOG_ERRORF("elastic.c", "Single document too large, giving up: {%s}", Indexer->line_head->path_md5_str) free_response(r); free(buf); - delete_queue(1); + free_queue(1); if (Indexer->queued != 0) { elastic_flush(); } @@ -248,13 +253,13 @@ void _elastic_flush(int max) { } else if (r->status_code != 200) { print_errors(r); - delete_queue(Indexer->queued); + free_queue(Indexer->queued); } else { print_errors(r); LOG_DEBUGF("elastic.c", "Indexed %d documents (%zukB) <%d>", count, buf_len / 1024, r->status_code); - delete_queue(max); + free_queue(max); if (Indexer->queued != 0) { elastic_flush(); @@ -265,7 +270,7 @@ void _elastic_flush(int max) { free(buf); } -void delete_queue(int max) { +void free_queue(int max) { for (int i = 0; i < max; i++) { es_bulk_line_t *tmp = Indexer->line_head; Indexer->line_head = tmp->next; diff --git a/src/index/elastic.h b/src/index/elastic.h index a695db4..65cfc6c 100644 --- a/src/index/elastic.h +++ b/src/index/elastic.h @@ -40,6 +40,8 @@ void print_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]); void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]); +void delete_document(const char *document_id_str, void* data); + es_indexer_t *create_indexer(const char *url, const char *index); void elastic_cleanup(); diff --git a/src/io/serialize.c b/src/io/serialize.c index 36fa40e..5eb4f0c 100644 --- a/src/io/serialize.c +++ b/src/io/serialize.c @@ -398,7 +398,7 @@ void read_index_bin_handle_line(const char *line, const char *index_id, index_fu } } -void read_index_ndjson(const char *path, const char *index_id, index_func func) { +void read_lines(const char *path, const line_processor_t processor) { dyn_buffer_t buf = dyn_buffer_create(); // Initialize zstd things @@ -427,7 +427,7 @@ void read_index_ndjson(const char *path, const char *index_id, index_func func) if (c == '\n') { dyn_buffer_write_char(&buf, '\0'); - read_index_bin_handle_line(buf.buf, index_id, func); + processor.func(buf.buf, processor.data); buf.cur = 0; } else { dyn_buffer_write_char(&buf, c); @@ -452,12 +452,22 @@ void read_index_ndjson(const char *path, const char *index_id, index_func func) dyn_buffer_destroy(&buf); fclose(file); + +} + +void read_index_ndjson(const char *line, void* _data) { + void** data = _data; + const char* index_id = data[0]; + index_func func = data[1]; + read_index_bin_handle_line(line, index_id, func); } void read_index(const char *path, const char index_id[MD5_STR_LENGTH], const char *type, index_func func) { - if (strcmp(type, INDEX_TYPE_NDJSON) == 0) { - read_index_ndjson(path, index_id, func); + read_lines(path, (line_processor_t) { + .data = (void*[2]){(void*)index_id, func} , + .func = read_index_ndjson, + }); } } diff --git a/src/io/serialize.h b/src/io/serialize.h index 91b0ea9..9bf1d1f 100644 --- a/src/io/serialize.h +++ b/src/io/serialize.h @@ -7,6 +7,11 @@ #include #include +typedef struct line_processor { + void* data; + void (*func)(const char*, void*); +} line_processor_t; + typedef void(*index_func)(cJSON *, const char[MD5_STR_LENGTH]); void incremental_copy(store_t *store, store_t *dst_store, const char *filepath, @@ -16,6 +21,8 @@ void incremental_delete(const char *del_filepath, GHashTable *orig_table, GHashT void write_document(document_t *doc); +void read_lines(const char *path, const line_processor_t processor); + void read_index(const char *path, const char[MD5_STR_LENGTH], const char *type, index_func); void incremental_read(GHashTable *table, const char *filepath, index_descriptor_t *desc); diff --git a/src/main.c b/src/main.c index 9167498..8bad80c 100644 --- a/src/main.c +++ b/src/main.c @@ -447,6 +447,7 @@ void sist2_scan(scan_args_t *args) { } void sist2_index(index_args_t *args) { + char file_path[PATH_MAX]; IndexCtx.es_url = args->es_url; IndexCtx.es_index = args->es_index; @@ -498,15 +499,25 @@ void sist2_index(index_args_t *args) { IndexCtx.pool = tpool_create(args->threads, cleanup, FALSE, args->print == 0); tpool_start(IndexCtx.pool); - struct dirent *de; - while ((de = readdir(dir)) != NULL) { - if (strncmp(de->d_name, "_index_", sizeof("_index_") - 1) == 0) { - char file_path[PATH_MAX]; - snprintf(file_path, PATH_MAX, "%s/%s", args->index_path, de->d_name); - read_index(file_path, desc.id, desc.type, f); - LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type) - } + snprintf(file_path, PATH_MAX, "%s/_index_original.ndjson.zst", args->index_path); + if (!args->incremental && 0 == access(file_path, R_OK)) { + read_index(file_path, desc.id, desc.type, f); + LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type) } + snprintf(file_path, PATH_MAX, "%s/_index_main.ndjson.zst", args->index_path); + if (0 == access(file_path, R_OK)) { + read_index(file_path, desc.id, desc.type, f); + LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type) + } + snprintf(file_path, PATH_MAX, "%s/_index_delete.list.zst", args->index_path); + if (0 == access(file_path, R_OK)) { + read_lines(file_path, (line_processor_t) { + .data = desc.id, + .func = delete_document + }); + LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type) + } + closedir(dir); tpool_wait(IndexCtx.pool); @@ -595,6 +606,7 @@ int main(int argc, const char *argv[]) { char *common_es_url = NULL; char *common_es_index = NULL; char *common_script_path = NULL; + char *common_incremental = NULL; int common_async_script = 0; int common_threads = 0; @@ -613,7 +625,7 @@ int main(int argc, const char *argv[]) { "Thumbnail size, in pixels. Use negative value to disable. DEFAULT=500"), OPT_INTEGER(0, "content-size", &scan_args->content_size, "Number of bytes to be extracted from text documents. Use negative value to disable. DEFAULT=32768"), - OPT_STRING(0, "incremental", &scan_args->incremental, + OPT_STRING(0, "incremental", &common_incremental, "Reuse an existing index and only scan modified files."), OPT_STRING('o', "output", &scan_args->output, "Output directory. DEFAULT=index.sist2/"), OPT_STRING(0, "rewrite-url", &scan_args->rewrite_url, "Serve files from this url instead of from disk."), @@ -651,6 +663,8 @@ int main(int argc, const char *argv[]) { OPT_STRING(0, "es-url", &common_es_url, "Elasticsearch url with port. DEFAULT=http://localhost:9200"), OPT_STRING(0, "es-index", &common_es_index, "Elasticsearch index name. DEFAULT=sist2"), OPT_BOOLEAN('p', "print", &index_args->print, "Just print JSON documents to stdout."), + OPT_STRING(0, "incremental", &common_incremental, + "Conduct incremental indexing, assumes that the old index is already digested by Elasticsearch."), OPT_STRING(0, "script-file", &common_script_path, "Path to user script."), OPT_STRING(0, "mappings-file", &index_args->es_mappings_path, "Path to Elasticsearch mappings."), OPT_STRING(0, "settings-file", &index_args->es_settings_path, "Path to Elasticsearch settings."), @@ -707,6 +721,9 @@ int main(int argc, const char *argv[]) { exec_args->async_script = common_async_script; index_args->async_script = common_async_script; + scan_args->incremental = common_incremental; + index_args->incremental = (common_incremental != NULL); + if (argc == 0) { argparse_usage(&argparse); goto end;