Mercurial > hg > nginx
diff src/stream/ngx_stream_proxy_module.c @ 6436:8f038068f4bc
Stream: UDP proxy.
author | Roman Arutyunyan <arut@nginx.com> |
---|---|
date | Wed, 20 Jan 2016 19:52:12 +0300 |
parents | d1c791479bbb |
children | a01e315b3a78 |
line wrap: on
line diff
--- a/src/stream/ngx_stream_proxy_module.c +++ b/src/stream/ngx_stream_proxy_module.c @@ -17,6 +17,7 @@ typedef struct { size_t buffer_size; size_t upload_rate; size_t download_rate; + ngx_uint_t responses; ngx_uint_t next_upstream_tries; ngx_flag_t next_upstream; ngx_flag_t proxy_protocol; @@ -167,6 +168,13 @@ static ngx_command_t ngx_stream_proxy_c offsetof(ngx_stream_proxy_srv_conf_t, download_rate), NULL }, + { ngx_string("proxy_responses"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_num_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, responses), + NULL }, + { ngx_string("proxy_next_upstream"), NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG, ngx_conf_set_flag_slot, @@ -351,6 +359,7 @@ ngx_stream_proxy_handler(ngx_stream_sess u->peer.log_error = NGX_ERROR_ERR; u->peer.local = pscf->local; + u->peer.type = c->type; uscf = pscf->upstream; @@ -370,6 +379,14 @@ ngx_stream_proxy_handler(ngx_stream_sess u->proxy_protocol = pscf->proxy_protocol; u->start_sec = ngx_time(); + c->write->handler = ngx_stream_proxy_downstream_handler; + c->read->handler = ngx_stream_proxy_downstream_handler; + + if (c->type == SOCK_DGRAM) { + ngx_stream_proxy_connect(s); + return; + } + p = ngx_pnalloc(c->pool, pscf->buffer_size); if (p == NULL) { ngx_stream_proxy_finalize(s, NGX_ERROR); @@ -381,9 +398,6 @@ ngx_stream_proxy_handler(ngx_stream_sess u->downstream_buf.pos = p; u->downstream_buf.last = p; - c->write->handler = ngx_stream_proxy_downstream_handler; - c->read->handler = ngx_stream_proxy_downstream_handler; - if (u->proxy_protocol #if (NGX_STREAM_SSL) && pscf->ssl == NULL @@ -488,7 +502,10 @@ ngx_stream_proxy_init_upstream(ngx_strea cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module); - if (cscf->tcp_nodelay && pc->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { + if (pc->type == SOCK_STREAM + && cscf->tcp_nodelay + && pc->tcp_nodelay == NGX_TCP_NODELAY_UNSET) + { ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, "tcp_nodelay"); tcp_nodelay = 1; @@ -516,7 +533,7 @@ ngx_stream_proxy_init_upstream(ngx_strea pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); #if (NGX_STREAM_SSL) - if (pscf->ssl && pc->ssl == NULL) { + if (pc->type == SOCK_STREAM && pscf->ssl && pc->ssl == NULL) { ngx_stream_proxy_ssl_init_connection(s); return; } @@ -544,23 +561,35 @@ ngx_stream_proxy_init_upstream(ngx_strea c->log->action = "proxying connection"; - p = ngx_pnalloc(c->pool, pscf->buffer_size); - if (p == NULL) { - ngx_stream_proxy_finalize(s, NGX_ERROR); - return; + if (u->upstream_buf.start == NULL) { + p = ngx_pnalloc(c->pool, pscf->buffer_size); + if (p == NULL) { + ngx_stream_proxy_finalize(s, NGX_ERROR); + return; + } + + u->upstream_buf.start = p; + u->upstream_buf.end = p + pscf->buffer_size; + u->upstream_buf.pos = p; + u->upstream_buf.last = p; } - u->upstream_buf.start = p; - u->upstream_buf.end = p + pscf->buffer_size; - u->upstream_buf.pos = p; - u->upstream_buf.last = p; + if (c->type == SOCK_DGRAM) { + s->received = c->buffer->last - c->buffer->pos; + u->downstream_buf = *c->buffer; + + if (pscf->responses == 0) { + pc->read->ready = 0; + pc->read->eof = 1; + } + } u->connected = 1; pc->read->handler = ngx_stream_proxy_upstream_handler; pc->write->handler = ngx_stream_proxy_upstream_handler; - if (pc->read->ready) { + if (pc->read->ready || pc->read->eof) { ngx_post_event(pc->read, &ngx_posted_events); } @@ -894,11 +923,15 @@ ngx_stream_proxy_process_connection(ngx_ s = c->data; u = s->upstream; + c = s->connection; + pc = u->peer.connection; + + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); + if (ev->timedout) { + ev->timedout = 0; if (ev->delayed) { - - ev->timedout = 0; ev->delayed = 0; if (!ev->ready) { @@ -907,20 +940,35 @@ ngx_stream_proxy_process_connection(ngx_ return; } - if (u->connected) { - pc = u->peer.connection; - - if (!c->read->delayed && !pc->read->delayed) { - pscf = ngx_stream_get_module_srv_conf(s, - ngx_stream_proxy_module); - ngx_add_timer(c->write, pscf->timeout); - } + if (u->connected && !c->read->delayed && !pc->read->delayed) { + ngx_add_timer(c->write, pscf->timeout); } return; } } else { + if (s->connection->type == SOCK_DGRAM) { + if (pscf->responses == NGX_MAX_INT32_VALUE) { + + /* + * successfully terminate timed out UDP session + * with unspecified number of responses + */ + + pc->read->ready = 0; + pc->read->eof = 1; + + ngx_stream_proxy_process(s, 1, 0); + return; + } + + if (u->received == 0) { + ngx_stream_proxy_next_upstream(s); + return; + } + } + ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out"); ngx_stream_proxy_finalize(s, NGX_DECLINED); return; @@ -1039,6 +1087,21 @@ ngx_stream_proxy_process(ngx_stream_sess c = s->connection; pc = u->connected ? u->peer.connection : NULL; + if (c->type == SOCK_DGRAM && (ngx_terminate || ngx_exiting)) { + + /* socket is already closed on worker shutdown */ + + handler = c->log->handler; + c->log->handler = NULL; + + ngx_log_error(NGX_LOG_INFO, c->log, 0, "disconnected on shutdown"); + + c->log->handler = handler; + + ngx_stream_proxy_finalize(s, NGX_OK); + return; + } + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); if (from_upstream) { @@ -1066,7 +1129,17 @@ ngx_stream_proxy_process(ngx_stream_sess n = dst->send(dst, b->pos, size); + if (n == NGX_AGAIN && dst->shared) { + /* cannot wait on a shared socket */ + n = NGX_ERROR; + } + if (n == NGX_ERROR) { + if (c->type == SOCK_DGRAM && !from_upstream) { + ngx_stream_proxy_next_upstream(s); + return; + } + ngx_stream_proxy_finalize(s, NGX_DECLINED); return; } @@ -1118,6 +1191,12 @@ ngx_stream_proxy_process(ngx_stream_sess } } + if (c->type == SOCK_DGRAM && ++u->responses == pscf->responses) + { + src->read->ready = 0; + src->read->eof = 1; + } + *received += n; b->last += n; do_write = 1; @@ -1126,6 +1205,11 @@ ngx_stream_proxy_process(ngx_stream_sess } if (n == NGX_ERROR) { + if (c->type == SOCK_DGRAM && u->received == 0) { + ngx_stream_proxy_next_upstream(s); + return; + } + src->read->eof = 1; } } @@ -1152,13 +1236,13 @@ ngx_stream_proxy_process(ngx_stream_sess flags = src->read->eof ? NGX_CLOSE_EVENT : 0; - if (ngx_handle_read_event(src->read, flags) != NGX_OK) { + if (!src->shared && ngx_handle_read_event(src->read, flags) != NGX_OK) { ngx_stream_proxy_finalize(s, NGX_ERROR); return; } if (dst) { - if (ngx_handle_write_event(dst->write, 0) != NGX_OK) { + if (!dst->shared && ngx_handle_write_event(dst->write, 0) != NGX_OK) { ngx_stream_proxy_finalize(s, NGX_ERROR); return; } @@ -1331,6 +1415,7 @@ ngx_stream_proxy_create_srv_conf(ngx_con conf->buffer_size = NGX_CONF_UNSET_SIZE; conf->upload_rate = NGX_CONF_UNSET_SIZE; conf->download_rate = NGX_CONF_UNSET_SIZE; + conf->responses = NGX_CONF_UNSET_UINT; conf->next_upstream_tries = NGX_CONF_UNSET_UINT; conf->next_upstream = NGX_CONF_UNSET; conf->proxy_protocol = NGX_CONF_UNSET; @@ -1373,6 +1458,9 @@ ngx_stream_proxy_merge_srv_conf(ngx_conf ngx_conf_merge_size_value(conf->download_rate, prev->download_rate, 0); + ngx_conf_merge_uint_value(conf->responses, + prev->responses, NGX_MAX_INT32_VALUE); + ngx_conf_merge_uint_value(conf->next_upstream_tries, prev->next_upstream_tries, 0);