Log user script task, add async arg

This commit is contained in:
simon987 2020-08-03 19:44:43 -04:00
parent 465d017e18
commit 995a196690
5 changed files with 28 additions and 9 deletions

View File

@ -296,6 +296,7 @@ int index_args_validate(index_args_t *args, int argc, const char **argv) {
LOG_DEBUGF("cli.c", "arg es_url=%s", args->es_url) LOG_DEBUGF("cli.c", "arg es_url=%s", args->es_url)
LOG_DEBUGF("cli.c", "arg index_path=%s", args->index_path) LOG_DEBUGF("cli.c", "arg index_path=%s", args->index_path)
LOG_DEBUGF("cli.c", "arg script_path=%s", args->script_path) LOG_DEBUGF("cli.c", "arg script_path=%s", args->script_path)
LOG_DEBUGF("cli.c", "arg async_script=%s", args->async_script)
LOG_DEBUGF("cli.c", "arg script=%s", args->script) LOG_DEBUGF("cli.c", "arg script=%s", args->script)
LOG_DEBUGF("cli.c", "arg print=%d", args->print) LOG_DEBUGF("cli.c", "arg print=%d", args->print)
LOG_DEBUGF("cli.c", "arg batch_size=%d", args->batch_size) LOG_DEBUGF("cli.c", "arg batch_size=%d", args->batch_size)

View File

@ -40,6 +40,7 @@ typedef struct index_args {
char *script; char *script;
int print; int print;
int batch_size; int batch_size;
int async_script;
int force_reset; int force_reset;
int threads; int threads;
} index_args_t; } index_args_t;
@ -61,6 +62,7 @@ typedef struct exec_args {
char *es_url; char *es_url;
const char *index_path; const char *index_path;
const char *script_path; const char *script_path;
int async_script;
char *script; char *script;
} exec_args_t; } exec_args_t;

View File

