changeset 6050:a08fad30aeac

Request body: unbuffered reading. The r->request_body_no_buffering flag was introduced. It instructs client request body reading code to avoid reading the whole body, and to call post_handler early instead. The caller should use the ngx_http_read_unbuffered_request_body() function to read remaining parts of the body. Upstream module is now able to use this mode, if configured with the proxy_request_buffering directive.
author Maxim Dounin <mdounin@mdounin.ru>
date Mon, 23 Mar 2015 21:09:19 +0300
parents 42d9beeb22db
children d97e6be2d292
files src/http/modules/ngx_http_proxy_module.c src/http/ngx_http.h src/http/ngx_http_request.c src/http/ngx_http_request.h src/http/ngx_http_request_body.c src/http/ngx_http_upstream.c src/http/ngx_http_upstream.h src/http/ngx_http_variables.c
diffstat 8 files changed, 306 insertions(+), 25 deletions(-) [+]
line wrap: on
line diff
--- a/src/http/modules/ngx_http_proxy_module.c
+++ b/src/http/modules/ngx_http_proxy_module.c
@@ -292,6 +292,13 @@ static ngx_command_t  ngx_http_proxy_com
       offsetof(ngx_http_proxy_loc_conf_t, upstream.buffering),
       NULL },
 
+    { ngx_string("proxy_request_buffering"),
+      NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG,
+      ngx_conf_set_flag_slot,
+      NGX_HTTP_LOC_CONF_OFFSET,
+      offsetof(ngx_http_proxy_loc_conf_t, upstream.request_buffering),
+      NULL },
+
     { ngx_string("proxy_ignore_client_abort"),
       NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG,
       ngx_conf_set_flag_slot,
@@ -876,6 +883,15 @@ ngx_http_proxy_handler(ngx_http_request_
 
     u->accel = 1;
 
+    if (!plcf->upstream.request_buffering
+        && plcf->body_values == NULL && plcf->upstream.pass_request_body
+        && !r->headers_in.chunked)
+    {
+        /* TODO: support chunked when using HTTP/1.1 */
+
+        r->request_body_no_buffering = 1;
+    }
+
     rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);
 
     if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
@@ -1393,7 +1409,11 @@ ngx_http_proxy_create_request(ngx_http_r
                    "http proxy header:%N\"%*s\"",
                    (size_t) (b->last - b->pos), b->pos);
 
-    if (plcf->body_values == NULL && plcf->upstream.pass_request_body) {
+    if (r->request_body_no_buffering) {
+
+        u->request_bufs = cl;
+
+    } else if (plcf->body_values == NULL && plcf->upstream.pass_request_body) {
 
         body = u->request_bufs;
         u->request_bufs = cl;
@@ -2582,6 +2602,7 @@ ngx_http_proxy_create_loc_conf(ngx_conf_
     conf->upstream.store_access = NGX_CONF_UNSET_UINT;
     conf->upstream.next_upstream_tries = NGX_CONF_UNSET_UINT;
     conf->upstream.buffering = NGX_CONF_UNSET;
+    conf->upstream.request_buffering = NGX_CONF_UNSET;
     conf->upstream.ignore_client_abort = NGX_CONF_UNSET;
     conf->upstream.force_ranges = NGX_CONF_UNSET;
 
@@ -2691,6 +2712,9 @@ ngx_http_proxy_merge_loc_conf(ngx_conf_t
     ngx_conf_merge_value(conf->upstream.buffering,
                               prev->upstream.buffering, 1);
 
+    ngx_conf_merge_value(conf->upstream.request_buffering,
+                              prev->upstream.request_buffering, 1);
+
     ngx_conf_merge_value(conf->upstream.ignore_client_abort,
                               prev->upstream.ignore_client_abort, 0);
 
--- a/src/http/ngx_http.h
+++ b/src/http/ngx_http.h
@@ -138,6 +138,7 @@ ngx_int_t ngx_http_send_special(ngx_http
 
 ngx_int_t ngx_http_read_client_request_body(ngx_http_request_t *r,
     ngx_http_client_body_handler_pt post_handler);
+ngx_int_t ngx_http_read_unbuffered_request_body(ngx_http_request_t *r);
 
 ngx_int_t ngx_http_send_header(ngx_http_request_t *r);
 ngx_int_t ngx_http_special_response_handler(ngx_http_request_t *r,
--- a/src/http/ngx_http_request.c
+++ b/src/http/ngx_http_request.c
@@ -2525,6 +2525,11 @@ ngx_http_finalize_connection(ngx_http_re
         return;
     }
 
+    if (r->reading_body) {
+        r->keepalive = 0;
+        r->lingering_close = 1;
+    }
+
     if (!ngx_terminate
          && !ngx_exiting
          && r->keepalive
--- a/src/http/ngx_http_request.h
+++ b/src/http/ngx_http_request.h
@@ -473,6 +473,7 @@ struct ngx_http_request_s {
     unsigned                          request_body_in_clean_file:1;
     unsigned                          request_body_file_group_access:1;
     unsigned                          request_body_file_log_level:3;
+    unsigned                          request_body_no_buffering:1;
 
     unsigned                          subrequest_in_memory:1;
     unsigned                          waited:1;
@@ -509,6 +510,7 @@ struct ngx_http_request_s {
     unsigned                          keepalive:1;
     unsigned                          lingering_close:1;
     unsigned                          discard_body:1;
+    unsigned                          reading_body:1;
     unsigned                          internal:1;
     unsigned                          error_page:1;
     unsigned                          filter_finalize:1;
--- a/src/http/ngx_http_request_body.c
+++ b/src/http/ngx_http_request_body.c
@@ -42,12 +42,14 @@ ngx_http_read_client_request_body(ngx_ht
 
 #if (NGX_HTTP_SPDY)
     if (r->spdy_stream && r == r->main) {
+        r->request_body_no_buffering = 0;
         rc = ngx_http_spdy_read_request_body(r, post_handler);
         goto done;
     }
 #endif
 
     if (r != r->main || r->request_body || r->discard_body) {
+        r->request_body_no_buffering = 0;
         post_handler(r);
         return NGX_OK;
     }
@@ -57,6 +59,10 @@ ngx_http_read_client_request_body(ngx_ht
         goto done;
     }
 
+    if (r->request_body_no_buffering) {
+        r->request_body_in_file_only = 0;
+    }
+
     rb = ngx_pcalloc(r->pool, sizeof(ngx_http_request_body_t));
     if (rb == NULL) {
         rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
@@ -79,6 +85,7 @@ ngx_http_read_client_request_body(ngx_ht
     r->request_body = rb;
 
     if (r->headers_in.content_length_n < 0 && !r->headers_in.chunked) {
+        r->request_body_no_buffering = 0;
         post_handler(r);
         return NGX_OK;
     }
@@ -171,6 +178,8 @@ ngx_http_read_client_request_body(ngx_ht
             }
         }
 
+        r->request_body_no_buffering = 0;
+
         post_handler(r);
 
         return NGX_OK;
@@ -214,6 +223,21 @@ ngx_http_read_client_request_body(ngx_ht
 
 done:
 
+    if (r->request_body_no_buffering
+        && (rc == NGX_OK || rc == NGX_AGAIN))
+    {
+        if (rc == NGX_OK) {
+            r->request_body_no_buffering = 0;
+
+        } else {
+            /* rc == NGX_AGAIN */
+            r->reading_body = 1;
+        }
+
+        r->read_event_handler = ngx_http_block_reading;
+        post_handler(r);
+    }
+
     if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
         r->main->count--;
     }
@@ -222,6 +246,26 @@ done:
 }
 
 
+ngx_int_t
+ngx_http_read_unbuffered_request_body(ngx_http_request_t *r)
+{
+    ngx_int_t  rc;
+
+    if (r->connection->read->timedout) {
+        r->connection->timedout = 1;
+        return NGX_HTTP_REQUEST_TIME_OUT;
+    }
+
+    rc = ngx_http_do_read_client_request_body(r);
+
+    if (rc == NGX_OK) {
+        r->reading_body = 0;
+    }
+
+    return rc;
+}
+
+
 static void
 ngx_http_read_client_request_body_handler(ngx_http_request_t *r)
 {
@@ -264,18 +308,43 @@ ngx_http_do_read_client_request_body(ngx
         for ( ;; ) {
             if (rb->buf->last == rb->buf->end) {
 
-                /* pass buffer to request body filter chain */
+                if (rb->buf->pos != rb->buf->last) {
+
+                    /* pass buffer to request body filter chain */
 
-                out.buf = rb->buf;
-                out.next = NULL;
+                    out.buf = rb->buf;
+                    out.next = NULL;
+
+                    rc = ngx_http_request_body_filter(r, &out);
 
-                rc = ngx_http_request_body_filter(r, &out);
+                    if (rc != NGX_OK) {
+                        return rc;
+                    }
+
+                } else {
 
-                if (rc != NGX_OK) {
-                    return rc;
+                    /* update chains */
+
+                    rc = ngx_http_request_body_filter(r, NULL);
+
+                    if (rc != NGX_OK) {
+                        return rc;
+                    }
                 }
 
                 if (rb->busy != NULL) {
+                    if (r->request_body_no_buffering) {
+                        if (c->read->timer_set) {
+                            ngx_del_timer(c->read);
+                        }
+
+                        if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
+                            return NGX_HTTP_INTERNAL_SERVER_ERROR;
+                        }
+
+                        return NGX_AGAIN;
+                    }
+
                     return NGX_HTTP_INTERNAL_SERVER_ERROR;
                 }
 
@@ -342,6 +411,22 @@ ngx_http_do_read_client_request_body(ngx
         }
 
         if (!c->read->ready) {
+
+            if (r->request_body_no_buffering
+                && rb->buf->pos != rb->buf->last)
+            {
+                /* pass buffer to request body filter chain */
+
+                out.buf = rb->buf;
+                out.next = NULL;
+
+                rc = ngx_http_request_body_filter(r, &out);
+
+                if (rc != NGX_OK) {
+                    return rc;
+                }
+            }
+
             clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
             ngx_add_timer(c->read, clcf->client_body_timeout);
 
@@ -387,9 +472,10 @@ ngx_http_do_read_client_request_body(ngx
         }
     }
 
-    r->read_event_handler = ngx_http_block_reading;
-
-    rb->post_handler(r);
+    if (!r->request_body_no_buffering) {
+        r->read_event_handler = ngx_http_block_reading;
+        rb->post_handler(r);
+    }
 
     return NGX_OK;
 }
@@ -1085,7 +1171,8 @@ ngx_http_request_body_save_filter(ngx_ht
     }
 
     if (rb->rest > 0
-        && rb->buf && rb->buf->last == rb->buf->end)
+        && rb->buf && rb->buf->last == rb->buf->end
+        && !r->request_body_no_buffering)
     {
         if (ngx_http_write_request_body(r) != NGX_OK) {
             return NGX_HTTP_INTERNAL_SERVER_ERROR;
--- a/src/http/ngx_http_upstream.c
+++ b/src/http/ngx_http_upstream.c
@@ -36,9 +36,12 @@ static void ngx_http_upstream_connect(ng
 static ngx_int_t ngx_http_upstream_reinit(ngx_http_request_t *r,
     ngx_http_upstream_t *u);
 static void ngx_http_upstream_send_request(ngx_http_request_t *r,
-    ngx_http_upstream_t *u);
+    ngx_http_upstream_t *u, ngx_uint_t do_write);
+static ngx_int_t ngx_http_upstream_send_request_body(ngx_http_request_t *r,
+    ngx_http_upstream_t *u, ngx_uint_t do_write);
 static void ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
     ngx_http_upstream_t *u);
+static void ngx_http_upstream_read_request_handler(ngx_http_request_t *r);
 static void ngx_http_upstream_process_header(ngx_http_request_t *r,
     ngx_http_upstream_t *u);
 static ngx_int_t ngx_http_upstream_test_next(ngx_http_request_t *r,
@@ -568,8 +571,11 @@ ngx_http_upstream_init_request(ngx_http_
     u->output.pool = r->pool;
     u->output.bufs.num = 1;
     u->output.bufs.size = clcf->client_body_buffer_size;
-    u->output.output_filter = ngx_chain_writer;
-    u->output.filter_ctx = &u->writer;
+
+    if (u->output.output_filter == NULL) {
+        u->output.output_filter = ngx_chain_writer;
+        u->output.filter_ctx = &u->writer;
+    }
 
     u->writer.pool = r->pool;
 
@@ -1432,7 +1438,7 @@ ngx_http_upstream_connect(ngx_http_reque
 
 #endif
 
-    ngx_http_upstream_send_request(r, u);
+    ngx_http_upstream_send_request(r, u, 1);
 }
 
 
@@ -1536,7 +1542,7 @@ ngx_http_upstream_ssl_handshake(ngx_conn
 
         c = r->connection;
 
-        ngx_http_upstream_send_request(r, u);
+        ngx_http_upstream_send_request(r, u, 1);
 
         ngx_http_run_posted_requests(c);
         return;
@@ -1724,7 +1730,8 @@ ngx_http_upstream_reinit(ngx_http_reques
 
 
 static void
-ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
+ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u,
+    ngx_uint_t do_write)
 {
     ngx_int_t          rc;
     ngx_connection_t  *c;
@@ -1741,21 +1748,25 @@ ngx_http_upstream_send_request(ngx_http_
 
     c->log->action = "sending request to upstream";
 
-    rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);
-
-    u->request_sent = 1;
+    rc = ngx_http_upstream_send_request_body(r, u, do_write);
 
     if (rc == NGX_ERROR) {
         ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
         return;
     }
 
-    if (c->write->timer_set) {
-        ngx_del_timer(c->write);
+    if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
+        ngx_http_upstream_finalize_request(r, u, rc);
+        return;
     }
 
     if (rc == NGX_AGAIN) {
-        ngx_add_timer(c->write, u->conf->send_timeout);
+        if (!c->write->ready) {
+            ngx_add_timer(c->write, u->conf->send_timeout);
+
+        } else if (c->write->timer_set) {
+            ngx_del_timer(c->write);
+        }
 
         if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
             ngx_http_upstream_finalize_request(r, u,
@@ -1768,6 +1779,10 @@ ngx_http_upstream_send_request(ngx_http_
 
     /* rc == NGX_OK */
 
+    if (c->write->timer_set) {
+        ngx_del_timer(c->write);
+    }
+
     if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
         if (ngx_tcp_push(c->fd) == NGX_ERROR) {
             ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
@@ -1797,6 +1812,123 @@ ngx_http_upstream_send_request(ngx_http_
 }
 
 
+static ngx_int_t
+ngx_http_upstream_send_request_body(ngx_http_request_t *r,
+    ngx_http_upstream_t *u, ngx_uint_t do_write)
+{
+    int                        tcp_nodelay;
+    ngx_int_t                  rc;
+    ngx_chain_t               *out, *cl, *ln;
+    ngx_connection_t          *c;
+    ngx_http_core_loc_conf_t  *clcf;
+
+    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+                   "http upstream send request body");
+
+    if (!r->request_body_no_buffering) {
+
+       /* buffered request body */
+
+       if (!u->request_sent) {
+           u->request_sent = 1;
+           out = u->request_bufs;
+
+       } else {
+           out = NULL;
+       }
+
+       return ngx_output_chain(&u->output, out);
+    }
+
+    if (!u->request_sent) {
+        u->request_sent = 1;
+        out = u->request_bufs;
+
+        if (r->request_body->bufs) {
+            for (cl = out; cl->next; cl = out->next) { /* void */ }
+            cl->next = r->request_body->bufs;
+            r->request_body->bufs = NULL;
+        }
+
+        c = u->peer.connection;
+        clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
+
+        if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
+            ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay");
+
+            tcp_nodelay = 1;
+
+            if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
+                           (const void *) &tcp_nodelay, sizeof(int)) == -1)
+            {
+                ngx_connection_error(c, ngx_socket_errno,
+                                     "setsockopt(TCP_NODELAY) failed");
+                return NGX_ERROR;
+            }
+
+            c->tcp_nodelay = NGX_TCP_NODELAY_SET;
+        }
+
+        r->read_event_handler = ngx_http_upstream_read_request_handler;
+
+    } else {
+        out = NULL;
+    }
+
+    for ( ;; ) {
+
+        if (do_write) {
+            rc = ngx_output_chain(&u->output, out);
+
+            if (rc == NGX_ERROR) {
+                return NGX_ERROR;
+            }
+
+            while (out) {
+                ln = out;
+                out = out->next;
+                ngx_free_chain(r->pool, ln);
+            }
+
+            if (rc == NGX_OK && !r->reading_body) {
+                break;
+            }
+        }
+
+        if (r->reading_body) {
+            /* read client request body */
+
+            rc = ngx_http_read_unbuffered_request_body(r);
+
+            if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
+                return rc;
+            }
+
+            out = r->request_body->bufs;
+            r->request_body->bufs = NULL;
+        }
+
+        /* stop if there is nothing to send */
+
+        if (out == NULL) {
+            rc = NGX_AGAIN;
+            break;
+        }
+
+        do_write = 1;
+    }
+
+    if (!r->reading_body) {
+        if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
+            r->read_event_handler =
+                                  ngx_http_upstream_rd_check_broken_connection;
+        }
+    }
+
+    return rc;
+}
+
+
 static void
 ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
     ngx_http_upstream_t *u)
@@ -1830,7 +1962,29 @@ ngx_http_upstream_send_request_handler(n
         return;
     }
 
-    ngx_http_upstream_send_request(r, u);
+    ngx_http_upstream_send_request(r, u, 1);
+}
+
+
+static void
+ngx_http_upstream_read_request_handler(ngx_http_request_t *r)
+{
+    ngx_connection_t     *c;
+    ngx_http_upstream_t  *u;
+
+    c = r->connection;
+    u = r->upstream;
+
+    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+                   "http upstream read request handler");
+
+    if (c->read->timedout) {
+        c->timedout = 1;
+        ngx_http_upstream_finalize_request(r, u, NGX_HTTP_REQUEST_TIME_OUT);
+        return;
+    }
+
+    ngx_http_upstream_send_request(r, u, 0);
 }
 
 
@@ -3626,7 +3780,9 @@ ngx_http_upstream_next(ngx_http_request_
                       "upstream timed out");
     }
 
-    if (u->peer.cached && ft_type == NGX_HTTP_UPSTREAM_FT_ERROR) {
+    if (u->peer.cached && ft_type == NGX_HTTP_UPSTREAM_FT_ERROR
+        && (!u->request_sent || !r->request_body_no_buffering))
+    {
         status = 0;
 
         /* TODO: inform balancer instead */
@@ -3674,6 +3830,7 @@ ngx_http_upstream_next(ngx_http_request_
 
         if (u->peer.tries == 0
             || !(u->conf->next_upstream & ft_type)
+            || (u->request_sent && r->request_body_no_buffering)
             || (timeout && ngx_current_msec - u->peer.start_time >= timeout))
         {
 #if (NGX_HTTP_CACHE)
--- a/src/http/ngx_http_upstream.h
+++ b/src/http/ngx_http_upstream.h
@@ -160,6 +160,7 @@ typedef struct {
     ngx_uint_t                       store_access;
     ngx_uint_t                       next_upstream_tries;
     ngx_flag_t                       buffering;
+    ngx_flag_t                       request_buffering;
     ngx_flag_t                       pass_request_headers;
     ngx_flag_t                       pass_request_body;
 
--- a/src/http/ngx_http_variables.c
+++ b/src/http/ngx_http_variables.c
@@ -1081,6 +1081,10 @@ ngx_http_variable_content_length(ngx_htt
         v->no_cacheable = 0;
         v->not_found = 0;
 
+    } else if (r->reading_body) {
+        v->not_found = 1;
+        v->no_cacheable = 1;
+
     } else if (r->headers_in.content_length_n >= 0) {
         p = ngx_pnalloc(r->pool, NGX_OFF_T_LEN);
         if (p == NULL) {