Mercurial > hg > nginx
view src/stream/ngx_stream.c @ 6692:56fc55e32f23
Stream: filters.
author | Roman Arutyunyan <arut@nginx.com> |
---|---|
date | Thu, 15 Sep 2016 14:55:46 +0300 |
parents | 7357abd1fa8c |
children | 3908156a51fa |
line wrap: on
line source
/*
 * Copyright (C) Roman Arutyunyan
 * Copyright (C) Nginx, Inc.
 */


#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_event.h>
#include <ngx_stream.h>


static char *ngx_stream_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static ngx_int_t ngx_stream_add_ports(ngx_conf_t *cf, ngx_array_t *ports,
    ngx_stream_listen_t *listen);
static char *ngx_stream_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports);
static ngx_int_t ngx_stream_add_addrs(ngx_conf_t *cf, ngx_stream_port_t *stport,
    ngx_stream_conf_addr_t *addr);
#if (NGX_HAVE_INET6)
static ngx_int_t ngx_stream_add_addrs6(ngx_conf_t *cf,
    ngx_stream_port_t *stport, ngx_stream_conf_addr_t *addr);
#endif
static ngx_int_t ngx_stream_cmp_conf_addrs(const void *one, const void *two);


/* number of NGX_STREAM_MODULE modules; set in ngx_stream_block() */
ngx_uint_t  ngx_stream_max_module;


ngx_stream_filter_pt  ngx_stream_top_filter;


static ngx_command_t  ngx_stream_commands[] = {

    { ngx_string("stream"),
      NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
      ngx_stream_block,
      0,
      0,
      NULL },

      ngx_null_command
};


static ngx_core_module_t  ngx_stream_module_ctx = {
    ngx_string("stream"),
    NULL,
    NULL
};


ngx_module_t  ngx_stream_module = {
    NGX_MODULE_V1,
    &ngx_stream_module_ctx,                /* module context */
    ngx_stream_commands,                   /* module directives */
    NGX_CORE_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    NULL,                                  /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};


/*
 * Handler of the "stream" block directive.
 *
 * Allocates the main stream configuration context and stores it in *conf,
 * creates the main_conf and the null srv_conf of every stream module, runs
 * the modules' preconfiguration hooks, parses the contents of the stream{}
 * block, then runs init_main_conf/merge_srv_conf and the postconfiguration
 * hooks, initializes the stream variables and finally turns the collected
 * listen entries into listening sockets.
 *
 * Returns "is duplicate" if *conf is already set, NGX_CONF_ERROR or an
 * error string propagated from a callee on failure, and otherwise the
 * result of ngx_stream_optimize_servers().
 */