@ -64,7 +64,7 @@ void index_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) {
tpool_add_work(IndexCtx.pool, index_json_func, bulk_line); tpool_add_work(IndexCtx.pool, index_json_func, bulk_line);
} }
void execute_update_script(const char *script, const char index_id[UUID_STR_LEN]) { void execute_update_script(const char *script, int async, const char index_id[UUID_STR_LEN]) {
if (Indexer == NULL) { if (Indexer == NULL) {
Indexer = create_indexer(IndexCtx.es_url); Indexer = create_indexer(IndexCtx.es_url);
@ -82,9 +82,15 @@ void execute_update_script(const char *script, const char index_id[UUID_STR_LEN]
char *str = cJSON_Print(body); char *str = cJSON_Print(body);
char bulk_url[4096]; char bulk_url[4096];
snprintf(bulk_url, 4096, "%s/sist2/_update_by_query?wait_for_completion=false", Indexer->es_url); if (async) {
snprintf(bulk_url, sizeof(bulk_url), "%s/sist2/_update_by_query?wait_for_completion=false", Indexer->es_url);
} else {
snprintf(bulk_url, sizeof(bulk_url), "%s/sist2/_update_by_query", Indexer->es_url);
}
response_t *r = web_post(bulk_url, str); response_t *r = web_post(bulk_url, str);
LOG_INFOF("elastic.c", "Executed user script <%d>", r->status_code); if (!async) {
LOG_INFOF("elastic.c", "Executed user script <%d>", r->status_code);
}
cJSON *resp = cJSON_Parse(r->body); cJSON *resp = cJSON_Parse(r->body);
cJSON_free(str); cJSON_free(str);
@ -99,6 +105,11 @@ void execute_update_script(const char *script, const char index_id[UUID_STR_LEN]
cJSON_free(error_str); cJSON_free(error_str);
} }
if (async) {
cJSON *task = cJSON_GetObjectItem(resp, "task");
LOG_INFOF("elastic.c", "User script queued: %s/_tasks/%s", Indexer->es_url, task->valuestring);
}
cJSON_Delete(resp); cJSON_Delete(resp);
} }
@ -290,7 +301,7 @@ es_indexer_t *create_indexer(const char *url) {
return indexer; return indexer;
} }
void finish_indexer(char *script, char *index_id) { void finish_indexer(char *script, int async_script, char *index_id) {
char url[4096]; char url[4096];
@ -300,7 +311,7 @@ void finish_indexer(char *script, char *index_id) {
free_response(r); free_response(r);
if (script != NULL) { if (script != NULL) {
execute_update_script(script, index_id); execute_update_script(script, async_script, index_id);
free(script); free(script);
snprintf(url, sizeof(url), "%s/sist2/_refresh", IndexCtx.es_url); snprintf(url, sizeof(url), "%s/sist2/_refresh", IndexCtx.es_url);

View File

@ -23,7 +23,7 @@ void index_json(cJSON *document, const char uuid_str[UUID_STR_LEN]);
es_indexer_t *create_indexer(const char* es_url); es_indexer_t *create_indexer(const char* es_url);
void elastic_cleanup(); void elastic_cleanup();
void finish_indexer(char *script, char *index_id); void finish_indexer(char *script, int async_script, char *index_id);
void elastic_init(int force_reset); void elastic_init(int force_reset);
@ -31,6 +31,6 @@ cJSON *elastic_get_document(const char *uuid_str);
char *elastic_get_status(); char *elastic_get_status();
void execute_update_script(const char *script, const char index_id[UUID_STR_LEN]); void execute_update_script(const char *script, int async, const char index_id[UUID_STR_LEN]);
#endif #endif

View File

@ -320,7 +320,7 @@ void sist2_index(index_args_t *args) {
tpool_destroy(IndexCtx.pool); tpool_destroy(IndexCtx.pool);
if (!args->print) { if (!args->print) {
finish_indexer(args->script, desc.uuid); finish_indexer(args->script, args->async_script, desc.uuid);
} }
store_destroy(IndexCtx.tag_store); store_destroy(IndexCtx.tag_store);
@ -340,7 +340,7 @@ void sist2_exec_script(exec_args_t *args) {
LOG_DEBUGF("main.c", "descriptor version %s (%s)", desc.version, desc.type) LOG_DEBUGF("main.c", "descriptor version %s (%s)", desc.version, desc.type)
execute_update_script(args->script, desc.uuid); execute_update_script(args->script, args->async_script, desc.uuid);
free(args->script); free(args->script);
} }
@ -391,6 +391,7 @@ int main(int argc, const char *argv[]) {
char *common_es_url = NULL; char *common_es_url = NULL;
char *common_script_path = NULL; char *common_script_path = NULL;
int common_async_script = 0;
int common_threads = 0; int common_threads = 0;
struct argparse_option options[] = { struct argparse_option options[] = {
@ -433,6 +434,7 @@ int main(int argc, const char *argv[]) {
OPT_STRING(0, "es-url", &common_es_url, "Elasticsearch url with port. DEFAULT=http://localhost:9200"), OPT_STRING(0, "es-url", &common_es_url, "Elasticsearch url with port. DEFAULT=http://localhost:9200"),
OPT_BOOLEAN('p', "print", &index_args->print, "Just print JSON documents to stdout."), OPT_BOOLEAN('p', "print", &index_args->print, "Just print JSON documents to stdout."),
OPT_STRING(0, "script-file", &common_script_path, "Path to user script."), OPT_STRING(0, "script-file", &common_script_path, "Path to user script."),
OPT_BOOLEAN(0, "async-script", &common_async_script, "Execute user script asynchronously."),
OPT_INTEGER(0, "batch-size", &index_args->batch_size, "Index batch size. DEFAULT: 100"), OPT_INTEGER(0, "batch-size", &index_args->batch_size, "Index batch size. DEFAULT: 100"),
OPT_BOOLEAN('f', "force-reset", &index_args->force_reset, "Reset Elasticsearch mappings and settings. " OPT_BOOLEAN('f', "force-reset", &index_args->force_reset, "Reset Elasticsearch mappings and settings. "
"(You must use this option the first time you use the index command)"), "(You must use this option the first time you use the index command)"),
@ -445,6 +447,7 @@ int main(int argc, const char *argv[]) {
OPT_GROUP("Exec-script options"), OPT_GROUP("Exec-script options"),
OPT_STRING(0, "script-file", &common_script_path, "Path to user script."), OPT_STRING(0, "script-file", &common_script_path, "Path to user script."),
OPT_BOOLEAN(0, "async-script", &common_async_script, "Execute user script asynchronously."),
OPT_END(), OPT_END(),
}; };
@ -470,6 +473,8 @@ int main(int argc, const char *argv[]) {
exec_args->script_path = common_script_path; exec_args->script_path = common_script_path;
index_args->threads = common_threads; index_args->threads = common_threads;
scan_args->threads = common_threads; scan_args->threads = common_threads;
exec_args->async_script = common_async_script;
index_args->async_script = common_async_script;
if (argc == 0) { if (argc == 0) {
argparse_usage(&argparse); argparse_usage(&argparse);