comparison memcached.c @ 0:30782bb1fc04 MEMCACHED_1_2_3

memcached-1.2.3
author Maxim Dounin <mdounin@mdounin.ru>
date Sun, 23 Sep 2007 03:58:34 +0400
parents
children e28ab6bd21fa
comparison
equal deleted inserted replaced
-1:000000000000 0:30782bb1fc04
1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * memcached - memory caching daemon
4 *
5 * http://www.danga.com/memcached/
6 *
7 * Copyright 2003 Danga Interactive, Inc. All rights reserved.
8 *
9 * Use and distribution licensed under the BSD license. See
10 * the LICENSE file for full text.
11 *
12 * Authors:
13 * Anatoly Vorobey <mellon@pobox.com>
14 * Brad Fitzpatrick <brad@danga.com>
15 *
16 * $Id: memcached.c 570 2007-06-20 01:21:45Z plindner $
17 */
18 #include "memcached.h"
19 #include <sys/stat.h>
20 #include <sys/socket.h>
21 #include <sys/un.h>
22 #include <sys/signal.h>
23 #include <sys/resource.h>
24 #include <sys/uio.h>
25
26 /* some POSIX systems need the following definition
27 * to get mlockall flags out of sys/mman.h. */
28 #ifndef _P1003_1B_VISIBLE
29 #define _P1003_1B_VISIBLE
30 #endif
31 /* need this to get IOV_MAX on some platforms. */
32 #ifndef __need_IOV_MAX
33 #define __need_IOV_MAX
34 #endif
35 #include <pwd.h>
36 #include <sys/mman.h>
37 #include <fcntl.h>
38 #include <netinet/tcp.h>
39 #include <arpa/inet.h>
40 #include <errno.h>
41 #include <stdlib.h>
42 #include <stdio.h>
43 #include <string.h>
44 #include <time.h>
45 #include <assert.h>
46 #include <limits.h>
47
48 #ifdef HAVE_MALLOC_H
49 /* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
50 #ifndef __OpenBSD__
51 #include <malloc.h>
52 #endif
53 #endif
54
55 /* FreeBSD 4.x doesn't have IOV_MAX exposed. */
56 #ifndef IOV_MAX
57 #if defined(__FreeBSD__)
58 # define IOV_MAX 1024
59 #endif
60 #endif
61
62 /*
63 * forward declarations
64 */
65 static void drive_machine(conn *c);
66 static int new_socket(const bool is_udp);
67 static int server_socket(const int port, const bool is_udp);
68 static int try_read_command(conn *c);
69 static int try_read_network(conn *c);
70 static int try_read_udp(conn *c);
71
72 /* stats */
73 static void stats_reset(void);
74 static void stats_init(void);
75
76 /* defaults */
77 static void settings_init(void);
78
79 /* event handling, network IO */
80 static void event_handler(const int fd, const short which, void *arg);
81 static void conn_close(conn *c);
82 static void conn_init(void);
83 static void accept_new_conns(const bool do_accept);
84 static bool update_event(conn *c, const int new_flags);
85 static void complete_nread(conn *c);
86 static void process_command(conn *c, char *command);
87 static int transmit(conn *c);
88 static int ensure_iov_space(conn *c);
89 static int add_iov(conn *c, const void *buf, int len);
90 static int add_msghdr(conn *c);
91
92
93 /* time handling */
94 static void set_current_time(void); /* update the global variable holding
95 global 32-bit seconds-since-start time
96 (to avoid 64 bit time_t) */
97
98 void pre_gdb(void);
99 static void conn_free(conn *c);
100
101 /** exported globals **/
102 struct stats stats;
103 struct settings settings;
104
105 /** file scope variables **/
106 static item **todelete = 0;
107 static int delcurr;
108 static int deltotal;
109 static conn *listen_conn;
110 static struct event_base *main_base;
111
112 #define TRANSMIT_COMPLETE 0
113 #define TRANSMIT_INCOMPLETE 1
114 #define TRANSMIT_SOFT_ERROR 2
115 #define TRANSMIT_HARD_ERROR 3
116
117 static int *buckets = 0; /* bucket->generation array for a managed instance */
118
119 #define REALTIME_MAXDELTA 60*60*24*30
120 /*
121 * given time value that's either unix time or delta from current unix time, return
122 * unix time. Use the fact that delta can't exceed one month (and real time value can't
123 * be that low).
124 */
125 static rel_time_t realtime(const time_t exptime) {
126 /* no. of seconds in 30 days - largest possible delta exptime */
127
128 if (exptime == 0) return 0; /* 0 means never expire */
129
130 if (exptime > REALTIME_MAXDELTA) {
131 /* if item expiration is at/before the server started, give it an
132 expiration time of 1 second after the server started.
133 (because 0 means don't expire). without this, we'd
134 underflow and wrap around to some large value way in the
135 future, effectively making items expiring in the past
136 really expiring never */
137 if (exptime <= stats.started)
138 return (rel_time_t)1;
139 return (rel_time_t)(exptime - stats.started);
140 } else {
141 return (rel_time_t)(exptime + current_time);
142 }
143 }
144
145 static void stats_init(void) {
146 stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
147 stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
148 stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
149
150 /* make the time we started always be 2 seconds before we really
151 did, so time(0) - time.started is never zero. if so, things
152 like 'settings.oldest_live' which act as booleans as well as
153 values are now false in boolean context... */
154 stats.started = time(0) - 2;
155 stats_prefix_init();
156 }
157
158 static void stats_reset(void) {
159 STATS_LOCK();
160 stats.total_items = stats.total_conns = 0;
161 stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
162 stats.bytes_read = stats.bytes_written = 0;
163 stats_prefix_clear();
164 STATS_UNLOCK();
165 }
166
167 static void settings_init(void) {
168 settings.port = 11211;
169 settings.udpport = 0;
170 settings.interf.s_addr = htonl(INADDR_ANY);
171 settings.maxbytes = 67108864; /* default is 64MB: (64 * 1024 * 1024) */
172 settings.maxconns = 1024; /* to limit connections-related memory to about 5MB */
173 settings.verbose = 0;
174 settings.oldest_live = 0;
175 settings.evict_to_free = 1; /* push old items out of cache when memory runs out */
176 settings.socketpath = NULL; /* by default, not using a unix socket */
177 settings.managed = false;
178 settings.factor = 1.25;
179 settings.chunk_size = 48; /* space for a modest key and value */
180 #ifdef USE_THREADS
181 settings.num_threads = 4;
182 #else
183 settings.num_threads = 1;
184 #endif
185 settings.prefix_delimiter = ':';
186 settings.detail_enabled = 0;
187 }
188
189 /* returns true if a deleted item's delete-locked-time is over, and it
190 should be removed from the namespace */
191 static bool item_delete_lock_over (item *it) {
192 assert(it->it_flags & ITEM_DELETED);
193 return (current_time >= it->exptime);
194 }
195
196 /*
197 * Adds a message header to a connection.
198 *
199 * Returns 0 on success, -1 on out-of-memory.
200 */
201 static int add_msghdr(conn *c)
202 {
203 struct msghdr *msg;
204
205 assert(c != NULL);
206
207 if (c->msgsize == c->msgused) {
208 msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
209 if (! msg)
210 return -1;
211 c->msglist = msg;
212 c->msgsize *= 2;
213 }
214
215 msg = c->msglist + c->msgused;
216
217 /* this wipes msg_iovlen, msg_control, msg_controllen, and
218 msg_flags, the last 3 of which aren't defined on solaris: */
219 memset(msg, 0, sizeof(struct msghdr));
220
221 msg->msg_iov = &c->iov[c->iovused];
222 msg->msg_name = &c->request_addr;
223 msg->msg_namelen = c->request_addr_size;
224
225 c->msgbytes = 0;
226 c->msgused++;
227
228 if (c->udp) {
229 /* Leave room for the UDP header, which we'll fill in later. */
230 return add_iov(c, NULL, UDP_HEADER_SIZE);
231 }
232
233 return 0;
234 }
235
236
237 /*
238 * Free list management for connections.
239 */
240
241 static conn **freeconns;
242 static int freetotal;
243 static int freecurr;
244
245
246 static void conn_init(void) {
247 freetotal = 200;
248 freecurr = 0;
249 if (!(freeconns = (conn **)malloc(sizeof(conn *) * freetotal))) {
250 perror("malloc()");
251 }
252 return;
253 }
254
255 /*
256 * Returns a connection from the freelist, if any. Should call this using
257 * conn_from_freelist() for thread safety.
258 */
259 conn *do_conn_from_freelist() {
260 conn *c;
261
262 if (freecurr > 0) {
263 c = freeconns[--freecurr];
264 } else {
265 c = NULL;
266 }
267
268 return c;
269 }
270
271 /*
272 * Adds a connection to the freelist. 0 = success. Should call this using
273 * conn_add_to_freelist() for thread safety.
274 */
275 int do_conn_add_to_freelist(conn *c) {
276 if (freecurr < freetotal) {
277 freeconns[freecurr++] = c;
278 return 0;
279 } else {
280 /* try to enlarge free connections array */
281 conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
282 if (new_freeconns) {
283 freetotal *= 2;
284 freeconns = new_freeconns;
285 freeconns[freecurr++] = c;
286 return 0;
287 }
288 }
289 return 1;
290 }
291
292 conn *conn_new(const int sfd, const int init_state, const int event_flags,
293 const int read_buffer_size, const bool is_udp, struct event_base *base) {
294 conn *c = conn_from_freelist();
295
296 if (NULL == c) {
297 if (!(c = (conn *)malloc(sizeof(conn)))) {
298 perror("malloc()");
299 return NULL;
300 }
301 c->rbuf = c->wbuf = 0;
302 c->ilist = 0;
303 c->iov = 0;
304 c->msglist = 0;
305 c->hdrbuf = 0;
306
307 c->rsize = read_buffer_size;
308 c->wsize = DATA_BUFFER_SIZE;
309 c->isize = ITEM_LIST_INITIAL;
310 c->iovsize = IOV_LIST_INITIAL;
311 c->msgsize = MSG_LIST_INITIAL;
312 c->hdrsize = 0;
313
314 c->rbuf = (char *)malloc((size_t)c->rsize);
315 c->wbuf = (char *)malloc((size_t)c->wsize);
316 c->ilist = (item **)malloc(sizeof(item *) * c->isize);
317 c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
318 c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
319
320 if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
321 c->msglist == 0) {
322 if (c->rbuf != 0) free(c->rbuf);
323 if (c->wbuf != 0) free(c->wbuf);
324 if (c->ilist !=0) free(c->ilist);
325 if (c->iov != 0) free(c->iov);
326 if (c->msglist != 0) free(c->msglist);
327 free(c);
328 perror("malloc()");
329 return NULL;
330 }
331
332 STATS_LOCK();
333 stats.conn_structs++;
334 STATS_UNLOCK();
335 }
336
337 if (settings.verbose > 1) {
338 if (init_state == conn_listening)
339 fprintf(stderr, "<%d server listening\n", sfd);
340 else if (is_udp)
341 fprintf(stderr, "<%d server listening (udp)\n", sfd);
342 else
343 fprintf(stderr, "<%d new client connection\n", sfd);
344 }
345
346 c->sfd = sfd;
347 c->udp = is_udp;
348 c->state = init_state;
349 c->rlbytes = 0;
350 c->rbytes = c->wbytes = 0;
351 c->wcurr = c->wbuf;
352 c->rcurr = c->rbuf;
353 c->ritem = 0;
354 c->icurr = c->ilist;
355 c->ileft = 0;
356 c->iovused = 0;
357 c->msgcurr = 0;
358 c->msgused = 0;
359
360 c->write_and_go = conn_read;
361 c->write_and_free = 0;
362 c->item = 0;
363 c->bucket = -1;
364 c->gen = 0;
365
366 event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
367 event_base_set(base, &c->event);
368 c->ev_flags = event_flags;
369
370 if (event_add(&c->event, 0) == -1) {
371 if (conn_add_to_freelist(c)) {
372 conn_free(c);
373 }
374 return NULL;
375 }
376
377 STATS_LOCK();
378 stats.curr_conns++;
379 stats.total_conns++;
380 STATS_UNLOCK();
381
382 return c;
383 }
384
385 static void conn_cleanup(conn *c) {
386 assert(c != NULL);
387
388 if (c->item) {
389 item_remove(c->item);
390 c->item = 0;
391 }
392
393 if (c->ileft != 0) {
394 for (; c->ileft > 0; c->ileft--,c->icurr++) {
395 item_remove(*(c->icurr));
396 }
397 }
398
399 if (c->write_and_free) {
400 free(c->write_and_free);
401 c->write_and_free = 0;
402 }
403 }
404
405 /*
406 * Frees a connection.
407 */
408 void conn_free(conn *c) {
409 if (c) {
410 if (c->hdrbuf)
411 free(c->hdrbuf);
412 if (c->msglist)
413 free(c->msglist);
414 if (c->rbuf)
415 free(c->rbuf);
416 if (c->wbuf)
417 free(c->wbuf);
418 if (c->ilist)
419 free(c->ilist);
420 if (c->iov)
421 free(c->iov);
422 free(c);
423 }
424 }
425
426 static void conn_close(conn *c) {
427 assert(c != NULL);
428
429 /* delete the event, the socket and the conn */
430 event_del(&c->event);
431
432 if (settings.verbose > 1)
433 fprintf(stderr, "<%d connection closed.\n", c->sfd);
434
435 close(c->sfd);
436 accept_new_conns(true);
437 conn_cleanup(c);
438
439 /* if the connection has big buffers, just free it */
440 if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
441 conn_free(c);
442 }
443
444 STATS_LOCK();
445 stats.curr_conns--;
446 STATS_UNLOCK();
447
448 return;
449 }
450
451
452 /*
453 * Shrinks a connection's buffers if they're too big. This prevents
454 * periodic large "get" requests from permanently chewing lots of server
455 * memory.
456 *
457 * This should only be called in between requests since it can wipe output
458 * buffers!
459 */
460 static void conn_shrink(conn *c) {
461 assert(c != NULL);
462
463 if (c->udp)
464 return;
465
466 if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
467 char *newbuf;
468
469 if (c->rcurr != c->rbuf)
470 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
471
472 newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
473
474 if (newbuf) {
475 c->rbuf = newbuf;
476 c->rsize = DATA_BUFFER_SIZE;
477 }
478 /* TODO check other branch... */
479 c->rcurr = c->rbuf;
480 }
481
482 if (c->isize > ITEM_LIST_HIGHWAT) {
483 item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
484 if (newbuf) {
485 c->ilist = newbuf;
486 c->isize = ITEM_LIST_INITIAL;
487 }
488 /* TODO check error condition? */
489 }
490
491 if (c->msgsize > MSG_LIST_HIGHWAT) {
492 struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
493 if (newbuf) {
494 c->msglist = newbuf;
495 c->msgsize = MSG_LIST_INITIAL;
496 }
497 /* TODO check error condition? */
498 }
499
500 if (c->iovsize > IOV_LIST_HIGHWAT) {
501 struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
502 if (newbuf) {
503 c->iov = newbuf;
504 c->iovsize = IOV_LIST_INITIAL;
505 }
506 /* TODO check return value */
507 }
508 }
509
510 /*
511 * Sets a connection's current state in the state machine. Any special
512 * processing that needs to happen on certain state transitions can
513 * happen here.
514 */
515 static void conn_set_state(conn *c, int state) {
516 assert(c != NULL);
517
518 if (state != c->state) {
519 if (state == conn_read) {
520 conn_shrink(c);
521 assoc_move_next_bucket();
522 }
523 c->state = state;
524 }
525 }
526
527
528 /*
529 * Ensures that there is room for another struct iovec in a connection's
530 * iov list.
531 *
532 * Returns 0 on success, -1 on out-of-memory.
533 */
534 static int ensure_iov_space(conn *c) {
535 assert(c != NULL);
536
537 if (c->iovused >= c->iovsize) {
538 int i, iovnum;
539 struct iovec *new_iov = (struct iovec *)realloc(c->iov,
540 (c->iovsize * 2) * sizeof(struct iovec));
541 if (! new_iov)
542 return -1;
543 c->iov = new_iov;
544 c->iovsize *= 2;
545
546 /* Point all the msghdr structures at the new list. */
547 for (i = 0, iovnum = 0; i < c->msgused; i++) {
548 c->msglist[i].msg_iov = &c->iov[iovnum];
549 iovnum += c->msglist[i].msg_iovlen;
550 }
551 }
552
553 return 0;
554 }
555
556
557 /*
558 * Adds data to the list of pending data that will be written out to a
559 * connection.
560 *
561 * Returns 0 on success, -1 on out-of-memory.
562 */
563
564 static int add_iov(conn *c, const void *buf, int len) {
565 struct msghdr *m;
566 int leftover;
567 bool limit_to_mtu;
568
569 assert(c != NULL);
570
571 do {
572 m = &c->msglist[c->msgused - 1];
573
574 /*
575 * Limit UDP packets, and the first payloads of TCP replies, to
576 * UDP_MAX_PAYLOAD_SIZE bytes.
577 */
578 limit_to_mtu = c->udp || (1 == c->msgused);
579
580 /* We may need to start a new msghdr if this one is full. */
581 if (m->msg_iovlen == IOV_MAX ||
582 (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
583 add_msghdr(c);
584 m = &c->msglist[c->msgused - 1];
585 }
586
587 if (ensure_iov_space(c) != 0)
588 return -1;
589
590 /* If the fragment is too big to fit in the datagram, split it up */
591 if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
592 leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
593 len -= leftover;
594 } else {
595 leftover = 0;
596 }
597
598 m = &c->msglist[c->msgused - 1];
599 m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
600 m->msg_iov[m->msg_iovlen].iov_len = len;
601
602 c->msgbytes += len;
603 c->iovused++;
604 m->msg_iovlen++;
605
606 buf = ((char *)buf) + len;
607 len = leftover;
608 } while (leftover > 0);
609
610 return 0;
611 }
612
613
614 /*
615 * Constructs a set of UDP headers and attaches them to the outgoing messages.
616 */
617 static int build_udp_headers(conn *c) {
618 int i;
619 unsigned char *hdr;
620
621 assert(c != NULL);
622
623 if (c->msgused > c->hdrsize) {
624 void *new_hdrbuf;
625 if (c->hdrbuf)
626 new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
627 else
628 new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
629 if (! new_hdrbuf)
630 return -1;
631 c->hdrbuf = (unsigned char *)new_hdrbuf;
632 c->hdrsize = c->msgused * 2;
633 }
634
635 hdr = c->hdrbuf;
636 for (i = 0; i < c->msgused; i++) {
637 c->msglist[i].msg_iov[0].iov_base = hdr;
638 c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
639 *hdr++ = c->request_id / 256;
640 *hdr++ = c->request_id % 256;
641 *hdr++ = i / 256;
642 *hdr++ = i % 256;
643 *hdr++ = c->msgused / 256;
644 *hdr++ = c->msgused % 256;
645 *hdr++ = 0;
646 *hdr++ = 0;
647 assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
648 }
649
650 return 0;
651 }
652
653
654 static void out_string(conn *c, const char *str) {
655 size_t len;
656
657 assert(c != NULL);
658
659 if (settings.verbose > 1)
660 fprintf(stderr, ">%d %s\n", c->sfd, str);
661
662 len = strlen(str);
663 if ((len + 2) > c->wsize) {
664 /* ought to be always enough. just fail for simplicity */
665 str = "SERVER_ERROR output line too long";
666 len = strlen(str);
667 }
668
669 memcpy(c->wbuf, str, len);
670 memcpy(c->wbuf + len, "\r\n", 3);
671 c->wbytes = len + 2;
672 c->wcurr = c->wbuf;
673
674 conn_set_state(c, conn_write);
675 c->write_and_go = conn_read;
676 return;
677 }
678
679 /*
680 * we get here after reading the value in set/add/replace commands. The command
681 * has been stored in c->item_comm, and the item is ready in c->item.
682 */
683
684 static void complete_nread(conn *c) {
685 assert(c != NULL);
686
687 item *it = c->item;
688 int comm = c->item_comm;
689
690 STATS_LOCK();
691 stats.set_cmds++;
692 STATS_UNLOCK();
693
694 if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
695 out_string(c, "CLIENT_ERROR bad data chunk");
696 } else {
697 if (store_item(it, comm)) {
698 out_string(c, "STORED");
699 } else {
700 out_string(c, "NOT_STORED");
701 }
702 }
703
704 item_remove(c->item); /* release the c->item reference */
705 c->item = 0;
706 }
707
708 /*
709 * Stores an item in the cache according to the semantics of one of the set
710 * commands. In threaded mode, this is protected by the cache lock.
711 *
712 * Returns true if the item was stored.
713 */
714 int do_store_item(item *it, int comm) {
715 char *key = ITEM_key(it);
716 bool delete_locked = false;
717 item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
718 int stored = 0;
719
720 if (old_it != NULL && comm == NREAD_ADD) {
721 /* add only adds a nonexistent item, but promote to head of LRU */
722 do_item_update(old_it);
723 } else if (!old_it && comm == NREAD_REPLACE) {
724 /* replace only replaces an existing value; don't store */
725 } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD)) {
726 /* replace and add can't override delete locks; don't store */
727 } else {
728 /* "set" commands can override the delete lock
729 window... in which case we have to find the old hidden item
730 that's in the namespace/LRU but wasn't returned by
731 item_get.... because we need to replace it */
732 if (delete_locked)
733 old_it = do_item_get_nocheck(key, it->nkey);
734
735 if (old_it != NULL)
736 do_item_replace(old_it, it);
737 else
738 do_item_link(it);
739
740 stored = 1;
741 }
742
743 if (old_it)
744 do_item_remove(old_it); /* release our reference */
745 return stored;
746 }
747
/* One token of a command line: where it starts and how many bytes long. */
typedef struct token_s {
    char *value;
    size_t length;
} token_t;

