# HG changeset patch # User Maxim Dounin # Date 1427134159 -10800 # Node ID a08fad30aeac5dcacaf8ac568c3dd2f56ae77556 # Parent 42d9beeb22db39739edbfc52047dbf0bc5ae6a55 Request body: unbuffered reading. The r->request_body_no_buffering flag was introduced. It instructs client request body reading code to avoid reading the whole body, and to call post_handler early instead. The caller should use the ngx_http_read_unbuffered_request_body() function to read remaining parts of the body. Upstream module is now able to use this mode, if configured with the proxy_request_buffering directive. diff --git a/src/http/modules/ngx_http_proxy_module.c b/src/http/modules/ngx_http_proxy_module.c --- a/src/http/modules/ngx_http_proxy_module.c +++ b/src/http/modules/ngx_http_proxy_module.c @@ -292,6 +292,13 @@ static ngx_command_t ngx_http_proxy_com offsetof(ngx_http_proxy_loc_conf_t, upstream.buffering), NULL }, + { ngx_string("proxy_request_buffering"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG, + ngx_conf_set_flag_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_proxy_loc_conf_t, upstream.request_buffering), + NULL }, + { ngx_string("proxy_ignore_client_abort"), NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG, ngx_conf_set_flag_slot, @@ -876,6 +883,15 @@ ngx_http_proxy_handler(ngx_http_request_ u->accel = 1; + if (!plcf->upstream.request_buffering + && plcf->body_values == NULL && plcf->upstream.pass_request_body + && !r->headers_in.chunked) + { + /* TODO: support chunked when using HTTP/1.1 */ + + r->request_body_no_buffering = 1; + } + rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init); if (rc >= NGX_HTTP_SPECIAL_RESPONSE) { @@ -1393,7 +1409,11 @@ ngx_http_proxy_create_request(ngx_http_r "http proxy header:%N\"%*s\"", (size_t) (b->last - b->pos), b->pos); - if (plcf->body_values == NULL && plcf->upstream.pass_request_body) { + if (r->request_body_no_buffering) { + + u->request_bufs = cl; + + } else if (plcf->body_values == NULL && plcf->upstream.pass_request_body) { body = u->request_bufs; u->request_bufs = cl; @@ -2582,6 +2602,7 @@ ngx_http_proxy_create_loc_conf(ngx_conf_ conf->upstream.store_access = NGX_CONF_UNSET_UINT; conf->upstream.next_upstream_tries = NGX_CONF_UNSET_UINT; conf->upstream.buffering = NGX_CONF_UNSET; + conf->upstream.request_buffering = NGX_CONF_UNSET; conf->upstream.ignore_client_abort = NGX_CONF_UNSET; conf->upstream.force_ranges = NGX_CONF_UNSET; @@ -2691,6 +2712,9 @@ ngx_http_proxy_merge_loc_conf(ngx_conf_t ngx_conf_merge_value(conf->upstream.buffering, prev->upstream.buffering, 1); + ngx_conf_merge_value(conf->upstream.request_buffering, + prev->upstream.request_buffering, 1); + ngx_conf_merge_value(conf->upstream.ignore_client_abort, prev->upstream.ignore_client_abort, 0); diff --git a/src/http/ngx_http.h b/src/http/ngx_http.h --- a/src/http/ngx_http.h +++ b/src/http/ngx_http.h @@ -138,6 +138,7 @@ ngx_int_t ngx_http_send_special(ngx_http ngx_int_t ngx_http_read_client_request_body(ngx_http_request_t *r, ngx_http_client_body_handler_pt post_handler); +ngx_int_t ngx_http_read_unbuffered_request_body(ngx_http_request_t *r); ngx_int_t ngx_http_send_header(ngx_http_request_t *r); ngx_int_t ngx_http_special_response_handler(ngx_http_request_t *r, diff --git a/src/http/ngx_http_request.c b/src/http/ngx_http_request.c --- a/src/http/ngx_http_request.c +++ b/src/http/ngx_http_request.c @@ -2525,6 +2525,11 @@ ngx_http_finalize_connection(ngx_http_re return; } + if (r->reading_body) { + r->keepalive = 0; + r->lingering_close = 1; + } + if (!ngx_terminate && !ngx_exiting && r->keepalive diff --git a/src/http/ngx_http_request.h b/src/http/ngx_http_request.h --- a/src/http/ngx_http_request.h +++ b/src/http/ngx_http_request.h @@ -473,6 +473,7 @@ struct ngx_http_request_s { unsigned request_body_in_clean_file:1; unsigned request_body_file_group_access:1; unsigned request_body_file_log_level:3; + unsigned request_body_no_buffering:1; unsigned subrequest_in_memory:1; unsigned waited:1; @@ -509,6 +510,7 @@ struct ngx_http_request_s { unsigned keepalive:1; unsigned lingering_close:1; unsigned discard_body:1; + unsigned reading_body:1; unsigned internal:1; unsigned error_page:1; unsigned filter_finalize:1; diff --git a/src/http/ngx_http_request_body.c b/src/http/ngx_http_request_body.c --- a/src/http/ngx_http_request_body.c +++ b/src/http/ngx_http_request_body.c @@ -42,12 +42,14 @@ ngx_http_read_client_request_body(ngx_ht #if (NGX_HTTP_SPDY) if (r->spdy_stream && r == r->main) { + r->request_body_no_buffering = 0; rc = ngx_http_spdy_read_request_body(r, post_handler); goto done; } #endif if (r != r->main || r->request_body || r->discard_body) { + r->request_body_no_buffering = 0; post_handler(r); return NGX_OK; } @@ -57,6 +59,10 @@ ngx_http_read_client_request_body(ngx_ht goto done; } + if (r->request_body_no_buffering) { + r->request_body_in_file_only = 0; + } + rb = ngx_pcalloc(r->pool, sizeof(ngx_http_request_body_t)); if (rb == NULL) { rc = NGX_HTTP_INTERNAL_SERVER_ERROR; @@ -79,6 +85,7 @@ ngx_http_read_client_request_body(ngx_ht r->request_body = rb; if (r->headers_in.content_length_n < 0 && !r->headers_in.chunked) { + r->request_body_no_buffering = 0; post_handler(r); return NGX_OK; } @@ -171,6 +178,8 @@ ngx_http_read_client_request_body(ngx_ht } } + r->request_body_no_buffering = 0; + post_handler(r); return NGX_OK; @@ -214,6 +223,21 @@ ngx_http_read_client_request_body(ngx_ht done: + if (r->request_body_no_buffering + && (rc == NGX_OK || rc == NGX_AGAIN)) + { + if (rc == NGX_OK) { + r->request_body_no_buffering = 0; + + } else { + /* rc == NGX_AGAIN */ + r->reading_body = 1; + } + + r->read_event_handler = ngx_http_block_reading; + post_handler(r); + } + if (rc >= NGX_HTTP_SPECIAL_RESPONSE) { r->main->count--; } @@ -222,6 +246,26 @@ done: } +ngx_int_t +ngx_http_read_unbuffered_request_body(ngx_http_request_t *r) +{ + ngx_int_t rc; + + if (r->connection->read->timedout) { + r->connection->timedout = 1; + return NGX_HTTP_REQUEST_TIME_OUT; + } + + rc = ngx_http_do_read_client_request_body(r); + + if (rc == NGX_OK) { + r->reading_body = 0; + } + + return rc; +} + + static void ngx_http_read_client_request_body_handler(ngx_http_request_t *r) { @@ -264,18 +308,43 @@ ngx_http_do_read_client_request_body(ngx for ( ;; ) { if (rb->buf->last == rb->buf->end) { - /* pass buffer to request body filter chain */ + if (rb->buf->pos != rb->buf->last) { + + /* pass buffer to request body filter chain */ - out.buf = rb->buf; - out.next = NULL; + out.buf = rb->buf; + out.next = NULL; + + rc = ngx_http_request_body_filter(r, &out); - rc = ngx_http_request_body_filter(r, &out); + if (rc != NGX_OK) { + return rc; + } + + } else { - if (rc != NGX_OK) { - return rc; + /* update chains */ + + rc = ngx_http_request_body_filter(r, NULL); + + if (rc != NGX_OK) { + return rc; + } } if (rb->busy != NULL) { + if (r->request_body_no_buffering) { + if (c->read->timer_set) { + ngx_del_timer(c->read); + } + + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + return NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + return NGX_AGAIN; + } + return NGX_HTTP_INTERNAL_SERVER_ERROR; } @@ -342,6 +411,22 @@ ngx_http_do_read_client_request_body(ngx } if (!c->read->ready) { + + if (r->request_body_no_buffering + && rb->buf->pos != rb->buf->last) + { + /* pass buffer to request body filter chain */ + + out.buf = rb->buf; + out.next = NULL; + + rc = ngx_http_request_body_filter(r, &out); + + if (rc != NGX_OK) { + return rc; + } + } + clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); ngx_add_timer(c->read, clcf->client_body_timeout); @@ -387,9 +472,10 @@ ngx_http_do_read_client_request_body(ngx } } - r->read_event_handler = ngx_http_block_reading; - - rb->post_handler(r); + if (!r->request_body_no_buffering) { + r->read_event_handler = ngx_http_block_reading; + rb->post_handler(r); + } return NGX_OK; } @@ -1085,7 +1171,8 @@ ngx_http_request_body_save_filter(ngx_ht } if (rb->rest > 0 - && rb->buf && rb->buf->last == rb->buf->end) + && rb->buf && rb->buf->last == rb->buf->end + && !r->request_body_no_buffering) { if (ngx_http_write_request_body(r) != NGX_OK) { return NGX_HTTP_INTERNAL_SERVER_ERROR; diff --git a/src/http/ngx_http_upstream.c b/src/http/ngx_http_upstream.c --- a/src/http/ngx_http_upstream.c +++ b/src/http/ngx_http_upstream.c @@ -36,9 +36,12 @@ static void ngx_http_upstream_connect(ng static ngx_int_t ngx_http_upstream_reinit(ngx_http_request_t *r, ngx_http_upstream_t *u); static void ngx_http_upstream_send_request(ngx_http_request_t *r, - ngx_http_upstream_t *u); + ngx_http_upstream_t *u, ngx_uint_t do_write); +static ngx_int_t ngx_http_upstream_send_request_body(ngx_http_request_t *r, + ngx_http_upstream_t *u, ngx_uint_t do_write); static void ngx_http_upstream_send_request_handler(ngx_http_request_t *r, ngx_http_upstream_t *u); +static void ngx_http_upstream_read_request_handler(ngx_http_request_t *r); static void ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u); static ngx_int_t ngx_http_upstream_test_next(ngx_http_request_t *r, @@ -568,8 +571,11 @@ ngx_http_upstream_init_request(ngx_http_ u->output.pool = r->pool; u->output.bufs.num = 1; u->output.bufs.size = clcf->client_body_buffer_size; - u->output.output_filter = ngx_chain_writer; - u->output.filter_ctx = &u->writer; + + if (u->output.output_filter == NULL) { + u->output.output_filter = ngx_chain_writer; + u->output.filter_ctx = &u->writer; + } u->writer.pool = r->pool; @@ -1432,7 +1438,7 @@ ngx_http_upstream_connect(ngx_http_reque #endif - ngx_http_upstream_send_request(r, u); + ngx_http_upstream_send_request(r, u, 1); } @@ -1536,7 +1542,7 @@ ngx_http_upstream_ssl_handshake(ngx_conn c = r->connection; - ngx_http_upstream_send_request(r, u); + ngx_http_upstream_send_request(r, u, 1); ngx_http_run_posted_requests(c); return; @@ -1724,7 +1730,8 @@ ngx_http_upstream_reinit(ngx_http_reques static void -ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u) +ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u, + ngx_uint_t do_write) { ngx_int_t rc; ngx_connection_t *c; @@ -1741,21 +1748,25 @@ ngx_http_upstream_send_request(ngx_http_ c->log->action = "sending request to upstream"; - rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs); - - u->request_sent = 1; + rc = ngx_http_upstream_send_request_body(r, u, do_write); if (rc == NGX_ERROR) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } - if (c->write->timer_set) { - ngx_del_timer(c->write); + if (rc >= NGX_HTTP_SPECIAL_RESPONSE) { + ngx_http_upstream_finalize_request(r, u, rc); + return; } if (rc == NGX_AGAIN) { - ngx_add_timer(c->write, u->conf->send_timeout); + if (!c->write->ready) { + ngx_add_timer(c->write, u->conf->send_timeout); + + } else if (c->write->timer_set) { + ngx_del_timer(c->write); + } if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, @@ -1768,6 +1779,10 @@ ngx_http_upstream_send_request(ngx_http_ /* rc == NGX_OK */ + if (c->write->timer_set) { + ngx_del_timer(c->write); + } + if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) { if (ngx_tcp_push(c->fd) == NGX_ERROR) { ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno, @@ -1797,6 +1812,123 @@ ngx_http_upstream_send_request(ngx_http_ } +static ngx_int_t +ngx_http_upstream_send_request_body(ngx_http_request_t *r, + ngx_http_upstream_t *u, ngx_uint_t do_write) +{ + int tcp_nodelay; + ngx_int_t rc; + ngx_chain_t *out, *cl, *ln; + ngx_connection_t *c; + ngx_http_core_loc_conf_t *clcf; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http upstream send request body"); + + if (!r->request_body_no_buffering) { + + /* buffered request body */ + + if (!u->request_sent) { + u->request_sent = 1; + out = u->request_bufs; + + } else { + out = NULL; + } + + return ngx_output_chain(&u->output, out); + } + + if (!u->request_sent) { + u->request_sent = 1; + out = u->request_bufs; + + if (r->request_body->bufs) { + for (cl = out; cl->next; cl = out->next) { /* void */ } + cl->next = r->request_body->bufs; + r->request_body->bufs = NULL; + } + + c = u->peer.connection; + clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); + + if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay"); + + tcp_nodelay = 1; + + if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, + (const void *) &tcp_nodelay, sizeof(int)) == -1) + { + ngx_connection_error(c, ngx_socket_errno, + "setsockopt(TCP_NODELAY) failed"); + return NGX_ERROR; + } + + c->tcp_nodelay = NGX_TCP_NODELAY_SET; + } + + r->read_event_handler = ngx_http_upstream_read_request_handler; + + } else { + out = NULL; + } + + for ( ;; ) { + + if (do_write) { + rc = ngx_output_chain(&u->output, out); + + if (rc == NGX_ERROR) { + return NGX_ERROR; + } + + while (out) { + ln = out; + out = out->next; + ngx_free_chain(r->pool, ln); + } + + if (rc == NGX_OK && !r->reading_body) { + break; + } + } + + if (r->reading_body) { + /* read client request body */ + + rc = ngx_http_read_unbuffered_request_body(r); + + if (rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + + out = r->request_body->bufs; + r->request_body->bufs = NULL; + } + + /* stop if there is nothing to send */ + + if (out == NULL) { + rc = NGX_AGAIN; + break; + } + + do_write = 1; + } + + if (!r->reading_body) { + if (!u->store && !r->post_action && !u->conf->ignore_client_abort) { + r->read_event_handler = + ngx_http_upstream_rd_check_broken_connection; + } + } + + return rc; +} + + static void ngx_http_upstream_send_request_handler(ngx_http_request_t *r, ngx_http_upstream_t *u) @@ -1830,7 +1962,29 @@ ngx_http_upstream_send_request_handler(n return; } - ngx_http_upstream_send_request(r, u); + ngx_http_upstream_send_request(r, u, 1); +} + + +static void +ngx_http_upstream_read_request_handler(ngx_http_request_t *r) +{ + ngx_connection_t *c; + ngx_http_upstream_t *u; + + c = r->connection; + u = r->upstream; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http upstream read request handler"); + + if (c->read->timedout) { + c->timedout = 1; + ngx_http_upstream_finalize_request(r, u, NGX_HTTP_REQUEST_TIME_OUT); + return; + } + + ngx_http_upstream_send_request(r, u, 0); } @@ -3626,7 +3780,9 @@ ngx_http_upstream_next(ngx_http_request_ "upstream timed out"); } - if (u->peer.cached && ft_type == NGX_HTTP_UPSTREAM_FT_ERROR) { + if (u->peer.cached && ft_type == NGX_HTTP_UPSTREAM_FT_ERROR + && (!u->request_sent || !r->request_body_no_buffering)) + { status = 0; /* TODO: inform balancer instead */ @@ -3674,6 +3830,7 @@ ngx_http_upstream_next(ngx_http_request_ if (u->peer.tries == 0 || !(u->conf->next_upstream & ft_type) + || (u->request_sent && r->request_body_no_buffering) || (timeout && ngx_current_msec - u->peer.start_time >= timeout)) { #if (NGX_HTTP_CACHE) diff --git a/src/http/ngx_http_upstream.h b/src/http/ngx_http_upstream.h --- a/src/http/ngx_http_upstream.h +++ b/src/http/ngx_http_upstream.h @@ -160,6 +160,7 @@ typedef struct { ngx_uint_t store_access; ngx_uint_t next_upstream_tries; ngx_flag_t buffering; + ngx_flag_t request_buffering; ngx_flag_t pass_request_headers; ngx_flag_t pass_request_body; diff --git a/src/http/ngx_http_variables.c b/src/http/ngx_http_variables.c --- a/src/http/ngx_http_variables.c +++ b/src/http/ngx_http_variables.c @@ -1081,6 +1081,10 @@ ngx_http_variable_content_length(ngx_htt v->no_cacheable = 0; v->not_found = 0; + } else if (r->reading_body) { + v->not_found = 1; + v->no_cacheable = 1; + } else if (r->headers_in.content_length_n >= 0) { p = ngx_pnalloc(r->pool, NGX_OFF_T_LEN); if (p == NULL) {