comparison lib/Cache/Memcached.pm @ 0:17fc6afc155e CACHE_MEMCACHED_1_24

Cache::Memcached 1.24
author Maxim Dounin <mdounin@mdounin.ru>
date Sun, 30 Sep 2007 16:23:31 +0400
parents
children f5cfb726ea65
comparison
equal deleted inserted replaced
-1:000000000000 0:17fc6afc155e
1 # $Id: Memcached.pm 601 2007-07-17 17:47:33Z bradfitz $
2 #
3 # Copyright (c) 2003, 2004 Brad Fitzpatrick <brad@danga.com>
4 #
5 # See COPYRIGHT section in pod text below for usage and distribution rights.
6 #
7
8 package Cache::Memcached;
9
10 use strict;
11 use warnings;
12
13 no strict 'refs';
14 use Storable ();
15 use Socket qw( MSG_NOSIGNAL PF_INET PF_UNIX IPPROTO_TCP SOCK_STREAM );
16 use IO::Handle ();
17 use Time::HiRes ();
18 use String::CRC32;
19 use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
20 use Cache::Memcached::GetParser;
21 use fields qw{
22 debug no_rehash stats compress_threshold compress_enable stat_callback
23 readonly select_timeout namespace namespace_len servers active buckets
24 pref_ip
25 bucketcount _single_sock _stime
26 connect_timeout cb_connect_fail
27 parser_class
28 };
29
30 # flag definitions
31 use constant F_STORABLE => 1;
32 use constant F_COMPRESS => 2;
33
34 # size savings required before saving compressed value
35 use constant COMPRESS_SAVINGS => 0.20; # percent
36
37 use vars qw($VERSION $HAVE_ZLIB $FLAG_NOSIGNAL);
38 $VERSION = "1.24";
39
# Compress::Zlib is optional.  Probe for it at compile time; $HAVE_ZLIB is
# true only when the module could be loaded.
BEGIN {
    $HAVE_ZLIB = eval "use Compress::Zlib (); 1;";
}

# Use the XS response parser when it can be loaded, unless the NO_XS
# environment variable is set.
my $HAVE_XS = eval "use Cache::Memcached::GetParserXS; 1;";
$HAVE_XS = 0 if $ENV{NO_XS};

# Default parser class for new objects (can be overridden per object with
# the 'parser_class' constructor argument).
my $parser_class = "Cache::Memcached::GetParser";
$parser_class .= "XS" if $HAVE_XS;
print "using parser: $parser_class\n" if $ENV{XS_DEBUG};

# Flag passed to send().  Stays 0 when evaluating MSG_NOSIGNAL dies; the
# callers then fall back to ignoring SIGPIPE locally.
$FLAG_NOSIGNAL = 0;
eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL; };

# File-level state shared by every Cache::Memcached object in the process.
my %host_dead;   # host -> unixtime marked dead until
my %cache_sock;  # host -> socket
my @buck2sock;   # bucket number -> $sock

my $PROTO_TCP;   # cached result of getprotobyname('tcp')

our $SOCK_TIMEOUT = 2.6; # default timeout in seconds
62
# Constructor.  Accepts either a single hashref of options or a flat
# key => value list.  May also be called on an existing object, in which
# case that object is re-initialised in place.
#
# Recognised options: servers, debug, no_rehash, pref_ip,
# compress_threshold, stat_callback, readonly, parser_class,
# connect_timeout, select_timeout, namespace.
sub new {
    my Cache::Memcached $self = shift;
    $self = fields::new($self) unless ref $self;

    # hashref-ify args
    my $args = @_ == 1 ? $_[0] : {@_};

    $self->set_servers($args->{'servers'});

    # per-object counters; compression starts out enabled
    $self->{'stats'}           = {};
    $self->{'compress_enable'} = 1;

    # options taken as given (undef when not supplied)
    $self->{'no_rehash'}          = $args->{'no_rehash'};
    $self->{'compress_threshold'} = $args->{'compress_threshold'};
    $self->{'readonly'}           = $args->{'readonly'};

    # options with a fallback when false or missing
    $self->{'debug'}         = $args->{'debug'}         || 0;
    $self->{'pref_ip'}       = $args->{'pref_ip'}       || {};
    $self->{'stat_callback'} = $args->{'stat_callback'} || undef;
    $self->{'parser_class'}  = $args->{'parser_class'}  || $parser_class;

    # TODO: undocumented
    $self->{'connect_timeout'} = $args->{'connect_timeout'} || 0.25;
    $self->{'select_timeout'}  = $args->{'select_timeout'}  || 1.0;

    # the namespace length is cached for the response parser
    my $namespace = $args->{namespace} || '';
    $self->{namespace}     = $namespace;
    $self->{namespace_len} = length $namespace;

    return $self;
}
88
# Replaces the preferred-IP map consulted by sock_to_host (keyed by the
# configured IP, value is the address to try first).  Returns the new map.
sub set_pref_ip {
    my Cache::Memcached $self = shift;
    my ($pref_ip) = @_;
    return $self->{'pref_ip'} = $pref_ip;
}
93
# Installs a new server list (arrayref; undef means "no servers"), rebuilds
# the bucket table and drops the shared bucket -> socket cache.
# Returns $self.
sub set_servers {
    my Cache::Memcached $self = shift;
    my ($list) = @_;

    my $servers = $list || [];
    $self->{'servers'} = $servers;
    $self->{'active'}  = scalar @$servers;

    # force init_buckets to rebuild from scratch
    $self->{'buckets'}     = undef;
    $self->{'bucketcount'} = 0;
    $self->init_buckets;
    @buck2sock = ();

    # with exactly one configured server, hashing is skipped entirely
    $self->{'_single_sock'} = @$servers == 1 ? $servers->[0] : undef;

    return $self;
}
111
# Sets the callback invoked by sock_to_host with the address that could not
# be connected to.  Returns the callback.
sub set_cb_connect_fail {
    my Cache::Memcached $self = shift;
    my ($callback) = @_;
    return $self->{'cb_connect_fail'} = $callback;
}
116
# Sets the connect timeout in seconds handed to _connect_sock; a false
# value there means a blocking connect.  Returns the value stored.
sub set_connect_timeout {
    my Cache::Memcached $self = shift;
    my ($seconds) = @_;
    return $self->{'connect_timeout'} = $seconds;
}
121
# Sets the debug level; any false value is stored as 0.
# Returns the level stored.
sub set_debug {
    my Cache::Memcached $self = shift;
    my $level = shift;
    return $self->{'debug'} = $level || 0;
}
127
# Sets the readonly flag.  While it is true, delete/set/add/replace/incr/decr
# return early without contacting a server.  Returns the flag stored.
sub set_readonly {
    my Cache::Memcached $self = shift;
    return $self->{'readonly'} = shift;
}
133
# Sets the no_rehash flag.  When true, get_sock gives up after the first
# bucket instead of trying alternative servers.  Returns the flag stored.
sub set_norehash {
    my Cache::Memcached $self = shift;
    return $self->{'no_rehash'} = shift;
}
139
# Sets the minimum value length (in bytes) at which _set attempts
# compression; a false value disables it.  Returns the threshold stored.
sub set_compress_threshold {
    my Cache::Memcached $self = shift;
    return $self->{'compress_threshold'} = shift;
}
145
# Switches compression on (true) or off (false) without touching the
# configured threshold.  Returns the flag stored.
sub enable_compress {
    my Cache::Memcached $self = shift;
    return $self->{'compress_enable'} = shift;
}
151
# Clears the "dead until" record of every host and drops the shared
# bucket -> socket cache, so all servers become eligible for new
# connection attempts.
sub forget_dead_hosts {
    # both structures are file-level and shared by all objects
    %host_dead = ();
    @buck2sock = ();
}
156
# Sets the timing callback.  The request methods call it as
# $cb->($start_time, $end_time, $sock, $command_name).
# Returns the callback stored.
sub set_stat_callback {
    my Cache::Memcached $self = shift;
    return $self->{'stat_callback'} = shift;
}
162
my %sock_map;  # stringified-$sock -> "$ip:$port"

