mirror of
https://github.com/simon987/sist2.git
synced 2025-12-16 00:39:04 +00:00
CURL error handling, fix ES version handling, support for ES8, add --es-insecure-ssl argument
This commit is contained in:
@@ -53,7 +53,7 @@ void print_json(cJSON *document, const char id_str[SIST_DOC_ID_LEN]) {
|
||||
|
||||
cJSON_AddStringToObject(line, "_id", id_str);
|
||||
cJSON_AddStringToObject(line, "_index", IndexCtx.es_index);
|
||||
cJSON_AddStringToObject(line, "_type", "_doc");
|
||||
// cJSON_AddStringToObject(line, "_type", "_doc");
|
||||
cJSON_AddItemReferenceToObject(line, "_source", document);
|
||||
|
||||
char *json = cJSON_PrintUnformatted(line);
|
||||
@@ -119,7 +119,7 @@ void execute_update_script(const char *script, int async, const char index_id[SI
|
||||
} else {
|
||||
snprintf(url, sizeof(url), "%s/%s/_update_by_query", Indexer->es_url, Indexer->es_index);
|
||||
}
|
||||
response_t *r = web_post(url, str);
|
||||
response_t *r = web_post(url, str, IndexCtx.es_insecure_ssl);
|
||||
if (!async) {
|
||||
LOG_INFOF("elastic.c", "Executed user script <%d>", r->status_code);
|
||||
}
|
||||
@@ -150,7 +150,7 @@ void execute_update_script(const char *script, int async, const char index_id[SI
|
||||
cJSON_Delete(resp);
|
||||
}
|
||||
|
||||
void *create_bulk_buffer(int max, int *count, size_t *buf_len) {
|
||||
void *create_bulk_buffer(int max, int *count, size_t *buf_len, int legacy) {
|
||||
es_bulk_line_t *line = Indexer->line_head;
|
||||
*count = 0;
|
||||
|
||||
@@ -171,11 +171,20 @@ void *create_bulk_buffer(int max, int *count, size_t *buf_len) {
|
||||
while (line != NULL && *count < max) {
|
||||
char action_str[256];
|
||||
if (line->type == ES_BULK_LINE_INDEX) {
|
||||
snprintf(
|
||||
action_str, sizeof(action_str),
|
||||
"{\"index\":{\"_id\":\"%s\",\"_type\":\"_doc\",\"_index\":\"%s\"}}\n",
|
||||
line->doc_id, Indexer->es_index
|
||||
);
|
||||
|
||||
if (legacy) {
|
||||
snprintf(
|
||||
action_str, sizeof(action_str),
|
||||
"{\"index\":{\"_id\":\"%s\",\"_type\":\"_doc\",\"_index\":\"%s\"}}\n",
|
||||
line->doc_id, Indexer->es_index
|
||||
);
|
||||
} else {
|
||||
snprintf(
|
||||
action_str, sizeof(action_str),
|
||||
"{\"index\":{\"_id\":\"%s\",\"_index\":\"%s\"}}\n",
|
||||
line->doc_id, Indexer->es_index
|
||||
);
|
||||
}
|
||||
|
||||
size_t action_str_len = strlen(action_str);
|
||||
size_t line_len = strlen(line->line);
|
||||
@@ -263,11 +272,11 @@ void _elastic_flush(int max) {
|
||||
|
||||
size_t buf_len;
|
||||
int count;
|
||||
void *buf = create_bulk_buffer(max, &count, &buf_len);
|
||||
void *buf = create_bulk_buffer(max, &count, &buf_len, IS_LEGACY_VERSION(IndexCtx.es_version));
|
||||
|
||||
char bulk_url[4096];
|
||||
snprintf(bulk_url, sizeof(bulk_url), "%s/%s/_bulk?pipeline=tie", Indexer->es_url, Indexer->es_index);
|
||||
response_t *r = web_post(bulk_url, buf);
|
||||
response_t *r = web_post(bulk_url, buf, IndexCtx.es_insecure_ssl);
|
||||
|
||||
if (r->status_code == 0) {
|
||||
LOG_FATALF("elastic.c", "Could not connect to %s, make sure that elasticsearch is running!\n", IndexCtx.es_url)
|
||||
@@ -393,7 +402,7 @@ void finish_indexer(char *script, int async_script, char *index_id) {
|
||||
char url[4096];
|
||||
|
||||
snprintf(url, sizeof(url), "%s/%s/_refresh", IndexCtx.es_url, IndexCtx.es_index);
|
||||
response_t *r = web_post(url, "");
|
||||
response_t *r = web_post(url, "", IndexCtx.es_insecure_ssl);
|
||||
LOG_INFOF("elastic.c", "Refresh index <%d>", r->status_code);
|
||||
free_response(r);
|
||||
|
||||
@@ -402,24 +411,24 @@ void finish_indexer(char *script, int async_script, char *index_id) {
|
||||
free(script);
|
||||
|
||||
snprintf(url, sizeof(url), "%s/%s/_refresh", IndexCtx.es_url, IndexCtx.es_index);
|
||||
r = web_post(url, "");
|
||||
r = web_post(url, "", IndexCtx.es_insecure_ssl);
|
||||
LOG_INFOF("elastic.c", "Refresh index <%d>", r->status_code);
|
||||
free_response(r);
|
||||
}
|
||||
|
||||
snprintf(url, sizeof(url), "%s/%s/_forcemerge", IndexCtx.es_url, IndexCtx.es_index);
|
||||
r = web_post(url, "");
|
||||
r = web_post(url, "", IndexCtx.es_insecure_ssl);
|
||||
LOG_INFOF("elastic.c", "Merge index <%d>", r->status_code);
|
||||
free_response(r);
|
||||
|
||||
snprintf(url, sizeof(url), "%s/%s/_settings", IndexCtx.es_url, IndexCtx.es_index);
|
||||
r = web_put(url, "{\"index\":{\"refresh_interval\":\"1s\"}}");
|
||||
r = web_put(url, "{\"index\":{\"refresh_interval\":\"1s\"}}", IndexCtx.es_insecure_ssl);
|
||||
LOG_INFOF("elastic.c", "Set refresh interval <%d>", r->status_code);
|
||||
free_response(r);
|
||||
}
|
||||
|
||||
es_version_t *elastic_get_version(const char *es_url) {
|
||||
response_t *r = web_get(es_url, 30);
|
||||
es_version_t *elastic_get_version(const char *es_url, int insecure) {
|
||||
response_t *r = web_get(es_url, 30, insecure);
|
||||
|
||||
char *tmp = malloc(r->size + 1);
|
||||
memcpy(tmp, r->body, r->size);
|
||||
@@ -464,7 +473,7 @@ es_version_t *elastic_get_version(const char *es_url) {
|
||||
|
||||
void elastic_init(int force_reset, const char *user_mappings, const char *user_settings) {
|
||||
|
||||
es_version_t *es_version = elastic_get_version(IndexCtx.es_url);
|
||||
es_version_t *es_version = elastic_get_version(IndexCtx.es_url, IndexCtx.es_insecure_ssl);
|
||||
IndexCtx.es_version = es_version;
|
||||
|
||||
if (es_version == NULL) {
|
||||
@@ -473,33 +482,33 @@ void elastic_init(int force_reset, const char *user_mappings, const char *user_s
|
||||
|
||||
LOG_INFOF("elastic.c",
|
||||
"Elasticsearch version is %s (supported=%d, legacy=%d)",
|
||||
format_es_version(es_version), IS_SUPPORTED_ES_VERSION(es_version), USE_LEGACY_ES_SETTINGS(es_version));
|
||||
format_es_version(es_version), IS_SUPPORTED_ES_VERSION(es_version), IS_LEGACY_VERSION(es_version));
|
||||
|
||||
if (!IS_SUPPORTED_ES_VERSION(es_version)) {
|
||||
LOG_FATAL("elastic.c", "sist2 only supports Elasticsearch v6.8 or newer")
|
||||
LOG_FATAL("elastic.c", "This elasticsearch version is not supported!")
|
||||
}
|
||||
|
||||
char *settings = NULL;
|
||||
if (USE_LEGACY_ES_SETTINGS(es_version)) {
|
||||
settings = settings_json;
|
||||
} else {
|
||||
if (IS_LEGACY_VERSION(es_version)) {
|
||||
settings = settings_legacy_json;
|
||||
} else {
|
||||
settings = settings_json;
|
||||
}
|
||||
|
||||
// Check if index exists
|
||||
char url[4096];
|
||||
snprintf(url, sizeof(url), "%s/%s", IndexCtx.es_url, IndexCtx.es_index);
|
||||
response_t *r = web_get(url, 30);
|
||||
response_t *r = web_get(url, 30, IndexCtx.es_insecure_ssl);
|
||||
int index_exists = r->status_code == 200;
|
||||
free_response(r);
|
||||
|
||||
if (!index_exists || force_reset) {
|
||||
r = web_delete(url);
|
||||
r = web_delete(url, IndexCtx.es_insecure_ssl);
|
||||
LOG_INFOF("elastic.c", "Delete index <%d>", r->status_code);
|
||||
free_response(r);
|
||||
|
||||
snprintf(url, sizeof(url), "%s/%s", IndexCtx.es_url, IndexCtx.es_index);
|
||||
r = web_put(url, "");
|
||||
r = web_put(url, "", IndexCtx.es_insecure_ssl);
|
||||
|
||||
if (r->status_code != 200) {
|
||||
print_error(r);
|
||||
@@ -510,17 +519,17 @@ void elastic_init(int force_reset, const char *user_mappings, const char *user_s
|
||||
free_response(r);
|
||||
|
||||
snprintf(url, sizeof(url), "%s/%s/_close", IndexCtx.es_url, IndexCtx.es_index);
|
||||
r = web_post(url, "");
|
||||
r = web_post(url, "", IndexCtx.es_insecure_ssl);
|
||||
LOG_INFOF("elastic.c", "Close index <%d>", r->status_code);
|
||||
free_response(r);
|
||||
|
||||
snprintf(url, sizeof(url), "%s/_ingest/pipeline/tie", IndexCtx.es_url);
|
||||
r = web_put(url, pipeline_json);
|
||||
r = web_put(url, pipeline_json, IndexCtx.es_insecure_ssl);
|
||||
LOG_INFOF("elastic.c", "Create pipeline <%d>", r->status_code);
|
||||
free_response(r);
|
||||
|
||||
snprintf(url, sizeof(url), "%s/%s/_settings", IndexCtx.es_url, IndexCtx.es_index);
|
||||
r = web_put(url, user_settings ? user_settings : settings);
|
||||
r = web_put(url, user_settings ? user_settings : settings, IndexCtx.es_insecure_ssl);
|
||||
LOG_INFOF("elastic.c", "Update ES settings <%d>", r->status_code);
|
||||
if (r->status_code != 200) {
|
||||
print_error(r);
|
||||
@@ -528,8 +537,13 @@ void elastic_init(int force_reset, const char *user_mappings, const char *user_s
|
||||
}
|
||||
free_response(r);
|
||||
|
||||
snprintf(url, sizeof(url), "%s/%s/_mappings/_doc?include_type_name=true", IndexCtx.es_url, IndexCtx.es_index);
|
||||
r = web_put(url, user_mappings ? user_mappings : mappings_json);
|
||||
if (IS_LEGACY_VERSION(es_version)) {
|
||||
snprintf(url, sizeof(url), "%s/%s/_mappings/_doc?include_type_name=true", IndexCtx.es_url, IndexCtx.es_index);
|
||||
} else {
|
||||
snprintf(url, sizeof(url), "%s/%s/_mappings", IndexCtx.es_url, IndexCtx.es_index);
|
||||
}
|
||||
|
||||
r = web_put(url, user_mappings ? user_mappings : mappings_json, IndexCtx.es_insecure_ssl);
|
||||
LOG_INFOF("elastic.c", "Update ES mappings <%d>", r->status_code);
|
||||
if (r->status_code != 200) {
|
||||
print_error(r);
|
||||
@@ -538,7 +552,7 @@ void elastic_init(int force_reset, const char *user_mappings, const char *user_s
|
||||
free_response(r);
|
||||
|
||||
snprintf(url, sizeof(url), "%s/%s/_open", IndexCtx.es_url, IndexCtx.es_index);
|
||||
r = web_post(url, "");
|
||||
r = web_post(url, "", IndexCtx.es_insecure_ssl);
|
||||
LOG_INFOF("elastic.c", "Open index <%d>", r->status_code);
|
||||
free_response(r);
|
||||
}
|
||||
@@ -548,7 +562,7 @@ cJSON *elastic_get_document(const char *id_str) {
|
||||
char url[4096];
|
||||
snprintf(url, sizeof(url), "%s/%s/_doc/%s", WebCtx.es_url, WebCtx.es_index, id_str);
|
||||
|
||||
response_t *r = web_get(url, 3);
|
||||
response_t *r = web_get(url, 3, WebCtx.es_insecure_ssl);
|
||||
cJSON *json = NULL;
|
||||
if (r->status_code == 200) {
|
||||
char *tmp = malloc(r->size + 1);
|
||||
@@ -566,7 +580,7 @@ char *elastic_get_status() {
|
||||
snprintf(url, sizeof(url),
|
||||
"%s/_cluster/state/metadata/%s?filter_path=metadata.indices.*.state", WebCtx.es_url, WebCtx.es_index);
|
||||
|
||||
response_t *r = web_get(url, 30);
|
||||
response_t *r = web_get(url, 30, IndexCtx.es_insecure_ssl);
|
||||
cJSON *json = NULL;
|
||||
char *status = malloc(128 * sizeof(char));
|
||||
status[0] = '\0';
|
||||
|
||||
Reference in New Issue
Block a user