Mercurial > hg > memcached
annotate memcached.c @ 3:0b6ac95a09bb default tip
Fix unix sockets support under FreeBSD.
author | Maxim Dounin <mdounin@mdounin.ru> |
---|---|
date | Wed, 03 Oct 2007 05:34:42 +0400 |
parents | e28ab6bd21fa |
children |
rev | line source |
---|---|
0 | 1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 /* | |
3 * memcached - memory caching daemon | |
4 * | |
5 * http://www.danga.com/memcached/ | |
6 * | |
7 * Copyright 2003 Danga Interactive, Inc. All rights reserved. | |
8 * | |
9 * Use and distribution licensed under the BSD license. See | |
10 * the LICENSE file for full text. | |
11 * | |
12 * Authors: | |
13 * Anatoly Vorobey <mellon@pobox.com> | |
14 * Brad Fitzpatrick <brad@danga.com> | |
15 std * | |
16 * $Id: memcached.c 570 2007-06-20 01:21:45Z plindner $ | |
17 */ | |
18 #include "memcached.h" | |
19 #include <sys/stat.h> | |
20 #include <sys/socket.h> | |
21 #include <sys/un.h> | |
22 #include <sys/signal.h> | |
23 #include <sys/resource.h> | |
24 #include <sys/uio.h> | |
25 | |
26 /* some POSIX systems need the following definition | |
27 * to get mlockall flags out of sys/mman.h. */ | |
28 #ifndef _P1003_1B_VISIBLE | |
29 #define _P1003_1B_VISIBLE | |
30 #endif | |
31 /* need this to get IOV_MAX on some platforms. */ | |
32 #ifndef __need_IOV_MAX | |
33 #define __need_IOV_MAX | |
34 #endif | |
35 #include <pwd.h> | |
36 #include <sys/mman.h> | |
37 #include <fcntl.h> | |
38 #include <netinet/tcp.h> | |
39 #include <arpa/inet.h> | |
40 #include <errno.h> | |
41 #include <stdlib.h> | |
42 #include <stdio.h> | |
43 #include <string.h> | |
44 #include <time.h> | |
45 #include <assert.h> | |
46 #include <limits.h> | |
47 | |
48 #ifdef HAVE_MALLOC_H | |
49 /* OpenBSD has a malloc.h, but warns to use stdlib.h instead */ | |
50 #ifndef __OpenBSD__ | |
51 #include <malloc.h> | |
52 #endif | |
53 #endif | |
54 | |
55 /* FreeBSD 4.x doesn't have IOV_MAX exposed. */ | |
56 #ifndef IOV_MAX | |
57 #if defined(__FreeBSD__) | |
58 # define IOV_MAX 1024 | |
59 #endif | |
60 #endif | |
61 | |
62 /* | |
63 * forward declarations | |
64 */ | |
65 static void drive_machine(conn *c); | |
66 static int new_socket(const bool is_udp); | |
67 static int server_socket(const int port, const bool is_udp); | |
68 static int try_read_command(conn *c); | |
69 static int try_read_network(conn *c); | |
70 static int try_read_udp(conn *c); | |
71 | |
72 /* stats */ | |
73 static void stats_reset(void); | |
74 static void stats_init(void); | |
75 | |
76 /* defaults */ | |
77 static void settings_init(void); | |
78 | |
79 /* event handling, network IO */ | |
80 static void event_handler(const int fd, const short which, void *arg); | |
81 static void conn_close(conn *c); | |
82 static void conn_init(void); | |
83 static void accept_new_conns(const bool do_accept); | |
84 static bool update_event(conn *c, const int new_flags); | |
85 static void complete_nread(conn *c); | |
86 static void process_command(conn *c, char *command); | |
87 static int transmit(conn *c); | |
88 static int ensure_iov_space(conn *c); | |
89 static int add_iov(conn *c, const void *buf, int len); | |
90 static int add_msghdr(conn *c); | |
91 | |
92 | |
93 /* time handling */ | |
94 static void set_current_time(void); /* update the global variable holding | |
95 global 32-bit seconds-since-start time | |
96 (to avoid 64 bit time_t) */ | |
97 | |
98 void pre_gdb(void); | |
99 static void conn_free(conn *c); | |
100 | |
101 /** exported globals **/ | |
102 struct stats stats; | |
103 struct settings settings; | |
104 | |
105 /** file scope variables **/ | |
106 static item **todelete = 0; | |
107 static int delcurr; | |
108 static int deltotal; | |
109 static conn *listen_conn; | |
110 static struct event_base *main_base; | |
111 | |
112 #define TRANSMIT_COMPLETE 0 | |
113 #define TRANSMIT_INCOMPLETE 1 | |
114 #define TRANSMIT_SOFT_ERROR 2 | |
115 #define TRANSMIT_HARD_ERROR 3 | |
116 | |
117 static int *buckets = 0; /* bucket->generation array for a managed instance */ | |
118 | |
119 #define REALTIME_MAXDELTA 60*60*24*30 | |
120 /* | |
121 * given time value that's either unix time or delta from current unix time, return | |
122 * unix time. Use the fact that delta can't exceed one month (and real time value can't | |
123 * be that low). | |
124 */ | |
125 static rel_time_t realtime(const time_t exptime) { | |
126 /* no. of seconds in 30 days - largest possible delta exptime */ | |
127 | |
128 if (exptime == 0) return 0; /* 0 means never expire */ | |
129 | |
130 if (exptime > REALTIME_MAXDELTA) { | |
131 /* if item expiration is at/before the server started, give it an | |
132 expiration time of 1 second after the server started. | |
133 (because 0 means don't expire). without this, we'd | |
134 underflow and wrap around to some large value way in the | |
135 future, effectively making items expiring in the past | |
136 really expiring never */ | |
137 if (exptime <= stats.started) | |
138 return (rel_time_t)1; | |
139 return (rel_time_t)(exptime - stats.started); | |
140 } else { | |
141 return (rel_time_t)(exptime + current_time); | |
142 } | |
143 } | |
144 | |
145 static void stats_init(void) { | |
146 stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0; | |
147 stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0; | |
148 stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0; | |
149 | |
150 /* make the time we started always be 2 seconds before we really | |
151 did, so time(0) - time.started is never zero. if so, things | |
152 like 'settings.oldest_live' which act as booleans as well as | |
153 values are now false in boolean context... */ | |
154 stats.started = time(0) - 2; | |
155 stats_prefix_init(); | |
156 } | |
157 | |
158 static void stats_reset(void) { | |
159 STATS_LOCK(); | |
160 stats.total_items = stats.total_conns = 0; | |
161 stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0; | |
162 stats.bytes_read = stats.bytes_written = 0; | |
163 stats_prefix_clear(); | |
164 STATS_UNLOCK(); | |
165 } | |
166 | |
167 static void settings_init(void) { | |
168 settings.port = 11211; | |
169 settings.udpport = 0; | |
170 settings.interf.s_addr = htonl(INADDR_ANY); | |
171 settings.maxbytes = 67108864; /* default is 64MB: (64 * 1024 * 1024) */ | |
172 settings.maxconns = 1024; /* to limit connections-related memory to about 5MB */ | |
173 settings.verbose = 0; | |
174 settings.oldest_live = 0; | |
175 settings.evict_to_free = 1; /* push old items out of cache when memory runs out */ | |
176 settings.socketpath = NULL; /* by default, not using a unix socket */ | |
177 settings.managed = false; | |
178 settings.factor = 1.25; | |
179 settings.chunk_size = 48; /* space for a modest key and value */ | |
180 #ifdef USE_THREADS | |
181 settings.num_threads = 4; | |
182 #else | |
183 settings.num_threads = 1; | |
184 #endif | |
185 settings.prefix_delimiter = ':'; | |
186 settings.detail_enabled = 0; | |
187 } | |
188 | |
189 /* returns true if a deleted item's delete-locked-time is over, and it | |
190 should be removed from the namespace */ | |
191 static bool item_delete_lock_over (item *it) { | |
192 assert(it->it_flags & ITEM_DELETED); | |
193 return (current_time >= it->exptime); | |
194 } | |
195 | |
196 /* | |
197 * Adds a message header to a connection. | |
198 * | |
199 * Returns 0 on success, -1 on out-of-memory. | |
200 */ | |
201 static int add_msghdr(conn *c) | |
202 { | |
203 struct msghdr *msg; | |
204 | |
205 assert(c != NULL); | |
206 | |
207 if (c->msgsize == c->msgused) { | |
208 msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr)); | |
209 if (! msg) | |
210 return -1; | |
211 c->msglist = msg; | |
212 c->msgsize *= 2; | |
213 } | |
214 | |
215 msg = c->msglist + c->msgused; | |
216 | |
217 /* this wipes msg_iovlen, msg_control, msg_controllen, and | |
218 msg_flags, the last 3 of which aren't defined on solaris: */ | |
219 memset(msg, 0, sizeof(struct msghdr)); | |
220 | |
221 msg->msg_iov = &c->iov[c->iovused]; | |
3
0b6ac95a09bb
Fix unix sockets support under FreeBSD.
Maxim Dounin <mdounin@mdounin.ru>
parents:
2
diff
changeset
|
222 |
0b6ac95a09bb
Fix unix sockets support under FreeBSD.
Maxim Dounin <mdounin@mdounin.ru>
parents:
2
diff
changeset
|
223 if (c->request_addr_size > 0) { |
0b6ac95a09bb
Fix unix sockets support under FreeBSD.
Maxim Dounin <mdounin@mdounin.ru>
parents:
2
diff
changeset
|
224 msg->msg_name = &c->request_addr; |
0b6ac95a09bb
Fix unix sockets support under FreeBSD.
Maxim Dounin <mdounin@mdounin.ru>
parents:
2
diff
changeset
|
225 msg->msg_namelen = c->request_addr_size; |
0b6ac95a09bb
Fix unix sockets support under FreeBSD.
Maxim Dounin <mdounin@mdounin.ru>
parents:
2
diff
changeset
|
226 } |
0 | 227 |
228 c->msgbytes = 0; | |
229 c->msgused++; | |
230 | |
231 if (c->udp) { | |
232 /* Leave room for the UDP header, which we'll fill in later. */ | |
233 return add_iov(c, NULL, UDP_HEADER_SIZE); | |
234 } | |
235 | |
236 return 0; | |
237 } | |
238 | |
239 | |
240 /* | |
241 * Free list management for connections. | |
242 */ | |
243 | |
244 static conn **freeconns; | |
245 static int freetotal; | |
246 static int freecurr; | |
247 | |
248 | |
249 static void conn_init(void) { | |
250 freetotal = 200; | |
251 freecurr = 0; | |
252 if (!(freeconns = (conn **)malloc(sizeof(conn *) * freetotal))) { | |
253 perror("malloc()"); | |
254 } | |
255 return; | |
256 } | |
257 | |
258 /* | |
259 * Returns a connection from the freelist, if any. Should call this using | |
260 * conn_from_freelist() for thread safety. | |
261 */ | |
262 conn *do_conn_from_freelist() { | |
263 conn *c; | |
264 | |
265 if (freecurr > 0) { | |
266 c = freeconns[--freecurr]; | |
267 } else { | |
268 c = NULL; | |
269 } | |
270 | |
271 return c; | |
272 } | |
273 | |
274 /* | |
275 * Adds a connection to the freelist. 0 = success. Should call this using | |
276 * conn_add_to_freelist() for thread safety. | |
277 */ | |
278 int do_conn_add_to_freelist(conn *c) { | |
279 if (freecurr < freetotal) { | |
280 freeconns[freecurr++] = c; | |
281 return 0; | |
282 } else { | |
283 /* try to enlarge free connections array */ | |
284 conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2); | |
285 if (new_freeconns) { | |
286 freetotal *= 2; | |
287 freeconns = new_freeconns; | |
288 freeconns[freecurr++] = c; | |
289 return 0; | |
290 } | |
291 } | |
292 return 1; | |
293 } | |
294 | |
295 conn *conn_new(const int sfd, const int init_state, const int event_flags, | |
296 const int read_buffer_size, const bool is_udp, struct event_base *base) { | |
297 conn *c = conn_from_freelist(); | |
298 | |
299 if (NULL == c) { | |
300 if (!(c = (conn *)malloc(sizeof(conn)))) { | |
301 perror("malloc()"); | |
302 return NULL; | |
303 } | |
304 c->rbuf = c->wbuf = 0; | |
305 c->ilist = 0; | |
306 c->iov = 0; | |
307 c->msglist = 0; | |
308 c->hdrbuf = 0; | |
309 | |
310 c->rsize = read_buffer_size; | |
311 c->wsize = DATA_BUFFER_SIZE; | |
312 c->isize = ITEM_LIST_INITIAL; | |
313 c->iovsize = IOV_LIST_INITIAL; | |
314 c->msgsize = MSG_LIST_INITIAL; | |
315 c->hdrsize = 0; | |
316 | |
317 c->rbuf = (char *)malloc((size_t)c->rsize); | |
318 c->wbuf = (char *)malloc((size_t)c->wsize); | |
319 c->ilist = (item **)malloc(sizeof(item *) * c->isize); | |
320 c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize); | |
321 c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize); | |
322 | |
323 if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 || | |
324 c->msglist == 0) { | |
325 if (c->rbuf != 0) free(c->rbuf); | |
326 if (c->wbuf != 0) free(c->wbuf); | |
327 if (c->ilist !=0) free(c->ilist); | |
328 if (c->iov != 0) free(c->iov); | |
329 if (c->msglist != 0) free(c->msglist); | |
330 free(c); | |
331 perror("malloc()"); | |
332 return NULL; | |
333 } | |
334 | |
335 STATS_LOCK(); | |
336 stats.conn_structs++; | |
337 STATS_UNLOCK(); | |
338 } | |
339 | |
340 if (settings.verbose > 1) { | |
341 if (init_state == conn_listening) | |
342 fprintf(stderr, "<%d server listening\n", sfd); | |
343 else if (is_udp) | |
344 fprintf(stderr, "<%d server listening (udp)\n", sfd); | |
345 else | |
346 fprintf(stderr, "<%d new client connection\n", sfd); | |
347 } | |
348 | |
349 c->sfd = sfd; | |
350 c->udp = is_udp; | |
351 c->state = init_state; | |
352 c->rlbytes = 0; | |
353 c->rbytes = c->wbytes = 0; | |
354 c->wcurr = c->wbuf; | |
355 c->rcurr = c->rbuf; | |
356 c->ritem = 0; | |
357 c->icurr = c->ilist; | |
358 c->ileft = 0; | |
359 c->iovused = 0; | |
360 c->msgcurr = 0; | |
361 c->msgused = 0; | |
362 | |
363 c->write_and_go = conn_read; | |
364 c->write_and_free = 0; | |
365 c->item = 0; | |
366 c->bucket = -1; | |
367 c->gen = 0; | |
368 | |
369 event_set(&c->event, sfd, event_flags, event_handler, (void *)c); | |
370 event_base_set(base, &c->event); | |
371 c->ev_flags = event_flags; | |
372 | |
373 if (event_add(&c->event, 0) == -1) { | |
374 if (conn_add_to_freelist(c)) { | |
375 conn_free(c); | |
376 } | |
377 return NULL; | |
378 } | |
379 | |
380 STATS_LOCK(); | |
381 stats.curr_conns++; | |
382 stats.total_conns++; | |
383 STATS_UNLOCK(); | |
384 | |
385 return c; | |
386 } | |
387 | |
388 static void conn_cleanup(conn *c) { | |
389 assert(c != NULL); | |
390 | |
391 if (c->item) { | |
392 item_remove(c->item); | |
393 c->item = 0; | |
394 } | |
395 | |
396 if (c->ileft != 0) { | |
397 for (; c->ileft > 0; c->ileft--,c->icurr++) { | |
398 item_remove(*(c->icurr)); | |
399 } | |
400 } | |
401 | |
402 if (c->write_and_free) { | |
403 free(c->write_and_free); | |
404 c->write_and_free = 0; | |
405 } | |
406 } | |
407 | |
408 /* | |
409 * Frees a connection. | |
410 */ | |
411 void conn_free(conn *c) { | |
412 if (c) { | |
413 if (c->hdrbuf) | |
414 free(c->hdrbuf); | |
415 if (c->msglist) | |
416 free(c->msglist); | |
417 if (c->rbuf) | |
418 free(c->rbuf); | |
419 if (c->wbuf) | |
420 free(c->wbuf); | |
421 if (c->ilist) | |
422 free(c->ilist); | |
423 if (c->iov) | |
424 free(c->iov); | |
425 free(c); | |
426 } | |
427 } | |
428 | |
429 static void conn_close(conn *c) { | |
430 assert(c != NULL); | |
431 | |
432 /* delete the event, the socket and the conn */ | |
433 event_del(&c->event); | |
434 | |
435 if (settings.verbose > 1) | |
436 fprintf(stderr, "<%d connection closed.\n", c->sfd); | |
437 | |
438 close(c->sfd); | |
439 accept_new_conns(true); | |
440 conn_cleanup(c); | |
441 | |
442 /* if the connection has big buffers, just free it */ | |
443 if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) { | |
444 conn_free(c); | |
445 } | |
446 | |
447 STATS_LOCK(); | |
448 stats.curr_conns--; | |
449 STATS_UNLOCK(); | |
450 | |
451 return; | |
452 } | |
453 | |
454 | |
455 /* | |
456 * Shrinks a connection's buffers if they're too big. This prevents | |
457 * periodic large "get" requests from permanently chewing lots of server | |
458 * memory. | |
459 * | |
460 * This should only be called in between requests since it can wipe output | |
461 * buffers! | |
462 */ | |
463 static void conn_shrink(conn *c) { | |
464 assert(c != NULL); | |
465 | |
466 if (c->udp) | |
467 return; | |
468 | |
469 if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) { | |
470 char *newbuf; | |
471 | |
472 if (c->rcurr != c->rbuf) | |
473 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes); | |
474 | |
475 newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE); | |
476 | |
477 if (newbuf) { | |
478 c->rbuf = newbuf; | |
479 c->rsize = DATA_BUFFER_SIZE; | |
480 } | |
481 /* TODO check other branch... */ | |
482 c->rcurr = c->rbuf; | |
483 } | |
484 | |
485 if (c->isize > ITEM_LIST_HIGHWAT) { | |
486 item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0])); | |
487 if (newbuf) { | |
488 c->ilist = newbuf; | |
489 c->isize = ITEM_LIST_INITIAL; | |
490 } | |
491 /* TODO check error condition? */ | |
492 } | |
493 | |
494 if (c->msgsize > MSG_LIST_HIGHWAT) { | |
495 struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0])); | |
496 if (newbuf) { | |
497 c->msglist = newbuf; | |
498 c->msgsize = MSG_LIST_INITIAL; | |
499 } | |
500 /* TODO check error condition? */ | |
501 } | |
502 | |
503 if (c->iovsize > IOV_LIST_HIGHWAT) { | |
504 struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0])); | |
505 if (newbuf) { | |
506 c->iov = newbuf; | |
507 c->iovsize = IOV_LIST_INITIAL; | |
508 } | |
509 /* TODO check return value */ | |
510 } | |
511 } | |
512 | |
513 /* | |
514 * Sets a connection's current state in the state machine. Any special | |
515 * processing that needs to happen on certain state transitions can | |
516 * happen here. | |
517 */ | |
518 static void conn_set_state(conn *c, int state) { | |
519 assert(c != NULL); | |
520 | |
521 if (state != c->state) { | |
522 if (state == conn_read) { | |
523 conn_shrink(c); | |
524 assoc_move_next_bucket(); | |
525 } | |
526 c->state = state; | |
527 } | |
528 } | |
529 | |
530 | |
531 /* | |
532 * Ensures that there is room for another struct iovec in a connection's | |
533 * iov list. | |
534 * | |
535 * Returns 0 on success, -1 on out-of-memory. | |
536 */ | |
537 static int ensure_iov_space(conn *c) { | |
538 assert(c != NULL); | |
539 | |
540 if (c->iovused >= c->iovsize) { | |
541 int i, iovnum; | |
542 struct iovec *new_iov = (struct iovec *)realloc(c->iov, | |
543 (c->iovsize * 2) * sizeof(struct iovec)); | |
544 if (! new_iov) | |
545 return -1; | |
546 c->iov = new_iov; | |
547 c->iovsize *= 2; | |
548 | |
549 /* Point all the msghdr structures at the new list. */ | |
550 for (i = 0, iovnum = 0; i < c->msgused; i++) { | |
551 c->msglist[i].msg_iov = &c->iov[iovnum]; | |
552 iovnum += c->msglist[i].msg_iovlen; | |
553 } | |
554 } | |
555 | |
556 return 0; | |
557 } | |
558 | |
559 | |
560 /* | |
561 * Adds data to the list of pending data that will be written out to a | |
562 * connection. | |
563 * | |
564 * Returns 0 on success, -1 on out-of-memory. | |
565 */ | |
566 | |
567 static int add_iov(conn *c, const void *buf, int len) { | |
568 struct msghdr *m; | |
569 int leftover; | |
570 bool limit_to_mtu; | |
571 | |
572 assert(c != NULL); | |
573 | |
574 do { | |
575 m = &c->msglist[c->msgused - 1]; | |
576 | |
577 /* | |
578 * Limit UDP packets, and the first payloads of TCP replies, to | |
579 * UDP_MAX_PAYLOAD_SIZE bytes. | |
580 */ | |
581 limit_to_mtu = c->udp || (1 == c->msgused); | |
582 | |
583 /* We may need to start a new msghdr if this one is full. */ | |
584 if (m->msg_iovlen == IOV_MAX || | |
585 (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) { | |
586 add_msghdr(c); | |
587 m = &c->msglist[c->msgused - 1]; | |
588 } | |
589 | |
590 if (ensure_iov_space(c) != 0) | |
591 return -1; | |
592 | |
593 /* If the fragment is too big to fit in the datagram, split it up */ | |
594 if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) { | |
595 leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE; | |
596 len -= leftover; | |
597 } else { | |
598 leftover = 0; | |
599 } | |
600 | |
601 m = &c->msglist[c->msgused - 1]; | |
602 m->msg_iov[m->msg_iovlen].iov_base = (void *)buf; | |
603 m->msg_iov[m->msg_iovlen].iov_len = len; | |
604 | |
605 c->msgbytes += len; | |
606 c->iovused++; | |
607 m->msg_iovlen++; | |
608 | |
609 buf = ((char *)buf) + len; | |
610 len = leftover; | |
611 } while (leftover > 0); | |
612 | |
613 return 0; | |
614 } | |
615 | |
616 | |
617 /* | |
618 * Constructs a set of UDP headers and attaches them to the outgoing messages. | |
619 */ | |
620 static int build_udp_headers(conn *c) { | |
621 int i; | |
622 unsigned char *hdr; | |
623 | |
624 assert(c != NULL); | |
625 | |
626 if (c->msgused > c->hdrsize) { | |
627 void *new_hdrbuf; | |
628 if (c->hdrbuf) | |
629 new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE); | |
630 else | |
631 new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE); | |
632 if (! new_hdrbuf) | |
633 return -1; | |
634 c->hdrbuf = (unsigned char *)new_hdrbuf; | |
635 c->hdrsize = c->msgused * 2; | |
636 } | |
637 | |
638 hdr = c->hdrbuf; | |
639 for (i = 0; i < c->msgused; i++) { | |
640 c->msglist[i].msg_iov[0].iov_base = hdr; | |
641 c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE; | |
642 *hdr++ = c->request_id / 256; | |
643 *hdr++ = c->request_id % 256; | |
644 *hdr++ = i / 256; | |
645 *hdr++ = i % 256; | |
646 *hdr++ = c->msgused / 256; | |
647 *hdr++ = c->msgused % 256; | |
648 *hdr++ = 0; | |
649 *hdr++ = 0; | |
650 assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE); | |
651 } | |
652 | |
653 return 0; | |
654 } | |
655 | |
656 | |
657 static void out_string(conn *c, const char *str) { | |
658 size_t len; | |
659 | |
660 assert(c != NULL); | |
661 | |
662 if (settings.verbose > 1) | |
663 fprintf(stderr, ">%d %s\n", c->sfd, str); | |
664 | |
665 len = strlen(str); | |
666 if ((len + 2) > c->wsize) { | |
667 /* ought to be always enough. just fail for simplicity */ | |
668 str = "SERVER_ERROR output line too long"; | |
669 len = strlen(str); | |
670 } | |
671 | |
672 memcpy(c->wbuf, str, len); | |
673 memcpy(c->wbuf + len, "\r\n", 3); | |
674 c->wbytes = len + 2; | |
675 c->wcurr = c->wbuf; | |
676 | |
677 conn_set_state(c, conn_write); | |
678 c->write_and_go = conn_read; | |
679 return; | |
680 } | |
681 | |
682 /* | |
683 * we get here after reading the value in set/add/replace commands. The command | |
684 * has been stored in c->item_comm, and the item is ready in c->item. | |
685 */ | |
686 | |
687 static void complete_nread(conn *c) { | |
688 assert(c != NULL); | |
689 | |
690 item *it = c->item; | |
691 int comm = c->item_comm; | |
692 | |
693 STATS_LOCK(); | |
694 stats.set_cmds++; | |
695 STATS_UNLOCK(); | |
696 | |
697 if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) { | |
698 out_string(c, "CLIENT_ERROR bad data chunk"); | |
699 } else { | |
700 if (store_item(it, comm)) { | |
701 out_string(c, "STORED"); | |
702 } else { | |
703 out_string(c, "NOT_STORED"); | |
704 } | |
705 } | |
706 | |
707 item_remove(c->item); /* release the c->item reference */ | |
708 c->item = 0; | |
709 } | |
710 | |
711 /* | |
712 * Stores an item in the cache according to the semantics of one of the set | |
713 * commands. In threaded mode, this is protected by the cache lock. | |
714 * | |
715 * Returns true if the item was stored. | |
716 */ | |
717 int do_store_item(item *it, int comm) { | |
718 char *key = ITEM_key(it); | |
719 bool delete_locked = false; | |
720 item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked); | |
721 int stored = 0; | |
722 | |
2 | 723 item *new_it = NULL; |
724 int flags; | |
725 | |
0 | 726 if (old_it != NULL && comm == NREAD_ADD) { |
727 /* add only adds a nonexistent item, but promote to head of LRU */ | |
728 do_item_update(old_it); | |
2 | 729 } else if (!old_it && (comm == NREAD_REPLACE |
730 || comm == NREAD_APPEND || comm == NREAD_PREPEND)) | |
731 { | |
0 | 732 /* replace only replaces an existing value; don't store */ |
2 | 733 } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD |
734 || comm == NREAD_APPEND || comm == NREAD_PREPEND)) | |
735 { | |
0 | 736 /* replace and add can't override delete locks; don't store */ |
737 } else { | |
2 | 738 |
739 /* | |
740 * Append - combine new and old record into single one. Here it's | |
741 * atomic and thread-safe. | |
742 */ | |
743 | |
744 if (comm == NREAD_APPEND || comm == NREAD_PREPEND) { | |
745 | |
746 /* we have it and old_it here - alloc memory to hold both */ | |
747 /* flags was already lost - so recover them from ITEM_suffix(it) */ | |
748 | |
749 flags = (int) strtol(ITEM_suffix(it), (char **) NULL, 10); | |
750 | |
751 new_it = do_item_alloc(key, it->nkey, flags, it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */); | |
752 | |
753 if (new_it == NULL) { | |
754 /* SERVER_ERROR out of memory */ | |
755 return 0; | |
756 } | |
757 | |
758 /* copy data from it and old_it to new_it */ | |
759 | |
760 if (comm == NREAD_APPEND) { | |
761 memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes); | |
762 memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes); | |
763 } else { | |
764 /* NREAD_PREPEND */ | |
765 memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes); | |
766 memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes); | |
767 } | |
768 | |
769 it = new_it; | |
770 } | |
771 | |
0 | 772 /* "set" commands can override the delete lock |
773 window... in which case we have to find the old hidden item | |
774 that's in the namespace/LRU but wasn't returned by | |
775 item_get.... because we need to replace it */ | |
776 if (delete_locked) | |
777 old_it = do_item_get_nocheck(key, it->nkey); | |
778 | |
779 if (old_it != NULL) | |
780 do_item_replace(old_it, it); | |
781 else | |
782 do_item_link(it); | |
783 | |
784 stored = 1; | |
785 } | |
786 | |
787 if (old_it) | |
788 do_item_remove(old_it); /* release our reference */ | |
2 | 789 if (new_it) |
790 do_item_remove(new_it); | |
791 | |
0 | 792 return stored; |
793 } | |
794 | |
/* One space-delimited token of a command line. */
typedef struct token_s {
    char *value;    /* start of the token inside the command buffer */
    size_t length;  /* token length in bytes */
} token_t;

