diff src/stream/ngx_stream_proxy_module.c @ 7286:d27aa9060c95

Stream: udp streams. Previously, only one client packet could be processed in a udp stream session even though multiple response packets were supported. Now multiple packets coming from the same client address and port are delivered to the same stream session. If it's required to maintain a single stream of data, nginx should be configured in a way that all packets from a client are delivered to the same worker. On Linux and DragonFly BSD the "reuseport" parameter should be specified for this. Other systems do not currently provide appropriate mechanisms. For these systems a single stream of udp packets is only guaranteed in single-worker configurations. The proxy_response directive now specifies how many packets are expected in response to a single client packet.
author Roman Arutyunyan <arut@nginx.com>
date Mon, 04 Jun 2018 19:50:00 +0300
parents ec4d95eed062
children 696df3ac27ac
line wrap: on
line diff
--- a/src/stream/ngx_stream_proxy_module.c
+++ b/src/stream/ngx_stream_proxy_module.c
@@ -377,6 +377,8 @@ ngx_stream_proxy_handler(ngx_stream_sess
 
     s->log_handler = ngx_stream_proxy_log_error;
 
+    u->requests = 1;
+
     u->peer.log = c->log;
     u->peer.log_error = NGX_ERROR_ERR;
 
@@ -398,21 +400,19 @@ ngx_stream_proxy_handler(ngx_stream_sess
         return;
     }
 
-    if (c->type == SOCK_STREAM) {
-        p = ngx_pnalloc(c->pool, pscf->buffer_size);
-        if (p == NULL) {
-            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
-            return;
-        }
-
-        u->downstream_buf.start = p;
-        u->downstream_buf.end = p + pscf->buffer_size;
-        u->downstream_buf.pos = p;
-        u->downstream_buf.last = p;
-
-        if (c->read->ready) {
-            ngx_post_event(c->read, &ngx_posted_events);
-        }
+    p = ngx_pnalloc(c->pool, pscf->buffer_size);
+    if (p == NULL) {
+        ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+        return;
+    }
+
+    u->downstream_buf.start = p;
+    u->downstream_buf.end = p + pscf->buffer_size;
+    u->downstream_buf.pos = p;
+    u->downstream_buf.last = p;
+
+    if (c->read->ready) {
+        ngx_post_event(c->read, &ngx_posted_events);
     }
 
     if (pscf->upstream_value) {
@@ -829,7 +829,6 @@ ngx_stream_proxy_init_upstream(ngx_strea
 
         cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
         cl->buf->flush = 1;
-        cl->buf->last_buf = (c->type == SOCK_DGRAM);
 
         cl->next = u->upstream_out;
         u->upstream_out = cl;
@@ -871,17 +870,12 @@ ngx_stream_proxy_init_upstream(ngx_strea
         u->proxy_protocol = 0;
     }
 
-    if (c->type == SOCK_DGRAM && pscf->responses == 0) {
-        pc->read->ready = 0;
-        pc->read->eof = 1;
-    }
-
     u->connected = 1;
 
     pc->read->handler = ngx_stream_proxy_upstream_handler;
     pc->write->handler = ngx_stream_proxy_upstream_handler;
 
-    if (pc->read->ready || pc->read->eof) {
+    if (pc->read->ready) {
         ngx_post_event(pc->read, &ngx_posted_events);
     }
 
@@ -1280,6 +1274,7 @@ static void
 ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream)
 {
     ngx_connection_t             *c, *pc;
+    ngx_log_handler_pt            handler;
     ngx_stream_session_t         *s;
     ngx_stream_upstream_t        *u;
     ngx_stream_proxy_srv_conf_t  *pscf;
@@ -1328,25 +1323,37 @@ ngx_stream_proxy_process_connection(ngx_
                      * with unspecified number of responses
                      */
 
-                    pc->read->ready = 0;
-                    pc->read->eof = 1;
-
-                    ngx_stream_proxy_process(s, 1, 0);
+                    handler = c->log->handler;
+                    c->log->handler = NULL;
+
+                    ngx_log_error(NGX_LOG_INFO, c->log, 0,
+                                  "udp timed out"
+                                  ", packets from/to client:%ui/%ui"
+                                  ", bytes from/to client:%O/%O"
+                                  ", bytes from/to upstream:%O/%O",
+                                  u->requests, u->responses,
+                                  s->received, c->sent, u->received,
+                                  pc ? pc->sent : 0);
+
+                    c->log->handler = handler;
+
+                    ngx_stream_proxy_finalize(s, NGX_STREAM_OK);
                     return;
                 }
 
                 ngx_connection_error(pc, NGX_ETIMEDOUT, "upstream timed out");
 
-                if (u->received == 0) {
-                    ngx_stream_proxy_next_upstream(s);
-                    return;
-                }
-
-            } else {
-                ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out");
+                pc->read->error = 1;
+
+                ngx_stream_proxy_finalize(s, NGX_STREAM_BAD_GATEWAY);
+
+                return;
             }
 
+            ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out");
+
             ngx_stream_proxy_finalize(s, NGX_STREAM_OK);
+
             return;
         }
 
@@ -1453,7 +1460,7 @@ ngx_stream_proxy_process(ngx_stream_sess
     ssize_t                       n;
     ngx_buf_t                    *b;
     ngx_int_t                     rc;
-    ngx_uint_t                    flags;
+    ngx_uint_t                    flags, *packets;
     ngx_msec_t                    delay;
     ngx_chain_t                  *cl, **ll, **out, **busy;
     ngx_connection_t             *c, *pc, *src, *dst;
@@ -1489,6 +1496,7 @@ ngx_stream_proxy_process(ngx_stream_sess
         b = &u->upstream_buf;
         limit_rate = pscf->download_rate;
         received = &u->received;
+        packets = &u->responses;
         out = &u->downstream_out;
         busy = &u->downstream_busy;
         recv_action = "proxying and reading from upstream";
@@ -1500,6 +1508,7 @@ ngx_stream_proxy_process(ngx_stream_sess
         b = &u->downstream_buf;
         limit_rate = pscf->upload_rate;
         received = &s->received;
+        packets = &u->requests;
         out = &u->upstream_out;
         busy = &u->upstream_busy;
         recv_action = "proxying and reading from client";
@@ -1516,11 +1525,6 @@ ngx_stream_proxy_process(ngx_stream_sess
                 rc = ngx_stream_top_filter(s, *out, from_upstream);
 
                 if (rc == NGX_ERROR) {
-                    if (c->type == SOCK_DGRAM && !from_upstream) {
-                        ngx_stream_proxy_next_upstream(s);
-                        return;
-                    }
-
                     ngx_stream_proxy_finalize(s, NGX_STREAM_OK);
                     return;
                 }
@@ -1565,11 +1569,6 @@ ngx_stream_proxy_process(ngx_stream_sess
             }
 
             if (n == NGX_ERROR) {
-                if (c->type == SOCK_DGRAM && u->received == 0) {
-                    ngx_stream_proxy_next_upstream(s);
-                    return;
-                }
-
                 src->read->eof = 1;
                 n = 0;
             }
@@ -1591,12 +1590,6 @@ ngx_stream_proxy_process(ngx_stream_sess
                     }
                 }
 
-                if (c->type == SOCK_DGRAM && ++u->responses == pscf->responses)
-                {
-                    src->read->ready = 0;
-                    src->read->eof = 1;
-                }
-
                 for (ll = out; *ll; ll = &(*ll)->next) { /* void */ }
 
                 cl = ngx_chain_get_free_buf(c->pool, &u->free);
@@ -1616,6 +1609,7 @@ ngx_stream_proxy_process(ngx_stream_sess
                 cl->buf->last_buf = src->read->eof;
                 cl->buf->flush = 1;
 
+                (*packets)++;
                 *received += n;
                 b->last += n;
                 do_write = 1;
@@ -1629,15 +1623,38 @@ ngx_stream_proxy_process(ngx_stream_sess
 
     c->log->action = "proxying connection";
 
-    if (src->read->eof && dst && (dst->read->eof || !dst->buffered)) {
+    if (c->type == SOCK_DGRAM
+        && pscf->responses != NGX_MAX_INT32_VALUE
+        && u->responses >= pscf->responses * u->requests
+        && !src->buffered && dst && !dst->buffered)
+    {
         handler = c->log->handler;
         c->log->handler = NULL;
 
         ngx_log_error(NGX_LOG_INFO, c->log, 0,
-                      "%s%s disconnected"
+                      "udp done"
+                      ", packets from/to client:%ui/%ui"
                       ", bytes from/to client:%O/%O"
                       ", bytes from/to upstream:%O/%O",
-                      src->type == SOCK_DGRAM ? "udp " : "",
+                      u->requests, u->responses,
+                      s->received, c->sent, u->received, pc ? pc->sent : 0);
+
+        c->log->handler = handler;
+
+        ngx_stream_proxy_finalize(s, NGX_STREAM_OK);
+        return;
+    }
+
+    if (c->type == SOCK_STREAM
+        && src->read->eof && dst && (dst->read->eof || !dst->buffered))
+    {
+        handler = c->log->handler;
+        c->log->handler = NULL;
+
+        ngx_log_error(NGX_LOG_INFO, c->log, 0,
+                      "%s disconnected"
+                      ", bytes from/to client:%O/%O"
+                      ", bytes from/to upstream:%O/%O",
                       from_upstream ? "upstream" : "client",
                       s->received, c->sent, u->received, pc ? pc->sent : 0);
 
@@ -1739,6 +1756,7 @@ ngx_stream_proxy_next_upstream(ngx_strea
 static void
 ngx_stream_proxy_finalize(ngx_stream_session_t *s, ngx_uint_t rc)
 {
+    ngx_uint_t              state;
     ngx_connection_t       *pc;
     ngx_stream_upstream_t  *u;
 
@@ -1768,7 +1786,15 @@ ngx_stream_proxy_finalize(ngx_stream_ses
     }
 
     if (u->peer.free && u->peer.sockaddr) {
-        u->peer.free(&u->peer, u->peer.data, 0);
+        state = 0;
+
+        if (pc && pc->type == SOCK_DGRAM
+            && (pc->read->error || pc->write->error))
+        {
+            state = NGX_PEER_FAILED;
+        }
+
+        u->peer.free(&u->peer, u->peer.data, state);
         u->peer.sockaddr = NULL;
     }