process pool mostly works, still WIP

This commit is contained in:
2023-03-09 22:11:21 -05:00
parent 8c662bb8f8
commit f8abffba81
25 changed files with 1219 additions and 267 deletions

View File

@@ -2,13 +2,14 @@
#include "ctx.h"
#include "sist.h"
#include <pthread.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include "mempool/mempool.h"
#define MAX_QUEUE_SIZE 1000000
typedef void (*thread_func_t)(void *arg);
#define MAX_QUEUE_SIZE 5000
typedef struct tpool_work {
void *arg;
tpool_work_arg_shm_t *arg;
thread_func_t func;
struct tpool_work *next;
} tpool_work_t;
@@ -18,11 +19,12 @@ typedef struct tpool {
tpool_work_t *work_tail;
pthread_mutex_t work_mutex;
pthread_mutex_t mem_mutex;
pthread_cond_t has_work_cond;
pthread_cond_t working_cond;
pthread_t *threads;
pthread_t threads[256];
int thread_cnt;
int work_cnt;
@@ -32,28 +34,46 @@ typedef struct tpool {
size_t mem_limit;
size_t page_size;
int free_arg;
int stop;
int waiting;
int print_progress;
void (*cleanup_func)();
// =========
void *shared_memory;
size_t shared_memory_size;
ncx_slab_pool_t *mempool;
} tpool_t;
/**
* Create a work object
*/
static tpool_work_t *tpool_work_create(thread_func_t func, void *arg) {
static tpool_work_t *tpool_work_create(tpool_t *pool, thread_func_t func, tpool_work_arg_t *arg) {
if (func == NULL) {
return NULL;
}
tpool_work_t *work = malloc(sizeof(tpool_work_t));
// Copy heap arg to shm arg
pthread_mutex_lock(&pool->mem_mutex);
tpool_work_arg_shm_t *shm_arg = ncx_slab_alloc(pool->mempool, sizeof(tpool_work_arg_shm_t) + arg->arg_size);
shm_arg->arg_size = arg->arg_size;
memcpy(shm_arg->arg, arg->arg, arg->arg_size);
free(arg->arg);
tpool_work_t *work = ncx_slab_alloc(pool->mempool, sizeof(tpool_work_t));
pthread_mutex_unlock(&pool->mem_mutex);
work->func = func;
work->arg = arg;
work->arg = shm_arg;
work->next = NULL;
return work;
@@ -90,16 +110,15 @@ static tpool_work_t *tpool_work_get(tpool_t *pool) {
/**
* Push work object to thread pool
*/
int tpool_add_work(tpool_t *pool, thread_func_t func, void *arg) {
tpool_work_t *work = tpool_work_create(func, arg);
if (work == NULL) {
return 0;
}
int tpool_add_work(tpool_t *pool, thread_func_t func, tpool_work_arg_t *arg) {
while ((pool->work_cnt - pool->done_cnt) >= MAX_QUEUE_SIZE) {
usleep(10000);
}
tpool_work_t *work = tpool_work_create(pool, func, arg);
if (work == NULL) {
return 0;
}
pthread_mutex_lock(&(pool->work_mutex));
if (pool->work_head == NULL) {
@@ -118,127 +137,92 @@ int tpool_add_work(tpool_t *pool, thread_func_t func, void *arg) {
return 1;
}
/**
* see: https://github.com/htop-dev/htop/blob/f782f821f7f8081cb43bbad1c37f32830a260a81/linux/LinuxProcessList.c
*/
__always_inline
static size_t _get_total_mem(tpool_t *pool) {
FILE *statmfile = fopen("/proc/self/statm", "r");
if (!statmfile)
return 0;
long int dummy, dummy2, dummy3, dummy4, dummy5, dummy6;
long int m_resident;
int r = fscanf(statmfile, "%ld %ld %ld %ld %ld %ld %ld",
&dummy, /* m_virt */
&m_resident,
&dummy2, /* m_share */
&dummy3, /* m_trs */
&dummy4, /* unused since Linux 2.6; always 0 */
&dummy5, /* m_drs */
&dummy6); /* unused since Linux 2.6; always 0 */
fclose(statmfile);
if (r == 7) {
return m_resident * pool->page_size;
} else {
return 0;
}
}
/**
* Thread worker function
*/
static void *tpool_worker(void *arg) {
tpool_t *pool = arg;
int stuck_notified = 0;
int throttle_ms = 0;
while (TRUE) {
pthread_mutex_lock(&pool->work_mutex);
if (pool->stop) {
break;
}
if (pool->work_head == NULL) {
pthread_cond_wait(&(pool->has_work_cond), &(pool->work_mutex));
}
tpool_work_t *work = tpool_work_get(pool);
if (work != NULL) {
pool->busy_cnt += 1;
}
pthread_mutex_unlock(&(pool->work_mutex));
if (work != NULL) {
stuck_notified = 0;
throttle_ms = 0;
while (!pool->stop && pool->mem_limit > 0 && _get_total_mem(pool) >= pool->mem_limit) {
if (!stuck_notified && throttle_ms >= 90000) {
// notify the pool that this thread is stuck.
pthread_mutex_lock(&(pool->work_mutex));
pool->throttle_stuck_cnt += 1;
if (pool->throttle_stuck_cnt == pool->thread_cnt) {
LOG_ERROR("tpool.c", "Throttle memory limit too low, cannot proceed!");
pool->stop = TRUE;
}
pthread_mutex_unlock(&(pool->work_mutex));
stuck_notified = 1;
}
usleep(10000);
throttle_ms += 10;
}
int pid = fork();
if (pid == 0) {
while (TRUE) {
pthread_mutex_lock(&pool->work_mutex);
if (pool->stop) {
break;
}
// we are not stuck anymore. cancel our notification.
if (stuck_notified) {
pthread_mutex_lock(&(pool->work_mutex));
pool->throttle_stuck_cnt -= 1;
pthread_mutex_unlock(&(pool->work_mutex));
if (pool->work_head == NULL) {
pthread_cond_wait(&(pool->has_work_cond), &(pool->work_mutex));
}
work->func(work->arg);
if (pool->free_arg) {
free(work->arg);
tpool_work_t *work = tpool_work_get(pool);
if (work != NULL) {
pool->busy_cnt += 1;
}
free(work);
pthread_mutex_unlock(&(pool->work_mutex));
if (work != NULL) {
if (pool->stop) {
break;
}
work->func(work->arg);
pthread_mutex_lock(&pool->mem_mutex);
ncx_slab_free(pool->mempool, work->arg);
ncx_slab_free(pool->mempool, work);
pthread_mutex_unlock(&pool->mem_mutex);
}
pthread_mutex_lock(&(pool->work_mutex));
if (work != NULL) {
pool->busy_cnt -= 1;
pool->done_cnt++;
}
if (pool->print_progress) {
if (LogCtx.json_logs) {
progress_bar_print_json(pool->done_cnt, pool->work_cnt, ScanCtx.stat_tn_size,
ScanCtx.stat_index_size, pool->waiting);
} else {
progress_bar_print((double) pool->done_cnt / pool->work_cnt, ScanCtx.stat_tn_size,
ScanCtx.stat_index_size);
}
}
if (pool->work_head == NULL) {
pthread_cond_signal(&(pool->working_cond));
}
pthread_mutex_unlock(&(pool->work_mutex));
}
pthread_mutex_lock(&(pool->work_mutex));
if (work != NULL) {
if (pool->cleanup_func != NULL) {
LOG_INFO("tpool.c", "Executing cleanup function")
pool->cleanup_func();
LOG_DEBUG("tpool.c", "Done executing cleanup function")
}
pthread_cond_signal(&(pool->working_cond));
pthread_mutex_unlock(&(pool->work_mutex));
exit(0);
} else {
int status;
waitpid(pid, &status, 0);
LOG_ERRORF("tpool.c", "child processed terminated with status code %d, signal=%d", WEXITSTATUS(status), WIFSTOPPED(status) ? WSTOPSIG(status) : -1)
if (WIFSTOPPED(status)) {
pthread_mutex_lock(&(pool->work_mutex));
pool->busy_cnt -= 1;
pool->done_cnt++;
pthread_mutex_unlock(&(pool->work_mutex));
}
if (pool->print_progress) {
if (LogCtx.json_logs) {
progress_bar_print_json(pool->done_cnt, pool->work_cnt, ScanCtx.stat_tn_size,
ScanCtx.stat_index_size, pool->waiting);
} else {
progress_bar_print((double) pool->done_cnt / pool->work_cnt, ScanCtx.stat_tn_size,
ScanCtx.stat_index_size);
}
}
if (pool->work_head == NULL) {
pthread_cond_signal(&(pool->working_cond));
}
pthread_mutex_unlock(&(pool->work_mutex));
}
if (pool->cleanup_func != NULL) {
LOG_INFO("tpool.c", "Executing cleanup function")
pool->cleanup_func();
LOG_DEBUG("tpool.c", "Done executing cleanup function")
}
pthread_cond_signal(&(pool->working_cond));
pthread_mutex_unlock(&(pool->work_mutex));
return NULL;
}
@@ -304,17 +288,32 @@ void tpool_destroy(tpool_t *pool) {
pthread_cond_destroy(&(pool->has_work_cond));
pthread_cond_destroy(&(pool->working_cond));
free(pool->threads);
free(pool);
munmap(pool->shared_memory, pool->shared_memory_size);
}
/**
* Create a thread pool
* @param thread_cnt Worker threads count
*/
tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int free_arg, int print_progress, size_t mem_limit) {
tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int print_progress, size_t mem_limit) {
// =============
size_t shm_size = 1024 * 1024 * 2000;
void *shared_memory = mmap(NULL, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
tpool_t *pool = (tpool_t *) shared_memory;
pool->shared_memory = shared_memory;
pool->shared_memory_size = shm_size;
pool->mempool = (ncx_slab_pool_t *) (pool->shared_memory + sizeof(tpool_t));
pool->mempool->addr = pool->mempool;
pool->mempool->min_shift = 4;
pool->mempool->end = pool->shared_memory + shm_size;
ncx_slab_init(pool->mempool);
// =============
tpool_t *pool = malloc(sizeof(tpool_t));
pool->thread_cnt = thread_cnt;
pool->work_cnt = 0;
pool->done_cnt = 0;
@@ -323,16 +322,24 @@ tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int free_arg, int pri
pool->mem_limit = mem_limit;
pool->stop = FALSE;
pool->waiting = FALSE;
pool->free_arg = free_arg;
pool->cleanup_func = cleanup_func;
pool->threads = calloc(sizeof(pthread_t), thread_cnt);
memset(pool->threads, 0, sizeof(pool->threads));
pool->print_progress = print_progress;
pool->page_size = getpagesize();
pthread_mutex_init(&(pool->work_mutex), NULL);
pthread_mutexattr_t mutexattr;
pthread_mutexattr_init(&mutexattr);
pthread_mutexattr_setpshared(&mutexattr, TRUE);
pthread_cond_init(&(pool->has_work_cond), NULL);
pthread_cond_init(&(pool->working_cond), NULL);
pthread_mutex_init(&(pool->work_mutex), &mutexattr);
pthread_mutex_init(&(pool->mem_mutex), &mutexattr);
pthread_condattr_t condattr;
pthread_condattr_init(&condattr);
pthread_condattr_setpshared(&condattr, TRUE);
pthread_cond_init(&(pool->has_work_cond), &condattr);
pthread_cond_init(&(pool->working_cond),&condattr);
pool->work_head = NULL;
pool->work_tail = NULL;