Initial commit (squashed)

This commit is contained in:
2019-09-27 22:56:07 -04:00
commit 564a17a8fa
75 changed files with 7518 additions and 0 deletions

208
src/index/elastic.c Normal file
View File

@@ -0,0 +1,208 @@
#include "elastic.h"
#include "src/ctx.h"
#include <stdlib.h>
#include "web.h"
#include <stdio.h>
#include <string.h>
#include <cJSON/cJSON.h>
#include <src/ctx.h>
#include "static_generated.c"
#define BULK_INDEX_SIZE 100
typedef struct es_indexer {
int queued;
char *es_url;
es_bulk_line_t *line_head;
es_bulk_line_t *line_tail;
} es_indexer_t;
static es_indexer_t *Indexer;
void print_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) {
cJSON *line = cJSON_CreateObject();
cJSON_AddStringToObject(line, "_id", uuid_str);
cJSON_AddStringToObject(line, "_index", "sist2");
cJSON_AddStringToObject(line, "_type", "_doc");
cJSON_AddItemToObject(line, "_source", document);
char *json = cJSON_PrintUnformatted(line);
printf("%s\n", json);
cJSON_free(line);
}
void index_json(cJSON *document, const char uuid_str[UUID_STR_LEN]) {
char *json = cJSON_PrintUnformatted(document);
size_t json_len = strlen(json);
es_bulk_line_t *bulk_line = malloc(sizeof(es_bulk_line_t) + json_len + 2);
memcpy(bulk_line->line, json, json_len);
memcpy(bulk_line->uuid_str, uuid_str, UUID_STR_LEN);
*(bulk_line->line + json_len) = '\n';
*(bulk_line->line + json_len + 1) = '\0';
bulk_line->next = NULL;
cJSON_free(json);
elastic_index_line(bulk_line);
}
void elastic_flush() {
if (Indexer == NULL) {
Indexer = create_indexer(IndexCtx.es_url);
}
es_bulk_line_t *line = Indexer->line_head;
int count = 0;
size_t buf_size = 0;
size_t buf_cur = 0;
char *buf = malloc(1);
while (line != NULL) {
char action_str[512];
snprintf(action_str, 512, "{\"index\":{\"_id\": \"%s\"}}\n", line->uuid_str);
size_t action_str_len = strlen(action_str);
size_t line_len = strlen(line->line);
buf = realloc(buf, buf_size + line_len + action_str_len);
buf_size += line_len + action_str_len;
memcpy(buf + buf_cur, action_str, action_str_len);
buf_cur += action_str_len;
memcpy(buf + buf_cur, line->line, line_len);
buf_cur += line_len;
es_bulk_line_t *tmp = line;
line = line->next;
free(tmp);
count++;
}
buf = realloc(buf, buf_size + 1);
*(buf+buf_cur) = '\0';
Indexer->line_head = NULL;
Indexer->line_tail = NULL;
Indexer->queued = 0;
char bulk_url[4096];
snprintf(bulk_url, 4096, "%s/sist2/_bulk", Indexer->es_url);
response_t *r = web_post(bulk_url, buf, "Content-Type: application/x-ndjson");
printf("Indexed %3d documents (%zukB) <%d>\n", count, buf_cur / 1024, r->status_code);
free_response(r);
}
void elastic_index_line(es_bulk_line_t *line) {
if (Indexer == NULL) {
Indexer = create_indexer(IndexCtx.es_url);
}
if (Indexer->line_head == NULL) {
Indexer->line_head = line;
Indexer->line_tail = Indexer->line_head;
} else {
Indexer->line_tail->next = line;
Indexer->line_tail = line;
}
Indexer->queued += 1;
if (Indexer->queued >= BULK_INDEX_SIZE) {
elastic_flush();
}
}
es_indexer_t *create_indexer(const char *url) {
size_t url_len = strlen(url);
char *es_url = malloc(url_len);
strcpy(es_url, url);
es_indexer_t *indexer = malloc(sizeof(es_indexer_t));
indexer->es_url = es_url;
indexer->queued = 0;
indexer->line_head = NULL;
indexer->line_tail = NULL;
return indexer;
}
void destroy_indexer() {
char url[4096];
snprintf(url, sizeof(url), "%s/sist2/_refresh", IndexCtx.es_url);
response_t *r = web_post(url, "", NULL);
printf("Refresh index <%d>\n", r->status_code);
free_response(r);
snprintf(url, sizeof(url), "%s/sist2/_forcemerge", IndexCtx.es_url);
r = web_post(url, "", NULL);
printf("Merge index <%d>\n", r->status_code);
free_response(r);
if (Indexer != NULL) {
free(Indexer->es_url);
free(Indexer);
}
}
void elastic_init(int force_reset) {
// Check if index exists
char url[4096];
snprintf(url, 4096, "%s/sist2", IndexCtx.es_url);
response_t *r = web_get(url);
int index_exists = r->status_code == 200;
free_response(r);
if (!index_exists || force_reset) {
r = web_delete(url);
printf("Delete index <%d>\n", r->status_code);
free_response(r);
snprintf(url, 4096, "%s/sist2", IndexCtx.es_url);
r = web_put(url, "", NULL);
printf("Create index <%d>\n", r->status_code);
free_response(r);
snprintf(url, 4096, "%s/sist2/_close", IndexCtx.es_url);
r = web_post(url, "", NULL);
printf("Close index <%d>\n", r->status_code);
free_response(r);
snprintf(url, 4096, "%s/sist2/_settings", IndexCtx.es_url);
r = web_put(url, settings_json, "Content-Type: application/json");
printf("Update settings <%d>\n", r->status_code);
free_response(r);
snprintf(url, 4096, "%s/sist2/_mappings", IndexCtx.es_url);
r = web_put(url, mappings_json, "Content-Type: application/json");
printf("Update mappings <%d>\n", r->status_code);
free_response(r);
snprintf(url, 4096, "%s/sist2/_open", IndexCtx.es_url);
r = web_post(url, "", NULL);
printf("Open index <%d>\n", r->status_code);
free_response(r);
}
}
cJSON *elastic_get_document(const char *uuid_str) {
char url[4096];
snprintf(url, 4096, "%s/sist2/_source/%s", WebCtx.es_url, uuid_str);
response_t *r = web_get(url);
return cJSON_Parse(r->body);
}

