diff --git a/src/index/web.c b/src/index/web.c index 20e80fe..b6cc992 100644 --- a/src/index/web.c +++ b/src/index/web.c @@ -1,5 +1,6 @@ #include "web.h" #include "src/sist.h" +#include "src/ctx.h" #include #include @@ -21,95 +22,82 @@ void free_response(response_t *resp) { free(resp); } -#define SIST2_HEADERS "User-Agent: sist2\r\nContent-Type: application/json\r\n" +void web_post_async_poll(subreq_ctx_t* req) { + fd_set fdread; + fd_set fdwrite; + fd_set fdexcep; + int maxfd = -1; + FD_ZERO(&fdread); + FD_ZERO(&fdwrite); + FD_ZERO(&fdexcep); -void http_req_ev(struct mg_connection *nc, int ev, void *ptr) { + CURLMcode mc = curl_multi_fdset(req->multi, &fdread, &fdwrite, &fdexcep, &maxfd); - http_ev_data_t *ev_data = (http_ev_data_t *) nc->user_data; + if(mc != CURLM_OK) { + req->done = TRUE; + return; + } - switch (ev) { - case MG_EV_CONNECT: { - int connect_status = *(int *) ptr; - if (connect_status != 0) { - ev_data->done = TRUE; - ev_data->resp->status_code = 0; - } + if (maxfd == -1) { + // no fds ready yet + return; + } + + struct timeval timeout = {1, 0}; + int rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout); + + switch(rc) { + case -1: + req->done = TRUE; break; - } - case MG_EV_HTTP_REPLY: { - struct http_message *hm = (struct http_message *) ptr; - - //TODO: Check errors? - - ev_data->resp->size = hm->body.len; - ev_data->resp->status_code = hm->resp_code; - ev_data->resp->body = malloc(hm->body.len + 1); - memcpy(ev_data->resp->body, hm->body.p, hm->body.len); - *(ev_data->resp->body + hm->body.len) = '\0'; - - ev_data->done = TRUE; + case 0: break; - } - case MG_EV_CLOSE: { - ev_data->done = TRUE; - break; - } default: + curl_multi_perform(req->multi, &req->running_handles); break; } + + if (req->running_handles == 0) { + req->done = TRUE; + req->response->body = req->response_buf.buf; + req->response->size = req->response_buf.cur; + curl_easy_getinfo(req->handle, CURLINFO_RESPONSE_CODE, &req->response->status_code); + + curl_multi_cleanup(req->multi); + curl_easy_cleanup(req->handle); + curl_slist_free_all(req->headers); + return; + } } -subreq_ctx_t *http_req(const char *url, const char *extra_headers, const char *post_data, const char *method) { +subreq_ctx_t *web_post_async(const char *url, char *data) { + subreq_ctx_t *req = calloc(1, sizeof(subreq_ctx_t)); + req->response = calloc(1, sizeof(response_t)); + req->data = data; + req->response_buf = dyn_buffer_create(); - struct mg_str scheme; - struct mg_str user_info; - struct mg_str host; - unsigned int port; - struct mg_str path; - struct mg_str query; - struct mg_str fragment; + req->handle = curl_easy_init(); + CURL *curl = req->handle; + curl_easy_setopt(curl, CURLOPT_URL, url); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) (&req->response_buf)); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_cb); + curl_easy_setopt(curl, CURLOPT_POST, 1); + curl_easy_setopt(curl, CURLOPT_USERAGENT, "sist2"); - if (post_data == NULL) post_data = ""; - if (extra_headers == NULL) extra_headers = ""; - if (path.len == 0) path = mg_mk_str("/"); - if (host.len == 0) host = mg_mk_str(""); + struct curl_slist *headers = NULL; + headers = curl_slist_append(headers, "Content-Type: application/json"); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); - // [scheme://[user_info@]]host[:port][/path][?query][#fragment] - mg_parse_uri(mg_mk_str(url), &scheme, &user_info, &host, &port, &path, &query, &fragment); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data); - if (query.len > 0) path.len += query.len + 1; + req->multi = curl_multi_init(); + curl_multi_add_handle(req->multi, curl); + curl_multi_perform(req->multi, &req->running_handles); - subreq_ctx_t *ctx = malloc(sizeof(subreq_ctx_t)); - mg_mgr_init(&ctx->mgr, NULL); + LOG_DEBUGF("web.c", "async request POST %s", url) - char address[8192]; - snprintf(address, sizeof(address), "tcp://%.*s:%u", (int) host.len, host.p, port); - struct mg_connection *nc = mg_connect(&ctx->mgr, address, http_req_ev); - nc->user_data = &ctx->ev_data; - mg_set_protocol_http_websocket(nc); - - ctx->ev_data.resp = calloc(1, sizeof(response_t)); - ctx->ev_data.done = FALSE; - - mg_printf( - nc, "%s %.*s HTTP/1.1\r\n" - "Host: %.*s\r\n" - "Content-Length: %zu\r\n" - "%s\r\n" - "%s", - method, (int) path.len, path.p, - (int) (path.p - host.p), host.p, - strlen(post_data), - extra_headers, - post_data - ); - - return ctx; -} - -subreq_ctx_t *web_post_async(const char *url, const char *data) { - return http_req(url, SIST2_HEADERS, data, "POST"); + return req; } response_t *web_get(const char *url, int timeout) { diff --git a/src/index/web.h b/src/index/web.h index 0a49e4d..7f29bc2 100644 --- a/src/index/web.h +++ b/src/index/web.h @@ -3,6 +3,7 @@ #include "src/sist.h" #include +#include typedef struct response { char *body; @@ -16,13 +17,20 @@ typedef struct { } http_ev_data_t; typedef struct { - http_ev_data_t ev_data; - struct mg_mgr mgr; + char* data; + dyn_buffer_t response_buf; + struct curl_slist *headers; + CURL *handle; + CURLM *multi; + response_t *response; + int running_handles; + int done; } subreq_ctx_t; response_t *web_get(const char *url, int timeout); response_t *web_post(const char * url, const char * data); -subreq_ctx_t *web_post_async(const char *url, const char *data); +void web_post_async_poll(subreq_ctx_t* req); +subreq_ctx_t *web_post_async(const char *url, char *data); response_t *web_put(const char *url, const char *data); response_t *web_delete(const char *url); diff --git a/src/web/serve.c b/src/web/serve.c index c964d5a..17f6441 100644 --- a/src/web/serve.c +++ b/src/web/serve.c @@ -10,8 +10,6 @@ #include -#define CHUNK_SIZE 1024 * 1024 * 10 - static int has_prefix(const struct mg_str *str, const struct mg_str *prefix) { return str->len > prefix->len && memcmp(str->p, prefix->p, prefix->len) == 0; @@ -240,7 +238,6 @@ void search(struct mg_connection *nc, struct http_message *hm) { snprintf(url, 4096, "%s/%s/_search", WebCtx.es_url, WebCtx.es_index); nc->user_data = web_post_async(url, body); - free(body); } void serve_file_from_url(cJSON *json, index_t *idx, struct mg_connection *nc) { @@ -668,11 +665,11 @@ static void ev_router(struct mg_connection *nc, int ev, void *p) { if (nc->user_data != NULL) { //Waiting for ES reply subreq_ctx_t *ctx = (subreq_ctx_t *) nc->user_data; - mg_mgr_poll(&ctx->mgr, 0); + web_post_async_poll(ctx); - if (ctx->ev_data.done == TRUE) { + if (ctx->done == TRUE) { - response_t *r = ctx->ev_data.resp; + response_t *r = ctx->response; if (r->status_code == 200) { send_response_line(nc, 200, r->size, "Content-Type: application/json"); @@ -695,6 +692,8 @@ static void ev_router(struct mg_connection *nc, int ev, void *p) { } free_response(r); + free(ctx->data); + free(ctx); nc->flags |= MG_F_SEND_AND_CLOSE; nc->user_data = NULL; }