#define COMMAND_TOKEN 0
#define SUBCOMMAND_TOKEN 1
#define KEY_TOKEN 1
#define KEY_MAX_LENGTH 250

#define MAX_TOKENS 6

/*
 * Splits a command line in place on ' ' characters. Each separator that
 * ends a token is overwritten with '\0'; runs of spaces produce no empty
 * tokens. At most max_tokens - 1 real tokens are recorded; each gets a
 * pointer to its start and its length.
 *
 * The entry after the last real token is a terminal token of length 0:
 * - its value is NULL when the whole string was consumed;
 * - otherwise it points at the first unprocessed character.
 *
 * Returns the number of entries written, including the terminal one.
 *
 * Usage example:
 *
 *    while (tokenize_command(command, tokens, max_tokens) > 0) {
 *        for (ix = 0; tokens[ix].length != 0; ix++) {
 *            ...
 *        }
 *        if (tokens[ix].value == NULL)
 *            break;
 *        command = tokens[ix].value;
 *    }
 */
static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
    char *start = command;
    char *cursor = command;
    size_t count = 0;

    assert(command != NULL && tokens != NULL && max_tokens > 1);

    while (count < max_tokens - 1) {
        const char ch = *cursor;

        if (ch == '\0') {
            /* end of string: record a trailing token, if any, and stop */
            if (cursor != start) {
                tokens[count].value = start;
                tokens[count].length = cursor - start;
                count++;
            }
            break;
        }

        if (ch == ' ') {
            if (cursor != start) {
                tokens[count].value = start;
                tokens[count].length = cursor - start;
                count++;
                *cursor = '\0';
            }
            start = cursor + 1;
        }

        cursor++;
    }

    tokens[count].value = (*cursor == '\0') ? NULL : cursor;
    tokens[count].length = 0;
    return count + 1;
}
861 | |
862 inline static void process_stats_detail(conn *c, const char *command) { | |
863 assert(c != NULL); | |
864 | |
865 if (strcmp(command, "on") == 0) { | |
866 settings.detail_enabled = 1; | |
867 out_string(c, "OK"); | |
868 } | |
869 else if (strcmp(command, "off") == 0) { | |
870 settings.detail_enabled = 0; | |
871 out_string(c, "OK"); | |
872 } | |
873 else if (strcmp(command, "dump") == 0) { | |
874 int len; | |
875 char *stats = stats_prefix_dump(&len); | |
876 if (NULL != stats) { | |
877 c->write_and_free = stats; | |
878 c->wcurr = stats; | |
879 c->wbytes = len; | |
880 conn_set_state(c, conn_write); | |
881 c->write_and_go = conn_read; | |
882 } | |
883 else { | |
884 out_string(c, "SERVER_ERROR"); | |
885 } | |
886 } | |
887 else { | |
888 out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump"); | |
889 } | |
890 } | |
891 | |
892 static void process_stat(conn *c, token_t *tokens, const size_t ntokens) { | |
893 rel_time_t now = current_time; | |
894 char *command; | |
895 char *subcommand; | |
896 | |
897 assert(c != NULL); | |
898 | |
899 if(ntokens < 2) { | |
900 out_string(c, "CLIENT_ERROR bad command line"); | |
901 return; | |
902 } | |
903 | |
904 command = tokens[COMMAND_TOKEN].value; | |
905 | |
906 if (ntokens == 2 && strcmp(command, "stats") == 0) { | |
907 char temp[1024]; | |
908 pid_t pid = getpid(); | |
909 char *pos = temp; | |
910 | |
911 #ifndef WIN32 | |
912 struct rusage usage; | |
913 getrusage(RUSAGE_SELF, &usage); | |
914 #endif /* !WIN32 */ | |
915 | |
916 STATS_LOCK(); | |
917 pos += sprintf(pos, "STAT pid %u\r\n", pid); | |
918 pos += sprintf(pos, "STAT uptime %u\r\n", now); | |
919 pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started); | |
920 pos += sprintf(pos, "STAT version " VERSION "\r\n"); | |
921 pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *)); | |
922 #ifndef WIN32 | |
923 pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec); | |
924 pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec); | |
925 #endif /* !WIN32 */ | |
926 pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items); | |
927 pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items); | |
928 pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes); | |
929 pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */ | |
930 pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns); | |
931 pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs); | |
932 pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds); | |
933 pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds); | |
934 pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits); | |
935 pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses); | |
936 pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions); | |
937 pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read); | |
938 pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written); | |
939 pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes); | |
940 pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads); | |
941 pos += sprintf(pos, "END"); | |
942 STATS_UNLOCK(); | |
943 out_string(c, temp); | |
944 return; | |
945 } | |
946 | |
947 subcommand = tokens[SUBCOMMAND_TOKEN].value; | |
948 | |
949 if (strcmp(subcommand, "reset") == 0) { | |
950 stats_reset(); | |
951 out_string(c, "RESET"); | |
952 return; | |
953 } | |
954 | |
955 #ifdef HAVE_MALLOC_H | |
956 #ifdef HAVE_STRUCT_MALLINFO | |
957 if (strcmp(subcommand, "malloc") == 0) { | |
958 char temp[512]; | |
959 struct mallinfo info; | |
960 char *pos = temp; | |
961 | |
962 info = mallinfo(); | |
963 pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena); | |
964 pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks); | |
965 pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks); | |
966 pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks); | |
967 pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd); | |
968 pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks); | |
969 pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks); | |
970 pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks); | |
971 pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks); | |
972 pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost); | |
973 out_string(c, temp); | |
974 return; | |
975 } | |
976 #endif /* HAVE_STRUCT_MALLINFO */ | |
977 #endif /* HAVE_MALLOC_H */ | |
978 | |
979 #if !defined(WIN32) || !defined(__APPLE__) | |
980 if (strcmp(subcommand, "maps") == 0) { | |
981 char *wbuf; | |
982 int wsize = 8192; /* should be enough */ | |
983 int fd; | |
984 int res; | |
985 | |
986 if (!(wbuf = (char *)malloc(wsize))) { | |
987 out_string(c, "SERVER_ERROR out of memory"); | |
988 return; | |
989 } | |
990 | |
991 fd = open("/proc/self/maps", O_RDONLY); | |
992 if (fd == -1) { | |
993 out_string(c, "SERVER_ERROR cannot open the maps file"); | |
994 free(wbuf); | |
995 return; | |
996 } | |
997 | |
998 res = read(fd, wbuf, wsize - 6); /* 6 = END\r\n\0 */ | |
999 if (res == wsize - 6) { | |
1000 out_string(c, "SERVER_ERROR buffer overflow"); | |
1001 free(wbuf); close(fd); | |
1002 return; | |
1003 } | |
1004 if (res == 0 || res == -1) { | |
1005 out_string(c, "SERVER_ERROR can't read the maps file"); | |
1006 free(wbuf); close(fd); | |
1007 return; | |
1008 } | |
1009 memcpy(wbuf + res, "END\r\n", 6); | |
1010 c->write_and_free = wbuf; | |
1011 c->wcurr = wbuf; | |
1012 c->wbytes = res + 5; // Don't write the terminal '\0' | |
1013 conn_set_state(c, conn_write); | |
1014 c->write_and_go = conn_read; | |
1015 close(fd); | |
1016 return; | |
1017 } | |
1018 #endif | |
1019 | |
1020 if (strcmp(subcommand, "cachedump") == 0) { | |
1021 | |
1022 char *buf; | |
1023 unsigned int bytes, id, limit = 0; | |
1024 | |
1025 if(ntokens < 5) { | |
1026 out_string(c, "CLIENT_ERROR bad command line"); | |
1027 return; | |
1028 } | |
1029 | |
1030 id = strtoul(tokens[2].value, NULL, 10); | |
1031 limit = strtoul(tokens[3].value, NULL, 10); | |
1032 | |
1033 if(errno == ERANGE) { | |
1034 out_string(c, "CLIENT_ERROR bad command line format"); | |
1035 return; | |
1036 } | |
1037 | |
1038 buf = item_cachedump(id, limit, &bytes); | |
1039 if (buf == 0) { | |
1040 out_string(c, "SERVER_ERROR out of memory"); | |
1041 return; | |
1042 } | |
1043 | |
1044 c->write_and_free = buf; | |
1045 c->wcurr = buf; | |
1046 c->wbytes = bytes; | |
1047 conn_set_state(c, conn_write); | |
1048 c->write_and_go = conn_read; | |
1049 return; | |
1050 } | |
1051 | |
1052 if (strcmp(subcommand, "slabs") == 0) { | |
1053 int bytes = 0; | |
1054 char *buf = slabs_stats(&bytes); | |
1055 if (!buf) { | |
1056 out_string(c, "SERVER_ERROR out of memory"); | |
1057 return; | |
1058 } | |
1059 c->write_and_free = buf; | |
1060 c->wcurr = buf; | |
1061 c->wbytes = bytes; | |
1062 conn_set_state(c, conn_write); | |
1063 c->write_and_go = conn_read; | |
1064 return; | |
1065 } | |
1066 | |
1067 if (strcmp(subcommand, "items") == 0) { | |
1068 char buffer[4096]; | |
1069 item_stats(buffer, 4096); | |
1070 out_string(c, buffer); | |
1071 return; | |
1072 } | |
1073 | |
1074 if (strcmp(subcommand, "detail") == 0) { | |
1075 if (ntokens < 4) | |
1076 process_stats_detail(c, ""); /* outputs the error message */ | |
1077 else | |
1078 process_stats_detail(c, tokens[2].value); | |
1079 return; | |
1080 } | |
1081 | |
1082 if (strcmp(subcommand, "sizes") == 0) { | |
1083 int bytes = 0; | |
1084 char *buf = item_stats_sizes(&bytes); | |
1085 if (! buf) { | |
1086 out_string(c, "SERVER_ERROR out of memory"); | |
1087 return; | |
1088 } | |
1089 | |
1090 c->write_and_free = buf; | |
1091 c->wcurr = buf; | |
1092 c->wbytes = bytes; | |
1093 conn_set_state(c, conn_write); | |
1094 c->write_and_go = conn_read; | |
1095 return; | |
1096 } | |
1097 | |
1098 out_string(c, "ERROR"); | |
1099 } | |
1100 | |
1101 /* ntokens is overwritten here... shrug.. */ | |
1102 static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens) { | |
1103 char *key; | |
1104 size_t nkey; | |
1105 int i = 0; | |
1106 item *it; | |
1107 token_t *key_token = &tokens[KEY_TOKEN]; | |
1108 | |
1109 assert(c != NULL); | |
1110 | |
1111 if (settings.managed) { | |
1112 int bucket = c->bucket; | |
1113 if (bucket == -1) { | |
1114 out_string(c, "CLIENT_ERROR no BG data in managed mode"); | |
1115 return; | |
1116 } | |
1117 c->bucket = -1; | |
1118 if (buckets[bucket] != c->gen) { | |
1119 out_string(c, "ERROR_NOT_OWNER"); | |
1120 return; | |
1121 } | |
1122 } | |
1123 | |
1124 do { | |
1125 while(key_token->length != 0) { | |
1126 | |
1127 key = key_token->value; | |
1128 nkey = key_token->length; | |
1129 | |
1130 if(nkey > KEY_MAX_LENGTH) { | |
1131 out_string(c, "CLIENT_ERROR bad command line format"); | |
1132 return; | |
1133 } | |
1134 | |
1135 STATS_LOCK(); | |
1136 stats.get_cmds++; | |
1137 STATS_UNLOCK(); | |
1138 it = item_get(key, nkey); | |
1139 if (settings.detail_enabled) { | |
1140 stats_prefix_record_get(key, NULL != it); | |
1141 } | |
1142 if (it) { | |
1143 if (i >= c->isize) { | |
1144 item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2); | |
1145 if (new_list) { | |
1146 c->isize *= 2; | |
1147 c->ilist = new_list; | |
1148 } else break; | |
1149 } | |
1150 | |
1151 /* | |
1152 * Construct the response. Each hit adds three elements to the | |
1153 * outgoing data list: | |
1154 * "VALUE " | |
1155 * key | |
1156 * " " + flags + " " + data length + "\r\n" + data (with \r\n) | |
1157 */ | |
1158 if (add_iov(c, "VALUE ", 6) != 0 || | |
1159 add_iov(c, ITEM_key(it), it->nkey) != 0 || | |
1160 add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0) | |
1161 { | |
1162 break; | |
1163 } | |
1164 if (settings.verbose > 1) | |
1165 fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it)); | |
1166 | |
1167 /* item_get() has incremented it->refcount for us */ | |
1168 STATS_LOCK(); | |
1169 stats.get_hits++; | |
1170 STATS_UNLOCK(); | |
1171 item_update(it); | |
1172 *(c->ilist + i) = it; | |
1173 i++; | |
1174 | |
1175 } else { | |
1176 STATS_LOCK(); | |
1177 stats.get_misses++; | |
1178 STATS_UNLOCK(); | |
1179 } | |
1180 | |
1181 key_token++; | |
1182 } | |
1183 | |
1184 /* | |
1185 * If the command string hasn't been fully processed, get the next set | |
1186 * of tokens. | |
1187 */ | |
1188 if(key_token->value != NULL) { | |
1189 ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS); | |
1190 key_token = tokens; | |
1191 } | |
1192 | |
1193 } while(key_token->value != NULL); | |
1194 | |
1195 c->icurr = c->ilist; | |
1196 c->ileft = i; | |
1197 | |
1198 if (settings.verbose > 1) | |
1199 fprintf(stderr, ">%d END\n", c->sfd); | |
1200 add_iov(c, "END\r\n", 5); | |
1201 | |
1202 if (c->udp && build_udp_headers(c) != 0) { | |
1203 out_string(c, "SERVER_ERROR out of memory"); | |
1204 } | |
1205 else { | |
1206 conn_set_state(c, conn_mwrite); | |
1207 c->msgcurr = 0; | |
1208 } | |
1209 return; | |
1210 } | |
1211 | |
1212 static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm) { | |
1213 char *key; | |
1214 size_t nkey; | |
1215 int flags; | |
1216 time_t exptime; | |
1217 int vlen; | |
1218 item *it; | |
1219 | |
1220 assert(c != NULL); | |
1221 | |
1222 if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) { | |
1223 out_string(c, "CLIENT_ERROR bad command line format"); | |
1224 return; | |
1225 } | |
1226 | |
1227 key = tokens[KEY_TOKEN].value; | |
1228 nkey = tokens[KEY_TOKEN].length; | |
1229 | |
1230 flags = strtoul(tokens[2].value, NULL, 10); | |
1231 exptime = strtol(tokens[3].value, NULL, 10); | |
1232 vlen = strtol(tokens[4].value, NULL, 10); | |
1233 | |
1234 if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) { | |
1235 out_string(c, "CLIENT_ERROR bad command line format"); | |
1236 return; | |
1237 } | |
1238 | |
1239 if (settings.detail_enabled) { | |
1240 stats_prefix_record_set(key); | |
1241 } | |
1242 | |
1243 if (settings.managed) { | |
1244 int bucket = c->bucket; | |
1245 if (bucket == -1) { | |
1246 out_string(c, "CLIENT_ERROR no BG data in managed mode"); | |
1247 return; | |
1248 } | |
1249 c->bucket = -1; | |
1250 if (buckets[bucket] != c->gen) { | |
1251 out_string(c, "ERROR_NOT_OWNER"); | |
1252 return; | |
1253 } | |
1254 } | |
1255 | |
1256 it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2); | |
1257 | |
1258 if (it == 0) { | |
1259 if (! item_size_ok(nkey, flags, vlen + 2)) | |
1260 out_string(c, "SERVER_ERROR object too large for cache"); | |
1261 else | |
1262 out_string(c, "SERVER_ERROR out of memory"); | |
1263 /* swallow the data line */ | |
1264 c->write_and_go = conn_swallow; | |
1265 c->sbytes = vlen + 2; | |
1266 return; | |
1267 } | |
1268 | |
1269 c->item_comm = comm; | |
1270 c->item = it; | |
1271 c->ritem = ITEM_data(it); | |
1272 c->rlbytes = it->nbytes; | |
1273 conn_set_state(c, conn_nread); | |
1274 } | |
1275 | |
1276 static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const int incr) { | |
1277 char temp[32]; | |
1278 item *it; | |
1279 unsigned int delta; | |
1280 char *key; | |
1281 size_t nkey; | |
1282 | |
1283 assert(c != NULL); | |
1284 | |
1285 if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) { | |
1286 out_string(c, "CLIENT_ERROR bad command line format"); | |
1287 return; | |
1288 } | |
1289 | |
1290 key = tokens[KEY_TOKEN].value; | |
1291 nkey = tokens[KEY_TOKEN].length; | |
1292 | |
1293 if (settings.managed) { | |
1294 int bucket = c->bucket; | |
1295 if (bucket == -1) { | |
1296 out_string(c, "CLIENT_ERROR no BG data in managed mode"); | |
1297 return; | |
1298 } | |
1299 c->bucket = -1; | |
1300 if (buckets[bucket] != c->gen) { | |
1301 out_string(c, "ERROR_NOT_OWNER"); | |
1302 return; | |
1303 } | |
1304 } | |
1305 | |
1306 delta = strtoul(tokens[2].value, NULL, 10); | |
1307 | |
1308 if(errno == ERANGE) { | |
1309 out_string(c, "CLIENT_ERROR bad command line format"); | |
1310 return; | |
1311 } | |
1312 | |
1313 it = item_get(key, nkey); | |
1314 if (!it) { | |
1315 out_string(c, "NOT_FOUND"); | |
1316 return; | |
1317 } | |
1318 | |
1319 out_string(c, add_delta(it, incr, delta, temp)); | |
1320 item_remove(it); /* release our reference */ | |
1321 } | |
1322 | |
1323 /* | |
1324 * adds a delta value to a numeric item. | |
1325 * | |
1326 * it item to adjust | |
1327 * incr true to increment value, false to decrement | |
1328 * delta amount to adjust value by | |
1329 * buf buffer for response string | |
1330 * | |
1331 * returns a response string to send back to the client. | |
1332 */ | |
1333 char *do_add_delta(item *it, const int incr, unsigned int delta, char *buf) { | |
1334 char *ptr; | |
1335 unsigned int value; | |
1336 int res; | |
1337 | |
1338 ptr = ITEM_data(it); | |
1339 while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++; // BUG: can't be true | |
1340 | |
1341 value = strtol(ptr, NULL, 10); | |
1342 | |
1343 if(errno == ERANGE) { | |
1344 return "CLIENT_ERROR cannot increment or decrement non-numeric value"; | |
1345 } | |
1346 | |
1347 if (incr != 0) | |
1348 value += delta; | |
1349 else { | |
1350 if (delta >= value) value = 0; | |
1351 else value -= delta; | |
1352 } | |
1353 snprintf(buf, 32, "%u", value); | |
1354 res = strlen(buf); | |
1355 if (res + 2 > it->nbytes) { /* need to realloc */ | |
1356 item *new_it; | |
1357 new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 ); | |
1358 if (new_it == 0) { | |
1359 return "SERVER_ERROR out of memory"; | |
1360 } | |
1361 memcpy(ITEM_data(new_it), buf, res); | |
1362 memcpy(ITEM_data(new_it) + res, "\r\n", 3); | |
1363 do_item_replace(it, new_it); | |
1364 do_item_remove(new_it); /* release our reference */ | |
1365 } else { /* replace in-place */ | |
1366 memcpy(ITEM_data(it), buf, res); | |
1367 memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2); | |
1368 } | |
1369 | |
1370 return buf; | |
1371 } | |
1372 | |
1373 static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) { | |
1374 char *key; | |
1375 size_t nkey; | |
1376 item *it; | |
1377 time_t exptime = 0; | |
1378 | |
1379 assert(c != NULL); | |
1380 | |
1381 if (settings.managed) { | |
1382 int bucket = c->bucket; | |
1383 if (bucket == -1) { | |
1384 out_string(c, "CLIENT_ERROR no BG data in managed mode"); | |
1385 return; | |
1386 } | |
1387 c->bucket = -1; | |
1388 if (buckets[bucket] != c->gen) { | |
1389 out_string(c, "ERROR_NOT_OWNER"); | |
1390 return; | |
1391 } | |
1392 } | |
1393 | |
1394 key = tokens[KEY_TOKEN].value; | |
1395 nkey = tokens[KEY_TOKEN].length; | |
1396 | |
1397 if(nkey > KEY_MAX_LENGTH) { | |
1398 out_string(c, "CLIENT_ERROR bad command line format"); | |
1399 return; | |
1400 } | |
1401 | |
1402 if(ntokens == 4) { | |
1403 exptime = strtol(tokens[2].value, NULL, 10); | |
1404 | |
1405 if(errno == ERANGE) { | |
1406 out_string(c, "CLIENT_ERROR bad command line format"); | |
1407 return; | |
1408 } | |
1409 } | |
1410 | |
1411 if (settings.detail_enabled) { | |
1412 stats_prefix_record_delete(key); | |
1413 } | |
1414 | |
1415 it = item_get(key, nkey); | |
1416 if (it) { | |
1417 if (exptime == 0) { | |
1418 item_unlink(it); | |
1419 item_remove(it); /* release our reference */ | |
1420 out_string(c, "DELETED"); | |
1421 } else { | |
1422 /* our reference will be transfered to the delete queue */ | |
1423 out_string(c, defer_delete(it, exptime)); | |
1424 } | |
1425 } else { | |
1426 out_string(c, "NOT_FOUND"); | |
1427 } | |
1428 } | |
1429 | |
1430 /* | |
1431 * Adds an item to the deferred-delete list so it can be reaped later. | |
1432 * | |
1433 * Returns the result to send to the client. | |
1434 */ | |
1435 char *do_defer_delete(item *it, time_t exptime) | |
1436 { | |
1437 if (delcurr >= deltotal) { | |
1438 item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2); | |
1439 if (new_delete) { | |
1440 todelete = new_delete; | |
1441 deltotal *= 2; | |
1442 } else { | |
1443 /* | |
1444 * can't delete it immediately, user wants a delay, | |
1445 * but we ran out of memory for the delete queue | |
1446 */ | |
1447 item_remove(it); /* release reference */ | |
1448 return "SERVER_ERROR out of memory"; | |
1449 } | |
1450 } | |
1451 | |
1452 /* use its expiration time as its deletion time now */ | |
1453 it->exptime = realtime(exptime); | |
1454 it->it_flags |= ITEM_DELETED; | |
1455 todelete[delcurr++] = it; | |
1456 | |
1457 return "DELETED"; | |
1458 } | |
1459 | |
1460 static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) { | |
1461 unsigned int level; | |
1462 | |
1463 assert(c != NULL); | |
1464 | |
1465 level = strtoul(tokens[1].value, NULL, 10); | |
1466 settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level; | |
1467 out_string(c, "OK"); | |
1468 return; | |
1469 } | |
1470 | |
1471 static void process_command(conn *c, char *command) { | |
1472 | |
1473 token_t tokens[MAX_TOKENS]; | |
1474 size_t ntokens; | |
1475 int comm; | |
1476 | |
1477 assert(c != NULL); | |
1478 | |
1479 if (settings.verbose > 1) | |
1480 fprintf(stderr, "<%d %s\n", c->sfd, command); | |
1481 | |
1482 /* | |
1483 * for commands set/add/replace, we build an item and read the data | |
1484 * directly into it, then continue in nread_complete(). | |
1485 */ | |
1486 | |
1487 c->msgcurr = 0; | |
1488 c->msgused = 0; | |
1489 c->iovused = 0; | |
1490 if (add_msghdr(c) != 0) { | |
1491 out_string(c, "SERVER_ERROR out of memory"); | |
1492 return; | |
1493 } | |
1494 | |
1495 ntokens = tokenize_command(command, tokens, MAX_TOKENS); | |
1496 | |
1497 if (ntokens >= 3 && | |
1498 ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) || | |
1499 (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) { | |
1500 | |
1501 process_get_command(c, tokens, ntokens); | |
1502 | |
1503 } else if (ntokens == 6 && | |
1504 ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) || | |
1505 (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) || | |
2 | 1506 (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) || |
1507 (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) || | |
0 | 1508 (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)))) { |
1509 | |
1510 process_update_command(c, tokens, ntokens, comm); | |
1511 | |
1512 } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) { | |
1513 | |
1514 process_arithmetic_command(c, tokens, ntokens, 1); | |
1515 | |
1516 } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) { | |
1517 | |
1518 process_arithmetic_command(c, tokens, ntokens, 0); | |
1519 | |
1520 } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) { | |
1521 | |
1522 process_delete_command(c, tokens, ntokens); | |
1523 | |
1524 } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) { | |
1525 unsigned int bucket, gen; | |
1526 if (!settings.managed) { | |
1527 out_string(c, "CLIENT_ERROR not a managed instance"); | |
1528 return; | |
1529 } | |
1530 | |
1531 if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) { | |
1532 if ((bucket < 0) || (bucket >= MAX_BUCKETS)) { | |
1533 out_string(c, "CLIENT_ERROR bucket number out of range"); | |
1534 return; | |
1535 } | |
1536 buckets[bucket] = gen; | |
1537 out_string(c, "OWNED"); | |
1538 return; | |
1539 } else { | |
1540 out_string(c, "CLIENT_ERROR bad format"); | |
1541 return; | |
1542 } | |
1543 | |
1544 } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) { | |
1545 | |
1546 int bucket; | |
1547 if (!settings.managed) { | |
1548 out_string(c, "CLIENT_ERROR not a managed instance"); | |
1549 return; | |
1550 } | |
1551 if (sscanf(tokens[1].value, "%u", &bucket) == 1) { | |
1552 if ((bucket < 0) || (bucket >= MAX_BUCKETS)) { | |
1553 out_string(c, "CLIENT_ERROR bucket number out of range"); | |
1554 return; | |
1555 } | |
1556 buckets[bucket] = 0; | |
1557 out_string(c, "DISOWNED"); | |
1558 return; | |
1559 } else { | |
1560 out_string(c, "CLIENT_ERROR bad format"); | |
1561 return; | |
1562 } | |
1563 | |
1564 } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) { | |
1565 int bucket, gen; | |
1566 if (!settings.managed) { | |
1567 out_string(c, "CLIENT_ERROR not a managed instance"); | |
1568 return; | |
1569 } | |
1570 if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) { | |
1571 /* we never write anything back, even if input's wrong */ | |
1572 if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) { | |
1573 /* do nothing, bad input */ | |
1574 } else { | |
1575 c->bucket = bucket; | |
1576 c->gen = gen; | |
1577 } | |
1578 conn_set_state(c, conn_read); | |
1579 return; | |
1580 } else { | |
1581 out_string(c, "CLIENT_ERROR bad format"); | |
1582 return; | |
1583 } | |
1584 | |
1585 } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) { | |
1586 | |
1587 process_stat(c, tokens, ntokens); | |
1588 | |
1589 } else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) { | |
1590 time_t exptime = 0; | |
1591 set_current_time(); | |
1592 | |
1593 if(ntokens == 2) { | |
1594 settings.oldest_live = current_time - 1; | |
1595 item_flush_expired(); | |
1596 out_string(c, "OK"); | |
1597 return; | |
1598 } | |
1599 | |
1600 exptime = strtol(tokens[1].value, NULL, 10); | |
1601 if(errno == ERANGE) { | |
1602 out_string(c, "CLIENT_ERROR bad command line format"); | |
1603 return; | |
1604 } | |
1605 | |
1606 settings.oldest_live = realtime(exptime) - 1; | |
1607 item_flush_expired(); | |
1608 out_string(c, "OK"); | |
1609 return; | |
1610 | |
1611 } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) { | |
1612 | |
1613 out_string(c, "VERSION " VERSION); | |
1614 | |
1615 } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) { | |
1616 | |
1617 conn_set_state(c, conn_closing); | |
1618 | |
1619 } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 && | |
1620 strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) { | |
1621 #ifdef ALLOW_SLABS_REASSIGN | |
1622 | |
1623 int src, dst, rv; | |
1624 | |
1625 src = strtol(tokens[2].value, NULL, 10); | |
1626 dst = strtol(tokens[3].value, NULL, 10); | |
1627 | |
1628 if(errno == ERANGE) { | |
1629 out_string(c, "CLIENT_ERROR bad command line format"); | |
1630 return; | |
1631 } | |
1632 | |
1633 rv = slabs_reassign(src, dst); | |
1634 if (rv == 1) { | |
1635 out_string(c, "DONE"); | |
1636 return; | |
1637 } | |
1638 if (rv == 0) { | |
1639 out_string(c, "CANT"); | |
1640 return; | |
1641 } | |
1642 if (rv == -1) { | |
1643 out_string(c, "BUSY"); | |
1644 return; | |
1645 } | |
1646 #else | |
1647 out_string(c, "CLIENT_ERROR Slab reassignment not supported"); | |
1648 #endif | |
1649 } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) { | |
1650 process_verbosity_command(c, tokens, ntokens); | |
1651 } else { | |
1652 out_string(c, "ERROR"); | |
1653 } | |
1654 return; | |
1655 } | |
1656 | |
1657 /* | |
1658 * if we have a complete line in the buffer, process it. | |
1659 */ | |
1660 static int try_read_command(conn *c) { | |
1661 char *el, *cont; | |
1662 | |
1663 assert(c != NULL); | |
1664 assert(c->rcurr <= (c->rbuf + c->rsize)); | |
1665 | |
1666 if (c->rbytes == 0) | |
1667 return 0; | |
1668 el = memchr(c->rcurr, '\n', c->rbytes); | |
1669 if (!el) | |
1670 return 0; | |
1671 cont = el + 1; | |
1672 if ((el - c->rcurr) > 1 && *(el - 1) == '\r') { | |
1673 el--; | |
1674 } | |
1675 *el = '\0'; | |
1676 | |
1677 assert(cont <= (c->rcurr + c->rbytes)); | |
1678 | |
1679 process_command(c, c->rcurr); | |
1680 | |
1681 c->rbytes -= (cont - c->rcurr); | |
1682 c->rcurr = cont; | |
1683 | |
1684 assert(c->rcurr <= (c->rbuf + c->rsize)); | |
1685 | |
1686 return 1; | |
1687 } | |
1688 | |
1689 /* | |
1690 * read a UDP request. | |
1691 * return 0 if there's nothing to read. | |
1692 */ | |
1693 static int try_read_udp(conn *c) { | |
1694 int res; | |
1695 | |
1696 assert(c != NULL); | |
1697 | |
1698 c->request_addr_size = sizeof(c->request_addr); | |
1699 res = recvfrom(c->sfd, c->rbuf, c->rsize, | |
1700 0, &c->request_addr, &c->request_addr_size); | |
1701 if (res > 8) { | |
1702 unsigned char *buf = (unsigned char *)c->rbuf; | |
1703 STATS_LOCK(); | |
1704 stats.bytes_read += res; | |
1705 STATS_UNLOCK(); | |
1706 | |
1707 /* Beginning of UDP packet is the request ID; save it. */ | |
1708 c->request_id = buf[0] * 256 + buf[1]; | |
1709 | |
1710 /* If this is a multi-packet request, drop it. */ | |
1711 if (buf[4] != 0 || buf[5] != 1) { | |
1712 out_string(c, "SERVER_ERROR multi-packet request not supported"); | |
1713 return 0; | |
1714 } | |
1715 | |
1716 /* Don't care about any of the rest of the header. */ | |
1717 res -= 8; | |
1718 memmove(c->rbuf, c->rbuf + 8, res); | |
1719 | |
1720 c->rbytes += res; | |
1721 c->rcurr = c->rbuf; | |
1722 return 1; | |
1723 } | |
1724 return 0; | |
1725 } | |
1726 | |
1727 /* | |
1728 * read from network as much as we can, handle buffer overflow and connection | |
1729 * close. | |
1730 * before reading, move the remaining incomplete fragment of a command | |
1731 * (if any) to the beginning of the buffer. | |
1732 * return 0 if there's nothing to read on the first read. | |
1733 */ | |
1734 static int try_read_network(conn *c) { | |
1735 int gotdata = 0; | |
1736 int res; | |
1737 | |
1738 assert(c != NULL); | |
1739 | |
1740 if (c->rcurr != c->rbuf) { | |
1741 if (c->rbytes != 0) /* otherwise there's nothing to copy */ | |
1742 memmove(c->rbuf, c->rcurr, c->rbytes); | |
1743 c->rcurr = c->rbuf; | |
1744 } | |
1745 | |
1746 while (1) { | |
1747 if (c->rbytes >= c->rsize) { | |
1748 char *new_rbuf = realloc(c->rbuf, c->rsize * 2); | |
1749 if (!new_rbuf) { | |
1750 if (settings.verbose > 0) | |
1751 fprintf(stderr, "Couldn't realloc input buffer\n"); | |
1752 c->rbytes = 0; /* ignore what we read */ | |
1753 out_string(c, "SERVER_ERROR out of memory"); | |
1754 c->write_and_go = conn_closing; | |
1755 return 1; | |
1756 } | |
1757 c->rcurr = c->rbuf = new_rbuf; | |
1758 c->rsize *= 2; | |
1759 } | |
1760 | |
1761 /* unix socket mode doesn't need this, so zeroed out. but why | |
1762 * is this done for every command? presumably for UDP | |
1763 * mode. */ | |
1764 if (!settings.socketpath) { | |
1765 c->request_addr_size = sizeof(c->request_addr); | |
1766 } else { | |
1767 c->request_addr_size = 0; | |
1768 } | |
1769 | |
1770 res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes); | |
1771 if (res > 0) { | |
1772 STATS_LOCK(); | |
1773 stats.bytes_read += res; | |
1774 STATS_UNLOCK(); | |
1775 gotdata = 1; | |
1776 c->rbytes += res; | |
1777 continue; | |
1778 } | |
1779 if (res == 0) { | |
1780 /* connection closed */ | |
1781 conn_set_state(c, conn_closing); | |
1782 return 1; | |
1783 } | |
1784 if (res == -1) { | |
1785 if (errno == EAGAIN || errno == EWOULDBLOCK) break; | |
1786 else return 0; | |
1787 } | |
1788 } | |
1789 return gotdata; | |
1790 } | |
1791 | |
1792 static bool update_event(conn *c, const int new_flags) { | |
1793 assert(c != NULL); | |
1794 | |
1795 struct event_base *base = c->event.ev_base; | |
1796 if (c->ev_flags == new_flags) | |
1797 return true; | |
1798 if (event_del(&c->event) == -1) return false; | |
1799 event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c); | |
1800 event_base_set(base, &c->event); | |
1801 c->ev_flags = new_flags; | |
1802 if (event_add(&c->event, 0) == -1) return false; | |
1803 return true; | |
1804 } | |
1805 | |
1806 /* | |
1807 * Sets whether we are listening for new connections or not. | |
1808 */ | |
1809 void accept_new_conns(const bool do_accept) { | |
1810 if (! is_listen_thread()) | |
1811 return; | |
1812 if (do_accept) { | |
1813 update_event(listen_conn, EV_READ | EV_PERSIST); | |
1814 if (listen(listen_conn->sfd, 1024) != 0) { | |
1815 perror("listen"); | |
1816 } | |
1817 } | |
1818 else { | |
1819 update_event(listen_conn, 0); | |
1820 if (listen(listen_conn->sfd, 0) != 0) { | |
1821 perror("listen"); | |
1822 } | |
1823 } | |
1824 } | |
1825 | |
1826 | |
1827 /* | |
1828 * Transmit the next chunk of data from our list of msgbuf structures. | |
1829 * | |
1830 * Returns: | |
1831 * TRANSMIT_COMPLETE All done writing. | |
1832 * TRANSMIT_INCOMPLETE More data remaining to write. | |
1833 * TRANSMIT_SOFT_ERROR Can't write any more right now. | |
1834 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing) | |
1835 */ | |
1836 static int transmit(conn *c) { | |
1837 assert(c != NULL); | |
1838 | |
1839 if (c->msgcurr < c->msgused && | |
1840 c->msglist[c->msgcurr].msg_iovlen == 0) { | |
1841 /* Finished writing the current msg; advance to the next. */ | |
1842 c->msgcurr++; | |
1843 } | |
1844 if (c->msgcurr < c->msgused) { | |
1845 ssize_t res; | |
1846 struct msghdr *m = &c->msglist[c->msgcurr]; | |
1847 | |
1848 res = sendmsg(c->sfd, m, 0); | |
1849 if (res > 0) { | |
1850 STATS_LOCK(); | |
1851 stats.bytes_written += res; | |
1852 STATS_UNLOCK(); | |
1853 | |
1854 /* We've written some of the data. Remove the completed | |
1855 iovec entries from the list of pending writes. */ | |
1856 while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) { | |
1857 res -= m->msg_iov->iov_len; | |
1858 m->msg_iovlen--; | |
1859 m->msg_iov++; | |
1860 } | |
1861 | |
1862 /* Might have written just part of the last iovec entry; | |
1863 adjust it so the next write will do the rest. */ | |
1864 if (res > 0) { | |
1865 m->msg_iov->iov_base += res; | |
1866 m->msg_iov->iov_len -= res; | |
1867 } | |
1868 return TRANSMIT_INCOMPLETE; | |
1869 } | |
1870 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { | |
1871 if (!update_event(c, EV_WRITE | EV_PERSIST)) { | |
1872 if (settings.verbose > 0) | |
1873 fprintf(stderr, "Couldn't update event\n"); | |
1874 conn_set_state(c, conn_closing); | |
1875 return TRANSMIT_HARD_ERROR; | |
1876 } | |
1877 return TRANSMIT_SOFT_ERROR; | |
1878 } | |
1879 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK, | |
1880 we have a real error, on which we close the connection */ | |
1881 if (settings.verbose > 0) | |
1882 perror("Failed to write, and not due to blocking"); | |
1883 | |
1884 if (c->udp) | |
1885 conn_set_state(c, conn_read); | |
1886 else | |
1887 conn_set_state(c, conn_closing); | |
1888 return TRANSMIT_HARD_ERROR; | |
1889 } else { | |
1890 return TRANSMIT_COMPLETE; | |
1891 } | |
1892 } | |
1893 | |
1894 static void drive_machine(conn *c) { | |
1895 bool stop = false; | |
1896 int sfd, flags = 1; | |
1897 socklen_t addrlen; | |
1898 struct sockaddr addr; | |
1899 int res; | |
1900 | |
1901 assert(c != NULL); | |
1902 | |
1903 while (!stop) { | |
1904 | |
1905 switch(c->state) { | |
1906 case conn_listening: | |
1907 addrlen = sizeof(addr); | |
1908 if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) { | |
1909 if (errno == EAGAIN || errno == EWOULDBLOCK) { | |
1910 /* these are transient, so don't log anything */ | |
1911 stop = true; | |
1912 } else if (errno == EMFILE) { | |
1913 if (settings.verbose > 0) | |
1914 fprintf(stderr, "Too many open connections\n"); | |
1915 accept_new_conns(false); | |
1916 stop = true; | |
1917 } else { | |
1918 perror("accept()"); | |
1919 stop = true; | |
1920 } | |
1921 break; | |
1922 } | |
1923 if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 || | |
1924 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) { | |
1925 perror("setting O_NONBLOCK"); | |
1926 close(sfd); | |
1927 break; | |
1928 } | |
1929 dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, | |
1930 DATA_BUFFER_SIZE, false); | |
1931 break; | |
1932 | |
1933 case conn_read: | |
1934 if (try_read_command(c) != 0) { | |
1935 continue; | |
1936 } | |
1937 if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) { | |
1938 continue; | |
1939 } | |
1940 /* we have no command line and no data to read from network */ | |
1941 if (!update_event(c, EV_READ | EV_PERSIST)) { | |
1942 if (settings.verbose > 0) | |
1943 fprintf(stderr, "Couldn't update event\n"); | |
1944 conn_set_state(c, conn_closing); | |
1945 break; | |
1946 } | |
1947 stop = true; | |
1948 break; | |
1949 | |
1950 case conn_nread: | |
1951 /* we are reading rlbytes into ritem; */ | |
1952 if (c->rlbytes == 0) { | |
1953 complete_nread(c); | |
1954 break; | |
1955 } | |
1956 /* first check if we have leftovers in the conn_read buffer */ | |
1957 if (c->rbytes > 0) { | |
1958 int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes; | |
1959 memcpy(c->ritem, c->rcurr, tocopy); | |
1960 c->ritem += tocopy; | |
1961 c->rlbytes -= tocopy; | |
1962 c->rcurr += tocopy; | |
1963 c->rbytes -= tocopy; | |
1964 break; | |
1965 } | |
1966 | |
1967 /* now try reading from the socket */ | |
1968 res = read(c->sfd, c->ritem, c->rlbytes); | |
1969 if (res > 0) { | |
1970 STATS_LOCK(); | |
1971 stats.bytes_read += res; | |
1972 STATS_UNLOCK(); | |
1973 c->ritem += res; | |
1974 c->rlbytes -= res; | |
1975 break; | |
1976 } | |
1977 if (res == 0) { /* end of stream */ | |
1978 conn_set_state(c, conn_closing); | |
1979 break; | |
1980 } | |
1981 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { | |
1982 if (!update_event(c, EV_READ | EV_PERSIST)) { | |
1983 if (settings.verbose > 0) | |
1984 fprintf(stderr, "Couldn't update event\n"); | |
1985 conn_set_state(c, conn_closing); | |
1986 break; | |
1987 } | |
1988 stop = true; | |
1989 break; | |
1990 } | |
1991 /* otherwise we have a real error, on which we close the connection */ | |
1992 if (settings.verbose > 0) | |
1993 fprintf(stderr, "Failed to read, and not due to blocking\n"); | |
1994 conn_set_state(c, conn_closing); | |
1995 break; | |
1996 | |
1997 case conn_swallow: | |
1998 /* we are reading sbytes and throwing them away */ | |
1999 if (c->sbytes == 0) { | |
2000 conn_set_state(c, conn_read); | |
2001 break; | |
2002 } | |
2003 | |
2004 /* first check if we have leftovers in the conn_read buffer */ | |
2005 if (c->rbytes > 0) { | |
2006 int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes; | |
2007 c->sbytes -= tocopy; | |
2008 c->rcurr += tocopy; | |
2009 c->rbytes -= tocopy; | |
2010 break; | |
2011 } | |
2012 | |
2013 /* now try reading from the socket */ | |
2014 res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize); | |
2015 if (res > 0) { | |
2016 STATS_LOCK(); | |
2017 stats.bytes_read += res; | |
2018 STATS_UNLOCK(); | |
2019 c->sbytes -= res; | |
2020 break; | |
2021 } | |
2022 if (res == 0) { /* end of stream */ | |
2023 conn_set_state(c, conn_closing); | |
2024 break; | |
2025 } | |
2026 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { | |
2027 if (!update_event(c, EV_READ | EV_PERSIST)) { | |
2028 if (settings.verbose > 0) | |
2029 fprintf(stderr, "Couldn't update event\n"); | |
2030 conn_set_state(c, conn_closing); | |
2031 break; | |
2032 } | |
2033 stop = true; | |
2034 break; | |
2035 } | |
2036 /* otherwise we have a real error, on which we close the connection */ | |
2037 if (settings.verbose > 0) | |
2038 fprintf(stderr, "Failed to read, and not due to blocking\n"); | |
2039 conn_set_state(c, conn_closing); | |
2040 break; | |
2041 | |
2042 case conn_write: | |
2043 /* | |
2044 * We want to write out a simple response. If we haven't already, | |
2045 * assemble it into a msgbuf list (this will be a single-entry | |
2046 * list for TCP or a two-entry list for UDP). | |
2047 */ | |
2048 if (c->iovused == 0 || (c->udp && c->iovused == 1)) { | |
2049 if (add_iov(c, c->wcurr, c->wbytes) != 0 || | |
2050 (c->udp && build_udp_headers(c) != 0)) { | |
2051 if (settings.verbose > 0) | |
2052 fprintf(stderr, "Couldn't build response\n"); | |
2053 conn_set_state(c, conn_closing); | |
2054 break; | |
2055 } | |
2056 } | |
2057 | |
2058 /* fall through... */ | |
2059 | |
2060 case conn_mwrite: | |
2061 switch (transmit(c)) { | |
2062 case TRANSMIT_COMPLETE: | |
2063 if (c->state == conn_mwrite) { | |
2064 while (c->ileft > 0) { | |
2065 item *it = *(c->icurr); | |
2066 assert((it->it_flags & ITEM_SLABBED) == 0); | |
2067 item_remove(it); | |
2068 c->icurr++; | |
2069 c->ileft--; | |
2070 } | |
2071 conn_set_state(c, conn_read); | |
2072 } else if (c->state == conn_write) { | |
2073 if (c->write_and_free) { | |
2074 free(c->write_and_free); | |
2075 c->write_and_free = 0; | |
2076 } | |
2077 conn_set_state(c, c->write_and_go); | |
2078 } else { | |
2079 if (settings.verbose > 0) | |
2080 fprintf(stderr, "Unexpected state %d\n", c->state); | |
2081 conn_set_state(c, conn_closing); | |
2082 } | |
2083 break; | |
2084 | |
2085 case TRANSMIT_INCOMPLETE: | |
2086 case TRANSMIT_HARD_ERROR: | |
2087 break; /* Continue in state machine. */ | |
2088 | |
2089 case TRANSMIT_SOFT_ERROR: | |
2090 stop = true; | |
2091 break; | |
2092 } | |
2093 break; | |
2094 | |
2095 case conn_closing: | |
2096 if (c->udp) | |
2097 conn_cleanup(c); | |
2098 else | |
2099 conn_close(c); | |
2100 stop = true; | |
2101 break; | |
2102 } | |
2103 } | |
2104 | |
2105 return; | |
2106 } | |
2107 | |
2108 void event_handler(const int fd, const short which, void *arg) { | |
2109 conn *c; | |
2110 | |
2111 c = (conn *)arg; | |
2112 assert(c != NULL); | |
2113 | |
2114 c->which = which; | |
2115 | |
2116 /* sanity */ | |
2117 if (fd != c->sfd) { | |
2118 if (settings.verbose > 0) | |
2119 fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n"); | |
2120 conn_close(c); | |
2121 return; | |
2122 } | |
2123 | |
2124 drive_machine(c); | |
2125 | |
2126 /* wait for next event */ | |
2127 return; | |
2128 } | |
2129 | |
/*
 * Create an IPv4 socket -- a UDP datagram socket when is_udp is true, a TCP
 * stream socket otherwise -- and switch it to non-blocking mode.
 *
 * Returns the new descriptor, or -1 after reporting the failing call with
 * perror().  The descriptor is closed again if O_NONBLOCK cannot be set.
 */
