0
|
1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
|
|
2 /*
|
|
3 * memcached - memory caching daemon
|
|
4 *
|
|
5 * http://www.danga.com/memcached/
|
|
6 *
|
|
7 * Copyright 2003 Danga Interactive, Inc. All rights reserved.
|
|
8 *
|
|
9 * Use and distribution licensed under the BSD license. See
|
|
10 * the LICENSE file for full text.
|
|
11 *
|
|
12 * Authors:
|
|
13 * Anatoly Vorobey <mellon@pobox.com>
|
|
14 * Brad Fitzpatrick <brad@danga.com>
|
|
15 std *
|
|
16 * $Id: memcached.c 570 2007-06-20 01:21:45Z plindner $
|
|
17 */
|
|
18 #include "memcached.h"
|
|
19 #include <sys/stat.h>
|
|
20 #include <sys/socket.h>
|
|
21 #include <sys/un.h>
|
|
22 #include <sys/signal.h>
|
|
23 #include <sys/resource.h>
|
|
24 #include <sys/uio.h>
|
|
25
|
|
26 /* some POSIX systems need the following definition
|
|
27 * to get mlockall flags out of sys/mman.h. */
|
|
28 #ifndef _P1003_1B_VISIBLE
|
|
29 #define _P1003_1B_VISIBLE
|
|
30 #endif
|
|
31 /* need this to get IOV_MAX on some platforms. */
|
|
32 #ifndef __need_IOV_MAX
|
|
33 #define __need_IOV_MAX
|
|
34 #endif
|
|
35 #include <pwd.h>
|
|
36 #include <sys/mman.h>
|
|
37 #include <fcntl.h>
|
|
38 #include <netinet/tcp.h>
|
|
39 #include <arpa/inet.h>
|
|
40 #include <errno.h>
|
|
41 #include <stdlib.h>
|
|
42 #include <stdio.h>
|
|
43 #include <string.h>
|
|
44 #include <time.h>
|
|
45 #include <assert.h>
|
|
46 #include <limits.h>
|
|
47
|
|
48 #ifdef HAVE_MALLOC_H
|
|
49 /* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
|
|
50 #ifndef __OpenBSD__
|
|
51 #include <malloc.h>
|
|
52 #endif
|
|
53 #endif
|
|
54
|
|
55 /* FreeBSD 4.x doesn't have IOV_MAX exposed. */
|
|
56 #ifndef IOV_MAX
|
|
57 #if defined(__FreeBSD__)
|
|
58 # define IOV_MAX 1024
|
|
59 #endif
|
|
60 #endif
|
|
61
|
|
62 /*
|
|
63 * forward declarations
|
|
64 */
|
|
65 static void drive_machine(conn *c);
|
|
66 static int new_socket(const bool is_udp);
|
|
67 static int server_socket(const int port, const bool is_udp);
|
|
68 static int try_read_command(conn *c);
|
|
69 static int try_read_network(conn *c);
|
|
70 static int try_read_udp(conn *c);
|
|
71
|
|
72 /* stats */
|
|
73 static void stats_reset(void);
|
|
74 static void stats_init(void);
|
|
75
|
|
76 /* defaults */
|
|
77 static void settings_init(void);
|
|
78
|
|
79 /* event handling, network IO */
|
|
80 static void event_handler(const int fd, const short which, void *arg);
|
|
81 static void conn_close(conn *c);
|
|
82 static void conn_init(void);
|
|
83 static void accept_new_conns(const bool do_accept);
|
|
84 static bool update_event(conn *c, const int new_flags);
|
|
85 static void complete_nread(conn *c);
|
|
86 static void process_command(conn *c, char *command);
|
|
87 static int transmit(conn *c);
|
|
88 static int ensure_iov_space(conn *c);
|
|
89 static int add_iov(conn *c, const void *buf, int len);
|
|
90 static int add_msghdr(conn *c);
|
|
91
|
|
92
|
|
93 /* time handling */
|
|
94 static void set_current_time(void); /* update the global variable holding
|
|
95 global 32-bit seconds-since-start time
|
|
96 (to avoid 64 bit time_t) */
|
|
97
|
|
98 void pre_gdb(void);
|
|
99 static void conn_free(conn *c);
|
|
100
|
|
101 /** exported globals **/
|
|
102 struct stats stats;
|
|
103 struct settings settings;
|
|
104
|
|
105 /** file scope variables **/
|
|
106 static item **todelete = 0;
|
|
107 static int delcurr;
|
|
108 static int deltotal;
|
|
109 static conn *listen_conn;
|
|
110 static struct event_base *main_base;
|
|
111
|
|
112 #define TRANSMIT_COMPLETE 0
|
|
113 #define TRANSMIT_INCOMPLETE 1
|
|
114 #define TRANSMIT_SOFT_ERROR 2
|
|
115 #define TRANSMIT_HARD_ERROR 3
|
|
116
|
|
117 static int *buckets = 0; /* bucket->generation array for a managed instance */
|
|
118
|
|
119 #define REALTIME_MAXDELTA 60*60*24*30
|
|
120 /*
|
|
121 * given time value that's either unix time or delta from current unix time, return
|
|
122 * unix time. Use the fact that delta can't exceed one month (and real time value can't
|
|
123 * be that low).
|
|
124 */
|
|
125 static rel_time_t realtime(const time_t exptime) {
|
|
126 /* no. of seconds in 30 days - largest possible delta exptime */
|
|
127
|
|
128 if (exptime == 0) return 0; /* 0 means never expire */
|
|
129
|
|
130 if (exptime > REALTIME_MAXDELTA) {
|
|
131 /* if item expiration is at/before the server started, give it an
|
|
132 expiration time of 1 second after the server started.
|
|
133 (because 0 means don't expire). without this, we'd
|
|
134 underflow and wrap around to some large value way in the
|
|
135 future, effectively making items expiring in the past
|
|
136 really expiring never */
|
|
137 if (exptime <= stats.started)
|
|
138 return (rel_time_t)1;
|
|
139 return (rel_time_t)(exptime - stats.started);
|
|
140 } else {
|
|
141 return (rel_time_t)(exptime + current_time);
|
|
142 }
|
|
143 }
|
|
144
|
|
145 static void stats_init(void) {
|
|
146 stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
|
|
147 stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
|
|
148 stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
|
|
149
|
|
150 /* make the time we started always be 2 seconds before we really
|
|
151 did, so time(0) - time.started is never zero. if so, things
|
|
152 like 'settings.oldest_live' which act as booleans as well as
|
|
153 values are now false in boolean context... */
|
|
154 stats.started = time(0) - 2;
|
|
155 stats_prefix_init();
|
|
156 }
|
|
157
|
|
158 static void stats_reset(void) {
|
|
159 STATS_LOCK();
|
|
160 stats.total_items = stats.total_conns = 0;
|
|
161 stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
|
|
162 stats.bytes_read = stats.bytes_written = 0;
|
|
163 stats_prefix_clear();
|
|
164 STATS_UNLOCK();
|
|
165 }
|
|
166
|
|
167 static void settings_init(void) {
|
|
168 settings.port = 11211;
|
|
169 settings.udpport = 0;
|
|
170 settings.interf.s_addr = htonl(INADDR_ANY);
|
|
171 settings.maxbytes = 67108864; /* default is 64MB: (64 * 1024 * 1024) */
|
|
172 settings.maxconns = 1024; /* to limit connections-related memory to about 5MB */
|
|
173 settings.verbose = 0;
|
|
174 settings.oldest_live = 0;
|
|
175 settings.evict_to_free = 1; /* push old items out of cache when memory runs out */
|
|
176 settings.socketpath = NULL; /* by default, not using a unix socket */
|
|
177 settings.managed = false;
|
|
178 settings.factor = 1.25;
|
|
179 settings.chunk_size = 48; /* space for a modest key and value */
|
|
180 #ifdef USE_THREADS
|
|
181 settings.num_threads = 4;
|
|
182 #else
|
|
183 settings.num_threads = 1;
|
|
184 #endif
|
|
185 settings.prefix_delimiter = ':';
|
|
186 settings.detail_enabled = 0;
|
|
187 }
|
|
188
|
|
189 /* returns true if a deleted item's delete-locked-time is over, and it
|
|
190 should be removed from the namespace */
|
|
191 static bool item_delete_lock_over (item *it) {
|
|
192 assert(it->it_flags & ITEM_DELETED);
|
|
193 return (current_time >= it->exptime);
|
|
194 }
|
|
195
|
|
196 /*
|
|
197 * Adds a message header to a connection.
|
|
198 *
|
|
199 * Returns 0 on success, -1 on out-of-memory.
|
|
200 */
|
|
201 static int add_msghdr(conn *c)
|
|
202 {
|
|
203 struct msghdr *msg;
|
|
204
|
|
205 assert(c != NULL);
|
|
206
|
|
207 if (c->msgsize == c->msgused) {
|
|
208 msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
|
|
209 if (! msg)
|
|
210 return -1;
|
|
211 c->msglist = msg;
|
|
212 c->msgsize *= 2;
|
|
213 }
|
|
214
|
|
215 msg = c->msglist + c->msgused;
|
|
216
|
|
217 /* this wipes msg_iovlen, msg_control, msg_controllen, and
|
|
218 msg_flags, the last 3 of which aren't defined on solaris: */
|
|
219 memset(msg, 0, sizeof(struct msghdr));
|
|
220
|
|
221 msg->msg_iov = &c->iov[c->iovused];
|
|
222 msg->msg_name = &c->request_addr;
|
|
223 msg->msg_namelen = c->request_addr_size;
|
|
224
|
|
225 c->msgbytes = 0;
|
|
226 c->msgused++;
|
|
227
|
|
228 if (c->udp) {
|
|
229 /* Leave room for the UDP header, which we'll fill in later. */
|
|
230 return add_iov(c, NULL, UDP_HEADER_SIZE);
|
|
231 }
|
|
232
|
|
233 return 0;
|
|
234 }
|
|
235
|
|
236
|
|
237 /*
|
|
238 * Free list management for connections.
|
|
239 */
|
|
240
|
|
241 static conn **freeconns;
|
|
242 static int freetotal;
|
|
243 static int freecurr;
|
|
244
|
|
245
|
|
246 static void conn_init(void) {
|
|
247 freetotal = 200;
|
|
248 freecurr = 0;
|
|
249 if (!(freeconns = (conn **)malloc(sizeof(conn *) * freetotal))) {
|
|
250 perror("malloc()");
|
|
251 }
|
|
252 return;
|
|
253 }
|
|
254
|
|
255 /*
|
|
256 * Returns a connection from the freelist, if any. Should call this using
|
|
257 * conn_from_freelist() for thread safety.
|
|
258 */
|
|
259 conn *do_conn_from_freelist() {
|
|
260 conn *c;
|
|
261
|
|
262 if (freecurr > 0) {
|
|
263 c = freeconns[--freecurr];
|
|
264 } else {
|
|
265 c = NULL;
|
|
266 }
|
|
267
|
|
268 return c;
|
|
269 }
|
|
270
|
|
271 /*
|
|
272 * Adds a connection to the freelist. 0 = success. Should call this using
|
|
273 * conn_add_to_freelist() for thread safety.
|
|
274 */
|
|
275 int do_conn_add_to_freelist(conn *c) {
|
|
276 if (freecurr < freetotal) {
|
|
277 freeconns[freecurr++] = c;
|
|
278 return 0;
|
|
279 } else {
|
|
280 /* try to enlarge free connections array */
|
|
281 conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
|
|
282 if (new_freeconns) {
|
|
283 freetotal *= 2;
|
|
284 freeconns = new_freeconns;
|
|
285 freeconns[freecurr++] = c;
|
|
286 return 0;
|
|
287 }
|
|
288 }
|
|
289 return 1;
|
|
290 }
|
|
291
|
|
292 conn *conn_new(const int sfd, const int init_state, const int event_flags,
|
|
293 const int read_buffer_size, const bool is_udp, struct event_base *base) {
|
|
294 conn *c = conn_from_freelist();
|
|
295
|
|
296 if (NULL == c) {
|
|
297 if (!(c = (conn *)malloc(sizeof(conn)))) {
|
|
298 perror("malloc()");
|
|
299 return NULL;
|
|
300 }
|
|
301 c->rbuf = c->wbuf = 0;
|
|
302 c->ilist = 0;
|
|
303 c->iov = 0;
|
|
304 c->msglist = 0;
|
|
305 c->hdrbuf = 0;
|
|
306
|
|
307 c->rsize = read_buffer_size;
|
|
308 c->wsize = DATA_BUFFER_SIZE;
|
|
309 c->isize = ITEM_LIST_INITIAL;
|
|
310 c->iovsize = IOV_LIST_INITIAL;
|
|
311 c->msgsize = MSG_LIST_INITIAL;
|
|
312 c->hdrsize = 0;
|
|
313
|
|
314 c->rbuf = (char *)malloc((size_t)c->rsize);
|
|
315 c->wbuf = (char *)malloc((size_t)c->wsize);
|
|
316 c->ilist = (item **)malloc(sizeof(item *) * c->isize);
|
|
317 c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
|
|
318 c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
|
|
319
|
|
320 if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
|
|
321 c->msglist == 0) {
|
|
322 if (c->rbuf != 0) free(c->rbuf);
|
|
323 if (c->wbuf != 0) free(c->wbuf);
|
|
324 if (c->ilist !=0) free(c->ilist);
|
|
325 if (c->iov != 0) free(c->iov);
|
|
326 if (c->msglist != 0) free(c->msglist);
|
|
327 free(c);
|
|
328 perror("malloc()");
|
|
329 return NULL;
|
|
330 }
|
|
331
|
|
332 STATS_LOCK();
|
|
333 stats.conn_structs++;
|
|
334 STATS_UNLOCK();
|
|
335 }
|
|
336
|
|
337 if (settings.verbose > 1) {
|
|
338 if (init_state == conn_listening)
|
|
339 fprintf(stderr, "<%d server listening\n", sfd);
|
|
340 else if (is_udp)
|
|
341 fprintf(stderr, "<%d server listening (udp)\n", sfd);
|
|
342 else
|
|
343 fprintf(stderr, "<%d new client connection\n", sfd);
|
|
344 }
|
|
345
|
|
346 c->sfd = sfd;
|
|
347 c->udp = is_udp;
|
|
348 c->state = init_state;
|
|
349 c->rlbytes = 0;
|
|
350 c->rbytes = c->wbytes = 0;
|
|
351 c->wcurr = c->wbuf;
|
|
352 c->rcurr = c->rbuf;
|
|
353 c->ritem = 0;
|
|
354 c->icurr = c->ilist;
|
|
355 c->ileft = 0;
|
|
356 c->iovused = 0;
|
|
357 c->msgcurr = 0;
|
|
358 c->msgused = 0;
|
|
359
|
|
360 c->write_and_go = conn_read;
|
|
361 c->write_and_free = 0;
|
|
362 c->item = 0;
|
|
363 c->bucket = -1;
|
|
364 c->gen = 0;
|
|
365
|
|
366 event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
|
|
367 event_base_set(base, &c->event);
|
|
368 c->ev_flags = event_flags;
|
|
369
|
|
370 if (event_add(&c->event, 0) == -1) {
|
|
371 if (conn_add_to_freelist(c)) {
|
|
372 conn_free(c);
|
|
373 }
|
|
374 return NULL;
|
|
375 }
|
|
376
|
|
377 STATS_LOCK();
|
|
378 stats.curr_conns++;
|
|
379 stats.total_conns++;
|
|
380 STATS_UNLOCK();
|
|
381
|
|
382 return c;
|
|
383 }
|
|
384
|
|
385 static void conn_cleanup(conn *c) {
|
|
386 assert(c != NULL);
|
|
387
|
|
388 if (c->item) {
|
|
389 item_remove(c->item);
|
|
390 c->item = 0;
|
|
391 }
|
|
392
|
|
393 if (c->ileft != 0) {
|
|
394 for (; c->ileft > 0; c->ileft--,c->icurr++) {
|
|
395 item_remove(*(c->icurr));
|
|
396 }
|
|
397 }
|
|
398
|
|
399 if (c->write_and_free) {
|
|
400 free(c->write_and_free);
|
|
401 c->write_and_free = 0;
|
|
402 }
|
|
403 }
|
|
404
|
|
405 /*
|
|
406 * Frees a connection.
|
|
407 */
|
|
408 void conn_free(conn *c) {
|
|
409 if (c) {
|
|
410 if (c->hdrbuf)
|
|
411 free(c->hdrbuf);
|
|
412 if (c->msglist)
|
|
413 free(c->msglist);
|
|
414 if (c->rbuf)
|
|
415 free(c->rbuf);
|
|
416 if (c->wbuf)
|
|
417 free(c->wbuf);
|
|
418 if (c->ilist)
|
|
419 free(c->ilist);
|
|
420 if (c->iov)
|
|
421 free(c->iov);
|
|
422 free(c);
|
|
423 }
|
|
424 }
|
|
425
|
|
426 static void conn_close(conn *c) {
|
|
427 assert(c != NULL);
|
|
428
|
|
429 /* delete the event, the socket and the conn */
|
|
430 event_del(&c->event);
|
|
431
|
|
432 if (settings.verbose > 1)
|
|
433 fprintf(stderr, "<%d connection closed.\n", c->sfd);
|
|
434
|
|
435 close(c->sfd);
|
|
436 accept_new_conns(true);
|
|
437 conn_cleanup(c);
|
|
438
|
|
439 /* if the connection has big buffers, just free it */
|
|
440 if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
|
|
441 conn_free(c);
|
|
442 }
|
|
443
|
|
444 STATS_LOCK();
|
|
445 stats.curr_conns--;
|
|
446 STATS_UNLOCK();
|
|
447
|
|
448 return;
|
|
449 }
|
|
450
|
|
451
|
|
452 /*
|
|
453 * Shrinks a connection's buffers if they're too big. This prevents
|
|
454 * periodic large "get" requests from permanently chewing lots of server
|
|
455 * memory.
|
|
456 *
|
|
457 * This should only be called in between requests since it can wipe output
|
|
458 * buffers!
|
|
459 */
|
|
460 static void conn_shrink(conn *c) {
|
|
461 assert(c != NULL);
|
|
462
|
|
463 if (c->udp)
|
|
464 return;
|
|
465
|
|
466 if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
|
|
467 char *newbuf;
|
|
468
|
|
469 if (c->rcurr != c->rbuf)
|
|
470 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
|
|
471
|
|
472 newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
|
|
473
|
|
474 if (newbuf) {
|
|
475 c->rbuf = newbuf;
|
|
476 c->rsize = DATA_BUFFER_SIZE;
|
|
477 }
|
|
478 /* TODO check other branch... */
|
|
479 c->rcurr = c->rbuf;
|
|
480 }
|
|
481
|
|
482 if (c->isize > ITEM_LIST_HIGHWAT) {
|
|
483 item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
|
|
484 if (newbuf) {
|
|
485 c->ilist = newbuf;
|
|
486 c->isize = ITEM_LIST_INITIAL;
|
|
487 }
|
|
488 /* TODO check error condition? */
|
|
489 }
|
|
490
|
|
491 if (c->msgsize > MSG_LIST_HIGHWAT) {
|
|
492 struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
|
|
493 if (newbuf) {
|
|
494 c->msglist = newbuf;
|
|
495 c->msgsize = MSG_LIST_INITIAL;
|
|
496 }
|
|
497 /* TODO check error condition? */
|
|
498 }
|
|
499
|
|
500 if (c->iovsize > IOV_LIST_HIGHWAT) {
|
|
501 struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
|
|
502 if (newbuf) {
|
|
503 c->iov = newbuf;
|
|
504 c->iovsize = IOV_LIST_INITIAL;
|
|
505 }
|
|
506 /* TODO check return value */
|
|
507 }
|
|
508 }
|
|
509
|
|
510 /*
|
|
511 * Sets a connection's current state in the state machine. Any special
|
|
512 * processing that needs to happen on certain state transitions can
|
|
513 * happen here.
|
|
514 */
|
|
515 static void conn_set_state(conn *c, int state) {
|
|
516 assert(c != NULL);
|
|
517
|
|
518 if (state != c->state) {
|
|
519 if (state == conn_read) {
|
|
520 conn_shrink(c);
|
|
521 assoc_move_next_bucket();
|
|
522 }
|
|
523 c->state = state;
|
|
524 }
|
|
525 }
|
|
526
|
|
527
|
|
528 /*
|
|
529 * Ensures that there is room for another struct iovec in a connection's
|
|
530 * iov list.
|
|
531 *
|
|
532 * Returns 0 on success, -1 on out-of-memory.
|
|
533 */
|
|
534 static int ensure_iov_space(conn *c) {
|
|
535 assert(c != NULL);
|
|
536
|
|
537 if (c->iovused >= c->iovsize) {
|
|
538 int i, iovnum;
|
|
539 struct iovec *new_iov = (struct iovec *)realloc(c->iov,
|
|
540 (c->iovsize * 2) * sizeof(struct iovec));
|
|
541 if (! new_iov)
|
|
542 return -1;
|
|
543 c->iov = new_iov;
|
|
544 c->iovsize *= 2;
|
|
545
|
|
546 /* Point all the msghdr structures at the new list. */
|
|
547 for (i = 0, iovnum = 0; i < c->msgused; i++) {
|
|
548 c->msglist[i].msg_iov = &c->iov[iovnum];
|
|
549 iovnum += c->msglist[i].msg_iovlen;
|
|
550 }
|
|
551 }
|
|
552
|
|
553 return 0;
|
|
554 }
|
|
555
|
|
556
|
|
557 /*
|
|
558 * Adds data to the list of pending data that will be written out to a
|
|
559 * connection.
|
|
560 *
|
|
561 * Returns 0 on success, -1 on out-of-memory.
|
|
562 */
|
|
563
|
|
564 static int add_iov(conn *c, const void *buf, int len) {
|
|
565 struct msghdr *m;
|
|
566 int leftover;
|
|
567 bool limit_to_mtu;
|
|
568
|
|
569 assert(c != NULL);
|
|
570
|
|
571 do {
|
|
572 m = &c->msglist[c->msgused - 1];
|
|
573
|
|
574 /*
|
|
575 * Limit UDP packets, and the first payloads of TCP replies, to
|
|
576 * UDP_MAX_PAYLOAD_SIZE bytes.
|
|
577 */
|
|
578 limit_to_mtu = c->udp || (1 == c->msgused);
|
|
579
|
|
580 /* We may need to start a new msghdr if this one is full. */
|
|
581 if (m->msg_iovlen == IOV_MAX ||
|
|
582 (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
|
|
583 add_msghdr(c);
|
|
584 m = &c->msglist[c->msgused - 1];
|
|
585 }
|
|
586
|
|
587 if (ensure_iov_space(c) != 0)
|
|
588 return -1;
|
|
589
|
|
590 /* If the fragment is too big to fit in the datagram, split it up */
|
|
591 if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
|
|
592 leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
|
|
593 len -= leftover;
|
|
594 } else {
|
|
595 leftover = 0;
|
|
596 }
|
|
597
|
|
598 m = &c->msglist[c->msgused - 1];
|
|
599 m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
|
|
600 m->msg_iov[m->msg_iovlen].iov_len = len;
|
|
601
|
|
602 c->msgbytes += len;
|
|
603 c->iovused++;
|
|
604 m->msg_iovlen++;
|
|
605
|
|
606 buf = ((char *)buf) + len;
|
|
607 len = leftover;
|
|
608 } while (leftover > 0);
|
|
609
|
|
610 return 0;
|
|
611 }
|
|
612
|
|
613
|
|
614 /*
|
|
615 * Constructs a set of UDP headers and attaches them to the outgoing messages.
|
|
616 */
|
|
617 static int build_udp_headers(conn *c) {
|
|
618 int i;
|
|
619 unsigned char *hdr;
|
|
620
|
|
621 assert(c != NULL);
|
|
622
|
|
623 if (c->msgused > c->hdrsize) {
|
|
624 void *new_hdrbuf;
|
|
625 if (c->hdrbuf)
|
|
626 new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
|
|
627 else
|
|
628 new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
|
|
629 if (! new_hdrbuf)
|
|
630 return -1;
|
|
631 c->hdrbuf = (unsigned char *)new_hdrbuf;
|
|
632 c->hdrsize = c->msgused * 2;
|
|
633 }
|
|
634
|
|
635 hdr = c->hdrbuf;
|
|
636 for (i = 0; i < c->msgused; i++) {
|
|
637 c->msglist[i].msg_iov[0].iov_base = hdr;
|
|
638 c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
|
|
639 *hdr++ = c->request_id / 256;
|
|
640 *hdr++ = c->request_id % 256;
|
|
641 *hdr++ = i / 256;
|
|
642 *hdr++ = i % 256;
|
|
643 *hdr++ = c->msgused / 256;
|
|
644 *hdr++ = c->msgused % 256;
|
|
645 *hdr++ = 0;
|
|
646 *hdr++ = 0;
|
|
647 assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
|
|
648 }
|
|
649
|
|
650 return 0;
|
|
651 }
|
|
652
|
|
653
|
|
654 static void out_string(conn *c, const char *str) {
|
|
655 size_t len;
|
|
656
|
|
657 assert(c != NULL);
|
|
658
|
|
659 if (settings.verbose > 1)
|
|
660 fprintf(stderr, ">%d %s\n", c->sfd, str);
|
|
661
|
|
662 len = strlen(str);
|
|
663 if ((len + 2) > c->wsize) {
|
|
664 /* ought to be always enough. just fail for simplicity */
|
|
665 str = "SERVER_ERROR output line too long";
|
|
666 len = strlen(str);
|
|
667 }
|
|
668
|
|
669 memcpy(c->wbuf, str, len);
|
|
670 memcpy(c->wbuf + len, "\r\n", 3);
|
|
671 c->wbytes = len + 2;
|
|
672 c->wcurr = c->wbuf;
|
|
673
|
|
674 conn_set_state(c, conn_write);
|
|
675 c->write_and_go = conn_read;
|
|
676 return;
|
|
677 }
|
|
678
|
|
679 /*
|
|
680 * we get here after reading the value in set/add/replace commands. The command
|
|
681 * has been stored in c->item_comm, and the item is ready in c->item.
|
|
682 */
|
|
683
|
|
684 static void complete_nread(conn *c) {
|
|
685 assert(c != NULL);
|
|
686
|
|
687 item *it = c->item;
|
|
688 int comm = c->item_comm;
|
|
689
|
|
690 STATS_LOCK();
|
|
691 stats.set_cmds++;
|
|
692 STATS_UNLOCK();
|
|
693
|
|
694 if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
|
|
695 out_string(c, "CLIENT_ERROR bad data chunk");
|
|
696 } else {
|
|
697 if (store_item(it, comm)) {
|
|
698 out_string(c, "STORED");
|
|
699 } else {
|
|
700 out_string(c, "NOT_STORED");
|
|
701 }
|
|
702 }
|
|
703
|
|
704 item_remove(c->item); /* release the c->item reference */
|
|
705 c->item = 0;
|
|
706 }
|
|
707
|
|
708 /*
|
|
709 * Stores an item in the cache according to the semantics of one of the set
|
|
710 * commands. In threaded mode, this is protected by the cache lock.
|
|
711 *
|
|
712 * Returns true if the item was stored.
|
|
713 */
|
|
714 int do_store_item(item *it, int comm) {
|
|
715 char *key = ITEM_key(it);
|
|
716 bool delete_locked = false;
|
|
717 item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
|
|
718 int stored = 0;
|
|
719
|
|
720 if (old_it != NULL && comm == NREAD_ADD) {
|
|
721 /* add only adds a nonexistent item, but promote to head of LRU */
|
|
722 do_item_update(old_it);
|
|
723 } else if (!old_it && comm == NREAD_REPLACE) {
|
|
724 /* replace only replaces an existing value; don't store */
|
|
725 } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD)) {
|
|
726 /* replace and add can't override delete locks; don't store */
|
|
727 } else {
|
|
728 /* "set" commands can override the delete lock
|
|
729 window... in which case we have to find the old hidden item
|
|
730 that's in the namespace/LRU but wasn't returned by
|
|
731 item_get.... because we need to replace it */
|
|
732 if (delete_locked)
|
|
733 old_it = do_item_get_nocheck(key, it->nkey);
|
|
734
|
|
735 if (old_it != NULL)
|
|
736 do_item_replace(old_it, it);
|
|
737 else
|
|
738 do_item_link(it);
|
|
739
|
|
740 stored = 1;
|
|
741 }
|
|
742
|
|
743 if (old_it)
|
|
744 do_item_remove(old_it); /* release our reference */
|
|
745 return stored;
|
|
746 }
|
|
747
|
|
/* One token of a command line: a pointer into the (modified) command buffer
   and the token's length in bytes. */
