# HG changeset patch
# User Maxim Dounin
# Date 1224633149 -14400
# Node ID ca955a7f651b5a1c58e56d4335adc5f66f8bbc82
# Parent  725ee11164f36c9833446442ab45c8fc79e4f38f
Keepalive: common cache for all backends.

Known limitations: connections are cached to the upstream pool as a whole,
not to individual backend servers, and therefore load distribution may be
less than ideal (especially under light load).

diff --git a/ngx_http_upstream_keepalive_module.c b/ngx_http_upstream_keepalive_module.c
--- a/ngx_http_upstream_keepalive_module.c
+++ b/ngx_http_upstream_keepalive_module.c
@@ -1,6 +1,6 @@
 
 /*
- * Copyright (C) Igor Sysoev
+ * Copyright (C) Maxim Dounin
  */
 
 
@@ -10,7 +10,10 @@
 
 
 typedef struct {
-    ngx_int_t                          cached;
+    ngx_uint_t                         max_cached;
+    ngx_uint_t                         last_cached;
+
+    ngx_connection_t                 **cached;
 
     ngx_http_upstream_init_pt          original_init_upstream;
     ngx_http_upstream_init_peer_pt     original_init_peer;
@@ -18,6 +21,8 @@ typedef struct {
 } ngx_http_upstream_keepalive_srv_conf_t;
 
 typedef struct {
+    ngx_http_upstream_keepalive_srv_conf_t  *conf;
+
     void                               *data;
 
     ngx_event_get_peer_pt               original_get_peer;
@@ -33,6 +38,10 @@ static ngx_int_t ngx_http_upstream_get_k
 static void ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc,
     void *data, ngx_uint_t state);
 
+static void ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev);
+static void ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev);
+
+
 static void *ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf);
 static char *ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd,
     void *conf);
@@ -102,6 +111,13 @@ ngx_http_upstream_init_keepalive(ngx_con
 
     us->peer.init = ngx_http_upstream_init_keepalive_peer;
 
+    kcf->cached = ngx_pcalloc(cf->pool,
+                              sizeof(ngx_connection_t *) * kcf->max_cached);
+    if (kcf->cached == NULL) {
+        return NGX_ERROR;
+    }
+
+
     return NGX_OK;
 }
 
@@ -128,6 +144,7 @@ ngx_http_upstream_init_keepalive_peer(ng
         return NGX_ERROR;
     }
 
+    kp->conf = kcf;
     kp->data = r->upstream->peer.data;
     kp->original_get_peer = r->upstream->peer.get;
     kp->original_free_peer = r->upstream->peer.free;
@@ -145,9 +162,28 @@ ngx_http_upstream_get_keepalive_peer(ngx
 {
     ngx_http_upstream_keepalive_peer_data_t  *kp = data;
 
+    ngx_connection_t  *c;
+
     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                    "get keepalive peer");
 
+    /* XXX single pool of cached connections */
+
+    if (kp->conf->last_cached) {
+
+        c = kp->conf->cached[--kp->conf->last_cached];
+
+        c->idle = 0;
+        c->log = pc->log;
+        c->read->log = pc->log;
+        c->write->log = pc->log;
+
+        pc->connection = c;
+        pc->cached = 1;
+
+        return NGX_DONE;
+    }
+
     return kp->original_get_peer(pc, kp->data);
 }
 
@@ -158,13 +194,79 @@ ngx_http_upstream_free_keepalive_peer(ng
 {
     ngx_http_upstream_keepalive_peer_data_t  *kp = data;
 
+    ngx_connection_t  *c;
+
     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                    "free keepalive peer");
 
+    if (!(state & NGX_PEER_FAILED)
+        && pc->connection != NULL
+        && kp->conf->last_cached < kp->conf->max_cached)
+    {
+        c = pc->connection;
+
+        kp->conf->cached[kp->conf->last_cached++] = c;
+        pc->connection = NULL;
+
+        if (c->read->timer_set) {
+            ngx_del_timer(c->read);
+        }
+        if (c->write->timer_set) {
+            ngx_del_timer(c->write);
+        }
+
+        c->write->handler = ngx_http_upstream_keepalive_dummy_handler;
+        c->read->handler = ngx_http_upstream_keepalive_close_handler;
+
+        c->data = kp->conf;
+        c->idle = 1;
+        c->log = ngx_cycle->log;
+        c->read->log = ngx_cycle->log;
+        c->write->log = ngx_cycle->log;
+    }
+
     return kp->original_free_peer(pc, kp->data, state);
 }
 
 
+static void
+ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev)
+{
+    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0,
+                   "keepalive dummy handler");
+}
+
+
+static void
+ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev)
+{
+    ngx_http_upstream_keepalive_srv_conf_t  *conf;
+
+    ngx_uint_t         i;
+    ngx_connection_t  *c;
+
+    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0,
+                   "keepalive close handler");
+
+    c = ev->data;
+    conf = c->data;
+
+    for (i = 0; i < conf->last_cached; i++) {
+        if (conf->cached[i] == c) {
+
+            conf->cached[i] = conf->cached[--conf->last_cached];
+
+            ngx_close_connection(c);
+            return;
+        }
+    }
+
+    ngx_log_error(NGX_LOG_ALERT, ev->log, 0,
+                  "keepalive close handler: unknown connection %p", c);
+    ngx_close_connection(c);
+}
+
+
 static void *
 ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf)
 {
@@ -179,11 +281,13 @@ ngx_http_upstream_keepalive_create_conf(
     /*
      * set by ngx_pcalloc():
      *
-     * conf->cached = 0;
+     * conf->last_cached = 0;
      * conf->original_init_upstream = NULL;
      * conf->original_init_peer = NULL;
      */
 
+    conf->max_cached = 1;
+
     return conf;
 }
 
@@ -194,6 +298,10 @@ ngx_http_upstream_keepalive(ngx_conf_t *
     ngx_http_upstream_srv_conf_t            *uscf;
     ngx_http_upstream_keepalive_srv_conf_t  *kcf;
 
+    ngx_int_t    n;
+    ngx_str_t   *value;
+    ngx_uint_t   i;
+
     uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module);
 
     kcf = ngx_http_conf_upstream_srv_conf(uscf,
@@ -205,5 +313,33 @@ ngx_http_upstream_keepalive(ngx_conf_t *
 
     uscf->peer.init_upstream = ngx_http_upstream_init_keepalive;
 
+    /* read options */
+
+    value = cf->args->elts;
+
+    for (i = 1; i < cf->args->nelts; i++) {
+
+        if (ngx_strncmp(value[i].data, "cached=", 7) == 0) {
+            n = ngx_atoi(&value[i].data[7], value[i].len - 7);
+
+            if (n == NGX_ERROR) {
+                goto invalid;
+            }
+
+            kcf->max_cached = n;
+
+            continue;
+        }
+
+        goto invalid;
+    }
+
     return NGX_CONF_OK;
+
+invalid:
+
+    ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+                       "invalid parameter \"%V\"", &value[i]);
+
+    return NGX_CONF_ERROR;
 }
diff --git a/t/memcached-keepalive.t b/t/memcached-keepalive.t
new file mode 100644
--- /dev/null
+++ b/t/memcached-keepalive.t
@@ -0,0 +1,84 @@
+#!/usr/bin/perl
+
+# (C) Maxim Dounin
+
+# Test for memcached with keepalive.
+
+###############################################################################
+
+use warnings;
+use strict;
+
+use Test::More;
+use Test::Nginx;
+
+###############################################################################
+
+select STDERR; $| = 1;
+select STDOUT; $| = 1;
+
+eval { require Cache::Memcached; };
+plan(skip_all => 'Cache::Memcached not installed') if $@;
+
+my $t = Test::Nginx->new()->has('rewrite')->has_daemon('memcached')->plan(7)
+	->write_file_expand('nginx.conf', <<'EOF');
+
+master_process off;
+daemon         off;
+
+events {
+    worker_connections  1024;
+}
+
+http {
+    access_log    off;
+
+    client_body_temp_path  %%TESTDIR%%/client_body_temp;
+    fastcgi_temp_path      %%TESTDIR%%/fastcgi_temp;
+    proxy_temp_path        %%TESTDIR%%/proxy_temp;
+
+    upstream memd {
+        server 127.0.0.1:8081;
+        keepalive;
+    }
+
+    server {
+        listen       localhost:8080;
+        server_name  localhost;
+
+        location / {
+            set $memcached_key $uri;
+            memcached_pass memd;
+        }
+
+        location /next {
+            set $memcached_key $uri;
+            memcached_next_upstream  not_found;
+            memcached_pass memd;
+        }
+    }
+}
+
+EOF
+
+$t->run_daemon('memcached', '-l', '127.0.0.1', '-p', '8081');
+$t->run();
+
+###############################################################################
+
+my $memd = Cache::Memcached->new(servers => [ '127.0.0.1:8081' ]);
+$memd->set('/', 'SEE-THIS');
+
+my $total = $memd->stats()->{total}->{total_connections};
+
+like(http_get('/'), qr/SEE-THIS/, 'keepalive memcached request');
+like(http_get('/notfound'), qr/404/, 'keepalive memcached not found');
+like(http_get('/next'), qr/404/,
+	'keepalive not found with memcached_next_upstream');
+like(http_get('/'), qr/SEE-THIS/, 'keepalive memcached request again');
+like(http_get('/'), qr/SEE-THIS/, 'keepalive memcached request again');
+like(http_get('/'), qr/SEE-THIS/, 'keepalive memcached request again');
+
+is($memd->stats()->{total}->{total_connections}, $total + 1, 'keepalive used');
+
+###############################################################################
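
Usage note: with this change the module keeps idle connections in a single
per-upstream cache of max_cached slots (default 1), and the "keepalive"
directive gains a "cached=" parameter to size that cache, parsed in
ngx_http_upstream_keepalive() above. A minimal configuration sketch follows;
the upstream name "memd" and the memcached address are illustrative only, and
the snippet assumes the directive definition accepts the extra parameter:

    upstream memd {
        server 127.0.0.1:11211;

        # keep up to 8 idle connections for this upstream as a whole
        keepalive cached=8;
    }

The test above exercises the default (plain "keepalive;"), which caches a
single connection per upstream.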