/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *  memcached - memory caching daemon
 *
 *       http://www.danga.com/memcached/
 *
 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
 *
 *  Use and distribution licensed under the BSD license.  See
 *  the LICENSE file for full text.
 *
 *  Authors:
 *      Anatoly Vorobey <mellon@pobox.com>
 *      Brad Fitzpatrick <brad@danga.com>
 *
 *  $Id: memcached.c 570 2007-06-20 01:21:45Z plindner $
 */
|
|
18 #include "memcached.h"
|
|
19 #include <sys/stat.h>
|
|
20 #include <sys/socket.h>
|
|
21 #include <sys/un.h>
|
|
22 #include <sys/signal.h>
|
|
23 #include <sys/resource.h>
|
|
24 #include <sys/uio.h>
|
|
25
|
|
26 /* some POSIX systems need the following definition
|
|
27 * to get mlockall flags out of sys/mman.h. */
|
|
28 #ifndef _P1003_1B_VISIBLE
|
|
29 #define _P1003_1B_VISIBLE
|
|
30 #endif
|
|
31 /* need this to get IOV_MAX on some platforms. */
|
|
32 #ifndef __need_IOV_MAX
|
|
33 #define __need_IOV_MAX
|
|
34 #endif
|
|
35 #include <pwd.h>
|
|
36 #include <sys/mman.h>
|
|
37 #include <fcntl.h>
|
|
38 #include <netinet/tcp.h>
|
|
39 #include <arpa/inet.h>
|
|
40 #include <errno.h>
|
|
41 #include <stdlib.h>
|
|
42 #include <stdio.h>
|
|
43 #include <string.h>
|
|
44 #include <time.h>
|
|
45 #include <assert.h>
|
|
46 #include <limits.h>
|
|
47
|
|
48 #ifdef HAVE_MALLOC_H
|
|
49 /* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
|
|
50 #ifndef __OpenBSD__
|
|
51 #include <malloc.h>
|
|
52 #endif
|
|
53 #endif
|
|
54
|
|
55 /* FreeBSD 4.x doesn't have IOV_MAX exposed. */
|
|
56 #ifndef IOV_MAX
|
|
57 #if defined(__FreeBSD__)
|
|
58 # define IOV_MAX 1024
|
|
59 #endif
|
|
60 #endif
|
|
61
|
|
62 /*
|
|
63 * forward declarations
|
|
64 */
|
|
65 static void drive_machine(conn *c);
|
|
66 static int new_socket(const bool is_udp);
|
|
67 static int server_socket(const int port, const bool is_udp);
|
|
68 static int try_read_command(conn *c);
|
|
69 static int try_read_network(conn *c);
|
|
70 static int try_read_udp(conn *c);
|
|
71
|
|
72 /* stats */
|
|
73 static void stats_reset(void);
|
|
74 static void stats_init(void);
|
|
75
|
|
76 /* defaults */
|
|
77 static void settings_init(void);
|
|
78
|
|
79 /* event handling, network IO */
|
|
80 static void event_handler(const int fd, const short which, void *arg);
|
|
81 static void conn_close(conn *c);
|
|
82 static void conn_init(void);
|
|
83 static void accept_new_conns(const bool do_accept);
|
|
84 static bool update_event(conn *c, const int new_flags);
|
|
85 static void complete_nread(conn *c);
|
|
86 static void process_command(conn *c, char *command);
|
|
87 static int transmit(conn *c);
|
|
88 static int ensure_iov_space(conn *c);
|
|
89 static int add_iov(conn *c, const void *buf, int len);
|
|
90 static int add_msghdr(conn *c);
|
|
91
|
|
92
|
|
93 /* time handling */
|
|
94 static void set_current_time(void); /* update the global variable holding
|
|
95 global 32-bit seconds-since-start time
|
|
96 (to avoid 64 bit time_t) */
|
|
97
|
|
98 void pre_gdb(void);
|
|
99 static void conn_free(conn *c);
|
|
100
|
|
101 /** exported globals **/
|
|
102 struct stats stats;
|
|
103 struct settings settings;
|
|
104
|
|
105 /** file scope variables **/
|
|
106 static item **todelete = 0;
|
|
107 static int delcurr;
|
|
108 static int deltotal;
|
|
109 static conn *listen_conn;
|
|
110 static struct event_base *main_base;
|
|
111
|
|
112 #define TRANSMIT_COMPLETE 0
|
|
113 #define TRANSMIT_INCOMPLETE 1
|
|
114 #define TRANSMIT_SOFT_ERROR 2
|
|
115 #define TRANSMIT_HARD_ERROR 3
|
|
116
|
|
117 static int *buckets = 0; /* bucket->generation array for a managed instance */
|
|
118
|
|
119 #define REALTIME_MAXDELTA 60*60*24*30
|
|
120 /*
|
|
121 * given time value that's either unix time or delta from current unix time, return
|
|
122 * unix time. Use the fact that delta can't exceed one month (and real time value can't
|
|
123 * be that low).
|
|
124 */
|
|
125 static rel_time_t realtime(const time_t exptime) {
|
|
126 /* no. of seconds in 30 days - largest possible delta exptime */
|
|
127
|
|
128 if (exptime == 0) return 0; /* 0 means never expire */
|
|
129
|
|
130 if (exptime > REALTIME_MAXDELTA) {
|
|
131 /* if item expiration is at/before the server started, give it an
|
|
132 expiration time of 1 second after the server started.
|
|
133 (because 0 means don't expire). without this, we'd
|
|
134 underflow and wrap around to some large value way in the
|
|
135 future, effectively making items expiring in the past
|
|
136 really expiring never */
|
|
137 if (exptime <= stats.started)
|
|
138 return (rel_time_t)1;
|
|
139 return (rel_time_t)(exptime - stats.started);
|
|
140 } else {
|
|
141 return (rel_time_t)(exptime + current_time);
|
|
142 }
|
|
143 }
|
|
144
|
|
145 static void stats_init(void) {
|
|
146 stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
|
|
147 stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
|
|
148 stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
|
|
149
|
|
150 /* make the time we started always be 2 seconds before we really
|
|
151 did, so time(0) - time.started is never zero. if so, things
|
|
152 like 'settings.oldest_live' which act as booleans as well as
|
|
153 values are now false in boolean context... */
|
|
154 stats.started = time(0) - 2;
|
|
155 stats_prefix_init();
|
|
156 }
|
|
157
|
|
158 static void stats_reset(void) {
|
|
159 STATS_LOCK();
|
|
160 stats.total_items = stats.total_conns = 0;
|
|
161 stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
|
|
162 stats.bytes_read = stats.bytes_written = 0;
|
|
163 stats_prefix_clear();
|
|
164 STATS_UNLOCK();
|
|
165 }
|
|
166
|
|
167 static void settings_init(void) {
|
|
168 settings.port = 11211;
|
|
169 settings.udpport = 0;
|
|
170 settings.interf.s_addr = htonl(INADDR_ANY);
|
|
171 settings.maxbytes = 67108864; /* default is 64MB: (64 * 1024 * 1024) */
|
|
172 settings.maxconns = 1024; /* to limit connections-related memory to about 5MB */
|
|
173 settings.verbose = 0;
|
|
174 settings.oldest_live = 0;
|
|
175 settings.evict_to_free = 1; /* push old items out of cache when memory runs out */
|
|
176 settings.socketpath = NULL; /* by default, not using a unix socket */
|
|
177 settings.managed = false;
|
|
178 settings.factor = 1.25;
|
|
179 settings.chunk_size = 48; /* space for a modest key and value */
|
|
180 #ifdef USE_THREADS
|
|
181 settings.num_threads = 4;
|
|
182 #else
|
|
183 settings.num_threads = 1;
|
|
184 #endif
|
|
185 settings.prefix_delimiter = ':';
|
|
186 settings.detail_enabled = 0;
|
|
187 }
|
|
188
|
|
189 /* returns true if a deleted item's delete-locked-time is over, and it
|
|
190 should be removed from the namespace */
|
|
191 static bool item_delete_lock_over (item *it) {
|
|
192 assert(it->it_flags & ITEM_DELETED);
|
|
193 return (current_time >= it->exptime);
|
|
194 }
|
|
195
|
|
196 /*
|
|
197 * Adds a message header to a connection.
|
|
198 *
|
|
199 * Returns 0 on success, -1 on out-of-memory.
|
|
200 */
|
|
201 static int add_msghdr(conn *c)
|
|
202 {
|
|
203 struct msghdr *msg;
|
|
204
|
|
205 assert(c != NULL);
|
|
206
|
|
207 if (c->msgsize == c->msgused) {
|
|
208 msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
|
|
209 if (! msg)
|
|
210 return -1;
|
|
211 c->msglist = msg;
|
|
212 c->msgsize *= 2;
|
|
213 }
|
|
214
|
|
215 msg = c->msglist + c->msgused;
|
|
216
|
|
217 /* this wipes msg_iovlen, msg_control, msg_controllen, and
|
|
218 msg_flags, the last 3 of which aren't defined on solaris: */
|
|
219 memset(msg, 0, sizeof(struct msghdr));
|
|
220
|
|
221 msg->msg_iov = &c->iov[c->iovused];
|
|
222 msg->msg_name = &c->request_addr;
|
|
223 msg->msg_namelen = c->request_addr_size;
|
|
224
|
|
225 c->msgbytes = 0;
|
|
226 c->msgused++;
|
|
227
|
|
228 if (c->udp) {
|
|
229 /* Leave room for the UDP header, which we'll fill in later. */
|
|
230 return add_iov(c, NULL, UDP_HEADER_SIZE);
|
|
231 }
|
|
232
|
|
233 return 0;
|
|
234 }
|
|
235
|
|
236
|
|
237 /*
|
|
238 * Free list management for connections.
|
|
239 */
|
|
240
|
|
241 static conn **freeconns;
|
|
242 static int freetotal;
|
|
243 static int freecurr;
|
|
244
|
|
245
|
|
246 static void conn_init(void) {
|
|
247 freetotal = 200;
|
|
248 freecurr = 0;
|
|
249 if (!(freeconns = (conn **)malloc(sizeof(conn *) * freetotal))) {
|
|
250 perror("malloc()");
|
|
251 }
|
|
252 return;
|
|
253 }
|
|
254
|
|
255 /*
|
|
256 * Returns a connection from the freelist, if any. Should call this using
|
|
257 * conn_from_freelist() for thread safety.
|
|
258 */
|
|
259 conn *do_conn_from_freelist() {
|
|
260 conn *c;
|
|
261
|
|
262 if (freecurr > 0) {
|
|
263 c = freeconns[--freecurr];
|
|
264 } else {
|
|
265 c = NULL;
|
|
266 }
|
|
267
|
|
268 return c;
|
|
269 }
|
|
270
|
|
271 /*
|
|
272 * Adds a connection to the freelist. 0 = success. Should call this using
|
|
273 * conn_add_to_freelist() for thread safety.
|
|
274 */
|
|
275 int do_conn_add_to_freelist(conn *c) {
|
|
276 if (freecurr < freetotal) {
|
|
277 freeconns[freecurr++] = c;
|
|
278 return 0;
|
|
279 } else {
|
|
280 /* try to enlarge free connections array */
|
|
281 conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
|
|
282 if (new_freeconns) {
|
|
283 freetotal *= 2;
|
|
284 freeconns = new_freeconns;
|
|
285 freeconns[freecurr++] = c;
|
|
286 return 0;
|
|
287 }
|
|
288 }
|
|
289 return 1;
|
|
290 }
|
|
291
|
|
292 conn *conn_new(const int sfd, const int init_state, const int event_flags,
|
|
293 const int read_buffer_size, const bool is_udp, struct event_base *base) {
|
|
294 conn *c = conn_from_freelist();
|
|
295
|
|
296 if (NULL == c) {
|
|
297 if (!(c = (conn *)malloc(sizeof(conn)))) {
|
|
298 perror("malloc()");
|
|
299 return NULL;
|
|
300 }
|
|
301 c->rbuf = c->wbuf = 0;
|
|
302 c->ilist = 0;
|
|
303 c->iov = 0;
|
|
304 c->msglist = 0;
|
|
305 c->hdrbuf = 0;
|
|
306
|
|
307 c->rsize = read_buffer_size;
|
|
308 c->wsize = DATA_BUFFER_SIZE;
|
|
309 c->isize = ITEM_LIST_INITIAL;
|
|
310 c->iovsize = IOV_LIST_INITIAL;
|
|
311 c->msgsize = MSG_LIST_INITIAL;
|
|
312 c->hdrsize = 0;
|
|
313
|
|
314 c->rbuf = (char *)malloc((size_t)c->rsize);
|
|
315 c->wbuf = (char *)malloc((size_t)c->wsize);
|
|
316 c->ilist = (item **)malloc(sizeof(item *) * c->isize);
|
|
317 c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
|
|
318 c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
|
|
319
|
|
320 if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
|
|
321 c->msglist == 0) {
|
|
322 if (c->rbuf != 0) free(c->rbuf);
|
|
323 if (c->wbuf != 0) free(c->wbuf);
|
|
324 if (c->ilist !=0) free(c->ilist);
|
|
325 if (c->iov != 0) free(c->iov);
|
|
326 if (c->msglist != 0) free(c->msglist);
|
|
327 free(c);
|
|
328 perror("malloc()");
|
|
329 return NULL;
|
|
330 }
|
|
331
|
|
332 STATS_LOCK();
|
|
333 stats.conn_structs++;
|
|
334 STATS_UNLOCK();
|
|
335 }
|
|
336
|
|
337 if (settings.verbose > 1) {
|
|
338 if (init_state == conn_listening)
|
|
339 fprintf(stderr, "<%d server listening\n", sfd);
|
|
340 else if (is_udp)
|
|
341 fprintf(stderr, "<%d server listening (udp)\n", sfd);
|
|
342 else
|
|
343 fprintf(stderr, "<%d new client connection\n", sfd);
|
|
344 }
|
|
345
|
|
346 c->sfd = sfd;
|
|
347 c->udp = is_udp;
|
|
348 c->state = init_state;
|
|
349 c->rlbytes = 0;
|
|
350 c->rbytes = c->wbytes = 0;
|
|
351 c->wcurr = c->wbuf;
|
|
352 c->rcurr = c->rbuf;
|
|
353 c->ritem = 0;
|
|
354 c->icurr = c->ilist;
|
|
355 c->ileft = 0;
|
|
356 c->iovused = 0;
|
|
357 c->msgcurr = 0;
|
|
358 c->msgused = 0;
|
|
359
|
|
360 c->write_and_go = conn_read;
|
|
361 c->write_and_free = 0;
|
|
362 c->item = 0;
|
|
363 c->bucket = -1;
|
|
364 c->gen = 0;
|
|
365
|
|
366 event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
|
|
367 event_base_set(base, &c->event);
|
|
368 c->ev_flags = event_flags;
|
|
369
|
|
370 if (event_add(&c->event, 0) == -1) {
|
|
371 if (conn_add_to_freelist(c)) {
|
|
372 conn_free(c);
|
|
373 }
|
|
374 return NULL;
|
|
375 }
|
|
376
|
|
377 STATS_LOCK();
|
|
378 stats.curr_conns++;
|
|
379 stats.total_conns++;
|
|
380 STATS_UNLOCK();
|
|
381
|
|
382 return c;
|
|
383 }
|
|
384
|
|
385 static void conn_cleanup(conn *c) {
|
|
386 assert(c != NULL);
|
|
387
|
|
388 if (c->item) {
|
|
389 item_remove(c->item);
|
|
390 c->item = 0;
|
|
391 }
|
|
392
|
|
393 if (c->ileft != 0) {
|
|
394 for (; c->ileft > 0; c->ileft--,c->icurr++) {
|
|
395 item_remove(*(c->icurr));
|
|
396 }
|
|
397 }
|
|
398
|
|
399 if (c->write_and_free) {
|
|
400 free(c->write_and_free);
|
|
401 c->write_and_free = 0;
|
|
402 }
|
|
403 }
|
|
404
|
|
405 /*
|
|
406 * Frees a connection.
|
|
407 */
|
|
408 void conn_free(conn *c) {
|
|
409 if (c) {
|
|
410 if (c->hdrbuf)
|
|
411 free(c->hdrbuf);
|
|
412 if (c->msglist)
|
|
413 free(c->msglist);
|
|
414 if (c->rbuf)
|
|
415 free(c->rbuf);
|
|
416 if (c->wbuf)
|
|
417 free(c->wbuf);
|
|
418 if (c->ilist)
|
|
419 free(c->ilist);
|
|
420 if (c->iov)
|
|
421 free(c->iov);
|
|
422 free(c);
|
|
423 }
|
|
424 }
|
|
425
|
|
426 static void conn_close(conn *c) {
|
|
427 assert(c != NULL);
|
|
428
|
|
429 /* delete the event, the socket and the conn */
|
|
430 event_del(&c->event);
|
|
431
|
|
432 if (settings.verbose > 1)
|
|
433 fprintf(stderr, "<%d connection closed.\n", c->sfd);
|
|
434
|
|
435 close(c->sfd);
|
|
436 accept_new_conns(true);
|
|
437 conn_cleanup(c);
|
|
438
|
|
439 /* if the connection has big buffers, just free it */
|
|
440 if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
|
|
441 conn_free(c);
|
|
442 }
|
|
443
|
|
444 STATS_LOCK();
|
|
445 stats.curr_conns--;
|
|
446 STATS_UNLOCK();
|
|
447
|
|
448 return;
|
|
449 }
|
|
450
|
|
451
|
|
452 /*
|
|
453 * Shrinks a connection's buffers if they're too big. This prevents
|
|
454 * periodic large "get" requests from permanently chewing lots of server
|
|
455 * memory.
|
|
456 *
|
|
457 * This should only be called in between requests since it can wipe output
|
|
458 * buffers!
|
|
459 */
|
|
460 static void conn_shrink(conn *c) {
|
|
461 assert(c != NULL);
|
|
462
|
|
463 if (c->udp)
|
|
464 return;
|
|
465
|
|
466 if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
|
|
467 char *newbuf;
|
|
468
|
|
469 if (c->rcurr != c->rbuf)
|
|
470 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
|
|
471
|
|
472 newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
|
|
473
|
|
474 if (newbuf) {
|
|
475 c->rbuf = newbuf;
|
|
476 c->rsize = DATA_BUFFER_SIZE;
|
|
477 }
|
|
478 /* TODO check other branch... */
|
|
479 c->rcurr = c->rbuf;
|
|
480 }
|
|
481
|
|
482 if (c->isize > ITEM_LIST_HIGHWAT) {
|
|
483 item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
|
|
484 if (newbuf) {
|
|
485 c->ilist = newbuf;
|
|
486 c->isize = ITEM_LIST_INITIAL;
|
|
487 }
|
|
488 /* TODO check error condition? */
|
|
489 }
|
|
490
|
|
491 if (c->msgsize > MSG_LIST_HIGHWAT) {
|
|
492 struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
|
|
493 if (newbuf) {
|
|
494 c->msglist = newbuf;
|
|
495 c->msgsize = MSG_LIST_INITIAL;
|
|
496 }
|
|
497 /* TODO check error condition? */
|
|
498 }
|
|
499
|
|
500 if (c->iovsize > IOV_LIST_HIGHWAT) {
|
|
501 struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
|
|
502 if (newbuf) {
|
|
503 c->iov = newbuf;
|
|
504 c->iovsize = IOV_LIST_INITIAL;
|
|
505 }
|
|
506 /* TODO check return value */
|
|
507 }
|
|
508 }
|
|
509
|
|
510 /*
|
|
511 * Sets a connection's current state in the state machine. Any special
|
|
512 * processing that needs to happen on certain state transitions can
|
|
513 * happen here.
|
|
514 */
|
|
515 static void conn_set_state(conn *c, int state) {
|
|
516 assert(c != NULL);
|
|
517
|
|
518 if (state != c->state) {
|
|
519 if (state == conn_read) {
|
|
520 conn_shrink(c);
|
|
521 assoc_move_next_bucket();
|
|
522 }
|
|
523 c->state = state;
|
|
524 }
|
|
525 }
|
|
526
|
|
527
|
|
528 /*
|
|
529 * Ensures that there is room for another struct iovec in a connection's
|
|
530 * iov list.
|
|
531 *
|
|
532 * Returns 0 on success, -1 on out-of-memory.
|
|
533 */
|
|
534 static int ensure_iov_space(conn *c) {
|
|
535 assert(c != NULL);
|
|
536
|
|
537 if (c->iovused >= c->iovsize) {
|
|
538 int i, iovnum;
|
|
539 struct iovec *new_iov = (struct iovec *)realloc(c->iov,
|
|
540 (c->iovsize * 2) * sizeof(struct iovec));
|
|
541 if (! new_iov)
|
|
542 return -1;
|
|
543 c->iov = new_iov;
|
|
544 c->iovsize *= 2;
|
|
545
|
|
546 /* Point all the msghdr structures at the new list. */
|
|
547 for (i = 0, iovnum = 0; i < c->msgused; i++) {
|
|
548 c->msglist[i].msg_iov = &c->iov[iovnum];
|
|
549 iovnum += c->msglist[i].msg_iovlen;
|
|
550 }
|
|
551 }
|
|
552
|
|
553 return 0;
|
|
554 }
|
|
555
|
|
556
|
|
557 /*
|
|
558 * Adds data to the list of pending data that will be written out to a
|
|
559 * connection.
|
|
560 *
|
|
561 * Returns 0 on success, -1 on out-of-memory.
|
|
562 */
|
|
563
|
|
564 static int add_iov(conn *c, const void *buf, int len) {
|
|
565 struct msghdr *m;
|
|
566 int leftover;
|
|
567 bool limit_to_mtu;
|
|
568
|
|
569 assert(c != NULL);
|
|
570
|
|
571 do {
|
|
572 m = &c->msglist[c->msgused - 1];
|
|
573
|
|
574 /*
|
|
575 * Limit UDP packets, and the first payloads of TCP replies, to
|
|
576 * UDP_MAX_PAYLOAD_SIZE bytes.
|
|
577 */
|
|
578 limit_to_mtu = c->udp || (1 == c->msgused);
|
|
579
|
|
580 /* We may need to start a new msghdr if this one is full. */
|
|
581 if (m->msg_iovlen == IOV_MAX ||
|
|
582 (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
|
|
583 add_msghdr(c);
|
|
584 m = &c->msglist[c->msgused - 1];
|
|
585 }
|
|
586
|
|
587 if (ensure_iov_space(c) != 0)
|
|
588 return -1;
|
|
589
|
|
590 /* If the fragment is too big to fit in the datagram, split it up */
|
|
591 if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
|
|
592 leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
|
|
593 len -= leftover;
|
|
594 } else {
|
|
595 leftover = 0;
|
|
596 }
|
|
597
|
|
598 m = &c->msglist[c->msgused - 1];
|
|
599 m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
|
|
600 m->msg_iov[m->msg_iovlen].iov_len = len;
|
|
601
|
|
602 c->msgbytes += len;
|
|
603 c->iovused++;
|
|
604 m->msg_iovlen++;
|
|
605
|
|
606 buf = ((char *)buf) + len;
|
|
607 len = leftover;
|
|
608 } while (leftover > 0);
|
|
609
|
|
610 return 0;
|
|
611 }
|
|
612
|
|
613
|
|
614 /*
|
|
615 * Constructs a set of UDP headers and attaches them to the outgoing messages.
|
|
616 */
|
|
617 static int build_udp_headers(conn *c) {
|
|
618 int i;
|
|
619 unsigned char *hdr;
|
|
620
|
|
621 assert(c != NULL);
|
|
622
|
|
623 if (c->msgused > c->hdrsize) {
|
|
624 void *new_hdrbuf;
|
|
625 if (c->hdrbuf)
|
|
626 new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
|
|
627 else
|
|
628 new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
|
|
629 if (! new_hdrbuf)
|
|
630 return -1;
|
|
631 c->hdrbuf = (unsigned char *)new_hdrbuf;
|
|
632 c->hdrsize = c->msgused * 2;
|
|
633 }
|
|
634
|
|
635 hdr = c->hdrbuf;
|
|
636 for (i = 0; i < c->msgused; i++) {
|
|
637 c->msglist[i].msg_iov[0].iov_base = hdr;
|
|
638 c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
|
|
639 *hdr++ = c->request_id / 256;
|
|
640 *hdr++ = c->request_id % 256;
|
|
641 *hdr++ = i / 256;
|
|
642 *hdr++ = i % 256;
|
|
643 *hdr++ = c->msgused / 256;
|
|
644 *hdr++ = c->msgused % 256;
|
|
645 *hdr++ = 0;
|
|
646 *hdr++ = 0;
|
|
647 assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
|
|
648 }
|
|
649
|
|
650 return 0;
|
|
651 }
|
|
652
|
|
653
|
|
654 static void out_string(conn *c, const char *str) {
|
|
655 size_t len;
|
|
656
|
|
657 assert(c != NULL);
|
|
658
|
|
659 if (settings.verbose > 1)
|
|
660 fprintf(stderr, ">%d %s\n", c->sfd, str);
|
|
661
|
|
662 len = strlen(str);
|
|
663 if ((len + 2) > c->wsize) {
|
|
664 /* ought to be always enough. just fail for simplicity */
|
|
665 str = "SERVER_ERROR output line too long";
|
|
666 len = strlen(str);
|
|
667 }
|
|
668
|
|
669 memcpy(c->wbuf, str, len);
|
|
670 memcpy(c->wbuf + len, "\r\n", 3);
|
|
671 c->wbytes = len + 2;
|
|
672 c->wcurr = c->wbuf;
|
|
673
|
|
674 conn_set_state(c, conn_write);
|
|
675 c->write_and_go = conn_read;
|
|
676 return;
|
|
677 }
|
|
678
|
|
679 /*
|
|
680 * we get here after reading the value in set/add/replace commands. The command
|
|
681 * has been stored in c->item_comm, and the item is ready in c->item.
|
|
682 */
|
|
683
|
|
684 static void complete_nread(conn *c) {
|
|
685 assert(c != NULL);
|
|
686
|
|
687 item *it = c->item;
|
|
688 int comm = c->item_comm;
|
|
689
|
|
690 STATS_LOCK();
|
|
691 stats.set_cmds++;
|
|
692 STATS_UNLOCK();
|
|
693
|
|
694 if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
|
|
695 out_string(c, "CLIENT_ERROR bad data chunk");
|
|
696 } else {
|
|
697 if (store_item(it, comm)) {
|
|
698 out_string(c, "STORED");
|
|
699 } else {
|
|
700 out_string(c, "NOT_STORED");
|
|
701 }
|
|
702 }
|
|
703
|
|
704 item_remove(c->item); /* release the c->item reference */
|
|
705 c->item = 0;
|
|
706 }
|
|
707
|
|
708 /*
|
|
709 * Stores an item in the cache according to the semantics of one of the set
|
|
710 * commands. In threaded mode, this is protected by the cache lock.
|
|
711 *
|
|
712 * Returns true if the item was stored.
|
|
713 */
|
|
714 int do_store_item(item *it, int comm) {
|
|
715 char *key = ITEM_key(it);
|
|
716 bool delete_locked = false;
|
|
717 item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
|
|
718 int stored = 0;
|
|
719
|
2
|
720 item *new_it = NULL;
|
|
721 int flags;
|
|
722
|
0
|
723 if (old_it != NULL && comm == NREAD_ADD) {
|
|
724 /* add only adds a nonexistent item, but promote to head of LRU */
|
|
725 do_item_update(old_it);
|
2
|
726 } else if (!old_it && (comm == NREAD_REPLACE
|
|
727 || comm == NREAD_APPEND || comm == NREAD_PREPEND))
|
|
728 {
|
0
|
729 /* replace only replaces an existing value; don't store */
|
2
|
730 } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
|
|
731 || comm == NREAD_APPEND || comm == NREAD_PREPEND))
|
|
732 {
|
0
|
733 /* replace and add can't override delete locks; don't store */
|
|
734 } else {
|
2
|
735
|
|
736 /*
|
|
737 * Append - combine new and old record into single one. Here it's
|
|
738 * atomic and thread-safe.
|
|
739 */
|
|
740
|
|
741 if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
|
|
742
|
|
743 /* we have it and old_it here - alloc memory to hold both */
|
|
744 /* flags was already lost - so recover them from ITEM_suffix(it) */
|
|
745
|
|
746 flags = (int) strtol(ITEM_suffix(it), (char **) NULL, 10);
|
|
747
|
|
748 new_it = do_item_alloc(key, it->nkey, flags, it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
|
|
749
|
|
750 if (new_it == NULL) {
|
|
751 /* SERVER_ERROR out of memory */
|
|
752 return 0;
|
|
753 }
|
|
754
|
|
755 /* copy data from it and old_it to new_it */
|
|
756
|
|
757 if (comm == NREAD_APPEND) {
|
|
758 memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
|
|
759 memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
|
|
760 } else {
|
|
761 /* NREAD_PREPEND */
|
|
762 memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
|
|
763 memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
|
|
764 }
|
|
765
|
|
766 it = new_it;
|
|
767 }
|
|
768
|
0
|
769 /* "set" commands can override the delete lock
|
|
770 window... in which case we have to find the old hidden item
|
|
771 that's in the namespace/LRU but wasn't returned by
|
|
772 item_get.... because we need to replace it */
|
|
773 if (delete_locked)
|
|
774 old_it = do_item_get_nocheck(key, it->nkey);
|
|
775
|
|
776 if (old_it != NULL)
|
|
777 do_item_replace(old_it, it);
|
|
778 else
|
|
779 do_item_link(it);
|
|
780
|
|
781 stored = 1;
|
|
782 }
|
|
783
|
|
784 if (old_it)
|
|
785 do_item_remove(old_it); /* release our reference */
|
2
|
786 if (new_it)
|
|
787 do_item_remove(new_it);
|
|
788
|
0
|
789 return stored;
|
|
790 }
|
|
791
|
|
/* One space-separated word of a command line. */
typedef struct token_s {
    char *value;    /* start of the word; NULL for a fully-consumed terminal token */
    size_t length;  /* word length in bytes; 0 marks the terminal token */
} token_t;

