|
|
|
|
@@ -20,6 +20,8 @@ typedef struct es_indexer {
|
|
|
|
|
|
|
|
|
|
static es_indexer_t *Indexer;
|
|
|
|
|
|
|
|
|
|
void delete_queue(int max);
|
|
|
|
|
|
|
|
|
|
void print_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) {
|
|
|
|
|
|
|
|
|
|
cJSON *line = cJSON_CreateObject();
|
|
|
|
|
@@ -64,7 +66,7 @@ void execute_update_script(const char *script, const char index_id[UUID_STR_LEN]
|
|
|
|
|
cJSON *term_obj = cJSON_AddObjectToObject(query, "term");
|
|
|
|
|
cJSON_AddStringToObject(term_obj, "index", index_id);
|
|
|
|
|
|
|
|
|
|
char * str = cJSON_Print(body);
|
|
|
|
|
char *str = cJSON_Print(body);
|
|
|
|
|
|
|
|
|
|
char bulk_url[4096];
|
|
|
|
|
snprintf(bulk_url, 4096, "%s/sist2/_update_by_query?pretty", Indexer->es_url);
|
|
|
|
|
@@ -87,24 +89,18 @@ void execute_update_script(const char *script, const char index_id[UUID_STR_LEN]
|
|
|
|
|
cJSON_Delete(resp);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void elastic_flush() {
|
|
|
|
|
|
|
|
|
|
if (Indexer == NULL) {
|
|
|
|
|
Indexer = create_indexer(IndexCtx.es_url);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void *create_bulk_buffer(int max, int *count, size_t *buf_len) {
|
|
|
|
|
es_bulk_line_t *line = Indexer->line_head;
|
|
|
|
|
|
|
|
|
|
int count = 0;
|
|
|
|
|
*count = 0;
|
|
|
|
|
|
|
|
|
|
size_t buf_size = 0;
|
|
|
|
|
size_t buf_cur = 0;
|
|
|
|
|
char *buf = malloc(1);
|
|
|
|
|
|
|
|
|
|
while (line != NULL) {
|
|
|
|
|
while (line != NULL && *count < max) {
|
|
|
|
|
char action_str[512];
|
|
|
|
|
snprintf(action_str, 512,
|
|
|
|
|
"{\"index\":{\"_id\":\"%s\", \"_type\":\"_doc\", \"_index\":\"sist2\"}}\n", line->uuid_str);
|
|
|
|
|
"{\"index\":{\"_id\":\"%s\", \"_type\":\"_doc\", \"_index\":\"sist2\"}}\n", line->uuid_str);
|
|
|
|
|
size_t action_str_len = strlen(action_str);
|
|
|
|
|
|
|
|
|
|
size_t line_len = strlen(line->line);
|
|
|
|
|
@@ -116,17 +112,20 @@ void elastic_flush() {
|
|
|
|
|
memcpy(buf + buf_cur, line->line, line_len);
|
|
|
|
|
buf_cur += line_len;
|
|
|
|
|
|
|
|
|
|
es_bulk_line_t *tmp = line;
|
|
|
|
|
line = line->next;
|
|
|
|
|
free(tmp);
|
|
|
|
|
count++;
|
|
|
|
|
(*count)++;
|
|
|
|
|
}
|
|
|
|
|
buf = realloc(buf, buf_size + 1);
|
|
|
|
|
*(buf+buf_cur) = '\0';
|
|
|
|
|
*(buf + buf_cur) = '\0';
|
|
|
|
|
|
|
|
|
|
Indexer->line_head = NULL;
|
|
|
|
|
Indexer->line_tail = NULL;
|
|
|
|
|
Indexer->queued = 0;
|
|
|
|
|
*buf_len = buf_cur;
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void _elastic_flush(int max) {
|
|
|
|
|
size_t buf_len;
|
|
|
|
|
int count;
|
|
|
|
|
void *buf = create_bulk_buffer(max, &count, &buf_len);
|
|
|
|
|
|
|
|
|
|
char bulk_url[4096];
|
|
|
|
|
snprintf(bulk_url, 4096, "%s/sist2/_bulk?pipeline=tie", Indexer->es_url);
|
|
|
|
|
@@ -136,15 +135,33 @@ void elastic_flush() {
|
|
|
|
|
LOG_FATALF("elastic.c", "Could not connect to %s, make sure that elasticsearch is running!\n", IndexCtx.es_url)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
LOG_INFOF("elastic.c", "Indexed %d documents (%zukB) <%d>", count, buf_cur / 1024, r->status_code);
|
|
|
|
|
if (r->status_code == 413) {
|
|
|
|
|
|
|
|
|
|
if (r->status_code != 200 && r->status_code != 413) {
|
|
|
|
|
if (max <= 1) {
|
|
|
|
|
LOG_ERRORF("elastic.c", "Single document too large, giving up: {%s}", Indexer->line_head->uuid_str)
|
|
|
|
|
free_response(r);
|
|
|
|
|
free(buf);
|
|
|
|
|
delete_queue(1);
|
|
|
|
|
if (Indexer->queued != 0) {
|
|
|
|
|
elastic_flush();
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
LOG_WARNINGF("elastic.c", "Payload too large, retrying (%d documents)", count);
|
|
|
|
|
|
|
|
|
|
free_response(r);
|
|
|
|
|
free(buf);
|
|
|
|
|
_elastic_flush(max / 2);
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
} else if (r->status_code != 200) {
|
|
|
|
|
cJSON *ret_json = cJSON_Parse(r->body);
|
|
|
|
|
if (cJSON_GetObjectItem(ret_json, "errors")->valueint != 0) {
|
|
|
|
|
cJSON *err;
|
|
|
|
|
cJSON_ArrayForEach(err, cJSON_GetObjectItem(ret_json, "items")) {
|
|
|
|
|
if (cJSON_GetObjectItem(cJSON_GetObjectItem(err, "index"), "status")->valueint != 201) {
|
|
|
|
|
char* str = cJSON_Print(err);
|
|
|
|
|
char *str = cJSON_Print(err);
|
|
|
|
|
LOG_ERRORF("elastic.c", "%s\n", str);
|
|
|
|
|
cJSON_free(str);
|
|
|
|
|
}
|
|
|
|
|
@@ -152,12 +169,44 @@ void elastic_flush() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cJSON_Delete(ret_json);
|
|
|
|
|
delete_queue(Indexer->queued);
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
LOG_INFOF("elastic.c", "Indexed %d documents (%zukB) <%d>", count, buf_len / 1024, r->status_code);
|
|
|
|
|
|
|
|
|
|
delete_queue(max);
|
|
|
|
|
|
|
|
|
|
if (Indexer->queued != 0) {
|
|
|
|
|
elastic_flush();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
free_response(r);
|
|
|
|
|
free(buf);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void delete_queue(int max) {
|
|
|
|
|
for (int i = 0; i < max; i++) {
|
|
|
|
|
es_bulk_line_t *tmp = Indexer->line_head;
|
|
|
|
|
Indexer->line_head = tmp->next;
|
|
|
|
|
if (Indexer->line_head == NULL) {
|
|
|
|
|
Indexer->line_tail = NULL;
|
|
|
|
|
} else {
|
|
|
|
|
free(tmp);
|
|
|
|
|
}
|
|
|
|
|
Indexer->queued -= 1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void elastic_flush() {
|
|
|
|
|
|
|
|
|
|
if (Indexer == NULL) {
|
|
|
|
|
Indexer = create_indexer(IndexCtx.es_url);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_elastic_flush(Indexer->queued);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void elastic_index_line(es_bulk_line_t *line) {
|
|
|
|
|
|
|
|
|
|
if (Indexer == NULL) {
|
|
|
|
|
@@ -194,7 +243,7 @@ es_indexer_t *create_indexer(const char *url) {
|
|
|
|
|
return indexer;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void destroy_indexer(char * script, char index_id[UUID_STR_LEN]) {
|
|
|
|
|
void destroy_indexer(char *script, char index_id[UUID_STR_LEN]) {
|
|
|
|
|
|
|
|
|
|
char url[4096];
|
|
|
|
|
|
|
|
|
|
@@ -285,7 +334,7 @@ cJSON *elastic_get_document(const char *uuid_str) {
|
|
|
|
|
char *elastic_get_status() {
|
|
|
|
|
char url[4096];
|
|
|
|
|
snprintf(url, 4096,
|
|
|
|
|
"%s/_cluster/state/metadata/sist2?filter_path=metadata.indices.*.state", WebCtx.es_url);
|
|
|
|
|
"%s/_cluster/state/metadata/sist2?filter_path=metadata.indices.*.state", WebCtx.es_url);
|
|
|
|
|
|
|
|
|
|
response_t *r = web_get(url);
|
|
|
|
|
cJSON *json = NULL;
|
|
|
|
|
|