This commit is contained in:
2020-08-16 19:16:28 -04:00
parent a0739d5177
commit 1c898640cf
10 changed files with 99 additions and 42 deletions

View File

@@ -9,6 +9,7 @@
typedef struct es_indexer {
int queued;
char *es_url;
char *es_index;
es_bulk_line_t *line_head;
es_bulk_line_t *line_tail;
} es_indexer_t;
@@ -17,11 +18,13 @@ typedef struct es_indexer {
static __thread es_indexer_t *Indexer;
void delete_queue(int max);
void elastic_flush();
void elastic_cleanup() {
elastic_flush();
if (Indexer != NULL) {
free(Indexer->es_index);
free(Indexer->es_url);
free(Indexer);
}
@@ -32,7 +35,7 @@ void print_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) {
cJSON *line = cJSON_CreateObject();
cJSON_AddStringToObject(line, "_id", uuid_str);
cJSON_AddStringToObject(line, "_index", "sist2");
cJSON_AddStringToObject(line, "_index", IndexCtx.es_index);
cJSON_AddStringToObject(line, "_type", "_doc");
cJSON_AddItemReferenceToObject(line, "_source", document);
@@ -67,7 +70,7 @@ void index_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) {
void execute_update_script(const char *script, int async, const char index_id[UUID_STR_LEN]) {
if (Indexer == NULL) {
Indexer = create_indexer(IndexCtx.es_url);
Indexer = create_indexer(IndexCtx.es_url, IndexCtx.es_index);
}
cJSON *body = cJSON_CreateObject();
@@ -83,9 +86,10 @@ void execute_update_script(const char *script, int async, const char index_id[UU
char bulk_url[4096];
if (async) {
snprintf(bulk_url, sizeof(bulk_url), "%s/sist2/_update_by_query?wait_for_completion=false", Indexer->es_url);
snprintf(bulk_url, sizeof(bulk_url), "%s/%s/_update_by_query?wait_for_completion=false", Indexer->es_url,
Indexer->es_index);
} else {
snprintf(bulk_url, sizeof(bulk_url), "%s/sist2/_update_by_query", Indexer->es_url);
snprintf(bulk_url, sizeof(bulk_url), "%s/%s/_update_by_query", Indexer->es_url, Indexer->es_index);
}
response_t *r = web_post(bulk_url, str);
if (!async) {
@@ -113,8 +117,6 @@ void execute_update_script(const char *script, int async, const char index_id[UU
cJSON_Delete(resp);
}
#define ACTION_STR_LEN 91
void *create_bulk_buffer(int max, int *count, size_t *buf_len) {
es_bulk_line_t *line = Indexer->line_head;
*count = 0;
@@ -126,20 +128,24 @@ void *create_bulk_buffer(int max, int *count, size_t *buf_len) {
while (line != NULL && *count < max) {
char action_str[256];
snprintf(action_str, 256,
"{\"index\":{\"_id\":\"%s\", \"_type\":\"_doc\", \"_index\":\"sist2\"}}\n", line->uuid_str);
snprintf(
action_str, 256,
"{\"index\":{\"_id\":\"%s\",\"_type\":\"_doc\",\"_index\":\"%s\"}}\n",
line->uuid_str, Indexer->es_index
);
size_t action_str_len = strlen(action_str);
size_t line_len = strlen(line->line);
while (buf_size + line_len + ACTION_STR_LEN > buf_capacity) {
while (buf_size + line_len + action_str_len > buf_capacity) {
buf_capacity *= 2;
buf = realloc(buf, buf_capacity);
}
buf_size += line_len + ACTION_STR_LEN;
buf_size += line_len + action_str_len;
memcpy(buf + buf_cur, action_str, ACTION_STR_LEN);
buf_cur += ACTION_STR_LEN;
memcpy(buf + buf_cur, action_str, action_str_len);
buf_cur += action_str_len;
memcpy(buf + buf_cur, line->line, line_len);
buf_cur += line_len;
@@ -177,6 +183,21 @@ void print_errors(response_t *r) {
free(tmp);
}
void print_error(response_t *r) {
char *tmp = malloc(r->size + 1);
memcpy(tmp, r->body, r->size);
*(tmp + r->size) = '\0';
cJSON *ret_json = cJSON_Parse(tmp);
if (cJSON_GetObjectItem(ret_json, "error") != NULL) {
char *str = cJSON_Print(cJSON_GetObjectItem(ret_json, "error"));
LOG_ERRORF("elastic.c", "%s\n", str);
cJSON_free(str);
}
cJSON_Delete(ret_json);
free(tmp);
}
void _elastic_flush(int max) {
if (max == 0) {
@@ -189,7 +210,7 @@ void _elastic_flush(int max) {
void *buf = create_bulk_buffer(max, &count, &buf_len);
char bulk_url[4096];
snprintf(bulk_url, 4096, "%s/sist2/_bulk?pipeline=tie", Indexer->es_url);
snprintf(bulk_url, sizeof(bulk_url), "%s/%s/_bulk?pipeline=tie", Indexer->es_url, Indexer->es_index);
response_t *r = web_post(bulk_url, buf);
if (r->status_code == 0) {
@@ -259,7 +280,7 @@ void delete_queue(int max) {
void elastic_flush() {
if (Indexer == NULL) {
Indexer = create_indexer(IndexCtx.es_url);
Indexer = create_indexer(IndexCtx.es_url, IndexCtx.es_index);
}
_elastic_flush(Indexer->queued);
@@ -268,7 +289,7 @@ void elastic_flush() {
void elastic_index_line(es_bulk_line_t *line) {
if (Indexer == NULL) {
Indexer = create_indexer(IndexCtx.es_url);
Indexer = create_indexer(IndexCtx.es_url, IndexCtx.es_index);
}
if (Indexer->line_head == NULL) {
@@ -286,14 +307,18 @@ void elastic_index_line(es_bulk_line_t *line) {
}
}
es_indexer_t *create_indexer(const char *url) {
es_indexer_t *create_indexer(const char *url, const char *index) {
char *es_url = malloc(strlen(url) + 1);
strcpy(es_url, url);
char *es_index = malloc(strlen(index) + 1);
strcpy(es_index, index);
es_indexer_t *indexer = malloc(sizeof(es_indexer_t));
indexer->es_url = es_url;
indexer->es_index = es_index;
indexer->queued = 0;
indexer->line_head = NULL;
indexer->line_tail = NULL;
@@ -305,7 +330,7 @@ void finish_indexer(char *script, int async_script, char *index_id) {
char url[4096];
snprintf(url, sizeof(url), "%s/sist2/_refresh", IndexCtx.es_url);
snprintf(url, sizeof(url), "%s/%s/_refresh", IndexCtx.es_url, IndexCtx.es_index);
response_t *r = web_post(url, "");
LOG_INFOF("elastic.c", "Refresh index <%d>", r->status_code);
free_response(r);
@@ -314,18 +339,18 @@ void finish_indexer(char *script, int async_script, char *index_id) {
execute_update_script(script, async_script, index_id);
free(script);
snprintf(url, sizeof(url), "%s/sist2/_refresh", IndexCtx.es_url);
snprintf(url, sizeof(url), "%s/%s/_refresh", IndexCtx.es_url, IndexCtx.es_index);
r = web_post(url, "");
LOG_INFOF("elastic.c", "Refresh index <%d>", r->status_code);
free_response(r);
}
snprintf(url, sizeof(url), "%s/sist2/_forcemerge", IndexCtx.es_url);
snprintf(url, sizeof(url), "%s/%s/_forcemerge", IndexCtx.es_url, IndexCtx.es_index);
r = web_post(url, "");
LOG_INFOF("elastic.c", "Merge index <%d>", r->status_code);
free_response(r);
snprintf(url, sizeof(url), "%s/sist2/_settings", IndexCtx.es_url);
snprintf(url, sizeof(url), "%s/%s/_settings", IndexCtx.es_url, IndexCtx.es_index);
r = web_put(url, "{\"index\":{\"refresh_interval\":\"1s\"}}");
LOG_INFOF("elastic.c", "Set refresh interval <%d>", r->status_code);
free_response(r);
@@ -335,7 +360,7 @@ void elastic_init(int force_reset) {
// Check if index exists
char url[4096];
snprintf(url, 4096, "%s/sist2", IndexCtx.es_url);
snprintf(url, sizeof(url), "%s/%s", IndexCtx.es_url, IndexCtx.es_index);
response_t *r = web_get(url, 30);
int index_exists = r->status_code == 200;
free_response(r);
@@ -345,32 +370,38 @@ void elastic_init(int force_reset) {
LOG_INFOF("elastic.c", "Delete index <%d>", r->status_code);
free_response(r);
snprintf(url, 4096, "%s/sist2", IndexCtx.es_url);
snprintf(url, sizeof(url), "%s/%s", IndexCtx.es_url, IndexCtx.es_index);
r = web_put(url, "");
if (r->status_code != 200) {
print_error(r);
LOG_FATAL("elastic.c", "Could not create index")
}
LOG_INFOF("elastic.c", "Create index <%d>", r->status_code);
free_response(r);
snprintf(url, 4096, "%s/sist2/_close", IndexCtx.es_url);
snprintf(url, sizeof(url), "%s/%s/_close", IndexCtx.es_url, IndexCtx.es_index);
r = web_post(url, "");
LOG_INFOF("elastic.c", "Close index <%d>", r->status_code);
free_response(r);
snprintf(url, 4096, "%s/_ingest/pipeline/tie", IndexCtx.es_url);
snprintf(url, sizeof(url), "%s/_ingest/pipeline/tie", IndexCtx.es_url);
r = web_put(url, pipeline_json);
LOG_INFOF("elastic.c", "Create pipeline <%d>", r->status_code);
free_response(r);
snprintf(url, 4096, "%s/sist2/_settings", IndexCtx.es_url);
snprintf(url, sizeof(url), "%s/%s/_settings", IndexCtx.es_url, IndexCtx.es_index);
r = web_put(url, settings_json);
LOG_INFOF("elastic.c", "Update settings <%d>", r->status_code);
free_response(r);
snprintf(url, 4096, "%s/sist2/_mappings/_doc?include_type_name=true", IndexCtx.es_url);
snprintf(url, sizeof(url), "%s/%s/_mappings/_doc?include_type_name=true", IndexCtx.es_url, IndexCtx.es_index);
r = web_put(url, mappings_json);
LOG_INFOF("elastic.c", "Update mappings <%d>", r->status_code);
free_response(r);
snprintf(url, 4096, "%s/sist2/_open", IndexCtx.es_url);
snprintf(url, sizeof(url), "%s/%s/_open", IndexCtx.es_url, IndexCtx.es_index);
r = web_post(url, "");
LOG_INFOF("elastic.c", "Open index <%d>", r->status_code);
free_response(r);
@@ -379,7 +410,7 @@ void elastic_init(int force_reset) {
cJSON *elastic_get_document(const char *uuid_str) {
char url[4096];
snprintf(url, 4096, "%s/sist2/_doc/%s", WebCtx.es_url, uuid_str);
snprintf(url, sizeof(url), "%s/%s/_doc/%s", WebCtx.es_url, IndexCtx.es_index, uuid_str);
response_t *r = web_get(url, 3);
cJSON *json = NULL;
@@ -392,8 +423,8 @@ cJSON *elastic_get_document(const char *uuid_str) {
char *elastic_get_status() {
char url[4096];
snprintf(url, 4096,
"%s/_cluster/state/metadata/sist2?filter_path=metadata.indices.*.state", WebCtx.es_url);
snprintf(url, sizeof(url),
"%s/_cluster/state/metadata/%s?filter_path=metadata.indices.*.state", WebCtx.es_url, WebCtx.es_index);
response_t *r = web_get(url, 30);
cJSON *json = NULL;
@@ -405,8 +436,8 @@ char *elastic_get_status() {
const cJSON *metadata = cJSON_GetObjectItem(json, "metadata");
if (metadata != NULL) {
const cJSON *indices = cJSON_GetObjectItem(metadata, "indices");
const cJSON *sist2 = cJSON_GetObjectItem(indices, "sist2");
const cJSON *state = cJSON_GetObjectItem(sist2, "state");
const cJSON *index = cJSON_GetObjectItem(indices, WebCtx.es_index);
const cJSON *state = cJSON_GetObjectItem(index, "state");
strcpy(status, state->valuestring);
}
}