diff src/stream/ngx_stream_proxy_module.c @ 6692:56fc55e32f23

Stream: filters.
author Roman Arutyunyan <arut@nginx.com>
date Thu, 15 Sep 2016 14:55:46 +0300
parents c02290241cbe
children edcd9303a4d3
line wrap: on
line diff
--- a/src/stream/ngx_stream_proxy_module.c
+++ b/src/stream/ngx_stream_proxy_module.c
@@ -84,10 +84,10 @@ static char *ngx_stream_proxy_pass(ngx_c
     void *conf);
 static char *ngx_stream_proxy_bind(ngx_conf_t *cf, ngx_command_t *cmd,
     void *conf);
-static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s);
 
 #if (NGX_STREAM_SSL)
 
+static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s);
 static char *ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf,
     ngx_command_t *cmd, void *conf);
 static void ngx_stream_proxy_ssl_init_connection(ngx_stream_session_t *s);
@@ -385,8 +385,6 @@ ngx_stream_proxy_handler(ngx_stream_sess
     }
 
     u->peer.type = c->type;
-
-    u->proxy_protocol = pscf->proxy_protocol;
     u->start_sec = ngx_time();
 
     c->write->handler = ngx_stream_proxy_downstream_handler;
@@ -411,28 +409,6 @@ ngx_stream_proxy_handler(ngx_stream_sess
         u->downstream_buf.pos = p;
         u->downstream_buf.last = p;
 
-        if (u->proxy_protocol
-#if (NGX_STREAM_SSL)
-            && pscf->ssl == NULL
-#endif
-            && pscf->buffer_size >= NGX_PROXY_PROTOCOL_MAX_HEADER)
-        {
-            /* optimization for a typical case */
-
-            ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
-                           "stream proxy send PROXY protocol header");
-
-            p = ngx_proxy_protocol_write(c, u->downstream_buf.last,
-                                         u->downstream_buf.end);
-            if (p == NULL) {
-                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
-                return;
-            }
-
-            u->downstream_buf.last = p;
-            u->proxy_protocol = 0;
-        }
-
         if (c->read->ready) {
             ngx_post_event(c->read, &ngx_posted_events);
         }
