diff memcached.c @ 0:30782bb1fc04 MEMCACHED_1_2_3

memcached-1.2.3
author Maxim Dounin <mdounin@mdounin.ru>
date Sun, 23 Sep 2007 03:58:34 +0400
parents
children e28ab6bd21fa
line wrap: on
line diff
new file mode 100644
--- /dev/null
+++ b/memcached.c
@@ -0,0 +1,2752 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *  memcached - memory caching daemon
+ *
+ *       http://www.danga.com/memcached/
+ *
+ *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
+ *
+ *  Use and distribution licensed under the BSD license.  See
+ *  the LICENSE file for full text.
+ *
+ *  Authors:
+ *      Anatoly Vorobey <mellon@pobox.com>
+ *      Brad Fitzpatrick <brad@danga.com>
+std *
+ *  $Id: memcached.c 570 2007-06-20 01:21:45Z plindner $
+ */
+#include "memcached.h"
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/signal.h>
+#include <sys/resource.h>
+#include <sys/uio.h>
+
+/* some POSIX systems need the following definition
+ * to get mlockall flags out of sys/mman.h.  */
+#ifndef _P1003_1B_VISIBLE
+#define _P1003_1B_VISIBLE
+#endif
+/* need this to get IOV_MAX on some platforms. */
+#ifndef __need_IOV_MAX
+#define __need_IOV_MAX
+#endif
+#include <pwd.h>
+#include <sys/mman.h>
+#include <fcntl.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <time.h>
+#include <assert.h>
+#include <limits.h>
+
+#ifdef HAVE_MALLOC_H
+/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
+#ifndef __OpenBSD__
+#include <malloc.h>
+#endif
+#endif
+
+/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
+#ifndef IOV_MAX
+#if defined(__FreeBSD__)
+# define IOV_MAX 1024
+#endif
+#endif
+
+/*
+ * forward declarations
+ */
+static void drive_machine(conn *c);
+static int new_socket(const bool is_udp);
+static int server_socket(const int port, const bool is_udp);
+static int try_read_command(conn *c);
+static int try_read_network(conn *c);
+static int try_read_udp(conn *c);
+
+/* stats */
+static void stats_reset(void);
+static void stats_init(void);
+
+/* defaults */
+static void settings_init(void);
+
+/* event handling, network IO */
+static void event_handler(const int fd, const short which, void *arg);
+static void conn_close(conn *c);
+static void conn_init(void);
+static void accept_new_conns(const bool do_accept);
+static bool update_event(conn *c, const int new_flags);
+static void complete_nread(conn *c);
+static void process_command(conn *c, char *command);
+static int transmit(conn *c);
+static int ensure_iov_space(conn *c);
+static int add_iov(conn *c, const void *buf, int len);
+static int add_msghdr(conn *c);
+
+
+/* time handling */
+static void set_current_time(void);  /* update the global variable holding
+                              global 32-bit seconds-since-start time
+                              (to avoid 64 bit time_t) */
+
+void pre_gdb(void);
+static void conn_free(conn *c);
+
+/** exported globals **/
+struct stats stats;
+struct settings settings;
+
+/** file scope variables **/
+static item **todelete = 0;
+static int delcurr;
+static int deltotal;
+static conn *listen_conn;
+static struct event_base *main_base;
+
+#define TRANSMIT_COMPLETE   0
+#define TRANSMIT_INCOMPLETE 1
+#define TRANSMIT_SOFT_ERROR 2
+#define TRANSMIT_HARD_ERROR 3
+
+static int *buckets = 0; /* bucket->generation array for a managed instance */
+
+#define REALTIME_MAXDELTA 60*60*24*30
+/*
+ * given time value that's either unix time or delta from current unix time, return
+ * unix time. Use the fact that delta can't exceed one month (and real time value can't
+ * be that low).
+ */
+static rel_time_t realtime(const time_t exptime) {
+    /* no. of seconds in 30 days - largest possible delta exptime */
+
+    if (exptime == 0) return 0; /* 0 means never expire */
+
+    if (exptime > REALTIME_MAXDELTA) {
+        /* if item expiration is at/before the server started, give it an
+           expiration time of 1 second after the server started.
+           (because 0 means don't expire).  without this, we'd
+           underflow and wrap around to some large value way in the
+           future, effectively making items expiring in the past
+           really expiring never */
+        if (exptime <= stats.started)
+            return (rel_time_t)1;
+        return (rel_time_t)(exptime - stats.started);
+    } else {
+        return (rel_time_t)(exptime + current_time);
+    }
+}
+
+static void stats_init(void) {
+    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
+    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
+    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
+
+    /* make the time we started always be 2 seconds before we really
+       did, so time(0) - time.started is never zero.  if so, things
+       like 'settings.oldest_live' which act as booleans as well as
+       values are now false in boolean context... */
+    stats.started = time(0) - 2;
+    stats_prefix_init();
+}
+
+static void stats_reset(void) {
+    STATS_LOCK();
+    stats.total_items = stats.total_conns = 0;
+    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
+    stats.bytes_read = stats.bytes_written = 0;
+    stats_prefix_clear();
+    STATS_UNLOCK();
+}
+
+static void settings_init(void) {
+    settings.port = 11211;
+    settings.udpport = 0;
+    settings.interf.s_addr = htonl(INADDR_ANY);
+    settings.maxbytes = 67108864; /* default is 64MB: (64 * 1024 * 1024) */
+    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
+    settings.verbose = 0;
+    settings.oldest_live = 0;
+    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
+    settings.socketpath = NULL;       /* by default, not using a unix socket */
+    settings.managed = false;
+    settings.factor = 1.25;
+    settings.chunk_size = 48;         /* space for a modest key and value */
+#ifdef USE_THREADS
+    settings.num_threads = 4;
+#else
+    settings.num_threads = 1;
+#endif
+    settings.prefix_delimiter = ':';
+    settings.detail_enabled = 0;
+}
+
+/* returns true if a deleted item's delete-locked-time is over, and it
+   should be removed from the namespace */
+static bool item_delete_lock_over (item *it) {
+    assert(it->it_flags & ITEM_DELETED);
+    return (current_time >= it->exptime);
+}
+
+/*
+ * Adds a message header to a connection.
+ *
+ * Returns 0 on success, -1 on out-of-memory.
+ */
+static int add_msghdr(conn *c)
+{
+    struct msghdr *msg;
+
+    assert(c != NULL);
+
+    if (c->msgsize == c->msgused) {
+        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
+        if (! msg)
+            return -1;
+        c->msglist = msg;
+        c->msgsize *= 2;
+    }
+
+    msg = c->msglist + c->msgused;
+
+    /* this wipes msg_iovlen, msg_control, msg_controllen, and
+       msg_flags, the last 3 of which aren't defined on solaris: */
+    memset(msg, 0, sizeof(struct msghdr));
+
+    msg->msg_iov = &c->iov[c->iovused];
+    msg->msg_name = &c->request_addr;
+    msg->msg_namelen = c->request_addr_size;
+
+    c->msgbytes = 0;
+    c->msgused++;
+
+    if (c->udp) {
+        /* Leave room for the UDP header, which we'll fill in later. */
+        return add_iov(c, NULL, UDP_HEADER_SIZE);
+    }
+
+    return 0;
+}
+
+
+/*
+ * Free list management for connections.
+ */
+
+static conn **freeconns;
+static int freetotal;
+static int freecurr;
+
+
+static void conn_init(void) {
+    freetotal = 200;
+    freecurr = 0;
+    if (!(freeconns = (conn **)malloc(sizeof(conn *) * freetotal))) {
+        perror("malloc()");
+    }
+    return;
+}
+
+/*
+ * Returns a connection from the freelist, if any. Should call this using
+ * conn_from_freelist() for thread safety.
+ */
+conn *do_conn_from_freelist() {
+    conn *c;
+
+    if (freecurr > 0) {
+        c = freeconns[--freecurr];
+    } else {
+        c = NULL;
+    }
+
+    return c;
+}
+
+/*
+ * Adds a connection to the freelist. 0 = success. Should call this using
+ * conn_add_to_freelist() for thread safety.
+ */
+int do_conn_add_to_freelist(conn *c) {
+    if (freecurr < freetotal) {
+        freeconns[freecurr++] = c;
+        return 0;
+    } else {
+        /* try to enlarge free connections array */
+        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
+        if (new_freeconns) {
+            freetotal *= 2;
+            freeconns = new_freeconns;
+            freeconns[freecurr++] = c;
+            return 0;
+        }
+    }
+    return 1;
+}
+
+conn *conn_new(const int sfd, const int init_state, const int event_flags,
+                const int read_buffer_size, const bool is_udp, struct event_base *base) {
+    conn *c = conn_from_freelist();
+
+    if (NULL == c) {
+        if (!(c = (conn *)malloc(sizeof(conn)))) {
+            perror("malloc()");
+            return NULL;
+        }
+        c->rbuf = c->wbuf = 0;
+        c->ilist = 0;
+        c->iov = 0;
+        c->msglist = 0;
+        c->hdrbuf = 0;
+
+        c->rsize = read_buffer_size;
+        c->wsize = DATA_BUFFER_SIZE;
+        c->isize = ITEM_LIST_INITIAL;
+        c->iovsize = IOV_LIST_INITIAL;
+        c->msgsize = MSG_LIST_INITIAL;
+        c->hdrsize = 0;
+
+        c->rbuf = (char *)malloc((size_t)c->rsize);
+        c->wbuf = (char *)malloc((size_t)c->wsize);
+        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
+        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
+        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
+
+        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
+                c->msglist == 0) {
+            if (c->rbuf != 0) free(c->rbuf);
+            if (c->wbuf != 0) free(c->wbuf);
+            if (c->ilist !=0) free(c->ilist);
+            if (c->iov != 0) free(c->iov);
+            if (c->msglist != 0) free(c->msglist);
+            free(c);
+            perror("malloc()");
+            return NULL;
+        }
+
+        STATS_LOCK();
+        stats.conn_structs++;
+        STATS_UNLOCK();
+    }
+
+    if (settings.verbose > 1) {
+        if (init_state == conn_listening)
+            fprintf(stderr, "<%d server listening\n", sfd);
+        else if (is_udp)
+            fprintf(stderr, "<%d server listening (udp)\n", sfd);
+        else
+            fprintf(stderr, "<%d new client connection\n", sfd);
+    }
+
+    c->sfd = sfd;
+    c->udp = is_udp;
+    c->state = init_state;
+    c->rlbytes = 0;
+    c->rbytes = c->wbytes = 0;
+    c->wcurr = c->wbuf;
+    c->rcurr = c->rbuf;
+    c->ritem = 0;
+    c->icurr = c->ilist;
+    c->ileft = 0;
+    c->iovused = 0;
+    c->msgcurr = 0;
+    c->msgused = 0;
+
+    c->write_and_go = conn_read;
+    c->write_and_free = 0;
+    c->item = 0;
+    c->bucket = -1;
+    c->gen = 0;
+
+    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
+    event_base_set(base, &c->event);
+    c->ev_flags = event_flags;
+
+    if (event_add(&c->event, 0) == -1) {
+        if (conn_add_to_freelist(c)) {
+            conn_free(c);
+        }
+        return NULL;
+    }
+
+    STATS_LOCK();
+    stats.curr_conns++;
+    stats.total_conns++;
+    STATS_UNLOCK();
+
+    return c;
+}
+
+static void conn_cleanup(conn *c) {
+    assert(c != NULL);
+
+    if (c->item) {
+        item_remove(c->item);
+        c->item = 0;
+    }
+
+    if (c->ileft != 0) {
+        for (; c->ileft > 0; c->ileft--,c->icurr++) {
+            item_remove(*(c->icurr));
+        }
+    }
+
+    if (c->write_and_free) {
+        free(c->write_and_free);
+        c->write_and_free = 0;
+    }
+}
+
+/*
+ * Frees a connection.
+ */
+void conn_free(conn *c) {
+    if (c) {
+        if (c->hdrbuf)
+            free(c->hdrbuf);
+        if (c->msglist)
+            free(c->msglist);
+        if (c->rbuf)
+            free(c->rbuf);
+        if (c->wbuf)
+            free(c->wbuf);
+        if (c->ilist)
+            free(c->ilist);
+        if (c->iov)
+            free(c->iov);
+        free(c);
+    }
+}
+
+static void conn_close(conn *c) {
+    assert(c != NULL);
+
+    /* delete the event, the socket and the conn */
+    event_del(&c->event);
+
+    if (settings.verbose > 1)
+        fprintf(stderr, "<%d connection closed.\n", c->sfd);
+
+    close(c->sfd);
+    accept_new_conns(true);
+    conn_cleanup(c);
+
+    /* if the connection has big buffers, just free it */
+    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
+        conn_free(c);
+    }
+
+    STATS_LOCK();
+    stats.curr_conns--;
+    STATS_UNLOCK();
+
+    return;
+}
+
+
+/*
+ * Shrinks a connection's buffers if they're too big.  This prevents
+ * periodic large "get" requests from permanently chewing lots of server
+ * memory.
+ *
+ * This should only be called in between requests since it can wipe output
+ * buffers!
+ */
+static void conn_shrink(conn *c) {
+    assert(c != NULL);
+
+    if (c->udp)
+        return;
+
+    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
+        char *newbuf;
+
+        if (c->rcurr != c->rbuf)
+            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
+
+        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
+
+        if (newbuf) {
+            c->rbuf = newbuf;
+            c->rsize = DATA_BUFFER_SIZE;
+        }
+        /* TODO check other branch... */
+        c->rcurr = c->rbuf;
+    }
+
+    if (c->isize > ITEM_LIST_HIGHWAT) {
+        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
+        if (newbuf) {
+            c->ilist = newbuf;
+            c->isize = ITEM_LIST_INITIAL;
+        }
+    /* TODO check error condition? */
+    }
+
+    if (c->msgsize > MSG_LIST_HIGHWAT) {
+        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
+        if (newbuf) {
+            c->msglist = newbuf;
+            c->msgsize = MSG_LIST_INITIAL;
+        }
+    /* TODO check error condition? */
+    }
+
+    if (c->iovsize > IOV_LIST_HIGHWAT) {
+        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
+        if (newbuf) {
+            c->iov = newbuf;
+            c->iovsize = IOV_LIST_INITIAL;
+        }
+    /* TODO check return value */
+    }
+}
+
+/*
+ * Sets a connection's current state in the state machine. Any special
+ * processing that needs to happen on certain state transitions can
+ * happen here.
+ */
+static void conn_set_state(conn *c, int state) {
+    assert(c != NULL);
+
+    if (state != c->state) {
+        if (state == conn_read) {
+            conn_shrink(c);
+            assoc_move_next_bucket();
+        }
+        c->state = state;
+    }
+}
+
+
+/*
+ * Ensures that there is room for another struct iovec in a connection's
+ * iov list.
+ *
+ * Returns 0 on success, -1 on out-of-memory.
+ */
+static int ensure_iov_space(conn *c) {
+    assert(c != NULL);
+
+    if (c->iovused >= c->iovsize) {
+        int i, iovnum;
+        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
+                                (c->iovsize * 2) * sizeof(struct iovec));
+        if (! new_iov)
+            return -1;
+        c->iov = new_iov;
+        c->iovsize *= 2;
+
+        /* Point all the msghdr structures at the new list. */
+        for (i = 0, iovnum = 0; i < c->msgused; i++) {
+            c->msglist[i].msg_iov = &c->iov[iovnum];
+            iovnum += c->msglist[i].msg_iovlen;
+        }
+    }
+
+    return 0;
+}
+
+
+/*
+ * Adds data to the list of pending data that will be written out to a
+ * connection.
+ *
+ * Returns 0 on success, -1 on out-of-memory.
+ */
+
+static int add_iov(conn *c, const void *buf, int len) {
+    struct msghdr *m;
+    int leftover;
+    bool limit_to_mtu;
+
+    assert(c != NULL);
+
+    do {
+        m = &c->msglist[c->msgused - 1];
+
+        /*
+         * Limit UDP packets, and the first payloads of TCP replies, to
+         * UDP_MAX_PAYLOAD_SIZE bytes.
+         */
+        limit_to_mtu = c->udp || (1 == c->msgused);
+
+        /* We may need to start a new msghdr if this one is full. */
+        if (m->msg_iovlen == IOV_MAX ||
+            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
+            add_msghdr(c);
+            m = &c->msglist[c->msgused - 1];
+        }
+
+        if (ensure_iov_space(c) != 0)
+            return -1;
+
+        /* If the fragment is too big to fit in the datagram, split it up */
+        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
+            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
+            len -= leftover;
+        } else {
+            leftover = 0;
+        }
+
+        m = &c->msglist[c->msgused - 1];
+        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
+        m->msg_iov[m->msg_iovlen].iov_len = len;
+
+        c->msgbytes += len;
+        c->iovused++;
+        m->msg_iovlen++;
+
+        buf = ((char *)buf) + len;
+        len = leftover;
+    } while (leftover > 0);
+
+    return 0;
+}
+
+
+/*
+ * Constructs a set of UDP headers and attaches them to the outgoing messages.
+ */
+static int build_udp_headers(conn *c) {
+    int i;
+    unsigned char *hdr;
+
+    assert(c != NULL);
+
+    if (c->msgused > c->hdrsize) {
+        void *new_hdrbuf;
+        if (c->hdrbuf)
+            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
+        else
+            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
+        if (! new_hdrbuf)
+            return -1;
+        c->hdrbuf = (unsigned char *)new_hdrbuf;
+        c->hdrsize = c->msgused * 2;
+    }
+
+    hdr = c->hdrbuf;
+    for (i = 0; i < c->msgused; i++) {
+        c->msglist[i].msg_iov[0].iov_base = hdr;
+        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
+        *hdr++ = c->request_id / 256;
+        *hdr++ = c->request_id % 256;
+        *hdr++ = i / 256;
+        *hdr++ = i % 256;
+        *hdr++ = c->msgused / 256;
+        *hdr++ = c->msgused % 256;
+        *hdr++ = 0;
+        *hdr++ = 0;
+        assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
+    }
+
+    return 0;
+}
+
+
+static void out_string(conn *c, const char *str) {
+    size_t len;
+
+    assert(c != NULL);
+
+    if (settings.verbose > 1)
+        fprintf(stderr, ">%d %s\n", c->sfd, str);
+
+    len = strlen(str);
+    if ((len + 2) > c->wsize) {
+        /* ought to be always enough. just fail for simplicity */
+        str = "SERVER_ERROR output line too long";
+        len = strlen(str);
+    }
+
+    memcpy(c->wbuf, str, len);
+    memcpy(c->wbuf + len, "\r\n", 3);
+    c->wbytes = len + 2;
+    c->wcurr = c->wbuf;
+
+    conn_set_state(c, conn_write);
+    c->write_and_go = conn_read;
+    return;
+}
+
+/*
+ * we get here after reading the value in set/add/replace commands. The command
+ * has been stored in c->item_comm, and the item is ready in c->item.
+ */
+
+static void complete_nread(conn *c) {
+    assert(c != NULL);
+
+    item *it = c->item;
+    int comm = c->item_comm;
+
+    STATS_LOCK();
+    stats.set_cmds++;
+    STATS_UNLOCK();
+
+    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
+        out_string(c, "CLIENT_ERROR bad data chunk");
+    } else {
+        if (store_item(it, comm)) {
+            out_string(c, "STORED");
+        } else {
+            out_string(c, "NOT_STORED");
+        }
+    }
+
+    item_remove(c->item);       /* release the c->item reference */
+    c->item = 0;
+}
+
+/*
+ * Stores an item in the cache according to the semantics of one of the set
+ * commands. In threaded mode, this is protected by the cache lock.
+ *
+ * Returns true if the item was stored.
+ */
+int do_store_item(item *it, int comm) {
+    char *key = ITEM_key(it);
+    bool delete_locked = false;
+    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
+    int stored = 0;
+
+    if (old_it != NULL && comm == NREAD_ADD) {
+        /* add only adds a nonexistent item, but promote to head of LRU */
+        do_item_update(old_it);
+    } else if (!old_it && comm == NREAD_REPLACE) {
+        /* replace only replaces an existing value; don't store */
+    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD)) {
+        /* replace and add can't override delete locks; don't store */
+    } else {
+        /* "set" commands can override the delete lock
+           window... in which case we have to find the old hidden item
+           that's in the namespace/LRU but wasn't returned by
+           item_get.... because we need to replace it */
+        if (delete_locked)
+            old_it = do_item_get_nocheck(key, it->nkey);
+
+        if (old_it != NULL)
+            do_item_replace(old_it, it);
+        else
+            do_item_link(it);
+
+        stored = 1;
+    }
+
+    if (old_it)
+        do_item_remove(old_it);         /* release our reference */
+    return stored;
+}
+
+typedef struct token_s {
+    char *value;
+    size_t length;
+} token_t;
+
+#define COMMAND_TOKEN 0
+#define SUBCOMMAND_TOKEN 1
+#define KEY_TOKEN 1
+#define KEY_MAX_LENGTH 250
+
+#define MAX_TOKENS 6
+
+/*
+ * Tokenize the command string by replacing whitespace with '\0' and update
+ * the token array tokens with pointer to start of each token and length.
+ * Returns total number of tokens.  The last valid token is the terminal
+ * token (value points to the first unprocessed character of the string and
+ * length zero).
+ *
+ * Usage example:
+ *
+ *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
+ *      for(int ix = 0; tokens[ix].length != 0; ix++) {
+ *          ...
+ *      }
+ *      ncommand = tokens[ix].value - command;
+ *      command  = tokens[ix].value;
+ *   }
+ */
+static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
+    char *s, *e;
+    size_t ntokens = 0;
+
+    assert(command != NULL && tokens != NULL && max_tokens > 1);
+
+    for (s = e = command; ntokens < max_tokens - 1; ++e) {
+        if (*e == ' ') {
+            if (s != e) {
+                tokens[ntokens].value = s;
+                tokens[ntokens].length = e - s;
+                ntokens++;
+                *e = '\0';
+            }
+            s = e + 1;
+        }
+        else if (*e == '\0') {
+            if (s != e) {
+                tokens[ntokens].value = s;
+                tokens[ntokens].length = e - s;
+                ntokens++;
+            }
+
+            break; /* string end */
+        }
+    }
+
+    /*
+     * If we scanned the whole string, the terminal value pointer is null,
+     * otherwise it is the first unprocessed character.
+     */
+    tokens[ntokens].value =  *e == '\0' ? NULL : e;
+    tokens[ntokens].length = 0;
+    ntokens++;
+
+    return ntokens;
+}
+
+inline static void process_stats_detail(conn *c, const char *command) {
+    assert(c != NULL);
+
+    if (strcmp(command, "on") == 0) {
+        settings.detail_enabled = 1;
+        out_string(c, "OK");
+    }
+    else if (strcmp(command, "off") == 0) {
+        settings.detail_enabled = 0;
+        out_string(c, "OK");
+    }
+    else if (strcmp(command, "dump") == 0) {
+        int len;
+        char *stats = stats_prefix_dump(&len);
+        if (NULL != stats) {
+            c->write_and_free = stats;
+            c->wcurr = stats;
+            c->wbytes = len;
+            conn_set_state(c, conn_write);
+            c->write_and_go = conn_read;
+        }
+        else {
+            out_string(c, "SERVER_ERROR");
+        }
+    }
+    else {
+        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
+    }
+}
+
+static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
+    rel_time_t now = current_time;
+    char *command;
+    char *subcommand;
+
+    assert(c != NULL);
+
+    if(ntokens < 2) {
+        out_string(c, "CLIENT_ERROR bad command line");
+        return;
+    }
+
+    command = tokens[COMMAND_TOKEN].value;
+
+    if (ntokens == 2 && strcmp(command, "stats") == 0) {
+        char temp[1024];
+        pid_t pid = getpid();
+        char *pos = temp;
+
+#ifndef WIN32
+        struct rusage usage;
+        getrusage(RUSAGE_SELF, &usage);
+#endif /* !WIN32 */
+
+        STATS_LOCK();
+        pos += sprintf(pos, "STAT pid %u\r\n", pid);
+        pos += sprintf(pos, "STAT uptime %u\r\n", now);
+        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
+        pos += sprintf(pos, "STAT version " VERSION "\r\n");
+        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
+#ifndef WIN32
+        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
+        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
+#endif /* !WIN32 */
+        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
+        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
+        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
+        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
+        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
+        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
+        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
+        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
+        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
+        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
+        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
+        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
+        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
+        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
+        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
+        pos += sprintf(pos, "END");
+        STATS_UNLOCK();
+        out_string(c, temp);
+        return;
+    }
+
+    subcommand = tokens[SUBCOMMAND_TOKEN].value;
+
+    if (strcmp(subcommand, "reset") == 0) {
+        stats_reset();
+        out_string(c, "RESET");
+        return;
+    }
+
+#ifdef HAVE_MALLOC_H
+#ifdef HAVE_STRUCT_MALLINFO
+    if (strcmp(subcommand, "malloc") == 0) {
+        char temp[512];
+        struct mallinfo info;
+        char *pos = temp;
+
+        info = mallinfo();
+        pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
+        pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
+        pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
+        pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
+        pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
+        pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
+        pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
+        pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
+        pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
+        pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
+        out_string(c, temp);
+        return;
+    }
+#endif /* HAVE_STRUCT_MALLINFO */
+#endif /* HAVE_MALLOC_H */
+
+#if !defined(WIN32) || !defined(__APPLE__)
+    if (strcmp(subcommand, "maps") == 0) {
+        char *wbuf;
+        int wsize = 8192; /* should be enough */
+        int fd;
+        int res;
+
+        if (!(wbuf = (char *)malloc(wsize))) {
+            out_string(c, "SERVER_ERROR out of memory");
+            return;
+        }
+
+        fd = open("/proc/self/maps", O_RDONLY);
+        if (fd == -1) {
+            out_string(c, "SERVER_ERROR cannot open the maps file");
+            free(wbuf);
+            return;
+        }
+
+        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
+        if (res == wsize - 6) {
+            out_string(c, "SERVER_ERROR buffer overflow");
+            free(wbuf); close(fd);
+            return;
+        }
+        if (res == 0 || res == -1) {
+            out_string(c, "SERVER_ERROR can't read the maps file");
+            free(wbuf); close(fd);
+            return;
+        }
+        memcpy(wbuf + res, "END\r\n", 6);
+        c->write_and_free = wbuf;
+        c->wcurr = wbuf;
+        c->wbytes = res + 5; // Don't write the terminal '\0'
+        conn_set_state(c, conn_write);
+        c->write_and_go = conn_read;
+        close(fd);
+        return;
+    }
+#endif
+
+    if (strcmp(subcommand, "cachedump") == 0) {
+
+        char *buf;
+        unsigned int bytes, id, limit = 0;
+
+        if(ntokens < 5) {
+            out_string(c, "CLIENT_ERROR bad command line");
+            return;
+        }
+
+        id = strtoul(tokens[2].value, NULL, 10);
+        limit = strtoul(tokens[3].value, NULL, 10);
+
+        if(errno == ERANGE) {
+            out_string(c, "CLIENT_ERROR bad command line format");
+            return;
+        }
+
+        buf = item_cachedump(id, limit, &bytes);
+        if (buf == 0) {
+            out_string(c, "SERVER_ERROR out of memory");
+            return;
+        }
+
+        c->write_and_free = buf;
+        c->wcurr = buf;
+        c->wbytes = bytes;
+        conn_set_state(c, conn_write);
+        c->write_and_go = conn_read;
+        return;
+    }
+
+    if (strcmp(subcommand, "slabs") == 0) {
+        int bytes = 0;
+        char *buf = slabs_stats(&bytes);
+        if (!buf) {
+            out_string(c, "SERVER_ERROR out of memory");
+            return;
+        }
+        c->write_and_free = buf;
+        c->wcurr = buf;
+        c->wbytes = bytes;
+        conn_set_state(c, conn_write);
+        c->write_and_go = conn_read;
+        return;
+    }
+
+    if (strcmp(subcommand, "items") == 0) {
+        char buffer[4096];
+        item_stats(buffer, 4096);
+        out_string(c, buffer);
+        return;
+    }
+
+    if (strcmp(subcommand, "detail") == 0) {
+        if (ntokens < 4)
+            process_stats_detail(c, "");  /* outputs the error message */
+        else
+            process_stats_detail(c, tokens[2].value);
+        return;
+    }
+
+    if (strcmp(subcommand, "sizes") == 0) {
+        int bytes = 0;
+        char *buf = item_stats_sizes(&bytes);
+        if (! buf) {
+            out_string(c, "SERVER_ERROR out of memory");
+            return;
+        }
+
+        c->write_and_free = buf;
+        c->wcurr = buf;
+        c->wbytes = bytes;
+        conn_set_state(c, conn_write);
+        c->write_and_go = conn_read;
+        return;
+    }
+
+    out_string(c, "ERROR");
+}
+
+/* ntokens is overwritten here... shrug.. */
+static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens) {
+    char *key;
+    size_t nkey;
+    int i = 0;
+    item *it;
+    token_t *key_token = &tokens[KEY_TOKEN];
+
+    assert(c != NULL);
+
+    if (settings.managed) {
+        int bucket = c->bucket;
+        if (bucket == -1) {
+            out_string(c, "CLIENT_ERROR no BG data in managed mode");
+            return;
+        }
+        c->bucket = -1;
+        if (buckets[bucket] != c->gen) {
+            out_string(c, "ERROR_NOT_OWNER");
+            return;
+        }
+    }
+
+    do {
+        while(key_token->length != 0) {
+
+            key = key_token->value;
+            nkey = key_token->length;
+
+            if(nkey > KEY_MAX_LENGTH) {
+                out_string(c, "CLIENT_ERROR bad command line format");
+                return;
+            }
+
+            STATS_LOCK();
+            stats.get_cmds++;
+            STATS_UNLOCK();
+            it = item_get(key, nkey);
+            if (settings.detail_enabled) {
+                stats_prefix_record_get(key, NULL != it);
+            }
+            if (it) {
+                if (i >= c->isize) {
+                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
+                    if (new_list) {
+                        c->isize *= 2;
+                        c->ilist = new_list;
+                    } else break;
+                }
+
+                /*
+                 * Construct the response. Each hit adds three elements to the
+                 * outgoing data list:
+                 *   "VALUE "
+                 *   key
+                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
+                 */
+                if (add_iov(c, "VALUE ", 6) != 0 ||
+                    add_iov(c, ITEM_key(it), it->nkey) != 0 ||
+                    add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
+                    {
+                        break;
+                    }
+                if (settings.verbose > 1)
+                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
+
+                /* item_get() has incremented it->refcount for us */
+                STATS_LOCK();
+                stats.get_hits++;
+                STATS_UNLOCK();
+                item_update(it);
+                *(c->ilist + i) = it;
+                i++;
+
+            } else {
+                STATS_LOCK();
+                stats.get_misses++;
+                STATS_UNLOCK();
+            }
+
+            key_token++;
+        }
+
+        /*
+         * If the command string hasn't been fully processed, get the next set
+         * of tokens.
+         */
+        if(key_token->value != NULL) {
+            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
+            key_token = tokens;
+        }
+
+    } while(key_token->value != NULL);
+
+    c->icurr = c->ilist;
+    c->ileft = i;
+
+    if (settings.verbose > 1)
+        fprintf(stderr, ">%d END\n", c->sfd);
+    add_iov(c, "END\r\n", 5);
+
+    if (c->udp && build_udp_headers(c) != 0) {
+        out_string(c, "SERVER_ERROR out of memory");
+    }
+    else {
+        conn_set_state(c, conn_mwrite);
+        c->msgcurr = 0;
+    }
+    return;
+}
+
+static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm) {
+    char *key;
+    size_t nkey;
+    int flags;
+    time_t exptime;
+    int vlen;
+    item *it;
+
+    assert(c != NULL);
+
+    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
+        out_string(c, "CLIENT_ERROR bad command line format");
+        return;
+    }
+
+    key = tokens[KEY_TOKEN].value;
+    nkey = tokens[KEY_TOKEN].length;
+
+    flags = strtoul(tokens[2].value, NULL, 10);
+    exptime = strtol(tokens[3].value, NULL, 10);
+    vlen = strtol(tokens[4].value, NULL, 10);
+
+    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
+        out_string(c, "CLIENT_ERROR bad command line format");
+        return;
+    }
+
+    if (settings.detail_enabled) {
+        stats_prefix_record_set(key);
+    }
+
+    if (settings.managed) {
+        int bucket = c->bucket;
+        if (bucket == -1) {
+            out_string(c, "CLIENT_ERROR no BG data in managed mode");
+            return;
+        }
+        c->bucket = -1;
+        if (buckets[bucket] != c->gen) {
+            out_string(c, "ERROR_NOT_OWNER");
+            return;
+        }
+    }
+
+    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
+
+    if (it == 0) {
+        if (! item_size_ok(nkey, flags, vlen + 2))
+            out_string(c, "SERVER_ERROR object too large for cache");
+        else
+            out_string(c, "SERVER_ERROR out of memory");
+        /* swallow the data line */
+        c->write_and_go = conn_swallow;
+        c->sbytes = vlen + 2;
+        return;
+    }
+
+    c->item_comm = comm;
+    c->item = it;
+    c->ritem = ITEM_data(it);
+    c->rlbytes = it->nbytes;
+    conn_set_state(c, conn_nread);
+}
+
+static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const int incr) {
+    char temp[32];
+    item *it;
+    unsigned int delta;
+    char *key;
+    size_t nkey;
+
+    assert(c != NULL);
+
+    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
+        out_string(c, "CLIENT_ERROR bad command line format");
+        return;
+    }
+
+    key = tokens[KEY_TOKEN].value;
+    nkey = tokens[KEY_TOKEN].length;
+
+    if (settings.managed) {
+        int bucket = c->bucket;
+        if (bucket == -1) {
+            out_string(c, "CLIENT_ERROR no BG data in managed mode");
+            return;
+        }
+        c->bucket = -1;
+        if (buckets[bucket] != c->gen) {
+            out_string(c, "ERROR_NOT_OWNER");
+            return;
+        }
+    }
+
+    delta = strtoul(tokens[2].value, NULL, 10);
+
+    if(errno == ERANGE) {
+        out_string(c, "CLIENT_ERROR bad command line format");
+        return;
+    }
+
+    it = item_get(key, nkey);
+    if (!it) {
+        out_string(c, "NOT_FOUND");
+        return;
+    }
+
+    out_string(c, add_delta(it, incr, delta, temp));
+    item_remove(it);         /* release our reference */
+}
+
+/*
+ * adds a delta value to a numeric item.
+ *
+ * it    item to adjust
+ * incr  true to increment value, false to decrement
+ * delta amount to adjust value by
+ * buf   buffer for response string
+ *
+ * returns a response string to send back to the client.
+ */
+char *do_add_delta(item *it, const int incr, unsigned int delta, char *buf) {
+    char *ptr;
+    unsigned int value;
+    int res;
+
+    ptr = ITEM_data(it);
+    while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++;    // BUG: can't be true
+
+    value = strtol(ptr, NULL, 10);
+
+    if(errno == ERANGE) {
+        return "CLIENT_ERROR cannot increment or decrement non-numeric value";
+    }
+
+    if (incr != 0)
+        value += delta;
+    else {
+        if (delta >= value) value = 0;
+        else value -= delta;
+    }
+    snprintf(buf, 32, "%u", value);
+    res = strlen(buf);
+    if (res + 2 > it->nbytes) { /* need to realloc */
+        item *new_it;
+        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
+        if (new_it == 0) {
+            return "SERVER_ERROR out of memory";
+        }
+        memcpy(ITEM_data(new_it), buf, res);
+        memcpy(ITEM_data(new_it) + res, "\r\n", 3);
+        do_item_replace(it, new_it);
+        do_item_remove(new_it);       /* release our reference */
+    } else { /* replace in-place */
+        memcpy(ITEM_data(it), buf, res);
+        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
+    }
+
+    return buf;
+}
+
+static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
+    char *key;
+    size_t nkey;
+    item *it;
+    time_t exptime = 0;
+
+    assert(c != NULL);
+
+    if (settings.managed) {
+        int bucket = c->bucket;
+        if (bucket == -1) {
+            out_string(c, "CLIENT_ERROR no BG data in managed mode");
+            return;
+        }
+        c->bucket = -1;
+        if (buckets[bucket] != c->gen) {
+            out_string(c, "ERROR_NOT_OWNER");
+            return;
+        }
+    }
+
+    key = tokens[KEY_TOKEN].value;
+    nkey = tokens[KEY_TOKEN].length;
+
+    if(nkey > KEY_MAX_LENGTH) {
+        out_string(c, "CLIENT_ERROR bad command line format");
+        return;
+    }
+
+    if(ntokens == 4) {
+        exptime = strtol(tokens[2].value, NULL, 10);
+
+        if(errno == ERANGE) {
+            out_string(c, "CLIENT_ERROR bad command line format");
+            return;
+        }
+    }
+
+    if (settings.detail_enabled) {
+        stats_prefix_record_delete(key);
+    }
+
+    it = item_get(key, nkey);
+    if (it) {
+        if (exptime == 0) {
+            item_unlink(it);
+            item_remove(it);      /* release our reference */
+            out_string(c, "DELETED");
+        } else {
+            /* our reference will be transfered to the delete queue */
+            out_string(c, defer_delete(it, exptime));
+        }
+    } else {
+        out_string(c, "NOT_FOUND");
+    }
+}
+
+/*
+ * Adds an item to the deferred-delete list so it can be reaped later.
+ *
+ * Returns the result to send to the client.
+ */
+char *do_defer_delete(item *it, time_t exptime)
+{
+    if (delcurr >= deltotal) {
+        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
+        if (new_delete) {
+            todelete = new_delete;
+            deltotal *= 2;
+        } else {
+            /*
+             * can't delete it immediately, user wants a delay,
+             * but we ran out of memory for the delete queue
+             */
+            item_remove(it);    /* release reference */
+            return "SERVER_ERROR out of memory";
+        }
+    }
+
+    /* use its expiration time as its deletion time now */
+    it->exptime = realtime(exptime);
+    it->it_flags |= ITEM_DELETED;
+    todelete[delcurr++] = it;
+
+    return "DELETED";
+}
+
+static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
+    unsigned int level;
+
+    assert(c != NULL);
+
+    level = strtoul(tokens[1].value, NULL, 10);
+    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
+    out_string(c, "OK");
+    return;
+}
+
+static void process_command(conn *c, char *command) {
+
+    token_t tokens[MAX_TOKENS];
+    size_t ntokens;
+    int comm;
+
+    assert(c != NULL);
+
+    if (settings.verbose > 1)
+        fprintf(stderr, "<%d %s\n", c->sfd, command);
+
+    /*
+     * for commands set/add/replace, we build an item and read the data
+     * directly into it, then continue in nread_complete().
+     */
+
+    c->msgcurr = 0;
+    c->msgused = 0;
+    c->iovused = 0;
+    if (add_msghdr(c) != 0) {
+        out_string(c, "SERVER_ERROR out of memory");
+        return;
+    }
+
+    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
+
+    if (ntokens >= 3 &&
+        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
+         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
+
+        process_get_command(c, tokens, ntokens);
+
+    } else if (ntokens == 6 &&
+               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
+                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
+                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)))) {
+
+        process_update_command(c, tokens, ntokens, comm);
+
+    } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
+
+        process_arithmetic_command(c, tokens, ntokens, 1);
+
+    } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
+
+        process_arithmetic_command(c, tokens, ntokens, 0);
+
+    } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
+
+        process_delete_command(c, tokens, ntokens);
+
+    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
+        unsigned int bucket, gen;
+        if (!settings.managed) {
+            out_string(c, "CLIENT_ERROR not a managed instance");
+            return;
+        }
+
+        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
+            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
+                out_string(c, "CLIENT_ERROR bucket number out of range");
+                return;
+            }
+            buckets[bucket] = gen;
+            out_string(c, "OWNED");
+            return;
+        } else {
+            out_string(c, "CLIENT_ERROR bad format");
+            return;
+        }
+
+    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
+
+        int bucket;
+        if (!settings.managed) {
+            out_string(c, "CLIENT_ERROR not a managed instance");
+            return;
+        }
+        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
+            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
+                out_string(c, "CLIENT_ERROR bucket number out of range");
+                return;
+            }
+            buckets[bucket] = 0;
+            out_string(c, "DISOWNED");
+            return;
+        } else {
+            out_string(c, "CLIENT_ERROR bad format");
+            return;
+        }
+
+    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
+        int bucket, gen;
+        if (!settings.managed) {
+            out_string(c, "CLIENT_ERROR not a managed instance");
+            return;
+        }
+        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
+            /* we never write anything back, even if input's wrong */
+            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
+                /* do nothing, bad input */
+            } else {
+                c->bucket = bucket;
+                c->gen = gen;
+            }
+            conn_set_state(c, conn_read);
+            return;
+        } else {
+            out_string(c, "CLIENT_ERROR bad format");
+            return;
+        }
+
+    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
+
+        process_stat(c, tokens, ntokens);
+
+    } else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
+        time_t exptime = 0;
+        set_current_time();
+
+        if(ntokens == 2) {
+            settings.oldest_live = current_time - 1;
+            item_flush_expired();
+            out_string(c, "OK");
+            return;
+        }
+
+        exptime = strtol(tokens[1].value, NULL, 10);
+        if(errno == ERANGE) {
+            out_string(c, "CLIENT_ERROR bad command line format");
+            return;
+        }
+
+        settings.oldest_live = realtime(exptime) - 1;
+        item_flush_expired();
+        out_string(c, "OK");
+        return;
+
+    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
+
+        out_string(c, "VERSION " VERSION);
+
+    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
+
+        conn_set_state(c, conn_closing);
+
+    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
+                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
+#ifdef ALLOW_SLABS_REASSIGN
+
+        int src, dst, rv;
+
+        src = strtol(tokens[2].value, NULL, 10);
+        dst  = strtol(tokens[3].value, NULL, 10);
+
+        if(errno == ERANGE) {
+            out_string(c, "CLIENT_ERROR bad command line format");
+            return;
+        }
+
+        rv = slabs_reassign(src, dst);
+        if (rv == 1) {
+            out_string(c, "DONE");
+            return;
+        }
+        if (rv == 0) {
+            out_string(c, "CANT");
+            return;
+        }
+        if (rv == -1) {
+            out_string(c, "BUSY");
+            return;
+        }
+#else
+        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
+#endif
+    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
+        process_verbosity_command(c, tokens, ntokens);
+    } else {
+        out_string(c, "ERROR");
+    }
+    return;
+}
+
+/*
+ * if we have a complete line in the buffer, process it.
+ */
+static int try_read_command(conn *c) {
+    char *el, *cont;
+
+    assert(c != NULL);
+    assert(c->rcurr <= (c->rbuf + c->rsize));
+
+    if (c->rbytes == 0)
+        return 0;
+    el = memchr(c->rcurr, '\n', c->rbytes);
+    if (!el)
+        return 0;
+    cont = el + 1;
+    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
+        el--;
+    }
+    *el = '\0';
+
+    assert(cont <= (c->rcurr + c->rbytes));
+
+    process_command(c, c->rcurr);
+
+    c->rbytes -= (cont - c->rcurr);
+    c->rcurr = cont;
+
+    assert(c->rcurr <= (c->rbuf + c->rsize));
+
+    return 1;
+}
+
+/*
+ * read a UDP request.
+ * return 0 if there's nothing to read.
+ */
+static int try_read_udp(conn *c) {
+    int res;
+
+    assert(c != NULL);
+
+    c->request_addr_size = sizeof(c->request_addr);
+    res = recvfrom(c->sfd, c->rbuf, c->rsize,
+                   0, &c->request_addr, &c->request_addr_size);
+    if (res > 8) {
+        unsigned char *buf = (unsigned char *)c->rbuf;
+        STATS_LOCK();
+        stats.bytes_read += res;
+        STATS_UNLOCK();
+
+        /* Beginning of UDP packet is the request ID; save it. */
+        c->request_id = buf[0] * 256 + buf[1];
+
+        /* If this is a multi-packet request, drop it. */
+        if (buf[4] != 0 || buf[5] != 1) {
+            out_string(c, "SERVER_ERROR multi-packet request not supported");
+            return 0;
+        }
+
+        /* Don't care about any of the rest of the header. */
+        res -= 8;
+        memmove(c->rbuf, c->rbuf + 8, res);
+
+        c->rbytes += res;
+        c->rcurr = c->rbuf;
+        return 1;
+    }
+    return 0;
+}
+
+/*
+ * read from network as much as we can, handle buffer overflow and connection
+ * close.
+ * before reading, move the remaining incomplete fragment of a command
+ * (if any) to the beginning of the buffer.
+ * return 0 if there's nothing to read on the first read.
+ */
+static int try_read_network(conn *c) {
+    int gotdata = 0;
+    int res;
+
+    assert(c != NULL);
+
+    if (c->rcurr != c->rbuf) {
+        if (c->rbytes != 0) /* otherwise there's nothing to copy */
+            memmove(c->rbuf, c->rcurr, c->rbytes);
+        c->rcurr = c->rbuf;
+    }
+
+    while (1) {
+        if (c->rbytes >= c->rsize) {
+            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
+            if (!new_rbuf) {
+                if (settings.verbose > 0)
+                    fprintf(stderr, "Couldn't realloc input buffer\n");
+                c->rbytes = 0; /* ignore what we read */
+                out_string(c, "SERVER_ERROR out of memory");
+                c->write_and_go = conn_closing;
+                return 1;
+            }
+            c->rcurr = c->rbuf = new_rbuf;
+            c->rsize *= 2;
+        }
+
+        /* unix socket mode doesn't need this, so zeroed out.  but why
+         * is this done for every command?  presumably for UDP
+         * mode.  */
+        if (!settings.socketpath) {
+            c->request_addr_size = sizeof(c->request_addr);
+        } else {
+            c->request_addr_size = 0;
+        }
+
+        res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
+        if (res > 0) {
+            STATS_LOCK();
+            stats.bytes_read += res;
+            STATS_UNLOCK();
+            gotdata = 1;
+            c->rbytes += res;
+            continue;
+        }
+        if (res == 0) {
+            /* connection closed */
+            conn_set_state(c, conn_closing);
+            return 1;
+        }
+        if (res == -1) {
+            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
+            else return 0;
+        }
+    }
+    return gotdata;
+}
+
+static bool update_event(conn *c, const int new_flags) {
+    assert(c != NULL);
+
+    struct event_base *base = c->event.ev_base;
+    if (c->ev_flags == new_flags)
+        return true;
+    if (event_del(&c->event) == -1) return false;
+    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
+    event_base_set(base, &c->event);
+    c->ev_flags = new_flags;
+    if (event_add(&c->event, 0) == -1) return false;
+    return true;
+}
+
+/*
+ * Sets whether we are listening for new connections or not.
+ */
+void accept_new_conns(const bool do_accept) {
+    if (! is_listen_thread())
+        return;
+    if (do_accept) {
+        update_event(listen_conn, EV_READ | EV_PERSIST);
+        if (listen(listen_conn->sfd, 1024) != 0) {
+            perror("listen");
+        }
+    }
+    else {
+        update_event(listen_conn, 0);
+        if (listen(listen_conn->sfd, 0) != 0) {
+            perror("listen");
+        }
+    }
+}
+
+
+/*
+ * Transmit the next chunk of data from our list of msgbuf structures.
+ *
+ * Returns:
+ *   TRANSMIT_COMPLETE   All done writing.
+ *   TRANSMIT_INCOMPLETE More data remaining to write.
+ *   TRANSMIT_SOFT_ERROR Can't write any more right now.
+ *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
+ */
+static int transmit(conn *c) {
+    assert(c != NULL);
+
+    if (c->msgcurr < c->msgused &&
+            c->msglist[c->msgcurr].msg_iovlen == 0) {
+        /* Finished writing the current msg; advance to the next. */
+        c->msgcurr++;
+    }
+    if (c->msgcurr < c->msgused) {
+        ssize_t res;
+        struct msghdr *m = &c->msglist[c->msgcurr];
+
+        res = sendmsg(c->sfd, m, 0);
+        if (res > 0) {
+            STATS_LOCK();
+            stats.bytes_written += res;
+            STATS_UNLOCK();
+
+            /* We've written some of the data. Remove the completed
+               iovec entries from the list of pending writes. */
+            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
+                res -= m->msg_iov->iov_len;
+                m->msg_iovlen--;
+                m->msg_iov++;
+            }
+
+            /* Might have written just part of the last iovec entry;
+               adjust it so the next write will do the rest. */
+            if (res > 0) {
+                m->msg_iov->iov_base += res;
+                m->msg_iov->iov_len -= res;
+            }
+            return TRANSMIT_INCOMPLETE;
+        }
+        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
+                if (settings.verbose > 0)
+                    fprintf(stderr, "Couldn't update event\n");
+                conn_set_state(c, conn_closing);
+                return TRANSMIT_HARD_ERROR;
+            }
+            return TRANSMIT_SOFT_ERROR;
+        }
+        /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
+           we have a real error, on which we close the connection */
+        if (settings.verbose > 0)
+            perror("Failed to write, and not due to blocking");
+
+        if (c->udp)
+            conn_set_state(c, conn_read);
+        else
+            conn_set_state(c, conn_closing);
+        return TRANSMIT_HARD_ERROR;
+    } else {
+        return TRANSMIT_COMPLETE;
+    }
+}
+
+static void drive_machine(conn *c) {
+    bool stop = false;
+    int sfd, flags = 1;
+    socklen_t addrlen;
+    struct sockaddr addr;
+    int res;
+
+    assert(c != NULL);
+
+    while (!stop) {
+
+        switch(c->state) {
+        case conn_listening:
+            addrlen = sizeof(addr);
+            if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
+                if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                    /* these are transient, so don't log anything */
+                    stop = true;
+                } else if (errno == EMFILE) {
+                    if (settings.verbose > 0)
+                        fprintf(stderr, "Too many open connections\n");
+                    accept_new_conns(false);
+                    stop = true;
+                } else {
+                    perror("accept()");
+                    stop = true;
+                }
+                break;
+            }
+            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
+                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
+                perror("setting O_NONBLOCK");
+                close(sfd);
+                break;
+            }
+            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
+                                     DATA_BUFFER_SIZE, false);
+            break;
+
+        case conn_read:
+            if (try_read_command(c) != 0) {
+                continue;
+            }
+            if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) {
+                continue;
+            }
+            /* we have no command line and no data to read from network */
+            if (!update_event(c, EV_READ | EV_PERSIST)) {
+                if (settings.verbose > 0)
+                    fprintf(stderr, "Couldn't update event\n");
+                conn_set_state(c, conn_closing);
+                break;
+            }
+            stop = true;
+            break;
+
+        case conn_nread:
+            /* we are reading rlbytes into ritem; */
+            if (c->rlbytes == 0) {
+                complete_nread(c);
+                break;
+            }
+            /* first check if we have leftovers in the conn_read buffer */
+            if (c->rbytes > 0) {
+                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
+                memcpy(c->ritem, c->rcurr, tocopy);
+                c->ritem += tocopy;
+                c->rlbytes -= tocopy;
+                c->rcurr += tocopy;
+                c->rbytes -= tocopy;
+                break;
+            }
+
+            /*  now try reading from the socket */
+            res = read(c->sfd, c->ritem, c->rlbytes);
+            if (res > 0) {
+                STATS_LOCK();
+                stats.bytes_read += res;
+                STATS_UNLOCK();
+                c->ritem += res;
+                c->rlbytes -= res;
+                break;
+            }
+            if (res == 0) { /* end of stream */
+                conn_set_state(c, conn_closing);
+                break;
+            }
+            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+                if (!update_event(c, EV_READ | EV_PERSIST)) {
+                    if (settings.verbose > 0)
+                        fprintf(stderr, "Couldn't update event\n");
+                    conn_set_state(c, conn_closing);
+                    break;
+                }
+                stop = true;
+                break;
+            }
+            /* otherwise we have a real error, on which we close the connection */
+            if (settings.verbose > 0)
+                fprintf(stderr, "Failed to read, and not due to blocking\n");
+            conn_set_state(c, conn_closing);
+            break;
+
+        case conn_swallow:
+            /* we are reading sbytes and throwing them away */
+            if (c->sbytes == 0) {
+                conn_set_state(c, conn_read);
+                break;
+            }
+
+            /* first check if we have leftovers in the conn_read buffer */
+            if (c->rbytes > 0) {
+                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
+                c->sbytes -= tocopy;
+                c->rcurr += tocopy;
+                c->rbytes -= tocopy;
+                break;
+            }
+
+            /*  now try reading from the socket */
+            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
+            if (res > 0) {
+                STATS_LOCK();
+                stats.bytes_read += res;
+                STATS_UNLOCK();
+                c->sbytes -= res;
+                break;
+            }
+            if (res == 0) { /* end of stream */
+                conn_set_state(c, conn_closing);
+                break;
+            }
+            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+                if (!update_event(c, EV_READ | EV_PERSIST)) {
+                    if (settings.verbose > 0)
+                        fprintf(stderr, "Couldn't update event\n");
+                    conn_set_state(c, conn_closing);
+                    break;
+                }
+                stop = true;
+                break;
+            }
+            /* otherwise we have a real error, on which we close the connection */
+            if (settings.verbose > 0)
+                fprintf(stderr, "Failed to read, and not due to blocking\n");
+            conn_set_state(c, conn_closing);
+            break;
+
+        case conn_write:
+            /*
+             * We want to write out a simple response. If we haven't already,
+             * assemble it into a msgbuf list (this will be a single-entry
+             * list for TCP or a two-entry list for UDP).
+             */
+            if (c->iovused == 0 || (c->udp && c->iovused == 1)) {
+                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
+                    (c->udp && build_udp_headers(c) != 0)) {
+                    if (settings.verbose > 0)
+                        fprintf(stderr, "Couldn't build response\n");
+                    conn_set_state(c, conn_closing);
+                    break;
+                }
+            }
+
+            /* fall through... */
+
+        case conn_mwrite:
+            switch (transmit(c)) {
+            case TRANSMIT_COMPLETE:
+                if (c->state == conn_mwrite) {
+                    while (c->ileft > 0) {
+                        item *it = *(c->icurr);
+                        assert((it->it_flags & ITEM_SLABBED) == 0);
+                        item_remove(it);
+                        c->icurr++;
+                        c->ileft--;
+                    }
+                    conn_set_state(c, conn_read);
+                } else if (c->state == conn_write) {
+                    if (c->write_and_free) {
+                        free(c->write_and_free);
+                        c->write_and_free = 0;
+                    }
+                    conn_set_state(c, c->write_and_go);
+                } else {
+                    if (settings.verbose > 0)
+                        fprintf(stderr, "Unexpected state %d\n", c->state);
+                    conn_set_state(c, conn_closing);
+                }
+                break;
+
+            case TRANSMIT_INCOMPLETE:
+            case TRANSMIT_HARD_ERROR:
+                break;                   /* Continue in state machine. */
+
+            case TRANSMIT_SOFT_ERROR:
+                stop = true;
+                break;
+            }
+            break;
+
+        case conn_closing:
+            if (c->udp)
+                conn_cleanup(c);
+            else
+                conn_close(c);
+            stop = true;
+            break;
+        }
+    }
+
+    return;
+}
+
+void event_handler(const int fd, const short which, void *arg) {
+    conn *c;
+
+    c = (conn *)arg;
+    assert(c != NULL);
+
+    c->which = which;
+
+    /* sanity */
+    if (fd != c->sfd) {
+        if (settings.verbose > 0)
+            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
+        conn_close(c);
+        return;
+    }
+
+    drive_machine(c);
+
+    /* wait for next event */
+    return;
+}
+
+static int new_socket(const bool is_udp) {
+    int sfd;
+    int flags;
+
+    if ((sfd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) {
+        perror("socket()");
+        return -1;
+    }
+
+    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
+        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
+        perror("setting O_NONBLOCK");
+        close(sfd);
+        return -1;
+    }
+    return sfd;
+}
+
+
+/*
+ * Sets a socket's send buffer size to the maximum allowed by the system.
+ */
+static void maximize_sndbuf(const int sfd) {
+    socklen_t intsize = sizeof(int);
+    int last_good = 0;
+    int min, max, avg;
+    int old_size;
+
+    /* Start with the default size. */
+    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
+        if (settings.verbose > 0)
+            perror("getsockopt(SO_SNDBUF)");
+        return;
+    }
+
+    /* Binary-search for the real maximum. */
+    min = old_size;
+    max = MAX_SENDBUF_SIZE;
+
+    while (min <= max) {
+        avg = ((unsigned int)(min + max)) / 2;
+        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
+            last_good = avg;
+            min = avg + 1;
+        } else {
+            max = avg - 1;
+        }
+    }
+
+    if (settings.verbose > 1)
+        fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
+}
+
+
+static int server_socket(const int port, const bool is_udp) {
+    int sfd;
+    struct linger ling = {0, 0};
+    struct sockaddr_in addr;
+    int flags =1;
+
+    if ((sfd = new_socket(is_udp)) == -1) {
+        return -1;
+    }
+
+    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
+    if (is_udp) {
+        maximize_sndbuf(sfd);
+    } else {
+        setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
+        setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
+        setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
+    }
+
+    /*
+     * the memset call clears nonstandard fields in some impementations
+     * that otherwise mess things up.
+     */
+    memset(&addr, 0, sizeof(addr));
+
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(port);
+    addr.sin_addr = settings.interf;
+    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
+        perror("bind()");
+        close(sfd);
+        return -1;
+    }
+    if (!is_udp && listen(sfd, 1024) == -1) {
+        perror("listen()");
+        close(sfd);
+        return -1;
+    }
+    return sfd;
+}
+
+static int new_socket_unix(void) {
+    int sfd;
+    int flags;
+
+    if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
+        perror("socket()");
+        return -1;
+    }
+
+    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
+        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
+        perror("setting O_NONBLOCK");
+        close(sfd);
+        return -1;
+    }
+    return sfd;
+}
+
+static int server_socket_unix(const char *path) {
+    int sfd;
+    struct linger ling = {0, 0};
+    struct sockaddr_un addr;
+    struct stat tstat;
+    int flags =1;
+
+    if (!path) {
+        return -1;
+    }
+
+    if ((sfd = new_socket_unix()) == -1) {
+        return -1;
+    }
+
+    /*
+     * Clean up a previous socket file if we left it around
+     */
+    if (lstat(path, &tstat) == 0) {
+        if (S_ISSOCK(tstat.st_mode))
+            unlink(path);
+    }
+
+    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
+    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
+    setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
+
+    /*
+     * the memset call clears nonstandard fields in some impementations
+     * that otherwise mess things up.
+     */
+    memset(&addr, 0, sizeof(addr));
+
+    addr.sun_family = AF_UNIX;
+    strcpy(addr.sun_path, path);
+    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
+        perror("bind()");
+        close(sfd);
+        return -1;
+    }
+    if (listen(sfd, 1024) == -1) {
+        perror("listen()");
+        close(sfd);
+        return -1;
+    }
+    return sfd;
+}
+
+/* listening socket */
+static int l_socket = 0;
+
+/* udp socket */
+static int u_socket = -1;
+
+/* invoke right before gdb is called, on assert */
+void pre_gdb(void) {
+    int i;
+    if (l_socket > -1) close(l_socket);
+    if (u_socket > -1) close(u_socket);
+    for (i = 3; i <= 500; i++) close(i); /* so lame */
+    kill(getpid(), SIGABRT);
+}
+
+/*
+ * We keep the current time of day in a global variable that's updated by a
+ * timer event. This saves us a bunch of time() system calls (we really only
+ * need to get the time once a second, whereas there can be tens of thousands
+ * of requests a second) and allows us to use server-start-relative timestamps
+ * rather than absolute UNIX timestamps, a space savings on systems where
+ * sizeof(time_t) > sizeof(unsigned int).
+ */
+volatile rel_time_t current_time;
+static struct event clockevent;
+
+/* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */
+static void set_current_time(void) {
+    current_time = (rel_time_t) (time(0) - stats.started);
+}
+
+static void clock_handler(const int fd, const short which, void *arg) {
+    struct timeval t = {.tv_sec = 1, .tv_usec = 0};
+    static bool initialized = false;
+
+    if (initialized) {
+        /* only delete the event if it's actually there. */
+        evtimer_del(&clockevent);
+    } else {
+        initialized = true;
+    }
+
+    evtimer_set(&clockevent, clock_handler, 0);
+    event_base_set(main_base, &clockevent);
+    evtimer_add(&clockevent, &t);
+
+    set_current_time();
+}
+
+static struct event deleteevent;
+
+static void delete_handler(const int fd, const short which, void *arg) {
+    struct timeval t = {.tv_sec = 5, .tv_usec = 0};
+    static bool initialized = false;
+
+    if (initialized) {
+        /* some versions of libevent don't like deleting events that don't exist,
+           so only delete once we know this event has been added. */
+        evtimer_del(&deleteevent);
+    } else {
+        initialized = true;
+    }
+
+    evtimer_set(&deleteevent, delete_handler, 0);
+    event_base_set(main_base, &deleteevent);
+    evtimer_add(&deleteevent, &t);
+    run_deferred_deletes();
+}
+
+/* Call run_deferred_deletes instead of this. */
+void do_run_deferred_deletes(void)
+{
+    int i, j = 0;
+
+    for (i = 0; i < delcurr; i++) {
+        item *it = todelete[i];
+        if (item_delete_lock_over(it)) {
+            assert(it->refcount > 0);
+            it->it_flags &= ~ITEM_DELETED;
+            do_item_unlink(it);
+            do_item_remove(it);
+        } else {
+            todelete[j++] = it;
+        }
+    }
+    delcurr = j;
+}
+
+static void usage(void) {
+    printf(PACKAGE " " VERSION "\n");
+    printf("-p <num>      TCP port number to listen on (default: 11211)\n"
+           "-U <num>      UDP port number to listen on (default: 0, off)\n"
+           "-s <file>     unix socket path to listen on (disables network support)\n"
+           "-l <ip_addr>  interface to listen on, default is INDRR_ANY\n"
+           "-d            run as a daemon\n"
+           "-r            maximize core file limit\n"
+           "-u <username> assume identity of <username> (only when run as root)\n"
+           "-m <num>      max memory to use for items in megabytes, default is 64 MB\n"
+           "-M            return error on memory exhausted (rather than removing items)\n"
+           "-c <num>      max simultaneous connections, default is 1024\n"
+           "-k            lock down all paged memory\n"
+           "-v            verbose (print errors/warnings while in event loop)\n"
+           "-vv           very verbose (also print client commands/reponses)\n"
+           "-h            print this help and exit\n"
+           "-i            print memcached and libevent license\n"
+           "-b            run a managed instanced (mnemonic: buckets)\n"
+           "-P <file>     save PID in <file>, only used with -d option\n"
+           "-f <factor>   chunk size growth factor, default 1.25\n"
+           "-n <bytes>    minimum space allocated for key+value+flags, default 48\n");
+#ifdef USE_THREADS
+    printf("-t <num>      number of threads to use, default 4\n");
+#endif
+    return;
+}
+
+static void usage_license(void) {
+    printf(PACKAGE " " VERSION "\n\n");
+    printf(
+    "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
+    "All rights reserved.\n"
+    "\n"
+    "Redistribution and use in source and binary forms, with or without\n"
+    "modification, are permitted provided that the following conditions are\n"
+    "met:\n"
+    "\n"
+    "    * Redistributions of source code must retain the above copyright\n"
+    "notice, this list of conditions and the following disclaimer.\n"
+    "\n"
+    "    * Redistributions in binary form must reproduce the above\n"
+    "copyright notice, this list of conditions and the following disclaimer\n"
+    "in the documentation and/or other materials provided with the\n"
+    "distribution.\n"
+    "\n"
+    "    * Neither the name of the Danga Interactive nor the names of its\n"
+    "contributors may be used to endorse or promote products derived from\n"
+    "this software without specific prior written permission.\n"
+    "\n"
+    "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
+    "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
+    "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
+    "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
+    "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
+    "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
+    "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
+    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
+    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
+    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
+    "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
+    "\n"
+    "\n"
+    "This product includes software developed by Niels Provos.\n"
+    "\n"
+    "[ libevent ]\n"
+    "\n"
+    "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
+    "All rights reserved.\n"
+    "\n"
+    "Redistribution and use in source and binary forms, with or without\n"
+    "modification, are permitted provided that the following conditions\n"
+    "are met:\n"
+    "1. Redistributions of source code must retain the above copyright\n"
+    "   notice, this list of conditions and the following disclaimer.\n"
+    "2. Redistributions in binary form must reproduce the above copyright\n"
+    "   notice, this list of conditions and the following disclaimer in the\n"
+    "   documentation and/or other materials provided with the distribution.\n"
+    "3. All advertising materials mentioning features or use of this software\n"
+    "   must display the following acknowledgement:\n"
+    "      This product includes software developed by Niels Provos.\n"
+    "4. The name of the author may not be used to endorse or promote products\n"
+    "   derived from this software without specific prior written permission.\n"
+    "\n"
+    "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
+    "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
+    "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
+    "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
+    "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
+    "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
+    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
+    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
+    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
+    "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
+    );
+
+    return;
+}
+
+static void save_pid(const pid_t pid, const char *pid_file) {
+    FILE *fp;
+    if (pid_file == NULL)
+        return;
+
+    if (!(fp = fopen(pid_file, "w"))) {
+        fprintf(stderr, "Could not open the pid file %s for writing\n", pid_file);
+        return;
+    }
+
+    fprintf(fp,"%ld\n", (long)pid);
+    if (fclose(fp) == -1) {
+        fprintf(stderr, "Could not close the pid file %s.\n", pid_file);
+        return;
+    }
+}
+
+static void remove_pidfile(const char *pid_file) {
+  if (pid_file == NULL)
+      return;
+
+  if (unlink(pid_file) != 0) {
+      fprintf(stderr, "Could not remove the pid file %s.\n", pid_file);
+  }
+
+}
+
+
+static void sig_handler(const int sig) {
+    printf("SIGINT handled.\n");
+    exit(EXIT_SUCCESS);
+}
+
+int main (int argc, char **argv) {
+    int c;
+    struct in_addr addr;
+    bool lock_memory = false;
+    bool daemonize = false;
+    int maxcore = 0;
+    char *username = NULL;
+    char *pid_file = NULL;
+    struct passwd *pw;
+    struct sigaction sa;
+    struct rlimit rlim;
+
+    /* handle SIGINT */
+    signal(SIGINT, sig_handler);
+
+    /* init settings */
+    settings_init();
+
+    /* set stderr non-buffering (for running under, say, daemontools) */
+    setbuf(stderr, NULL);
+
+    /* process arguments */
+    while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) {
+        switch (c) {
+        case 'U':
+            settings.udpport = atoi(optarg);
+            break;
+        case 'b':
+            settings.managed = true;
+            break;
+        case 'p':
+            settings.port = atoi(optarg);
+            break;
+        case 's':
+            settings.socketpath = optarg;
+            break;
+        case 'm':
+            settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
+            break;
+        case 'M':
+            settings.evict_to_free = 0;
+            break;
+        case 'c':
+            settings.maxconns = atoi(optarg);
+            break;
+        case 'h':
+            usage();
+            exit(EXIT_SUCCESS);
+        case 'i':
+            usage_license();
+            exit(EXIT_SUCCESS);
+        case 'k':
+            lock_memory = true;
+            break;
+        case 'v':
+            settings.verbose++;
+            break;
+        case 'l':
+            if (inet_pton(AF_INET, optarg, &addr) <= 0) {
+                fprintf(stderr, "Illegal address: %s\n", optarg);
+                return 1;
+            } else {
+                settings.interf = addr;
+            }
+            break;
+        case 'd':
+            daemonize = true;
+            break;
+        case 'r':
+            maxcore = 1;
+            break;
+        case 'u':
+            username = optarg;
+            break;
+        case 'P':
+            pid_file = optarg;
+            break;
+        case 'f':
+            settings.factor = atof(optarg);
+            if (settings.factor <= 1.0) {
+                fprintf(stderr, "Factor must be greater than 1\n");
+                return 1;
+            }
+            break;
+        case 'n':
+            settings.chunk_size = atoi(optarg);
+            if (settings.chunk_size == 0) {
+                fprintf(stderr, "Chunk size must be greater than 0\n");
+                return 1;
+            }
+            break;
+        case 't':
+            settings.num_threads = atoi(optarg);
+            if (settings.num_threads == 0) {
+                fprintf(stderr, "Number of threads must be greater than 0\n");
+                return 1;
+            }
+            break;
+        case 'D':
+            if (! optarg || ! optarg[0]) {
+                fprintf(stderr, "No delimiter specified\n");
+                return 1;
+            }
+            settings.prefix_delimiter = optarg[0];
+            settings.detail_enabled = 1;
+            break;
+        default:
+            fprintf(stderr, "Illegal argument \"%c\"\n", c);
+            return 1;
+        }
+    }
+
+    if (maxcore != 0) {
+        struct rlimit rlim_new;
+        /*
+         * First try raising to infinity; if that fails, try bringing
+         * the soft limit to the hard.
+         */
+        if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
+            rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
+            if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
+                /* failed. try raising just to the old max */
+                rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
+                (void)setrlimit(RLIMIT_CORE, &rlim_new);
+            }
+        }
+        /*
+         * getrlimit again to see what we ended up with. Only fail if
+         * the soft limit ends up 0, because then no core files will be
+         * created at all.
+         */
+
+        if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
+            fprintf(stderr, "failed to ensure corefile creation\n");
+            exit(EXIT_FAILURE);
+        }
+    }
+
+    /*
+     * If needed, increase rlimits to allow as many connections
+     * as needed.
+     */
+
+    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
+        fprintf(stderr, "failed to getrlimit number of files\n");
+        exit(EXIT_FAILURE);
+    } else {
+        int maxfiles = settings.maxconns;
+        if (rlim.rlim_cur < maxfiles)
+            rlim.rlim_cur = maxfiles + 3;
+        if (rlim.rlim_max < rlim.rlim_cur)
+            rlim.rlim_max = rlim.rlim_cur;
+        if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
+            fprintf(stderr, "failed to set rlimit for open files. Try running as root or requesting smaller maxconns value.\n");
+            exit(EXIT_FAILURE);
+        }
+    }
+
+    /*
+     * initialization order: first create the listening sockets
+     * (may need root on low ports), then drop root if needed,
+     * then daemonise if needed, then init libevent (in some cases
+     * descriptors created by libevent wouldn't survive forking).
+     */
+
+    /* create the listening socket and bind it */
+    if (settings.socketpath == NULL) {
+        l_socket = server_socket(settings.port, 0);
+        if (l_socket == -1) {
+            fprintf(stderr, "failed to listen\n");
+            exit(EXIT_FAILURE);
+        }
+    }
+
+    if (settings.udpport > 0 && settings.socketpath == NULL) {
+        /* create the UDP listening socket and bind it */
+        u_socket = server_socket(settings.udpport, 1);
+        if (u_socket == -1) {
+            fprintf(stderr, "failed to listen on UDP port %d\n", settings.udpport);
+            exit(EXIT_FAILURE);
+        }
+    }
+
+    /* lose root privileges if we have them */
+    if (getuid() == 0 || geteuid() == 0) {
+        if (username == 0 || *username == '\0') {
+            fprintf(stderr, "can't run as root without the -u switch\n");
+            return 1;
+        }
+        if ((pw = getpwnam(username)) == 0) {
+            fprintf(stderr, "can't find the user %s to switch to\n", username);
+            return 1;
+        }
+        if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) {
+            fprintf(stderr, "failed to assume identity of user %s\n", username);
+            return 1;
+        }
+    }
+
+    /* create unix mode sockets after dropping privileges */
+    if (settings.socketpath != NULL) {
+        l_socket = server_socket_unix(settings.socketpath);
+        if (l_socket == -1) {
+            fprintf(stderr, "failed to listen\n");
+            exit(EXIT_FAILURE);
+        }
+    }
+
+    /* daemonize if requested */
+    /* if we want to ensure our ability to dump core, don't chdir to / */
+    if (daemonize) {
+        int res;
+        res = daemon(maxcore, settings.verbose);
+        if (res == -1) {
+            fprintf(stderr, "failed to daemon() in order to daemonize\n");
+            return 1;
+        }
+    }
+
+    /* initialize main thread libevent instance */
+    main_base = event_init();
+
+    /* initialize other stuff */
+    item_init();
+    stats_init();
+    assoc_init();
+    conn_init();
+    slabs_init(settings.maxbytes, settings.factor);
+
+    /* managed instance? alloc and zero a bucket array */
+    if (settings.managed) {
+        buckets = malloc(sizeof(int) * MAX_BUCKETS);
+        if (buckets == 0) {
+            fprintf(stderr, "failed to allocate the bucket array");
+            exit(EXIT_FAILURE);
+        }
+        memset(buckets, 0, sizeof(int) * MAX_BUCKETS);
+    }
+
+    /* lock paged memory if needed */
+    if (lock_memory) {
+#ifdef HAVE_MLOCKALL
+        mlockall(MCL_CURRENT | MCL_FUTURE);
+#else
+        fprintf(stderr, "warning: mlockall() not supported on this platform.  proceeding without.\n");
+#endif
+    }
+
+    /*
+     * ignore SIGPIPE signals; we can use errno==EPIPE if we
+     * need that information
+     */
+    sa.sa_handler = SIG_IGN;
+    sa.sa_flags = 0;
+    if (sigemptyset(&sa.sa_mask) == -1 ||
+        sigaction(SIGPIPE, &sa, 0) == -1) {
+        perror("failed to ignore SIGPIPE; sigaction");
+        exit(EXIT_FAILURE);
+    }
+    /* create the initial listening connection */
+    if (!(listen_conn = conn_new(l_socket, conn_listening,
+                                 EV_READ | EV_PERSIST, 1, false, main_base))) {
+        fprintf(stderr, "failed to create listening connection");
+        exit(EXIT_FAILURE);
+    }
+    /* save the PID in if we're a daemon */
+    if (daemonize)
+        save_pid(getpid(), pid_file);
+    /* start up worker threads if MT mode */
+    thread_init(settings.num_threads, main_base);
+    /* initialise clock event */
+    clock_handler(0, 0, 0);
+    /* initialise deletion array and timer event */
+    deltotal = 200;
+    delcurr = 0;
+    todelete = malloc(sizeof(item *) * deltotal);
+    delete_handler(0, 0, 0); /* sets up the event */
+    /* create the initial listening udp connection, monitored on all threads */
+    if (u_socket > -1) {
+        for (c = 0; c < settings.num_threads; c++) {
+            /* this is guaranteed to hit all threads because we round-robin */
+            dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
+                              UDP_READ_BUFFER_SIZE, 1);
+        }
+    }
+    /* enter the event loop */
+    event_base_loop(main_base, 0);
+    /* remove the PID file if we're a daemon */
+    if (daemonize)
+        remove_pidfile(pid_file);
+    return 0;
+}