typedef struct token_s {
    char *value;
    size_t length;
} token_t;

/* Token positions within a tokenized command. */
#define COMMAND_TOKEN 0
#define SUBCOMMAND_TOKEN 1
#define KEY_TOKEN 1

/* Longest key accepted, in bytes. */
#define KEY_MAX_LENGTH 250

/* Token array size, presumably passed as max_tokens to tokenize_command(). */
#define MAX_TOKENS 6

/*
 * Splits command in place on ' ' characters. Each space that ends a token is
 * overwritten with '\0', and tokens[] receives a pointer to, and the length
 * of, each non-empty token.
 *
 * At most max_tokens - 1 real tokens are produced. They are always followed
 * by one terminal token of length 0. Its value is NULL if the whole string
 * was consumed; otherwise it points at the first unprocessed character, so
 * the caller can tokenize the rest.
 *
 * tokens must have room for max_tokens entries, and max_tokens must be
 * greater than 1. Returns the number of entries written, including the
 * terminal token.
 *
 * Usage example:
 *
 *     token_t tokens[MAX_TOKENS];
 *     char *rest = command;
 *     while (rest != NULL) {
 *         size_t n = tokenize_command(rest, tokens, MAX_TOKENS);
 *         for (size_t ix = 0; tokens[ix].length != 0; ix++) {
 *             ...
 *         }
 *         rest = tokens[n - 1].value;
 *     }
 */