/* Indexes of well-known tokens, and protocol limits. */
#define COMMAND_TOKEN 0
#define SUBCOMMAND_TOKEN 1
#define KEY_TOKEN 1
#define KEY_MAX_LENGTH 250

#define MAX_TOKENS 6

/*
 * Splits command in place on ' ' characters, storing a pointer and length
 * for each non-empty token in tokens[]. The space ending each emitted token
 * is overwritten with '\0'; runs of spaces produce no empty tokens. At
 * most max_tokens - 1 real tokens are produced (max_tokens must be > 1, and
 * tokens must have room for max_tokens entries).
 *
 * The entry after the last real token is a terminal token with length 0.
 * Its value is NULL if the whole string was consumed. Otherwise it points
 * to the first unprocessed character, so the rest can be tokenized by
 * another call.
 *
 * Returns the number of entries written, terminal token included.
 *
 * Usage example:
 *
 *   token_t tokens[MAX_TOKENS];
 *   token_t *t;
 *   tokenize_command(command, tokens, MAX_TOKENS);
 *   for (;;) {
 *       for (t = tokens; t->length != 0; t++) {
 *           ... use t->value / t->length ...
 *       }
 *       if (t->value == NULL)
 *           break;          (whole line consumed)
 *       tokenize_command(t->value, tokens, MAX_TOKENS);
 *   }
 */
