Mercurial > hg > memcached
comparison thread.c @ 0:30782bb1fc04 MEMCACHED_1_2_3
memcached-1.2.3
author | Maxim Dounin <mdounin@mdounin.ru> |
---|---|
date | Sun, 23 Sep 2007 03:58:34 +0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:30782bb1fc04 |
---|---|
1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ | |
2 /* | |
3 * Thread management for memcached. | |
4 * | |
5 * $Id$ | |
6 */ | |
7 #include "memcached.h" | |
8 #include <stdio.h> | |
9 #include <errno.h> | |
10 #include <stdlib.h> | |
11 #include <errno.h> | |
12 | |
13 #ifdef HAVE_MALLOC_H | |
14 #include <malloc.h> | |
15 #endif | |
16 | |
17 #ifdef HAVE_STRING_H | |
18 #include <string.h> | |
19 #endif | |
20 | |
21 #ifdef USE_THREADS | |
22 | |
23 #include <pthread.h> | |
24 | |
25 #define ITEMS_PER_ALLOC 64 | |
26 | |
27 /* An item in the connection queue. */ | |
28 typedef struct conn_queue_item CQ_ITEM; | |
29 struct conn_queue_item { | |
30 int sfd; | |
31 int init_state; | |
32 int event_flags; | |
33 int read_buffer_size; | |
34 int is_udp; | |
35 CQ_ITEM *next; | |
36 }; | |
37 | |
38 /* A connection queue. */ | |
39 typedef struct conn_queue CQ; | |
40 struct conn_queue { | |
41 CQ_ITEM *head; | |
42 CQ_ITEM *tail; | |
43 pthread_mutex_t lock; | |
44 pthread_cond_t cond; | |
45 }; | |
46 | |
47 /* Lock for connection freelist */ | |
48 static pthread_mutex_t conn_lock; | |
49 | |
50 /* Lock for cache operations (item_*, assoc_*) */ | |
51 static pthread_mutex_t cache_lock; | |
52 | |
53 /* Lock for slab allocator operations */ | |
54 static pthread_mutex_t slabs_lock; | |
55 | |
56 /* Lock for global stats */ | |
57 static pthread_mutex_t stats_lock; | |
58 | |
59 /* Free list of CQ_ITEM structs */ | |
60 static CQ_ITEM *cqi_freelist; | |
61 static pthread_mutex_t cqi_freelist_lock; | |
62 | |
63 /* | |
64 * Each libevent instance has a wakeup pipe, which other threads | |
65 * can use to signal that they've put a new connection on its queue. | |
66 */ | |
67 typedef struct { | |
68 pthread_t thread_id; /* unique ID of this thread */ | |
69 struct event_base *base; /* libevent handle this thread uses */ | |
70 struct event notify_event; /* listen event for notify pipe */ | |
71 int notify_receive_fd; /* receiving end of notify pipe */ | |
72 int notify_send_fd; /* sending end of notify pipe */ | |
73 CQ new_conn_queue; /* queue of new connections to handle */ | |
74 } LIBEVENT_THREAD; | |
75 | |
76 static LIBEVENT_THREAD *threads; | |
77 | |
78 /* | |
79 * Number of threads that have finished setting themselves up. | |
80 */ | |
81 static int init_count = 0; | |
82 static pthread_mutex_t init_lock; | |
83 static pthread_cond_t init_cond; | |
84 | |
85 | |
86 static void thread_libevent_process(int fd, short which, void *arg); | |
87 | |
88 /* | |
89 * Initializes a connection queue. | |
90 */ | |
91 static void cq_init(CQ *cq) { | |
92 pthread_mutex_init(&cq->lock, NULL); | |
93 pthread_cond_init(&cq->cond, NULL); | |
94 cq->head = NULL; | |
95 cq->tail = NULL; | |
96 } | |
97 | |
98 /* | |
99 * Waits for work on a connection queue. | |
100 */ | |
101 static CQ_ITEM *cq_pop(CQ *cq) { | |
102 CQ_ITEM *item; | |
103 | |
104 pthread_mutex_lock(&cq->lock); | |
105 while (NULL == cq->head) | |
106 pthread_cond_wait(&cq->cond, &cq->lock); | |
107 item = cq->head; | |
108 cq->head = item->next; | |
109 if (NULL == cq->head) | |
110 cq->tail = NULL; | |
111 pthread_mutex_unlock(&cq->lock); | |
112 | |
113 return item; | |
114 } | |
115 | |
116 /* | |
117 * Looks for an item on a connection queue, but doesn't block if there isn't | |
118 * one. | |
119 * Returns the item, or NULL if no item is available | |
120 */ | |
121 static CQ_ITEM *cq_peek(CQ *cq) { | |
122 CQ_ITEM *item; | |
123 | |
124 pthread_mutex_lock(&cq->lock); | |
125 item = cq->head; | |
126 if (NULL != item) { | |
127 cq->head = item->next; | |
128 if (NULL == cq->head) | |
129 cq->tail = NULL; | |
130 } | |
131 pthread_mutex_unlock(&cq->lock); | |
132 | |
133 return item; | |
134 } | |
135 | |
136 /* | |
137 * Adds an item to a connection queue. | |
138 */ | |
139 static void cq_push(CQ *cq, CQ_ITEM *item) { | |
140 item->next = NULL; | |
141 | |
142 pthread_mutex_lock(&cq->lock); | |
143 if (NULL == cq->tail) | |
144 cq->head = item; | |
145 else | |
146 cq->tail->next = item; | |
147 cq->tail = item; | |
148 pthread_cond_signal(&cq->cond); | |
149 pthread_mutex_unlock(&cq->lock); | |
150 } | |
151 | |
152 /* | |
153 * Returns a fresh connection queue item. | |
154 */ | |
155 static CQ_ITEM *cqi_new() { | |
156 CQ_ITEM *item = NULL; | |
157 pthread_mutex_lock(&cqi_freelist_lock); | |
158 if (cqi_freelist) { | |
159 item = cqi_freelist; | |
160 cqi_freelist = item->next; | |
161 } | |
162 pthread_mutex_unlock(&cqi_freelist_lock); | |
163 | |
164 if (NULL == item) { | |
165 int i; | |
166 | |
167 /* Allocate a bunch of items at once to reduce fragmentation */ | |
168 item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC); | |
169 if (NULL == item) | |
170 return NULL; | |
171 | |
172 /* | |
173 * Link together all the new items except the first one | |
174 * (which we'll return to the caller) for placement on | |
175 * the freelist. | |
176 */ | |
177 for (i = 2; i < ITEMS_PER_ALLOC; i++) | |
178 item[i - 1].next = &item[i]; | |
179 | |
180 pthread_mutex_lock(&cqi_freelist_lock); | |
181 item[ITEMS_PER_ALLOC - 1].next = cqi_freelist; | |
182 cqi_freelist = &item[1]; | |
183 pthread_mutex_unlock(&cqi_freelist_lock); | |
184 } | |
185 | |
186 return item; | |
187 } | |
188 | |
189 | |
190 /* | |
191 * Frees a connection queue item (adds it to the freelist.) | |
192 */ | |
193 static void cqi_free(CQ_ITEM *item) { | |
194 pthread_mutex_lock(&cqi_freelist_lock); | |
195 item->next = cqi_freelist; | |
196 cqi_freelist = item; | |
197 pthread_mutex_unlock(&cqi_freelist_lock); | |
198 } | |
199 | |
200 | |
201 /* | |
202 * Creates a worker thread. | |
203 */ | |
/*
 * Creates a worker thread running func(arg).  Exits the process on
 * failure, so callers need not check anything.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t thread;
    pthread_attr_t attr;
    int ret;

    pthread_attr_init(&attr);

    ret = pthread_create(&thread, &attr, func, arg);

    /* POSIX allows attr objects to own allocated state; release it
     * whether or not the create succeeded. */
    pthread_attr_destroy(&attr);

    if (ret != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}