static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
    char *cursor = command;
    char *start = command;
    size_t count = 0;

    assert(command != NULL && tokens != NULL && max_tokens > 1);

    while (count < max_tokens - 1) {
        const char ch = *cursor;

        if (ch == ' ' || ch == '\0') {
            /* close the pending token, if it has any characters */
            if (start != cursor) {
                tokens[count].value = start;
                tokens[count].length = (size_t)(cursor - start);
                count++;
                if (ch == ' ')
                    *cursor = '\0';
            }
            if (ch == '\0')
                break; /* end of string: cursor stays on the terminator */
            start = cursor + 1;
        }
        cursor++;
    }

    /* Terminal token: NULL when the whole string was consumed, otherwise
       the first unprocessed character. */
    tokens[count].value = (*cursor == '\0') ? NULL : cursor;
    tokens[count].length = 0;
    count++;

    return count;
}
|
|
814
|
|
815 inline static void process_stats_detail(conn *c, const char *command) {
|
|
816 assert(c != NULL);
|
|
817
|
|
818 if (strcmp(command, "on") == 0) {
|
|
819 settings.detail_enabled = 1;
|
|
820 out_string(c, "OK");
|
|
821 }
|
|
822 else if (strcmp(command, "off") == 0) {
|
|
823 settings.detail_enabled = 0;
|
|
824 out_string(c, "OK");
|
|
825 }
|
|
826 else if (strcmp(command, "dump") == 0) {
|
|
827 int len;
|
|
828 char *stats = stats_prefix_dump(&len);
|
|
829 if (NULL != stats) {
|
|
830 c->write_and_free = stats;
|
|
831 c->wcurr = stats;
|
|
832 c->wbytes = len;
|
|
833 conn_set_state(c, conn_write);
|
|
834 c->write_and_go = conn_read;
|
|
835 }
|
|
836 else {
|
|
837 out_string(c, "SERVER_ERROR");
|
|
838 }
|
|
839 }
|
|
840 else {
|
|
841 out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
|
|
842 }
|
|
843 }
|
|
844
|
|
845 static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
|
|
846 rel_time_t now = current_time;
|
|
847 char *command;
|
|
848 char *subcommand;
|
|
849
|
|
850 assert(c != NULL);
|
|
851
|
|
852 if(ntokens < 2) {
|
|
853 out_string(c, "CLIENT_ERROR bad command line");
|
|
854 return;
|
|
855 }
|
|
856
|
|
857 command = tokens[COMMAND_TOKEN].value;
|
|
858
|
|
859 if (ntokens == 2 && strcmp(command, "stats") == 0) {
|
|
860 char temp[1024];
|
|
861 pid_t pid = getpid();
|
|
862 char *pos = temp;
|
|
863
|
|
864 #ifndef WIN32
|
|
865 struct rusage usage;
|
|
866 getrusage(RUSAGE_SELF, &usage);
|
|
867 #endif /* !WIN32 */
|
|
868
|
|
869 STATS_LOCK();
|
|
870 pos += sprintf(pos, "STAT pid %u\r\n", pid);
|
|
871 pos += sprintf(pos, "STAT uptime %u\r\n", now);
|
|
872 pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
|
|
873 pos += sprintf(pos, "STAT version " VERSION "\r\n");
|
|
874 pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
|
|
875 #ifndef WIN32
|
|
876 pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
|
|
877 pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
|
|
878 #endif /* !WIN32 */
|
|
879 pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
|
|
880 pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
|
|
881 pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
|
|
882 pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
|
|
883 pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
|
|
884 pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
|
|
885 pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
|
|
886 pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
|
|
887 pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
|
|
888 pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
|
|
889 pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
|
|
890 pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
|
|
891 pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
|
|
892 pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
|
|
893 pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
|
|
894 pos += sprintf(pos, "END");
|
|
895 STATS_UNLOCK();
|
|
896 out_string(c, temp);
|
|
897 return;
|
|
898 }
|
|
899
|
|
900 subcommand = tokens[SUBCOMMAND_TOKEN].value;
|
|
901
|
|
902 if (strcmp(subcommand, "reset") == 0) {
|
|
903 stats_reset();
|
|
904 out_string(c, "RESET");
|
|
905 return;
|
|
906 }
|
|
907
|
|
908 #ifdef HAVE_MALLOC_H
|
|
909 #ifdef HAVE_STRUCT_MALLINFO
|
|
910 if (strcmp(subcommand, "malloc") == 0) {
|
|
911 char temp[512];
|
|
912 struct mallinfo info;
|
|
913 char *pos = temp;
|
|
914
|
|
915 info = mallinfo();
|
|
916 pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
|
|
917 pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
|
|
918 pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
|
|
919 pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
|
|
920 pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
|
|
921 pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
|
|
922 pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
|
|
923 pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
|
|
924 pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
|
|
925 pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
|
|
926 out_string(c, temp);
|
|
927 return;
|
|
928 }
|
|
929 #endif /* HAVE_STRUCT_MALLINFO */
|
|
930 #endif /* HAVE_MALLOC_H */
|
|
931
|
|
932 #if !defined(WIN32) || !defined(__APPLE__)
|
|
933 if (strcmp(subcommand, "maps") == 0) {
|
|
934 char *wbuf;
|
|
935 int wsize = 8192; /* should be enough */
|
|
936 int fd;
|
|
937 int res;
|
|
938
|
|
939 if (!(wbuf = (char *)malloc(wsize))) {
|
|
940 out_string(c, "SERVER_ERROR out of memory");
|
|
941 return;
|
|
942 }
|
|
943
|
|
944 fd = open("/proc/self/maps", O_RDONLY);
|
|
945 if (fd == -1) {
|
|
946 out_string(c, "SERVER_ERROR cannot open the maps file");
|
|
947 free(wbuf);
|
|
948 return;
|
|
949 }
|
|
950
|
|
951 res = read(fd, wbuf, wsize - 6); /* 6 = END\r\n\0 */
|
|
952 if (res == wsize - 6) {
|
|
953 out_string(c, "SERVER_ERROR buffer overflow");
|
|
954 free(wbuf); close(fd);
|
|
955 return;
|
|
956 }
|
|
957 if (res == 0 || res == -1) {
|
|
958 out_string(c, "SERVER_ERROR can't read the maps file");
|
|
959 free(wbuf); close(fd);
|
|
960 return;
|
|
961 }
|
|
962 memcpy(wbuf + res, "END\r\n", 6);
|
|
963 c->write_and_free = wbuf;
|
|
964 c->wcurr = wbuf;
|
|
965 c->wbytes = res + 5; // Don't write the terminal '\0'
|
|
966 conn_set_state(c, conn_write);
|
|
967 c->write_and_go = conn_read;
|
|
968 close(fd);
|
|
969 return;
|
|
970 }
|
|
971 #endif
|
|
972
|
|
973 if (strcmp(subcommand, "cachedump") == 0) {
|
|
974
|
|
975 char *buf;
|
|
976 unsigned int bytes, id, limit = 0;
|
|
977
|
|
978 if(ntokens < 5) {
|
|
979 out_string(c, "CLIENT_ERROR bad command line");
|
|
980 return;
|
|
981 }
|
|
982
|
|
983 id = strtoul(tokens[2].value, NULL, 10);
|
|
984 limit = strtoul(tokens[3].value, NULL, 10);
|
|
985
|
|
986 if(errno == ERANGE) {
|
|
987 out_string(c, "CLIENT_ERROR bad command line format");
|
|
988 return;
|
|
989 }
|
|
990
|
|
991 buf = item_cachedump(id, limit, &bytes);
|
|
992 if (buf == 0) {
|
|
993 out_string(c, "SERVER_ERROR out of memory");
|
|
994 return;
|
|
995 }
|
|
996
|
|
997 c->write_and_free = buf;
|
|
998 c->wcurr = buf;
|
|
999 c->wbytes = bytes;
|
|
1000 conn_set_state(c, conn_write);
|
|
1001 c->write_and_go = conn_read;
|
|
1002 return;
|
|
1003 }
|
|
1004
|
|
1005 if (strcmp(subcommand, "slabs") == 0) {
|
|
1006 int bytes = 0;
|
|
1007 char *buf = slabs_stats(&bytes);
|
|
1008 if (!buf) {
|
|
1009 out_string(c, "SERVER_ERROR out of memory");
|
|
1010 return;
|
|
1011 }
|
|
1012 c->write_and_free = buf;
|
|
1013 c->wcurr = buf;
|
|
1014 c->wbytes = bytes;
|
|
1015 conn_set_state(c, conn_write);
|
|
1016 c->write_and_go = conn_read;
|
|
1017 return;
|
|
1018 }
|
|
1019
|
|
1020 if (strcmp(subcommand, "items") == 0) {
|
|
1021 char buffer[4096];
|
|
1022 item_stats(buffer, 4096);
|
|
1023 out_string(c, buffer);
|
|
1024 return;
|
|
1025 }
|
|
1026
|
|
1027 if (strcmp(subcommand, "detail") == 0) {
|
|
1028 if (ntokens < 4)
|
|
1029 process_stats_detail(c, ""); /* outputs the error message */
|
|
1030 else
|
|
1031 process_stats_detail(c, tokens[2].value);
|
|
1032 return;
|
|
1033 }
|
|
1034
|
|
1035 if (strcmp(subcommand, "sizes") == 0) {
|
|
1036 int bytes = 0;
|
|
1037 char *buf = item_stats_sizes(&bytes);
|
|
1038 if (! buf) {
|
|
1039 out_string(c, "SERVER_ERROR out of memory");
|
|
1040 return;
|
|
1041 }
|
|
1042
|
|
1043 c->write_and_free = buf;
|
|
1044 c->wcurr = buf;
|
|
1045 c->wbytes = bytes;
|
|
1046 conn_set_state(c, conn_write);
|
|
1047 c->write_and_go = conn_read;
|
|
1048 return;
|
|
1049 }
|
|
1050
|
|
1051 out_string(c, "ERROR");
|
|
1052 }
|
|
1053
|
|
1054 /* ntokens is overwritten here... shrug.. */
|
|
1055 static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens) {
|
|
1056 char *key;
|
|
1057 size_t nkey;
|
|
1058 int i = 0;
|
|
1059 item *it;
|
|
1060 token_t *key_token = &tokens[KEY_TOKEN];
|
|
1061
|
|
1062 assert(c != NULL);
|
|
1063
|
|
1064 if (settings.managed) {
|
|
1065 int bucket = c->bucket;
|
|
1066 if (bucket == -1) {
|
|
1067 out_string(c, "CLIENT_ERROR no BG data in managed mode");
|
|
1068 return;
|
|
1069 }
|
|
1070 c->bucket = -1;
|
|
1071 if (buckets[bucket] != c->gen) {
|
|
1072 out_string(c, "ERROR_NOT_OWNER");
|
|
1073 return;
|
|
1074 }
|
|
1075 }
|
|
1076
|
|
1077 do {
|
|
1078 while(key_token->length != 0) {
|
|
1079
|
|
1080 key = key_token->value;
|
|
1081 nkey = key_token->length;
|
|
1082
|
|
1083 if(nkey > KEY_MAX_LENGTH) {
|
|
1084 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1085 return;
|
|
1086 }
|
|
1087
|
|
1088 STATS_LOCK();
|
|
1089 stats.get_cmds++;
|
|
1090 STATS_UNLOCK();
|
|
1091 it = item_get(key, nkey);
|
|
1092 if (settings.detail_enabled) {
|
|
1093 stats_prefix_record_get(key, NULL != it);
|
|
1094 }
|
|
1095 if (it) {
|
|
1096 if (i >= c->isize) {
|
|
1097 item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
|
|
1098 if (new_list) {
|
|
1099 c->isize *= 2;
|
|
1100 c->ilist = new_list;
|
|
1101 } else break;
|
|
1102 }
|
|
1103
|
|
1104 /*
|
|
1105 * Construct the response. Each hit adds three elements to the
|
|
1106 * outgoing data list:
|
|
1107 * "VALUE "
|
|
1108 * key
|
|
1109 * " " + flags + " " + data length + "\r\n" + data (with \r\n)
|
|
1110 */
|
|
1111 if (add_iov(c, "VALUE ", 6) != 0 ||
|
|
1112 add_iov(c, ITEM_key(it), it->nkey) != 0 ||
|
|
1113 add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
|
|
1114 {
|
|
1115 break;
|
|
1116 }
|
|
1117 if (settings.verbose > 1)
|
|
1118 fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
|
|
1119
|
|
1120 /* item_get() has incremented it->refcount for us */
|
|
1121 STATS_LOCK();
|
|
1122 stats.get_hits++;
|
|
1123 STATS_UNLOCK();
|
|
1124 item_update(it);
|
|
1125 *(c->ilist + i) = it;
|
|
1126 i++;
|
|
1127
|
|
1128 } else {
|
|
1129 STATS_LOCK();
|
|
1130 stats.get_misses++;
|
|
1131 STATS_UNLOCK();
|
|
1132 }
|
|
1133
|
|
1134 key_token++;
|
|
1135 }
|
|
1136
|
|
1137 /*
|
|
1138 * If the command string hasn't been fully processed, get the next set
|
|
1139 * of tokens.
|
|
1140 */
|
|
1141 if(key_token->value != NULL) {
|
|
1142 ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
|
|
1143 key_token = tokens;
|
|
1144 }
|
|
1145
|
|
1146 } while(key_token->value != NULL);
|
|
1147
|
|
1148 c->icurr = c->ilist;
|
|
1149 c->ileft = i;
|
|
1150
|
|
1151 if (settings.verbose > 1)
|
|
1152 fprintf(stderr, ">%d END\n", c->sfd);
|
|
1153 add_iov(c, "END\r\n", 5);
|
|
1154
|
|
1155 if (c->udp && build_udp_headers(c) != 0) {
|
|
1156 out_string(c, "SERVER_ERROR out of memory");
|
|
1157 }
|
|
1158 else {
|
|
1159 conn_set_state(c, conn_mwrite);
|
|
1160 c->msgcurr = 0;
|
|
1161 }
|
|
1162 return;
|
|
1163 }
|
|
1164
|
|
1165 static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm) {
|
|
1166 char *key;
|
|
1167 size_t nkey;
|
|
1168 int flags;
|
|
1169 time_t exptime;
|
|
1170 int vlen;
|
|
1171 item *it;
|
|
1172
|
|
1173 assert(c != NULL);
|
|
1174
|
|
1175 if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
|
|
1176 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1177 return;
|
|
1178 }
|
|
1179
|
|
1180 key = tokens[KEY_TOKEN].value;
|
|
1181 nkey = tokens[KEY_TOKEN].length;
|
|
1182
|
|
1183 flags = strtoul(tokens[2].value, NULL, 10);
|
|
1184 exptime = strtol(tokens[3].value, NULL, 10);
|
|
1185 vlen = strtol(tokens[4].value, NULL, 10);
|
|
1186
|
|
1187 if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
|
|
1188 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1189 return;
|
|
1190 }
|
|
1191
|
|
1192 if (settings.detail_enabled) {
|
|
1193 stats_prefix_record_set(key);
|
|
1194 }
|
|
1195
|
|
1196 if (settings.managed) {
|
|
1197 int bucket = c->bucket;
|
|
1198 if (bucket == -1) {
|
|
1199 out_string(c, "CLIENT_ERROR no BG data in managed mode");
|
|
1200 return;
|
|
1201 }
|
|
1202 c->bucket = -1;
|
|
1203 if (buckets[bucket] != c->gen) {
|
|
1204 out_string(c, "ERROR_NOT_OWNER");
|
|
1205 return;
|
|
1206 }
|
|
1207 }
|
|
1208
|
|
1209 it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
|
|
1210
|
|
1211 if (it == 0) {
|
|
1212 if (! item_size_ok(nkey, flags, vlen + 2))
|
|
1213 out_string(c, "SERVER_ERROR object too large for cache");
|
|
1214 else
|
|
1215 out_string(c, "SERVER_ERROR out of memory");
|
|
1216 /* swallow the data line */
|
|
1217 c->write_and_go = conn_swallow;
|
|
1218 c->sbytes = vlen + 2;
|
|
1219 return;
|
|
1220 }
|
|
1221
|
|
1222 c->item_comm = comm;
|
|
1223 c->item = it;
|
|
1224 c->ritem = ITEM_data(it);
|
|
1225 c->rlbytes = it->nbytes;
|
|
1226 conn_set_state(c, conn_nread);
|
|
1227 }
|
|
1228
|
|
1229 static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const int incr) {
|
|
1230 char temp[32];
|
|
1231 item *it;
|
|
1232 unsigned int delta;
|
|
1233 char *key;
|
|
1234 size_t nkey;
|
|
1235
|
|
1236 assert(c != NULL);
|
|
1237
|
|
1238 if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
|
|
1239 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1240 return;
|
|
1241 }
|
|
1242
|
|
1243 key = tokens[KEY_TOKEN].value;
|
|
1244 nkey = tokens[KEY_TOKEN].length;
|
|
1245
|
|
1246 if (settings.managed) {
|
|
1247 int bucket = c->bucket;
|
|
1248 if (bucket == -1) {
|
|
1249 out_string(c, "CLIENT_ERROR no BG data in managed mode");
|
|
1250 return;
|
|
1251 }
|
|
1252 c->bucket = -1;
|
|
1253 if (buckets[bucket] != c->gen) {
|
|
1254 out_string(c, "ERROR_NOT_OWNER");
|
|
1255 return;
|
|
1256 }
|
|
1257 }
|
|
1258
|
|
1259 delta = strtoul(tokens[2].value, NULL, 10);
|
|
1260
|
|
1261 if(errno == ERANGE) {
|
|
1262 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1263 return;
|
|
1264 }
|
|
1265
|
|
1266 it = item_get(key, nkey);
|
|
1267 if (!it) {
|
|
1268 out_string(c, "NOT_FOUND");
|
|
1269 return;
|
|
1270 }
|
|
1271
|
|
1272 out_string(c, add_delta(it, incr, delta, temp));
|
|
1273 item_remove(it); /* release our reference */
|
|
1274 }
|
|
1275
|
|
1276 /*
|
|
1277 * adds a delta value to a numeric item.
|
|
1278 *
|
|
1279 * it item to adjust
|
|
1280 * incr true to increment value, false to decrement
|
|
1281 * delta amount to adjust value by
|
|
1282 * buf buffer for response string
|
|
1283 *
|
|
1284 * returns a response string to send back to the client.
|
|
1285 */
|
|
1286 char *do_add_delta(item *it, const int incr, unsigned int delta, char *buf) {
|
|
1287 char *ptr;
|
|
1288 unsigned int value;
|
|
1289 int res;
|
|
1290
|
|
1291 ptr = ITEM_data(it);
|
|
1292 while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++; // BUG: can't be true
|
|
1293
|
|
1294 value = strtol(ptr, NULL, 10);
|
|
1295
|
|
1296 if(errno == ERANGE) {
|
|
1297 return "CLIENT_ERROR cannot increment or decrement non-numeric value";
|
|
1298 }
|
|
1299
|
|
1300 if (incr != 0)
|
|
1301 value += delta;
|
|
1302 else {
|
|
1303 if (delta >= value) value = 0;
|
|
1304 else value -= delta;
|
|
1305 }
|
|
1306 snprintf(buf, 32, "%u", value);
|
|
1307 res = strlen(buf);
|
|
1308 if (res + 2 > it->nbytes) { /* need to realloc */
|
|
1309 item *new_it;
|
|
1310 new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
|
|
1311 if (new_it == 0) {
|
|
1312 return "SERVER_ERROR out of memory";
|
|
1313 }
|
|
1314 memcpy(ITEM_data(new_it), buf, res);
|
|
1315 memcpy(ITEM_data(new_it) + res, "\r\n", 3);
|
|
1316 do_item_replace(it, new_it);
|
|
1317 do_item_remove(new_it); /* release our reference */
|
|
1318 } else { /* replace in-place */
|
|
1319 memcpy(ITEM_data(it), buf, res);
|
|
1320 memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
|
|
1321 }
|
|
1322
|
|
1323 return buf;
|
|
1324 }
|
|
1325
|
|
1326 static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
|
|
1327 char *key;
|
|
1328 size_t nkey;
|
|
1329 item *it;
|
|
1330 time_t exptime = 0;
|
|
1331
|
|
1332 assert(c != NULL);
|
|
1333
|
|
1334 if (settings.managed) {
|
|
1335 int bucket = c->bucket;
|
|
1336 if (bucket == -1) {
|
|
1337 out_string(c, "CLIENT_ERROR no BG data in managed mode");
|
|
1338 return;
|
|
1339 }
|
|
1340 c->bucket = -1;
|
|
1341 if (buckets[bucket] != c->gen) {
|
|
1342 out_string(c, "ERROR_NOT_OWNER");
|
|
1343 return;
|
|
1344 }
|
|
1345 }
|
|
1346
|
|
1347 key = tokens[KEY_TOKEN].value;
|
|
1348 nkey = tokens[KEY_TOKEN].length;
|
|
1349
|
|
1350 if(nkey > KEY_MAX_LENGTH) {
|
|
1351 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1352 return;
|
|
1353 }
|
|
1354
|
|
1355 if(ntokens == 4) {
|
|
1356 exptime = strtol(tokens[2].value, NULL, 10);
|
|
1357
|
|
1358 if(errno == ERANGE) {
|
|
1359 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1360 return;
|
|
1361 }
|
|
1362 }
|
|
1363
|
|
1364 if (settings.detail_enabled) {
|
|
1365 stats_prefix_record_delete(key);
|
|
1366 }
|
|
1367
|
|
1368 it = item_get(key, nkey);
|
|
1369 if (it) {
|
|
1370 if (exptime == 0) {
|
|
1371 item_unlink(it);
|
|
1372 item_remove(it); /* release our reference */
|
|
1373 out_string(c, "DELETED");
|
|
1374 } else {
|
|
1375 /* our reference will be transfered to the delete queue */
|
|
1376 out_string(c, defer_delete(it, exptime));
|
|
1377 }
|
|
1378 } else {
|
|
1379 out_string(c, "NOT_FOUND");
|
|
1380 }
|
|
1381 }
|
|
1382
|
|
1383 /*
|
|
1384 * Adds an item to the deferred-delete list so it can be reaped later.
|
|
1385 *
|
|
1386 * Returns the result to send to the client.
|
|
1387 */
|
|
1388 char *do_defer_delete(item *it, time_t exptime)
|
|
1389 {
|
|
1390 if (delcurr >= deltotal) {
|
|
1391 item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
|
|
1392 if (new_delete) {
|
|
1393 todelete = new_delete;
|
|
1394 deltotal *= 2;
|
|
1395 } else {
|
|
1396 /*
|
|
1397 * can't delete it immediately, user wants a delay,
|
|
1398 * but we ran out of memory for the delete queue
|
|
1399 */
|
|
1400 item_remove(it); /* release reference */
|
|
1401 return "SERVER_ERROR out of memory";
|
|
1402 }
|
|
1403 }
|
|
1404
|
|
1405 /* use its expiration time as its deletion time now */
|
|
1406 it->exptime = realtime(exptime);
|
|
1407 it->it_flags |= ITEM_DELETED;
|
|
1408 todelete[delcurr++] = it;
|
|
1409
|
|
1410 return "DELETED";
|
|
1411 }
|
|
1412
|
|
1413 static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
|
|
1414 unsigned int level;
|
|
1415
|
|
1416 assert(c != NULL);
|
|
1417
|
|
1418 level = strtoul(tokens[1].value, NULL, 10);
|
|
1419 settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
|
|
1420 out_string(c, "OK");
|
|
1421 return;
|
|
1422 }
|
|
1423
|
|
1424 static void process_command(conn *c, char *command) {
|
|
1425
|
|
1426 token_t tokens[MAX_TOKENS];
|
|
1427 size_t ntokens;
|
|
1428 int comm;
|
|
1429
|
|
1430 assert(c != NULL);
|
|
1431
|
|
1432 if (settings.verbose > 1)
|
|
1433 fprintf(stderr, "<%d %s\n", c->sfd, command);
|
|
1434
|
|
1435 /*
|
|
1436 * for commands set/add/replace, we build an item and read the data
|
|
1437 * directly into it, then continue in nread_complete().
|
|
1438 */
|
|
1439
|
|
1440 c->msgcurr = 0;
|
|
1441 c->msgused = 0;
|
|
1442 c->iovused = 0;
|
|
1443 if (add_msghdr(c) != 0) {
|
|
1444 out_string(c, "SERVER_ERROR out of memory");
|
|
1445 return;
|
|
1446 }
|
|
1447
|
|
1448 ntokens = tokenize_command(command, tokens, MAX_TOKENS);
|
|
1449
|
|
1450 if (ntokens >= 3 &&
|
|
1451 ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
|
|
1452 (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
|
|
1453
|
|
1454 process_get_command(c, tokens, ntokens);
|
|
1455
|
|
1456 } else if (ntokens == 6 &&
|
|
1457 ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
|
|
1458 (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
|
|
1459 (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)))) {
|
|
1460
|
|
1461 process_update_command(c, tokens, ntokens, comm);
|
|
1462
|
|
1463 } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
|
|
1464
|
|
1465 process_arithmetic_command(c, tokens, ntokens, 1);
|
|
1466
|
|
1467 } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
|
|
1468
|
|
1469 process_arithmetic_command(c, tokens, ntokens, 0);
|
|
1470
|
|
1471 } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
|
|
1472
|
|
1473 process_delete_command(c, tokens, ntokens);
|
|
1474
|
|
1475 } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
|
|
1476 unsigned int bucket, gen;
|
|
1477 if (!settings.managed) {
|
|
1478 out_string(c, "CLIENT_ERROR not a managed instance");
|
|
1479 return;
|
|
1480 }
|
|
1481
|
|
1482 if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
|
|
1483 if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
|
|
1484 out_string(c, "CLIENT_ERROR bucket number out of range");
|
|
1485 return;
|
|
1486 }
|
|
1487 buckets[bucket] = gen;
|
|
1488 out_string(c, "OWNED");
|
|
1489 return;
|
|
1490 } else {
|
|
1491 out_string(c, "CLIENT_ERROR bad format");
|
|
1492 return;
|
|
1493 }
|
|
1494
|
|
1495 } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
|
|
1496
|
|
1497 int bucket;
|
|
1498 if (!settings.managed) {
|
|
1499 out_string(c, "CLIENT_ERROR not a managed instance");
|
|
1500 return;
|
|
1501 }
|
|
1502 if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
|
|
1503 if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
|
|
1504 out_string(c, "CLIENT_ERROR bucket number out of range");
|
|
1505 return;
|
|
1506 }
|
|
1507 buckets[bucket] = 0;
|
|
1508 out_string(c, "DISOWNED");
|
|
1509 return;
|
|
1510 } else {
|
|
1511 out_string(c, "CLIENT_ERROR bad format");
|
|
1512 return;
|
|
1513 }
|
|
1514
|
|
1515 } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
|
|
1516 int bucket, gen;
|
|
1517 if (!settings.managed) {
|
|
1518 out_string(c, "CLIENT_ERROR not a managed instance");
|
|
1519 return;
|
|
1520 }
|
|
1521 if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
|
|
1522 /* we never write anything back, even if input's wrong */
|
|
1523 if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
|
|
1524 /* do nothing, bad input */
|
|
1525 } else {
|
|
1526 c->bucket = bucket;
|
|
1527 c->gen = gen;
|
|
1528 }
|
|
1529 conn_set_state(c, conn_read);
|
|
1530 return;
|
|
1531 } else {
|
|
1532 out_string(c, "CLIENT_ERROR bad format");
|
|
1533 return;
|
|
1534 }
|
|
1535
|
|
1536 } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
|
|
1537
|
|
1538 process_stat(c, tokens, ntokens);
|
|
1539
|
|
1540 } else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
|
|
1541 time_t exptime = 0;
|
|
1542 set_current_time();
|
|
1543
|
|
1544 if(ntokens == 2) {
|
|
1545 settings.oldest_live = current_time - 1;
|
|
1546 item_flush_expired();
|
|
1547 out_string(c, "OK");
|
|
1548 return;
|
|
1549 }
|
|
1550
|
|
1551 exptime = strtol(tokens[1].value, NULL, 10);
|
|
1552 if(errno == ERANGE) {
|
|
1553 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1554 return;
|
|
1555 }
|
|
1556
|
|
1557 settings.oldest_live = realtime(exptime) - 1;
|
|
1558 item_flush_expired();
|
|
1559 out_string(c, "OK");
|
|
1560 return;
|
|
1561
|
|
1562 } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
|
|
1563
|
|
1564 out_string(c, "VERSION " VERSION);
|
|
1565
|
|
1566 } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
|
|
1567
|
|
1568 conn_set_state(c, conn_closing);
|
|
1569
|
|
1570 } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
|
|
1571 strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
|
|
1572 #ifdef ALLOW_SLABS_REASSIGN
|
|
1573
|
|
1574 int src, dst, rv;
|
|
1575
|
|
1576 src = strtol(tokens[2].value, NULL, 10);
|
|
1577 dst = strtol(tokens[3].value, NULL, 10);
|
|
1578
|
|
1579 if(errno == ERANGE) {
|
|
1580 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1581 return;
|
|
1582 }
|
|
1583
|
|
1584 rv = slabs_reassign(src, dst);
|
|
1585 if (rv == 1) {
|
|
1586 out_string(c, "DONE");
|
|
1587 return;
|
|
1588 }
|
|
1589 if (rv == 0) {
|
|
1590 out_string(c, "CANT");
|
|
1591 return;
|
|
1592 }
|
|
1593 if (rv == -1) {
|
|
1594 out_string(c, "BUSY");
|
|
1595 return;
|
|
1596 }
|
|
1597 #else
|
|
1598 out_string(c, "CLIENT_ERROR Slab reassignment not supported");
|
|
1599 #endif
|
|
1600 } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
|
|
1601 process_verbosity_command(c, tokens, ntokens);
|
|
1602 } else {
|
|
1603 out_string(c, "ERROR");
|
|
1604 }
|
|
1605 return;
|
|
1606 }
|
|
1607
|
|
1608 /*
|
|
1609 * if we have a complete line in the buffer, process it.
|
|
1610 */
|
|
1611 static int try_read_command(conn *c) {
|
|
1612 char *el, *cont;
|
|
1613
|
|
1614 assert(c != NULL);
|
|
1615 assert(c->rcurr <= (c->rbuf + c->rsize));
|
|
1616
|
|
1617 if (c->rbytes == 0)
|
|
1618 return 0;
|
|
1619 el = memchr(c->rcurr, '\n', c->rbytes);
|
|
1620 if (!el)
|
|
1621 return 0;
|
|
1622 cont = el + 1;
|
|
1623 if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
|
|
1624 el--;
|
|
1625 }
|
|
1626 *el = '\0';
|
|
1627
|
|
1628 assert(cont <= (c->rcurr + c->rbytes));
|
|
1629
|
|
1630 process_command(c, c->rcurr);
|
|
1631
|
|
1632 c->rbytes -= (cont - c->rcurr);
|
|
1633 c->rcurr = cont;
|
|
1634
|
|
1635 assert(c->rcurr <= (c->rbuf + c->rsize));
|
|
1636
|
|
1637 return 1;
|
|
1638 }
|
|
1639
|
|
1640 /*
|
|
1641 * read a UDP request.
|
|
1642 * return 0 if there's nothing to read.
|
|
1643 */
|
|
1644 static int try_read_udp(conn *c) {
|
|
1645 int res;
|
|
1646
|
|
1647 assert(c != NULL);
|
|
1648
|
|
1649 c->request_addr_size = sizeof(c->request_addr);
|
|
1650 res = recvfrom(c->sfd, c->rbuf, c->rsize,
|
|
1651 0, &c->request_addr, &c->request_addr_size);
|
|
1652 if (res > 8) {
|
|
1653 unsigned char *buf = (unsigned char *)c->rbuf;
|
|
1654 STATS_LOCK();
|
|
1655 stats.bytes_read += res;
|
|
1656 STATS_UNLOCK();
|
|
1657
|
|
1658 /* Beginning of UDP packet is the request ID; save it. */
|
|
1659 c->request_id = buf[0] * 256 + buf[1];
|
|
1660
|
|
1661 /* If this is a multi-packet request, drop it. */
|
|
1662 if (buf[4] != 0 || buf[5] != 1) {
|
|
1663 out_string(c, "SERVER_ERROR multi-packet request not supported");
|
|
1664 return 0;
|
|
1665 }
|
|
1666
|
|
1667 /* Don't care about any of the rest of the header. */
|
|
1668 res -= 8;
|
|
1669 memmove(c->rbuf, c->rbuf + 8, res);
|
|
1670
|
|
1671 c->rbytes += res;
|
|
1672 c->rcurr = c->rbuf;
|
|
1673 return 1;
|
|
1674 }
|
|
1675 return 0;
|
|
1676 }
|
|
1677
|
|
1678 /*
|
|
1679 * read from network as much as we can, handle buffer overflow and connection
|
|
1680 * close.
|
|
1681 * before reading, move the remaining incomplete fragment of a command
|
|
1682 * (if any) to the beginning of the buffer.
|
|
1683 * return 0 if there's nothing to read on the first read.
|
|
1684 */
|
|
1685 static int try_read_network(conn *c) {
|
|
1686 int gotdata = 0;
|
|
1687 int res;
|
|
1688
|
|
1689 assert(c != NULL);
|
|
1690
|
|
1691 if (c->rcurr != c->rbuf) {
|
|
1692 if (c->rbytes != 0) /* otherwise there's nothing to copy */
|
|
1693 memmove(c->rbuf, c->rcurr, c->rbytes);
|
|
1694 c->rcurr = c->rbuf;
|
|
1695 }
|
|
1696
|
|
1697 while (1) {
|
|
1698 if (c->rbytes >= c->rsize) {
|
|
1699 char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
|
|
1700 if (!new_rbuf) {
|
|
1701 if (settings.verbose > 0)
|
|
1702 fprintf(stderr, "Couldn't realloc input buffer\n");
|
|
1703 c->rbytes = 0; /* ignore what we read */
|
|
1704 out_string(c, "SERVER_ERROR out of memory");
|
|
1705 c->write_and_go = conn_closing;
|
|
1706 return 1;
|
|
1707 }
|
|
1708 c->rcurr = c->rbuf = new_rbuf;
|
|
1709 c->rsize *= 2;
|
|
1710 }
|
|
1711
|
|
1712 /* unix socket mode doesn't need this, so zeroed out. but why
|
|
1713 * is this done for every command? presumably for UDP
|
|
1714 * mode. */
|
|
1715 if (!settings.socketpath) {
|
|
1716 c->request_addr_size = sizeof(c->request_addr);
|
|
1717 } else {
|
|
1718 c->request_addr_size = 0;
|
|
1719 }
|
|
1720
|
|
1721 res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
|
|
1722 if (res > 0) {
|
|
1723 STATS_LOCK();
|
|
1724 stats.bytes_read += res;
|
|
1725 STATS_UNLOCK();
|
|
1726 gotdata = 1;
|
|
1727 c->rbytes += res;
|
|
1728 continue;
|
|
1729 }
|
|
1730 if (res == 0) {
|
|
1731 /* connection closed */
|
|
1732 conn_set_state(c, conn_closing);
|
|
1733 return 1;
|
|
1734 }
|
|
1735 if (res == -1) {
|
|
1736 if (errno == EAGAIN || errno == EWOULDBLOCK) break;
|
|
1737 else return 0;
|
|
1738 }
|
|
1739 }
|
|
1740 return gotdata;
|
|
1741 }
|
|
1742
|
|
1743 static bool update_event(conn *c, const int new_flags) {
|
|
1744 assert(c != NULL);
|
|
1745
|
|
1746 struct event_base *base = c->event.ev_base;
|
|
1747 if (c->ev_flags == new_flags)
|
|
1748 return true;
|
|
1749 if (event_del(&c->event) == -1) return false;
|
|
1750 event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
|
|
1751 event_base_set(base, &c->event);
|
|
1752 c->ev_flags = new_flags;
|
|
1753 if (event_add(&c->event, 0) == -1) return false;
|
|
1754 return true;
|
|
1755 }
|
|
1756
|
|
1757 /*
|
|
1758 * Sets whether we are listening for new connections or not.
|
|
1759 */
|
|
1760 void accept_new_conns(const bool do_accept) {
|
|
1761 if (! is_listen_thread())
|
|
1762 return;
|
|
1763 if (do_accept) {
|
|
1764 update_event(listen_conn, EV_READ | EV_PERSIST);
|
|
1765 if (listen(listen_conn->sfd, 1024) != 0) {
|
|
1766 perror("listen");
|
|
1767 }
|
|
1768 }
|
|
1769 else {
|
|
1770 update_event(listen_conn, 0);
|
|
1771 if (listen(listen_conn->sfd, 0) != 0) {
|
|
1772 perror("listen");
|
|
1773 }
|
|
1774 }
|
|
1775 }
|
|
1776
|
|
1777
|
|
1778 /*
|
|
1779 * Transmit the next chunk of data from our list of msgbuf structures.
|
|
1780 *
|
|
1781 * Returns:
|
|
1782 * TRANSMIT_COMPLETE All done writing.
|
|
1783 * TRANSMIT_INCOMPLETE More data remaining to write.
|
|
1784 * TRANSMIT_SOFT_ERROR Can't write any more right now.
|
|
1785 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
|
|
1786 */
|
|
1787 static int transmit(conn *c) {
|
|
1788 assert(c != NULL);
|
|
1789
|
|
1790 if (c->msgcurr < c->msgused &&
|
|
1791 c->msglist[c->msgcurr].msg_iovlen == 0) {
|
|
1792 /* Finished writing the current msg; advance to the next. */
|
|
1793 c->msgcurr++;
|
|
1794 }
|
|
1795 if (c->msgcurr < c->msgused) {
|
|
1796 ssize_t res;
|
|
1797 struct msghdr *m = &c->msglist[c->msgcurr];
|
|
1798
|
|
1799 res = sendmsg(c->sfd, m, 0);
|
|
1800 if (res > 0) {
|
|
1801 STATS_LOCK();
|
|
1802 stats.bytes_written += res;
|
|
1803 STATS_UNLOCK();
|
|
1804
|
|
1805 /* We've written some of the data. Remove the completed
|
|
1806 iovec entries from the list of pending writes. */
|
|
1807 while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
|
|
1808 res -= m->msg_iov->iov_len;
|
|
1809 m->msg_iovlen--;
|
|
1810 m->msg_iov++;
|
|
1811 }
|
|
1812
|
|
1813 /* Might have written just part of the last iovec entry;
|
|
1814 adjust it so the next write will do the rest. */
|
|
1815 if (res > 0) {
|
|
1816 m->msg_iov->iov_base += res;
|
|
1817 m->msg_iov->iov_len -= res;
|
|
1818 }
|
|
1819 return TRANSMIT_INCOMPLETE;
|
|
1820 }
|
|
1821 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
|
|
1822 if (!update_event(c, EV_WRITE | EV_PERSIST)) {
|
|
1823 if (settings.verbose > 0)
|
|
1824 fprintf(stderr, "Couldn't update event\n");
|
|
1825 conn_set_state(c, conn_closing);
|
|
1826 return TRANSMIT_HARD_ERROR;
|
|
1827 }
|
|
1828 return TRANSMIT_SOFT_ERROR;
|
|
1829 }
|
|
1830 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
|
|
1831 we have a real error, on which we close the connection */
|
|
1832 if (settings.verbose > 0)
|
|
1833 perror("Failed to write, and not due to blocking");
|
|
1834
|
|
1835 if (c->udp)
|
|
1836 conn_set_state(c, conn_read);
|
|
1837 else
|
|
1838 conn_set_state(c, conn_closing);
|
|
1839 return TRANSMIT_HARD_ERROR;
|
|
1840 } else {
|
|
1841 return TRANSMIT_COMPLETE;
|
|
1842 }
|
|
1843 }
|
|
1844
|
|
1845 static void drive_machine(conn *c) {
|
|
1846 bool stop = false;
|
|
1847 int sfd, flags = 1;
|
|
1848 socklen_t addrlen;
|
|
1849 struct sockaddr addr;
|
|
1850 int res;
|
|
1851
|
|
1852 assert(c != NULL);
|
|
1853
|
|
1854 while (!stop) {
|
|
1855
|
|
1856 switch(c->state) {
|
|
1857 case conn_listening:
|
|
1858 addrlen = sizeof(addr);
|
|
1859 if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
|
|
1860 if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
|
1861 /* these are transient, so don't log anything */
|
|
1862 stop = true;
|
|
1863 } else if (errno == EMFILE) {
|
|
1864 if (settings.verbose > 0)
|
|
1865 fprintf(stderr, "Too many open connections\n");
|
|
1866 accept_new_conns(false);
|
|
1867 stop = true;
|
|
1868 } else {
|
|
1869 perror("accept()");
|
|
1870 stop = true;
|
|
1871 }
|
|
1872 break;
|
|
1873 }
|
|
1874 if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
|
|
1875 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
|
|
1876 perror("setting O_NONBLOCK");
|
|
1877 close(sfd);
|
|
1878 break;
|
|
1879 }
|
|
1880 dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
|
|
1881 DATA_BUFFER_SIZE, false);
|
|
1882 break;
|
|
1883
|
|
1884 case conn_read:
|
|
1885 if (try_read_command(c) != 0) {
|
|
1886 continue;
|
|
1887 }
|
|
1888 if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) {
|
|
1889 continue;
|
|
1890 }
|
|
1891 /* we have no command line and no data to read from network */
|
|
1892 if (!update_event(c, EV_READ | EV_PERSIST)) {
|
|
1893 if (settings.verbose > 0)
|
|
1894 fprintf(stderr, "Couldn't update event\n");
|
|
1895 conn_set_state(c, conn_closing);
|
|
1896 break;
|
|
1897 }
|
|
1898 stop = true;
|
|
1899 break;
|
|
1900
|
|
1901 case conn_nread:
|
|
1902 /* we are reading rlbytes into ritem; */
|
|
1903 if (c->rlbytes == 0) {
|
|
1904 complete_nread(c);
|
|
1905 break;
|
|
1906 }
|
|
1907 /* first check if we have leftovers in the conn_read buffer */
|
|
1908 if (c->rbytes > 0) {
|
|
1909 int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
|
|
1910 memcpy(c->ritem, c->rcurr, tocopy);
|
|
1911 c->ritem += tocopy;
|
|
1912 c->rlbytes -= tocopy;
|
|
1913 c->rcurr += tocopy;
|
|
1914 c->rbytes -= tocopy;
|
|
1915 break;
|
|
1916 }
|
|
1917
|
|
1918 /* now try reading from the socket */
|
|
1919 res = read(c->sfd, c->ritem, c->rlbytes);
|
|
1920 if (res > 0) {
|
|
1921 STATS_LOCK();
|
|
1922 stats.bytes_read += res;
|
|
1923 STATS_UNLOCK();
|
|
1924 c->ritem += res;
|
|
1925 c->rlbytes -= res;
|
|
1926 break;
|
|
1927 }
|
|
1928 if (res == 0) { /* end of stream */
|
|
1929 conn_set_state(c, conn_closing);
|
|
1930 break;
|
|
1931 }
|
|
1932 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
|
|
1933 if (!update_event(c, EV_READ | EV_PERSIST)) {
|
|
1934 if (settings.verbose > 0)
|
|
1935 fprintf(stderr, "Couldn't update event\n");
|
|
1936 conn_set_state(c, conn_closing);
|
|
1937 break;
|
|
1938 }
|
|
1939 stop = true;
|
|
1940 break;
|
|
1941 }
|
|
1942 /* otherwise we have a real error, on which we close the connection */
|
|
1943 if (settings.verbose > 0)
|
|
1944 fprintf(stderr, "Failed to read, and not due to blocking\n");
|
|
1945 conn_set_state(c, conn_closing);
|
|
1946 break;
|
|
1947
|
|
1948 case conn_swallow:
|
|
1949 /* we are reading sbytes and throwing them away */
|
|
1950 if (c->sbytes == 0) {
|
|
1951 conn_set_state(c, conn_read);
|
|
1952 break;
|
|
1953 }
|
|
1954
|
|
1955 /* first check if we have leftovers in the conn_read buffer */
|
|
1956 if (c->rbytes > 0) {
|
|
1957 int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
|
|
1958 c->sbytes -= tocopy;
|
|
1959 c->rcurr += tocopy;
|
|
1960 c->rbytes -= tocopy;
|
|
1961 break;
|
|
1962 }
|
|
1963
|
|
1964 /* now try reading from the socket */
|
|
1965 res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
|
|
1966 if (res > 0) {
|
|
1967 STATS_LOCK();
|
|
1968 stats.bytes_read += res;
|
|
1969 STATS_UNLOCK();
|
|
1970 c->sbytes -= res;
|
|
1971 break;
|
|
1972 }
|
|
1973 if (res == 0) { /* end of stream */
|
|
1974 conn_set_state(c, conn_closing);
|
|
1975 break;
|
|
1976 }
|
|
1977 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
|
|
1978 if (!update_event(c, EV_READ | EV_PERSIST)) {
|
|
1979 if (settings.verbose > 0)
|
|
1980 fprintf(stderr, "Couldn't update event\n");
|
|
1981 conn_set_state(c, conn_closing);
|
|
1982 break;
|
|
1983 }
|
|
1984 stop = true;
|
|
1985 break;
|
|
1986 }
|
|
1987 /* otherwise we have a real error, on which we close the connection */
|
|
1988 if (settings.verbose > 0)
|
|
1989 fprintf(stderr, "Failed to read, and not due to blocking\n");
|
|
1990 conn_set_state(c, conn_closing);
|
|
1991 break;
|
|
1992
|
|
1993 case conn_write:
|
|
1994 /*
|
|
1995 * We want to write out a simple response. If we haven't already,
|
|
1996 * assemble it into a msgbuf list (this will be a single-entry
|
|
1997 * list for TCP or a two-entry list for UDP).
|
|
1998 */
|
|
1999 if (c->iovused == 0 || (c->udp && c->iovused == 1)) {
|
|
2000 if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
|
|
2001 (c->udp && build_udp_headers(c) != 0)) {
|
|
2002 if (settings.verbose > 0)
|
|
2003 fprintf(stderr, "Couldn't build response\n");
|
|
2004 conn_set_state(c, conn_closing);
|
|
2005 break;
|
|
2006 }
|
|
2007 }
|
|
2008
|
|
2009 /* fall through... */
|
|
2010
|
|
2011 case conn_mwrite:
|
|
2012 switch (transmit(c)) {
|
|
2013 case TRANSMIT_COMPLETE:
|
|
2014 if (c->state == conn_mwrite) {
|
|
2015 while (c->ileft > 0) {
|
|
2016 item *it = *(c->icurr);
|
|
2017 assert((it->it_flags & ITEM_SLABBED) == 0);
|
|
2018 item_remove(it);
|
|
2019 c->icurr++;
|
|
2020 c->ileft--;
|
|
2021 }
|
|
2022 conn_set_state(c, conn_read);
|
|
2023 } else if (c->state == conn_write) {
|
|
2024 if (c->write_and_free) {
|
|
2025 free(c->write_and_free);
|
|
2026 c->write_and_free = 0;
|
|
2027 }
|
|
2028 conn_set_state(c, c->write_and_go);
|
|
2029 } else {
|
|
2030 if (settings.verbose > 0)
|
|
2031 fprintf(stderr, "Unexpected state %d\n", c->state);
|
|
2032 conn_set_state(c, conn_closing);
|
|
2033 }
|
|
2034 break;
|
|
2035
|
|
2036 case TRANSMIT_INCOMPLETE:
|
|
2037 case TRANSMIT_HARD_ERROR:
|
|
2038 break; /* Continue in state machine. */
|
|
2039
|
|
2040 case TRANSMIT_SOFT_ERROR:
|
|
2041 stop = true;
|
|
2042 break;
|
|
2043 }
|
|
2044 break;
|
|
2045
|
|
2046 case conn_closing:
|
|
2047 if (c->udp)
|
|
2048 conn_cleanup(c);
|
|
2049 else
|
|
2050 conn_close(c);
|
|
2051 stop = true;
|
|
2052 break;
|
|
2053 }
|
|
2054 }
|
|
2055
|
|
2056 return;
|
|
2057 }
|
|
2058
|
|
2059 void event_handler(const int fd, const short which, void *arg) {
|
|
2060 conn *c;
|
|
2061
|
|
2062 c = (conn *)arg;
|
|
2063 assert(c != NULL);
|
|
2064
|
|
2065 c->which = which;
|
|
2066
|
|
2067 /* sanity */
|
|
2068 if (fd != c->sfd) {
|
|
2069 if (settings.verbose > 0)
|
|
2070 fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
|
|
2071 conn_close(c);
|
|
2072 return;
|
|
2073 }
|
|
2074
|
|
2075 drive_machine(c);
|
|
2076
|
|
2077 /* wait for next event */
|
|
2078 return;
|
|
2079 }
|
|
2080
|
|
/*
 * Creates an IPv4 socket (datagram when is_udp, stream otherwise) and puts
 * it into non-blocking mode.
 *
 * Returns the descriptor on success. On failure it prints the cause via
 * perror, closes any socket it created, and returns -1.
 */
