Compare commits

..

2 Commits

5 changed files with 31 additions and 13 deletions

View File

@ -81,6 +81,11 @@ void web_args_destroy(web_args_t *args) {
} }
void exec_args_destroy(exec_args_t *args) { void exec_args_destroy(exec_args_t *args) {
if (args->index_path != NULL) {
free(args->index_path);
}
free(args); free(args);
} }

View File

@ -85,7 +85,7 @@ typedef struct web_args {
typedef struct exec_args { typedef struct exec_args {
char *es_url; char *es_url;
char *es_index; char *es_index;
const char *index_path; char *index_path;
const char *script_path; const char *script_path;
int async_script; int async_script;
char *script; char *script;

View File

@ -110,16 +110,16 @@ void execute_update_script(const char *script, int async, const char index_id[SI
cJSON *term_obj = cJSON_AddObjectToObject(query, "term"); cJSON *term_obj = cJSON_AddObjectToObject(query, "term");
cJSON_AddStringToObject(term_obj, "index", index_id); cJSON_AddStringToObject(term_obj, "index", index_id);
char *str = cJSON_Print(body); char *str = cJSON_PrintUnformatted(body);
char bulk_url[4096]; char url[4096];
if (async) { if (async) {
snprintf(bulk_url, sizeof(bulk_url), "%s/%s/_update_by_query?wait_for_completion=false", Indexer->es_url, snprintf(url, sizeof(url), "%s/%s/_update_by_query?wait_for_completion=false", Indexer->es_url,
Indexer->es_index); Indexer->es_index);
} else { } else {
snprintf(bulk_url, sizeof(bulk_url), "%s/%s/_update_by_query", Indexer->es_url, Indexer->es_index); snprintf(url, sizeof(url), "%s/%s/_update_by_query", Indexer->es_url, Indexer->es_index);
} }
response_t *r = web_post(bulk_url, str); response_t *r = web_post(url, str);
if (!async) { if (!async) {
LOG_INFOF("elastic.c", "Executed user script <%d>", r->status_code); LOG_INFOF("elastic.c", "Executed user script <%d>", r->status_code);
} }
@ -139,6 +139,11 @@ void execute_update_script(const char *script, int async, const char index_id[SI
if (async) { if (async) {
cJSON *task = cJSON_GetObjectItem(resp, "task"); cJSON *task = cJSON_GetObjectItem(resp, "task");
if (task == NULL) {
LOG_FATALF("elastic.c", "FIXME: Could not get task id: %s", r->body);
}
LOG_INFOF("elastic.c", "User script queued: %s/_tasks/%s", Indexer->es_url, task->valuestring); LOG_INFOF("elastic.c", "User script queued: %s/_tasks/%s", Indexer->es_url, task->valuestring);
} }

View File

@ -22,7 +22,7 @@ void free_response(response_t *resp) {
free(resp); free(resp);
} }
void web_post_async_poll(subreq_ctx_t* req) { void web_post_async_poll(subreq_ctx_t *req) {
fd_set fdread; fd_set fdread;
fd_set fdwrite; fd_set fdwrite;
fd_set fdexcep; fd_set fdexcep;
@ -34,7 +34,7 @@ void web_post_async_poll(subreq_ctx_t* req) {
CURLMcode mc = curl_multi_fdset(req->multi, &fdread, &fdwrite, &fdexcep, &maxfd); CURLMcode mc = curl_multi_fdset(req->multi, &fdread, &fdwrite, &fdexcep, &maxfd);
if(mc != CURLM_OK) { if (mc != CURLM_OK) {
req->done = TRUE; req->done = TRUE;
return; return;
} }
@ -47,7 +47,7 @@ void web_post_async_poll(subreq_ctx_t* req) {
struct timeval timeout = {1, 0}; struct timeval timeout = {1, 0};
int rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout); int rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout);
switch(rc) { switch (rc) {
case -1: case -1:
req->done = TRUE; req->done = TRUE;
break; break;
@ -142,6 +142,9 @@ response_t *web_post(const char *url, const char *data) {
curl_easy_setopt(curl, CURLOPT_POST, 1); curl_easy_setopt(curl, CURLOPT_POST, 1);
curl_easy_setopt(curl, CURLOPT_USERAGENT, "sist2"); curl_easy_setopt(curl, CURLOPT_USERAGENT, "sist2");
char err_buffer[CURL_ERROR_SIZE + 1] = {};
curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, err_buffer);
struct curl_slist *headers = NULL; struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, "Content-Type: application/json"); headers = curl_slist_append(headers, "Content-Type: application/json");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
@ -151,12 +154,16 @@ response_t *web_post(const char *url, const char *data) {
curl_easy_perform(curl); curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &resp->status_code); curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &resp->status_code);
curl_easy_cleanup(curl);
curl_slist_free_all(headers);
resp->body = buffer.buf; resp->body = buffer.buf;
resp->size = buffer.cur; resp->size = buffer.cur;
if (resp->status_code == 0) {
LOG_ERRORF("web.c", "CURL Error: %s", err_buffer)
}
curl_easy_cleanup(curl);
curl_slist_free_all(headers);
return resp; return resp;
} }
@ -175,7 +182,7 @@ response_t *web_put(const char *url, const char *data) {
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT"); curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
curl_easy_setopt(curl, CURLOPT_USERAGENT, "sist2"); curl_easy_setopt(curl, CURLOPT_USERAGENT, "sist2");
curl_easy_setopt(curl, CURLOPT_DNS_USE_GLOBAL_CACHE, 0); curl_easy_setopt(curl, CURLOPT_DNS_USE_GLOBAL_CACHE, 0);
curl_easy_setopt(curl, CURLOPT_IPRESOLVE, CURLOPT_DNS_LOCAL_IP4 ); curl_easy_setopt(curl, CURLOPT_IPRESOLVE, CURLOPT_DNS_LOCAL_IP4);
struct curl_slist *headers = NULL; struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, "Content-Type: application/json"); headers = curl_slist_append(headers, "Content-Type: application/json");

View File

@ -534,6 +534,7 @@ void sist2_exec_script(exec_args_t *args) {
IndexCtx.es_url = args->es_url; IndexCtx.es_url = args->es_url;
IndexCtx.es_index = args->es_index; IndexCtx.es_index = args->es_index;
IndexCtx.needs_es_connection = TRUE;
LOG_DEBUGF("main.c", "descriptor version %s (%s)", desc.version, desc.type) LOG_DEBUGF("main.c", "descriptor version %s (%s)", desc.version, desc.type)