static int new_socket(const bool is_udp) {
    const int type = is_udp ? SOCK_DGRAM : SOCK_STREAM;
    int fd = socket(AF_INET, type, 0);
    int fl;

    if (fd == -1) {
        perror("socket()");
        return -1;
    }

    fl = fcntl(fd, F_GETFL, 0);
    if (fl >= 0 && fcntl(fd, F_SETFL, fl | O_NONBLOCK) >= 0)
        return fd;

    perror("setting O_NONBLOCK");
    close(fd);
    return -1;
}
2147 | |
2148 | |
2149 /* | |
2150 * Sets a socket's send buffer size to the maximum allowed by the system. | |
2151 */ | |
2152 static void maximize_sndbuf(const int sfd) { | |
2153 socklen_t intsize = sizeof(int); | |
2154 int last_good = 0; | |
2155 int min, max, avg; | |
2156 int old_size; | |
2157 | |
2158 /* Start with the default size. */ | |
2159 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) { | |
2160 if (settings.verbose > 0) | |
2161 perror("getsockopt(SO_SNDBUF)"); | |
2162 return; | |
2163 } | |
2164 | |
2165 /* Binary-search for the real maximum. */ | |
2166 min = old_size; | |
2167 max = MAX_SENDBUF_SIZE; | |
2168 | |
2169 while (min <= max) { | |
2170 avg = ((unsigned int)(min + max)) / 2; | |
2171 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) { | |
2172 last_good = avg; | |
2173 min = avg + 1; | |
2174 } else { | |
2175 max = avg - 1; | |
2176 } | |
2177 } | |
2178 | |
2179 if (settings.verbose > 1) | |
2180 fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good); | |
2181 } | |
2182 | |
2183 | |
2184 static int server_socket(const int port, const bool is_udp) { | |
2185 int sfd; | |
2186 struct linger ling = {0, 0}; | |
2187 struct sockaddr_in addr; | |
2188 int flags =1; | |
2189 | |
2190 if ((sfd = new_socket(is_udp)) == -1) { | |
2191 return -1; | |
2192 } | |
2193 | |
2194 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); | |
2195 if (is_udp) { | |
2196 maximize_sndbuf(sfd); | |
2197 } else { | |
2198 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); | |
2199 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); | |
2200 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); | |
2201 } | |
2202 | |
2203 /* | |
2204 * the memset call clears nonstandard fields in some impementations | |
2205 * that otherwise mess things up. | |
2206 */ | |
2207 memset(&addr, 0, sizeof(addr)); | |
2208 | |
2209 addr.sin_family = AF_INET; | |
2210 addr.sin_port = htons(port); | |
2211 addr.sin_addr = settings.interf; | |
2212 if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { | |
2213 perror("bind()"); | |
2214 close(sfd); | |
2215 return -1; | |
2216 } | |
2217 if (!is_udp && listen(sfd, 1024) == -1) { | |
2218 perror("listen()"); | |
2219 close(sfd); | |
2220 return -1; | |
2221 } | |
2222 return sfd; | |
2223 } | |
2224 | |
/*
 * Create a unix-domain (AF_UNIX) stream socket in non-blocking mode.
 *
 * Returns the descriptor, or -1 after reporting the failing call with
 * perror().  The descriptor is closed again if O_NONBLOCK cannot be set.
 */