static int new_socket(const bool is_udp) {
    const int type = is_udp ? SOCK_DGRAM : SOCK_STREAM;
    int fd = socket(AF_INET, type, 0);
    int fl;

    if (fd == -1) {
        perror("socket()");
        return -1;
    }

    /* F_SETFL is only attempted once F_GETFL has succeeded. */
    fl = fcntl(fd, F_GETFL, 0);
    if (fl >= 0 && fcntl(fd, F_SETFL, fl | O_NONBLOCK) >= 0)
        return fd;

    perror("setting O_NONBLOCK");
    close(fd);
    return -1;
}
|
|
2098
|
|
2099
|
|
2100 /*
|
|
2101 * Sets a socket's send buffer size to the maximum allowed by the system.
|
|
2102 */
|
|
2103 static void maximize_sndbuf(const int sfd) {
|
|
2104 socklen_t intsize = sizeof(int);
|
|
2105 int last_good = 0;
|
|
2106 int min, max, avg;
|
|
2107 int old_size;
|
|
2108
|
|
2109 /* Start with the default size. */
|
|
2110 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
|
|
2111 if (settings.verbose > 0)
|
|
2112 perror("getsockopt(SO_SNDBUF)");
|
|
2113 return;
|
|
2114 }
|
|
2115
|
|
2116 /* Binary-search for the real maximum. */
|
|
2117 min = old_size;
|
|
2118 max = MAX_SENDBUF_SIZE;
|
|
2119
|
|
2120 while (min <= max) {
|
|
2121 avg = ((unsigned int)(min + max)) / 2;
|
|
2122 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
|
|
2123 last_good = avg;
|
|
2124 min = avg + 1;
|
|
2125 } else {
|
|
2126 max = avg - 1;
|
|
2127 }
|
|
2128 }
|
|
2129
|
|
2130 if (settings.verbose > 1)
|
|
2131 fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
|
|
2132 }
|
|
2133
|
|
2134
|
|
2135 static int server_socket(const int port, const bool is_udp) {
|
|
2136 int sfd;
|
|
2137 struct linger ling = {0, 0};
|
|
2138 struct sockaddr_in addr;
|
|
2139 int flags =1;
|
|
2140
|
|
2141 if ((sfd = new_socket(is_udp)) == -1) {
|
|
2142 return -1;
|
|
2143 }
|
|
2144
|
|
2145 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
|
|
2146 if (is_udp) {
|
|
2147 maximize_sndbuf(sfd);
|
|
2148 } else {
|
|
2149 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
|
|
2150 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
|
|
2151 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
|
|
2152 }
|
|
2153
|
|
2154 /*
|
|
2155 * the memset call clears nonstandard fields in some impementations
|
|
2156 * that otherwise mess things up.
|
|
2157 */
|
|
2158 memset(&addr, 0, sizeof(addr));
|
|
2159
|
|
2160 addr.sin_family = AF_INET;
|
|
2161 addr.sin_port = htons(port);
|
|
2162 addr.sin_addr = settings.interf;
|
|
2163 if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
|
|
2164 perror("bind()");
|
|
2165 close(sfd);
|
|
2166 return -1;
|
|
2167 }
|
|
2168 if (!is_udp && listen(sfd, 1024) == -1) {
|
|
2169 perror("listen()");
|
|
2170 close(sfd);
|
|
2171 return -1;
|
|
2172 }
|
|
2173 return sfd;
|
|
2174 }
|
|
2175
|
|
/*
 * Creates a Unix-domain stream socket and puts it into non-blocking mode.
 *
 * Returns the descriptor on success. On failure it prints the cause via
 * perror, closes any socket it created, and returns -1.
 */
