mirror of
https://github.com/simon987/sist2.git
synced 2025-12-19 18:24:54 +00:00
Still WIP..
This commit is contained in:
151
src/tpool.c
151
src/tpool.c
@@ -6,7 +6,9 @@
|
||||
#include <sys/wait.h>
|
||||
#include "mempool/mempool.h"
|
||||
|
||||
#define MAX_QUEUE_SIZE 5000
|
||||
#define BLANK_STR " "
|
||||
// TODO: Use slab OOM to control queue size
|
||||
#define MAX_QUEUE_SIZE 100000
|
||||
|
||||
typedef struct tpool_work {
|
||||
tpool_work_arg_shm_t *arg;
|
||||
@@ -21,6 +23,7 @@ typedef struct tpool {
|
||||
pthread_mutex_t work_mutex;
|
||||
pthread_mutex_t mem_mutex;
|
||||
|
||||
// TODO: Initialize with SHARED attr
|
||||
pthread_cond_t has_work_cond;
|
||||
pthread_cond_t working_cond;
|
||||
|
||||
@@ -30,9 +33,6 @@ typedef struct tpool {
|
||||
int work_cnt;
|
||||
int done_cnt;
|
||||
int busy_cnt;
|
||||
int throttle_stuck_cnt;
|
||||
size_t mem_limit;
|
||||
size_t page_size;
|
||||
|
||||
int stop;
|
||||
int waiting;
|
||||
@@ -41,8 +41,6 @@ typedef struct tpool {
|
||||
|
||||
void (*cleanup_func)();
|
||||
|
||||
// =========
|
||||
|
||||
void *shared_memory;
|
||||
size_t shared_memory_size;
|
||||
ncx_slab_pool_t *mempool;
|
||||
@@ -137,6 +135,61 @@ int tpool_add_work(tpool_t *pool, thread_func_t func, tpool_work_arg_t *arg) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
static void worker_thread_loop(tpool_t *pool) {
|
||||
while (TRUE) {
|
||||
pthread_mutex_lock(&pool->work_mutex);
|
||||
if (pool->stop) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (pool->work_head == NULL) {
|
||||
pthread_cond_wait(&(pool->has_work_cond), &(pool->work_mutex));
|
||||
}
|
||||
|
||||
tpool_work_t *work = tpool_work_get(pool);
|
||||
|
||||
if (work != NULL) {
|
||||
pool->busy_cnt += 1;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&(pool->work_mutex));
|
||||
|
||||
if (work != NULL) {
|
||||
if (pool->stop) {
|
||||
break;
|
||||
}
|
||||
|
||||
work->func(work->arg);
|
||||
|
||||
pthread_mutex_lock(&pool->mem_mutex);
|
||||
ncx_slab_free(pool->mempool, work->arg);
|
||||
ncx_slab_free(pool->mempool, work);
|
||||
pthread_mutex_unlock(&pool->mem_mutex);
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&(pool->work_mutex));
|
||||
if (work != NULL) {
|
||||
pool->busy_cnt -= 1;
|
||||
pool->done_cnt++;
|
||||
}
|
||||
|
||||
if (pool->print_progress) {
|
||||
if (LogCtx.json_logs) {
|
||||
progress_bar_print_json(pool->done_cnt, pool->work_cnt, ScanCtx.stat_tn_size,
|
||||
ScanCtx.stat_index_size, pool->waiting);
|
||||
} else {
|
||||
progress_bar_print((double) pool->done_cnt / pool->work_cnt, ScanCtx.stat_tn_size,
|
||||
ScanCtx.stat_index_size);
|
||||
}
|
||||
}
|
||||
|
||||
if (pool->work_head == NULL) {
|
||||
pthread_cond_signal(&(pool->working_cond));
|
||||
}
|
||||
pthread_mutex_unlock(&(pool->work_mutex));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Thread worker function
|
||||
*/
|
||||
@@ -146,58 +199,8 @@ static void *tpool_worker(void *arg) {
|
||||
int pid = fork();
|
||||
|
||||
if (pid == 0) {
|
||||
while (TRUE) {
|
||||
pthread_mutex_lock(&pool->work_mutex);
|
||||
if (pool->stop) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (pool->work_head == NULL) {
|
||||
pthread_cond_wait(&(pool->has_work_cond), &(pool->work_mutex));
|
||||
}
|
||||
|
||||
tpool_work_t *work = tpool_work_get(pool);
|
||||
|
||||
if (work != NULL) {
|
||||
pool->busy_cnt += 1;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&(pool->work_mutex));
|
||||
|
||||
if (work != NULL) {
|
||||
if (pool->stop) {
|
||||
break;
|
||||
}
|
||||
|
||||
work->func(work->arg);
|
||||
|
||||
pthread_mutex_lock(&pool->mem_mutex);
|
||||
ncx_slab_free(pool->mempool, work->arg);
|
||||
ncx_slab_free(pool->mempool, work);
|
||||
pthread_mutex_unlock(&pool->mem_mutex);
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&(pool->work_mutex));
|
||||
if (work != NULL) {
|
||||
pool->busy_cnt -= 1;
|
||||
pool->done_cnt++;
|
||||
}
|
||||
|
||||
if (pool->print_progress) {
|
||||
if (LogCtx.json_logs) {
|
||||
progress_bar_print_json(pool->done_cnt, pool->work_cnt, ScanCtx.stat_tn_size,
|
||||
ScanCtx.stat_index_size, pool->waiting);
|
||||
} else {
|
||||
progress_bar_print((double) pool->done_cnt / pool->work_cnt, ScanCtx.stat_tn_size,
|
||||
ScanCtx.stat_index_size);
|
||||
}
|
||||
}
|
||||
|
||||
if (pool->work_head == NULL) {
|
||||
pthread_cond_signal(&(pool->working_cond));
|
||||
}
|
||||
pthread_mutex_unlock(&(pool->work_mutex));
|
||||
}
|
||||
worker_thread_loop(pool);
|
||||
|
||||
if (pool->cleanup_func != NULL) {
|
||||
LOG_INFO("tpool.c", "Executing cleanup function")
|
||||
@@ -211,15 +214,27 @@ static void *tpool_worker(void *arg) {
|
||||
|
||||
} else {
|
||||
int status;
|
||||
// TODO: On crash, print debug info and resume thread
|
||||
waitpid(pid, &status, 0);
|
||||
|
||||
LOG_ERRORF("tpool.c", "child processed terminated with status code %d, signal=%d", WEXITSTATUS(status), WIFSTOPPED(status) ? WSTOPSIG(status) : -1)
|
||||
LOG_DEBUGF("tpool.c", "Child process terminated with status code %d", WEXITSTATUS(status))
|
||||
|
||||
if (WIFSTOPPED(status)) {
|
||||
pthread_mutex_lock(&(pool->work_mutex));
|
||||
pool->busy_cnt -= 1;
|
||||
pool->done_cnt++;
|
||||
pthread_mutex_unlock(&(pool->work_mutex));
|
||||
pthread_mutex_lock(&(pool->work_mutex));
|
||||
pool->busy_cnt -= 1;
|
||||
pool->done_cnt++;
|
||||
pthread_mutex_unlock(&(pool->work_mutex));
|
||||
|
||||
if (WIFSIGNALED(status)) {
|
||||
// parse_job_t *job = g_hash_table_lookup(ScanCtx.dbg_current_files, GINT_TO_POINTER(pthread_self()));
|
||||
const char *job_filepath = "TODO";
|
||||
|
||||
LOG_FATALF_NO_EXIT(
|
||||
"tpool.c",
|
||||
"Child process was terminated by signal (%s).\n"
|
||||
BLANK_STR "The process was working on %s",
|
||||
strsignal(WTERMSIG(status)),
|
||||
job_filepath
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -295,9 +310,8 @@ void tpool_destroy(tpool_t *pool) {
|
||||
* Create a thread pool
|
||||
* @param thread_cnt Worker threads count
|
||||
*/
|
||||
tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int print_progress, size_t mem_limit) {
|
||||
tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int print_progress) {
|
||||
|
||||
// =============
|
||||
size_t shm_size = 1024 * 1024 * 2000;
|
||||
|
||||
void *shared_memory = mmap(NULL, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
|
||||
@@ -312,20 +326,15 @@ tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int print_progress, s
|
||||
|
||||
ncx_slab_init(pool->mempool);
|
||||
|
||||
// =============
|
||||
|
||||
pool->thread_cnt = thread_cnt;
|
||||
pool->work_cnt = 0;
|
||||
pool->done_cnt = 0;
|
||||
pool->busy_cnt = 0;
|
||||
pool->throttle_stuck_cnt = 0;
|
||||
pool->mem_limit = mem_limit;
|
||||
pool->stop = FALSE;
|
||||
pool->waiting = FALSE;
|
||||
pool->cleanup_func = cleanup_func;
|
||||
memset(pool->threads, 0, sizeof(pool->threads));
|
||||
pool->print_progress = print_progress;
|
||||
pool->page_size = getpagesize();
|
||||
|
||||
pthread_mutexattr_t mutexattr;
|
||||
pthread_mutexattr_init(&mutexattr);
|
||||
@@ -339,7 +348,7 @@ tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int print_progress, s
|
||||
pthread_condattr_setpshared(&condattr, TRUE);
|
||||
|
||||
pthread_cond_init(&(pool->has_work_cond), &condattr);
|
||||
pthread_cond_init(&(pool->working_cond),&condattr);
|
||||
pthread_cond_init(&(pool->working_cond), &condattr);
|
||||
|
||||
pool->work_head = NULL;
|
||||
pool->work_tail = NULL;
|
||||
|
||||
Reference in New Issue
Block a user