mirror of
				https://github.com/simon987/sist2.git
				synced 2025-10-31 16:06:53 +00:00 
			
		
		
		
	Merge pull request #238 from yatli/dev
incremental scan: build delete index. only load from main & original; incremental indexing;
This commit is contained in:
		
						commit
						37919932de
					
				| @ -336,8 +336,7 @@ int index_args_validate(index_args_t *args, int argc, const char **argv) { | |||||||
|     if (index_path == NULL) { |     if (index_path == NULL) { | ||||||
|         LOG_FATALF("cli.c", "Invalid PATH argument. File not found: %s", argv[1]) |         LOG_FATALF("cli.c", "Invalid PATH argument. File not found: %s", argv[1]) | ||||||
|     } else { |     } else { | ||||||
|         args->index_path = argv[1]; |         args->index_path = index_path; | ||||||
|         free(index_path); |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     if (args->es_url == NULL) { |     if (args->es_url == NULL) { | ||||||
| @ -522,8 +521,7 @@ int exec_args_validate(exec_args_t *args, int argc, const char **argv) { | |||||||
|     if (index_path == NULL) { |     if (index_path == NULL) { | ||||||
|         LOG_FATALF("cli.c", "Invalid index PATH argument. File not found: %s", argv[1]) |         LOG_FATALF("cli.c", "Invalid index PATH argument. File not found: %s", argv[1]) | ||||||
|     } else { |     } else { | ||||||
|         args->index_path = argv[1]; |         args->index_path = index_path; | ||||||
|         free(index_path); |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     if (args->es_url == NULL) { |     if (args->es_url == NULL) { | ||||||
|  | |||||||
| @ -56,6 +56,7 @@ typedef struct index_args { | |||||||
|     int async_script; |     int async_script; | ||||||
|     int force_reset; |     int force_reset; | ||||||
|     int threads; |     int threads; | ||||||
|  |     int incremental; | ||||||
| } index_args_t; | } index_args_t; | ||||||
| 
 | 
 | ||||||
| typedef struct web_args { | typedef struct web_args { | ||||||
|  | |||||||
| @ -41,6 +41,7 @@ typedef struct { | |||||||
| 
 | 
 | ||||||
|     GHashTable *original_table; |     GHashTable *original_table; | ||||||
|     GHashTable *copy_table; |     GHashTable *copy_table; | ||||||
|  |     GHashTable *new_table; | ||||||
|     pthread_mutex_t copy_table_mu; |     pthread_mutex_t copy_table_mu; | ||||||
| 
 | 
 | ||||||
|     pcre *exclude; |     pcre *exclude; | ||||||
|  | |||||||
| @ -17,7 +17,7 @@ typedef struct es_indexer { | |||||||
| 
 | 
 | ||||||
| static __thread es_indexer_t *Indexer; | static __thread es_indexer_t *Indexer; | ||||||
| 
 | 
 | ||||||
| void delete_queue(int max); | void free_queue(int max); | ||||||
| 
 | 
 | ||||||
| void elastic_flush(); | void elastic_flush(); | ||||||
| 
 | 
 | ||||||
| @ -52,11 +52,22 @@ void index_json_func(void *arg) { | |||||||
|     elastic_index_line(line); |     elastic_index_line(line); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | void delete_document(const char* document_id_str, void* UNUSED(_data)) { | ||||||
|  |     size_t id_len = strlen(document_id_str); | ||||||
|  |     es_bulk_line_t *bulk_line = malloc(sizeof(es_bulk_line_t)); | ||||||
|  |     bulk_line->type = ES_BULK_LINE_DELETE; | ||||||
|  |     bulk_line->next = NULL; | ||||||
|  |     memcpy(bulk_line->path_md5_str, document_id_str, MD5_STR_LENGTH); | ||||||
|  |     tpool_add_work(IndexCtx.pool, index_json_func, bulk_line); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]) { | void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]) { | ||||||
|     char *json = cJSON_PrintUnformatted(document); |     char *json = cJSON_PrintUnformatted(document); | ||||||
| 
 | 
 | ||||||
|     size_t json_len = strlen(json); |     size_t json_len = strlen(json); | ||||||
|     es_bulk_line_t *bulk_line = malloc(sizeof(es_bulk_line_t) + json_len + 2); |     es_bulk_line_t *bulk_line = malloc(sizeof(es_bulk_line_t) + json_len + 2); | ||||||
|  |     bulk_line->type = ES_BULK_LINE_INDEX; | ||||||
|     memcpy(bulk_line->line, json, json_len); |     memcpy(bulk_line->line, json, json_len); | ||||||
|     memcpy(bulk_line->path_md5_str, index_id_str, MD5_STR_LENGTH); |     memcpy(bulk_line->path_md5_str, index_id_str, MD5_STR_LENGTH); | ||||||
|     *(bulk_line->line + json_len) = '\n'; |     *(bulk_line->line + json_len) = '\n'; | ||||||
| @ -125,9 +136,19 @@ void *create_bulk_buffer(int max, int *count, size_t *buf_len) { | |||||||
|     size_t buf_cur = 0; |     size_t buf_cur = 0; | ||||||
|     char *buf = malloc(8192); |     char *buf = malloc(8192); | ||||||
|     size_t buf_capacity = 8192; |     size_t buf_capacity = 8192; | ||||||
|  | #define GROW_BUF(delta)                       \ | ||||||
|  |     while (buf_size + (delta) > buf_capacity) { \ | ||||||
|  |       buf_capacity *= 2;                        \ | ||||||
|  |       buf = realloc(buf, buf_capacity);         \ | ||||||
|  |     }                                           \ | ||||||
|  |     buf_size += (delta);                        \ | ||||||
| 
 | 
 | ||||||
|  |     // see: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
 | ||||||
|  |     // ES_BULK_LINE_INDEX: two lines, 1st action, 2nd content
 | ||||||
|  |     // ES_BULK_LINE_DELETE: one line
 | ||||||
|     while (line != NULL && *count < max) { |     while (line != NULL && *count < max) { | ||||||
|         char action_str[256]; |         char action_str[256]; | ||||||
|  |         if (line->type == ES_BULK_LINE_INDEX) { | ||||||
|             snprintf( |             snprintf( | ||||||
|                     action_str, sizeof(action_str), |                     action_str, sizeof(action_str), | ||||||
|                     "{\"index\":{\"_id\":\"%s\",\"_type\":\"_doc\",\"_index\":\"%s\"}}\n", |                     "{\"index\":{\"_id\":\"%s\",\"_type\":\"_doc\",\"_index\":\"%s\"}}\n", | ||||||
| @ -137,18 +158,25 @@ void *create_bulk_buffer(int max, int *count, size_t *buf_len) { | |||||||
|             size_t action_str_len = strlen(action_str); |             size_t action_str_len = strlen(action_str); | ||||||
|             size_t line_len = strlen(line->line); |             size_t line_len = strlen(line->line); | ||||||
| 
 | 
 | ||||||
|         while (buf_size + line_len + action_str_len > buf_capacity) { |             GROW_BUF(action_str_len + line_len); | ||||||
|             buf_capacity *= 2; |  | ||||||
|             buf = realloc(buf, buf_capacity); |  | ||||||
|         } |  | ||||||
| 
 |  | ||||||
|         buf_size += line_len + action_str_len; |  | ||||||
| 
 | 
 | ||||||
|             memcpy(buf + buf_cur, action_str, action_str_len); |             memcpy(buf + buf_cur, action_str, action_str_len); | ||||||
|             buf_cur += action_str_len; |             buf_cur += action_str_len; | ||||||
|             memcpy(buf + buf_cur, line->line, line_len); |             memcpy(buf + buf_cur, line->line, line_len); | ||||||
|             buf_cur += line_len; |             buf_cur += line_len; | ||||||
| 
 | 
 | ||||||
|  |         } else if (line->type == ES_BULK_LINE_DELETE) { | ||||||
|  |             snprintf( | ||||||
|  |                     action_str, sizeof(action_str), | ||||||
|  |                     "{\"delete\":{\"_id\":\"%s\",\"_index\":\"%s\"}}\n", | ||||||
|  |                     line->path_md5_str, Indexer->es_index | ||||||
|  |             ); | ||||||
|  | 
 | ||||||
|  |             size_t action_str_len = strlen(action_str); | ||||||
|  |             GROW_BUF(action_str_len); | ||||||
|  |             memcpy(buf + buf_cur, action_str, action_str_len); | ||||||
|  |             buf_cur += action_str_len; | ||||||
|  |         } | ||||||
|         line = line->next; |         line = line->next; | ||||||
|         (*count)++; |         (*count)++; | ||||||
|     } |     } | ||||||
| @ -223,7 +251,7 @@ void _elastic_flush(int max) { | |||||||
|             LOG_ERRORF("elastic.c", "Single document too large, giving up: {%s}", Indexer->line_head->path_md5_str) |             LOG_ERRORF("elastic.c", "Single document too large, giving up: {%s}", Indexer->line_head->path_md5_str) | ||||||
|             free_response(r); |             free_response(r); | ||||||
|             free(buf); |             free(buf); | ||||||
|             delete_queue(1); |             free_queue(1); | ||||||
|             if (Indexer->queued != 0) { |             if (Indexer->queued != 0) { | ||||||
|                 elastic_flush(); |                 elastic_flush(); | ||||||
|             } |             } | ||||||
| @ -248,13 +276,13 @@ void _elastic_flush(int max) { | |||||||
| 
 | 
 | ||||||
|     } else if (r->status_code != 200) { |     } else if (r->status_code != 200) { | ||||||
|         print_errors(r); |         print_errors(r); | ||||||
|         delete_queue(Indexer->queued); |         free_queue(Indexer->queued); | ||||||
| 
 | 
 | ||||||
|     } else { |     } else { | ||||||
| 
 | 
 | ||||||
|         print_errors(r); |         print_errors(r); | ||||||
|         LOG_DEBUGF("elastic.c", "Indexed %d documents (%zukB) <%d>", count, buf_len / 1024, r->status_code); |         LOG_DEBUGF("elastic.c", "Indexed %d documents (%zukB) <%d>", count, buf_len / 1024, r->status_code); | ||||||
|         delete_queue(max); |         free_queue(max); | ||||||
| 
 | 
 | ||||||
|         if (Indexer->queued != 0) { |         if (Indexer->queued != 0) { | ||||||
|             elastic_flush(); |             elastic_flush(); | ||||||
| @ -265,7 +293,7 @@ void _elastic_flush(int max) { | |||||||
|     free(buf); |     free(buf); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| void delete_queue(int max) { | void free_queue(int max) { | ||||||
|     for (int i = 0; i < max; i++) { |     for (int i = 0; i < max; i++) { | ||||||
|         es_bulk_line_t *tmp = Indexer->line_head; |         es_bulk_line_t *tmp = Indexer->line_head; | ||||||
|         Indexer->line_head = tmp->next; |         Indexer->line_head = tmp->next; | ||||||
|  | |||||||
| @ -3,9 +3,13 @@ | |||||||
| 
 | 
 | ||||||
| #include "src/sist.h" | #include "src/sist.h" | ||||||
| 
 | 
 | ||||||
|  | #define ES_BULK_LINE_INDEX 0 | ||||||
|  | #define ES_BULK_LINE_DELETE 1 | ||||||
|  | 
 | ||||||
| typedef struct es_bulk_line { | typedef struct es_bulk_line { | ||||||
|     struct es_bulk_line *next; |     struct es_bulk_line *next; | ||||||
|     char path_md5_str[MD5_STR_LENGTH]; |     char path_md5_str[MD5_STR_LENGTH]; | ||||||
|  |     int type; | ||||||
|     char line[0]; |     char line[0]; | ||||||
| } es_bulk_line_t; | } es_bulk_line_t; | ||||||
| 
 | 
 | ||||||
| @ -40,6 +44,8 @@ void print_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]); | |||||||
| 
 | 
 | ||||||
| void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]); | void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]); | ||||||
| 
 | 
 | ||||||
|  | void delete_document(const char *document_id_str, void* data); | ||||||
|  | 
 | ||||||
| es_indexer_t *create_indexer(const char *url, const char *index); | es_indexer_t *create_indexer(const char *url, const char *index); | ||||||
| 
 | 
 | ||||||
| void elastic_cleanup(); | void elastic_cleanup(); | ||||||
|  | |||||||
| @ -398,7 +398,7 @@ void read_index_bin_handle_line(const char *line, const char *index_id, index_fu | |||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| void read_index_ndjson(const char *path, const char *index_id, index_func func) { | void read_lines(const char *path, const line_processor_t processor) { | ||||||
|     dyn_buffer_t buf = dyn_buffer_create(); |     dyn_buffer_t buf = dyn_buffer_create(); | ||||||
| 
 | 
 | ||||||
|     // Initialize zstd things
 |     // Initialize zstd things
 | ||||||
| @ -427,7 +427,7 @@ void read_index_ndjson(const char *path, const char *index_id, index_func func) | |||||||
| 
 | 
 | ||||||
|                 if (c == '\n') { |                 if (c == '\n') { | ||||||
|                     dyn_buffer_write_char(&buf, '\0'); |                     dyn_buffer_write_char(&buf, '\0'); | ||||||
|                     read_index_bin_handle_line(buf.buf, index_id, func); |                     processor.func(buf.buf, processor.data); | ||||||
|                     buf.cur = 0; |                     buf.cur = 0; | ||||||
|                 } else { |                 } else { | ||||||
|                     dyn_buffer_write_char(&buf, c); |                     dyn_buffer_write_char(&buf, c); | ||||||
| @ -452,12 +452,22 @@ void read_index_ndjson(const char *path, const char *index_id, index_func func) | |||||||
| 
 | 
 | ||||||
|     dyn_buffer_destroy(&buf); |     dyn_buffer_destroy(&buf); | ||||||
|     fclose(file); |     fclose(file); | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | void read_index_ndjson(const char *line, void* _data) { | ||||||
|  |     void** data = _data; | ||||||
|  |     const char* index_id = data[0]; | ||||||
|  |     index_func func = data[1]; | ||||||
|  |     read_index_bin_handle_line(line, index_id, func); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| void read_index(const char *path, const char index_id[MD5_STR_LENGTH], const char *type, index_func func) { | void read_index(const char *path, const char index_id[MD5_STR_LENGTH], const char *type, index_func func) { | ||||||
| 
 |  | ||||||
|     if (strcmp(type, INDEX_TYPE_NDJSON) == 0) { |     if (strcmp(type, INDEX_TYPE_NDJSON) == 0) { | ||||||
|         read_index_ndjson(path, index_id, func); |         read_lines(path, (line_processor_t) { | ||||||
|  |             .data = (void*[2]){(void*)index_id, func} , | ||||||
|  |             .func = read_index_ndjson, | ||||||
|  |         }); | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -476,6 +486,7 @@ void incremental_read(GHashTable *table, const char *filepath, index_descriptor_ | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| static __thread GHashTable *IncrementalCopyTable = NULL; | static __thread GHashTable *IncrementalCopyTable = NULL; | ||||||
|  | static __thread GHashTable *IncrementalNewTable = NULL; | ||||||
| static __thread store_t *IncrementalCopySourceStore = NULL; | static __thread store_t *IncrementalCopySourceStore = NULL; | ||||||
| static __thread store_t *IncrementalCopyDestinationStore = NULL; | static __thread store_t *IncrementalCopyDestinationStore = NULL; | ||||||
| 
 | 
 | ||||||
| @ -524,3 +535,33 @@ void incremental_copy(store_t *store, store_t *dst_store, const char *filepath, | |||||||
| 
 | 
 | ||||||
|     read_index(filepath, "", INDEX_TYPE_NDJSON, incremental_copy_handle_doc); |     read_index(filepath, "", INDEX_TYPE_NDJSON, incremental_copy_handle_doc); | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | void incremental_delete_handle_doc(cJSON *document, UNUSED(const char id_str[MD5_STR_LENGTH])) { | ||||||
|  | 
 | ||||||
|  |     char path_md5_n[MD5_STR_LENGTH + 1]; | ||||||
|  |     path_md5_n[MD5_STR_LENGTH] = '\0'; | ||||||
|  |     path_md5_n[MD5_STR_LENGTH - 1] = '\n'; | ||||||
|  |     const char *path_md5_str = cJSON_GetObjectItem(document, "_id")->valuestring; | ||||||
|  | 
 | ||||||
|  |     // do not delete archive virtual entries
 | ||||||
|  |     if (cJSON_GetObjectItem(document, "parent") == NULL  | ||||||
|  |         && !incremental_get_str(IncrementalCopyTable, path_md5_str) | ||||||
|  |         && !incremental_get_str(IncrementalNewTable, path_md5_str) | ||||||
|  |         ) { | ||||||
|  |         memcpy(path_md5_n, path_md5_str, MD5_STR_LENGTH - 1); | ||||||
|  |         zstd_write_string(path_md5_n, MD5_STR_LENGTH); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | void incremental_delete(const char *del_filepath, const char* index_filepath,  | ||||||
|  |                         GHashTable *copy_table, GHashTable *new_table) { | ||||||
|  | 
 | ||||||
|  |     if (WriterCtx.out_file == NULL) { | ||||||
|  |         initialize_writer_ctx(del_filepath); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     IncrementalCopyTable = copy_table; | ||||||
|  |     IncrementalNewTable = new_table; | ||||||
|  | 
 | ||||||
|  |     read_index(index_filepath, "", INDEX_TYPE_NDJSON, incremental_delete_handle_doc); | ||||||
|  | } | ||||||
|  | |||||||
| @ -7,13 +7,23 @@ | |||||||
| #include <sys/syscall.h> | #include <sys/syscall.h> | ||||||
| #include <glib.h> | #include <glib.h> | ||||||
| 
 | 
 | ||||||
|  | typedef struct line_processor { | ||||||
|  |   void* data; | ||||||
|  |   void (*func)(const char*, void*); | ||||||
|  | } line_processor_t; | ||||||
|  | 
 | ||||||
| typedef void(*index_func)(cJSON *, const char[MD5_STR_LENGTH]); | typedef void(*index_func)(cJSON *, const char[MD5_STR_LENGTH]); | ||||||
| 
 | 
 | ||||||
| void incremental_copy(store_t *store, store_t *dst_store, const char *filepath, | void incremental_copy(store_t *store, store_t *dst_store, const char *filepath, | ||||||
|                       const char *dst_filepath, GHashTable *copy_table); |                       const char *dst_filepath, GHashTable *copy_table); | ||||||
| 
 | 
 | ||||||
|  | void incremental_delete(const char *del_filepath, const char* index_filepath,  | ||||||
|  |                         GHashTable *copy_table, GHashTable *new_table); | ||||||
|  | 
 | ||||||
| void write_document(document_t *doc); | void write_document(document_t *doc); | ||||||
| 
 | 
 | ||||||
|  | void read_lines(const char *path, const line_processor_t processor); | ||||||
|  | 
 | ||||||
| void read_index(const char *path, const char[MD5_STR_LENGTH], const char *type, index_func); | void read_index(const char *path, const char[MD5_STR_LENGTH], const char *type, index_func); | ||||||
| 
 | 
 | ||||||
| void incremental_read(GHashTable *table, const char *filepath, index_descriptor_t *desc); | void incremental_read(GHashTable *table, const char *filepath, index_descriptor_t *desc); | ||||||
| @ -29,4 +39,18 @@ void write_index_descriptor(char *path, index_descriptor_t *desc); | |||||||
| 
 | 
 | ||||||
| index_descriptor_t read_index_descriptor(char *path); | index_descriptor_t read_index_descriptor(char *path); | ||||||
| 
 | 
 | ||||||
|  | // caller ensures char file_path[PATH_MAX]
 | ||||||
|  | #define READ_INDICES(file_path, index_path, action_ok, action_main_fail, cond_original) \ | ||||||
|  |     snprintf(file_path, PATH_MAX, "%s_index_main.ndjson.zst", index_path);              \ | ||||||
|  |     if (0 == access(file_path, R_OK)) {                                                 \ | ||||||
|  |         action_ok;                                                                      \ | ||||||
|  |     } else {                                                                            \ | ||||||
|  |         action_main_fail;                                                               \ | ||||||
|  |     }                                                                                   \ | ||||||
|  |     snprintf(file_path, PATH_MAX, "%s_index_original.ndjson.zst", index_path);          \ | ||||||
|  |     if ((cond_original) && (0 == access(file_path, R_OK))) {                            \ | ||||||
|  |         action_ok;                                                                      \ | ||||||
|  |     }                                                                                   \ | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| #endif | #endif | ||||||
							
								
								
									
										140
									
								
								src/main.c
									
									
									
									
									
								
							
							
						
						
									
										140
									
								
								src/main.c
									
									
									
									
									
								
							| @ -282,37 +282,84 @@ void initialize_scan_context(scan_args_t *args) { | |||||||
|     ScanCtx.json_ctx.ndjson_mime = mime_get_mime_by_string(ScanCtx.mime_table, "application/ndjson"); |     ScanCtx.json_ctx.ndjson_mime = mime_get_mime_by_string(ScanCtx.mime_table, "application/ndjson"); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 
 | /**
 | ||||||
|  |  * Loads an existing index as the baseline for incremental scanning. | ||||||
|  |  *   1. load old index files (original+main) => original_table | ||||||
|  |  *   2. allocate empty table                 => copy_table | ||||||
|  |  *   3. allocate empty table                 => new_table | ||||||
|  |  * the original_table/copy_table/new_table will be populated in parsing/parse.c:parse | ||||||
|  |  * and consumed in main.c:save_incremental_index | ||||||
|  |  * | ||||||
|  |  * Note: the existing index may or may not be of incremental index form. | ||||||
|  |  */ | ||||||
| void load_incremental_index(const scan_args_t *args) { | void load_incremental_index(const scan_args_t *args) { | ||||||
|  |     char file_path[PATH_MAX]; | ||||||
|  | 
 | ||||||
|     ScanCtx.original_table = incremental_get_table(); |     ScanCtx.original_table = incremental_get_table(); | ||||||
|     ScanCtx.copy_table = incremental_get_table(); |     ScanCtx.copy_table = incremental_get_table(); | ||||||
| 
 |     ScanCtx.new_table = incremental_get_table(); | ||||||
|     DIR *dir = opendir(args->incremental); |  | ||||||
|     if (dir == NULL) { |  | ||||||
|         LOG_FATALF("main.c", "Could not open original index for incremental scan: %s", strerror(errno)) |  | ||||||
|     } |  | ||||||
| 
 | 
 | ||||||
|     char descriptor_path[PATH_MAX]; |     char descriptor_path[PATH_MAX]; | ||||||
|     snprintf(descriptor_path, PATH_MAX, "%s/descriptor.json", args->incremental); |     snprintf(descriptor_path, PATH_MAX, "%sdescriptor.json", args->incremental); | ||||||
|     index_descriptor_t original_desc = read_index_descriptor(descriptor_path); |     index_descriptor_t original_desc = read_index_descriptor(descriptor_path); | ||||||
| 
 | 
 | ||||||
|     if (strcmp(original_desc.version, Version) != 0) { |     if (strcmp(original_desc.version, Version) != 0) { | ||||||
|         LOG_FATALF("main.c", "Version mismatch! Index is %s but executable is %s", original_desc.version, Version) |         LOG_FATALF("main.c", "Version mismatch! Index is %s but executable is %s", original_desc.version, Version) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     struct dirent *de; |     READ_INDICES(file_path, args->incremental, incremental_read(ScanCtx.original_table, file_path, &original_desc), | ||||||
|     while ((de = readdir(dir)) != NULL) { |                  LOG_FATALF("main.c", "Could not open original main index for incremental scan: %s", strerror(errno)), 1); | ||||||
|         if (strncmp(de->d_name, "_index", sizeof("_index") - 1) == 0) { |  | ||||||
|             char file_path[PATH_MAX]; |  | ||||||
|             snprintf(file_path, PATH_MAX, "%s%s", args->incremental, de->d_name); |  | ||||||
|             incremental_read(ScanCtx.original_table, file_path, &original_desc); |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|     closedir(dir); |  | ||||||
| 
 | 
 | ||||||
|     LOG_INFOF("main.c", "Loaded %d items in to mtime table.", g_hash_table_size(ScanCtx.original_table)) |     LOG_INFOF("main.c", "Loaded %d items in to mtime table.", g_hash_table_size(ScanCtx.original_table)) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | /**
 | ||||||
|  |  * Saves an incremental index. | ||||||
|  |  * Before calling this function, the scanner should have finished writing the main index. | ||||||
|  |  *   1. Build original_table - new_table => delete_table | ||||||
|  |  *   2. Incrementally copy from old index files [(original+main) /\ copy_table] => index_original.ndjson.zst & store | ||||||
|  |  */ | ||||||
|  | void save_incremental_index(scan_args_t* args) { | ||||||
|  |     char dst_path[PATH_MAX]; | ||||||
|  |     char store_path[PATH_MAX]; | ||||||
|  |     char file_path[PATH_MAX]; | ||||||
|  |     char del_path[PATH_MAX]; | ||||||
|  |     snprintf(store_path, PATH_MAX, "%sthumbs", args->incremental); | ||||||
|  |     snprintf(dst_path, PATH_MAX, "%s_index_original.ndjson.zst", ScanCtx.index.path); | ||||||
|  |     store_t *source = store_create(store_path, STORE_SIZE_TN); | ||||||
|  | 
 | ||||||
|  |     LOG_INFOF("main.c", "incremental_delete: original size = %u, copy size = %u, new size = %u", | ||||||
|  |         g_hash_table_size(ScanCtx.original_table), | ||||||
|  |         g_hash_table_size(ScanCtx.copy_table), | ||||||
|  |         g_hash_table_size(ScanCtx.new_table)); | ||||||
|  |     snprintf(del_path, PATH_MAX, "%s_index_delete.list.zst", ScanCtx.index.path); | ||||||
|  |     READ_INDICES(file_path, args->incremental, incremental_delete(del_path, file_path, ScanCtx.copy_table, ScanCtx.new_table), | ||||||
|  |                  perror("incremental_delete"), 1); | ||||||
|  |     writer_cleanup(); | ||||||
|  | 
 | ||||||
|  |     READ_INDICES(file_path, args->incremental, incremental_copy(source, ScanCtx.index.store, file_path, dst_path, ScanCtx.copy_table),  | ||||||
|  |                  perror("incremental_copy"), 1); | ||||||
|  |     writer_cleanup(); | ||||||
|  | 
 | ||||||
|  |     store_destroy(source); | ||||||
|  | 
 | ||||||
|  |     snprintf(store_path, PATH_MAX, "%stags", args->incremental); | ||||||
|  |     snprintf(dst_path, PATH_MAX, "%stags", ScanCtx.index.path); | ||||||
|  |     store_t *source_tags = store_create(store_path, STORE_SIZE_TAG); | ||||||
|  |     store_copy(source_tags, dst_path); | ||||||
|  |     store_destroy(source_tags); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | /**
 | ||||||
|  |  * An index can be either incremental or non-incremental (initial index). | ||||||
|  |  * For an initial index, there is only the "main" index. | ||||||
|  |  * For an incremental index, there are, additionally: | ||||||
|  |  *   - An "original" index, referencing all files unchanged since the previous index. | ||||||
|  |  *   - A "delete" index, referencing all files that existed in the previous index but have been deleted since then. | ||||||
|  |  * Therefore, for an incremental index, "main"+"original" covers all the current files in the live filesystem, | ||||||
|  |  * and is orthogonal to the "delete" index. When building an incremental index upon an old incremental index, | ||||||
|  |  * the old "delete" index can be safely ignored. | ||||||
|  |  */ | ||||||
| void sist2_scan(scan_args_t *args) { | void sist2_scan(scan_args_t *args) { | ||||||
| 
 | 
 | ||||||
|     ScanCtx.mime_table = mime_get_mime_table(); |     ScanCtx.mime_table = mime_get_mime_table(); | ||||||
| @ -366,33 +413,7 @@ void sist2_scan(scan_args_t *args) { | |||||||
|     LOG_DEBUGF("main.c", "Failed files: %d", ScanCtx.dbg_failed_files_count) |     LOG_DEBUGF("main.c", "Failed files: %d", ScanCtx.dbg_failed_files_count) | ||||||
| 
 | 
 | ||||||
|     if (args->incremental != NULL) { |     if (args->incremental != NULL) { | ||||||
|         char dst_path[PATH_MAX]; |         save_incremental_index(args); | ||||||
|         snprintf(store_path, PATH_MAX, "%sthumbs", args->incremental); |  | ||||||
|         snprintf(dst_path, PATH_MAX, "%s_index_original.ndjson.zst", ScanCtx.index.path); |  | ||||||
|         store_t *source = store_create(store_path, STORE_SIZE_TN); |  | ||||||
| 
 |  | ||||||
|         DIR *dir = opendir(args->incremental); |  | ||||||
|         if (dir == NULL) { |  | ||||||
|             perror("opendir"); |  | ||||||
|             return; |  | ||||||
|         } |  | ||||||
|         struct dirent *de; |  | ||||||
|         while ((de = readdir(dir)) != NULL) { |  | ||||||
|             if (strncmp(de->d_name, "_index_", sizeof("_index_") - 1) == 0) { |  | ||||||
|                 char file_path[PATH_MAX]; |  | ||||||
|                 snprintf(file_path, PATH_MAX, "%s%s", args->incremental, de->d_name); |  | ||||||
|                 incremental_copy(source, ScanCtx.index.store, file_path, dst_path, ScanCtx.copy_table); |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|         closedir(dir); |  | ||||||
|         store_destroy(source); |  | ||||||
|         writer_cleanup(); |  | ||||||
| 
 |  | ||||||
|         snprintf(store_path, PATH_MAX, "%stags", args->incremental); |  | ||||||
|         snprintf(dst_path, PATH_MAX, "%stags", ScanCtx.index.path); |  | ||||||
|         store_t *source_tags = store_create(store_path, STORE_SIZE_TAG); |  | ||||||
|         store_copy(source_tags, dst_path); |  | ||||||
|         store_destroy(source_tags); |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     generate_stats(&ScanCtx.index, args->treemap_threshold, ScanCtx.index.path); |     generate_stats(&ScanCtx.index, args->treemap_threshold, ScanCtx.index.path); | ||||||
| @ -402,6 +423,7 @@ void sist2_scan(scan_args_t *args) { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| void sist2_index(index_args_t *args) { | void sist2_index(index_args_t *args) { | ||||||
|  |     char file_path[PATH_MAX]; | ||||||
| 
 | 
 | ||||||
|     IndexCtx.es_url = args->es_url; |     IndexCtx.es_url = args->es_url; | ||||||
|     IndexCtx.es_index = args->es_index; |     IndexCtx.es_index = args->es_index; | ||||||
| @ -412,7 +434,7 @@ void sist2_index(index_args_t *args) { | |||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     char descriptor_path[PATH_MAX]; |     char descriptor_path[PATH_MAX]; | ||||||
|     snprintf(descriptor_path, PATH_MAX, "%s/descriptor.json", args->index_path); |     snprintf(descriptor_path, PATH_MAX, "%sdescriptor.json", args->index_path); | ||||||
| 
 | 
 | ||||||
|     index_descriptor_t desc = read_index_descriptor(descriptor_path); |     index_descriptor_t desc = read_index_descriptor(descriptor_path); | ||||||
| 
 | 
 | ||||||
| @ -428,11 +450,11 @@ void sist2_index(index_args_t *args) { | |||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     char path_tmp[PATH_MAX]; |     char path_tmp[PATH_MAX]; | ||||||
|     snprintf(path_tmp, sizeof(path_tmp), "%s/tags", args->index_path); |     snprintf(path_tmp, sizeof(path_tmp), "%stags", args->index_path); | ||||||
|     IndexCtx.tag_store = store_create(path_tmp, STORE_SIZE_TAG); |     IndexCtx.tag_store = store_create(path_tmp, STORE_SIZE_TAG); | ||||||
|     IndexCtx.tags = store_read_all(IndexCtx.tag_store); |     IndexCtx.tags = store_read_all(IndexCtx.tag_store); | ||||||
| 
 | 
 | ||||||
|     snprintf(path_tmp, sizeof(path_tmp), "%s/meta", args->index_path); |     snprintf(path_tmp, sizeof(path_tmp), "%smeta", args->index_path); | ||||||
|     IndexCtx.meta_store = store_create(path_tmp, STORE_SIZE_META); |     IndexCtx.meta_store = store_create(path_tmp, STORE_SIZE_META); | ||||||
|     IndexCtx.meta = store_read_all(IndexCtx.meta_store); |     IndexCtx.meta = store_read_all(IndexCtx.meta_store); | ||||||
| 
 | 
 | ||||||
| @ -453,15 +475,19 @@ void sist2_index(index_args_t *args) { | |||||||
|     IndexCtx.pool = tpool_create(args->threads, cleanup, FALSE, args->print == 0); |     IndexCtx.pool = tpool_create(args->threads, cleanup, FALSE, args->print == 0); | ||||||
|     tpool_start(IndexCtx.pool); |     tpool_start(IndexCtx.pool); | ||||||
| 
 | 
 | ||||||
|     struct dirent *de; |     READ_INDICES(file_path, args->index_path, { | ||||||
|     while ((de = readdir(dir)) != NULL) { |  | ||||||
|         if (strncmp(de->d_name, "_index_", sizeof("_index_") - 1) == 0) { |  | ||||||
|             char file_path[PATH_MAX]; |  | ||||||
|             snprintf(file_path, PATH_MAX, "%s/%s", args->index_path, de->d_name); |  | ||||||
|         read_index(file_path, desc.id, desc.type, f); |         read_index(file_path, desc.id, desc.type, f); | ||||||
|  |         LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type); | ||||||
|  |     }, {}, !args->incremental); | ||||||
|  |     snprintf(file_path, PATH_MAX, "%s_index_delete.list.zst", args->index_path); | ||||||
|  |     if (0 == access(file_path, R_OK)) { | ||||||
|  |         read_lines(file_path, (line_processor_t) { | ||||||
|  |             .data = NULL, | ||||||
|  |             .func = delete_document | ||||||
|  |         }); | ||||||
|         LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type) |         LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type) | ||||||
|     } |     } | ||||||
|     } | 
 | ||||||
|     closedir(dir); |     closedir(dir); | ||||||
| 
 | 
 | ||||||
|     tpool_wait(IndexCtx.pool); |     tpool_wait(IndexCtx.pool); | ||||||
| @ -483,7 +509,7 @@ void sist2_exec_script(exec_args_t *args) { | |||||||
|     LogCtx.verbose = TRUE; |     LogCtx.verbose = TRUE; | ||||||
| 
 | 
 | ||||||
|     char descriptor_path[PATH_MAX]; |     char descriptor_path[PATH_MAX]; | ||||||
|     snprintf(descriptor_path, PATH_MAX, "%s/descriptor.json", args->index_path); |     snprintf(descriptor_path, PATH_MAX, "%sdescriptor.json", args->index_path); | ||||||
|     index_descriptor_t desc = read_index_descriptor(descriptor_path); |     index_descriptor_t desc = read_index_descriptor(descriptor_path); | ||||||
| 
 | 
 | ||||||
|     IndexCtx.es_url = args->es_url; |     IndexCtx.es_url = args->es_url; | ||||||
| @ -550,6 +576,7 @@ int main(int argc, const char *argv[]) { | |||||||
|     char *common_es_url = NULL; |     char *common_es_url = NULL; | ||||||
|     char *common_es_index = NULL; |     char *common_es_index = NULL; | ||||||
|     char *common_script_path = NULL; |     char *common_script_path = NULL; | ||||||
|  |     char *common_incremental = NULL; | ||||||
|     int common_async_script = 0; |     int common_async_script = 0; | ||||||
|     int common_threads = 0; |     int common_threads = 0; | ||||||
| 
 | 
 | ||||||
| @ -568,7 +595,7 @@ int main(int argc, const char *argv[]) { | |||||||
|                         "Thumbnail size, in pixels. Use negative value to disable. DEFAULT=500"), |                         "Thumbnail size, in pixels. Use negative value to disable. DEFAULT=500"), | ||||||
|             OPT_INTEGER(0, "content-size", &scan_args->content_size, |             OPT_INTEGER(0, "content-size", &scan_args->content_size, | ||||||
|                         "Number of bytes to be extracted from text documents. Use negative value to disable. DEFAULT=32768"), |                         "Number of bytes to be extracted from text documents. Use negative value to disable. DEFAULT=32768"), | ||||||
|             OPT_STRING(0, "incremental", &scan_args->incremental, |             OPT_STRING(0, "incremental", &common_incremental, | ||||||
|                        "Reuse an existing index and only scan modified files."), |                        "Reuse an existing index and only scan modified files."), | ||||||
|             OPT_STRING('o', "output", &scan_args->output, "Output directory. DEFAULT=index.sist2/"), |             OPT_STRING('o', "output", &scan_args->output, "Output directory. DEFAULT=index.sist2/"), | ||||||
|             OPT_STRING(0, "rewrite-url", &scan_args->rewrite_url, "Serve files from this url instead of from disk."), |             OPT_STRING(0, "rewrite-url", &scan_args->rewrite_url, "Serve files from this url instead of from disk."), | ||||||
| @ -606,6 +633,8 @@ int main(int argc, const char *argv[]) { | |||||||
|             OPT_STRING(0, "es-url", &common_es_url, "Elasticsearch url with port. DEFAULT=http://localhost:9200"), |             OPT_STRING(0, "es-url", &common_es_url, "Elasticsearch url with port. DEFAULT=http://localhost:9200"), | ||||||
|             OPT_STRING(0, "es-index", &common_es_index, "Elasticsearch index name. DEFAULT=sist2"), |             OPT_STRING(0, "es-index", &common_es_index, "Elasticsearch index name. DEFAULT=sist2"), | ||||||
|             OPT_BOOLEAN('p', "print", &index_args->print, "Just print JSON documents to stdout."), |             OPT_BOOLEAN('p', "print", &index_args->print, "Just print JSON documents to stdout."), | ||||||
|  |             OPT_STRING(0, "incremental", &common_incremental, | ||||||
|  |                        "Conduct incremental indexing, assumes that the old index is already digested by Elasticsearch."), | ||||||
|             OPT_STRING(0, "script-file", &common_script_path, "Path to user script."), |             OPT_STRING(0, "script-file", &common_script_path, "Path to user script."), | ||||||
|             OPT_STRING(0, "mappings-file", &index_args->es_mappings_path, "Path to Elasticsearch mappings."), |             OPT_STRING(0, "mappings-file", &index_args->es_mappings_path, "Path to Elasticsearch mappings."), | ||||||
|             OPT_STRING(0, "settings-file", &index_args->es_settings_path, "Path to Elasticsearch settings."), |             OPT_STRING(0, "settings-file", &index_args->es_settings_path, "Path to Elasticsearch settings."), | ||||||
| @ -662,6 +691,9 @@ int main(int argc, const char *argv[]) { | |||||||
|     exec_args->async_script = common_async_script; |     exec_args->async_script = common_async_script; | ||||||
|     index_args->async_script = common_async_script; |     index_args->async_script = common_async_script; | ||||||
| 
 | 
 | ||||||
|  |     scan_args->incremental = (common_incremental == NULL) ? NULL : strdup(common_incremental); | ||||||
|  |     index_args->incremental = (common_incremental != NULL); | ||||||
|  | 
 | ||||||
|     if (argc == 0) { |     if (argc == 0) { | ||||||
|         argparse_usage(&argparse); |         argparse_usage(&argparse); | ||||||
|         goto end; |         goto end; | ||||||
|  | |||||||
| @ -80,7 +80,7 @@ void parse(void *arg) { | |||||||
|     int inc_ts = incremental_get(ScanCtx.original_table, doc->path_md5); |     int inc_ts = incremental_get(ScanCtx.original_table, doc->path_md5); | ||||||
|     if (inc_ts != 0 && inc_ts == job->vfile.info.st_mtim.tv_sec) { |     if (inc_ts != 0 && inc_ts == job->vfile.info.st_mtim.tv_sec) { | ||||||
|         pthread_mutex_lock(&ScanCtx.copy_table_mu); |         pthread_mutex_lock(&ScanCtx.copy_table_mu); | ||||||
|         incremental_mark_file_for_copy(ScanCtx.copy_table, doc->path_md5); |         incremental_mark_file(ScanCtx.copy_table, doc->path_md5); | ||||||
|         pthread_mutex_unlock(&ScanCtx.copy_table_mu); |         pthread_mutex_unlock(&ScanCtx.copy_table_mu); | ||||||
| 
 | 
 | ||||||
|         pthread_mutex_lock(&ScanCtx.dbg_file_counts_mu); |         pthread_mutex_lock(&ScanCtx.dbg_file_counts_mu); | ||||||
| @ -89,6 +89,11 @@ void parse(void *arg) { | |||||||
| 
 | 
 | ||||||
|         return; |         return; | ||||||
|     } |     } | ||||||
|  |     if (ScanCtx.new_table != NULL) { | ||||||
|  |         pthread_mutex_lock(&ScanCtx.copy_table_mu); | ||||||
|  |         incremental_mark_file(ScanCtx.new_table, doc->path_md5); | ||||||
|  |         pthread_mutex_unlock(&ScanCtx.copy_table_mu); | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
|     char *buf[MAGIC_BUF_SIZE]; |     char *buf[MAGIC_BUF_SIZE]; | ||||||
| 
 | 
 | ||||||
|  | |||||||
							
								
								
									
										10
									
								
								src/stats.c
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								src/stats.c
									
									
									
									
									
								
							| @ -96,16 +96,8 @@ void fill_tables(cJSON *document, UNUSED(const char index_id[MD5_STR_LENGTH])) { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| void read_index_into_tables(index_t *index) { | void read_index_into_tables(index_t *index) { | ||||||
|     DIR *dir = opendir(index->path); |  | ||||||
|     struct dirent *de; |  | ||||||
|     while ((de = readdir(dir)) != NULL) { |  | ||||||
|         if (strncmp(de->d_name, "_index_", sizeof("_index_") - 1) == 0) { |  | ||||||
|     char file_path[PATH_MAX]; |     char file_path[PATH_MAX]; | ||||||
|             snprintf(file_path, PATH_MAX, "%s%s", index->path, de->d_name); |     READ_INDICES(file_path, index->path, read_index(file_path, index->desc.id, index->desc.type, fill_tables), {}, 1); | ||||||
|             read_index(file_path, index->desc.id, index->desc.type, fill_tables); |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|     closedir(dir); |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| static size_t rfind(const char *str, int c) { | static size_t rfind(const char *str, int c) { | ||||||
|  | |||||||
| @ -134,10 +134,11 @@ static int incremental_get_str(GHashTable *table, const char *path_md5) { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| /**
 | /**
 | ||||||
|  * Not thread safe! |  * Marks a file by adding it to a table. | ||||||
|  |  * !!Not thread safe. | ||||||
|  */ |  */ | ||||||
| __always_inline | __always_inline | ||||||
| static int incremental_mark_file_for_copy(GHashTable *table, const unsigned char path_md5[MD5_DIGEST_LENGTH]) { | static int incremental_mark_file(GHashTable *table, const unsigned char path_md5[MD5_DIGEST_LENGTH]) { | ||||||
|     char *ptr = malloc(MD5_STR_LENGTH); |     char *ptr = malloc(MD5_STR_LENGTH); | ||||||
|     buf2hex(path_md5, MD5_DIGEST_LENGTH, ptr); |     buf2hex(path_md5, MD5_DIGEST_LENGTH, ptr); | ||||||
|     return g_hash_table_insert(table, ptr, GINT_TO_POINTER(1)); |     return g_hash_table_insert(table, ptr, GINT_TO_POINTER(1)); | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user