diff --git a/src/ctx.h b/src/ctx.h index 48e1e4a..a33e7ad 100644 --- a/src/ctx.h +++ b/src/ctx.h @@ -41,6 +41,7 @@ typedef struct { GHashTable *original_table; GHashTable *copy_table; + GHashTable *new_table; pthread_mutex_t copy_table_mu; pcre *exclude; diff --git a/src/io/serialize.c b/src/io/serialize.c index 89f1200..36fa40e 100644 --- a/src/io/serialize.c +++ b/src/io/serialize.c @@ -524,3 +524,20 @@ void incremental_copy(store_t *store, store_t *dst_store, const char *filepath, read_index(filepath, "", INDEX_TYPE_NDJSON, incremental_copy_handle_doc); } + +void incremental_delete(const char *del_filepath, GHashTable *orig_table, GHashTable *new_table) { + GHashTableIter iter; + gpointer key, UNUSED(value); + char path_md5[MD5_STR_LENGTH + 1]; + path_md5[MD5_STR_LENGTH] = '\0'; + path_md5[MD5_STR_LENGTH - 1] = '\n'; + initialize_writer_ctx(del_filepath); + g_hash_table_iter_init(&iter, orig_table); + while(g_hash_table_iter_next(&iter, &key, &value)) { + if (NULL == g_hash_table_lookup(new_table, key)) { + memcpy(path_md5, key, MD5_STR_LENGTH - 1); + zstd_write_string(path_md5, MD5_STR_LENGTH); + } + } + writer_cleanup(); +} diff --git a/src/io/serialize.h b/src/io/serialize.h index 90f9066..91b0ea9 100644 --- a/src/io/serialize.h +++ b/src/io/serialize.h @@ -12,6 +12,8 @@ typedef void(*index_func)(cJSON *, const char[MD5_STR_LENGTH]); void incremental_copy(store_t *store, store_t *dst_store, const char *filepath, const char *dst_filepath, GHashTable *copy_table); +void incremental_delete(const char *del_filepath, GHashTable *orig_table, GHashTable *new_table); + void write_document(document_t *doc); void read_index(const char *path, const char[MD5_STR_LENGTH], const char *type, index_func); @@ -29,4 +31,4 @@ void write_index_descriptor(char *path, index_descriptor_t *desc); index_descriptor_t read_index_descriptor(char *path); -#endif \ No newline at end of file +#endif diff --git a/src/main.c b/src/main.c index dd1e062..9167498 100644 --- a/src/main.c +++ b/src/main.c @@ -282,10 +282,22 @@ void initialize_scan_context(scan_args_t *args) { ScanCtx.json_ctx.ndjson_mime = mime_get_mime_by_string(ScanCtx.mime_table, "application/ndjson"); } - +/** + * Loads an existing index as the baseline for incremental scanning. + * 1. load old index files (original+main) => original_table + * 2. allocate empty table => copy_table + * 3. allocate empty table => new_table + * the original_table/copy_table/new_table will be populated in parsing/parse.c:parse + * and consumed in main.c:save_incremental_index + * + * Note: the existing index may or may not be of incremental index form. + */ void load_incremental_index(const scan_args_t *args) { + char file_path[PATH_MAX]; + ScanCtx.original_table = incremental_get_table(); ScanCtx.copy_table = incremental_get_table(); + ScanCtx.new_table = incremental_get_table(); DIR *dir = opendir(args->incremental); if (dir == NULL) { @@ -300,19 +312,78 @@ void load_incremental_index(const scan_args_t *args) { LOG_FATALF("main.c", "Version mismatch! Index is %s but executable is %s", original_desc.version, Version) } - struct dirent *de; - while ((de = readdir(dir)) != NULL) { - if (strncmp(de->d_name, "_index", sizeof("_index") - 1) == 0) { - char file_path[PATH_MAX]; - snprintf(file_path, PATH_MAX, "%s%s", args->incremental, de->d_name); - incremental_read(ScanCtx.original_table, file_path, &original_desc); - } + snprintf(file_path, PATH_MAX, "%s_index_main.ndjson.zst", args->incremental); + if (0 == access(file_path, R_OK)) { + incremental_read(ScanCtx.original_table, file_path, &original_desc); + } else { + LOG_FATALF("main.c", "Could not open original main index for incremental scan: %s", strerror(errno)); } + snprintf(file_path, PATH_MAX, "%s_index_original.ndjson.zst", args->incremental); + if (0 == access(file_path, R_OK)) { + incremental_read(ScanCtx.original_table, file_path, &original_desc); + } + closedir(dir); LOG_INFOF("main.c", "Loaded %d items in to mtime table.", g_hash_table_size(ScanCtx.original_table)) } +/** + * Saves an incremental index. + * Before calling this function, the scanner should have finished writing the main index. + * 1. Build original_table - new_table => delete_table + * 2. Incrementally copy from old index files [(original+main) /\ copy_table] => index_original.ndjson.zst & store + */ +void save_incremental_index(scan_args_t* args) { + char dst_path[PATH_MAX]; + char store_path[PATH_MAX]; + char file_path[PATH_MAX]; + snprintf(store_path, PATH_MAX, "%sthumbs", args->incremental); + snprintf(dst_path, PATH_MAX, "%s_index_original.ndjson.zst", ScanCtx.index.path); + store_t *source = store_create(store_path, STORE_SIZE_TN); + + DIR *dir = opendir(args->incremental); + if (dir == NULL) { + perror("opendir"); + return; + } + + snprintf(file_path, PATH_MAX, "%s_index_delete.list.zst", ScanCtx.index.path); + incremental_delete(file_path, ScanCtx.original_table, ScanCtx.new_table); + + snprintf(file_path, PATH_MAX, "%s_index_main.ndjson.zst", args->incremental); + if (0 == access(file_path, R_OK)) { + incremental_copy(source, ScanCtx.index.store, file_path, dst_path, ScanCtx.copy_table); + } else { + perror("incremental_copy"); + return; + } + snprintf(file_path, PATH_MAX, "%s_index_original.ndjson.zst", args->incremental); + if (0 == access(file_path, R_OK)) { + incremental_copy(source, ScanCtx.index.store, file_path, dst_path, ScanCtx.copy_table); + } + + closedir(dir); + store_destroy(source); + writer_cleanup(); + + snprintf(store_path, PATH_MAX, "%stags", args->incremental); + snprintf(dst_path, PATH_MAX, "%stags", ScanCtx.index.path); + store_t *source_tags = store_create(store_path, STORE_SIZE_TAG); + store_copy(source_tags, dst_path); + store_destroy(source_tags); +} + +/** + * An index can be either incremental or non-incremental (initial index). + * For an initial index, there is only the "main" index. + * For an incremental index, there are, additionally: + * - An "original" index, referencing all files unchanged since the previous index. + * - A "delete" index, referencing all files that exist in the previous index, but deleted since then. + * Therefore, for an incremental index, "main"+"original" covers all the current files in the live filesystem, + * and is orthognal with the "delete" index. When building an incremental index upon an old incremental index, + * the old "delete" index can be safely ignored. + */ void sist2_scan(scan_args_t *args) { ScanCtx.mime_table = mime_get_mime_table(); @@ -366,33 +437,7 @@ void sist2_scan(scan_args_t *args) { LOG_DEBUGF("main.c", "Failed files: %d", ScanCtx.dbg_failed_files_count) if (args->incremental != NULL) { - char dst_path[PATH_MAX]; - snprintf(store_path, PATH_MAX, "%sthumbs", args->incremental); - snprintf(dst_path, PATH_MAX, "%s_index_original.ndjson.zst", ScanCtx.index.path); - store_t *source = store_create(store_path, STORE_SIZE_TN); - - DIR *dir = opendir(args->incremental); - if (dir == NULL) { - perror("opendir"); - return; - } - struct dirent *de; - while ((de = readdir(dir)) != NULL) { - if (strncmp(de->d_name, "_index_", sizeof("_index_") - 1) == 0) { - char file_path[PATH_MAX]; - snprintf(file_path, PATH_MAX, "%s%s", args->incremental, de->d_name); - incremental_copy(source, ScanCtx.index.store, file_path, dst_path, ScanCtx.copy_table); - } - } - closedir(dir); - store_destroy(source); - writer_cleanup(); - - snprintf(store_path, PATH_MAX, "%stags", args->incremental); - snprintf(dst_path, PATH_MAX, "%stags", ScanCtx.index.path); - store_t *source_tags = store_create(store_path, STORE_SIZE_TAG); - store_copy(source_tags, dst_path); - store_destroy(source_tags); + save_incremental_index(args); } generate_stats(&ScanCtx.index, args->treemap_threshold, ScanCtx.index.path); diff --git a/src/parsing/parse.c b/src/parsing/parse.c index bec4b21..1b74cd4 100644 --- a/src/parsing/parse.c +++ b/src/parsing/parse.c @@ -80,7 +80,8 @@ void parse(void *arg) { int inc_ts = incremental_get(ScanCtx.original_table, doc->path_md5); if (inc_ts != 0 && inc_ts == job->vfile.info.st_mtim.tv_sec) { pthread_mutex_lock(&ScanCtx.copy_table_mu); - incremental_mark_file_for_copy(ScanCtx.copy_table, doc->path_md5); + incremental_mark_file(ScanCtx.copy_table, doc->path_md5); + incremental_mark_file(ScanCtx.new_table, doc->path_md5); pthread_mutex_unlock(&ScanCtx.copy_table_mu); pthread_mutex_lock(&ScanCtx.dbg_file_counts_mu); @@ -89,6 +90,9 @@ void parse(void *arg) { return; } + pthread_mutex_lock(&ScanCtx.copy_table_mu); + incremental_mark_file(ScanCtx.new_table, doc->path_md5); + pthread_mutex_unlock(&ScanCtx.copy_table_mu); char *buf[MAGIC_BUF_SIZE]; diff --git a/src/util.h b/src/util.h index d3d71d0..3ee6cb9 100644 --- a/src/util.h +++ b/src/util.h @@ -134,10 +134,11 @@ static int incremental_get_str(GHashTable *table, const char *path_md5) { } /** - * Not thread safe! + * Marks a file by adding it to a table. + * !!Not thread safe. */ __always_inline -static int incremental_mark_file_for_copy(GHashTable *table, const unsigned char path_md5[MD5_DIGEST_LENGTH]) { +static int incremental_mark_file(GHashTable *table, const unsigned char path_md5[MD5_DIGEST_LENGTH]) { char *ptr = malloc(MD5_STR_LENGTH); buf2hex(path_md5, MD5_DIGEST_LENGTH, ptr); return g_hash_table_insert(table, ptr, GINT_TO_POINTER(1));