217 | |
218 | |
219 /* | |
220 * Pulls a conn structure from the freelist, if one is available. | |
221 */ | |
222 conn *mt_conn_from_freelist() { | |
223 conn *c; | |
224 | |
225 pthread_mutex_lock(&conn_lock); | |
226 c = do_conn_from_freelist(); | |
227 pthread_mutex_unlock(&conn_lock); | |
228 | |
229 return c; | |
230 } | |
231 | |
232 | |
233 /* | |
234 * Adds a conn structure to the freelist. | |
235 * | |
236 * Returns 0 on success, 1 if the structure couldn't be added. | |
237 */ | |
238 int mt_conn_add_to_freelist(conn *c) { | |
239 int result; | |
240 | |
241 pthread_mutex_lock(&conn_lock); | |
242 result = do_conn_add_to_freelist(c); | |
243 pthread_mutex_unlock(&conn_lock); | |
244 | |
245 return result; | |
246 } | |
247 | |
248 /****************************** LIBEVENT THREADS *****************************/ | |
249 | |
250 /* | |
251 * Set up a thread's information. | |
252 */ | |
253 static void setup_thread(LIBEVENT_THREAD *me) { | |
254 if (! me->base) { | |
255 me->base = event_init(); | |
256 if (! me->base) { | |
257 fprintf(stderr, "Can't allocate event base\n"); | |
258 exit(1); | |
259 } | |
260 } | |
261 | |
262 /* Listen for notifications from other threads */ | |
263 event_set(&me->notify_event, me->notify_receive_fd, | |
264 EV_READ | EV_PERSIST, thread_libevent_process, me); | |
265 event_base_set(me->base, &me->notify_event); | |
266 | |
267 if (event_add(&me->notify_event, 0) == -1) { | |
268 fprintf(stderr, "Can't monitor libevent notify pipe\n"); | |
269 exit(1); | |
270 } | |
271 | |
272 cq_init(&me->new_conn_queue); | |
273 } | |
274 | |
275 | |
276 /* | |
277 * Worker thread: main event loop | |
278 */ | |
279 static void *worker_libevent(void *arg) { | |
280 LIBEVENT_THREAD *me = arg; | |
281 | |
282 /* Any per-thread setup can happen here; thread_init() will block until | |
283 * all threads have finished initializing. | |
284 */ | |
285 | |
286 pthread_mutex_lock(&init_lock); | |
287 init_count++; | |
288 pthread_cond_signal(&init_cond); | |
289 pthread_mutex_unlock(&init_lock); | |
290 | |
291 return (void*) event_base_loop(me->base, 0); | |
292 } | |
293 | |
294 | |
295 /* | |
296 * Processes an incoming "handle a new connection" item. This is called when | |
297 * input arrives on the libevent wakeup pipe. | |
298 */ | |
299 static void thread_libevent_process(int fd, short which, void *arg) { | |
300 LIBEVENT_THREAD *me = arg; | |
301 CQ_ITEM *item; | |
302 char buf[1]; | |
303 | |
304 if (read(fd, buf, 1) != 1) | |
305 if (settings.verbose > 0) | |
306 fprintf(stderr, "Can't read from libevent pipe\n"); | |
307 | |
308 item = cq_peek(&me->new_conn_queue); | |
309 | |
310 if (NULL != item) { | |
311 conn *c = conn_new(item->sfd, item->init_state, item->event_flags, | |
312 item->read_buffer_size, item->is_udp, me->base); | |
313 if (!c) { | |
314 if (item->is_udp) { | |
315 fprintf(stderr, "Can't listen for events on UDP socket\n"); | |
316 exit(1); | |
317 } else { | |
318 if (settings.verbose > 0) { | |
319 fprintf(stderr, "Can't listen for events on fd %d\n", | |
320 item->sfd); | |
321 } | |
322 close(item->sfd); | |
323 } | |
324 } | |
325 cqi_free(item); | |
326 } | |
327 } | |
328 | |
329 /* Which thread we assigned a connection to most recently. */ | |
330 static int last_thread = -1; | |
331 | |
332 /* | |
333 * Dispatches a new connection to another thread. This is only ever called | |
334 * from the main thread, either during initialization (for UDP) or because | |
335 * of an incoming connection. | |
336 */ | |
337 void dispatch_conn_new(int sfd, int init_state, int event_flags, | |
338 int read_buffer_size, int is_udp) { | |
339 CQ_ITEM *item = cqi_new(); | |
340 int thread = (last_thread + 1) % settings.num_threads; | |
341 | |
342 last_thread = thread; | |
343 | |
344 item->sfd = sfd; | |
345 item->init_state = init_state; | |
346 item->event_flags = event_flags; | |
347 item->read_buffer_size = read_buffer_size; | |
348 item->is_udp = is_udp; | |
349 | |
350 cq_push(&threads[thread].new_conn_queue, item); | |
351 if (write(threads[thread].notify_send_fd, "", 1) != 1) { | |
352 perror("Writing to thread notify pipe"); | |
353 } | |
354 } | |
355 | |
356 /* | |
357 * Returns true if this is the thread that listens for new TCP connections. | |
358 */ | |
359 int mt_is_listen_thread() { | |
360 return pthread_self() == threads[0].thread_id; | |
361 } | |
362 | |
363 /********************************* ITEM ACCESS *******************************/ | |
364 | |
365 /* | |
366 * Walks through the list of deletes that have been deferred because the items | |
367 * were locked down at the tmie. | |
368 */ | |
369 void mt_run_deferred_deletes() { | |
370 pthread_mutex_lock(&cache_lock); | |
371 do_run_deferred_deletes(); | |
372 pthread_mutex_unlock(&cache_lock); | |
373 } | |
374 | |
375 /* | |
376 * Allocates a new item. | |
377 */ | |
378 item *mt_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) { | |
379 item *it; | |
380 pthread_mutex_lock(&cache_lock); | |
381 it = do_item_alloc(key, nkey, flags, exptime, nbytes); | |
382 pthread_mutex_unlock(&cache_lock); | |
383 return it; | |
384 } | |
385 | |
386 /* | |
387 * Returns an item if it hasn't been marked as expired or deleted, | |
388 * lazy-expiring as needed. | |
389 */ | |
390 item *mt_item_get_notedeleted(const char *key, const size_t nkey, bool *delete_locked) { | |
391 item *it; | |
392 pthread_mutex_lock(&cache_lock); | |
393 it = do_item_get_notedeleted(key, nkey, delete_locked); | |
394 pthread_mutex_unlock(&cache_lock); | |
395 return it; | |
396 } | |
397 | |
398 /* | |
399 * Links an item into the LRU and hashtable. | |
400 */ | |
401 int mt_item_link(item *item) { | |
402 int ret; | |
403 | |
404 pthread_mutex_lock(&cache_lock); | |
405 ret = do_item_link(item); | |
406 pthread_mutex_unlock(&cache_lock); | |
407 return ret; | |
408 } | |
409 | |
410 /* | |
411 * Decrements the reference count on an item and adds it to the freelist if | |
412 * needed. | |
413 */ | |
414 void mt_item_remove(item *item) { | |
415 pthread_mutex_lock(&cache_lock); | |
416 do_item_remove(item); | |
417 pthread_mutex_unlock(&cache_lock); | |
418 } | |
419 | |
420 /* | |
421 * Replaces one item with another in the hashtable. | |
422 */ | |
423 int mt_item_replace(item *old, item *new) { | |
424 int ret; | |
425 | |
426 pthread_mutex_lock(&cache_lock); | |
427 ret = do_item_replace(old, new); | |
428 pthread_mutex_unlock(&cache_lock); | |
429 return ret; | |
430 } | |
431 | |
432 /* | |
433 * Unlinks an item from the LRU and hashtable. | |
434 */ | |
435 void mt_item_unlink(item *item) { | |
436 pthread_mutex_lock(&cache_lock); | |
437 do_item_unlink(item); | |
438 pthread_mutex_unlock(&cache_lock); | |
439 } | |
440 | |
441 /* | |
442 * Moves an item to the back of the LRU queue. | |
443 */ | |
444 void mt_item_update(item *item) { | |
445 pthread_mutex_lock(&cache_lock); | |
446 do_item_update(item); | |
447 pthread_mutex_unlock(&cache_lock); | |
448 } | |
449 | |
450 /* | |
451 * Adds an item to the deferred-delete list so it can be reaped later. | |
452 */ | |
453 char *mt_defer_delete(item *item, time_t exptime) { | |
454 char *ret; | |
455 | |
456 pthread_mutex_lock(&cache_lock); | |
457 ret = do_defer_delete(item, exptime); | |
458 pthread_mutex_unlock(&cache_lock); | |
459 return ret; | |
460 } | |
461 | |
462 /* | |
463 * Does arithmetic on a numeric item value. | |
464 */ | |
465 char *mt_add_delta(item *item, int incr, unsigned int delta, char *buf) { | |
466 char *ret; | |
467 | |
468 pthread_mutex_lock(&cache_lock); | |
469 ret = do_add_delta(item, incr, delta, buf); | |
470 pthread_mutex_unlock(&cache_lock); | |
471 return ret; | |
472 } | |
473 | |
474 /* | |
475 * Stores an item in the cache (high level, obeys set/add/replace semantics) | |
476 */ | |
477 int mt_store_item(item *item, int comm) { | |
478 int ret; | |
479 | |
480 pthread_mutex_lock(&cache_lock); | |
481 ret = do_store_item(item, comm); | |
482 pthread_mutex_unlock(&cache_lock); | |
483 return ret; | |
484 } | |
485 | |
486 /* | |
487 * Flushes expired items after a flush_all call | |
488 */ | |
489 void mt_item_flush_expired() { | |
490 pthread_mutex_lock(&cache_lock); | |
491 do_item_flush_expired(); | |
492 pthread_mutex_unlock(&cache_lock); | |
493 } | |
494 | |
495 /****************************** HASHTABLE MODULE *****************************/ | |
496 | |
497 void mt_assoc_move_next_bucket() { | |
498 pthread_mutex_lock(&cache_lock); | |
499 do_assoc_move_next_bucket(); | |
500 pthread_mutex_unlock(&cache_lock); | |
501 } | |
502 | |
503 /******************************* SLAB ALLOCATOR ******************************/ | |
504 | |
505 void *mt_slabs_alloc(size_t size) { | |
506 void *ret; | |
507 | |
508 pthread_mutex_lock(&slabs_lock); | |
509 ret = do_slabs_alloc(size); | |
510 pthread_mutex_unlock(&slabs_lock); | |
511 return ret; | |
512 } | |
513 | |
514 void mt_slabs_free(void *ptr, size_t size) { | |
515 pthread_mutex_lock(&slabs_lock); | |
516 do_slabs_free(ptr, size); | |
517 pthread_mutex_unlock(&slabs_lock); | |
518 } | |
519 | |
520 char *mt_slabs_stats(int *buflen) { | |
521 char *ret; | |
522 | |
523 pthread_mutex_lock(&slabs_lock); | |
524 ret = do_slabs_stats(buflen); | |
525 pthread_mutex_unlock(&slabs_lock); | |
526 return ret; | |
527 } | |
528 | |
#ifdef ALLOW_SLABS_REASSIGN
/*
 * Moves a page from one slab class to another, under slabs_lock.
 */
