From a8505cb8c1be0a423c11f6c1a03b5bd6c7efb826 Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 20 Feb 2020 16:42:13 -0500 Subject: [PATCH] Fix for #28 --- src/index/elastic.c | 95 ++++++++++++++++++++++++++++++++++----------- src/main.c | 2 +- web/search.html | 2 +- 3 files changed, 74 insertions(+), 25 deletions(-) diff --git a/src/index/elastic.c b/src/index/elastic.c index 4fe9ac5..ded9bce 100644 --- a/src/index/elastic.c +++ b/src/index/elastic.c @@ -20,6 +20,8 @@ typedef struct es_indexer { static es_indexer_t *Indexer; +void delete_queue(int max); + void print_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) { cJSON *line = cJSON_CreateObject(); @@ -64,7 +66,7 @@ void execute_update_script(const char *script, const char index_id[UUID_STR_LEN] cJSON *term_obj = cJSON_AddObjectToObject(query, "term"); cJSON_AddStringToObject(term_obj, "index", index_id); - char * str = cJSON_Print(body); + char *str = cJSON_Print(body); char bulk_url[4096]; snprintf(bulk_url, 4096, "%s/sist2/_update_by_query?pretty", Indexer->es_url); @@ -87,24 +89,18 @@ void execute_update_script(const char *script, const char index_id[UUID_STR_LEN] cJSON_Delete(resp); } -void elastic_flush() { - - if (Indexer == NULL) { - Indexer = create_indexer(IndexCtx.es_url); - } - +void *create_bulk_buffer(int max, int *count, size_t *buf_len) { es_bulk_line_t *line = Indexer->line_head; - - int count = 0; + *count = 0; size_t buf_size = 0; size_t buf_cur = 0; char *buf = malloc(1); - while (line != NULL) { + while (line != NULL && *count < max) { char action_str[512]; snprintf(action_str, 512, - "{\"index\":{\"_id\":\"%s\", \"_type\":\"_doc\", \"_index\":\"sist2\"}}\n", line->uuid_str); + "{\"index\":{\"_id\":\"%s\", \"_type\":\"_doc\", \"_index\":\"sist2\"}}\n", line->uuid_str); size_t action_str_len = strlen(action_str); size_t line_len = strlen(line->line); @@ -116,17 +112,20 @@ void elastic_flush() { memcpy(buf + buf_cur, line->line, line_len); buf_cur += line_len; - es_bulk_line_t *tmp = line; line = line->next; - free(tmp); - count++; + (*count)++; } buf = realloc(buf, buf_size + 1); - *(buf+buf_cur) = '\0'; + *(buf + buf_cur) = '\0'; - Indexer->line_head = NULL; - Indexer->line_tail = NULL; - Indexer->queued = 0; + *buf_len = buf_cur; + return buf; +} + +void _elastic_flush(int max) { + size_t buf_len; + int count; + void *buf = create_bulk_buffer(max, &count, &buf_len); char bulk_url[4096]; snprintf(bulk_url, 4096, "%s/sist2/_bulk?pipeline=tie", Indexer->es_url); @@ -136,15 +135,33 @@ void elastic_flush() { LOG_FATALF("elastic.c", "Could not connect to %s, make sure that elasticsearch is running!\n", IndexCtx.es_url) } - LOG_INFOF("elastic.c", "Indexed %d documents (%zukB) <%d>", count, buf_cur / 1024, r->status_code); + if (r->status_code == 413) { - if (r->status_code != 200 && r->status_code != 413) { + if (max <= 1) { + LOG_ERRORF("elastic.c", "Single document too large, giving up: {%s}", Indexer->line_head->uuid_str) + free_response(r); + free(buf); + delete_queue(1); + if (Indexer->queued != 0) { + elastic_flush(); + } + return; + } + + LOG_WARNINGF("elastic.c", "Payload too large, retrying (%d documents)", count); + + free_response(r); + free(buf); + _elastic_flush(max / 2); + return; + + } else if (r->status_code != 200) { cJSON *ret_json = cJSON_Parse(r->body); if (cJSON_GetObjectItem(ret_json, "errors")->valueint != 0) { cJSON *err; cJSON_ArrayForEach(err, cJSON_GetObjectItem(ret_json, "items")) { if (cJSON_GetObjectItem(cJSON_GetObjectItem(err, "index"), "status")->valueint != 201) { - char* str = cJSON_Print(err); + char *str = cJSON_Print(err); LOG_ERRORF("elastic.c", "%s\n", str); cJSON_free(str); } @@ -152,12 +169,44 @@ void elastic_flush() { } cJSON_Delete(ret_json); + delete_queue(Indexer->queued); + + } else { + LOG_INFOF("elastic.c", "Indexed %d documents (%zukB) <%d>", count, buf_len / 1024, r->status_code); + + delete_queue(max); + + if (Indexer->queued != 0) { + elastic_flush(); + } } free_response(r); free(buf); } +void delete_queue(int max) { + for (int i = 0; i < max; i++) { + es_bulk_line_t *tmp = Indexer->line_head; + Indexer->line_head = tmp->next; + if (Indexer->line_head == NULL) { + Indexer->line_tail = NULL; + } else { + free(tmp); + } + Indexer->queued -= 1; + } +} + +void elastic_flush() { + + if (Indexer == NULL) { + Indexer = create_indexer(IndexCtx.es_url); + } + + _elastic_flush(Indexer->queued); +} + void elastic_index_line(es_bulk_line_t *line) { if (Indexer == NULL) { @@ -194,7 +243,7 @@ es_indexer_t *create_indexer(const char *url) { return indexer; } -void destroy_indexer(char * script, char index_id[UUID_STR_LEN]) { +void destroy_indexer(char *script, char index_id[UUID_STR_LEN]) { char url[4096]; @@ -285,7 +334,7 @@ cJSON *elastic_get_document(const char *uuid_str) { char *elastic_get_status() { char url[4096]; snprintf(url, 4096, - "%s/_cluster/state/metadata/sist2?filter_path=metadata.indices.*.state", WebCtx.es_url); + "%s/_cluster/state/metadata/sist2?filter_path=metadata.indices.*.state", WebCtx.es_url); response_t *r = web_get(url); cJSON *json = NULL; diff --git a/src/main.c b/src/main.c index e261f88..1271cc1 100644 --- a/src/main.c +++ b/src/main.c @@ -6,7 +6,7 @@ #define EPILOG "Made by simon987 . Released under GPL-3.0" -static const char *const Version = "1.2.12"; +static const char *const Version = "1.2.13"; static const char *const usage[] = { "sist2 scan [OPTION]... PATH", "sist2 index [OPTION]... INDEX", diff --git a/web/search.html b/web/search.html index 2fa3f7e..c6fdb76 100644 --- a/web/search.html +++ b/web/search.html @@ -11,7 +11,7 @@