index: bulk delete

This commit is contained in:
Yatao Li 2022-01-20 03:08:13 +08:00
parent 679e12f786
commit 2cb57f3634
3 changed files with 52 additions and 25 deletions

View File

@ -52,11 +52,22 @@ void index_json_func(void *arg) {
elastic_index_line(line); elastic_index_line(line);
} }
void delete_document(const char* document_id_str, void* UNUSED(_data)) {
size_t id_len = strlen(document_id_str);
es_bulk_line_t *bulk_line = malloc(sizeof(es_bulk_line_t));
bulk_line->type = ES_BULK_LINE_DELETE;
bulk_line->next = NULL;
memcpy(bulk_line->path_md5_str, document_id_str, MD5_STR_LENGTH);
tpool_add_work(IndexCtx.pool, index_json_func, bulk_line);
}
void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]) { void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]) {
char *json = cJSON_PrintUnformatted(document); char *json = cJSON_PrintUnformatted(document);
size_t json_len = strlen(json); size_t json_len = strlen(json);
es_bulk_line_t *bulk_line = malloc(sizeof(es_bulk_line_t) + json_len + 2); es_bulk_line_t *bulk_line = malloc(sizeof(es_bulk_line_t) + json_len + 2);
bulk_line->type = ES_BULK_LINE_INDEX;
memcpy(bulk_line->line, json, json_len); memcpy(bulk_line->line, json, json_len);
memcpy(bulk_line->path_md5_str, index_id_str, MD5_STR_LENGTH); memcpy(bulk_line->path_md5_str, index_id_str, MD5_STR_LENGTH);
*(bulk_line->line + json_len) = '\n'; *(bulk_line->line + json_len) = '\n';
@ -67,11 +78,6 @@ void index_json(cJSON *document, const char index_id_str[MD5_STR_LENGTH]) {
tpool_add_work(IndexCtx.pool, index_json_func, bulk_line); tpool_add_work(IndexCtx.pool, index_json_func, bulk_line);
} }
void delete_document(const char* document_id_str, void* data) {
const char* index_id_str = data;
// TODO bulk delete
}
void execute_update_script(const char *script, int async, const char index_id[MD5_STR_LENGTH]) { void execute_update_script(const char *script, int async, const char index_id[MD5_STR_LENGTH]) {
if (Indexer == NULL) { if (Indexer == NULL) {
@ -130,30 +136,47 @@ void *create_bulk_buffer(int max, int *count, size_t *buf_len) {
size_t buf_cur = 0; size_t buf_cur = 0;
char *buf = malloc(8192); char *buf = malloc(8192);
size_t buf_capacity = 8192; size_t buf_capacity = 8192;
#define GROW_BUF(delta) \
while (buf_size + (delta) > buf_capacity) { \
buf_capacity *= 2; \
buf = realloc(buf, buf_capacity); \
} \
buf_size += (delta); \
// see: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
// ES_BULK_LINE_INDEX: two lines, 1st action, 2nd content
// ES_BULK_LINE_DELETE: one line
while (line != NULL && *count < max) { while (line != NULL && *count < max) {
char action_str[256]; char action_str[256];
snprintf( if (line->type == ES_BULK_LINE_INDEX) {
action_str, sizeof(action_str), snprintf(
"{\"index\":{\"_id\":\"%s\",\"_type\":\"_doc\",\"_index\":\"%s\"}}\n", action_str, sizeof(action_str),
line->path_md5_str, Indexer->es_index "{\"index\":{\"_id\":\"%s\",\"_type\":\"_doc\",\"_index\":\"%s\"}}\n",
); line->path_md5_str, Indexer->es_index
);
size_t action_str_len = strlen(action_str); size_t action_str_len = strlen(action_str);
size_t line_len = strlen(line->line); size_t line_len = strlen(line->line);
while (buf_size + line_len + action_str_len > buf_capacity) { GROW_BUF(action_str_len + line_len);
buf_capacity *= 2;
buf = realloc(buf, buf_capacity); memcpy(buf + buf_cur, action_str, action_str_len);
buf_cur += action_str_len;
memcpy(buf + buf_cur, line->line, line_len);
buf_cur += line_len;
} else if (line->type == ES_BULK_LINE_DELETE) {
snprintf(
action_str, sizeof(action_str),
"{\"delete\":{\"_id\":\"%s\",\"_index\":\"%s\"}}\n",
line->path_md5_str, Indexer->es_index
);
size_t action_str_len = strlen(action_str);
GROW_BUF(action_str_len);
memcpy(buf + buf_cur, action_str, action_str_len);
buf_cur += action_str_len;
} }
buf_size += line_len + action_str_len;
memcpy(buf + buf_cur, action_str, action_str_len);
buf_cur += action_str_len;
memcpy(buf + buf_cur, line->line, line_len);
buf_cur += line_len;
line = line->next; line = line->next;
(*count)++; (*count)++;
} }

View File

@ -3,9 +3,13 @@
#include "src/sist.h" #include "src/sist.h"
#define ES_BULK_LINE_INDEX 0
#define ES_BULK_LINE_DELETE 1
typedef struct es_bulk_line { typedef struct es_bulk_line {
struct es_bulk_line *next; struct es_bulk_line *next;
char path_md5_str[MD5_STR_LENGTH]; char path_md5_str[MD5_STR_LENGTH];
int type;
char line[0]; char line[0];
} es_bulk_line_t; } es_bulk_line_t;

View File

@ -489,7 +489,7 @@ void sist2_index(index_args_t *args) {
snprintf(file_path, PATH_MAX, "%s_index_delete.list.zst", args->index_path); snprintf(file_path, PATH_MAX, "%s_index_delete.list.zst", args->index_path);
if (0 == access(file_path, R_OK)) { if (0 == access(file_path, R_OK)) {
read_lines(file_path, (line_processor_t) { read_lines(file_path, (line_processor_t) {
.data = desc.id, .data = NULL,
.func = delete_document .func = delete_document
}); });
LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type) LOG_DEBUGF("main.c", "Read index file %s (%s)", file_path, desc.type)
@ -698,7 +698,7 @@ int main(int argc, const char *argv[]) {
exec_args->async_script = common_async_script; exec_args->async_script = common_async_script;
index_args->async_script = common_async_script; index_args->async_script = common_async_script;
scan_args->incremental = common_incremental; scan_args->incremental = (common_incremental == NULL) ? NULL : strdup(common_incremental);
index_args->incremental = (common_incremental != NULL); index_args->incremental = (common_incremental != NULL);
if (argc == 0) { if (argc == 0) {