From 8e96174e1ff6f6118f0ef54407de92b2375acdf2 Mon Sep 17 00:00:00 2001 From: Yatao Li Date: Sun, 23 Jan 2022 03:35:27 +0800 Subject: [PATCH 1/7] scan memory threshold --- src/cli.c | 6 ++++++ src/cli.h | 1 + src/ctx.h | 1 + src/main.c | 2 ++ src/tpool.c | 36 ++++++++++++++++++++++++++++++++++++ 5 files changed, 46 insertions(+) diff --git a/src/cli.c b/src/cli.c index fc2b762..afd0163 100644 --- a/src/cli.c +++ b/src/cli.c @@ -19,6 +19,8 @@ #define DEFAULT_MAX_MEM_BUFFER 2000 +#define DEFAULT_THROTTLE_MEMORY_THRESHOLD 0 + const char *TESS_DATAPATHS[] = { "/usr/share/tessdata/", "/usr/share/tesseract-ocr/tessdata/", @@ -255,6 +257,10 @@ int scan_args_validate(scan_args_t *args, int argc, const char **argv) { args->max_memory_buffer = DEFAULT_MAX_MEM_BUFFER; } + if (args->throttle_memory_threshold <= 0) { + args->throttle_memory_threshold = DEFAULT_THROTTLE_MEMORY_THRESHOLD; + } + if (args->list_path != NULL) { if (strcmp(args->list_path, "-") == 0) { args->list_file = stdin; diff --git a/src/cli.h b/src/cli.h index a32d40a..1cd92ba 100644 --- a/src/cli.h +++ b/src/cli.h @@ -10,6 +10,7 @@ typedef struct scan_args { int size; int content_size; int threads; + int throttle_memory_threshold; char *incremental; char *output; char *rewrite_url; diff --git a/src/ctx.h b/src/ctx.h index fb55998..6d6b224 100644 --- a/src/ctx.h +++ b/src/ctx.h @@ -35,6 +35,7 @@ typedef struct { int threads; int depth; int calculate_checksums; + size_t mem_limit; size_t stat_tn_size; size_t stat_index_size; diff --git a/src/main.c b/src/main.c index a1b567f..669037f 100644 --- a/src/main.c +++ b/src/main.c @@ -253,6 +253,7 @@ void initialize_scan_context(scan_args_t *args) { ScanCtx.threads = args->threads; ScanCtx.depth = args->depth; + ScanCtx.mem_limit = args->throttle_memory_threshold * 1024 * 1024; strncpy(ScanCtx.index.path, args->output, sizeof(ScanCtx.index.path)); strncpy(ScanCtx.index.desc.name, args->name, sizeof(ScanCtx.index.desc.name)); @@ -586,6 +587,7 @@ int main(int argc, const char *argv[]) { OPT_GROUP("Scan options"), OPT_INTEGER('t', "threads", &common_threads, "Number of threads. DEFAULT=1"), + OPT_STRING(0, "mem-throttle", &scan_args->throttle_memory_threshold, "Total memory threshold in MB for scan throttling. DEFAULT=0"), OPT_FLOAT('q', "quality", &scan_args->quality, "Thumbnail quality, on a scale of 1.0 to 31.0, 1.0 being the best. DEFAULT=3"), OPT_INTEGER(0, "size", &scan_args->size, diff --git a/src/tpool.c b/src/tpool.c index e0de5e8..be2fff5 100644 --- a/src/tpool.c +++ b/src/tpool.c @@ -114,6 +114,35 @@ int tpool_add_work(tpool_t *pool, thread_func_t func, void *arg) { return 1; } +/** + * see: https://github.com/htop-dev/htop/blob/f782f821f7f8081cb43bbad1c37f32830a260a81/linux/LinuxProcessList.c + */ +__always_inline +static size_t _get_total_mem_mb() { + FILE* statmfile = fopen("/proc/self/statm", "r"); + if (!statmfile) + return 0; + + long int dummy, dummy2, dummy3, dummy4, dummy5, dummy6; + long int m_resident; + + int r = fscanf(statmfile, "%ld %ld %ld %ld %ld %ld %ld", + &dummy, /* m_virt */ + &m_resident, + &dummy2, /* m_share */ + &dummy3, /* m_trs */ + &dummy4, /* unused since Linux 2.6; always 0 */ + &dummy5, /* m_drs */ + &dummy6); /* unused since Linux 2.6; always 0 */ + fclose(statmfile); + + if (r == 7) { + return m_resident * 4 / 1024; // XXX assume 4KB pages. + } else { + return 0; + } +} + /** * Thread worker function */ @@ -142,6 +171,13 @@ static void *tpool_worker(void *arg) { break; } + while(ScanCtx.mem_limit > 0 && _get_total_mem_mb() >= ScanCtx.mem_limit) { + if (pool->stop) { + break; + } + usleep(10000); + } + work->func(work->arg); if (pool->free_arg) { free(work->arg); From de187eff1c1af349bfe307f5901ce879e26f76ce Mon Sep 17 00:00:00 2001 From: Yatao Li Date: Sun, 23 Jan 2022 03:54:20 +0800 Subject: [PATCH 2/7] minor fix --- src/tpool.c | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/tpool.c b/src/tpool.c index be2fff5..d38af8b 100644 --- a/src/tpool.c +++ b/src/tpool.c @@ -118,7 +118,7 @@ int tpool_add_work(tpool_t *pool, thread_func_t func, void *arg) { * see: https://github.com/htop-dev/htop/blob/f782f821f7f8081cb43bbad1c37f32830a260a81/linux/LinuxProcessList.c */ __always_inline -static size_t _get_total_mem_mb() { +static size_t _get_total_mem() { FILE* statmfile = fopen("/proc/self/statm", "r"); if (!statmfile) return 0; @@ -137,7 +137,7 @@ static size_t _get_total_mem_mb() { fclose(statmfile); if (r == 7) { - return m_resident * 4 / 1024; // XXX assume 4KB pages. + return m_resident * 4096; // XXX assume 4KB pages. } else { return 0; } @@ -167,15 +167,12 @@ static void *tpool_worker(void *arg) { pthread_mutex_unlock(&(pool->work_mutex)); if (work != NULL) { - if (pool->stop) { - break; + while(!pool->stop && ScanCtx.mem_limit > 0 && _get_total_mem() >= ScanCtx.mem_limit) { + usleep(10000); } - while(ScanCtx.mem_limit > 0 && _get_total_mem_mb() >= ScanCtx.mem_limit) { - if (pool->stop) { - break; - } - usleep(10000); + if (pool->stop) { + break; } work->func(work->arg); From f3674ffa0200db057b275dd874ad9d81afd95508 Mon Sep 17 00:00:00 2001 From: Yatao Li Date: Sun, 23 Jan 2022 04:31:30 +0800 Subject: [PATCH 3/7] stop threadpool when the memory limit is too low for any worker thread to proceed --- src/tpool.c | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/tpool.c b/src/tpool.c index d38af8b..a10f2e8 100644 --- a/src/tpool.c +++ b/src/tpool.c @@ -28,6 +28,7 @@ typedef struct tpool { int work_cnt; int done_cnt; int busy_cnt; + int throttle_stuck_cnt; int free_arg; int stop; @@ -148,6 +149,8 @@ static size_t _get_total_mem() { */ static void *tpool_worker(void *arg) { tpool_t *pool = arg; + int stuck_notified = 0; + int throttle_ms = 0; while (1) { pthread_mutex_lock(&pool->work_mutex); @@ -167,14 +170,34 @@ static void *tpool_worker(void *arg) { pthread_mutex_unlock(&(pool->work_mutex)); if (work != NULL) { + stuck_notified = 0; while(!pool->stop && ScanCtx.mem_limit > 0 && _get_total_mem() >= ScanCtx.mem_limit) { + if (!stuck_notified && throttle_ms >= 90000) { + // notify the pool that this thread is stuck. + pthread_mutex_lock(&(pool->work_mutex)); + pool->throttle_stuck_cnt += 1; + if (pool->throttle_stuck_cnt == pool->thread_cnt) { + LOG_FATAL("tpool.c", "Throttle memory limit too low, cannot proceed!"); + pool->stop = TRUE; + } + pthread_mutex_unlock(&(pool->work_mutex)); + stuck_notified = 1; + } usleep(10000); + throttle_ms += 10; } if (pool->stop) { break; } + // we are not stuck anymore. cancel our notification. + if (stuck_notified) { + pthread_mutex_lock(&(pool->work_mutex)); + pool->throttle_stuck_cnt -= 1; + pthread_mutex_unlock(&(pool->work_mutex)); + } + work->func(work->arg); if (pool->free_arg) { free(work->arg); @@ -283,6 +306,7 @@ tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int free_arg, int pri pool->work_cnt = 0; pool->done_cnt = 0; pool->busy_cnt = 0; + pool->throttle_stuck_cnt = 0; pool->stop = FALSE; pool->free_arg = free_arg; pool->cleanup_func = cleanup_func; From 6075c21a3ac5eeebda6ee4b8f326b1c9cb1a8170 Mon Sep 17 00:00:00 2001 From: Yatao Li Date: Sun, 23 Jan 2022 04:41:13 +0800 Subject: [PATCH 4/7] do not throttle writer/index thread pools --- src/cli.c | 4 ++-- src/cli.h | 2 +- src/main.c | 10 +++++----- src/tpool.c | 6 ++++-- src/tpool.h | 2 +- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/cli.c b/src/cli.c index afd0163..665d109 100644 --- a/src/cli.c +++ b/src/cli.c @@ -257,8 +257,8 @@ int scan_args_validate(scan_args_t *args, int argc, const char **argv) { args->max_memory_buffer = DEFAULT_MAX_MEM_BUFFER; } - if (args->throttle_memory_threshold <= 0) { - args->throttle_memory_threshold = DEFAULT_THROTTLE_MEMORY_THRESHOLD; + if (args->scan_mem_limit <= 0) { + args->scan_mem_limit = DEFAULT_THROTTLE_MEMORY_THRESHOLD; } if (args->list_path != NULL) { diff --git a/src/cli.h b/src/cli.h index 1cd92ba..b74fe96 100644 --- a/src/cli.h +++ b/src/cli.h @@ -10,7 +10,7 @@ typedef struct scan_args { int size; int content_size; int threads; - int throttle_memory_threshold; + int scan_mem_limit; char *incremental; char *output; char *rewrite_url; diff --git a/src/main.c b/src/main.c index 669037f..b7f3134 100644 --- a/src/main.c +++ b/src/main.c @@ -253,7 +253,7 @@ void initialize_scan_context(scan_args_t *args) { ScanCtx.threads = args->threads; ScanCtx.depth = args->depth; - ScanCtx.mem_limit = args->throttle_memory_threshold * 1024 * 1024; + ScanCtx.mem_limit = args->scan_mem_limit * 1024 * 1024; strncpy(ScanCtx.index.path, args->output, sizeof(ScanCtx.index.path)); strncpy(ScanCtx.index.desc.name, args->name, sizeof(ScanCtx.index.desc.name)); @@ -383,10 +383,10 @@ void sist2_scan(scan_args_t *args) { load_incremental_index(args); } - ScanCtx.pool = tpool_create(args->threads, thread_cleanup, TRUE, TRUE); + ScanCtx.pool = tpool_create(ScanCtx.threads, thread_cleanup, TRUE, TRUE, ScanCtx.mem_limit); tpool_start(ScanCtx.pool); - ScanCtx.writer_pool = tpool_create(1, writer_cleanup, TRUE, FALSE); + ScanCtx.writer_pool = tpool_create(1, writer_cleanup, TRUE, FALSE, 0); tpool_start(ScanCtx.writer_pool); if (args->list_path) { @@ -467,7 +467,7 @@ void sist2_index(index_args_t *args) { f = index_json; } - IndexCtx.pool = tpool_create(args->threads, elastic_cleanup, FALSE, args->print == 0); + IndexCtx.pool = tpool_create(args->threads, elastic_cleanup, FALSE, args->print == 0, 0); tpool_start(IndexCtx.pool); READ_INDICES(file_path, args->index_path, { @@ -587,7 +587,7 @@ int main(int argc, const char *argv[]) { OPT_GROUP("Scan options"), OPT_INTEGER('t', "threads", &common_threads, "Number of threads. DEFAULT=1"), - OPT_STRING(0, "mem-throttle", &scan_args->throttle_memory_threshold, "Total memory threshold in MB for scan throttling. DEFAULT=0"), + OPT_STRING(0, "mem-throttle", &scan_args->scan_mem_limit, "Total memory threshold in MB for scan throttling. DEFAULT=0"), OPT_FLOAT('q', "quality", &scan_args->quality, "Thumbnail quality, on a scale of 1.0 to 31.0, 1.0 being the best. DEFAULT=3"), OPT_INTEGER(0, "size", &scan_args->size, diff --git a/src/tpool.c b/src/tpool.c index a10f2e8..3419f85 100644 --- a/src/tpool.c +++ b/src/tpool.c @@ -29,6 +29,7 @@ typedef struct tpool { int done_cnt; int busy_cnt; int throttle_stuck_cnt; + size_t mem_limit; int free_arg; int stop; @@ -171,7 +172,7 @@ static void *tpool_worker(void *arg) { if (work != NULL) { stuck_notified = 0; - while(!pool->stop && ScanCtx.mem_limit > 0 && _get_total_mem() >= ScanCtx.mem_limit) { + while(!pool->stop && pool->mem_limit > 0 && _get_total_mem() >= pool->mem_limit) { if (!stuck_notified && throttle_ms >= 90000) { // notify the pool that this thread is stuck. pthread_mutex_lock(&(pool->work_mutex)); @@ -299,7 +300,7 @@ void tpool_destroy(tpool_t *pool) { * Create a thread pool * @param thread_cnt Worker threads count */ -tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int free_arg, int print_progress) { +tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int free_arg, int print_progress, size_t mem_limit) { tpool_t *pool = malloc(sizeof(tpool_t)); pool->thread_cnt = thread_cnt; @@ -307,6 +308,7 @@ tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int free_arg, int pri pool->done_cnt = 0; pool->busy_cnt = 0; pool->throttle_stuck_cnt = 0; + pool->mem_limit = mem_limit; pool->stop = FALSE; pool->free_arg = free_arg; pool->cleanup_func = cleanup_func; diff --git a/src/tpool.h b/src/tpool.h index 6c84a93..71d202f 100644 --- a/src/tpool.h +++ b/src/tpool.h @@ -8,7 +8,7 @@ typedef struct tpool tpool_t; typedef void (*thread_func_t)(void *arg); -tpool_t *tpool_create(int num, void (*cleanup_func)(), int free_arg, int print_progress); +tpool_t *tpool_create(int num, void (*cleanup_func)(), int free_arg, int print_progress, size_t mem_limit); void tpool_start(tpool_t *pool); void tpool_destroy(tpool_t *pool); From d1f13f2c84a80385bc6d9d2d39c23faf28228a78 Mon Sep 17 00:00:00 2001 From: Yatao Li Date: Thu, 27 Jan 2022 20:41:01 +0800 Subject: [PATCH 5/7] stop scanning gracefully if memory limit target cannot be met --- src/tpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tpool.c b/src/tpool.c index 3419f85..9ecce2c 100644 --- a/src/tpool.c +++ b/src/tpool.c @@ -178,7 +178,7 @@ static void *tpool_worker(void *arg) { pthread_mutex_lock(&(pool->work_mutex)); pool->throttle_stuck_cnt += 1; if (pool->throttle_stuck_cnt == pool->thread_cnt) { - LOG_FATAL("tpool.c", "Throttle memory limit too low, cannot proceed!"); + LOG_ERROR("tpool.c", "Throttle memory limit too low, cannot proceed!"); pool->stop = TRUE; } pthread_mutex_unlock(&(pool->work_mutex)); From 68252b4e80ebf72de959444c3d4a6d2cd6f72043 Mon Sep 17 00:00:00 2001 From: Yatao Li Date: Thu, 27 Jan 2022 20:49:40 +0800 Subject: [PATCH 6/7] query page size on tpool creation --- src/tpool.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/tpool.c b/src/tpool.c index 9ecce2c..e5ba3b4 100644 --- a/src/tpool.c +++ b/src/tpool.c @@ -30,6 +30,7 @@ typedef struct tpool { int busy_cnt; int throttle_stuck_cnt; size_t mem_limit; + size_t page_size; int free_arg; int stop; @@ -120,7 +121,7 @@ int tpool_add_work(tpool_t *pool, thread_func_t func, void *arg) { * see: https://github.com/htop-dev/htop/blob/f782f821f7f8081cb43bbad1c37f32830a260a81/linux/LinuxProcessList.c */ __always_inline -static size_t _get_total_mem() { +static size_t _get_total_mem(tpool_t* pool) { FILE* statmfile = fopen("/proc/self/statm", "r"); if (!statmfile) return 0; @@ -139,7 +140,7 @@ static size_t _get_total_mem() { fclose(statmfile); if (r == 7) { - return m_resident * 4096; // XXX assume 4KB pages. + return m_resident * pool->page_size; } else { return 0; } @@ -172,7 +173,7 @@ static void *tpool_worker(void *arg) { if (work != NULL) { stuck_notified = 0; - while(!pool->stop && pool->mem_limit > 0 && _get_total_mem() >= pool->mem_limit) { + while(!pool->stop && pool->mem_limit > 0 && _get_total_mem(pool) >= pool->mem_limit) { if (!stuck_notified && throttle_ms >= 90000) { // notify the pool that this thread is stuck. pthread_mutex_lock(&(pool->work_mutex)); @@ -314,6 +315,7 @@ tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int free_arg, int pri pool->cleanup_func = cleanup_func; pool->threads = calloc(sizeof(pthread_t), thread_cnt); pool->print_progress = print_progress; + pool->page_size = getpagesize(); pthread_mutex_init(&(pool->work_mutex), NULL); From 9a6e7c7c47bbed2fa2f2bd966912c82cd9d65f2f Mon Sep 17 00:00:00 2001 From: Yatao Li Date: Fri, 28 Jan 2022 02:33:19 +0800 Subject: [PATCH 7/7] reset throttle timer for each work item --- src/tpool.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/tpool.c b/src/tpool.c index e5ba3b4..970d8c3 100644 --- a/src/tpool.c +++ b/src/tpool.c @@ -173,6 +173,7 @@ static void *tpool_worker(void *arg) { if (work != NULL) { stuck_notified = 0; + throttle_ms = 0; while(!pool->stop && pool->mem_limit > 0 && _get_total_mem(pool) >= pool->mem_limit) { if (!stuck_notified && throttle_ms >= 90000) { // notify the pool that this thread is stuck.