Handle 429, multi-threaded index module

This commit is contained in:
2020-07-11 17:42:46 -04:00
parent ed15e89f45
commit 13f4dbed2d
15 changed files with 227 additions and 82 deletions

View File

@@ -14,9 +14,18 @@ typedef struct es_indexer {
} es_indexer_t;
static es_indexer_t *Indexer;
static __thread es_indexer_t *Indexer;
void delete_queue(int max);
void elastic_flush();
void elastic_cleanup() {
elastic_flush();
if (Indexer != NULL) {
free(Indexer->es_url);
free(Indexer);
}
}
void print_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) {
@@ -35,8 +44,12 @@ void print_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) {
cJSON_Delete(line);
}
void index_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) {
void index_json_func(void *arg) {
es_bulk_line_t *line = arg;
elastic_index_line(line);
}
void index_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) {
char *json = cJSON_PrintUnformatted(document);
size_t json_len = strlen(json);
@@ -48,7 +61,7 @@ void index_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) {
bulk_line->next = NULL;
cJSON_free(json);
elastic_index_line(bulk_line);
tpool_add_work(IndexCtx.pool, index_json_func, bulk_line);
}
void execute_update_script(const char *script, const char index_id[UUID_STR_LEN]) {
@@ -89,33 +102,44 @@ void execute_update_script(const char *script, const char index_id[UUID_STR_LEN]
cJSON_Delete(resp);
}
#define ACTION_STR_LEN 91
void *create_bulk_buffer(int max, int *count, size_t *buf_len) {
es_bulk_line_t *line = Indexer->line_head;
*count = 0;
size_t buf_size = 0;
size_t buf_cur = 0;
char *buf = malloc(1);
char *buf = malloc(8196);
size_t buf_capacity = 8196;
while (line != NULL && *count < max) {
char action_str[512];
snprintf(action_str, 512,
char action_str[256];
snprintf(action_str, 256,
"{\"index\":{\"_id\":\"%s\", \"_type\":\"_doc\", \"_index\":\"sist2\"}}\n", line->uuid_str);
size_t action_str_len = strlen(action_str);
size_t line_len = strlen(line->line);
buf = realloc(buf, buf_size + line_len + action_str_len);
buf_size += line_len + action_str_len;
memcpy(buf + buf_cur, action_str, action_str_len);
buf_cur += action_str_len;
while (buf_size + line_len + ACTION_STR_LEN > buf_capacity) {
buf_capacity *= 2;
buf = realloc(buf, buf_capacity);
}
buf_size += line_len + ACTION_STR_LEN;
memcpy(buf + buf_cur, action_str, ACTION_STR_LEN);
buf_cur += ACTION_STR_LEN;
memcpy(buf + buf_cur, line->line, line_len);
buf_cur += line_len;
line = line->next;
(*count)++;
}
buf = realloc(buf, buf_size + 1);
if (buf_size + 1 > buf_capacity) {
buf = realloc(buf, buf_capacity + 1);
}
*(buf + buf_cur) = '\0';
*buf_len = buf_cur;
@@ -123,7 +147,7 @@ void *create_bulk_buffer(int max, int *count, size_t *buf_len) {
}
void print_errors(response_t *r) {
char * tmp = malloc(r->size + 1);
char *tmp = malloc(r->size + 1);
memcpy(tmp, r->body, r->size);
*(tmp + r->size) = '\0';
@@ -181,6 +205,15 @@ void _elastic_flush(int max) {
_elastic_flush(max / 2);
return;
} else if (r->status_code == 429) {
free_response(r);
free(buf);
LOG_WARNING("elastic.c", "Got 429 status, will retry after delay")
usleep(1000000 * 20);
_elastic_flush(max);
return;
} else if (r->status_code != 200) {
print_errors(r);
delete_queue(Indexer->queued);
@@ -257,7 +290,7 @@ es_indexer_t *create_indexer(const char *url) {
return indexer;
}
void destroy_indexer(char *script, char index_id[UUID_STR_LEN]) {
void finish_indexer(char *script, char *index_id) {
char url[4096];
@@ -280,11 +313,6 @@ void destroy_indexer(char *script, char index_id[UUID_STR_LEN]) {
r = web_post(url, "");
LOG_INFOF("elastic.c", "Merge index <%d>", r->status_code);
free_response(r);
if (Indexer != NULL) {
free(Indexer->es_url);
free(Indexer);
}
}
void elastic_init(int force_reset) {