# Forgets a socket: removes it from the host -> socket cache and from
# %sock_map, and empties the bucket -> socket cache.  When $dead_for is
# true the socket's host is also marked dead for that many seconds.
# Nothing host-specific happens for a socket that is not in %sock_map.
# Returns $ret unchanged so callers can write "return _dead_sock(...)".
sub _dead_sock {
    my ($sock, $ret, $dead_for) = @_;

    my $ipport = $sock_map{$sock};
    if ($ipport) {
        $host_dead{$ipport} = time() + $dead_for if $dead_for;
        delete $cache_sock{$ipport};
        delete $sock_map{$sock};
    }

    @buck2sock = ();
    return $ret;  # 0 or undef, probably, depending on what caller wants
}
177
# Closes a socket known to %sock_map and removes it from the caches.
# Unlike _dead_sock, the host is never marked dead.  The bucket -> socket
# cache is emptied in every case.
sub _close_sock {
    my ($sock) = @_;

    my $ipport = $sock_map{$sock};
    if ($ipport) {
        close $sock;
        delete $cache_sock{$ipport};
        delete $sock_map{$sock};
    }

    @buck2sock = ();
}
187
# Connects $sock to the packed address $sin.
#
#   $timeout > 0    non-blocking connect, waiting at most $timeout seconds
#   $timeout == 0   blocking connect
#   $timeout undef  defaults to 0.25 seconds
#
# The socket is left in non-blocking mode in every case.  Returns a true
# value on success and a false value otherwise.
sub _connect_sock { # sock, sin, timeout
    my ($sock, $sin, $timeout) = @_;
    $timeout = 0.25 unless defined $timeout;

    # non-blocking when a timeout is wanted, blocking for a zero timeout
    IO::Handle::blocking($sock, $timeout ? 0 : 1);

    my $ret = connect($sock, $sin);

    if (!$ret && $timeout && $! == EINPROGRESS) {
        # wait until the socket becomes writable, i.e. the connect finished
        my $wset = '';
        vec($wset, fileno($sock), 1) = 1;

        if (select(undef, $wset, undef, $timeout) > 0) {
            $ret = connect($sock, $sin);
            # EISCONN means connected & won't re-connect, so success
            $ret = 1 if !$ret && $! == EISCONN;
        }
    }

    # socket was temporarily blocking, now revert; from here on we use
    # non-blocking (async) IO for the duration of the socket's life
    IO::Handle::blocking($sock, 0) unless $timeout;

    return $ret;
}
226
# Returns a connected socket for $host, reusing a cached one when possible.
#
# May be called as a method or as a plain function with just the host.
# $host is either "ip:port" or, when it starts with '/', the path of a
# unix domain socket.  Returns undef when the host is currently marked
# dead or when the connection attempt fails.
#
# A failed attempt marks the host dead for 20-30 seconds.  The previous
# code delegated this to _dead_sock, which looks the socket up in
# %sock_map; a socket that never connected is not registered there yet, so
# the host was never marked dead and every request retried the connect.
sub sock_to_host { # (host)
    my Cache::Memcached $self = ref $_[0] ? shift : undef;
    my $host = $_[0];
    return $cache_sock{$host} if $cache_sock{$host};

    my $now = time();
    my ($ip, $port) = $host =~ /(.*):(\d+)/;
    return undef if
        $host_dead{$host} && $host_dead{$host} > $now;

    my $timeout = $self ? $self->{connect_timeout} : 0.25;
    my $cb_fail = $self ? $self->{cb_connect_fail} : undef;

    # Common failure path: notify the callback, mark the host dead for a
    # randomised 20-30 seconds and drop the bucket -> socket cache.
    my $give_up = sub {
        my ($what) = @_;
        $cb_fail->($what) if $cb_fail;
        $host_dead{$host} = time() + 20 + int(rand(10));
        @buck2sock = ();
        return undef;
    };

    my $sock;
    my $sin;
    my $proto = $PROTO_TCP ||= getprotobyname('tcp');

    if ( index($host, '/') != 0 )
    {
        my $connected = 0;

        # if a preferred IP is known, try that first.
        if ($self && $self->{pref_ip}{$ip}) {
            my $prefip = $self->{pref_ip}{$ip};
            my $addr   = Socket::inet_aton($prefip);
            socket($sock, PF_INET, SOCK_STREAM, $proto);
            # an unresolvable address counts as a failed attempt rather
            # than being handed to sockaddr_in
            if (defined $addr
                && _connect_sock($sock, Socket::sockaddr_in($port, $addr), $timeout)) {
                $connected = 1;
            } else {
                $cb_fail->($prefip) if $cb_fail;
                close $sock;
            }
        }

        # normal path, or fallback path if preferred IP failed
        unless ($connected) {
            my $addr = Socket::inet_aton($ip);
            return $give_up->($ip) unless defined $addr;

            socket($sock, PF_INET, SOCK_STREAM, $proto);
            $sin = Socket::sockaddr_in($port, $addr);
            return $give_up->($ip)
                unless _connect_sock($sock, $sin, $timeout);
        }
    } else { # it's a unix domain/local socket
        socket($sock, PF_UNIX, SOCK_STREAM, 0);
        $sin = Socket::sockaddr_un($host);
        return $give_up->($host)
            unless _connect_sock($sock, $sin, $timeout);
    }

    # make the new socket not buffer writes.
    my $old = select($sock);
    $| = 1;
    select($old);

    # register the socket so _dead_sock/_close_sock can find its host
    $sock_map{$sock} = $host;
    $cache_sock{$host} = $sock;

    return $sock;
}
291
# Picks the server socket responsible for $key.
#
# $key is either a plain string or an arrayref [ $hash_value, $real_key ].
# With a single configured server that server is used directly.  Otherwise
# the key is hashed into a bucket; if that bucket's server is unreachable
# and no_rehash is not set, up to 20 buckets are tried in total.
# Returns the socket, or undef when none could be obtained.
sub get_sock { # (key)
    my Cache::Memcached $self = shift;
    my ($key) = @_;

    my $single = $self->{'_single_sock'};
    return $self->sock_to_host($single) if $single;
    return undef unless $self->{'active'};

    my ($hv, $real_key) = ref $key
        ? (int($key->[0]), $key->[1])
        : (_hashfunc($key), $key);

    for my $attempt (1 .. 20) {
        my $host = $self->{'buckets'}->[$hv % $self->{'bucketcount'}];
        my $sock = $self->sock_to_host($host);
        return $sock if $sock;
        return undef if $self->{'no_rehash'};
        $hv += _hashfunc($attempt . $real_key);  # stupid, but works
    }

    return undef;
}
310
# Builds the bucket table from the server list, unless one already exists.
# A plain server entry gets one bucket; an entry of the form
# [ $host, $weight ] gets $weight buckets.
sub init_buckets {
    my Cache::Memcached $self = shift;
    return if $self->{'buckets'};

    my @buckets;
    foreach my $server (@{$self->{'servers'}}) {
        if (ref $server eq "ARRAY") {
            my ($host, $weight) = @$server;
            push @buckets, ($host) x $weight;
        } else {
            push @buckets, $server;
        }
    }

    $self->{'buckets'} = \@buckets;
    $self->{'bucketcount'} = scalar @buckets;
}
324
# Closes every cached socket and forgets all socket bookkeeping.
#
# Besides the host -> socket cache this also clears the bucket -> socket
# cache and the socket -> host map.  Leaving those behind meant the next
# get_multi picked a closed handle out of @buck2sock and then died in
# _load_multi ("No sock found for ...") because %cache_sock no longer had
# the host.
sub disconnect_all {
    close $_ for values %cache_sock;

    %cache_sock = ();
    %sock_map   = ();
    @buck2sock  = ();

    return;
}
332
# writes a line, then reads result.  by default stops reading after a
# single line, but caller can override the $check_complete subref,
# which gets passed a scalarref of buffer read thus far.
#
# Returns the data read, or undef on failure: no socket given, send/read
# error, connection closed by the peer, or no progress within
# select_timeout.  On a send/read error or a closed connection the socket
# is closed and removed from the caches; on a timeout it is only removed
# from the caches.
sub _write_and_read {
    my Cache::Memcached $self = shift;
    my ($sock, $line, $check_complete) = @_;

    # callers may hand over the undef that sock_to_host returns for an
    # unreachable host; fileno(undef) must never end up in a select set
    return undef unless $sock;

    my $res;
    my ($ret, $offset) = (undef, 0);

    $check_complete ||= sub {
        return (rindex($ret, "\r\n") + 2 == length($ret));
    };

    # state: 0 - writing, 1 - reading, 2 - done
    my $state = 0;

    # the bitsets for select
    my ($rin, $rout, $win, $wout);
    my $nfound;

    my $copy_state = -1;
    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;

    # the select loop
    while(1) {
        # rebuild the select sets only when the state has changed
        if ($copy_state!=$state) {
            last if $state==2;
            ($rin, $win) = ('', '');
            vec($rin, fileno($sock), 1) = 1 if $state==1;
            vec($win, fileno($sock), 1) = 1 if $state==0;
            $copy_state = $state;
        }
        $nfound = select($rout=$rin, $wout=$win, undef,
                         $self->{'select_timeout'});
        last unless $nfound;

        if (vec($wout, fileno($sock), 1)) {
            $res = send($sock, $line, $FLAG_NOSIGNAL);
            next
                if !defined($res) && $!==EWOULDBLOCK;
            # undef is any other send error; 0 means nothing was written
            unless (defined($res) && $res > 0) {
                _close_sock($sock);
                return undef;
            }
            if ($res == length($line)) { # all sent
                $state = 1;
            } else { # we only succeeded in sending some of it
                substr($line, 0, $res, ''); # delete the part we sent
            }
        }

        if (vec($rout, fileno($sock), 1)) {
            $res = sysread($sock, $ret, 255, $offset);
            next
                if !defined($res) && $!==EWOULDBLOCK;
            unless ($res) { # catches 0=conn closed or undef=error
                _close_sock($sock);
                return undef;
            }
            $offset += $res;
            $state = 2 if $check_complete->(\$ret);
        }
    }

    unless ($state == 2) {
        _dead_sock($sock); # improperly finished
        return undef;
    }

    return $ret;
}
404
# Deletes $key from its server.
#
# $key may be a plain string or [ $hash_value, $real_key ].  The optional
# $time is appended to the command as given.  Returns true only when the
# server answered DELETED; returns 0 when the object is inactive or
# readonly, or when no server socket could be obtained.
sub delete {
    my Cache::Memcached $self = shift;
    my ($key, $time) = @_;
    return 0 if ! $self->{'active'} || $self->{'readonly'};

    # declared separately: "my $x = ... if COND" has undefined behaviour
    my $stime;
    $stime = Time::HiRes::time() if $self->{'stat_callback'};

    my $sock = $self->get_sock($key);
    return 0 unless $sock;

    $self->{'stats'}->{"delete"}++;
    $key = ref $key ? $key->[1] : $key;
    $time = $time ? " $time" : "";
    my $cmd = "delete $self->{namespace}$key$time\r\n";
    my $res = _write_and_read($self, $sock, $cmd);

    if ($self->{'stat_callback'}) {
        my $etime = Time::HiRes::time();
        $self->{'stat_callback'}->($stime, $etime, $sock, 'delete');
    }

    # $res is undef when the exchange with the server failed
    return defined($res) && $res eq "DELETED\r\n";
}
*remove = \&delete;
427
# $memd->add($key, $value[, $exptime]): issues the "add" storage command
# through _set and returns its result.
sub add {
    return _set("add", @_);
}
431
# $memd->replace($key, $value[, $exptime]): issues the "replace" storage
# command through _set and returns its result.
sub replace {
    return _set("replace", @_);
}
435
# $memd->set($key, $value[, $exptime]): issues the "set" storage command
# through _set and returns its result.
sub set {
    return _set("set", @_);
}
439
# Shared implementation of set/add/replace.
#
#   $cmdname  protocol command to send ("set", "add" or "replace")
#   $key      plain string or [ $hash_value, $real_key ]
#   $val      scalar, or a reference (serialised with Storable::nfreeze)
#   $exptime  optional expiry, truncated to an integer (default 0)
#
# Values at or above compress_threshold are gzip-compressed when that
# saves at least COMPRESS_SAVINGS of the size.  Returns true only when the
# server answered STORED; returns 0 when the object is inactive or
# readonly, or when no server socket could be obtained.
sub _set {
    my $cmdname = shift;
    my Cache::Memcached $self = shift;
    my ($key, $val, $exptime) = @_;
    return 0 if ! $self->{'active'} || $self->{'readonly'};

    # declared separately: "my $x = ... if COND" has undefined behaviour
    my $stime;
    $stime = Time::HiRes::time() if $self->{'stat_callback'};

    my $sock = $self->get_sock($key);
    return 0 unless $sock;

    use bytes; # return bytes from length()

    $self->{'stats'}->{$cmdname}++;
    my $flags = 0;
    $key = ref $key ? $key->[1] : $key;

    if (ref $val) {
        local $Carp::CarpLevel = 2;
        $val = Storable::nfreeze($val);
        $flags |= F_STORABLE;
    }

    my $len = length($val);

    if ($self->{'compress_threshold'} && $HAVE_ZLIB && $self->{'compress_enable'} &&
        $len >= $self->{'compress_threshold'}) {

        my $c_val = Compress::Zlib::memGzip($val);
        my $c_len = length($c_val);

        # do we want to keep it?
        if ($c_len < $len*(1 - COMPRESS_SAVINGS)) {
            $val = $c_val;
            $len = $c_len;
            $flags |= F_COMPRESS;
        }
    }

    $exptime = int($exptime || 0);

    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
    my $line = "$cmdname $self->{namespace}$key $flags $exptime $len\r\n$val\r\n";

    my $res = _write_and_read($self, $sock, $line);

    if ($self->{'debug'} && $line) {
        chop $line; chop $line;
        print STDERR "Cache::Memcache: $cmdname $self->{namespace}$key = $val ($line)\n";
    }

    if ($self->{'stat_callback'}) {
        my $etime = Time::HiRes::time();
        $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
    }

    # $res is undef when the exchange with the server failed
    return defined($res) && $res eq "STORED\r\n";
}
496
# $memd->incr($key[, $value]): issues the "incr" command through _incrdecr
# and returns its result.
sub incr {
    return _incrdecr("incr", @_);
}
500
# $memd->decr($key[, $value]): issues the "decr" command through _incrdecr
# and returns its result.
sub decr {
    return _incrdecr("decr", @_);
}
504
# Shared implementation of incr/decr.
#
#   $cmdname  "incr" or "decr"
#   $key      plain string or [ $hash_value, $real_key ]
#   $value    amount to apply (defaults to 1 when undefined)
#
# Returns the number at the start of the server's reply.  Returns undef
# when the object is inactive or readonly, when no server socket could be
# obtained, when the exchange failed, or when the reply does not start
# with digits.
sub _incrdecr {
    my $cmdname = shift;
    my Cache::Memcached $self = shift;
    my ($key, $value) = @_;
    return undef if ! $self->{'active'} || $self->{'readonly'};

    # declared separately: "my $x = ... if COND" has undefined behaviour
    my $stime;
    $stime = Time::HiRes::time() if $self->{'stat_callback'};

    my $sock = $self->get_sock($key);
    return undef unless $sock;
    $key = $key->[1] if ref $key;
    $self->{'stats'}->{$cmdname}++;
    $value = 1 unless defined $value;

    my $line = "$cmdname $self->{namespace}$key $value\r\n";
    my $res = _write_and_read($self, $sock, $line);

    if ($self->{'stat_callback'}) {
        my $etime = Time::HiRes::time();
        $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
    }

    # $res is undef when the exchange with the server failed
    return undef unless defined($res) && $res =~ /^(\d+)/;
    return $1;
}
528
# Fetches a single key by delegating to get_multi.  $key may be a plain
# string or [ $hash_value, $real_key ].  Returns the value, or undef when
# get_multi did not return one for the key.
sub get {
    my Cache::Memcached $self = shift;
    my ($key) = @_;

    # TODO: make a fast path for this?  or just keep using get_multi?
    my $found = $self->get_multi($key);

    # results are keyed by the real key, never by the [hash, key] pair
    return $found->{ ref $key ? $key->[1] : $key };
}
538
# Fetches several keys at once.
#
# Each key is a plain string or [ $hash_value, $real_key ].  Keys are
# grouped by the server socket responsible for them and fetched with one
# "get" per socket by _load_multi.  Returns a hashref of real key => value
# for the keys that were found; keys whose server cannot be reached are
# left out.
#
# The no_rehash flag is honoured here the same way get_sock honours it: a
# key whose primary bucket is unreachable is skipped instead of being
# looked up on another server that set/add/replace would never write to.
sub get_multi {
    my Cache::Memcached $self = shift;
    return {} unless $self->{'active'};
    $self->{'_stime'} = Time::HiRes::time() if $self->{'stat_callback'};
    $self->{'stats'}->{"get_multi"}++;

    my %val;        # what we'll be returning a reference to (realkey -> value)
    my %sock_keys;  # sockref_as_scalar -> [ realkeys ]
    my $sock;

    if ($self->{'_single_sock'}) {
        $sock = $self->sock_to_host($self->{'_single_sock'});
        unless ($sock) {
            return {};
        }
        foreach my $key (@_) {
            my $kval = ref $key ? $key->[1] : $key;
            push @{$sock_keys{$sock}}, $kval;
        }
    } else {
        my $bcount = $self->{'bucketcount'};
      KEY:
        foreach my $key (@_) {
            my ($hv, $real_key) = ref $key ?
                (int($key->[0]),               $key->[1]) :
                ((crc32($key) >> 16) & 0x7fff, $key);

            my $tries;
            while (1) {
                my $bucket = $hv % $bcount;

                # this segfaults perl 5.8.4 (and others?) if sock_to_host returns undef... wtf?
                #$sock = $buck2sock[$bucket] ||= $self->sock_to_host($self->{buckets}[ $bucket ])
                #    and last;

                # but this variant doesn't crash:
                $sock = $buck2sock[$bucket] || $self->sock_to_host($self->{buckets}[ $bucket ]);
                if ($sock) {
                    $buck2sock[$bucket] = $sock;
                    last;
                }

                # same policy as get_sock: no alternative server when
                # rehashing is disabled, and at most 20 rehash attempts
                next KEY if $self->{'no_rehash'};
                next KEY if $tries++ >= 20;
                $hv += _hashfunc($tries . $real_key);
            }

            push @{$sock_keys{$sock}}, $real_key;
        }
    }

    $self->{'stats'}->{"get_keys"} += @_;
    $self->{'stats'}->{"get_socks"} += keys %sock_keys;

    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;

    _load_multi($self, \%sock_keys, \%val);

    if ($self->{'debug'}) {
        while (my ($k, $v) = each %val) {
            print STDERR "MemCache: got $k = $v\n";
        }
    }
    return \%val;
}
604
# Sends one "get" request per socket and collects the replies.
#
#   $self       the Cache::Memcached object
#   $sock_keys  hashref: stringified socket -> arrayref of real keys
#   $ret        hashref filled in with real key => value
#
# All sockets are driven through one select() loop: each is first written
# to until its request is fully sent, then read from until its parser
# reports completion.  Sockets that fail, or that are still pending when
# select() returns 0, are closed and dropped from the caches.
# Returns nothing; results are delivered through $ret.
sub _load_multi {
    use bytes; # return bytes from length()
    my Cache::Memcached $self;
    my ($sock_keys, $ret);

    ($self, $sock_keys, $ret) = @_;

    # all keyed by $sockstr:
    my %reading; # $sockstr -> $sock.  bool, whether we're reading from this socket
    my %writing; # $sockstr -> $sock.  bool, whether we're writing to this socket
    my %buf;     # buffers, for writing

    my %parser;  # $sockstr -> Cache::Memcached::GetParser

    my $active_changed = 1; # force rebuilding of select sets

    # Gives up on a socket: drops the entry for the key its parser was
    # working on, reports timing, closes it and removes it from the caches.
    my $dead = sub {
        my $sock = shift;
        print STDERR "killing socket $sock\n" if $self->{'debug'} >= 2;
        delete $reading{$sock};
        delete $writing{$sock};

        if (my $p = $parser{$sock}) {
            my $key = $p->current_key;
            delete $ret->{$key} if $key;
        }

        if ($self->{'stat_callback'}) {
            my $etime = Time::HiRes::time();
            $self->{'stat_callback'}->($self->{'_stime'}, $etime, $sock, 'get_multi');
        }

        close $sock;
        _dead_sock($sock);
    };

    # $finalize->($key, $flags)
    # $finalize->({ $key => $flags, $key => $flags });
    my $finalize = sub {
        my $map = $_[0];
        $map = {@_} unless ref $map;

        while (my ($k, $flags) = each %$map) {

            # remove trailing \r\n
            chop $ret->{$k}; chop $ret->{$k};

            $ret->{$k} = Compress::Zlib::memGunzip($ret->{$k})
                if $HAVE_ZLIB && $flags & F_COMPRESS;
            if ($flags & F_STORABLE) {
                # wrapped in eval in case a perl 5.6 Storable tries to
                # unthaw data from a perl 5.8 Storable.  (5.6 is stupid
                # and dies if the version number changes at all.  in 5.8
                # they made it only die if it unencounters a new feature)
                eval {
                    $ret->{$k} = Storable::thaw($ret->{$k});
                };
                # so if there was a problem, just treat it as a cache miss.
                if ($@) {
                    delete $ret->{$k};
                }
            }
        }
    };

    # build the request and a parser for every socket
    foreach (keys %$sock_keys) {
        my $ipport = $sock_map{$_}        or die "No map found matching for $_";
        my $sock   = $cache_sock{$ipport} or die "No sock found for $ipport";
        print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
        $writing{$_} = $sock;
        if ($self->{namespace}) {
            $buf{$_} = join(" ", 'get', (map { "$self->{namespace}$_" } @{$sock_keys->{$_}}), "\r\n");
        } else {
            $buf{$_} = join(" ", 'get', @{$sock_keys->{$_}}, "\r\n");
        }

        $parser{$_} = $self->{parser_class}->new($ret, $self->{namespace_len}, $finalize);
    }

    # returns the parser's result: > 0 finished, < 0 error, 0 still working
    my $read = sub {
        my $sockstr = "$_[0]";  # $sock is $_[0];
        my $p = $parser{$sockstr} or die;
        my $rv = $p->parse_from_sock($_[0]);
        if ($rv > 0) {
            # okay, finished with this socket
            delete $reading{$sockstr};
        } elsif ($rv < 0) {
            $dead->($_[0]);
        }
        return $rv;
    };

    # returns 1 when it's done, for success or error.  0 if still working.
    my $write = sub {
        my ($sock, $sockstr) = ($_[0], "$_[0]");
        my $res;

        $res = send($sock, $buf{$sockstr}, $FLAG_NOSIGNAL);

        return 0
            if not defined $res and $!==EWOULDBLOCK;
        # undef is any other send error; 0 means nothing was written.
        # checking definedness first avoids a numeric comparison on undef.
        unless (defined $res && $res > 0) {
            $dead->($sock);
            return 1;
        }
        if ($res == length($buf{$sockstr})) { # all sent
            $buf{$sockstr} = "";

            # switch the socket from writing to reading
            delete $writing{$sockstr};
            $reading{$sockstr} = $sock;
            return 1;
        } else { # we only succeeded in sending some of it
            substr($buf{$sockstr}, 0, $res, ''); # delete the part we sent
        }
        return 0;
    };

    # the bitsets for select
    my ($rin, $rout, $win, $wout);
    my $nfound;

    # the big select loop
    while(1) {
        if ($active_changed) {
            last unless %reading or %writing; # no sockets left?
            ($rin, $win) = ('', '');
            foreach (values %reading) {
                vec($rin, fileno($_), 1) = 1;
            }
            foreach (values %writing) {
                vec($win, fileno($_), 1) = 1;
            }
            $active_changed = 0;
        }
        # TODO: more intelligent cumulative timeout?
        # TODO: select is interruptible w/ ptrace attach, signal, etc. should note that.
        $nfound = select($rout=$rin, $wout=$win, undef,
                         $self->{'select_timeout'});
        last unless $nfound;

        # TODO: possible robustness improvement: we could select
        # writing sockets for reading also, and raise hell if they're
        # ready (input unread from last time, etc.)
        # maybe do that on the first loop only?
        foreach (values %writing) {
            if (vec($wout, fileno($_), 1)) {
                $active_changed = 1 if $write->($_);
            }
        }
        foreach (values %reading) {
            if (vec($rout, fileno($_), 1)) {
                $active_changed = 1 if $read->($_);
            }
        }
    }

    # if there're active sockets left, they need to die
    foreach (values %writing) {
        $dead->($_);
    }
    foreach (values %reading) {
        $dead->($_);
    }

    return;
}
772
# Maps a string to a 15-bit hash value: bits 16..30 of its CRC32.
sub _hashfunc {
    my ($string) = @_;
    my $crc = crc32($string);
    return ($crc >> 16) & 0x7fff;
}
776
# Sends "flush_all" to every configured server.  Returns 1 when every
# server produced a reply and 0 when at least one did not (including
# servers that could not be reached).
#
# The bucket list repeats a server once per unit of weight, so it is
# de-duplicated first; each server is contacted exactly once.
sub flush_all {
    my Cache::Memcached $self = shift;

    my $success = 1;

    my %seen;
    my @hosts = grep { !$seen{$_}++ } @{$self->{'buckets'}};
    foreach my $host (@hosts) {
        my $sock = $self->sock_to_host($host);
        # run_command returns an empty list for an undefined socket
        my @res = $self->run_command($sock, "flush_all\r\n");
        $success = 0 unless (@res);
    }

    return $success;
}
791
# Sends $cmd on $sock and collects the reply until it ends in OK, END or
# ERROR (each followed by \r\n), or until a read fails.
# returns array of lines (each re-terminated with \r\n), or () on failure.
sub run_command {
    my Cache::Memcached $self = shift;
    my ($sock, $cmd) = @_;
    return () unless $sock;

    # start from an empty string so a failed first exchange does not lead
    # to chop/split on an undefined value
    my $ret = '';
    my $line = $cmd;
    while (my $res = _write_and_read($self, $sock, $line)) {
        # NOTE(review): continuation rounds call _write_and_read with an
        # undefined line; confirm that it can perform a read-only round.
        undef $line;
        $ret .= $res;
        last if $ret =~ /(?:OK|END|ERROR)\r\n$/;
    }
    return () unless length $ret;

    chop $ret; chop $ret;
    return map { "$_\r\n" } split(/\r\n/, $ret);
}
807
# Collects statistics from the servers and/or this object.
#
# $types is a single type name, an arrayref of type names, or false for
# the default set (misc, malloc, sizes, self).  "self" is answered locally
# from this object's counters; "misc" is sent to the server as a bare
# "stats"; every other type is sent as "stats TYPE".
#
# Returns a hashref with up to three keys:
#   self   copy of this object's counters (only when "self" was requested)
#   hosts  host -> type -> parsed key/value hash (misc, malloc, sizes) or
#          the raw reply text (all other types)
#   total  sums over all hosts for selected misc keys and all malloc keys
# Returns 0 when no servers are configured or $types is an unsupported
# kind of reference.
#
# Each server is queried once even when its weight gives it several
# buckets; iterating the raw bucket list counted weighted servers several
# times in the totals.  Servers that cannot be reached are skipped.
sub stats {
    my Cache::Memcached $self = shift;
    my ($types) = @_;
    return 0 unless $self->{'active'};
    return 0 unless !ref($types) || ref($types) eq 'ARRAY';
    if (!ref($types)) {
        if (!$types) {
            # I don't much care what the default is, it should just
            # be something reasonable.  Obviously "reset" should not
            # be on the list :) but other types that might go in here
            # include maps, cachedump, slabs, or items.
            $types = [ qw( misc malloc sizes self ) ];
        } else {
            $types = [ $types ];
        }
    }

    my $stats_hr = { };

    # The "self" stat type is special, it only applies to this very
    # object.  A shallow copy is returned, as documented, so callers
    # cannot modify the live counters.
    if (grep /^self$/, @$types) {
        $stats_hr->{'self'} = { %{ $self->{'stats'} } };
    }

    my %misc_keys = map { $_ => 1 }
      qw/ bytes bytes_read bytes_written
          cmd_get cmd_set connection_structures curr_items
          get_hits get_misses
          total_connections total_items
        /;

    # Now handle the other types, passing each type to each host server.
    my %seen;
    my @hosts = grep { !$seen{$_}++ } @{$self->{'buckets'}};
  HOST: foreach my $host (@hosts) {
        my $sock = $self->sock_to_host($host);
        next HOST unless $sock;   # dead or unreachable host
      TYPE: foreach my $typename (grep !/^self$/, @$types) {
            my $type = $typename eq 'misc' ? "" : " $typename";
            my $lines = _write_and_read($self, $sock, "stats$type\r\n", sub {
                my $bref = shift;
                return $$bref =~ /^(?:END|ERROR)\r?\n/m;
            });
            unless ($lines) {
                _dead_sock($sock);
                next HOST;
            }

            $lines =~ s/\0//g;  # 'stats sizes' starts with NULL?

            # And, most lines end in \r\n but 'stats maps' (as of
            # July 2003 at least) ends in \n. ??
            my @lines = split(/\r?\n/, $lines);

            # Some stats are key-value, some are not.  malloc,
            # sizes, and the empty string are key-value.
            # ("self" was handled separately above.)
            if ($typename =~ /^(malloc|sizes|misc)$/) {
                # This stat is key-value.
                foreach my $line (@lines) {
                    my ($key, $value) = $line =~ /^(?:STAT )?(\w+)\s(.*)/;
                    if ($key) {
                        $stats_hr->{'hosts'}{$host}{$typename}{$key} = $value;
                    }
                    $stats_hr->{'total'}{$key} += $value
                        if $typename eq 'misc' && $key && $misc_keys{$key};
                    $stats_hr->{'total'}{"malloc_$key"} += $value
                        if $typename eq 'malloc' && $key;
                }
            } else {
                # This stat is not key-value so just pull it
                # all out in one blob.
                $lines =~ s/^END\r?\n//m;
                $stats_hr->{'hosts'}{$host}{$typename} ||= "";
                $stats_hr->{'hosts'}{$host}{$typename} .= "$lines";
            }
        }
    }

    return $stats_hr;
}
888
# Sends "stats reset" to every configured server.  Returns 0 when no
# servers are configured and 1 otherwise, regardless of the replies.
#
# The command is sent with its "\r\n" terminator; without it the server
# never sees a complete command line and the exchange ends in a timeout.
# Each server is contacted once, unreachable servers are skipped, and a
# server that does not answer RESET is dropped from the socket caches.
sub stats_reset {
    my Cache::Memcached $self = shift;
    my ($types) = @_;
    return 0 unless $self->{'active'};

    my %seen;
  HOST: foreach my $host (grep { !$seen{$_}++ } @{$self->{'buckets'}}) {
        my $sock = $self->sock_to_host($host);
        next HOST unless $sock;   # dead or unreachable host
        my $ok = _write_and_read($self, $sock, "stats reset\r\n");
        unless (defined($ok) && $ok eq "RESET\r\n") {
            _dead_sock($sock);
        }
    }
    return 1;
}
903
904 1;
905 __END__
906
907 =head1 NAME
908
909 Cache::Memcached - client library for memcached (memory cache daemon)
910
911 =head1 SYNOPSIS
912
913 use Cache::Memcached;
914
915 $memd = new Cache::Memcached {
916 'servers' => [ "10.0.0.15:11211", "10.0.0.15:11212", "/var/sock/memcached",
917 "10.0.0.17:11211", [ "10.0.0.17:11211", 3 ] ],
918 'debug' => 0,
919 'compress_threshold' => 10_000,
920 };
921 $memd->set_servers($array_ref);
922 $memd->set_compress_threshold(10_000);
923 $memd->enable_compress(0);
924
925 $memd->set("my_key", "Some value");
926 $memd->set("object_key", { 'complex' => [ "object", 2, 4 ]});
927
928 $val = $memd->get("my_key");
929 $val = $memd->get("object_key");
930 if ($val) { print $val->{'complex'}->[2]; }
931
932 $memd->incr("key");
933 $memd->decr("key");
934 $memd->incr("key", 2);
935
936 =head1 DESCRIPTION
937
938 This is the Perl API for memcached, a distributed memory cache daemon.
939 More information is available at:
940
941 http://www.danga.com/memcached/
942
943 =head1 CONSTRUCTOR
944
945 =over 4
946
947 =item C<new>
948
949 Takes one parameter, a hashref of options. The most important key is
950 C<servers>, but that can also be set later with the C<set_servers>
951 method. The servers must be an arrayref of hosts, each of which is
952 either a scalar of the form C<10.0.0.10:11211> or an arrayref of the
953 former and an integer weight value. (The default weight if
954 unspecified is 1.) It's recommended that weight values be kept as low
955 as possible, as this module currently allocates memory for bucket
956 distribution proportional to the total host weights.
957
958 Use C<compress_threshold> to set a compression threshold, in bytes.
959 Values larger than this threshold will be compressed by C<set> and
960 decompressed by C<get>.
961
962 Use C<no_rehash> to disable finding a new memcached server when one
963 goes down. Your application may or may not need this, depending on
964 your expirations and key usage.
965
966 Use C<readonly> to disable writes to backend memcached servers. Only
967 get and get_multi will work. This is useful in bizarre debug and
968 profiling cases only.
969
970 Use C<namespace> to prefix all keys with the provided namespace value.
971 That is, if you set namespace to "app1:" and later do a set of "foo"
972 to "bar", memcached is actually seeing you set "app1:foo" to "bar".
973
974 The other useful key is C<debug>, which when set to true will produce
975 diagnostics on STDERR.
976
977 =back
978
979 =head1 METHODS
980
981 =over 4
982
983 =item C<set_servers>
984
985 Sets the server list this module distributes key gets and sets between.
986 The format is an arrayref of identical form as described in the C<new>
987 constructor.
988
989 =item C<set_debug>
990
991 Sets the C<debug> flag. See C<new> constructor for more information.
992
993 =item C<set_readonly>
994
995 Sets the C<readonly> flag. See C<new> constructor for more information.
996
997 =item C<set_norehash>
998
999 Sets the C<no_rehash> flag. See C<new> constructor for more information.
1000
1001 =item C<set_compress_threshold>
1002
1003 Sets the compression threshold. See C<new> constructor for more information.
1004
1005 =item C<enable_compress>
1006
1007 Temporarily enable or disable compression. Has no effect if C<compress_threshold>
1008 isn't set, but has an overriding effect if it is.
1009
1010 =item C<get>
1011
1012 my $val = $memd->get($key);
1013
1014 Retrieves a key from the memcache. Returns the value (automatically
1015 thawed with Storable, if necessary) or undef.
1016
1017 The $key can optionally be an arrayref, with the first element being the
1018 hash value, if you want to avoid making this module calculate a hash
1019 value. You may prefer, for example, to keep all of a given user's
1020 objects on the same memcache server, so you could use the user's
1021 unique id as the hash value.
1022
1023 =item C<get_multi>
1024
1025 my $hashref = $memd->get_multi(@keys);
1026
1027 Retrieves multiple keys from the memcache doing just one query.
1028 Returns a hashref of key/value pairs that were available.
1029
1030 This method is recommended over regular 'get' as it lowers the number
1031 of total packets flying around your network, reducing total latency,
1032 since your app doesn't have to wait for each round-trip of 'get'
1033 before sending the next one.
1034
1035 =item C<set>
1036
1037 $memd->set($key, $value[, $exptime]);
1038
1039 Unconditionally sets a key to a given value in the memcache. Returns true
1040 if it was stored successfully.
1041
1042 The $key can optionally be an arrayref, with the first element being the
1043 hash value, as described above.
1044
1045 The $exptime (expiration time) defaults to "never" if unspecified. If
1046 you want the key to expire in memcached, pass an integer $exptime. If
1047 value is less than 60*60*24*30 (30 days), time is assumed to be relative
1048 from the present. If larger, it's considered an absolute Unix time.
1049
1050 =item C<add>
1051
1052 $memd->add($key, $value[, $exptime]);
1053
1054 Like C<set>, but only stores in memcache if the key doesn't already exist.
1055
1056 =item C<replace>
1057
1058 $memd->replace($key, $value[, $exptime]);
1059
1060 Like C<set>, but only stores in memcache if the key already exists. The
1061 opposite of C<add>.
1062
1063 =item C<delete>
1064
1065 $memd->delete($key[, $time]);
1066
1067 Deletes a key. You may optionally provide an integer time value (in seconds) to
1068 tell the memcached server to block new writes to this key for that many seconds.
1069 (Sometimes useful as a hacky means to prevent races.) Returns true if key
1070 was found and deleted, and false otherwise.
1071
1072 You may also use the alternate method name B<remove>, so
1073 Cache::Memcached looks like the L<Cache::Cache> API.
1074
1075 =item C<incr>
1076
1077 $memd->incr($key[, $value]);
1078
1079 Sends a command to the server to atomically increment the value for
1080 $key by $value, or by 1 if $value is undefined. Returns undef if $key
1081 doesn't exist on server, otherwise it returns the new value after
1082 incrementing. Value should be zero or greater. Overflow on server
1083 is not checked. Be aware of values approaching 2**32. See decr.
1084
1085 =item C<decr>
1086
1087 $memd->decr($key[, $value]);
1088
1089 Like incr, but decrements. Unlike incr, underflow is checked and new
1090 values are capped at 0. If server value is 1, a decrement of 2
1091 returns 0, not -1.
1092
1093 =item C<stats>
1094
1095 $memd->stats([$keys]);
1096
1097 Returns a hashref of statistical data regarding the memcache server(s),
1098 the $memd object, or both. $keys can be an arrayref of keys wanted, a
1099 single key wanted, or absent (in which case the default is misc, malloc,
1100 sizes, and self; misc is sent as a bare 'stats'). These keys are the values passed
1101 to the 'stats' command issued to the memcached server(s), except for
1102 'self' which is internal to the $memd object. Allowed values are:
1103
1104 =over 4
1105
1106 =item C<misc>
1107
1108 The stats returned by a 'stats' command: pid, uptime, version,
1109 bytes, get_hits, etc.
1110
1111 =item C<malloc>
1112
1113 The stats returned by a 'stats malloc': total_alloc, arena_size, etc.
1114
1115 =item C<sizes>
1116
1117 The stats returned by a 'stats sizes'.
1118
1119 =item C<self>
1120
1121 The stats for the $memd object itself (a copy of $memd->{'stats'}).
1122
1123 =item C<maps>
1124
1125 The stats returned by a 'stats maps'.
1126
1127 =item C<cachedump>
1128
1129 The stats returned by a 'stats cachedump'.
1130
1131 =item C<slabs>
1132
1133 The stats returned by a 'stats slabs'.
1134
1135 =item C<items>
1136
1137 The stats returned by a 'stats items'.
1138
1139 =back
1140
1141 =item C<disconnect_all>
1142
1143 $memd->disconnect_all;
1144
1145 Closes all cached sockets to all memcached servers. You must do this
1146 if your program forks and the parent has used this module at all.
1147 Otherwise the children will try to use cached sockets and they'll fight
1148 (as children do) and garble the client/server protocol.
1149
1150 =item C<flush_all>
1151
1152 $memd->flush_all;
1153
1154 Runs the memcached "flush_all" command on all configured hosts,
1155 emptying all their caches. (or rather, invalidating all items
1156 in the caches in an O(1) operation...) Running stats will still
1157 show the items as existing; they are simply treated as non-existent and
1158 lazily destroyed the next time you try to fetch any of them.
1159
1160 =back
1161
1162 =head1 BUGS
1163
1164 When a server goes down, this module does detect it, and re-hashes the
1165 request to the remaining servers, but the way it does it isn't very
1166 clean. The result may be that it gives up during its rehashing and
1167 refuses to get/set something it could've, had it been done right.
1168
1169 =head1 COPYRIGHT
1170
1171 This module is Copyright (c) 2003 Brad Fitzpatrick.
1172 All rights reserved.
1173
1174 You may distribute under the terms of either the GNU General Public
1175 License or the Artistic License, as specified in the Perl README file.
1176
1177 =head1 WARRANTY
1178
1179 This is free software. IT COMES WITHOUT WARRANTY OF ANY KIND.
1180
1181 =head1 FAQ
1182
1183 See the memcached website:
1184 http://www.danga.com/memcached/
1185
1186 =head1 AUTHORS
1187
1188 Brad Fitzpatrick <brad@danga.com>
1189
1190 Anatoly Vorobey <mellon@pobox.com>
1191
1192 Brad Whitaker <whitaker@danga.com>
1193
1194 Jamie McCarthy <jamie@mccarthy.vg>