# HG changeset patch # User Roman Arutyunyan # Date 1453308732 -10800 # Node ID 8f038068f4bc0379ff1df0695e07f9c3ccf9c20e # Parent d1c791479bbbad13afec5201069b5e113cafb95e Stream: UDP proxy. diff --git a/auto/options b/auto/options --- a/auto/options +++ b/auto/options @@ -479,8 +479,8 @@ cat << END --without-mail_imap_module disable ngx_mail_imap_module --without-mail_smtp_module disable ngx_mail_smtp_module - --with-stream enable TCP proxy module - --with-stream=dynamic enable dynamic TCP proxy module + --with-stream enable TCP/UDP proxy module + --with-stream=dynamic enable dynamic TCP/UDP proxy module --with-stream_ssl_module enable ngx_stream_ssl_module --without-stream_limit_conn_module disable ngx_stream_limit_conn_module --without-stream_access_module disable ngx_stream_access_module diff --git a/auto/sources b/auto/sources --- a/auto/sources +++ b/auto/sources @@ -165,6 +165,7 @@ UNIX_SRCS="$CORE_SRCS $EVENT_SRCS \ src/os/unix/ngx_udp_recv.c \ src/os/unix/ngx_send.c \ src/os/unix/ngx_writev_chain.c \ + src/os/unix/ngx_udp_send.c \ src/os/unix/ngx_channel.c \ src/os/unix/ngx_shmem.c \ src/os/unix/ngx_process.c \ diff --git a/auto/unix b/auto/unix --- a/auto/unix +++ b/auto/unix @@ -329,6 +329,45 @@ ngx_feature_test="setsockopt(0, SOL_SOCK . auto/feature +# BSD way to get IPv4 datagram destination address + +ngx_feature="IP_RECVDSTADDR" +ngx_feature_name="NGX_HAVE_IP_RECVDSTADDR" +ngx_feature_run=no +ngx_feature_incs="#include + #include " +ngx_feature_path= +ngx_feature_libs= +ngx_feature_test="setsockopt(0, IPPROTO_IP, IP_RECVDSTADDR, NULL, 0)" +. auto/feature + + +# Linux way to get IPv4 datagram destination address + +ngx_feature="IP_PKTINFO" +ngx_feature_name="NGX_HAVE_IP_PKTINFO" +ngx_feature_run=no +ngx_feature_incs="#include + #include " +ngx_feature_path= +ngx_feature_libs= +ngx_feature_test="setsockopt(0, IPPROTO_IP, IP_PKTINFO, NULL, 0)" +. auto/feature + + +# RFC 3542 way to get IPv6 datagram destination address + +ngx_feature="IPV6_RECVPKTINFO" +ngx_feature_name="NGX_HAVE_IPV6_RECVPKTINFO" +ngx_feature_run=no +ngx_feature_incs="#include + #include " +ngx_feature_path= +ngx_feature_libs= +ngx_feature_test="setsockopt(0, IPPROTO_IPV6, IPV6_RECVPKTINFO, NULL, 0)" +. auto/feature + + ngx_feature="TCP_DEFER_ACCEPT" ngx_feature_name="NGX_HAVE_DEFERRED_ACCEPT" ngx_feature_run=no diff --git a/src/core/ngx_connection.c b/src/core/ngx_connection.c --- a/src/core/ngx_connection.c +++ b/src/core/ngx_connection.c @@ -566,6 +566,11 @@ ngx_open_listening_sockets(ngx_cycle_t * } #endif + if (ls[i].type != SOCK_STREAM) { + ls[i].fd = s; + continue; + } + if (listen(s, ls[i].backlog) == -1) { err = ngx_socket_errno; @@ -865,6 +870,67 @@ ngx_configure_listening_sockets(ngx_cycl #endif #endif /* NGX_HAVE_DEFERRED_ACCEPT */ + +#if (NGX_HAVE_IP_RECVDSTADDR) + + if (ls[i].wildcard + && ls[i].type == SOCK_DGRAM + && ls[i].sockaddr->sa_family == AF_INET) + { + value = 1; + + if (setsockopt(ls[i].fd, IPPROTO_IP, IP_RECVDSTADDR, + (const void *) &value, sizeof(int)) + == -1) + { + ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_socket_errno, + "setsockopt(IP_RECVDSTADDR) " + "for %V failed, ignored", + &ls[i].addr_text); + } + } + +#elif (NGX_HAVE_IP_PKTINFO) + + if (ls[i].wildcard + && ls[i].type == SOCK_DGRAM + && ls[i].sockaddr->sa_family == AF_INET) + { + value = 1; + + if (setsockopt(ls[i].fd, IPPROTO_IP, IP_PKTINFO, + (const void *) &value, sizeof(int)) + == -1) + { + ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_socket_errno, + "setsockopt(IP_PKTINFO) " + "for %V failed, ignored", + &ls[i].addr_text); + } + } + +#endif + +#if (NGX_HAVE_INET6 && NGX_HAVE_IPV6_RECVPKTINFO) + + if (ls[i].wildcard + && ls[i].type == SOCK_DGRAM + && ls[i].sockaddr->sa_family == AF_INET6) + { + value = 1; + + if (setsockopt(ls[i].fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, + (const void *) &value, sizeof(int)) + == -1) + { + ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_socket_errno, + "setsockopt(IPV6_RECVPKTINFO) " + "for %V failed, ignored", + &ls[i].addr_text); + } + } + +#endif } return; @@ -978,7 +1044,7 @@ ngx_get_connection(ngx_socket_t s, ngx_l ngx_cycle->free_connections = c->data; ngx_cycle->free_connection_n--; - if (ngx_cycle->files) { + if (ngx_cycle->files && ngx_cycle->files[s] == NULL) { ngx_cycle->files[s] = c; } @@ -1019,7 +1085,7 @@ ngx_free_connection(ngx_connection_t *c) ngx_cycle->free_connections = c; ngx_cycle->free_connection_n++; - if (ngx_cycle->files) { + if (ngx_cycle->files && ngx_cycle->files[c->fd] == c) { ngx_cycle->files[c->fd] = NULL; } } @@ -1045,16 +1111,18 @@ ngx_close_connection(ngx_connection_t *c ngx_del_timer(c->write); } - if (ngx_del_conn) { - ngx_del_conn(c, NGX_CLOSE_EVENT); + if (!c->shared) { + if (ngx_del_conn) { + ngx_del_conn(c, NGX_CLOSE_EVENT); - } else { - if (c->read->active || c->read->disabled) { - ngx_del_event(c->read, NGX_READ_EVENT, NGX_CLOSE_EVENT); - } + } else { + if (c->read->active || c->read->disabled) { + ngx_del_event(c->read, NGX_READ_EVENT, NGX_CLOSE_EVENT); + } - if (c->write->active || c->write->disabled) { - ngx_del_event(c->write, NGX_WRITE_EVENT, NGX_CLOSE_EVENT); + if (c->write->active || c->write->disabled) { + ngx_del_event(c->write, NGX_WRITE_EVENT, NGX_CLOSE_EVENT); + } } } @@ -1078,6 +1146,10 @@ ngx_close_connection(ngx_connection_t *c fd = c->fd; c->fd = (ngx_socket_t) -1; + if (c->shared) { + return; + } + if (ngx_close_socket(fd) == -1) { err = ngx_socket_errno; diff --git a/src/core/ngx_connection.h b/src/core/ngx_connection.h --- a/src/core/ngx_connection.h +++ b/src/core/ngx_connection.h @@ -64,6 +64,7 @@ struct ngx_listening_s { unsigned nonblocking:1; unsigned shared:1; /* shared between threads or processes */ unsigned addr_ntop:1; + unsigned wildcard:1; #if (NGX_HAVE_INET6 && defined IPV6_V6ONLY) unsigned ipv6only:1; @@ -141,6 +142,8 @@ struct ngx_connection_s { ngx_pool_t *pool; + int type; + struct sockaddr *sockaddr; socklen_t socklen; ngx_str_t addr_text; @@ -174,6 +177,7 @@ struct ngx_connection_s { unsigned idle:1; unsigned reusable:1; unsigned close:1; + unsigned shared:1; unsigned sendfile:1; unsigned sndlowat:1; diff --git a/src/event/ngx_event.c b/src/event/ngx_event.c --- a/src/event/ngx_event.c +++ b/src/event/ngx_event.c @@ -746,6 +746,7 @@ ngx_event_process_init(ngx_cycle_t *cycl return NGX_ERROR; } + c->type = ls[i].type; c->log = &ls[i].log; c->listening = &ls[i]; @@ -818,7 +819,8 @@ ngx_event_process_init(ngx_cycle_t *cycl #else - rev->handler = ngx_event_accept; + rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept + : ngx_event_recvmsg; if (ngx_use_accept_mutex #if (NGX_HAVE_REUSEPORT) diff --git a/src/event/ngx_event.h b/src/event/ngx_event.h --- a/src/event/ngx_event.h +++ b/src/event/ngx_event.h @@ -418,6 +418,7 @@ extern ngx_os_io_t ngx_io; #define ngx_udp_recv ngx_io.udp_recv #define ngx_send ngx_io.send #define ngx_send_chain ngx_io.send_chain +#define ngx_udp_send ngx_io.udp_send #define NGX_EVENT_MODULE 0x544E5645 /* "EVNT" */ @@ -491,6 +492,9 @@ extern ngx_module_t ngx_event_ void ngx_event_accept(ngx_event_t *ev); +#if !(NGX_WIN32) +void ngx_event_recvmsg(ngx_event_t *ev); +#endif ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle); u_char *ngx_accept_log_error(ngx_log_t *log, u_char *buf, size_t len); diff --git a/src/event/ngx_event_accept.c b/src/event/ngx_event_accept.c --- a/src/event/ngx_event_accept.c +++ b/src/event/ngx_event_accept.c @@ -13,6 +13,10 @@ static ngx_int_t ngx_enable_accept_events(ngx_cycle_t *cycle); static ngx_int_t ngx_disable_accept_events(ngx_cycle_t *cycle, ngx_uint_t all); static void ngx_close_accepted_connection(ngx_connection_t *c); +#if (NGX_DEBUG) +static void ngx_debug_accepted_connection(ngx_event_conf_t *ecf, + ngx_connection_t *c); +#endif void @@ -149,6 +153,8 @@ ngx_event_accept(ngx_event_t *ev) return; } + c->type = SOCK_STREAM; + #if (NGX_STAT_STUB) (void) ngx_atomic_fetch_add(ngx_stat_active, 1); #endif @@ -276,60 +282,10 @@ ngx_event_accept(ngx_event_t *ev) #if (NGX_DEBUG) { - - ngx_str_t addr; - struct sockaddr_in *sin; - ngx_cidr_t *cidr; - ngx_uint_t i; - u_char text[NGX_SOCKADDR_STRLEN]; -#if (NGX_HAVE_INET6) - struct sockaddr_in6 *sin6; - ngx_uint_t n; -#endif - - cidr = ecf->debug_connection.elts; - for (i = 0; i < ecf->debug_connection.nelts; i++) { - if (cidr[i].family != (ngx_uint_t) c->sockaddr->sa_family) { - goto next; - } - - switch (cidr[i].family) { + ngx_str_t addr; + u_char text[NGX_SOCKADDR_STRLEN]; -#if (NGX_HAVE_INET6) - case AF_INET6: - sin6 = (struct sockaddr_in6 *) c->sockaddr; - for (n = 0; n < 16; n++) { - if ((sin6->sin6_addr.s6_addr[n] - & cidr[i].u.in6.mask.s6_addr[n]) - != cidr[i].u.in6.addr.s6_addr[n]) - { - goto next; - } - } - break; -#endif - -#if (NGX_HAVE_UNIX_DOMAIN) - case AF_UNIX: - break; -#endif - - default: /* AF_INET */ - sin = (struct sockaddr_in *) c->sockaddr; - if ((sin->sin_addr.s_addr & cidr[i].u.in.mask) - != cidr[i].u.in.addr) - { - goto next; - } - break; - } - - log->log_level = NGX_LOG_DEBUG_CONNECTION|NGX_LOG_DEBUG_ALL; - break; - - next: - continue; - } + ngx_debug_accepted_connection(ecf, c); if (log->log_level & NGX_LOG_DEBUG_EVENT) { addr.data = text; @@ -363,6 +319,324 @@ ngx_event_accept(ngx_event_t *ev) } +#if !(NGX_WIN32) + +void +ngx_event_recvmsg(ngx_event_t *ev) +{ + ssize_t n; + ngx_log_t *log; + ngx_err_t err; + ngx_event_t *rev, *wev; + struct iovec iov[1]; + struct msghdr msg; + ngx_listening_t *ls; + ngx_event_conf_t *ecf; + ngx_connection_t *c, *lc; + u_char sa[NGX_SOCKADDRLEN]; + static u_char buffer[65535]; + +#if (NGX_HAVE_MSGHDR_MSG_CONTROL) + +#if (NGX_HAVE_IP_RECVDSTADDR) + u_char msg_control[CMSG_SPACE(sizeof(struct in_addr))]; +#elif (NGX_HAVE_IP_PKTINFO) + u_char msg_control[CMSG_SPACE(sizeof(struct in_pktinfo))]; +#endif + +#if (NGX_HAVE_INET6 && NGX_HAVE_IPV6_RECVPKTINFO) + u_char msg_control6[CMSG_SPACE(sizeof(struct in6_pktinfo))]; +#endif + +#endif + + if (ev->timedout) { + if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) { + return; + } + + ev->timedout = 0; + } + + ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module); + + if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) { + ev->available = ecf->multi_accept; + } + + lc = ev->data; + ls = lc->listening; + ev->ready = 0; + + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0, + "recvmsg on %V, ready: %d", &ls->addr_text, ev->available); + + do { + ngx_memzero(&msg, sizeof(struct msghdr)); + + iov[0].iov_base = (void *) buffer; + iov[0].iov_len = sizeof(buffer); + + msg.msg_name = &sa; + msg.msg_namelen = sizeof(sa); + msg.msg_iov = iov; + msg.msg_iovlen = 1; + +#if (NGX_HAVE_MSGHDR_MSG_CONTROL) + + if (ls->wildcard) { + +#if (NGX_HAVE_IP_RECVDSTADDR || NGX_HAVE_IP_PKTINFO) + if (ls->sockaddr->sa_family == AF_INET) { + msg.msg_control = &msg_control; + msg.msg_controllen = sizeof(msg_control); + } +#endif + +#if (NGX_HAVE_INET6 && NGX_HAVE_IPV6_RECVPKTINFO) + if (ls->sockaddr->sa_family == AF_INET6) { + msg.msg_control = &msg_control6; + msg.msg_controllen = sizeof(msg_control6); + } +#endif + } + +#endif + + n = recvmsg(lc->fd, &msg, 0); + + if (n == -1) { + err = ngx_socket_errno; + + if (err == NGX_EAGAIN) { + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err, + "recvmsg() not ready"); + return; + } + + ngx_log_error(NGX_LOG_ALERT, ev->log, err, "recvmsg() failed"); + + return; + } + +#if (NGX_STAT_STUB) + (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1); +#endif + +#if (NGX_HAVE_MSGHDR_MSG_CONTROL) + if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC)) { + ngx_log_error(NGX_LOG_ALERT, ev->log, 0, + "recvmsg() truncated data"); + continue; + } +#endif + + ngx_accept_disabled = ngx_cycle->connection_n / 8 + - ngx_cycle->free_connection_n; + + c = ngx_get_connection(lc->fd, ev->log); + if (c == NULL) { + return; + } + + c->shared = 1; + c->type = SOCK_DGRAM; + c->socklen = msg.msg_namelen; + +#if (NGX_STAT_STUB) + (void) ngx_atomic_fetch_add(ngx_stat_active, 1); +#endif + + c->pool = ngx_create_pool(ls->pool_size, ev->log); + if (c->pool == NULL) { + ngx_close_accepted_connection(c); + return; + } + + c->sockaddr = ngx_palloc(c->pool, c->socklen); + if (c->sockaddr == NULL) { + ngx_close_accepted_connection(c); + return; + } + + ngx_memcpy(c->sockaddr, msg.msg_name, c->socklen); + + log = ngx_palloc(c->pool, sizeof(ngx_log_t)); + if (log == NULL) { + ngx_close_accepted_connection(c); + return; + } + + *log = ls->log; + + c->send = ngx_udp_send; + + c->log = log; + c->pool->log = log; + + c->listening = ls; + c->local_sockaddr = ls->sockaddr; + c->local_socklen = ls->socklen; + +#if (NGX_HAVE_MSGHDR_MSG_CONTROL) + + if (ls->wildcard) { + struct cmsghdr *cmsg; + struct sockaddr *sockaddr; + + sockaddr = ngx_palloc(c->pool, c->local_socklen); + if (sockaddr == NULL) { + ngx_close_accepted_connection(c); + return; + } + + ngx_memcpy(sockaddr, c->local_sockaddr, c->local_socklen); + c->local_sockaddr = sockaddr; + + for (cmsg = CMSG_FIRSTHDR(&msg); + cmsg != NULL; + cmsg = CMSG_NXTHDR(&msg, cmsg)) + { + +#if (NGX_HAVE_IP_RECVDSTADDR) + + if (cmsg->cmsg_level == IPPROTO_IP + && cmsg->cmsg_type == IP_RECVDSTADDR + && sockaddr->sa_family == AF_INET) + { + struct in_addr *addr; + struct sockaddr_in *sin; + + addr = (struct in_addr *) CMSG_DATA(cmsg); + sin = (struct sockaddr_in *) sockaddr; + sin->sin_addr = *addr; + + break; + } + +#elif (NGX_HAVE_IP_PKTINFO) + + if (cmsg->cmsg_level == IPPROTO_IP + && cmsg->cmsg_type == IP_PKTINFO + && sockaddr->sa_family == AF_INET) + { + struct in_pktinfo *pkt; + struct sockaddr_in *sin; + + pkt = (struct in_pktinfo *) CMSG_DATA(cmsg); + sin = (struct sockaddr_in *) sockaddr; + sin->sin_addr = pkt->ipi_addr; + + break; + } + +#endif + +#if (NGX_HAVE_INET6 && NGX_HAVE_IPV6_RECVPKTINFO) + + if (cmsg->cmsg_level == IPPROTO_IPV6 + && cmsg->cmsg_type == IPV6_PKTINFO + && sockaddr->sa_family == AF_INET6) + { + struct in6_pktinfo *pkt6; + struct sockaddr_in6 *sin6; + + pkt6 = (struct in6_pktinfo *) CMSG_DATA(cmsg); + sin6 = (struct sockaddr_in6 *) sockaddr; + sin6->sin6_addr = pkt6->ipi6_addr; + + break; + } + +#endif + + } + } + +#endif + + c->buffer = ngx_create_temp_buf(c->pool, n); + if (c->buffer == NULL) { + ngx_close_accepted_connection(c); + return; + } + + c->buffer->last = ngx_cpymem(c->buffer->last, buffer, n); + + rev = c->read; + wev = c->write; + + wev->ready = 1; + + rev->log = log; + wev->log = log; + + /* + * TODO: MT: - ngx_atomic_fetch_add() + * or protection by critical section or light mutex + * + * TODO: MP: - allocated in a shared memory + * - ngx_atomic_fetch_add() + * or protection by critical section or light mutex + */ + + c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); + +#if (NGX_STAT_STUB) + (void) ngx_atomic_fetch_add(ngx_stat_handled, 1); +#endif + + if (ls->addr_ntop) { + c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len); + if (c->addr_text.data == NULL) { + ngx_close_accepted_connection(c); + return; + } + + c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen, + c->addr_text.data, + ls->addr_text_max_len, 0); + if (c->addr_text.len == 0) { + ngx_close_accepted_connection(c); + return; + } + } + +#if (NGX_DEBUG) + { + ngx_str_t addr; + u_char text[NGX_SOCKADDR_STRLEN]; + + ngx_debug_accepted_connection(ecf, c); + + if (log->log_level & NGX_LOG_DEBUG_EVENT) { + addr.data = text; + addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text, + NGX_SOCKADDR_STRLEN, 1); + + ngx_log_debug4(NGX_LOG_DEBUG_EVENT, log, 0, + "*%uA recvmsg: %V fd:%d n:%z", + c->number, &addr, c->fd, n); + } + + } +#endif + + log->data = NULL; + log->handler = NULL; + + ls->handler(c); + + if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { + ev->available -= n; + } + + } while (ev->available); +} + +#endif + + ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle) { @@ -476,7 +750,7 @@ ngx_close_accepted_connection(ngx_connec fd = c->fd; c->fd = (ngx_socket_t) -1; - if (ngx_close_socket(fd) == -1) { + if (!c->shared && ngx_close_socket(fd) == -1) { ngx_log_error(NGX_LOG_ALERT, c->log, ngx_socket_errno, ngx_close_socket_n " failed"); } @@ -497,3 +771,64 @@ ngx_accept_log_error(ngx_log_t *log, u_c return ngx_snprintf(buf, len, " while accepting new connection on %V", log->data); } + + +#if (NGX_DEBUG) + +static void +ngx_debug_accepted_connection(ngx_event_conf_t *ecf, ngx_connection_t *c) +{ + struct sockaddr_in *sin; + ngx_cidr_t *cidr; + ngx_uint_t i; +#if (NGX_HAVE_INET6) + struct sockaddr_in6 *sin6; + ngx_uint_t n; +#endif + + cidr = ecf->debug_connection.elts; + for (i = 0; i < ecf->debug_connection.nelts; i++) { + if (cidr[i].family != (ngx_uint_t) c->sockaddr->sa_family) { + goto next; + } + + switch (cidr[i].family) { + +#if (NGX_HAVE_INET6) + case AF_INET6: + sin6 = (struct sockaddr_in6 *) c->sockaddr; + for (n = 0; n < 16; n++) { + if ((sin6->sin6_addr.s6_addr[n] + & cidr[i].u.in6.mask.s6_addr[n]) + != cidr[i].u.in6.addr.s6_addr[n]) + { + goto next; + } + } + break; +#endif + +#if (NGX_HAVE_UNIX_DOMAIN) + case AF_UNIX: + break; +#endif + + default: /* AF_INET */ + sin = (struct sockaddr_in *) c->sockaddr; + if ((sin->sin_addr.s_addr & cidr[i].u.in.mask) + != cidr[i].u.in.addr) + { + goto next; + } + break; + } + + c->log->log_level = NGX_LOG_DEBUG_CONNECTION|NGX_LOG_DEBUG_ALL; + break; + + next: + continue; + } +} + +#endif diff --git a/src/event/ngx_event_connect.c b/src/event/ngx_event_connect.c --- a/src/event/ngx_event_connect.c +++ b/src/event/ngx_event_connect.c @@ -14,7 +14,7 @@ ngx_int_t ngx_event_connect_peer(ngx_peer_connection_t *pc) { - int rc; + int rc, type; ngx_int_t event; ngx_err_t err; ngx_uint_t level; @@ -27,9 +27,12 @@ ngx_event_connect_peer(ngx_peer_connecti return rc; } - s = ngx_socket(pc->sockaddr->sa_family, SOCK_STREAM, 0); + type = (pc->type ? pc->type : SOCK_STREAM); - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, "socket %d", s); + s = ngx_socket(pc->sockaddr->sa_family, type, 0); + + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, "%s socket %d", + (type == SOCK_STREAM) ? "stream" : "dgram", s); if (s == (ngx_socket_t) -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, @@ -49,6 +52,8 @@ ngx_event_connect_peer(ngx_peer_connecti return NGX_ERROR; } + c->type = type; + if (pc->rcvbuf) { if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const void *) &pc->rcvbuf, sizeof(int)) == -1) @@ -75,25 +80,31 @@ ngx_event_connect_peer(ngx_peer_connecti } } - c->recv = ngx_recv; - c->send = ngx_send; - c->recv_chain = ngx_recv_chain; - c->send_chain = ngx_send_chain; + if (type == SOCK_STREAM) { + c->recv = ngx_recv; + c->send = ngx_send; + c->recv_chain = ngx_recv_chain; + c->send_chain = ngx_send_chain; + + c->sendfile = 1; - c->sendfile = 1; + if (pc->sockaddr->sa_family == AF_UNIX) { + c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED; + c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED; + +#if (NGX_SOLARIS) + /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */ + c->sendfile = 0; +#endif + } + + } else { /* type == SOCK_DGRAM */ + c->recv = ngx_udp_recv; + c->send = ngx_send; + } c->log_error = pc->log_error; - if (pc->sockaddr->sa_family == AF_UNIX) { - c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED; - c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED; - -#if (NGX_SOLARIS) - /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */ - c->sendfile = 0; -#endif - } - rev = c->read; wev = c->write; diff --git a/src/event/ngx_event_connect.h b/src/event/ngx_event_connect.h --- a/src/event/ngx_event_connect.h +++ b/src/event/ngx_event_connect.h @@ -55,6 +55,7 @@ struct ngx_peer_connection_s { ngx_addr_t *local; + int type; int rcvbuf; ngx_log_t *log; diff --git a/src/os/unix/ngx_darwin_init.c b/src/os/unix/ngx_darwin_init.c --- a/src/os/unix/ngx_darwin_init.c +++ b/src/os/unix/ngx_darwin_init.c @@ -23,6 +23,7 @@ static ngx_os_io_t ngx_darwin_io = { ngx_readv_chain, ngx_udp_unix_recv, ngx_unix_send, + ngx_udp_unix_send, #if (NGX_HAVE_SENDFILE) ngx_darwin_sendfile_chain, NGX_IO_SENDFILE diff --git a/src/os/unix/ngx_freebsd_init.c b/src/os/unix/ngx_freebsd_init.c --- a/src/os/unix/ngx_freebsd_init.c +++ b/src/os/unix/ngx_freebsd_init.c @@ -32,6 +32,7 @@ static ngx_os_io_t ngx_freebsd_io = { ngx_readv_chain, ngx_udp_unix_recv, ngx_unix_send, + ngx_udp_unix_send, #if (NGX_HAVE_SENDFILE) ngx_freebsd_sendfile_chain, NGX_IO_SENDFILE diff --git a/src/os/unix/ngx_linux_init.c b/src/os/unix/ngx_linux_init.c --- a/src/os/unix/ngx_linux_init.c +++ b/src/os/unix/ngx_linux_init.c @@ -18,6 +18,7 @@ static ngx_os_io_t ngx_linux_io = { ngx_readv_chain, ngx_udp_unix_recv, ngx_unix_send, + ngx_udp_unix_send, #if (NGX_HAVE_SENDFILE) ngx_linux_sendfile_chain, NGX_IO_SENDFILE diff --git a/src/os/unix/ngx_os.h b/src/os/unix/ngx_os.h --- a/src/os/unix/ngx_os.h +++ b/src/os/unix/ngx_os.h @@ -28,6 +28,7 @@ typedef struct { ngx_recv_chain_pt recv_chain; ngx_recv_pt udp_recv; ngx_send_pt send; + ngx_send_pt udp_send; ngx_send_chain_pt send_chain; ngx_uint_t flags; } ngx_os_io_t; @@ -47,6 +48,7 @@ ssize_t ngx_udp_unix_recv(ngx_connection ssize_t ngx_unix_send(ngx_connection_t *c, u_char *buf, size_t size); ngx_chain_t *ngx_writev_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit); +ssize_t ngx_udp_unix_send(ngx_connection_t *c, u_char *buf, size_t size); #if (IOV_MAX > 64) diff --git a/src/os/unix/ngx_posix_init.c b/src/os/unix/ngx_posix_init.c --- a/src/os/unix/ngx_posix_init.c +++ b/src/os/unix/ngx_posix_init.c @@ -24,6 +24,7 @@ ngx_os_io_t ngx_os_io = { ngx_readv_chain, ngx_udp_unix_recv, ngx_unix_send, + ngx_udp_unix_send, ngx_writev_chain, 0 }; diff --git a/src/os/unix/ngx_solaris_init.c b/src/os/unix/ngx_solaris_init.c --- a/src/os/unix/ngx_solaris_init.c +++ b/src/os/unix/ngx_solaris_init.c @@ -19,6 +19,7 @@ static ngx_os_io_t ngx_solaris_io = { ngx_readv_chain, ngx_udp_unix_recv, ngx_unix_send, + ngx_udp_unix_send, #if (NGX_HAVE_SENDFILE) ngx_solaris_sendfilev_chain, NGX_IO_SENDFILE diff --git a/src/os/unix/ngx_udp_send.c b/src/os/unix/ngx_udp_send.c new file mode 100644 --- /dev/null +++ b/src/os/unix/ngx_udp_send.c @@ -0,0 +1,56 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Nginx, Inc. + */ + + +#include +#include +#include + + +ssize_t +ngx_udp_unix_send(ngx_connection_t *c, u_char *buf, size_t size) +{ + ssize_t n; + ngx_err_t err; + ngx_event_t *wev; + + wev = c->write; + + for ( ;; ) { + n = sendto(c->fd, buf, size, 0, c->sockaddr, c->socklen); + + ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0, + "sendto: fd:%d %z of %uz to \"%V\"", + c->fd, n, size, &c->addr_text); + + if (n >= 0) { + if ((size_t) n != size) { + wev->error = 1; + (void) ngx_connection_error(c, 0, "sendto() incomplete"); + return NGX_ERROR; + } + + c->sent += n; + + return n; + } + + err = ngx_socket_errno; + + if (err == NGX_EAGAIN) { + wev->ready = 0; + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, NGX_EAGAIN, + "sendto() not ready"); + return NGX_AGAIN; + } + + if (err != NGX_EINTR) { + wev->error = 1; + (void) ngx_connection_error(c, err, "sendto() failed"); + return NGX_ERROR; + } + } +} diff --git a/src/stream/ngx_stream.c b/src/stream/ngx_stream.c --- a/src/stream/ngx_stream.c +++ b/src/stream/ngx_stream.c @@ -275,8 +275,11 @@ ngx_stream_add_ports(ngx_conf_t *cf, ngx port = ports->elts; for (i = 0; i < ports->nelts; i++) { - if (p == port[i].port && sa->sa_family == port[i].family) { + if (p == port[i].port + && listen->type == port[i].type + && sa->sa_family == port[i].family) + { /* a port is already in the port list */ port = &port[i]; @@ -292,6 +295,7 @@ ngx_stream_add_ports(ngx_conf_t *cf, ngx } port->family = sa->sa_family; + port->type = listen->type; port->port = p; if (ngx_array_init(&port->addrs, cf->temp_pool, 2, @@ -364,6 +368,7 @@ ngx_stream_optimize_servers(ngx_conf_t * ls->addr_ntop = 1; ls->handler = ngx_stream_init_connection; ls->pool_size = 256; + ls->type = addr[i].opt.type; cscf = addr->opt.ctx->srv_conf[ngx_stream_core_module.ctx_index]; @@ -373,6 +378,8 @@ ngx_stream_optimize_servers(ngx_conf_t * ls->backlog = addr[i].opt.backlog; + ls->wildcard = addr[i].opt.wildcard; + ls->keepalive = addr[i].opt.so_keepalive; #if (NGX_HAVE_KEEPALIVE_TUNABLE) ls->keepidle = addr[i].opt.tcp_keepidle; diff --git a/src/stream/ngx_stream.h b/src/stream/ngx_stream.h --- a/src/stream/ngx_stream.h +++ b/src/stream/ngx_stream.h @@ -66,6 +66,7 @@ typedef struct { int tcp_keepcnt; #endif int backlog; + int type; } ngx_stream_listen_t; @@ -102,6 +103,7 @@ typedef struct { typedef struct { int family; + int type; in_port_t port; ngx_array_t addrs; /* array of ngx_stream_conf_addr_t */ } ngx_stream_conf_port_t; diff --git a/src/stream/ngx_stream_core_module.c b/src/stream/ngx_stream_core_module.c --- a/src/stream/ngx_stream_core_module.c +++ b/src/stream/ngx_stream_core_module.c @@ -252,7 +252,7 @@ ngx_stream_core_listen(ngx_conf_t *cf, n in_port_t port; ngx_str_t *value; ngx_url_t u; - ngx_uint_t i; + ngx_uint_t i, backlog; struct sockaddr *sa; struct sockaddr_in *sin; ngx_stream_listen_t *ls; @@ -343,6 +343,7 @@ ngx_stream_core_listen(ngx_conf_t *cf, n ls->socklen = u.socklen; ls->backlog = NGX_LISTEN_BACKLOG; + ls->type = SOCK_STREAM; ls->wildcard = u.wildcard; ls->ctx = cf->ctx; @@ -350,8 +351,17 @@ ngx_stream_core_listen(ngx_conf_t *cf, n ls->ipv6only = 1; #endif + backlog = 0; + for (i = 2; i < cf->args->nelts; i++) { +#if !(NGX_WIN32) + if (ngx_strcmp(value[i].data, "udp") == 0) { + ls->type = SOCK_DGRAM; + continue; + } +#endif + if (ngx_strcmp(value[i].data, "bind") == 0) { ls->bind = 1; continue; @@ -367,6 +377,8 @@ ngx_stream_core_listen(ngx_conf_t *cf, n return NGX_CONF_ERROR; } + backlog = 1; + continue; } @@ -530,5 +542,21 @@ ngx_stream_core_listen(ngx_conf_t *cf, n return NGX_CONF_ERROR; } + if (ls->type == SOCK_DGRAM) { + if (backlog) { + return "\"backlog\" parameter is incompatible with \"udp\""; + } + +#if (NGX_STREAM_SSL) + if (ls->ssl) { + return "\"ssl\" parameter is incompatible with \"udp\""; + } +#endif + + if (ls->so_keepalive) { + return "\"so_keepalive\" parameter is incompatible with \"udp\""; + } + } + return NGX_CONF_OK; } diff --git a/src/stream/ngx_stream_handler.c b/src/stream/ngx_stream_handler.c --- a/src/stream/ngx_stream_handler.c +++ b/src/stream/ngx_stream_handler.c @@ -52,7 +52,7 @@ ngx_stream_init_connection(ngx_connectio * is the "*:port" wildcard so getsockname() is needed to determine * the server address. * - * AcceptEx() already gave this address. + * AcceptEx() and recvmsg() already gave this address. */ if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) { @@ -166,7 +166,10 @@ ngx_stream_init_connection(ngx_connectio } } - if (cscf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { + if (c->type == SOCK_STREAM + && cscf->tcp_nodelay + && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) + { ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, "tcp_nodelay"); tcp_nodelay = 1; diff --git a/src/stream/ngx_stream_proxy_module.c b/src/stream/ngx_stream_proxy_module.c --- a/src/stream/ngx_stream_proxy_module.c +++ b/src/stream/ngx_stream_proxy_module.c @@ -17,6 +17,7 @@ typedef struct { size_t buffer_size; size_t upload_rate; size_t download_rate; + ngx_uint_t responses; ngx_uint_t next_upstream_tries; ngx_flag_t next_upstream; ngx_flag_t proxy_protocol; @@ -167,6 +168,13 @@ static ngx_command_t ngx_stream_proxy_c offsetof(ngx_stream_proxy_srv_conf_t, download_rate), NULL }, + { ngx_string("proxy_responses"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_num_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, responses), + NULL }, + { ngx_string("proxy_next_upstream"), NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG, ngx_conf_set_flag_slot, @@ -351,6 +359,7 @@ ngx_stream_proxy_handler(ngx_stream_sess u->peer.log_error = NGX_ERROR_ERR; u->peer.local = pscf->local; + u->peer.type = c->type; uscf = pscf->upstream; @@ -370,6 +379,14 @@ ngx_stream_proxy_handler(ngx_stream_sess u->proxy_protocol = pscf->proxy_protocol; u->start_sec = ngx_time(); + c->write->handler = ngx_stream_proxy_downstream_handler; + c->read->handler = ngx_stream_proxy_downstream_handler; + + if (c->type == SOCK_DGRAM) { + ngx_stream_proxy_connect(s); + return; + } + p = ngx_pnalloc(c->pool, pscf->buffer_size); if (p == NULL) { ngx_stream_proxy_finalize(s, NGX_ERROR); @@ -381,9 +398,6 @@ ngx_stream_proxy_handler(ngx_stream_sess u->downstream_buf.pos = p; u->downstream_buf.last = p; - c->write->handler = ngx_stream_proxy_downstream_handler; - c->read->handler = ngx_stream_proxy_downstream_handler; - if (u->proxy_protocol #if (NGX_STREAM_SSL) && pscf->ssl == NULL @@ -488,7 +502,10 @@ ngx_stream_proxy_init_upstream(ngx_strea cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module); - if (cscf->tcp_nodelay && pc->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { + if (pc->type == SOCK_STREAM + && cscf->tcp_nodelay + && pc->tcp_nodelay == NGX_TCP_NODELAY_UNSET) + { ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, "tcp_nodelay"); tcp_nodelay = 1; @@ -516,7 +533,7 @@ ngx_stream_proxy_init_upstream(ngx_strea pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); #if (NGX_STREAM_SSL) - if (pscf->ssl && pc->ssl == NULL) { + if (pc->type == SOCK_STREAM && pscf->ssl && pc->ssl == NULL) { ngx_stream_proxy_ssl_init_connection(s); return; } @@ -544,23 +561,35 @@ ngx_stream_proxy_init_upstream(ngx_strea c->log->action = "proxying connection"; - p = ngx_pnalloc(c->pool, pscf->buffer_size); - if (p == NULL) { - ngx_stream_proxy_finalize(s, NGX_ERROR); - return; + if (u->upstream_buf.start == NULL) { + p = ngx_pnalloc(c->pool, pscf->buffer_size); + if (p == NULL) { + ngx_stream_proxy_finalize(s, NGX_ERROR); + return; + } + + u->upstream_buf.start = p; + u->upstream_buf.end = p + pscf->buffer_size; + u->upstream_buf.pos = p; + u->upstream_buf.last = p; } - u->upstream_buf.start = p; - u->upstream_buf.end = p + pscf->buffer_size; - u->upstream_buf.pos = p; - u->upstream_buf.last = p; + if (c->type == SOCK_DGRAM) { + s->received = c->buffer->last - c->buffer->pos; + u->downstream_buf = *c->buffer; + + if (pscf->responses == 0) { + pc->read->ready = 0; + pc->read->eof = 1; + } + } u->connected = 1; pc->read->handler = ngx_stream_proxy_upstream_handler; pc->write->handler = ngx_stream_proxy_upstream_handler; - if (pc->read->ready) { + if (pc->read->ready || pc->read->eof) { ngx_post_event(pc->read, &ngx_posted_events); } @@ -894,11 +923,15 @@ ngx_stream_proxy_process_connection(ngx_ s = c->data; u = s->upstream; + c = s->connection; + pc = u->peer.connection; + + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); + if (ev->timedout) { + ev->timedout = 0; if (ev->delayed) { - - ev->timedout = 0; ev->delayed = 0; if (!ev->ready) { @@ -907,20 +940,35 @@ ngx_stream_proxy_process_connection(ngx_ return; } - if (u->connected) { - pc = u->peer.connection; - - if (!c->read->delayed && !pc->read->delayed) { - pscf = ngx_stream_get_module_srv_conf(s, - ngx_stream_proxy_module); - ngx_add_timer(c->write, pscf->timeout); - } + if (u->connected && !c->read->delayed && !pc->read->delayed) { + ngx_add_timer(c->write, pscf->timeout); } return; } } else { + if (s->connection->type == SOCK_DGRAM) { + if (pscf->responses == NGX_MAX_INT32_VALUE) { + + /* + * successfully terminate timed out UDP session + * with unspecified number of responses + */ + + pc->read->ready = 0; + pc->read->eof = 1; + + ngx_stream_proxy_process(s, 1, 0); + return; + } + + if (u->received == 0) { + ngx_stream_proxy_next_upstream(s); + return; + } + } + ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out"); ngx_stream_proxy_finalize(s, NGX_DECLINED); return; @@ -1039,6 +1087,21 @@ ngx_stream_proxy_process(ngx_stream_sess c = s->connection; pc = u->connected ? u->peer.connection : NULL; + if (c->type == SOCK_DGRAM && (ngx_terminate || ngx_exiting)) { + + /* socket is already closed on worker shutdown */ + + handler = c->log->handler; + c->log->handler = NULL; + + ngx_log_error(NGX_LOG_INFO, c->log, 0, "disconnected on shutdown"); + + c->log->handler = handler; + + ngx_stream_proxy_finalize(s, NGX_OK); + return; + } + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); if (from_upstream) { @@ -1066,7 +1129,17 @@ ngx_stream_proxy_process(ngx_stream_sess n = dst->send(dst, b->pos, size); + if (n == NGX_AGAIN && dst->shared) { + /* cannot wait on a shared socket */ + n = NGX_ERROR; + } + if (n == NGX_ERROR) { + if (c->type == SOCK_DGRAM && !from_upstream) { + ngx_stream_proxy_next_upstream(s); + return; + } + ngx_stream_proxy_finalize(s, NGX_DECLINED); return; } @@ -1118,6 +1191,12 @@ ngx_stream_proxy_process(ngx_stream_sess } } + if (c->type == SOCK_DGRAM && ++u->responses == pscf->responses) + { + src->read->ready = 0; + src->read->eof = 1; + } + *received += n; b->last += n; do_write = 1; @@ -1126,6 +1205,11 @@ ngx_stream_proxy_process(ngx_stream_sess } if (n == NGX_ERROR) { + if (c->type == SOCK_DGRAM && u->received == 0) { + ngx_stream_proxy_next_upstream(s); + return; + } + src->read->eof = 1; } } @@ -1152,13 +1236,13 @@ ngx_stream_proxy_process(ngx_stream_sess flags = src->read->eof ? NGX_CLOSE_EVENT : 0; - if (ngx_handle_read_event(src->read, flags) != NGX_OK) { + if (!src->shared && ngx_handle_read_event(src->read, flags) != NGX_OK) { ngx_stream_proxy_finalize(s, NGX_ERROR); return; } if (dst) { - if (ngx_handle_write_event(dst->write, 0) != NGX_OK) { + if (!dst->shared && ngx_handle_write_event(dst->write, 0) != NGX_OK) { ngx_stream_proxy_finalize(s, NGX_ERROR); return; } @@ -1331,6 +1415,7 @@ ngx_stream_proxy_create_srv_conf(ngx_con conf->buffer_size = NGX_CONF_UNSET_SIZE; conf->upload_rate = NGX_CONF_UNSET_SIZE; conf->download_rate = NGX_CONF_UNSET_SIZE; + conf->responses = NGX_CONF_UNSET_UINT; conf->next_upstream_tries = NGX_CONF_UNSET_UINT; conf->next_upstream = NGX_CONF_UNSET; conf->proxy_protocol = NGX_CONF_UNSET; @@ -1373,6 +1458,9 @@ ngx_stream_proxy_merge_srv_conf(ngx_conf ngx_conf_merge_size_value(conf->download_rate, prev->download_rate, 0); + ngx_conf_merge_uint_value(conf->responses, + prev->responses, NGX_MAX_INT32_VALUE); + ngx_conf_merge_uint_value(conf->next_upstream_tries, prev->next_upstream_tries, 0); diff --git a/src/stream/ngx_stream_upstream.h b/src/stream/ngx_stream_upstream.h --- a/src/stream/ngx_stream_upstream.h +++ b/src/stream/ngx_stream_upstream.h @@ -84,6 +84,7 @@ typedef struct { ngx_buf_t upstream_buf; off_t received; time_t start_sec; + ngx_uint_t responses; #if (NGX_STREAM_SSL) ngx_str_t ssl_name; #endif