mirror of
				https://github.com/simon987/sist2.git
				synced 2025-10-31 16:06:53 +00:00 
			
		
		
		
	incremental scan: build delete index. only load from main & original.
This commit is contained in:
		
							parent
							
								
									cf56bdfb74
								
							
						
					
					
						commit
						7d40b9e959
					
				| @ -41,6 +41,7 @@ typedef struct { | ||||
| 
 | ||||
|     GHashTable *original_table; | ||||
|     GHashTable *copy_table; | ||||
|     GHashTable *new_table; | ||||
|     pthread_mutex_t copy_table_mu; | ||||
| 
 | ||||
|     pcre *exclude; | ||||
|  | ||||
| @ -524,3 +524,20 @@ void incremental_copy(store_t *store, store_t *dst_store, const char *filepath, | ||||
| 
 | ||||
|     read_index(filepath, "", INDEX_TYPE_NDJSON, incremental_copy_handle_doc); | ||||
| } | ||||
| 
 | ||||
| void incremental_delete(const char *del_filepath, GHashTable *orig_table, GHashTable *new_table) { | ||||
|     GHashTableIter iter; | ||||
|     gpointer key, UNUSED(value); | ||||
|     char path_md5[MD5_STR_LENGTH + 1]; | ||||
|     path_md5[MD5_STR_LENGTH] = '\0'; | ||||
|     path_md5[MD5_STR_LENGTH - 1] = '\n'; | ||||
|     initialize_writer_ctx(del_filepath); | ||||
|     g_hash_table_iter_init(&iter, orig_table); | ||||
|     while(g_hash_table_iter_next(&iter, &key, &value)) { | ||||
|         if (NULL == g_hash_table_lookup(new_table, key)) { | ||||
|             memcpy(path_md5, key, MD5_STR_LENGTH - 1); | ||||
|             zstd_write_string(path_md5, MD5_STR_LENGTH); | ||||
|         } | ||||
|     } | ||||
|     writer_cleanup(); | ||||
| } | ||||
|  | ||||
| @ -12,6 +12,8 @@ typedef void(*index_func)(cJSON *, const char[MD5_STR_LENGTH]); | ||||
| void incremental_copy(store_t *store, store_t *dst_store, const char *filepath, | ||||
|                       const char *dst_filepath, GHashTable *copy_table); | ||||
| 
 | ||||
| void incremental_delete(const char *del_filepath, GHashTable *orig_table, GHashTable *new_table); | ||||
| 
 | ||||
| void write_document(document_t *doc); | ||||
| 
 | ||||
| void read_index(const char *path, const char[MD5_STR_LENGTH], const char *type, index_func); | ||||
| @ -29,4 +31,4 @@ void write_index_descriptor(char *path, index_descriptor_t *desc); | ||||
| 
 | ||||
| index_descriptor_t read_index_descriptor(char *path); | ||||
| 
 | ||||
| #endif | ||||
| #endif | ||||
|  | ||||
							
								
								
									
										115
									
								
								src/main.c
									
									
									
									
									
								
							
							
						
						
									
										115
									
								
								src/main.c
									
									
									
									
									
								
							| @ -282,10 +282,22 @@ void initialize_scan_context(scan_args_t *args) { | ||||
|     ScanCtx.json_ctx.ndjson_mime = mime_get_mime_by_string(ScanCtx.mime_table, "application/ndjson"); | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| /**
 | ||||
|  * Loads an existing index as the baseline for incremental scanning. | ||||
|  *   1. load old index files (original+main) => original_table | ||||
|  *   2. allocate empty table                 => copy_table | ||||
|  *   3. allocate empty table                 => new_table | ||||
|  * the original_table/copy_table/new_table will be populated in parsing/parse.c:parse | ||||
|  * and consumed in main.c:save_incremental_index | ||||
|  * | ||||
|  * Note: the existing index may or may not be of incremental index form. | ||||
|  */ | ||||
| void load_incremental_index(const scan_args_t *args) { | ||||
|     char file_path[PATH_MAX]; | ||||
| 
 | ||||
|     ScanCtx.original_table = incremental_get_table(); | ||||
|     ScanCtx.copy_table = incremental_get_table(); | ||||
|     ScanCtx.new_table = incremental_get_table(); | ||||
| 
 | ||||
|     DIR *dir = opendir(args->incremental); | ||||
|     if (dir == NULL) { | ||||
| @ -300,19 +312,78 @@ void load_incremental_index(const scan_args_t *args) { | ||||
|         LOG_FATALF("main.c", "Version mismatch! Index is %s but executable is %s", original_desc.version, Version) | ||||
|     } | ||||
| 
 | ||||
|     struct dirent *de; | ||||
|     while ((de = readdir(dir)) != NULL) { | ||||
|         if (strncmp(de->d_name, "_index", sizeof("_index") - 1) == 0) { | ||||
|             char file_path[PATH_MAX]; | ||||
|             snprintf(file_path, PATH_MAX, "%s%s", args->incremental, de->d_name); | ||||
|             incremental_read(ScanCtx.original_table, file_path, &original_desc); | ||||
|         } | ||||
|     snprintf(file_path, PATH_MAX, "%s_index_main.ndjson.zst", args->incremental); | ||||
|     if (0 == access(file_path, R_OK)) { | ||||
|         incremental_read(ScanCtx.original_table, file_path, &original_desc); | ||||
|     } else { | ||||
|         LOG_FATALF("main.c", "Could not open original main index for incremental scan: %s", strerror(errno)); | ||||
|     } | ||||
|     snprintf(file_path, PATH_MAX, "%s_index_original.ndjson.zst", args->incremental); | ||||
|     if (0 == access(file_path, R_OK)) { | ||||
|         incremental_read(ScanCtx.original_table, file_path, &original_desc); | ||||
|     } | ||||
| 
 | ||||
|     closedir(dir); | ||||
| 
 | ||||
|     LOG_INFOF("main.c", "Loaded %d items in to mtime table.", g_hash_table_size(ScanCtx.original_table)) | ||||
| } | ||||
| 
 | ||||
| /**
 | ||||
|  * Saves an incremental index. | ||||
|  * Before calling this function, the scanner should have finished writing the main index. | ||||
|  *   1. Build original_table - new_table => delete_table | ||||
|  *   2. Incrementally copy from old index files [(original+main) /\ copy_table] => index_original.ndjson.zst & store | ||||
|  */ | ||||
| void save_incremental_index(scan_args_t* args) { | ||||
|     char dst_path[PATH_MAX]; | ||||
|     char store_path[PATH_MAX]; | ||||
|     char file_path[PATH_MAX]; | ||||
|     snprintf(store_path, PATH_MAX, "%sthumbs", args->incremental); | ||||
|     snprintf(dst_path, PATH_MAX, "%s_index_original.ndjson.zst", ScanCtx.index.path); | ||||
|     store_t *source = store_create(store_path, STORE_SIZE_TN); | ||||
| 
 | ||||
|     DIR *dir = opendir(args->incremental); | ||||
|     if (dir == NULL) { | ||||
|         perror("opendir"); | ||||
|         return; | ||||
|     } | ||||
| 
 | ||||
|     snprintf(file_path, PATH_MAX, "%s_index_delete.list.zst", ScanCtx.index.path); | ||||
|     incremental_delete(file_path, ScanCtx.original_table, ScanCtx.new_table); | ||||
| 
 | ||||
|     snprintf(file_path, PATH_MAX, "%s_index_main.ndjson.zst", args->incremental); | ||||
|     if (0 == access(file_path, R_OK)) { | ||||
|         incremental_copy(source, ScanCtx.index.store, file_path, dst_path, ScanCtx.copy_table); | ||||
|     } else { | ||||
|         perror("incremental_copy"); | ||||
|         return; | ||||
|     } | ||||
|     snprintf(file_path, PATH_MAX, "%s_index_original.ndjson.zst", args->incremental); | ||||
|     if (0 == access(file_path, R_OK)) { | ||||
|         incremental_copy(source, ScanCtx.index.store, file_path, dst_path, ScanCtx.copy_table); | ||||
|     } | ||||
| 
 | ||||
|     closedir(dir); | ||||
|     store_destroy(source); | ||||
|     writer_cleanup(); | ||||
| 
 | ||||
|     snprintf(store_path, PATH_MAX, "%stags", args->incremental); | ||||
|     snprintf(dst_path, PATH_MAX, "%stags", ScanCtx.index.path); | ||||
|     store_t *source_tags = store_create(store_path, STORE_SIZE_TAG); | ||||
|     store_copy(source_tags, dst_path); | ||||
|     store_destroy(source_tags); | ||||
| } | ||||
| 
 | ||||
| /**
 | ||||
|  * An index can be either incremental or non-incremental (initial index). | ||||
|  * For an initial index, there is only the "main" index. | ||||
|  * For an incremental index, there are, additionally: | ||||
|  *   - An "original" index, referencing all files unchanged since the previous index. | ||||
|  *   - A "delete" index, referencing all files that exist in the previous index, but deleted since then. | ||||
|  * Therefore, for an incremental index, "main"+"original" covers all the current files in the live filesystem, | ||||
|  * and is orthognal with the "delete" index. When building an incremental index upon an old incremental index, | ||||
|  * the old "delete" index can be safely ignored. | ||||
|  */ | ||||
| void sist2_scan(scan_args_t *args) { | ||||
| 
 | ||||
|     ScanCtx.mime_table = mime_get_mime_table(); | ||||
| @ -366,33 +437,7 @@ void sist2_scan(scan_args_t *args) { | ||||
|     LOG_DEBUGF("main.c", "Failed files: %d", ScanCtx.dbg_failed_files_count) | ||||
| 
 | ||||
|     if (args->incremental != NULL) { | ||||
|         char dst_path[PATH_MAX]; | ||||
|         snprintf(store_path, PATH_MAX, "%sthumbs", args->incremental); | ||||
|         snprintf(dst_path, PATH_MAX, "%s_index_original.ndjson.zst", ScanCtx.index.path); | ||||
|         store_t *source = store_create(store_path, STORE_SIZE_TN); | ||||
| 
 | ||||
|         DIR *dir = opendir(args->incremental); | ||||
|         if (dir == NULL) { | ||||
|             perror("opendir"); | ||||
|             return; | ||||
|         } | ||||
|         struct dirent *de; | ||||
|         while ((de = readdir(dir)) != NULL) { | ||||
|             if (strncmp(de->d_name, "_index_", sizeof("_index_") - 1) == 0) { | ||||
|                 char file_path[PATH_MAX]; | ||||
|                 snprintf(file_path, PATH_MAX, "%s%s", args->incremental, de->d_name); | ||||
|                 incremental_copy(source, ScanCtx.index.store, file_path, dst_path, ScanCtx.copy_table); | ||||
|             } | ||||
|         } | ||||
|         closedir(dir); | ||||
|         store_destroy(source); | ||||
|         writer_cleanup(); | ||||
| 
 | ||||
|         snprintf(store_path, PATH_MAX, "%stags", args->incremental); | ||||
|         snprintf(dst_path, PATH_MAX, "%stags", ScanCtx.index.path); | ||||
|         store_t *source_tags = store_create(store_path, STORE_SIZE_TAG); | ||||
|         store_copy(source_tags, dst_path); | ||||
|         store_destroy(source_tags); | ||||
|         save_incremental_index(args); | ||||
|     } | ||||
| 
 | ||||
|     generate_stats(&ScanCtx.index, args->treemap_threshold, ScanCtx.index.path); | ||||
|  | ||||
| @ -80,7 +80,8 @@ void parse(void *arg) { | ||||
|     int inc_ts = incremental_get(ScanCtx.original_table, doc->path_md5); | ||||
|     if (inc_ts != 0 && inc_ts == job->vfile.info.st_mtim.tv_sec) { | ||||
|         pthread_mutex_lock(&ScanCtx.copy_table_mu); | ||||
|         incremental_mark_file_for_copy(ScanCtx.copy_table, doc->path_md5); | ||||
|         incremental_mark_file(ScanCtx.copy_table, doc->path_md5); | ||||
|         incremental_mark_file(ScanCtx.new_table, doc->path_md5); | ||||
|         pthread_mutex_unlock(&ScanCtx.copy_table_mu); | ||||
| 
 | ||||
|         pthread_mutex_lock(&ScanCtx.dbg_file_counts_mu); | ||||
| @ -89,6 +90,9 @@ void parse(void *arg) { | ||||
| 
 | ||||
|         return; | ||||
|     } | ||||
|     pthread_mutex_lock(&ScanCtx.copy_table_mu); | ||||
|     incremental_mark_file(ScanCtx.new_table, doc->path_md5); | ||||
|     pthread_mutex_unlock(&ScanCtx.copy_table_mu); | ||||
| 
 | ||||
|     char *buf[MAGIC_BUF_SIZE]; | ||||
| 
 | ||||
|  | ||||
| @ -134,10 +134,11 @@ static int incremental_get_str(GHashTable *table, const char *path_md5) { | ||||
| } | ||||
| 
 | ||||
| /**
 | ||||
|  * Not thread safe! | ||||
|  * Marks a file by adding it to a table. | ||||
|  * !!Not thread safe. | ||||
|  */ | ||||
| __always_inline | ||||
| static int incremental_mark_file_for_copy(GHashTable *table, const unsigned char path_md5[MD5_DIGEST_LENGTH]) { | ||||
| static int incremental_mark_file(GHashTable *table, const unsigned char path_md5[MD5_DIGEST_LENGTH]) { | ||||
|     char *ptr = malloc(MD5_STR_LENGTH); | ||||
|     buf2hex(path_md5, MD5_DIGEST_LENGTH, ptr); | ||||
|     return g_hash_table_insert(table, ptr, GINT_TO_POINTER(1)); | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user