/* Positions of well-known words in a tokenized command. */
#define COMMAND_TOKEN 0
#define SUBCOMMAND_TOKEN 1
#define KEY_TOKEN 1

/* Maximum key length (presumably in bytes; enforced by callers). */
#define KEY_MAX_LENGTH 250

/* Token array capacity, including the terminal token (presumably the
   max_tokens that callers pass to tokenize_command()). */
#define MAX_TOKENS 6

/*
 * Splits command in place into words separated by spaces (' '). Each space
 * that ends a word is overwritten with '\0', and tokens[i] receives a
 * pointer to and the length of each word. Runs of spaces are skipped.
 *
 * At most max_tokens - 1 words are produced. They are always followed by a
 * terminal token of length 0. Its value is NULL if the whole string was
 * consumed; otherwise it points at the first unprocessed character, so the
 * caller can resume tokenizing from there.
 *
 * Returns the number of entries written, including the terminal token.
 *
 * Usage example:
 *
 *  for (;;) {
 *      size_t ix;
 *      tokenize_command(command, tokens, max_tokens);
 *      for (ix = 0; tokens[ix].length != 0; ix++) {
 *          ...
 *      }
 *      if (tokens[ix].value == NULL)
 *          break;
 *      command = tokens[ix].value;
 *  }
 */
