Mercurial > hg > nginx
view src/stream/ngx_stream.c @ 7098:7bfbf73db920
Stream: relaxed next upstream condition (ticket #1317).
When switching to a next upstream, some buffers could be stuck in the middle
of the filter chain. A condition existed that raised an error when this
happened. As it turned out, this condition prevented switching to a next
upstream if ssl preread was used with the TCP protocol (see the ticket).
In fact, the condition does not make sense for TCP, since after successful
connection to an upstream switching to another upstream never happens. As for
UDP, the issue with stuck buffers is unlikely to happen, but is still possible.
Specifically, if a filter delays sending data to upstream.
The condition can be relaxed to only check the "buffered" bitmask of the
upstream connection. The new condition is simpler and fixes the ticket issue
as well. Additionally, the upstream_out chain is now reset for UDP prior to
connecting to a new upstream to prevent repeating the client data twice.
author | Roman Arutyunyan <arut@nginx.com> |
---|---|
date | Mon, 11 Sep 2017 15:32:31 +0300 |
parents | d7ce41bdf050 |
children | e7b2b907c0f8 |
line wrap: on
line source

/*
 * Copyright (C) Roman Arutyunyan
 * Copyright (C) Nginx, Inc.
 */


#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_event.h>
#include <ngx_stream.h>


static char *ngx_stream_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static ngx_int_t ngx_stream_init_phases(ngx_conf_t *cf,
    ngx_stream_core_main_conf_t *cmcf);
static ngx_int_t ngx_stream_init_phase_handlers(ngx_conf_t *cf,
    ngx_stream_core_main_conf_t *cmcf);
static ngx_int_t ngx_stream_add_ports(ngx_conf_t *cf, ngx_array_t *ports,
    ngx_stream_listen_t *listen);
static char *ngx_stream_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports);
static ngx_int_t ngx_stream_add_addrs(ngx_conf_t *cf, ngx_stream_port_t *stport,
    ngx_stream_conf_addr_t *addr);
#if (NGX_HAVE_INET6)
static ngx_int_t ngx_stream_add_addrs6(ngx_conf_t *cf,
    ngx_stream_port_t *stport, ngx_stream_conf_addr_t *addr);
#endif
static ngx_int_t ngx_stream_cmp_conf_addrs(const void *one, const void *two);


/* number of stream modules; set by ngx_stream_block() */
ngx_uint_t  ngx_stream_max_module;


/* not referenced anywhere else in this file */
ngx_stream_filter_pt  ngx_stream_top_filter;


/* directives of this module: only the "stream" block, handled by ngx_stream_block() */
static ngx_command_t  ngx_stream_commands[] = {

    { ngx_string("stream"),
      NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
      ngx_stream_block,
      0,
      0,
      NULL },

      ngx_null_command
};


static ngx_core_module_t  ngx_stream_module_ctx = {
    ngx_string("stream"),
    NULL,
    NULL
};


ngx_module_t  ngx_stream_module = {
    NGX_MODULE_V1,
    &ngx_stream_module_ctx,                /* module context */
    ngx_stream_commands,                   /* module directives */
    NGX_CORE_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    NULL,                                  /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};


/*
 * Handler listed in ngx_stream_commands for the "stream" directive.
 *
 * Builds the stream configuration context, lets every NGX_STREAM_MODULE
 * create, parse, initialise and merge its configuration, sets up the phase
 * engine and finally turns the collected listen entries into listening
 * sockets via ngx_stream_optimize_servers().
 *
 * cf   - configuration parsing state; cf->ctx, cf->module_type and
 *        cf->cmd_type are changed while the block is processed
 * cmd  - unused here
 * conf - points to the slot that receives the new ngx_stream_conf_ctx_t;
 *        a non-NULL slot means the block was already processed
 *
 * Returns "is duplicate" for a second stream{} block, NGX_CONF_ERROR or the
 * error string of a failed callback on failure, otherwise the result of
 * ngx_stream_optimize_servers().
 *
 * NOTE(review): only some failure paths restore *cf from the saved copy
 * (pcf); the paths returning NGX_CONF_ERROR after "pcf = *cf" leave cf
 * modified - confirm that callers do not rely on cf after an error.
 */
