From 13f4dbed2df679f0bdaa85fe1fbcfaefb345417f Mon Sep 17 00:00:00 2001 From: simon987 Date: Sat, 11 Jul 2020 17:42:46 -0400 Subject: [PATCH] Handle 429, multi-threaded index module --- CMakeLists.txt | 5 +- ci/build.sh | 9 ++- docs/USAGE.md | 1 + src/cli.c | 7 ++ src/cli.h | 1 + src/ctx.h | 1 + src/index/elastic.c | 66 +++++++++++----- src/index/elastic.h | 5 +- src/index/web.c | 157 +++++++++++++++++++++++++++---------- src/main.c | 27 +++++-- src/static/search.html | 2 +- src/static/stats.html | 2 +- src/tpool.c | 20 ++++- src/tpool.h | 2 +- src/web/static_generated.c | 4 +- 15 files changed, 227 insertions(+), 82 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index de2b981..d4efa15 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -40,6 +40,7 @@ find_package(cJSON CONFIG REQUIRED) find_package(unofficial-glib CONFIG REQUIRED) find_package(unofficial-mongoose CONFIG REQUIRED) find_library(UUID_LIB NAMES uuid) +find_package(CURL CONFIG REQUIRED) #find_package(OpenSSL REQUIRED) @@ -56,8 +57,6 @@ target_compile_options( sist2 PRIVATE -fPIC - -Werror -# -Wstringop-overflow=0 ) if (SIST_DEBUG) @@ -68,6 +67,7 @@ if (SIST_DEBUG) -fstack-protector -fno-omit-frame-pointer -fsanitize=address + -O2 ) target_link_options( sist2 @@ -107,6 +107,7 @@ target_link_libraries( unofficial::glib::glib unofficial::mongoose::mongoose # OpenSSL::SSL OpenSSL::Crypto + CURL::libcurl ${UUID_LIB} pthread diff --git a/ci/build.sh b/ci/build.sh index ea2b8c8..c5f0e43 100755 --- a/ci/build.sh +++ b/ci/build.sh @@ -1,16 +1,17 @@ #!/usr/bin/env bash +VCPKG_ROOT="/vcpkg" rm *.gz rm -rf CMakeFiles CMakeCache.txt -cmake -DSIST_DEBUG=off -DCMAKE_TOOLCHAIN_FILE=/vcpkg/scripts/buildsystems/vcpkg.cmake . -make +cmake -DSIST_DEBUG=off -DVCPKG_BUILD_TYPE=release -DCMAKE_TOOLCHAIN_FILE="${VCPKG_ROOT}/scripts/buildsystems/vcpkg.cmake" . +make -j 12 strip sist2 gzip -9 sist2 rm -rf CMakeFiles CMakeCache.txt -cmake -DSIST_DEBUG=on -DCMAKE_TOOLCHAIN_FILE=/vcpkg/scripts/buildsystems/vcpkg.cmake . -make +cmake -DSIST_DEBUG=on -DVCPKG_BUILD_TYPE=debug -DCMAKE_TOOLCHAIN_FILE="${VCPKG_ROOT}/scripts/buildsystems/vcpkg.cmake" . +make -j 12 cp /usr/lib/x86_64-linux-gnu/libasan.so.2.0.0 libasan.so.2 tar -czf sist2_debug.tar.gz sist2_debug libasan.so.2 diff --git a/docs/USAGE.md b/docs/USAGE.md index 6186ed8..f0bbadd 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -46,6 +46,7 @@ Scan options --mem-buffer= Maximum memory buffer size per thread in MB for files inside archives (see USAGE.md). DEFAULT: 2000 Index options + -t, --threads= Number of threads. DEFAULT=1 --es-url= Elasticsearch url with port. DEFAULT=http://localhost:9200 -p, --print Just print JSON documents to stdout. --script-file= Path to user script. diff --git a/src/cli.c b/src/cli.c index e597a31..7d0ff3c 100644 --- a/src/cli.c +++ b/src/cli.c @@ -260,6 +260,13 @@ int index_args_validate(index_args_t *args, int argc, const char **argv) { return 1; } + if (args->threads == 0) { + args->threads = 1; + } else if (args->threads < 0) { + fprintf(stderr, "Invalid threads: %d\n", args->threads); + return 1; + } + char *index_path = abspath(argv[1]); if (index_path == NULL) { fprintf(stderr, "File not found: %s\n", argv[1]); diff --git a/src/cli.h b/src/cli.h index 6da2bdf..c0f785d 100644 --- a/src/cli.h +++ b/src/cli.h @@ -41,6 +41,7 @@ typedef struct index_args { int print; int batch_size; int force_reset; + int threads; } index_args_t; typedef struct web_args { diff --git a/src/ctx.h b/src/ctx.h index 42125a6..a35cfb9 100644 --- a/src/ctx.h +++ b/src/ctx.h @@ -58,6 +58,7 @@ typedef struct { typedef struct { char *es_url; int batch_size; + tpool_t *pool; } IndexCtx_t; typedef struct { diff --git a/src/index/elastic.c b/src/index/elastic.c index 5e4f7d9..e3dcb9d 100644 --- a/src/index/elastic.c +++ b/src/index/elastic.c @@ -14,9 +14,18 @@ typedef struct es_indexer { } es_indexer_t; -static es_indexer_t *Indexer; +static __thread es_indexer_t *Indexer; void delete_queue(int max); +void elastic_flush(); + +void elastic_cleanup() { + elastic_flush(); + if (Indexer != NULL) { + free(Indexer->es_url); + free(Indexer); + } +} void print_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) { @@ -35,8 +44,12 @@ void print_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) { cJSON_Delete(line); } -void index_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) { +void index_json_func(void *arg) { + es_bulk_line_t *line = arg; + elastic_index_line(line); +} +void index_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) { char *json = cJSON_PrintUnformatted(document); size_t json_len = strlen(json); @@ -48,7 +61,7 @@ void index_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) { bulk_line->next = NULL; cJSON_free(json); - elastic_index_line(bulk_line); + tpool_add_work(IndexCtx.pool, index_json_func, bulk_line); } void execute_update_script(const char *script, const char index_id[UUID_STR_LEN]) { @@ -89,33 +102,44 @@ void execute_update_script(const char *script, const char index_id[UUID_STR_LEN] cJSON_Delete(resp); } +#define ACTION_STR_LEN 91 + void *create_bulk_buffer(int max, int *count, size_t *buf_len) { es_bulk_line_t *line = Indexer->line_head; *count = 0; size_t buf_size = 0; size_t buf_cur = 0; - char *buf = malloc(1); + char *buf = malloc(8196); + size_t buf_capacity = 8196; while (line != NULL && *count < max) { - char action_str[512]; - snprintf(action_str, 512, + char action_str[256]; + snprintf(action_str, 256, "{\"index\":{\"_id\":\"%s\", \"_type\":\"_doc\", \"_index\":\"sist2\"}}\n", line->uuid_str); - size_t action_str_len = strlen(action_str); size_t line_len = strlen(line->line); - buf = realloc(buf, buf_size + line_len + action_str_len); - buf_size += line_len + action_str_len; - memcpy(buf + buf_cur, action_str, action_str_len); - buf_cur += action_str_len; + while (buf_size + line_len + ACTION_STR_LEN > buf_capacity) { + buf_capacity *= 2; + buf = realloc(buf, buf_capacity); + } + + buf_size += line_len + ACTION_STR_LEN; + + memcpy(buf + buf_cur, action_str, ACTION_STR_LEN); + buf_cur += ACTION_STR_LEN; memcpy(buf + buf_cur, line->line, line_len); buf_cur += line_len; line = line->next; (*count)++; } - buf = realloc(buf, buf_size + 1); + + if (buf_size + 1 > buf_capacity) { + buf = realloc(buf, buf_capacity + 1); + } + *(buf + buf_cur) = '\0'; *buf_len = buf_cur; @@ -123,7 +147,7 @@ void *create_bulk_buffer(int max, int *count, size_t *buf_len) { } void print_errors(response_t *r) { - char * tmp = malloc(r->size + 1); + char *tmp = malloc(r->size + 1); memcpy(tmp, r->body, r->size); *(tmp + r->size) = '\0'; @@ -181,6 +205,15 @@ void _elastic_flush(int max) { _elastic_flush(max / 2); return; + } else if (r->status_code == 429) { + + free_response(r); + free(buf); + LOG_WARNING("elastic.c", "Got 429 status, will retry after delay") + usleep(1000000 * 20); + _elastic_flush(max); + return; + } else if (r->status_code != 200) { print_errors(r); delete_queue(Indexer->queued); @@ -257,7 +290,7 @@ es_indexer_t *create_indexer(const char *url) { return indexer; } -void destroy_indexer(char *script, char index_id[UUID_STR_LEN]) { +void finish_indexer(char *script, char *index_id) { char url[4096]; @@ -280,11 +313,6 @@ void destroy_indexer(char *script, char index_id[UUID_STR_LEN]) { r = web_post(url, ""); LOG_INFOF("elastic.c", "Merge index <%d>", r->status_code); free_response(r); - - if (Indexer != NULL) { - free(Indexer->es_url); - free(Indexer); - } } void elastic_init(int force_reset) { diff --git a/src/index/elastic.h b/src/index/elastic.h index b42ad65..298657d 100644 --- a/src/index/elastic.h +++ b/src/index/elastic.h @@ -16,15 +16,14 @@ typedef struct es_indexer es_indexer_t; void elastic_index_line(es_bulk_line_t *line); -void elastic_flush(); - void print_json(cJSON *document, const char uuid_str[UUID_STR_LEN]); void index_json(cJSON *document, const char uuid_str[UUID_STR_LEN]); es_indexer_t *create_indexer(const char* es_url); -void destroy_indexer(char *script, char index_id[UUID_STR_LEN]); +void elastic_cleanup(); +void finish_indexer(char *script, char *index_id); void elastic_init(int force_reset); diff --git a/src/index/web.c b/src/index/web.c index 8eaa16a..95e67b2 100644 --- a/src/index/web.c +++ b/src/index/web.c @@ -1,11 +1,19 @@ #include "web.h" #include "src/sist.h" -#include "src/ctx.h" #include #include +#include +size_t write_cb(char *ptr, size_t size, size_t nmemb, void *user_data) { + + size_t real_size = size * nmemb; + dyn_buffer_t *buf = user_data; + dyn_buffer_write(buf, ptr, real_size); + return real_size; +} + void free_response(response_t *resp) { if (resp->body != NULL) { free(resp->body); @@ -100,55 +108,124 @@ subreq_ctx_t *http_req(const char *url, const char *extra_headers, const char *p return ctx; } -response_t *web_get(const char *url) { - subreq_ctx_t *ctx = http_req(url, SIST2_HEADERS, NULL, "GET"); - while (ctx->ev_data.done == FALSE) { - mg_mgr_poll(&ctx->mgr, 50); - } - mg_mgr_free(&ctx->mgr); - - response_t *ret = ctx->ev_data.resp; - free(ctx); - return ret; -} - subreq_ctx_t *web_post_async(const char *url, const char *data) { return http_req(url, SIST2_HEADERS, data, "POST"); } -response_t *web_post(const char *url, const char *data) { - subreq_ctx_t *ctx = http_req(url, SIST2_HEADERS, data, "POST"); +response_t *web_get(const char *url) { + response_t *resp = malloc(sizeof(response_t)); - while (ctx->ev_data.done == FALSE) { - mg_mgr_poll(&ctx->mgr, 50); - } - mg_mgr_free(&ctx->mgr); + CURL *curl; + dyn_buffer_t buffer = dyn_buffer_create(); - response_t *ret = ctx->ev_data.resp; - free(ctx); - return ret; + curl = curl_easy_init(); + curl_easy_setopt(curl, CURLOPT_URL, url); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) (&buffer)); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_cb); + curl_easy_setopt(curl, CURLOPT_USERAGENT, "sist2"); + + struct curl_slist *headers = curl_slist_append(headers, "Content-Type: application/json"); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + + curl_easy_perform(curl); + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &resp->status_code); + + curl_easy_cleanup(curl); + + resp->body = buffer.buf; + resp->size = buffer.cur; + return resp; } -response_t *web_put(const char *url, const char *data) { - subreq_ctx_t *ctx = http_req(url, SIST2_HEADERS, data, "PUT"); - while (ctx->ev_data.done == FALSE) { - mg_mgr_poll(&ctx->mgr, 50); - } - mg_mgr_free(&ctx->mgr); +response_t *web_post(const char *url, const char *data) { - response_t *ret = ctx->ev_data.resp; - free(ctx); - return ret; + response_t *resp = malloc(sizeof(response_t)); + + CURL *curl; + dyn_buffer_t buffer = dyn_buffer_create(); + + curl = curl_easy_init(); + curl_easy_setopt(curl, CURLOPT_URL, url); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) (&buffer)); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_cb); + curl_easy_setopt(curl, CURLOPT_POST, 1); + curl_easy_setopt(curl, CURLOPT_USERAGENT, "sist2"); + + struct curl_slist *headers = curl_slist_append(headers, "Content-Type: application/json"); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data); + + curl_easy_perform(curl); + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &resp->status_code); + + curl_easy_cleanup(curl); + curl_slist_free_all(headers); + + resp->body = buffer.buf; + resp->size = buffer.cur; + + return resp; +} + + +response_t *web_put(const char *url, const char *data) { + + response_t *resp = malloc(sizeof(response_t)); + + CURL *curl; + dyn_buffer_t buffer = dyn_buffer_create(); + + curl = curl_easy_init(); + curl_easy_setopt(curl, CURLOPT_URL, url); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) (&buffer)); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_cb); + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT"); + curl_easy_setopt(curl, CURLOPT_USERAGENT, "sist2"); + curl_easy_setopt(curl, CURLOPT_DNS_USE_GLOBAL_CACHE, 0); + curl_easy_setopt(curl, CURLOPT_IPRESOLVE, CURLOPT_DNS_LOCAL_IP4 ); + + struct curl_slist *headers = curl_slist_append(headers, "Content-Type: application/json"); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data); + + curl_easy_perform(curl); + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &resp->status_code); + + curl_easy_cleanup(curl); + curl_slist_free_all(headers); + + resp->body = buffer.buf; + resp->size = buffer.cur; + return resp; } response_t *web_delete(const char *url) { - subreq_ctx_t *ctx = http_req(url, SIST2_HEADERS, NULL, "DELETE"); - while (ctx->ev_data.done == FALSE) { - mg_mgr_poll(&ctx->mgr, 50); - } - mg_mgr_free(&ctx->mgr); - response_t *ret = ctx->ev_data.resp; - free(ctx); - return ret; -} + response_t *resp = malloc(sizeof(response_t)); + + CURL *curl; + dyn_buffer_t buffer = dyn_buffer_create(); + + curl = curl_easy_init(); + curl_easy_setopt(curl, CURLOPT_URL, url); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) (&buffer)); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_cb); + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE"); + curl_easy_setopt(curl, CURLOPT_USERAGENT, "sist2"); + + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, ""); + struct curl_slist *headers = curl_slist_append(headers, "Content-Type: application/json"); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + + curl_easy_perform(curl); + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &resp->status_code); + + curl_easy_cleanup(curl); + curl_slist_free_all(headers); + + resp->body = buffer.buf; + resp->size = buffer.cur; + return resp; +} \ No newline at end of file diff --git a/src/main.c b/src/main.c index ae7aa3b..393ed6f 100644 --- a/src/main.c +++ b/src/main.c @@ -21,7 +21,7 @@ #define EPILOG "Made by simon987 . Released under GPL-3.0" -static const char *const Version = "2.5.1"; +static const char *const Version = "2.5.2"; static const char *const usage[] = { "sist2 scan [OPTION]... PATH", "sist2 index [OPTION]... INDEX", @@ -211,7 +211,7 @@ void sist2_scan(scan_args_t *args) { LOG_INFOF("main.c", "Loaded %d items in to mtime table.", g_hash_table_size(ScanCtx.original_table)) } - ScanCtx.pool = tpool_create(args->threads, thread_cleanup); + ScanCtx.pool = tpool_create(args->threads, thread_cleanup, TRUE); tpool_start(ScanCtx.pool); walk_directory_tree(ScanCtx.index.desc.root); tpool_wait(ScanCtx.pool); @@ -278,6 +278,16 @@ void sist2_index(index_args_t *args) { f = index_json; } + void (*cleanup)(); + if (args->print) { + cleanup = NULL; + } else { + cleanup = elastic_cleanup; + } + + IndexCtx.pool = tpool_create(args->threads, cleanup, FALSE); + tpool_start(IndexCtx.pool); + struct dirent *de; while ((de = readdir(dir)) != NULL) { if (strncmp(de->d_name, "_index_", sizeof("_index_") - 1) == 0) { @@ -288,10 +298,13 @@ void sist2_index(index_args_t *args) { } closedir(dir); + tpool_wait(IndexCtx.pool); + if (!args->print) { - elastic_flush(); - destroy_indexer(args->script, desc.uuid); + finish_indexer(args->script, desc.uuid); } + + tpool_destroy(IndexCtx.pool); } void sist2_exec_script(exec_args_t *args) { @@ -352,6 +365,7 @@ int main(int argc, const char *argv[]) { char *common_es_url = NULL; char *common_script_path = NULL; + int common_threads = 0; struct argparse_option options[] = { OPT_HELP(), @@ -361,7 +375,7 @@ int main(int argc, const char *argv[]) { OPT_BOOLEAN(0, "very-verbose", &LogCtx.very_verbose, "Turn on debug messages"), OPT_GROUP("Scan options"), - OPT_INTEGER('t', "threads", &scan_args->threads, "Number of threads. DEFAULT=1"), + OPT_INTEGER('t', "threads", &common_threads, "Number of threads. DEFAULT=1"), OPT_FLOAT('q', "quality", &scan_args->quality, "Thumbnail quality, on a scale of 1.0 to 31.0, 1.0 being the best. DEFAULT=5"), OPT_INTEGER(0, "size", &scan_args->size, @@ -389,6 +403,7 @@ int main(int argc, const char *argv[]) { "(see USAGE.md). DEFAULT: 2000"), OPT_GROUP("Index options"), + OPT_INTEGER('t', "threads", &common_threads, "Number of threads. DEFAULT=1"), OPT_STRING(0, "es-url", &common_es_url, "Elasticsearch url with port. DEFAULT=http://localhost:9200"), OPT_BOOLEAN('p', "print", &index_args->print, "Just print JSON documents to stdout."), OPT_STRING(0, "script-file", &common_script_path, "Path to user script."), @@ -426,6 +441,8 @@ int main(int argc, const char *argv[]) { exec_args->es_url = common_es_url; index_args->script_path = common_script_path; exec_args->script_path = common_script_path; + index_args->threads = common_threads; + scan_args->threads = common_threads; if (argc == 0) { argparse_usage(&argparse); diff --git a/src/static/search.html b/src/static/search.html index facd8bf..4923d81 100644 --- a/src/static/search.html +++ b/src/static/search.html @@ -11,7 +11,7 @@