static int new_socket_unix(void) {
    int fd = socket(AF_UNIX, SOCK_STREAM, 0);
    int fl;

    if (fd == -1) {
        perror("socket()");
        return -1;
    }

    /* F_SETFL is only attempted once F_GETFL has succeeded. */
    fl = fcntl(fd, F_GETFL, 0);
    if (fl >= 0 && fcntl(fd, F_SETFL, fl | O_NONBLOCK) >= 0)
        return fd;

    perror("setting O_NONBLOCK");
    close(fd);
    return -1;
}
|
|
2193
|
|
/*
 * Creates a Unix-domain stream socket bound to `path` and puts it into
 * listening mode with a backlog of 1024.
 *
 * Any existing socket file at `path` is unlinked first. Files of any other
 * type at `path` are not removed. Socket-option failures are ignored
 * (best effort).
 *
 * Returns the descriptor, or -1 in any of these cases:
 * - `path` is NULL;
 * - `path` does not fit in sockaddr_un.sun_path (including its NUL);
 * - socket creation, bind() or listen() fails.
 */
static int server_socket_unix(const char *path) {
    int sfd;
    struct linger ling = {0, 0};
    struct sockaddr_un addr;
    struct stat tstat;
    int flags = 1;
    size_t path_len;

    if (!path) {
        return -1;
    }

    /* sun_path is a small fixed-size array. Reject paths that, with their
       terminating NUL, would not fit, instead of copying past its end. */
    path_len = strlen(path);
    if (path_len >= sizeof(addr.sun_path)) {
        fprintf(stderr, "unix socket path too long (max %lu bytes): %s\n",
                (unsigned long)(sizeof(addr.sun_path) - 1), path);
        return -1;
    }

    if ((sfd = new_socket_unix()) == -1) {
        return -1;
    }

    /*
     * Clean up a previous socket file if we left it around. Only sockets
     * are unlinked; any other file type at `path` is left untouched.
     */
    if (lstat(path, &tstat) == 0) {
        if (S_ISSOCK(tstat.st_mode))
            unlink(path);
    }

    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
    setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));

    /*
     * the memset call clears nonstandard fields in some implementations
     * that otherwise mess things up.
     */
    memset(&addr, 0, sizeof(addr));

    addr.sun_family = AF_UNIX;
    /* Length was checked above; copy including the terminating NUL. */
    memcpy(addr.sun_path, path, path_len + 1);
    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("bind()");
        close(sfd);
        return -1;
    }
    if (listen(sfd, 1024) == -1) {
        perror("listen()");
        close(sfd);
        return -1;
    }
    return sfd;
}
|
|
2241
|
|
/* listening socket (TCP or Unix-domain); -1 until main() creates it, so the
   "> -1" checks in pre_gdb() never mistake the default for stdin (fd 0) */