33
src/index/elastic.h Normal file
View File

@@ -0,0 +1,33 @@
#ifndef SIST2_ELASTIC_H
#define SIST2_ELASTIC_H
#include "src/sist.h"
typedef struct es_bulk_line {
struct es_bulk_line *next;
char uuid_str[UUID_STR_LEN];
char line[0];
} es_bulk_line_t;
/**
* Note: indexer is *not* thread safe
*/
typedef struct es_indexer es_indexer_t;
void elastic_index_line(es_bulk_line_t *line);
void elastic_flush();
void print_json(cJSON *document, const char uuid_str[UUID_STR_LEN]);
void index_json(cJSON *document, const char uuid_str[UUID_STR_LEN]);
es_indexer_t *create_indexer(const char* es_url);
void destroy_indexer();
void elastic_init(int force_reset);
cJSON *elastic_get_document(const char *uuid_str);
#endif

File diff suppressed because one or more lines are too long

130
src/index/web.c Normal file
View File

@@ -0,0 +1,130 @@
#include "web.h"
size_t write_cb(char *ptr, size_t size, size_t nmemb, void *user_data) {
size_t real_size = size * nmemb;
dyn_buffer_t *buf = user_data;
dyn_buffer_write(buf, ptr, real_size);
return real_size;
}
void free_response(response_t *resp) {
free(resp->body);
free(resp);
}
response_t *web_get(const char *url) {
response_t *resp = malloc(sizeof(response_t));
CURL *curl;
dyn_buffer_t buffer = dyn_buffer_create();
curl = curl_easy_init();
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) (&buffer));
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_cb);
curl_easy_setopt(curl, CURLOPT_USERAGENT, "sist2");
curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &resp->status_code);
curl_easy_cleanup(curl);
resp->body = buffer.buf;
resp->size = buffer.cur;
return resp;
}
response_t *web_post(const char *url, const char *data, const char *header) {
response_t *resp = malloc(sizeof(response_t));
CURL *curl;
dyn_buffer_t buffer = dyn_buffer_create();
curl = curl_easy_init();
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) (&buffer));
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_cb);
curl_easy_setopt(curl, CURLOPT_POST, 1);
curl_easy_setopt(curl, CURLOPT_USERAGENT, "sist2");
if (header != NULL) {
struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, header);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
}
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data);
int r1 = curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &resp->status_code);
curl_easy_cleanup(curl);
resp->body = buffer.buf;
resp->size = buffer.cur;
return resp;
}
response_t *web_put(const char *url, const char *data, const char *header) {
response_t *resp = malloc(sizeof(response_t));
CURL *curl;
dyn_buffer_t buffer = dyn_buffer_create();
curl = curl_easy_init();
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) (&buffer));
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_cb);
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
curl_easy_setopt(curl, CURLOPT_USERAGENT, "sist2");
curl_easy_setopt(curl, CURLOPT_DNS_USE_GLOBAL_CACHE, 0);
curl_easy_setopt(curl, CURLOPT_IPRESOLVE, CURLOPT_DNS_LOCAL_IP4 );
if (header != NULL) {
struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, header);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
}
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data);
curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &resp->status_code);
curl_easy_cleanup(curl);
resp->body = buffer.buf;
resp->size = buffer.cur;
return resp;
}
response_t *web_delete(const char *url) {
response_t *resp = malloc(sizeof(response_t));
CURL *curl;
dyn_buffer_t buffer = dyn_buffer_create();
curl = curl_easy_init();
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) (&buffer));
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_cb);
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
curl_easy_setopt(curl, CURLOPT_USERAGENT, "sist2");
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, "");
curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &resp->status_code);
curl_easy_cleanup(curl);
resp->body = buffer.buf;
resp->size = buffer.cur;
return resp;
}

19
src/index/web.h Normal file
View File

@@ -0,0 +1,19 @@
#ifndef SIST2_WEB_H
#define SIST2_WEB_H
#include "src/sist.h"
typedef struct response {
char *body;
size_t size;
int status_code;
} response_t;
response_t *web_get(const char *url);
response_t *web_post(const char * url, const char * data, const char* header);
response_t *web_put(const char *url, const char *data, const char *header);
response_t *web_delete(const char *url);
void free_response(response_t *resp);
#endif