From ca973d63a47f073f058adf6b6eae1aa35bbc8e95 Mon Sep 17 00:00:00 2001 From: simon987 Date: Sun, 12 Mar 2023 11:38:31 -0400 Subject: [PATCH] Still WIP.. --- README.md | 4 +- docker-compose.yml | 4 +- scripts/start_dev_es.sh | 2 +- src/cli.c | 4 -- src/cli.h | 1 - src/ctx.h | 1 - src/index/elastic.c | 2 +- src/io/store.c | 5 +- src/io/store.h | 1 - src/log.h | 5 ++ src/main.c | 11 ++- src/sist.h | 2 +- src/tpool.c | 151 +++++++++++++++++++++------------------- src/tpool.h | 2 +- 14 files changed, 99 insertions(+), 96 deletions(-) diff --git a/README.md b/README.md index 2703260..298dbe2 100644 --- a/README.md +++ b/README.md @@ -37,12 +37,12 @@ sist2 (Simple incremental search tool) 1. Download [from official website](https://www.elastic.co/downloads/elasticsearch) 1. *(or)* Run using docker: ```bash - docker run -d -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.14.0 + docker run -d -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.17.9 ``` 1. *(or)* Run using docker-compose: ```yaml elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:7.14.0 + image: docker.elastic.co/elasticsearch/elasticsearch:7.17.9 environment: - discovery.type=single-node - "ES_JAVA_OPTS=-Xms1G -Xmx2G" diff --git a/docker-compose.yml b/docker-compose.yml index d6d891c..3b9aef6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ version: "3" services: elasticsearch: - image: elasticsearch:7.14.0 + image: elasticsearch:7.17.9 container_name: sist2-es environment: - "discovery.type=single-node" @@ -15,9 +15,9 @@ services: - /mnt/array/sist2-admin-data/:/sist2-admin/ - /:/host ports: + - 4090:4090 # NOTE: Don't export this port publicly! - 8080:8080 - - 4090:4090 working_dir: /root/sist2-admin/ entrypoint: python3 command: diff --git a/scripts/start_dev_es.sh b/scripts/start_dev_es.sh index 6bd0bd5..ce091b5 100755 --- a/scripts/start_dev_es.sh +++ b/scripts/start_dev_es.sh @@ -1,3 +1,3 @@ docker run --rm -it --name "sist2-dev-es"\ -p 9200:9200 -e "discovery.type=single-node" \ - -e "ES_JAVA_OPTS=-Xms8g -Xmx8g" elasticsearch:7.14.0 + -e "ES_JAVA_OPTS=-Xms8g -Xmx8g" elasticsearch:7.17.9 diff --git a/src/cli.c b/src/cli.c index 1bafba6..8ff8875 100644 --- a/src/cli.c +++ b/src/cli.c @@ -273,10 +273,6 @@ int scan_args_validate(scan_args_t *args, int argc, const char **argv) { args->max_memory_buffer_mib = DEFAULT_MAX_MEM_BUFFER; } - if (args->scan_mem_limit_mib == OPTION_VALUE_UNSPECIFIED || args->scan_mem_limit_mib == OPTION_VALUE_DISABLE) { - args->scan_mem_limit_mib = DEFAULT_THROTTLE_MEMORY_THRESHOLD; - } - if (args->list_path != OPTION_VALUE_UNSPECIFIED) { if (strcmp(args->list_path, "-") == 0) { args->list_file = stdin; diff --git a/src/cli.h b/src/cli.h index 556d1d6..d953621 100644 --- a/src/cli.h +++ b/src/cli.h @@ -13,7 +13,6 @@ typedef struct scan_args { int tn_size; int content_size; int threads; - int scan_mem_limit_mib; char *incremental; char *output; char *rewrite_url; diff --git a/src/ctx.h b/src/ctx.h index a859eee..49fdbb7 100644 --- a/src/ctx.h +++ b/src/ctx.h @@ -35,7 +35,6 @@ typedef struct { int threads; int depth; int calculate_checksums; - size_t mem_limit; size_t stat_tn_size; size_t stat_index_size; diff --git a/src/index/elastic.c b/src/index/elastic.c index 0b41b07..9d582fc 100644 --- a/src/index/elastic.c +++ b/src/index/elastic.c @@ -65,7 +65,7 @@ void print_json(cJSON *document, const char id_str[SIST_DOC_ID_LEN]) { } void index_json_func(tpool_work_arg_shm_t *arg) { - // Copy arg to heap because it's going to be free immediately after this function returns + // Copy arg to heap because it's going to be freed immediately after this function returns es_bulk_line_t *line = malloc(arg->arg_size); memcpy(line, arg->arg, arg->arg_size); diff --git a/src/io/store.c b/src/io/store.c index 2e03e8d..dad686b 100644 --- a/src/io/store.c +++ b/src/io/store.c @@ -34,8 +34,7 @@ store_t *store_create(const char *path, size_t chunk_size) { #if (SIST_FAKE_STORE != 1) store->chunk_size = chunk_size; - store->shared_memory = mmap(NULL, sizeof(*store->shm), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); - store->shm = store->shared_memory; + store->shm = mmap(NULL, sizeof(*store->shm), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); open_env(path, &env, &dbi); @@ -53,7 +52,7 @@ void store_destroy(store_t *store) { LOG_DEBUG("store.c", "store_destroy()") #if (SIST_FAKE_STORE != 1) - munmap(store->shared_memory, sizeof(*store->shm)); + munmap(store->shm, sizeof(*store->shm)); mdb_dbi_close(store->proc.env, store->proc.dbi); mdb_env_close(store->proc.env); diff --git a/src/io/store.h b/src/io/store.h index b6c8472..ce27ded 100644 --- a/src/io/store.h +++ b/src/io/store.h @@ -14,7 +14,6 @@ typedef struct store_t { char path[PATH_MAX]; size_t chunk_size; - void *shared_memory; struct { MDB_dbi dbi; diff --git a/src/log.h b/src/log.h index e2e7cd0..113a577 100644 --- a/src/log.h +++ b/src/log.h @@ -37,6 +37,11 @@ sist_log(filepath, LOG_SIST_FATAL, str);\ exit(-1); +#define LOG_FATALF_NO_EXIT(filepath, fmt, ...) \ + sist_logf(filepath, LOG_SIST_FATAL, fmt, __VA_ARGS__); +#define LOG_FATAL_NO_EXIT(filepath, str) \ + sist_log(filepath, LOG_SIST_FATAL, str); + #include "sist.h" void sist_logf(const char *filepath, int level, char *format, ...); diff --git a/src/main.c b/src/main.c index 03e40a5..38a5dd5 100644 --- a/src/main.c +++ b/src/main.c @@ -17,6 +17,7 @@ #include #include +#include #include "stats.h" @@ -268,7 +269,6 @@ void initialize_scan_context(scan_args_t *args) { ScanCtx.threads = args->threads; ScanCtx.depth = args->depth; - ScanCtx.mem_limit = (size_t) args->scan_mem_limit_mib * 1024 * 1024; strncpy(ScanCtx.index.path, args->output, sizeof(ScanCtx.index.path)); strncpy(ScanCtx.index.desc.name, args->name, sizeof(ScanCtx.index.desc.name)); @@ -406,10 +406,10 @@ void sist2_scan(scan_args_t *args) { load_incremental_index(args); } - ScanCtx.writer_pool = tpool_create(1, writer_cleanup, FALSE, 0); + ScanCtx.writer_pool = tpool_create(1, writer_cleanup, FALSE); tpool_start(ScanCtx.writer_pool); - ScanCtx.pool = tpool_create(ScanCtx.threads, thread_cleanup, TRUE, ScanCtx.mem_limit); + ScanCtx.pool = tpool_create(ScanCtx.threads, thread_cleanup, TRUE); tpool_start(ScanCtx.pool); if (args->list_path) { @@ -493,7 +493,7 @@ void sist2_index(index_args_t *args) { f = index_json; } - IndexCtx.pool = tpool_create(args->threads, elastic_cleanup, args->print == 0, 0); + IndexCtx.pool = tpool_create(args->threads, elastic_cleanup, args->print == 0); tpool_start(IndexCtx.pool); READ_INDICES(file_path, args->index_path, { @@ -644,9 +644,6 @@ int main(int argc, const char *argv[]) { OPT_GROUP("Scan options"), OPT_INTEGER('t', "threads", &common_threads, "Number of threads. DEFAULT=1"), - OPT_INTEGER(0, "mem-throttle", &scan_args->scan_mem_limit_mib, - "Total memory threshold in MiB for scan throttling. DEFAULT=0", - set_to_negative_if_value_is_zero, (intptr_t) &scan_args->scan_mem_limit_mib), OPT_INTEGER('q', "thumbnail-quality", &scan_args->tn_quality, "Thumbnail quality, on a scale of 2 to 31, 2 being the best. DEFAULT=2", set_to_negative_if_value_is_zero, (intptr_t) &scan_args->tn_quality), diff --git a/src/sist.h b/src/sist.h index 6dff0d3..30ded55 100644 --- a/src/sist.h +++ b/src/sist.h @@ -49,7 +49,7 @@ #include #include "git_hash.h" -#define VERSION "2.14.2" +#define VERSION "2.14.3" static const char *const Version = VERSION; #ifndef SIST_PLATFORM diff --git a/src/tpool.c b/src/tpool.c index 7abea86..8678e22 100644 --- a/src/tpool.c +++ b/src/tpool.c @@ -6,7 +6,9 @@ #include #include "mempool/mempool.h" -#define MAX_QUEUE_SIZE 5000 +#define BLANK_STR " " +// TODO: Use slab OOM to control queue size +#define MAX_QUEUE_SIZE 100000 typedef struct tpool_work { tpool_work_arg_shm_t *arg; @@ -21,6 +23,7 @@ typedef struct tpool { pthread_mutex_t work_mutex; pthread_mutex_t mem_mutex; + // TODO: Initialize with SHARED attr pthread_cond_t has_work_cond; pthread_cond_t working_cond; @@ -30,9 +33,6 @@ typedef struct tpool { int work_cnt; int done_cnt; int busy_cnt; - int throttle_stuck_cnt; - size_t mem_limit; - size_t page_size; int stop; int waiting; @@ -41,8 +41,6 @@ typedef struct tpool { void (*cleanup_func)(); - // ========= - void *shared_memory; size_t shared_memory_size; ncx_slab_pool_t *mempool; @@ -137,6 +135,61 @@ int tpool_add_work(tpool_t *pool, thread_func_t func, tpool_work_arg_t *arg) { return 1; } +static void worker_thread_loop(tpool_t *pool) { + while (TRUE) { + pthread_mutex_lock(&pool->work_mutex); + if (pool->stop) { + break; + } + + if (pool->work_head == NULL) { + pthread_cond_wait(&(pool->has_work_cond), &(pool->work_mutex)); + } + + tpool_work_t *work = tpool_work_get(pool); + + if (work != NULL) { + pool->busy_cnt += 1; + } + + pthread_mutex_unlock(&(pool->work_mutex)); + + if (work != NULL) { + if (pool->stop) { + break; + } + + work->func(work->arg); + + pthread_mutex_lock(&pool->mem_mutex); + ncx_slab_free(pool->mempool, work->arg); + ncx_slab_free(pool->mempool, work); + pthread_mutex_unlock(&pool->mem_mutex); + } + + pthread_mutex_lock(&(pool->work_mutex)); + if (work != NULL) { + pool->busy_cnt -= 1; + pool->done_cnt++; + } + + if (pool->print_progress) { + if (LogCtx.json_logs) { + progress_bar_print_json(pool->done_cnt, pool->work_cnt, ScanCtx.stat_tn_size, + ScanCtx.stat_index_size, pool->waiting); + } else { + progress_bar_print((double) pool->done_cnt / pool->work_cnt, ScanCtx.stat_tn_size, + ScanCtx.stat_index_size); + } + } + + if (pool->work_head == NULL) { + pthread_cond_signal(&(pool->working_cond)); + } + pthread_mutex_unlock(&(pool->work_mutex)); + } +} + /** * Thread worker function */ @@ -146,58 +199,8 @@ static void *tpool_worker(void *arg) { int pid = fork(); if (pid == 0) { - while (TRUE) { - pthread_mutex_lock(&pool->work_mutex); - if (pool->stop) { - break; - } - if (pool->work_head == NULL) { - pthread_cond_wait(&(pool->has_work_cond), &(pool->work_mutex)); - } - - tpool_work_t *work = tpool_work_get(pool); - - if (work != NULL) { - pool->busy_cnt += 1; - } - - pthread_mutex_unlock(&(pool->work_mutex)); - - if (work != NULL) { - if (pool->stop) { - break; - } - - work->func(work->arg); - - pthread_mutex_lock(&pool->mem_mutex); - ncx_slab_free(pool->mempool, work->arg); - ncx_slab_free(pool->mempool, work); - pthread_mutex_unlock(&pool->mem_mutex); - } - - pthread_mutex_lock(&(pool->work_mutex)); - if (work != NULL) { - pool->busy_cnt -= 1; - pool->done_cnt++; - } - - if (pool->print_progress) { - if (LogCtx.json_logs) { - progress_bar_print_json(pool->done_cnt, pool->work_cnt, ScanCtx.stat_tn_size, - ScanCtx.stat_index_size, pool->waiting); - } else { - progress_bar_print((double) pool->done_cnt / pool->work_cnt, ScanCtx.stat_tn_size, - ScanCtx.stat_index_size); - } - } - - if (pool->work_head == NULL) { - pthread_cond_signal(&(pool->working_cond)); - } - pthread_mutex_unlock(&(pool->work_mutex)); - } + worker_thread_loop(pool); if (pool->cleanup_func != NULL) { LOG_INFO("tpool.c", "Executing cleanup function") @@ -211,15 +214,27 @@ static void *tpool_worker(void *arg) { } else { int status; + // TODO: On crash, print debug info and resume thread waitpid(pid, &status, 0); - LOG_ERRORF("tpool.c", "child processed terminated with status code %d, signal=%d", WEXITSTATUS(status), WIFSTOPPED(status) ? WSTOPSIG(status) : -1) + LOG_DEBUGF("tpool.c", "Child process terminated with status code %d", WEXITSTATUS(status)) - if (WIFSTOPPED(status)) { - pthread_mutex_lock(&(pool->work_mutex)); - pool->busy_cnt -= 1; - pool->done_cnt++; - pthread_mutex_unlock(&(pool->work_mutex)); + pthread_mutex_lock(&(pool->work_mutex)); + pool->busy_cnt -= 1; + pool->done_cnt++; + pthread_mutex_unlock(&(pool->work_mutex)); + + if (WIFSIGNALED(status)) { +// parse_job_t *job = g_hash_table_lookup(ScanCtx.dbg_current_files, GINT_TO_POINTER(pthread_self())); + const char *job_filepath = "TODO"; + + LOG_FATALF_NO_EXIT( + "tpool.c", + "Child process was terminated by signal (%s).\n" + BLANK_STR "The process was working on %s", + strsignal(WTERMSIG(status)), + job_filepath + ) } } @@ -295,9 +310,8 @@ void tpool_destroy(tpool_t *pool) { * Create a thread pool * @param thread_cnt Worker threads count */ -tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int print_progress, size_t mem_limit) { +tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int print_progress) { - // ============= size_t shm_size = 1024 * 1024 * 2000; void *shared_memory = mmap(NULL, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); @@ -312,20 +326,15 @@ tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int print_progress, s ncx_slab_init(pool->mempool); - // ============= - pool->thread_cnt = thread_cnt; pool->work_cnt = 0; pool->done_cnt = 0; pool->busy_cnt = 0; - pool->throttle_stuck_cnt = 0; - pool->mem_limit = mem_limit; pool->stop = FALSE; pool->waiting = FALSE; pool->cleanup_func = cleanup_func; memset(pool->threads, 0, sizeof(pool->threads)); pool->print_progress = print_progress; - pool->page_size = getpagesize(); pthread_mutexattr_t mutexattr; pthread_mutexattr_init(&mutexattr); @@ -339,7 +348,7 @@ tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int print_progress, s pthread_condattr_setpshared(&condattr, TRUE); pthread_cond_init(&(pool->has_work_cond), &condattr); - pthread_cond_init(&(pool->working_cond),&condattr); + pthread_cond_init(&(pool->working_cond), &condattr); pool->work_head = NULL; pool->work_tail = NULL; diff --git a/src/tpool.h b/src/tpool.h index c1898f3..8f73449 100644 --- a/src/tpool.h +++ b/src/tpool.h @@ -18,7 +18,7 @@ typedef struct { typedef void (*thread_func_t)(tpool_work_arg_shm_t *arg); -tpool_t *tpool_create(int num, void (*cleanup_func)(), int print_progress, size_t mem_limit); +tpool_t *tpool_create(int num, void (*cleanup_func)(), int print_progress); void tpool_start(tpool_t *pool);