static int l_socket = -1;

/* udp socket; -1 when no UDP socket has been created */
static int u_socket = -1;
|
|
2247
|
|
2248 /* invoke right before gdb is called, on assert */
|
|
2249 void pre_gdb(void) {
|
|
2250 int i;
|
|
2251 if (l_socket > -1) close(l_socket);
|
|
2252 if (u_socket > -1) close(u_socket);
|
|
2253 for (i = 3; i <= 500; i++) close(i); /* so lame */
|
|
2254 kill(getpid(), SIGABRT);
|
|
2255 }
|
|
2256
|
|
2257 /*
|
|
2258 * We keep the current time of day in a global variable that's updated by a
|
|
2259 * timer event. This saves us a bunch of time() system calls (we really only
|
|
2260 * need to get the time once a second, whereas there can be tens of thousands
|
|
2261 * of requests a second) and allows us to use server-start-relative timestamps
|
|
2262 * rather than absolute UNIX timestamps, a space savings on systems where
|
|
2263 * sizeof(time_t) > sizeof(unsigned int).
|
|
2264 */
|
|
2265 volatile rel_time_t current_time;
|
|
2266 static struct event clockevent;
|
|
2267
|
|
2268 /* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */
|
|
2269 static void set_current_time(void) {
|
|
2270 current_time = (rel_time_t) (time(0) - stats.started);
|
|
2271 }
|
|
2272
|
|
2273 static void clock_handler(const int fd, const short which, void *arg) {
|
|
2274 struct timeval t = {.tv_sec = 1, .tv_usec = 0};
|
|
2275 static bool initialized = false;
|
|
2276
|
|
2277 if (initialized) {
|
|
2278 /* only delete the event if it's actually there. */
|
|
2279 evtimer_del(&clockevent);
|
|
2280 } else {
|
|
2281 initialized = true;
|
|
2282 }
|
|
2283
|
|
2284 evtimer_set(&clockevent, clock_handler, 0);
|
|
2285 event_base_set(main_base, &clockevent);
|
|
2286 evtimer_add(&clockevent, &t);
|
|
2287
|
|
2288 set_current_time();
|
|
2289 }
|
|
2290
|
|
2291 static struct event deleteevent;
|
|
2292
|
|
2293 static void delete_handler(const int fd, const short which, void *arg) {
|
|
2294 struct timeval t = {.tv_sec = 5, .tv_usec = 0};
|
|
2295 static bool initialized = false;
|
|
2296
|
|
2297 if (initialized) {
|
|
2298 /* some versions of libevent don't like deleting events that don't exist,
|
|
2299 so only delete once we know this event has been added. */
|
|
2300 evtimer_del(&deleteevent);
|
|
2301 } else {
|
|
2302 initialized = true;
|
|
2303 }
|
|
2304
|
|
2305 evtimer_set(&deleteevent, delete_handler, 0);
|
|
2306 event_base_set(main_base, &deleteevent);
|
|
2307 evtimer_add(&deleteevent, &t);
|
|
2308 run_deferred_deletes();
|
|
2309 }
|
|
2310
|
|
2311 /* Call run_deferred_deletes instead of this. */
|
|
2312 void do_run_deferred_deletes(void)
|
|
2313 {
|
|
2314 int i, j = 0;
|
|
2315
|
|
2316 for (i = 0; i < delcurr; i++) {
|
|
2317 item *it = todelete[i];
|
|
2318 if (item_delete_lock_over(it)) {
|
|
2319 assert(it->refcount > 0);
|
|
2320 it->it_flags &= ~ITEM_DELETED;
|
|
2321 do_item_unlink(it);
|
|
2322 do_item_remove(it);
|
|
2323 } else {
|
|
2324 todelete[j++] = it;
|
|
2325 }
|
|
2326 }
|
|
2327 delcurr = j;
|
|
2328 }
|
|
2329
|
|
2330 static void usage(void) {
|
|
2331 printf(PACKAGE " " VERSION "\n");
|
|
2332 printf("-p <num> TCP port number to listen on (default: 11211)\n"
|
|
2333 "-U <num> UDP port number to listen on (default: 0, off)\n"
|
|
2334 "-s <file> unix socket path to listen on (disables network support)\n"
|
|
2335 "-l <ip_addr> interface to listen on, default is INDRR_ANY\n"
|
|
2336 "-d run as a daemon\n"
|
|
2337 "-r maximize core file limit\n"
|
|
2338 "-u <username> assume identity of <username> (only when run as root)\n"
|
|
2339 "-m <num> max memory to use for items in megabytes, default is 64 MB\n"
|
|
2340 "-M return error on memory exhausted (rather than removing items)\n"
|
|
2341 "-c <num> max simultaneous connections, default is 1024\n"
|
|
2342 "-k lock down all paged memory\n"
|
|
2343 "-v verbose (print errors/warnings while in event loop)\n"
|
|
2344 "-vv very verbose (also print client commands/reponses)\n"
|
|
2345 "-h print this help and exit\n"
|
|
2346 "-i print memcached and libevent license\n"
|
|
2347 "-b run a managed instanced (mnemonic: buckets)\n"
|
|
2348 "-P <file> save PID in <file>, only used with -d option\n"
|
|
2349 "-f <factor> chunk size growth factor, default 1.25\n"
|
|
2350 "-n <bytes> minimum space allocated for key+value+flags, default 48\n");
|
|
2351 #ifdef USE_THREADS
|
|
2352 printf("-t <num> number of threads to use, default 4\n");
|
|
2353 #endif
|
|
2354 return;
|
|
2355 }
|
|
2356
|
|
2357 static void usage_license(void) {
|
|
2358 printf(PACKAGE " " VERSION "\n\n");
|
|
2359 printf(
|
|
2360 "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
|
|
2361 "All rights reserved.\n"
|
|
2362 "\n"
|
|
2363 "Redistribution and use in source and binary forms, with or without\n"
|
|
2364 "modification, are permitted provided that the following conditions are\n"
|
|
2365 "met:\n"
|
|
2366 "\n"
|
|
2367 " * Redistributions of source code must retain the above copyright\n"
|
|
2368 "notice, this list of conditions and the following disclaimer.\n"
|
|
2369 "\n"
|
|
2370 " * Redistributions in binary form must reproduce the above\n"
|
|
2371 "copyright notice, this list of conditions and the following disclaimer\n"
|
|
2372 "in the documentation and/or other materials provided with the\n"
|
|
2373 "distribution.\n"
|
|
2374 "\n"
|
|
2375 " * Neither the name of the Danga Interactive nor the names of its\n"
|
|
2376 "contributors may be used to endorse or promote products derived from\n"
|
|
2377 "this software without specific prior written permission.\n"
|
|
2378 "\n"
|
|
2379 "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
|
|
2380 "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
|
|
2381 "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
|
|
2382 "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
|
|
2383 "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
|
|
2384 "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
|
|
2385 "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
|
|
2386 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
|
|
2387 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
|
|
2388 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
|
|
2389 "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
|
|
2390 "\n"
|
|
2391 "\n"
|
|
2392 "This product includes software developed by Niels Provos.\n"
|
|
2393 "\n"
|
|
2394 "[ libevent ]\n"
|
|
2395 "\n"
|
|
2396 "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
|
|
2397 "All rights reserved.\n"
|
|
2398 "\n"
|
|
2399 "Redistribution and use in source and binary forms, with or without\n"
|
|
2400 "modification, are permitted provided that the following conditions\n"
|
|
2401 "are met:\n"
|
|
2402 "1. Redistributions of source code must retain the above copyright\n"
|
|
2403 " notice, this list of conditions and the following disclaimer.\n"
|
|
2404 "2. Redistributions in binary form must reproduce the above copyright\n"
|
|
2405 " notice, this list of conditions and the following disclaimer in the\n"
|
|
2406 " documentation and/or other materials provided with the distribution.\n"
|
|
2407 "3. All advertising materials mentioning features or use of this software\n"
|
|
2408 " must display the following acknowledgement:\n"
|
|
2409 " This product includes software developed by Niels Provos.\n"
|
|
2410 "4. The name of the author may not be used to endorse or promote products\n"
|
|
2411 " derived from this software without specific prior written permission.\n"
|
|
2412 "\n"
|
|
2413 "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
|
|
2414 "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
|
|
2415 "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
|
|
2416 "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
|
|
2417 "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
|
|
2418 "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
|
|
2419 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
|
|
2420 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
|
|
2421 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
|
|
2422 "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
|
|
2423 );
|
|
2424
|
|
2425 return;
|
|
2426 }
|
|
2427
|
|
/*
 * Writes `pid` followed by a newline to `pid_file`, truncating any previous
 * contents. This is a no-op when pid_file is NULL.
 *
 * Best effort: open, write and close failures are reported on stderr and
 * never abort the caller.
 */
