mirror of
https://github.com/simon987/sist2.git
synced 2025-04-19 10:16:42 +00:00
Use async curl for ES requests #108
This commit is contained in:
parent
0cd2523b05
commit
b6ddeee0e0
132
src/index/web.c
132
src/index/web.c
@ -1,5 +1,6 @@
|
|||||||
#include "web.h"
|
#include "web.h"
|
||||||
#include "src/sist.h"
|
#include "src/sist.h"
|
||||||
|
#include "src/ctx.h"
|
||||||
|
|
||||||
#include <mongoose.h>
|
#include <mongoose.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
@ -21,95 +22,82 @@ void free_response(response_t *resp) {
|
|||||||
free(resp);
|
free(resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
#define SIST2_HEADERS "User-Agent: sist2\r\nContent-Type: application/json\r\n"
|
void web_post_async_poll(subreq_ctx_t* req) {
|
||||||
|
fd_set fdread;
|
||||||
|
fd_set fdwrite;
|
||||||
|
fd_set fdexcep;
|
||||||
|
int maxfd = -1;
|
||||||
|
|
||||||
|
FD_ZERO(&fdread);
|
||||||
|
FD_ZERO(&fdwrite);
|
||||||
|
FD_ZERO(&fdexcep);
|
||||||
|
|
||||||
void http_req_ev(struct mg_connection *nc, int ev, void *ptr) {
|
CURLMcode mc = curl_multi_fdset(req->multi, &fdread, &fdwrite, &fdexcep, &maxfd);
|
||||||
|
|
||||||
http_ev_data_t *ev_data = (http_ev_data_t *) nc->user_data;
|
if(mc != CURLM_OK) {
|
||||||
|
req->done = TRUE;
|
||||||
switch (ev) {
|
return;
|
||||||
case MG_EV_CONNECT: {
|
|
||||||
int connect_status = *(int *) ptr;
|
|
||||||
if (connect_status != 0) {
|
|
||||||
ev_data->done = TRUE;
|
|
||||||
ev_data->resp->status_code = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (maxfd == -1) {
|
||||||
|
// no fds ready yet
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct timeval timeout = {1, 0};
|
||||||
|
int rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout);
|
||||||
|
|
||||||
|
switch(rc) {
|
||||||
|
case -1:
|
||||||
|
req->done = TRUE;
|
||||||
break;
|
break;
|
||||||
}
|
case 0:
|
||||||
case MG_EV_HTTP_REPLY: {
|
|
||||||
struct http_message *hm = (struct http_message *) ptr;
|
|
||||||
|
|
||||||
//TODO: Check errors?
|
|
||||||
|
|
||||||
ev_data->resp->size = hm->body.len;
|
|
||||||
ev_data->resp->status_code = hm->resp_code;
|
|
||||||
ev_data->resp->body = malloc(hm->body.len + 1);
|
|
||||||
memcpy(ev_data->resp->body, hm->body.p, hm->body.len);
|
|
||||||
*(ev_data->resp->body + hm->body.len) = '\0';
|
|
||||||
|
|
||||||
ev_data->done = TRUE;
|
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
case MG_EV_CLOSE: {
|
|
||||||
ev_data->done = TRUE;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
default:
|
default:
|
||||||
|
curl_multi_perform(req->multi, &req->running_handles);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (req->running_handles == 0) {
|
||||||
|
req->done = TRUE;
|
||||||
|
req->response->body = req->response_buf.buf;
|
||||||
|
req->response->size = req->response_buf.cur;
|
||||||
|
curl_easy_getinfo(req->handle, CURLINFO_RESPONSE_CODE, &req->response->status_code);
|
||||||
|
|
||||||
|
curl_multi_cleanup(req->multi);
|
||||||
|
curl_easy_cleanup(req->handle);
|
||||||
|
curl_slist_free_all(req->headers);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
subreq_ctx_t *http_req(const char *url, const char *extra_headers, const char *post_data, const char *method) {
|
subreq_ctx_t *web_post_async(const char *url, char *data) {
|
||||||
|
subreq_ctx_t *req = calloc(1, sizeof(subreq_ctx_t));
|
||||||
|
req->response = calloc(1, sizeof(response_t));
|
||||||
|
req->data = data;
|
||||||
|
req->response_buf = dyn_buffer_create();
|
||||||
|
|
||||||
struct mg_str scheme;
|
req->handle = curl_easy_init();
|
||||||
struct mg_str user_info;
|
CURL *curl = req->handle;
|
||||||
struct mg_str host;
|
curl_easy_setopt(curl, CURLOPT_URL, url);
|
||||||
unsigned int port;
|
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) (&req->response_buf));
|
||||||
struct mg_str path;
|
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_cb);
|
||||||
struct mg_str query;
|
curl_easy_setopt(curl, CURLOPT_POST, 1);
|
||||||
struct mg_str fragment;
|
curl_easy_setopt(curl, CURLOPT_USERAGENT, "sist2");
|
||||||
|
|
||||||
if (post_data == NULL) post_data = "";
|
struct curl_slist *headers = NULL;
|
||||||
if (extra_headers == NULL) extra_headers = "";
|
headers = curl_slist_append(headers, "Content-Type: application/json");
|
||||||
if (path.len == 0) path = mg_mk_str("/");
|
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
|
||||||
if (host.len == 0) host = mg_mk_str("");
|
|
||||||
|
|
||||||
// [scheme://[user_info@]]host[:port][/path][?query][#fragment]
|
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data);
|
||||||
mg_parse_uri(mg_mk_str(url), &scheme, &user_info, &host, &port, &path, &query, &fragment);
|
|
||||||
|
|
||||||
if (query.len > 0) path.len += query.len + 1;
|
req->multi = curl_multi_init();
|
||||||
|
curl_multi_add_handle(req->multi, curl);
|
||||||
|
curl_multi_perform(req->multi, &req->running_handles);
|
||||||
|
|
||||||
subreq_ctx_t *ctx = malloc(sizeof(subreq_ctx_t));
|
LOG_DEBUGF("web.c", "async request POST %s", url)
|
||||||
mg_mgr_init(&ctx->mgr, NULL);
|
|
||||||
|
|
||||||
char address[8192];
|
return req;
|
||||||
snprintf(address, sizeof(address), "tcp://%.*s:%u", (int) host.len, host.p, port);
|
|
||||||
struct mg_connection *nc = mg_connect(&ctx->mgr, address, http_req_ev);
|
|
||||||
nc->user_data = &ctx->ev_data;
|
|
||||||
mg_set_protocol_http_websocket(nc);
|
|
||||||
|
|
||||||
ctx->ev_data.resp = calloc(1, sizeof(response_t));
|
|
||||||
ctx->ev_data.done = FALSE;
|
|
||||||
|
|
||||||
mg_printf(
|
|
||||||
nc, "%s %.*s HTTP/1.1\r\n"
|
|
||||||
"Host: %.*s\r\n"
|
|
||||||
"Content-Length: %zu\r\n"
|
|
||||||
"%s\r\n"
|
|
||||||
"%s",
|
|
||||||
method, (int) path.len, path.p,
|
|
||||||
(int) (path.p - host.p), host.p,
|
|
||||||
strlen(post_data),
|
|
||||||
extra_headers,
|
|
||||||
post_data
|
|
||||||
);
|
|
||||||
|
|
||||||
return ctx;
|
|
||||||
}
|
|
||||||
|
|
||||||
subreq_ctx_t *web_post_async(const char *url, const char *data) {
|
|
||||||
return http_req(url, SIST2_HEADERS, data, "POST");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
response_t *web_get(const char *url, int timeout) {
|
response_t *web_get(const char *url, int timeout) {
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
|
|
||||||
#include "src/sist.h"
|
#include "src/sist.h"
|
||||||
#include <mongoose.h>
|
#include <mongoose.h>
|
||||||
|
#include <curl/curl.h>
|
||||||
|
|
||||||
typedef struct response {
|
typedef struct response {
|
||||||
char *body;
|
char *body;
|
||||||
@ -16,13 +17,20 @@ typedef struct {
|
|||||||
} http_ev_data_t;
|
} http_ev_data_t;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
http_ev_data_t ev_data;
|
char* data;
|
||||||
struct mg_mgr mgr;
|
dyn_buffer_t response_buf;
|
||||||
|
struct curl_slist *headers;
|
||||||
|
CURL *handle;
|
||||||
|
CURLM *multi;
|
||||||
|
response_t *response;
|
||||||
|
int running_handles;
|
||||||
|
int done;
|
||||||
} subreq_ctx_t;
|
} subreq_ctx_t;
|
||||||
|
|
||||||
response_t *web_get(const char *url, int timeout);
|
response_t *web_get(const char *url, int timeout);
|
||||||
response_t *web_post(const char * url, const char * data);
|
response_t *web_post(const char * url, const char * data);
|
||||||
subreq_ctx_t *web_post_async(const char *url, const char *data);
|
void web_post_async_poll(subreq_ctx_t* req);
|
||||||
|
subreq_ctx_t *web_post_async(const char *url, char *data);
|
||||||
response_t *web_put(const char *url, const char *data);
|
response_t *web_put(const char *url, const char *data);
|
||||||
response_t *web_delete(const char *url);
|
response_t *web_delete(const char *url);
|
||||||
|
|
||||||
|
@ -10,8 +10,6 @@
|
|||||||
|
|
||||||
#include <mongoose.h>
|
#include <mongoose.h>
|
||||||
|
|
||||||
#define CHUNK_SIZE 1024 * 1024 * 10
|
|
||||||
|
|
||||||
|
|
||||||
static int has_prefix(const struct mg_str *str, const struct mg_str *prefix) {
|
static int has_prefix(const struct mg_str *str, const struct mg_str *prefix) {
|
||||||
return str->len > prefix->len && memcmp(str->p, prefix->p, prefix->len) == 0;
|
return str->len > prefix->len && memcmp(str->p, prefix->p, prefix->len) == 0;
|
||||||
@ -240,7 +238,6 @@ void search(struct mg_connection *nc, struct http_message *hm) {
|
|||||||
snprintf(url, 4096, "%s/%s/_search", WebCtx.es_url, WebCtx.es_index);
|
snprintf(url, 4096, "%s/%s/_search", WebCtx.es_url, WebCtx.es_index);
|
||||||
|
|
||||||
nc->user_data = web_post_async(url, body);
|
nc->user_data = web_post_async(url, body);
|
||||||
free(body);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void serve_file_from_url(cJSON *json, index_t *idx, struct mg_connection *nc) {
|
void serve_file_from_url(cJSON *json, index_t *idx, struct mg_connection *nc) {
|
||||||
@ -668,11 +665,11 @@ static void ev_router(struct mg_connection *nc, int ev, void *p) {
|
|||||||
if (nc->user_data != NULL) {
|
if (nc->user_data != NULL) {
|
||||||
//Waiting for ES reply
|
//Waiting for ES reply
|
||||||
subreq_ctx_t *ctx = (subreq_ctx_t *) nc->user_data;
|
subreq_ctx_t *ctx = (subreq_ctx_t *) nc->user_data;
|
||||||
mg_mgr_poll(&ctx->mgr, 0);
|
web_post_async_poll(ctx);
|
||||||
|
|
||||||
if (ctx->ev_data.done == TRUE) {
|
if (ctx->done == TRUE) {
|
||||||
|
|
||||||
response_t *r = ctx->ev_data.resp;
|
response_t *r = ctx->response;
|
||||||
|
|
||||||
if (r->status_code == 200) {
|
if (r->status_code == 200) {
|
||||||
send_response_line(nc, 200, r->size, "Content-Type: application/json");
|
send_response_line(nc, 200, r->size, "Content-Type: application/json");
|
||||||
@ -695,6 +692,8 @@ static void ev_router(struct mg_connection *nc, int ev, void *p) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
free_response(r);
|
free_response(r);
|
||||||
|
free(ctx->data);
|
||||||
|
free(ctx);
|
||||||
nc->flags |= MG_F_SEND_AND_CLOSE;
|
nc->flags |= MG_F_SEND_AND_CLOSE;
|
||||||
nc->user_data = NULL;
|
nc->user_data = NULL;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user