static char *
ngx_stream_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    char                          *rv;
    ngx_uint_t                     i, m, mi, s;
    ngx_conf_t                     pcf;
    ngx_array_t                    ports;
    ngx_stream_listen_t           *listen;
    ngx_stream_module_t           *module;
    ngx_stream_conf_ctx_t         *ctx;
    ngx_stream_core_srv_conf_t   **cscfp;
    ngx_stream_core_main_conf_t   *cmcf;

    if (*(ngx_stream_conf_ctx_t **) conf) {
        return "is duplicate";
    }

    /* the main stream context */

    ctx = ngx_pcalloc(cf->pool, sizeof(ngx_stream_conf_ctx_t));
    if (ctx == NULL) {
        return NGX_CONF_ERROR;
    }

    *(ngx_stream_conf_ctx_t **) conf = ctx;

    /* count the number of the stream modules and set up their indices */

    ngx_stream_max_module = ngx_count_modules(cf->cycle, NGX_STREAM_MODULE);


    /* the stream main_conf context, it is the same in all stream contexts */

    ctx->main_conf = ngx_pcalloc(cf->pool,
                                 sizeof(void *) * ngx_stream_max_module);
    if (ctx->main_conf == NULL) {
        return NGX_CONF_ERROR;
    }


    /*
     * the stream null srv_conf context, it is used to merge
     * the server{}s' srv_conf's
     */

    ctx->srv_conf = ngx_pcalloc(cf->pool,
                                sizeof(void *) * ngx_stream_max_module);
    if (ctx->srv_conf == NULL) {
        return NGX_CONF_ERROR;
    }


    /*
     * create the main_conf's and the null srv_conf's of all stream modules
     */

    for (m = 0; cf->cycle->modules[m]; m++) {
        if (cf->cycle->modules[m]->type != NGX_STREAM_MODULE) {
            continue;
        }

        module = cf->cycle->modules[m]->ctx;
        mi = cf->cycle->modules[m]->ctx_index;

        if (module->create_main_conf) {
            ctx->main_conf[mi] = module->create_main_conf(cf);
            if (ctx->main_conf[mi] == NULL) {
                return NGX_CONF_ERROR;
            }
        }

        if (module->create_srv_conf) {
            ctx->srv_conf[mi] = module->create_srv_conf(cf);
            if (ctx->srv_conf[mi] == NULL) {
                return NGX_CONF_ERROR;
            }
        }
    }


    /* save the parser state; it is restored with "*cf = pcf" below */

    pcf = *cf;
    cf->ctx = ctx;

    for (m = 0; cf->cycle->modules[m]; m++) {
        if (cf->cycle->modules[m]->type != NGX_STREAM_MODULE) {
            continue;
        }

        module = cf->cycle->modules[m]->ctx;

        if (module->preconfiguration) {
            if (module->preconfiguration(cf) != NGX_OK) {
                return NGX_CONF_ERROR;
            }
        }
    }


    /* parse inside the stream{} block */

    cf->module_type = NGX_STREAM_MODULE;
    cf->cmd_type = NGX_STREAM_MAIN_CONF;
    rv = ngx_conf_parse(cf, NULL);

    if (rv != NGX_CONF_OK) {
        *cf = pcf;
        return rv;
    }


    /* init stream{} main_conf's, merge the server{}s' srv_conf's */

    cmcf = ctx->main_conf[ngx_stream_core_module.ctx_index];
    cscfp = cmcf->servers.elts;

    for (m = 0; cf->cycle->modules[m]; m++) {
        if (cf->cycle->modules[m]->type != NGX_STREAM_MODULE) {
            continue;
        }

        module = cf->cycle->modules[m]->ctx;
        mi = cf->cycle->modules[m]->ctx_index;

        /* init stream{} main_conf's */

        cf->ctx = ctx;

        if (module->init_main_conf) {
            rv = module->init_main_conf(cf, ctx->main_conf[mi]);
            if (rv != NGX_CONF_OK) {
                *cf = pcf;
                return rv;
            }
        }

        for (s = 0; s < cmcf->servers.nelts; s++) {

            /* merge the server{}s' srv_conf's */

            cf->ctx = cscfp[s]->ctx;

            if (module->merge_srv_conf) {
                rv = module->merge_srv_conf(cf,
                                            ctx->srv_conf[mi],
                                            cscfp[s]->ctx->srv_conf[mi]);
                if (rv != NGX_CONF_OK) {
                    *cf = pcf;
                    return rv;
                }
            }
        }
    }

    for (m = 0; cf->cycle->modules[m]; m++) {
        if (cf->cycle->modules[m]->type != NGX_STREAM_MODULE) {
            continue;
        }

        module = cf->cycle->modules[m]->ctx;

        if (module->postconfiguration) {
            if (module->postconfiguration(cf) != NGX_OK) {
                return NGX_CONF_ERROR;
            }
        }
    }

    if (ngx_stream_variables_init_vars(cf) != NGX_OK) {
        return NGX_CONF_ERROR;
    }

    *cf = pcf;


    /* group the listen entries by (port, type, family) */

    if (ngx_array_init(&ports, cf->temp_pool, 4, sizeof(ngx_stream_conf_port_t))
        != NGX_OK)
    {
        return NGX_CONF_ERROR;
    }

    listen = cmcf->listen.elts;

    for (i = 0; i < cmcf->listen.nelts; i++) {
        if (ngx_stream_add_ports(cf, &ports, &listen[i]) != NGX_OK) {
            return NGX_CONF_ERROR;
        }
    }

    return ngx_stream_optimize_servers(cf, &ports);
}