static void save_pid(const pid_t pid, const char *pid_file) {
    FILE *fp;

    if (pid_file == NULL)
        return;

    fp = fopen(pid_file, "w");
    if (fp == NULL) {
        fprintf(stderr, "Could not open the pid file %s for writing\n", pid_file);
        return;
    }

    /* A short or failed write would otherwise leave a bogus pid file
       without any diagnostic. */
    if (fprintf(fp, "%ld\n", (long)pid) < 0)
        fprintf(stderr, "Could not write the pid file %s.\n", pid_file);

    /* fclose() flushes the buffered pid; it signals failure with EOF. */
    if (fclose(fp) == EOF)
        fprintf(stderr, "Could not close the pid file %s.\n", pid_file);
}
|
|
2444
|
|
/*
 * Deletes `pid_file`, reporting a failure on stderr. This is a no-op when
 * pid_file is NULL.
 */
static void remove_pidfile(const char *pid_file) {
    /* Short-circuit: unlink() is never reached for a NULL path. */
    if (pid_file != NULL && unlink(pid_file) != 0)
        fprintf(stderr, "Could not remove the pid file %s.\n", pid_file);
}
|
|
2454
|
|
2455
|
|
/*
 * SIGINT handler: prints a notice to stdout and terminates the process with
 * EXIT_SUCCESS.
 *
 * Only async-signal-safe calls (write, _exit) are used. The previous
 * printf()/exit() pair could deadlock or corrupt state if the signal
 * interrupted stdio or malloc (CERT SIG30-C). As a consequence, atexit
 * handlers and stdio buffers are not run/flushed on this path.
 */