static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
    char *word_start = command;
    char *cursor = command;
    size_t count = 0;

    assert(command != NULL && tokens != NULL && max_tokens > 1);

    while (count < max_tokens - 1) {
        if (*cursor == '\0') {
            /* End of string: record a trailing word, if any, and stop. */
            if (cursor != word_start) {
                tokens[count].value = word_start;
                tokens[count].length = cursor - word_start;
                count++;
            }
            break;
        }

        if (*cursor == ' ') {
            if (cursor != word_start) {
                tokens[count].value = word_start;
                tokens[count].length = cursor - word_start;
                count++;
                *cursor = '\0';
            }
            word_start = cursor + 1;
        }

        cursor++;
    }

    /* Terminal token: NULL once everything was scanned, else the resume point. */
    tokens[count].value = (*cursor == '\0') ? NULL : cursor;
    tokens[count].length = 0;
    count++;

    return count;
}
|
|
858
|
|
859 inline static void process_stats_detail(conn *c, const char *command) {
|
|
860 assert(c != NULL);
|
|
861
|
|
862 if (strcmp(command, "on") == 0) {
|
|
863 settings.detail_enabled = 1;
|
|
864 out_string(c, "OK");
|
|
865 }
|
|
866 else if (strcmp(command, "off") == 0) {
|
|
867 settings.detail_enabled = 0;
|
|
868 out_string(c, "OK");
|
|
869 }
|
|
870 else if (strcmp(command, "dump") == 0) {
|
|
871 int len;
|
|
872 char *stats = stats_prefix_dump(&len);
|
|
873 if (NULL != stats) {
|
|
874 c->write_and_free = stats;
|
|
875 c->wcurr = stats;
|
|
876 c->wbytes = len;
|
|
877 conn_set_state(c, conn_write);
|
|
878 c->write_and_go = conn_read;
|
|
879 }
|
|
880 else {
|
|
881 out_string(c, "SERVER_ERROR");
|
|
882 }
|
|
883 }
|
|
884 else {
|
|
885 out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
|
|
886 }
|
|
887 }
|
|
888
|
|
889 static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
|
|
890 rel_time_t now = current_time;
|
|
891 char *command;
|
|
892 char *subcommand;
|
|
893
|
|
894 assert(c != NULL);
|
|
895
|
|
896 if(ntokens < 2) {
|
|
897 out_string(c, "CLIENT_ERROR bad command line");
|
|
898 return;
|
|
899 }
|
|
900
|
|
901 command = tokens[COMMAND_TOKEN].value;
|
|
902
|
|
903 if (ntokens == 2 && strcmp(command, "stats") == 0) {
|
|
904 char temp[1024];
|
|
905 pid_t pid = getpid();
|
|
906 char *pos = temp;
|
|
907
|
|
908 #ifndef WIN32
|
|
909 struct rusage usage;
|
|
910 getrusage(RUSAGE_SELF, &usage);
|
|
911 #endif /* !WIN32 */
|
|
912
|
|
913 STATS_LOCK();
|
|
914 pos += sprintf(pos, "STAT pid %u\r\n", pid);
|
|
915 pos += sprintf(pos, "STAT uptime %u\r\n", now);
|
|
916 pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
|
|
917 pos += sprintf(pos, "STAT version " VERSION "\r\n");
|
|
918 pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
|
|
919 #ifndef WIN32
|
|
920 pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
|
|
921 pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
|
|
922 #endif /* !WIN32 */
|
|
923 pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
|
|
924 pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
|
|
925 pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
|
|
926 pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
|
|
927 pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
|
|
928 pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
|
|
929 pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
|
|
930 pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
|
|
931 pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
|
|
932 pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
|
|
933 pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
|
|
934 pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
|
|
935 pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
|
|
936 pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
|
|
937 pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
|
|
938 pos += sprintf(pos, "END");
|
|
939 STATS_UNLOCK();
|
|
940 out_string(c, temp);
|
|
941 return;
|
|
942 }
|
|
943
|
|
944 subcommand = tokens[SUBCOMMAND_TOKEN].value;
|
|
945
|
|
946 if (strcmp(subcommand, "reset") == 0) {
|
|
947 stats_reset();
|
|
948 out_string(c, "RESET");
|
|
949 return;
|
|
950 }
|
|
951
|
|
952 #ifdef HAVE_MALLOC_H
|
|
953 #ifdef HAVE_STRUCT_MALLINFO
|
|
954 if (strcmp(subcommand, "malloc") == 0) {
|
|
955 char temp[512];
|
|
956 struct mallinfo info;
|
|
957 char *pos = temp;
|
|
958
|
|
959 info = mallinfo();
|
|
960 pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
|
|
961 pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
|
|
962 pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
|
|
963 pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
|
|
964 pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
|
|
965 pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
|
|
966 pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
|
|
967 pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
|
|
968 pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
|
|
969 pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
|
|
970 out_string(c, temp);
|
|
971 return;
|
|
972 }
|
|
973 #endif /* HAVE_STRUCT_MALLINFO */
|
|
974 #endif /* HAVE_MALLOC_H */
|
|
975
|
|
976 #if !defined(WIN32) || !defined(__APPLE__)
|
|
977 if (strcmp(subcommand, "maps") == 0) {
|
|
978 char *wbuf;
|
|
979 int wsize = 8192; /* should be enough */
|
|
980 int fd;
|
|
981 int res;
|
|
982
|
|
983 if (!(wbuf = (char *)malloc(wsize))) {
|
|
984 out_string(c, "SERVER_ERROR out of memory");
|
|
985 return;
|
|
986 }
|
|
987
|
|
988 fd = open("/proc/self/maps", O_RDONLY);
|
|
989 if (fd == -1) {
|
|
990 out_string(c, "SERVER_ERROR cannot open the maps file");
|
|
991 free(wbuf);
|
|
992 return;
|
|
993 }
|
|
994
|
|
995 res = read(fd, wbuf, wsize - 6); /* 6 = END\r\n\0 */
|
|
996 if (res == wsize - 6) {
|
|
997 out_string(c, "SERVER_ERROR buffer overflow");
|
|
998 free(wbuf); close(fd);
|
|
999 return;
|
|
1000 }
|
|
1001 if (res == 0 || res == -1) {
|
|
1002 out_string(c, "SERVER_ERROR can't read the maps file");
|
|
1003 free(wbuf); close(fd);
|
|
1004 return;
|
|
1005 }
|
|
1006 memcpy(wbuf + res, "END\r\n", 6);
|
|
1007 c->write_and_free = wbuf;
|
|
1008 c->wcurr = wbuf;
|
|
1009 c->wbytes = res + 5; // Don't write the terminal '\0'
|
|
1010 conn_set_state(c, conn_write);
|
|
1011 c->write_and_go = conn_read;
|
|
1012 close(fd);
|
|
1013 return;
|
|
1014 }
|
|
1015 #endif
|
|
1016
|
|
1017 if (strcmp(subcommand, "cachedump") == 0) {
|
|
1018
|
|
1019 char *buf;
|
|
1020 unsigned int bytes, id, limit = 0;
|
|
1021
|
|
1022 if(ntokens < 5) {
|
|
1023 out_string(c, "CLIENT_ERROR bad command line");
|
|
1024 return;
|
|
1025 }
|
|
1026
|
|
1027 id = strtoul(tokens[2].value, NULL, 10);
|
|
1028 limit = strtoul(tokens[3].value, NULL, 10);
|
|
1029
|
|
1030 if(errno == ERANGE) {
|
|
1031 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1032 return;
|
|
1033 }
|
|
1034
|
|
1035 buf = item_cachedump(id, limit, &bytes);
|
|
1036 if (buf == 0) {
|
|
1037 out_string(c, "SERVER_ERROR out of memory");
|
|
1038 return;
|
|
1039 }
|
|
1040
|
|
1041 c->write_and_free = buf;
|
|
1042 c->wcurr = buf;
|
|
1043 c->wbytes = bytes;
|
|
1044 conn_set_state(c, conn_write);
|
|
1045 c->write_and_go = conn_read;
|
|
1046 return;
|
|
1047 }
|
|
1048
|
|
1049 if (strcmp(subcommand, "slabs") == 0) {
|
|
1050 int bytes = 0;
|
|
1051 char *buf = slabs_stats(&bytes);
|
|
1052 if (!buf) {
|
|
1053 out_string(c, "SERVER_ERROR out of memory");
|
|
1054 return;
|
|
1055 }
|
|
1056 c->write_and_free = buf;
|
|
1057 c->wcurr = buf;
|
|
1058 c->wbytes = bytes;
|
|
1059 conn_set_state(c, conn_write);
|
|
1060 c->write_and_go = conn_read;
|
|
1061 return;
|
|
1062 }
|
|
1063
|
|
1064 if (strcmp(subcommand, "items") == 0) {
|
|
1065 char buffer[4096];
|
|
1066 item_stats(buffer, 4096);
|
|
1067 out_string(c, buffer);
|
|
1068 return;
|
|
1069 }
|
|
1070
|
|
1071 if (strcmp(subcommand, "detail") == 0) {
|
|
1072 if (ntokens < 4)
|
|
1073 process_stats_detail(c, ""); /* outputs the error message */
|
|
1074 else
|
|
1075 process_stats_detail(c, tokens[2].value);
|
|
1076 return;
|
|
1077 }
|
|
1078
|
|
1079 if (strcmp(subcommand, "sizes") == 0) {
|
|
1080 int bytes = 0;
|
|
1081 char *buf = item_stats_sizes(&bytes);
|
|
1082 if (! buf) {
|
|
1083 out_string(c, "SERVER_ERROR out of memory");
|
|
1084 return;
|
|
1085 }
|
|
1086
|
|
1087 c->write_and_free = buf;
|
|
1088 c->wcurr = buf;
|
|
1089 c->wbytes = bytes;
|
|
1090 conn_set_state(c, conn_write);
|
|
1091 c->write_and_go = conn_read;
|
|
1092 return;
|
|
1093 }
|
|
1094
|
|
1095 out_string(c, "ERROR");
|
|
1096 }
|
|
1097
|
|
1098 /* ntokens is overwritten here... shrug.. */
|
|
1099 static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens) {
|
|
1100 char *key;
|
|
1101 size_t nkey;
|
|
1102 int i = 0;
|
|
1103 item *it;
|
|
1104 token_t *key_token = &tokens[KEY_TOKEN];
|
|
1105
|
|
1106 assert(c != NULL);
|
|
1107
|
|
1108 if (settings.managed) {
|
|
1109 int bucket = c->bucket;
|
|
1110 if (bucket == -1) {
|
|
1111 out_string(c, "CLIENT_ERROR no BG data in managed mode");
|
|
1112 return;
|
|
1113 }
|
|
1114 c->bucket = -1;
|
|
1115 if (buckets[bucket] != c->gen) {
|
|
1116 out_string(c, "ERROR_NOT_OWNER");
|
|
1117 return;
|
|
1118 }
|
|
1119 }
|
|
1120
|
|
1121 do {
|
|
1122 while(key_token->length != 0) {
|
|
1123
|
|
1124 key = key_token->value;
|
|
1125 nkey = key_token->length;
|
|
1126
|
|
1127 if(nkey > KEY_MAX_LENGTH) {
|
|
1128 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1129 return;
|
|
1130 }
|
|
1131
|
|
1132 STATS_LOCK();
|
|
1133 stats.get_cmds++;
|
|
1134 STATS_UNLOCK();
|
|
1135 it = item_get(key, nkey);
|
|
1136 if (settings.detail_enabled) {
|
|
1137 stats_prefix_record_get(key, NULL != it);
|
|
1138 }
|
|
1139 if (it) {
|
|
1140 if (i >= c->isize) {
|
|
1141 item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
|
|
1142 if (new_list) {
|
|
1143 c->isize *= 2;
|
|
1144 c->ilist = new_list;
|
|
1145 } else break;
|
|
1146 }
|
|
1147
|
|
1148 /*
|
|
1149 * Construct the response. Each hit adds three elements to the
|
|
1150 * outgoing data list:
|
|
1151 * "VALUE "
|
|
1152 * key
|
|
1153 * " " + flags + " " + data length + "\r\n" + data (with \r\n)
|
|
1154 */
|
|
1155 if (add_iov(c, "VALUE ", 6) != 0 ||
|
|
1156 add_iov(c, ITEM_key(it), it->nkey) != 0 ||
|
|
1157 add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
|
|
1158 {
|
|
1159 break;
|
|
1160 }
|
|
1161 if (settings.verbose > 1)
|
|
1162 fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
|
|
1163
|
|
1164 /* item_get() has incremented it->refcount for us */
|
|
1165 STATS_LOCK();
|
|
1166 stats.get_hits++;
|
|
1167 STATS_UNLOCK();
|
|
1168 item_update(it);
|
|
1169 *(c->ilist + i) = it;
|
|
1170 i++;
|
|
1171
|
|
1172 } else {
|
|
1173 STATS_LOCK();
|
|
1174 stats.get_misses++;
|
|
1175 STATS_UNLOCK();
|
|
1176 }
|
|
1177
|
|
1178 key_token++;
|
|
1179 }
|
|
1180
|
|
1181 /*
|
|
1182 * If the command string hasn't been fully processed, get the next set
|
|
1183 * of tokens.
|
|
1184 */
|
|
1185 if(key_token->value != NULL) {
|
|
1186 ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
|
|
1187 key_token = tokens;
|
|
1188 }
|
|
1189
|
|
1190 } while(key_token->value != NULL);
|
|
1191
|
|
1192 c->icurr = c->ilist;
|
|
1193 c->ileft = i;
|
|
1194
|
|
1195 if (settings.verbose > 1)
|
|
1196 fprintf(stderr, ">%d END\n", c->sfd);
|
|
1197 add_iov(c, "END\r\n", 5);
|
|
1198
|
|
1199 if (c->udp && build_udp_headers(c) != 0) {
|
|
1200 out_string(c, "SERVER_ERROR out of memory");
|
|
1201 }
|
|
1202 else {
|
|
1203 conn_set_state(c, conn_mwrite);
|
|
1204 c->msgcurr = 0;
|
|
1205 }
|
|
1206 return;
|
|
1207 }
|
|
1208
|
|
1209 static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm) {
|
|
1210 char *key;
|
|
1211 size_t nkey;
|
|
1212 int flags;
|
|
1213 time_t exptime;
|
|
1214 int vlen;
|
|
1215 item *it;
|
|
1216
|
|
1217 assert(c != NULL);
|
|
1218
|
|
1219 if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
|
|
1220 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1221 return;
|
|
1222 }
|
|
1223
|
|
1224 key = tokens[KEY_TOKEN].value;
|
|
1225 nkey = tokens[KEY_TOKEN].length;
|
|
1226
|
|
1227 flags = strtoul(tokens[2].value, NULL, 10);
|
|
1228 exptime = strtol(tokens[3].value, NULL, 10);
|
|
1229 vlen = strtol(tokens[4].value, NULL, 10);
|
|
1230
|
|
1231 if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
|
|
1232 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1233 return;
|
|
1234 }
|
|
1235
|
|
1236 if (settings.detail_enabled) {
|
|
1237 stats_prefix_record_set(key);
|
|
1238 }
|
|
1239
|
|
1240 if (settings.managed) {
|
|
1241 int bucket = c->bucket;
|
|
1242 if (bucket == -1) {
|
|
1243 out_string(c, "CLIENT_ERROR no BG data in managed mode");
|
|
1244 return;
|
|
1245 }
|
|
1246 c->bucket = -1;
|
|
1247 if (buckets[bucket] != c->gen) {
|
|
1248 out_string(c, "ERROR_NOT_OWNER");
|
|
1249 return;
|
|
1250 }
|
|
1251 }
|
|
1252
|
|
1253 it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
|
|
1254
|
|
1255 if (it == 0) {
|
|
1256 if (! item_size_ok(nkey, flags, vlen + 2))
|
|
1257 out_string(c, "SERVER_ERROR object too large for cache");
|
|
1258 else
|
|
1259 out_string(c, "SERVER_ERROR out of memory");
|
|
1260 /* swallow the data line */
|
|
1261 c->write_and_go = conn_swallow;
|
|
1262 c->sbytes = vlen + 2;
|
|
1263 return;
|
|
1264 }
|
|
1265
|
|
1266 c->item_comm = comm;
|
|
1267 c->item = it;
|
|
1268 c->ritem = ITEM_data(it);
|
|
1269 c->rlbytes = it->nbytes;
|
|
1270 conn_set_state(c, conn_nread);
|
|
1271 }
|
|
1272
|
|
1273 static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const int incr) {
|
|
1274 char temp[32];
|
|
1275 item *it;
|
|
1276 unsigned int delta;
|
|
1277 char *key;
|
|
1278 size_t nkey;
|
|
1279
|
|
1280 assert(c != NULL);
|
|
1281
|
|
1282 if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
|
|
1283 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1284 return;
|
|
1285 }
|
|
1286
|
|
1287 key = tokens[KEY_TOKEN].value;
|
|
1288 nkey = tokens[KEY_TOKEN].length;
|
|
1289
|
|
1290 if (settings.managed) {
|
|
1291 int bucket = c->bucket;
|
|
1292 if (bucket == -1) {
|
|
1293 out_string(c, "CLIENT_ERROR no BG data in managed mode");
|
|
1294 return;
|
|
1295 }
|
|
1296 c->bucket = -1;
|
|
1297 if (buckets[bucket] != c->gen) {
|
|
1298 out_string(c, "ERROR_NOT_OWNER");
|
|
1299 return;
|
|
1300 }
|
|
1301 }
|
|
1302
|
|
1303 delta = strtoul(tokens[2].value, NULL, 10);
|
|
1304
|
|
1305 if(errno == ERANGE) {
|
|
1306 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1307 return;
|
|
1308 }
|
|
1309
|
|
1310 it = item_get(key, nkey);
|
|
1311 if (!it) {
|
|
1312 out_string(c, "NOT_FOUND");
|
|
1313 return;
|
|
1314 }
|
|
1315
|
|
1316 out_string(c, add_delta(it, incr, delta, temp));
|
|
1317 item_remove(it); /* release our reference */
|
|
1318 }
|
|
1319
|
|
1320 /*
|
|
1321 * adds a delta value to a numeric item.
|
|
1322 *
|
|
1323 * it item to adjust
|
|
1324 * incr true to increment value, false to decrement
|
|
1325 * delta amount to adjust value by
|
|
1326 * buf buffer for response string
|
|
1327 *
|
|
1328 * returns a response string to send back to the client.
|
|
1329 */
|
|
1330 char *do_add_delta(item *it, const int incr, unsigned int delta, char *buf) {
|
|
1331 char *ptr;
|
|
1332 unsigned int value;
|
|
1333 int res;
|
|
1334
|
|
1335 ptr = ITEM_data(it);
|
|
1336 while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++; // BUG: can't be true
|
|
1337
|
|
1338 value = strtol(ptr, NULL, 10);
|
|
1339
|
|
1340 if(errno == ERANGE) {
|
|
1341 return "CLIENT_ERROR cannot increment or decrement non-numeric value";
|
|
1342 }
|
|
1343
|
|
1344 if (incr != 0)
|
|
1345 value += delta;
|
|
1346 else {
|
|
1347 if (delta >= value) value = 0;
|
|
1348 else value -= delta;
|
|
1349 }
|
|
1350 snprintf(buf, 32, "%u", value);
|
|
1351 res = strlen(buf);
|
|
1352 if (res + 2 > it->nbytes) { /* need to realloc */
|
|
1353 item *new_it;
|
|
1354 new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
|
|
1355 if (new_it == 0) {
|
|
1356 return "SERVER_ERROR out of memory";
|
|
1357 }
|
|
1358 memcpy(ITEM_data(new_it), buf, res);
|
|
1359 memcpy(ITEM_data(new_it) + res, "\r\n", 3);
|
|
1360 do_item_replace(it, new_it);
|
|
1361 do_item_remove(new_it); /* release our reference */
|
|
1362 } else { /* replace in-place */
|
|
1363 memcpy(ITEM_data(it), buf, res);
|
|
1364 memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
|
|
1365 }
|
|
1366
|
|
1367 return buf;
|
|
1368 }
|
|
1369
|
|
1370 static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
|
|
1371 char *key;
|
|
1372 size_t nkey;
|
|
1373 item *it;
|
|
1374 time_t exptime = 0;
|
|
1375
|
|
1376 assert(c != NULL);
|
|
1377
|
|
1378 if (settings.managed) {
|
|
1379 int bucket = c->bucket;
|
|
1380 if (bucket == -1) {
|
|
1381 out_string(c, "CLIENT_ERROR no BG data in managed mode");
|
|
1382 return;
|
|
1383 }
|
|
1384 c->bucket = -1;
|
|
1385 if (buckets[bucket] != c->gen) {
|
|
1386 out_string(c, "ERROR_NOT_OWNER");
|
|
1387 return;
|
|
1388 }
|
|
1389 }
|
|
1390
|
|
1391 key = tokens[KEY_TOKEN].value;
|
|
1392 nkey = tokens[KEY_TOKEN].length;
|
|
1393
|
|
1394 if(nkey > KEY_MAX_LENGTH) {
|
|
1395 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1396 return;
|
|
1397 }
|
|
1398
|
|
1399 if(ntokens == 4) {
|
|
1400 exptime = strtol(tokens[2].value, NULL, 10);
|
|
1401
|
|
1402 if(errno == ERANGE) {
|
|
1403 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1404 return;
|
|
1405 }
|
|
1406 }
|
|
1407
|
|
1408 if (settings.detail_enabled) {
|
|
1409 stats_prefix_record_delete(key);
|
|
1410 }
|
|
1411
|
|
1412 it = item_get(key, nkey);
|
|
1413 if (it) {
|
|
1414 if (exptime == 0) {
|
|
1415 item_unlink(it);
|
|
1416 item_remove(it); /* release our reference */
|
|
1417 out_string(c, "DELETED");
|
|
1418 } else {
|
|
1419 /* our reference will be transfered to the delete queue */
|
|
1420 out_string(c, defer_delete(it, exptime));
|
|
1421 }
|
|
1422 } else {
|
|
1423 out_string(c, "NOT_FOUND");
|
|
1424 }
|
|
1425 }
|
|
1426
|
|
1427 /*
|
|
1428 * Adds an item to the deferred-delete list so it can be reaped later.
|
|
1429 *
|
|
1430 * Returns the result to send to the client.
|
|
1431 */
|
|
1432 char *do_defer_delete(item *it, time_t exptime)
|
|
1433 {
|
|
1434 if (delcurr >= deltotal) {
|
|
1435 item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
|
|
1436 if (new_delete) {
|
|
1437 todelete = new_delete;
|
|
1438 deltotal *= 2;
|
|
1439 } else {
|
|
1440 /*
|
|
1441 * can't delete it immediately, user wants a delay,
|
|
1442 * but we ran out of memory for the delete queue
|
|
1443 */
|
|
1444 item_remove(it); /* release reference */
|
|
1445 return "SERVER_ERROR out of memory";
|
|
1446 }
|
|
1447 }
|
|
1448
|
|
1449 /* use its expiration time as its deletion time now */
|
|
1450 it->exptime = realtime(exptime);
|
|
1451 it->it_flags |= ITEM_DELETED;
|
|
1452 todelete[delcurr++] = it;
|
|
1453
|
|
1454 return "DELETED";
|
|
1455 }
|
|
1456
|
|
1457 static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
|
|
1458 unsigned int level;
|
|
1459
|
|
1460 assert(c != NULL);
|
|
1461
|
|
1462 level = strtoul(tokens[1].value, NULL, 10);
|
|
1463 settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
|
|
1464 out_string(c, "OK");
|
|
1465 return;
|
|
1466 }
|
|
1467
|
|
1468 static void process_command(conn *c, char *command) {
|
|
1469
|
|
1470 token_t tokens[MAX_TOKENS];
|
|
1471 size_t ntokens;
|
|
1472 int comm;
|
|
1473
|
|
1474 assert(c != NULL);
|
|
1475
|
|
1476 if (settings.verbose > 1)
|
|
1477 fprintf(stderr, "<%d %s\n", c->sfd, command);
|
|
1478
|
|
1479 /*
|
|
1480 * for commands set/add/replace, we build an item and read the data
|
|
1481 * directly into it, then continue in nread_complete().
|
|
1482 */
|
|
1483
|
|
1484 c->msgcurr = 0;
|
|
1485 c->msgused = 0;
|
|
1486 c->iovused = 0;
|
|
1487 if (add_msghdr(c) != 0) {
|
|
1488 out_string(c, "SERVER_ERROR out of memory");
|
|
1489 return;
|
|
1490 }
|
|
1491
|
|
1492 ntokens = tokenize_command(command, tokens, MAX_TOKENS);
|
|
1493
|
|
1494 if (ntokens >= 3 &&
|
|
1495 ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
|
|
1496 (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
|
|
1497
|
|
1498 process_get_command(c, tokens, ntokens);
|
|
1499
|
|
1500 } else if (ntokens == 6 &&
|
|
1501 ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
|
|
1502 (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
|
2
|
1503 (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) ||
|
|
1504 (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
|
0
|
1505 (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)))) {
|
|
1506
|
|
1507 process_update_command(c, tokens, ntokens, comm);
|
|
1508
|
|
1509 } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
|
|
1510
|
|
1511 process_arithmetic_command(c, tokens, ntokens, 1);
|
|
1512
|
|
1513 } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
|
|
1514
|
|
1515 process_arithmetic_command(c, tokens, ntokens, 0);
|
|
1516
|
|
1517 } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
|
|
1518
|
|
1519 process_delete_command(c, tokens, ntokens);
|
|
1520
|
|
1521 } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
|
|
1522 unsigned int bucket, gen;
|
|
1523 if (!settings.managed) {
|
|
1524 out_string(c, "CLIENT_ERROR not a managed instance");
|
|
1525 return;
|
|
1526 }
|
|
1527
|
|
1528 if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
|
|
1529 if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
|
|
1530 out_string(c, "CLIENT_ERROR bucket number out of range");
|
|
1531 return;
|
|
1532 }
|
|
1533 buckets[bucket] = gen;
|
|
1534 out_string(c, "OWNED");
|
|
1535 return;
|
|
1536 } else {
|
|
1537 out_string(c, "CLIENT_ERROR bad format");
|
|
1538 return;
|
|
1539 }
|
|
1540
|
|
1541 } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
|
|
1542
|
|
1543 int bucket;
|
|
1544 if (!settings.managed) {
|
|
1545 out_string(c, "CLIENT_ERROR not a managed instance");
|
|
1546 return;
|
|
1547 }
|
|
1548 if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
|
|
1549 if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
|
|
1550 out_string(c, "CLIENT_ERROR bucket number out of range");
|
|
1551 return;
|
|
1552 }
|
|
1553 buckets[bucket] = 0;
|
|
1554 out_string(c, "DISOWNED");
|
|
1555 return;
|
|
1556 } else {
|
|
1557 out_string(c, "CLIENT_ERROR bad format");
|
|
1558 return;
|
|
1559 }
|
|
1560
|
|
1561 } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
|
|
1562 int bucket, gen;
|
|
1563 if (!settings.managed) {
|
|
1564 out_string(c, "CLIENT_ERROR not a managed instance");
|
|
1565 return;
|
|
1566 }
|
|
1567 if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
|
|
1568 /* we never write anything back, even if input's wrong */
|
|
1569 if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
|
|
1570 /* do nothing, bad input */
|
|
1571 } else {
|
|
1572 c->bucket = bucket;
|
|
1573 c->gen = gen;
|
|
1574 }
|
|
1575 conn_set_state(c, conn_read);
|
|
1576 return;
|
|
1577 } else {
|
|
1578 out_string(c, "CLIENT_ERROR bad format");
|
|
1579 return;
|
|
1580 }
|
|
1581
|
|
1582 } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
|
|
1583
|
|
1584 process_stat(c, tokens, ntokens);
|
|
1585
|
|
1586 } else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
|
|
1587 time_t exptime = 0;
|
|
1588 set_current_time();
|
|
1589
|
|
1590 if(ntokens == 2) {
|
|
1591 settings.oldest_live = current_time - 1;
|
|
1592 item_flush_expired();
|
|
1593 out_string(c, "OK");
|
|
1594 return;
|
|
1595 }
|
|
1596
|
|
1597 exptime = strtol(tokens[1].value, NULL, 10);
|
|
1598 if(errno == ERANGE) {
|
|
1599 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1600 return;
|
|
1601 }
|
|
1602
|
|
1603 settings.oldest_live = realtime(exptime) - 1;
|
|
1604 item_flush_expired();
|
|
1605 out_string(c, "OK");
|
|
1606 return;
|
|
1607
|
|
1608 } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
|
|
1609
|
|
1610 out_string(c, "VERSION " VERSION);
|
|
1611
|
|
1612 } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
|
|
1613
|
|
1614 conn_set_state(c, conn_closing);
|
|
1615
|
|
1616 } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
|
|
1617 strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
|
|
1618 #ifdef ALLOW_SLABS_REASSIGN
|
|
1619
|
|
1620 int src, dst, rv;
|
|
1621
|
|
1622 src = strtol(tokens[2].value, NULL, 10);
|
|
1623 dst = strtol(tokens[3].value, NULL, 10);
|
|
1624
|
|
1625 if(errno == ERANGE) {
|
|
1626 out_string(c, "CLIENT_ERROR bad command line format");
|
|
1627 return;
|
|
1628 }
|
|
1629
|
|
1630 rv = slabs_reassign(src, dst);
|
|
1631 if (rv == 1) {
|
|
1632 out_string(c, "DONE");
|
|
1633 return;
|
|
1634 }
|
|
1635 if (rv == 0) {
|
|
1636 out_string(c, "CANT");
|
|
1637 return;
|
|
1638 }
|
|
1639 if (rv == -1) {
|
|
1640 out_string(c, "BUSY");
|
|
1641 return;
|
|
1642 }
|
|
1643 #else
|
|
1644 out_string(c, "CLIENT_ERROR Slab reassignment not supported");
|
|
1645 #endif
|
|
1646 } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
|
|
1647 process_verbosity_command(c, tokens, ntokens);
|
|
1648 } else {
|
|
1649 out_string(c, "ERROR");
|
|
1650 }
|
|
1651 return;
|
|
1652 }
|
|
1653
|
|
1654 /*
|
|
1655 * if we have a complete line in the buffer, process it.
|
|
1656 */
|
|
1657 static int try_read_command(conn *c) {
|
|
1658 char *el, *cont;
|
|
1659
|
|
1660 assert(c != NULL);
|
|
1661 assert(c->rcurr <= (c->rbuf + c->rsize));
|
|
1662
|
|
1663 if (c->rbytes == 0)
|
|
1664 return 0;
|
|
1665 el = memchr(c->rcurr, '\n', c->rbytes);
|
|
1666 if (!el)
|
|
1667 return 0;
|
|
1668 cont = el + 1;
|
|
1669 if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
|
|
1670 el--;
|
|
1671 }
|
|
1672 *el = '\0';
|
|
1673
|
|
1674 assert(cont <= (c->rcurr + c->rbytes));
|
|
1675
|
|
1676 process_command(c, c->rcurr);
|
|
1677
|
|
1678 c->rbytes -= (cont - c->rcurr);
|
|
1679 c->rcurr = cont;
|
|
1680
|
|
1681 assert(c->rcurr <= (c->rbuf + c->rsize));
|
|
1682
|
|
1683 return 1;
|
|
1684 }
|
|
1685
|
|
1686 /*
|
|
1687 * read a UDP request.
|
|
1688 * return 0 if there's nothing to read.
|
|
1689 */
|
|
1690 static int try_read_udp(conn *c) {
|
|
1691 int res;
|
|
1692
|
|
1693 assert(c != NULL);
|
|
1694
|
|
1695 c->request_addr_size = sizeof(c->request_addr);
|
|
1696 res = recvfrom(c->sfd, c->rbuf, c->rsize,
|
|
1697 0, &c->request_addr, &c->request_addr_size);
|
|
1698 if (res > 8) {
|
|
1699 unsigned char *buf = (unsigned char *)c->rbuf;
|
|
1700 STATS_LOCK();
|
|
1701 stats.bytes_read += res;
|
|
1702 STATS_UNLOCK();
|
|
1703
|
|
1704 /* Beginning of UDP packet is the request ID; save it. */
|
|
1705 c->request_id = buf[0] * 256 + buf[1];
|
|
1706
|
|
1707 /* If this is a multi-packet request, drop it. */
|
|
1708 if (buf[4] != 0 || buf[5] != 1) {
|
|
1709 out_string(c, "SERVER_ERROR multi-packet request not supported");
|
|
1710 return 0;
|
|
1711 }
|
|
1712
|
|
1713 /* Don't care about any of the rest of the header. */
|
|
1714 res -= 8;
|
|
1715 memmove(c->rbuf, c->rbuf + 8, res);
|
|
1716
|
|
1717 c->rbytes += res;
|
|
1718 c->rcurr = c->rbuf;
|
|
1719 return 1;
|
|
1720 }
|
|
1721 return 0;
|
|
1722 }
|
|
1723
|
|
1724 /*
|
|
1725 * read from network as much as we can, handle buffer overflow and connection
|
|
1726 * close.
|
|
1727 * before reading, move the remaining incomplete fragment of a command
|
|
1728 * (if any) to the beginning of the buffer.
|
|
1729 * return 0 if there's nothing to read on the first read.
|
|
1730 */
|
|
1731 static int try_read_network(conn *c) {
|
|
1732 int gotdata = 0;
|
|
1733 int res;
|
|
1734
|
|
1735 assert(c != NULL);
|
|
1736
|
|
1737 if (c->rcurr != c->rbuf) {
|
|
1738 if (c->rbytes != 0) /* otherwise there's nothing to copy */
|
|
1739 memmove(c->rbuf, c->rcurr, c->rbytes);
|
|
1740 c->rcurr = c->rbuf;
|
|
1741 }
|
|
1742
|
|
1743 while (1) {
|
|
1744 if (c->rbytes >= c->rsize) {
|
|
1745 char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
|
|
1746 if (!new_rbuf) {
|
|
1747 if (settings.verbose > 0)
|
|
1748 fprintf(stderr, "Couldn't realloc input buffer\n");
|
|
1749 c->rbytes = 0; /* ignore what we read */
|
|
1750 out_string(c, "SERVER_ERROR out of memory");
|
|
1751 c->write_and_go = conn_closing;
|
|
1752 return 1;
|
|
1753 }
|
|
1754 c->rcurr = c->rbuf = new_rbuf;
|
|
1755 c->rsize *= 2;
|
|
1756 }
|
|
1757
|
|
1758 /* unix socket mode doesn't need this, so zeroed out. but why
|
|
1759 * is this done for every command? presumably for UDP
|
|
1760 * mode. */
|
|
1761 if (!settings.socketpath) {
|
|
1762 c->request_addr_size = sizeof(c->request_addr);
|
|
1763 } else {
|
|
1764 c->request_addr_size = 0;
|
|
1765 }
|
|
1766
|
|
1767 res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
|
|
1768 if (res > 0) {
|
|
1769 STATS_LOCK();
|
|
1770 stats.bytes_read += res;
|
|
1771 STATS_UNLOCK();
|
|
1772 gotdata = 1;
|
|
1773 c->rbytes += res;
|
|
1774 continue;
|
|
1775 }
|
|
1776 if (res == 0) {
|
|
1777 /* connection closed */
|
|
1778 conn_set_state(c, conn_closing);
|
|
1779 return 1;
|
|
1780 }
|
|
1781 if (res == -1) {
|
|
1782 if (errno == EAGAIN || errno == EWOULDBLOCK) break;
|
|
1783 else return 0;
|
|
1784 }
|
|
1785 }
|
|
1786 return gotdata;
|
|
1787 }
|
|
1788
|
|
1789 static bool update_event(conn *c, const int new_flags) {
|
|
1790 assert(c != NULL);
|
|
1791
|
|
1792 struct event_base *base = c->event.ev_base;
|
|
1793 if (c->ev_flags == new_flags)
|
|
1794 return true;
|
|
1795 if (event_del(&c->event) == -1) return false;
|
|
1796 event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
|
|
1797 event_base_set(base, &c->event);
|
|
1798 c->ev_flags = new_flags;
|
|
1799 if (event_add(&c->event, 0) == -1) return false;
|
|
1800 return true;
|
|
1801 }
|
|
1802
|
|
1803 /*
|
|
1804 * Sets whether we are listening for new connections or not.
|
|
1805 */
|
|
1806 void accept_new_conns(const bool do_accept) {
|
|
1807 if (! is_listen_thread())
|
|
1808 return;
|
|
1809 if (do_accept) {
|
|
1810 update_event(listen_conn, EV_READ | EV_PERSIST);
|
|
1811 if (listen(listen_conn->sfd, 1024) != 0) {
|
|
1812 perror("listen");
|
|
1813 }
|
|
1814 }
|
|
1815 else {
|
|
1816 update_event(listen_conn, 0);
|
|
1817 if (listen(listen_conn->sfd, 0) != 0) {
|
|
1818 perror("listen");
|
|
1819 }
|
|
1820 }
|
|
1821 }
|
|
1822
|
|
1823
|
|
1824 /*
|
|
1825 * Transmit the next chunk of data from our list of msgbuf structures.
|
|
1826 *
|
|
1827 * Returns:
|
|
1828 * TRANSMIT_COMPLETE All done writing.
|
|
1829 * TRANSMIT_INCOMPLETE More data remaining to write.
|
|
1830 * TRANSMIT_SOFT_ERROR Can't write any more right now.
|
|
1831 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
|
|
1832 */
|
|
1833 static int transmit(conn *c) {
|
|
1834 assert(c != NULL);
|
|
1835
|
|
1836 if (c->msgcurr < c->msgused &&
|
|
1837 c->msglist[c->msgcurr].msg_iovlen == 0) {
|
|
1838 /* Finished writing the current msg; advance to the next. */
|
|
1839 c->msgcurr++;
|
|
1840 }
|
|
1841 if (c->msgcurr < c->msgused) {
|
|
1842 ssize_t res;
|
|
1843 struct msghdr *m = &c->msglist[c->msgcurr];
|
|
1844
|
|
1845 res = sendmsg(c->sfd, m, 0);
|
|
1846 if (res > 0) {
|
|
1847 STATS_LOCK();
|
|
1848 stats.bytes_written += res;
|
|
1849 STATS_UNLOCK();
|
|
1850
|
|
1851 /* We've written some of the data. Remove the completed
|
|
1852 iovec entries from the list of pending writes. */
|
|
1853 while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
|
|
1854 res -= m->msg_iov->iov_len;
|
|
1855 m->msg_iovlen--;
|
|
1856 m->msg_iov++;
|
|
1857 }
|
|
1858
|
|
1859 /* Might have written just part of the last iovec entry;
|
|
1860 adjust it so the next write will do the rest. */
|
|
1861 if (res > 0) {
|
|
1862 m->msg_iov->iov_base += res;
|
|
1863 m->msg_iov->iov_len -= res;
|
|
1864 }
|
|
1865 return TRANSMIT_INCOMPLETE;
|
|
1866 }
|
|
1867 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
|
|
1868 if (!update_event(c, EV_WRITE | EV_PERSIST)) {
|
|
1869 if (settings.verbose > 0)
|
|
1870 fprintf(stderr, "Couldn't update event\n");
|
|
1871 conn_set_state(c, conn_closing);
|
|
1872 return TRANSMIT_HARD_ERROR;
|
|
1873 }
|
|
1874 return TRANSMIT_SOFT_ERROR;
|
|
1875 }
|
|
1876 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
|
|
1877 we have a real error, on which we close the connection */
|
|
1878 if (settings.verbose > 0)
|
|
1879 perror("Failed to write, and not due to blocking");
|
|
1880
|
|
1881 if (c->udp)
|
|
1882 conn_set_state(c, conn_read);
|
|
1883 else
|
|
1884 conn_set_state(c, conn_closing);
|
|
1885 return TRANSMIT_HARD_ERROR;
|
|
1886 } else {
|
|
1887 return TRANSMIT_COMPLETE;
|
|
1888 }
|
|
1889 }
|
|
1890
|
|
1891 static void drive_machine(conn *c) {
|
|
1892 bool stop = false;
|
|
1893 int sfd, flags = 1;
|
|
1894 socklen_t addrlen;
|
|
1895 struct sockaddr addr;
|
|
1896 int res;
|
|
1897
|
|
1898 assert(c != NULL);
|
|
1899
|
|
1900 while (!stop) {
|
|
1901
|
|
1902 switch(c->state) {
|
|
1903 case conn_listening:
|
|
1904 addrlen = sizeof(addr);
|
|
1905 if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
|
|
1906 if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
|
1907 /* these are transient, so don't log anything */
|
|
1908 stop = true;
|
|
1909 } else if (errno == EMFILE) {
|
|
1910 if (settings.verbose > 0)
|
|
1911 fprintf(stderr, "Too many open connections\n");
|
|
1912 accept_new_conns(false);
|
|
1913 stop = true;
|
|
1914 } else {
|
|
1915 perror("accept()");
|
|
1916 stop = true;
|
|
1917 }
|
|
1918 break;
|
|
1919 }
|
|
1920 if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
|
|
1921 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
|
|
1922 perror("setting O_NONBLOCK");
|
|
1923 close(sfd);
|
|
1924 break;
|
|
1925 }
|
|
1926 dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
|
|
1927 DATA_BUFFER_SIZE, false);
|
|
1928 break;
|
|
1929
|
|
1930 case conn_read:
|
|
1931 if (try_read_command(c) != 0) {
|
|
1932 continue;
|
|
1933 }
|
|
1934 if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) {
|
|
1935 continue;
|
|
1936 }
|
|
1937 /* we have no command line and no data to read from network */
|
|
1938 if (!update_event(c, EV_READ | EV_PERSIST)) {
|
|
1939 if (settings.verbose > 0)
|
|
1940 fprintf(stderr, "Couldn't update event\n");
|
|
1941 conn_set_state(c, conn_closing);
|
|
1942 break;
|
|
1943 }
|
|
1944 stop = true;
|
|
1945 break;
|
|
1946
|
|
1947 case conn_nread:
|
|
1948 /* we are reading rlbytes into ritem; */
|
|
1949 if (c->rlbytes == 0) {
|
|
1950 complete_nread(c);
|
|
1951 break;
|
|
1952 }
|
|
1953 /* first check if we have leftovers in the conn_read buffer */
|
|
1954 if (c->rbytes > 0) {
|
|
1955 int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
|
|
1956 memcpy(c->ritem, c->rcurr, tocopy);
|
|
1957 c->ritem += tocopy;
|
|
1958 c->rlbytes -= tocopy;
|
|
1959 c->rcurr += tocopy;
|
|
1960 c->rbytes -= tocopy;
|
|
1961 break;
|
|
1962 }
|
|
1963
|
|
1964 /* now try reading from the socket */
|
|
1965 res = read(c->sfd, c->ritem, c->rlbytes);
|
|
1966 if (res > 0) {
|
|
1967 STATS_LOCK();
|
|
1968 stats.bytes_read += res;
|
|
1969 STATS_UNLOCK();
|
|
1970 c->ritem += res;
|
|
1971 c->rlbytes -= res;
|
|
1972 break;
|
|
1973 }
|
|
1974 if (res == 0) { /* end of stream */
|
|
1975 conn_set_state(c, conn_closing);
|
|
1976 break;
|
|
1977 }
|
|
1978 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
|
|
1979 if (!update_event(c, EV_READ | EV_PERSIST)) {
|
|
1980 if (settings.verbose > 0)
|
|
1981 fprintf(stderr, "Couldn't update event\n");
|
|
1982 conn_set_state(c, conn_closing);
|
|
1983 break;
|
|
1984 }
|
|
1985 stop = true;
|
|
1986 break;
|
|
1987 }
|
|
1988 /* otherwise we have a real error, on which we close the connection */
|
|
1989 if (settings.verbose > 0)
|
|
1990 fprintf(stderr, "Failed to read, and not due to blocking\n");
|
|
1991 conn_set_state(c, conn_closing);
|
|
1992 break;
|
|
1993
|
|
1994 case conn_swallow:
|
|
1995 /* we are reading sbytes and throwing them away */
|
|
1996 if (c->sbytes == 0) {
|
|
1997 conn_set_state(c, conn_read);
|
|
1998 break;
|
|
1999 }
|
|
2000
|
|
2001 /* first check if we have leftovers in the conn_read buffer */
|
|
2002 if (c->rbytes > 0) {
|
|
2003 int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
|
|
2004 c->sbytes -= tocopy;
|
|
2005 c->rcurr += tocopy;
|
|
2006 c->rbytes -= tocopy;
|
|
2007 break;
|
|
2008 }
|
|
2009
|
|
2010 /* now try reading from the socket */
|
|
2011 res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
|
|
2012 if (res > 0) {
|
|
2013 STATS_LOCK();
|
|
2014 stats.bytes_read += res;
|
|
2015 STATS_UNLOCK();
|
|
2016 c->sbytes -= res;
|
|
2017 break;
|
|
2018 }
|
|
2019 if (res == 0) { /* end of stream */
|
|
2020 conn_set_state(c, conn_closing);
|
|
2021 break;
|
|
2022 }
|
|
2023 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
|
|
2024 if (!update_event(c, EV_READ | EV_PERSIST)) {
|
|
2025 if (settings.verbose > 0)
|
|
2026 fprintf(stderr, "Couldn't update event\n");
|
|
2027 conn_set_state(c, conn_closing);
|
|
2028 break;
|
|
2029 }
|
|
2030 stop = true;
|
|
2031 break;
|
|
2032 }
|
|
2033 /* otherwise we have a real error, on which we close the connection */
|
|
2034 if (settings.verbose > 0)
|
|
2035 fprintf(stderr, "Failed to read, and not due to blocking\n");
|
|
2036 conn_set_state(c, conn_closing);
|
|
2037 break;
|
|
2038
|
|
2039 case conn_write:
|
|
2040 /*
|
|
2041 * We want to write out a simple response. If we haven't already,
|
|
2042 * assemble it into a msgbuf list (this will be a single-entry
|
|
2043 * list for TCP or a two-entry list for UDP).
|
|
2044 */
|
|
2045 if (c->iovused == 0 || (c->udp && c->iovused == 1)) {
|
|
2046 if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
|
|
2047 (c->udp && build_udp_headers(c) != 0)) {
|
|
2048 if (settings.verbose > 0)
|
|
2049 fprintf(stderr, "Couldn't build response\n");
|
|
2050 conn_set_state(c, conn_closing);
|
|
2051 break;
|
|
2052 }
|
|
2053 }
|
|
2054
|
|
2055 /* fall through... */
|
|
2056
|
|
2057 case conn_mwrite:
|
|
2058 switch (transmit(c)) {
|
|
2059 case TRANSMIT_COMPLETE:
|
|
2060 if (c->state == conn_mwrite) {
|
|
2061 while (c->ileft > 0) {
|
|
2062 item *it = *(c->icurr);
|
|
2063 assert((it->it_flags & ITEM_SLABBED) == 0);
|
|
2064 item_remove(it);
|
|
2065 c->icurr++;
|
|
2066 c->ileft--;
|
|
2067 }
|
|
2068 conn_set_state(c, conn_read);
|
|
2069 } else if (c->state == conn_write) {
|
|
2070 if (c->write_and_free) {
|
|
2071 free(c->write_and_free);
|
|
2072 c->write_and_free = 0;
|
|
2073 }
|
|
2074 conn_set_state(c, c->write_and_go);
|
|
2075 } else {
|
|
2076 if (settings.verbose > 0)
|
|
2077 fprintf(stderr, "Unexpected state %d\n", c->state);
|
|
2078 conn_set_state(c, conn_closing);
|
|
2079 }
|
|
2080 break;
|
|
2081
|
|
2082 case TRANSMIT_INCOMPLETE:
|
|
2083 case TRANSMIT_HARD_ERROR:
|
|
2084 break; /* Continue in state machine. */
|
|
2085
|
|
2086 case TRANSMIT_SOFT_ERROR:
|
|
2087 stop = true;
|
|
2088 break;
|
|
2089 }
|
|
2090 break;
|
|
2091
|
|
2092 case conn_closing:
|
|
2093 if (c->udp)
|
|
2094 conn_cleanup(c);
|
|
2095 else
|
|
2096 conn_close(c);
|
|
2097 stop = true;
|
|
2098 break;
|
|
2099 }
|
|
2100 }
|
|
2101
|
|
2102 return;
|
|
2103 }
|
|
2104
|
|
2105 void event_handler(const int fd, const short which, void *arg) {
|
|
2106 conn *c;
|
|
2107
|
|
2108 c = (conn *)arg;
|
|
2109 assert(c != NULL);
|
|
2110
|
|
2111 c->which = which;
|
|
2112
|
|
2113 /* sanity */
|
|
2114 if (fd != c->sfd) {
|
|
2115 if (settings.verbose > 0)
|
|
2116 fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
|
|
2117 conn_close(c);
|
|
2118 return;
|
|
2119 }
|
|
2120
|
|
2121 drive_machine(c);
|
|
2122
|
|
2123 /* wait for next event */
|
|
2124 return;
|
|
2125 }
|
|
2126
|
|
/*
 * Create an AF_INET socket (datagram when is_udp, stream otherwise) and put
 * it into non-blocking mode.
 *
 * Returns the descriptor, or -1 after reporting the failing step via perror()
 * (the half-created socket is closed in that case).
 */