static int new_socket_unix(void) {
    int fd;
    int fl;

    fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd == -1) {
        perror("socket()");
        return -1;
    }

    fl = fcntl(fd, F_GETFL, 0);
    if (fl < 0)
        goto nonblock_failed;
    if (fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0)
        goto nonblock_failed;
    return fd;

nonblock_failed:
    perror("setting O_NONBLOCK");
    close(fd);
    return -1;
}
2242 | |
/*
 * Create a non-blocking unix-domain stream socket bound to `path` and
 * start listening on it (backlog 1024).
 *
 * If a socket file already exists at `path` (e.g. left over from a previous
 * run) it is unlinked first; any other kind of file there is left in place.
 * Returns the listening descriptor, or -1 when `path` is NULL or too long
 * for sockaddr_un.sun_path, or when socket creation, bind() or listen()
 * fails.
 */
static int server_socket_unix(const char *path) {
    int sfd;
    struct linger ling = {0, 0};
    struct sockaddr_un addr;
    struct stat tstat;
    int flags = 1;
    size_t path_len;

    if (!path) {
        return -1;
    }

    /*
     * sun_path is a fixed-size array.  The previous strcpy() overflowed it
     * (a stack buffer overflow) for long paths, so reject those before
     * creating anything.  One byte is reserved for the terminating NUL.
     */
    path_len = strlen(path);
    if (path_len >= sizeof(addr.sun_path)) {
        fprintf(stderr, "unix socket path too long (max %lu bytes): %s\n",
                (unsigned long)(sizeof(addr.sun_path) - 1), path);
        return -1;
    }

    if ((sfd = new_socket_unix()) == -1) {
        return -1;
    }

    /*
     * Clean up a previous socket file if we left it around
     */
    if (lstat(path, &tstat) == 0) {
        if (S_ISSOCK(tstat.st_mode))
            unlink(path);
    }

    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
    setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));

    /*
     * the memset call clears nonstandard fields in some implementations
     * that otherwise mess things up.
     */
    memset(&addr, 0, sizeof(addr));

    addr.sun_family = AF_UNIX;
    /* length was checked above; copy including the terminating NUL */
    memcpy(addr.sun_path, path, path_len + 1);
    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("bind()");
        close(sfd);
        return -1;
    }
    if (listen(sfd, 1024) == -1) {
        perror("listen()");
        close(sfd);
        return -1;
    }
    return sfd;
}
2290 | |
/*
 * Listening socket: TCP, or unix-domain when settings.socketpath is set.
 * -1 means "not open".  pre_gdb() closes this descriptor whenever it is
 * > -1, so an initial value of 0 would make it close stdin if the socket
 * had never been created.
 */
