index: incremental indexing, add stub for index entries removal

This commit is contained in:
Yatao Li
2022-01-19 13:17:51 +08:00
parent 7d40b9e959
commit 291d307689
6 changed files with 60 additions and 18 deletions

View File

@@ -17,7 +17,7 @@ typedef struct es_indexer {
static __thread es_indexer_t *Indexer;
void delete_queue(int max);
void free_queue(int max);
void elastic_flush();
@@ -67,6 +67,11 @@ void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]) {
tpool_add_work(IndexCtx.pool, index_json_func, bulk_line);
}
void delete_document(const char* document_id_str, void* data) {
const char* index_id_str = data;
// TODO bulk delete
}
void execute_update_script(const char *script, int async, const char index_id[MD5_STR_LENGTH]) {
if (Indexer == NULL) {
@@ -223,7 +228,7 @@ void _elastic_flush(int max) {
LOG_ERRORF("elastic.c", "Single document too large, giving up: {%s}", Indexer->line_head->path_md5_str)
free_response(r);
free(buf);
delete_queue(1);
free_queue(1);
if (Indexer->queued != 0) {
elastic_flush();
}
@@ -248,13 +253,13 @@ void _elastic_flush(int max) {
} else if (r->status_code != 200) {
print_errors(r);
delete_queue(Indexer->queued);
free_queue(Indexer->queued);
} else {
print_errors(r);
LOG_DEBUGF("elastic.c", "Indexed %d documents (%zukB) <%d>", count, buf_len / 1024, r->status_code);
delete_queue(max);
free_queue(max);
if (Indexer->queued != 0) {
elastic_flush();
@@ -265,7 +270,7 @@ void _elastic_flush(int max) {
free(buf);
}
void delete_queue(int max) {
void free_queue(int max) {
for (int i = 0; i < max; i++) {
es_bulk_line_t *tmp = Indexer->line_head;
Indexer->line_head = tmp->next;

View File

@@ -40,6 +40,8 @@ void print_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]);
void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]);
void delete_document(const char *document_id_str, void* data);
es_indexer_t *create_indexer(const char *url, const char *index);
void elastic_cleanup();