static int new_socket(const bool is_udp) {
    const int sock_type = is_udp ? SOCK_DGRAM : SOCK_STREAM;
    int fd = socket(AF_INET, sock_type, 0);
    int fl;

    if (fd == -1) {
        perror("socket()");
        return -1;
    }

    fl = fcntl(fd, F_GETFL, 0);
    if (fl >= 0 && fcntl(fd, F_SETFL, fl | O_NONBLOCK) >= 0)
        return fd;

    perror("setting O_NONBLOCK");
    close(fd);
    return -1;
}
|
|
2144
|
|
2145
|
|
2146 /*
|
|
2147 * Sets a socket's send buffer size to the maximum allowed by the system.
|
|
2148 */
|
|
2149 static void maximize_sndbuf(const int sfd) {
|
|
2150 socklen_t intsize = sizeof(int);
|
|
2151 int last_good = 0;
|
|
2152 int min, max, avg;
|
|
2153 int old_size;
|
|
2154
|
|
2155 /* Start with the default size. */
|
|
2156 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
|
|
2157 if (settings.verbose > 0)
|
|
2158 perror("getsockopt(SO_SNDBUF)");
|
|
2159 return;
|
|
2160 }
|
|
2161
|
|
2162 /* Binary-search for the real maximum. */
|
|
2163 min = old_size;
|
|
2164 max = MAX_SENDBUF_SIZE;
|
|
2165
|
|
2166 while (min <= max) {
|
|
2167 avg = ((unsigned int)(min + max)) / 2;
|
|
2168 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
|
|
2169 last_good = avg;
|
|
2170 min = avg + 1;
|
|
2171 } else {
|
|
2172 max = avg - 1;
|
|
2173 }
|
|
2174 }
|
|
2175
|
|
2176 if (settings.verbose > 1)
|
|
2177 fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
|
|
2178 }
|
|
2179
|
|
2180
|
|
2181 static int server_socket(const int port, const bool is_udp) {
|
|
2182 int sfd;
|
|
2183 struct linger ling = {0, 0};
|
|
2184 struct sockaddr_in addr;
|
|
2185 int flags =1;
|
|
2186
|
|
2187 if ((sfd = new_socket(is_udp)) == -1) {
|
|
2188 return -1;
|
|
2189 }
|
|
2190
|
|
2191 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
|
|
2192 if (is_udp) {
|
|
2193 maximize_sndbuf(sfd);
|
|
2194 } else {
|
|
2195 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
|
|
2196 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
|
|
2197 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
|
|
2198 }
|
|
2199
|
|
2200 /*
|
|
2201 * the memset call clears nonstandard fields in some impementations
|
|
2202 * that otherwise mess things up.
|
|
2203 */
|
|
2204 memset(&addr, 0, sizeof(addr));
|
|
2205
|
|
2206 addr.sin_family = AF_INET;
|
|
2207 addr.sin_port = htons(port);
|
|
2208 addr.sin_addr = settings.interf;
|
|
2209 if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
|
|
2210 perror("bind()");
|
|
2211 close(sfd);
|
|
2212 return -1;
|
|
2213 }
|
|
2214 if (!is_udp && listen(sfd, 1024) == -1) {
|
|
2215 perror("listen()");
|
|
2216 close(sfd);
|
|
2217 return -1;
|
|
2218 }
|
|
2219 return sfd;
|
|
2220 }
|
|
2221
|
|
/*
 * Create a non-blocking AF_UNIX stream socket.
 *
 * Returns the descriptor, or -1 after reporting the failing step via perror()
 * (the half-created socket is closed in that case).
 */
