diff src/stream/ngx_stream_proxy_module.c @ 6436:8f038068f4bc

Stream: UDP proxy.
author Roman Arutyunyan <arut@nginx.com>
date Wed, 20 Jan 2016 19:52:12 +0300
parents d1c791479bbb
children a01e315b3a78
line wrap: on
line diff
--- a/src/stream/ngx_stream_proxy_module.c
+++ b/src/stream/ngx_stream_proxy_module.c
@@ -17,6 +17,7 @@ typedef struct {
     size_t                           buffer_size;
     size_t                           upload_rate;
     size_t                           download_rate;
+    ngx_uint_t                       responses;
     ngx_uint_t                       next_upstream_tries;
     ngx_flag_t                       next_upstream;
     ngx_flag_t                       proxy_protocol;
@@ -167,6 +168,13 @@ static ngx_command_t  ngx_stream_proxy_c
       offsetof(ngx_stream_proxy_srv_conf_t, download_rate),
       NULL },
 
+    { ngx_string("proxy_responses"),
+      NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+      ngx_conf_set_num_slot,
+      NGX_STREAM_SRV_CONF_OFFSET,
+      offsetof(ngx_stream_proxy_srv_conf_t, responses),
+      NULL },
+
     { ngx_string("proxy_next_upstream"),
       NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG,
       ngx_conf_set_flag_slot,
@@ -351,6 +359,7 @@ ngx_stream_proxy_handler(ngx_stream_sess
     u->peer.log_error = NGX_ERROR_ERR;
 
     u->peer.local = pscf->local;
+    u->peer.type = c->type;
 
     uscf = pscf->upstream;
 
@@ -370,6 +379,14 @@ ngx_stream_proxy_handler(ngx_stream_sess
     u->proxy_protocol = pscf->proxy_protocol;
     u->start_sec = ngx_time();
 
+    c->write->handler = ngx_stream_proxy_downstream_handler;
+    c->read->handler = ngx_stream_proxy_downstream_handler;
+
+    if (c->type == SOCK_DGRAM) {
+        ngx_stream_proxy_connect(s);
+        return;
+    }
+
     p = ngx_pnalloc(c->pool, pscf->buffer_size);
     if (p == NULL) {
         ngx_stream_proxy_finalize(s, NGX_ERROR);
@@ -381,9 +398,6 @@ ngx_stream_proxy_handler(ngx_stream_sess
     u->downstream_buf.pos = p;
     u->downstream_buf.last = p;
 
-    c->write->handler = ngx_stream_proxy_downstream_handler;
-    c->read->handler = ngx_stream_proxy_downstream_handler;
-
     if (u->proxy_protocol
 #if (NGX_STREAM_SSL)
         && pscf->ssl == NULL
@@ -488,7 +502,10 @@ ngx_stream_proxy_init_upstream(ngx_strea
 
     cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);
 
-    if (cscf->tcp_nodelay && pc->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
+    if (pc->type == SOCK_STREAM
+        && cscf->tcp_nodelay
+        && pc->tcp_nodelay == NGX_TCP_NODELAY_UNSET)
+    {
         ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, "tcp_nodelay");
 
         tcp_nodelay = 1;
@@ -516,7 +533,7 @@ ngx_stream_proxy_init_upstream(ngx_strea
     pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
 
 #if (NGX_STREAM_SSL)
-    if (pscf->ssl && pc->ssl == NULL) {
+    if (pc->type == SOCK_STREAM && pscf->ssl && pc->ssl == NULL) {
         ngx_stream_proxy_ssl_init_connection(s);
         return;
     }
@@ -544,23 +561,35 @@ ngx_stream_proxy_init_upstream(ngx_strea
 
     c->log->action = "proxying connection";
 
-    p = ngx_pnalloc(c->pool, pscf->buffer_size);
-    if (p == NULL) {
-        ngx_stream_proxy_finalize(s, NGX_ERROR);
-        return;
+    if (u->upstream_buf.start == NULL) {
+        p = ngx_pnalloc(c->pool, pscf->buffer_size);
+        if (p == NULL) {
+            ngx_stream_proxy_finalize(s, NGX_ERROR);
+            return;
+        }
+
+        u->upstream_buf.start = p;
+        u->upstream_buf.end = p + pscf->buffer_size;
+        u->upstream_buf.pos = p;
+        u->upstream_buf.last = p;
     }
 
-    u->upstream_buf.start = p;
-    u->upstream_buf.end = p + pscf->buffer_size;
-    u->upstream_buf.pos = p;
-    u->upstream_buf.last = p;
+    if (c->type == SOCK_DGRAM) {
+        s->received = c->buffer->last - c->buffer->pos;
+        u->downstream_buf = *c->buffer;
+
+        if (pscf->responses == 0) {
+            pc->read->ready = 0;
+            pc->read->eof = 1;
+        }
+    }
 
     u->connected = 1;
 
     pc->read->handler = ngx_stream_proxy_upstream_handler;
     pc->write->handler = ngx_stream_proxy_upstream_handler;
 
-    if (pc->read->ready) {
+    if (pc->read->ready || pc->read->eof) {
         ngx_post_event(pc->read, &ngx_posted_events);
     }
 
@@ -894,11 +923,15 @@ ngx_stream_proxy_process_connection(ngx_
     s = c->data;
     u = s->upstream;
 
+    c = s->connection;
+    pc = u->peer.connection;
+
+    pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
+
     if (ev->timedout) {
+        ev->timedout = 0;
 
         if (ev->delayed) {
-
-            ev->timedout = 0;
             ev->delayed = 0;
 
             if (!ev->ready) {
@@ -907,20 +940,35 @@ ngx_stream_proxy_process_connection(ngx_
                     return;
                 }
 
-                if (u->connected) {
-                    pc = u->peer.connection;
-
-                    if (!c->read->delayed && !pc->read->delayed) {
-                        pscf = ngx_stream_get_module_srv_conf(s,
-                                                       ngx_stream_proxy_module);
-                        ngx_add_timer(c->write, pscf->timeout);
-                    }
+                if (u->connected && !c->read->delayed && !pc->read->delayed) {
+                    ngx_add_timer(c->write, pscf->timeout);
                 }
 
                 return;
             }
 
         } else {
+            if (s->connection->type == SOCK_DGRAM) {
+                if (pscf->responses == NGX_MAX_INT32_VALUE) {
+
+                    /*
+                     * successfully terminate timed out UDP session
+                     * with unspecified number of responses
+                     */
+
+                    pc->read->ready = 0;
+                    pc->read->eof = 1;
+
+                    ngx_stream_proxy_process(s, 1, 0);
+                    return;
+                }
+
+                if (u->received == 0) {
+                    ngx_stream_proxy_next_upstream(s);
+                    return;
+                }
+            }
+
             ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out");
             ngx_stream_proxy_finalize(s, NGX_DECLINED);
             return;
@@ -1039,6 +1087,21 @@ ngx_stream_proxy_process(ngx_stream_sess
     c = s->connection;
     pc = u->connected ? u->peer.connection : NULL;
 
+    if (c->type == SOCK_DGRAM && (ngx_terminate || ngx_exiting)) {
+
+        /* socket is already closed on worker shutdown */
+
+        handler = c->log->handler;
+        c->log->handler = NULL;
+
+        ngx_log_error(NGX_LOG_INFO, c->log, 0, "disconnected on shutdown");
+
+        c->log->handler = handler;
+
+        ngx_stream_proxy_finalize(s, NGX_OK);
+        return;
+    }
+
     pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
 
     if (from_upstream) {
@@ -1066,7 +1129,17 @@ ngx_stream_proxy_process(ngx_stream_sess
 
                 n = dst->send(dst, b->pos, size);
 
+                if (n == NGX_AGAIN && dst->shared) {
+                    /* cannot wait on a shared socket */
+                    n = NGX_ERROR;
+                }
+
                 if (n == NGX_ERROR) {
+                    if (c->type == SOCK_DGRAM && !from_upstream) {
+                        ngx_stream_proxy_next_upstream(s);
+                        return;
+                    }
+
                     ngx_stream_proxy_finalize(s, NGX_DECLINED);
                     return;
                 }
@@ -1118,6 +1191,12 @@ ngx_stream_proxy_process(ngx_stream_sess
                     }
                 }
 
+                if (c->type == SOCK_DGRAM && ++u->responses == pscf->responses)
+                {
+                    src->read->ready = 0;
+                    src->read->eof = 1;
+                }
+
                 *received += n;
                 b->last += n;
                 do_write = 1;
@@ -1126,6 +1205,11 @@ ngx_stream_proxy_process(ngx_stream_sess
             }
 
             if (n == NGX_ERROR) {
+                if (c->type == SOCK_DGRAM && u->received == 0) {
+                    ngx_stream_proxy_next_upstream(s);
+                    return;
+                }
+
                 src->read->eof = 1;
             }
         }
@@ -1152,13 +1236,13 @@ ngx_stream_proxy_process(ngx_stream_sess
 
     flags = src->read->eof ? NGX_CLOSE_EVENT : 0;
 
-    if (ngx_handle_read_event(src->read, flags) != NGX_OK) {
+    if (!src->shared && ngx_handle_read_event(src->read, flags) != NGX_OK) {
         ngx_stream_proxy_finalize(s, NGX_ERROR);
         return;
     }
 
     if (dst) {
-        if (ngx_handle_write_event(dst->write, 0) != NGX_OK) {
+        if (!dst->shared && ngx_handle_write_event(dst->write, 0) != NGX_OK) {
             ngx_stream_proxy_finalize(s, NGX_ERROR);
             return;
         }
@@ -1331,6 +1415,7 @@ ngx_stream_proxy_create_srv_conf(ngx_con
     conf->buffer_size = NGX_CONF_UNSET_SIZE;
     conf->upload_rate = NGX_CONF_UNSET_SIZE;
     conf->download_rate = NGX_CONF_UNSET_SIZE;
+    conf->responses = NGX_CONF_UNSET_UINT;
     conf->next_upstream_tries = NGX_CONF_UNSET_UINT;
     conf->next_upstream = NGX_CONF_UNSET;
     conf->proxy_protocol = NGX_CONF_UNSET;
@@ -1373,6 +1458,9 @@ ngx_stream_proxy_merge_srv_conf(ngx_conf
     ngx_conf_merge_size_value(conf->download_rate,
                               prev->download_rate, 0);
 
+    ngx_conf_merge_uint_value(conf->responses,
+                              prev->responses, NGX_MAX_INT32_VALUE);
+
     ngx_conf_merge_uint_value(conf->next_upstream_tries,
                               prev->next_upstream_tries, 0);