use sqlite to save index, major thread pool refactor

commit fc36f33d52 (parent ca973d63a4)
Date: 2023-04-03 21:39:50 -04:00
62 changed files with 3630 additions and 4673 deletions
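This refactor replaces the in-memory work queue (a linked list plus a slab allocator in a ~2 GB shared-memory arena) with a SQLite-backed job queue in /dev/shm, and runs each worker as a fork()ed process instead of a plain thread. For orientation, here is a minimal sketch of how a caller drives the new API; the job setup is hypothetical (only the tpool_* functions appear in this diff):

    tpool_t *pool = tpool_create(8, TRUE);          // 8 workers, print progress
    tpool_start(pool);                              // forks workers, opens the IPC database

    job_t *job = malloc(sizeof(*job));              // assumed: jobs are heap-allocated
    job->type = JOB_PARSE_JOB;
    job->parse_job = make_parse_job("/some/file");  // hypothetical helper
    tpool_add_work(pool, job);                      // persists the job via database_add_work()

    tpool_wait(pool);                               // returns when job_count == 0 && busy_count == 0
    tpool_destroy(pool);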

tpool.c

@@ -4,257 +4,250 @@
#include <pthread.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include "mempool/mempool.h"
#include "parsing/parse.h"
#define BLANK_STR " "
// TODO: Use slab OOM to control queue size
#define MAX_QUEUE_SIZE 100000
typedef struct tpool_work {
tpool_work_arg_shm_t *arg;
thread_func_t func;
struct tpool_work *next;
} tpool_work_t;
typedef struct {
int thread_id;
tpool_t *pool;
} start_thread_arg_t;
typedef struct tpool {
    pthread_t threads[256];
    int num_threads;

    int print_progress;

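    // Everything below lives in the MAP_SHARED | MAP_ANONYMOUS mapping created
    // in tpool_create() and is therefore shared with the fork()ed worker
    // processes; the fields above are per-process copies inherited at fork().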
    struct {
        job_type_t job_type;

        int stop;
        int waiting;

        database_ipc_ctx_t ipc_ctx;

        pthread_mutex_t mutex;
        pthread_mutex_t data_mutex;
        pthread_cond_t done_working_cond;
        pthread_cond_t workers_initialized_cond;

        int busy_count;
        int initialized_count;
    } *shm;
} tpool_t;

void job_destroy(job_t *job) {
    if (job->type == JOB_PARSE_JOB) {
        free(job->parse_job);
    }

    free(job);
}
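
// For reference, the rough shape of job_t as used in this file; the real
// definition (and the exact pointer types) lives elsewhere in this commit.
//
//   typedef enum { JOB_UNDEFINED, JOB_PARSE_JOB, JOB_BULK_LINE } job_type_t;
//
//   typedef struct {
//       job_type_t type;
//       parse_job_t *parse_job;  // when type == JOB_PARSE_JOB
//       void *bulk_line;         // when type == JOB_BULK_LINE (exact type assumed)
//   } job_t;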

void tpool_dump_debug_info(tpool_t *pool) {
    // TODO
    LOG_DEBUGF("tpool.c", "pool->num_threads = %d", pool->num_threads);
}

/**
 * Push work object to thread pool
 */
int tpool_add_work(tpool_t *pool, job_t *job) {

    if (pool->shm->job_type == JOB_UNDEFINED) {
        pool->shm->job_type = job->type;
    } else if (pool->shm->job_type != job->type) {
        LOG_FATAL("tpool.c", "FIXME: tpool cannot queue jobs with different types!");
    }

    database_add_work(ProcData.ipc_db, job);

    return TRUE;
}
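
// database_add_work()/database_get_work() are part of this commit's new
// database layer and are not shown in this file. Conceptually the queue is a
// SQLite table; a minimal sketch of the idea, with an assumed schema (the real
// schema and job serialization live in the database code):
#include <sqlite3.h>

static void sketch_add_work(sqlite3 *db, const char *filepath) {
    // Assumed schema: CREATE TABLE parse_job (id INTEGER PRIMARY KEY, filepath TEXT NOT NULL);
    sqlite3_stmt *stmt;
    sqlite3_prepare_v2(db, "INSERT INTO parse_job (filepath) VALUES (?);", -1, &stmt, NULL);
    sqlite3_bind_text(stmt, 1, filepath, -1, SQLITE_STATIC);
    sqlite3_step(stmt);
    sqlite3_finalize(stmt);
}
// database_get_work() would correspondingly SELECT and delete the next row,
// presumably serialized by the ipc_ctx.db_mutex initialized in tpool_create().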

static void worker_thread_loop(tpool_t *pool) {
    while (TRUE) {
        if (pool->shm->stop) {
            break;
        }

        if (pool->shm->job_type == JOB_UNDEFINED) {
            // Wait before first job is queued
            pthread_mutex_lock(&pool->shm->mutex);
            pthread_cond_timedwait_ms(&pool->shm->ipc_ctx.has_work_cond, &pool->shm->mutex, 1000);
            pthread_mutex_unlock(&pool->shm->mutex);
        }

        job_t *job = database_get_work(ProcData.ipc_db, pool->shm->job_type);

        if (job != NULL) {
            pthread_mutex_lock(&(pool->shm->data_mutex));
            pool->shm->busy_count += 1;
            pthread_mutex_unlock(&(pool->shm->data_mutex));

            if (pool->shm->stop) {
                break;
            }

            if (job->type == JOB_PARSE_JOB) {
                parse(job->parse_job);
            } else if (job->type == JOB_BULK_LINE) {
                elastic_index_line(job->bulk_line);
            }

            job_destroy(job);

            pthread_mutex_lock(&(pool->shm->data_mutex));
            pool->shm->busy_count -= 1;
            pthread_mutex_unlock(&(pool->shm->data_mutex));

            pthread_mutex_lock(&(pool->shm->ipc_ctx.mutex));
            pool->shm->ipc_ctx.completed_job_count += 1;
            pthread_mutex_unlock(&(pool->shm->ipc_ctx.mutex));
        }

        if (pool->print_progress) {
            int done = pool->shm->ipc_ctx.completed_job_count;
            int count = pool->shm->ipc_ctx.completed_job_count + pool->shm->ipc_ctx.job_count;

            if (LogCtx.json_logs) {
                progress_bar_print_json(done, count, ScanCtx.stat_tn_size,
                                        ScanCtx.stat_index_size, pool->shm->waiting);
            } else {
                progress_bar_print((double) done / count,
                                   ScanCtx.stat_tn_size, ScanCtx.stat_index_size);
            }
        }

        if (job == NULL) {
            pthread_mutex_lock(&pool->shm->mutex);
            pthread_cond_signal(&pool->shm->done_working_cond);
            pthread_mutex_unlock(&pool->shm->mutex);
        }
    }
}
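
// pthread_cond_timedwait_ms() is a sist2 helper defined outside this file;
// presumably a millisecond wrapper along these lines (sketch, not the actual
// definition):
#include <time.h>

static int pthread_cond_timedwait_ms_sketch(pthread_cond_t *cond, pthread_mutex_t *mutex, int ms) {
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);

    ts.tv_sec += ms / 1000;
    ts.tv_nsec += (long) (ms % 1000) * 1000000L;
    if (ts.tv_nsec >= 1000000000L) {
        ts.tv_sec += 1;
        ts.tv_nsec -= 1000000000L;
    }

    return pthread_cond_timedwait(cond, mutex, &ts);
}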

static void worker_proc_init(tpool_t *pool, int thread_id) {
    // TODO create PID -> thread_id mapping for signal handler
    ProcData.thread_id = thread_id;

    if (ScanCtx.index.path[0] != '\0') {
        // TODO This should be closed in proc cleanup function
        ProcData.index_db = database_create(ScanCtx.index.path, INDEX_DATABASE);
        ProcData.index_db->ipc_ctx = &pool->shm->ipc_ctx;
        database_open(ProcData.index_db);
    }

    // TODO /dev/shm
    pthread_mutex_lock(&pool->shm->mutex);
    ProcData.ipc_db = database_create("/dev/shm/ipc.sist2", IPC_CONSUMER_DATABASE);
    ProcData.ipc_db->ipc_ctx = &pool->shm->ipc_ctx;
    database_open(ProcData.ipc_db);
    pthread_mutex_unlock(&pool->shm->mutex);
}

void worker_proc_cleanup(tpool_t *pool) {
    if (ProcData.index_db != NULL) {
        database_close(ProcData.index_db, FALSE);
    }

    database_close(ProcData.ipc_db, FALSE);
}
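
// Note: each worker process opens its own SQLite connections in
// worker_proc_init() rather than inheriting the parent's, since a SQLite
// connection must not be carried across fork(). Keeping the IPC database in
// /dev/shm keeps queue traffic in memory.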

/**
 * Thread worker function
 */
static void *tpool_worker(void *arg) {
    tpool_t *pool = ((start_thread_arg_t *) arg)->pool;

    int pid = fork();

    if (pid == 0) {
        worker_proc_init(pool, ((start_thread_arg_t *) arg)->thread_id);

        pthread_mutex_lock(&pool->shm->mutex);
        pthread_cond_signal(&pool->shm->workers_initialized_cond);
        pool->shm->initialized_count += 1;
        pthread_mutex_unlock(&pool->shm->mutex);

        worker_thread_loop(pool);

        pthread_mutex_lock(&pool->shm->mutex);
        pthread_cond_signal(&pool->shm->done_working_cond);
        pthread_mutex_unlock(&pool->shm->mutex);

        worker_proc_cleanup(pool);

        exit(0);
    } else {
        int status;

        // TODO: On crash, print debug info and resume thread
        waitpid(pid, &status, 0);
        LOG_DEBUGF("tpool.c", "Child process terminated with status code %d", WEXITSTATUS(status));

        pthread_mutex_lock(&(pool->shm->ipc_ctx.mutex));
        pool->shm->ipc_ctx.completed_job_count += 1;
        pthread_mutex_unlock(&(pool->shm->ipc_ctx.mutex));

        pthread_mutex_lock(&(pool->shm->data_mutex));
        pool->shm->busy_count -= 1;
        pthread_mutex_unlock(&(pool->shm->data_mutex));

        if (WIFSIGNALED(status)) {
            // TODO: Get current_job based on PID
            const char *job_filepath = "TODO";

            LOG_FATALF_NO_EXIT(
                    "tpool.c",
                    "Child process was terminated by signal (%s).\n"
                    BLANK_STR "The process was working on %s",
                    strsignal(WTERMSIG(status)),
                    job_filepath
            );
        }
    }

    return NULL;
}
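
// Design note: each pool "thread" is a parent thread that immediately forks a
// child process and waits on it. Jobs run in the child, so a crash (e.g. a
// segfault in a file parser) kills only that child: the parent logs the fatal
// signal and the rest of the scan survives. All cross-process state goes
// through pool->shm and the SQLite IPC database.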

void tpool_wait(tpool_t *pool) {
    LOG_DEBUG("tpool.c", "Waiting for worker threads to finish");
    pthread_mutex_lock(&pool->shm->mutex);

    pool->shm->waiting = TRUE;
    pool->shm->ipc_ctx.no_more_jobs = TRUE;

    while (TRUE) {
        if (pool->shm->ipc_ctx.job_count > 0) {
            pthread_cond_wait(&(pool->shm->done_working_cond), &pool->shm->mutex);
        } else {
            if (pool->shm->ipc_ctx.job_count == 0 && pool->shm->busy_count == 0) {
                pool->shm->stop = TRUE;
                break;
            }
        }
@@ -262,34 +255,25 @@ void tpool_wait(tpool_t *pool) {
    if (pool->print_progress && !LogCtx.json_logs) {
        progress_bar_print(1.0, ScanCtx.stat_tn_size, ScanCtx.stat_index_size);
    }

    pthread_mutex_unlock(&pool->shm->mutex);

    LOG_INFO("tpool.c", "Worker threads finished");
}

void tpool_destroy(tpool_t *pool) {
    if (pool == NULL) {
        return;
    }

    LOG_INFO("tpool.c", "Destroying thread pool");

    database_close(ProcData.ipc_db, FALSE);

    pthread_mutex_lock(&pool->shm->mutex);
    pthread_cond_broadcast(&pool->shm->ipc_ctx.has_work_cond);
    pthread_mutex_unlock(&pool->shm->mutex);

    for (size_t i = 0; i < pool->num_threads; i++) {
        pthread_t thread = pool->threads[i];
        if (thread != 0) {
            void *_;
@@ -297,42 +281,33 @@ void tpool_destroy(tpool_t *pool) {
        }
    }

    LOG_INFO("tpool.c", "Final cleanup")

    pthread_mutex_destroy(&pool->shm->ipc_ctx.mutex);
    pthread_mutex_destroy(&pool->shm->mutex);

    pthread_cond_destroy(&pool->shm->ipc_ctx.has_work_cond);
    pthread_cond_destroy(&pool->shm->done_working_cond);

    munmap(pool->shm, sizeof(*pool->shm));
}

/**
 * Create a thread pool
 * @param thread_cnt Worker threads count
 */
tpool_t *tpool_create(int thread_cnt, int print_progress) {

    tpool_t *pool = malloc(sizeof(tpool_t));

    pool->shm = mmap(NULL, sizeof(*pool->shm), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);

    pool->num_threads = thread_cnt;
    pool->shm->ipc_ctx.job_count = 0;
    pool->shm->ipc_ctx.no_more_jobs = FALSE;
    pool->shm->stop = FALSE;
    pool->shm->waiting = FALSE;
    pool->shm->job_type = JOB_UNDEFINED;
    memset(pool->threads, 0, sizeof(pool->threads));
    pool->print_progress = print_progress;
@@ -340,27 +315,50 @@ tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int print_progress) {
    pthread_mutexattr_init(&mutexattr);
    pthread_mutexattr_setpshared(&mutexattr, TRUE);

    pthread_mutex_init(&(pool->shm->mutex), &mutexattr);
    pthread_mutex_init(&(pool->shm->data_mutex), &mutexattr);
    pthread_mutex_init(&(pool->shm->ipc_ctx.mutex), &mutexattr);
    pthread_mutex_init(&(pool->shm->ipc_ctx.db_mutex), &mutexattr);
    pthread_mutex_init(&(pool->shm->ipc_ctx.index_db_mutex), &mutexattr);

    pthread_condattr_t condattr;
    pthread_condattr_init(&condattr);
    pthread_condattr_setpshared(&condattr, TRUE);

    pthread_cond_init(&(pool->shm->ipc_ctx.has_work_cond), &condattr);
    pthread_cond_init(&(pool->shm->done_working_cond), &condattr);
    pthread_cond_init(&(pool->shm->workers_initialized_cond), &condattr);
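    // Note: pthread_mutexattr_setpshared()/pthread_condattr_setpshared() expect
    // PTHREAD_PROCESS_SHARED, which glibc defines as 1, so passing TRUE works
    // here. The pshared attributes are what make this synchronization valid
    // across the fork()ed worker processes.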

    remove("/dev/shm/ipc.sist2");
    remove("/dev/shm/ipc.sist2-wal");
    remove("/dev/shm/ipc.sist2-shm");

    ProcData.ipc_db = database_create("/dev/shm/ipc.sist2", IPC_PRODUCER_DATABASE);
    ProcData.ipc_db->ipc_ctx = &pool->shm->ipc_ctx;
    database_initialize(ProcData.ipc_db);

    return pool;
}

void tpool_start(tpool_t *pool) {
    LOG_INFOF("tpool.c", "Starting thread pool with %d threads", pool->num_threads);

    pthread_mutex_lock(&pool->shm->mutex);

    for (int i = 0; i < pool->num_threads; i++) {
        start_thread_arg_t *arg = malloc(sizeof(start_thread_arg_t));
        arg->thread_id = i + 1;
        arg->pool = pool;

        pthread_create(&pool->threads[i], NULL, tpool_worker, arg);
    }

    // Only open the database when all workers are done initializing
    while (pool->shm->initialized_count != pool->num_threads) {
        pthread_cond_wait(&pool->shm->workers_initialized_cond, &pool->shm->mutex);
    }
    pthread_mutex_unlock(&pool->shm->mutex);

    database_open(ProcData.ipc_db);
}