changeset 1:ca955a7f651b

Keepalive: common cache for all backends. Known limitations: Connections cached to the upstream pool as a whole, not to individual backend servers, and therefore load distribution may be less than ideal (especially under light load).
author Maxim Dounin <mdounin@mdounin.ru>
date Wed, 22 Oct 2008 03:52:29 +0400
parents 725ee11164f3
children 8545bbda9e4b
files ngx_http_upstream_keepalive_module.c t/memcached-keepalive.t
diffstat 2 files changed, 223 insertions(+), 3 deletions(-) [+]
line wrap: on
line diff
--- a/ngx_http_upstream_keepalive_module.c
+++ b/ngx_http_upstream_keepalive_module.c
@@ -1,6 +1,6 @@
 
 /*
- * Copyright (C) Igor Sysoev
+ * Copyright (C) Maxim Dounin
  */
 
 
@@ -10,7 +10,10 @@
 
 
 typedef struct {
-    ngx_int_t                          cached;
+    ngx_uint_t                         max_cached;
+    ngx_uint_t                         last_cached;
+
+    ngx_connection_t                 **cached;
 
     ngx_http_upstream_init_pt          original_init_upstream;
     ngx_http_upstream_init_peer_pt     original_init_peer;
@@ -18,6 +21,8 @@ typedef struct {
 } ngx_http_upstream_keepalive_srv_conf_t;
 
 typedef struct {
+    ngx_http_upstream_keepalive_srv_conf_t  *conf;
+
     void                              *data;
 
     ngx_event_get_peer_pt              original_get_peer;
@@ -33,6 +38,10 @@ static ngx_int_t ngx_http_upstream_get_k
 static void ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc,
     void *data, ngx_uint_t state);
 
+static void ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev);
+static void ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev);
+
+
 static void *ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf);
 static char *ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd,
     void *conf);
@@ -102,6 +111,13 @@ ngx_http_upstream_init_keepalive(ngx_con
 
     us->peer.init = ngx_http_upstream_init_keepalive_peer;
 
+    kcf->cached = ngx_pcalloc(cf->pool,
+                              sizeof(ngx_connection_t *) * kcf->max_cached);
+    if (kcf->cached == NULL) {
+        return NGX_ERROR;
+    }
+
+
     return NGX_OK;
 }
 
@@ -128,6 +144,7 @@ ngx_http_upstream_init_keepalive_peer(ng
         return NGX_ERROR;
     }
 
+    kp->conf = kcf;
     kp->data = r->upstream->peer.data;
     kp->original_get_peer = r->upstream->peer.get;
     kp->original_free_peer = r->upstream->peer.free;
@@ -145,9 +162,28 @@ ngx_http_upstream_get_keepalive_peer(ngx
 {
     ngx_http_upstream_keepalive_peer_data_t  *kp = data;
 
+    ngx_connection_t   *c;
+
     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                    "get keepalive peer");
 
+    /* XXX single pool of cached connections */
+
+    if (kp->conf->last_cached) {
+
+        c = kp->conf->cached[--kp->conf->last_cached];
+
+        c->idle = 0;
+        c->log = pc->log;
+        c->read->log = pc->log;
+        c->write->log = pc->log;
+
+        pc->connection = c;
+        pc->cached = 1;
+
+        return NGX_DONE;
+    }
+
     return kp->original_get_peer(pc, kp->data);
 }
 
@@ -158,13 +194,79 @@ ngx_http_upstream_free_keepalive_peer(ng
 {
     ngx_http_upstream_keepalive_peer_data_t  *kp = data;
 
+    ngx_connection_t  *c;
+
     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                    "free keepalive peer");
 
+    if (!(state & NGX_PEER_FAILED)
+        && pc->connection != NULL
+        && kp->conf->last_cached < kp->conf->max_cached)
+    {
+        c = pc->connection;
+
+        kp->conf->cached[kp->conf->last_cached++] = c;
+        pc->connection = NULL;
+
+        if (c->read->timer_set) {
+            ngx_del_timer(c->read);
+        }
+        if (c->write->timer_set) {
+            ngx_del_timer(c->write);
+        }
+
+        c->write->handler = ngx_http_upstream_keepalive_dummy_handler;
+        c->read->handler = ngx_http_upstream_keepalive_close_handler;
+
+        c->data = kp->conf;
+        c->idle = 1;
+        c->log = ngx_cycle->log;
+        c->read->log = ngx_cycle->log;
+        c->write->log = ngx_cycle->log;
+    }
+
     return kp->original_free_peer(pc, kp->data, state);
 }
 
 