static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
    char *start;
    char *cursor;
    size_t count = 0;

    assert(command != NULL && tokens != NULL && max_tokens > 1);

    start = command;
    cursor = command;
    while (count < max_tokens - 1) {
        const char ch = *cursor;

        if (ch == ' ' || ch == '\0') {
            if (cursor != start) {
                tokens[count].value = start;
                tokens[count].length = cursor - start;
                count++;
                if (ch == ' ')
                    *cursor = '\0';
            }
            if (ch == '\0')
                break; /* string end */
            start = cursor + 1;
        }
        cursor++;
    }

    /* terminal token: NULL once everything is consumed, otherwise the
       first character not yet looked at */
    tokens[count].value = (*cursor == '\0') ? NULL : cursor;
    tokens[count].length = 0;
    count++;

    return count;
}
814
815 inline static void process_stats_detail(conn *c, const char *command) {
816 assert(c != NULL);
817
818 if (strcmp(command, "on") == 0) {
819 settings.detail_enabled = 1;
820 out_string(c, "OK");
821 }
822 else if (strcmp(command, "off") == 0) {
823 settings.detail_enabled = 0;
824 out_string(c, "OK");
825 }
826 else if (strcmp(command, "dump") == 0) {
827 int len;
828 char *stats = stats_prefix_dump(&len);
829 if (NULL != stats) {
830 c->write_and_free = stats;
831 c->wcurr = stats;
832 c->wbytes = len;
833 conn_set_state(c, conn_write);
834 c->write_and_go = conn_read;
835 }
836 else {
837 out_string(c, "SERVER_ERROR");
838 }
839 }
840 else {
841 out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
842 }
843 }
844
845 static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
846 rel_time_t now = current_time;
847 char *command;
848 char *subcommand;
849
850 assert(c != NULL);
851
852 if(ntokens < 2) {
853 out_string(c, "CLIENT_ERROR bad command line");
854 return;
855 }
856
857 command = tokens[COMMAND_TOKEN].value;
858
859 if (ntokens == 2 && strcmp(command, "stats") == 0) {
860 char temp[1024];
861 pid_t pid = getpid();
862 char *pos = temp;
863
864 #ifndef WIN32
865 struct rusage usage;
866 getrusage(RUSAGE_SELF, &usage);
867 #endif /* !WIN32 */
868
869 STATS_LOCK();
870 pos += sprintf(pos, "STAT pid %u\r\n", pid);
871 pos += sprintf(pos, "STAT uptime %u\r\n", now);
872 pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
873 pos += sprintf(pos, "STAT version " VERSION "\r\n");
874 pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
875 #ifndef WIN32
876 pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
877 pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
878 #endif /* !WIN32 */
879 pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
880 pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
881 pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
882 pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
883 pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
884 pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
885 pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
886 pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
887 pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
888 pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
889 pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
890 pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
891 pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
892 pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
893 pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
894 pos += sprintf(pos, "END");
895 STATS_UNLOCK();
896 out_string(c, temp);
897 return;
898 }
899
900 subcommand = tokens[SUBCOMMAND_TOKEN].value;
901
902 if (strcmp(subcommand, "reset") == 0) {
903 stats_reset();
904 out_string(c, "RESET");
905 return;
906 }
907
908 #ifdef HAVE_MALLOC_H
909 #ifdef HAVE_STRUCT_MALLINFO
910 if (strcmp(subcommand, "malloc") == 0) {
911 char temp[512];
912 struct mallinfo info;
913 char *pos = temp;
914
915 info = mallinfo();
916 pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
917 pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
918 pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
919 pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
920 pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
921 pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
922 pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
923 pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
924 pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
925 pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
926 out_string(c, temp);
927 return;
928 }
929 #endif /* HAVE_STRUCT_MALLINFO */
930 #endif /* HAVE_MALLOC_H */
931
932 #if !defined(WIN32) || !defined(__APPLE__)
933 if (strcmp(subcommand, "maps") == 0) {
934 char *wbuf;
935 int wsize = 8192; /* should be enough */
936 int fd;
937 int res;
938
939 if (!(wbuf = (char *)malloc(wsize))) {
940 out_string(c, "SERVER_ERROR out of memory");
941 return;
942 }
943
944 fd = open("/proc/self/maps", O_RDONLY);
945 if (fd == -1) {
946 out_string(c, "SERVER_ERROR cannot open the maps file");
947 free(wbuf);
948 return;
949 }
950
951 res = read(fd, wbuf, wsize - 6); /* 6 = END\r\n\0 */
952 if (res == wsize - 6) {
953 out_string(c, "SERVER_ERROR buffer overflow");
954 free(wbuf); close(fd);
955 return;
956 }
957 if (res == 0 || res == -1) {
958 out_string(c, "SERVER_ERROR can't read the maps file");
959 free(wbuf); close(fd);
960 return;
961 }
962 memcpy(wbuf + res, "END\r\n", 6);
963 c->write_and_free = wbuf;
964 c->wcurr = wbuf;
965 c->wbytes = res + 5; // Don't write the terminal '\0'
966 conn_set_state(c, conn_write);
967 c->write_and_go = conn_read;
968 close(fd);
969 return;
970 }
971 #endif
972
973 if (strcmp(subcommand, "cachedump") == 0) {
974
975 char *buf;
976 unsigned int bytes, id, limit = 0;
977
978 if(ntokens < 5) {
979 out_string(c, "CLIENT_ERROR bad command line");
980 return;
981 }
982
983 id = strtoul(tokens[2].value, NULL, 10);
984 limit = strtoul(tokens[3].value, NULL, 10);
985
986 if(errno == ERANGE) {
987 out_string(c, "CLIENT_ERROR bad command line format");
988 return;
989 }
990
991 buf = item_cachedump(id, limit, &bytes);
992 if (buf == 0) {
993 out_string(c, "SERVER_ERROR out of memory");
994 return;
995 }
996
997 c->write_and_free = buf;
998 c->wcurr = buf;
999 c->wbytes = bytes;
1000 conn_set_state(c, conn_write);
1001 c->write_and_go = conn_read;
1002 return;
1003 }
1004
1005 if (strcmp(subcommand, "slabs") == 0) {
1006 int bytes = 0;
1007 char *buf = slabs_stats(&bytes);
1008 if (!buf) {
1009 out_string(c, "SERVER_ERROR out of memory");
1010 return;
1011 }
1012 c->write_and_free = buf;
1013 c->wcurr = buf;
1014 c->wbytes = bytes;
1015 conn_set_state(c, conn_write);
1016 c->write_and_go = conn_read;
1017 return;
1018 }
1019
1020 if (strcmp(subcommand, "items") == 0) {
1021 char buffer[4096];
1022 item_stats(buffer, 4096);
1023 out_string(c, buffer);
1024 return;
1025 }
1026
1027 if (strcmp(subcommand, "detail") == 0) {
1028 if (ntokens < 4)
1029 process_stats_detail(c, ""); /* outputs the error message */
1030 else
1031 process_stats_detail(c, tokens[2].value);
1032 return;
1033 }
1034
1035 if (strcmp(subcommand, "sizes") == 0) {
1036 int bytes = 0;
1037 char *buf = item_stats_sizes(&bytes);
1038 if (! buf) {
1039 out_string(c, "SERVER_ERROR out of memory");
1040 return;
1041 }
1042
1043 c->write_and_free = buf;
1044 c->wcurr = buf;
1045 c->wbytes = bytes;
1046 conn_set_state(c, conn_write);
1047 c->write_and_go = conn_read;
1048 return;
1049 }
1050
1051 out_string(c, "ERROR");
1052 }
1053
1054 /* ntokens is overwritten here... shrug.. */
1055 static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens) {
1056 char *key;
1057 size_t nkey;
1058 int i = 0;
1059 item *it;
1060 token_t *key_token = &tokens[KEY_TOKEN];
1061
1062 assert(c != NULL);
1063
1064 if (settings.managed) {
1065 int bucket = c->bucket;
1066 if (bucket == -1) {
1067 out_string(c, "CLIENT_ERROR no BG data in managed mode");
1068 return;
1069 }
1070 c->bucket = -1;
1071 if (buckets[bucket] != c->gen) {
1072 out_string(c, "ERROR_NOT_OWNER");
1073 return;
1074 }
1075 }
1076
1077 do {
1078 while(key_token->length != 0) {
1079
1080 key = key_token->value;
1081 nkey = key_token->length;
1082
1083 if(nkey > KEY_MAX_LENGTH) {
1084 out_string(c, "CLIENT_ERROR bad command line format");
1085 return;
1086 }
1087
1088 STATS_LOCK();
1089 stats.get_cmds++;
1090 STATS_UNLOCK();
1091 it = item_get(key, nkey);
1092 if (settings.detail_enabled) {
1093 stats_prefix_record_get(key, NULL != it);
1094 }
1095 if (it) {
1096 if (i >= c->isize) {
1097 item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
1098 if (new_list) {
1099 c->isize *= 2;
1100 c->ilist = new_list;
1101 } else break;
1102 }
1103
1104 /*
1105 * Construct the response. Each hit adds three elements to the
1106 * outgoing data list:
1107 * "VALUE "
1108 * key
1109 * " " + flags + " " + data length + "\r\n" + data (with \r\n)
1110 */
1111 if (add_iov(c, "VALUE ", 6) != 0 ||
1112 add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1113 add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
1114 {
1115 break;
1116 }
1117 if (settings.verbose > 1)
1118 fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1119
1120 /* item_get() has incremented it->refcount for us */
1121 STATS_LOCK();
1122 stats.get_hits++;
1123 STATS_UNLOCK();
1124 item_update(it);
1125 *(c->ilist + i) = it;
1126 i++;
1127
1128 } else {
1129 STATS_LOCK();
1130 stats.get_misses++;
1131 STATS_UNLOCK();
1132 }
1133
1134 key_token++;
1135 }
1136
1137 /*
1138 * If the command string hasn't been fully processed, get the next set
1139 * of tokens.
1140 */
1141 if(key_token->value != NULL) {
1142 ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
1143 key_token = tokens;
1144 }
1145
1146 } while(key_token->value != NULL);
1147
1148 c->icurr = c->ilist;
1149 c->ileft = i;
1150
1151 if (settings.verbose > 1)
1152 fprintf(stderr, ">%d END\n", c->sfd);
1153 add_iov(c, "END\r\n", 5);
1154
1155 if (c->udp && build_udp_headers(c) != 0) {
1156 out_string(c, "SERVER_ERROR out of memory");
1157 }
1158 else {
1159 conn_set_state(c, conn_mwrite);
1160 c->msgcurr = 0;
1161 }
1162 return;
1163 }
1164
1165 static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm) {
1166 char *key;
1167 size_t nkey;
1168 int flags;
1169 time_t exptime;
1170 int vlen;
1171 item *it;
1172
1173 assert(c != NULL);
1174
1175 if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1176 out_string(c, "CLIENT_ERROR bad command line format");
1177 return;
1178 }
1179
1180 key = tokens[KEY_TOKEN].value;
1181 nkey = tokens[KEY_TOKEN].length;
1182
1183 flags = strtoul(tokens[2].value, NULL, 10);
1184 exptime = strtol(tokens[3].value, NULL, 10);
1185 vlen = strtol(tokens[4].value, NULL, 10);
1186
1187 if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
1188 out_string(c, "CLIENT_ERROR bad command line format");
1189 return;
1190 }
1191
1192 if (settings.detail_enabled) {
1193 stats_prefix_record_set(key);
1194 }
1195
1196 if (settings.managed) {
1197 int bucket = c->bucket;
1198 if (bucket == -1) {
1199 out_string(c, "CLIENT_ERROR no BG data in managed mode");
1200 return;
1201 }
1202 c->bucket = -1;
1203 if (buckets[bucket] != c->gen) {
1204 out_string(c, "ERROR_NOT_OWNER");
1205 return;
1206 }
1207 }
1208
1209 it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1210
1211 if (it == 0) {
1212 if (! item_size_ok(nkey, flags, vlen + 2))
1213 out_string(c, "SERVER_ERROR object too large for cache");
1214 else
1215 out_string(c, "SERVER_ERROR out of memory");
1216 /* swallow the data line */
1217 c->write_and_go = conn_swallow;
1218 c->sbytes = vlen + 2;
1219 return;
1220 }
1221
1222 c->item_comm = comm;
1223 c->item = it;
1224 c->ritem = ITEM_data(it);
1225 c->rlbytes = it->nbytes;
1226 conn_set_state(c, conn_nread);
1227 }
1228
1229 static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const int incr) {
1230 char temp[32];
1231 item *it;
1232 unsigned int delta;
1233 char *key;
1234 size_t nkey;
1235
1236 assert(c != NULL);
1237
1238 if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1239 out_string(c, "CLIENT_ERROR bad command line format");
1240 return;
1241 }
1242
1243 key = tokens[KEY_TOKEN].value;
1244 nkey = tokens[KEY_TOKEN].length;
1245
1246 if (settings.managed) {
1247 int bucket = c->bucket;
1248 if (bucket == -1) {
1249 out_string(c, "CLIENT_ERROR no BG data in managed mode");
1250 return;
1251 }
1252 c->bucket = -1;
1253 if (buckets[bucket] != c->gen) {
1254 out_string(c, "ERROR_NOT_OWNER");
1255 return;
1256 }
1257 }
1258
1259 delta = strtoul(tokens[2].value, NULL, 10);
1260
1261 if(errno == ERANGE) {
1262 out_string(c, "CLIENT_ERROR bad command line format");
1263 return;
1264 }
1265
1266 it = item_get(key, nkey);
1267 if (!it) {
1268 out_string(c, "NOT_FOUND");
1269 return;
1270 }
1271
1272 out_string(c, add_delta(it, incr, delta, temp));
1273 item_remove(it); /* release our reference */
1274 }
1275
1276 /*
1277 * adds a delta value to a numeric item.
1278 *
1279 * it item to adjust
1280 * incr true to increment value, false to decrement
1281 * delta amount to adjust value by
1282 * buf buffer for response string
1283 *
1284 * returns a response string to send back to the client.
1285 */
1286 char *do_add_delta(item *it, const int incr, unsigned int delta, char *buf) {
1287 char *ptr;
1288 unsigned int value;
1289 int res;
1290
1291 ptr = ITEM_data(it);
1292 while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++; // BUG: can't be true
1293
1294 value = strtol(ptr, NULL, 10);
1295
1296 if(errno == ERANGE) {
1297 return "CLIENT_ERROR cannot increment or decrement non-numeric value";
1298 }
1299
1300 if (incr != 0)
1301 value += delta;
1302 else {
1303 if (delta >= value) value = 0;
1304 else value -= delta;
1305 }
1306 snprintf(buf, 32, "%u", value);
1307 res = strlen(buf);
1308 if (res + 2 > it->nbytes) { /* need to realloc */
1309 item *new_it;
1310 new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
1311 if (new_it == 0) {
1312 return "SERVER_ERROR out of memory";
1313 }
1314 memcpy(ITEM_data(new_it), buf, res);
1315 memcpy(ITEM_data(new_it) + res, "\r\n", 3);
1316 do_item_replace(it, new_it);
1317 do_item_remove(new_it); /* release our reference */
1318 } else { /* replace in-place */
1319 memcpy(ITEM_data(it), buf, res);
1320 memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
1321 }
1322
1323 return buf;
1324 }
1325
1326 static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
1327 char *key;
1328 size_t nkey;
1329 item *it;
1330 time_t exptime = 0;
1331
1332 assert(c != NULL);
1333
1334 if (settings.managed) {
1335 int bucket = c->bucket;
1336 if (bucket == -1) {
1337 out_string(c, "CLIENT_ERROR no BG data in managed mode");
1338 return;
1339 }
1340 c->bucket = -1;
1341 if (buckets[bucket] != c->gen) {
1342 out_string(c, "ERROR_NOT_OWNER");
1343 return;
1344 }
1345 }
1346
1347 key = tokens[KEY_TOKEN].value;
1348 nkey = tokens[KEY_TOKEN].length;
1349
1350 if(nkey > KEY_MAX_LENGTH) {
1351 out_string(c, "CLIENT_ERROR bad command line format");
1352 return;
1353 }
1354
1355 if(ntokens == 4) {
1356 exptime = strtol(tokens[2].value, NULL, 10);
1357
1358 if(errno == ERANGE) {
1359 out_string(c, "CLIENT_ERROR bad command line format");
1360 return;
1361 }
1362 }
1363
1364 if (settings.detail_enabled) {
1365 stats_prefix_record_delete(key);
1366 }
1367
1368 it = item_get(key, nkey);
1369 if (it) {
1370 if (exptime == 0) {
1371 item_unlink(it);
1372 item_remove(it); /* release our reference */
1373 out_string(c, "DELETED");
1374 } else {
1375 /* our reference will be transfered to the delete queue */
1376 out_string(c, defer_delete(it, exptime));
1377 }
1378 } else {
1379 out_string(c, "NOT_FOUND");
1380 }
1381 }
1382
1383 /*
1384 * Adds an item to the deferred-delete list so it can be reaped later.
1385 *
1386 * Returns the result to send to the client.
1387 */
1388 char *do_defer_delete(item *it, time_t exptime)
1389 {
1390 if (delcurr >= deltotal) {
1391 item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
1392 if (new_delete) {
1393 todelete = new_delete;
1394 deltotal *= 2;
1395 } else {
1396 /*
1397 * can't delete it immediately, user wants a delay,
1398 * but we ran out of memory for the delete queue
1399 */
1400 item_remove(it); /* release reference */
1401 return "SERVER_ERROR out of memory";
1402 }
1403 }
1404
1405 /* use its expiration time as its deletion time now */
1406 it->exptime = realtime(exptime);
1407 it->it_flags |= ITEM_DELETED;
1408 todelete[delcurr++] = it;
1409
1410 return "DELETED";
1411 }
1412
1413 static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
1414 unsigned int level;
1415
1416 assert(c != NULL);
1417
1418 level = strtoul(tokens[1].value, NULL, 10);
1419 settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
1420 out_string(c, "OK");
1421 return;
1422 }
1423
1424 static void process_command(conn *c, char *command) {
1425
1426 token_t tokens[MAX_TOKENS];
1427 size_t ntokens;
1428 int comm;
1429
1430 assert(c != NULL);
1431
1432 if (settings.verbose > 1)
1433 fprintf(stderr, "<%d %s\n", c->sfd, command);
1434
1435 /*
1436 * for commands set/add/replace, we build an item and read the data
1437 * directly into it, then continue in nread_complete().
1438 */
1439
1440 c->msgcurr = 0;
1441 c->msgused = 0;
1442 c->iovused = 0;
1443 if (add_msghdr(c) != 0) {
1444 out_string(c, "SERVER_ERROR out of memory");
1445 return;
1446 }
1447
1448 ntokens = tokenize_command(command, tokens, MAX_TOKENS);
1449
1450 if (ntokens >= 3 &&
1451 ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
1452 (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
1453
1454 process_get_command(c, tokens, ntokens);
1455
1456 } else if (ntokens == 6 &&
1457 ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
1458 (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
1459 (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)))) {
1460
1461 process_update_command(c, tokens, ntokens, comm);
1462
1463 } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
1464
1465 process_arithmetic_command(c, tokens, ntokens, 1);
1466
1467 } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
1468
1469 process_arithmetic_command(c, tokens, ntokens, 0);
1470
1471 } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
1472
1473 process_delete_command(c, tokens, ntokens);
1474
1475 } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
1476 unsigned int bucket, gen;
1477 if (!settings.managed) {
1478 out_string(c, "CLIENT_ERROR not a managed instance");
1479 return;
1480 }
1481
1482 if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
1483 if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1484 out_string(c, "CLIENT_ERROR bucket number out of range");
1485 return;
1486 }
1487 buckets[bucket] = gen;
1488 out_string(c, "OWNED");
1489 return;
1490 } else {
1491 out_string(c, "CLIENT_ERROR bad format");
1492 return;
1493 }
1494
1495 } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
1496
1497 int bucket;
1498 if (!settings.managed) {
1499 out_string(c, "CLIENT_ERROR not a managed instance");
1500 return;
1501 }
1502 if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
1503 if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1504 out_string(c, "CLIENT_ERROR bucket number out of range");
1505 return;
1506 }
1507 buckets[bucket] = 0;
1508 out_string(c, "DISOWNED");
1509 return;
1510 } else {
1511 out_string(c, "CLIENT_ERROR bad format");
1512 return;
1513 }
1514
1515 } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
1516 int bucket, gen;
1517 if (!settings.managed) {
1518 out_string(c, "CLIENT_ERROR not a managed instance");
1519 return;
1520 }
1521 if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
1522 /* we never write anything back, even if input's wrong */
1523 if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
1524 /* do nothing, bad input */
1525 } else {
1526 c->bucket = bucket;
1527 c->gen = gen;
1528 }
1529 conn_set_state(c, conn_read);
1530 return;
1531 } else {
1532 out_string(c, "CLIENT_ERROR bad format");
1533 return;
1534 }
1535
1536 } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
1537
1538 process_stat(c, tokens, ntokens);
1539
1540 } else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
1541 time_t exptime = 0;
1542 set_current_time();
1543
1544 if(ntokens == 2) {
1545 settings.oldest_live = current_time - 1;
1546 item_flush_expired();
1547 out_string(c, "OK");
1548 return;
1549 }
1550
1551 exptime = strtol(tokens[1].value, NULL, 10);
1552 if(errno == ERANGE) {
1553 out_string(c, "CLIENT_ERROR bad command line format");
1554 return;
1555 }
1556
1557 settings.oldest_live = realtime(exptime) - 1;
1558 item_flush_expired();
1559 out_string(c, "OK");
1560 return;
1561
1562 } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
1563
1564 out_string(c, "VERSION " VERSION);
1565
1566 } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
1567
1568 conn_set_state(c, conn_closing);
1569
1570 } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
1571 strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
1572 #ifdef ALLOW_SLABS_REASSIGN
1573
1574 int src, dst, rv;
1575
1576 src = strtol(tokens[2].value, NULL, 10);
1577 dst = strtol(tokens[3].value, NULL, 10);
1578
1579 if(errno == ERANGE) {
1580 out_string(c, "CLIENT_ERROR bad command line format");
1581 return;
1582 }
1583
1584 rv = slabs_reassign(src, dst);
1585 if (rv == 1) {
1586 out_string(c, "DONE");
1587 return;
1588 }
1589 if (rv == 0) {
1590 out_string(c, "CANT");
1591 return;
1592 }
1593 if (rv == -1) {
1594 out_string(c, "BUSY");
1595 return;
1596 }
1597 #else
1598 out_string(c, "CLIENT_ERROR Slab reassignment not supported");
1599 #endif
1600 } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
1601 process_verbosity_command(c, tokens, ntokens);
1602 } else {
1603 out_string(c, "ERROR");
1604 }
1605 return;
1606 }
1607
1608 /*
1609 * if we have a complete line in the buffer, process it.
1610 */
1611 static int try_read_command(conn *c) {
1612 char *el, *cont;
1613
1614 assert(c != NULL);
1615 assert(c->rcurr <= (c->rbuf + c->rsize));
1616
1617 if (c->rbytes == 0)
1618 return 0;
1619 el = memchr(c->rcurr, '\n', c->rbytes);
1620 if (!el)
1621 return 0;
1622 cont = el + 1;
1623 if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
1624 el--;
1625 }
1626 *el = '\0';
1627
1628 assert(cont <= (c->rcurr + c->rbytes));
1629
1630 process_command(c, c->rcurr);
1631
1632 c->rbytes -= (cont - c->rcurr);
1633 c->rcurr = cont;
1634
1635 assert(c->rcurr <= (c->rbuf + c->rsize));
1636
1637 return 1;
1638 }
1639
1640 /*
1641 * read a UDP request.
1642 * return 0 if there's nothing to read.
1643 */
1644 static int try_read_udp(conn *c) {
1645 int res;
1646
1647 assert(c != NULL);
1648
1649 c->request_addr_size = sizeof(c->request_addr);
1650 res = recvfrom(c->sfd, c->rbuf, c->rsize,
1651 0, &c->request_addr, &c->request_addr_size);
1652 if (res > 8) {
1653 unsigned char *buf = (unsigned char *)c->rbuf;
1654 STATS_LOCK();
1655 stats.bytes_read += res;
1656 STATS_UNLOCK();
1657
1658 /* Beginning of UDP packet is the request ID; save it. */
1659 c->request_id = buf[0] * 256 + buf[1];
1660
1661 /* If this is a multi-packet request, drop it. */
1662 if (buf[4] != 0 || buf[5] != 1) {
1663 out_string(c, "SERVER_ERROR multi-packet request not supported");
1664 return 0;
1665 }
1666
1667 /* Don't care about any of the rest of the header. */
1668 res -= 8;
1669 memmove(c->rbuf, c->rbuf + 8, res);
1670
1671 c->rbytes += res;
1672 c->rcurr = c->rbuf;
1673 return 1;
1674 }
1675 return 0;
1676 }
1677
1678 /*
1679 * read from network as much as we can, handle buffer overflow and connection
1680 * close.
1681 * before reading, move the remaining incomplete fragment of a command
1682 * (if any) to the beginning of the buffer.
1683 * return 0 if there's nothing to read on the first read.
1684 */
1685 static int try_read_network(conn *c) {
1686 int gotdata = 0;
1687 int res;
1688
1689 assert(c != NULL);
1690
1691 if (c->rcurr != c->rbuf) {
1692 if (c->rbytes != 0) /* otherwise there's nothing to copy */
1693 memmove(c->rbuf, c->rcurr, c->rbytes);
1694 c->rcurr = c->rbuf;
1695 }
1696
1697 while (1) {
1698 if (c->rbytes >= c->rsize) {
1699 char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
1700 if (!new_rbuf) {
1701 if (settings.verbose > 0)
1702 fprintf(stderr, "Couldn't realloc input buffer\n");
1703 c->rbytes = 0; /* ignore what we read */
1704 out_string(c, "SERVER_ERROR out of memory");
1705 c->write_and_go = conn_closing;
1706 return 1;
1707 }
1708 c->rcurr = c->rbuf = new_rbuf;
1709 c->rsize *= 2;
1710 }
1711
1712 /* unix socket mode doesn't need this, so zeroed out. but why
1713 * is this done for every command? presumably for UDP
1714 * mode. */
1715 if (!settings.socketpath) {
1716 c->request_addr_size = sizeof(c->request_addr);
1717 } else {
1718 c->request_addr_size = 0;
1719 }
1720
1721 res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
1722 if (res > 0) {
1723 STATS_LOCK();
1724 stats.bytes_read += res;
1725 STATS_UNLOCK();
1726 gotdata = 1;
1727 c->rbytes += res;
1728 continue;
1729 }
1730 if (res == 0) {
1731 /* connection closed */
1732 conn_set_state(c, conn_closing);
1733 return 1;
1734 }
1735 if (res == -1) {
1736 if (errno == EAGAIN || errno == EWOULDBLOCK) break;
1737 else return 0;
1738 }
1739 }
1740 return gotdata;
1741 }
1742
1743 static bool update_event(conn *c, const int new_flags) {
1744 assert(c != NULL);
1745
1746 struct event_base *base = c->event.ev_base;
1747 if (c->ev_flags == new_flags)
1748 return true;
1749 if (event_del(&c->event) == -1) return false;
1750 event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
1751 event_base_set(base, &c->event);
1752 c->ev_flags = new_flags;
1753 if (event_add(&c->event, 0) == -1) return false;
1754 return true;
1755 }
1756
1757 /*
1758 * Sets whether we are listening for new connections or not.
1759 */
1760 void accept_new_conns(const bool do_accept) {
1761 if (! is_listen_thread())
1762 return;
1763 if (do_accept) {
1764 update_event(listen_conn, EV_READ | EV_PERSIST);
1765 if (listen(listen_conn->sfd, 1024) != 0) {
1766 perror("listen");
1767 }
1768 }
1769 else {
1770 update_event(listen_conn, 0);
1771 if (listen(listen_conn->sfd, 0) != 0) {
1772 perror("listen");
1773 }
1774 }
1775 }
1776
1777
1778 /*
1779 * Transmit the next chunk of data from our list of msgbuf structures.
1780 *
1781 * Returns:
1782 * TRANSMIT_COMPLETE All done writing.
1783 * TRANSMIT_INCOMPLETE More data remaining to write.
1784 * TRANSMIT_SOFT_ERROR Can't write any more right now.
1785 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
1786 */
1787 static int transmit(conn *c) {
1788 assert(c != NULL);
1789
1790 if (c->msgcurr < c->msgused &&
1791 c->msglist[c->msgcurr].msg_iovlen == 0) {
1792 /* Finished writing the current msg; advance to the next. */
1793 c->msgcurr++;
1794 }
1795 if (c->msgcurr < c->msgused) {
1796 ssize_t res;
1797 struct msghdr *m = &c->msglist[c->msgcurr];
1798
1799 res = sendmsg(c->sfd, m, 0);
1800 if (res > 0) {
1801 STATS_LOCK();
1802 stats.bytes_written += res;
1803 STATS_UNLOCK();
1804
1805 /* We've written some of the data. Remove the completed
1806 iovec entries from the list of pending writes. */
1807 while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
1808 res -= m->msg_iov->iov_len;
1809 m->msg_iovlen--;
1810 m->msg_iov++;
1811 }
1812
1813 /* Might have written just part of the last iovec entry;
1814 adjust it so the next write will do the rest. */
1815 if (res > 0) {
1816 m->msg_iov->iov_base += res;
1817 m->msg_iov->iov_len -= res;
1818 }
1819 return TRANSMIT_INCOMPLETE;
1820 }
1821 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
1822 if (!update_event(c, EV_WRITE | EV_PERSIST)) {
1823 if (settings.verbose > 0)
1824 fprintf(stderr, "Couldn't update event\n");
1825 conn_set_state(c, conn_closing);
1826 return TRANSMIT_HARD_ERROR;
1827 }
1828 return TRANSMIT_SOFT_ERROR;
1829 }
1830 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
1831 we have a real error, on which we close the connection */
1832 if (settings.verbose > 0)
1833 perror("Failed to write, and not due to blocking");
1834
1835 if (c->udp)
1836 conn_set_state(c, conn_read);
1837 else
1838 conn_set_state(c, conn_closing);
1839 return TRANSMIT_HARD_ERROR;
1840 } else {
1841 return TRANSMIT_COMPLETE;
1842 }
1843 }
1844
/*
 * Per-connection state machine, run by event_handler() whenever libevent
 * reports activity on the connection's socket.  It keeps stepping through
 * states until a step sets `stop` (the socket would block, or the
 * connection has been closed/cleaned up).
 *
 * States handled here:
 *   conn_listening  accept() a client, make it non-blocking, and hand it to
 *                   dispatch_conn_new() in state conn_read
 *   conn_read       process a buffered command line, else read more input
 *   conn_nread      copy rlbytes of item data into ritem, then call
 *                   complete_nread()
 *   conn_swallow    read and discard sbytes of input, then go to conn_read
 *   conn_write      queue the simple response (wcurr/wbytes), then fall
 *                   through to conn_mwrite
 *   conn_mwrite     transmit the msghdr list; when done, drop the item
 *                   references held in ilist (mwrite) or free
 *                   write_and_free and move to write_and_go (write)
 *   conn_closing    conn_cleanup() for UDP, conn_close() otherwise
 *
 * NOTE(review): the switch has no default case; an unexpected state would
 * loop forever here.
 */