static void sig_handler(const int sig) {
    static const char msg[] = "SIGINT handled.\n";

    (void)sig;
    if (write(STDOUT_FILENO, msg, sizeof(msg) - 1) < 0) {
        /* Nothing safe to do about a failed notice; exit regardless. */
    }
    _exit(EXIT_SUCCESS);
}
|
|
2460
|
|
2461 int main (int argc, char **argv) {
|
|
2462 int c;
|
|
2463 struct in_addr addr;
|
|
2464 bool lock_memory = false;
|
|
2465 bool daemonize = false;
|
|
2466 int maxcore = 0;
|
|
2467 char *username = NULL;
|
|
2468 char *pid_file = NULL;
|
|
2469 struct passwd *pw;
|
|
2470 struct sigaction sa;
|
|
2471 struct rlimit rlim;
|
|
2472
|
|
2473 /* handle SIGINT */
|
|
2474 signal(SIGINT, sig_handler);
|
|
2475
|
|
2476 /* init settings */
|
|
2477 settings_init();
|
|
2478
|
|
2479 /* set stderr non-buffering (for running under, say, daemontools) */
|
|
2480 setbuf(stderr, NULL);
|
|
2481
|
|
2482 /* process arguments */
|
|
2483 while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) {
|
|
2484 switch (c) {
|
|
2485 case 'U':
|
|
2486 settings.udpport = atoi(optarg);
|
|
2487 break;
|
|
2488 case 'b':
|
|
2489 settings.managed = true;
|
|
2490 break;
|
|
2491 case 'p':
|
|
2492 settings.port = atoi(optarg);
|
|
2493 break;
|
|
2494 case 's':
|
|
2495 settings.socketpath = optarg;
|
|
2496 break;
|
|
2497 case 'm':
|
|
2498 settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
|
|
2499 break;
|
|
2500 case 'M':
|
|
2501 settings.evict_to_free = 0;
|
|
2502 break;
|
|
2503 case 'c':
|
|
2504 settings.maxconns = atoi(optarg);
|
|
2505 break;
|
|
2506 case 'h':
|
|
2507 usage();
|
|
2508 exit(EXIT_SUCCESS);
|
|
2509 case 'i':
|
|
2510 usage_license();
|
|
2511 exit(EXIT_SUCCESS);
|
|
2512 case 'k':
|
|
2513 lock_memory = true;
|
|
2514 break;
|
|
2515 case 'v':
|
|
2516 settings.verbose++;
|
|
2517 break;
|
|
2518 case 'l':
|
|
2519 if (inet_pton(AF_INET, optarg, &addr) <= 0) {
|
|
2520 fprintf(stderr, "Illegal address: %s\n", optarg);
|
|
2521 return 1;
|
|
2522 } else {
|
|
2523 settings.interf = addr;
|
|
2524 }
|
|
2525 break;
|
|
2526 case 'd':
|
|
2527 daemonize = true;
|
|
2528 break;
|
|
2529 case 'r':
|
|
2530 maxcore = 1;
|
|
2531 break;
|
|
2532 case 'u':
|
|
2533 username = optarg;
|
|
2534 break;
|
|
2535 case 'P':
|
|
2536 pid_file = optarg;
|
|
2537 break;
|
|
2538 case 'f':
|
|
2539 settings.factor = atof(optarg);
|
|
2540 if (settings.factor <= 1.0) {
|
|
2541 fprintf(stderr, "Factor must be greater than 1\n");
|
|
2542 return 1;
|
|
2543 }
|
|
2544 break;
|
|
2545 case 'n':
|
|
2546 settings.chunk_size = atoi(optarg);
|
|
2547 if (settings.chunk_size == 0) {
|
|
2548 fprintf(stderr, "Chunk size must be greater than 0\n");
|
|
2549 return 1;
|
|
2550 }
|
|
2551 break;
|
|
2552 case 't':
|
|
2553 settings.num_threads = atoi(optarg);
|
|
2554 if (settings.num_threads == 0) {
|
|
2555 fprintf(stderr, "Number of threads must be greater than 0\n");
|
|
2556 return 1;
|
|
2557 }
|
|
2558 break;
|
|
2559 case 'D':
|
|
2560 if (! optarg || ! optarg[0]) {
|
|
2561 fprintf(stderr, "No delimiter specified\n");
|
|
2562 return 1;
|
|
2563 }
|
|
2564 settings.prefix_delimiter = optarg[0];
|
|
2565 settings.detail_enabled = 1;
|
|
2566 break;
|
|
2567 default:
|
|
2568 fprintf(stderr, "Illegal argument \"%c\"\n", c);
|
|
2569 return 1;
|
|
2570 }
|
|
2571 }
|
|
2572
|
|
2573 if (maxcore != 0) {
|
|
2574 struct rlimit rlim_new;
|
|
2575 /*
|
|
2576 * First try raising to infinity; if that fails, try bringing
|
|
2577 * the soft limit to the hard.
|
|
2578 */
|
|
2579 if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
|
|
2580 rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
|
|
2581 if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
|
|
2582 /* failed. try raising just to the old max */
|
|
2583 rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
|
|
2584 (void)setrlimit(RLIMIT_CORE, &rlim_new);
|
|
2585 }
|
|
2586 }
|
|
2587 /*
|
|
2588 * getrlimit again to see what we ended up with. Only fail if
|
|
2589 * the soft limit ends up 0, because then no core files will be
|
|
2590 * created at all.
|
|
2591 */
|
|
2592
|
|
2593 if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
|
|
2594 fprintf(stderr, "failed to ensure corefile creation\n");
|
|
2595 exit(EXIT_FAILURE);
|
|
2596 }
|
|
2597 }
|
|
2598
|
|
2599 /*
|
|
2600 * If needed, increase rlimits to allow as many connections
|
|
2601 * as needed.
|
|
2602 */
|
|
2603
|
|
2604 if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
|
|
2605 fprintf(stderr, "failed to getrlimit number of files\n");
|
|
2606 exit(EXIT_FAILURE);
|
|
2607 } else {
|
|
2608 int maxfiles = settings.maxconns;
|
|
2609 if (rlim.rlim_cur < maxfiles)
|
|
2610 rlim.rlim_cur = maxfiles + 3;
|
|
2611 if (rlim.rlim_max < rlim.rlim_cur)
|
|
2612 rlim.rlim_max = rlim.rlim_cur;
|
|
2613 if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
|
|
2614 fprintf(stderr, "failed to set rlimit for open files. Try running as root or requesting smaller maxconns value.\n");
|
|
2615 exit(EXIT_FAILURE);
|
|
2616 }
|
|
2617 }
|
|
2618
|
|
2619 /*
|
|
2620 * initialization order: first create the listening sockets
|
|
2621 * (may need root on low ports), then drop root if needed,
|
|
2622 * then daemonise if needed, then init libevent (in some cases
|
|
2623 * descriptors created by libevent wouldn't survive forking).
|
|
2624 */
|
|
2625
|
|
2626 /* create the listening socket and bind it */
|
|
2627 if (settings.socketpath == NULL) {
|
|
2628 l_socket = server_socket(settings.port, 0);
|
|
2629 if (l_socket == -1) {
|
|
2630 fprintf(stderr, "failed to listen\n");
|
|
2631 exit(EXIT_FAILURE);
|
|
2632 }
|
|
2633 }
|
|
2634
|
|
2635 if (settings.udpport > 0 && settings.socketpath == NULL) {
|
|
2636 /* create the UDP listening socket and bind it */
|
|
2637 u_socket = server_socket(settings.udpport, 1);
|
|
2638 if (u_socket == -1) {
|
|
2639 fprintf(stderr, "failed to listen on UDP port %d\n", settings.udpport);
|
|
2640 exit(EXIT_FAILURE);
|
|
2641 }
|
|
2642 }
|
|
2643
|
|
2644 /* lose root privileges if we have them */
|
|
2645 if (getuid() == 0 || geteuid() == 0) {
|
|
2646 if (username == 0 || *username == '\0') {
|
|
2647 fprintf(stderr, "can't run as root without the -u switch\n");
|
|
2648 return 1;
|
|
2649 }
|
|
2650 if ((pw = getpwnam(username)) == 0) {
|
|
2651 fprintf(stderr, "can't find the user %s to switch to\n", username);
|
|
2652 return 1;
|
|
2653 }
|
|
2654 if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) {
|
|
2655 fprintf(stderr, "failed to assume identity of user %s\n", username);
|
|
2656 return 1;
|
|
2657 }
|
|
2658 }
|
|
2659
|
|
2660 /* create unix mode sockets after dropping privileges */
|
|
2661 if (settings.socketpath != NULL) {
|
|
2662 l_socket = server_socket_unix(settings.socketpath);
|
|
2663 if (l_socket == -1) {
|
|
2664 fprintf(stderr, "failed to listen\n");
|
|
2665 exit(EXIT_FAILURE);
|
|
2666 }
|
|
2667 }
|
|
2668
|
|
2669 /* daemonize if requested */
|
|
2670 /* if we want to ensure our ability to dump core, don't chdir to / */
|
|
2671 if (daemonize) {
|
|
2672 int res;
|
|
2673 res = daemon(maxcore, settings.verbose);
|
|
2674 if (res == -1) {
|
|
2675 fprintf(stderr, "failed to daemon() in order to daemonize\n");
|
|
2676 return 1;
|
|
2677 }
|
|
2678 }
|
|
2679
|
|
2680 /* initialize main thread libevent instance */
|
|
2681 main_base = event_init();
|
|
2682
|
|
2683 /* initialize other stuff */
|
|
2684 item_init();
|
|
2685 stats_init();
|
|
2686 assoc_init();
|
|
2687 conn_init();
|
|
2688 slabs_init(settings.maxbytes, settings.factor);
|
|
2689
|
|
2690 /* managed instance? alloc and zero a bucket array */
|
|
2691 if (settings.managed) {
|
|
2692 buckets = malloc(sizeof(int) * MAX_BUCKETS);
|
|
2693 if (buckets == 0) {
|
|
2694 fprintf(stderr, "failed to allocate the bucket array");
|
|
2695 exit(EXIT_FAILURE);
|
|
2696 }
|
|
2697 memset(buckets, 0, sizeof(int) * MAX_BUCKETS);
|
|
2698 }
|
|
2699
|
|
2700 /* lock paged memory if needed */
|
|
2701 if (lock_memory) {
|
|
2702 #ifdef HAVE_MLOCKALL
|
|
2703 mlockall(MCL_CURRENT | MCL_FUTURE);
|
|
2704 #else
|
|
2705 fprintf(stderr, "warning: mlockall() not supported on this platform. proceeding without.\n");
|
|
2706 #endif
|
|
2707 }
|
|
2708
|
|
2709 /*
|
|
2710 * ignore SIGPIPE signals; we can use errno==EPIPE if we
|
|
2711 * need that information
|
|
2712 */
|
|
2713 sa.sa_handler = SIG_IGN;
|
|
2714 sa.sa_flags = 0;
|
|
2715 if (sigemptyset(&sa.sa_mask) == -1 ||
|
|
2716 sigaction(SIGPIPE, &sa, 0) == -1) {
|
|
2717 perror("failed to ignore SIGPIPE; sigaction");
|
|
2718 exit(EXIT_FAILURE);
|
|
2719 }
|
|
2720 /* create the initial listening connection */
|
|
2721 if (!(listen_conn = conn_new(l_socket, conn_listening,
|
|
2722 EV_READ | EV_PERSIST, 1, false, main_base))) {
|
|
2723 fprintf(stderr, "failed to create listening connection");
|
|
2724 exit(EXIT_FAILURE);
|
|
2725 }
|
|
2726 /* save the PID in if we're a daemon */
|
|
2727 if (daemonize)
|
|
2728 save_pid(getpid(), pid_file);
|
|
2729 /* start up worker threads if MT mode */
|
|
2730 thread_init(settings.num_threads, main_base);
|
|
2731 /* initialise clock event */
|
|
2732 clock_handler(0, 0, 0);
|
|
2733 /* initialise deletion array and timer event */
|
|
2734 deltotal = 200;
|
|
2735 delcurr = 0;
|
|
2736 todelete = malloc(sizeof(item *) * deltotal);
|
|
2737 delete_handler(0, 0, 0); /* sets up the event */
|
|
2738 /* create the initial listening udp connection, monitored on all threads */
|
|
2739 if (u_socket > -1) {
|
|
2740 for (c = 0; c < settings.num_threads; c++) {
|
|
2741 /* this is guaranteed to hit all threads because we round-robin */
|
|
2742 dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
|
|
2743 UDP_READ_BUFFER_SIZE, 1);
|
|
2744 }
|
|
2745 }
|
|
2746 /* enter the event loop */
|
|
2747 event_base_loop(main_base, 0);
|
|
2748 /* remove the PID file if we're a daemon */
|
|
2749 if (daemonize)
|
|
2750 remove_pidfile(pid_file);
|
|
2751 return 0;
|
|
2752 }
|