static char *
ngx_stream_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    char                          *rv;
    ngx_uint_t                     i, m, mi, s;
    ngx_conf_t                     pcf;
    ngx_array_t                    ports;
    ngx_stream_listen_t           *listen;
    ngx_stream_module_t           *module;
    ngx_stream_conf_ctx_t         *ctx;
    ngx_stream_core_srv_conf_t   **cscfp;
    ngx_stream_core_main_conf_t   *cmcf;

    if (*(ngx_stream_conf_ctx_t **) conf) {
        return "is duplicate";
    }

    /* the main stream context */

    ctx = ngx_pcalloc(cf->pool, sizeof(ngx_stream_conf_ctx_t));
    if (ctx == NULL) {
        return NGX_CONF_ERROR;
    }

    *(ngx_stream_conf_ctx_t **) conf = ctx;

    /* count the number of the stream modules and set up their indices */

    ngx_stream_max_module = ngx_count_modules(cf->cycle, NGX_STREAM_MODULE);


    /* the stream main_conf context, it's the same in the all stream contexts */

    ctx->main_conf = ngx_pcalloc(cf->pool,
                                 sizeof(void *) * ngx_stream_max_module);
    if (ctx->main_conf == NULL) {
        return NGX_CONF_ERROR;
    }


    /*
     * the stream null srv_conf context, it is used to merge
     * the server{}s' srv_conf's
     */

    ctx->srv_conf = ngx_pcalloc(cf->pool,
                                sizeof(void *) * ngx_stream_max_module);
    if (ctx->srv_conf == NULL) {
        return NGX_CONF_ERROR;
    }


    /*
     * create the main_conf's and the null srv_conf's of the all stream modules
     */

    for (m = 0; cf->cycle->modules[m]; m++) {
        if (cf->cycle->modules[m]->type != NGX_STREAM_MODULE) {
            continue;
        }

        module = cf->cycle->modules[m]->ctx;
        mi = cf->cycle->modules[m]->ctx_index;

        if (module->create_main_conf) {
            ctx->main_conf[mi] = module->create_main_conf(cf);
            if (ctx->main_conf[mi] == NULL) {
                return NGX_CONF_ERROR;
            }
        }

        if (module->create_srv_conf) {
            ctx->srv_conf[mi] = module->create_srv_conf(cf);
            if (ctx->srv_conf[mi] == NULL) {
                return NGX_CONF_ERROR;
            }
        }
    }


    /* save the caller's parsing state; it is put back with "*cf = pcf" */

    pcf = *cf;
    cf->ctx = ctx;

    for (m = 0; cf->cycle->modules[m]; m++) {
        if (cf->cycle->modules[m]->type != NGX_STREAM_MODULE) {
            continue;
        }

        module = cf->cycle->modules[m]->ctx;

        if (module->preconfiguration) {
            if (module->preconfiguration(cf) != NGX_OK) {
                return NGX_CONF_ERROR;
            }
        }
    }


    /* parse inside the stream{} block */

    cf->module_type = NGX_STREAM_MODULE;
    cf->cmd_type = NGX_STREAM_MAIN_CONF;
    rv = ngx_conf_parse(cf, NULL);

    if (rv != NGX_CONF_OK) {
        *cf = pcf;
        return rv;
    }


    /* init stream{} main_conf's, merge the server{}s' srv_conf's */

    cmcf = ctx->main_conf[ngx_stream_core_module.ctx_index];
    cscfp = cmcf->servers.elts;

    for (m = 0; cf->cycle->modules[m]; m++) {
        if (cf->cycle->modules[m]->type != NGX_STREAM_MODULE) {
            continue;
        }

        module = cf->cycle->modules[m]->ctx;
        mi = cf->cycle->modules[m]->ctx_index;

        /* init stream{} main_conf's */

        /* reset on every pass: the server loop below repoints cf->ctx */

        cf->ctx = ctx;

        if (module->init_main_conf) {
            rv = module->init_main_conf(cf, ctx->main_conf[mi]);
            if (rv != NGX_CONF_OK) {
                *cf = pcf;
                return rv;
            }
        }

        for (s = 0; s < cmcf->servers.nelts; s++) {

            /* merge the server{}s' srv_conf's */

            cf->ctx = cscfp[s]->ctx;

            if (module->merge_srv_conf) {
                rv = module->merge_srv_conf(cf,
                                            ctx->srv_conf[mi],
                                            cscfp[s]->ctx->srv_conf[mi]);
                if (rv != NGX_CONF_OK) {
                    *cf = pcf;
                    return rv;
                }
            }
        }
    }

    /* the handler arrays must exist before postconfiguration callbacks run */

    if (ngx_stream_init_phases(cf, cmcf) != NGX_OK) {
        return NGX_CONF_ERROR;
    }

    for (m = 0; cf->cycle->modules[m]; m++) {
        if (cf->cycle->modules[m]->type != NGX_STREAM_MODULE) {
            continue;
        }

        module = cf->cycle->modules[m]->ctx;

        if (module->postconfiguration) {
            if (module->postconfiguration(cf) != NGX_OK) {
                return NGX_CONF_ERROR;
            }
        }
    }

    if (ngx_stream_variables_init_vars(cf) != NGX_OK) {
        return NGX_CONF_ERROR;
    }

    /* module callbacks are done: restore the caller's parsing state */

    *cf = pcf;

    if (ngx_stream_init_phase_handlers(cf, cmcf) != NGX_OK) {
        return NGX_CONF_ERROR;
    }

    /* group the listen entries by port; the list lives in the temp pool */

    if (ngx_array_init(&ports, cf->temp_pool, 4, sizeof(ngx_stream_conf_port_t))
        != NGX_OK)
    {
        return NGX_CONF_ERROR;
    }

    listen = cmcf->listen.elts;

    for (i = 0; i < cmcf->listen.nelts; i++) {
        if (ngx_stream_add_ports(cf, &ports, &listen[i]) != NGX_OK) {
            return NGX_CONF_ERROR;
        }
    }

    return ngx_stream_optimize_servers(cf, &ports);
}


