thread.c @ 0:30782bb1fc04 (MEMCACHED_1_2_3)

memcached-1.2.3
author: Maxim Dounin <mdounin@mdounin.ru>
date:   Sun, 23 Sep 2007 03:58:34 +0400

/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Thread management for memcached.
 *
 * $Id$
 */
#include "memcached.h"
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>

#ifdef HAVE_MALLOC_H
#include <malloc.h>
#endif

#ifdef HAVE_STRING_H
#include <string.h>
#endif

#ifdef USE_THREADS

#include <pthread.h>

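/* Number of CQ_ITEM structures to allocate at a time in cqi_new(). */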
#define ITEMS_PER_ALLOC 64

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int     sfd;
    int     init_state;
    int     event_flags;
    int     read_buffer_size;
    int     is_udp;
    CQ_ITEM *next;
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
    pthread_cond_t  cond;
};

/* Lock for connection freelist */
static pthread_mutex_t conn_lock;

/* Lock for cache operations (item_*, assoc_*) */
static pthread_mutex_t cache_lock;

/* Lock for slab allocator operations */
static pthread_mutex_t slabs_lock;

/* Lock for global stats */
static pthread_mutex_t stats_lock;

/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;

/*
 * Each libevent instance has a wakeup pipe, which other threads
 * can use to signal that they've put a new connection on its queue.
 */
typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    CQ  new_conn_queue;         /* queue of new connections to handle */
} LIBEVENT_THREAD;
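
/*
 * Hand-off protocol: the dispatching thread pushes a CQ_ITEM onto a worker's
 * new_conn_queue and then writes a single byte to that worker's
 * notify_send_fd; the worker's libevent loop wakes up on notify_receive_fd
 * and picks the item up in thread_libevent_process().
 */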

static LIBEVENT_THREAD *threads;

/*
 * Number of threads that have finished setting themselves up.
 */
static int init_count = 0;
static pthread_mutex_t init_lock;
static pthread_cond_t init_cond;


static void thread_libevent_process(int fd, short which, void *arg);

/*
 * Initializes a connection queue.
 */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    pthread_cond_init(&cq->cond, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/*
 * Waits for work on a connection queue.
 */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    while (NULL == cq->head)
        pthread_cond_wait(&cq->cond, &cq->lock);
    item = cq->head;
    cq->head = item->next;
    if (NULL == cq->head)
        cq->tail = NULL;
    pthread_mutex_unlock(&cq->lock);

    return item;
}

/*
 * Looks for an item on a connection queue, but doesn't block if there isn't
 * one.
 * Returns the item, or NULL if no item is available
 */
static CQ_ITEM *cq_peek(CQ *cq) {
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (NULL != item) {
        cq->head = item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);

    return item;
}

/*
 * Adds an item to a connection queue.
 */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;

    pthread_mutex_lock(&cq->lock);
    if (NULL == cq->tail)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_cond_signal(&cq->cond);
    pthread_mutex_unlock(&cq->lock);
}
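
/*
 * Queue discipline: cq_pop() blocks on the condition variable until an item
 * arrives, cq_peek() returns NULL immediately if the queue is empty, and
 * cq_push() signals a single waiter while holding the queue lock.
 */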

/*
 * Returns a fresh connection queue item.
 */
static CQ_ITEM *cqi_new() {
    CQ_ITEM *item = NULL;
    pthread_mutex_lock(&cqi_freelist_lock);
    if (cqi_freelist) {
        item = cqi_freelist;
        cqi_freelist = item->next;
    }
    pthread_mutex_unlock(&cqi_freelist_lock);

    if (NULL == item) {
        int i;

        /* Allocate a bunch of items at once to reduce fragmentation */
        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
        if (NULL == item)
            return NULL;

        /*
         * Link together all the new items except the first one
         * (which we'll return to the caller) for placement on
         * the freelist.
         */
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
            item[i - 1].next = &item[i];

        pthread_mutex_lock(&cqi_freelist_lock);
        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
        cqi_freelist = &item[1];
        pthread_mutex_unlock(&cqi_freelist_lock);
    }

    return item;
}


/*
 * Frees a connection queue item (adds it to the freelist.)
 */
static void cqi_free(CQ_ITEM *item) {
    pthread_mutex_lock(&cqi_freelist_lock);
    item->next = cqi_freelist;
    cqi_freelist = item;
    pthread_mutex_unlock(&cqi_freelist_lock);
}
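
/*
 * CQ_ITEMs are carved out of ITEMS_PER_ALLOC-sized blocks and are never
 * returned to the system; cqi_free() only pushes them back onto the
 * freelist for reuse.
 */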


/*
 * Creates a worker thread.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}


/*
 * Pulls a conn structure from the freelist, if one is available.
 */
conn *mt_conn_from_freelist() {
    conn *c;

    pthread_mutex_lock(&conn_lock);
    c = do_conn_from_freelist();
    pthread_mutex_unlock(&conn_lock);

    return c;
}


/*
 * Adds a conn structure to the freelist.
 *
 * Returns 0 on success, 1 if the structure couldn't be added.
 */
int mt_conn_add_to_freelist(conn *c) {
    int result;

    pthread_mutex_lock(&conn_lock);
    result = do_conn_add_to_freelist(c);
    pthread_mutex_unlock(&conn_lock);

    return result;
}

/****************************** LIBEVENT THREADS *****************************/

/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    if (! me->base) {
        me->base = event_init();
        if (! me->base) {
            fprintf(stderr, "Can't allocate event base\n");
            exit(1);
        }
    }

    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    cq_init(&me->new_conn_queue);
}


/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */

    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);

    return (void*) event_base_loop(me->base, 0);
}
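
/*
 * Each worker thread owns its event_base exclusively; the only cross-thread
 * communication is the one-byte write to its notify pipe, so no locking is
 * needed around the libevent calls themselves.
 */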


/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    item = cq_peek(&me->new_conn_queue);

    if (NULL != item) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->is_udp, me->base);
        if (!c) {
            if (item->is_udp) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                            item->sfd);
                }
                close(item->sfd);
            }
        }
        cqi_free(item);
    }
}
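
/*
 * Each wakeup consumes exactly one byte from the pipe and pops at most one
 * queue item, matching the single byte written per dispatched connection in
 * dispatch_conn_new() below.
 */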

/* Which thread we assigned a connection to most recently. */
static int last_thread = -1;

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
void dispatch_conn_new(int sfd, int init_state, int event_flags,
                       int read_buffer_size, int is_udp) {
    CQ_ITEM *item = cqi_new();
    int thread = (last_thread + 1) % settings.num_threads;

    last_thread = thread;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->is_udp = is_udp;

    cq_push(&threads[thread].new_conn_queue, item);
    if (write(threads[thread].notify_send_fd, "", 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}
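
/*
 * Connections are handed out round-robin across all configured threads,
 * including threads[0] (the main dispatcher thread), since last_thread
 * simply cycles through 0 .. settings.num_threads - 1.
 */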

/*
 * Returns true if this is the thread that listens for new TCP connections.
 */
int mt_is_listen_thread() {
    return pthread_self() == threads[0].thread_id;
}

/********************************* ITEM ACCESS *******************************/

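/*
 * The mt_* functions below are thin wrappers that serialize calls into the
 * single-threaded do_* implementations: item and hashtable operations all
 * share cache_lock, while slab operations use slabs_lock.
 */
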
/*
 * Walks through the list of deletes that have been deferred because the items
 * were locked down at the time.
 */
void mt_run_deferred_deletes() {
    pthread_mutex_lock(&cache_lock);
    do_run_deferred_deletes();
    pthread_mutex_unlock(&cache_lock);
}

/*
 * Allocates a new item.
 */
item *mt_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
    item *it;
    pthread_mutex_lock(&cache_lock);
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
    pthread_mutex_unlock(&cache_lock);
    return it;
}

/*
 * Returns an item if it hasn't been marked as expired or deleted,
 * lazy-expiring as needed.
 */
item *mt_item_get_notedeleted(const char *key, const size_t nkey, bool *delete_locked) {
    item *it;
    pthread_mutex_lock(&cache_lock);
    it = do_item_get_notedeleted(key, nkey, delete_locked);
    pthread_mutex_unlock(&cache_lock);
    return it;
}

/*
 * Links an item into the LRU and hashtable.
 */
int mt_item_link(item *item) {
    int ret;

    pthread_mutex_lock(&cache_lock);
    ret = do_item_link(item);
    pthread_mutex_unlock(&cache_lock);
    return ret;
}

/*
 * Decrements the reference count on an item and adds it to the freelist if
 * needed.
 */
void mt_item_remove(item *item) {
    pthread_mutex_lock(&cache_lock);
    do_item_remove(item);
    pthread_mutex_unlock(&cache_lock);
}

/*
 * Replaces one item with another in the hashtable.
 */
int mt_item_replace(item *old, item *new) {
    int ret;

    pthread_mutex_lock(&cache_lock);
    ret = do_item_replace(old, new);
    pthread_mutex_unlock(&cache_lock);
    return ret;
}

/*
 * Unlinks an item from the LRU and hashtable.
 */
void mt_item_unlink(item *item) {
    pthread_mutex_lock(&cache_lock);
    do_item_unlink(item);
    pthread_mutex_unlock(&cache_lock);
}

/*
 * Moves an item to the back of the LRU queue.
 */
void mt_item_update(item *item) {
    pthread_mutex_lock(&cache_lock);
    do_item_update(item);
    pthread_mutex_unlock(&cache_lock);
}

/*
 * Adds an item to the deferred-delete list so it can be reaped later.
 */
char *mt_defer_delete(item *item, time_t exptime) {
    char *ret;

    pthread_mutex_lock(&cache_lock);
    ret = do_defer_delete(item, exptime);
    pthread_mutex_unlock(&cache_lock);
    return ret;
}

/*
 * Does arithmetic on a numeric item value.
 */
char *mt_add_delta(item *item, int incr, unsigned int delta, char *buf) {
    char *ret;

    pthread_mutex_lock(&cache_lock);
    ret = do_add_delta(item, incr, delta, buf);
    pthread_mutex_unlock(&cache_lock);
    return ret;
}

/*
 * Stores an item in the cache (high level, obeys set/add/replace semantics)
 */
int mt_store_item(item *item, int comm) {
    int ret;

    pthread_mutex_lock(&cache_lock);
    ret = do_store_item(item, comm);
    pthread_mutex_unlock(&cache_lock);
    return ret;
}

/*
 * Flushes expired items after a flush_all call
 */
void mt_item_flush_expired() {
    pthread_mutex_lock(&cache_lock);
    do_item_flush_expired();
    pthread_mutex_unlock(&cache_lock);
}

/****************************** HASHTABLE MODULE *****************************/

void mt_assoc_move_next_bucket() {
    pthread_mutex_lock(&cache_lock);
    do_assoc_move_next_bucket();
    pthread_mutex_unlock(&cache_lock);
}

/******************************* SLAB ALLOCATOR ******************************/

void *mt_slabs_alloc(size_t size) {
    void *ret;

    pthread_mutex_lock(&slabs_lock);
    ret = do_slabs_alloc(size);
    pthread_mutex_unlock(&slabs_lock);
    return ret;
}

void mt_slabs_free(void *ptr, size_t size) {
    pthread_mutex_lock(&slabs_lock);
    do_slabs_free(ptr, size);
    pthread_mutex_unlock(&slabs_lock);
}

char *mt_slabs_stats(int *buflen) {
    char *ret;

    pthread_mutex_lock(&slabs_lock);
    ret = do_slabs_stats(buflen);
    pthread_mutex_unlock(&slabs_lock);
    return ret;
}

#ifdef ALLOW_SLABS_REASSIGN
int mt_slabs_reassign(unsigned char srcid, unsigned char dstid) {
    int ret;

    pthread_mutex_lock(&slabs_lock);
    ret = do_slabs_reassign(srcid, dstid);
    pthread_mutex_unlock(&slabs_lock);
    return ret;
}
#endif

/******************************* GLOBAL STATS ******************************/

void mt_stats_lock() {
    pthread_mutex_lock(&stats_lock);
}

void mt_stats_unlock() {
    pthread_mutex_unlock(&stats_lock);
}
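
/*
 * The global stats lock is exposed as an explicit lock/unlock pair rather
 * than being wrapped around each operation, so callers can update several
 * counters inside a single critical section.
 */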

/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of event handler threads to spawn
 * main_base Event base for main thread
 */
void thread_init(int nthreads, struct event_base *main_base) {
    int i;

    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&conn_lock, NULL);
    pthread_mutex_init(&slabs_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    /* Zero-fill the descriptors so setup_thread() sees base == NULL
     * for the worker threads. */
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    threads[0].base = main_base;
    threads[0].thread_id = pthread_self();

    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 1; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    init_count++; /* main thread */
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
    pthread_mutex_unlock(&init_lock);
}
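
/*
 * Note on startup ordering: thread_init() registers the notify-pipe event
 * for every thread (including the main thread, threads[0]) before spawning
 * any workers, then blocks until init_count reaches nthreads. The main
 * thread counts itself; each worker increments the count in
 * worker_libevent() before entering its event loop.
 */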

#endif