static void drive_machine(conn *c) {
    bool stop = false;
    int sfd, flags = 1;
    socklen_t addrlen;
    struct sockaddr addr;
    int res;

    assert(c != NULL);

    while (!stop) {

        switch(c->state) {
        case conn_listening:
            addrlen = sizeof(addr);
            if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    /* these are transient, so don't log anything */
                    stop = true;
                } else if (errno == EMFILE) {
                    /* out of descriptors: stop accepting until some close */
                    if (settings.verbose > 0)
                        fprintf(stderr, "Too many open connections\n");
                    accept_new_conns(false);
                    stop = true;
                } else {
                    perror("accept()");
                    stop = true;
                }
                break;
            }
            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
                perror("setting O_NONBLOCK");
                close(sfd);
                break;
            }
            /* the new client starts in conn_read, waiting for a command */
            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, false);
            break;

        case conn_read:
            /* a complete command line was already buffered */
            if (try_read_command(c) != 0) {
                continue;
            }
            /* got new input (or the state was changed by the read) */
            if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) {
                continue;
            }
            /* we have no command line and no data to read from network */
            if (!update_event(c, EV_READ | EV_PERSIST)) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't update event\n");
                conn_set_state(c, conn_closing);
                break;
            }
            stop = true;
            break;

        case conn_nread:
            /* we are reading rlbytes into ritem; */
            if (c->rlbytes == 0) {
                complete_nread(c);
                break;
            }
            /* first check if we have leftovers in the conn_read buffer */
            if (c->rbytes > 0) {
                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
                memcpy(c->ritem, c->rcurr, tocopy);
                c->ritem += tocopy;
                c->rlbytes -= tocopy;
                c->rcurr += tocopy;
                c->rbytes -= tocopy;
                break;
            }

            /*  now try reading from the socket */
            res = read(c->sfd, c->ritem, c->rlbytes);
            if (res > 0) {
                STATS_LOCK();
                stats.bytes_read += res;
                STATS_UNLOCK();
                c->ritem += res;
                c->rlbytes -= res;
                break;
            }
            if (res == 0) { /* end of stream */
                conn_set_state(c, conn_closing);
                break;
            }
            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                /* nothing available yet: wait for the next read event */
                if (!update_event(c, EV_READ | EV_PERSIST)) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't update event\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
                stop = true;
                break;
            }
            /* otherwise we have a real error, on which we close the connection */
            if (settings.verbose > 0)
                fprintf(stderr, "Failed to read, and not due to blocking\n");
            conn_set_state(c, conn_closing);
            break;

        case conn_swallow:
            /* we are reading sbytes and throwing them away */
            if (c->sbytes == 0) {
                conn_set_state(c, conn_read);
                break;
            }

            /* first check if we have leftovers in the conn_read buffer */
            if (c->rbytes > 0) {
                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
                c->sbytes -= tocopy;
                c->rcurr += tocopy;
                c->rbytes -= tocopy;
                break;
            }

            /*  now try reading from the socket */
            /* rbuf is used as scratch space; at most sbytes are consumed */
            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
            if (res > 0) {
                STATS_LOCK();
                stats.bytes_read += res;
                STATS_UNLOCK();
                c->sbytes -= res;
                break;
            }
            if (res == 0) { /* end of stream */
                conn_set_state(c, conn_closing);
                break;
            }
            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                if (!update_event(c, EV_READ | EV_PERSIST)) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't update event\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
                stop = true;
                break;
            }
            /* otherwise we have a real error, on which we close the connection */
            if (settings.verbose > 0)
                fprintf(stderr, "Failed to read, and not due to blocking\n");
            conn_set_state(c, conn_closing);
            break;

        case conn_write:
            /*
             * We want to write out a simple response. If we haven't already,
             * assemble it into a msgbuf list (this will be a single-entry
             * list for TCP or a two-entry list for UDP).
             */
            if (c->iovused == 0 || (c->udp && c->iovused == 1)) {
                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
                    (c->udp && build_udp_headers(c) != 0)) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't build response\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
            }

            /* fall through... */

        case conn_mwrite:
            switch (transmit(c)) {
            case TRANSMIT_COMPLETE:
                if (c->state == conn_mwrite) {
                    /* release the references taken on the items we sent */
                    while (c->ileft > 0) {
                        item *it = *(c->icurr);
                        assert((it->it_flags & ITEM_SLABBED) == 0);
                        item_remove(it);
                        c->icurr++;
                        c->ileft--;
                    }
                    conn_set_state(c, conn_read);
                } else if (c->state == conn_write) {
                    if (c->write_and_free) {
                        free(c->write_and_free);
                        c->write_and_free = 0;
                    }
                    conn_set_state(c, c->write_and_go);
                } else {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Unexpected state %d\n", c->state);
                    conn_set_state(c, conn_closing);
                }
                break;

            case TRANSMIT_INCOMPLETE:
            case TRANSMIT_HARD_ERROR:
                break;                   /* Continue in state machine. */

            case TRANSMIT_SOFT_ERROR:
                /* transmit() already re-armed the event for EV_WRITE */
                stop = true;
                break;
            }
            break;

        case conn_closing:
            if (c->udp)
                conn_cleanup(c);
            else
                conn_close(c);
            stop = true;
            break;
        }
    }

    return;
}
2058
2059 void event_handler(const int fd, const short which, void *arg) {
2060 conn *c;
2061
2062 c = (conn *)arg;
2063 assert(c != NULL);
2064
2065 c->which = which;
2066
2067 /* sanity */
2068 if (fd != c->sfd) {
2069 if (settings.verbose > 0)
2070 fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
2071 conn_close(c);
2072 return;
2073 }
2074
2075 drive_machine(c);
2076
2077 /* wait for next event */
2078 return;
2079 }
2080
/*
 * Creates a non-blocking IPv4 socket: datagram when is_udp is true,
 * stream otherwise.
 *
 * Returns the descriptor, or -1 (after printing the reason with perror)
 * if the socket cannot be created or switched to O_NONBLOCK.
 */
