# HG changeset patch # User Maxim Dounin # Date 1316114898 0 # Node ID b2e21a39307c68e220fd024ada6e9cb3b311fb34 # Parent bba9a5ccc6cd7466a9717f5855c0f26ed48642ad Upstream keepalive module. diff --git a/auto/modules b/auto/modules --- a/auto/modules +++ b/auto/modules @@ -344,6 +344,11 @@ if [ $HTTP_UPSTREAM_IP_HASH = YES ]; the HTTP_SRCS="$HTTP_SRCS $HTTP_UPSTREAM_IP_HASH_SRCS" fi +if [ $HTTP_UPSTREAM_KEEPALIVE = YES ]; then + HTTP_MODULES="$HTTP_MODULES $HTTP_UPSTREAM_KEEPALIVE_MODULE" + HTTP_SRCS="$HTTP_SRCS $HTTP_UPSTREAM_KEEPALIVE_SRCS" +fi + if [ $HTTP_STUB_STATUS = YES ]; then have=NGX_STAT_STUB . auto/have HTTP_MODULES="$HTTP_MODULES ngx_http_stub_status_module" diff --git a/auto/options b/auto/options --- a/auto/options +++ b/auto/options @@ -95,6 +95,7 @@ HTTP_FLV=NO HTTP_MP4=NO HTTP_GZIP_STATIC=NO HTTP_UPSTREAM_IP_HASH=YES +HTTP_UPSTREAM_KEEPALIVE=YES # STUB HTTP_STUB_STATUS=NO @@ -231,6 +232,7 @@ do --without-http_empty_gif_module) HTTP_EMPTY_GIF=NO ;; --without-http_browser_module) HTTP_BROWSER=NO ;; --without-http_upstream_ip_hash_module) HTTP_UPSTREAM_IP_HASH=NO ;; + --without-http_upstream_keepalive_module) HTTP_UPSTREAM_KEEPALIVE=NO ;; --with-http_perl_module) HTTP_PERL=YES ;; --with-perl_modules_path=*) NGX_PERL_MODULES="$value" ;; diff --git a/auto/sources b/auto/sources --- a/auto/sources +++ b/auto/sources @@ -475,6 +475,11 @@ HTTP_UPSTREAM_IP_HASH_MODULE=ngx_http_up HTTP_UPSTREAM_IP_HASH_SRCS=src/http/modules/ngx_http_upstream_ip_hash_module.c +HTTP_UPSTREAM_KEEPALIVE_MODULE=ngx_http_upstream_keepalive_module +HTTP_UPSTREAM_KEEPALIVE_SRCS=" \ + src/http/modules/ngx_http_upstream_keepalive_module.c" + + MAIL_INCS="src/mail" MAIL_DEPS="src/mail/ngx_mail.h" diff --git a/src/http/modules/ngx_http_upstream_keepalive_module.c b/src/http/modules/ngx_http_upstream_keepalive_module.c new file mode 100644 --- /dev/null +++ b/src/http/modules/ngx_http_upstream_keepalive_module.c @@ -0,0 +1,569 @@ + +/* + * Copyright (C) Maxim Dounin + */ + + +#include +#include +#include + + +typedef struct { + ngx_uint_t max_cached; + ngx_uint_t single; /* unsigned:1 */ + + ngx_queue_t cache; + ngx_queue_t free; + + ngx_http_upstream_init_pt original_init_upstream; + ngx_http_upstream_init_peer_pt original_init_peer; + +} ngx_http_upstream_keepalive_srv_conf_t; + + +typedef struct { + ngx_http_upstream_keepalive_srv_conf_t *conf; + + ngx_http_upstream_t *upstream; + + void *data; + + ngx_event_get_peer_pt original_get_peer; + ngx_event_free_peer_pt original_free_peer; + +#if (NGX_HTTP_SSL) + ngx_event_set_peer_session_pt original_set_session; + ngx_event_save_peer_session_pt original_save_session; +#endif + + ngx_uint_t failed; /* unsigned:1 */ + +} ngx_http_upstream_keepalive_peer_data_t; + + +typedef struct { + ngx_http_upstream_keepalive_srv_conf_t *conf; + + ngx_queue_t queue; + ngx_connection_t *connection; + + socklen_t socklen; + u_char sockaddr[NGX_SOCKADDRLEN]; + +} ngx_http_upstream_keepalive_cache_t; + + +static ngx_int_t ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r, + ngx_http_upstream_srv_conf_t *us); +static ngx_int_t ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, + void *data); +static void ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, + void *data, ngx_uint_t state); + +static void ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev); +static void ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev); +static void ngx_http_upstream_keepalive_close(ngx_connection_t *c); + + +#if (NGX_HTTP_SSL) +static ngx_int_t ngx_http_upstream_keepalive_set_session( + ngx_peer_connection_t *pc, void *data); +static void ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc, + void *data); +#endif + +static void *ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf); +static char *ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); + + +static ngx_command_t ngx_http_upstream_keepalive_commands[] = { + + { ngx_string("keepalive"), + NGX_HTTP_UPS_CONF|NGX_CONF_TAKE12, + ngx_http_upstream_keepalive, + 0, + 0, + NULL }, + + ngx_null_command +}; + + +static ngx_http_module_t ngx_http_upstream_keepalive_module_ctx = { + NULL, /* preconfiguration */ + NULL, /* postconfiguration */ + + NULL, /* create main configuration */ + NULL, /* init main configuration */ + + ngx_http_upstream_keepalive_create_conf, /* create server configuration */ + NULL, /* merge server configuration */ + + NULL, /* create location configuration */ + NULL /* merge location configuration */ +}; + + +ngx_module_t ngx_http_upstream_keepalive_module = { + NGX_MODULE_V1, + &ngx_http_upstream_keepalive_module_ctx, /* module context */ + ngx_http_upstream_keepalive_commands, /* module directives */ + NGX_HTTP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static ngx_int_t +ngx_http_upstream_init_keepalive(ngx_conf_t *cf, + ngx_http_upstream_srv_conf_t *us) +{ + ngx_uint_t i; + ngx_http_upstream_keepalive_srv_conf_t *kcf; + ngx_http_upstream_keepalive_cache_t *cached; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, cf->log, 0, + "init keepalive"); + + kcf = ngx_http_conf_upstream_srv_conf(us, + ngx_http_upstream_keepalive_module); + + if (kcf->original_init_upstream(cf, us) != NGX_OK) { + return NGX_ERROR; + } + + kcf->original_init_peer = us->peer.init; + + us->peer.init = ngx_http_upstream_init_keepalive_peer; + + /* allocate cache items and add to free queue */ + + cached = ngx_pcalloc(cf->pool, + sizeof(ngx_http_upstream_keepalive_cache_t) * kcf->max_cached); + if (cached == NULL) { + return NGX_ERROR; + } + + ngx_queue_init(&kcf->cache); + ngx_queue_init(&kcf->free); + + for (i = 0; i < kcf->max_cached; i++) { + ngx_queue_insert_head(&kcf->free, &cached[i].queue); + cached[i].conf = kcf; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r, + ngx_http_upstream_srv_conf_t *us) +{ + ngx_http_upstream_keepalive_peer_data_t *kp; + ngx_http_upstream_keepalive_srv_conf_t *kcf; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "init keepalive peer"); + + kcf = ngx_http_conf_upstream_srv_conf(us, + ngx_http_upstream_keepalive_module); + + kp = ngx_palloc(r->pool, sizeof(ngx_http_upstream_keepalive_peer_data_t)); + if (kp == NULL) { + return NGX_ERROR; + } + + if (kcf->original_init_peer(r, us) != NGX_OK) { + return NGX_ERROR; + } + + kp->conf = kcf; + kp->upstream = r->upstream; + kp->data = r->upstream->peer.data; + kp->original_get_peer = r->upstream->peer.get; + kp->original_free_peer = r->upstream->peer.free; + + r->upstream->peer.data = kp; + r->upstream->peer.get = ngx_http_upstream_get_keepalive_peer; + r->upstream->peer.free = ngx_http_upstream_free_keepalive_peer; + +#if (NGX_HTTP_SSL) + kp->original_set_session = r->upstream->peer.set_session; + kp->original_save_session = r->upstream->peer.save_session; + r->upstream->peer.set_session = ngx_http_upstream_keepalive_set_session; + r->upstream->peer.save_session = ngx_http_upstream_keepalive_save_session; +#endif + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data) +{ + ngx_http_upstream_keepalive_peer_data_t *kp = data; + ngx_http_upstream_keepalive_cache_t *item; + + ngx_int_t rc; + ngx_queue_t *q, *cache; + ngx_connection_t *c; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "get keepalive peer"); + + kp->failed = 0; + + /* single pool of cached connections */ + + if (kp->conf->single && !ngx_queue_empty(&kp->conf->cache)) { + + q = ngx_queue_head(&kp->conf->cache); + + item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); + c = item->connection; + + ngx_queue_remove(q); + ngx_queue_insert_head(&kp->conf->free, q); + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "get keepalive peer: using connection %p", c); + + c->idle = 0; + c->log = pc->log; + c->read->log = pc->log; + c->write->log = pc->log; + c->pool->log = pc->log; + + pc->connection = c; + pc->cached = 1; + + return NGX_DONE; + } + + rc = kp->original_get_peer(pc, kp->data); + + if (kp->conf->single || rc != NGX_OK) { + return rc; + } + + /* search cache for suitable connection */ + + cache = &kp->conf->cache; + + for (q = ngx_queue_head(cache); + q != ngx_queue_sentinel(cache); + q = ngx_queue_next(q)) + { + item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); + c = item->connection; + + if (ngx_memn2cmp((u_char *) &item->sockaddr, (u_char *) pc->sockaddr, + item->socklen, pc->socklen) + == 0) + { + ngx_queue_remove(q); + ngx_queue_insert_head(&kp->conf->free, q); + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "get keepalive peer: using connection %p", c); + + c->idle = 0; + c->log = pc->log; + c->read->log = pc->log; + c->write->log = pc->log; + c->pool->log = pc->log; + + pc->connection = c; + pc->cached = 1; + + return NGX_DONE; + } + } + + return NGX_OK; +} + + +static void +ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data, + ngx_uint_t state) +{ + ngx_http_upstream_keepalive_peer_data_t *kp = data; + ngx_http_upstream_keepalive_cache_t *item; + + ngx_queue_t *q; + ngx_connection_t *c; + ngx_http_upstream_t *u; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "free keepalive peer"); + + /* remember failed state - peer.free() may be called more than once */ + + if (state & NGX_PEER_FAILED) { + kp->failed = 1; + } + + /* cache valid connections */ + + u = kp->upstream; + c = pc->connection; + + if (kp->failed + || c == NULL + || c->read->eof + || c->read->error + || c->read->timedout + || c->write->error + || c->write->timedout) + { + goto invalid; + } + + if (!u->keepalive) { + goto invalid; + } + + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + goto invalid; + } + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "free keepalive peer: saving connection %p", c); + + if (ngx_queue_empty(&kp->conf->free)) { + + q = ngx_queue_last(&kp->conf->cache); + ngx_queue_remove(q); + + item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); + + ngx_http_upstream_keepalive_close(item->connection); + + } else { + q = ngx_queue_head(&kp->conf->free); + ngx_queue_remove(q); + + item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); + } + + item->connection = c; + ngx_queue_insert_head(&kp->conf->cache, q); + + pc->connection = NULL; + + if (c->read->timer_set) { + ngx_del_timer(c->read); + } + if (c->write->timer_set) { + ngx_del_timer(c->write); + } + + c->write->handler = ngx_http_upstream_keepalive_dummy_handler; + c->read->handler = ngx_http_upstream_keepalive_close_handler; + + c->data = item; + c->idle = 1; + c->log = ngx_cycle->log; + c->read->log = ngx_cycle->log; + c->write->log = ngx_cycle->log; + c->pool->log = ngx_cycle->log; + + item->socklen = pc->socklen; + ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen); + + if (c->read->ready) { + ngx_http_upstream_keepalive_close_handler(c->read); + } + +invalid: + + kp->original_free_peer(pc, kp->data, state); +} + + +static void +ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev) +{ + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0, + "keepalive dummy handler"); +} + + +static void +ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev) +{ + ngx_http_upstream_keepalive_srv_conf_t *conf; + ngx_http_upstream_keepalive_cache_t *item; + + int n; + char buf[1]; + ngx_connection_t *c; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0, + "keepalive close handler"); + + c = ev->data; + + if (c->close) { + goto close; + } + + n = recv(c->fd, buf, 1, MSG_PEEK); + + if (n == -1 && ngx_socket_errno == NGX_EAGAIN) { + /* stale event */ + + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + goto close; + } + + return; + } + +close: + + item = c->data; + conf = item->conf; + + ngx_http_upstream_keepalive_close(c); + + ngx_queue_remove(&item->queue); + ngx_queue_insert_head(&conf->free, &item->queue); +} + + +static void +ngx_http_upstream_keepalive_close(ngx_connection_t *c) +{ + +#if (NGX_HTTP_SSL) + + if (c->ssl) { + c->ssl->no_wait_shutdown = 1; + c->ssl->no_send_shutdown = 1; + + if (ngx_ssl_shutdown(c) == NGX_AGAIN) { + c->ssl->handler = ngx_http_upstream_keepalive_close; + return; + } + } + +#endif + + ngx_destroy_pool(c->pool); + ngx_close_connection(c); +} + + +#if (NGX_HTTP_SSL) + +static ngx_int_t +ngx_http_upstream_keepalive_set_session(ngx_peer_connection_t *pc, void *data) +{ + ngx_http_upstream_keepalive_peer_data_t *kp = data; + + return kp->original_set_session(pc, kp->data); +} + + +static void +ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc, void *data) +{ + ngx_http_upstream_keepalive_peer_data_t *kp = data; + + kp->original_save_session(pc, kp->data); + return; +} + +#endif + + +static void * +ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf) +{ + ngx_http_upstream_keepalive_srv_conf_t *conf; + + conf = ngx_pcalloc(cf->pool, + sizeof(ngx_http_upstream_keepalive_srv_conf_t)); + if (conf == NULL) { + return NULL; + } + + /* + * set by ngx_pcalloc(): + * + * conf->original_init_upstream = NULL; + * conf->original_init_peer = NULL; + */ + + conf->max_cached = 1; + + return conf; +} + + +static char * +ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_http_upstream_srv_conf_t *uscf; + ngx_http_upstream_keepalive_srv_conf_t *kcf; + + ngx_int_t n; + ngx_str_t *value; + ngx_uint_t i; + + uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module); + + kcf = ngx_http_conf_upstream_srv_conf(uscf, + ngx_http_upstream_keepalive_module); + + kcf->original_init_upstream = uscf->peer.init_upstream + ? uscf->peer.init_upstream + : ngx_http_upstream_init_round_robin; + + uscf->peer.init_upstream = ngx_http_upstream_init_keepalive; + + /* read options */ + + value = cf->args->elts; + + n = ngx_atoi(value[1].data, value[1].len); + + if (n == NGX_ERROR || n == 0) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid value \"%V\" in \"%V\" directive", + &value[1], &cmd->name); + return NGX_CONF_ERROR; + } + + kcf->max_cached = n; + + for (i = 2; i < cf->args->nelts; i++) { + + if (ngx_strcmp(value[i].data, "single") == 0) { + kcf->single = 1; + continue; + } + + goto invalid; + } + + return NGX_CONF_OK; + +invalid: + + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid parameter \"%V\"", &value[i]); + + return NGX_CONF_ERROR; +}