Add bulk delete support to the Elasticsearch indexer.

- elastic.h: es_bulk_line_t gains a `type` field (ES_BULK_LINE_INDEX or
  ES_BULK_LINE_DELETE).
- elastic.c: delete_document() queues ES_BULK_LINE_DELETE lines and
  create_bulk_buffer() emits a one-line "delete" action for them.
- main.c: delete_document no longer receives desc.id, and
  scan_args->incremental gets its own strdup'd copy (NULL stays NULL).

Review note: the `index` blob ids below are copied from the original patch
and do not describe the amended post-image of elastic.c / elastic.h.

diff --git a/src/index/elastic.c b/src/index/elastic.c
index 6da4079..61eb166 100644
--- a/src/index/elastic.c
+++ b/src/index/elastic.c
@@ -52,11 +52,34 @@ void index_json_func(void *arg) {
     elastic_index_line(line);
 }
 
+/**
+ * Queue a bulk "delete" action for one document.
+ *
+ * Allocates an ES_BULK_LINE_DELETE line (no payload) and submits it to
+ * IndexCtx.pool with index_json_func as the work function.
+ *
+ * @param document_id_str NUL-terminated id of the document to delete; it is
+ *                        copied, truncated to fit path_md5_str if too long.
+ * @param _data           Unused.
+ */
+void delete_document(const char *document_id_str, void *UNUSED(_data)) {
+    es_bulk_line_t *bulk_line = malloc(sizeof(es_bulk_line_t));
+    bulk_line->type = ES_BULK_LINE_DELETE;
+    bulk_line->next = NULL;
+    // Bounded, always NUL-terminated copy: an id shorter than MD5_STR_LENGTH
+    // must not be read past its terminator.
+    snprintf(bulk_line->path_md5_str, sizeof(bulk_line->path_md5_str),
+             "%s", document_id_str);
+    tpool_add_work(IndexCtx.pool, index_json_func, bulk_line);
+}
+
+
 void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]) {
     char *json = cJSON_PrintUnformatted(document);
 
     size_t json_len = strlen(json);
     es_bulk_line_t *bulk_line = malloc(sizeof(es_bulk_line_t) + json_len + 2);
+    bulk_line->type = ES_BULK_LINE_INDEX;
     memcpy(bulk_line->line, json, json_len);
     memcpy(bulk_line->path_md5_str, index_id_str, MD5_STR_LENGTH);
     *(bulk_line->line + json_len) = '\n';
@@ -67,11 +90,6 @@ void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]) {
     tpool_add_work(IndexCtx.pool, index_json_func, bulk_line);
 }
 
-void delete_document(const char* document_id_str, void* data) {
-    const char* index_id_str = data;
-    // TODO bulk delete
-}
-
 void execute_update_script(const char *script, int async, const char index_id[MD5_STR_LENGTH]) {
 
     if (Indexer == NULL) {
@@ -130,30 +148,54 @@ void *create_bulk_buffer(int max, int *count, size_t *buf_len) {
     size_t buf_cur = 0;
     char *buf = malloc(8192);
     size_t buf_capacity = 8192;
+    // Reserve `delta` more bytes in buf: double buf_capacity until the new
+    // total fits, then add delta to buf_size. `delta` is evaluated once and
+    // do/while(0) makes the expansion a single statement.
+    // NOTE(review): realloc failure is still unhandled, as before this change.
+#define GROW_BUF(delta) \
+    do { \
+        const size_t grow_by_ = (delta); \
+        while (buf_size + grow_by_ > buf_capacity) { \
+            buf_capacity *= 2; \
+            buf = realloc(buf, buf_capacity); \
+        } \
+        buf_size += grow_by_; \
+    } while (0)
 
+    // see: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
+    // ES_BULK_LINE_INDEX: two lines, 1st action, 2nd content
+    // ES_BULK_LINE_DELETE: one line
     while (line != NULL && *count < max) {
         char action_str[256];
-        snprintf(
-                action_str, sizeof(action_str),
-                "{\"index\":{\"_id\":\"%s\",\"_type\":\"_doc\",\"_index\":\"%s\"}}\n",
-                line->path_md5_str, Indexer->es_index
-        );
+        if (line->type == ES_BULK_LINE_INDEX) {
+            snprintf(
+                    action_str, sizeof(action_str),
+                    "{\"index\":{\"_id\":\"%s\",\"_type\":\"_doc\",\"_index\":\"%s\"}}\n",
+                    line->path_md5_str, Indexer->es_index
+            );
 
-        size_t action_str_len = strlen(action_str);
-        size_t line_len = strlen(line->line);
+            size_t action_str_len = strlen(action_str);
+            size_t line_len = strlen(line->line);
 
-        while (buf_size + line_len + action_str_len > buf_capacity) {
-            buf_capacity *= 2;
-            buf = realloc(buf, buf_capacity);
+            GROW_BUF(action_str_len + line_len);
+
+            memcpy(buf + buf_cur, action_str, action_str_len);
+            buf_cur += action_str_len;
+            memcpy(buf + buf_cur, line->line, line_len);
+            buf_cur += line_len;
+
+        } else if (line->type == ES_BULK_LINE_DELETE) {
+            snprintf(
+                    action_str, sizeof(action_str),
+                    "{\"delete\":{\"_id\":\"%s\",\"_index\":\"%s\"}}\n",
+                    line->path_md5_str, Indexer->es_index
+            );
+
+            size_t action_str_len = strlen(action_str);
+            GROW_BUF(action_str_len);
+            memcpy(buf + buf_cur, action_str, action_str_len);
+            buf_cur += action_str_len;
         }
-
-        buf_size += line_len + action_str_len;
-
-        memcpy(buf + buf_cur, action_str, action_str_len);
-        buf_cur += action_str_len;
-        memcpy(buf + buf_cur, line->line, line_len);
-        buf_cur += line_len;
-
         line = line->next;
         (*count)++;
     }
diff --git a/src/index/elastic.h b/src/index/elastic.h
index 65cfc6c..5a33d15 100644
--- a/src/index/elastic.h
+++ b/src/index/elastic.h
@@ -3,9 +3,14 @@
 
 #include "src/sist.h"
 
+// Kind of bulk action an es_bulk_line_t carries (value of its `type` field).
+#define ES_BULK_LINE_INDEX 0
+#define ES_BULK_LINE_DELETE 1
+
 typedef struct es_bulk_line {
     struct es_bulk_line *next;
     char path_md5_str[MD5_STR_LENGTH];
+    int type; // ES_BULK_LINE_INDEX or ES_BULK_LINE_DELETE
     char line[0];
 } es_bulk_line_t;
 
diff --git a/src/main.c b/src/main.c
index 5b8cd88..3bb9401 100644
--- a/src/main.c
+++ b/src/main.c
@@ -489,7 +489,7 @@ void sist2_index(index_args_t *args) {
     snprintf(file_path, PATH_MAX, "%s_index_delete.list.zst", args->index_path);
     if (0 == access(file_path, R_OK)) {
         read_lines(file_path, (line_processor_t) {
-            .data = desc.id,
+            .data = NULL,
             .func = delete_document
         });
         LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type)
@@ -698,7 +698,7 @@ int main(int argc, const char *argv[]) {
     exec_args->async_script = common_async_script;
     index_args->async_script = common_async_script;
 
-    scan_args->incremental = common_incremental;
+    scan_args->incremental = (common_incremental == NULL) ? NULL : strdup(common_incremental);
     index_args->incremental = (common_incremental != NULL);
 
     if (argc == 0) {