Mercurial > hg > Cache-Memcached
comparison lib/Cache/Memcached.pm @ 0:17fc6afc155e CACHE_MEMCACHED_1_24
Cache::Memcached 1.24
author | Maxim Dounin <mdounin@mdounin.ru> |
---|---|
date | Sun, 30 Sep 2007 16:23:31 +0400 |
parents | |
children | f5cfb726ea65 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:17fc6afc155e |
---|---|
1 # $Id: Memcached.pm 601 2007-07-17 17:47:33Z bradfitz $ | |
2 # | |
3 # Copyright (c) 2003, 2004 Brad Fitzpatrick <brad@danga.com> | |
4 # | |
5 # See COPYRIGHT section in pod text below for usage and distribution rights. | |
6 # | |
7 | |
8 package Cache::Memcached; | |
9 | |
10 use strict; | |
11 use warnings; | |
12 | |
13 no strict 'refs'; | |
14 use Storable (); | |
15 use Socket qw( MSG_NOSIGNAL PF_INET PF_UNIX IPPROTO_TCP SOCK_STREAM ); | |
16 use IO::Handle (); | |
17 use Time::HiRes (); | |
18 use String::CRC32; | |
19 use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN ); | |
20 use Cache::Memcached::GetParser; | |
21 use fields qw{ | |
22 debug no_rehash stats compress_threshold compress_enable stat_callback | |
23 readonly select_timeout namespace namespace_len servers active buckets | |
24 pref_ip | |
25 bucketcount _single_sock _stime | |
26 connect_timeout cb_connect_fail | |
27 parser_class | |
28 }; | |
29 | |
30 # flag definitions | |
31 use constant F_STORABLE => 1; | |
32 use constant F_COMPRESS => 2; | |
33 | |
34 # size savings required before saving compressed value | |
35 use constant COMPRESS_SAVINGS => 0.20; # percent | |
36 | |
# Package globals: module version, whether Compress::Zlib could be loaded,
# and the flag value handed to send().
use vars qw($VERSION $HAVE_ZLIB $FLAG_NOSIGNAL);
$VERSION = "1.24";

# Probe for Compress::Zlib at compile time; $HAVE_ZLIB is false when the
# module cannot be loaded.
BEGIN {
    $HAVE_ZLIB = eval "use Compress::Zlib (); 1;";
}

# Use the XS get-parser when it loads, unless NO_XS is set in the environment.
my $HAVE_XS = eval "use Cache::Memcached::GetParserXS; 1;";
$HAVE_XS = 0 if $ENV{NO_XS};

my $parser_class = "Cache::Memcached::GetParser";
$parser_class = "Cache::Memcached::GetParserXS" if $HAVE_XS;
print "using parser: $parser_class\n" if $ENV{XS_DEBUG};

# Stays 0 when evaluating MSG_NOSIGNAL fails (the eval swallows the error).
$FLAG_NOSIGNAL = 0;
eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL; };

# State shared by every client object in this process.
my %host_dead;   # host -> unixtime marked dead until
my %cache_sock;  # host -> socket
my @buck2sock;   # bucket number -> $sock

my $PROTO_TCP;   # cached result of getprotobyname('tcp')