static int new_socket(const bool is_udp) {
    const int type = is_udp ? SOCK_DGRAM : SOCK_STREAM;
    int fd;
    int fl;

    fd = socket(AF_INET, type, 0);
    if (fd == -1) {
        perror("socket()");
        return -1;
    }

    fl = fcntl(fd, F_GETFL, 0);
    if (fl < 0 || fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0) {
        perror("setting O_NONBLOCK");
        close(fd);
        return -1;
    }

    return fd;
}
2098
2099
2100 /*
2101 * Sets a socket's send buffer size to the maximum allowed by the system.
2102 */
2103 static void maximize_sndbuf(const int sfd) {
2104 socklen_t intsize = sizeof(int);
2105 int last_good = 0;
2106 int min, max, avg;
2107 int old_size;
2108
2109 /* Start with the default size. */
2110 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
2111 if (settings.verbose > 0)
2112 perror("getsockopt(SO_SNDBUF)");
2113 return;
2114 }
2115
2116 /* Binary-search for the real maximum. */
2117 min = old_size;
2118 max = MAX_SENDBUF_SIZE;
2119
2120 while (min <= max) {
2121 avg = ((unsigned int)(min + max)) / 2;
2122 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
2123 last_good = avg;
2124 min = avg + 1;
2125 } else {
2126 max = avg - 1;
2127 }
2128 }
2129
2130 if (settings.verbose > 1)
2131 fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
2132 }
2133
2134
2135 static int server_socket(const int port, const bool is_udp) {
2136 int sfd;
2137 struct linger ling = {0, 0};
2138 struct sockaddr_in addr;
2139 int flags =1;
2140
2141 if ((sfd = new_socket(is_udp)) == -1) {
2142 return -1;
2143 }
2144
2145 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
2146 if (is_udp) {
2147 maximize_sndbuf(sfd);
2148 } else {
2149 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
2150 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
2151 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
2152 }
2153
2154 /*
2155 * the memset call clears nonstandard fields in some impementations
2156 * that otherwise mess things up.
2157 */
2158 memset(&addr, 0, sizeof(addr));
2159
2160 addr.sin_family = AF_INET;
2161 addr.sin_port = htons(port);
2162 addr.sin_addr = settings.interf;
2163 if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
2164 perror("bind()");
2165 close(sfd);
2166 return -1;
2167 }
2168 if (!is_udp && listen(sfd, 1024) == -1) {
2169 perror("listen()");
2170 close(sfd);
2171 return -1;
2172 }
2173 return sfd;
2174 }
2175
/*
 * Creates a non-blocking unix-domain stream socket.
 *
 * Returns the descriptor, or -1 (after perror) if the socket cannot be
 * created or switched to O_NONBLOCK.
 */
