Mercurial > hg > nginx-quic
comparison src/core/ngx_thread_pool.c @ 6018:466bd63b63d1
Thread pools implementation.
author | Valentin Bartenev <vbart@nginx.com> |
---|---|
date | Sat, 14 Mar 2015 17:37:07 +0300 |
parents | |
children | 32099b107191 |
comparison
equal
deleted
inserted
replaced
6017:83d54192e97b | 6018:466bd63b63d1 |
---|---|
1 | |
2 /* | |
3 * Copyright (C) Nginx, Inc. | |
4 * Copyright (C) Valentin V. Bartenev | |
5 * Copyright (C) Ruslan Ermilov | |
6 */ | |
7 | |
8 | |
9 #include <ngx_config.h> | |
10 #include <ngx_core.h> | |
11 #include <ngx_thread_pool.h> | |
12 | |
13 | |
14 typedef struct { | |
15 ngx_array_t pools; | |
16 } ngx_thread_pool_conf_t; | |
17 | |
18 | |
19 typedef struct { | |
20 ngx_thread_mutex_t mtx; | |
21 ngx_uint_t count; | |
22 ngx_thread_task_t *first; | |
23 ngx_thread_task_t **last; | |
24 } ngx_thread_pool_queue_t; | |
25 | |
26 | |
27 struct ngx_thread_pool_s { | |
28 ngx_thread_cond_t cond; | |
29 | |
30 ngx_thread_pool_queue_t queue; | |
31 | |
32 ngx_log_t *log; | |
33 ngx_pool_t *pool; | |
34 | |
35 ngx_str_t name; | |
36 ngx_uint_t threads; | |
37 ngx_uint_t max_queue; | |
38 | |
39 u_char *file; | |
40 ngx_uint_t line; | |
41 }; | |
42 | |
43 | |
44 static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, | |
45 ngx_pool_t *pool); | |
46 static ngx_int_t ngx_thread_pool_queue_init(ngx_thread_pool_queue_t *queue, | |
47 ngx_log_t *log); | |
48 static ngx_int_t ngx_thread_pool_queue_destroy(ngx_thread_pool_queue_t *queue, | |
49 ngx_log_t *log); | |
50 static void ngx_thread_pool_destroy(ngx_thread_pool_t *tp); | |
51 | |
52 static void *ngx_thread_pool_cycle(void *data); | |
53 static void ngx_thread_pool_handler(ngx_event_t *ev); | |
54 | |
55 static char *ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); | |
56 | |
57 static void *ngx_thread_pool_create_conf(ngx_cycle_t *cycle); | |
58 static char *ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf); | |
59 | |
60 static ngx_int_t ngx_thread_pool_init_worker(ngx_cycle_t *cycle); | |
61 static void ngx_thread_pool_exit_worker(ngx_cycle_t *cycle); | |
62 | |
63 | |
64 static ngx_command_t ngx_thread_pool_commands[] = { | |
65 | |
66 { ngx_string("thread_pool"), | |
67 NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE23, | |
68 ngx_thread_pool, | |
69 0, | |
70 0, | |
71 NULL }, | |
72 | |
73 ngx_null_command | |
74 }; | |
75 | |
76 | |
77 static ngx_core_module_t ngx_thread_pool_module_ctx = { | |
78 ngx_string("thread_pool"), | |
79 ngx_thread_pool_create_conf, | |
80 ngx_thread_pool_init_conf | |
81 }; | |
82 | |
83 | |
84 ngx_module_t ngx_thread_pool_module = { | |
85 NGX_MODULE_V1, | |
86 &ngx_thread_pool_module_ctx, /* module context */ | |
87 ngx_thread_pool_commands, /* module directives */ | |
88 NGX_CORE_MODULE, /* module type */ | |
89 NULL, /* init master */ | |
90 NULL, /* init module */ | |
91 ngx_thread_pool_init_worker, /* init process */ | |
92 NULL, /* init thread */ | |
93 NULL, /* exit thread */ | |
94 ngx_thread_pool_exit_worker, /* exit process */ | |
95 NULL, /* exit master */ | |
96 NGX_MODULE_V1_PADDING | |
97 }; | |
98 | |
99 | |
100 static ngx_str_t ngx_thread_pool_default = ngx_string("default"); | |
101 | |
102 static ngx_uint_t ngx_thread_pool_task_id; | |
103 static ngx_thread_pool_queue_t ngx_thread_pool_done; | |
104 | |
105 | |
106 static ngx_int_t | |
107 ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool) | |
108 { | |
109 int err; | |
110 pthread_t tid; | |
111 ngx_uint_t n; | |
112 pthread_attr_t attr; | |
113 | |
114 if (ngx_notify == NULL) { | |
115 ngx_log_error(NGX_LOG_ALERT, log, 0, | |
116 "the configured event method cannot be used with thread pools"); | |
117 return NGX_ERROR; | |
118 } | |
119 | |
120 if (ngx_thread_pool_queue_init(&tp->queue, log) != NGX_OK) { | |
121 return NGX_ERROR; | |
122 } | |
123 | |
124 if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) { | |
125 (void) ngx_thread_pool_queue_destroy(&tp->queue, log); | |
126 return NGX_ERROR; | |
127 } | |
128 | |
129 tp->log = log; | |
130 tp->pool = pool; | |
131 | |
132 err = pthread_attr_init(&attr); | |
133 if (err) { | |
134 ngx_log_error(NGX_LOG_ALERT, log, err, | |
135 "pthread_attr_init() failed"); | |
136 return NGX_ERROR; | |
137 } | |
138 | |
139 #if 0 | |
140 err = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN); | |
141 if (err) { | |
142 ngx_log_error(NGX_LOG_ALERT, log, err, | |
143 "pthread_attr_setstacksize() failed"); | |
144 return NGX_ERROR; | |
145 } | |
146 #endif | |
147 | |
148 for (n = 0; n < tp->threads; n++) { | |
149 err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp); | |
150 if (err) { | |
151 ngx_log_error(NGX_LOG_ALERT, log, err, | |
152 "pthread_create() failed"); | |
153 return NGX_ERROR; | |
154 } | |
155 } | |
156 | |
157 (void) pthread_attr_destroy(&attr); | |
158 | |
159 return NGX_OK; | |
160 } | |
161 | |
162 | |
163 static ngx_int_t | |
164 ngx_thread_pool_queue_init(ngx_thread_pool_queue_t *queue, ngx_log_t *log) | |
165 { | |
166 queue->count = 0; | |
167 queue->first = NULL; | |
168 queue->last = &queue->first; | |
169 | |
170 return ngx_thread_mutex_create(&queue->mtx, log); | |
171 } | |
172 | |
173 | |
174 static ngx_int_t | |
175 ngx_thread_pool_queue_destroy(ngx_thread_pool_queue_t *queue, ngx_log_t *log) | |
176 { | |
177 return ngx_thread_mutex_destroy(&queue->mtx, log); | |
178 } | |
179 | |
180 | |
181 static void | |
182 ngx_thread_pool_destroy(ngx_thread_pool_t *tp) | |
183 { | |
184 /* TODO: exit threads */ | |
185 | |
186 (void) ngx_thread_cond_destroy(&tp->cond, tp->log); | |
187 (void) ngx_thread_pool_queue_destroy(&tp->queue, tp->log); | |
188 } | |
189 | |
190 | |
191 ngx_thread_task_t * | |
192 ngx_thread_task_alloc(ngx_pool_t *pool, size_t size) | |
193 { | |
194 ngx_thread_task_t *task; | |
195 | |
196 task = ngx_pcalloc(pool, sizeof(ngx_thread_task_t) + size); | |
197 if (task == NULL) { | |
198 return NULL; | |
199 } | |
200 | |
201 task->ctx = task + 1; | |
202 | |
203 return task; | |
204 } | |
205 | |
206 | |
207 ngx_int_t | |
208 ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task) | |
209 { | |
210 if (task->event.active) { | |
211 ngx_log_error(NGX_LOG_ALERT, tp->log, 0, | |
212 "task #%ui already active", task->id); | |
213 return NGX_ERROR; | |
214 } | |
215 | |
216 if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) { | |
217 return NGX_ERROR; | |
218 } | |
219 | |
220 if (tp->queue.count >= tp->max_queue) { | |
221 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); | |
222 | |
223 ngx_log_error(NGX_LOG_ERR, tp->log, 0, | |
224 "thread pool \"%V\" queue overflow: %ui tasks waiting", | |
225 &tp->name, tp->queue.count); | |
226 return NGX_ERROR; | |
227 } | |
228 | |
229 task->event.active = 1; | |
230 | |
231 task->id = ngx_thread_pool_task_id++; | |
232 task->next = NULL; | |
233 | |
234 if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) { | |
235 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); | |
236 return NGX_ERROR; | |
237 } | |
238 | |
239 *tp->queue.last = task; | |
240 tp->queue.last = &task->next; | |
241 | |
242 tp->queue.count++; | |
243 | |
244 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); | |
245 | |
246 ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, | |
247 "task #%ui added to thread pool \"%V\"", | |
248 task->id, &tp->name); | |
249 | |
250 return NGX_OK; | |
251 } | |
252 | |
253 | |
254 static void * | |
255 ngx_thread_pool_cycle(void *data) | |
256 { | |
257 ngx_thread_pool_t *tp = data; | |
258 | |
259 int err; | |
260 sigset_t set; | |
261 ngx_thread_task_t *task; | |
262 | |
263 #if 0 | |
264 ngx_time_update(); | |
265 #endif | |
266 | |
267 ngx_log_debug1(NGX_LOG_DEBUG_CORE, tp->log, 0, | |
268 "thread in pool \"%V\" started", &tp->name); | |
269 | |
270 sigfillset(&set); | |
271 | |
272 sigdelset(&set, SIGILL); | |
273 sigdelset(&set, SIGFPE); | |
274 sigdelset(&set, SIGSEGV); | |
275 sigdelset(&set, SIGBUS); | |
276 | |
277 err = pthread_sigmask(SIG_BLOCK, &set, NULL); | |
278 if (err) { | |
279 ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed"); | |
280 return NULL; | |
281 } | |
282 | |
283 for ( ;; ) { | |
284 if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) { | |
285 return NULL; | |
286 } | |
287 | |
288 while (tp->queue.count == 0) { | |
289 if (ngx_thread_cond_wait(&tp->cond, &tp->queue.mtx, tp->log) | |
290 != NGX_OK) | |
291 { | |
292 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); | |
293 return NULL; | |
294 } | |
295 } | |
296 | |
297 tp->queue.count--; | |
298 | |
299 task = tp->queue.first; | |
300 tp->queue.first = task->next; | |
301 | |
302 if (tp->queue.first == NULL) { | |
303 tp->queue.last = &tp->queue.first; | |
304 } | |
305 | |
306 if (ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log) != NGX_OK) { | |
307 return NULL; | |
308 } | |
309 | |
310 #if 0 | |
311 ngx_time_update(); | |
312 #endif | |
313 | |
314 ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, | |
315 "run task #%ui in thread pool \"%V\"", | |
316 task->id, &tp->name); | |
317 | |
318 task->handler(task->ctx, tp->log); | |
319 | |
320 ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, | |
321 "complete task #%ui in thread pool \"%V\"", | |
322 task->id, &tp->name); | |
323 | |
324 task->next = NULL; | |
325 | |
326 if (ngx_thread_mutex_lock(&ngx_thread_pool_done.mtx, tp->log) | |
327 != NGX_OK) | |
328 { | |
329 return NULL; | |
330 } | |
331 | |
332 *ngx_thread_pool_done.last = task; | |
333 ngx_thread_pool_done.last = &task->next; | |
334 | |
335 if (ngx_thread_mutex_unlock(&ngx_thread_pool_done.mtx, tp->log) | |
336 != NGX_OK) | |
337 { | |
338 return NULL; | |
339 } | |
340 | |
341 (void) ngx_notify(ngx_thread_pool_handler); | |
342 } | |
343 } | |
344 | |
345 | |
346 static void | |
347 ngx_thread_pool_handler(ngx_event_t *ev) | |
348 { | |
349 ngx_event_t *event; | |
350 ngx_thread_task_t *task; | |
351 | |
352 ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "thread pool handler"); | |
353 | |
354 if (ngx_thread_mutex_lock(&ngx_thread_pool_done.mtx, ev->log) != NGX_OK) { | |
355 return; | |
356 } | |
357 | |
358 task = ngx_thread_pool_done.first; | |
359 ngx_thread_pool_done.first = NULL; | |
360 ngx_thread_pool_done.last = &ngx_thread_pool_done.first; | |
361 | |
362 if (ngx_thread_mutex_unlock(&ngx_thread_pool_done.mtx, ev->log) != NGX_OK) { | |
363 return; | |
364 } | |
365 | |
366 while (task) { | |
367 ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, | |
368 "run completion handler for task #%ui", task->id); | |
369 | |
370 event = &task->event; | |
371 task = task->next; | |
372 | |
373 event->complete = 1; | |
374 event->active = 0; | |
375 | |
376 event->handler(event); | |
377 } | |
378 } | |
379 | |
380 | |
381 static void * | |
382 ngx_thread_pool_create_conf(ngx_cycle_t *cycle) | |
383 { | |
384 ngx_thread_pool_conf_t *tcf; | |
385 | |
386 tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t)); | |
387 if (tcf == NULL) { | |
388 return NULL; | |
389 } | |
390 | |
391 if (ngx_array_init(&tcf->pools, cycle->pool, 4, | |
392 sizeof(ngx_thread_pool_t *)) | |
393 != NGX_OK) | |
394 { | |
395 return NULL; | |
396 } | |
397 | |
398 return tcf; | |
399 } | |
400 | |
401 | |
402 static char * | |
403 ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf) | |
404 { | |
405 ngx_thread_pool_conf_t *tcf = conf; | |
406 | |
407 ngx_uint_t i; | |
408 ngx_thread_pool_t **tpp; | |
409 | |
410 tpp = tcf->pools.elts; | |
411 | |
412 for (i = 0; i < tcf->pools.nelts; i++) { | |
413 | |
414 if (tpp[i]->threads) { | |
415 continue; | |
416 } | |
417 | |
418 if (tpp[i]->name.len == ngx_thread_pool_default.len | |
419 && ngx_strncmp(tpp[i]->name.data, ngx_thread_pool_default.data, | |
420 ngx_thread_pool_default.len) | |
421 == 0) | |
422 { | |
423 tpp[i]->threads = 32; | |
424 tpp[i]->max_queue = 65536; | |
425 continue; | |
426 } | |
427 | |
428 ngx_log_error(NGX_LOG_EMERG, cycle->log, 0, | |
429 "unknown thread pool \"%V\" in %s:%ui", | |
430 &tpp[i]->name, tpp[i]->file, tpp[i]->line); | |
431 | |
432 return NGX_CONF_ERROR; | |
433 } | |
434 | |
435 return NGX_CONF_OK; | |
436 } | |
437 | |
438 | |
439 static char * | |
440 ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) | |
441 { | |
442 ngx_str_t *value; | |
443 ngx_uint_t i; | |
444 ngx_thread_pool_t *tp; | |
445 | |
446 value = cf->args->elts; | |
447 | |
448 tp = ngx_thread_pool_add(cf, &value[1]); | |
449 | |
450 if (tp == NULL) { | |
451 return NGX_CONF_ERROR; | |
452 } | |
453 | |
454 if (tp->threads) { | |
455 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, | |
456 "duplicate thread pool \"%V\"", &tp->name); | |
457 return NGX_CONF_ERROR; | |
458 } | |
459 | |
460 tp->max_queue = 65536; | |
461 | |
462 for (i = 2; i < cf->args->nelts; i++) { | |
463 | |
464 if (ngx_strncmp(value[i].data, "threads=", 8) == 0) { | |
465 | |
466 tp->threads = ngx_atoi(value[i].data + 8, value[i].len - 8); | |
467 | |
468 if (tp->threads == (ngx_uint_t) NGX_ERROR || tp->threads == 0) { | |
469 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, | |
470 "invalid threads value \"%V\"", &value[i]); | |
471 return NGX_CONF_ERROR; | |
472 } | |
473 | |
474 continue; | |
475 } | |
476 | |
477 if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) { | |
478 | |
479 tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10); | |
480 | |
481 if (tp->max_queue == (ngx_uint_t) NGX_ERROR) { | |
482 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, | |
483 "invalid max_queue value \"%V\"", &value[i]); | |
484 return NGX_CONF_ERROR; | |
485 } | |
486 | |
487 continue; | |
488 } | |
489 } | |
490 | |
491 if (tp->threads == 0) { | |
492 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, | |
493 "\"%V\" must have \"threads\" parameter", | |
494 &cmd->name); | |
495 return NGX_CONF_ERROR; | |
496 } | |
497 | |
498 return NGX_CONF_OK; | |
499 } | |
500 | |
501 | |
502 ngx_thread_pool_t * | |
503 ngx_thread_pool_add(ngx_conf_t *cf, ngx_str_t *name) | |
504 { | |
505 ngx_thread_pool_t *tp, **tpp; | |
506 ngx_thread_pool_conf_t *tcf; | |
507 | |
508 if (name == NULL) { | |
509 name = &ngx_thread_pool_default; | |
510 } | |
511 | |
512 tp = ngx_thread_pool_get(cf->cycle, name); | |
513 | |
514 if (tp) { | |
515 return tp; | |
516 } | |
517 | |
518 tp = ngx_pcalloc(cf->pool, sizeof(ngx_thread_pool_t)); | |
519 if (tp == NULL) { | |
520 return NULL; | |
521 } | |
522 | |
523 tp->name = *name; | |
524 tp->file = cf->conf_file->file.name.data; | |
525 tp->line = cf->conf_file->line; | |
526 | |
527 tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, | |
528 ngx_thread_pool_module); | |
529 | |
530 tpp = ngx_array_push(&tcf->pools); | |
531 if (tpp == NULL) { | |
532 return NULL; | |
533 } | |
534 | |
535 *tpp = tp; | |
536 | |
537 return tp; | |
538 } | |
539 | |
540 | |
541 ngx_thread_pool_t * | |
542 ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name) | |
543 { | |
544 ngx_uint_t i; | |
545 ngx_thread_pool_t **tpp; | |
546 ngx_thread_pool_conf_t *tcf; | |
547 | |
548 tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx, | |
549 ngx_thread_pool_module); | |
550 | |
551 tpp = tcf->pools.elts; | |
552 | |
553 for (i = 0; i < tcf->pools.nelts; i++) { | |
554 | |
555 if (tpp[i]->name.len == name->len | |
556 && ngx_strncmp(tpp[i]->name.data, name->data, name->len) == 0) | |
557 { | |
558 return tpp[i]; | |
559 } | |
560 } | |
561 | |
562 return NULL; | |
563 } | |
564 | |
565 | |
566 static ngx_int_t | |
567 ngx_thread_pool_init_worker(ngx_cycle_t *cycle) | |
568 { | |
569 ngx_uint_t i; | |
570 ngx_thread_pool_t **tpp; | |
571 ngx_thread_pool_conf_t *tcf; | |
572 | |
573 if (ngx_process != NGX_PROCESS_WORKER | |
574 && ngx_process != NGX_PROCESS_SINGLE) | |
575 { | |
576 return NGX_OK; | |
577 } | |
578 | |
579 tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx, | |
580 ngx_thread_pool_module); | |
581 | |
582 if (tcf == NULL) { | |
583 return NGX_OK; | |
584 } | |
585 | |
586 if (ngx_thread_pool_queue_init(&ngx_thread_pool_done, cycle->log) | |
587 != NGX_OK) | |
588 { | |
589 return NGX_ERROR; | |
590 } | |
591 | |
592 tpp = tcf->pools.elts; | |
593 | |
594 for (i = 0; i < tcf->pools.nelts; i++) { | |
595 if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) { | |
596 return NGX_ERROR; | |
597 } | |
598 } | |
599 | |
600 return NGX_OK; | |
601 } | |
602 | |
603 | |
604 static void | |
605 ngx_thread_pool_exit_worker(ngx_cycle_t *cycle) | |
606 { | |
607 ngx_uint_t i; | |
608 ngx_thread_pool_t **tpp; | |
609 ngx_thread_pool_conf_t *tcf; | |
610 | |
611 if (ngx_process != NGX_PROCESS_WORKER | |
612 && ngx_process != NGX_PROCESS_SINGLE) | |
613 { | |
614 return; | |
615 } | |
616 | |
617 tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx, | |
618 ngx_thread_pool_module); | |
619 | |
620 if (tcf == NULL) { | |
621 return; | |
622 } | |
623 | |
624 tpp = tcf->pools.elts; | |
625 | |
626 for (i = 0; i < tcf->pools.nelts; i++) { | |
627 ngx_thread_pool_destroy(tpp[i]); | |
628 } | |
629 | |
630 (void) ngx_thread_pool_queue_destroy(&ngx_thread_pool_done, cycle->log); | |
631 } |