# HG changeset patch # User Ruslan Ermilov # Date 1429524311 -10800 # Node ID 61d7ae76647d10fadc4399f570e0594308dbd66b # Parent 4a640716f4e2fffdec2fb5c18b5bc3fecb9c6c1a Stream: port from NGINX+. diff --git a/auto/make b/auto/make --- a/auto/make +++ b/auto/make @@ -10,6 +10,7 @@ mkdir -p $NGX_OBJS/src/core $NGX_OBJS/sr $NGX_OBJS/src/http $NGX_OBJS/src/http/modules \ $NGX_OBJS/src/http/modules/perl \ $NGX_OBJS/src/mail \ + $NGX_OBJS/src/stream \ $NGX_OBJS/src/misc @@ -121,6 +122,32 @@ END fi +# the stream dependences and include paths + +if [ $STREAM = YES ]; then + + ngx_all_srcs="$ngx_all_srcs $STREAM_SRCS" + + ngx_deps=`echo $STREAM_DEPS \ + | sed -e "s/ *\([^ ][^ ]*\)/$ngx_regex_cont\1/g" \ + -e "s/\//$ngx_regex_dirsep/g"` + + ngx_incs=`echo $STREAM_INCS \ + | sed -e "s/ *\([^ ][^ ]*\)/$ngx_regex_cont$ngx_include_opt\1/g" \ + -e "s/\//$ngx_regex_dirsep/g"` + + cat << END >> $NGX_MAKEFILE + +STREAM_DEPS = $ngx_deps + + +STREAM_INCS = $ngx_include_opt$ngx_incs + +END + +fi + + ngx_all_srcs="$ngx_all_srcs $NGX_MISC_SRCS" @@ -306,6 +333,36 @@ END fi +# the stream sources + +if [ $STREAM = YES ]; then + + if test -n "$NGX_PCH"; then + ngx_cc="\$(CC) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS)" + else + ngx_cc="\$(CC) $ngx_compile_opt \$(CFLAGS) \$(CORE_INCS) \$(STREAM_INCS)" + fi + + for ngx_src in $STREAM_SRCS + do + ngx_src=`echo $ngx_src | sed -e "s/\//$ngx_regex_dirsep/g"` + ngx_obj=`echo $ngx_src \ + | sed -e "s#^\(.*\.\)cpp\\$#$ngx_objs_dir\1$ngx_objext#g" \ + -e "s#^\(.*\.\)cc\\$#$ngx_objs_dir\1$ngx_objext#g" \ + -e "s#^\(.*\.\)c\\$#$ngx_objs_dir\1$ngx_objext#g" \ + -e "s#^\(.*\.\)S\\$#$ngx_objs_dir\1$ngx_objext#g"` + + cat << END >> $NGX_MAKEFILE + +$ngx_obj: \$(CORE_DEPS) \$(STREAM_DEPS)$ngx_cont$ngx_src + $ngx_cc$ngx_tab$ngx_objout$ngx_obj$ngx_tab$ngx_src$NGX_AUX + +END + done + +fi + + # the misc sources if test -n "$NGX_MISC_SRCS"; then diff --git a/auto/modules b/auto/modules --- a/auto/modules +++ b/auto/modules @@ -435,6 +435,12 @@ if [ $MAIL_SSL = YES ]; then fi +if [ $STREAM_SSL = YES ]; then + have=NGX_STREAM_SSL . auto/have + USE_OPENSSL=YES +fi + + modules="$CORE_MODULES $EVENT_MODULES" @@ -505,6 +511,36 @@ if [ $MAIL = YES ]; then fi +if [ $STREAM = YES ]; then + have=NGX_STREAM . auto/have + modules="$modules $STREAM_MODULES" + + if [ $STREAM_SSL = YES ]; then + modules="$modules $STREAM_SSL_MODULE" + STREAM_DEPS="$STREAM_DEPS $STREAM_SSL_DEPS" + STREAM_SRCS="$STREAM_SRCS $STREAM_SSL_SRCS" + fi + + if [ $STREAM_UPSTREAM_HASH = YES ]; then + modules="$modules $STREAM_UPSTREAM_HASH_MODULE" + STREAM_SRCS="$STREAM_SRCS $STREAM_UPSTREAM_HASH_SRCS" + fi + + if [ $STREAM_UPSTREAM_LEAST_CONN = YES ]; then + modules="$modules $STREAM_UPSTREAM_LEAST_CONN_MODULE" + STREAM_SRCS="$STREAM_SRCS $STREAM_UPSTREAM_LEAST_CONN_SRCS" + fi + + if [ $STREAM_UPSTREAM_ZONE = YES ]; then + have=NGX_STREAM_UPSTREAM_ZONE . auto/have + modules="$modules $STREAM_UPSTREAM_ZONE_MODULE" + STREAM_SRCS="$STREAM_SRCS $STREAM_UPSTREAM_ZONE_SRCS" + fi + + NGX_ADDON_DEPS="$NGX_ADDON_DEPS \$(STREAM_DEPS)" +fi + + if [ $NGX_GOOGLE_PERFTOOLS = YES ]; then modules="$modules $NGX_GOOGLE_PERFTOOLS_MODULE" NGX_MISC_SRCS="$NGX_MISC_SRCS $NGX_GOOGLE_PERFTOOLS_SRCS" diff --git a/auto/options b/auto/options --- a/auto/options +++ b/auto/options @@ -114,6 +114,12 @@ MAIL_POP3=YES MAIL_IMAP=YES MAIL_SMTP=YES +STREAM=NO +STREAM_SSL=NO +STREAM_UPSTREAM_HASH=YES +STREAM_UPSTREAM_LEAST_CONN=YES +STREAM_UPSTREAM_ZONE=YES + NGX_ADDONS= USE_PCRE=NO @@ -275,6 +281,15 @@ use the \"--without-http_limit_conn_modu --without-mail_imap_module) MAIL_IMAP=NO ;; --without-mail_smtp_module) MAIL_SMTP=NO ;; + --with-stream) STREAM=YES ;; + --with-stream_ssl_module) STREAM_SSL=YES ;; + --without-stream_upstream_hash_module) + STREAM_UPSTREAM_HASH=NO ;; + --without-stream_upstream_least_conn_module) + STREAM_UPSTREAM_LEAST_CONN=NO ;; + --without-stream_upstream_zone_module) + STREAM_UPSTREAM_ZONE=NO ;; + --with-google_perftools_module) NGX_GOOGLE_PERFTOOLS=YES ;; --with-cpp_test_module) NGX_CPP_TEST=YES ;; @@ -436,6 +451,15 @@ cat << END --without-mail_imap_module disable ngx_mail_imap_module --without-mail_smtp_module disable ngx_mail_smtp_module + --with-stream enable TCP proxy module + --with-stream_ssl_module enable ngx_stream_ssl_module + --without-stream_upstream_hash_module + disable ngx_stream_upstream_hash_module + --without-stream_upstream_least_conn_module + disable ngx_stream_upstream_least_conn_module + --without-stream_upstream_zone_module + disable ngx_stream_upstream_zone_module + --with-google_perftools_module enable ngx_google_perftools_module --with-cpp_test_module enable ngx_cpp_test_module diff --git a/auto/sources b/auto/sources --- a/auto/sources +++ b/auto/sources @@ -554,6 +554,40 @@ MAIL_AUTH_HTTP_SRCS="src/mail/ngx_mail_a MAIL_PROXY_MODULE="ngx_mail_proxy_module" MAIL_PROXY_SRCS="src/mail/ngx_mail_proxy_module.c" + +STREAM_INCS="src/stream" + +STREAM_DEPS="src/stream/ngx_stream.h \ + src/stream/ngx_stream_upstream.h \ + src/stream/ngx_stream_upstream_round_robin.h" + +STREAM_MODULES="ngx_stream_module \ + ngx_stream_core_module \ + ngx_stream_proxy_module \ + ngx_stream_upstream_module" + +STREAM_SRCS="src/stream/ngx_stream.c \ + src/stream/ngx_stream_handler.c \ + src/stream/ngx_stream_core_module.c \ + src/stream/ngx_stream_proxy_module.c \ + src/stream/ngx_stream_upstream.c \ + src/stream/ngx_stream_upstream_round_robin.c" + +STREAM_SSL_MODULE="ngx_stream_ssl_module" +STREAM_SSL_DEPS="src/stream/ngx_stream_ssl_module.h" +STREAM_SSL_SRCS="src/stream/ngx_stream_ssl_module.c" + +STREAM_UPSTREAM_HASH_MODULE=ngx_stream_upstream_hash_module +STREAM_UPSTREAM_HASH_SRCS=src/stream/ngx_stream_upstream_hash_module.c + +STREAM_UPSTREAM_LEAST_CONN_MODULE=ngx_stream_upstream_least_conn_module +STREAM_UPSTREAM_LEAST_CONN_SRCS=" \ + src/stream/ngx_stream_upstream_least_conn_module.c" + +STREAM_UPSTREAM_ZONE_MODULE=ngx_stream_upstream_zone_module +STREAM_UPSTREAM_ZONE_SRCS=src/stream/ngx_stream_upstream_zone_module.c + + NGX_GOOGLE_PERFTOOLS_MODULE=ngx_google_perftools_module NGX_GOOGLE_PERFTOOLS_SRCS=src/misc/ngx_google_perftools_module.c diff --git a/src/core/ngx_log.c b/src/core/ngx_log.c --- a/src/core/ngx_log.c +++ b/src/core/ngx_log.c @@ -86,7 +86,7 @@ static ngx_str_t err_levels[] = { static const char *debug_levels[] = { "debug_core", "debug_alloc", "debug_mutex", "debug_event", - "debug_http", "debug_mail", "debug_mysql" + "debug_http", "debug_mail", "debug_mysql", "debug_stream" }; diff --git a/src/core/ngx_log.h b/src/core/ngx_log.h --- a/src/core/ngx_log.h +++ b/src/core/ngx_log.h @@ -30,6 +30,7 @@ #define NGX_LOG_DEBUG_HTTP 0x100 #define NGX_LOG_DEBUG_MAIL 0x200 #define NGX_LOG_DEBUG_MYSQL 0x400 +#define NGX_LOG_DEBUG_STREAM 0x800 /* * do not forget to update debug_levels[] in src/core/ngx_log.c @@ -37,7 +38,7 @@ */ #define NGX_LOG_DEBUG_FIRST NGX_LOG_DEBUG_CORE -#define NGX_LOG_DEBUG_LAST NGX_LOG_DEBUG_MYSQL +#define NGX_LOG_DEBUG_LAST NGX_LOG_DEBUG_STREAM #define NGX_LOG_DEBUG_CONNECTION 0x80000000 #define NGX_LOG_DEBUG_ALL 0x7ffffff0 diff --git a/src/stream/ngx_stream.c b/src/stream/ngx_stream.c new file mode 100644 --- /dev/null +++ b/src/stream/ngx_stream.c @@ -0,0 +1,557 @@ + +/* + * Copyright (C) Roman Arutyunyan + * Copyright (C) Nginx, Inc. + */ + + +#include +#include +#include +#include + + +static char *ngx_stream_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); +static ngx_int_t ngx_stream_add_ports(ngx_conf_t *cf, ngx_array_t *ports, + ngx_stream_listen_t *listen); +static char *ngx_stream_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports); +static ngx_int_t ngx_stream_add_addrs(ngx_conf_t *cf, ngx_stream_port_t *stport, + ngx_stream_conf_addr_t *addr); +#if (NGX_HAVE_INET6) +static ngx_int_t ngx_stream_add_addrs6(ngx_conf_t *cf, + ngx_stream_port_t *stport, ngx_stream_conf_addr_t *addr); +#endif +static ngx_int_t ngx_stream_cmp_conf_addrs(const void *one, const void *two); + + +ngx_uint_t ngx_stream_max_module; + + +static ngx_command_t ngx_stream_commands[] = { + + { ngx_string("stream"), + NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS, + ngx_stream_block, + 0, + 0, + NULL }, + + ngx_null_command +}; + + +static ngx_core_module_t ngx_stream_module_ctx = { + ngx_string("stream"), + NULL, + NULL +}; + + +ngx_module_t ngx_stream_module = { + NGX_MODULE_V1, + &ngx_stream_module_ctx, /* module context */ + ngx_stream_commands, /* module directives */ + NGX_CORE_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static char * +ngx_stream_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + char *rv; + ngx_uint_t i, m, mi, s; + ngx_conf_t pcf; + ngx_array_t ports; + ngx_stream_listen_t *listen; + ngx_stream_module_t *module; + ngx_stream_conf_ctx_t *ctx; + ngx_stream_core_srv_conf_t **cscfp; + ngx_stream_core_main_conf_t *cmcf; + + /* the main stream context */ + + ctx = ngx_pcalloc(cf->pool, sizeof(ngx_stream_conf_ctx_t)); + if (ctx == NULL) { + return NGX_CONF_ERROR; + } + + *(ngx_stream_conf_ctx_t **) conf = ctx; + + /* count the number of the stream modules and set up their indices */ + + ngx_stream_max_module = 0; + for (m = 0; ngx_modules[m]; m++) { + if (ngx_modules[m]->type != NGX_STREAM_MODULE) { + continue; + } + + ngx_modules[m]->ctx_index = ngx_stream_max_module++; + } + + + /* the stream main_conf context, it's the same in the all stream contexts */ + + ctx->main_conf = ngx_pcalloc(cf->pool, + sizeof(void *) * ngx_stream_max_module); + if (ctx->main_conf == NULL) { + return NGX_CONF_ERROR; + } + + + /* + * the stream null srv_conf context, it is used to merge + * the server{}s' srv_conf's + */ + + ctx->srv_conf = ngx_pcalloc(cf->pool, + sizeof(void *) * ngx_stream_max_module); + if (ctx->srv_conf == NULL) { + return NGX_CONF_ERROR; + } + + + /* + * create the main_conf's and the null srv_conf's of the all stream modules + */ + + for (m = 0; ngx_modules[m]; m++) { + if (ngx_modules[m]->type != NGX_STREAM_MODULE) { + continue; + } + + module = ngx_modules[m]->ctx; + mi = ngx_modules[m]->ctx_index; + + if (module->create_main_conf) { + ctx->main_conf[mi] = module->create_main_conf(cf); + if (ctx->main_conf[mi] == NULL) { + return NGX_CONF_ERROR; + } + } + + if (module->create_srv_conf) { + ctx->srv_conf[mi] = module->create_srv_conf(cf); + if (ctx->srv_conf[mi] == NULL) { + return NGX_CONF_ERROR; + } + } + } + + + /* parse inside the stream{} block */ + + pcf = *cf; + cf->ctx = ctx; + + cf->module_type = NGX_STREAM_MODULE; + cf->cmd_type = NGX_STREAM_MAIN_CONF; + rv = ngx_conf_parse(cf, NULL); + + if (rv != NGX_CONF_OK) { + *cf = pcf; + return rv; + } + + + /* init stream{} main_conf's, merge the server{}s' srv_conf's */ + + cmcf = ctx->main_conf[ngx_stream_core_module.ctx_index]; + cscfp = cmcf->servers.elts; + + for (m = 0; ngx_modules[m]; m++) { + if (ngx_modules[m]->type != NGX_STREAM_MODULE) { + continue; + } + + module = ngx_modules[m]->ctx; + mi = ngx_modules[m]->ctx_index; + + /* init stream{} main_conf's */ + + cf->ctx = ctx; + + if (module->init_main_conf) { + rv = module->init_main_conf(cf, ctx->main_conf[mi]); + if (rv != NGX_CONF_OK) { + *cf = pcf; + return rv; + } + } + + for (s = 0; s < cmcf->servers.nelts; s++) { + + /* merge the server{}s' srv_conf's */ + + cf->ctx = cscfp[s]->ctx; + + if (module->merge_srv_conf) { + rv = module->merge_srv_conf(cf, + ctx->srv_conf[mi], + cscfp[s]->ctx->srv_conf[mi]); + if (rv != NGX_CONF_OK) { + *cf = pcf; + return rv; + } + } + } + } + + *cf = pcf; + + + if (ngx_array_init(&ports, cf->temp_pool, 4, sizeof(ngx_stream_conf_port_t)) + != NGX_OK) + { + return NGX_CONF_ERROR; + } + + listen = cmcf->listen.elts; + + for (i = 0; i < cmcf->listen.nelts; i++) { + if (ngx_stream_add_ports(cf, &ports, &listen[i]) != NGX_OK) { + return NGX_CONF_ERROR; + } + } + + return ngx_stream_optimize_servers(cf, &ports); +} + + +static ngx_int_t +ngx_stream_add_ports(ngx_conf_t *cf, ngx_array_t *ports, + ngx_stream_listen_t *listen) +{ + in_port_t p; + ngx_uint_t i; + struct sockaddr *sa; + struct sockaddr_in *sin; + ngx_stream_conf_port_t *port; + ngx_stream_conf_addr_t *addr; +#if (NGX_HAVE_INET6) + struct sockaddr_in6 *sin6; +#endif + + sa = (struct sockaddr *) &listen->sockaddr; + + switch (sa->sa_family) { + +#if (NGX_HAVE_INET6) + case AF_INET6: + sin6 = (struct sockaddr_in6 *) sa; + p = sin6->sin6_port; + break; +#endif + +#if (NGX_HAVE_UNIX_DOMAIN) + case AF_UNIX: + p = 0; + break; +#endif + + default: /* AF_INET */ + sin = (struct sockaddr_in *) sa; + p = sin->sin_port; + break; + } + + port = ports->elts; + for (i = 0; i < ports->nelts; i++) { + if (p == port[i].port && sa->sa_family == port[i].family) { + + /* a port is already in the port list */ + + port = &port[i]; + goto found; + } + } + + /* add a port to the port list */ + + port = ngx_array_push(ports); + if (port == NULL) { + return NGX_ERROR; + } + + port->family = sa->sa_family; + port->port = p; + + if (ngx_array_init(&port->addrs, cf->temp_pool, 2, + sizeof(ngx_stream_conf_addr_t)) + != NGX_OK) + { + return NGX_ERROR; + } + +found: + + addr = ngx_array_push(&port->addrs); + if (addr == NULL) { + return NGX_ERROR; + } + + addr->sockaddr = (struct sockaddr *) &listen->sockaddr; + addr->socklen = listen->socklen; + addr->ctx = listen->ctx; + addr->bind = listen->bind; + addr->wildcard = listen->wildcard; + addr->so_keepalive = listen->so_keepalive; +#if (NGX_HAVE_KEEPALIVE_TUNABLE) + addr->tcp_keepidle = listen->tcp_keepidle; + addr->tcp_keepintvl = listen->tcp_keepintvl; + addr->tcp_keepcnt = listen->tcp_keepcnt; +#endif +#if (NGX_STREAM_SSL) + addr->ssl = listen->ssl; +#endif +#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY) + addr->ipv6only = listen->ipv6only; +#endif + + return NGX_OK; +} + + +static char * +ngx_stream_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports) +{ + ngx_uint_t i, p, last, bind_wildcard; + ngx_listening_t *ls; + ngx_stream_port_t *stport; + ngx_stream_conf_port_t *port; + ngx_stream_conf_addr_t *addr; + ngx_stream_core_srv_conf_t *cscf; + + port = ports->elts; + for (p = 0; p < ports->nelts; p++) { + + ngx_sort(port[p].addrs.elts, (size_t) port[p].addrs.nelts, + sizeof(ngx_stream_conf_addr_t), ngx_stream_cmp_conf_addrs); + + addr = port[p].addrs.elts; + last = port[p].addrs.nelts; + + /* + * if there is the binding to the "*:port" then we need to bind() + * to the "*:port" only and ignore the other bindings + */ + + if (addr[last - 1].wildcard) { + addr[last - 1].bind = 1; + bind_wildcard = 1; + + } else { + bind_wildcard = 0; + } + + i = 0; + + while (i < last) { + + if (bind_wildcard && !addr[i].bind) { + i++; + continue; + } + + ls = ngx_create_listening(cf, addr[i].sockaddr, addr[i].socklen); + if (ls == NULL) { + return NGX_CONF_ERROR; + } + + ls->addr_ntop = 1; + ls->handler = ngx_stream_init_connection; + ls->pool_size = 256; + + cscf = addr->ctx->srv_conf[ngx_stream_core_module.ctx_index]; + ls->logp = cscf->error_log; + + ls->log.data = &ls->addr_text; + ls->log.handler = ngx_accept_log_error; + + ls->keepalive = addr[i].so_keepalive; +#if (NGX_HAVE_KEEPALIVE_TUNABLE) + ls->keepidle = addr[i].tcp_keepidle; + ls->keepintvl = addr[i].tcp_keepintvl; + ls->keepcnt = addr[i].tcp_keepcnt; +#endif + +#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY) + ls->ipv6only = addr[i].ipv6only; +#endif + + stport = ngx_palloc(cf->pool, sizeof(ngx_stream_port_t)); + if (stport == NULL) { + return NGX_CONF_ERROR; + } + + ls->servers = stport; + + if (i == last - 1) { + stport->naddrs = last; + + } else { + stport->naddrs = 1; + i = 0; + } + + switch (ls->sockaddr->sa_family) { +#if (NGX_HAVE_INET6) + case AF_INET6: + if (ngx_stream_add_addrs6(cf, stport, addr) != NGX_OK) { + return NGX_CONF_ERROR; + } + break; +#endif + default: /* AF_INET */ + if (ngx_stream_add_addrs(cf, stport, addr) != NGX_OK) { + return NGX_CONF_ERROR; + } + break; + } + + addr++; + last--; + } + } + + return NGX_CONF_OK; +} + + +static ngx_int_t +ngx_stream_add_addrs(ngx_conf_t *cf, ngx_stream_port_t *stport, + ngx_stream_conf_addr_t *addr) +{ + u_char *p; + size_t len; + ngx_uint_t i; + struct sockaddr_in *sin; + ngx_stream_in_addr_t *addrs; + u_char buf[NGX_SOCKADDR_STRLEN]; + + stport->addrs = ngx_pcalloc(cf->pool, + stport->naddrs * sizeof(ngx_stream_in_addr_t)); + if (stport->addrs == NULL) { + return NGX_ERROR; + } + + addrs = stport->addrs; + + for (i = 0; i < stport->naddrs; i++) { + + sin = (struct sockaddr_in *) addr[i].sockaddr; + addrs[i].addr = sin->sin_addr.s_addr; + + addrs[i].conf.ctx = addr[i].ctx; +#if (NGX_STREAM_SSL) + addrs[i].conf.ssl = addr[i].ssl; +#endif + + len = ngx_sock_ntop(addr[i].sockaddr, addr[i].socklen, buf, + NGX_SOCKADDR_STRLEN, 1); + + p = ngx_pnalloc(cf->pool, len); + if (p == NULL) { + return NGX_ERROR; + } + + ngx_memcpy(p, buf, len); + + addrs[i].conf.addr_text.len = len; + addrs[i].conf.addr_text.data = p; + } + + return NGX_OK; +} + + +#if (NGX_HAVE_INET6) + +static ngx_int_t +ngx_stream_add_addrs6(ngx_conf_t *cf, ngx_stream_port_t *stport, + ngx_stream_conf_addr_t *addr) +{ + u_char *p; + size_t len; + ngx_uint_t i; + struct sockaddr_in6 *sin6; + ngx_stream_in6_addr_t *addrs6; + u_char buf[NGX_SOCKADDR_STRLEN]; + + stport->addrs = ngx_pcalloc(cf->pool, + stport->naddrs * sizeof(ngx_stream_in6_addr_t)); + if (stport->addrs == NULL) { + return NGX_ERROR; + } + + addrs6 = stport->addrs; + + for (i = 0; i < stport->naddrs; i++) { + + sin6 = (struct sockaddr_in6 *) addr[i].sockaddr; + addrs6[i].addr6 = sin6->sin6_addr; + + addrs6[i].conf.ctx = addr[i].ctx; +#if (NGX_STREAM_SSL) + addrs6[i].conf.ssl = addr[i].ssl; +#endif + + len = ngx_sock_ntop(addr[i].sockaddr, addr[i].socklen, buf, + NGX_SOCKADDR_STRLEN, 1); + + p = ngx_pnalloc(cf->pool, len); + if (p == NULL) { + return NGX_ERROR; + } + + ngx_memcpy(p, buf, len); + + addrs6[i].conf.addr_text.len = len; + addrs6[i].conf.addr_text.data = p; + } + + return NGX_OK; +} + +#endif + + +static ngx_int_t +ngx_stream_cmp_conf_addrs(const void *one, const void *two) +{ + ngx_stream_conf_addr_t *first, *second; + + first = (ngx_stream_conf_addr_t *) one; + second = (ngx_stream_conf_addr_t *) two; + + if (first->wildcard) { + /* a wildcard must be the last resort, shift it to the end */ + return 1; + } + + if (second->wildcard) { + /* a wildcard must be the last resort, shift it to the end */ + return -1; + } + + if (first->bind && !second->bind) { + /* shift explicit bind()ed addresses to the start */ + return -1; + } + + if (!first->bind && second->bind) { + /* shift explicit bind()ed addresses to the start */ + return 1; + } + + /* do not sort by default */ + + return 0; +} diff --git a/src/stream/ngx_stream.h b/src/stream/ngx_stream.h new file mode 100644 --- /dev/null +++ b/src/stream/ngx_stream.h @@ -0,0 +1,215 @@ + +/* + * Copyright (C) Roman Arutyunyan + * Copyright (C) Nginx, Inc. + */ + + +#ifndef _NGX_STREAM_H_INCLUDED_ +#define _NGX_STREAM_H_INCLUDED_ + + +#include +#include + +#if (NGX_STREAM_SSL) +#include +#endif + + +typedef struct ngx_stream_session_s ngx_stream_session_t; + + +#include +#include + + +typedef struct { + void **main_conf; + void **srv_conf; +} ngx_stream_conf_ctx_t; + + +typedef struct { + u_char sockaddr[NGX_SOCKADDRLEN]; + socklen_t socklen; + + /* server ctx */ + ngx_stream_conf_ctx_t *ctx; + + unsigned bind:1; + unsigned wildcard:1; +#if (NGX_STREAM_SSL) + unsigned ssl:1; +#endif +#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY) + unsigned ipv6only:1; +#endif + unsigned so_keepalive:2; +#if (NGX_HAVE_KEEPALIVE_TUNABLE) + int tcp_keepidle; + int tcp_keepintvl; + int tcp_keepcnt; +#endif +} ngx_stream_listen_t; + + +typedef struct { + ngx_stream_conf_ctx_t *ctx; + ngx_str_t addr_text; +#if (NGX_STREAM_SSL) + ngx_uint_t ssl; /* unsigned ssl:1; */ +#endif +} ngx_stream_addr_conf_t; + +typedef struct { + in_addr_t addr; + ngx_stream_addr_conf_t conf; +} ngx_stream_in_addr_t; + + +#if (NGX_HAVE_INET6) + +typedef struct { + struct in6_addr addr6; + ngx_stream_addr_conf_t conf; +} ngx_stream_in6_addr_t; + +#endif + + +typedef struct { + /* ngx_stream_in_addr_t or ngx_stream_in6_addr_t */ + void *addrs; + ngx_uint_t naddrs; +} ngx_stream_port_t; + + +typedef struct { + int family; + in_port_t port; + ngx_array_t addrs; /* array of ngx_stream_conf_addr_t */ +} ngx_stream_conf_port_t; + + +typedef struct { + struct sockaddr *sockaddr; + socklen_t socklen; + + ngx_stream_conf_ctx_t *ctx; + + unsigned bind:1; + unsigned wildcard:1; +#if (NGX_STREAM_SSL) + unsigned ssl:1; +#endif +#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY) + unsigned ipv6only:1; +#endif + unsigned so_keepalive:2; +#if (NGX_HAVE_KEEPALIVE_TUNABLE) + int tcp_keepidle; + int tcp_keepintvl; + int tcp_keepcnt; +#endif +} ngx_stream_conf_addr_t; + + +typedef struct { + ngx_array_t servers; /* ngx_stream_core_srv_conf_t */ + ngx_array_t listen; /* ngx_stream_listen_t */ +} ngx_stream_core_main_conf_t; + + +typedef void (*ngx_stream_handler_pt)(ngx_stream_session_t *s); + + +typedef struct { + ngx_stream_handler_pt handler; + ngx_stream_conf_ctx_t *ctx; + u_char *file_name; + ngx_int_t line; + ngx_log_t *error_log; +} ngx_stream_core_srv_conf_t; + + +struct ngx_stream_session_s { + uint32_t signature; /* "STRM" */ + + ngx_connection_t *connection; + + off_t received; + + ngx_log_handler_pt log_handler; + + void **ctx; + void **main_conf; + void **srv_conf; + + ngx_stream_upstream_t *upstream; +}; + + +typedef struct { + void *(*create_main_conf)(ngx_conf_t *cf); + char *(*init_main_conf)(ngx_conf_t *cf, void *conf); + + void *(*create_srv_conf)(ngx_conf_t *cf); + char *(*merge_srv_conf)(ngx_conf_t *cf, void *prev, + void *conf); +} ngx_stream_module_t; + + +#define NGX_STREAM_MODULE 0x4d525453 /* "STRM" */ + +#define NGX_STREAM_MAIN_CONF 0x02000000 +#define NGX_STREAM_SRV_CONF 0x04000000 +#define NGX_STREAM_UPS_CONF 0x08000000 + + +#define NGX_STREAM_MAIN_CONF_OFFSET offsetof(ngx_stream_conf_ctx_t, main_conf) +#define NGX_STREAM_SRV_CONF_OFFSET offsetof(ngx_stream_conf_ctx_t, srv_conf) + + +#define ngx_stream_get_module_ctx(s, module) (s)->ctx[module.ctx_index] +#define ngx_stream_set_ctx(s, c, module) s->ctx[module.ctx_index] = c; +#define ngx_stream_delete_ctx(s, module) s->ctx[module.ctx_index] = NULL; + + +#define ngx_stream_get_module_main_conf(s, module) \ + (s)->main_conf[module.ctx_index] +#define ngx_stream_get_module_srv_conf(s, module) \ + (s)->srv_conf[module.ctx_index] + +#define ngx_stream_conf_get_module_main_conf(cf, module) \ + ((ngx_stream_conf_ctx_t *) cf->ctx)->main_conf[module.ctx_index] +#define ngx_stream_conf_get_module_srv_conf(cf, module) \ + ((ngx_stream_conf_ctx_t *) cf->ctx)->srv_conf[module.ctx_index] + +#define ngx_stream_cycle_get_module_main_conf(cycle, module) \ + (cycle->conf_ctx[ngx_stream_module.index] ? \ + ((ngx_stream_conf_ctx_t *) cycle->conf_ctx[ngx_stream_module.index]) \ + ->main_conf[module.ctx_index]: \ + NULL) + +#define ngx_stream_set_connection_log(c, l) \ + \ + c->log->file = l->file; \ + c->log->next = l->next; \ + c->log->writer = l->writer; \ + c->log->wdata = l->wdata; \ + if (!(c->log->log_level & NGX_LOG_DEBUG_CONNECTION)) { \ + c->log->log_level = l->log_level; \ + } + + +void ngx_stream_init_connection(ngx_connection_t *c); +void ngx_stream_close_connection(ngx_connection_t *c); + + +extern ngx_module_t ngx_stream_module; +extern ngx_uint_t ngx_stream_max_module; +extern ngx_module_t ngx_stream_core_module; + + +#endif /* _NGX_STREAM_H_INCLUDED_ */ diff --git a/src/stream/ngx_stream_core_module.c b/src/stream/ngx_stream_core_module.c new file mode 100644 --- /dev/null +++ b/src/stream/ngx_stream_core_module.c @@ -0,0 +1,495 @@ + +/* + * Copyright (C) Roman Arutyunyan + * Copyright (C) Nginx, Inc. + */ + + +#include +#include +#include + + +static void *ngx_stream_core_create_main_conf(ngx_conf_t *cf); +static void *ngx_stream_core_create_srv_conf(ngx_conf_t *cf); +static char *ngx_stream_core_merge_srv_conf(ngx_conf_t *cf, void *parent, + void *child); +static char *ngx_stream_core_error_log(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); +static char *ngx_stream_core_server(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); +static char *ngx_stream_core_listen(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); + + +static ngx_command_t ngx_stream_core_commands[] = { + + { ngx_string("server"), + NGX_STREAM_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS, + ngx_stream_core_server, + 0, + 0, + NULL }, + + { ngx_string("listen"), + NGX_STREAM_SRV_CONF|NGX_CONF_1MORE, + ngx_stream_core_listen, + NGX_STREAM_SRV_CONF_OFFSET, + 0, + NULL }, + + { ngx_string("error_log"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_1MORE, + ngx_stream_core_error_log, + NGX_STREAM_SRV_CONF_OFFSET, + 0, + NULL }, + + ngx_null_command +}; + + +static ngx_stream_module_t ngx_stream_core_module_ctx = { + ngx_stream_core_create_main_conf, /* create main configuration */ + NULL, /* init main configuration */ + + ngx_stream_core_create_srv_conf, /* create server configuration */ + ngx_stream_core_merge_srv_conf /* merge server configuration */ +}; + + +ngx_module_t ngx_stream_core_module = { + NGX_MODULE_V1, + &ngx_stream_core_module_ctx, /* module context */ + ngx_stream_core_commands, /* module directives */ + NGX_STREAM_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static void * +ngx_stream_core_create_main_conf(ngx_conf_t *cf) +{ + ngx_stream_core_main_conf_t *cmcf; + + cmcf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_core_main_conf_t)); + if (cmcf == NULL) { + return NULL; + } + + if (ngx_array_init(&cmcf->servers, cf->pool, 4, + sizeof(ngx_stream_core_srv_conf_t *)) + != NGX_OK) + { + return NULL; + } + + if (ngx_array_init(&cmcf->listen, cf->pool, 4, sizeof(ngx_stream_listen_t)) + != NGX_OK) + { + return NULL; + } + + return cmcf; +} + + +static void * +ngx_stream_core_create_srv_conf(ngx_conf_t *cf) +{ + ngx_stream_core_srv_conf_t *cscf; + + cscf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_core_srv_conf_t)); + if (cscf == NULL) { + return NULL; + } + + /* + * set by ngx_pcalloc(): + * + * cscf->handler = NULL; + * cscf->error_log = NULL; + */ + + cscf->file_name = cf->conf_file->file.name.data; + cscf->line = cf->conf_file->line; + + return cscf; +} + + +static char * +ngx_stream_core_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child) +{ + ngx_stream_core_srv_conf_t *prev = parent; + ngx_stream_core_srv_conf_t *conf = child; + + if (conf->handler == NULL) { + ngx_log_error(NGX_LOG_EMERG, cf->log, 0, + "no handler for server in %s:%ui", + conf->file_name, conf->line); + return NGX_CONF_ERROR; + } + + if (conf->error_log == NULL) { + if (prev->error_log) { + conf->error_log = prev->error_log; + } else { + conf->error_log = &cf->cycle->new_log; + } + } + + return NGX_CONF_OK; +} + + +static char * +ngx_stream_core_error_log(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_stream_core_srv_conf_t *cscf = conf; + + return ngx_log_set_log(cf, &cscf->error_log); +} + + +static char * +ngx_stream_core_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + char *rv; + void *mconf; + ngx_uint_t m; + ngx_conf_t pcf; + ngx_stream_module_t *module; + ngx_stream_conf_ctx_t *ctx, *stream_ctx; + ngx_stream_core_srv_conf_t *cscf, **cscfp; + ngx_stream_core_main_conf_t *cmcf; + + ctx = ngx_pcalloc(cf->pool, sizeof(ngx_stream_conf_ctx_t)); + if (ctx == NULL) { + return NGX_CONF_ERROR; + } + + stream_ctx = cf->ctx; + ctx->main_conf = stream_ctx->main_conf; + + /* the server{}'s srv_conf */ + + ctx->srv_conf = ngx_pcalloc(cf->pool, + sizeof(void *) * ngx_stream_max_module); + if (ctx->srv_conf == NULL) { + return NGX_CONF_ERROR; + } + + for (m = 0; ngx_modules[m]; m++) { + if (ngx_modules[m]->type != NGX_STREAM_MODULE) { + continue; + } + + module = ngx_modules[m]->ctx; + + if (module->create_srv_conf) { + mconf = module->create_srv_conf(cf); + if (mconf == NULL) { + return NGX_CONF_ERROR; + } + + ctx->srv_conf[ngx_modules[m]->ctx_index] = mconf; + } + } + + /* the server configuration context */ + + cscf = ctx->srv_conf[ngx_stream_core_module.ctx_index]; + cscf->ctx = ctx; + + cmcf = ctx->main_conf[ngx_stream_core_module.ctx_index]; + + cscfp = ngx_array_push(&cmcf->servers); + if (cscfp == NULL) { + return NGX_CONF_ERROR; + } + + *cscfp = cscf; + + + /* parse inside server{} */ + + pcf = *cf; + cf->ctx = ctx; + cf->cmd_type = NGX_STREAM_SRV_CONF; + + rv = ngx_conf_parse(cf, NULL); + + *cf = pcf; + + return rv; +} + + +static char * +ngx_stream_core_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + size_t len, off; + in_port_t port; + ngx_str_t *value; + ngx_url_t u; + ngx_uint_t i; + struct sockaddr *sa; + struct sockaddr_in *sin; + ngx_stream_listen_t *ls; + ngx_stream_core_main_conf_t *cmcf; +#if (NGX_HAVE_INET6) + struct sockaddr_in6 *sin6; +#endif + + value = cf->args->elts; + + ngx_memzero(&u, sizeof(ngx_url_t)); + + u.url = value[1]; + u.listen = 1; + + if (ngx_parse_url(cf->pool, &u) != NGX_OK) { + if (u.err) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "%s in \"%V\" of the \"listen\" directive", + u.err, &u.url); + } + + return NGX_CONF_ERROR; + } + + cmcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_core_module); + + ls = cmcf->listen.elts; + + for (i = 0; i < cmcf->listen.nelts; i++) { + + sa = (struct sockaddr *) ls[i].sockaddr; + + if (sa->sa_family != u.family) { + continue; + } + + switch (sa->sa_family) { + +#if (NGX_HAVE_INET6) + case AF_INET6: + off = offsetof(struct sockaddr_in6, sin6_addr); + len = 16; + sin6 = (struct sockaddr_in6 *) sa; + port = sin6->sin6_port; + break; +#endif + +#if (NGX_HAVE_UNIX_DOMAIN) + case AF_UNIX: + off = offsetof(struct sockaddr_un, sun_path); + len = sizeof(((struct sockaddr_un *) sa)->sun_path); + port = 0; + break; +#endif + + default: /* AF_INET */ + off = offsetof(struct sockaddr_in, sin_addr); + len = 4; + sin = (struct sockaddr_in *) sa; + port = sin->sin_port; + break; + } + + if (ngx_memcmp(ls[i].sockaddr + off, u.sockaddr + off, len) != 0) { + continue; + } + + if (port != u.port) { + continue; + } + + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "duplicate \"%V\" address and port pair", &u.url); + return NGX_CONF_ERROR; + } + + ls = ngx_array_push(&cmcf->listen); + if (ls == NULL) { + return NGX_CONF_ERROR; + } + + ngx_memzero(ls, sizeof(ngx_stream_listen_t)); + + ngx_memcpy(ls->sockaddr, u.sockaddr, u.socklen); + + ls->socklen = u.socklen; + ls->wildcard = u.wildcard; + ls->ctx = cf->ctx; + +#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY) + ls->ipv6only = 1; +#endif + + for (i = 2; i < cf->args->nelts; i++) { + + if (ngx_strcmp(value[i].data, "bind") == 0) { + ls->bind = 1; + continue; + } + + if (ngx_strncmp(value[i].data, "ipv6only=o", 10) == 0) { +#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY) + struct sockaddr *sa; + u_char buf[NGX_SOCKADDR_STRLEN]; + + sa = (struct sockaddr *) ls->sockaddr; + + if (sa->sa_family == AF_INET6) { + + if (ngx_strcmp(&value[i].data[10], "n") == 0) { + ls->ipv6only = 1; + + } else if (ngx_strcmp(&value[i].data[10], "ff") == 0) { + ls->ipv6only = 0; + + } else { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid ipv6only flags \"%s\"", + &value[i].data[9]); + return NGX_CONF_ERROR; + } + + ls->bind = 1; + + } else { + len = ngx_sock_ntop(sa, ls->socklen, buf, + NGX_SOCKADDR_STRLEN, 1); + + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "ipv6only is not supported " + "on addr \"%*s\", ignored", len, buf); + } + + continue; +#else + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "bind ipv6only is not supported " + "on this platform"); + return NGX_CONF_ERROR; +#endif + } + + if (ngx_strcmp(value[i].data, "ssl") == 0) { +#if (NGX_STREAM_SSL) + ls->ssl = 1; + continue; +#else + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "the \"ssl\" parameter requires " + "ngx_stream_ssl_module"); + return NGX_CONF_ERROR; +#endif + } + + if (ngx_strncmp(value[i].data, "so_keepalive=", 13) == 0) { + + if (ngx_strcmp(&value[i].data[13], "on") == 0) { + ls->so_keepalive = 1; + + } else if (ngx_strcmp(&value[i].data[13], "off") == 0) { + ls->so_keepalive = 2; + + } else { + +#if (NGX_HAVE_KEEPALIVE_TUNABLE) + u_char *p, *end; + ngx_str_t s; + + end = value[i].data + value[i].len; + s.data = value[i].data + 13; + + p = ngx_strlchr(s.data, end, ':'); + if (p == NULL) { + p = end; + } + + if (p > s.data) { + s.len = p - s.data; + + ls->tcp_keepidle = ngx_parse_time(&s, 1); + if (ls->tcp_keepidle == (time_t) NGX_ERROR) { + goto invalid_so_keepalive; + } + } + + s.data = (p < end) ? (p + 1) : end; + + p = ngx_strlchr(s.data, end, ':'); + if (p == NULL) { + p = end; + } + + if (p > s.data) { + s.len = p - s.data; + + ls->tcp_keepintvl = ngx_parse_time(&s, 1); + if (ls->tcp_keepintvl == (time_t) NGX_ERROR) { + goto invalid_so_keepalive; + } + } + + s.data = (p < end) ? (p + 1) : end; + + if (s.data < end) { + s.len = end - s.data; + + ls->tcp_keepcnt = ngx_atoi(s.data, s.len); + if (ls->tcp_keepcnt == NGX_ERROR) { + goto invalid_so_keepalive; + } + } + + if (ls->tcp_keepidle == 0 && ls->tcp_keepintvl == 0 + && ls->tcp_keepcnt == 0) + { + goto invalid_so_keepalive; + } + + ls->so_keepalive = 1; + +#else + + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "the \"so_keepalive\" parameter accepts " + "only \"on\" or \"off\" on this platform"); + return NGX_CONF_ERROR; + +#endif + } + + ls->bind = 1; + + continue; + +#if (NGX_HAVE_KEEPALIVE_TUNABLE) + invalid_so_keepalive: + + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid so_keepalive value: \"%s\"", + &value[i].data[13]); + return NGX_CONF_ERROR; +#endif + } + + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "the invalid \"%V\" parameter", &value[i]); + return NGX_CONF_ERROR; + } + + return NGX_CONF_OK; +} diff --git a/src/stream/ngx_stream_handler.c b/src/stream/ngx_stream_handler.c new file mode 100644 --- /dev/null +++ b/src/stream/ngx_stream_handler.c @@ -0,0 +1,296 @@ + +/* + * Copyright (C) Roman Arutyunyan + * Copyright (C) Nginx, Inc. + */ + + +#include +#include +#include +#include + + +static u_char *ngx_stream_log_error(ngx_log_t *log, u_char *buf, size_t len); +static void ngx_stream_init_session(ngx_connection_t *c); + +#if (NGX_STREAM_SSL) +static void ngx_stream_ssl_init_connection(ngx_ssl_t *ssl, ngx_connection_t *c); +static void ngx_stream_ssl_handshake_handler(ngx_connection_t *c); +#endif + + +void +ngx_stream_init_connection(ngx_connection_t *c) +{ + u_char text[NGX_SOCKADDR_STRLEN]; + size_t len; + ngx_uint_t i; + struct sockaddr *sa; + ngx_stream_port_t *port; + struct sockaddr_in *sin; + ngx_stream_in_addr_t *addr; + ngx_stream_session_t *s; + ngx_stream_addr_conf_t *addr_conf; +#if (NGX_HAVE_INET6) + struct sockaddr_in6 *sin6; + ngx_stream_in6_addr_t *addr6; +#endif + ngx_stream_core_srv_conf_t *cscf; + + /* find the server configuration for the address:port */ + + port = c->listening->servers; + + if (port->naddrs > 1) { + + /* + * There are several addresses on this port and one of them + * is the "*:port" wildcard so getsockname() is needed to determine + * the server address. + * + * AcceptEx() already gave this address. + */ + + if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) { + ngx_stream_close_connection(c); + return; + } + + sa = c->local_sockaddr; + + switch (sa->sa_family) { + +#if (NGX_HAVE_INET6) + case AF_INET6: + sin6 = (struct sockaddr_in6 *) sa; + + addr6 = port->addrs; + + /* the last address is "*" */ + + for (i = 0; i < port->naddrs - 1; i++) { + if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) { + break; + } + } + + addr_conf = &addr6[i].conf; + + break; +#endif + + default: /* AF_INET */ + sin = (struct sockaddr_in *) sa; + + addr = port->addrs; + + /* the last address is "*" */ + + for (i = 0; i < port->naddrs - 1; i++) { + if (addr[i].addr == sin->sin_addr.s_addr) { + break; + } + } + + addr_conf = &addr[i].conf; + + break; + } + + } else { + switch (c->local_sockaddr->sa_family) { + +#if (NGX_HAVE_INET6) + case AF_INET6: + addr6 = port->addrs; + addr_conf = &addr6[0].conf; + break; +#endif + + default: /* AF_INET */ + addr = port->addrs; + addr_conf = &addr[0].conf; + break; + } + } + + s = ngx_pcalloc(c->pool, sizeof(ngx_stream_session_t)); + if (s == NULL) { + ngx_stream_close_connection(c); + return; + } + + s->signature = NGX_STREAM_MODULE; + s->main_conf = addr_conf->ctx->main_conf; + s->srv_conf = addr_conf->ctx->srv_conf; + + s->connection = c; + c->data = s; + + cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module); + + ngx_stream_set_connection_log(c, cscf->error_log); + + len = ngx_sock_ntop(c->sockaddr, c->socklen, text, NGX_SOCKADDR_STRLEN, 1); + + ngx_log_error(NGX_LOG_INFO, c->log, 0, "*%uA client %*s connected to %V", + c->number, len, text, &addr_conf->addr_text); + + c->log->connection = c->number; + c->log->handler = ngx_stream_log_error; + c->log->data = s; + c->log->action = "initializing connection"; + c->log_error = NGX_ERROR_INFO; + +#if (NGX_STREAM_SSL) + { + ngx_stream_ssl_conf_t *sslcf; + + sslcf = ngx_stream_get_module_srv_conf(s, ngx_stream_ssl_module); + + if (addr_conf->ssl) { + c->log->action = "SSL handshaking"; + + if (sslcf->ssl.ctx == NULL) { + ngx_log_error(NGX_LOG_ERR, c->log, 0, + "no \"ssl_certificate\" is defined " + "in server listening on SSL port"); + ngx_stream_close_connection(c); + return; + } + + ngx_stream_ssl_init_connection(&sslcf->ssl, c); + return; + } + } +#endif + + ngx_stream_init_session(c); +} + + +static void +ngx_stream_init_session(ngx_connection_t *c) +{ + ngx_stream_session_t *s; + ngx_stream_core_srv_conf_t *cscf; + + s = c->data; + c->log->action = "handling client connection"; + + cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module); + + s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_stream_max_module); + if (s->ctx == NULL) { + ngx_stream_close_connection(c); + return; + } + + cscf->handler(s); +} + + +#if (NGX_STREAM_SSL) + +static void +ngx_stream_ssl_init_connection(ngx_ssl_t *ssl, ngx_connection_t *c) +{ + ngx_stream_session_t *s; + ngx_stream_ssl_conf_t *sslcf; + + if (ngx_ssl_create_connection(ssl, c, 0) == NGX_ERROR) { + ngx_stream_close_connection(c); + return; + } + + if (ngx_ssl_handshake(c) == NGX_AGAIN) { + + s = c->data; + + sslcf = ngx_stream_get_module_srv_conf(s, ngx_stream_ssl_module); + + ngx_add_timer(c->read, sslcf->handshake_timeout); + + c->ssl->handler = ngx_stream_ssl_handshake_handler; + + return; + } + + ngx_stream_ssl_handshake_handler(c); +} + + +static void +ngx_stream_ssl_handshake_handler(ngx_connection_t *c) +{ + if (!c->ssl->handshaked) { + ngx_stream_close_connection(c); + return; + } + + if (c->read->timer_set) { + ngx_del_timer(c->read); + } + + ngx_stream_init_session(c); +} + +#endif + + +void +ngx_stream_close_connection(ngx_connection_t *c) +{ + ngx_pool_t *pool; + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, + "close stream connection: %d", c->fd); + +#if (NGX_STREAM_SSL) + + if (c->ssl) { + if (ngx_ssl_shutdown(c) == NGX_AGAIN) { + c->ssl->handler = ngx_stream_close_connection; + return; + } + } + +#endif + +#if (NGX_STAT_STUB) + (void) ngx_atomic_fetch_add(ngx_stat_active, -1); +#endif + + pool = c->pool; + + ngx_close_connection(c); + + ngx_destroy_pool(pool); +} + + +static u_char * +ngx_stream_log_error(ngx_log_t *log, u_char *buf, size_t len) +{ + u_char *p; + ngx_stream_session_t *s; + + if (log->action) { + p = ngx_snprintf(buf, len, " while %s", log->action); + len -= p - buf; + buf = p; + } + + s = log->data; + + p = ngx_snprintf(buf, len, ", client: %V, server: %V", + &s->connection->addr_text, + &s->connection->listening->addr_text); + + if (s->log_handler) { + return s->log_handler(log, p, len); + } + + return p; +} diff --git a/src/stream/ngx_stream_proxy_module.c b/src/stream/ngx_stream_proxy_module.c new file mode 100644 --- /dev/null +++ b/src/stream/ngx_stream_proxy_module.c @@ -0,0 +1,1288 @@ + +/* + * Copyright (C) Roman Arutyunyan + * Copyright (C) Nginx, Inc. + */ + + +#include +#include +#include + + +typedef void (*ngx_stream_proxy_handler_pt)(ngx_stream_session_t *s); + + +typedef struct { + ngx_msec_t connect_timeout; + ngx_msec_t timeout; + ngx_msec_t next_upstream_timeout; + size_t downstream_buf_size; + size_t upstream_buf_size; + ngx_uint_t next_upstream_tries; + ngx_flag_t next_upstream; + +#if (NGX_STREAM_SSL) + ngx_flag_t ssl_enable; + ngx_flag_t ssl_session_reuse; + ngx_uint_t ssl_protocols; + ngx_str_t ssl_ciphers; + ngx_str_t ssl_name; + ngx_flag_t ssl_server_name; + + ngx_flag_t ssl_verify; + ngx_uint_t ssl_verify_depth; + ngx_str_t ssl_trusted_certificate; + ngx_str_t ssl_crl; + ngx_str_t ssl_certificate; + ngx_str_t ssl_certificate_key; + ngx_array_t *ssl_passwords; + + ngx_ssl_t *ssl; +#endif + + ngx_stream_upstream_srv_conf_t *upstream; +} ngx_stream_proxy_srv_conf_t; + + +static void ngx_stream_proxy_handler(ngx_stream_session_t *s); +static void ngx_stream_proxy_connect(ngx_stream_session_t *s); +static void ngx_stream_proxy_init_upstream(ngx_stream_session_t *s); +static void ngx_stream_proxy_upstream_handler(ngx_event_t *ev); +static void ngx_stream_proxy_downstream_handler(ngx_event_t *ev); +static void ngx_stream_proxy_connect_handler(ngx_event_t *ev); +static ngx_int_t ngx_stream_proxy_test_connect(ngx_connection_t *c); +static ngx_int_t ngx_stream_proxy_process(ngx_stream_session_t *s, + ngx_uint_t from_upstream, ngx_uint_t do_write); +static void ngx_stream_proxy_next_upstream(ngx_stream_session_t *s); +static void ngx_stream_proxy_finalize(ngx_stream_session_t *s, ngx_int_t rc); +static u_char *ngx_stream_proxy_log_error(ngx_log_t *log, u_char *buf, + size_t len); + +static void *ngx_stream_proxy_create_srv_conf(ngx_conf_t *cf); +static char *ngx_stream_proxy_merge_srv_conf(ngx_conf_t *cf, void *parent, + void *child); +static char *ngx_stream_proxy_pass(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); + +#if (NGX_STREAM_SSL) + +static char *ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf, + ngx_command_t *cmd, void *conf); +static void ngx_stream_proxy_ssl_init_connection(ngx_stream_session_t *s); +static void ngx_stream_proxy_ssl_handshake(ngx_connection_t *pc); +static ngx_int_t ngx_stream_proxy_ssl_name(ngx_stream_session_t *s); +static ngx_int_t ngx_stream_proxy_set_ssl(ngx_conf_t *cf, + ngx_stream_proxy_srv_conf_t *pscf); + + +static ngx_conf_bitmask_t ngx_stream_proxy_ssl_protocols[] = { + { ngx_string("SSLv2"), NGX_SSL_SSLv2 }, + { ngx_string("SSLv3"), NGX_SSL_SSLv3 }, + { ngx_string("TLSv1"), NGX_SSL_TLSv1 }, + { ngx_string("TLSv1.1"), NGX_SSL_TLSv1_1 }, + { ngx_string("TLSv1.2"), NGX_SSL_TLSv1_2 }, + { ngx_null_string, 0 } +}; + +#endif + + +static ngx_command_t ngx_stream_proxy_commands[] = { + + { ngx_string("proxy_pass"), + NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_stream_proxy_pass, + NGX_STREAM_SRV_CONF_OFFSET, + 0, + NULL }, + + { ngx_string("proxy_connect_timeout"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, connect_timeout), + NULL }, + + { ngx_string("proxy_timeout"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, timeout), + NULL }, + + { ngx_string("proxy_downstream_buffer"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, downstream_buf_size), + NULL }, + + { ngx_string("proxy_upstream_buffer"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, upstream_buf_size), + NULL }, + + { ngx_string("proxy_next_upstream"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG, + ngx_conf_set_flag_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, next_upstream), + NULL }, + + { ngx_string("proxy_next_upstream_tries"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_num_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, next_upstream_tries), + NULL }, + + { ngx_string("proxy_next_upstream_timeout"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, next_upstream_timeout), + NULL }, + +#if (NGX_STREAM_SSL) + + { ngx_string("proxy_ssl"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG, + ngx_conf_set_flag_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, ssl_enable), + NULL }, + + { ngx_string("proxy_ssl_session_reuse"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG, + ngx_conf_set_flag_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, ssl_session_reuse), + NULL }, + + { ngx_string("proxy_ssl_protocols"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_1MORE, + ngx_conf_set_bitmask_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, ssl_protocols), + &ngx_stream_proxy_ssl_protocols }, + + { ngx_string("proxy_ssl_ciphers"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, ssl_ciphers), + NULL }, + + { ngx_string("proxy_ssl_name"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, ssl_name), + NULL }, + + { ngx_string("proxy_ssl_server_name"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG, + ngx_conf_set_flag_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, ssl_server_name), + NULL }, + + { ngx_string("proxy_ssl_verify"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG, + ngx_conf_set_flag_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, ssl_verify), + NULL }, + + { ngx_string("proxy_ssl_verify_depth"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_num_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, ssl_verify_depth), + NULL }, + + { ngx_string("proxy_ssl_trusted_certificate"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, ssl_trusted_certificate), + NULL }, + + { ngx_string("proxy_ssl_crl"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, ssl_crl), + NULL }, + + { ngx_string("proxy_ssl_certificate"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, ssl_certificate), + NULL }, + + { ngx_string("proxy_ssl_certificate_key"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, ssl_certificate_key), + NULL }, + + { ngx_string("proxy_ssl_password_file"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_stream_proxy_ssl_password_file, + NGX_STREAM_SRV_CONF_OFFSET, + 0, + NULL }, + +#endif + + ngx_null_command +}; + + +static ngx_stream_module_t ngx_stream_proxy_module_ctx = { + NULL, /* create main configuration */ + NULL, /* init main configuration */ + + ngx_stream_proxy_create_srv_conf, /* create server configuration */ + ngx_stream_proxy_merge_srv_conf /* merge server configuration */ +}; + + +ngx_module_t ngx_stream_proxy_module = { + NGX_MODULE_V1, + &ngx_stream_proxy_module_ctx, /* module context */ + ngx_stream_proxy_commands, /* module directives */ + NGX_STREAM_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static void +ngx_stream_proxy_handler(ngx_stream_session_t *s) +{ + u_char *p; + ngx_connection_t *c; + ngx_stream_upstream_t *u; + ngx_stream_proxy_srv_conf_t *pscf; + ngx_stream_upstream_srv_conf_t *uscf; + + c = s->connection; + + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); + + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, + "proxy connection handler"); + + u = ngx_pcalloc(c->pool, sizeof(ngx_stream_upstream_t)); + if (u == NULL) { + ngx_stream_proxy_finalize(s, NGX_ERROR); + return; + } + + s->upstream = u; + + s->log_handler = ngx_stream_proxy_log_error; + + u->peer.log = c->log; + u->peer.log_error = NGX_ERROR_ERR; + + uscf = pscf->upstream; + + if (uscf->peer.init(s, uscf) != NGX_OK) { + ngx_stream_proxy_finalize(s, NGX_ERROR); + return; + } + + u->peer.start_time = ngx_current_msec; + + if (pscf->next_upstream_tries + && u->peer.tries > pscf->next_upstream_tries) + { + u->peer.tries = pscf->next_upstream_tries; + } + + p = ngx_pnalloc(c->pool, pscf->downstream_buf_size); + if (p == NULL) { + ngx_stream_proxy_finalize(s, NGX_ERROR); + return; + } + + u->downstream_buf.start = p; + u->downstream_buf.end = p + pscf->downstream_buf_size; + u->downstream_buf.pos = p; + u->downstream_buf.last = p; + + c->write->handler = ngx_stream_proxy_downstream_handler; + c->read->handler = ngx_stream_proxy_downstream_handler; + + if (ngx_stream_proxy_process(s, 0, 0) != NGX_OK) { + return; + } + + ngx_stream_proxy_connect(s); +} + + +static void +ngx_stream_proxy_connect(ngx_stream_session_t *s) +{ + ngx_int_t rc; + ngx_connection_t *c, *pc; + ngx_stream_upstream_t *u; + ngx_stream_proxy_srv_conf_t *pscf; + + c = s->connection; + + c->log->action = "connecting to upstream"; + + u = s->upstream; + + rc = ngx_event_connect_peer(&u->peer); + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, "proxy connect: %i", rc); + + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); + + if (rc == NGX_ERROR) { + ngx_stream_proxy_finalize(s, NGX_ERROR); + return; + } + + if (rc == NGX_BUSY) { + ngx_log_error(NGX_LOG_ERR, c->log, 0, "no live upstreams"); + ngx_stream_proxy_finalize(s, NGX_DECLINED); + return; + } + + if (rc == NGX_DECLINED) { + ngx_stream_proxy_next_upstream(s); + return; + } + + /* rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DONE */ + + pc = u->peer.connection; + + pc->data = s; + pc->log = c->log; + pc->pool = c->pool; + pc->read->log = c->log; + pc->write->log = c->log; + + if (rc != NGX_AGAIN) { + ngx_stream_proxy_init_upstream(s); + return; + } + + pc->read->handler = ngx_stream_proxy_connect_handler; + pc->write->handler = ngx_stream_proxy_connect_handler; + + ngx_add_timer(pc->write, pscf->connect_timeout); +} + + +static void +ngx_stream_proxy_init_upstream(ngx_stream_session_t *s) +{ + u_char *p; + ngx_connection_t *c, *pc; + ngx_log_handler_pt handler; + ngx_stream_upstream_t *u; + ngx_stream_proxy_srv_conf_t *pscf; + + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); + + u = s->upstream; + + pc = u->peer.connection; + +#if (NGX_STREAM_SSL) + if (pscf->ssl && pc->ssl == NULL) { + ngx_stream_proxy_ssl_init_connection(s); + return; + } +#endif + + c = s->connection; + + if (c->log->log_level >= NGX_LOG_INFO) { + ngx_str_t s; + u_char addr[NGX_SOCKADDR_STRLEN]; + + s.len = NGX_SOCKADDR_STRLEN; + s.data = addr; + + if (ngx_connection_local_sockaddr(pc, &s, 1) == NGX_OK) { + handler = c->log->handler; + c->log->handler = NULL; + + ngx_log_error(NGX_LOG_INFO, c->log, 0, "proxy %V connected to %V", + &s, u->peer.name); + + c->log->handler = handler; + } + } + + c->log->action = "proxying connection"; + + p = ngx_pnalloc(c->pool, pscf->upstream_buf_size); + if (p == NULL) { + ngx_stream_proxy_finalize(s, NGX_ERROR); + return; + } + + u->upstream_buf.start = p; + u->upstream_buf.end = p + pscf->upstream_buf_size; + u->upstream_buf.pos = p; + u->upstream_buf.last = p; + + pc->read->handler = ngx_stream_proxy_upstream_handler; + pc->write->handler = ngx_stream_proxy_upstream_handler; + + if (ngx_stream_proxy_process(s, 1, 0) != NGX_OK) { + return; + } + + ngx_stream_proxy_process(s, 0, 1); +} + + +#if (NGX_STREAM_SSL) + +static char * +ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf) +{ + ngx_stream_proxy_srv_conf_t *pscf = conf; + + ngx_str_t *value; + + if (pscf->ssl_passwords != NGX_CONF_UNSET_PTR) { + return "is duplicate"; + } + + value = cf->args->elts; + + pscf->ssl_passwords = ngx_ssl_read_password_file(cf, &value[1]); + + if (pscf->ssl_passwords == NULL) { + return NGX_CONF_ERROR; + } + + return NGX_CONF_OK; +} + + +static void +ngx_stream_proxy_ssl_init_connection(ngx_stream_session_t *s) +{ + ngx_int_t rc; + ngx_connection_t *pc; + ngx_stream_upstream_t *u; + ngx_stream_proxy_srv_conf_t *pscf; + + u = s->upstream; + + pc = u->peer.connection; + + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); + + if (ngx_ssl_create_connection(pscf->ssl, pc, NGX_SSL_BUFFER|NGX_SSL_CLIENT) + != NGX_OK) + { + ngx_stream_proxy_finalize(s, NGX_ERROR); + return; + } + + if (pscf->ssl_server_name || pscf->ssl_verify) { + if (ngx_stream_proxy_ssl_name(s) != NGX_OK) { + ngx_stream_proxy_finalize(s, NGX_ERROR); + return; + } + } + + if (pscf->ssl_session_reuse) { + if (u->peer.set_session(&u->peer, u->peer.data) != NGX_OK) { + ngx_stream_proxy_finalize(s, NGX_ERROR); + return; + } + } + + s->connection->log->action = "SSL handshaking to upstream"; + + rc = ngx_ssl_handshake(pc); + + if (rc == NGX_AGAIN) { + + if (!pc->write->timer_set) { + ngx_add_timer(pc->write, pscf->connect_timeout); + } + + pc->ssl->handler = ngx_stream_proxy_ssl_handshake; + return; + } + + ngx_stream_proxy_ssl_handshake(pc); +} + + +static void +ngx_stream_proxy_ssl_handshake(ngx_connection_t *pc) +{ + long rc; + ngx_stream_session_t *s; + ngx_stream_upstream_t *u; + ngx_stream_proxy_srv_conf_t *pscf; + + s = pc->data; + + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); + + if (pc->ssl->handshaked) { + + if (pscf->ssl_verify) { + rc = SSL_get_verify_result(pc->ssl->connection); + + if (rc != X509_V_OK) { + ngx_log_error(NGX_LOG_ERR, pc->log, 0, + "upstream SSL certificate verify error: (%l:%s)", + rc, X509_verify_cert_error_string(rc)); + goto failed; + } + + u = s->upstream; + + if (ngx_ssl_check_host(pc, &u->ssl_name) != NGX_OK) { + ngx_log_error(NGX_LOG_ERR, pc->log, 0, + "upstream SSL certificate does not match \"%V\"", + &u->ssl_name); + goto failed; + } + } + + if (pscf->ssl_session_reuse) { + u = s->upstream; + u->peer.save_session(&u->peer, u->peer.data); + } + + ngx_stream_proxy_init_upstream(s); + + return; + } + +failed: + + ngx_stream_proxy_next_upstream(s); +} + + +static ngx_int_t +ngx_stream_proxy_ssl_name(ngx_stream_session_t *s) +{ + u_char *p, *last; + ngx_str_t name; + ngx_stream_upstream_t *u; + ngx_stream_proxy_srv_conf_t *pscf; + + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); + + u = s->upstream; + + name = pscf->ssl_name; + + if (name.len == 0) { + name = pscf->upstream->host; + } + + if (name.len == 0) { + goto done; + } + + /* + * ssl name here may contain port, strip it for compatibility + * with the http module + */ + + p = name.data; + last = name.data + name.len; + + if (*p == '[') { + p = ngx_strlchr(p, last, ']'); + + if (p == NULL) { + p = name.data; + } + } + + p = ngx_strlchr(p, last, ':'); + + if (p != NULL) { + name.len = p - name.data; + } + + if (!pscf->ssl_server_name) { + goto done; + } + +#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME + + /* as per RFC 6066, literal IPv4 and IPv6 addresses are not permitted */ + + if (name.len == 0 || *name.data == '[') { + goto done; + } + + if (ngx_inet_addr(name.data, name.len) != INADDR_NONE) { + goto done; + } + + /* + * SSL_set_tlsext_host_name() needs a null-terminated string, + * hence we explicitly null-terminate name here + */ + + p = ngx_pnalloc(s->connection->pool, name.len + 1); + if (p == NULL) { + return NGX_ERROR; + } + + (void) ngx_cpystrn(p, name.data, name.len + 1); + + name.data = p; + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, + "upstream SSL server name: \"%s\"", name.data); + + if (SSL_set_tlsext_host_name(u->peer.connection->ssl->connection, name.data) + == 0) + { + ngx_ssl_error(NGX_LOG_ERR, s->connection->log, 0, + "SSL_set_tlsext_host_name(\"%s\") failed", name.data); + return NGX_ERROR; + } + +#endif + +done: + + u->ssl_name = name; + + return NGX_OK; +} + +#endif + + +static void +ngx_stream_proxy_downstream_handler(ngx_event_t *ev) +{ + ngx_connection_t *c; + ngx_stream_session_t *s; + ngx_stream_upstream_t *u; + + c = ev->data; + s = c->data; + + if (ev->timedout) { + ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out"); + ngx_stream_proxy_finalize(s, NGX_DECLINED); + return; + } + + u = s->upstream; + + if (!ev->write) { + ngx_stream_proxy_process(s, 0, 0); + + } else if (u->upstream_buf.start) { + ngx_stream_proxy_process(s, 1, 1); + } +} + + +static void +ngx_stream_proxy_upstream_handler(ngx_event_t *ev) +{ + ngx_connection_t *c; + ngx_stream_session_t *s; + ngx_stream_upstream_t *u; + + c = ev->data; + s = c->data; + + u = s->upstream; + + if (ev->write) { + ngx_stream_proxy_process(s, 0, 1); + + } else if (u->upstream_buf.start) { + ngx_stream_proxy_process(s, 1, 0); + } +} + + +static void +ngx_stream_proxy_connect_handler(ngx_event_t *ev) +{ + ngx_connection_t *c; + ngx_stream_session_t *s; + + c = ev->data; + s = c->data; + + if (ev->timedout) { + ngx_log_error(NGX_LOG_ERR, c->log, NGX_ETIMEDOUT, "upstream timed out"); + ngx_stream_proxy_next_upstream(s); + return; + } + + ngx_del_timer(c->write); + + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, + "stream proxy connect upstream"); + + if (ngx_stream_proxy_test_connect(c) != NGX_OK) { + ngx_stream_proxy_next_upstream(s); + return; + } + + ngx_stream_proxy_init_upstream(s); +} + + +static ngx_int_t +ngx_stream_proxy_test_connect(ngx_connection_t *c) +{ + int err; + socklen_t len; + +#if (NGX_HAVE_KQUEUE) + + if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { + err = c->write->kq_errno ? c->write->kq_errno : c->read->kq_errno; + + if (err) { + (void) ngx_connection_error(c, err, + "kevent() reported that connect() failed"); + return NGX_ERROR; + } + + } else +#endif + { + err = 0; + len = sizeof(int); + + /* + * BSDs and Linux return 0 and set a pending error in err + * Solaris returns -1 and sets errno + */ + + if (getsockopt(c->fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len) + == -1) + { + err = ngx_socket_errno; + } + + if (err) { + (void) ngx_connection_error(c, err, "connect() failed"); + return NGX_ERROR; + } + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream, + ngx_uint_t do_write) +{ + size_t size; + ssize_t n; + ngx_buf_t *b; + ngx_connection_t *c, *pc, *src, *dst; + ngx_log_handler_pt handler; + ngx_stream_upstream_t *u; + ngx_stream_proxy_srv_conf_t *pscf; + + u = s->upstream; + + c = s->connection; + pc = u->upstream_buf.start ? u->peer.connection : NULL; + + if (from_upstream) { + src = pc; + dst = c; + b = &u->upstream_buf; + + } else { + src = c; + dst = pc; + b = &u->downstream_buf; + } + + for ( ;; ) { + + if (do_write) { + + size = b->last - b->pos; + + if (size && dst && dst->write->ready) { + + n = dst->send(dst, b->pos, size); + + if (n == NGX_ERROR) { + ngx_stream_proxy_finalize(s, NGX_DECLINED); + return NGX_ERROR; + } + + if (n > 0) { + b->pos += n; + + if (b->pos == b->last) { + b->pos = b->start; + b->last = b->start; + } + } + } + } + + size = b->end - b->last; + + if (size && src->read->ready) { + + n = src->recv(src, b->last, size); + + if (n == NGX_AGAIN || n == 0) { + break; + } + + if (n > 0) { + if (from_upstream) { + u->received += n; + + } else { + s->received += n; + } + + do_write = 1; + b->last += n; + continue; + } + + if (n == NGX_ERROR) { + src->read->eof = 1; + } + } + + break; + } + + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); + + if (src->read->eof && (b->pos == b->last || (dst && dst->read->eof))) { + handler = c->log->handler; + c->log->handler = NULL; + + ngx_log_error(NGX_LOG_INFO, c->log, 0, + "%s disconnected" + ", bytes from/to client:%O/%O" + ", bytes from/to upstream:%O/%O", + from_upstream ? "upstream" : "client", + s->received, c->sent, u->received, pc ? pc->sent : 0); + + c->log->handler = handler; + + ngx_stream_proxy_finalize(s, NGX_OK); + return NGX_DONE; + } + + if (ngx_handle_read_event(src->read, 0) != NGX_OK) { + ngx_stream_proxy_finalize(s, NGX_ERROR); + return NGX_ERROR; + } + + if (dst) { + if (ngx_handle_write_event(dst->write, 0) != NGX_OK) { + ngx_stream_proxy_finalize(s, NGX_ERROR); + return NGX_ERROR; + } + + ngx_add_timer(c->read, pscf->timeout); + } + + return NGX_OK; +} + + +static void +ngx_stream_proxy_next_upstream(ngx_stream_session_t *s) +{ + ngx_msec_t timeout; + ngx_connection_t *pc; + ngx_stream_upstream_t *u; + ngx_stream_proxy_srv_conf_t *pscf; + + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, + "stream proxy next upstream"); + + u = s->upstream; + + if (u->peer.sockaddr) { + u->peer.free(&u->peer, u->peer.data, NGX_PEER_FAILED); + u->peer.sockaddr = NULL; + } + + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); + + timeout = pscf->next_upstream_timeout; + + if (u->peer.tries == 0 + || !pscf->next_upstream + || (timeout && ngx_current_msec - u->peer.start_time >= timeout)) + { + ngx_stream_proxy_finalize(s, NGX_DECLINED); + return; + } + + pc = u->peer.connection; + + if (pc) { + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, + "close proxy upstream connection: %d", pc->fd); + +#if (NGX_STREAM_SSL) + if (pc->ssl) { + pc->ssl->no_wait_shutdown = 1; + pc->ssl->no_send_shutdown = 1; + + (void) ngx_ssl_shutdown(pc); + } +#endif + + ngx_close_connection(pc); + u->peer.connection = NULL; + } + + ngx_stream_proxy_connect(s); +} + + +static void +ngx_stream_proxy_finalize(ngx_stream_session_t *s, ngx_int_t rc) +{ + ngx_connection_t *pc; + ngx_stream_upstream_t *u; + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, + "finalize stream proxy: %i", rc); + + u = s->upstream; + + if (u == NULL) { + goto noupstream; + } + + if (u->peer.free && u->peer.sockaddr) { + u->peer.free(&u->peer, u->peer.data, 0); + u->peer.sockaddr = NULL; + } + + pc = u->peer.connection; + + if (pc) { + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, + "close stream proxy upstream connection: %d", pc->fd); + +#if (NGX_STREAM_SSL) + if (pc->ssl) { + pc->ssl->no_wait_shutdown = 1; + (void) ngx_ssl_shutdown(pc); + } +#endif + + ngx_close_connection(pc); + u->peer.connection = NULL; + } + +noupstream: + + ngx_stream_close_connection(s->connection); +} + + +static u_char * +ngx_stream_proxy_log_error(ngx_log_t *log, u_char *buf, size_t len) +{ + u_char *p; + ngx_connection_t *pc; + ngx_stream_session_t *s; + ngx_stream_upstream_t *u; + + s = log->data; + + u = s->upstream; + + p = buf; + + if (u->peer.name) { + p = ngx_snprintf(p, len, ", upstream: \"%V\"", u->peer.name); + len -= p - buf; + } + + pc = u->peer.connection; + + p = ngx_snprintf(p, len, + ", bytes from/to client:%O/%O" + ", bytes from/to upstream:%O/%O", + s->received, s->connection->sent, + u->received, pc ? pc->sent : 0); + + return p; +} + + +static void * +ngx_stream_proxy_create_srv_conf(ngx_conf_t *cf) +{ + ngx_stream_proxy_srv_conf_t *conf; + + conf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_proxy_srv_conf_t)); + if (conf == NULL) { + return NULL; + } + + /* + * set by ngx_pcalloc(): + * + * conf->ssl_protocols = 0; + * conf->ssl_ciphers = { 0, NULL }; + * conf->ssl_name = { 0, NULL }; + * conf->ssl_trusted_certificate = { 0, NULL }; + * conf->ssl_crl = { 0, NULL }; + * conf->ssl_certificate = { 0, NULL }; + * conf->ssl_certificate_key = { 0, NULL }; + * + * conf->ssl = NULL; + * conf->upstream = NULL; + */ + + conf->connect_timeout = NGX_CONF_UNSET_MSEC; + conf->timeout = NGX_CONF_UNSET_MSEC; + conf->next_upstream_timeout = NGX_CONF_UNSET_MSEC; + conf->downstream_buf_size = NGX_CONF_UNSET_SIZE; + conf->upstream_buf_size = NGX_CONF_UNSET_SIZE; + conf->next_upstream_tries = NGX_CONF_UNSET_UINT; + conf->next_upstream = NGX_CONF_UNSET; + +#if (NGX_STREAM_SSL) + conf->ssl_enable = NGX_CONF_UNSET; + conf->ssl_session_reuse = NGX_CONF_UNSET; + conf->ssl_server_name = NGX_CONF_UNSET; + conf->ssl_verify = NGX_CONF_UNSET; + conf->ssl_verify_depth = NGX_CONF_UNSET_UINT; + conf->ssl_passwords = NGX_CONF_UNSET_PTR; +#endif + + return conf; +} + + +static char * +ngx_stream_proxy_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child) +{ + ngx_stream_proxy_srv_conf_t *prev = parent; + ngx_stream_proxy_srv_conf_t *conf = child; + + ngx_conf_merge_msec_value(conf->connect_timeout, + prev->connect_timeout, 60000); + + ngx_conf_merge_msec_value(conf->timeout, + prev->timeout, 10 * 60000); + + ngx_conf_merge_msec_value(conf->next_upstream_timeout, + prev->next_upstream_timeout, 0); + + ngx_conf_merge_size_value(conf->downstream_buf_size, + prev->downstream_buf_size, 16384); + + ngx_conf_merge_size_value(conf->upstream_buf_size, + prev->upstream_buf_size, 16384); + + ngx_conf_merge_uint_value(conf->next_upstream_tries, + prev->next_upstream_tries, 0); + + ngx_conf_merge_value(conf->next_upstream, prev->next_upstream, 1); + +#if (NGX_STREAM_SSL) + + ngx_conf_merge_value(conf->ssl_enable, prev->ssl_enable, 0); + + ngx_conf_merge_value(conf->ssl_session_reuse, + prev->ssl_session_reuse, 1); + + ngx_conf_merge_bitmask_value(conf->ssl_protocols, prev->ssl_protocols, + (NGX_CONF_BITMASK_SET|NGX_SSL_SSLv3 + |NGX_SSL_TLSv1|NGX_SSL_TLSv1_1 + |NGX_SSL_TLSv1_2)); + + ngx_conf_merge_str_value(conf->ssl_ciphers, prev->ssl_ciphers, "DEFAULT"); + + ngx_conf_merge_str_value(conf->ssl_name, prev->ssl_name, ""); + + ngx_conf_merge_value(conf->ssl_server_name, prev->ssl_server_name, 0); + + ngx_conf_merge_value(conf->ssl_verify, prev->ssl_verify, 0); + + ngx_conf_merge_uint_value(conf->ssl_verify_depth, + prev->ssl_verify_depth, 1); + + ngx_conf_merge_str_value(conf->ssl_trusted_certificate, + prev->ssl_trusted_certificate, ""); + + ngx_conf_merge_str_value(conf->ssl_crl, prev->ssl_crl, ""); + + ngx_conf_merge_str_value(conf->ssl_certificate, + prev->ssl_certificate, ""); + + ngx_conf_merge_str_value(conf->ssl_certificate_key, + prev->ssl_certificate_key, ""); + + ngx_conf_merge_ptr_value(conf->ssl_passwords, prev->ssl_passwords, NULL); + + if (conf->ssl_enable && ngx_stream_proxy_set_ssl(cf, conf) != NGX_OK) { + return NGX_CONF_ERROR; + } + +#endif + + return NGX_CONF_OK; +} + + +#if (NGX_STREAM_SSL) + +static ngx_int_t +ngx_stream_proxy_set_ssl(ngx_conf_t *cf, ngx_stream_proxy_srv_conf_t *pscf) +{ + ngx_pool_cleanup_t *cln; + + pscf->ssl = ngx_pcalloc(cf->pool, sizeof(ngx_ssl_t)); + if (pscf->ssl == NULL) { + return NGX_ERROR; + } + + pscf->ssl->log = cf->log; + + if (ngx_ssl_create(pscf->ssl, pscf->ssl_protocols, NULL) != NGX_OK) { + return NGX_ERROR; + } + + cln = ngx_pool_cleanup_add(cf->pool, 0); + if (cln == NULL) { + return NGX_ERROR; + } + + cln->handler = ngx_ssl_cleanup_ctx; + cln->data = pscf->ssl; + + if (pscf->ssl_certificate.len) { + + if (pscf->ssl_certificate_key.len == 0) { + ngx_log_error(NGX_LOG_EMERG, cf->log, 0, + "no \"proxy_ssl_certificate_key\" is defined " + "for certificate \"%V\"", &pscf->ssl_certificate); + return NGX_ERROR; + } + + if (ngx_ssl_certificate(cf, pscf->ssl, &pscf->ssl_certificate, + &pscf->ssl_certificate_key, pscf->ssl_passwords) + != NGX_OK) + { + return NGX_ERROR; + } + } + + if (SSL_CTX_set_cipher_list(pscf->ssl->ctx, + (const char *) pscf->ssl_ciphers.data) + == 0) + { + ngx_ssl_error(NGX_LOG_EMERG, cf->log, 0, + "SSL_CTX_set_cipher_list(\"%V\") failed", + &pscf->ssl_ciphers); + return NGX_ERROR; + } + + if (pscf->ssl_verify) { + if (pscf->ssl_trusted_certificate.len == 0) { + ngx_log_error(NGX_LOG_EMERG, cf->log, 0, + "no proxy_ssl_trusted_certificate for proxy_ssl_verify"); + return NGX_ERROR; + } + + if (ngx_ssl_trusted_certificate(cf, pscf->ssl, + &pscf->ssl_trusted_certificate, + pscf->ssl_verify_depth) + != NGX_OK) + { + return NGX_ERROR; + } + + if (ngx_ssl_crl(cf, pscf->ssl, &pscf->ssl_crl) != NGX_OK) { + return NGX_ERROR; + } + } + + return NGX_OK; +} + +#endif + + +static char * +ngx_stream_proxy_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_stream_proxy_srv_conf_t *pscf = conf; + + ngx_url_t u; + ngx_str_t *value, *url; + ngx_stream_core_srv_conf_t *cscf; + + if (pscf->upstream) { + return "is duplicate"; + } + + cscf = ngx_stream_conf_get_module_srv_conf(cf, ngx_stream_core_module); + + cscf->handler = ngx_stream_proxy_handler; + + value = cf->args->elts; + + url = &value[1]; + + ngx_memzero(&u, sizeof(ngx_url_t)); + + u.url = *url; + u.no_resolve = 1; + + pscf->upstream = ngx_stream_upstream_add(cf, &u, 0); + if (pscf->upstream == NULL) { + return NGX_CONF_ERROR; + } + + return NGX_CONF_OK; +} diff --git a/src/stream/ngx_stream_ssl_module.c b/src/stream/ngx_stream_ssl_module.c new file mode 100644 --- /dev/null +++ b/src/stream/ngx_stream_ssl_module.c @@ -0,0 +1,456 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Nginx, Inc. + */ + + +#include +#include +#include + + +#define NGX_DEFAULT_CIPHERS "HIGH:!aNULL:!MD5" +#define NGX_DEFAULT_ECDH_CURVE "prime256v1" + + +static void *ngx_stream_ssl_create_conf(ngx_conf_t *cf); +static char *ngx_stream_ssl_merge_conf(ngx_conf_t *cf, void *parent, + void *child); + +static char *ngx_stream_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); +static char *ngx_stream_ssl_session_cache(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); + + +static ngx_conf_bitmask_t ngx_stream_ssl_protocols[] = { + { ngx_string("SSLv2"), NGX_SSL_SSLv2 }, + { ngx_string("SSLv3"), NGX_SSL_SSLv3 }, + { ngx_string("TLSv1"), NGX_SSL_TLSv1 }, + { ngx_string("TLSv1.1"), NGX_SSL_TLSv1_1 }, + { ngx_string("TLSv1.2"), NGX_SSL_TLSv1_2 }, + { ngx_null_string, 0 } +}; + + +static ngx_command_t ngx_stream_ssl_commands[] = { + + { ngx_string("ssl_handshake_timeout"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_ssl_conf_t, handshake_timeout), + NULL }, + + { ngx_string("ssl_certificate"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_ssl_conf_t, certificate), + NULL }, + + { ngx_string("ssl_certificate_key"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_ssl_conf_t, certificate_key), + NULL }, + + { ngx_string("ssl_password_file"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_stream_ssl_password_file, + NGX_STREAM_SRV_CONF_OFFSET, + 0, + NULL }, + + { ngx_string("ssl_dhparam"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_ssl_conf_t, dhparam), + NULL }, + + { ngx_string("ssl_ecdh_curve"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_ssl_conf_t, ecdh_curve), + NULL }, + + { ngx_string("ssl_protocols"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_1MORE, + ngx_conf_set_bitmask_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_ssl_conf_t, protocols), + &ngx_stream_ssl_protocols }, + + { ngx_string("ssl_ciphers"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_ssl_conf_t, ciphers), + NULL }, + + { ngx_string("ssl_prefer_server_ciphers"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG, + ngx_conf_set_flag_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_ssl_conf_t, prefer_server_ciphers), + NULL }, + + { ngx_string("ssl_session_cache"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE12, + ngx_stream_ssl_session_cache, + NGX_STREAM_SRV_CONF_OFFSET, + 0, + NULL }, + + { ngx_string("ssl_session_tickets"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG, + ngx_conf_set_flag_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_ssl_conf_t, session_tickets), + NULL }, + + { ngx_string("ssl_session_ticket_key"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_array_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_ssl_conf_t, session_ticket_keys), + NULL }, + + { ngx_string("ssl_session_timeout"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_sec_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_ssl_conf_t, session_timeout), + NULL }, + + ngx_null_command +}; + + +static ngx_stream_module_t ngx_stream_ssl_module_ctx = { + NULL, /* create main configuration */ + NULL, /* init main configuration */ + + ngx_stream_ssl_create_conf, /* create server configuration */ + ngx_stream_ssl_merge_conf /* merge server configuration */ +}; + + +ngx_module_t ngx_stream_ssl_module = { + NGX_MODULE_V1, + &ngx_stream_ssl_module_ctx, /* module context */ + ngx_stream_ssl_commands, /* module directives */ + NGX_STREAM_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static ngx_str_t ngx_stream_ssl_sess_id_ctx = ngx_string("STREAM"); + + +static void * +ngx_stream_ssl_create_conf(ngx_conf_t *cf) +{ + ngx_stream_ssl_conf_t *scf; + + scf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_ssl_conf_t)); + if (scf == NULL) { + return NULL; + } + + /* + * set by ngx_pcalloc(): + * + * scf->protocols = 0; + * scf->certificate = { 0, NULL }; + * scf->certificate_key = { 0, NULL }; + * scf->dhparam = { 0, NULL }; + * scf->ecdh_curve = { 0, NULL }; + * scf->ciphers = { 0, NULL }; + * scf->shm_zone = NULL; + */ + + scf->handshake_timeout = NGX_CONF_UNSET_MSEC; + scf->passwords = NGX_CONF_UNSET_PTR; + scf->prefer_server_ciphers = NGX_CONF_UNSET; + scf->builtin_session_cache = NGX_CONF_UNSET; + scf->session_timeout = NGX_CONF_UNSET; + scf->session_tickets = NGX_CONF_UNSET; + scf->session_ticket_keys = NGX_CONF_UNSET_PTR; + + return scf; +} + + +static char * +ngx_stream_ssl_merge_conf(ngx_conf_t *cf, void *parent, void *child) +{ + ngx_stream_ssl_conf_t *prev = parent; + ngx_stream_ssl_conf_t *conf = child; + + ngx_pool_cleanup_t *cln; + + ngx_conf_merge_msec_value(conf->handshake_timeout, + prev->handshake_timeout, 60000); + + ngx_conf_merge_value(conf->session_timeout, + prev->session_timeout, 300); + + ngx_conf_merge_value(conf->prefer_server_ciphers, + prev->prefer_server_ciphers, 0); + + ngx_conf_merge_bitmask_value(conf->protocols, prev->protocols, + (NGX_CONF_BITMASK_SET|NGX_SSL_SSLv3|NGX_SSL_TLSv1 + |NGX_SSL_TLSv1_1|NGX_SSL_TLSv1_2)); + + ngx_conf_merge_str_value(conf->certificate, prev->certificate, ""); + ngx_conf_merge_str_value(conf->certificate_key, prev->certificate_key, ""); + + ngx_conf_merge_ptr_value(conf->passwords, prev->passwords, NULL); + + ngx_conf_merge_str_value(conf->dhparam, prev->dhparam, ""); + + ngx_conf_merge_str_value(conf->ecdh_curve, prev->ecdh_curve, + NGX_DEFAULT_ECDH_CURVE); + + ngx_conf_merge_str_value(conf->ciphers, prev->ciphers, NGX_DEFAULT_CIPHERS); + + + conf->ssl.log = cf->log; + + if (conf->certificate.len == 0) { + return NGX_CONF_OK; + } + + if (conf->certificate_key.len == 0) { + ngx_log_error(NGX_LOG_EMERG, cf->log, 0, + "no \"ssl_certificate_key\" is defined " + "for certificate \"%V\"", + &conf->certificate); + return NGX_CONF_ERROR; + } + + if (ngx_ssl_create(&conf->ssl, conf->protocols, NULL) != NGX_OK) { + return NGX_CONF_ERROR; + } + + cln = ngx_pool_cleanup_add(cf->pool, 0); + if (cln == NULL) { + return NGX_CONF_ERROR; + } + + cln->handler = ngx_ssl_cleanup_ctx; + cln->data = &conf->ssl; + + if (ngx_ssl_certificate(cf, &conf->ssl, &conf->certificate, + &conf->certificate_key, conf->passwords) + != NGX_OK) + { + return NGX_CONF_ERROR; + } + + if (SSL_CTX_set_cipher_list(conf->ssl.ctx, + (const char *) conf->ciphers.data) + == 0) + { + ngx_ssl_error(NGX_LOG_EMERG, cf->log, 0, + "SSL_CTX_set_cipher_list(\"%V\") failed", + &conf->ciphers); + return NGX_CONF_ERROR; + } + + if (conf->prefer_server_ciphers) { + SSL_CTX_set_options(conf->ssl.ctx, SSL_OP_CIPHER_SERVER_PREFERENCE); + } + + SSL_CTX_set_tmp_rsa_callback(conf->ssl.ctx, ngx_ssl_rsa512_key_callback); + + if (ngx_ssl_dhparam(cf, &conf->ssl, &conf->dhparam) != NGX_OK) { + return NGX_CONF_ERROR; + } + + if (ngx_ssl_ecdh_curve(cf, &conf->ssl, &conf->ecdh_curve) != NGX_OK) { + return NGX_CONF_ERROR; + } + + ngx_conf_merge_value(conf->builtin_session_cache, + prev->builtin_session_cache, NGX_SSL_NONE_SCACHE); + + if (conf->shm_zone == NULL) { + conf->shm_zone = prev->shm_zone; + } + + if (ngx_ssl_session_cache(&conf->ssl, &ngx_stream_ssl_sess_id_ctx, + conf->builtin_session_cache, + conf->shm_zone, conf->session_timeout) + != NGX_OK) + { + return NGX_CONF_ERROR; + } + + ngx_conf_merge_value(conf->session_tickets, + prev->session_tickets, 1); + +#ifdef SSL_OP_NO_TICKET + if (!conf->session_tickets) { + SSL_CTX_set_options(conf->ssl.ctx, SSL_OP_NO_TICKET); + } +#endif + + ngx_conf_merge_ptr_value(conf->session_ticket_keys, + prev->session_ticket_keys, NULL); + + if (ngx_ssl_session_ticket_keys(cf, &conf->ssl, conf->session_ticket_keys) + != NGX_OK) + { + return NGX_CONF_ERROR; + } + + return NGX_CONF_OK; +} + + +static char * +ngx_stream_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_stream_ssl_conf_t *scf = conf; + + ngx_str_t *value; + + if (scf->passwords != NGX_CONF_UNSET_PTR) { + return "is duplicate"; + } + + value = cf->args->elts; + + scf->passwords = ngx_ssl_read_password_file(cf, &value[1]); + + if (scf->passwords == NULL) { + return NGX_CONF_ERROR; + } + + return NGX_CONF_OK; +} + + +static char * +ngx_stream_ssl_session_cache(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_stream_ssl_conf_t *scf = conf; + + size_t len; + ngx_str_t *value, name, size; + ngx_int_t n; + ngx_uint_t i, j; + + value = cf->args->elts; + + for (i = 1; i < cf->args->nelts; i++) { + + if (ngx_strcmp(value[i].data, "off") == 0) { + scf->builtin_session_cache = NGX_SSL_NO_SCACHE; + continue; + } + + if (ngx_strcmp(value[i].data, "none") == 0) { + scf->builtin_session_cache = NGX_SSL_NONE_SCACHE; + continue; + } + + if (ngx_strcmp(value[i].data, "builtin") == 0) { + scf->builtin_session_cache = NGX_SSL_DFLT_BUILTIN_SCACHE; + continue; + } + + if (value[i].len > sizeof("builtin:") - 1 + && ngx_strncmp(value[i].data, "builtin:", sizeof("builtin:") - 1) + == 0) + { + n = ngx_atoi(value[i].data + sizeof("builtin:") - 1, + value[i].len - (sizeof("builtin:") - 1)); + + if (n == NGX_ERROR) { + goto invalid; + } + + scf->builtin_session_cache = n; + + continue; + } + + if (value[i].len > sizeof("shared:") - 1 + && ngx_strncmp(value[i].data, "shared:", sizeof("shared:") - 1) + == 0) + { + len = 0; + + for (j = sizeof("shared:") - 1; j < value[i].len; j++) { + if (value[i].data[j] == ':') { + break; + } + + len++; + } + + if (len == 0) { + goto invalid; + } + + name.len = len; + name.data = value[i].data + sizeof("shared:") - 1; + + size.len = value[i].len - j - 1; + size.data = name.data + len + 1; + + n = ngx_parse_size(&size); + + if (n == NGX_ERROR) { + goto invalid; + } + + if (n < (ngx_int_t) (8 * ngx_pagesize)) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "session cache \"%V\" is too small", + &value[i]); + + return NGX_CONF_ERROR; + } + + scf->shm_zone = ngx_shared_memory_add(cf, &name, n, + &ngx_stream_ssl_module); + if (scf->shm_zone == NULL) { + return NGX_CONF_ERROR; + } + + scf->shm_zone->init = ngx_ssl_session_cache_init; + + continue; + } + + goto invalid; + } + + if (scf->shm_zone && scf->builtin_session_cache == NGX_CONF_UNSET) { + scf->builtin_session_cache = NGX_SSL_NO_BUILTIN_SCACHE; + } + + return NGX_CONF_OK; + +invalid: + + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid session cache \"%V\"", &value[i]); + + return NGX_CONF_ERROR; +} diff --git a/src/stream/ngx_stream_ssl_module.h b/src/stream/ngx_stream_ssl_module.h new file mode 100644 --- /dev/null +++ b/src/stream/ngx_stream_ssl_module.h @@ -0,0 +1,49 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Nginx, Inc. + */ + + +#ifndef _NGX_STREAM_SSL_H_INCLUDED_ +#define _NGX_STREAM_SSL_H_INCLUDED_ + + +#include +#include +#include + + +typedef struct { + ngx_msec_t handshake_timeout; + + ngx_flag_t prefer_server_ciphers; + + ngx_ssl_t ssl; + + ngx_uint_t protocols; + + ssize_t builtin_session_cache; + + time_t session_timeout; + + ngx_str_t certificate; + ngx_str_t certificate_key; + ngx_str_t dhparam; + ngx_str_t ecdh_curve; + + ngx_str_t ciphers; + + ngx_array_t *passwords; + + ngx_shm_zone_t *shm_zone; + + ngx_flag_t session_tickets; + ngx_array_t *session_ticket_keys; +} ngx_stream_ssl_conf_t; + + +extern ngx_module_t ngx_stream_ssl_module; + + +#endif /* _NGX_STREAM_SSL_H_INCLUDED_ */ diff --git a/src/stream/ngx_stream_upstream.c b/src/stream/ngx_stream_upstream.c new file mode 100644 --- /dev/null +++ b/src/stream/ngx_stream_upstream.c @@ -0,0 +1,462 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Nginx, Inc. + */ + + +#include +#include +#include + + +static char *ngx_stream_upstream(ngx_conf_t *cf, ngx_command_t *cmd, + void *dummy); +static char *ngx_stream_upstream_server(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); +static void *ngx_stream_upstream_create_main_conf(ngx_conf_t *cf); +static char *ngx_stream_upstream_init_main_conf(ngx_conf_t *cf, void *conf); + + +static ngx_command_t ngx_stream_upstream_commands[] = { + + { ngx_string("upstream"), + NGX_STREAM_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_TAKE1, + ngx_stream_upstream, + 0, + 0, + NULL }, + + { ngx_string("server"), + NGX_STREAM_UPS_CONF|NGX_CONF_1MORE, + ngx_stream_upstream_server, + NGX_STREAM_SRV_CONF_OFFSET, + 0, + NULL }, + + ngx_null_command +}; + + +static ngx_stream_module_t ngx_stream_upstream_module_ctx = { + ngx_stream_upstream_create_main_conf, /* create main configuration */ + ngx_stream_upstream_init_main_conf, /* init main configuration */ + + NULL, /* create server configuration */ + NULL, /* merge server configuration */ +}; + + +ngx_module_t ngx_stream_upstream_module = { + NGX_MODULE_V1, + &ngx_stream_upstream_module_ctx, /* module context */ + ngx_stream_upstream_commands, /* module directives */ + NGX_STREAM_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static char * +ngx_stream_upstream(ngx_conf_t *cf, ngx_command_t *cmd, void *dummy) +{ + char *rv; + void *mconf; + ngx_str_t *value; + ngx_url_t u; + ngx_uint_t m; + ngx_conf_t pcf; + ngx_stream_module_t *module; + ngx_stream_conf_ctx_t *ctx, *stream_ctx; + ngx_stream_upstream_srv_conf_t *uscf; + + ngx_memzero(&u, sizeof(ngx_url_t)); + + value = cf->args->elts; + u.host = value[1]; + u.no_resolve = 1; + u.no_port = 1; + + uscf = ngx_stream_upstream_add(cf, &u, NGX_STREAM_UPSTREAM_CREATE + |NGX_STREAM_UPSTREAM_WEIGHT + |NGX_STREAM_UPSTREAM_MAX_FAILS + |NGX_STREAM_UPSTREAM_FAIL_TIMEOUT + |NGX_STREAM_UPSTREAM_DOWN + |NGX_STREAM_UPSTREAM_BACKUP); + if (uscf == NULL) { + return NGX_CONF_ERROR; + } + + + ctx = ngx_pcalloc(cf->pool, sizeof(ngx_stream_conf_ctx_t)); + if (ctx == NULL) { + return NGX_CONF_ERROR; + } + + stream_ctx = cf->ctx; + ctx->main_conf = stream_ctx->main_conf; + + /* the upstream{}'s srv_conf */ + + ctx->srv_conf = ngx_pcalloc(cf->pool, + sizeof(void *) * ngx_stream_max_module); + if (ctx->srv_conf == NULL) { + return NGX_CONF_ERROR; + } + + ctx->srv_conf[ngx_stream_upstream_module.ctx_index] = uscf; + + uscf->srv_conf = ctx->srv_conf; + + for (m = 0; ngx_modules[m]; m++) { + if (ngx_modules[m]->type != NGX_STREAM_MODULE) { + continue; + } + + module = ngx_modules[m]->ctx; + + if (module->create_srv_conf) { + mconf = module->create_srv_conf(cf); + if (mconf == NULL) { + return NGX_CONF_ERROR; + } + + ctx->srv_conf[ngx_modules[m]->ctx_index] = mconf; + } + } + + uscf->servers = ngx_array_create(cf->pool, 4, + sizeof(ngx_stream_upstream_server_t)); + if (uscf->servers == NULL) { + return NGX_CONF_ERROR; + } + + + /* parse inside upstream{} */ + + pcf = *cf; + cf->ctx = ctx; + cf->cmd_type = NGX_STREAM_UPS_CONF; + + rv = ngx_conf_parse(cf, NULL); + + *cf = pcf; + + if (rv != NGX_CONF_OK) { + return rv; + } + + if (uscf->servers->nelts == 0) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "no servers are inside upstream"); + return NGX_CONF_ERROR; + } + + return rv; +} + + +static char * +ngx_stream_upstream_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_stream_upstream_srv_conf_t *uscf = conf; + + time_t fail_timeout; + ngx_str_t *value, s; + ngx_url_t u; + ngx_int_t weight, max_fails; + ngx_uint_t i; + ngx_stream_upstream_server_t *us; + + us = ngx_array_push(uscf->servers); + if (us == NULL) { + return NGX_CONF_ERROR; + } + + ngx_memzero(us, sizeof(ngx_stream_upstream_server_t)); + + value = cf->args->elts; + + weight = 1; + max_fails = 1; + fail_timeout = 10; + + for (i = 2; i < cf->args->nelts; i++) { + + if (ngx_strncmp(value[i].data, "weight=", 7) == 0) { + + if (!(uscf->flags & NGX_STREAM_UPSTREAM_WEIGHT)) { + goto not_supported; + } + + weight = ngx_atoi(&value[i].data[7], value[i].len - 7); + + if (weight == NGX_ERROR || weight == 0) { + goto invalid; + } + + continue; + } + + if (ngx_strncmp(value[i].data, "max_fails=", 10) == 0) { + + if (!(uscf->flags & NGX_STREAM_UPSTREAM_MAX_FAILS)) { + goto not_supported; + } + + max_fails = ngx_atoi(&value[i].data[10], value[i].len - 10); + + if (max_fails == NGX_ERROR) { + goto invalid; + } + + continue; + } + + if (ngx_strncmp(value[i].data, "fail_timeout=", 13) == 0) { + + if (!(uscf->flags & NGX_STREAM_UPSTREAM_FAIL_TIMEOUT)) { + goto not_supported; + } + + s.len = value[i].len - 13; + s.data = &value[i].data[13]; + + fail_timeout = ngx_parse_time(&s, 1); + + if (fail_timeout == (time_t) NGX_ERROR) { + goto invalid; + } + + continue; + } + + if (ngx_strcmp(value[i].data, "backup") == 0) { + + if (!(uscf->flags & NGX_STREAM_UPSTREAM_BACKUP)) { + goto not_supported; + } + + us->backup = 1; + + continue; + } + + if (ngx_strcmp(value[i].data, "down") == 0) { + + if (!(uscf->flags & NGX_STREAM_UPSTREAM_DOWN)) { + goto not_supported; + } + + us->down = 1; + + continue; + } + + goto invalid; + } + + ngx_memzero(&u, sizeof(ngx_url_t)); + + u.url = value[1]; + + if (ngx_parse_url(cf->pool, &u) != NGX_OK) { + if (u.err) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "%s in upstream \"%V\"", u.err, &u.url); + } + + return NGX_CONF_ERROR; + } + + if (u.no_port) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "no port in upstream \"%V\"", &u.url); + return NGX_CONF_ERROR; + } + + us->name = u.url; + us->addrs = u.addrs; + us->naddrs = u.naddrs; + us->weight = weight; + us->max_fails = max_fails; + us->fail_timeout = fail_timeout; + + return NGX_CONF_OK; + +invalid: + + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid parameter \"%V\"", &value[i]); + + return NGX_CONF_ERROR; + +not_supported: + + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "balancing method does not support parameter \"%V\"", + &value[i]); + + return NGX_CONF_ERROR; +} + + +ngx_stream_upstream_srv_conf_t * +ngx_stream_upstream_add(ngx_conf_t *cf, ngx_url_t *u, ngx_uint_t flags) +{ + ngx_uint_t i; + ngx_stream_upstream_server_t *us; + ngx_stream_upstream_srv_conf_t *uscf, **uscfp; + ngx_stream_upstream_main_conf_t *umcf; + + if (!(flags & NGX_STREAM_UPSTREAM_CREATE)) { + + if (ngx_parse_url(cf->pool, u) != NGX_OK) { + if (u->err) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "%s in upstream \"%V\"", u->err, &u->url); + } + + return NULL; + } + } + + umcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_upstream_module); + + uscfp = umcf->upstreams.elts; + + for (i = 0; i < umcf->upstreams.nelts; i++) { + + if (uscfp[i]->host.len != u->host.len + || ngx_strncasecmp(uscfp[i]->host.data, u->host.data, u->host.len) + != 0) + { + continue; + } + + if ((flags & NGX_STREAM_UPSTREAM_CREATE) + && (uscfp[i]->flags & NGX_STREAM_UPSTREAM_CREATE)) + { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "duplicate upstream \"%V\"", &u->host); + return NULL; + } + + if ((uscfp[i]->flags & NGX_STREAM_UPSTREAM_CREATE) && !u->no_port) { + ngx_conf_log_error(NGX_LOG_WARN, cf, 0, + "upstream \"%V\" may not have port %d", + &u->host, u->port); + return NULL; + } + + if ((flags & NGX_STREAM_UPSTREAM_CREATE) && !uscfp[i]->no_port) { + ngx_log_error(NGX_LOG_WARN, cf->log, 0, + "upstream \"%V\" may not have port %d in %s:%ui", + &u->host, uscfp[i]->port, + uscfp[i]->file_name, uscfp[i]->line); + return NULL; + } + + if (uscfp[i]->port != u->port) { + continue; + } + + if (flags & NGX_STREAM_UPSTREAM_CREATE) { + uscfp[i]->flags = flags; + } + + return uscfp[i]; + } + + uscf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_srv_conf_t)); + if (uscf == NULL) { + return NULL; + } + + uscf->flags = flags; + uscf->host = u->host; + uscf->file_name = cf->conf_file->file.name.data; + uscf->line = cf->conf_file->line; + uscf->port = u->port; + uscf->no_port = u->no_port; + + if (u->naddrs == 1) { + uscf->servers = ngx_array_create(cf->pool, 1, + sizeof(ngx_stream_upstream_server_t)); + if (uscf->servers == NULL) { + return NULL; + } + + us = ngx_array_push(uscf->servers); + if (us == NULL) { + return NULL; + } + + ngx_memzero(us, sizeof(ngx_stream_upstream_server_t)); + + us->addrs = u->addrs; + us->naddrs = 1; + } + + uscfp = ngx_array_push(&umcf->upstreams); + if (uscfp == NULL) { + return NULL; + } + + *uscfp = uscf; + + return uscf; +} + + +static void * +ngx_stream_upstream_create_main_conf(ngx_conf_t *cf) +{ + ngx_stream_upstream_main_conf_t *umcf; + + umcf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_main_conf_t)); + if (umcf == NULL) { + return NULL; + } + + if (ngx_array_init(&umcf->upstreams, cf->pool, 4, + sizeof(ngx_stream_upstream_srv_conf_t *)) + != NGX_OK) + { + return NULL; + } + + return umcf; +} + + +static char * +ngx_stream_upstream_init_main_conf(ngx_conf_t *cf, void *conf) +{ + ngx_stream_upstream_main_conf_t *umcf = conf; + + ngx_uint_t i; + ngx_stream_upstream_init_pt init; + ngx_stream_upstream_srv_conf_t **uscfp; + + uscfp = umcf->upstreams.elts; + + for (i = 0; i < umcf->upstreams.nelts; i++) { + + init = uscfp[i]->peer.init_upstream + ? uscfp[i]->peer.init_upstream + : ngx_stream_upstream_init_round_robin; + + if (init(cf, uscfp[i]) != NGX_OK) { + return NGX_CONF_ERROR; + } + } + + return NGX_CONF_OK; +} diff --git a/src/stream/ngx_stream_upstream.h b/src/stream/ngx_stream_upstream.h new file mode 100644 --- /dev/null +++ b/src/stream/ngx_stream_upstream.h @@ -0,0 +1,103 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Nginx, Inc. + */ + + +#ifndef _NGX_STREAM_UPSTREAM_H_INCLUDED_ +#define _NGX_STREAM_UPSTREAM_H_INCLUDED_ + + +#include +#include +#include +#include + + +#define NGX_STREAM_UPSTREAM_CREATE 0x0001 +#define NGX_STREAM_UPSTREAM_WEIGHT 0x0002 +#define NGX_STREAM_UPSTREAM_MAX_FAILS 0x0004 +#define NGX_STREAM_UPSTREAM_FAIL_TIMEOUT 0x0008 +#define NGX_STREAM_UPSTREAM_DOWN 0x0010 +#define NGX_STREAM_UPSTREAM_BACKUP 0x0020 + + +typedef struct { + ngx_array_t upstreams; + /* ngx_stream_upstream_srv_conf_t */ +} ngx_stream_upstream_main_conf_t; + + +typedef struct ngx_stream_upstream_srv_conf_s ngx_stream_upstream_srv_conf_t; + + +typedef ngx_int_t (*ngx_stream_upstream_init_pt)(ngx_conf_t *cf, + ngx_stream_upstream_srv_conf_t *us); +typedef ngx_int_t (*ngx_stream_upstream_init_peer_pt)(ngx_stream_session_t *s, + ngx_stream_upstream_srv_conf_t *us); + + +typedef struct { + ngx_stream_upstream_init_pt init_upstream; + ngx_stream_upstream_init_peer_pt init; + void *data; +} ngx_stream_upstream_peer_t; + + +typedef struct { + ngx_str_t name; + ngx_addr_t *addrs; + ngx_uint_t naddrs; + ngx_uint_t weight; + ngx_uint_t max_fails; + time_t fail_timeout; + + unsigned down:1; + unsigned backup:1; +} ngx_stream_upstream_server_t; + + +struct ngx_stream_upstream_srv_conf_s { + ngx_stream_upstream_peer_t peer; + void **srv_conf; + + ngx_array_t *servers; + /* ngx_stream_upstream_server_t */ + + ngx_uint_t flags; + ngx_str_t host; + u_char *file_name; + ngx_uint_t line; + in_port_t port; + ngx_uint_t no_port; /* unsigned no_port:1 */ + +#if (NGX_STREAM_UPSTREAM_ZONE) + ngx_shm_zone_t *shm_zone; +#endif +}; + + +typedef struct { + ngx_peer_connection_t peer; + ngx_buf_t downstream_buf; + ngx_buf_t upstream_buf; + off_t received; +#if (NGX_STREAM_SSL) + ngx_str_t ssl_name; +#endif +} ngx_stream_upstream_t; + + +ngx_stream_upstream_srv_conf_t *ngx_stream_upstream_add(ngx_conf_t *cf, + ngx_url_t *u, ngx_uint_t flags); + + +#define ngx_stream_conf_upstream_srv_conf(uscf, module) \ + uscf->srv_conf[module.ctx_index] + + +extern ngx_module_t ngx_stream_upstream_module; + + +#endif /* _NGX_STREAM_UPSTREAM_H_INCLUDED_ */ diff --git a/src/stream/ngx_stream_upstream_hash_module.c b/src/stream/ngx_stream_upstream_hash_module.c new file mode 100644 --- /dev/null +++ b/src/stream/ngx_stream_upstream_hash_module.c @@ -0,0 +1,657 @@ + +/* + * Copyright (C) Roman Arutyunyan + * Copyright (C) Nginx, Inc. + */ + + +#include +#include +#include + + +typedef struct { + uint32_t hash; + ngx_str_t *server; +} ngx_stream_upstream_chash_point_t; + + +typedef struct { + ngx_uint_t number; + ngx_stream_upstream_chash_point_t point[1]; +} ngx_stream_upstream_chash_points_t; + + +typedef struct { + ngx_stream_upstream_chash_points_t *points; +} ngx_stream_upstream_hash_srv_conf_t; + + +typedef struct { + /* the round robin data must be first */ + ngx_stream_upstream_rr_peer_data_t rrp; + ngx_stream_upstream_hash_srv_conf_t *conf; + ngx_str_t key; + ngx_uint_t tries; + ngx_uint_t rehash; + uint32_t hash; + ngx_event_get_peer_pt get_rr_peer; +} ngx_stream_upstream_hash_peer_data_t; + + +static ngx_int_t ngx_stream_upstream_init_hash(ngx_conf_t *cf, + ngx_stream_upstream_srv_conf_t *us); +static ngx_int_t ngx_stream_upstream_init_hash_peer(ngx_stream_session_t *s, + ngx_stream_upstream_srv_conf_t *us); +static ngx_int_t ngx_stream_upstream_get_hash_peer(ngx_peer_connection_t *pc, + void *data); + +static ngx_int_t ngx_stream_upstream_init_chash(ngx_conf_t *cf, + ngx_stream_upstream_srv_conf_t *us); +static int ngx_libc_cdecl + ngx_stream_upstream_chash_cmp_points(const void *one, const void *two); +static ngx_uint_t ngx_stream_upstream_find_chash_point( + ngx_stream_upstream_chash_points_t *points, uint32_t hash); +static ngx_int_t ngx_stream_upstream_init_chash_peer(ngx_stream_session_t *s, + ngx_stream_upstream_srv_conf_t *us); +static ngx_int_t ngx_stream_upstream_get_chash_peer(ngx_peer_connection_t *pc, + void *data); + +static void *ngx_stream_upstream_hash_create_conf(ngx_conf_t *cf); +static char *ngx_stream_upstream_hash(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); + + +static ngx_command_t ngx_stream_upstream_hash_commands[] = { + + { ngx_string("hash"), + NGX_STREAM_UPS_CONF|NGX_CONF_TAKE12, + ngx_stream_upstream_hash, + NGX_STREAM_SRV_CONF_OFFSET, + 0, + NULL }, + + ngx_null_command +}; + + +static ngx_stream_module_t ngx_stream_upstream_hash_module_ctx = { + NULL, /* create main configuration */ + NULL, /* init main configuration */ + + ngx_stream_upstream_hash_create_conf, /* create server configuration */ + NULL, /* merge server configuration */ +}; + + +ngx_module_t ngx_stream_upstream_hash_module = { + NGX_MODULE_V1, + &ngx_stream_upstream_hash_module_ctx, /* module context */ + ngx_stream_upstream_hash_commands, /* module directives */ + NGX_STREAM_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static ngx_int_t +ngx_stream_upstream_init_hash(ngx_conf_t *cf, + ngx_stream_upstream_srv_conf_t *us) +{ + if (ngx_stream_upstream_init_round_robin(cf, us) != NGX_OK) { + return NGX_ERROR; + } + + us->peer.init = ngx_stream_upstream_init_hash_peer; + + return NGX_OK; +} + + +static ngx_int_t +ngx_stream_upstream_init_hash_peer(ngx_stream_session_t *s, + ngx_stream_upstream_srv_conf_t *us) +{ + ngx_stream_upstream_hash_srv_conf_t *hcf; + ngx_stream_upstream_hash_peer_data_t *hp; + + hp = ngx_palloc(s->connection->pool, + sizeof(ngx_stream_upstream_hash_peer_data_t)); + if (hp == NULL) { + return NGX_ERROR; + } + + s->upstream->peer.data = &hp->rrp; + + if (ngx_stream_upstream_init_round_robin_peer(s, us) != NGX_OK) { + return NGX_ERROR; + } + + s->upstream->peer.get = ngx_stream_upstream_get_hash_peer; + + hcf = ngx_stream_conf_upstream_srv_conf(us, + ngx_stream_upstream_hash_module); + + hp->key = s->connection->addr_text; + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, + "upstream hash key:\"%V\"", &hp->key); + + hp->conf = hcf; + hp->tries = 0; + hp->rehash = 0; + hp->hash = 0; + hp->get_rr_peer = ngx_stream_upstream_get_round_robin_peer; + + return NGX_OK; +} + + +static ngx_int_t +ngx_stream_upstream_get_hash_peer(ngx_peer_connection_t *pc, void *data) +{ + ngx_stream_upstream_hash_peer_data_t *hp = data; + + time_t now; + u_char buf[NGX_INT_T_LEN]; + size_t size; + uint32_t hash; + ngx_int_t w; + uintptr_t m; + ngx_uint_t i, n, p; + ngx_stream_upstream_rr_peer_t *peer; + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "get hash peer, try: %ui", pc->tries); + + ngx_stream_upstream_rr_peers_wlock(hp->rrp.peers); + + if (hp->tries > 20 || hp->rrp.peers->single) { + ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers); + return hp->get_rr_peer(pc, &hp->rrp); + } + + now = ngx_time(); + + pc->connection = NULL; + + for ( ;; ) { + + /* + * Hash expression is compatible with Cache::Memcached: + * ((crc32([REHASH] KEY) >> 16) & 0x7fff) + PREV_HASH + * with REHASH omitted at the first iteration. + */ + + ngx_crc32_init(hash); + + if (hp->rehash > 0) { + size = ngx_sprintf(buf, "%ui", hp->rehash) - buf; + ngx_crc32_update(&hash, buf, size); + } + + ngx_crc32_update(&hash, hp->key.data, hp->key.len); + ngx_crc32_final(hash); + + hash = (hash >> 16) & 0x7fff; + + hp->hash += hash; + hp->rehash++; + + if (!hp->rrp.peers->weighted) { + p = hp->hash % hp->rrp.peers->number; + + peer = hp->rrp.peers->peer; + for (i = 0; i < p; i++) { + peer = peer->next; + } + + } else { + w = hp->hash % hp->rrp.peers->total_weight; + + for (peer = hp->rrp.peers->peer, i = 0; + peer; + peer = peer->next, i++) + { + w -= peer->weight; + if (w < 0) { + break; + } + } + + p = i; + } + + n = p / (8 * sizeof(uintptr_t)); + m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t)); + + if (hp->rrp.tried[n] & m) { + goto next; + } + + ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "get hash peer, value:%uD, peer:%ui", hp->hash, p); + + if (peer->down) { + goto next; + } + + if (peer->max_fails + && peer->fails >= peer->max_fails + && now - peer->checked <= peer->fail_timeout) + { + goto next; + } + + break; + + next: + + if (++hp->tries > 20) { + ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers); + return hp->get_rr_peer(pc, &hp->rrp); + } + } + + hp->rrp.current = peer; + + pc->sockaddr = peer->sockaddr; + pc->socklen = peer->socklen; + pc->name = &peer->name; + + peer->conns++; + + if (now - peer->checked > peer->fail_timeout) { + peer->checked = now; + } + + ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers); + + hp->rrp.tried[n] |= m; + + return NGX_OK; +} + + +static ngx_int_t +ngx_stream_upstream_init_chash(ngx_conf_t *cf, + ngx_stream_upstream_srv_conf_t *us) +{ + u_char *host, *port, c; + size_t host_len, port_len, size; + uint32_t hash, base_hash, prev_hash; + ngx_str_t *server; + ngx_uint_t npoints, i, j; + ngx_stream_upstream_rr_peer_t *peer; + ngx_stream_upstream_rr_peers_t *peers; + ngx_stream_upstream_chash_points_t *points; + ngx_stream_upstream_hash_srv_conf_t *hcf; + + if (ngx_stream_upstream_init_round_robin(cf, us) != NGX_OK) { + return NGX_ERROR; + } + + us->peer.init = ngx_stream_upstream_init_chash_peer; + + peers = us->peer.data; + npoints = peers->total_weight * 160; + + size = sizeof(ngx_stream_upstream_chash_points_t) + + sizeof(ngx_stream_upstream_chash_point_t) * (npoints - 1); + + points = ngx_palloc(cf->pool, size); + if (points == NULL) { + return NGX_ERROR; + } + + points->number = 0; + + for (peer = peers->peer; peer; peer = peer->next) { + server = &peer->server; + + /* + * Hash expression is compatible with Cache::Memcached::Fast: + * crc32(HOST \0 PORT PREV_HASH). + */ + + if (server->len >= 5 + && ngx_strncasecmp(server->data, (u_char *) "unix:", 5) == 0) + { + host = server->data + 5; + host_len = server->len - 5; + port = NULL; + port_len = 0; + goto done; + } + + for (j = 0; j < server->len; j++) { + c = server->data[server->len - j - 1]; + + if (c == ':') { + host = server->data; + host_len = server->len - j - 1; + port = server->data + server->len - j; + port_len = j; + goto done; + } + + if (c < '0' || c > '9') { + break; + } + } + + host = server->data; + host_len = server->len; + port = NULL; + port_len = 0; + + done: + + ngx_crc32_init(base_hash); + ngx_crc32_update(&base_hash, host, host_len); + ngx_crc32_update(&base_hash, (u_char *) "", 1); + ngx_crc32_update(&base_hash, port, port_len); + + prev_hash = 0; + npoints = peer->weight * 160; + + for (j = 0; j < npoints; j++) { + hash = base_hash; + + ngx_crc32_update(&hash, (u_char *) &prev_hash, sizeof(uint32_t)); + ngx_crc32_final(hash); + + points->point[points->number].hash = hash; + points->point[points->number].server = server; + points->number++; + + prev_hash = hash; + } + } + + ngx_qsort(points->point, + points->number, + sizeof(ngx_stream_upstream_chash_point_t), + ngx_stream_upstream_chash_cmp_points); + + for (i = 0, j = 1; j < points->number; j++) { + if (points->point[i].hash != points->point[j].hash) { + points->point[++i] = points->point[j]; + } + } + + points->number = i + 1; + + hcf = ngx_stream_conf_upstream_srv_conf(us, + ngx_stream_upstream_hash_module); + hcf->points = points; + + return NGX_OK; +} + + +static int ngx_libc_cdecl +ngx_stream_upstream_chash_cmp_points(const void *one, const void *two) +{ + ngx_stream_upstream_chash_point_t *first = + (ngx_stream_upstream_chash_point_t *) one; + ngx_stream_upstream_chash_point_t *second = + (ngx_stream_upstream_chash_point_t *) two; + + if (first->hash < second->hash) { + return -1; + + } else if (first->hash > second->hash) { + return 1; + + } else { + return 0; + } +} + + +static ngx_uint_t +ngx_stream_upstream_find_chash_point(ngx_stream_upstream_chash_points_t *points, + uint32_t hash) +{ + ngx_uint_t i, j, k; + ngx_stream_upstream_chash_point_t *point; + + /* find first point >= hash */ + + point = &points->point[0]; + + i = 0; + j = points->number; + + while (i < j) { + k = (i + j) / 2; + + if (hash > point[k].hash) { + i = k + 1; + + } else if (hash < point[k].hash) { + j = k; + + } else { + return k; + } + } + + return i; +} + + +static ngx_int_t +ngx_stream_upstream_init_chash_peer(ngx_stream_session_t *s, + ngx_stream_upstream_srv_conf_t *us) +{ + uint32_t hash; + ngx_stream_upstream_hash_srv_conf_t *hcf; + ngx_stream_upstream_hash_peer_data_t *hp; + + if (ngx_stream_upstream_init_hash_peer(s, us) != NGX_OK) { + return NGX_ERROR; + } + + s->upstream->peer.get = ngx_stream_upstream_get_chash_peer; + + hp = s->upstream->peer.data; + hcf = ngx_stream_conf_upstream_srv_conf(us, + ngx_stream_upstream_hash_module); + + hash = ngx_crc32_long(hp->key.data, hp->key.len); + + ngx_stream_upstream_rr_peers_rlock(hp->rrp.peers); + + hp->hash = ngx_stream_upstream_find_chash_point(hcf->points, hash); + + ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers); + + return NGX_OK; +} + + +static ngx_int_t +ngx_stream_upstream_get_chash_peer(ngx_peer_connection_t *pc, void *data) +{ + ngx_stream_upstream_hash_peer_data_t *hp = data; + + time_t now; + intptr_t m; + ngx_str_t *server; + ngx_int_t total; + ngx_uint_t i, n, best_i; + ngx_stream_upstream_rr_peer_t *peer, *best; + ngx_stream_upstream_chash_point_t *point; + ngx_stream_upstream_chash_points_t *points; + ngx_stream_upstream_hash_srv_conf_t *hcf; + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "get consistent hash peer, try: %ui", pc->tries); + + ngx_stream_upstream_rr_peers_wlock(hp->rrp.peers); + + pc->connection = NULL; + + now = ngx_time(); + hcf = hp->conf; + + points = hcf->points; + point = &points->point[0]; + + for ( ;; ) { + server = point[hp->hash % points->number].server; + + ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "consistent hash peer:%uD, server:\"%V\"", + hp->hash, server); + + best = NULL; + best_i = 0; + total = 0; + + for (peer = hp->rrp.peers->peer, i = 0; + peer; + peer = peer->next, i++) + { + + n = i / (8 * sizeof(uintptr_t)); + m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t)); + + if (hp->rrp.tried[n] & m) { + continue; + } + + if (peer->down) { + continue; + } + + if (peer->server.len != server->len + || ngx_strncmp(peer->server.data, server->data, server->len) + != 0) + { + continue; + } + + if (peer->max_fails + && peer->fails >= peer->max_fails + && now - peer->checked <= peer->fail_timeout) + { + continue; + } + + peer->current_weight += peer->effective_weight; + total += peer->effective_weight; + + if (peer->effective_weight < peer->weight) { + peer->effective_weight++; + } + + if (best == NULL || peer->current_weight > best->current_weight) { + best = peer; + best_i = i; + } + } + + if (best) { + best->current_weight -= total; + break; + } + + hp->hash++; + hp->tries++; + + if (hp->tries >= points->number) { + ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers); + return NGX_BUSY; + } + } + + hp->rrp.current = best; + + pc->sockaddr = best->sockaddr; + pc->socklen = best->socklen; + pc->name = &best->name; + + best->conns++; + + if (now - best->checked > best->fail_timeout) { + best->checked = now; + } + + ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers); + + n = best_i / (8 * sizeof(uintptr_t)); + m = (uintptr_t) 1 << best_i % (8 * sizeof(uintptr_t)); + + hp->rrp.tried[n] |= m; + + return NGX_OK; +} + + +static void * +ngx_stream_upstream_hash_create_conf(ngx_conf_t *cf) +{ + ngx_stream_upstream_hash_srv_conf_t *conf; + + conf = ngx_palloc(cf->pool, sizeof(ngx_stream_upstream_hash_srv_conf_t)); + if (conf == NULL) { + return NULL; + } + + conf->points = NULL; + + return conf; +} + + +static char * +ngx_stream_upstream_hash(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_str_t *value; + ngx_stream_upstream_srv_conf_t *uscf; + + value = cf->args->elts; + + if (ngx_strcmp(value[1].data, "$remote_addr")) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "unsupported hash key \"%V\", use $remote_addr", + &value[1]); + return NGX_CONF_ERROR; + } + + uscf = ngx_stream_conf_get_module_srv_conf(cf, ngx_stream_upstream_module); + + if (uscf->peer.init_upstream) { + ngx_conf_log_error(NGX_LOG_WARN, cf, 0, + "load balancing method redefined"); + } + + uscf->flags = NGX_STREAM_UPSTREAM_CREATE + |NGX_STREAM_UPSTREAM_WEIGHT + |NGX_STREAM_UPSTREAM_MAX_FAILS + |NGX_STREAM_UPSTREAM_FAIL_TIMEOUT + |NGX_STREAM_UPSTREAM_DOWN; + + if (cf->args->nelts == 2) { + uscf->peer.init_upstream = ngx_stream_upstream_init_hash; + + } else if (ngx_strcmp(value[2].data, "consistent") == 0) { + uscf->peer.init_upstream = ngx_stream_upstream_init_chash; + + } else { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid parameter \"%V\"", &value[2]); + return NGX_CONF_ERROR; + } + + return NGX_CONF_OK; +} diff --git a/src/stream/ngx_stream_upstream_least_conn_module.c b/src/stream/ngx_stream_upstream_least_conn_module.c new file mode 100644 --- /dev/null +++ b/src/stream/ngx_stream_upstream_least_conn_module.c @@ -0,0 +1,305 @@ + +/* + * Copyright (C) Maxim Dounin + * Copyright (C) Nginx, Inc. + */ + + +#include +#include +#include + + +static ngx_int_t ngx_stream_upstream_init_least_conn_peer( + ngx_stream_session_t *s, ngx_stream_upstream_srv_conf_t *us); +static ngx_int_t ngx_stream_upstream_get_least_conn_peer( + ngx_peer_connection_t *pc, void *data); +static char *ngx_stream_upstream_least_conn(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); + + +static ngx_command_t ngx_stream_upstream_least_conn_commands[] = { + + { ngx_string("least_conn"), + NGX_STREAM_UPS_CONF|NGX_CONF_NOARGS, + ngx_stream_upstream_least_conn, + 0, + 0, + NULL }, + + ngx_null_command +}; + + +static ngx_stream_module_t ngx_stream_upstream_least_conn_module_ctx = { + NULL, /* create main configuration */ + NULL, /* init main configuration */ + + NULL, /* create server configuration */ + NULL, /* merge server configuration */ +}; + + +ngx_module_t ngx_stream_upstream_least_conn_module = { + NGX_MODULE_V1, + &ngx_stream_upstream_least_conn_module_ctx, /* module context */ + ngx_stream_upstream_least_conn_commands, /* module directives */ + NGX_STREAM_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static ngx_int_t +ngx_stream_upstream_init_least_conn(ngx_conf_t *cf, + ngx_stream_upstream_srv_conf_t *us) +{ + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, cf->log, 0, + "init least conn"); + + if (ngx_stream_upstream_init_round_robin(cf, us) != NGX_OK) { + return NGX_ERROR; + } + + us->peer.init = ngx_stream_upstream_init_least_conn_peer; + + return NGX_OK; +} + + +static ngx_int_t +ngx_stream_upstream_init_least_conn_peer(ngx_stream_session_t *s, + ngx_stream_upstream_srv_conf_t *us) +{ + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, + "init least conn peer"); + + if (ngx_stream_upstream_init_round_robin_peer(s, us) != NGX_OK) { + return NGX_ERROR; + } + + s->upstream->peer.get = ngx_stream_upstream_get_least_conn_peer; + + return NGX_OK; +} + + +static ngx_int_t +ngx_stream_upstream_get_least_conn_peer(ngx_peer_connection_t *pc, void *data) +{ + ngx_stream_upstream_rr_peer_data_t *rrp = data; + + time_t now; + uintptr_t m; + ngx_int_t rc, total; + ngx_uint_t i, n, p, many; + ngx_stream_upstream_rr_peer_t *peer, *best; + ngx_stream_upstream_rr_peers_t *peers; + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "get least conn peer, try: %ui", pc->tries); + + if (rrp->peers->single) { + return ngx_stream_upstream_get_round_robin_peer(pc, rrp); + } + + pc->connection = NULL; + + now = ngx_time(); + + peers = rrp->peers; + + ngx_stream_upstream_rr_peers_wlock(peers); + + best = NULL; + total = 0; + +#if (NGX_SUPPRESS_WARN) + many = 0; + p = 0; +#endif + + for (peer = peers->peer, i = 0; + peer; + peer = peer->next, i++) + { + + n = i / (8 * sizeof(uintptr_t)); + m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t)); + + if (rrp->tried[n] & m) { + continue; + } + + if (peer->down) { + continue; + } + + if (peer->max_fails + && peer->fails >= peer->max_fails + && now - peer->checked <= peer->fail_timeout) + { + continue; + } + + /* + * select peer with least number of connections; if there are + * multiple peers with the same number of connections, select + * based on round-robin + */ + + if (best == NULL + || peer->conns * best->weight < best->conns * peer->weight) + { + best = peer; + many = 0; + p = i; + + } else if (peer->conns * best->weight == best->conns * peer->weight) { + many = 1; + } + } + + if (best == NULL) { + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "get least conn peer, no peer found"); + + goto failed; + } + + if (many) { + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "get least conn peer, many"); + + for (peer = best, i = p; + peer; + peer = peer->next, i++) + { + n = i / (8 * sizeof(uintptr_t)); + m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t)); + + if (rrp->tried[n] & m) { + continue; + } + + if (peer->down) { + continue; + } + + if (peer->conns * best->weight != best->conns * peer->weight) { + continue; + } + + if (peer->max_fails + && peer->fails >= peer->max_fails + && now - peer->checked <= peer->fail_timeout) + { + continue; + } + + peer->current_weight += peer->effective_weight; + total += peer->effective_weight; + + if (peer->effective_weight < peer->weight) { + peer->effective_weight++; + } + + if (peer->current_weight > best->current_weight) { + best = peer; + p = i; + } + } + } + + best->current_weight -= total; + + if (now - best->checked > best->fail_timeout) { + best->checked = now; + } + + pc->sockaddr = best->sockaddr; + pc->socklen = best->socklen; + pc->name = &best->name; + + best->conns++; + + rrp->current = best; + + n = p / (8 * sizeof(uintptr_t)); + m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t)); + + rrp->tried[n] |= m; + + ngx_stream_upstream_rr_peers_unlock(peers); + + return NGX_OK; + +failed: + + if (peers->next) { + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "get least conn peer, backup servers"); + + rrp->peers = peers->next; + + n = (rrp->peers->number + (8 * sizeof(uintptr_t) - 1)) + / (8 * sizeof(uintptr_t)); + + for (i = 0; i < n; i++) { + rrp->tried[i] = 0; + } + + ngx_stream_upstream_rr_peers_unlock(peers); + + rc = ngx_stream_upstream_get_least_conn_peer(pc, rrp); + + if (rc != NGX_BUSY) { + return rc; + } + + ngx_stream_upstream_rr_peers_wlock(peers); + } + + /* all peers failed, mark them as live for quick recovery */ + + for (peer = peers->peer; peer; peer = peer->next) { + peer->fails = 0; + } + + ngx_stream_upstream_rr_peers_unlock(peers); + + pc->name = peers->name; + + return NGX_BUSY; +} + + +static char * +ngx_stream_upstream_least_conn(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_stream_upstream_srv_conf_t *uscf; + + uscf = ngx_stream_conf_get_module_srv_conf(cf, ngx_stream_upstream_module); + + if (uscf->peer.init_upstream) { + ngx_conf_log_error(NGX_LOG_WARN, cf, 0, + "load balancing method redefined"); + } + + uscf->peer.init_upstream = ngx_stream_upstream_init_least_conn; + + uscf->flags = NGX_STREAM_UPSTREAM_CREATE + |NGX_STREAM_UPSTREAM_WEIGHT + |NGX_STREAM_UPSTREAM_MAX_FAILS + |NGX_STREAM_UPSTREAM_FAIL_TIMEOUT + |NGX_STREAM_UPSTREAM_DOWN + |NGX_STREAM_UPSTREAM_BACKUP; + + return NGX_CONF_OK; +} diff --git a/src/stream/ngx_stream_upstream_round_robin.c b/src/stream/ngx_stream_upstream_round_robin.c new file mode 100644 --- /dev/null +++ b/src/stream/ngx_stream_upstream_round_robin.c @@ -0,0 +1,697 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Nginx, Inc. + */ + + +#include +#include +#include + + +#define ngx_stream_upstream_tries(p) ((p)->number \ + + ((p)->next ? (p)->next->number : 0)) + + +static ngx_stream_upstream_rr_peer_t *ngx_stream_upstream_get_peer( + ngx_stream_upstream_rr_peer_data_t *rrp); + +#if (NGX_STREAM_SSL) + +static ngx_int_t ngx_stream_upstream_set_round_robin_peer_session( + ngx_peer_connection_t *pc, void *data); +static void ngx_stream_upstream_save_round_robin_peer_session( + ngx_peer_connection_t *pc, void *data); + +#endif + + +ngx_int_t +ngx_stream_upstream_init_round_robin(ngx_conf_t *cf, + ngx_stream_upstream_srv_conf_t *us) +{ + ngx_url_t u; + ngx_uint_t i, j, n, w; + ngx_stream_upstream_server_t *server; + ngx_stream_upstream_rr_peer_t *peer, **peerp; + ngx_stream_upstream_rr_peers_t *peers, *backup; + + us->peer.init = ngx_stream_upstream_init_round_robin_peer; + + if (us->servers) { + server = us->servers->elts; + + n = 0; + w = 0; + + for (i = 0; i < us->servers->nelts; i++) { + if (server[i].backup) { + continue; + } + + n += server[i].naddrs; + w += server[i].naddrs * server[i].weight; + } + + if (n == 0) { + ngx_log_error(NGX_LOG_EMERG, cf->log, 0, + "no servers in upstream \"%V\" in %s:%ui", + &us->host, us->file_name, us->line); + return NGX_ERROR; + } + + peers = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peers_t)); + if (peers == NULL) { + return NGX_ERROR; + } + + peer = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peer_t) * n); + if (peer == NULL) { + return NGX_ERROR; + } + + peers->single = (n == 1); + peers->number = n; + peers->weighted = (w != n); + peers->total_weight = w; + peers->name = &us->host; + + n = 0; + peerp = &peers->peer; + + for (i = 0; i < us->servers->nelts; i++) { + if (server[i].backup) { + continue; + } + + for (j = 0; j < server[i].naddrs; j++) { + peer[n].sockaddr = server[i].addrs[j].sockaddr; + peer[n].socklen = server[i].addrs[j].socklen; + peer[n].name = server[i].addrs[j].name; + peer[n].weight = server[i].weight; + peer[n].effective_weight = server[i].weight; + peer[n].current_weight = 0; + peer[n].max_fails = server[i].max_fails; + peer[n].fail_timeout = server[i].fail_timeout; + peer[n].down = server[i].down; + peer[n].server = server[i].name; + + *peerp = &peer[n]; + peerp = &peer[n].next; + n++; + } + } + + us->peer.data = peers; + + /* backup servers */ + + n = 0; + w = 0; + + for (i = 0; i < us->servers->nelts; i++) { + if (!server[i].backup) { + continue; + } + + n += server[i].naddrs; + w += server[i].naddrs * server[i].weight; + } + + if (n == 0) { + return NGX_OK; + } + + backup = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peers_t)); + if (backup == NULL) { + return NGX_ERROR; + } + + peer = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peer_t) * n); + if (peer == NULL) { + return NGX_ERROR; + } + + peers->single = 0; + backup->single = 0; + backup->number = n; + backup->weighted = (w != n); + backup->total_weight = w; + backup->name = &us->host; + + n = 0; + peerp = &backup->peer; + + for (i = 0; i < us->servers->nelts; i++) { + if (!server[i].backup) { + continue; + } + + for (j = 0; j < server[i].naddrs; j++) { + peer[n].sockaddr = server[i].addrs[j].sockaddr; + peer[n].socklen = server[i].addrs[j].socklen; + peer[n].name = server[i].addrs[j].name; + peer[n].weight = server[i].weight; + peer[n].effective_weight = server[i].weight; + peer[n].current_weight = 0; + peer[n].max_fails = server[i].max_fails; + peer[n].fail_timeout = server[i].fail_timeout; + peer[n].down = server[i].down; + peer[n].server = server[i].name; + + *peerp = &peer[n]; + peerp = &peer[n].next; + n++; + } + } + + peers->next = backup; + + return NGX_OK; + } + + + /* an upstream implicitly defined by proxy_pass, etc. */ + + if (us->port == 0) { + ngx_log_error(NGX_LOG_EMERG, cf->log, 0, + "no port in upstream \"%V\" in %s:%ui", + &us->host, us->file_name, us->line); + return NGX_ERROR; + } + + ngx_memzero(&u, sizeof(ngx_url_t)); + + u.host = us->host; + u.port = us->port; + + if (ngx_inet_resolve_host(cf->pool, &u) != NGX_OK) { + if (u.err) { + ngx_log_error(NGX_LOG_EMERG, cf->log, 0, + "%s in upstream \"%V\" in %s:%ui", + u.err, &us->host, us->file_name, us->line); + } + + return NGX_ERROR; + } + + n = u.naddrs; + + peers = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peers_t)); + if (peers == NULL) { + return NGX_ERROR; + } + + peer = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peer_t) * n); + if (peer == NULL) { + return NGX_ERROR; + } + + peers->single = (n == 1); + peers->number = n; + peers->weighted = 0; + peers->total_weight = n; + peers->name = &us->host; + + peerp = &peers->peer; + + for (i = 0; i < u.naddrs; i++) { + peer[i].sockaddr = u.addrs[i].sockaddr; + peer[i].socklen = u.addrs[i].socklen; + peer[i].name = u.addrs[i].name; + peer[i].weight = 1; + peer[i].effective_weight = 1; + peer[i].current_weight = 0; + peer[i].max_fails = 1; + peer[i].fail_timeout = 10; + *peerp = &peer[i]; + peerp = &peer[i].next; + } + + us->peer.data = peers; + + /* implicitly defined upstream has no backup servers */ + + return NGX_OK; +} + + +ngx_int_t +ngx_stream_upstream_init_round_robin_peer(ngx_stream_session_t *s, + ngx_stream_upstream_srv_conf_t *us) +{ + ngx_uint_t n; + ngx_stream_upstream_rr_peer_data_t *rrp; + + rrp = s->upstream->peer.data; + + if (rrp == NULL) { + rrp = ngx_palloc(s->connection->pool, + sizeof(ngx_stream_upstream_rr_peer_data_t)); + if (rrp == NULL) { + return NGX_ERROR; + } + + s->upstream->peer.data = rrp; + } + + rrp->peers = us->peer.data; + rrp->current = NULL; + + n = rrp->peers->number; + + if (rrp->peers->next && rrp->peers->next->number > n) { + n = rrp->peers->next->number; + } + + if (n <= 8 * sizeof(uintptr_t)) { + rrp->tried = &rrp->data; + rrp->data = 0; + + } else { + n = (n + (8 * sizeof(uintptr_t) - 1)) / (8 * sizeof(uintptr_t)); + + rrp->tried = ngx_pcalloc(s->connection->pool, n * sizeof(uintptr_t)); + if (rrp->tried == NULL) { + return NGX_ERROR; + } + } + + s->upstream->peer.get = ngx_stream_upstream_get_round_robin_peer; + s->upstream->peer.free = ngx_stream_upstream_free_round_robin_peer; + s->upstream->peer.tries = ngx_stream_upstream_tries(rrp->peers); +#if (NGX_STREAM_SSL) + s->upstream->peer.set_session = + ngx_stream_upstream_set_round_robin_peer_session; + s->upstream->peer.save_session = + ngx_stream_upstream_save_round_robin_peer_session; +#endif + + return NGX_OK; +} + + +ngx_int_t +ngx_stream_upstream_get_round_robin_peer(ngx_peer_connection_t *pc, void *data) +{ + ngx_stream_upstream_rr_peer_data_t *rrp = data; + + ngx_int_t rc; + ngx_uint_t i, n; + ngx_stream_upstream_rr_peer_t *peer; + ngx_stream_upstream_rr_peers_t *peers; + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "get rr peer, try: %ui", pc->tries); + + pc->connection = NULL; + + peers = rrp->peers; + ngx_stream_upstream_rr_peers_wlock(peers); + + if (peers->single) { + peer = peers->peer; + + if (peer->down) { + goto failed; + } + + rrp->current = peer; + + } else { + + /* there are several peers */ + + peer = ngx_stream_upstream_get_peer(rrp); + + if (peer == NULL) { + goto failed; + } + + ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "get rr peer, current: %p %i", + peer, peer->current_weight); + } + + pc->sockaddr = peer->sockaddr; + pc->socklen = peer->socklen; + pc->name = &peer->name; + + peer->conns++; + + ngx_stream_upstream_rr_peers_unlock(peers); + + return NGX_OK; + +failed: + + if (peers->next) { + + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, "backup servers"); + + rrp->peers = peers->next; + + n = (rrp->peers->number + (8 * sizeof(uintptr_t) - 1)) + / (8 * sizeof(uintptr_t)); + + for (i = 0; i < n; i++) { + rrp->tried[i] = 0; + } + + ngx_stream_upstream_rr_peers_unlock(peers); + + rc = ngx_stream_upstream_get_round_robin_peer(pc, rrp); + + if (rc != NGX_BUSY) { + return rc; + } + + ngx_stream_upstream_rr_peers_wlock(peers); + } + + /* all peers failed, mark them as live for quick recovery */ + + for (peer = peers->peer; peer; peer = peer->next) { + peer->fails = 0; + } + + ngx_stream_upstream_rr_peers_unlock(peers); + + pc->name = peers->name; + + return NGX_BUSY; +} + + +static ngx_stream_upstream_rr_peer_t * +ngx_stream_upstream_get_peer(ngx_stream_upstream_rr_peer_data_t *rrp) +{ + time_t now; + uintptr_t m; + ngx_int_t total; + ngx_uint_t i, n, p; + ngx_stream_upstream_rr_peer_t *peer, *best; + + now = ngx_time(); + + best = NULL; + total = 0; + +#if (NGX_SUPPRESS_WARN) + p = 0; +#endif + + for (peer = rrp->peers->peer, i = 0; + peer; + peer = peer->next, i++) + { + + n = i / (8 * sizeof(uintptr_t)); + m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t)); + + if (rrp->tried[n] & m) { + continue; + } + + if (peer->down) { + continue; + } + + if (peer->max_fails + && peer->fails >= peer->max_fails + && now - peer->checked <= peer->fail_timeout) + { + continue; + } + + peer->current_weight += peer->effective_weight; + total += peer->effective_weight; + + if (peer->effective_weight < peer->weight) { + peer->effective_weight++; + } + + if (best == NULL || peer->current_weight > best->current_weight) { + best = peer; + p = i; + } + } + + if (best == NULL) { + return NULL; + } + + rrp->current = best; + + n = p / (8 * sizeof(uintptr_t)); + m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t)); + + rrp->tried[n] |= m; + + best->current_weight -= total; + + if (now - best->checked > best->fail_timeout) { + best->checked = now; + } + + return best; +} + + +void +ngx_stream_upstream_free_round_robin_peer(ngx_peer_connection_t *pc, void *data, + ngx_uint_t state) +{ + ngx_stream_upstream_rr_peer_data_t *rrp = data; + + time_t now; + ngx_stream_upstream_rr_peer_t *peer; + + ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "free rr peer %ui %ui", pc->tries, state); + + peer = rrp->current; + + ngx_stream_upstream_rr_peers_rlock(rrp->peers); + ngx_stream_upstream_rr_peer_lock(rrp->peers, peer); + + if (rrp->peers->single) { + peer->conns--; + + ngx_stream_upstream_rr_peer_unlock(rrp->peers, peer); + ngx_stream_upstream_rr_peers_unlock(rrp->peers); + + pc->tries = 0; + return; + } + + if (state & NGX_PEER_FAILED) { + now = ngx_time(); + + peer->fails++; + peer->accessed = now; + peer->checked = now; + + if (peer->max_fails) { + peer->effective_weight -= peer->weight / peer->max_fails; + } + + ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "free rr peer failed: %p %i", + peer, peer->effective_weight); + + if (peer->effective_weight < 0) { + peer->effective_weight = 0; + } + + } else { + + /* mark peer live if check passed */ + + if (peer->accessed < peer->checked) { + peer->fails = 0; + } + } + + peer->conns--; + + ngx_stream_upstream_rr_peer_unlock(rrp->peers, peer); + ngx_stream_upstream_rr_peers_unlock(rrp->peers); + + if (pc->tries) { + pc->tries--; + } +} + + +#if (NGX_STREAM_SSL) + +static ngx_int_t +ngx_stream_upstream_set_round_robin_peer_session(ngx_peer_connection_t *pc, + void *data) +{ + ngx_stream_upstream_rr_peer_data_t *rrp = data; + + ngx_int_t rc; + ngx_ssl_session_t *ssl_session; + ngx_stream_upstream_rr_peer_t *peer; +#if (NGX_STREAM_UPSTREAM_ZONE) + int len; +#if OPENSSL_VERSION_NUMBER >= 0x0090707fL + const +#endif + u_char *p; + ngx_stream_upstream_rr_peers_t *peers; + u_char buf[NGX_SSL_MAX_SESSION_SIZE]; +#endif + + peer = rrp->current; + +#if (NGX_STREAM_UPSTREAM_ZONE) + peers = rrp->peers; + + if (peers->shpool) { + ngx_stream_upstream_rr_peers_rlock(peers); + ngx_stream_upstream_rr_peer_lock(peers, peer); + + if (peer->ssl_session == NULL) { + ngx_stream_upstream_rr_peer_unlock(peers, peer); + ngx_stream_upstream_rr_peers_unlock(peers); + return NGX_OK; + } + + len = peer->ssl_session_len; + + ngx_memcpy(buf, peer->ssl_session, len); + + ngx_stream_upstream_rr_peer_unlock(peers, peer); + ngx_stream_upstream_rr_peers_unlock(peers); + + p = buf; + ssl_session = d2i_SSL_SESSION(NULL, &p, len); + + rc = ngx_ssl_set_session(pc->connection, ssl_session); + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "set session: %p", ssl_session); + + ngx_ssl_free_session(ssl_session); + + return rc; + } +#endif + + ssl_session = peer->ssl_session; + + rc = ngx_ssl_set_session(pc->connection, ssl_session); + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "set session: %p", ssl_session); + + return rc; +} + + +static void +ngx_stream_upstream_save_round_robin_peer_session(ngx_peer_connection_t *pc, + void *data) +{ + ngx_stream_upstream_rr_peer_data_t *rrp = data; + + ngx_ssl_session_t *old_ssl_session, *ssl_session; + ngx_stream_upstream_rr_peer_t *peer; +#if (NGX_STREAM_UPSTREAM_ZONE) + int len; + u_char *p; + ngx_stream_upstream_rr_peers_t *peers; + u_char buf[NGX_SSL_MAX_SESSION_SIZE]; +#endif + +#if (NGX_STREAM_UPSTREAM_ZONE) + peers = rrp->peers; + + if (peers->shpool) { + + ssl_session = SSL_get0_session(pc->connection->ssl->connection); + + if (ssl_session == NULL) { + return; + } + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "save session: %p", ssl_session); + + len = i2d_SSL_SESSION(ssl_session, NULL); + + /* do not cache too big session */ + + if (len > NGX_SSL_MAX_SESSION_SIZE) { + return; + } + + p = buf; + (void) i2d_SSL_SESSION(ssl_session, &p); + + peer = rrp->current; + + ngx_stream_upstream_rr_peers_rlock(peers); + ngx_stream_upstream_rr_peer_lock(peers, peer); + + if (len > peer->ssl_session_len) { + ngx_shmtx_lock(&peers->shpool->mutex); + + if (peer->ssl_session) { + ngx_slab_free_locked(peers->shpool, peer->ssl_session); + } + + peer->ssl_session = ngx_slab_alloc_locked(peers->shpool, len); + + ngx_shmtx_unlock(&peers->shpool->mutex); + + if (peer->ssl_session == NULL) { + peer->ssl_session_len = 0; + + ngx_stream_upstream_rr_peer_unlock(peers, peer); + ngx_stream_upstream_rr_peers_unlock(peers); + return; + } + + peer->ssl_session_len = len; + } + + ngx_memcpy(peer->ssl_session, buf, len); + + ngx_stream_upstream_rr_peer_unlock(peers, peer); + ngx_stream_upstream_rr_peers_unlock(peers); + + return; + } +#endif + + ssl_session = ngx_ssl_get_session(pc->connection); + + if (ssl_session == NULL) { + return; + } + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "save session: %p", ssl_session); + + peer = rrp->current; + + old_ssl_session = peer->ssl_session; + peer->ssl_session = ssl_session; + + if (old_ssl_session) { + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0, + "old session: %p", old_ssl_session); + + /* TODO: may block */ + + ngx_ssl_free_session(old_ssl_session); + } +} + +#endif diff --git a/src/stream/ngx_stream_upstream_round_robin.h b/src/stream/ngx_stream_upstream_round_robin.h new file mode 100644 --- /dev/null +++ b/src/stream/ngx_stream_upstream_round_robin.h @@ -0,0 +1,138 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Nginx, Inc. + */ + + +#ifndef _NGX_STREAM_UPSTREAM_ROUND_ROBIN_H_INCLUDED_ +#define _NGX_STREAM_UPSTREAM_ROUND_ROBIN_H_INCLUDED_ + + +#include +#include +#include + + +typedef struct ngx_stream_upstream_rr_peer_s ngx_stream_upstream_rr_peer_t; + +struct ngx_stream_upstream_rr_peer_s { + struct sockaddr *sockaddr; + socklen_t socklen; + ngx_str_t name; + ngx_str_t server; + + ngx_int_t current_weight; + ngx_int_t effective_weight; + ngx_int_t weight; + + ngx_uint_t conns; + + ngx_uint_t fails; + time_t accessed; + time_t checked; + + ngx_uint_t max_fails; + time_t fail_timeout; + + ngx_uint_t down; /* unsigned down:1; */ + +#if (NGX_STREAM_SSL) + void *ssl_session; + int ssl_session_len; +#endif + + ngx_stream_upstream_rr_peer_t *next; + +#if (NGX_STREAM_UPSTREAM_ZONE) + ngx_atomic_t lock; +#endif +}; + + +typedef struct ngx_stream_upstream_rr_peers_s ngx_stream_upstream_rr_peers_t; + +struct ngx_stream_upstream_rr_peers_s { + ngx_uint_t number; + +#if (NGX_STREAM_UPSTREAM_ZONE) + ngx_slab_pool_t *shpool; + ngx_atomic_t rwlock; +#endif + + ngx_uint_t total_weight; + + unsigned single:1; + unsigned weighted:1; + + ngx_str_t *name; + + ngx_stream_upstream_rr_peers_t *next; + + ngx_stream_upstream_rr_peer_t *peer; +}; + + +#if (NGX_STREAM_UPSTREAM_ZONE) + +#define ngx_stream_upstream_rr_peers_rlock(peers) \ + \ + if (peers->shpool) { \ + ngx_rwlock_rlock(&peers->rwlock); \ + } + +#define ngx_stream_upstream_rr_peers_wlock(peers) \ + \ + if (peers->shpool) { \ + ngx_rwlock_wlock(&peers->rwlock); \ + } + +#define ngx_stream_upstream_rr_peers_unlock(peers) \ + \ + if (peers->shpool) { \ + ngx_rwlock_unlock(&peers->rwlock); \ + } + + +#define ngx_stream_upstream_rr_peer_lock(peers, peer) \ + \ + if (peers->shpool) { \ + ngx_rwlock_wlock(&peer->lock); \ + } + +#define ngx_stream_upstream_rr_peer_unlock(peers, peer) \ + \ + if (peers->shpool) { \ + ngx_rwlock_unlock(&peer->lock); \ + } + +#else + +#define ngx_stream_upstream_rr_peers_rlock(peers) +#define ngx_stream_upstream_rr_peers_wlock(peers) +#define ngx_stream_upstream_rr_peers_unlock(peers) +#define ngx_stream_upstream_rr_peer_lock(peers, peer) +#define ngx_stream_upstream_rr_peer_unlock(peers, peer) + +#endif + + +typedef struct { + ngx_stream_upstream_rr_peers_t *peers; + ngx_stream_upstream_rr_peer_t *current; + uintptr_t *tried; + uintptr_t data; +} ngx_stream_upstream_rr_peer_data_t; + + +ngx_int_t ngx_stream_upstream_init_round_robin(ngx_conf_t *cf, + ngx_stream_upstream_srv_conf_t *us); +ngx_int_t ngx_stream_upstream_init_round_robin_peer(ngx_stream_session_t *s, + ngx_stream_upstream_srv_conf_t *us); +ngx_int_t ngx_stream_upstream_get_round_robin_peer(ngx_peer_connection_t *pc, + void *data); +void ngx_stream_upstream_free_round_robin_peer(ngx_peer_connection_t *pc, + void *data, ngx_uint_t state); + + +#endif /* _NGX_STREAM_UPSTREAM_ROUND_ROBIN_H_INCLUDED_ */ diff --git a/src/stream/ngx_stream_upstream_zone_module.c b/src/stream/ngx_stream_upstream_zone_module.c new file mode 100644 --- /dev/null +++ b/src/stream/ngx_stream_upstream_zone_module.c @@ -0,0 +1,207 @@ + +/* + * Copyright (C) Ruslan Ermilov + * Copyright (C) Nginx, Inc. + */ + + +#include +#include +#include + + +static char *ngx_stream_upstream_zone(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); +static ngx_int_t ngx_stream_upstream_init_zone(ngx_shm_zone_t *shm_zone, + void *data); + + +static ngx_command_t ngx_stream_upstream_zone_commands[] = { + + { ngx_string("zone"), + NGX_STREAM_UPS_CONF|NGX_CONF_TAKE2, + ngx_stream_upstream_zone, + 0, + 0, + NULL }, + + ngx_null_command +}; + + +static ngx_stream_module_t ngx_stream_upstream_zone_module_ctx = { + NULL, /* create main configuration */ + NULL, /* init main configuration */ + + NULL, /* create server configuration */ + NULL, /* merge server configuration */ +}; + + +ngx_module_t ngx_stream_upstream_zone_module = { + NGX_MODULE_V1, + &ngx_stream_upstream_zone_module_ctx, /* module context */ + ngx_stream_upstream_zone_commands, /* module directives */ + NGX_STREAM_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static char * +ngx_stream_upstream_zone(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ssize_t size; + ngx_str_t *value; + ngx_stream_upstream_srv_conf_t *uscf; + + uscf = ngx_stream_conf_get_module_srv_conf(cf, ngx_stream_upstream_module); + + value = cf->args->elts; + + if (!value[1].len) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid zone name \"%V\"", &value[1]); + return NGX_CONF_ERROR; + } + + size = ngx_parse_size(&value[2]); + + if (size == NGX_ERROR) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid zone size \"%V\"", &value[2]); + return NGX_CONF_ERROR; + } + + if (size < (ssize_t) (8 * ngx_pagesize)) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "zone \"%V\" is too small", &value[1]); + return NGX_CONF_ERROR; + } + + uscf->shm_zone = ngx_shared_memory_add(cf, &value[1], size, + &ngx_stream_upstream_module); + if (uscf->shm_zone == NULL) { + return NGX_CONF_ERROR; + } + + if (uscf->shm_zone->data) { + uscf = uscf->shm_zone->data; + + ngx_conf_log_error(NGX_LOG_WARN, cf, 0, + "upstream \"%V\" in %s:%ui " + "is already bound to zone \"%V\"", + &uscf->host, uscf->file_name, uscf->line, + &value[1]); + return NGX_CONF_ERROR; + } + + uscf->shm_zone->init = ngx_stream_upstream_init_zone; + uscf->shm_zone->data = uscf; + + uscf->shm_zone->noreuse = 1; + + return NGX_CONF_OK; +} + + +static ngx_int_t +ngx_stream_upstream_init_zone(ngx_shm_zone_t *shm_zone, void *data) +{ + ngx_stream_upstream_srv_conf_t *ouscf = data; + + size_t len; + ngx_slab_pool_t *shpool; + ngx_stream_upstream_rr_peer_t *peer, **peerp; + ngx_stream_upstream_rr_peers_t *peers, *backup; + ngx_stream_upstream_srv_conf_t *uscf; + + uscf = shm_zone->data; + + if (ouscf) { + ngx_log_error(NGX_LOG_EMERG, shm_zone->shm.log, 0, + "zone \"%V\" cannot be reused", &shm_zone->shm.name); + return NGX_ERROR; + } + + shpool = (ngx_slab_pool_t *) shm_zone->shm.addr; + + if (shm_zone->shm.exists) { + return NGX_ERROR; + } + + + /* copy peers to shared memory */ + + len = sizeof(" in upstream zone \"\"") + shm_zone->shm.name.len; + + shpool->log_ctx = ngx_slab_alloc(shpool, len); + if (shpool->log_ctx == NULL) { + return NGX_ERROR; + } + + ngx_sprintf(shpool->log_ctx, " in upstream zone \"%V\"%Z", + &shm_zone->shm.name); + + peers = ngx_slab_alloc(shpool, sizeof(ngx_stream_upstream_rr_peers_t)); + if (peers == NULL) { + return NGX_ERROR; + } + + ngx_memcpy(peers, uscf->peer.data, sizeof(ngx_stream_upstream_rr_peers_t)); + + peers->shpool = shpool; + + for (peerp = &peers->peer; *peerp; peerp = &peer->next) { + /* pool is unlocked */ + peer = ngx_slab_calloc_locked(shpool, + sizeof(ngx_stream_upstream_rr_peer_t)); + if (peer == NULL) { + return NGX_ERROR; + } + + ngx_memcpy(peer, *peerp, sizeof(ngx_stream_upstream_rr_peer_t)); + + *peerp = peer; + } + + if (peers->next == NULL) { + goto done; + } + + backup = ngx_slab_alloc(shpool, sizeof(ngx_stream_upstream_rr_peers_t)); + if (backup == NULL) { + return NGX_ERROR; + } + + ngx_memcpy(backup, peers->next, sizeof(ngx_stream_upstream_rr_peers_t)); + + backup->shpool = shpool; + + for (peerp = &backup->peer; *peerp; peerp = &peer->next) { + /* pool is unlocked */ + peer = ngx_slab_calloc_locked(shpool, + sizeof(ngx_stream_upstream_rr_peer_t)); + if (peer == NULL) { + return NGX_ERROR; + } + + ngx_memcpy(peer, *peerp, sizeof(ngx_stream_upstream_rr_peer_t)); + + *peerp = peer; + } + + peers->next = backup; + +done: + + uscf->peer.data = peers; + + return NGX_OK; +}