Mercurial > hg > nginx-quic
diff src/http/ngx_http_upstream.c @ 6050:a08fad30aeac
Request body: unbuffered reading.
The r->request_body_no_buffering flag was introduced. It instructs
client request body reading code to avoid reading the whole body, and
to call post_handler early instead. The caller should use the
ngx_http_read_unbuffered_request_body() function to read remaining
parts of the body.
Upstream module is now able to use this mode, if configured with
the proxy_request_buffering directive.
author | Maxim Dounin <mdounin@mdounin.ru> |
---|---|
date | Mon, 23 Mar 2015 21:09:19 +0300 |
parents | c8acea7c7041 |
children | 643f2ce02f1c |
line wrap: on
line diff
--- a/src/http/ngx_http_upstream.c +++ b/src/http/ngx_http_upstream.c @@ -36,9 +36,12 @@ static void ngx_http_upstream_connect(ng static ngx_int_t ngx_http_upstream_reinit(ngx_http_request_t *r, ngx_http_upstream_t *u); static void ngx_http_upstream_send_request(ngx_http_request_t *r, - ngx_http_upstream_t *u); + ngx_http_upstream_t *u, ngx_uint_t do_write); +static ngx_int_t ngx_http_upstream_send_request_body(ngx_http_request_t *r, + ngx_http_upstream_t *u, ngx_uint_t do_write); static void ngx_http_upstream_send_request_handler(ngx_http_request_t *r, ngx_http_upstream_t *u); +static void ngx_http_upstream_read_request_handler(ngx_http_request_t *r); static void ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u); static ngx_int_t ngx_http_upstream_test_next(ngx_http_request_t *r, @@ -568,8 +571,11 @@ ngx_http_upstream_init_request(ngx_http_ u->output.pool = r->pool; u->output.bufs.num = 1; u->output.bufs.size = clcf->client_body_buffer_size; - u->output.output_filter = ngx_chain_writer; - u->output.filter_ctx = &u->writer; + + if (u->output.output_filter == NULL) { + u->output.output_filter = ngx_chain_writer; + u->output.filter_ctx = &u->writer; + } u->writer.pool = r->pool; @@ -1432,7 +1438,7 @@ ngx_http_upstream_connect(ngx_http_reque #endif - ngx_http_upstream_send_request(r, u); + ngx_http_upstream_send_request(r, u, 1); } @@ -1536,7 +1542,7 @@ ngx_http_upstream_ssl_handshake(ngx_conn c = r->connection; - ngx_http_upstream_send_request(r, u); + ngx_http_upstream_send_request(r, u, 1); ngx_http_run_posted_requests(c); return; @@ -1724,7 +1730,8 @@ ngx_http_upstream_reinit(ngx_http_reques static void -ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u) +ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u, + ngx_uint_t do_write) { ngx_int_t rc; ngx_connection_t *c; @@ -1741,21 +1748,25 @@ ngx_http_upstream_send_request(ngx_http_ c->log->action = "sending request to upstream"; - rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs); - - u->request_sent = 1; + rc = ngx_http_upstream_send_request_body(r, u, do_write); if (rc == NGX_ERROR) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } - if (c->write->timer_set) { - ngx_del_timer(c->write); + if (rc >= NGX_HTTP_SPECIAL_RESPONSE) { + ngx_http_upstream_finalize_request(r, u, rc); + return; } if (rc == NGX_AGAIN) { - ngx_add_timer(c->write, u->conf->send_timeout); + if (!c->write->ready) { + ngx_add_timer(c->write, u->conf->send_timeout); + + } else if (c->write->timer_set) { + ngx_del_timer(c->write); + } if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, @@ -1768,6 +1779,10 @@ ngx_http_upstream_send_request(ngx_http_ /* rc == NGX_OK */ + if (c->write->timer_set) { + ngx_del_timer(c->write); + } + if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) { if (ngx_tcp_push(c->fd) == NGX_ERROR) { ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno, @@ -1797,6 +1812,123 @@ ngx_http_upstream_send_request(ngx_http_ } +static ngx_int_t +ngx_http_upstream_send_request_body(ngx_http_request_t *r, + ngx_http_upstream_t *u, ngx_uint_t do_write) +{ + int tcp_nodelay; + ngx_int_t rc; + ngx_chain_t *out, *cl, *ln; + ngx_connection_t *c; + ngx_http_core_loc_conf_t *clcf; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http upstream send request body"); + + if (!r->request_body_no_buffering) { + + /* buffered request body */ + + if (!u->request_sent) { + u->request_sent = 1; + out = u->request_bufs; + + } else { + out = NULL; + } + + return ngx_output_chain(&u->output, out); + } + + if (!u->request_sent) { + u->request_sent = 1; + out = u->request_bufs; + + if (r->request_body->bufs) { + for (cl = out; cl->next; cl = out->next) { /* void */ } + cl->next = r->request_body->bufs; + r->request_body->bufs = NULL; + } + + c = u->peer.connection; + clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); + + if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay"); + + tcp_nodelay = 1; + + if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, + (const void *) &tcp_nodelay, sizeof(int)) == -1) + { + ngx_connection_error(c, ngx_socket_errno, + "setsockopt(TCP_NODELAY) failed"); + return NGX_ERROR; + } + + c->tcp_nodelay = NGX_TCP_NODELAY_SET; + } + + r->read_event_handler = ngx_http_upstream_read_request_handler; + + } else { + out = NULL; + } + + for ( ;; ) { + + if (do_write) { + rc = ngx_output_chain(&u->output, out); + + if (rc == NGX_ERROR) { + return NGX_ERROR; + } + + while (out) { + ln = out; + out = out->next; + ngx_free_chain(r->pool, ln); + } + + if (rc == NGX_OK && !r->reading_body) { + break; + } + } + + if (r->reading_body) { + /* read client request body */ + + rc = ngx_http_read_unbuffered_request_body(r); + + if (rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + + out = r->request_body->bufs; + r->request_body->bufs = NULL; + } + + /* stop if there is nothing to send */ + + if (out == NULL) { + rc = NGX_AGAIN; + break; + } + + do_write = 1; + } + + if (!r->reading_body) { + if (!u->store && !r->post_action && !u->conf->ignore_client_abort) { + r->read_event_handler = + ngx_http_upstream_rd_check_broken_connection; + } + } + + return rc; +} + + static void ngx_http_upstream_send_request_handler(ngx_http_request_t *r, ngx_http_upstream_t *u) @@ -1830,7 +1962,29 @@ ngx_http_upstream_send_request_handler(n return; } - ngx_http_upstream_send_request(r, u); + ngx_http_upstream_send_request(r, u, 1); +} + + +static void +ngx_http_upstream_read_request_handler(ngx_http_request_t *r) +{ + ngx_connection_t *c; + ngx_http_upstream_t *u; + + c = r->connection; + u = r->upstream; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http upstream read request handler"); + + if (c->read->timedout) { + c->timedout = 1; + ngx_http_upstream_finalize_request(r, u, NGX_HTTP_REQUEST_TIME_OUT); + return; + } + + ngx_http_upstream_send_request(r, u, 0); } @@ -3626,7 +3780,9 @@ ngx_http_upstream_next(ngx_http_request_ "upstream timed out"); } - if (u->peer.cached && ft_type == NGX_HTTP_UPSTREAM_FT_ERROR) { + if (u->peer.cached && ft_type == NGX_HTTP_UPSTREAM_FT_ERROR + && (!u->request_sent || !r->request_body_no_buffering)) + { status = 0; /* TODO: inform balancer instead */ @@ -3674,6 +3830,7 @@ ngx_http_upstream_next(ngx_http_request_ if (u->peer.tries == 0 || !(u->conf->next_upstream & ft_type) + || (u->request_sent && r->request_body_no_buffering) || (timeout && ngx_current_msec - u->peer.start_time >= timeout)) { #if (NGX_HTTP_CACHE)