our $SOCK_TIMEOUT = 2.6; # default timeout in seconds
62 | |
# Constructor.  Accepts either a single hashref of options or a flat
# key/value list.  When invoked on an existing object, that object is
# re-initialised instead of allocating a new one.
# Returns the (new or re-initialised) object.
sub new {
    my Cache::Memcached $self = shift;
    $self = fields::new( $self ) unless ref $self;

    my $args = (@_ == 1) ? shift : { @_ };  # hashref-ify args

    $self->set_servers($args->{'servers'});

    # options copied through unchanged (undef when not supplied)
    $self->{'no_rehash'}          = $args->{'no_rehash'};
    $self->{'compress_threshold'} = $args->{'compress_threshold'};
    $self->{'readonly'}           = $args->{'readonly'};

    # options with a fallback used when the supplied value is missing or false
    $self->{'debug'}         = $args->{'debug'}         || 0;
    $self->{'pref_ip'}       = $args->{'pref_ip'}       || {};
    $self->{'stat_callback'} = $args->{'stat_callback'} || undef;
    $self->{'parser_class'}  = $args->{'parser_class'}  || $parser_class;

    # TODO: undocumented
    $self->{'connect_timeout'} = $args->{'connect_timeout'} || 0.25;
    $self->{'select_timeout'}  = $args->{'select_timeout'}  || 1.0;

    # namespace prefix and its cached length
    $self->{namespace}     = $args->{namespace} || '';
    $self->{namespace_len} = length $self->{namespace};

    # fixed initial state
    $self->{'stats'}           = {};
    $self->{'compress_enable'} = 1;

    return $self;
}
88 | |
# Replaces the preferred-IP table: a hashref mapping a configured server IP
# to the IP that sock_to_host should try first.
sub set_pref_ip {
    my Cache::Memcached $self = $_[0];
    my $preferred = $_[1];
    $self->{'pref_ip'} = $preferred;
}
93 | |
# Installs a new server list and rebuilds the bucket table.
#   $list - arrayref whose entries are either a host string or a
#           [ host, weight ] pair; undef is treated as an empty list.
# Also drops the shared bucket -> socket cache and, when exactly one server
# entry is configured, remembers its host in _single_sock so lookups can
# skip hashing.
# Returns $self.
sub set_servers {
    my Cache::Memcached $self = shift;
    my ($list) = @_;
    $self->{'servers'} = $list || [];
    $self->{'active'} = scalar @{$self->{'servers'}};
    $self->{'buckets'} = undef;
    $self->{'bucketcount'} = 0;
    $self->init_buckets;
    @buck2sock = ();

    $self->{'_single_sock'} = undef;
    if (@{$self->{'servers'}} == 1) {
        my $only = $self->{'servers'}[0];
        # A weighted entry is [ host, weight ]; _single_sock must hold the
        # host string, because it is handed straight to sock_to_host.
        $self->{'_single_sock'} = ref $only eq 'ARRAY' ? $only->[0] : $only;
    }

    return $self;
}
111 | |
# Sets the callback that sock_to_host invokes with the address it failed to
# connect to.
sub set_cb_connect_fail {
    my Cache::Memcached $self = $_[0];
    my $callback = $_[1];
    $self->{'cb_connect_fail'} = $callback;
}
116 | |
# Sets the connect timeout, in seconds, passed to _connect_sock.
sub set_connect_timeout {
    my Cache::Memcached $self = $_[0];
    my $seconds = $_[1];
    $self->{'connect_timeout'} = $seconds;
}
121 | |
# Sets the debug level; a missing or false value is stored as 0.
sub set_debug {
    my Cache::Memcached $self = $_[0];
    $self->{'debug'} = $_[1] || 0;
}
127 | |
# Sets the readonly flag; while true, delete/_set/_incrdecr return without
# contacting a server.
sub set_readonly {
    my Cache::Memcached $self = $_[0];
    my $flag = $_[1];
    $self->{'readonly'} = $flag;
}
133 | |
# Sets the no_rehash flag; while true, get_sock gives up after the first
# unreachable bucket instead of trying another one.
sub set_norehash {
    my Cache::Memcached $self = $_[0];
    my $flag = $_[1];
    $self->{'no_rehash'} = $flag;
}
139 | |
# Sets the value size, in bytes, from which _set attempts compression.
sub set_compress_threshold {
    my Cache::Memcached $self = $_[0];
    my $bytes = $_[1];
    $self->{'compress_threshold'} = $bytes;
}
145 | |
# Turns compression on (true) or off (false) without touching the threshold.
sub enable_compress {
    my Cache::Memcached $self = $_[0];
    my $on = $_[1];
    $self->{'compress_enable'} = $on;
}
151 | |
# Clears the process-wide dead-host table and the bucket -> socket cache, so
# every host becomes eligible for a connection attempt again.
sub forget_dead_hosts {
    %host_dead = ();
    return @buck2sock = ();
}
156 | |
# Sets the timing callback.  Callers in this file invoke it as
# ($start_time, $end_time, $sock, $command_name).
sub set_stat_callback {
    my Cache::Memcached $self = $_[0];
    my $callback = $_[1];
    $self->{'stat_callback'} = $callback;
}
162 | |
163 my %sock_map; # stringified-$sock -> "$ip:$port" | |
164 | |
# Forgets a socket that has stopped working.
#   $sock     - the socket (looked up in %sock_map by its stringified form)
#   $ret      - value handed back to the caller unchanged
#   $dead_for - optional number of seconds to mark the socket's host dead
# The socket is NOT closed here.  The bucket -> socket cache is always
# cleared, even when the socket is unknown.
sub _dead_sock {
    my ($sock, $ret, $dead_for) = @_;

    my $ipport = $sock_map{$sock};
    if ($ipport) {
        if ($dead_for) {
            $host_dead{$ipport} = time() + $dead_for;
        }
        delete $cache_sock{$ipport};
        delete $sock_map{$sock};
    }

    @buck2sock = ();
    return $ret;  # 0 or undef, probably, depending on what caller wants
}
177 | |
# Closes a known socket and removes it from the host and socket maps.
# Sockets that are not in %sock_map are left alone.  The bucket -> socket
# cache is cleared either way.
sub _close_sock {
    my ($sock) = @_;

    my $ipport = $sock_map{$sock};
    if ($ipport) {
        close $sock;
        delete $cache_sock{$ipport};
        delete $sock_map{$sock};
    }

    @buck2sock = ();
}
187 | |
# Connects $sock to the packed address $sin.
#   $timeout - seconds to wait for a non-blocking connect (default 0.25);
#              a false value (0) requests a plain blocking connect.
# Whatever the mode used for connecting, the socket is non-blocking when
# this returns.  Returns true on success, false otherwise.
sub _connect_sock { # sock, sin, timeout
    my ($sock, $sin, $timeout) = @_;
    $timeout = 0.25 unless defined $timeout;

    # non-blocking while connecting when a timeout applies, blocking otherwise
    IO::Handle::blocking($sock, $timeout ? 0 : 1);

    my $connected = connect($sock, $sin);

    if (!$connected && $timeout && $! == EINPROGRESS) {
        # wait until the socket becomes writable or the timeout expires
        my $wset = '';
        vec($wset, fileno($sock), 1) = 1;

        if (select(undef, $wset, undef, $timeout) > 0) {
            $connected = connect($sock, $sin);
            # EISCONN means connected & won't re-connect, so success
            $connected = 1 if !$connected && $! == EISCONN;
        }
    }

    # socket was temporarily blocking, now revert; from here on we use
    # non-blocking (async) IO for the duration of the socket's life
    IO::Handle::blocking($sock, 0) unless $timeout;

    return $connected;
}
226 | |
# Returns a connected socket for $host, creating and caching one if needed.
# Callable as a method or as a plain function (no object => default connect
# timeout of 0.25s, no preferred IPs, no failure callback).
#   $host - "ip:port", or a path starting with "/" for a unix domain socket.
# Returns the socket, or undef when the host is currently marked dead or the
# connection attempt fails.  A failed attempt marks the host dead for 20-29
# seconds and clears the bucket -> socket cache.
sub sock_to_host { # (host)
    my Cache::Memcached $self = ref $_[0] ? shift : undef;
    my $host = $_[0];
    return $cache_sock{$host} if $cache_sock{$host};

    my $now = time();
    my ($ip, $port) = $host =~ /(.*):(\d+)/;
    return undef if
        $host_dead{$host} && $host_dead{$host} > $now;

    my $timeout = $self ? $self->{connect_timeout} : 0.25;
    my $cb      = $self ? $self->{cb_connect_fail} : undef;
    my $proto   = $PROTO_TCP ||= getprotobyname('tcp');
    my $sock;

    # Common failure path.  The host is marked dead directly: the socket is
    # not registered in %sock_map until the connect has succeeded, so a
    # lookup through _dead_sock would never find it.
    my $fail = sub {
        my ($target) = @_;
        $cb->($target) if $cb;
        $host_dead{$host} = time() + 20 + int(rand(10));
        @buck2sock = ();
        return undef;
    };

    # Attempts one TCP connect to $addr:$port; true on success.
    my $connect_inet = sub {
        my ($addr) = @_;
        my $packed = Socket::inet_aton($addr);
        return 0 unless defined $packed;   # address did not resolve
        socket($sock, PF_INET, SOCK_STREAM, $proto) or return 0;
        my $sin = Socket::sockaddr_in($port, $packed);
        return 1 if _connect_sock($sock, $sin, $timeout);
        close $sock;
        return 0;
    };

    if ( index($host, '/') != 0 )
    {
        # not of the form ip:port, nothing to connect to
        return $fail->($host) unless defined $ip && defined $port;

        # if a preferred IP is known, try that first.
        my $connected = 0;
        my $prefip = $self ? $self->{pref_ip}{$ip} : undef;
        if ($prefip) {
            $connected = $connect_inet->($prefip);
            $cb->($prefip) if !$connected && $cb;
        }

        # normal path, or fallback path if preferred IP failed
        unless ($connected || $connect_inet->($ip)) {
            return $fail->($ip);
        }
    } else { # it's a unix domain/local socket
        socket($sock, PF_UNIX, SOCK_STREAM, 0)
            or return $fail->($host);
        my $sin = Socket::sockaddr_un($host);
        _connect_sock($sock, $sin, $timeout)
            or return $fail->($host);
    }

    # make the new socket not buffer writes.
    my $old = select($sock);
    $| = 1;
    select($old);

    $sock_map{$sock} = $host;
    $cache_sock{$host} = $sock;

    return $sock;
}
291 | |
# Picks the socket responsible for a key.
#   $key - a plain key, or [ $hash_value, $real_key ] to supply the hash.
# With a single configured server its socket is used directly.  Otherwise
# the key is hashed to a bucket; when that bucket's host is unreachable up
# to 20 buckets are tried, unless no_rehash is set.
# Returns a socket or undef.
sub get_sock { # (key)
    my Cache::Memcached $self = $_[0];
    my $key = $_[1];
    return $self->sock_to_host($self->{'_single_sock'}) if $self->{'_single_sock'};
    return undef unless $self->{'active'};

    my ($hv, $real_key) = ref $key
        ? (int($key->[0]), $key->[1])
        : (_hashfunc($key), $key);

    for my $attempt (1 .. 20) {
        my $host = $self->{'buckets'}->[$hv % $self->{'bucketcount'}];
        my $sock = $self->sock_to_host($host);
        return $sock if $sock;
        return undef if $self->{'no_rehash'};
        $hv += _hashfunc($attempt . $real_key);  # stupid, but works
    }
    return undef;
}
310 | |
# Builds the bucket table from the server list unless one already exists.
# A plain host contributes one bucket; a [ host, weight ] entry contributes
# one bucket per unit of weight.  Stores the table and its size on $self.
sub init_buckets {
    my Cache::Memcached $self = shift;
    return if $self->{'buckets'};

    my @slots;
    $self->{'buckets'} = \@slots;

    for my $server (@{$self->{'servers'}}) {
        if (ref $server eq "ARRAY") {
            push @slots, $server->[0] for 1 .. $server->[1];
        }
        else {
            push @slots, $server;
        }
    }

    $self->{'bucketcount'} = scalar @{$self->{'buckets'}};
}
324 | |
# Closes every cached socket and forgets all per-socket state.
# Besides the host -> socket cache, the socket -> host map and the
# bucket -> socket cache are emptied too; otherwise get_multi would keep
# using the closed handles cached in @buck2sock.
sub disconnect_all {
    foreach my $sock (values %cache_sock) {
        close $sock;
    }
    %cache_sock = ();
    %sock_map   = ();
    @buck2sock  = ();
    return;
}
332 | |
# writes a line, then reads result.  by default stops reading after a
# single line, but caller can override the $check_complete subref,
# which gets passed a scalarref of buffer read thus far.
#
#   $sock           - connected socket; undef yields an immediate undef
#   $line           - text to send; when undef nothing is written and the
#                     call only reads (continuation of an earlier reply)
#   $check_complete - optional subref, true once the reply is complete
#
# Returns the data read, or undef on error/timeout.  On a send/read error
# the socket is closed via _close_sock; on timeout it is dropped via
# _dead_sock.
sub _write_and_read {
    my Cache::Memcached $self = shift;
    my ($sock, $line, $check_complete) = @_;
    return undef unless $sock;

    my $res;
    my ($ret, $offset) = (undef, 0);

    $check_complete ||= sub {
        return (rindex($ret, "\r\n") + 2 == length($ret));
    };

    # state: 0 - writing, 1 - reading, 2 - done
    # With nothing to write, start in the reading state; sending an empty
    # message would return 0 and be mistaken for a failed write.
    my $state = defined $line ? 0 : 1;

    # the bitsets for select
    my ($rin, $rout, $win, $wout);
    my $nfound;

    my $copy_state = -1;
    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;

    # the select loop
    while(1) {
        if ($copy_state!=$state) {
            last if $state==2;
            ($rin, $win) = ('', '');
            vec($rin, fileno($sock), 1) = 1 if $state==1;
            vec($win, fileno($sock), 1) = 1 if $state==0;
            $copy_state = $state;
        }
        $nfound = select($rout=$rin, $wout=$win, undef,
                         $self->{'select_timeout'});
        last unless $nfound;

        if (vec($wout, fileno($sock), 1)) {
            $res = send($sock, $line, $FLAG_NOSIGNAL);
            next
                if not defined $res and $!==EWOULDBLOCK;
            unless (defined $res && $res > 0) {
                _close_sock($sock);
                return undef;
            }
            if ($res == length($line)) { # all sent
                $state = 1;
            } else { # we only succeeded in sending some of it
                substr($line, 0, $res, ''); # delete the part we sent
            }
        }

        if (vec($rout, fileno($sock), 1)) {
            $res = sysread($sock, $ret, 255, $offset);
            next
                if !defined($res) and $!==EWOULDBLOCK;
            if (!$res) { # catches 0=conn closed or undef=error
                _close_sock($sock);
                return undef;
            }
            $offset += $res;
            $state = 2 if $check_complete->(\$ret);
        }
    }

    unless ($state == 2) {
        _dead_sock($sock); # improperly finished
        return undef;
    }

    return $ret;
}
404 | |
# Deletes a key from the cache.
#   $key  - plain key or [ $hash_value, $real_key ]
#   $time - optional value appended to the delete command after the key
# Returns true when the server answered "DELETED", false otherwise
# (including: no servers, readonly mode, no socket, no reply).
# Also available under the name "remove".
sub delete {
    my Cache::Memcached $self = shift;
    my ($key, $time) = @_;
    return 0 if ! $self->{'active'} || $self->{'readonly'};

    # declared unconditionally: "my $x = ... if ..." has unreliable semantics
    my $stime;
    $stime = Time::HiRes::time() if $self->{'stat_callback'};

    my $sock = $self->get_sock($key);
    return 0 unless $sock;

    $self->{'stats'}->{"delete"}++;
    $key = ref $key ? $key->[1] : $key;
    $time = $time ? " $time" : "";
    my $cmd = "delete $self->{namespace}$key$time\r\n";
    my $res = _write_and_read($self, $sock, $cmd);

    if ($self->{'stat_callback'}) {
        my $etime = Time::HiRes::time();
        $self->{'stat_callback'}->($stime, $etime, $sock, 'delete');
    }

    # $res is undef when the write or read failed
    return defined $res && $res eq "DELETED\r\n";
}
*remove = \&delete;
427 | |
# Stores a value with the "add" command; arguments and result as for _set.
sub add {
    return _set(add => @_);
}
431 | |
# Stores a value with the "replace" command; arguments and result as for _set.
sub replace {
    return _set(replace => @_);
}
435 | |
# Stores a value with the "set" command; arguments and result as for _set.
sub set {
    return _set(set => @_);
}
439 | |
# Shared implementation of add/replace/set.
#   $cmdname - protocol command name ("add", "replace" or "set")
#   $self    - client object
#   $key     - plain key or [ $hash_value, $real_key ]
#   $val     - value; references are serialised with Storable::nfreeze
#   $exptime - optional expiry, sent as an integer (0 when omitted)
# Values at least compress_threshold bytes long are gzipped when
# Compress::Zlib is available and compression is enabled, and the compressed
# form is kept only if it is smaller by the COMPRESS_SAVINGS fraction.
# Returns true when the server answered "STORED", false otherwise
# (including: no servers, readonly mode, no socket, no reply).
sub _set {
    my $cmdname = shift;
    my Cache::Memcached $self = shift;
    my ($key, $val, $exptime) = @_;
    return 0 if ! $self->{'active'} || $self->{'readonly'};

    # declared unconditionally: "my $x = ... if ..." has unreliable semantics
    my $stime;
    $stime = Time::HiRes::time() if $self->{'stat_callback'};

    my $sock = $self->get_sock($key);
    return 0 unless $sock;

    use bytes; # return bytes from length()

    $self->{'stats'}->{$cmdname}++;
    my $flags = 0;
    $key = ref $key ? $key->[1] : $key;

    if (ref $val) {
        local $Carp::CarpLevel = 2;
        $val = Storable::nfreeze($val);
        $flags |= F_STORABLE;
    }

    my $len = length($val);

    if ($self->{'compress_threshold'} && $HAVE_ZLIB && $self->{'compress_enable'} &&
        $len >= $self->{'compress_threshold'}) {

        my $c_val = Compress::Zlib::memGzip($val);
        my $c_len = length($c_val);

        # do we want to keep it?
        if ($c_len < $len*(1 - COMPRESS_SAVINGS)) {
            $val = $c_val;
            $len = $c_len;
            $flags |= F_COMPRESS;
        }
    }

    $exptime = int($exptime || 0);

    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
    my $line = "$cmdname $self->{namespace}$key $flags $exptime $len\r\n$val\r\n";

    my $res = _write_and_read($self, $sock, $line);

    if ($self->{'debug'} && $line) {
        chop $line; chop $line;
        print STDERR "Cache::Memcache: $cmdname $self->{namespace}$key = $val ($line)\n";
    }

    if ($self->{'stat_callback'}) {
        my $etime = Time::HiRes::time();
        $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
    }

    # $res is undef when the write or read failed
    return defined $res && $res eq "STORED\r\n";
}
496 | |
# Increments a counter with the "incr" command; arguments and result as for
# _incrdecr.
sub incr {
    return _incrdecr(incr => @_);
}
500 | |
# Decrements a counter with the "decr" command; arguments and result as for
# _incrdecr.
sub decr {
    return _incrdecr(decr => @_);
}
504 | |
# Shared implementation of incr/decr.
#   $cmdname - protocol command name ("incr" or "decr")
#   $self    - client object
#   $key     - plain key or [ $hash_value, $real_key ]
#   $value   - amount to apply (defaults to 1)
# Returns the number at the start of the server's reply, or undef when there
# are no servers, the client is readonly, no socket is available, or the
# reply is missing or does not start with digits.
sub _incrdecr {
    my $cmdname = shift;
    my Cache::Memcached $self = shift;
    my ($key, $value) = @_;
    return undef if ! $self->{'active'} || $self->{'readonly'};

    # declared unconditionally: "my $x = ... if ..." has unreliable semantics
    my $stime;
    $stime = Time::HiRes::time() if $self->{'stat_callback'};

    my $sock = $self->get_sock($key);
    return undef unless $sock;
    $key = $key->[1] if ref $key;
    $self->{'stats'}->{$cmdname}++;
    $value = 1 unless defined $value;

    my $line = "$cmdname $self->{namespace}$key $value\r\n";
    my $res = _write_and_read($self, $sock, $line);

    if ($self->{'stat_callback'}) {
        my $etime = Time::HiRes::time();
        $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
    }

    # $res is undef when the write or read failed
    return undef unless defined $res && $res =~ /^(\d+)/;
    return $1;
}
528 | |
# Fetches a single key by delegating to get_multi.
#   $key - plain key or [ $hash_value, $real_key ]
# Returns the value stored under the real key, or undef when get_multi did
# not return one.
sub get {
    my Cache::Memcached $self = $_[0];
    my $key = $_[1];

    # TODO: make a fast path for this?  or just keep using get_multi?
    my $real_key = ref $key ? $key->[1] : $key;
    my $found = $self->get_multi($key);
    return $found->{$real_key};
}
538 | |
# Fetches several keys at once.
#   @_ - keys, each a plain key or [ $hash_value, $real_key ]
# Keys are grouped by the socket of the server responsible for them and
# handed to _load_multi.  A key whose bucket is unreachable is retried on
# other buckets (up to 20 times) unless no_rehash is set, in which case the
# key is skipped, matching get_sock.
# Returns a hashref of real key -> value for the keys that were found.
sub get_multi {
    my Cache::Memcached $self = shift;
    return {} unless $self->{'active'};
    $self->{'_stime'} = Time::HiRes::time() if $self->{'stat_callback'};
    $self->{'stats'}->{"get_multi"}++;

    my %val;        # what we'll be returning a reference to (realkey -> value)
    my %sock_keys;  # sockref_as_scalar -> [ realkeys ]
    my $sock;

    if ($self->{'_single_sock'}) {
        $sock = $self->sock_to_host($self->{'_single_sock'});
        unless ($sock) {
            return {};
        }
        foreach my $key (@_) {
            my $kval = ref $key ? $key->[1] : $key;
            push @{$sock_keys{$sock}}, $kval;
        }
    } else {
        my $bcount = $self->{'bucketcount'};
      KEY:
        foreach my $key (@_) {
            my ($hv, $real_key) = ref $key ?
                (int($key->[0]),               $key->[1]) :
                ((crc32($key) >> 16) & 0x7fff, $key);

            my $tries;
            while (1) {
                my $bucket = $hv % $bcount;

                # this segfaults perl 5.8.4 (and others?) if sock_to_host returns undef... wtf?
                #$sock = $buck2sock[$bucket] ||= $self->sock_to_host($self->{buckets}[ $bucket ])
                #    and last;

                # but this variant doesn't crash:
                $sock = $buck2sock[$bucket] || $self->sock_to_host($self->{buckets}[ $bucket ]);
                if ($sock) {
                    $buck2sock[$bucket] = $sock;
                    last;
                }

                # honour no_rehash here as get_sock does: never fall back
                # to another server when rehashing is disabled
                next KEY if $self->{'no_rehash'} || $tries++ >= 20;
                $hv += _hashfunc($tries . $real_key);
            }

            push @{$sock_keys{$sock}}, $real_key;
        }
    }

    $self->{'stats'}->{"get_keys"} += @_;
    $self->{'stats'}->{"get_socks"} += keys %sock_keys;

    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;

    _load_multi($self, \%sock_keys, \%val);

    if ($self->{'debug'}) {
        while (my ($k, $v) = each %val) {
            print STDERR "MemCache: got $k = $v\n";
        }
    }
    return \%val;
}
604 | |
# Sends one "get" request per socket and reads all replies in parallel with
# a single select loop.
#   $self      - client object
#   $sock_keys - hashref: stringified socket -> arrayref of keys (without
#                the namespace prefix)
#   $ret       - hashref handed to each parser object and post-processed by
#                the $finalize callback below
# A socket that fails, or is still active when select times out, is closed
# and dropped via _dead_sock.  Returns nothing.
sub _load_multi {
    use bytes; # return bytes from length()
    my Cache::Memcached $self;
    my ($sock_keys, $ret);

    ($self, $sock_keys, $ret) = @_;

    # all keyed by $sockstr:
    my %reading; # $sockstr -> $sock.  bool, whether we're reading from this socket
    my %writing; # $sockstr -> $sock.  bool, whether we're writing to this socket
    my %buf;     # buffers, for writing

    my %parser;  # $sockstr -> Cache::Memcached::GetParser

    my $active_changed = 1; # force rebuilding of select sets

    # Gives up on a socket: stops tracking it, discards the entry for the
    # key its parser reports as current, reports timing, closes it.
    my $dead = sub {
        my $sock = shift;
        print STDERR "killing socket $sock\n" if $self->{'debug'} >= 2;
        delete $reading{$sock};
        delete $writing{$sock};

        if (my $p = $parser{$sock}) {
            my $key = $p->current_key;
            delete $ret->{$key} if $key;
        }

        if ($self->{'stat_callback'}) {
            my $etime = Time::HiRes::time();
            $self->{'stat_callback'}->($self->{'_stime'}, $etime, $sock, 'get_multi');
        }

        close $sock;
        _dead_sock($sock);
    };

    # $finalize->($key, $flags)
    # $finalize->({ $key => $flags, $key => $flags });
    # Strips the trailing \r\n from each value, then undoes compression
    # and Storable serialisation according to the flags.
    my $finalize = sub {
        my $map = $_[0];
        $map = {@_} unless ref $map;

        while (my ($k, $flags) = each %$map) {

            # remove trailing \r\n
            chop $ret->{$k}; chop $ret->{$k};

            $ret->{$k} = Compress::Zlib::memGunzip($ret->{$k})
                if $HAVE_ZLIB && $flags & F_COMPRESS;
            if ($flags & F_STORABLE) {
                # wrapped in eval in case a perl 5.6 Storable tries to
                # unthaw data from a perl 5.8 Storable.  (5.6 is stupid
                # and dies if the version number changes at all.  in 5.8
                # they made it only die if it unencounters a new feature)
                eval {
                    $ret->{$k} = Storable::thaw($ret->{$k});
                };
                # so if there was a problem, just treat it as a cache miss.
                if ($@) {
                    delete $ret->{$k};
                }
            }
        }
    };

    # Build the request line and a parser for every socket.
    foreach my $sockstr (keys %$sock_keys) {
        my $ipport = $sock_map{$sockstr}   or die "No map found matching for $sockstr";
        my $sock   = $cache_sock{$ipport}  or die "No sock found for $ipport";
        print STDERR "processing socket $sockstr\n" if $self->{'debug'} >= 2;
        $writing{$sockstr} = $sock;
        if ($self->{namespace}) {
            $buf{$sockstr} = join(" ", 'get', (map { "$self->{namespace}$_" } @{$sock_keys->{$sockstr}}), "\r\n");
        } else {
            $buf{$sockstr} = join(" ", 'get', @{$sock_keys->{$sockstr}}, "\r\n");
        }

        $parser{$sockstr} = $self->{parser_class}->new($ret, $self->{namespace_len}, $finalize);
    }

    # Feeds readable data to the socket's parser.  A positive parser result
    # means the reply is complete, a negative one kills the socket.
    my $read = sub {
        my $sockstr = "$_[0]"; # $sock is $_[0];
        my $p = $parser{$sockstr} or die;
        my $rv = $p->parse_from_sock($_[0]);
        if ($rv > 0) {
            # okay, finished with this socket
            delete $reading{$sockstr};
        } elsif ($rv < 0) {
            $dead->($_[0]);
        }
        return $rv;
    };

    # returns 1 when it's done, for success or error.  0 if still working.
    my $write = sub {
        my ($sock, $sockstr) = ($_[0], "$_[0]");
        my $res;

        $res = send($sock, $buf{$sockstr}, $FLAG_NOSIGNAL);

        return 0
            if not defined $res and $!==EWOULDBLOCK;
        # undef (send error) and 0 both mean the socket is unusable
        unless (defined $res && $res > 0) {
            $dead->($sock);
            return 1;
        }
        if ($res == length($buf{$sockstr})) { # all sent
            $buf{$sockstr} = "";

            # switch the socket from writing to reading
            delete $writing{$sockstr};
            $reading{$sockstr} = $sock;
            return 1;
        } else { # we only succeeded in sending some of it
            substr($buf{$sockstr}, 0, $res, ''); # delete the part we sent
        }
        return 0;
    };

    # the bitsets for select
    my ($rin, $rout, $win, $wout);
    my $nfound;

    # the big select loop
    while(1) {
        if ($active_changed) {
            last unless %reading or %writing; # no sockets left?
            ($rin, $win) = ('', '');
            foreach (values %reading) {
                vec($rin, fileno($_), 1) = 1;
            }
            foreach (values %writing) {
                vec($win, fileno($_), 1) = 1;
            }
            $active_changed = 0;
        }
        # TODO: more intelligent cumulative timeout?
        # TODO: select is interruptible w/ ptrace attach, signal, etc. should note that.
        $nfound = select($rout=$rin, $wout=$win, undef,
                         $self->{'select_timeout'});
        last unless $nfound;

        # TODO: possible robustness improvement: we could select
        # writing sockets for reading also, and raise hell if they're
        # ready (input unread from last time, etc.)
        # maybe do that on the first loop only?
        foreach (values %writing) {
            if (vec($wout, fileno($_), 1)) {
                $active_changed = 1 if $write->($_);
            }
        }
        foreach (values %reading) {
            if (vec($rout, fileno($_), 1)) {
                $active_changed = 1 if $read->($_);
            }
        }
    }

    # if there're active sockets left, they need to die
    foreach (values %writing) {
        $dead->($_);
    }
    foreach (values %reading) {
        $dead->($_);
    }

    return;
}
772 | |
# Maps a string to a 15-bit hash value: bits 16-30 of its CRC32.
sub _hashfunc {
    my ($string) = @_;
    my $crc = crc32($string);
    return ($crc >> 16) & 0x7fff;
}
776 | |
# Sends "flush_all" to every configured host.
# Each distinct host is contacted once, even when it occupies several
# buckets because of its weight.
# Returns 1 only if every host answered exactly "OK"; an unreachable host or
# any other reply (for example "ERROR") makes the result 0.
sub flush_all {
    my Cache::Memcached $self = shift;

    my $success = 1;

    my %seen;
    my @hosts = grep { !$seen{$_}++ } @{$self->{'buckets'}};
    foreach my $host (@hosts) {
        my $sock = $self->sock_to_host($host);
        my @res = $self->run_command($sock, "flush_all\r\n");
        $success = 0 unless @res == 1 && $res[0] eq "OK\r\n";
    }

    return $success;
}
791 | |
# Sends $cmd on $sock and collects the reply until a line ending in OK, END
# or ERROR arrives, or until reading fails.
# returns array of lines (each terminated by "\r\n"), or () on failure,
# i.e. when there is no socket or nothing could be read.
sub run_command {
    my Cache::Memcached $self = shift;
    my ($sock, $cmd) = @_;
    return () unless $sock;
    my $ret = '';
    my $line = $cmd;
    while (my $res = _write_and_read($self, $sock, $line)) {
        undef $line;  # the command is sent only once; later passes just read
        $ret .= $res;
        last if $ret =~ /(?:OK|END|ERROR)\r\n$/;
    }
    # nothing read: avoid chop/split on an empty reply
    return () unless length $ret;
    chop $ret; chop $ret;
    return map { "$_\r\n" } split(/\r\n/, $ret);
}
807 | |
# Collects statistics.
#   $types - a stat type name, an arrayref of names, or nothing for the
#            default set (misc malloc sizes self).
# Returns 0 when no servers are configured or $types is another kind of
# reference.  Otherwise returns a hashref with:
#   self  - this object's own counters (only when "self" was requested)
#   hosts - host -> type -> parsed key/value pairs (misc, malloc, sizes)
#           or the raw reply text (any other type)
#   total - sums over all hosts of selected "misc" keys and of all malloc
#           keys (the latter prefixed with "malloc_")
# Each distinct host is queried once even when it occupies several buckets,
# so weighted hosts are not counted repeatedly.  Unreachable hosts are
# skipped.
sub stats {
    my Cache::Memcached $self = shift;
    my ($types) = @_;
    return 0 unless $self->{'active'};
    return 0 unless !ref($types) || ref($types) eq 'ARRAY';
    if (!ref($types)) {
        if (!$types) {
            # I don't much care what the default is, it should just
            # be something reasonable.  Obviously "reset" should not
            # be on the list :) but other types that might go in here
            # include maps, cachedump, slabs, or items.
            $types = [ qw( misc malloc sizes self ) ];
        } else {
            $types = [ $types ];
        }
    }

    my $stats_hr = { };

    # The "self" stat type is special, it only applies to this very
    # object.
    if (grep /^self$/, @$types) {
        $stats_hr->{'self'} = \%{ $self->{'stats'} };
    }

    my %misc_keys = map { $_ => 1 }
      qw/ bytes bytes_read bytes_written
          cmd_get cmd_set connection_structures curr_items
          get_hits get_misses
          total_connections total_items
        /;

    # Now handle the other types, passing each type to each host server.
    my %seen;
    my @hosts = grep { !$seen{$_}++ } @{$self->{'buckets'}};
  HOST: foreach my $host (@hosts) {
        my $sock = $self->sock_to_host($host);
        next HOST unless $sock;  # host is down or marked dead
      TYPE: foreach my $typename (grep !/^self$/, @$types) {
            my $type = $typename eq 'misc' ? "" : " $typename";
            my $lines = _write_and_read($self, $sock, "stats$type\r\n", sub {
                my $bref = shift;
                return $$bref =~ /^(?:END|ERROR)\r?\n/m;
            });
            unless ($lines) {
                _dead_sock($sock);
                next HOST;
            }

            $lines =~ s/\0//g;  # 'stats sizes' starts with NULL?

            # And, most lines end in \r\n but 'stats maps' (as of
            # July 2003 at least) ends in \n. ??
            my @lines = split(/\r?\n/, $lines);

            # Some stats are key-value, some are not.  malloc,
            # sizes, and the empty string are key-value.
            # ("self" was handled separately above.)
            if ($typename =~ /^(malloc|sizes|misc)$/) {
                # This stat is key-value.
                foreach my $line (@lines) {
                    my ($key, $value) = $line =~ /^(?:STAT )?(\w+)\s(.*)/;
                    if ($key) {
                        $stats_hr->{'hosts'}{$host}{$typename}{$key} = $value;
                    }
                    $stats_hr->{'total'}{$key} += $value
                        if $typename eq 'misc' && $key && $misc_keys{$key};
                    $stats_hr->{'total'}{"malloc_$key"} += $value
                        if $typename eq 'malloc' && $key;
                }
            } else {
                # This stat is not key-value so just pull it
                # all out in one blob.
                $lines =~ s/^END\r?\n//m;
                $stats_hr->{'hosts'}{$host}{$typename} ||= "";
                $stats_hr->{'hosts'}{$host}{$typename} .= "$lines";
            }
        }
    }

    return $stats_hr;
}
888 | |
# Sends "stats reset" to every configured host.
# Each distinct host is contacted once and unreachable hosts are skipped.
# A host that does not answer "RESET" has its socket dropped via _dead_sock.
# Any arguments are ignored.
# Returns 0 when no servers are configured, 1 otherwise.
sub stats_reset {
    my Cache::Memcached $self = shift;
    return 0 unless $self->{'active'};

    my %seen;
  HOST: foreach my $host (grep { !$seen{$_}++ } @{$self->{'buckets'}}) {
        my $sock = $self->sock_to_host($host);
        next HOST unless $sock;  # host is down or marked dead

        # the command must be terminated by \r\n or the server keeps
        # waiting for the rest of the line
        my $ok = _write_and_read($self, $sock, "stats reset\r\n");
        unless (defined $ok && $ok eq "RESET\r\n") {
            _dead_sock($sock);
        }
    }
    return 1;
}
903 | |
904 1; | |
905 __END__ | |
906 | |
907 =head1 NAME | |
908 | |
909 Cache::Memcached - client library for memcached (memory cache daemon) | |
910 | |
911 =head1 SYNOPSIS | |
912 | |
913 use Cache::Memcached; | |
914 | |
915 $memd = new Cache::Memcached { | |
916 'servers' => [ "10.0.0.15:11211", "10.0.0.15:11212", "/var/sock/memcached", | |
917 "10.0.0.17:11211", [ "10.0.0.17:11211", 3 ] ], | |
918 'debug' => 0, | |
919 'compress_threshold' => 10_000, | |
920 }; | |
921 $memd->set_servers($array_ref); | |
922 $memd->set_compress_threshold(10_000); | |
923 $memd->enable_compress(0); | |
924 | |
925 $memd->set("my_key", "Some value"); | |
926 $memd->set("object_key", { 'complex' => [ "object", 2, 4 ]}); | |
927 | |
928 $val = $memd->get("my_key"); | |
929 $val = $memd->get("object_key"); | |
930 if ($val) { print $val->{'complex'}->[2]; } | |
931 | |
932 $memd->incr("key"); | |
933 $memd->decr("key"); | |
934 $memd->incr("key", 2); | |
935 | |
936 =head1 DESCRIPTION | |
937 | |
938 This is the Perl API for memcached, a distributed memory cache daemon. | |
939 More information is available at: | |
940 | |
941 http://www.danga.com/memcached/ | |
942 | |
943 =head1 CONSTRUCTOR | |
944 | |
945 =over 4 | |
946 | |
947 =item C<new> | |
948 | |
949 Takes a hashref of options (or the same options as a flat list). The most important key is | |
950 C<servers>, but that can also be set later with the C<set_servers> | |
951 method. The servers must be an arrayref of hosts, each of which is | |
952 either a scalar of the form C<10.0.0.10:11211> or an arrayref of the | |
953 former and an integer weight value. (The default weight if | |
954 unspecified is 1.) It's recommended that weight values be kept as low | |
955 as possible, as this module currently allocates memory for bucket | |
956 distribution proportional to the total host weights. | |
957 | |
958 Use C<compress_threshold> to set a compression threshold, in bytes. | |
959 Values at least this large are compressed by C<set> (if Compress::Zlib is | |
960 available and compression saves at least 20%) and decompressed by C<get>. | |
961 | |
962 Use C<no_rehash> to disable finding a new memcached server when one | |
963 goes down. Your application may or may not need this, depending on | |
964 your expirations and key usage. | |
965 | |
966 Use C<readonly> to disable writes to backend memcached servers. Only | |
967 get and get_multi will work. This is useful in bizarre debug and | |
968 profiling cases only. | |
969 | |
970 Use C<namespace> to prefix all keys with the provided namespace value. | |
971 That is, if you set namespace to "app1:" and later do a set of "foo" | |
972 to "bar", memcached is actually seeing you set "app1:foo" to "bar". | |
973 | |
974 The other useful key is C<debug>, which when set to true will produce | |
975 diagnostics on STDERR. | |
976 | |
977 =back | |
978 | |
979 =head1 METHODS | |
980 | |
981 =over 4 | |
982 | |
983 =item C<set_servers> | |
984 | |
985 Sets the server list this module distributes key gets and sets between. | |
986 The format is an arrayref of identical form as described in the C<new> | |
987 constructor. | |
988 | |
989 =item C<set_debug> | |
990 | |
991 Sets the C<debug> flag. See C<new> constructor for more information. | |
992 | |
993 =item C<set_readonly> | |
994 | |
995 Sets the C<readonly> flag. See C<new> constructor for more information. | |
996 | |
997 =item C<set_norehash> | |
998 | |
999 Sets the C<no_rehash> flag. See C<new> constructor for more information. | |
1000 | |
1001 =item C<set_compress_threshold> | |
1002 | |
1003 Sets the compression threshold. See C<new> constructor for more information. | |
1004 | |
1005 =item C<enable_compress> | |
1006 | |
1007 Temporarily enable or disable compression. Has no effect if C<compress_threshold> | |
1008 isn't set, but has an overriding effect if it is. | |
1009 | |
1010 =item C<get> | |
1011 | |
1012 my $val = $memd->get($key); | |
1013 | |
1014 Retrieves a key from the memcache. Returns the value (automatically | |
1015 thawed with Storable, if necessary) or undef. | |
1016 | |
1017 The $key can optionally be an arrayref, with the first element being the | |
1018 hash value, if you want to avoid making this module calculate a hash | |
1019 value. You may prefer, for example, to keep all of a given user's | |
1020 objects on the same memcache server, so you could use the user's | |
1021 unique id as the hash value. | |
1022 | |
1023 =item C<get_multi> | |
1024 | |
1025 my $hashref = $memd->get_multi(@keys); | |
1026 | |
1027 Retrieves multiple keys from the memcache doing just one query. | |
1028 Returns a hashref of key/value pairs that were available. | |
1029 | |
1030 This method is recommended over regular 'get' as it lowers the number | |
1031 of total packets flying around your network, reducing total latency, | |
1032 since your app doesn't have to wait for each round-trip of 'get' | |
1033 before sending the next one. | |
1034 | |
1035 =item C<set> | |
1036 | |
1037 $memd->set($key, $value[, $exptime]); | |
1038 | |
1039 Unconditionally sets a key to a given value in the memcache. Returns true | |
1040 if it was stored successfully. | |
1041 | |
1042 The $key can optionally be an arrayref, with the first element being the | |
1043 hash value, as described above. | |
1044 | |
1045 The $exptime (expiration time) defaults to "never" if unspecified. If | |
1046 you want the key to expire in memcached, pass an integer $exptime. If | |
1047 the value is less than 60*60*24*30 (30 days), it is assumed to be relative | |
1048 to the present. If larger, it's considered an absolute Unix time. | |
1049 | |
1050 =item C<add> | |
1051 | |
1052 $memd->add($key, $value[, $exptime]); | |
1053 | |
1054 Like C<set>, but only stores in memcache if the key doesn't already exist. | |
1055 | |
1056 =item C<replace> | |
1057 | |
1058 $memd->replace($key, $value[, $exptime]); | |
1059 | |
1060 Like C<set>, but only stores in memcache if the key already exists. The | |
1061 opposite of C<add>. | |
1062 | |
1063 =item C<delete> | |
1064 | |
1065 $memd->delete($key[, $time]); | |
1066 | |
1067 Deletes a key. You may optionally provide an integer time value (in seconds) to | |
1068 tell the memcached server to block new writes to this key for that many seconds. | |
1069 (Sometimes useful as a hacky means to prevent races.) Returns true if key | |
1070 was found and deleted, and false otherwise. | |
1071 | |
1072 You may also use the alternate method name B<remove>, so | |
1073 Cache::Memcached looks like the L<Cache::Cache> API. | |
1074 | |
1075 =item C<incr> | |
1076 | |
1077 $memd->incr($key[, $value]); | |
1078 | |
1079 Sends a command to the server to atomically increment the value for | |
1080 $key by $value, or by 1 if $value is undefined. Returns undef if $key | |
1081 doesn't exist on server, otherwise it returns the new value after | |
1082 incrementing. Value should be zero or greater. Overflow on server | |
1083 is not checked. Be aware of values approaching 2**32. See decr. | |
1084 | |
1085 =item C<decr> | |
1086 | |
1087 $memd->decr($key[, $value]); | |
1088 | |
1089 Like incr, but decrements. Unlike incr, underflow is checked and new | |
1090 values are capped at 0. If server value is 1, a decrement of 2 | |
1091 returns 0, not -1. | |
1092 | |
1093 =item C<stats> | |
1094 | |
1095 $memd->stats([$keys]); | |
1096 | |
1097 Returns a hashref of statistical data regarding the memcache server(s), | |
1098 the $memd object, or both. $keys can be an arrayref of keys wanted, a | |
1099 single key wanted, or absent (in which case the default value is malloc, | |
1100 sizes, self, and the empty string). These keys are the values passed | |
1101 to the 'stats' command issued to the memcached server(s), except for | |
1102 'self' which is internal to the $memd object. Allowed values are: | |
1103 | |
1104 =over 4 | |
1105 | |
1106 =item C<misc> | |
1107 | |
1108 The stats returned by a 'stats' command: pid, uptime, version, | |
1109 bytes, get_hits, etc. | |
1110 | |
1111 =item C<malloc> | |
1112 | |
1113 The stats returned by a 'stats malloc': total_alloc, arena_size, etc. | |
1114 | |
1115 =item C<sizes> | |
1116 | |
1117 The stats returned by a 'stats sizes'. | |
1118 | |
1119 =item C<self> | |
1120 | |
1121 The stats for the $memd object itself (a copy of $memd->{'stats'}). | |
1122 | |
1123 =item C<maps> | |
1124 | |
1125 The stats returned by a 'stats maps'. | |
1126 | |
1127 =item C<cachedump> | |
1128 | |
1129 The stats returned by a 'stats cachedump'. | |
1130 | |
1131 =item C<slabs> | |
1132 | |
1133 The stats returned by a 'stats slabs'. | |
1134 | |
1135 =item C<items> | |
1136 | |
1137 The stats returned by a 'stats items'. | |
1138 | |
1139 =back | |
1140 | |
1141 =item C<disconnect_all> | |
1142 | |
1143 $memd->disconnect_all; | |
1144 | |
1145 Closes all cached sockets to all memcached servers. You must do this | |
1146 if your program forks and the parent has used this module at all. | |
1147 Otherwise the children will try to use cached sockets and they'll fight | |
1148 (as children do) and garble the client/server protocol. | |
1149 | |
1150 =item C<flush_all> | |
1151 | |
1152 $memd->flush_all; | |
1153 | |
1154 Runs the memcached "flush_all" command on all configured hosts, | |
1155 emptying all their caches. (or rather, invalidating all items | |
1156 in the caches in an O(1) operation...) Running stats will still | |
1157 show the items existing; they'll just be non-existent and lazily | |
1158 destroyed next time you try to fetch any of them. | |
1159 | |
1160 =back | |
1161 | |
1162 =head1 BUGS | |
1163 | |
1164 When a server goes down, this module does detect it, and re-hashes the | |
1165 request to the remaining servers, but the way it does it isn't very | |
1166 clean. The result may be that it gives up during its rehashing and | |
1167 refuses to get/set something it could've, had it been done right. | |
1168 | |
1169 =head1 COPYRIGHT | |
1170 | |
1171 This module is Copyright (c) 2003 Brad Fitzpatrick. | |
1172 All rights reserved. | |
1173 | |
1174 You may distribute under the terms of either the GNU General Public | |
1175 License or the Artistic License, as specified in the Perl README file. | |
1176 | |
1177 =head1 WARRANTY | |
1178 | |
1179 This is free software. IT COMES WITHOUT WARRANTY OF ANY KIND. | |
1180 | |
1181 =head1 FAQ | |
1182 | |
1183 See the memcached website: | |
1184 http://www.danga.com/memcached/ | |
1185 | |
1186 =head1 AUTHORS | |
1187 | |
1188 Brad Fitzpatrick <brad@danga.com> | |
1189 | |
1190 Anatoly Vorobey <mellon@pobox.com> | |
1191 | |
1192 Brad Whitaker <whitaker@danga.com> | |
1193 | |
1194 Jamie McCarthy <jamie@mccarthy.vg> |