static int new_socket_unix(void) {
    int fd;
    int fl;

    fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd == -1) {
        perror("socket()");
        return -1;
    }

    fl = fcntl(fd, F_GETFL, 0);
    if (fl < 0 || fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0) {
        perror("setting O_NONBLOCK");
        close(fd);
        return -1;
    }

    return fd;
}
2193
/*
 * Opens a listening unix-domain socket at `path`.
 *
 * A leftover socket file at `path` (from a previous run) is unlinked first;
 * non-socket files are left alone, so bind() will fail on them.  The socket
 * gets SO_REUSEADDR, SO_KEEPALIVE and a zeroed SO_LINGER (best effort) and
 * listens with a backlog of 1024.
 *
 * Returns the descriptor, or -1 if path is NULL, too long for sun_path, or
 * any socket call fails.
 */
static int server_socket_unix(const char *path) {
    int sfd;
    struct linger ling = {0, 0};
    struct sockaddr_un addr;
    struct stat tstat;
    int flags =1;
    size_t path_len;

    if (!path) {
        return -1;
    }

    /* sun_path is a fixed-size array; refuse paths that would not fit
       (including the terminating NUL) rather than overflowing it. */
    path_len = strlen(path);
    if (path_len >= sizeof(addr.sun_path)) {
        fprintf(stderr, "unix socket path too long: %s\n", path);
        return -1;
    }

    if ((sfd = new_socket_unix()) == -1) {
        return -1;
    }

    /*
     * Clean up a previous socket file if we left it around
     */
    if (lstat(path, &tstat) == 0) {
        if (S_ISSOCK(tstat.st_mode))
            unlink(path);
    }

    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
    setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));

    /*
     * the memset call clears nonstandard fields in some implementations
     * that otherwise mess things up.
     */
    memset(&addr, 0, sizeof(addr));

    addr.sun_family = AF_UNIX;
    memcpy(addr.sun_path, path, path_len + 1);
    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("bind()");
        close(sfd);
        return -1;
    }
    if (listen(sfd, 1024) == -1) {
        perror("listen()");
        close(sfd);
        return -1;
    }
    return sfd;
}
2241
/* Descriptor of the listening socket. */
static int l_socket = 0;