static int l_socket = -1;

/* udp socket; stays -1 unless a UDP port is being served */
static int u_socket = -1;
2296 | |
2297 /* invoke right before gdb is called, on assert */ | |
2298 void pre_gdb(void) { | |
2299 int i; | |
2300 if (l_socket > -1) close(l_socket); | |
2301 if (u_socket > -1) close(u_socket); | |
2302 for (i = 3; i <= 500; i++) close(i); /* so lame */ | |
2303 kill(getpid(), SIGABRT); | |
2304 } | |
2305 | |
2306 /* | |
2307 * We keep the current time of day in a global variable that's updated by a | |
2308 * timer event. This saves us a bunch of time() system calls (we really only | |
2309 * need to get the time once a second, whereas there can be tens of thousands | |
2310 * of requests a second) and allows us to use server-start-relative timestamps | |
2311 * rather than absolute UNIX timestamps, a space savings on systems where | |
2312 * sizeof(time_t) > sizeof(unsigned int). | |
2313 */ | |
2314 volatile rel_time_t current_time; | |
2315 static struct event clockevent; | |
2316 | |
2317 /* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */ | |
2318 static void set_current_time(void) { | |
2319 current_time = (rel_time_t) (time(0) - stats.started); | |
2320 } | |
2321 | |
2322 static void clock_handler(const int fd, const short which, void *arg) { | |
2323 struct timeval t = {.tv_sec = 1, .tv_usec = 0}; | |
2324 static bool initialized = false; | |
2325 | |
2326 if (initialized) { | |
2327 /* only delete the event if it's actually there. */ | |
2328 evtimer_del(&clockevent); | |
2329 } else { | |
2330 initialized = true; | |
2331 } | |
2332 | |
2333 evtimer_set(&clockevent, clock_handler, 0); | |
2334 event_base_set(main_base, &clockevent); | |
2335 evtimer_add(&clockevent, &t); | |
2336 | |
2337 set_current_time(); | |
2338 } | |
2339 | |
2340 static struct event deleteevent; | |
2341 | |
2342 static void delete_handler(const int fd, const short which, void *arg) { | |
2343 struct timeval t = {.tv_sec = 5, .tv_usec = 0}; | |
2344 static bool initialized = false; | |
2345 | |
2346 if (initialized) { | |
2347 /* some versions of libevent don't like deleting events that don't exist, | |
2348 so only delete once we know this event has been added. */ | |
2349 evtimer_del(&deleteevent); | |
2350 } else { | |
2351 initialized = true; | |
2352 } | |
2353 | |
2354 evtimer_set(&deleteevent, delete_handler, 0); | |
2355 event_base_set(main_base, &deleteevent); | |
2356 evtimer_add(&deleteevent, &t); | |
2357 run_deferred_deletes(); | |
2358 } | |
2359 | |
2360 /* Call run_deferred_deletes instead of this. */ | |
2361 void do_run_deferred_deletes(void) | |
2362 { | |
2363 int i, j = 0; | |
2364 | |
2365 for (i = 0; i < delcurr; i++) { | |
2366 item *it = todelete[i]; | |
2367 if (item_delete_lock_over(it)) { | |
2368 assert(it->refcount > 0); | |
2369 it->it_flags &= ~ITEM_DELETED; | |
2370 do_item_unlink(it); | |
2371 do_item_remove(it); | |
2372 } else { | |
2373 todelete[j++] = it; | |
2374 } | |
2375 } | |
2376 delcurr = j; | |
2377 } | |
2378 | |
2379 static void usage(void) { | |
2380 printf(PACKAGE " " VERSION "\n"); | |
2381 printf("-p <num> TCP port number to listen on (default: 11211)\n" | |
2382 "-U <num> UDP port number to listen on (default: 0, off)\n" | |
2383 "-s <file> unix socket path to listen on (disables network support)\n" | |
2384 "-l <ip_addr> interface to listen on, default is INDRR_ANY\n" | |
2385 "-d run as a daemon\n" | |
2386 "-r maximize core file limit\n" | |
2387 "-u <username> assume identity of <username> (only when run as root)\n" | |
2388 "-m <num> max memory to use for items in megabytes, default is 64 MB\n" | |
2389 "-M return error on memory exhausted (rather than removing items)\n" | |
2390 "-c <num> max simultaneous connections, default is 1024\n" | |
2391 "-k lock down all paged memory\n" | |
2392 "-v verbose (print errors/warnings while in event loop)\n" | |
2393 "-vv very verbose (also print client commands/reponses)\n" | |
2394 "-h print this help and exit\n" | |
2395 "-i print memcached and libevent license\n" | |
2396 "-b run a managed instanced (mnemonic: buckets)\n" | |
2397 "-P <file> save PID in <file>, only used with -d option\n" | |
2398 "-f <factor> chunk size growth factor, default 1.25\n" | |
2399 "-n <bytes> minimum space allocated for key+value+flags, default 48\n"); | |
2400 #ifdef USE_THREADS | |
2401 printf("-t <num> number of threads to use, default 4\n"); | |
2402 #endif | |
2403 return; | |
2404 } | |
2405 | |
2406 static void usage_license(void) { | |
2407 printf(PACKAGE " " VERSION "\n\n"); | |
2408 printf( | |
2409 "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n" | |
2410 "All rights reserved.\n" | |
2411 "\n" | |
2412 "Redistribution and use in source and binary forms, with or without\n" | |
2413 "modification, are permitted provided that the following conditions are\n" | |
2414 "met:\n" | |
2415 "\n" | |
2416 " * Redistributions of source code must retain the above copyright\n" | |
2417 "notice, this list of conditions and the following disclaimer.\n" | |
2418 "\n" | |
2419 " * Redistributions in binary form must reproduce the above\n" | |
2420 "copyright notice, this list of conditions and the following disclaimer\n" | |
2421 "in the documentation and/or other materials provided with the\n" | |
2422 "distribution.\n" | |
2423 "\n" | |
2424 " * Neither the name of the Danga Interactive nor the names of its\n" | |
2425 "contributors may be used to endorse or promote products derived from\n" | |
2426 "this software without specific prior written permission.\n" | |
2427 "\n" | |
2428 "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n" | |
2429 "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n" | |
2430 "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n" | |
2431 "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n" | |
2432 "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n" | |
2433 "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n" | |
2434 "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n" | |
2435 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n" | |
2436 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n" | |
2437 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n" | |
2438 "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n" | |
2439 "\n" | |
2440 "\n" | |
2441 "This product includes software developed by Niels Provos.\n" | |
2442 "\n" | |
2443 "[ libevent ]\n" | |
2444 "\n" | |
2445 "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n" | |
2446 "All rights reserved.\n" | |
2447 "\n" | |
2448 "Redistribution and use in source and binary forms, with or without\n" | |
2449 "modification, are permitted provided that the following conditions\n" | |
2450 "are met:\n" | |
2451 "1. Redistributions of source code must retain the above copyright\n" | |
2452 " notice, this list of conditions and the following disclaimer.\n" | |
2453 "2. Redistributions in binary form must reproduce the above copyright\n" | |
2454 " notice, this list of conditions and the following disclaimer in the\n" | |
2455 " documentation and/or other materials provided with the distribution.\n" | |
2456 "3. All advertising materials mentioning features or use of this software\n" | |
2457 " must display the following acknowledgement:\n" | |
2458 " This product includes software developed by Niels Provos.\n" | |
2459 "4. The name of the author may not be used to endorse or promote products\n" | |
2460 " derived from this software without specific prior written permission.\n" | |
2461 "\n" | |
2462 "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n" | |
2463 "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n" | |
2464 "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n" | |
2465 "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n" | |
2466 "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n" | |
2467 "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n" | |
2468 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n" | |
2469 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n" | |
2470 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n" | |
2471 "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n" | |
2472 ); | |
2473 | |
2474 return; | |
2475 } | |
2476 | |
/*
 * Write `pid` followed by a newline to `pid_file`, replacing any previous
 * contents.  Does nothing when pid_file is NULL.  Failures are reported on
 * stderr but are not fatal.
 */