/*
 * Adds one listen entry to the "ports" array.
 *
 * Entries are grouped by port number, socket type and address family:
 * if a matching ngx_stream_conf_port_t already exists, the entry is
 * appended to its addrs array; otherwise a new port element is pushed
 * first.  The listen options are copied by value into the new address.
 *
 * Returns NGX_OK, or NGX_ERROR if an array allocation fails.
 */

static ngx_int_t
ngx_stream_add_ports(ngx_conf_t *cf, ngx_array_t *ports,
    ngx_stream_listen_t *listen)
{
    in_port_t                p;
    ngx_uint_t               i;
    struct sockaddr         *sa;
    ngx_stream_conf_port_t  *port;
    ngx_stream_conf_addr_t  *addr;

    sa = &listen->sockaddr.sockaddr;
    p = ngx_inet_get_port(sa);

    port = ports->elts;
    for (i = 0; i < ports->nelts; i++) {

        if (p == port[i].port
            && listen->type == port[i].type
            && sa->sa_family == port[i].family)
        {
            /* a port is already in the port list */

            port = &port[i];
            goto found;
        }
    }

    /* add a port to the port list */

    port = ngx_array_push(ports);
    if (port == NULL) {
        return NGX_ERROR;
    }

    port->family = sa->sa_family;
    port->type = listen->type;
    port->port = p;

    if (ngx_array_init(&port->addrs, cf->temp_pool, 2,
                       sizeof(ngx_stream_conf_addr_t))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

found:

    addr = ngx_array_push(&port->addrs);
    if (addr == NULL) {
        return NGX_ERROR;
    }

    addr->opt = *listen;

    return NGX_OK;
}


/*
 * Creates the listening sockets for every collected port.
 *
 * The addresses of each port are sorted with ngx_stream_cmp_conf_addrs()
 * (explicitly bound first, wildcard last).  If the last address is a
 * wildcard, only the wildcard and the explicitly bound addresses get their
 * own listening socket; the remaining addresses are skipped and are
 * attached to the wildcard socket's address table instead.
 *
 * Returns NGX_CONF_OK on success and NGX_CONF_ERROR on any failure.
 */

static char *
ngx_stream_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports)
{
    ngx_uint_t                   i, p, last, bind_wildcard;
    ngx_listening_t             *ls;
    ngx_stream_port_t           *stport;
    ngx_stream_conf_port_t      *port;
    ngx_stream_conf_addr_t      *addr;
    ngx_stream_core_srv_conf_t  *cscf;

    port = ports->elts;
    for (p = 0; p < ports->nelts; p++) {

        ngx_sort(port[p].addrs.elts, (size_t) port[p].addrs.nelts,
                 sizeof(ngx_stream_conf_addr_t), ngx_stream_cmp_conf_addrs);

        addr = port[p].addrs.elts;
        last = port[p].addrs.nelts;

        /*
         * if there is the binding to the "*:port" then we need to bind()
         * to the "*:port" only and ignore the other bindings
         */

        if (addr[last - 1].opt.wildcard) {
            addr[last - 1].opt.bind = 1;
            bind_wildcard = 1;

        } else {
            bind_wildcard = 0;
        }

        i = 0;

        while (i < last) {

            /* "i" only advances here, over entries left to the wildcard */

            if (bind_wildcard && !addr[i].opt.bind) {
                i++;
                continue;
            }

            ls = ngx_create_listening(cf, &addr[i].opt.sockaddr.sockaddr,
                                      addr[i].opt.socklen);
            if (ls == NULL) {
                return NGX_CONF_ERROR;
            }

            ls->addr_ntop = 1;
            ls->handler = ngx_stream_init_connection;
            ls->pool_size = 256;
            ls->type = addr[i].opt.type;

            /*
             * take the error log from the server owning the address that
             * is actually bound (addr[i]), like every other field below;
             * addr[0] is a skipped entry of possibly another server when
             * i > 0, which happens for the wildcard socket
             */

            cscf = addr[i].opt.ctx->srv_conf[ngx_stream_core_module.ctx_index];

            ls->logp = cscf->error_log;
            ls->log.data = &ls->addr_text;
            ls->log.handler = ngx_accept_log_error;

            ls->backlog = addr[i].opt.backlog;

            ls->wildcard = addr[i].opt.wildcard;

            ls->keepalive = addr[i].opt.so_keepalive;
#if (NGX_HAVE_KEEPALIVE_TUNABLE)
            ls->keepidle = addr[i].opt.tcp_keepidle;
            ls->keepintvl = addr[i].opt.tcp_keepintvl;
            ls->keepcnt = addr[i].opt.tcp_keepcnt;
#endif

#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
            ls->ipv6only = addr[i].opt.ipv6only;
#endif

#if (NGX_HAVE_REUSEPORT)
            ls->reuseport = addr[i].opt.reuseport;
#endif

            stport = ngx_palloc(cf->pool, sizeof(ngx_stream_port_t));
            if (stport == NULL) {
                return NGX_CONF_ERROR;
            }

            ls->servers = stport;

            /* the socket serves addr[0] .. addr[i] */

            stport->naddrs = i + 1;

            switch (ls->sockaddr->sa_family) {
#if (NGX_HAVE_INET6)
            case AF_INET6:
                if (ngx_stream_add_addrs6(cf, stport, addr) != NGX_OK) {
                    return NGX_CONF_ERROR;
                }
                break;
#endif
            default: /* AF_INET */
                if (ngx_stream_add_addrs(cf, stport, addr) != NGX_OK) {
                    return NGX_CONF_ERROR;
                }
                break;
            }

            if (ngx_clone_listening(cf, ls) != NGX_OK) {
                return NGX_CONF_ERROR;
            }

            /* drop the head entry; "i" keeps indexing the shifted window */

            addr++;
            last--;
        }
    }

    return NGX_CONF_OK;
}


