mirror of
https://github.com/simon987/sist2.git
synced 2025-12-19 18:24:54 +00:00
Fixes and cleanup
This commit is contained in:
153
src/tpool.c
153
src/tpool.c
@@ -6,7 +6,7 @@
|
||||
#include <sys/wait.h>
|
||||
#include "parsing/parse.h"
|
||||
|
||||
#define BLANK_STR " "
|
||||
#define BLANK_STR " "
|
||||
|
||||
typedef struct {
|
||||
int thread_id;
|
||||
@@ -17,7 +17,6 @@ typedef struct {
|
||||
typedef struct tpool {
|
||||
pthread_t threads[256];
|
||||
int num_threads;
|
||||
int fork;
|
||||
|
||||
int print_progress;
|
||||
|
||||
@@ -32,6 +31,8 @@ typedef struct tpool {
|
||||
pthread_cond_t workers_initialized_cond;
|
||||
int busy_count;
|
||||
int initialized_count;
|
||||
int thread_id_to_pid_mapping[MAX_THREADS];
|
||||
char ipc_database_filepath[128];
|
||||
} *shm;
|
||||
} tpool_t;
|
||||
|
||||
@@ -43,11 +44,6 @@ void job_destroy(job_t *job) {
|
||||
free(job);
|
||||
}
|
||||
|
||||
void tpool_dump_debug_info(tpool_t *pool) {
|
||||
// TODO
|
||||
LOG_DEBUGF("tpool.c", "pool->num_threads = %d", pool->num_threads);
|
||||
}
|
||||
|
||||
/**
|
||||
* Push work object to thread pool
|
||||
*/
|
||||
@@ -130,108 +126,124 @@ static void worker_thread_loop(tpool_t *pool) {
|
||||
}
|
||||
|
||||
static void worker_proc_init(tpool_t *pool, int thread_id) {
|
||||
// TODO create PID -> thread_id mapping for signal handler
|
||||
pthread_mutex_lock(&pool->shm->data_mutex);
|
||||
pool->shm->thread_id_to_pid_mapping[thread_id] = getpid();
|
||||
pthread_mutex_unlock(&pool->shm->data_mutex);
|
||||
|
||||
ProcData.thread_id = thread_id;
|
||||
|
||||
if (ScanCtx.index.path[0] != '\0') {
|
||||
// TODO This should be closed in proc cleanup function
|
||||
ProcData.index_db = database_create(ScanCtx.index.path, INDEX_DATABASE);
|
||||
ProcData.index_db->ipc_ctx = &pool->shm->ipc_ctx;
|
||||
database_open(ProcData.index_db);
|
||||
}
|
||||
|
||||
// TODO /dev/shm
|
||||
pthread_mutex_lock(&pool->shm->mutex);
|
||||
ProcData.ipc_db = database_create("/dev/shm/ipc.sist2", IPC_CONSUMER_DATABASE);
|
||||
ProcData.ipc_db = database_create(pool->shm->ipc_database_filepath, IPC_CONSUMER_DATABASE);
|
||||
ProcData.ipc_db->ipc_ctx = &pool->shm->ipc_ctx;
|
||||
database_open(ProcData.ipc_db);
|
||||
pthread_mutex_unlock(&pool->shm->mutex);
|
||||
}
|
||||
|
||||
void worker_proc_cleanup(tpool_t* pool) {
|
||||
void worker_proc_cleanup(tpool_t *pool) {
|
||||
if (ProcData.index_db != NULL) {
|
||||
database_close(ProcData.index_db, FALSE);
|
||||
}
|
||||
database_close(ProcData.ipc_db, FALSE);
|
||||
}
|
||||
|
||||
#ifndef SIST_DEBUG
|
||||
#define TPOOL_FORK
|
||||
#endif
|
||||
|
||||
/**
|
||||
* Thread worker function
|
||||
*/
|
||||
static void *tpool_worker(void *arg) {
|
||||
tpool_t *pool = ((start_thread_arg_t *) arg)->pool;
|
||||
|
||||
if (pool->fork) {
|
||||
while (TRUE) {
|
||||
int pid = fork();
|
||||
#ifdef TPOOL_FORK
|
||||
while (TRUE) {
|
||||
int pid = fork();
|
||||
|
||||
if (pid == 0) {
|
||||
worker_proc_init(pool, ((start_thread_arg_t *) arg)->thread_id);
|
||||
if (pid == 0) {
|
||||
worker_proc_init(pool, ((start_thread_arg_t *) arg)->thread_id);
|
||||
|
||||
pthread_mutex_lock(&pool->shm->mutex);
|
||||
pthread_cond_signal(&pool->shm->workers_initialized_cond);
|
||||
pool->shm->initialized_count += 1;
|
||||
pthread_mutex_unlock(&pool->shm->mutex);
|
||||
pthread_mutex_lock(&pool->shm->mutex);
|
||||
pthread_cond_signal(&pool->shm->workers_initialized_cond);
|
||||
pool->shm->initialized_count += 1;
|
||||
pthread_mutex_unlock(&pool->shm->mutex);
|
||||
|
||||
worker_thread_loop(pool);
|
||||
worker_thread_loop(pool);
|
||||
|
||||
pthread_mutex_lock(&pool->shm->mutex);
|
||||
pthread_cond_signal(&pool->shm->done_working_cond);
|
||||
pthread_mutex_unlock(&pool->shm->mutex);
|
||||
pthread_mutex_lock(&pool->shm->mutex);
|
||||
pthread_cond_signal(&pool->shm->done_working_cond);
|
||||
pthread_mutex_unlock(&pool->shm->mutex);
|
||||
|
||||
worker_proc_cleanup(pool);
|
||||
worker_proc_cleanup(pool);
|
||||
|
||||
exit(0);
|
||||
exit(0);
|
||||
|
||||
} else {
|
||||
int status;
|
||||
// TODO: On crash, print debug info and resume thread
|
||||
waitpid(pid, &status, 0);
|
||||
} else {
|
||||
int status;
|
||||
waitpid(pid, &status, 0);
|
||||
|
||||
LOG_DEBUGF("tpool.c", "Child process terminated with status code %d", WEXITSTATUS(status));
|
||||
LOG_DEBUGF("tpool.c", "Child process terminated with status code %d", WEXITSTATUS(status));
|
||||
|
||||
pthread_mutex_lock(&(pool->shm->ipc_ctx.mutex));
|
||||
pool->shm->ipc_ctx.completed_job_count += 1;
|
||||
pthread_mutex_unlock(&(pool->shm->ipc_ctx.mutex));
|
||||
pthread_mutex_lock(&(pool->shm->ipc_ctx.mutex));
|
||||
pool->shm->ipc_ctx.completed_job_count += 1;
|
||||
pthread_mutex_unlock(&(pool->shm->ipc_ctx.mutex));
|
||||
|
||||
pthread_mutex_lock(&(pool->shm->data_mutex));
|
||||
pool->shm->busy_count -= 1;
|
||||
pthread_mutex_unlock(&(pool->shm->data_mutex));
|
||||
pthread_mutex_lock(&(pool->shm->data_mutex));
|
||||
pool->shm->busy_count -= 1;
|
||||
pthread_mutex_unlock(&(pool->shm->data_mutex));
|
||||
|
||||
if (WIFSIGNALED(status)) {
|
||||
// TODO: Get current_job based on PID
|
||||
const char *job_filepath = "TODO";
|
||||
|
||||
LOG_FATALF_NO_EXIT(
|
||||
"tpool.c",
|
||||
"Child process was terminated by signal (%s).\n"
|
||||
BLANK_STR "The process was working on %s",
|
||||
strsignal(WTERMSIG(status)),
|
||||
job_filepath
|
||||
);
|
||||
if (WIFSIGNALED(status)) {
|
||||
int crashed_thread_id = -1;
|
||||
for (int i = 0; i < MAX_THREADS; i++) {
|
||||
if (pool->shm->thread_id_to_pid_mapping[i] == pid) {
|
||||
crashed_thread_id = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
const char *job_filepath;
|
||||
if (crashed_thread_id != -1) {
|
||||
job_filepath = pool->shm->ipc_ctx.current_job[crashed_thread_id];
|
||||
} else {
|
||||
job_filepath = "unknown";
|
||||
}
|
||||
|
||||
LOG_FATALF_NO_EXIT(
|
||||
"tpool.c",
|
||||
"Child process crashed (%s).\n"
|
||||
BLANK_STR "The process was working on %s\n"
|
||||
BLANK_STR "Please consider creating a bug report at https://github.com/simon987/sist2/issues !\n"
|
||||
BLANK_STR "sist2 is an open source project and relies on the collaboration of its users to diagnose and fix bugs.\n",
|
||||
strsignal(WTERMSIG(status)),
|
||||
job_filepath
|
||||
);
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
} else {
|
||||
worker_proc_init(pool, ((start_thread_arg_t *) arg)->thread_id);
|
||||
|
||||
pthread_mutex_lock(&pool->shm->mutex);
|
||||
pthread_cond_signal(&pool->shm->workers_initialized_cond);
|
||||
pool->shm->initialized_count += 1;
|
||||
pthread_mutex_unlock(&pool->shm->mutex);
|
||||
|
||||
worker_thread_loop(pool);
|
||||
|
||||
pthread_mutex_lock(&pool->shm->mutex);
|
||||
pthread_cond_signal(&pool->shm->done_working_cond);
|
||||
pthread_mutex_unlock(&pool->shm->mutex);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
#else
|
||||
worker_proc_init(pool, ((start_thread_arg_t *) arg)->thread_id);
|
||||
|
||||
pthread_mutex_lock(&pool->shm->mutex);
|
||||
pthread_cond_signal(&pool->shm->workers_initialized_cond);
|
||||
pool->shm->initialized_count += 1;
|
||||
pthread_mutex_unlock(&pool->shm->mutex);
|
||||
|
||||
worker_thread_loop(pool);
|
||||
|
||||
pthread_mutex_lock(&pool->shm->mutex);
|
||||
pthread_cond_signal(&pool->shm->done_working_cond);
|
||||
pthread_mutex_unlock(&pool->shm->mutex);
|
||||
#endif
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@@ -295,13 +307,10 @@ void tpool_destroy(tpool_t *pool) {
|
||||
*/
|
||||
tpool_t *tpool_create(int thread_cnt, int print_progress) {
|
||||
|
||||
int fork = FALSE;
|
||||
|
||||
tpool_t *pool = malloc(sizeof(tpool_t));
|
||||
|
||||
pool->shm = mmap(NULL, sizeof(*pool->shm), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
|
||||
|
||||
pool->fork = fork;
|
||||
pool->num_threads = thread_cnt;
|
||||
pool->shm->ipc_ctx.job_count = 0;
|
||||
pool->shm->ipc_ctx.no_more_jobs = FALSE;
|
||||
@@ -310,6 +319,7 @@ tpool_t *tpool_create(int thread_cnt, int print_progress) {
|
||||
pool->shm->job_type = JOB_UNDEFINED;
|
||||
memset(pool->threads, 0, sizeof(pool->threads));
|
||||
pool->print_progress = print_progress;
|
||||
sprintf(pool->shm->ipc_database_filepath, "/dev/shm/sist2-ipc-%d.sqlite", getpid());
|
||||
|
||||
pthread_mutexattr_t mutexattr;
|
||||
pthread_mutexattr_init(&mutexattr);
|
||||
@@ -329,10 +339,7 @@ tpool_t *tpool_create(int thread_cnt, int print_progress) {
|
||||
pthread_cond_init(&(pool->shm->done_working_cond), &condattr);
|
||||
pthread_cond_init(&(pool->shm->workers_initialized_cond), &condattr);
|
||||
|
||||
remove("/dev/shm/ipc.sist2");
|
||||
remove("/dev/shm/ipc.sist2-wal");
|
||||
remove("/dev/shm/ipc.sist2-shm");
|
||||
ProcData.ipc_db = database_create("/dev/shm/ipc.sist2", IPC_PRODUCER_DATABASE);
|
||||
ProcData.ipc_db = database_create(pool->shm->ipc_database_filepath, IPC_PRODUCER_DATABASE);
|
||||
ProcData.ipc_db->ipc_ctx = &pool->shm->ipc_ctx;
|
||||
database_initialize(ProcData.ipc_db);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user