sist2/src/io/store.c
2019-10-26 12:42:25 -04:00

106 lines
2.6 KiB
C

#include "store.h"
#include "src/ctx.h"
store_t *store_create(char *path) {
store_t *store = malloc(sizeof(struct store_t));
pthread_rwlock_init(&store->lock, NULL);
mdb_env_create(&store->env);
int open_ret = mdb_env_open(store->env,
path,
MDB_WRITEMAP | MDB_MAPASYNC,
S_IRUSR | S_IWUSR
);
if (open_ret != 0) {
fprintf(stderr, "Error while opening store: %s", mdb_strerror(open_ret));
exit(1);
}
store->size = (size_t) 1024 * 1024 * 5;
ScanCtx.stat_tn_size = 0;
mdb_env_set_mapsize(store->env, store->size);
// Open dbi
MDB_txn *txn;
mdb_txn_begin(store->env, NULL, 0, &txn);
mdb_dbi_open(txn, NULL, 0, &store->dbi);
mdb_txn_commit(txn);
return store;
}
void store_destroy(store_t *store) {
pthread_rwlock_destroy(&store->lock);
mdb_close(store->env, store->dbi);
mdb_env_close(store->env);
free(store);
}
void store_write(store_t *store, char *key, size_t key_len, char *buf, size_t buf_len) {
MDB_val mdb_key;
mdb_key.mv_data = key;
mdb_key.mv_size = key_len;
MDB_val mdb_value;
mdb_value.mv_data = buf;
mdb_value.mv_size = buf_len;
MDB_txn *txn;
pthread_rwlock_rdlock(&store->lock);
mdb_txn_begin(store->env, NULL, 0, &txn);
int put_ret = mdb_put(txn, store->dbi, &mdb_key, &mdb_value, 0);
ScanCtx.stat_tn_size += buf_len;
if (put_ret == MDB_MAP_FULL) {
mdb_txn_abort(txn);
pthread_rwlock_unlock(&store->lock);
// Cannot resize when there is a opened transaction.
// Resize take effect on the next commit.
pthread_rwlock_wrlock(&store->lock);
store->size += 1024 * 1024 * 5;
mdb_env_set_mapsize(store->env, store->size);
mdb_txn_begin(store->env, NULL, 0, &txn);
put_ret = mdb_put(txn, store->dbi, &mdb_key, &mdb_value, 0);
}
mdb_txn_commit(txn);
pthread_rwlock_unlock(&store->lock);
if (put_ret != 0) {
printf("%s\n", mdb_strerror(put_ret));
}
}
char *store_read(store_t *store, char *key, size_t key_len, size_t *ret_vallen) {
char *buf = NULL;
MDB_val mdb_key;
mdb_key.mv_data = key;
mdb_key.mv_size = key_len;
MDB_val mdb_value;
MDB_txn *txn;
mdb_txn_begin(store->env, NULL, MDB_RDONLY, &txn);
int get_ret = mdb_get(txn, store->dbi, &mdb_key, &mdb_value);
if (get_ret == MDB_NOTFOUND) {
*ret_vallen = 0;
} else {
*ret_vallen = mdb_value.mv_size;
buf = malloc(mdb_value.mv_size);
memcpy(buf, mdb_value.mv_data, mdb_value.mv_size);
}
mdb_txn_abort(txn);
return buf;
}