Fix some memory leaks, fix tests, fix --print regression

This commit is contained in:
simon987 2022-02-11 11:09:29 -05:00
parent 37919932de
commit 8fa34da02f
9 changed files with 83 additions and 47 deletions

2
scripts/start_dev_es.sh Executable file
View File

@ -0,0 +1,2 @@
docker run --rm -it -p 9200:9200 -e "discovery.type=single-node" \
-e "ES_JAVA_OPTS=-Xms8g -Xmx8g" elasticsearch:7.14.0

View File

@ -65,6 +65,10 @@ void index_args_destroy(index_args_t *args) {
if (args->es_settings_path) {
free(args->es_settings);
}
if (args->index_path != NULL) {
free(args->index_path);
}
free(args);
}

View File

@ -44,7 +44,7 @@ int scan_args_validate(scan_args_t *args, int argc, const char **argv);
typedef struct index_args {
char *es_url;
char *es_index;
const char *index_path;
char *index_path;
const char *script_path;
char *script;
const char *es_settings_path;

View File

@ -86,6 +86,10 @@ typedef struct {
GHashTable *tags;
store_t *meta_store;
GHashTable *meta;
/**
* Set to false when using --print
*/
int needs_es_connection;
} IndexCtx_t;
typedef struct {

View File

@ -15,19 +15,34 @@ typedef struct es_indexer {
} es_indexer_t;
static __thread es_indexer_t *Indexer;
static __thread es_indexer_t *Indexer = NULL;
void free_queue(int max);
void elastic_flush();
void elastic_cleanup() {
elastic_flush();
if (Indexer != NULL) {
free(Indexer->es_index);
free(Indexer->es_url);
free(Indexer);
void destroy_indexer(es_indexer_t *indexer) {
if (indexer == NULL) {
return;
}
LOG_DEBUG("elastic.c", "Destroying indexer")
if (indexer->es_url != NULL) {
free(indexer->es_url);
free(indexer->es_index);
}
free(indexer);
}
void elastic_cleanup() {
if (IndexCtx.needs_es_connection) {
elastic_flush();
}
destroy_indexer(Indexer);
}
void print_json(cJSON *document, const char id_str[MD5_STR_LENGTH]) {
@ -53,10 +68,10 @@ void index_json_func(void *arg) {
}
void delete_document(const char* document_id_str, void* UNUSED(_data)) {
size_t id_len = strlen(document_id_str);
es_bulk_line_t *bulk_line = malloc(sizeof(es_bulk_line_t));
bulk_line->type = ES_BULK_LINE_DELETE;
bulk_line->next = NULL;
memcpy(bulk_line->path_md5_str, document_id_str, MD5_STR_LENGTH);
tpool_add_work(IndexCtx.pool, index_json_func, bulk_line);
}
@ -337,16 +352,22 @@ void elastic_index_line(es_bulk_line_t *line) {
es_indexer_t *create_indexer(const char *url, const char *index) {
char *es_url = malloc(strlen(url) + 1);
strcpy(es_url, url);
char *es_index = malloc(strlen(index) + 1);
strcpy(es_index, index);
es_indexer_t *indexer = malloc(sizeof(es_indexer_t));
indexer->es_url = es_url;
indexer->es_index = es_index;
if (IndexCtx.needs_es_connection) {
char *es_url = malloc(strlen(url) + 1);
strcpy(es_url, url);
char *es_index = malloc(strlen(index) + 1);
strcpy(es_index, index);
indexer->es_url = es_url;
indexer->es_index = es_index;
} else {
indexer->es_url = NULL;
indexer->es_index = NULL;
}
indexer->queued = 0;
indexer->line_head = NULL;
indexer->line_tail = NULL;

View File

@ -428,8 +428,9 @@ void sist2_index(index_args_t *args) {
IndexCtx.es_url = args->es_url;
IndexCtx.es_index = args->es_index;
IndexCtx.batch_size = args->batch_size;
IndexCtx.needs_es_connection = !args->print;
if (!args->print) {
if (IndexCtx.needs_es_connection) {
elastic_init(args->force_reset, args->es_mappings, args->es_settings);
}
@ -465,27 +466,24 @@ void sist2_index(index_args_t *args) {
f = index_json;
}
void (*cleanup)();
if (args->print) {
cleanup = NULL;
} else {
cleanup = elastic_cleanup;
}
IndexCtx.pool = tpool_create(args->threads, cleanup, FALSE, args->print == 0);
IndexCtx.pool = tpool_create(args->threads, elastic_cleanup, FALSE, args->print == 0);
tpool_start(IndexCtx.pool);
READ_INDICES(file_path, args->index_path, {
read_index(file_path, desc.id, desc.type, f);
LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type);
}, {}, !args->incremental);
snprintf(file_path, PATH_MAX, "%s_index_delete.list.zst", args->index_path);
if (0 == access(file_path, R_OK)) {
read_lines(file_path, (line_processor_t) {
.data = NULL,
.func = delete_document
});
LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type)
// Only read the _delete index if we're sending data to ES
if (!args->print) {
snprintf(file_path, PATH_MAX, "%s_index_delete.list.zst", args->index_path);
if (0 == access(file_path, R_OK)) {
read_lines(file_path, (line_processor_t) {
.data = NULL,
.func = delete_document
});
LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type)
}
}
closedir(dir);
@ -494,7 +492,7 @@ void sist2_index(index_args_t *args) {
tpool_destroy(IndexCtx.pool);
if (!args->print) {
if (IndexCtx.needs_es_connection) {
finish_indexer(args->script, args->async_script, desc.id);
}
@ -576,7 +574,6 @@ int main(int argc, const char *argv[]) {
char *common_es_url = NULL;
char *common_es_index = NULL;
char *common_script_path = NULL;
char *common_incremental = NULL;
int common_async_script = 0;
int common_threads = 0;
@ -595,7 +592,7 @@ int main(int argc, const char *argv[]) {
"Thumbnail size, in pixels. Use negative value to disable. DEFAULT=500"),
OPT_INTEGER(0, "content-size", &scan_args->content_size,
"Number of bytes to be extracted from text documents. Use negative value to disable. DEFAULT=32768"),
OPT_STRING(0, "incremental", &common_incremental,
OPT_STRING(0, "incremental", &scan_args->incremental,
"Reuse an existing index and only scan modified files."),
OPT_STRING('o', "output", &scan_args->output, "Output directory. DEFAULT=index.sist2/"),
OPT_STRING(0, "rewrite-url", &scan_args->rewrite_url, "Serve files from this url instead of from disk."),
@ -633,7 +630,7 @@ int main(int argc, const char *argv[]) {
OPT_STRING(0, "es-url", &common_es_url, "Elasticsearch url with port. DEFAULT=http://localhost:9200"),
OPT_STRING(0, "es-index", &common_es_index, "Elasticsearch index name. DEFAULT=sist2"),
OPT_BOOLEAN('p', "print", &index_args->print, "Just print JSON documents to stdout."),
OPT_STRING(0, "incremental", &common_incremental,
OPT_BOOLEAN(0, "incremental-index", &index_args->incremental,
"Conduct incremental indexing, assumes that the old index is already digested by Elasticsearch."),
OPT_STRING(0, "script-file", &common_script_path, "Path to user script."),
OPT_STRING(0, "mappings-file", &index_args->es_mappings_path, "Path to Elasticsearch mappings."),
@ -691,9 +688,6 @@ int main(int argc, const char *argv[]) {
exec_args->async_script = common_async_script;
index_args->async_script = common_async_script;
scan_args->incremental = (common_incremental == NULL) ? NULL : strdup(common_incremental);
index_args->incremental = (common_incremental != NULL);
if (argc == 0) {
argparse_usage(&argparse);
goto end;

View File

@ -87,8 +87,13 @@ void parse(void *arg) {
ScanCtx.dbg_skipped_files_count += 1;
pthread_mutex_unlock(&ScanCtx.dbg_file_counts_mu);
CLOSE_FILE(job->vfile)
free(doc->filepath);
free(doc);
return;
}
if (ScanCtx.new_table != NULL) {
pthread_mutex_lock(&ScanCtx.copy_table_mu);
incremental_mark_file(ScanCtx.new_table, doc->path_md5);
@ -128,11 +133,14 @@ void parse(void *arg) {
LOG_ERRORF(job->filepath, "(virtual) read(): [%d] %s", bytes_read, archive_error_string(job->vfile.arc))
}
CLOSE_FILE(job->vfile)
pthread_mutex_lock(&ScanCtx.dbg_file_counts_mu);
ScanCtx.dbg_failed_files_count += 1;
pthread_mutex_unlock(&ScanCtx.dbg_file_counts_mu);
CLOSE_FILE(job->vfile)
free(doc->filepath);
free(doc);
return;
}

File diff suppressed because one or more lines are too long

View File

@ -39,7 +39,7 @@ def sist2_index(files, *args):
return iter(sist2_index_to_dict("test_i"))
def sist2_incremental_index(files, func=None, *args):
def sist2_incremental_index(files, func=None, incremental_index=False, *args):
path = copy_files(files)
if func:
@ -47,11 +47,13 @@ def sist2_incremental_index(files, func=None, *args):
shutil.rmtree("test_i_inc", ignore_errors=True)
sist2("scan", path, "-o", "test_i_inc", "--incremental", "test_i", *args)
return iter(sist2_index_to_dict("test_i_inc"))
return iter(sist2_index_to_dict("test_i_inc", incremental_index))
def sist2_index_to_dict(index):
res = sist2("index", "--print", index)
def sist2_index_to_dict(index, incremental_index=False):
args = ["--incremental-index"] if incremental_index else []
res = sist2("index", "--print", "--very-verbose", *args, index)
for line in res.splitlines():
if line:
@ -75,6 +77,7 @@ class ScanTest(unittest.TestCase):
file_count = sum(1 for _ in sist2_index(TEST_FILES))
self.assertEqual(sum(1 for _ in sist2_incremental_index(TEST_FILES, remove_files)), file_count - 2)
self.assertEqual(sum(1 for _ in sist2_incremental_index(TEST_FILES, add_files, incremental_index=True)), 3)
self.assertEqual(sum(1 for _ in sist2_incremental_index(TEST_FILES, add_files)), file_count + 3)