/* Descriptor of the UDP socket; pre_gdb() only closes it when it is > -1,
   so -1 stands for "not open". */
static int u_socket = -1;
2247
2248 /* invoke right before gdb is called, on assert */
2249 void pre_gdb(void) {
2250 int i;
2251 if (l_socket > -1) close(l_socket);
2252 if (u_socket > -1) close(u_socket);
2253 for (i = 3; i <= 500; i++) close(i); /* so lame */
2254 kill(getpid(), SIGABRT);
2255 }
2256
/*
 * We keep the current time of day in a global variable that's updated by a
 * timer event. This saves us a bunch of time() system calls (we really only
 * need to get the time once a second, whereas there can be tens of thousands
 * of requests a second) and allows us to use server-start-relative timestamps
 * rather than absolute UNIX timestamps, a space savings on systems where
 * sizeof(time_t) > sizeof(unsigned int).
 */
/* Seconds since stats.started, as computed by set_current_time().
   NOTE(review): volatile does not synchronize access between threads;
   confirm that readers on worker threads tolerate a torn or stale value. */
volatile rel_time_t current_time;
/* Timer event that clock_handler() re-arms on every tick. */
static struct event clockevent;
2267
2268 /* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */
2269 static void set_current_time(void) {
2270 current_time = (rel_time_t) (time(0) - stats.started);
2271 }
2272
2273 static void clock_handler(const int fd, const short which, void *arg) {
2274 struct timeval t = {.tv_sec = 1, .tv_usec = 0};
2275 static bool initialized = false;
2276
2277 if (initialized) {
2278 /* only delete the event if it's actually there. */
2279 evtimer_del(&clockevent);
2280 } else {
2281 initialized = true;
2282 }
2283
2284 evtimer_set(&clockevent, clock_handler, 0);
2285 event_base_set(main_base, &clockevent);
2286 evtimer_add(&clockevent, &t);
2287
2288 set_current_time();
2289 }
2290
2291 static struct event deleteevent;
2292
2293 static void delete_handler(const int fd, const short which, void *arg) {
2294 struct timeval t = {.tv_sec = 5, .tv_usec = 0};
2295 static bool initialized = false;
2296
2297 if (initialized) {
2298 /* some versions of libevent don't like deleting events that don't exist,
2299 so only delete once we know this event has been added. */
2300 evtimer_del(&deleteevent);
2301 } else {
2302 initialized = true;
2303 }
2304
2305 evtimer_set(&deleteevent, delete_handler, 0);
2306 event_base_set(main_base, &deleteevent);
2307 evtimer_add(&deleteevent, &t);
2308 run_deferred_deletes();
2309 }
2310
2311 /* Call run_deferred_deletes instead of this. */
2312 void do_run_deferred_deletes(void)
2313 {
2314 int i, j = 0;
2315
2316 for (i = 0; i < delcurr; i++) {
2317 item *it = todelete[i];
2318 if (item_delete_lock_over(it)) {
2319 assert(it->refcount > 0);
2320 it->it_flags &= ~ITEM_DELETED;
2321 do_item_unlink(it);
2322 do_item_remove(it);
2323 } else {
2324 todelete[j++] = it;
2325 }
2326 }
2327 delcurr = j;
2328 }
2329
2330 static void usage(void) {
2331 printf(PACKAGE " " VERSION "\n");
2332 printf("-p <num> TCP port number to listen on (default: 11211)\n"
2333 "-U <num> UDP port number to listen on (default: 0, off)\n"
2334 "-s <file> unix socket path to listen on (disables network support)\n"
2335 "-l <ip_addr> interface to listen on, default is INDRR_ANY\n"
2336 "-d run as a daemon\n"
2337 "-r maximize core file limit\n"
2338 "-u <username> assume identity of <username> (only when run as root)\n"
2339 "-m <num> max memory to use for items in megabytes, default is 64 MB\n"
2340 "-M return error on memory exhausted (rather than removing items)\n"
2341 "-c <num> max simultaneous connections, default is 1024\n"
2342 "-k lock down all paged memory\n"
2343 "-v verbose (print errors/warnings while in event loop)\n"
2344 "-vv very verbose (also print client commands/reponses)\n"
2345 "-h print this help and exit\n"
2346 "-i print memcached and libevent license\n"
2347 "-b run a managed instanced (mnemonic: buckets)\n"
2348 "-P <file> save PID in <file>, only used with -d option\n"
2349 "-f <factor> chunk size growth factor, default 1.25\n"
2350 "-n <bytes> minimum space allocated for key+value+flags, default 48\n");
2351 #ifdef USE_THREADS
2352 printf("-t <num> number of threads to use, default 4\n");
2353 #endif
2354 return;
2355 }
2356
2357 static void usage_license(void) {
2358 printf(PACKAGE " " VERSION "\n\n");
2359 printf(
2360 "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
2361 "All rights reserved.\n"
2362 "\n"
2363 "Redistribution and use in source and binary forms, with or without\n"
2364 "modification, are permitted provided that the following conditions are\n"
2365 "met:\n"
2366 "\n"
2367 " * Redistributions of source code must retain the above copyright\n"
2368 "notice, this list of conditions and the following disclaimer.\n"
2369 "\n"
2370 " * Redistributions in binary form must reproduce the above\n"
2371 "copyright notice, this list of conditions and the following disclaimer\n"
2372 "in the documentation and/or other materials provided with the\n"
2373 "distribution.\n"
2374 "\n"
2375 " * Neither the name of the Danga Interactive nor the names of its\n"
2376 "contributors may be used to endorse or promote products derived from\n"
2377 "this software without specific prior written permission.\n"
2378 "\n"
2379 "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
2380 "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
2381 "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
2382 "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
2383 "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
2384 "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
2385 "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2386 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2387 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2388 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
2389 "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2390 "\n"
2391 "\n"
2392 "This product includes software developed by Niels Provos.\n"
2393 "\n"
2394 "[ libevent ]\n"
2395 "\n"
2396 "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
2397 "All rights reserved.\n"
2398 "\n"
2399 "Redistribution and use in source and binary forms, with or without\n"
2400 "modification, are permitted provided that the following conditions\n"
2401 "are met:\n"
2402 "1. Redistributions of source code must retain the above copyright\n"
2403 " notice, this list of conditions and the following disclaimer.\n"
2404 "2. Redistributions in binary form must reproduce the above copyright\n"
2405 " notice, this list of conditions and the following disclaimer in the\n"
2406 " documentation and/or other materials provided with the distribution.\n"
2407 "3. All advertising materials mentioning features or use of this software\n"
2408 " must display the following acknowledgement:\n"
2409 " This product includes software developed by Niels Provos.\n"
2410 "4. The name of the author may not be used to endorse or promote products\n"
2411 " derived from this software without specific prior written permission.\n"
2412 "\n"
2413 "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
2414 "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
2415 "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
2416 "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
2417 "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
2418 "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2419 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2420 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2421 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
2422 "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2423 );
2424
2425 return;
2426 }
2427
/*
 * Write `pid` as a decimal number followed by a newline to `pid_file`,
 * truncating any previous contents. A NULL pid_file means no PID file was
 * requested and makes this a no-op. Failures to open, write or close the
 * file are reported on stderr (with the errno reason) but are not fatal.
 */
static void save_pid(const pid_t pid, const char *pid_file) {
    FILE *fp;

    if (pid_file == NULL)
        return;

    if ((fp = fopen(pid_file, "w")) == NULL) {
        fprintf(stderr, "Could not open the pid file %s for writing: %s\n",
                pid_file, strerror(errno));
        return;
    }

    /* A failed write must not skip the fclose() below, or fp would leak. */
    if (fprintf(fp, "%ld\n", (long)pid) < 0) {
        fprintf(stderr, "Could not write the pid file %s: %s\n",
                pid_file, strerror(errno));
    }

    /* fclose() flushes the buffered PID, so deferred write errors surface
       here; it returns EOF (not necessarily -1) on failure. */
    if (fclose(fp) != 0) {
        fprintf(stderr, "Could not close the pid file %s: %s\n",
                pid_file, strerror(errno));
    }
}
2444
/*
 * Delete the PID file written by save_pid(). A NULL path is ignored; an
 * unlink() failure is reported on stderr and otherwise ignored.
 */
static void remove_pidfile(const char *pid_file) {
    int removal_failed = (pid_file != NULL) && (unlink(pid_file) != 0);

    if (removal_failed)
        fprintf(stderr, "Could not remove the pid file %s.\n", pid_file);
}
2454
2455
/*
 * SIGINT handler (installed by main()): announce the signal on standard
 * output and terminate the process with EXIT_SUCCESS.
 *
 * Only async-signal-safe calls are used here. printf() and exit() are not
 * safe in a signal handler: if the signal interrupts stdio, taking the
 * stream lock again can deadlock, and exit() also runs atexit handlers
 * and flushes stdio. write() and _exit() are on the POSIX list of
 * async-signal-safe functions.
 */
static void sig_handler(const int sig) {
    static const char msg[] = "SIGINT handled.\n";
    const int stdout_fd = 1; /* STDOUT_FILENO */

    (void)sig;
    if (write(stdout_fd, msg, sizeof(msg) - 1) < 0) {
        /* nothing async-signal-safe left to do about a failed write */
    }
    _exit(EXIT_SUCCESS);
}
2460
2461 int main (int argc, char **argv) {
2462 int c;
2463 struct in_addr addr;
2464 bool lock_memory = false;
2465 bool daemonize = false;
2466 int maxcore = 0;
2467 char *username = NULL;
2468 char *pid_file = NULL;
2469 struct passwd *pw;
2470 struct sigaction sa;
2471 struct rlimit rlim;
2472
2473 /* handle SIGINT */
2474 signal(SIGINT, sig_handler);
2475
2476 /* init settings */
2477 settings_init();
2478
2479 /* set stderr non-buffering (for running under, say, daemontools) */
2480 setbuf(stderr, NULL);
2481
2482 /* process arguments */
2483 while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) {
2484 switch (c) {
2485 case 'U':
2486 settings.udpport = atoi(optarg);
2487 break;
2488 case 'b':
2489 settings.managed = true;
2490 break;
2491 case 'p':
2492 settings.port = atoi(optarg);
2493 break;
2494 case 's':
2495 settings.socketpath = optarg;
2496 break;
2497 case 'm':
2498 settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
2499 break;
2500 case 'M':
2501 settings.evict_to_free = 0;
2502 break;
2503 case 'c':
2504 settings.maxconns = atoi(optarg);
2505 break;
2506 case 'h':
2507 usage();
2508 exit(EXIT_SUCCESS);
2509 case 'i':
2510 usage_license();
2511 exit(EXIT_SUCCESS);
2512 case 'k':
2513 lock_memory = true;
2514 break;
2515 case 'v':
2516 settings.verbose++;
2517 break;
2518 case 'l':
2519 if (inet_pton(AF_INET, optarg, &addr) <= 0) {
2520 fprintf(stderr, "Illegal address: %s\n", optarg);
2521 return 1;
2522 } else {
2523 settings.interf = addr;
2524 }
2525 break;
2526 case 'd':
2527 daemonize = true;
2528 break;
2529 case 'r':
2530 maxcore = 1;
2531 break;
2532 case 'u':
2533 username = optarg;
2534 break;
2535 case 'P':
2536 pid_file = optarg;
2537 break;
2538 case 'f':
2539 settings.factor = atof(optarg);
2540 if (settings.factor <= 1.0) {
2541 fprintf(stderr, "Factor must be greater than 1\n");
2542 return 1;
2543 }
2544 break;
2545 case 'n':
2546 settings.chunk_size = atoi(optarg);
2547 if (settings.chunk_size == 0) {
2548 fprintf(stderr, "Chunk size must be greater than 0\n");
2549 return 1;
2550 }
2551 break;
2552 case 't':
2553 settings.num_threads = atoi(optarg);
2554 if (settings.num_threads == 0) {
2555 fprintf(stderr, "Number of threads must be greater than 0\n");
2556 return 1;
2557 }
2558 break;
2559 case 'D':
2560 if (! optarg || ! optarg[0]) {
2561 fprintf(stderr, "No delimiter specified\n");
2562 return 1;
2563 }
2564 settings.prefix_delimiter = optarg[0];
2565 settings.detail_enabled = 1;
2566 break;
2567 default:
2568 fprintf(stderr, "Illegal argument \"%c\"\n", c);
2569 return 1;
2570 }
2571 }
2572
2573 if (maxcore != 0) {
2574 struct rlimit rlim_new;
2575 /*
2576 * First try raising to infinity; if that fails, try bringing
2577 * the soft limit to the hard.
2578 */
2579 if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
2580 rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
2581 if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
2582 /* failed. try raising just to the old max */
2583 rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
2584 (void)setrlimit(RLIMIT_CORE, &rlim_new);
2585 }
2586 }
2587 /*
2588 * getrlimit again to see what we ended up with. Only fail if
2589 * the soft limit ends up 0, because then no core files will be
2590 * created at all.
2591 */
2592
2593 if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
2594 fprintf(stderr, "failed to ensure corefile creation\n");
2595 exit(EXIT_FAILURE);
2596 }
2597 }
2598
2599 /*
2600 * If needed, increase rlimits to allow as many connections
2601 * as needed.
2602 */
2603
2604 if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
2605 fprintf(stderr, "failed to getrlimit number of files\n");
2606 exit(EXIT_FAILURE);
2607 } else {
2608 int maxfiles = settings.maxconns;
2609 if (rlim.rlim_cur < maxfiles)
2610 rlim.rlim_cur = maxfiles + 3;
2611 if (rlim.rlim_max < rlim.rlim_cur)
2612 rlim.rlim_max = rlim.rlim_cur;
2613 if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
2614 fprintf(stderr, "failed to set rlimit for open files. Try running as root or requesting smaller maxconns value.\n");
2615 exit(EXIT_FAILURE);
2616 }
2617 }
2618
2619 /*
2620 * initialization order: first create the listening sockets
2621 * (may need root on low ports), then drop root if needed,
2622 * then daemonise if needed, then init libevent (in some cases
2623 * descriptors created by libevent wouldn't survive forking).
2624 */
2625
2626 /* create the listening socket and bind it */
2627 if (settings.socketpath == NULL) {
2628 l_socket = server_socket(settings.port, 0);
2629 if (l_socket == -1) {
2630 fprintf(stderr, "failed to listen\n");
2631 exit(EXIT_FAILURE);
2632 }
2633 }
2634
2635 if (settings.udpport > 0 && settings.socketpath == NULL) {
2636 /* create the UDP listening socket and bind it */
2637 u_socket = server_socket(settings.udpport, 1);
2638 if (u_socket == -1) {
2639 fprintf(stderr, "failed to listen on UDP port %d\n", settings.udpport);
2640 exit(EXIT_FAILURE);
2641 }
2642 }
2643
2644 /* lose root privileges if we have them */
2645 if (getuid() == 0 || geteuid() == 0) {
2646 if (username == 0 || *username == '\0') {
2647 fprintf(stderr, "can't run as root without the -u switch\n");
2648 return 1;
2649 }
2650 if ((pw = getpwnam(username)) == 0) {
2651 fprintf(stderr, "can't find the user %s to switch to\n", username);
2652 return 1;
2653 }
2654 if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) {
2655 fprintf(stderr, "failed to assume identity of user %s\n", username);
2656 return 1;
2657 }
2658 }
2659
2660 /* create unix mode sockets after dropping privileges */
2661 if (settings.socketpath != NULL) {
2662 l_socket = server_socket_unix(settings.socketpath);
2663 if (l_socket == -1) {
2664 fprintf(stderr, "failed to listen\n");
2665 exit(EXIT_FAILURE);
2666 }
2667 }
2668
2669 /* daemonize if requested */
2670 /* if we want to ensure our ability to dump core, don't chdir to / */
2671 if (daemonize) {
2672 int res;
2673 res = daemon(maxcore, settings.verbose);
2674 if (res == -1) {
2675 fprintf(stderr, "failed to daemon() in order to daemonize\n");
2676 return 1;
2677 }
2678 }
2679
2680 /* initialize main thread libevent instance */
2681 main_base = event_init();
2682
2683 /* initialize other stuff */
2684 item_init();
2685 stats_init();
2686 assoc_init();
2687 conn_init();
2688 slabs_init(settings.maxbytes, settings.factor);
2689
2690 /* managed instance? alloc and zero a bucket array */
2691 if (settings.managed) {
2692 buckets = malloc(sizeof(int) * MAX_BUCKETS);
2693 if (buckets == 0) {
2694 fprintf(stderr, "failed to allocate the bucket array");
2695 exit(EXIT_FAILURE);
2696 }
2697 memset(buckets, 0, sizeof(int) * MAX_BUCKETS);
2698 }
2699
2700 /* lock paged memory if needed */
2701 if (lock_memory) {
2702 #ifdef HAVE_MLOCKALL
2703 mlockall(MCL_CURRENT | MCL_FUTURE);
2704 #else
2705 fprintf(stderr, "warning: mlockall() not supported on this platform. proceeding without.\n");
2706 #endif
2707 }
2708
2709 /*
2710 * ignore SIGPIPE signals; we can use errno==EPIPE if we
2711 * need that information
2712 */
2713 sa.sa_handler = SIG_IGN;
2714 sa.sa_flags = 0;
2715 if (sigemptyset(&sa.sa_mask) == -1 ||
2716 sigaction(SIGPIPE, &sa, 0) == -1) {
2717 perror("failed to ignore SIGPIPE; sigaction");
2718 exit(EXIT_FAILURE);
2719 }
2720 /* create the initial listening connection */
2721 if (!(listen_conn = conn_new(l_socket, conn_listening,
2722 EV_READ | EV_PERSIST, 1, false, main_base))) {
2723 fprintf(stderr, "failed to create listening connection");
2724 exit(EXIT_FAILURE);
2725 }
2726 /* save the PID in if we're a daemon */
2727 if (daemonize)
2728 save_pid(getpid(), pid_file);
2729 /* start up worker threads if MT mode */
2730 thread_init(settings.num_threads, main_base);
2731 /* initialise clock event */
2732 clock_handler(0, 0, 0);
2733 /* initialise deletion array and timer event */
2734 deltotal = 200;
2735 delcurr = 0;
2736 todelete = malloc(sizeof(item *) * deltotal);
2737 delete_handler(0, 0, 0); /* sets up the event */
2738 /* create the initial listening udp connection, monitored on all threads */
2739 if (u_socket > -1) {
2740 for (c = 0; c < settings.num_threads; c++) {
2741 /* this is guaranteed to hit all threads because we round-robin */
2742 dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
2743 UDP_READ_BUFFER_SIZE, 1);
2744 }
2745 }
2746 /* enter the event loop */
2747 event_base_loop(main_base, 0);
2748 /* remove the PID file if we're a daemon */
2749 if (daemonize)
2750 remove_pidfile(pid_file);
2751 return 0;
2752 }