/*
 * Initialises the handler arrays of the post-accept, preaccess, access,
 * ssl, preread and log phases in cmcf->phases, allocating from cf->pool.
 * The content phase gets no handler array here.
 *
 * Returns NGX_OK, or NGX_ERROR if any ngx_array_init() call fails.
 */
static ngx_int_t
ngx_stream_init_phases(ngx_conf_t *cf, ngx_stream_core_main_conf_t *cmcf)
{
    if (ngx_array_init(&cmcf->phases[NGX_STREAM_POST_ACCEPT_PHASE].handlers,
                       cf->pool, 1, sizeof(ngx_stream_handler_pt))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    if (ngx_array_init(&cmcf->phases[NGX_STREAM_PREACCESS_PHASE].handlers,
                       cf->pool, 1, sizeof(ngx_stream_handler_pt))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    if (ngx_array_init(&cmcf->phases[NGX_STREAM_ACCESS_PHASE].handlers,
                       cf->pool, 1, sizeof(ngx_stream_handler_pt))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    if (ngx_array_init(&cmcf->phases[NGX_STREAM_SSL_PHASE].handlers,
                       cf->pool, 1, sizeof(ngx_stream_handler_pt))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    if (ngx_array_init(&cmcf->phases[NGX_STREAM_PREREAD_PHASE].handlers,
                       cf->pool, 1, sizeof(ngx_stream_handler_pt))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    if (ngx_array_init(&cmcf->phases[NGX_STREAM_LOG_PHASE].handlers,
                       cf->pool, 1, sizeof(ngx_stream_handler_pt))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    return NGX_OK;
}


/*
 * Flattens the per-phase handler arrays of all phases below
 * NGX_STREAM_LOG_PHASE into the single cmcf->phase_engine.handlers array.
 *
 * Each entry stores the phase checker, the module handler and "next",
 * the index of the first entry that follows the current phase.  The content
 * phase contributes exactly one entry that carries only a checker.
 *
 * Returns NGX_OK, or NGX_ERROR if the array cannot be allocated.
 */
static ngx_int_t
ngx_stream_init_phase_handlers(ngx_conf_t *cf,
    ngx_stream_core_main_conf_t *cmcf)
{
    ngx_int_t                     j;
    ngx_uint_t                    i, n;
    ngx_stream_handler_pt        *h;
    ngx_stream_phase_handler_t   *ph;
    ngx_stream_phase_handler_pt   checker;

    /* first pass: count the entries needed */

    n = 1 /* content phase */;

    for (i = 0; i < NGX_STREAM_LOG_PHASE; i++) {
        n += cmcf->phases[i].handlers.nelts;
    }

    /*
     * NOTE(review): the reason for the extra sizeof(void *) bytes is not
     * visible in this file - confirm against the phase engine consumer.
     */

    ph = ngx_pcalloc(cf->pool,
                     n * sizeof(ngx_stream_phase_handler_t) + sizeof(void *));
    if (ph == NULL) {
        return NGX_ERROR;
    }

    cmcf->phase_engine.handlers = ph;

    /* second pass: n now tracks the index one past the current phase */

    n = 0;

    for (i = 0; i < NGX_STREAM_LOG_PHASE; i++) {
        h = cmcf->phases[i].handlers.elts;

        switch (i) {

        case NGX_STREAM_PREREAD_PHASE:
            checker = ngx_stream_core_preread_phase;
            break;

        case NGX_STREAM_CONTENT_PHASE:
            /* single entry, no handler and no "next" are stored */
            ph->checker = ngx_stream_core_content_phase;
            n++;
            ph++;

            continue;

        default:
            checker = ngx_stream_core_generic_phase;
        }

        n += cmcf->phases[i].handlers.nelts;

        /* handlers are emitted in the reverse of their array order */

        for (j = cmcf->phases[i].handlers.nelts - 1; j >= 0; j--) {
            ph->checker = checker;
            ph->handler = h[j];
            ph->next = n;
            ph++;
        }
    }

    return NGX_OK;
}


/*
 * Files one listen entry under its port in the "ports" list.
 *
 * A port entry is identified by port number, socket type and address
 * family.  If no matching entry exists, a new one is appended and its
 * address array is created in cf->temp_pool.  A copy of *listen is then
 * appended to that entry's address array.
 *
 * Returns NGX_OK, or NGX_ERROR on allocation failure.
 */
static ngx_int_t
ngx_stream_add_ports(ngx_conf_t *cf, ngx_array_t *ports,
    ngx_stream_listen_t *listen)
{
    in_port_t                p;
    ngx_uint_t               i;
    struct sockaddr         *sa;
    ngx_stream_conf_port_t  *port;
    ngx_stream_conf_addr_t  *addr;

    sa = &listen->sockaddr.sockaddr;
    p = ngx_inet_get_port(sa);

    port = ports->elts;
    for (i = 0; i < ports->nelts; i++) {

        if (p == port[i].port
            && listen->type == port[i].type
            && sa->sa_family == port[i].family)
        {
            /* a port is already in the port list */

            port = &port[i];
            goto found;
        }
    }

    /* add a port to the port list */

    port = ngx_array_push(ports);
    if (port == NULL) {
        return NGX_ERROR;
    }

    port->family = sa->sa_family;
    port->type = listen->type;
    port->port = p;

    if (ngx_array_init(&port->addrs, cf->temp_pool, 2,
                       sizeof(ngx_stream_conf_addr_t))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

found:

    addr = ngx_array_push(&port->addrs);
    if (addr == NULL) {
        return NGX_ERROR;
    }

    addr->opt = *listen;

    return NGX_OK;
}


/*
 * Creates the listening sockets for every collected port.
 *
 * The addresses of a port are sorted with ngx_stream_cmp_conf_addrs()
 * (explicitly bound first, wildcard last).  If the last address is a
 * wildcard, addresses without the bind flag are not given a socket of
 * their own: they are skipped and later attached to the wildcard socket.
 *
 * Inside the loop "addr" is advanced past every address that received a
 * socket, so addr[0..i] are the skipped addresses followed by the address
 * being bound; that range is what ngx_stream_add_addrs{,6}() copies.
 *
 * Returns NGX_CONF_OK, or NGX_CONF_ERROR on failure.
 */
static char *
ngx_stream_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports)
{
    ngx_uint_t                   i, p, last, bind_wildcard;
    ngx_listening_t             *ls;
    ngx_stream_port_t           *stport;
    ngx_stream_conf_port_t      *port;
    ngx_stream_conf_addr_t      *addr;
    ngx_stream_core_srv_conf_t  *cscf;

    port = ports->elts;
    for (p = 0; p < ports->nelts; p++) {

        ngx_sort(port[p].addrs.elts, (size_t) port[p].addrs.nelts,
                 sizeof(ngx_stream_conf_addr_t), ngx_stream_cmp_conf_addrs);

        addr = port[p].addrs.elts;
        last = port[p].addrs.nelts;

        /*
         * if there is the binding to the "*:port" then we need to bind()
         * to the "*:port" only and ignore the other bindings
         */

        if (addr[last - 1].opt.wildcard) {
            addr[last - 1].opt.bind = 1;
            bind_wildcard = 1;

        } else {
            bind_wildcard = 0;
        }

        i = 0;

        while (i < last) {

            /* not bound on its own: leave it for the wildcard socket */

            if (bind_wildcard && !addr[i].opt.bind) {
                i++;
                continue;
            }

            ls = ngx_create_listening(cf, &addr[i].opt.sockaddr.sockaddr,
                                      addr[i].opt.socklen);
            if (ls == NULL) {
                return NGX_CONF_ERROR;
            }

            ls->addr_ntop = 1;
            ls->handler = ngx_stream_init_connection;
            ls->pool_size = 256;
            ls->type = addr[i].opt.type;

            /*
             * NOTE(review): this reads addr[0] while every other option
             * below reads addr[i]; the two differ when addresses were
             * skipped above (i > 0), so the error log then comes from the
             * first skipped address - confirm this is intended.
             */

            cscf = addr->opt.ctx->srv_conf[ngx_stream_core_module.ctx_index];

            ls->logp = cscf->error_log;
            ls->log.data = &ls->addr_text;
            ls->log.handler = ngx_accept_log_error;

            ls->backlog = addr[i].opt.backlog;
            ls->rcvbuf = addr[i].opt.rcvbuf;
            ls->sndbuf = addr[i].opt.sndbuf;

            ls->wildcard = addr[i].opt.wildcard;

            ls->keepalive = addr[i].opt.so_keepalive;
#if (NGX_HAVE_KEEPALIVE_TUNABLE)
            ls->keepidle = addr[i].opt.tcp_keepidle;
            ls->keepintvl = addr[i].opt.tcp_keepintvl;
            ls->keepcnt = addr[i].opt.tcp_keepcnt;
#endif

#if (NGX_HAVE_INET6)
            ls->ipv6only = addr[i].opt.ipv6only;
#endif

#if (NGX_HAVE_REUSEPORT)
            ls->reuseport = addr[i].opt.reuseport;
#endif

            stport = ngx_palloc(cf->pool, sizeof(ngx_stream_port_t));
            if (stport == NULL) {
                return NGX_CONF_ERROR;
            }

            ls->servers = stport;

            /* the socket serves addr[0..i] */

            stport->naddrs = i + 1;

            switch (ls->sockaddr->sa_family) {
#if (NGX_HAVE_INET6)
            case AF_INET6:
                if (ngx_stream_add_addrs6(cf, stport, addr) != NGX_OK) {
                    return NGX_CONF_ERROR;
                }
                break;
#endif
            default: /* AF_INET */
                if (ngx_stream_add_addrs(cf, stport, addr) != NGX_OK) {
                    return NGX_CONF_ERROR;
                }
                break;
            }

            if (ngx_clone_listening(cf, ls) != NGX_OK) {
                return NGX_CONF_ERROR;
            }

            /* shrink the window by one from the front; i is kept as is */

            addr++;
            last--;
        }
    }

    return NGX_CONF_OK;
}


/*
 * Fills stport->addrs with stport->naddrs IPv4 entries taken from
 * addr[0..naddrs-1].
 *
 * For every entry the IPv4 address, the configuration context, the ssl flag
 * (only with NGX_STREAM_SSL), the proxy_protocol flag and a copy of the
 * address text produced by ngx_sock_ntop() are stored.  All memory comes
 * from cf->pool.
 *
 * Returns NGX_OK, or NGX_ERROR on allocation failure.
 */
static ngx_int_t
ngx_stream_add_addrs(ngx_conf_t *cf, ngx_stream_port_t *stport,
    ngx_stream_conf_addr_t *addr)
{
    u_char                *p;
    size_t                 len;
    ngx_uint_t             i;
    struct sockaddr_in    *sin;
    ngx_stream_in_addr_t  *addrs;
    u_char                 buf[NGX_SOCKADDR_STRLEN];

    stport->addrs = ngx_pcalloc(cf->pool,
                                stport->naddrs * sizeof(ngx_stream_in_addr_t));
    if (stport->addrs == NULL) {
        return NGX_ERROR;
    }

    addrs = stport->addrs;

    for (i = 0; i < stport->naddrs; i++) {

        sin = &addr[i].opt.sockaddr.sockaddr_in;
        addrs[i].addr = sin->sin_addr.s_addr;

        addrs[i].conf.ctx = addr[i].opt.ctx;
#if (NGX_STREAM_SSL)
        addrs[i].conf.ssl = addr[i].opt.ssl;
#endif
        addrs[i].conf.proxy_protocol = addr[i].opt.proxy_protocol;

        /* format into the stack buffer, then keep a pool copy of len bytes */

        len = ngx_sock_ntop(&addr[i].opt.sockaddr.sockaddr, addr[i].opt.socklen,
                            buf, NGX_SOCKADDR_STRLEN, 1);

        p = ngx_pnalloc(cf->pool, len);
        if (p == NULL) {
            return NGX_ERROR;
        }

        ngx_memcpy(p, buf, len);

        addrs[i].conf.addr_text.len = len;
        addrs[i].conf.addr_text.data = p;
    }

    return NGX_OK;
}


#if (NGX_HAVE_INET6)

/*
 * IPv6 counterpart of ngx_stream_add_addrs(): fills stport->addrs with
 * stport->naddrs ngx_stream_in6_addr_t entries taken from
 * addr[0..naddrs-1], storing the same per-address settings and address
 * text.
 *
 * Returns NGX_OK, or NGX_ERROR on allocation failure.
 */
static ngx_int_t
ngx_stream_add_addrs6(ngx_conf_t *cf, ngx_stream_port_t *stport,
    ngx_stream_conf_addr_t *addr)
{
    u_char                 *p;
    size_t                  len;
    ngx_uint_t              i;
    struct sockaddr_in6    *sin6;
    ngx_stream_in6_addr_t  *addrs6;
    u_char                  buf[NGX_SOCKADDR_STRLEN];

    stport->addrs = ngx_pcalloc(cf->pool,
                                stport->naddrs * sizeof(ngx_stream_in6_addr_t));
    if (stport->addrs == NULL) {
        return NGX_ERROR;
    }

    addrs6 = stport->addrs;

    for (i = 0; i < stport->naddrs; i++) {

        sin6 = &addr[i].opt.sockaddr.sockaddr_in6;
        addrs6[i].addr6 = sin6->sin6_addr;

        addrs6[i].conf.ctx = addr[i].opt.ctx;
#if (NGX_STREAM_SSL)
        addrs6[i].conf.ssl = addr[i].opt.ssl;
#endif
        addrs6[i].conf.proxy_protocol = addr[i].opt.proxy_protocol;

        /* format into the stack buffer, then keep a pool copy of len bytes */

        len = ngx_sock_ntop(&addr[i].opt.sockaddr.sockaddr, addr[i].opt.socklen,
                            buf, NGX_SOCKADDR_STRLEN, 1);

        p = ngx_pnalloc(cf->pool, len);
        if (p == NULL) {
            return NGX_ERROR;
        }

        ngx_memcpy(p, buf, len);

        addrs6[i].conf.addr_text.len = len;
        addrs6[i].conf.addr_text.data = p;
    }

    return NGX_OK;
}

#endif


/*
 * Comparison callback passed to ngx_sort() for ngx_stream_conf_addr_t
 * entries.
 *
 * Ordering: a wildcard address goes last; among the rest, addresses with
 * the bind flag precede those without it; everything else compares equal.
 * The wildcard test on "first" is made before the one on "second", so two
 * wildcard entries yield 1.
 *
 * Returns -1, 0 or 1.
 */
static ngx_int_t
ngx_stream_cmp_conf_addrs(const void *one, const void *two)
{
    ngx_stream_conf_addr_t  *first, *second;

    first = (ngx_stream_conf_addr_t *) one;
    second = (ngx_stream_conf_addr_t *) two;

    if (first->opt.wildcard) {
        /* a wildcard must be the last resort, shift it to the end */
        return 1;
    }

    if (second->opt.wildcard) {
        /* a wildcard must be the last resort, shift it to the end */
        return -1;
    }

    if (first->opt.bind && !second->opt.bind) {
        /* shift explicit bind()ed addresses to the start */
        return -1;
    }

    if (!first->opt.bind && second->opt.bind) {
        /* shift explicit bind()ed addresses to the start */
        return 1;
    }

    /* do not sort by default */

    return 0;
}