static int new_socket_unix(void) {
    int fl;
    const int sfd = socket(AF_UNIX, SOCK_STREAM, 0);

    if (sfd == -1) {
        perror("socket()");
        return -1;
    }

    fl = fcntl(sfd, F_GETFL, 0);
    if (fl < 0 || fcntl(sfd, F_SETFL, fl | O_NONBLOCK) < 0) {
        perror("setting O_NONBLOCK");
        close(sfd);
        return -1;
    }

    return sfd;
}
|
|
2239
|
|
/*
 * Create a non-blocking UNIX-domain stream socket listening at `path`.
 *
 * A socket file already present at `path` (e.g. left over from a previous
 * run) is unlinked first; any other kind of file at that path is left alone,
 * in which case bind() fails.
 *
 * Returns the listening descriptor, or -1 when `path` is NULL, when `path`
 * does not fit in sockaddr_un.sun_path, or on a socket/bind/listen error
 * (reported via perror()).
 */
static int server_socket_unix(const char *path) {
    int sfd;
    struct linger ling = {0, 0};
    struct sockaddr_un addr;
    struct stat tstat;
    int flags = 1;
    size_t path_len;

    if (!path) {
        return -1;
    }

    /*
     * sun_path is a fixed-size array and `path` comes straight from the
     * command line; refuse anything that would not fit with its NUL
     * terminator instead of overflowing the buffer.
     */
    path_len = strlen(path);
    if (path_len >= sizeof(addr.sun_path)) {
        fprintf(stderr, "unix socket path too long (max %lu bytes): %s\n",
                (unsigned long)(sizeof(addr.sun_path) - 1), path);
        return -1;
    }

    if ((sfd = new_socket_unix()) == -1) {
        return -1;
    }

    /*
     * Clean up a previous socket file if we left it around
     */
    if (lstat(path, &tstat) == 0) {
        if (S_ISSOCK(tstat.st_mode))
            unlink(path);
    }

    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
    setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));

    /*
     * the memset call clears nonstandard fields in some implementations
     * that otherwise mess things up.
     */
    memset(&addr, 0, sizeof(addr));

    addr.sun_family = AF_UNIX;
    /* length was validated above; copy including the terminating NUL */
    memcpy(addr.sun_path, path, path_len + 1);
    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("bind()");
        close(sfd);
        return -1;
    }
    if (listen(sfd, 1024) == -1) {
        perror("listen()");
        close(sfd);
        return -1;
    }
    return sfd;
}
|
|
2287
|
|
/*
 * listening socket (TCP, or UNIX-domain when -s is given).
 * -1 means "not created yet", matching u_socket, so that pre_gdb()'s
 * `> -1` check never closes descriptor 0 (stdin) before main() has
 * created the socket.
 */
