From 995a1966905f146e10b7e76b60cf3ed85dbe251b Mon Sep 17 00:00:00 2001 From: simon987 Date: Mon, 3 Aug 2020 19:44:43 -0400 Subject: [PATCH] Log user script task, add async arg --- src/cli.c | 1 + src/cli.h | 2 ++ src/index/elastic.c | 21 ++++++++++++++++----- src/index/elastic.h | 4 ++-- src/main.c | 9 +++++++-- 5 files changed, 28 insertions(+), 9 deletions(-) diff --git a/src/cli.c b/src/cli.c index 462cda9..60f29d5 100644 --- a/src/cli.c +++ b/src/cli.c @@ -296,6 +296,7 @@ int index_args_validate(index_args_t *args, int argc, const char **argv) { LOG_DEBUGF("cli.c", "arg es_url=%s", args->es_url) LOG_DEBUGF("cli.c", "arg index_path=%s", args->index_path) LOG_DEBUGF("cli.c", "arg script_path=%s", args->script_path) + LOG_DEBUGF("cli.c", "arg async_script=%s", args->async_script) LOG_DEBUGF("cli.c", "arg script=%s", args->script) LOG_DEBUGF("cli.c", "arg print=%d", args->print) LOG_DEBUGF("cli.c", "arg batch_size=%d", args->batch_size) diff --git a/src/cli.h b/src/cli.h index ebd853d..5133665 100644 --- a/src/cli.h +++ b/src/cli.h @@ -40,6 +40,7 @@ typedef struct index_args { char *script; int print; int batch_size; + int async_script; int force_reset; int threads; } index_args_t; @@ -61,6 +62,7 @@ typedef struct exec_args { char *es_url; const char *index_path; const char *script_path; + int async_script; char *script; } exec_args_t; diff --git a/src/index/elastic.c b/src/index/elastic.c index 666931c..33aa69e 100644 --- a/src/index/elastic.c +++ b/src/index/elastic.c @@ -64,7 +64,7 @@ void index_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) { tpool_add_work(IndexCtx.pool, index_json_func, bulk_line); } -void execute_update_script(const char *script, const char index_id[UUID_STR_LEN]) { +void execute_update_script(const char *script, int async, const char index_id[UUID_STR_LEN]) { if (Indexer == NULL) { Indexer = create_indexer(IndexCtx.es_url); @@ -82,9 +82,15 @@ void execute_update_script(const char *script, const char index_id[UUID_STR_LEN] char *str = cJSON_Print(body); char bulk_url[4096]; - snprintf(bulk_url, 4096, "%s/sist2/_update_by_query?wait_for_completion=false", Indexer->es_url); + if (async) { + snprintf(bulk_url, sizeof(bulk_url), "%s/sist2/_update_by_query?wait_for_completion=false", Indexer->es_url); + } else { + snprintf(bulk_url, sizeof(bulk_url), "%s/sist2/_update_by_query", Indexer->es_url); + } response_t *r = web_post(bulk_url, str); - LOG_INFOF("elastic.c", "Executed user script <%d>", r->status_code); + if (!async) { + LOG_INFOF("elastic.c", "Executed user script <%d>", r->status_code); + } cJSON *resp = cJSON_Parse(r->body); cJSON_free(str); @@ -99,6 +105,11 @@ void execute_update_script(const char *script, const char index_id[UUID_STR_LEN] cJSON_free(error_str); } + if (async) { + cJSON *task = cJSON_GetObjectItem(resp, "task"); + LOG_INFOF("elastic.c", "User script queued: %s/_tasks/%s", Indexer->es_url, task->valuestring); + } + cJSON_Delete(resp); } @@ -290,7 +301,7 @@ es_indexer_t *create_indexer(const char *url) { return indexer; } -void finish_indexer(char *script, char *index_id) { +void finish_indexer(char *script, int async_script, char *index_id) { char url[4096]; @@ -300,7 +311,7 @@ void finish_indexer(char *script, char *index_id) { free_response(r); if (script != NULL) { - execute_update_script(script, index_id); + execute_update_script(script, async_script, index_id); free(script); snprintf(url, sizeof(url), "%s/sist2/_refresh", IndexCtx.es_url); diff --git a/src/index/elastic.h b/src/index/elastic.h index 298657d..fcbdc21 100644 --- a/src/index/elastic.h +++ b/src/index/elastic.h @@ -23,7 +23,7 @@ void index_json(cJSON *document, const char uuid_str[UUID_STR_LEN]); es_indexer_t *create_indexer(const char* es_url); void elastic_cleanup(); -void finish_indexer(char *script, char *index_id); +void finish_indexer(char *script, int async_script, char *index_id); void elastic_init(int force_reset); @@ -31,6 +31,6 @@ cJSON *elastic_get_document(const char *uuid_str); char *elastic_get_status(); -void execute_update_script(const char *script, const char index_id[UUID_STR_LEN]); +void execute_update_script(const char *script, int async, const char index_id[UUID_STR_LEN]); #endif diff --git a/src/main.c b/src/main.c index 9951c38..3ecb4cb 100644 --- a/src/main.c +++ b/src/main.c @@ -320,7 +320,7 @@ void sist2_index(index_args_t *args) { tpool_destroy(IndexCtx.pool); if (!args->print) { - finish_indexer(args->script, desc.uuid); + finish_indexer(args->script, args->async_script, desc.uuid); } store_destroy(IndexCtx.tag_store); @@ -340,7 +340,7 @@ void sist2_exec_script(exec_args_t *args) { LOG_DEBUGF("main.c", "descriptor version %s (%s)", desc.version, desc.type) - execute_update_script(args->script, desc.uuid); + execute_update_script(args->script, args->async_script, desc.uuid); free(args->script); } @@ -391,6 +391,7 @@ int main(int argc, const char *argv[]) { char *common_es_url = NULL; char *common_script_path = NULL; + int common_async_script = 0; int common_threads = 0; struct argparse_option options[] = { @@ -433,6 +434,7 @@ int main(int argc, const char *argv[]) { OPT_STRING(0, "es-url", &common_es_url, "Elasticsearch url with port. DEFAULT=http://localhost:9200"), OPT_BOOLEAN('p', "print", &index_args->print, "Just print JSON documents to stdout."), OPT_STRING(0, "script-file", &common_script_path, "Path to user script."), + OPT_BOOLEAN(0, "async-script", &common_async_script, "Execute user script asynchronously."), OPT_INTEGER(0, "batch-size", &index_args->batch_size, "Index batch size. DEFAULT: 100"), OPT_BOOLEAN('f', "force-reset", &index_args->force_reset, "Reset Elasticsearch mappings and settings. " "(You must use this option the first time you use the index command)"), @@ -445,6 +447,7 @@ int main(int argc, const char *argv[]) { OPT_GROUP("Exec-script options"), OPT_STRING(0, "script-file", &common_script_path, "Path to user script."), + OPT_BOOLEAN(0, "async-script", &common_async_script, "Execute user script asynchronously."), OPT_END(), }; @@ -470,6 +473,8 @@ int main(int argc, const char *argv[]) { exec_args->script_path = common_script_path; index_args->threads = common_threads; scan_args->threads = common_threads; + exec_args->async_script = common_async_script; + index_args->async_script = common_async_script; if (argc == 0) { argparse_usage(&argparse);