/*
 * Fills stport->addrs with stport->naddrs IPv4 entries copied from
 * addr[0 .. naddrs - 1]: the address itself, the configuration context,
 * the ssl (if built with NGX_STREAM_SSL) and proxy_protocol options, and a
 * textual form of the address allocated from cf->pool.
 *
 * Returns NGX_OK, or NGX_ERROR if an allocation fails.
 */

static ngx_int_t
ngx_stream_add_addrs(ngx_conf_t *cf, ngx_stream_port_t *stport,
    ngx_stream_conf_addr_t *addr)
{
    u_char                *p;
    size_t                 len;
    ngx_uint_t             i;
    struct sockaddr_in    *sin;
    ngx_stream_in_addr_t  *addrs;
    u_char                 buf[NGX_SOCKADDR_STRLEN];

    stport->addrs = ngx_pcalloc(cf->pool,
                                stport->naddrs * sizeof(ngx_stream_in_addr_t));
    if (stport->addrs == NULL) {
        return NGX_ERROR;
    }

    addrs = stport->addrs;

    for (i = 0; i < stport->naddrs; i++) {

        sin = &addr[i].opt.sockaddr.sockaddr_in;
        addrs[i].addr = sin->sin_addr.s_addr;

        addrs[i].conf.ctx = addr[i].opt.ctx;
#if (NGX_STREAM_SSL)
        addrs[i].conf.ssl = addr[i].opt.ssl;
#endif
        addrs[i].conf.proxy_protocol = addr[i].opt.proxy_protocol;

        /* format into a stack buffer, then keep an exact-size copy */

        len = ngx_sock_ntop(&addr[i].opt.sockaddr.sockaddr, addr[i].opt.socklen,
                            buf, NGX_SOCKADDR_STRLEN, 1);

        p = ngx_pnalloc(cf->pool, len);
        if (p == NULL) {
            return NGX_ERROR;
        }

        ngx_memcpy(p, buf, len);

        addrs[i].conf.addr_text.len = len;
        addrs[i].conf.addr_text.data = p;
    }

    return NGX_OK;
}