static int l_socket = -1;

/* udp socket; stays -1 when UDP is disabled */
static int u_socket = -1;
|
|
2293
|
|
2294 /* invoke right before gdb is called, on assert */
|
|
2295 void pre_gdb(void) {
|
|
2296 int i;
|
|
2297 if (l_socket > -1) close(l_socket);
|
|
2298 if (u_socket > -1) close(u_socket);
|
|
2299 for (i = 3; i <= 500; i++) close(i); /* so lame */
|
|
2300 kill(getpid(), SIGABRT);
|
|
2301 }
|
|
2302
|
|
/*
 * We keep the current time in a global variable that's updated by a
 * timer event. The value is the number of seconds since the server started
 * (see set_current_time), not the time of day. This saves us a bunch of
 * time() system calls (we really only need to get the time once a second,
 * whereas there can be tens of thousands of requests a second) and allows
 * us to use server-start-relative timestamps rather than absolute UNIX
 * timestamps, a space savings on systems where
 * sizeof(time_t) > sizeof(unsigned int).
 *
 * NOTE(review): `volatile` only stops the compiler from caching the value;
 * it is not a synchronization primitive if other threads read it.
 */
volatile rel_time_t current_time;
/* timer event that clock_handler() re-arms every second */
static struct event clockevent;
|
|
2313
|
|
2314 /* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */
|
|
2315 static void set_current_time(void) {
|
|
2316 current_time = (rel_time_t) (time(0) - stats.started);
|
|
2317 }
|
|
2318
|
|
2319 static void clock_handler(const int fd, const short which, void *arg) {
|
|
2320 struct timeval t = {.tv_sec = 1, .tv_usec = 0};
|
|
2321 static bool initialized = false;
|
|
2322
|
|
2323 if (initialized) {
|
|
2324 /* only delete the event if it's actually there. */
|
|
2325 evtimer_del(&clockevent);
|
|
2326 } else {
|
|
2327 initialized = true;
|
|
2328 }
|
|
2329
|
|
2330 evtimer_set(&clockevent, clock_handler, 0);
|
|
2331 event_base_set(main_base, &clockevent);
|
|
2332 evtimer_add(&clockevent, &t);
|
|
2333
|
|
2334 set_current_time();
|
|
2335 }
|
|
2336
|
|
2337 static struct event deleteevent;
|
|
2338
|
|
2339 static void delete_handler(const int fd, const short which, void *arg) {
|
|
2340 struct timeval t = {.tv_sec = 5, .tv_usec = 0};
|
|
2341 static bool initialized = false;
|
|
2342
|
|
2343 if (initialized) {
|
|
2344 /* some versions of libevent don't like deleting events that don't exist,
|
|
2345 so only delete once we know this event has been added. */
|
|
2346 evtimer_del(&deleteevent);
|
|
2347 } else {
|
|
2348 initialized = true;
|
|
2349 }
|
|
2350
|
|
2351 evtimer_set(&deleteevent, delete_handler, 0);
|
|
2352 event_base_set(main_base, &deleteevent);
|
|
2353 evtimer_add(&deleteevent, &t);
|
|
2354 run_deferred_deletes();
|
|
2355 }
|
|
2356
|
|
2357 /* Call run_deferred_deletes instead of this. */
|
|
2358 void do_run_deferred_deletes(void)
|
|
2359 {
|
|
2360 int i, j = 0;
|
|
2361
|
|
2362 for (i = 0; i < delcurr; i++) {
|
|
2363 item *it = todelete[i];
|
|
2364 if (item_delete_lock_over(it)) {
|
|
2365 assert(it->refcount > 0);
|
|
2366 it->it_flags &= ~ITEM_DELETED;
|
|
2367 do_item_unlink(it);
|
|
2368 do_item_remove(it);
|
|
2369 } else {
|
|
2370 todelete[j++] = it;
|
|
2371 }
|
|
2372 }
|
|
2373 delcurr = j;
|
|
2374 }
|
|
2375
|
|
2376 static void usage(void) {
|
|
2377 printf(PACKAGE " " VERSION "\n");
|
|
2378 printf("-p <num> TCP port number to listen on (default: 11211)\n"
|
|
2379 "-U <num> UDP port number to listen on (default: 0, off)\n"
|
|
2380 "-s <file> unix socket path to listen on (disables network support)\n"
|
|
2381 "-l <ip_addr> interface to listen on, default is INDRR_ANY\n"
|
|
2382 "-d run as a daemon\n"
|
|
2383 "-r maximize core file limit\n"
|
|
2384 "-u <username> assume identity of <username> (only when run as root)\n"
|
|
2385 "-m <num> max memory to use for items in megabytes, default is 64 MB\n"
|
|
2386 "-M return error on memory exhausted (rather than removing items)\n"
|
|
2387 "-c <num> max simultaneous connections, default is 1024\n"
|
|
2388 "-k lock down all paged memory\n"
|
|
2389 "-v verbose (print errors/warnings while in event loop)\n"
|
|
2390 "-vv very verbose (also print client commands/reponses)\n"
|
|
2391 "-h print this help and exit\n"
|
|
2392 "-i print memcached and libevent license\n"
|
|
2393 "-b run a managed instanced (mnemonic: buckets)\n"
|
|
2394 "-P <file> save PID in <file>, only used with -d option\n"
|
|
2395 "-f <factor> chunk size growth factor, default 1.25\n"
|
|
2396 "-n <bytes> minimum space allocated for key+value+flags, default 48\n");
|
|
2397 #ifdef USE_THREADS
|
|
2398 printf("-t <num> number of threads to use, default 4\n");
|
|
2399 #endif
|
|
2400 return;
|
|
2401 }
|
|
2402
|
|
/*
 * Print the version banner followed by the memcached (Danga Interactive)
 * and libevent (Niels Provos) license texts to stdout.
 *
 * The text is reproduced verbatim; do not reflow or edit it.
 */
