diff memcached.h @ 0:30782bb1fc04 MEMCACHED_1_2_3

memcached-1.2.3
author Maxim Dounin <mdounin@mdounin.ru>
date Sun, 23 Sep 2007 03:58:34 +0400
parents
children e28ab6bd21fa
line wrap: on
line diff
new file mode 100644
--- /dev/null
+++ b/memcached.h
@@ -0,0 +1,332 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/* $Id: memcached.h 570 2007-06-20 01:21:45Z plindner $ */
+
+#include "config.h"
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <netinet/in.h>
+#include <event.h>
+
+#define DATA_BUFFER_SIZE 2048
+#define UDP_READ_BUFFER_SIZE 65536
+#define UDP_MAX_PAYLOAD_SIZE 1400
+#define UDP_HEADER_SIZE 8
+#define MAX_SENDBUF_SIZE (256 * 1024 * 1024)
+
+/* Initial size of list of items being returned by "get". */
+#define ITEM_LIST_INITIAL 200
+
+/* Initial size of the sendmsg() scatter/gather array. */
+#define IOV_LIST_INITIAL 400
+
+/* Initial number of sendmsg() argument structures to allocate. */
+#define MSG_LIST_INITIAL 10
+
+/* High water marks for buffer shrinking */
+#define READ_BUFFER_HIGHWAT 8192
+#define ITEM_LIST_HIGHWAT 400
+#define IOV_LIST_HIGHWAT 600
+#define MSG_LIST_HIGHWAT 100
+
+/* Get a consistent bool type */
+#if HAVE_STDBOOL_H
+# include <stdbool.h>
+#else
+  typedef enum {false = 0, true = 1} bool;
+#endif
+
+#if HAVE_STDINT_H
+# include <stdint.h>
+#else
+ typedef unsigned char             uint8_t;
+#endif
+
+/* unistd.h is here */
+#if HAVE_UNISTD_H
+# include <unistd.h>
+#endif
+
+/* Time relative to server start. Smaller than time_t on 64-bit systems. */
+typedef unsigned int rel_time_t;
+
+struct stats {
+    unsigned int  curr_items;
+    unsigned int  total_items;
+    uint64_t      curr_bytes;
+    unsigned int  curr_conns;
+    unsigned int  total_conns;
+    unsigned int  conn_structs;
+    uint64_t      get_cmds;
+    uint64_t      set_cmds;
+    uint64_t      get_hits;
+    uint64_t      get_misses;
+    uint64_t      evictions;
+    time_t        started;          /* when the process was started */
+    uint64_t      bytes_read;
+    uint64_t      bytes_written;
+};
+
+#define MAX_VERBOSITY_LEVEL 2
+
+struct settings {
+    size_t maxbytes;
+    int maxconns;
+    int port;
+    int udpport;
+    struct in_addr interf;
+    int verbose;
+    rel_time_t oldest_live; /* ignore existing items older than this */
+    bool managed;          /* if 1, a tracker manages virtual buckets */
+    int evict_to_free;
+    char *socketpath;   /* path to unix socket if using local socket */
+    double factor;          /* chunk size growth factor */
+    int chunk_size;
+    int num_threads;        /* number of libevent threads to run */
+    char prefix_delimiter;  /* character that marks a key prefix (for stats) */
+    int detail_enabled;     /* nonzero if we're collecting detailed stats */
+};
+
+extern struct stats stats;
+extern struct settings settings;
+
+#define ITEM_LINKED 1
+#define ITEM_DELETED 2
+
+/* temp */
+#define ITEM_SLABBED 4
+
+typedef struct _stritem {
+    struct _stritem *next;
+    struct _stritem *prev;
+    struct _stritem *h_next;    /* hash chain next */
+    rel_time_t      time;       /* least recent access */
+    rel_time_t      exptime;    /* expire time */
+    int             nbytes;     /* size of data */
+    unsigned short  refcount;
+    uint8_t         nsuffix;    /* length of flags-and-length string */
+    uint8_t         it_flags;   /* ITEM_* above */
+    uint8_t         slabs_clsid;/* which slab class we're in */
+    uint8_t         nkey;       /* key length, w/terminating null and padding */
+    void * end[];
+    /* then null-terminated key */
+    /* then " flags length\r\n" (no terminating null) */
+    /* then data with terminating \r\n (no terminating null; it's binary!) */
+} item;
+
+#define ITEM_key(item) ((char*)&((item)->end[0]))
+
+/* warning: don't use these macros with a function, as it evals its arg twice */
+#define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey + 1)
+#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + 1 + (item)->nsuffix)
+#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 + (item)->nsuffix + (item)->nbytes)
+
+enum conn_states {
+    conn_listening,  /* the socket which listens for connections */
+    conn_read,       /* reading in a command line */
+    conn_write,      /* writing out a simple response */
+    conn_nread,      /* reading in a fixed number of bytes */
+    conn_swallow,    /* swallowing unnecessary bytes w/o storing */
+    conn_closing,    /* closing this connection */
+    conn_mwrite      /* writing out many items sequentially */
+};
+
+#define NREAD_ADD 1
+#define NREAD_SET 2
+#define NREAD_REPLACE 3
+
+typedef struct {
+    int    sfd;
+    int    state;
+    struct event event;
+    short  ev_flags;
+    short  which;   /* which events were just triggered */
+
+    char   *rbuf;   /* buffer to read commands into */
+    char   *rcurr;  /* but if we parsed some already, this is where we stopped */
+    int    rsize;   /* total allocated size of rbuf */
+    int    rbytes;  /* how much data, starting from rcur, do we have unparsed */
+
+    char   *wbuf;
+    char   *wcurr;
+    int    wsize;
+    int    wbytes;
+    int    write_and_go; /* which state to go into after finishing current write */
+    void   *write_and_free; /* free this memory after finishing writing */
+
+    char   *ritem;  /* when we read in an item's value, it goes here */
+    int    rlbytes;
+
+    /* data for the nread state */
+
+    /*
+     * item is used to hold an item structure created after reading the command
+     * line of set/add/replace commands, but before we finished reading the actual
+     * data. The data is read into ITEM_data(item) to avoid extra copying.
+     */
+
+    void   *item;     /* for commands set/add/replace  */
+    int    item_comm; /* which one is it: set/add/replace */
+
+    /* data for the swallow state */
+    int    sbytes;    /* how many bytes to swallow */
+
+    /* data for the mwrite state */
+    struct iovec *iov;
+    int    iovsize;   /* number of elements allocated in iov[] */
+    int    iovused;   /* number of elements used in iov[] */
+
+    struct msghdr *msglist;
+    int    msgsize;   /* number of elements allocated in msglist[] */
+    int    msgused;   /* number of elements used in msglist[] */
+    int    msgcurr;   /* element in msglist[] being transmitted now */
+    int    msgbytes;  /* number of bytes in current msg */
+
+    item   **ilist;   /* list of items to write out */
+    int    isize;
+    item   **icurr;
+    int    ileft;
+
+    /* data for UDP clients */
+    bool   udp;       /* is this is a UDP "connection" */
+    int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
+    struct sockaddr request_addr; /* Who sent the most recent request */
+    socklen_t request_addr_size;
+    unsigned char *hdrbuf; /* udp packet headers */
+    int    hdrsize;   /* number of headers' worth of space is allocated */
+
+    int    binary;    /* are we in binary mode */
+    int    bucket;    /* bucket number for the next command, if running as
+                         a managed instance. -1 (_not_ 0) means invalid. */
+    int    gen;       /* generation requested for the bucket */
+} conn;
+
+/* number of virtual buckets for a managed instance */
+#define MAX_BUCKETS 32768
+
+/* current time of day (updated periodically) */
+extern volatile rel_time_t current_time;
+
+/* temporary hack */
+/* #define assert(x) if(!(x)) { printf("assert failure: %s\n", #x); pre_gdb(); }
+   void pre_gdb (); */
+
+/*
+ * Functions
+ */
+
+conn *do_conn_from_freelist();
+int do_conn_add_to_freelist(conn *c);
+char *do_defer_delete(item *item, time_t exptime);
+void do_run_deferred_deletes(void);
+char *do_add_delta(item *item, int incr, const unsigned int delta, char *buf);
+int do_store_item(item *item, int comm);
+conn *conn_new(const int sfd, const int init_state, const int event_flags, const int read_buffer_size, const bool is_udp, struct event_base *base);
+
+
+#include "stats.h"
+#include "slabs.h"
+#include "assoc.h"
+#include "items.h"
+
+
+/*
+ * In multithreaded mode, we wrap certain functions with lock management and
+ * replace the logic of some other functions. All wrapped functions have
+ * "mt_" and "do_" variants. In multithreaded mode, the plain version of a
+ * function is #define-d to the "mt_" variant, which often just grabs a
+ * lock and calls the "do_" function. In singlethreaded mode, the "do_"
+ * function is called directly.
+ *
+ * Functions such as the libevent-related calls that need to do cross-thread
+ * communication in multithreaded mode (rather than actually doing the work
+ * in the current thread) are called via "dispatch_" frontends, which are
+ * also #define-d to directly call the underlying code in singlethreaded mode.
+ */
+#ifdef USE_THREADS
+
+void thread_init(int nthreads, struct event_base *main_base);
+int  dispatch_event_add(int thread, conn *c);
+void dispatch_conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp);
+
+/* Lock wrappers for cache functions that are called from main loop. */
+char *mt_add_delta(item *item, const int incr, const unsigned int delta, char *buf);
+void mt_assoc_move_next_bucket(void);
+conn *mt_conn_from_freelist(void);
+int   mt_conn_add_to_freelist(conn *c);
+char *mt_defer_delete(item *it, time_t exptime);
+int   mt_is_listen_thread(void);
+item *mt_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
+void  mt_item_flush_expired(void);
+item *mt_item_get_notedeleted(const char *key, const size_t nkey, bool *delete_locked);
+int   mt_item_link(item *it);
+void  mt_item_remove(item *it);
+int   mt_item_replace(item *it, item *new_it);
+void  mt_item_unlink(item *it);
+void  mt_item_update(item *it);
+void  mt_run_deferred_deletes(void);
+void *mt_slabs_alloc(size_t size);
+void  mt_slabs_free(void *ptr, size_t size);
+int   mt_slabs_reassign(unsigned char srcid, unsigned char dstid);
+char *mt_slabs_stats(int *buflen);
+void  mt_stats_lock(void);
+void  mt_stats_unlock(void);
+int   mt_store_item(item *item, int comm);
+
+
+# define add_delta(x,y,z,a)          mt_add_delta(x,y,z,a)
+# define assoc_move_next_bucket()    mt_assoc_move_next_bucket()
+# define conn_from_freelist()        mt_conn_from_freelist()
+# define conn_add_to_freelist(x)     mt_conn_add_to_freelist(x)
+# define defer_delete(x,y)           mt_defer_delete(x,y)
+# define is_listen_thread()          mt_is_listen_thread()
+# define item_alloc(x,y,z,a,b)       mt_item_alloc(x,y,z,a,b)
+# define item_flush_expired()        mt_item_flush_expired()
+# define item_get_notedeleted(x,y,z) mt_item_get_notedeleted(x,y,z)
+# define item_link(x)                mt_item_link(x)
+# define item_remove(x)              mt_item_remove(x)
+# define item_replace(x,y)           mt_item_replace(x,y)
+# define item_update(x)              mt_item_update(x)
+# define item_unlink(x)              mt_item_unlink(x)
+# define run_deferred_deletes()      mt_run_deferred_deletes()
+# define slabs_alloc(x)              mt_slabs_alloc(x)
+# define slabs_free(x,y)             mt_slabs_free(x,y)
+# define slabs_reassign(x,y)         mt_slabs_reassign(x,y)
+# define slabs_stats(x)              mt_slabs_stats(x)
+# define store_item(x,y)             mt_store_item(x,y)
+
+# define STATS_LOCK()                mt_stats_lock()
+# define STATS_UNLOCK()              mt_stats_unlock()
+
+#else /* !USE_THREADS */
+
+# define add_delta(x,y,z,a)          do_add_delta(x,y,z,a)
+# define assoc_move_next_bucket()    do_assoc_move_next_bucket()
+# define conn_from_freelist()        do_conn_from_freelist()
+# define conn_add_to_freelist(x)     do_conn_add_to_freelist(x)
+# define defer_delete(x,y)           do_defer_delete(x,y)
+# define dispatch_conn_new(x,y,z,a,b) conn_new(x,y,z,a,b,main_base)
+# define dispatch_event_add(t,c)     event_add(&(c)->event, 0)
+# define is_listen_thread()          1
+# define item_alloc(x,y,z,a,b)       do_item_alloc(x,y,z,a,b)
+# define item_flush_expired()        do_item_flush_expired()
+# define item_get_notedeleted(x,y,z) do_item_get_notedeleted(x,y,z)
+# define item_link(x)                do_item_link(x)
+# define item_remove(x)              do_item_remove(x)
+# define item_replace(x,y)           do_item_replace(x,y)
+# define item_unlink(x)              do_item_unlink(x)
+# define item_update(x)              do_item_update(x)
+# define run_deferred_deletes()      do_run_deferred_deletes()
+# define slabs_alloc(x)              do_slabs_alloc(x)
+# define slabs_free(x,y)             do_slabs_free(x,y)
+# define slabs_reassign(x,y)         do_slabs_reassign(x,y)
+# define slabs_stats(x)              do_slabs_stats(x)
+# define store_item(x,y)             do_store_item(x,y)
+# define thread_init(x,y)            0
+
+# define STATS_LOCK()                /**/
+# define STATS_UNLOCK()              /**/
+
+#endif /* !USE_THREADS */
+
+