#if (NGX_HAVE_INET6)

/*
 * IPv6 counterpart of ngx_stream_add_addrs(): fills stport->addrs with
 * stport->naddrs ngx_stream_in6_addr_t entries copied from
 * addr[0 .. naddrs - 1].
 *
 * Returns NGX_OK, or NGX_ERROR if an allocation fails.
 */

static ngx_int_t
ngx_stream_add_addrs6(ngx_conf_t *cf, ngx_stream_port_t *stport,
    ngx_stream_conf_addr_t *addr)
{
    u_char                 *p;
    size_t                  len;
    ngx_uint_t              i;
    struct sockaddr_in6    *sin6;
    ngx_stream_in6_addr_t  *addrs6;
    u_char                  buf[NGX_SOCKADDR_STRLEN];

    stport->addrs = ngx_pcalloc(cf->pool,
                                stport->naddrs * sizeof(ngx_stream_in6_addr_t));
    if (stport->addrs == NULL) {
        return NGX_ERROR;
    }

    addrs6 = stport->addrs;

    for (i = 0; i < stport->naddrs; i++) {

        sin6 = &addr[i].opt.sockaddr.sockaddr_in6;
        addrs6[i].addr6 = sin6->sin6_addr;

        addrs6[i].conf.ctx = addr[i].opt.ctx;
#if (NGX_STREAM_SSL)
        addrs6[i].conf.ssl = addr[i].opt.ssl;
#endif
        addrs6[i].conf.proxy_protocol = addr[i].opt.proxy_protocol;

        /* format into a stack buffer, then keep an exact-size copy */

        len = ngx_sock_ntop(&addr[i].opt.sockaddr.sockaddr, addr[i].opt.socklen,
                            buf, NGX_SOCKADDR_STRLEN, 1);

        p = ngx_pnalloc(cf->pool, len);
        if (p == NULL) {
            return NGX_ERROR;
        }

        ngx_memcpy(p, buf, len);

        addrs6[i].conf.addr_text.len = len;
        addrs6[i].conf.addr_text.data = p;
    }

    return NGX_OK;
}

#endif


/*
 * ngx_sort() comparator for ngx_stream_conf_addr_t elements.
 *
 * Orders explicitly bound addresses first and a wildcard address last;
 * all other pairs compare equal so they are not reordered by this
 * criterion.  The wildcard check on "one" takes precedence over the
 * check on "two".
 */

static ngx_int_t
ngx_stream_cmp_conf_addrs(const void *one, const void *two)
{
    ngx_stream_conf_addr_t  *first, *second;

    first = (ngx_stream_conf_addr_t *) one;
    second = (ngx_stream_conf_addr_t *) two;

    if (first->opt.wildcard) {
        /* a wildcard must be the last resort, shift it to the end */
        return 1;
    }

    if (second->opt.wildcard) {
        /* a wildcard must be the last resort, shift it to the end */
        return -1;
    }

    if (first->opt.bind && !second->opt.bind) {
        /* shift explicit bind()ed addresses to the start */
        return -1;
    }

    if (!first->opt.bind && second->opt.bind) {
        /* shift explicit bind()ed addresses to the start */
        return 1;
    }

    /* do not sort by default */

    return 0;
}