static void usage_license(void) {
    printf(PACKAGE " " VERSION "\n\n");
    printf(
    "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
    "All rights reserved.\n"
    "\n"
    "Redistribution and use in source and binary forms, with or without\n"
    "modification, are permitted provided that the following conditions are\n"
    "met:\n"
    "\n"
    " * Redistributions of source code must retain the above copyright\n"
    "notice, this list of conditions and the following disclaimer.\n"
    "\n"
    " * Redistributions in binary form must reproduce the above\n"
    "copyright notice, this list of conditions and the following disclaimer\n"
    "in the documentation and/or other materials provided with the\n"
    "distribution.\n"
    "\n"
    " * Neither the name of the Danga Interactive nor the names of its\n"
    "contributors may be used to endorse or promote products derived from\n"
    "this software without specific prior written permission.\n"
    "\n"
    "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
    "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
    "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
    "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
    "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
    "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
    "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
    "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
    "\n"
    "\n"
    "This product includes software developed by Niels Provos.\n"
    "\n"
    "[ libevent ]\n"
    "\n"
    "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
    "All rights reserved.\n"
    "\n"
    "Redistribution and use in source and binary forms, with or without\n"
    "modification, are permitted provided that the following conditions\n"
    "are met:\n"
    "1. Redistributions of source code must retain the above copyright\n"
    " notice, this list of conditions and the following disclaimer.\n"
    "2. Redistributions in binary form must reproduce the above copyright\n"
    " notice, this list of conditions and the following disclaimer in the\n"
    " documentation and/or other materials provided with the distribution.\n"
    "3. All advertising materials mentioning features or use of this software\n"
    " must display the following acknowledgement:\n"
    " This product includes software developed by Niels Provos.\n"
    "4. The name of the author may not be used to endorse or promote products\n"
    " derived from this software without specific prior written permission.\n"
    "\n"
    "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
    "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
    "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
    "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
    "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
    "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
    "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
    );

    return;
}
|
|
2473
|
|
/*
 * Write `pid` as a decimal number followed by a newline to `pid_file`,
 * truncating any previous contents.
 *
 * Does nothing when pid_file is NULL. Open/close failures are reported on
 * stderr but are not fatal.
 */
static void save_pid(const pid_t pid, const char *pid_file) {
    FILE *out;

    if (pid_file == NULL)
        return;

    out = fopen(pid_file, "w");
    if (out == NULL) {
        fprintf(stderr, "Could not open the pid file %s for writing\n", pid_file);
        return;
    }

    fprintf(out, "%ld\n", (long)pid);

    if (fclose(out) == -1)
        fprintf(stderr, "Could not close the pid file %s.\n", pid_file);
}
|
|
2490
|
|
/*
 * Delete `pid_file` from the filesystem.
 *
 * Does nothing when pid_file is NULL; an unlink() failure is reported on
 * stderr but is not fatal.
 */
