From 7d40b9e959dc4e66f74b1b0bb1473650e50a3826 Mon Sep 17 00:00:00 2001 From: Yatao Li Date: Wed, 19 Jan 2022 10:37:43 +0800 Subject: [PATCH 1/7] incremental scan: build delete index. only load from main & original. --- src/ctx.h | 1 + src/io/serialize.c | 17 +++++++ src/io/serialize.h | 4 +- src/main.c | 115 ++++++++++++++++++++++++++++++-------------- src/parsing/parse.c | 6 ++- src/util.h | 5 +- 6 files changed, 109 insertions(+), 39 deletions(-) diff --git a/src/ctx.h b/src/ctx.h index 48e1e4a..a33e7ad 100644 --- a/src/ctx.h +++ b/src/ctx.h @@ -41,6 +41,7 @@ typedef struct { GHashTable *original_table; GHashTable *copy_table; + GHashTable *new_table; pthread_mutex_t copy_table_mu; pcre *exclude; diff --git a/src/io/serialize.c b/src/io/serialize.c index 89f1200..36fa40e 100644 --- a/src/io/serialize.c +++ b/src/io/serialize.c @@ -524,3 +524,20 @@ void incremental_copy(store_t *store, store_t *dst_store, const char *filepath, read_index(filepath, "", INDEX_TYPE_NDJSON, incremental_copy_handle_doc); } + +void incremental_delete(const char *del_filepath, GHashTable *orig_table, GHashTable *new_table) { + GHashTableIter iter; + gpointer key, UNUSED(value); + char path_md5[MD5_STR_LENGTH + 1]; + path_md5[MD5_STR_LENGTH] = '\0'; + path_md5[MD5_STR_LENGTH - 1] = '\n'; + initialize_writer_ctx(del_filepath); + g_hash_table_iter_init(&iter, orig_table); + while(g_hash_table_iter_next(&iter, &key, &value)) { + if (NULL == g_hash_table_lookup(new_table, key)) { + memcpy(path_md5, key, MD5_STR_LENGTH - 1); + zstd_write_string(path_md5, MD5_STR_LENGTH); + } + } + writer_cleanup(); +} diff --git a/src/io/serialize.h b/src/io/serialize.h index 90f9066..91b0ea9 100644 --- a/src/io/serialize.h +++ b/src/io/serialize.h @@ -12,6 +12,8 @@ typedef void(*index_func)(cJSON *, const char[MD5_STR_LENGTH]); void incremental_copy(store_t *store, store_t *dst_store, const char *filepath, const char *dst_filepath, GHashTable *copy_table); +void incremental_delete(const char *del_filepath, 
GHashTable *orig_table, GHashTable *new_table); + void write_document(document_t *doc); void read_index(const char *path, const char[MD5_STR_LENGTH], const char *type, index_func); @@ -29,4 +31,4 @@ void write_index_descriptor(char *path, index_descriptor_t *desc); index_descriptor_t read_index_descriptor(char *path); -#endif \ No newline at end of file +#endif diff --git a/src/main.c b/src/main.c index dd1e062..9167498 100644 --- a/src/main.c +++ b/src/main.c @@ -282,10 +282,22 @@ void initialize_scan_context(scan_args_t *args) { ScanCtx.json_ctx.ndjson_mime = mime_get_mime_by_string(ScanCtx.mime_table, "application/ndjson"); } - +/** + * Loads an existing index as the baseline for incremental scanning. + * 1. load old index files (original+main) => original_table + * 2. allocate empty table => copy_table + * 3. allocate empty table => new_table + * the original_table/copy_table/new_table will be populated in parsing/parse.c:parse + * and consumed in main.c:save_incremental_index + * + * Note: the existing index may or may not be of incremental index form. + */ void load_incremental_index(const scan_args_t *args) { + char file_path[PATH_MAX]; + ScanCtx.original_table = incremental_get_table(); ScanCtx.copy_table = incremental_get_table(); + ScanCtx.new_table = incremental_get_table(); DIR *dir = opendir(args->incremental); if (dir == NULL) { @@ -300,19 +312,78 @@ void load_incremental_index(const scan_args_t *args) { LOG_FATALF("main.c", "Version mismatch! 
Index is %s but executable is %s", original_desc.version, Version) } - struct dirent *de; - while ((de = readdir(dir)) != NULL) { - if (strncmp(de->d_name, "_index", sizeof("_index") - 1) == 0) { - char file_path[PATH_MAX]; - snprintf(file_path, PATH_MAX, "%s%s", args->incremental, de->d_name); - incremental_read(ScanCtx.original_table, file_path, &original_desc); - } + snprintf(file_path, PATH_MAX, "%s_index_main.ndjson.zst", args->incremental); + if (0 == access(file_path, R_OK)) { + incremental_read(ScanCtx.original_table, file_path, &original_desc); + } else { + LOG_FATALF("main.c", "Could not open original main index for incremental scan: %s", strerror(errno)); } + snprintf(file_path, PATH_MAX, "%s_index_original.ndjson.zst", args->incremental); + if (0 == access(file_path, R_OK)) { + incremental_read(ScanCtx.original_table, file_path, &original_desc); + } + closedir(dir); LOG_INFOF("main.c", "Loaded %d items in to mtime table.", g_hash_table_size(ScanCtx.original_table)) } +/** + * Saves an incremental index. + * Before calling this function, the scanner should have finished writing the main index. + * 1. Build original_table - new_table => delete_table + * 2. 
Incrementally copy from old index files [(original+main) /\ copy_table] => index_original.ndjson.zst & store + */ +void save_incremental_index(scan_args_t* args) { + char dst_path[PATH_MAX]; + char store_path[PATH_MAX]; + char file_path[PATH_MAX]; + snprintf(store_path, PATH_MAX, "%sthumbs", args->incremental); + snprintf(dst_path, PATH_MAX, "%s_index_original.ndjson.zst", ScanCtx.index.path); + store_t *source = store_create(store_path, STORE_SIZE_TN); + + DIR *dir = opendir(args->incremental); + if (dir == NULL) { + perror("opendir"); + return; + } + + snprintf(file_path, PATH_MAX, "%s_index_delete.list.zst", ScanCtx.index.path); + incremental_delete(file_path, ScanCtx.original_table, ScanCtx.new_table); + + snprintf(file_path, PATH_MAX, "%s_index_main.ndjson.zst", args->incremental); + if (0 == access(file_path, R_OK)) { + incremental_copy(source, ScanCtx.index.store, file_path, dst_path, ScanCtx.copy_table); + } else { + perror("incremental_copy"); + return; + } + snprintf(file_path, PATH_MAX, "%s_index_original.ndjson.zst", args->incremental); + if (0 == access(file_path, R_OK)) { + incremental_copy(source, ScanCtx.index.store, file_path, dst_path, ScanCtx.copy_table); + } + + closedir(dir); + store_destroy(source); + writer_cleanup(); + + snprintf(store_path, PATH_MAX, "%stags", args->incremental); + snprintf(dst_path, PATH_MAX, "%stags", ScanCtx.index.path); + store_t *source_tags = store_create(store_path, STORE_SIZE_TAG); + store_copy(source_tags, dst_path); + store_destroy(source_tags); +} + +/** + * An index can be either incremental or non-incremental (initial index). + * For an initial index, there is only the "main" index. + * For an incremental index, there are, additionally: + * - An "original" index, referencing all files unchanged since the previous index. + * - A "delete" index, referencing all files that exist in the previous index, but deleted since then. 
+ * Therefore, for an incremental index, "main"+"original" covers all the current files in the live filesystem, + * and is orthogonal with the "delete" index. When building an incremental index upon an old incremental index, + * the old "delete" index can be safely ignored. */ void sist2_scan(scan_args_t *args) { ScanCtx.mime_table = mime_get_mime_table(); @@ -366,33 +437,7 @@ void sist2_scan(scan_args_t *args) { LOG_DEBUGF("main.c", "Failed files: %d", ScanCtx.dbg_failed_files_count) if (args->incremental != NULL) { - char dst_path[PATH_MAX]; - snprintf(store_path, PATH_MAX, "%sthumbs", args->incremental); - snprintf(dst_path, PATH_MAX, "%s_index_original.ndjson.zst", ScanCtx.index.path); - store_t *source = store_create(store_path, STORE_SIZE_TN); - - DIR *dir = opendir(args->incremental); - if (dir == NULL) { - perror("opendir"); - return; - } - struct dirent *de; - while ((de = readdir(dir)) != NULL) { - if (strncmp(de->d_name, "_index_", sizeof("_index_") - 1) == 0) { - char file_path[PATH_MAX]; - snprintf(file_path, PATH_MAX, "%s%s", args->incremental, de->d_name); - incremental_copy(source, ScanCtx.index.store, file_path, dst_path, ScanCtx.copy_table); - } - } - closedir(dir); - store_destroy(source); - writer_cleanup(); - - snprintf(store_path, PATH_MAX, "%stags", args->incremental); - snprintf(dst_path, PATH_MAX, "%stags", ScanCtx.index.path); - store_t *source_tags = store_create(store_path, STORE_SIZE_TAG); - store_copy(source_tags, dst_path); - store_destroy(source_tags); + save_incremental_index(args); } generate_stats(&ScanCtx.index, args->treemap_threshold, ScanCtx.index.path); diff --git a/src/parsing/parse.c b/src/parsing/parse.c index bec4b21..1b74cd4 100644 --- a/src/parsing/parse.c +++ b/src/parsing/parse.c @@ -80,7 +80,8 @@ void parse(void *arg) { int inc_ts = incremental_get(ScanCtx.original_table, doc->path_md5); if (inc_ts != 0 && inc_ts == job->vfile.info.st_mtim.tv_sec) { pthread_mutex_lock(&ScanCtx.copy_table_mu); -
incremental_mark_file_for_copy(ScanCtx.copy_table, doc->path_md5); + incremental_mark_file(ScanCtx.copy_table, doc->path_md5); + incremental_mark_file(ScanCtx.new_table, doc->path_md5); pthread_mutex_unlock(&ScanCtx.copy_table_mu); pthread_mutex_lock(&ScanCtx.dbg_file_counts_mu); @@ -89,6 +90,9 @@ void parse(void *arg) { return; } + pthread_mutex_lock(&ScanCtx.copy_table_mu); + incremental_mark_file(ScanCtx.new_table, doc->path_md5); + pthread_mutex_unlock(&ScanCtx.copy_table_mu); char *buf[MAGIC_BUF_SIZE]; diff --git a/src/util.h b/src/util.h index d3d71d0..3ee6cb9 100644 --- a/src/util.h +++ b/src/util.h @@ -134,10 +134,11 @@ static int incremental_get_str(GHashTable *table, const char *path_md5) { } /** - * Not thread safe! + * Marks a file by adding it to a table. + * !!Not thread safe. */ __always_inline -static int incremental_mark_file_for_copy(GHashTable *table, const unsigned char path_md5[MD5_DIGEST_LENGTH]) { +static int incremental_mark_file(GHashTable *table, const unsigned char path_md5[MD5_DIGEST_LENGTH]) { char *ptr = malloc(MD5_STR_LENGTH); buf2hex(path_md5, MD5_DIGEST_LENGTH, ptr); return g_hash_table_insert(table, ptr, GINT_TO_POINTER(1)); From 291d30768929d19aad3d8528b860980459d3f0a5 Mon Sep 17 00:00:00 2001 From: Yatao Li Date: Wed, 19 Jan 2022 13:17:51 +0800 Subject: [PATCH 2/7] index: incremental indexing, add stub for index entries removal --- src/cli.h | 1 + src/index/elastic.c | 15 ++++++++++----- src/index/elastic.h | 2 ++ src/io/serialize.c | 18 ++++++++++++++---- src/io/serialize.h | 7 +++++++ src/main.c | 35 ++++++++++++++++++++++++++--------- 6 files changed, 60 insertions(+), 18 deletions(-) diff --git a/src/cli.h b/src/cli.h index 61fac9f..8cd4176 100644 --- a/src/cli.h +++ b/src/cli.h @@ -56,6 +56,7 @@ typedef struct index_args { int async_script; int force_reset; int threads; + int incremental; } index_args_t; typedef struct web_args { diff --git a/src/index/elastic.c b/src/index/elastic.c index 612158d..6da4079 100644 --- 
a/src/index/elastic.c +++ b/src/index/elastic.c @@ -17,7 +17,7 @@ typedef struct es_indexer { static __thread es_indexer_t *Indexer; -void delete_queue(int max); +void free_queue(int max); void elastic_flush(); @@ -67,6 +67,11 @@ void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]) { tpool_add_work(IndexCtx.pool, index_json_func, bulk_line); } +void delete_document(const char* document_id_str, void* data) { + const char* index_id_str = data; + // TODO bulk delete +} + void execute_update_script(const char *script, int async, const char index_id[MD5_STR_LENGTH]) { if (Indexer == NULL) { @@ -223,7 +228,7 @@ void _elastic_flush(int max) { LOG_ERRORF("elastic.c", "Single document too large, giving up: {%s}", Indexer->line_head->path_md5_str) free_response(r); free(buf); - delete_queue(1); + free_queue(1); if (Indexer->queued != 0) { elastic_flush(); } @@ -248,13 +253,13 @@ void _elastic_flush(int max) { } else if (r->status_code != 200) { print_errors(r); - delete_queue(Indexer->queued); + free_queue(Indexer->queued); } else { print_errors(r); LOG_DEBUGF("elastic.c", "Indexed %d documents (%zukB) <%d>", count, buf_len / 1024, r->status_code); - delete_queue(max); + free_queue(max); if (Indexer->queued != 0) { elastic_flush(); @@ -265,7 +270,7 @@ void _elastic_flush(int max) { free(buf); } -void delete_queue(int max) { +void free_queue(int max) { for (int i = 0; i < max; i++) { es_bulk_line_t *tmp = Indexer->line_head; Indexer->line_head = tmp->next; diff --git a/src/index/elastic.h b/src/index/elastic.h index a695db4..65cfc6c 100644 --- a/src/index/elastic.h +++ b/src/index/elastic.h @@ -40,6 +40,8 @@ void print_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]); void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]); +void delete_document(const char *document_id_str, void* data); + es_indexer_t *create_indexer(const char *url, const char *index); void elastic_cleanup(); diff --git a/src/io/serialize.c 
b/src/io/serialize.c index 36fa40e..5eb4f0c 100644 --- a/src/io/serialize.c +++ b/src/io/serialize.c @@ -398,7 +398,7 @@ void read_index_bin_handle_line(const char *line, const char *index_id, index_fu } } -void read_index_ndjson(const char *path, const char *index_id, index_func func) { +void read_lines(const char *path, const line_processor_t processor) { dyn_buffer_t buf = dyn_buffer_create(); // Initialize zstd things @@ -427,7 +427,7 @@ void read_index_ndjson(const char *path, const char *index_id, index_func func) if (c == '\n') { dyn_buffer_write_char(&buf, '\0'); - read_index_bin_handle_line(buf.buf, index_id, func); + processor.func(buf.buf, processor.data); buf.cur = 0; } else { dyn_buffer_write_char(&buf, c); @@ -452,12 +452,22 @@ void read_index_ndjson(const char *path, const char *index_id, index_func func) dyn_buffer_destroy(&buf); fclose(file); + +} + +void read_index_ndjson(const char *line, void* _data) { + void** data = _data; + const char* index_id = data[0]; + index_func func = data[1]; + read_index_bin_handle_line(line, index_id, func); } void read_index(const char *path, const char index_id[MD5_STR_LENGTH], const char *type, index_func func) { - if (strcmp(type, INDEX_TYPE_NDJSON) == 0) { - read_index_ndjson(path, index_id, func); + read_lines(path, (line_processor_t) { + .data = (void*[2]){(void*)index_id, func} , + .func = read_index_ndjson, + }); } } diff --git a/src/io/serialize.h b/src/io/serialize.h index 91b0ea9..9bf1d1f 100644 --- a/src/io/serialize.h +++ b/src/io/serialize.h @@ -7,6 +7,11 @@ #include #include +typedef struct line_processor { + void* data; + void (*func)(const char*, void*); +} line_processor_t; + typedef void(*index_func)(cJSON *, const char[MD5_STR_LENGTH]); void incremental_copy(store_t *store, store_t *dst_store, const char *filepath, @@ -16,6 +21,8 @@ void incremental_delete(const char *del_filepath, GHashTable *orig_table, GHashT void write_document(document_t *doc); +void read_lines(const char *path, const 
line_processor_t processor); + void read_index(const char *path, const char[MD5_STR_LENGTH], const char *type, index_func); void incremental_read(GHashTable *table, const char *filepath, index_descriptor_t *desc); diff --git a/src/main.c b/src/main.c index 9167498..8bad80c 100644 --- a/src/main.c +++ b/src/main.c @@ -447,6 +447,7 @@ void sist2_scan(scan_args_t *args) { } void sist2_index(index_args_t *args) { + char file_path[PATH_MAX]; IndexCtx.es_url = args->es_url; IndexCtx.es_index = args->es_index; @@ -498,15 +499,25 @@ void sist2_index(index_args_t *args) { IndexCtx.pool = tpool_create(args->threads, cleanup, FALSE, args->print == 0); tpool_start(IndexCtx.pool); - struct dirent *de; - while ((de = readdir(dir)) != NULL) { - if (strncmp(de->d_name, "_index_", sizeof("_index_") - 1) == 0) { - char file_path[PATH_MAX]; - snprintf(file_path, PATH_MAX, "%s/%s", args->index_path, de->d_name); - read_index(file_path, desc.id, desc.type, f); - LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type) - } + snprintf(file_path, PATH_MAX, "%s/_index_original.ndjson.zst", args->index_path); + if (!args->incremental && 0 == access(file_path, R_OK)) { + read_index(file_path, desc.id, desc.type, f); + LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type) } + snprintf(file_path, PATH_MAX, "%s/_index_main.ndjson.zst", args->index_path); + if (0 == access(file_path, R_OK)) { + read_index(file_path, desc.id, desc.type, f); + LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type) + } + snprintf(file_path, PATH_MAX, "%s/_index_delete.list.zst", args->index_path); + if (0 == access(file_path, R_OK)) { + read_lines(file_path, (line_processor_t) { + .data = desc.id, + .func = delete_document + }); + LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type) + } + closedir(dir); tpool_wait(IndexCtx.pool); @@ -595,6 +606,7 @@ int main(int argc, const char *argv[]) { char *common_es_url = NULL; char *common_es_index = NULL; char 
*common_script_path = NULL; + char *common_incremental = NULL; int common_async_script = 0; int common_threads = 0; @@ -613,7 +625,7 @@ int main(int argc, const char *argv[]) { "Thumbnail size, in pixels. Use negative value to disable. DEFAULT=500"), OPT_INTEGER(0, "content-size", &scan_args->content_size, "Number of bytes to be extracted from text documents. Use negative value to disable. DEFAULT=32768"), - OPT_STRING(0, "incremental", &scan_args->incremental, + OPT_STRING(0, "incremental", &common_incremental, "Reuse an existing index and only scan modified files."), OPT_STRING('o', "output", &scan_args->output, "Output directory. DEFAULT=index.sist2/"), OPT_STRING(0, "rewrite-url", &scan_args->rewrite_url, "Serve files from this url instead of from disk."), @@ -651,6 +663,8 @@ int main(int argc, const char *argv[]) { OPT_STRING(0, "es-url", &common_es_url, "Elasticsearch url with port. DEFAULT=http://localhost:9200"), OPT_STRING(0, "es-index", &common_es_index, "Elasticsearch index name. 
DEFAULT=sist2"), OPT_BOOLEAN('p', "print", &index_args->print, "Just print JSON documents to stdout."), + OPT_STRING(0, "incremental", &common_incremental, + "Conduct incremental indexing, assumes that the old index is already digested by Elasticsearch."), OPT_STRING(0, "script-file", &common_script_path, "Path to user script."), OPT_STRING(0, "mappings-file", &index_args->es_mappings_path, "Path to Elasticsearch mappings."), OPT_STRING(0, "settings-file", &index_args->es_settings_path, "Path to Elasticsearch settings."), @@ -707,6 +721,9 @@ int main(int argc, const char *argv[]) { exec_args->async_script = common_async_script; index_args->async_script = common_async_script; + scan_args->incremental = common_incremental; + index_args->incremental = (common_incremental != NULL); + if (argc == 0) { argparse_usage(&argparse); goto end; From 679e12f786a18f8c7065cf8194e552f655729f25 Mon Sep 17 00:00:00 2001 From: Yatao Li Date: Wed, 19 Jan 2022 21:30:35 +0800 Subject: [PATCH 3/7] unify READ_INDICES to reduce clutter --- src/cli.c | 6 ++---- src/io/serialize.h | 14 +++++++++++++ src/main.c | 49 ++++++++++++---------------------------------- src/stats.c | 12 ++---------- 4 files changed, 31 insertions(+), 50 deletions(-) diff --git a/src/cli.c b/src/cli.c index 8d9477f..f139d5f 100644 --- a/src/cli.c +++ b/src/cli.c @@ -336,8 +336,7 @@ int index_args_validate(index_args_t *args, int argc, const char **argv) { if (index_path == NULL) { LOG_FATALF("cli.c", "Invalid PATH argument. File not found: %s", argv[1]) } else { - args->index_path = argv[1]; - free(index_path); + args->index_path = index_path; } if (args->es_url == NULL) { @@ -522,8 +521,7 @@ int exec_args_validate(exec_args_t *args, int argc, const char **argv) { if (index_path == NULL) { LOG_FATALF("cli.c", "Invalid index PATH argument. 
File not found: %s", argv[1]) } else { - args->index_path = argv[1]; - free(index_path); + args->index_path = index_path; } if (args->es_url == NULL) { diff --git a/src/io/serialize.h b/src/io/serialize.h index 9bf1d1f..5ded868 100644 --- a/src/io/serialize.h +++ b/src/io/serialize.h @@ -38,4 +38,18 @@ void write_index_descriptor(char *path, index_descriptor_t *desc); index_descriptor_t read_index_descriptor(char *path); +// caller ensures char file_path[PATH_MAX] +#define READ_INDICES(file_path, index_path, action_ok, action_main_fail, cond_original) \ + snprintf(file_path, PATH_MAX, "%s_index_main.ndjson.zst", index_path); \ + if (0 == access(file_path, R_OK)) { \ + action_ok; \ + } else { \ + action_main_fail; \ + } \ + snprintf(file_path, PATH_MAX, "%s_index_original.ndjson.zst", index_path); \ + if ((cond_original) && 0 == access(file_path, R_OK)) { \ + action_ok; \ + } \ + + #endif diff --git a/src/main.c b/src/main.c index 8bad80c..5b8cd88 100644 --- a/src/main.c +++ b/src/main.c @@ -305,23 +305,15 @@ void load_incremental_index(const scan_args_t *args) { } char descriptor_path[PATH_MAX]; - snprintf(descriptor_path, PATH_MAX, "%s/descriptor.json", args->incremental); + snprintf(descriptor_path, PATH_MAX, "%sdescriptor.json", args->incremental); index_descriptor_t original_desc = read_index_descriptor(descriptor_path); if (strcmp(original_desc.version, Version) != 0) { LOG_FATALF("main.c", "Version mismatch! 
Index is %s but executable is %s", original_desc.version, Version) } - snprintf(file_path, PATH_MAX, "%s_index_main.ndjson.zst", args->incremental); - if (0 == access(file_path, R_OK)) { - incremental_read(ScanCtx.original_table, file_path, &original_desc); - } else { - LOG_FATALF("main.c", "Could not open original main index for incremental scan: %s", strerror(errno)); - } - snprintf(file_path, PATH_MAX, "%s_index_original.ndjson.zst", args->incremental); - if (0 == access(file_path, R_OK)) { - incremental_read(ScanCtx.original_table, file_path, &original_desc); - } + READ_INDICES(file_path, args->incremental, incremental_read(ScanCtx.original_table, file_path, &original_desc), + LOG_FATALF("main.c", "Could not open original main index for incremental scan: %s", strerror(errno)), 1); closedir(dir); @@ -351,17 +343,8 @@ void save_incremental_index(scan_args_t* args) { snprintf(file_path, PATH_MAX, "%s_index_delete.list.zst", ScanCtx.index.path); incremental_delete(file_path, ScanCtx.original_table, ScanCtx.new_table); - snprintf(file_path, PATH_MAX, "%s_index_main.ndjson.zst", args->incremental); - if (0 == access(file_path, R_OK)) { - incremental_copy(source, ScanCtx.index.store, file_path, dst_path, ScanCtx.copy_table); - } else { - perror("incremental_copy"); - return; - } - snprintf(file_path, PATH_MAX, "%s_index_original.ndjson.zst", args->incremental); - if (0 == access(file_path, R_OK)) { - incremental_copy(source, ScanCtx.index.store, file_path, dst_path, ScanCtx.copy_table); - } + READ_INDICES(file_path, args->incremental, incremental_copy(source, ScanCtx.index.store, file_path, dst_path, ScanCtx.copy_table), + perror("incremental_copy"), 1); closedir(dir); store_destroy(source); @@ -458,7 +441,7 @@ void sist2_index(index_args_t *args) { } char descriptor_path[PATH_MAX]; - snprintf(descriptor_path, PATH_MAX, "%s/descriptor.json", args->index_path); + snprintf(descriptor_path, PATH_MAX, "%sdescriptor.json", args->index_path); index_descriptor_t desc = 
read_index_descriptor(descriptor_path); @@ -474,11 +457,11 @@ void sist2_index(index_args_t *args) { } char path_tmp[PATH_MAX]; - snprintf(path_tmp, sizeof(path_tmp), "%s/tags", args->index_path); + snprintf(path_tmp, sizeof(path_tmp), "%stags", args->index_path); IndexCtx.tag_store = store_create(path_tmp, STORE_SIZE_TAG); IndexCtx.tags = store_read_all(IndexCtx.tag_store); - snprintf(path_tmp, sizeof(path_tmp), "%s/meta", args->index_path); + snprintf(path_tmp, sizeof(path_tmp), "%smeta", args->index_path); IndexCtx.meta_store = store_create(path_tmp, STORE_SIZE_META); IndexCtx.meta = store_read_all(IndexCtx.meta_store); @@ -499,17 +482,11 @@ void sist2_index(index_args_t *args) { IndexCtx.pool = tpool_create(args->threads, cleanup, FALSE, args->print == 0); tpool_start(IndexCtx.pool); - snprintf(file_path, PATH_MAX, "%s/_index_original.ndjson.zst", args->index_path); - if (!args->incremental && 0 == access(file_path, R_OK)) { + READ_INDICES(file_path, args->index_path, { read_index(file_path, desc.id, desc.type, f); - LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type) - } - snprintf(file_path, PATH_MAX, "%s/_index_main.ndjson.zst", args->index_path); - if (0 == access(file_path, R_OK)) { - read_index(file_path, desc.id, desc.type, f); - LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type) - } - snprintf(file_path, PATH_MAX, "%s/_index_delete.list.zst", args->index_path); + LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type); + }, {}, !args->incremental); + snprintf(file_path, PATH_MAX, "%s_index_delete.list.zst", args->index_path); if (0 == access(file_path, R_OK)) { read_lines(file_path, (line_processor_t) { .data = desc.id, @@ -539,7 +516,7 @@ void sist2_exec_script(exec_args_t *args) { LogCtx.verbose = TRUE; char descriptor_path[PATH_MAX]; - snprintf(descriptor_path, PATH_MAX, "%s/descriptor.json", args->index_path); + snprintf(descriptor_path, PATH_MAX, "%sdescriptor.json", args->index_path); 
index_descriptor_t desc = read_index_descriptor(descriptor_path); IndexCtx.es_url = args->es_url; diff --git a/src/stats.c b/src/stats.c index b2292c9..b90604f 100644 --- a/src/stats.c +++ b/src/stats.c @@ -96,16 +96,8 @@ void fill_tables(cJSON *document, UNUSED(const char index_id[MD5_STR_LENGTH])) { } void read_index_into_tables(index_t *index) { - DIR *dir = opendir(index->path); - struct dirent *de; - while ((de = readdir(dir)) != NULL) { - if (strncmp(de->d_name, "_index_", sizeof("_index_") - 1) == 0) { - char file_path[PATH_MAX]; - snprintf(file_path, PATH_MAX, "%s%s", index->path, de->d_name); - read_index(file_path, index->desc.id, index->desc.type, fill_tables); - } - } - closedir(dir); + char file_path[PATH_MAX]; + READ_INDICES(file_path, index->path, read_index(file_path, index->desc.id, index->desc.type, fill_tables), {}, 1); } static size_t rfind(const char *str, int c) { From 2cb57f363468f4b5c7e800c03d902a372b0cdf4d Mon Sep 17 00:00:00 2001 From: Yatao Li Date: Thu, 20 Jan 2022 03:08:13 +0800 Subject: [PATCH 4/7] index: bulk delete --- src/index/elastic.c | 69 ++++++++++++++++++++++++++++++--------------- src/index/elastic.h | 4 +++ src/main.c | 4 +-- 3 files changed, 52 insertions(+), 25 deletions(-) diff --git a/src/index/elastic.c b/src/index/elastic.c index 6da4079..61eb166 100644 --- a/src/index/elastic.c +++ b/src/index/elastic.c @@ -52,11 +52,22 @@ void index_json_func(void *arg) { elastic_index_line(line); } +void delete_document(const char* document_id_str, void* UNUSED(_data)) { + size_t id_len = strlen(document_id_str); + es_bulk_line_t *bulk_line = malloc(sizeof(es_bulk_line_t)); + bulk_line->type = ES_BULK_LINE_DELETE; + bulk_line->next = NULL; + memcpy(bulk_line->path_md5_str, document_id_str, MD5_STR_LENGTH); + tpool_add_work(IndexCtx.pool, index_json_func, bulk_line); +} + + void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]) { char *json = cJSON_PrintUnformatted(document); size_t json_len = strlen(json); 
es_bulk_line_t *bulk_line = malloc(sizeof(es_bulk_line_t) + json_len + 2); + bulk_line->type = ES_BULK_LINE_INDEX; memcpy(bulk_line->line, json, json_len); memcpy(bulk_line->path_md5_str, index_id_str, MD5_STR_LENGTH); *(bulk_line->line + json_len) = '\n'; @@ -67,11 +78,6 @@ void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]) { tpool_add_work(IndexCtx.pool, index_json_func, bulk_line); } -void delete_document(const char* document_id_str, void* data) { - const char* index_id_str = data; - // TODO bulk delete -} - void execute_update_script(const char *script, int async, const char index_id[MD5_STR_LENGTH]) { if (Indexer == NULL) { @@ -130,30 +136,47 @@ void *create_bulk_buffer(int max, int *count, size_t *buf_len) { size_t buf_cur = 0; char *buf = malloc(8192); size_t buf_capacity = 8192; +#define GROW_BUF(delta) \ + while (buf_size + (delta) > buf_capacity) { \ + buf_capacity *= 2; \ + buf = realloc(buf, buf_capacity); \ + } \ + buf_size += (delta); \ + // see: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html + // ES_BULK_LINE_INDEX: two lines, 1st action, 2nd content + // ES_BULK_LINE_DELETE: one line while (line != NULL && *count < max) { char action_str[256]; - snprintf( - action_str, sizeof(action_str), - "{\"index\":{\"_id\":\"%s\",\"_type\":\"_doc\",\"_index\":\"%s\"}}\n", - line->path_md5_str, Indexer->es_index - ); + if (line->type == ES_BULK_LINE_INDEX) { + snprintf( + action_str, sizeof(action_str), + "{\"index\":{\"_id\":\"%s\",\"_type\":\"_doc\",\"_index\":\"%s\"}}\n", + line->path_md5_str, Indexer->es_index + ); - size_t action_str_len = strlen(action_str); - size_t line_len = strlen(line->line); + size_t action_str_len = strlen(action_str); + size_t line_len = strlen(line->line); - while (buf_size + line_len + action_str_len > buf_capacity) { - buf_capacity *= 2; - buf = realloc(buf, buf_capacity); + GROW_BUF(action_str_len + line_len); + + memcpy(buf + buf_cur, action_str, action_str_len); + buf_cur 
+= action_str_len; + memcpy(buf + buf_cur, line->line, line_len); + buf_cur += line_len; + + } else if (line->type == ES_BULK_LINE_DELETE) { + snprintf( + action_str, sizeof(action_str), + "{\"delete\":{\"_id\":\"%s\",\"_index\":\"%s\"}}\n", + line->path_md5_str, Indexer->es_index + ); + + size_t action_str_len = strlen(action_str); + GROW_BUF(action_str_len); + memcpy(buf + buf_cur, action_str, action_str_len); + buf_cur += action_str_len; } - - buf_size += line_len + action_str_len; - - memcpy(buf + buf_cur, action_str, action_str_len); - buf_cur += action_str_len; - memcpy(buf + buf_cur, line->line, line_len); - buf_cur += line_len; - line = line->next; (*count)++; } diff --git a/src/index/elastic.h b/src/index/elastic.h index 65cfc6c..5a33d15 100644 --- a/src/index/elastic.h +++ b/src/index/elastic.h @@ -3,9 +3,13 @@ #include "src/sist.h" +#define ES_BULK_LINE_INDEX 0 +#define ES_BULK_LINE_DELETE 1 + typedef struct es_bulk_line { struct es_bulk_line *next; char path_md5_str[MD5_STR_LENGTH]; + int type; char line[0]; } es_bulk_line_t; diff --git a/src/main.c b/src/main.c index 5b8cd88..3bb9401 100644 --- a/src/main.c +++ b/src/main.c @@ -489,7 +489,7 @@ void sist2_index(index_args_t *args) { snprintf(file_path, PATH_MAX, "%s_index_delete.list.zst", args->index_path); if (0 == access(file_path, R_OK)) { read_lines(file_path, (line_processor_t) { - .data = desc.id, + .data = NULL, .func = delete_document }); LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type) @@ -698,7 +698,7 @@ int main(int argc, const char *argv[]) { exec_args->async_script = common_async_script; index_args->async_script = common_async_script; - scan_args->incremental = common_incremental; + scan_args->incremental = (common_incremental == NULL) ? 
NULL : strdup(common_incremental); index_args->incremental = (common_incremental != NULL); if (argc == 0) { From e65905a16553c8678bbda3c2576793d901a8b398 Mon Sep 17 00:00:00 2001 From: Yatao Li Date: Thu, 20 Jan 2022 23:39:38 +0800 Subject: [PATCH 5/7] only add new entries into new_table to save memory --- src/io/serialize.c | 5 +++-- src/io/serialize.h | 2 +- src/main.c | 2 +- src/parsing/parse.c | 1 - 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/io/serialize.c b/src/io/serialize.c index 5eb4f0c..36c9e9e 100644 --- a/src/io/serialize.c +++ b/src/io/serialize.c @@ -535,7 +535,7 @@ void incremental_copy(store_t *store, store_t *dst_store, const char *filepath, read_index(filepath, "", INDEX_TYPE_NDJSON, incremental_copy_handle_doc); } -void incremental_delete(const char *del_filepath, GHashTable *orig_table, GHashTable *new_table) { +void incremental_delete(const char *del_filepath, GHashTable *orig_table, GHashTable *copy_table, GHashTable *new_table) { GHashTableIter iter; gpointer key, UNUSED(value); char path_md5[MD5_STR_LENGTH + 1]; @@ -544,7 +544,8 @@ void incremental_delete(const char *del_filepath, GHashTable *orig_table, GHashT initialize_writer_ctx(del_filepath); g_hash_table_iter_init(&iter, orig_table); while(g_hash_table_iter_next(&iter, &key, &value)) { - if (NULL == g_hash_table_lookup(new_table, key)) { + if (NULL == g_hash_table_lookup(new_table, key) && + NULL == g_hash_table_lookup(copy_table, key)) { memcpy(path_md5, key, MD5_STR_LENGTH - 1); zstd_write_string(path_md5, MD5_STR_LENGTH); } diff --git a/src/io/serialize.h b/src/io/serialize.h index 5ded868..d3339df 100644 --- a/src/io/serialize.h +++ b/src/io/serialize.h @@ -17,7 +17,7 @@ typedef void(*index_func)(cJSON *, const char[MD5_STR_LENGTH]); void incremental_copy(store_t *store, store_t *dst_store, const char *filepath, const char *dst_filepath, GHashTable *copy_table); -void incremental_delete(const char *del_filepath, GHashTable *orig_table, GHashTable *new_table); 
+void incremental_delete(const char *del_filepath, GHashTable *orig_table, GHashTable *copy_table, GHashTable *new_table); void write_document(document_t *doc); diff --git a/src/main.c b/src/main.c index 3bb9401..d3958f1 100644 --- a/src/main.c +++ b/src/main.c @@ -341,7 +341,7 @@ void save_incremental_index(scan_args_t* args) { } snprintf(file_path, PATH_MAX, "%s_index_delete.list.zst", ScanCtx.index.path); - incremental_delete(file_path, ScanCtx.original_table, ScanCtx.new_table); + incremental_delete(file_path, ScanCtx.original_table, ScanCtx.copy_table, ScanCtx.new_table); READ_INDICES(file_path, args->incremental, incremental_copy(source, ScanCtx.index.store, file_path, dst_path, ScanCtx.copy_table), perror("incremental_copy"), 1); diff --git a/src/parsing/parse.c b/src/parsing/parse.c index 1b74cd4..7394c5f 100644 --- a/src/parsing/parse.c +++ b/src/parsing/parse.c @@ -81,7 +81,6 @@ void parse(void *arg) { if (inc_ts != 0 && inc_ts == job->vfile.info.st_mtim.tv_sec) { pthread_mutex_lock(&ScanCtx.copy_table_mu); incremental_mark_file(ScanCtx.copy_table, doc->path_md5); - incremental_mark_file(ScanCtx.new_table, doc->path_md5); pthread_mutex_unlock(&ScanCtx.copy_table_mu); pthread_mutex_lock(&ScanCtx.dbg_file_counts_mu); From 8f7edf319044890b5e24bdd1d140e9ae581bf917 Mon Sep 17 00:00:00 2001 From: Yatao Li Date: Fri, 21 Jan 2022 03:30:07 +0800 Subject: [PATCH 6/7] incremental_delete: read from index file so that we have parent info --- src/io/serialize.c | 43 ++++++++++++++++++++++++++++--------------- src/io/serialize.h | 5 +++-- src/main.c | 29 +++++++++++------------------ 3 files changed, 42 insertions(+), 35 deletions(-) diff --git a/src/io/serialize.c b/src/io/serialize.c index 36c9e9e..c0bb9d1 100644 --- a/src/io/serialize.c +++ b/src/io/serialize.c @@ -486,6 +486,7 @@ void incremental_read(GHashTable *table, const char *filepath, index_descriptor_ } static __thread GHashTable *IncrementalCopyTable = NULL; +static __thread GHashTable *IncrementalNewTable 
= NULL; static __thread store_t *IncrementalCopySourceStore = NULL; static __thread store_t *IncrementalCopyDestinationStore = NULL; @@ -535,20 +536,32 @@ void incremental_copy(store_t *store, store_t *dst_store, const char *filepath, read_index(filepath, "", INDEX_TYPE_NDJSON, incremental_copy_handle_doc); } -void incremental_delete(const char *del_filepath, GHashTable *orig_table, GHashTable *copy_table, GHashTable *new_table) { - GHashTableIter iter; - gpointer key, UNUSED(value); - char path_md5[MD5_STR_LENGTH + 1]; - path_md5[MD5_STR_LENGTH] = '\0'; - path_md5[MD5_STR_LENGTH - 1] = '\n'; - initialize_writer_ctx(del_filepath); - g_hash_table_iter_init(&iter, orig_table); - while(g_hash_table_iter_next(&iter, &key, &value)) { - if (NULL == g_hash_table_lookup(new_table, key) && - NULL == g_hash_table_lookup(copy_table, key)) { - memcpy(path_md5, key, MD5_STR_LENGTH - 1); - zstd_write_string(path_md5, MD5_STR_LENGTH); - } +void incremental_delete_handle_doc(cJSON *document, UNUSED(const char id_str[MD5_STR_LENGTH])) { + + char path_md5_n[MD5_STR_LENGTH + 1]; + path_md5_n[MD5_STR_LENGTH] = '\0'; + path_md5_n[MD5_STR_LENGTH - 1] = '\n'; + const char *path_md5_str = cJSON_GetObjectItem(document, "_id")->valuestring; + + // do not delete archive virtual entries + if (cJSON_GetObjectItem(document, "parent") == NULL + && !incremental_get_str(IncrementalCopyTable, path_md5_str) + && !incremental_get_str(IncrementalNewTable, path_md5_str) + ) { + memcpy(path_md5_n, path_md5_str, MD5_STR_LENGTH - 1); + zstd_write_string(path_md5_n, MD5_STR_LENGTH); } - writer_cleanup(); +} + +void incremental_delete(const char *del_filepath, const char* index_filepath, + GHashTable *copy_table, GHashTable *new_table) { + + if (WriterCtx.out_file == NULL) { + initialize_writer_ctx(del_filepath); + } + + IncrementalCopyTable = copy_table; + IncrementalNewTable = new_table; + + read_index(index_filepath, "", INDEX_TYPE_NDJSON, incremental_delete_handle_doc); } diff --git a/src/io/serialize.h 
b/src/io/serialize.h index d3339df..0d8275e 100644 --- a/src/io/serialize.h +++ b/src/io/serialize.h @@ -17,7 +17,8 @@ typedef void(*index_func)(cJSON *, const char[MD5_STR_LENGTH]); void incremental_copy(store_t *store, store_t *dst_store, const char *filepath, const char *dst_filepath, GHashTable *copy_table); -void incremental_delete(const char *del_filepath, GHashTable *orig_table, GHashTable *copy_table, GHashTable *new_table); +void incremental_delete(const char *del_filepath, const char* index_filepath, + GHashTable *copy_table, GHashTable *new_table); void write_document(document_t *doc); @@ -47,7 +48,7 @@ index_descriptor_t read_index_descriptor(char *path); action_main_fail; \ } \ snprintf(file_path, PATH_MAX, "%s_index_original.ndjson.zst", index_path); \ - if ((cond_original) && 0 == access(file_path, R_OK)) { \ + if ((cond_original) && (0 == access(file_path, R_OK))) { \ action_ok; \ } \ diff --git a/src/main.c b/src/main.c index d3958f1..f514569 100644 --- a/src/main.c +++ b/src/main.c @@ -299,11 +299,6 @@ void load_incremental_index(const scan_args_t *args) { ScanCtx.copy_table = incremental_get_table(); ScanCtx.new_table = incremental_get_table(); - DIR *dir = opendir(args->incremental); - if (dir == NULL) { - LOG_FATALF("main.c", "Could not open original index for incremental scan: %s", strerror(errno)) - } - char descriptor_path[PATH_MAX]; snprintf(descriptor_path, PATH_MAX, "%sdescriptor.json", args->incremental); index_descriptor_t original_desc = read_index_descriptor(descriptor_path); @@ -315,8 +310,6 @@ void load_incremental_index(const scan_args_t *args) { READ_INDICES(file_path, args->incremental, incremental_read(ScanCtx.original_table, file_path, &original_desc), LOG_FATALF("main.c", "Could not open original main index for incremental scan: %s", strerror(errno)), 1); - closedir(dir); - LOG_INFOF("main.c", "Loaded %d items in to mtime table.", g_hash_table_size(ScanCtx.original_table)) } @@ -330,26 +323,26 @@ void 
save_incremental_index(scan_args_t* args) { char dst_path[PATH_MAX]; char store_path[PATH_MAX]; char file_path[PATH_MAX]; + char del_path[PATH_MAX]; snprintf(store_path, PATH_MAX, "%sthumbs", args->incremental); snprintf(dst_path, PATH_MAX, "%s_index_original.ndjson.zst", ScanCtx.index.path); store_t *source = store_create(store_path, STORE_SIZE_TN); - DIR *dir = opendir(args->incremental); - if (dir == NULL) { - perror("opendir"); - return; - } - - snprintf(file_path, PATH_MAX, "%s_index_delete.list.zst", ScanCtx.index.path); - incremental_delete(file_path, ScanCtx.original_table, ScanCtx.copy_table, ScanCtx.new_table); + LOG_INFOF("main.c", "incremental_delete: original size = %u, copy size = %u, new size = %u", + g_hash_table_size(ScanCtx.original_table), + g_hash_table_size(ScanCtx.copy_table), + g_hash_table_size(ScanCtx.new_table)); + snprintf(del_path, PATH_MAX, "%s_index_delete.list.zst", ScanCtx.index.path); + READ_INDICES(file_path, args->incremental, incremental_delete(del_path, file_path, ScanCtx.copy_table, ScanCtx.new_table), + perror("incremental_delete"), 1); + writer_cleanup(); READ_INDICES(file_path, args->incremental, incremental_copy(source, ScanCtx.index.store, file_path, dst_path, ScanCtx.copy_table), perror("incremental_copy"), 1); - - closedir(dir); - store_destroy(source); writer_cleanup(); + store_destroy(source); + snprintf(store_path, PATH_MAX, "%stags", args->incremental); snprintf(dst_path, PATH_MAX, "%stags", ScanCtx.index.path); store_t *source_tags = store_create(store_path, STORE_SIZE_TAG); From 501064da104d2d757e3c158388969847d1bad59c Mon Sep 17 00:00:00 2001 From: Yatao Li Date: Sun, 23 Jan 2022 00:41:13 +0800 Subject: [PATCH 7/7] parse: fix full scan regression --- src/parsing/parse.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/parsing/parse.c b/src/parsing/parse.c index 7394c5f..b09ec4f 100644 --- a/src/parsing/parse.c +++ b/src/parsing/parse.c @@ -89,9 +89,11 @@ void parse(void *arg) { return; 
} - pthread_mutex_lock(&ScanCtx.copy_table_mu); - incremental_mark_file(ScanCtx.new_table, doc->path_md5); - pthread_mutex_unlock(&ScanCtx.copy_table_mu); + if (ScanCtx.new_table != NULL) { + pthread_mutex_lock(&ScanCtx.copy_table_mu); + incremental_mark_file(ScanCtx.new_table, doc->path_md5); + pthread_mutex_unlock(&ScanCtx.copy_table_mu); + } char *buf[MAGIC_BUF_SIZE];