static void save_pid(const pid_t pid, const char *pid_file) {
    FILE *fp;

    if (pid_file == NULL)
        return;

    if (!(fp = fopen(pid_file, "w"))) {
        fprintf(stderr, "Could not open the pid file %s for writing\n", pid_file);
        return;
    }

    /* an unchecked write failure would silently leave an empty pid file */
    if (fprintf(fp, "%ld\n", (long)pid) < 0) {
        fprintf(stderr, "Could not write the pid file %s.\n", pid_file);
    }

    /*
     * fclose() reports failure as EOF, which the C standard only guarantees
     * to be negative (not necessarily -1).  It also flushes, so buffered
     * write errors surface here.
     */
    if (fclose(fp) == EOF) {
        fprintf(stderr, "Could not close the pid file %s.\n", pid_file);
    }
}
2493 | |
/*
 * Delete the pid file named by pid_file.  A NULL name is a no-op; a failed
 * unlink() is reported on stderr and otherwise ignored.
 */
static void remove_pidfile(const char *pid_file) {
    if (pid_file != NULL && unlink(pid_file) != 0)
        fprintf(stderr, "Could not remove the pid file %s.\n", pid_file);
}
2503 | |
2504 | |
/*
 * SIGINT handler: announce the signal on stdout and terminate with
 * EXIT_SUCCESS.
 *
 * Only async-signal-safe calls may be made here.  printf() and exit() are
 * not async-signal-safe: the interrupted code may hold the stdio lock, and
 * exit() runs atexit handlers and flushes stdio.  The message is therefore
 * emitted with write(2) and the process ends with _exit(2).
 */
