This commit is contained in:
simon 2020-02-20 16:42:13 -05:00
parent ae8652d86e
commit a8505cb8c1
3 changed files with 74 additions and 25 deletions

View File

@ -20,6 +20,8 @@ typedef struct es_indexer {
static es_indexer_t *Indexer; static es_indexer_t *Indexer;
void delete_queue(int max);
void print_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) { void print_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) {
cJSON *line = cJSON_CreateObject(); cJSON *line = cJSON_CreateObject();
@ -87,21 +89,15 @@ void execute_update_script(const char *script, const char index_id[UUID_STR_LEN]
cJSON_Delete(resp); cJSON_Delete(resp);
} }
void elastic_flush() { void *create_bulk_buffer(int max, int *count, size_t *buf_len) {
if (Indexer == NULL) {
Indexer = create_indexer(IndexCtx.es_url);
}
es_bulk_line_t *line = Indexer->line_head; es_bulk_line_t *line = Indexer->line_head;
*count = 0;
int count = 0;
size_t buf_size = 0; size_t buf_size = 0;
size_t buf_cur = 0; size_t buf_cur = 0;
char *buf = malloc(1); char *buf = malloc(1);
while (line != NULL) { while (line != NULL && *count < max) {
char action_str[512]; char action_str[512];
snprintf(action_str, 512, snprintf(action_str, 512,
"{\"index\":{\"_id\":\"%s\", \"_type\":\"_doc\", \"_index\":\"sist2\"}}\n", line->uuid_str); "{\"index\":{\"_id\":\"%s\", \"_type\":\"_doc\", \"_index\":\"sist2\"}}\n", line->uuid_str);
@ -116,17 +112,20 @@ void elastic_flush() {
memcpy(buf + buf_cur, line->line, line_len); memcpy(buf + buf_cur, line->line, line_len);
buf_cur += line_len; buf_cur += line_len;
es_bulk_line_t *tmp = line;
line = line->next; line = line->next;
free(tmp); (*count)++;
count++;
} }
buf = realloc(buf, buf_size + 1); buf = realloc(buf, buf_size + 1);
*(buf + buf_cur) = '\0'; *(buf + buf_cur) = '\0';
Indexer->line_head = NULL; *buf_len = buf_cur;
Indexer->line_tail = NULL; return buf;
Indexer->queued = 0; }
void _elastic_flush(int max) {
size_t buf_len;
int count;
void *buf = create_bulk_buffer(max, &count, &buf_len);
char bulk_url[4096]; char bulk_url[4096];
snprintf(bulk_url, 4096, "%s/sist2/_bulk?pipeline=tie", Indexer->es_url); snprintf(bulk_url, 4096, "%s/sist2/_bulk?pipeline=tie", Indexer->es_url);
@ -136,9 +135,27 @@ void elastic_flush() {
LOG_FATALF("elastic.c", "Could not connect to %s, make sure that elasticsearch is running!\n", IndexCtx.es_url) LOG_FATALF("elastic.c", "Could not connect to %s, make sure that elasticsearch is running!\n", IndexCtx.es_url)
} }
LOG_INFOF("elastic.c", "Indexed %d documents (%zukB) <%d>", count, buf_cur / 1024, r->status_code); if (r->status_code == 413) {
if (r->status_code != 200 && r->status_code != 413) { if (max <= 1) {
LOG_ERRORF("elastic.c", "Single document too large, giving up: {%s}", Indexer->line_head->uuid_str)
free_response(r);
free(buf);
delete_queue(1);
if (Indexer->queued != 0) {
elastic_flush();
}
return;
}
LOG_WARNINGF("elastic.c", "Payload too large, retrying (%d documents)", count);
free_response(r);
free(buf);
_elastic_flush(max / 2);
return;
} else if (r->status_code != 200) {
cJSON *ret_json = cJSON_Parse(r->body); cJSON *ret_json = cJSON_Parse(r->body);
if (cJSON_GetObjectItem(ret_json, "errors")->valueint != 0) { if (cJSON_GetObjectItem(ret_json, "errors")->valueint != 0) {
cJSON *err; cJSON *err;
@ -152,12 +169,44 @@ void elastic_flush() {
} }
cJSON_Delete(ret_json); cJSON_Delete(ret_json);
delete_queue(Indexer->queued);
} else {
LOG_INFOF("elastic.c", "Indexed %d documents (%zukB) <%d>", count, buf_len / 1024, r->status_code);
delete_queue(max);
if (Indexer->queued != 0) {
elastic_flush();
}
} }
free_response(r); free_response(r);
free(buf); free(buf);
} }
/**
 * Drop up to `max` entries from the front of the indexer's pending queue.
 *
 * Each dropped node is unlinked from Indexer->line_head and freed, and
 * Indexer->queued is decremented once per node. When the last node is
 * removed, Indexer->line_tail is reset to NULL along with line_head.
 *
 * @param max Number of entries to remove. If the queue holds fewer than
 *            `max` entries, the loop stops as soon as the queue is empty.
 */
void delete_queue(int max) {
    for (int i = 0; i < max; i++) {
        es_bulk_line_t *tmp = Indexer->line_head;
        if (tmp == NULL) {
            // Queue ran out before `max` removals; avoid dereferencing NULL.
            break;
        }

        Indexer->line_head = tmp->next;
        if (Indexer->line_head == NULL) {
            Indexer->line_tail = NULL;
        }

        // Free every unlinked node. Previously the node that emptied the
        // queue was skipped here and leaked.
        free(tmp);
        Indexer->queued -= 1;
    }
}
/**
 * Public flush entry point.
 *
 * Creates the global Indexer from IndexCtx.es_url if it does not exist yet,
 * then calls _elastic_flush() with the current queue length as the batch size.
 */
void elastic_flush() {
    if (!Indexer) {
        Indexer = create_indexer(IndexCtx.es_url);
    }

    int pending = Indexer->queued;
    _elastic_flush(pending);
}
void elastic_index_line(es_bulk_line_t *line) { void elastic_index_line(es_bulk_line_t *line) {
if (Indexer == NULL) { if (Indexer == NULL) {

View File

@ -6,7 +6,7 @@
#define EPILOG "Made by simon987 <me@simon987.net>. Released under GPL-3.0" #define EPILOG "Made by simon987 <me@simon987.net>. Released under GPL-3.0"
static const char *const Version = "1.2.12"; static const char *const Version = "1.2.13";
static const char *const usage[] = { static const char *const usage[] = {
"sist2 scan [OPTION]... PATH", "sist2 scan [OPTION]... PATH",
"sist2 index [OPTION]... INDEX", "sist2 index [OPTION]... INDEX",

View File

@ -11,7 +11,7 @@
<nav class="navbar navbar-expand-lg"> <nav class="navbar navbar-expand-lg">
<a class="navbar-brand" href="/">sist2</a> <a class="navbar-brand" href="/">sist2</a>
<span class="badge badge-pill version">v1.2.12</span> <span class="badge badge-pill version">v1.2.13</span>
<span class="tagline">Lightning-fast file system indexer and search tool </span> <span class="tagline">Lightning-fast file system indexer and search tool </span>
<a style="margin-left: auto" id="theme" class="btn" title="Toggle theme" href="/">Theme</a> <a style="margin-left: auto" id="theme" class="btn" title="Toggle theme" href="/">Theme</a>
</nav> </nav>