static void remove_pidfile(const char *pid_file) {
    if (pid_file != NULL && unlink(pid_file) != 0)
        fprintf(stderr, "Could not remove the pid file %s.\n", pid_file);
}
|
|
2500
|
|
2501
|
|
/*
 * SIGINT handler: announce the signal on standard output and terminate the
 * process with a success status.
 *
 * Only async-signal-safe functions may be called here. printf() and exit()
 * are not: the signal can interrupt code holding stdio locks, and exit()
 * re-enters stdio to flush streams. So the message is emitted with write(2)
 * and the process ends with _exit(2).
 */
static void sig_handler(const int sig) {
    static const char msg[] = "SIGINT handled.\n";
    ssize_t ignored;

    (void)sig;
    /* descriptor 1 is standard output (STDOUT_FILENO) */
    ignored = write(1, msg, sizeof(msg) - 1);
    (void)ignored;
    _exit(EXIT_SUCCESS);
}
|
|
2506
|
|
2507 int main (int argc, char **argv) {
|
|
2508 int c;
|
|
2509 struct in_addr addr;
|
|
2510 bool lock_memory = false;
|
|
2511 bool daemonize = false;
|
|
2512 int maxcore = 0;
|
|
2513 char *username = NULL;
|
|
2514 char *pid_file = NULL;
|
|
2515 struct passwd *pw;
|
|
2516 struct sigaction sa;
|
|
2517 struct rlimit rlim;
|
|
2518
|
|
2519 /* handle SIGINT */
|
|
2520 signal(SIGINT, sig_handler);
|
|
2521
|
|
2522 /* init settings */
|
|
2523 settings_init();
|
|
2524
|
|
2525 /* set stderr non-buffering (for running under, say, daemontools) */
|
|
2526 setbuf(stderr, NULL);
|
|
2527
|
|
2528 /* process arguments */
|
|
2529 while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) {
|
|
2530 switch (c) {
|
|
2531 case 'U':
|
|
2532 settings.udpport = atoi(optarg);
|
|
2533 break;
|
|
2534 case 'b':
|
|
2535 settings.managed = true;
|
|
2536 break;
|
|
2537 case 'p':
|
|
2538 settings.port = atoi(optarg);
|
|
2539 break;
|
|
2540 case 's':
|
|
2541 settings.socketpath = optarg;
|
|
2542 break;
|
|
2543 case 'm':
|
|
2544 settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
|
|
2545 break;
|
|
2546 case 'M':
|
|
2547 settings.evict_to_free = 0;
|
|
2548 break;
|
|
2549 case 'c':
|
|
2550 settings.maxconns = atoi(optarg);
|
|
2551 break;
|
|
2552 case 'h':
|
|
2553 usage();
|
|
2554 exit(EXIT_SUCCESS);
|
|
2555 case 'i':
|
|
2556 usage_license();
|
|
2557 exit(EXIT_SUCCESS);
|
|
2558 case 'k':
|
|
2559 lock_memory = true;
|
|
2560 break;
|
|
2561 case 'v':
|
|
2562 settings.verbose++;
|
|
2563 break;
|
|
2564 case 'l':
|
|
2565 if (inet_pton(AF_INET, optarg, &addr) <= 0) {
|
|
2566 fprintf(stderr, "Illegal address: %s\n", optarg);
|
|
2567 return 1;
|
|
2568 } else {
|
|
2569 settings.interf = addr;
|
|
2570 }
|
|
2571 break;
|
|
2572 case 'd':
|
|
2573 daemonize = true;
|
|
2574 break;
|
|
2575 case 'r':
|
|
2576 maxcore = 1;
|
|
2577 break;
|
|
2578 case 'u':
|
|
2579 username = optarg;
|
|
2580 break;
|
|
2581 case 'P':
|
|
2582 pid_file = optarg;
|
|
2583 break;
|
|
2584 case 'f':
|
|
2585 settings.factor = atof(optarg);
|
|
2586 if (settings.factor <= 1.0) {
|
|
2587 fprintf(stderr, "Factor must be greater than 1\n");
|
|
2588 return 1;
|
|
2589 }
|
|
2590 break;
|
|
2591 case 'n':
|
|
2592 settings.chunk_size = atoi(optarg);
|
|
2593 if (settings.chunk_size == 0) {
|
|
2594 fprintf(stderr, "Chunk size must be greater than 0\n");
|
|
2595 return 1;
|
|
2596 }
|
|
2597 break;
|
|
2598 case 't':
|
|
2599 settings.num_threads = atoi(optarg);
|
|
2600 if (settings.num_threads == 0) {
|
|
2601 fprintf(stderr, "Number of threads must be greater than 0\n");
|
|
2602 return 1;
|
|
2603 }
|
|
2604 break;
|
|
2605 case 'D':
|
|
2606 if (! optarg || ! optarg[0]) {
|
|
2607 fprintf(stderr, "No delimiter specified\n");
|
|
2608 return 1;
|
|
2609 }
|
|
2610 settings.prefix_delimiter = optarg[0];
|
|
2611 settings.detail_enabled = 1;
|
|
2612 break;
|
|
2613 default:
|
|
2614 fprintf(stderr, "Illegal argument \"%c\"\n", c);
|
|
2615 return 1;
|
|
2616 }
|
|
2617 }
|
|
2618
|
|
2619 if (maxcore != 0) {
|
|
2620 struct rlimit rlim_new;
|
|
2621 /*
|
|
2622 * First try raising to infinity; if that fails, try bringing
|
|
2623 * the soft limit to the hard.
|
|
2624 */
|
|
2625 if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
|
|
2626 rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
|
|
2627 if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
|
|
2628 /* failed. try raising just to the old max */
|
|
2629 rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
|
|
2630 (void)setrlimit(RLIMIT_CORE, &rlim_new);
|
|
2631 }
|
|
2632 }
|
|
2633 /*
|
|
2634 * getrlimit again to see what we ended up with. Only fail if
|
|
2635 * the soft limit ends up 0, because then no core files will be
|
|
2636 * created at all.
|
|
2637 */
|
|
2638
|
|
2639 if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
|
|
2640 fprintf(stderr, "failed to ensure corefile creation\n");
|
|
2641 exit(EXIT_FAILURE);
|
|
2642 }
|
|
2643 }
|
|
2644
|
|
2645 /*
|
|
2646 * If needed, increase rlimits to allow as many connections
|
|
2647 * as needed.
|
|
2648 */
|
|
2649
|
|
2650 if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
|
|
2651 fprintf(stderr, "failed to getrlimit number of files\n");
|
|
2652 exit(EXIT_FAILURE);
|
|
2653 } else {
|
|
2654 int maxfiles = settings.maxconns;
|
|
2655 if (rlim.rlim_cur < maxfiles)
|
|
2656 rlim.rlim_cur = maxfiles + 3;
|
|
2657 if (rlim.rlim_max < rlim.rlim_cur)
|
|
2658 rlim.rlim_max = rlim.rlim_cur;
|
|
2659 if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
|
|
2660 fprintf(stderr, "failed to set rlimit for open files. Try running as root or requesting smaller maxconns value.\n");
|
|
2661 exit(EXIT_FAILURE);
|
|
2662 }
|
|
2663 }
|
|
2664
|
|
2665 /*
|
|
2666 * initialization order: first create the listening sockets
|
|
2667 * (may need root on low ports), then drop root if needed,
|
|
2668 * then daemonise if needed, then init libevent (in some cases
|
|
2669 * descriptors created by libevent wouldn't survive forking).
|
|
2670 */
|
|
2671
|
|
2672 /* create the listening socket and bind it */
|
|
2673 if (settings.socketpath == NULL) {
|
|
2674 l_socket = server_socket(settings.port, 0);
|
|
2675 if (l_socket == -1) {
|
|
2676 fprintf(stderr, "failed to listen\n");
|
|
2677 exit(EXIT_FAILURE);
|
|
2678 }
|
|
2679 }
|
|
2680
|
|
2681 if (settings.udpport > 0 && settings.socketpath == NULL) {
|
|
2682 /* create the UDP listening socket and bind it */
|
|
2683 u_socket = server_socket(settings.udpport, 1);
|
|
2684 if (u_socket == -1) {
|
|
2685 fprintf(stderr, "failed to listen on UDP port %d\n", settings.udpport);
|
|
2686 exit(EXIT_FAILURE);
|
|
2687 }
|
|
2688 }
|
|
2689
|
|
2690 /* lose root privileges if we have them */
|
|
2691 if (getuid() == 0 || geteuid() == 0) {
|
|
2692 if (username == 0 || *username == '\0') {
|
|
2693 fprintf(stderr, "can't run as root without the -u switch\n");
|
|
2694 return 1;
|
|
2695 }
|
|
2696 if ((pw = getpwnam(username)) == 0) {
|
|
2697 fprintf(stderr, "can't find the user %s to switch to\n", username);
|
|
2698 return 1;
|
|
2699 }
|
|
2700 if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) {
|
|
2701 fprintf(stderr, "failed to assume identity of user %s\n", username);
|
|
2702 return 1;
|
|
2703 }
|
|
2704 }
|
|
2705
|
|
2706 /* create unix mode sockets after dropping privileges */
|
|
2707 if (settings.socketpath != NULL) {
|
|
2708 l_socket = server_socket_unix(settings.socketpath);
|
|
2709 if (l_socket == -1) {
|
|
2710 fprintf(stderr, "failed to listen\n");
|
|
2711 exit(EXIT_FAILURE);
|
|
2712 }
|
|
2713 }
|
|
2714
|
|
2715 /* daemonize if requested */
|
|
2716 /* if we want to ensure our ability to dump core, don't chdir to / */
|
|
2717 if (daemonize) {
|
|
2718 int res;
|
|
2719 res = daemon(maxcore, settings.verbose);
|
|
2720 if (res == -1) {
|
|
2721 fprintf(stderr, "failed to daemon() in order to daemonize\n");
|
|
2722 return 1;
|
|
2723 }
|
|
2724 }
|
|
2725
|
|
2726 /* initialize main thread libevent instance */
|
|
2727 main_base = event_init();
|
|
2728
|
|
2729 /* initialize other stuff */
|
|
2730 item_init();
|
|
2731 stats_init();
|
|
2732 assoc_init();
|
|
2733 conn_init();
|
|
2734 slabs_init(settings.maxbytes, settings.factor);
|
|
2735
|
|
2736 /* managed instance? alloc and zero a bucket array */
|
|
2737 if (settings.managed) {
|
|
2738 buckets = malloc(sizeof(int) * MAX_BUCKETS);
|
|
2739 if (buckets == 0) {
|
|
2740 fprintf(stderr, "failed to allocate the bucket array");
|
|
2741 exit(EXIT_FAILURE);
|
|
2742 }
|
|
2743 memset(buckets, 0, sizeof(int) * MAX_BUCKETS);
|
|
2744 }
|
|
2745
|
|
2746 /* lock paged memory if needed */
|
|
2747 if (lock_memory) {
|
|
2748 #ifdef HAVE_MLOCKALL
|
|
2749 mlockall(MCL_CURRENT | MCL_FUTURE);
|
|
2750 #else
|
|
2751 fprintf(stderr, "warning: mlockall() not supported on this platform. proceeding without.\n");
|
|
2752 #endif
|
|
2753 }
|
|
2754
|
|
2755 /*
|
|
2756 * ignore SIGPIPE signals; we can use errno==EPIPE if we
|
|
2757 * need that information
|
|
2758 */
|
|
2759 sa.sa_handler = SIG_IGN;
|
|
2760 sa.sa_flags = 0;
|
|
2761 if (sigemptyset(&sa.sa_mask) == -1 ||
|
|
2762 sigaction(SIGPIPE, &sa, 0) == -1) {
|
|
2763 perror("failed to ignore SIGPIPE; sigaction");
|
|
2764 exit(EXIT_FAILURE);
|
|
2765 }
|
|
2766 /* create the initial listening connection */
|
|
2767 if (!(listen_conn = conn_new(l_socket, conn_listening,
|
|
2768 EV_READ | EV_PERSIST, 1, false, main_base))) {
|
|
2769 fprintf(stderr, "failed to create listening connection");
|
|
2770 exit(EXIT_FAILURE);
|
|
2771 }
|
|
2772 /* save the PID in if we're a daemon */
|
|
2773 if (daemonize)
|
|
2774 save_pid(getpid(), pid_file);
|
|
2775 /* start up worker threads if MT mode */
|
|
2776 thread_init(settings.num_threads, main_base);
|
|
2777 /* initialise clock event */
|
|
2778 clock_handler(0, 0, 0);
|
|
2779 /* initialise deletion array and timer event */
|
|
2780 deltotal = 200;
|
|
2781 delcurr = 0;
|
|
2782 todelete = malloc(sizeof(item *) * deltotal);
|
|
2783 delete_handler(0, 0, 0); /* sets up the event */
|
|
2784 /* create the initial listening udp connection, monitored on all threads */
|
|
2785 if (u_socket > -1) {
|
|
2786 for (c = 0; c < settings.num_threads; c++) {
|
|
2787 /* this is guaranteed to hit all threads because we round-robin */
|
|
2788 dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
|
|
2789 UDP_READ_BUFFER_SIZE, 1);
|
|
2790 }
|
|
2791 }
|
|
2792 /* enter the event loop */
|
|
2793 event_base_loop(main_base, 0);
|
|
2794 /* remove the PID file if we're a daemon */
|
|
2795 if (daemonize)
|
|
2796 remove_pidfile(pid_file);
|
|
2797 return 0;
|
|
2798 }
|