static void sig_handler(const int sig) {
    static const char msg[] = "SIGINT handled.\n";
    ssize_t written;

    (void)sig;
    written = write(STDOUT_FILENO, msg, sizeof(msg) - 1);
    (void)written; /* nothing useful can be done about a failed write here */
    _exit(EXIT_SUCCESS);
}
2509 | |
2510 int main (int argc, char **argv) { | |
2511 int c; | |
2512 struct in_addr addr; | |
2513 bool lock_memory = false; | |
2514 bool daemonize = false; | |
2515 int maxcore = 0; | |
2516 char *username = NULL; | |
2517 char *pid_file = NULL; | |
2518 struct passwd *pw; | |
2519 struct sigaction sa; | |
2520 struct rlimit rlim; | |
2521 | |
2522 /* handle SIGINT */ | |
2523 signal(SIGINT, sig_handler); | |
2524 | |
2525 /* init settings */ | |
2526 settings_init(); | |
2527 | |
2528 /* set stderr non-buffering (for running under, say, daemontools) */ | |
2529 setbuf(stderr, NULL); | |
2530 | |
2531 /* process arguments */ | |
2532 while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) { | |
2533 switch (c) { | |
2534 case 'U': | |
2535 settings.udpport = atoi(optarg); | |
2536 break; | |
2537 case 'b': | |
2538 settings.managed = true; | |
2539 break; | |
2540 case 'p': | |
2541 settings.port = atoi(optarg); | |
2542 break; | |
2543 case 's': | |
2544 settings.socketpath = optarg; | |
2545 break; | |
2546 case 'm': | |
2547 settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024; | |
2548 break; | |
2549 case 'M': | |
2550 settings.evict_to_free = 0; | |
2551 break; | |
2552 case 'c': | |
2553 settings.maxconns = atoi(optarg); | |
2554 break; | |
2555 case 'h': | |
2556 usage(); | |
2557 exit(EXIT_SUCCESS); | |
2558 case 'i': | |
2559 usage_license(); | |
2560 exit(EXIT_SUCCESS); | |
2561 case 'k': | |
2562 lock_memory = true; | |
2563 break; | |
2564 case 'v': | |
2565 settings.verbose++; | |
2566 break; | |
2567 case 'l': | |
2568 if (inet_pton(AF_INET, optarg, &addr) <= 0) { | |
2569 fprintf(stderr, "Illegal address: %s\n", optarg); | |
2570 return 1; | |
2571 } else { | |
2572 settings.interf = addr; | |
2573 } | |
2574 break; | |
2575 case 'd': | |
2576 daemonize = true; | |
2577 break; | |
2578 case 'r': | |
2579 maxcore = 1; | |
2580 break; | |
2581 case 'u': | |
2582 username = optarg; | |
2583 break; | |
2584 case 'P': | |
2585 pid_file = optarg; | |
2586 break; | |
2587 case 'f': | |
2588 settings.factor = atof(optarg); | |
2589 if (settings.factor <= 1.0) { | |
2590 fprintf(stderr, "Factor must be greater than 1\n"); | |
2591 return 1; | |
2592 } | |
2593 break; | |
2594 case 'n': | |
2595 settings.chunk_size = atoi(optarg); | |
2596 if (settings.chunk_size == 0) { | |
2597 fprintf(stderr, "Chunk size must be greater than 0\n"); | |
2598 return 1; | |
2599 } | |
2600 break; | |
2601 case 't': | |
2602 settings.num_threads = atoi(optarg); | |
2603 if (settings.num_threads == 0) { | |
2604 fprintf(stderr, "Number of threads must be greater than 0\n"); | |
2605 return 1; | |
2606 } | |
2607 break; | |
2608 case 'D': | |
2609 if (! optarg || ! optarg[0]) { | |
2610 fprintf(stderr, "No delimiter specified\n"); | |
2611 return 1; | |
2612 } | |
2613 settings.prefix_delimiter = optarg[0]; | |
2614 settings.detail_enabled = 1; | |
2615 break; | |
2616 default: | |
2617 fprintf(stderr, "Illegal argument \"%c\"\n", c); | |
2618 return 1; | |
2619 } | |
2620 } | |
2621 | |
2622 if (maxcore != 0) { | |
2623 struct rlimit rlim_new; | |
2624 /* | |
2625 * First try raising to infinity; if that fails, try bringing | |
2626 * the soft limit to the hard. | |
2627 */ | |
2628 if (getrlimit(RLIMIT_CORE, &rlim) == 0) { | |
2629 rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY; | |
2630 if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) { | |
2631 /* failed. try raising just to the old max */ | |
2632 rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max; | |
2633 (void)setrlimit(RLIMIT_CORE, &rlim_new); | |
2634 } | |
2635 } | |
2636 /* | |
2637 * getrlimit again to see what we ended up with. Only fail if | |
2638 * the soft limit ends up 0, because then no core files will be | |
2639 * created at all. | |
2640 */ | |
2641 | |
2642 if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) { | |
2643 fprintf(stderr, "failed to ensure corefile creation\n"); | |
2644 exit(EXIT_FAILURE); | |
2645 } | |
2646 } | |
2647 | |
2648 /* | |
2649 * If needed, increase rlimits to allow as many connections | |
2650 * as needed. | |
2651 */ | |
2652 | |
2653 if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) { | |
2654 fprintf(stderr, "failed to getrlimit number of files\n"); | |
2655 exit(EXIT_FAILURE); | |
2656 } else { | |
2657 int maxfiles = settings.maxconns; | |
2658 if (rlim.rlim_cur < maxfiles) | |
2659 rlim.rlim_cur = maxfiles + 3; | |
2660 if (rlim.rlim_max < rlim.rlim_cur) | |
2661 rlim.rlim_max = rlim.rlim_cur; | |
2662 if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) { | |
2663 fprintf(stderr, "failed to set rlimit for open files. Try running as root or requesting smaller maxconns value.\n"); | |
2664 exit(EXIT_FAILURE); | |
2665 } | |
2666 } | |
2667 | |
2668 /* | |
2669 * initialization order: first create the listening sockets | |
2670 * (may need root on low ports), then drop root if needed, | |
2671 * then daemonise if needed, then init libevent (in some cases | |
2672 * descriptors created by libevent wouldn't survive forking). | |
2673 */ | |
2674 | |
2675 /* create the listening socket and bind it */ | |
2676 if (settings.socketpath == NULL) { | |
2677 l_socket = server_socket(settings.port, 0); | |
2678 if (l_socket == -1) { | |
2679 fprintf(stderr, "failed to listen\n"); | |
2680 exit(EXIT_FAILURE); | |
2681 } | |
2682 } | |
2683 | |
2684 if (settings.udpport > 0 && settings.socketpath == NULL) { | |
2685 /* create the UDP listening socket and bind it */ | |
2686 u_socket = server_socket(settings.udpport, 1); | |
2687 if (u_socket == -1) { | |
2688 fprintf(stderr, "failed to listen on UDP port %d\n", settings.udpport); | |
2689 exit(EXIT_FAILURE); | |
2690 } | |
2691 } | |
2692 | |
2693 /* lose root privileges if we have them */ | |
2694 if (getuid() == 0 || geteuid() == 0) { | |
2695 if (username == 0 || *username == '\0') { | |
2696 fprintf(stderr, "can't run as root without the -u switch\n"); | |
2697 return 1; | |
2698 } | |
2699 if ((pw = getpwnam(username)) == 0) { | |
2700 fprintf(stderr, "can't find the user %s to switch to\n", username); | |
2701 return 1; | |
2702 } | |
2703 if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) { | |
2704 fprintf(stderr, "failed to assume identity of user %s\n", username); | |
2705 return 1; | |
2706 } | |
2707 } | |
2708 | |
2709 /* create unix mode sockets after dropping privileges */ | |
2710 if (settings.socketpath != NULL) { | |
2711 l_socket = server_socket_unix(settings.socketpath); | |
2712 if (l_socket == -1) { | |
2713 fprintf(stderr, "failed to listen\n"); | |
2714 exit(EXIT_FAILURE); | |
2715 } | |
2716 } | |
2717 | |
2718 /* daemonize if requested */ | |
2719 /* if we want to ensure our ability to dump core, don't chdir to / */ | |
2720 if (daemonize) { | |
2721 int res; | |
2722 res = daemon(maxcore, settings.verbose); | |
2723 if (res == -1) { | |
2724 fprintf(stderr, "failed to daemon() in order to daemonize\n"); | |
2725 return 1; | |
2726 } | |
2727 } | |
2728 | |
2729 /* initialize main thread libevent instance */ | |
2730 main_base = event_init(); | |
2731 | |
2732 /* initialize other stuff */ | |
2733 item_init(); | |
2734 stats_init(); | |
2735 assoc_init(); | |
2736 conn_init(); | |
2737 slabs_init(settings.maxbytes, settings.factor); | |
2738 | |
2739 /* managed instance? alloc and zero a bucket array */ | |
2740 if (settings.managed) { | |
2741 buckets = malloc(sizeof(int) * MAX_BUCKETS); | |
2742 if (buckets == 0) { | |
2743 fprintf(stderr, "failed to allocate the bucket array"); | |
2744 exit(EXIT_FAILURE); | |
2745 } | |
2746 memset(buckets, 0, sizeof(int) * MAX_BUCKETS); | |
2747 } | |
2748 | |
2749 /* lock paged memory if needed */ | |
2750 if (lock_memory) { | |
2751 #ifdef HAVE_MLOCKALL | |
2752 mlockall(MCL_CURRENT | MCL_FUTURE); | |
2753 #else | |
2754 fprintf(stderr, "warning: mlockall() not supported on this platform. proceeding without.\n"); | |
2755 #endif | |
2756 } | |
2757 | |
2758 /* | |
2759 * ignore SIGPIPE signals; we can use errno==EPIPE if we | |
2760 * need that information | |
2761 */ | |
2762 sa.sa_handler = SIG_IGN; | |
2763 sa.sa_flags = 0; | |
2764 if (sigemptyset(&sa.sa_mask) == -1 || | |
2765 sigaction(SIGPIPE, &sa, 0) == -1) { | |
2766 perror("failed to ignore SIGPIPE; sigaction"); | |
2767 exit(EXIT_FAILURE); | |
2768 } | |
2769 /* create the initial listening connection */ | |
2770 if (!(listen_conn = conn_new(l_socket, conn_listening, | |
2771 EV_READ | EV_PERSIST, 1, false, main_base))) { | |
2772 fprintf(stderr, "failed to create listening connection"); | |
2773 exit(EXIT_FAILURE); | |
2774 } | |
2775 /* save the PID in if we're a daemon */ | |
2776 if (daemonize) | |
2777 save_pid(getpid(), pid_file); | |
2778 /* start up worker threads if MT mode */ | |
2779 thread_init(settings.num_threads, main_base); | |
2780 /* initialise clock event */ | |
2781 clock_handler(0, 0, 0); | |
2782 /* initialise deletion array and timer event */ | |
2783 deltotal = 200; | |
2784 delcurr = 0; | |
2785 todelete = malloc(sizeof(item *) * deltotal); | |
2786 delete_handler(0, 0, 0); /* sets up the event */ | |
2787 /* create the initial listening udp connection, monitored on all threads */ | |
2788 if (u_socket > -1) { | |
2789 for (c = 0; c < settings.num_threads; c++) { | |
2790 /* this is guaranteed to hit all threads because we round-robin */ | |
2791 dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST, | |
2792 UDP_READ_BUFFER_SIZE, 1); | |
2793 } | |
2794 } | |
2795 /* enter the event loop */ | |
2796 event_base_loop(main_base, 0); | |
2797 /* remove the PID file if we're a daemon */ | |
2798 if (daemonize) | |
2799 remove_pidfile(pid_file); | |
2800 return 0; | |
2801 } |