@@ -682,8 +658,13 @@ ngx_stream_proxy_connect(ngx_stream_sess
 
     c->log->action = "connecting to upstream";
 
+    pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
+
     u = s->upstream;
 
+    u->connected = 0;
+    u->proxy_protocol = pscf->proxy_protocol;
+
     if (u->state) {
         u->state->response_time = ngx_current_msec - u->state->response_time;
     }
@@ -740,8 +721,6 @@ ngx_stream_proxy_connect(ngx_stream_sess
     pc->read->handler = ngx_stream_proxy_connect_handler;
     pc->write->handler = ngx_stream_proxy_connect_handler;
 
-    pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
-
     ngx_add_timer(pc->write, pscf->connect_timeout);
 }
 
@@ -751,6 +730,7 @@ ngx_stream_proxy_init_upstream(ngx_strea
 {
     int                           tcp_nodelay;
     u_char                       *p;
+    ngx_chain_t                  *cl;
     ngx_connection_t             *c, *pc;
     ngx_log_handler_pt            handler;
     ngx_stream_upstream_t        *u;
@@ -782,21 +762,26 @@ ngx_stream_proxy_init_upstream(ngx_strea
         pc->tcp_nodelay = NGX_TCP_NODELAY_SET;
     }
 
-    if (u->proxy_protocol) {
-        if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) {
-            return;
-        }
-
-        u->proxy_protocol = 0;
-    }
-
     pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
 
 #if (NGX_STREAM_SSL)
-    if (pc->type == SOCK_STREAM && pscf->ssl && pc->ssl == NULL) {
-        ngx_stream_proxy_ssl_init_connection(s);
-        return;
+
+    if (pc->type == SOCK_STREAM && pscf->ssl) {
+
+        if (u->proxy_protocol) {
+            if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) {
+                return;
+            }
+
+            u->proxy_protocol = 0;
+        }
+
+        if (pc->ssl == NULL) {
+            ngx_stream_proxy_ssl_init_connection(s);
+            return;
+        }
     }
+
 #endif
 
     c = s->connection;
@@ -838,14 +823,66 @@ ngx_stream_proxy_init_upstream(ngx_strea
         u->upstream_buf.last = p;
     }
 
-    if (c->type == SOCK_DGRAM) {
-        s->received = c->buffer->last - c->buffer->pos;
-        u->downstream_buf = *c->buffer;
-
-        if (pscf->responses == 0) {
-            pc->read->ready = 0;
-            pc->read->eof = 1;
+    if (c->buffer && c->buffer->pos < c->buffer->last) {
+        ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
+                       "stream proxy add preread buffer: %uz",
+                       c->buffer->last - c->buffer->pos);
+
+        cl = ngx_chain_get_free_buf(c->pool, &u->free);
+        if (cl == NULL) {
+            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+            return;
+        }
+
+        *cl->buf = *c->buffer;
+
+        cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
+        cl->buf->flush = 1;
+        cl->buf->last_buf = (c->type == SOCK_DGRAM);
+
+        cl->next = u->upstream_out;
+        u->upstream_out = cl;
+    }
+
+    if (u->proxy_protocol) {
+        ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+                       "stream proxy add PROXY protocol header");
+
+        cl = ngx_chain_get_free_buf(c->pool, &u->free);
+        if (cl == NULL) {
+            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+            return;
         }
+
+        p = ngx_pnalloc(c->pool, NGX_PROXY_PROTOCOL_MAX_HEADER);
+        if (p == NULL) {
+            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+            return;
+        }
+
+        cl->buf->pos = p;
+
+        p = ngx_proxy_protocol_write(c, p, p + NGX_PROXY_PROTOCOL_MAX_HEADER);
+        if (p == NULL) {
+            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+            return;
+        }
+
+        cl->buf->last = p;
+        cl->buf->temporary = 1;
+        cl->buf->flush = 0;
+        cl->buf->last_buf = 0;
+        cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
+
+        cl->next = u->upstream_out;
+        u->upstream_out = cl;
+
+        u->proxy_protocol = 0;
+    }
+
+    if (c->type == SOCK_DGRAM && pscf->responses == 0) {
+        pc->read->ready = 0;
+        pc->read->eof = 1;
     }
 
     u->connected = 1;
@@ -861,6 +898,8 @@ ngx_stream_proxy_init_upstream(ngx_strea
 }
 
 
+#if (NGX_STREAM_SSL)
+
 static ngx_int_t
 ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s)
 {
@@ -931,8 +970,6 @@ ngx_stream_proxy_send_proxy_protocol(ngx
 }
 
 
-#if (NGX_STREAM_SSL)
-
 static char *
 ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd,
     void *conf)
@@ -1412,8 +1449,10 @@ ngx_stream_proxy_process(ngx_stream_sess
     size_t                        size, limit_rate;
     ssize_t                       n;
     ngx_buf_t                    *b;
+    ngx_int_t                     rc;
     ngx_uint_t                    flags;
     ngx_msec_t                    delay;
+    ngx_chain_t                  *cl, **ll, **out, **busy;
     ngx_connection_t             *c, *pc, *src, *dst;
     ngx_log_handler_pt            handler;
     ngx_stream_upstream_t        *u;
@@ -1447,6 +1486,8 @@ ngx_stream_proxy_process(ngx_stream_sess
         b = &u->upstream_buf;
         limit_rate = pscf->download_rate;
         received = &u->received;
+        out = &u->downstream_out;
+        busy = &u->downstream_busy;
 
     } else {
         src = c;
@@ -1454,24 +1495,18 @@ ngx_stream_proxy_process(ngx_stream_sess
         b = &u->downstream_buf;
         limit_rate = pscf->upload_rate;
         received = &s->received;
+        out = &u->upstream_out;
+        busy = &u->upstream_busy;
     }
 
     for ( ;; ) {
 
-        if (do_write) {
-
-            size = b->last - b->pos;
-
-            if (size && dst && dst->write->ready) {
-
-                n = dst->send(dst, b->pos, size);
-
-                if (n == NGX_AGAIN && dst->shared) {
-                    /* cannot wait on a shared socket */
-                    n = NGX_ERROR;
-                }
-
-                if (n == NGX_ERROR) {
+        if (do_write && dst) {
+
+            if (*out || *busy || dst->buffered) {
+                rc = ngx_stream_top_filter(s, *out, from_upstream);
+
+                if (rc == NGX_ERROR) {
                     if (c->type == SOCK_DGRAM && !from_upstream) {
                         ngx_stream_proxy_next_upstream(s);
                         return;
@@ -1481,13 +1516,12 @@ ngx_stream_proxy_process(ngx_stream_sess
                     return;
                 }
 
-                if (n > 0) {
-                    b->pos += n;
-
-                    if (b->pos == b->last) {
-                        b->pos = b->start;
-                        b->last = b->start;
-                    }
+                ngx_chain_update_chains(c->pool, &u->free, busy, out,
+                                      (ngx_buf_tag_t) &ngx_stream_proxy_module);
+
+                if (*busy == NULL) {
+                    b->pos = b->start;
+                    b->last = b->start;
                 }
             }
         }
@@ -1514,11 +1548,21 @@ ngx_stream_proxy_process(ngx_stream_sess
 
             n = src->recv(src, b->last, size);
 
-            if (n == NGX_AGAIN || n == 0) {
+            if (n == NGX_AGAIN) {
                 break;
             }
 
-            if (n > 0) {
+            if (n == NGX_ERROR) {
+                if (c->type == SOCK_DGRAM && u->received == 0) {
+                    ngx_stream_proxy_next_upstream(s);
+                    return;
+                }
+
+                src->read->eof = 1;
+                n = 0;
+            }
+
+            if (n >= 0) {
                 if (limit_rate) {
                     delay = (ngx_msec_t) (n * 1000 / limit_rate);
 
@@ -1541,27 +1585,37 @@ ngx_stream_proxy_process(ngx_stream_sess
                     src->read->eof = 1;
                 }
 
+                for (ll = out; *ll; ll = &(*ll)->next) { /* void */ }
+
+                cl = ngx_chain_get_free_buf(c->pool, &u->free);
+                if (cl == NULL) {
+                    ngx_stream_proxy_finalize(s,
+                                              NGX_STREAM_INTERNAL_SERVER_ERROR);
+                    return;
+                }
+
+                *ll = cl;
+
+                cl->buf->pos = b->last;
+                cl->buf->last = b->last + n;
+                cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
+
+                cl->buf->temporary = (n ? 1 : 0);
+                cl->buf->last_buf = src->read->eof;
+                cl->buf->flush = 1;
+
                 *received += n;
                 b->last += n;
                 do_write = 1;
 
                 continue;
             }
-
-            if (n == NGX_ERROR) {
-                if (c->type == SOCK_DGRAM && u->received == 0) {
-                    ngx_stream_proxy_next_upstream(s);
-                    return;
-                }
-
-                src->read->eof = 1;
-            }
         }
 
         break;
     }
 
-    if (src->read->eof && (b->pos == b->last || (dst && dst->read->eof))) {
+    if (src->read->eof && dst && (dst->read->eof || !dst->buffered)) {
         handler = c->log->handler;
         c->log->handler = NULL;
 
@@ -1614,6 +1668,14 @@ ngx_stream_proxy_next_upstream(ngx_strea
                    "stream proxy next upstream");
 
     u = s->upstream;
+    pc = u->peer.connection;
+
+    if (u->upstream_out || u->upstream_busy || (pc && pc->buffered)) {
+        ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
+                      "pending buffers on next upstream");
+        ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+        return;
+    }
 
     if (u->peer.sockaddr) {
         u->peer.free(&u->peer, u->peer.data, NGX_PEER_FAILED);
@@ -1632,8 +1694,6 @@ ngx_stream_proxy_next_upstream(ngx_strea
         return;
     }
 
-    pc = u->peer.connection;
-
     if (pc) {
         ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
                        "close proxy upstream connection: %d", pc->fd);