+static void
+ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev)
+{
+    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0,
+                   "keepalive dummy handler");
+}
+
+
+static void
+ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev)
+{
+    ngx_http_upstream_keepalive_srv_conf_t  *conf;
+
+    ngx_uint_t         i;
+    ngx_connection_t  *c;
+
+    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0,
+                   "keepalive close handler");
+
+    c = ev->data;
+    conf = c->data;
+
+    for (i = 0; i < conf->last_cached; i++) {
+        if (conf->cached[i] == c) {
+
+            conf->cached[i] = conf->cached[--conf->last_cached];
+
+            ngx_close_connection(c);
+            return;
+        }
+    }
+
+    ngx_log_error(NGX_LOG_ALERT, ev->log, 0,
+                  "keepalive close handler: unknown connection %p", c);
+    ngx_close_connection(c);
+}
+
+
 static void *
 ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf)
 {
@@ -179,11 +281,13 @@ ngx_http_upstream_keepalive_create_conf(
     /*
      * set by ngx_pcalloc():
      *
-     *     conf->cached = 0;
+     *     conf->last_cached = 0;
      *     conf->original_init_upstream = NULL;
      *     conf->original_init_peer = NULL;
      */
 
+    conf->max_cached = 1;
+
     return conf;
 }
 
@@ -194,6 +298,10 @@ ngx_http_upstream_keepalive(ngx_conf_t *
     ngx_http_upstream_srv_conf_t            *uscf;
     ngx_http_upstream_keepalive_srv_conf_t  *kcf;
 
+    ngx_int_t    n;
+    ngx_str_t   *value;
+    ngx_uint_t   i;
+
     uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module);
 
     kcf = ngx_http_conf_upstream_srv_conf(uscf,
@@ -205,5 +313,33 @@ ngx_http_upstream_keepalive(ngx_conf_t *
 
     uscf->peer.init_upstream = ngx_http_upstream_init_keepalive;
 
+    /* read options */
+
+    value = cf->args->elts;
+
+    for (i = 1; i < cf->args->nelts; i++) {
+
+        if (ngx_strncmp(value[i].data, "cached=", 7) == 0) {
+            n = ngx_atoi(&value[i].data[7], value[i].len - 7);
+
+            if (n == NGX_ERROR) {
+                goto invalid;
+            }
+
+            kcf->max_cached = n;
+
+            continue;
+        }
+
+        goto invalid;
+    }
+
     return NGX_CONF_OK;
+
+invalid:
+
+    ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+                       "invalid parameter \"%V\"", &value[i]);
+
+    return NGX_CONF_ERROR;
 }
new file mode 100644
--- /dev/null
+++ b/t/memcached-keepalive.t
@@ -0,0 +1,84 @@
+#!/usr/bin/perl
+
+# (C) Maxim Dounin
+
+# Test for memcached with keepalive.
+
+###############################################################################
+
+use warnings;
+use strict;
+
+use Test::More;
+use Test::Nginx;
+
+###############################################################################
+
+select STDERR; $| = 1;
+select STDOUT; $| = 1;
+
+eval { require Cache::Memcached; };
+plain(skip_all => 'Cache::Memcached not installed') if $@;
+
+my $t = Test::Nginx->new()->has('rewrite')->has_daemon('memcached')->plan(7)
+	->write_file_expand('nginx.conf', <<'EOF');
+
+master_process off;
+daemon         off;
+
+events {
+    worker_connections  1024;
+}
+
+http {
+    access_log    off;
+
+    client_body_temp_path  %%TESTDIR%%/client_body_temp;
+    fastcgi_temp_path      %%TESTDIR%%/fastcgi_temp;
+    proxy_temp_path        %%TESTDIR%%/proxy_temp;
+
+    upstream memd {
+        server 127.0.0.1:8081;
+        keepalive;
+    }
+
+    server {
+        listen       localhost:8080;
+        server_name  localhost;
+
+        location / {
+            set $memcached_key $uri;
+            memcached_pass memd;
+        }
+
+        location /next {
+            set $memcached_key $uri;
+            memcached_next_upstream  not_found;
+            memcached_pass memd;
+        }
+    }
+}
+
+EOF
+
+$t->run_daemon('memcached', '-l', '127.0.0.1', '-p', '8081');
+$t->run();
+
+###############################################################################
+
+my $memd = Cache::Memcached->new(servers => [ '127.0.0.1:8081' ]);
+$memd->set('/', 'SEE-THIS');
+
+my $total = $memd->stats()->{total}->{total_connections};
+
+like(http_get('/'), qr/SEE-THIS/, 'keepalive memcached request');
+like(http_get('/notfound'), qr/404/, 'keepalive memcached not found');
+like(http_get('/next'), qr/404/,
+	'keepalive not found with memcached_next_upstream');
+like(http_get('/'), qr/SEE-THIS/, 'keepalive memcached request again');
+like(http_get('/'), qr/SEE-THIS/, 'keepalive memcached request again');
+like(http_get('/'), qr/SEE-THIS/, 'keepalive memcached request again');
+
+is($memd->stats()->{total}->{total_connections}, $total + 1, 'keepalive used');
+
+###############################################################################