From f8abffba81236e699a9075923ae30a22ad06c281 Mon Sep 17 00:00:00 2001 From: simon987 Date: Thu, 9 Mar 2023 22:11:21 -0500 Subject: [PATCH] process pool mostly works, still WIP --- CMakeLists.txt | 1 + docs/USAGE.md | 2 + src/cli.c | 4 +- src/index/elastic.c | 27 +- src/io/serialize.c | 32 +- src/io/store.c | 150 +++-- src/io/store.h | 18 +- src/io/walk.c | 20 +- src/main.c | 17 +- src/mempool/mempool.c | 757 ++++++++++++++++++++++ src/mempool/mempool.h | 62 ++ src/parsing/parse.c | 31 +- src/parsing/parse.h | 4 +- src/tpool.c | 255 ++++---- src/tpool.h | 19 +- src/util.c | 2 + third-party/libscan/libscan/arc/arc.c | 22 +- third-party/libscan/libscan/ebook/ebook.c | 14 +- third-party/libscan/libscan/ebook/ebook.h | 1 - third-party/libscan/libscan/json/json.c | 2 +- third-party/libscan/libscan/media/media.c | 8 +- third-party/libscan/libscan/scan.h | 14 +- third-party/libscan/libscan/text/text.c | 4 +- third-party/libscan/libscan/util.h | 6 +- third-party/libscan/test/test_util.cpp | 14 +- 25 files changed, 1219 insertions(+), 267 deletions(-) create mode 100644 src/mempool/mempool.c create mode 100644 src/mempool/mempool.h diff --git a/CMakeLists.txt b/CMakeLists.txt index d7517ad..6857720 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -39,6 +39,7 @@ add_executable(sist2 src/cli.c src/cli.h src/stats.c src/stats.h src/ctx.c src/parsing/sidecar.c src/parsing/sidecar.h + src/mempool/mempool.c src/mempool/mempool.h src/auth0/auth0_c_api.h src/auth0/auth0_c_api.cpp diff --git a/docs/USAGE.md b/docs/USAGE.md index 3dfc31d..dffe11a 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -164,6 +164,8 @@ that is about `8000000 * 36kB = 288GB`. ![thumbnail_size](thumbnail_size.png) +// TODO: add note about LMDB page size 4096 + ### Scan examples Simple scan diff --git a/src/cli.c b/src/cli.c index 7349aa3..1bafba6 100644 --- a/src/cli.c +++ b/src/cli.c @@ -140,8 +140,8 @@ int scan_args_validate(scan_args_t *args, int argc, const char **argv) { if (args->threads == 0) { args->threads = 1; - } else if (args->threads < 0) { - fprintf(stderr, "Invalid value for --threads: %d. Must be a positive number\n", args->threads); + } else if (args->threads < 0 || args->threads > 256) { + fprintf(stderr, "Invalid value for --threads: %d. Must be a positive number <= 256\n", args->threads); return 1; } diff --git a/src/index/elastic.c b/src/index/elastic.c index 0012767..0b41b07 100644 --- a/src/index/elastic.c +++ b/src/index/elastic.c @@ -64,18 +64,26 @@ void print_json(cJSON *document, const char id_str[SIST_DOC_ID_LEN]) { cJSON_Delete(line); } -void index_json_func(void *arg) { - es_bulk_line_t *line = arg; +void index_json_func(tpool_work_arg_shm_t *arg) { + // Copy arg to heap because it's going to be free immediately after this function returns + es_bulk_line_t *line = malloc(arg->arg_size); + memcpy(line, arg->arg, arg->arg_size); + elastic_index_line(line); } -void delete_document(const char* document_id_str, void* UNUSED(_data)) { +void delete_document(const char *document_id_str, void *UNUSED(_data)) { es_bulk_line_t *bulk_line = malloc(sizeof(es_bulk_line_t)); + bulk_line->type = ES_BULK_LINE_DELETE; bulk_line->next = NULL; - strcpy(bulk_line->doc_id, document_id_str); - tpool_add_work(IndexCtx.pool, index_json_func, bulk_line); + + tpool_work_arg_t arg = { + .arg_size = sizeof(es_bulk_line_t), + .arg = bulk_line + }; + tpool_add_work(IndexCtx.pool, index_json_func, &arg); } @@ -92,7 +100,11 @@ void index_json(cJSON *document, const char doc_id[SIST_DOC_ID_LEN]) { bulk_line->next = NULL; cJSON_free(json); - tpool_add_work(IndexCtx.pool, index_json_func, bulk_line); + tpool_work_arg_t arg = { + .arg_size = sizeof(es_bulk_line_t) + json_len + 2, + .arg = bulk_line + }; + tpool_add_work(IndexCtx.pool, index_json_func, &arg); } void execute_update_script(const char *script, int async, const char index_id[SIST_INDEX_ID_LEN]) { @@ -538,7 +550,8 @@ void elastic_init(int force_reset, const char *user_mappings, const char *user_s free_response(r); if (IS_LEGACY_VERSION(es_version)) { - snprintf(url, sizeof(url), "%s/%s/_mappings/_doc?include_type_name=true", IndexCtx.es_url, IndexCtx.es_index); + snprintf(url, sizeof(url), "%s/%s/_mappings/_doc?include_type_name=true", IndexCtx.es_url, + IndexCtx.es_index); } else { snprintf(url, sizeof(url), "%s/%s/_mappings", IndexCtx.es_url, IndexCtx.es_index); } diff --git a/src/io/serialize.c b/src/io/serialize.c index f00d737..c438025 100644 --- a/src/io/serialize.c +++ b/src/io/serialize.c @@ -197,7 +197,7 @@ static struct { ZSTD_CCtx *cctx; } WriterCtx = { - .out_file = NULL + .out_file = NULL }; #define ZSTD_COMPRESSION_LEVEL 10 @@ -229,7 +229,9 @@ void zstd_write_string(const char *string, const size_t len) { } while (input.pos != input.size); } -void write_document_func(void *arg) { +void write_document_func(tpool_work_arg_shm_t *arg) { + + const char *json_str = arg->arg; if (WriterCtx.out_file == NULL) { char dstfile[PATH_MAX]; @@ -237,17 +239,7 @@ void write_document_func(void *arg) { initialize_writer_ctx(dstfile); } - document_t *doc = arg; - - char *json_str = build_json_string(doc); - const size_t json_str_len = strlen(json_str); - - json_str = realloc(json_str, json_str_len + 1); - *(json_str + json_str_len) = '\n'; - - zstd_write_string(json_str, json_str_len + 1); - - free(json_str); + zstd_write_string(json_str, arg->arg_size); } void zstd_close() { @@ -345,7 +337,19 @@ index_descriptor_t read_index_descriptor(char *path) { void write_document(document_t *doc) { - tpool_add_work(ScanCtx.writer_pool, write_document_func, doc); + char *json_str = build_json_string(doc); + free(doc); + const size_t json_str_len = strlen(json_str); + + json_str = realloc(json_str, json_str_len + 1); + *(json_str + json_str_len) = '\n'; + + tpool_work_arg_t arg = { + .arg_size = json_str_len + 1, + .arg = json_str + }; + + tpool_add_work(ScanCtx.writer_pool, write_document_func, &arg); } void thread_cleanup() { diff --git a/src/io/store.c b/src/io/store.c index 1cdc754..2e03e8d 100644 --- a/src/io/store.c +++ b/src/io/store.c @@ -1,18 +1,13 @@ +#include #include "store.h" #include "src/ctx.h" -store_t *store_create(const char *path, size_t chunk_size) { - store_t *store = malloc(sizeof(struct store_t)); - mkdir(path, S_IWUSR | S_IRUSR | S_IXUSR); - strcpy(store->path, path); +//#define SIST_FAKE_STORE 1 -#if (SIST_FAKE_STORE != 1) - store->chunk_size = chunk_size; - pthread_rwlock_init(&store->lock, NULL); +void open_env(const char *path, MDB_env **env, MDB_dbi *dbi) { + mdb_env_create(env); - mdb_env_create(&store->env); - - int open_ret = mdb_env_open(store->env, + int open_ret = mdb_env_open(*env, path, MDB_WRITEMAP | MDB_MAPASYNC, S_IRUSR | S_IWUSR @@ -22,14 +17,33 @@ store_t *store_create(const char *path, size_t chunk_size) { LOG_FATALF("store.c", "Error while opening store: %s (%s)\n", mdb_strerror(open_ret), path) } - store->size = (size_t) store->chunk_size; - mdb_env_set_mapsize(store->env, store->size); - - // Open dbi MDB_txn *txn; - mdb_txn_begin(store->env, NULL, 0, &txn); - mdb_dbi_open(txn, NULL, 0, &store->dbi); + mdb_txn_begin(*env, NULL, 0, &txn); + mdb_dbi_open(txn, NULL, 0, dbi); mdb_txn_commit(txn); +} + +store_t *store_create(const char *path, size_t chunk_size) { + store_t *store = calloc(1, sizeof(struct store_t)); + mkdir(path, S_IWUSR | S_IRUSR | S_IXUSR); + strcpy(store->path, path); + + MDB_env *env; + MDB_dbi dbi; + +#if (SIST_FAKE_STORE != 1) + store->chunk_size = chunk_size; + + store->shared_memory = mmap(NULL, sizeof(*store->shm), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); + store->shm = store->shared_memory; + + open_env(path, &env, &dbi); + + store->shm->size = (size_t) store->chunk_size; + mdb_env_set_mapsize(env, store->shm->size); + + // Close, child processes will open the environment again + mdb_env_close(env); #endif return store; @@ -37,26 +51,35 @@ store_t *store_create(const char *path, size_t chunk_size) { void store_destroy(store_t *store) { + LOG_DEBUG("store.c", "store_destroy()") #if (SIST_FAKE_STORE != 1) - pthread_rwlock_destroy(&store->lock); - mdb_dbi_close(store->env, store->dbi); - mdb_env_close(store->env); + munmap(store->shared_memory, sizeof(*store->shm)); + + mdb_dbi_close(store->proc.env, store->proc.dbi); + mdb_env_close(store->proc.env); #endif free(store); } void store_flush(store_t *store) { - mdb_env_sync(store->env, TRUE); + mdb_env_sync(store->proc.env, TRUE); } void store_write(store_t *store, char *key, size_t key_len, char *buf, size_t buf_len) { + ScanCtx.stat_tn_size += buf_len; + if (LogCtx.very_verbose) { LOG_DEBUGF("store.c", "Store write %s@{%s} %lu bytes", store->path, key, buf_len) } #if (SIST_FAKE_STORE != 1) + if (store->proc.env == NULL) { + open_env(store->path, &store->proc.env, &store->proc.dbi); + LOG_DEBUGF("store.c", "Opening mdb environment %s", store->path) + } + MDB_val mdb_key; mdb_key.mv_data = key; mdb_key.mv_size = key_len; @@ -66,70 +89,80 @@ void store_write(store_t *store, char *key, size_t key_len, char *buf, size_t bu mdb_value.mv_size = buf_len; MDB_txn *txn; - pthread_rwlock_rdlock(&store->lock); - mdb_txn_begin(store->env, NULL, 0, &txn); - - int put_ret = mdb_put(txn, store->dbi, &mdb_key, &mdb_value, 0); - ScanCtx.stat_tn_size += buf_len; int db_full = FALSE; + int put_ret = 0; int should_abort_transaction = FALSE; + int should_increase_size = TRUE; - if (put_ret == MDB_MAP_FULL) { + int begin_ret = mdb_txn_begin(store->proc.env, NULL, 0, &txn); + + if (begin_ret == MDB_MAP_RESIZED) { + // mapsize was increased by another process. We don't need to increase the size again, but we need + // to update the size of the environment for the current process. db_full = TRUE; - should_abort_transaction = TRUE; + should_increase_size = FALSE; } else { - int commit_ret = mdb_txn_commit(txn); + put_ret = mdb_put(txn, store->proc.dbi, &mdb_key, &mdb_value, 0); - if (commit_ret == MDB_MAP_FULL) { + if (put_ret == MDB_MAP_FULL) { + // Database is full, we need to increase the environment size db_full = TRUE; + should_abort_transaction = TRUE; + } else { + int commit_ret = mdb_txn_commit(txn); + + if (commit_ret == MDB_MAP_FULL) { + db_full = TRUE; + } } } if (db_full) { - LOG_DEBUGF("store.c", "Updating mdb mapsize to %lu bytes", store->size) + LOG_DEBUGF("store.c", "Updating mdb mapsize to %lu bytes", store->shm->size) if (should_abort_transaction) { mdb_txn_abort(txn); } - pthread_rwlock_unlock(&store->lock); - - // Cannot resize when there is a opened transaction. + // Cannot resize when there is an opened transaction in this process. // Resize take effect on the next commit. - pthread_rwlock_wrlock(&store->lock); - store->size += store->chunk_size; - int resize_ret = mdb_env_set_mapsize(store->env, store->size); - if (resize_ret != 0) { - LOG_ERROR("store.c", mdb_strerror(put_ret)) + if (should_increase_size) { + store->shm->size += store->chunk_size; } - mdb_txn_begin(store->env, NULL, 0, &txn); - int put_ret_retry = mdb_put(txn, store->dbi, &mdb_key, &mdb_value, 0); + int resize_ret = mdb_env_set_mapsize(store->proc.env, store->shm->size); + if (resize_ret != 0) { + LOG_ERRORF("store.c", "mdb_env_set_mapsize() failed: %s", mdb_strerror(resize_ret)) + } + mdb_txn_begin(store->proc.env, NULL, 0, &txn); + int put_ret_retry = mdb_put(txn, store->proc.dbi, &mdb_key, &mdb_value, 0); if (put_ret_retry != 0) { - LOG_ERROR("store.c", mdb_strerror(put_ret)) + LOG_ERRORF("store.c", "mdb_put() (retry) failed: %s", mdb_strerror(put_ret_retry)) } int ret = mdb_txn_commit(txn); if (ret != 0) { LOG_FATALF("store.c", "FIXME: Could not commit to store %s: %s (%d), %d, %d %d", store->path, mdb_strerror(ret), ret, - put_ret, put_ret_retry); + ret, put_ret_retry) } - LOG_DEBUGF("store.c", "Updated mdb mapsize to %lu bytes", store->size) + LOG_DEBUGF("store.c", "Updated mdb mapsize to %lu bytes", store->shm->size) } else if (put_ret != 0) { - LOG_ERROR("store.c", mdb_strerror(put_ret)) + LOG_ERRORF("store.c", "mdb_put() failed: %s", mdb_strerror(put_ret)) } - pthread_rwlock_unlock(&store->lock); - #endif } -char *store_read(store_t *store, char *key, size_t key_len, size_t *ret_vallen) { +char *store_read(store_t *store, char *key, size_t key_len, size_t *return_value_len) { char *buf = NULL; #if (SIST_FAKE_STORE != 1) + if (store->proc.env == NULL) { + open_env(store->path, &store->proc.env, &store->proc.dbi); + } + MDB_val mdb_key; mdb_key.mv_data = key; mdb_key.mv_size = key_len; @@ -137,14 +170,14 @@ char *store_read(store_t *store, char *key, size_t key_len, size_t *ret_vallen) MDB_val mdb_value; MDB_txn *txn; - mdb_txn_begin(store->env, NULL, MDB_RDONLY, &txn); + mdb_txn_begin(store->proc.env, NULL, MDB_RDONLY, &txn); - int get_ret = mdb_get(txn, store->dbi, &mdb_key, &mdb_value); + int get_ret = mdb_get(txn, store->proc.dbi, &mdb_key, &mdb_value); if (get_ret == MDB_NOTFOUND) { - *ret_vallen = 0; + *return_value_len = 0; } else { - *ret_vallen = mdb_value.mv_size; + *return_value_len = mdb_value.mv_size; buf = malloc(mdb_value.mv_size); memcpy(buf, mdb_value.mv_data, mdb_value.mv_size); } @@ -156,15 +189,20 @@ char *store_read(store_t *store, char *key, size_t key_len, size_t *ret_vallen) GHashTable *store_read_all(store_t *store) { + if (store->proc.env == NULL) { + open_env(store->path, &store->proc.env, &store->proc.dbi); + LOG_DEBUGF("store.c", "Opening mdb environment %s", store->path) + } + int count = 0; GHashTable *table = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); MDB_txn *txn = NULL; - mdb_txn_begin(store->env, NULL, MDB_RDONLY, &txn); + mdb_txn_begin(store->proc.env, NULL, MDB_RDONLY, &txn); MDB_cursor *cur = NULL; - mdb_cursor_open(txn, store->dbi, &cur); + mdb_cursor_open(txn, store->proc.dbi, &cur); MDB_val key; MDB_val value; @@ -180,8 +218,8 @@ GHashTable *store_read_all(store_t *store) { } const char *path; - mdb_env_get_path(store->env, &path); - LOG_DEBUGF("store.c", "Read %d entries from %s", count, path); + mdb_env_get_path(store->proc.env, &path); + LOG_DEBUGF("store.c", "Read %d entries from %s", count, path) mdb_cursor_close(cur); mdb_txn_abort(txn); @@ -191,5 +229,5 @@ GHashTable *store_read_all(store_t *store) { void store_copy(store_t *store, const char *destination) { mkdir(destination, S_IWUSR | S_IRUSR | S_IXUSR); - mdb_env_copy(store->env, destination); + mdb_env_copy(store->proc.env, destination); } diff --git a/src/io/store.h b/src/io/store.h index a1f7217..b6c8472 100644 --- a/src/io/store.h +++ b/src/io/store.h @@ -10,14 +10,20 @@ #define STORE_SIZE_TAG (1024 * 1024) #define STORE_SIZE_META STORE_SIZE_TAG + typedef struct store_t { char path[PATH_MAX]; - char *tmp_path; - MDB_dbi dbi; - MDB_env *env; - size_t size; size_t chunk_size; - pthread_rwlock_t lock; + void *shared_memory; + + struct { + MDB_dbi dbi; + MDB_env *env; + } proc; + + struct { + size_t size; + } *shm; } store_t; store_t *store_create(const char *path, size_t chunk_size); @@ -28,7 +34,7 @@ void store_write(store_t *store, char *key, size_t key_len, char *buf, size_t bu void store_flush(store_t *store); -char *store_read(store_t *store, char *key, size_t key_len, size_t *ret_vallen); +char *store_read(store_t *store, char *key, size_t key_len, size_t *return_value_len); GHashTable *store_read_all(store_t *store); diff --git a/src/io/walk.c b/src/io/walk.c index cdaeda5..b6019cf 100644 --- a/src/io/walk.c +++ b/src/io/walk.c @@ -20,11 +20,13 @@ parse_job_t *create_fs_parse_job(const char *filepath, const struct stat *info, job->ext = len; } - job->vfile.info = *info; + job->vfile.st_size = info->st_size; + job->vfile.st_mode = info->st_mode; + job->vfile.mtime = (int) info->st_mtim.tv_sec; job->parent[0] = '\0'; - job->vfile.filepath = job->filepath; + memcpy(job->vfile.filepath, job->filepath, sizeof(job->vfile.filepath)); job->vfile.read = fs_read; // Filesystem reads are always rewindable job->vfile.read_rewindable = fs_read; @@ -68,7 +70,12 @@ int handle_entry(const char *filepath, const struct stat *info, int typeflag, st if (typeflag == FTW_F && S_ISREG(info->st_mode)) { parse_job_t *job = create_fs_parse_job(filepath, info, ftw->base); - tpool_add_work(ScanCtx.pool, parse, job); + + tpool_work_arg_t arg = { + .arg_size = sizeof(parse_job_t), + .arg = job + }; + tpool_add_work(ScanCtx.pool, parse, &arg); } return FTW_CONTINUE; @@ -128,7 +135,12 @@ int iterate_file_list(void *input_file) { parse_job_t *job = create_fs_parse_job(absolute_path, &info, base); free(absolute_path); - tpool_add_work(ScanCtx.pool, parse, job); + + tpool_work_arg_t arg = { + .arg = job, + .arg_size = sizeof(parse_job_t) + }; + tpool_add_work(ScanCtx.pool, parse, &arg); } return 0; diff --git a/src/main.c b/src/main.c index d5ad271..03e40a5 100644 --- a/src/main.c +++ b/src/main.c @@ -188,7 +188,7 @@ void initialize_scan_context(scan_args_t *args) { ScanCtx.arc_ctx.mode = args->archive_mode; ScanCtx.arc_ctx.log = _log; ScanCtx.arc_ctx.logf = _logf; - ScanCtx.arc_ctx.parse = (parse_callback_t) parse; + ScanCtx.arc_ctx.parse = (parse_callback_t) parse_job; if (args->archive_passphrase != NULL) { strcpy(ScanCtx.arc_ctx.passphrase, args->archive_passphrase); } else { @@ -206,7 +206,6 @@ void initialize_scan_context(scan_args_t *args) { ScanCtx.comic_ctx.cbz_mime = mime_get_mime_by_string(ScanCtx.mime_table, "application/x-cbz"); // Ebook - pthread_mutex_init(&ScanCtx.ebook_ctx.mupdf_mutex, NULL); ScanCtx.ebook_ctx.content_size = args->content_size; ScanCtx.ebook_ctx.enable_tn = args->tn_count > 0; ScanCtx.ebook_ctx.tn_size = args->tn_size; @@ -407,12 +406,12 @@ void sist2_scan(scan_args_t *args) { load_incremental_index(args); } - ScanCtx.pool = tpool_create(ScanCtx.threads, thread_cleanup, TRUE, TRUE, ScanCtx.mem_limit); - tpool_start(ScanCtx.pool); - - ScanCtx.writer_pool = tpool_create(1, writer_cleanup, TRUE, FALSE, 0); + ScanCtx.writer_pool = tpool_create(1, writer_cleanup, FALSE, 0); tpool_start(ScanCtx.writer_pool); + ScanCtx.pool = tpool_create(ScanCtx.threads, thread_cleanup, TRUE, ScanCtx.mem_limit); + tpool_start(ScanCtx.pool); + if (args->list_path) { // Scan using file list int list_ret = iterate_file_list(args->list_file); @@ -494,7 +493,7 @@ void sist2_index(index_args_t *args) { f = index_json; } - IndexCtx.pool = tpool_create(args->threads, elastic_cleanup, FALSE, args->print == 0, 0); + IndexCtx.pool = tpool_create(args->threads, elastic_cleanup, args->print == 0, 0); tpool_start(IndexCtx.pool); READ_INDICES(file_path, args->index_path, { @@ -616,8 +615,8 @@ int set_to_negative_if_value_is_zero(struct argparse *self, const struct argpars int main(int argc, const char *argv[]) { - sigsegv_handler = signal(SIGSEGV, sig_handler); - sigabrt_handler = signal(SIGABRT, sig_handler); +// sigsegv_handler = signal(SIGSEGV, sig_handler); +// sigabrt_handler = signal(SIGABRT, sig_handler); setlocale(LC_ALL, ""); diff --git a/src/mempool/mempool.c b/src/mempool/mempool.c new file mode 100644 index 0000000..d8dd0a8 --- /dev/null +++ b/src/mempool/mempool.c @@ -0,0 +1,757 @@ +#include "mempool.h" +#include + +#define NCX_SLAB_PAGE_MASK 3 +#define NCX_SLAB_PAGE 0 +#define NCX_SLAB_BIG 1 +#define NCX_SLAB_EXACT 2 +#define NCX_SLAB_SMALL 3 + +#define NCX_SLAB_PAGE_FREE 0 +#define NCX_SLAB_PAGE_BUSY 0xffffffffffffffff +#define NCX_SLAB_PAGE_START 0x8000000000000000 + +#define NCX_SLAB_SHIFT_MASK 0x000000000000000f +#define NCX_SLAB_MAP_MASK 0xffffffff00000000 +#define NCX_SLAB_MAP_SHIFT 32 + +#define NCX_SLAB_BUSY 0xffffffffffffffff + + +static ncx_slab_page_t *ncx_slab_alloc_pages(ncx_slab_pool_t *pool, ncx_uint_t pages); + +static void ncx_slab_free_pages(ncx_slab_pool_t *pool, ncx_slab_page_t *page, ncx_uint_t pages); + +static bool ncx_slab_empty(ncx_slab_pool_t *pool, ncx_slab_page_t *page); + +static ncx_uint_t ncx_slab_max_size; +static ncx_uint_t ncx_slab_exact_size; +static ncx_uint_t ncx_slab_exact_shift; +static ncx_uint_t ncx_pagesize; +static ncx_uint_t ncx_pagesize_shift; +static ncx_uint_t ncx_real_pages; + +void ncx_slab_init(ncx_slab_pool_t *pool) { + u_char *p; + size_t size; + ncx_uint_t i, n, pages; + ncx_slab_page_t *slots; + + /*pagesize*/ + ncx_pagesize = getpagesize(); + for (n = ncx_pagesize, ncx_pagesize_shift = 0; + n >>= 1; ncx_pagesize_shift++) { /* void */ } + + /* STUB */ + if (ncx_slab_max_size == 0) { + ncx_slab_max_size = ncx_pagesize / 2; + ncx_slab_exact_size = ncx_pagesize / (8 * sizeof(uintptr_t)); + for (n = ncx_slab_exact_size; n >>= 1; ncx_slab_exact_shift++) { + /* void */ + } + } + + pool->min_size = 1 << pool->min_shift; + + p = (u_char *) pool + sizeof(ncx_slab_pool_t); + slots = (ncx_slab_page_t *) p; + + n = ncx_pagesize_shift - pool->min_shift; + for (i = 0; i < n; i++) { + slots[i].slab = 0; + slots[i].next = &slots[i]; + slots[i].prev = 0; + } + + p += n * sizeof(ncx_slab_page_t); + + size = pool->end - p; + + pages = (ncx_uint_t) (size / (ncx_pagesize + sizeof(ncx_slab_page_t))); + + ncx_memzero(p, pages * sizeof(ncx_slab_page_t)); + + pool->pages = (ncx_slab_page_t *) p; + + pool->free.prev = 0; + pool->free.next = (ncx_slab_page_t *) p; + + pool->pages->slab = pages; + pool->pages->next = &pool->free; + pool->pages->prev = (uintptr_t) &pool->free; + + pool->start = (u_char *) + ncx_align_ptr((uintptr_t) p + pages * sizeof(ncx_slab_page_t), + ncx_pagesize); + + ncx_real_pages = (pool->end - pool->start) / ncx_pagesize; + pool->pages->slab = ncx_real_pages; +} + + +void *ncx_slab_alloc(ncx_slab_pool_t *pool, size_t size) { + size_t s; + uintptr_t p, n, m, mask, *bitmap; + ncx_uint_t i, slot, shift, map; + ncx_slab_page_t *page, *prev, *slots; + + if (size >= ncx_slab_max_size) { + + page = ncx_slab_alloc_pages(pool, (size >> ncx_pagesize_shift) + + ((size % ncx_pagesize) ? 1 : 0)); + if (page) { + p = (page - pool->pages) << ncx_pagesize_shift; + p += (uintptr_t) pool->start; + + } else { + p = 0; + } + + goto done; + } + + if (size > pool->min_size) { + shift = 1; + for (s = size - 1; s >>= 1; shift++) { /* void */ } + slot = shift - pool->min_shift; + + } else { + shift = pool->min_shift; + slot = 0; + } + + slots = (ncx_slab_page_t *) ((u_char *) pool + sizeof(ncx_slab_pool_t)); + page = slots[slot].next; + + if (page->next != page) { + + if (shift < ncx_slab_exact_shift) { + + do { + p = (page - pool->pages) << ncx_pagesize_shift; + bitmap = (uintptr_t *) (pool->start + p); + + map = (1 << (ncx_pagesize_shift - shift)) + / (sizeof(uintptr_t) * 8); + + for (n = 0; n < map; n++) { + + if (bitmap[n] != NCX_SLAB_BUSY) { + + for (m = 1, i = 0; m; m <<= 1, i++) { + if ((bitmap[n] & m)) { + continue; + } + + bitmap[n] |= m; + + i = ((n * sizeof(uintptr_t) * 8) << shift) + + (i << shift); + + if (bitmap[n] == NCX_SLAB_BUSY) { + for (n = n + 1; n < map; n++) { + if (bitmap[n] != NCX_SLAB_BUSY) { + p = (uintptr_t) bitmap + i; + + goto done; + } + } + + prev = (ncx_slab_page_t *) + (page->prev & ~NCX_SLAB_PAGE_MASK); + prev->next = page->next; + page->next->prev = page->prev; + + page->next = NULL; + page->prev = NCX_SLAB_SMALL; + } + + p = (uintptr_t) bitmap + i; + + goto done; + } + } + } + + page = page->next; + + } while (page); + + } else if (shift == ncx_slab_exact_shift) { + + do { + if (page->slab != NCX_SLAB_BUSY) { + + for (m = 1, i = 0; m; m <<= 1, i++) { + if ((page->slab & m)) { + continue; + } + + page->slab |= m; + + if (page->slab == NCX_SLAB_BUSY) { + prev = (ncx_slab_page_t *) + (page->prev & ~NCX_SLAB_PAGE_MASK); + prev->next = page->next; + page->next->prev = page->prev; + + page->next = NULL; + page->prev = NCX_SLAB_EXACT; + } + + p = (page - pool->pages) << ncx_pagesize_shift; + p += i << shift; + p += (uintptr_t) pool->start; + + goto done; + } + } + + page = page->next; + + } while (page); + + } else { /* shift > ncx_slab_exact_shift */ + + n = ncx_pagesize_shift - (page->slab & NCX_SLAB_SHIFT_MASK); + n = 1 << n; + n = ((uintptr_t) 1 << n) - 1; + mask = n << NCX_SLAB_MAP_SHIFT; + + do { + if ((page->slab & NCX_SLAB_MAP_MASK) != mask) { + + for (m = (uintptr_t) 1 << NCX_SLAB_MAP_SHIFT, i = 0; + m & mask; + m <<= 1, i++) { + if ((page->slab & m)) { + continue; + } + + page->slab |= m; + + if ((page->slab & NCX_SLAB_MAP_MASK) == mask) { + prev = (ncx_slab_page_t *) + (page->prev & ~NCX_SLAB_PAGE_MASK); + prev->next = page->next; + page->next->prev = page->prev; + + page->next = NULL; + page->prev = NCX_SLAB_BIG; + } + + p = (page - pool->pages) << ncx_pagesize_shift; + p += i << shift; + p += (uintptr_t) pool->start; + + goto done; + } + } + + page = page->next; + + } while (page); + } + } + + page = ncx_slab_alloc_pages(pool, 1); + + if (page) { + if (shift < ncx_slab_exact_shift) { + p = (page - pool->pages) << ncx_pagesize_shift; + bitmap = (uintptr_t *) (pool->start + p); + + s = 1 << shift; + n = (1 << (ncx_pagesize_shift - shift)) / 8 / s; + + if (n == 0) { + n = 1; + } + + bitmap[0] = (2 << n) - 1; + + map = (1 << (ncx_pagesize_shift - shift)) / (sizeof(uintptr_t) * 8); + + for (i = 1; i < map; i++) { + bitmap[i] = 0; + } + + page->slab = shift; + page->next = &slots[slot]; + page->prev = (uintptr_t) &slots[slot] | NCX_SLAB_SMALL; + + slots[slot].next = page; + + p = ((page - pool->pages) << ncx_pagesize_shift) + s * n; + p += (uintptr_t) pool->start; + + goto done; + + } else if (shift == ncx_slab_exact_shift) { + + page->slab = 1; + page->next = &slots[slot]; + page->prev = (uintptr_t) &slots[slot] | NCX_SLAB_EXACT; + + slots[slot].next = page; + + p = (page - pool->pages) << ncx_pagesize_shift; + p += (uintptr_t) pool->start; + + goto done; + + } else { /* shift > ncx_slab_exact_shift */ + + page->slab = ((uintptr_t) 1 << NCX_SLAB_MAP_SHIFT) | shift; + page->next = &slots[slot]; + page->prev = (uintptr_t) &slots[slot] | NCX_SLAB_BIG; + + slots[slot].next = page; + + p = (page - pool->pages) << ncx_pagesize_shift; + p += (uintptr_t) pool->start; + + goto done; + } + } + + p = 0; + + done: + + return (void *) p; +} + + +void ncx_slab_free(ncx_slab_pool_t *pool, void *p) { + size_t size; + uintptr_t slab, m, *bitmap; + ncx_uint_t n, type, slot, shift, map; + ncx_slab_page_t *slots, *page; + + if ((u_char *) p < pool->start || (u_char *) p > pool->end) { +// error("ncx_slab_free(): outside of pool"); + goto fail; + } + + n = ((u_char *) p - pool->start) >> ncx_pagesize_shift; + page = &pool->pages[n]; + slab = page->slab; + type = page->prev & NCX_SLAB_PAGE_MASK; + + switch (type) { + + case NCX_SLAB_SMALL: + + shift = slab & NCX_SLAB_SHIFT_MASK; + size = 1 << shift; + + if ((uintptr_t) p & (size - 1)) { + goto wrong_chunk; + } + + n = ((uintptr_t) p & (ncx_pagesize - 1)) >> shift; + m = (uintptr_t) 1 << (n & (sizeof(uintptr_t) * 8 - 1)); + n /= (sizeof(uintptr_t) * 8); + bitmap = (uintptr_t *) ((uintptr_t) p & ~(ncx_pagesize - 1)); + + if (bitmap[n] & m) { + + if (page->next == NULL) { + slots = (ncx_slab_page_t *) + ((u_char *) pool + sizeof(ncx_slab_pool_t)); + slot = shift - pool->min_shift; + + page->next = slots[slot].next; + slots[slot].next = page; + + page->prev = (uintptr_t) &slots[slot] | NCX_SLAB_SMALL; + page->next->prev = (uintptr_t) page | NCX_SLAB_SMALL; + } + + bitmap[n] &= ~m; + + n = (1 << (ncx_pagesize_shift - shift)) / 8 / (1 << shift); + + if (n == 0) { + n = 1; + } + + if (bitmap[0] & ~(((uintptr_t) 1 << n) - 1)) { + goto done; + } + + map = (1 << (ncx_pagesize_shift - shift)) / (sizeof(uintptr_t) * 8); + + for (n = 1; n < map; n++) { + if (bitmap[n]) { + goto done; + } + } + + ncx_slab_free_pages(pool, page, 1); + + goto done; + } + + goto chunk_already_free; + + case NCX_SLAB_EXACT: + + m = (uintptr_t) 1 << + (((uintptr_t) p & (ncx_pagesize - 1)) >> ncx_slab_exact_shift); + size = ncx_slab_exact_size; + + if ((uintptr_t) p & (size - 1)) { + goto wrong_chunk; + } + + if (slab & m) { + if (slab == NCX_SLAB_BUSY) { + slots = (ncx_slab_page_t *) + ((u_char *) pool + sizeof(ncx_slab_pool_t)); + slot = ncx_slab_exact_shift - pool->min_shift; + + page->next = slots[slot].next; + slots[slot].next = page; + + page->prev = (uintptr_t) &slots[slot] | NCX_SLAB_EXACT; + page->next->prev = (uintptr_t) page | NCX_SLAB_EXACT; + } + + page->slab &= ~m; + + if (page->slab) { + goto done; + } + + ncx_slab_free_pages(pool, page, 1); + + goto done; + } + + goto chunk_already_free; + + case NCX_SLAB_BIG: + + shift = slab & NCX_SLAB_SHIFT_MASK; + size = 1 << shift; + + if ((uintptr_t) p & (size - 1)) { + goto wrong_chunk; + } + + m = (uintptr_t) 1 << ((((uintptr_t) p & (ncx_pagesize - 1)) >> shift) + + NCX_SLAB_MAP_SHIFT); + + if (slab & m) { + + if (page->next == NULL) { + slots = (ncx_slab_page_t *) + ((u_char *) pool + sizeof(ncx_slab_pool_t)); + slot = shift - pool->min_shift; + + page->next = slots[slot].next; + slots[slot].next = page; + + page->prev = (uintptr_t) &slots[slot] | NCX_SLAB_BIG; + page->next->prev = (uintptr_t) page | NCX_SLAB_BIG; + } + + page->slab &= ~m; + + if (page->slab & NCX_SLAB_MAP_MASK) { + goto done; + } + + ncx_slab_free_pages(pool, page, 1); + + goto done; + } + + goto chunk_already_free; + + case NCX_SLAB_PAGE: + + if ((uintptr_t) p & (ncx_pagesize - 1)) { + goto wrong_chunk; + } + + if (slab == NCX_SLAB_PAGE_FREE) { +// alert("ncx_slab_free(): page is already free"); + goto fail; + } + + if (slab == NCX_SLAB_PAGE_BUSY) { +// alert("ncx_slab_free(): pointer to wrong page"); + goto fail; + } + + n = ((u_char *) p - pool->start) >> ncx_pagesize_shift; + size = slab & ~NCX_SLAB_PAGE_START; + + ncx_slab_free_pages(pool, &pool->pages[n], size); + + return; + } + + /* not reached */ + + return; + + done: + + return; + + wrong_chunk: + +// error("ncx_slab_free(): pointer to wrong chunk"); + + goto fail; + + chunk_already_free: + +// error("ncx_slab_free(): chunk is already free"); + + fail: + + return; +} + + +static ncx_slab_page_t *ncx_slab_alloc_pages(ncx_slab_pool_t *pool, ncx_uint_t pages) { + ncx_slab_page_t *page, *p; + + for (page = pool->free.next; page != &pool->free; page = page->next) { + + if (page->slab >= pages) { + + if (page->slab > pages) { + page[pages].slab = page->slab - pages; + page[pages].next = page->next; + page[pages].prev = page->prev; + + p = (ncx_slab_page_t *) page->prev; + p->next = &page[pages]; + page->next->prev = (uintptr_t) &page[pages]; + + } else { + p = (ncx_slab_page_t *) page->prev; + p->next = page->next; + page->next->prev = page->prev; + } + + page->slab = pages | NCX_SLAB_PAGE_START; + page->next = NULL; + page->prev = NCX_SLAB_PAGE; + + if (--pages == 0) { + return page; + } + + for (p = page + 1; pages; pages--) { + p->slab = NCX_SLAB_PAGE_BUSY; + p->next = NULL; + p->prev = NCX_SLAB_PAGE; + p++; + } + + return page; + } + } + +// error("ncx_slab_alloc() failed: no memory"); + + return NULL; +} + +static void ncx_slab_free_pages(ncx_slab_pool_t *pool, ncx_slab_page_t *page, ncx_uint_t pages) { + ncx_slab_page_t *prev; + + if (pages > 1) { + ncx_memzero(&page[1], (pages - 1) * sizeof(ncx_slab_page_t)); + } + + if (page->next) { + prev = (ncx_slab_page_t *) (page->prev & ~NCX_SLAB_PAGE_MASK); + prev->next = page->next; + page->next->prev = page->prev; + } + + page->slab = pages; + page->prev = (uintptr_t) &pool->free; + page->next = pool->free.next; + page->next->prev = (uintptr_t) page; + + pool->free.next = page; + +#ifdef PAGE_MERGE + if (pool->pages != page) { + prev = page - 1; + if (ncx_slab_empty(pool, prev)) { + for (; prev >= pool->pages; prev--) { + if (prev->slab != 0) + { + pool->free.next = page->next; + page->next->prev = (uintptr_t) &pool->free; + + prev->slab += pages; + ncx_memzero(page, sizeof(ncx_slab_page_t)); + + page = prev; + + break; + } + } + } + } + + if ((page - pool->pages + page->slab) < ncx_real_pages) { + next = page + page->slab; + if (ncx_slab_empty(pool, next)) + { + prev = (ncx_slab_page_t *) (next->prev); + prev->next = next->next; + next->next->prev = next->prev; + + page->slab += next->slab; + ncx_memzero(next, sizeof(ncx_slab_page_t)); + } + } + +#endif +} + +void ncx_slab_stat(ncx_slab_pool_t *pool, ncx_slab_stat_t *stat) { + uintptr_t m, n, mask, slab; + uintptr_t *bitmap; + ncx_uint_t i, j, map, type, obj_size; + ncx_slab_page_t *page; + + ncx_memzero(stat, sizeof(ncx_slab_stat_t)); + + page = pool->pages; + stat->pages = (pool->end - pool->start) / ncx_pagesize; + + for (i = 0; i < stat->pages; i++) { + slab = page->slab; + type = page->prev & NCX_SLAB_PAGE_MASK; + + switch (type) { + + case NCX_SLAB_SMALL: + + n = (page - pool->pages) << ncx_pagesize_shift; + bitmap = (uintptr_t *) (pool->start + n); + + obj_size = 1 << slab; + map = (1 << (ncx_pagesize_shift - slab)) + / (sizeof(uintptr_t) * 8); + + for (j = 0; j < map; j++) { + for (m = 1; m; m <<= 1) { + if ((bitmap[j] & m)) { + stat->used_size += obj_size; + stat->b_small += obj_size; + } + + } + } + + stat->p_small++; + + break; + + case NCX_SLAB_EXACT: + + if (slab == NCX_SLAB_BUSY) { + stat->used_size += sizeof(uintptr_t) * 8 * ncx_slab_exact_size; + stat->b_exact += sizeof(uintptr_t) * 8 * ncx_slab_exact_size; + } else { + for (m = 1; m; m <<= 1) { + if (slab & m) { + stat->used_size += ncx_slab_exact_size; + stat->b_exact += ncx_slab_exact_size; + } + } + } + + stat->p_exact++; + + break; + + case NCX_SLAB_BIG: + + j = ncx_pagesize_shift - (slab & NCX_SLAB_SHIFT_MASK); + j = 1 << j; + j = ((uintptr_t) 1 << j) - 1; + mask = j << NCX_SLAB_MAP_SHIFT; + obj_size = 1 << (slab & NCX_SLAB_SHIFT_MASK); + + for (m = (uintptr_t) 1 << NCX_SLAB_MAP_SHIFT; m & mask; m <<= 1) { + if ((page->slab & m)) { + stat->used_size += obj_size; + stat->b_big += obj_size; + } + } + + stat->p_big++; + + break; + + case NCX_SLAB_PAGE: + + if (page->prev == NCX_SLAB_PAGE) { + slab = slab & ~NCX_SLAB_PAGE_START; + stat->used_size += slab * ncx_pagesize; + stat->b_page += slab * ncx_pagesize; + stat->p_page += slab; + + i += (slab - 1); + + break; + } + + default: + + if (slab > stat->max_free_pages) { + stat->max_free_pages = page->slab; + } + + stat->free_page += slab; + + i += (slab - 1); + + break; + } + + page = pool->pages + i + 1; + } + + stat->pool_size = pool->end - pool->start; + stat->used_pct = stat->used_size * 100 / stat->pool_size; +} + +static bool ncx_slab_empty(ncx_slab_pool_t *pool, ncx_slab_page_t *page) { + ncx_slab_page_t *prev; + + if (page->slab == 0) { + return true; + } + + //page->prev == PAGE | SMALL | EXACT | BIG + if (page->next == NULL) { + return false; + } + + prev = (ncx_slab_page_t *) (page->prev & ~NCX_SLAB_PAGE_MASK); + while (prev >= pool->pages) { + prev = (ncx_slab_page_t *) (prev->prev & ~NCX_SLAB_PAGE_MASK); + } + + if (prev == &pool->free) { + return true; + } + + return false; +} \ No newline at end of file diff --git a/src/mempool/mempool.h b/src/mempool/mempool.h new file mode 100644 index 0000000..e903158 --- /dev/null +++ b/src/mempool/mempool.h @@ -0,0 +1,62 @@ +#ifndef SIST2_MEMPOOL_H +#define SIST2_MEMPOOL_H + +#include +#include +#include +#include +#include + +typedef unsigned char u_char; +typedef uintptr_t ncx_uint_t; + +#ifndef NCX_ALIGNMENT +#define NCX_ALIGNMENT sizeof(unsigned long) +#endif + +#define ncx_align(d, a) (((d) + (a - 1)) & ~(a - 1)) +#define ncx_align_ptr(p, a) (u_char *) (((uintptr_t) (p) + ((uintptr_t) a - 1)) & ~((uintptr_t) a - 1)) + +#define ncx_memzero(buf, n) (void) memset(buf, 0, n) +#define ncx_memset(buf, c, n) (void) memset(buf, c, n) + +typedef struct ncx_slab_page_s ncx_slab_page_t; + +struct ncx_slab_page_s { + uintptr_t slab; + ncx_slab_page_t *next; + uintptr_t prev; +}; + +typedef struct { + size_t min_size; + size_t min_shift; + + ncx_slab_page_t *pages; + ncx_slab_page_t free; + + u_char *start; + u_char *end; + + //ncx_shmtx_t mutex; + + void *addr; +} ncx_slab_pool_t; + +typedef struct { + size_t pool_size, used_size, used_pct; + size_t pages, free_page; + size_t p_small, p_exact, p_big, p_page; + size_t b_small, b_exact, b_big, b_page; + size_t max_free_pages; +} ncx_slab_stat_t; + +void ncx_slab_init(ncx_slab_pool_t *mempool); + +void *ncx_slab_alloc(ncx_slab_pool_t *mempool, size_t size); + +void ncx_slab_free(ncx_slab_pool_t *mempool, void *p); + +void ncx_slab_stat(ncx_slab_pool_t *mempool, ncx_slab_stat_t *stat); + +#endif //SIST2_MEMPOOL_H diff --git a/src/parsing/parse.c b/src/parsing/parse.c index b6882bc..a792ad6 100644 --- a/src/parsing/parse.c +++ b/src/parsing/parse.c @@ -56,9 +56,20 @@ void set_dbg_current_file(parse_job_t *job) { pthread_mutex_unlock(&ScanCtx.dbg_current_files_mu); } -void parse(void *arg) { +void parse_job(parse_job_t *job) { + tpool_work_arg_shm_t *arg = malloc(sizeof(tpool_work_arg_shm_t) + sizeof(*job)); - parse_job_t *job = arg; + memcpy(arg->arg, job, sizeof(*job)); + arg->arg_size = -1; + + parse(arg); + + free(arg); +} + +void parse(tpool_work_arg_shm_t *arg) { + + parse_job_t *job = (void*)arg->arg; document_t *doc = malloc(sizeof(document_t)); @@ -74,11 +85,11 @@ void parse(void *arg) { doc->meta_head = NULL; doc->meta_tail = NULL; doc->mime = 0; - doc->size = job->vfile.info.st_size; - doc->mtime = (int) job->vfile.info.st_mtim.tv_sec; + doc->size = job->vfile.st_size; + doc->mtime = (int) job->vfile.mtime; int inc_ts = incremental_get(ScanCtx.original_table, doc->doc_id); - if (inc_ts != 0 && inc_ts == job->vfile.info.st_mtim.tv_sec) { + if (inc_ts != 0 && inc_ts == job->vfile.mtime) { pthread_mutex_lock(&ScanCtx.copy_table_mu); incremental_mark_file(ScanCtx.copy_table, doc->doc_id); pthread_mutex_unlock(&ScanCtx.copy_table_mu); @@ -88,7 +99,6 @@ void parse(void *arg) { pthread_mutex_unlock(&ScanCtx.dbg_file_counts_mu); CLOSE_FILE(job->vfile) - free(doc->filepath); free(doc); return; @@ -106,13 +116,16 @@ void parse(void *arg) { LOG_DEBUGF(job->filepath, "Starting parse job {%s}", doc->doc_id) } - if (job->vfile.info.st_size == 0) { + if (job->ext > 4096) { + fprintf(stderr, "Ext is %d, filename is %s\n", job->ext, job->filepath); + } + + if (job->vfile.st_size == 0) { doc->mime = MIME_EMPTY; } else if (*(job->filepath + job->ext) != '\0' && (job->ext - job->base != 1)) { doc->mime = mime_get_mime_by_ext(ScanCtx.ext_table, job->filepath + job->ext); } - if (doc->mime == 0 && !ScanCtx.fast) { // Get mime type with libmagic @@ -136,7 +149,6 @@ void parse(void *arg) { pthread_mutex_unlock(&ScanCtx.dbg_file_counts_mu); CLOSE_FILE(job->vfile) - free(doc->filepath); free(doc); return; @@ -210,7 +222,6 @@ void parse(void *arg) { } else if (doc->mime == MIME_SIST2_SIDECAR) { parse_sidecar(&job->vfile, doc); CLOSE_FILE(job->vfile) - free(doc->filepath); free(doc); return; } else if (is_msdoc(&ScanCtx.msdoc_ctx, doc->mime)) { diff --git a/src/parsing/parse.h b/src/parsing/parse.h index a62dcc4..55ecf18 100644 --- a/src/parsing/parse.h +++ b/src/parsing/parse.h @@ -2,6 +2,7 @@ #define SIST2_PARSE_H #include "../sist.h" +#include "src/tpool.h" #define MAGIC_BUF_SIZE (4096 * 6) @@ -9,7 +10,8 @@ int fs_read(struct vfile *f, void *buf, size_t size); void fs_close(struct vfile *f); void fs_reset(struct vfile *f); -void parse(void *arg); +void parse_job(parse_job_t *job); +void parse(tpool_work_arg_shm_t *arg); void cleanup_parse(); diff --git a/src/tpool.c b/src/tpool.c index adf4423..7abea86 100644 --- a/src/tpool.c +++ b/src/tpool.c @@ -2,13 +2,14 @@ #include "ctx.h" #include "sist.h" #include +#include +#include +#include "mempool/mempool.h" -#define MAX_QUEUE_SIZE 1000000 - -typedef void (*thread_func_t)(void *arg); +#define MAX_QUEUE_SIZE 5000 typedef struct tpool_work { - void *arg; + tpool_work_arg_shm_t *arg; thread_func_t func; struct tpool_work *next; } tpool_work_t; @@ -18,11 +19,12 @@ typedef struct tpool { tpool_work_t *work_tail; pthread_mutex_t work_mutex; + pthread_mutex_t mem_mutex; pthread_cond_t has_work_cond; pthread_cond_t working_cond; - pthread_t *threads; + pthread_t threads[256]; int thread_cnt; int work_cnt; @@ -32,28 +34,46 @@ typedef struct tpool { size_t mem_limit; size_t page_size; - int free_arg; int stop; int waiting; int print_progress; void (*cleanup_func)(); + + // ========= + + void *shared_memory; + size_t shared_memory_size; + ncx_slab_pool_t *mempool; } tpool_t; /** * Create a work object */ -static tpool_work_t *tpool_work_create(thread_func_t func, void *arg) { +static tpool_work_t *tpool_work_create(tpool_t *pool, thread_func_t func, tpool_work_arg_t *arg) { if (func == NULL) { return NULL; } - tpool_work_t *work = malloc(sizeof(tpool_work_t)); + // Copy heap arg to shm arg + pthread_mutex_lock(&pool->mem_mutex); + + tpool_work_arg_shm_t *shm_arg = ncx_slab_alloc(pool->mempool, sizeof(tpool_work_arg_shm_t) + arg->arg_size); + + shm_arg->arg_size = arg->arg_size; + memcpy(shm_arg->arg, arg->arg, arg->arg_size); + + free(arg->arg); + + tpool_work_t *work = ncx_slab_alloc(pool->mempool, sizeof(tpool_work_t)); + + pthread_mutex_unlock(&pool->mem_mutex); + work->func = func; - work->arg = arg; + work->arg = shm_arg; work->next = NULL; return work; @@ -90,16 +110,15 @@ static tpool_work_t *tpool_work_get(tpool_t *pool) { /** * Push work object to thread pool */ -int tpool_add_work(tpool_t *pool, thread_func_t func, void *arg) { - - tpool_work_t *work = tpool_work_create(func, arg); - if (work == NULL) { - return 0; - } +int tpool_add_work(tpool_t *pool, thread_func_t func, tpool_work_arg_t *arg) { while ((pool->work_cnt - pool->done_cnt) >= MAX_QUEUE_SIZE) { usleep(10000); } + tpool_work_t *work = tpool_work_create(pool, func, arg); + if (work == NULL) { + return 0; + } pthread_mutex_lock(&(pool->work_mutex)); if (pool->work_head == NULL) { @@ -118,127 +137,92 @@ int tpool_add_work(tpool_t *pool, thread_func_t func, void *arg) { return 1; } -/** - * see: https://github.com/htop-dev/htop/blob/f782f821f7f8081cb43bbad1c37f32830a260a81/linux/LinuxProcessList.c - */ -__always_inline -static size_t _get_total_mem(tpool_t *pool) { - FILE *statmfile = fopen("/proc/self/statm", "r"); - if (!statmfile) - return 0; - - long int dummy, dummy2, dummy3, dummy4, dummy5, dummy6; - long int m_resident; - - int r = fscanf(statmfile, "%ld %ld %ld %ld %ld %ld %ld", - &dummy, /* m_virt */ - &m_resident, - &dummy2, /* m_share */ - &dummy3, /* m_trs */ - &dummy4, /* unused since Linux 2.6; always 0 */ - &dummy5, /* m_drs */ - &dummy6); /* unused since Linux 2.6; always 0 */ - fclose(statmfile); - - if (r == 7) { - return m_resident * pool->page_size; - } else { - return 0; - } -} - /** * Thread worker function */ static void *tpool_worker(void *arg) { tpool_t *pool = arg; - int stuck_notified = 0; - int throttle_ms = 0; - while (TRUE) { - pthread_mutex_lock(&pool->work_mutex); - if (pool->stop) { - break; - } - - if (pool->work_head == NULL) { - pthread_cond_wait(&(pool->has_work_cond), &(pool->work_mutex)); - } - - tpool_work_t *work = tpool_work_get(pool); - if (work != NULL) { - pool->busy_cnt += 1; - } - - pthread_mutex_unlock(&(pool->work_mutex)); - - if (work != NULL) { - stuck_notified = 0; - throttle_ms = 0; - while (!pool->stop && pool->mem_limit > 0 && _get_total_mem(pool) >= pool->mem_limit) { - if (!stuck_notified && throttle_ms >= 90000) { - // notify the pool that this thread is stuck. - pthread_mutex_lock(&(pool->work_mutex)); - pool->throttle_stuck_cnt += 1; - if (pool->throttle_stuck_cnt == pool->thread_cnt) { - LOG_ERROR("tpool.c", "Throttle memory limit too low, cannot proceed!"); - pool->stop = TRUE; - } - pthread_mutex_unlock(&(pool->work_mutex)); - stuck_notified = 1; - } - usleep(10000); - throttle_ms += 10; - } + int pid = fork(); + if (pid == 0) { + while (TRUE) { + pthread_mutex_lock(&pool->work_mutex); if (pool->stop) { break; } - // we are not stuck anymore. cancel our notification. - if (stuck_notified) { - pthread_mutex_lock(&(pool->work_mutex)); - pool->throttle_stuck_cnt -= 1; - pthread_mutex_unlock(&(pool->work_mutex)); + if (pool->work_head == NULL) { + pthread_cond_wait(&(pool->has_work_cond), &(pool->work_mutex)); } - work->func(work->arg); - if (pool->free_arg) { - free(work->arg); + tpool_work_t *work = tpool_work_get(pool); + + if (work != NULL) { + pool->busy_cnt += 1; } - free(work); + + pthread_mutex_unlock(&(pool->work_mutex)); + + if (work != NULL) { + if (pool->stop) { + break; + } + + work->func(work->arg); + + pthread_mutex_lock(&pool->mem_mutex); + ncx_slab_free(pool->mempool, work->arg); + ncx_slab_free(pool->mempool, work); + pthread_mutex_unlock(&pool->mem_mutex); + } + + pthread_mutex_lock(&(pool->work_mutex)); + if (work != NULL) { + pool->busy_cnt -= 1; + pool->done_cnt++; + } + + if (pool->print_progress) { + if (LogCtx.json_logs) { + progress_bar_print_json(pool->done_cnt, pool->work_cnt, ScanCtx.stat_tn_size, + ScanCtx.stat_index_size, pool->waiting); + } else { + progress_bar_print((double) pool->done_cnt / pool->work_cnt, ScanCtx.stat_tn_size, + ScanCtx.stat_index_size); + } + } + + if (pool->work_head == NULL) { + pthread_cond_signal(&(pool->working_cond)); + } + pthread_mutex_unlock(&(pool->work_mutex)); } - pthread_mutex_lock(&(pool->work_mutex)); - if (work != NULL) { + if (pool->cleanup_func != NULL) { + LOG_INFO("tpool.c", "Executing cleanup function") + pool->cleanup_func(); + LOG_DEBUG("tpool.c", "Done executing cleanup function") + } + + pthread_cond_signal(&(pool->working_cond)); + pthread_mutex_unlock(&(pool->work_mutex)); + exit(0); + + } else { + int status; + waitpid(pid, &status, 0); + + LOG_ERRORF("tpool.c", "child processed terminated with status code %d, signal=%d", WEXITSTATUS(status), WIFSTOPPED(status) ? WSTOPSIG(status) : -1) + + if (WIFSTOPPED(status)) { + pthread_mutex_lock(&(pool->work_mutex)); pool->busy_cnt -= 1; pool->done_cnt++; + pthread_mutex_unlock(&(pool->work_mutex)); } - - if (pool->print_progress) { - if (LogCtx.json_logs) { - progress_bar_print_json(pool->done_cnt, pool->work_cnt, ScanCtx.stat_tn_size, - ScanCtx.stat_index_size, pool->waiting); - } else { - progress_bar_print((double) pool->done_cnt / pool->work_cnt, ScanCtx.stat_tn_size, - ScanCtx.stat_index_size); - } - } - - if (pool->work_head == NULL) { - pthread_cond_signal(&(pool->working_cond)); - } - pthread_mutex_unlock(&(pool->work_mutex)); } - if (pool->cleanup_func != NULL) { - LOG_INFO("tpool.c", "Executing cleanup function") - pool->cleanup_func(); - LOG_DEBUG("tpool.c", "Done executing cleanup function") - } - - pthread_cond_signal(&(pool->working_cond)); - pthread_mutex_unlock(&(pool->work_mutex)); return NULL; } @@ -304,17 +288,32 @@ void tpool_destroy(tpool_t *pool) { pthread_cond_destroy(&(pool->has_work_cond)); pthread_cond_destroy(&(pool->working_cond)); - free(pool->threads); - free(pool); + munmap(pool->shared_memory, pool->shared_memory_size); } /** * Create a thread pool * @param thread_cnt Worker threads count */ -tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int free_arg, int print_progress, size_t mem_limit) { +tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int print_progress, size_t mem_limit) { + + // ============= + size_t shm_size = 1024 * 1024 * 2000; + + void *shared_memory = mmap(NULL, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); + + tpool_t *pool = (tpool_t *) shared_memory; + pool->shared_memory = shared_memory; + pool->shared_memory_size = shm_size; + pool->mempool = (ncx_slab_pool_t *) (pool->shared_memory + sizeof(tpool_t)); + pool->mempool->addr = pool->mempool; + pool->mempool->min_shift = 4; + pool->mempool->end = pool->shared_memory + shm_size; + + ncx_slab_init(pool->mempool); + + // ============= - tpool_t *pool = malloc(sizeof(tpool_t)); pool->thread_cnt = thread_cnt; pool->work_cnt = 0; pool->done_cnt = 0; @@ -323,16 +322,24 @@ tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int free_arg, int pri pool->mem_limit = mem_limit; pool->stop = FALSE; pool->waiting = FALSE; - pool->free_arg = free_arg; pool->cleanup_func = cleanup_func; - pool->threads = calloc(sizeof(pthread_t), thread_cnt); + memset(pool->threads, 0, sizeof(pool->threads)); pool->print_progress = print_progress; pool->page_size = getpagesize(); - pthread_mutex_init(&(pool->work_mutex), NULL); + pthread_mutexattr_t mutexattr; + pthread_mutexattr_init(&mutexattr); + pthread_mutexattr_setpshared(&mutexattr, TRUE); - pthread_cond_init(&(pool->has_work_cond), NULL); - pthread_cond_init(&(pool->working_cond), NULL); + pthread_mutex_init(&(pool->work_mutex), &mutexattr); + pthread_mutex_init(&(pool->mem_mutex), &mutexattr); + + pthread_condattr_t condattr; + pthread_condattr_init(&condattr); + pthread_condattr_setpshared(&condattr, TRUE); + + pthread_cond_init(&(pool->has_work_cond), &condattr); + pthread_cond_init(&(pool->working_cond),&condattr); pool->work_head = NULL; pool->work_tail = NULL; diff --git a/src/tpool.h b/src/tpool.h index 71d202f..c1898f3 100644 --- a/src/tpool.h +++ b/src/tpool.h @@ -6,13 +6,26 @@ struct tpool; typedef struct tpool tpool_t; -typedef void (*thread_func_t)(void *arg); +typedef struct { + size_t arg_size; + void *arg; +} tpool_work_arg_t; + +typedef struct { + size_t arg_size; + char arg[0]; +} tpool_work_arg_shm_t; + +typedef void (*thread_func_t)(tpool_work_arg_shm_t *arg); + +tpool_t *tpool_create(int num, void (*cleanup_func)(), int print_progress, size_t mem_limit); -tpool_t *tpool_create(int num, void (*cleanup_func)(), int free_arg, int print_progress, size_t mem_limit); void tpool_start(tpool_t *pool); + void tpool_destroy(tpool_t *pool); -int tpool_add_work(tpool_t *pool, thread_func_t func, void *arg); +int tpool_add_work(tpool_t *pool, thread_func_t func, tpool_work_arg_t *arg); + void tpool_wait(tpool_t *pool); void tpool_dump_debug_info(tpool_t *pool); diff --git a/src/util.c b/src/util.c index f58cb6d..79d5bd2 100644 --- a/src/util.c +++ b/src/util.c @@ -103,7 +103,9 @@ void progress_bar_print_json(size_t done, size_t count, size_t tn_size, size_t i void progress_bar_print(double percentage, size_t tn_size, size_t index_size) { + // TODO: Fix this with shm/ctx static int last_val = -1; + int val = (int) (percentage * 100); if (last_val == val || val > 100) { return; diff --git a/third-party/libscan/libscan/arc/arc.c b/third-party/libscan/libscan/arc/arc.c index e547231..092bc8c 100644 --- a/third-party/libscan/libscan/arc/arc.c +++ b/third-party/libscan/libscan/arc/arc.c @@ -188,14 +188,13 @@ scan_code_t parse_archive(scan_arc_ctx_t *ctx, vfile_t *f, document_t *doc, pcre } else { - parse_job_t *sub_job = malloc(sizeof(parse_job_t) + PATH_MAX * 2); + parse_job_t *sub_job = malloc(sizeof(parse_job_t)); sub_job->vfile.close = arc_close; sub_job->vfile.read = arc_read; sub_job->vfile.read_rewindable = arc_read_rewindable; sub_job->vfile.reset = NULL; sub_job->vfile.arc = a; - sub_job->vfile.filepath = sub_job->filepath; sub_job->vfile.is_fs_file = FALSE; sub_job->vfile.rewind_buffer_size = 0; sub_job->vfile.rewind_buffer = NULL; @@ -206,22 +205,29 @@ scan_code_t parse_archive(scan_arc_ctx_t *ctx, vfile_t *f, document_t *doc, pcre strcpy(sub_job->parent, doc->doc_id); while (archive_read_next_header(a, &entry) == ARCHIVE_OK) { - sub_job->vfile.info = *archive_entry_stat(entry); + struct stat entry_stat = *archive_entry_stat(entry); + sub_job->vfile.st_mode = entry_stat.st_mode; + sub_job->vfile.st_size = entry_stat.st_size; + sub_job->vfile.mtime = (int) entry_stat.st_mtim.tv_sec; - double decompressed_size_ratio = (double) sub_job->vfile.info.st_size / (double) f->info.st_size; + double decompressed_size_ratio = (double) sub_job->vfile.st_size / (double) f->st_size; if (decompressed_size_ratio > MAX_DECOMPRESSED_SIZE_RATIO) { - CTX_LOG_DEBUGF("arc.c", "Skipped %s, possible zip bomb (decompressed_size_ratio=%f)", sub_job->filepath, decompressed_size_ratio) + CTX_LOG_DEBUGF("arc.c", "Skipped %s, possible zip bomb (decompressed_size_ratio=%f)", sub_job->filepath, + decompressed_size_ratio) continue; } - if (S_ISREG(sub_job->vfile.info.st_mode)) { + if (S_ISREG(sub_job->vfile.st_mode)) { const char *utf8_name = archive_entry_pathname_utf8(entry); if (utf8_name == NULL) { - sprintf(sub_job->filepath, "%s#/%s", f->filepath, archive_entry_pathname(entry)); + snprintf(sub_job->filepath, sizeof(sub_job->filepath), "%s#/%s", f->filepath, + archive_entry_pathname(entry)); + strcpy(sub_job->vfile.filepath, sub_job->filepath); } else { - sprintf(sub_job->filepath, "%s#/%s", f->filepath, utf8_name); + snprintf(sub_job->filepath, sizeof(sub_job->filepath), "%s#/%s", f->filepath, utf8_name); + strcpy(sub_job->vfile.filepath, sub_job->filepath); } sub_job->base = (int) (strrchr(sub_job->filepath, '/') - sub_job->filepath) + 1; diff --git a/third-party/libscan/libscan/ebook/ebook.c b/third-party/libscan/libscan/ebook/ebook.c index b0d6e78..acf70fd 100644 --- a/third-party/libscan/libscan/ebook/ebook.c +++ b/third-party/libscan/libscan/ebook/ebook.c @@ -1,28 +1,34 @@ #include "ebook.h" #include -#include #include #include "../media/media.h" #include "../arc/arc.h" #include "../ocr/ocr.h" +#if EBOOK_LOCKS +#include +pthread_mutex_t Mutex; +#endif + /* fill_image callback doesn't let us pass opaque pointers unless I create my own device */ __thread text_buffer_t thread_buffer; __thread scan_ebook_ctx_t thread_ctx; -pthread_mutex_t Mutex; - static void my_fz_lock(UNUSED(void *user), int lock) { +#if EBOOK_LOCKS if (lock == FZ_LOCK_FREETYPE) { pthread_mutex_lock(&Mutex); } +#endif } static void my_fz_unlock(UNUSED(void *user), int lock) { +#if EBOOK_LOCKS if (lock == FZ_LOCK_FREETYPE) { pthread_mutex_unlock(&Mutex); } +#endif } @@ -187,11 +193,13 @@ void fz_warn_callback(void *user, const char *message) { static void init_fzctx(fz_context *fzctx, document_t *doc) { fz_register_document_handlers(fzctx); +#if EBOOK_LOCKS static int mu_is_initialized = FALSE; if (!mu_is_initialized) { pthread_mutex_init(&Mutex, NULL); mu_is_initialized = TRUE; } +#endif fzctx->warn.print_user = doc; fzctx->warn.print = fz_warn_callback; diff --git a/third-party/libscan/libscan/ebook/ebook.h b/third-party/libscan/libscan/ebook/ebook.h index cae9d9a..7ee37c1 100644 --- a/third-party/libscan/libscan/ebook/ebook.h +++ b/third-party/libscan/libscan/ebook/ebook.h @@ -9,7 +9,6 @@ typedef struct { int enable_tn; const char *tesseract_lang; const char *tesseract_path; - pthread_mutex_t mupdf_mutex; log_callback_t log; logf_callback_t logf; diff --git a/third-party/libscan/libscan/json/json.c b/third-party/libscan/libscan/json/json.c index 3ba69f0..ef93405 100644 --- a/third-party/libscan/libscan/json/json.c +++ b/third-party/libscan/libscan/json/json.c @@ -32,7 +32,7 @@ int json_extract_text(cJSON *json, text_buffer_t *tex) { scan_code_t parse_json(scan_json_ctx_t *ctx, vfile_t *f, document_t *doc) { - if (f->info.st_size > JSON_MAX_FILE_SIZE) { + if (f->st_size > JSON_MAX_FILE_SIZE) { CTX_LOG_WARNINGF("json.c", "File larger than maximum allowed [%s]", f->filepath) return SCAN_ERR_SKIP; } diff --git a/third-party/libscan/libscan/media/media.c b/third-party/libscan/libscan/media/media.c index 956d3a4..fe9360e 100644 --- a/third-party/libscan/libscan/media/media.c +++ b/third-party/libscan/libscan/media/media.c @@ -687,7 +687,7 @@ long memfile_seek(void *ptr, long offset, int whence) { } int memfile_open(vfile_t *f, memfile_t *mem) { - mem->size = f->info.st_size; + mem->size = f->st_size; mem->buf = malloc(mem->size); if (mem->buf == NULL) { @@ -737,16 +737,16 @@ void parse_media_vfile(scan_media_ctx_t *ctx, struct vfile *f, document_t *doc, const char *filepath = get_filepath_with_ext(doc, f->filepath, mime_str); - if (f->info.st_size <= ctx->max_media_buffer) { + if (f->st_size <= ctx->max_media_buffer) { int ret = memfile_open(f, &memfile); if (ret == 0) { - CTX_LOG_DEBUGF(f->filepath, "Loading media file in memory (%ldB)", f->info.st_size) + CTX_LOG_DEBUGF(f->filepath, "Loading media file in memory (%ldB)", f->st_size) io_ctx = avio_alloc_context(buffer, AVIO_BUF_SIZE, 0, &memfile, memfile_read, NULL, memfile_seek); } } if (io_ctx == NULL) { - CTX_LOG_DEBUGF(f->filepath, "Reading media file without seek support", f->info.st_size) + CTX_LOG_DEBUGF(f->filepath, "Reading media file without seek support", f->st_size) io_ctx = avio_alloc_context(buffer, AVIO_BUF_SIZE, 0, f, vfile_read, NULL, NULL); } diff --git a/third-party/libscan/libscan/scan.h b/third-party/libscan/libscan/scan.h index 187161e..fd3fd1f 100644 --- a/third-party/libscan/libscan/scan.h +++ b/third-party/libscan/libscan/scan.h @@ -51,6 +51,8 @@ typedef int scan_code_t; #define SIST_DOC_ID_LEN MD5_STR_LENGTH #define SIST_INDEX_ID_LEN MD5_STR_LENGTH +#define EBOOK_LOCKS 0 + enum metakey { // String MetaContent = 1, @@ -100,7 +102,6 @@ typedef struct meta_line { union { char str_val[0]; unsigned long long_val; - double double_val; }; } meta_line_t; @@ -114,7 +115,7 @@ typedef struct document { short ext; meta_line_t *meta_head; meta_line_t *meta_tail; - char filepath[PATH_MAX]; + char filepath[PATH_MAX * 2 + 1]; } document_t; typedef struct vfile vfile_t; @@ -139,8 +140,11 @@ typedef struct vfile { int is_fs_file; int has_checksum; int calculate_checksum; - const char *filepath; - struct stat info; + char filepath[PATH_MAX * 2 + 1]; + + int mtime; + size_t st_size; + unsigned int st_mode; SHA_CTX sha1_ctx; unsigned char sha1_digest[SHA1_DIGEST_LENGTH]; @@ -162,7 +166,7 @@ typedef struct parse_job_t { int ext; struct vfile vfile; char parent[SIST_DOC_ID_LEN]; - char filepath[PATH_MAX]; + char filepath[PATH_MAX * 2 + 1]; } parse_job_t; diff --git a/third-party/libscan/libscan/text/text.c b/third-party/libscan/libscan/text/text.c index ba9403e..b4ffe33 100644 --- a/third-party/libscan/libscan/text/text.c +++ b/third-party/libscan/libscan/text/text.c @@ -2,7 +2,7 @@ scan_code_t parse_text(scan_text_ctx_t *ctx, vfile_t *f, document_t *doc) { - int to_read = MIN(ctx->content_size, f->info.st_size); + int to_read = MIN(ctx->content_size, f->st_size); if (to_read <= 2) { return SCAN_OK; @@ -39,7 +39,7 @@ scan_code_t parse_text(scan_text_ctx_t *ctx, vfile_t *f, document_t *doc) { scan_code_t parse_markup(scan_text_ctx_t *ctx, vfile_t *f, document_t *doc) { - int to_read = MIN(MAX_MARKUP_SIZE, f->info.st_size); + int to_read = MIN(MAX_MARKUP_SIZE, f->st_size); char *buf = malloc(to_read + 1); int ret = f->read(f, buf, to_read); diff --git a/third-party/libscan/libscan/util.h b/third-party/libscan/libscan/util.h index 33923fa..09f7ad5 100644 --- a/third-party/libscan/libscan/util.h +++ b/third-party/libscan/libscan/util.h @@ -325,10 +325,10 @@ static int text_buffer_append_markup(text_buffer_t *buf, const char *markup) { } static void *read_all(vfile_t *f, size_t *size) { - void *buf = malloc(f->info.st_size); - *size = f->read(f, buf, f->info.st_size); + void *buf = malloc(f->st_size); + *size = f->read(f, buf, f->st_size); - if (*size != f->info.st_size) { + if (*size != f->st_size) { free(buf); return NULL; } diff --git a/third-party/libscan/test/test_util.cpp b/third-party/libscan/test/test_util.cpp index ee78f10..fb9aa5c 100644 --- a/third-party/libscan/test/test_util.cpp +++ b/third-party/libscan/test/test_util.cpp @@ -50,14 +50,20 @@ void cleanup(document_t *doc, vfile_t *f) { } void load_file(const char *filepath, vfile_t *f) { - stat(filepath, &f->info); + struct stat info = {}; + stat(filepath, &info); + + f->mtime = (int)info.st_mtim.tv_sec; + f->st_size = info.st_size; + f->st_mode = info.st_mode; + f->fd = open(filepath, O_RDONLY); if (f->fd == -1) { FAIL() << FILE_NOT_FOUND_ERR; } - f->filepath = filepath; + memcpy(f->filepath, filepath, sizeof(f->filepath)); f->read = fs_read; f->close = fs_close; f->is_fs_file = TRUE; @@ -66,9 +72,9 @@ void load_file(const char *filepath, vfile_t *f) { } void load_mem(void *mem, size_t size, vfile_t *f) { - f->filepath = "_mem_"; + memcpy(f->filepath, "_mem_", strlen("_mem_")); f->_test_data = mem; - f->info.st_size = (int) size; + f->st_size = size; f->read = mem_read; f->close = nullptr; f->is_fs_file = TRUE;