int mt_slabs_reassign(unsigned char srcid, unsigned char dstid) {
    int rc;

    pthread_mutex_lock(&slabs_lock);
    rc = do_slabs_reassign(srcid, dstid);
    pthread_mutex_unlock(&slabs_lock);

    return rc;
}
#endif
539 | |
540 /******************************* GLOBAL STATS ******************************/ | |
541 | |
542 void mt_stats_lock() { | |
543 pthread_mutex_lock(&stats_lock); | |
544 } | |
545 | |
546 void mt_stats_unlock() { | |
547 pthread_mutex_unlock(&stats_lock); | |
548 } | |
549 | |
550 /* | |
551 * Initializes the thread subsystem, creating various worker threads. | |
552 * | |
553 * nthreads Number of event handler threads to spawn | |
554 * main_base Event base for main thread | |
555 */ | |
556 void thread_init(int nthreads, struct event_base *main_base) { | |
557 int i; | |
558 | |
559 pthread_mutex_init(&cache_lock, NULL); | |
560 pthread_mutex_init(&conn_lock, NULL); | |
561 pthread_mutex_init(&slabs_lock, NULL); | |
562 pthread_mutex_init(&stats_lock, NULL); | |
563 | |
564 pthread_mutex_init(&init_lock, NULL); | |
565 pthread_cond_init(&init_cond, NULL); | |
566 | |
567 pthread_mutex_init(&cqi_freelist_lock, NULL); | |
568 cqi_freelist = NULL; | |
569 | |
570 threads = malloc(sizeof(LIBEVENT_THREAD) * nthreads); | |
571 if (! threads) { | |
572 perror("Can't allocate thread descriptors"); | |
573 exit(1); | |
574 } | |
575 | |
576 threads[0].base = main_base; | |
577 threads[0].thread_id = pthread_self(); | |
578 | |
579 for (i = 0; i < nthreads; i++) { | |
580 int fds[2]; | |
581 if (pipe(fds)) { | |
582 perror("Can't create notify pipe"); | |
583 exit(1); | |
584 } | |
585 | |
586 threads[i].notify_receive_fd = fds[0]; | |
587 threads[i].notify_send_fd = fds[1]; | |
588 | |
589 setup_thread(&threads[i]); | |
590 } | |
591 | |
592 /* Create threads after we've done all the libevent setup. */ | |
593 for (i = 1; i < nthreads; i++) { | |
594 create_worker(worker_libevent, &threads[i]); | |
595 } | |
596 | |
597 /* Wait for all the threads to set themselves up before returning. */ | |
598 pthread_mutex_lock(&init_lock); | |
599 init_count++; // main thread | |
600 while (init_count < nthreads) { | |
601 pthread_cond_wait(&init_cond, &init_lock); | |
602 } | |
603 pthread_mutex_unlock(&init_lock); | |
604 } | |
605 | |
606 #endif |