comparison src/core/ngx_thread_pool.c @ 6018:466bd63b63d1

Thread pools implementation.
author Valentin Bartenev <vbart@nginx.com>
date Sat, 14 Mar 2015 17:37:07 +0300
parents
children 32099b107191
comparison
equal deleted inserted replaced
6017:83d54192e97b 6018:466bd63b63d1
1
2 /*
3 * Copyright (C) Nginx, Inc.
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) Ruslan Ermilov
6 */
7
8
9 #include <ngx_config.h>
10 #include <ngx_core.h>
11 #include <ngx_thread_pool.h>
12
13
14 typedef struct {
15 ngx_array_t pools;
16 } ngx_thread_pool_conf_t;
17
18
19 typedef struct {
20 ngx_thread_mutex_t mtx;
21 ngx_uint_t count;
22 ngx_thread_task_t *first;
23 ngx_thread_task_t **last;
24 } ngx_thread_pool_queue_t;
25
26
27 struct ngx_thread_pool_s {
28 ngx_thread_cond_t cond;
29
30 ngx_thread_pool_queue_t queue;
31
32 ngx_log_t *log;
33 ngx_pool_t *pool;
34
35 ngx_str_t name;
36 ngx_uint_t threads;
37 ngx_uint_t max_queue;
38
39 u_char *file;
40 ngx_uint_t line;
41 };
42
43
44 static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log,
45 ngx_pool_t *pool);
46 static ngx_int_t ngx_thread_pool_queue_init(ngx_thread_pool_queue_t *queue,
47 ngx_log_t *log);
48 static ngx_int_t ngx_thread_pool_queue_destroy(ngx_thread_pool_queue_t *queue,
49 ngx_log_t *log);
50 static void ngx_thread_pool_destroy(ngx_thread_pool_t *tp);
51
52 static void *ngx_thread_pool_cycle(void *data);
53 static void ngx_thread_pool_handler(ngx_event_t *ev);
54
55 static char *ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
56
57 static void *ngx_thread_pool_create_conf(ngx_cycle_t *cycle);
58 static char *ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf);
59
60 static ngx_int_t ngx_thread_pool_init_worker(ngx_cycle_t *cycle);
61 static void ngx_thread_pool_exit_worker(ngx_cycle_t *cycle);
62
63
64 static ngx_command_t ngx_thread_pool_commands[] = {
65
66 { ngx_string("thread_pool"),
67 NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE23,
68 ngx_thread_pool,
69 0,
70 0,
71 NULL },
72
73 ngx_null_command
74 };
75
76
77 static ngx_core_module_t ngx_thread_pool_module_ctx = {
78 ngx_string("thread_pool"),
79 ngx_thread_pool_create_conf,
80 ngx_thread_pool_init_conf
81 };
82
83
84 ngx_module_t ngx_thread_pool_module = {
85 NGX_MODULE_V1,
86 &ngx_thread_pool_module_ctx, /* module context */
87 ngx_thread_pool_commands, /* module directives */
88 NGX_CORE_MODULE, /* module type */
89 NULL, /* init master */
90 NULL, /* init module */
91 ngx_thread_pool_init_worker, /* init process */
92 NULL, /* init thread */
93 NULL, /* exit thread */
94 ngx_thread_pool_exit_worker, /* exit process */
95 NULL, /* exit master */
96 NGX_MODULE_V1_PADDING
97 };
98
99
100 static ngx_str_t ngx_thread_pool_default = ngx_string("default");
101
102 static ngx_uint_t ngx_thread_pool_task_id;
103 static ngx_thread_pool_queue_t ngx_thread_pool_done;
104
105
106 static ngx_int_t
107 ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
108 {
109 int err;
110 pthread_t tid;
111 ngx_uint_t n;
112 pthread_attr_t attr;
113
114 if (ngx_notify == NULL) {
115 ngx_log_error(NGX_LOG_ALERT, log, 0,
116 "the configured event method cannot be used with thread pools");
117 return NGX_ERROR;
118 }
119
120 if (ngx_thread_pool_queue_init(&tp->queue, log) != NGX_OK) {
121 return NGX_ERROR;
122 }
123
124 if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
125 (void) ngx_thread_pool_queue_destroy(&tp->queue, log);
126 return NGX_ERROR;
127 }
128
129 tp->log = log;
130 tp->pool = pool;
131
132 err = pthread_attr_init(&attr);
133 if (err) {
134 ngx_log_error(NGX_LOG_ALERT, log, err,
135 "pthread_attr_init() failed");
136 return NGX_ERROR;
137 }
138
139 #if 0
140 err = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
141 if (err) {
142 ngx_log_error(NGX_LOG_ALERT, log, err,
143 "pthread_attr_setstacksize() failed");
144 return NGX_ERROR;
145 }
146 #endif
147
148 for (n = 0; n < tp->threads; n++) {
149 err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
150 if (err) {
151 ngx_log_error(NGX_LOG_ALERT, log, err,
152 "pthread_create() failed");
153 return NGX_ERROR;
154 }
155 }
156
157 (void) pthread_attr_destroy(&attr);
158
159 return NGX_OK;
160 }
161
162
163 static ngx_int_t
164 ngx_thread_pool_queue_init(ngx_thread_pool_queue_t *queue, ngx_log_t *log)
165 {
166 queue->count = 0;
167 queue->first = NULL;
168 queue->last = &queue->first;
169
170 return ngx_thread_mutex_create(&queue->mtx, log);
171 }
172
173
174 static ngx_int_t
175 ngx_thread_pool_queue_destroy(ngx_thread_pool_queue_t *queue, ngx_log_t *log)
176 {
177 return ngx_thread_mutex_destroy(&queue->mtx, log);
178 }
179
180
181 static void
182 ngx_thread_pool_destroy(ngx_thread_pool_t *tp)
183 {
184 /* TODO: exit threads */
185
186 (void) ngx_thread_cond_destroy(&tp->cond, tp->log);
187 (void) ngx_thread_pool_queue_destroy(&tp->queue, tp->log);
188 }
189
190
191 ngx_thread_task_t *
192 ngx_thread_task_alloc(ngx_pool_t *pool, size_t size)
193 {
194 ngx_thread_task_t *task;
195
196 task = ngx_pcalloc(pool, sizeof(ngx_thread_task_t) + size);
197 if (task == NULL) {
198 return NULL;
199 }
200
201 task->ctx = task + 1;
202
203 return task;
204 }
205
206
207 ngx_int_t
208 ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
209 {
210 if (task->event.active) {
211 ngx_log_error(NGX_LOG_ALERT, tp->log, 0,
212 "task #%ui already active", task->id);
213 return NGX_ERROR;
214 }
215
216 if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) {
217 return NGX_ERROR;
218 }
219
220 if (tp->queue.count >= tp->max_queue) {
221 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
222
223 ngx_log_error(NGX_LOG_ERR, tp->log, 0,
224 "thread pool \"%V\" queue overflow: %ui tasks waiting",
225 &tp->name, tp->queue.count);
226 return NGX_ERROR;
227 }
228
229 task->event.active = 1;
230
231 task->id = ngx_thread_pool_task_id++;
232 task->next = NULL;
233
234 if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
235 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
236 return NGX_ERROR;
237 }
238
239 *tp->queue.last = task;
240 tp->queue.last = &task->next;
241
242 tp->queue.count++;
243
244 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
245
246 ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
247 "task #%ui added to thread pool \"%V\"",
248 task->id, &tp->name);
249
250 return NGX_OK;
251 }
252
253
254 static void *
255 ngx_thread_pool_cycle(void *data)
256 {
257 ngx_thread_pool_t *tp = data;
258
259 int err;
260 sigset_t set;
261 ngx_thread_task_t *task;
262
263 #if 0
264 ngx_time_update();
265 #endif
266
267 ngx_log_debug1(NGX_LOG_DEBUG_CORE, tp->log, 0,
268 "thread in pool \"%V\" started", &tp->name);
269
270 sigfillset(&set);
271
272 sigdelset(&set, SIGILL);
273 sigdelset(&set, SIGFPE);
274 sigdelset(&set, SIGSEGV);
275 sigdelset(&set, SIGBUS);
276
277 err = pthread_sigmask(SIG_BLOCK, &set, NULL);
278 if (err) {
279 ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed");
280 return NULL;
281 }
282
283 for ( ;; ) {
284 if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) {
285 return NULL;
286 }
287
288 while (tp->queue.count == 0) {
289 if (ngx_thread_cond_wait(&tp->cond, &tp->queue.mtx, tp->log)
290 != NGX_OK)
291 {
292 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
293 return NULL;
294 }
295 }
296
297 tp->queue.count--;
298
299 task = tp->queue.first;
300 tp->queue.first = task->next;
301
302 if (tp->queue.first == NULL) {
303 tp->queue.last = &tp->queue.first;
304 }
305
306 if (ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log) != NGX_OK) {
307 return NULL;
308 }
309
310 #if 0
311 ngx_time_update();
312 #endif
313
314 ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
315 "run task #%ui in thread pool \"%V\"",
316 task->id, &tp->name);
317
318 task->handler(task->ctx, tp->log);
319
320 ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
321 "complete task #%ui in thread pool \"%V\"",
322 task->id, &tp->name);
323
324 task->next = NULL;
325
326 if (ngx_thread_mutex_lock(&ngx_thread_pool_done.mtx, tp->log)
327 != NGX_OK)
328 {
329 return NULL;
330 }
331
332 *ngx_thread_pool_done.last = task;
333 ngx_thread_pool_done.last = &task->next;
334
335 if (ngx_thread_mutex_unlock(&ngx_thread_pool_done.mtx, tp->log)
336 != NGX_OK)
337 {
338 return NULL;
339 }
340
341 (void) ngx_notify(ngx_thread_pool_handler);
342 }
343 }
344
345
346 static void
347 ngx_thread_pool_handler(ngx_event_t *ev)
348 {
349 ngx_event_t *event;
350 ngx_thread_task_t *task;
351
352 ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "thread pool handler");
353
354 if (ngx_thread_mutex_lock(&ngx_thread_pool_done.mtx, ev->log) != NGX_OK) {
355 return;
356 }
357
358 task = ngx_thread_pool_done.first;
359 ngx_thread_pool_done.first = NULL;
360 ngx_thread_pool_done.last = &ngx_thread_pool_done.first;
361
362 if (ngx_thread_mutex_unlock(&ngx_thread_pool_done.mtx, ev->log) != NGX_OK) {
363 return;
364 }
365
366 while (task) {
367 ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
368 "run completion handler for task #%ui", task->id);
369
370 event = &task->event;
371 task = task->next;
372
373 event->complete = 1;
374 event->active = 0;
375
376 event->handler(event);
377 }
378 }
379
380
381 static void *
382 ngx_thread_pool_create_conf(ngx_cycle_t *cycle)
383 {
384 ngx_thread_pool_conf_t *tcf;
385
386 tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t));
387 if (tcf == NULL) {
388 return NULL;
389 }
390
391 if (ngx_array_init(&tcf->pools, cycle->pool, 4,
392 sizeof(ngx_thread_pool_t *))
393 != NGX_OK)
394 {
395 return NULL;
396 }
397
398 return tcf;
399 }
400
401
402 static char *
403 ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf)
404 {
405 ngx_thread_pool_conf_t *tcf = conf;
406
407 ngx_uint_t i;
408 ngx_thread_pool_t **tpp;
409
410 tpp = tcf->pools.elts;
411
412 for (i = 0; i < tcf->pools.nelts; i++) {
413
414 if (tpp[i]->threads) {
415 continue;
416 }
417
418 if (tpp[i]->name.len == ngx_thread_pool_default.len
419 && ngx_strncmp(tpp[i]->name.data, ngx_thread_pool_default.data,
420 ngx_thread_pool_default.len)
421 == 0)
422 {
423 tpp[i]->threads = 32;
424 tpp[i]->max_queue = 65536;
425 continue;
426 }
427
428 ngx_log_error(NGX_LOG_EMERG, cycle->log, 0,
429 "unknown thread pool \"%V\" in %s:%ui",
430 &tpp[i]->name, tpp[i]->file, tpp[i]->line);
431
432 return NGX_CONF_ERROR;
433 }
434
435 return NGX_CONF_OK;
436 }
437
438
439 static char *
440 ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
441 {
442 ngx_str_t *value;
443 ngx_uint_t i;
444 ngx_thread_pool_t *tp;
445
446 value = cf->args->elts;
447
448 tp = ngx_thread_pool_add(cf, &value[1]);
449
450 if (tp == NULL) {
451 return NGX_CONF_ERROR;
452 }
453
454 if (tp->threads) {
455 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
456 "duplicate thread pool \"%V\"", &tp->name);
457 return NGX_CONF_ERROR;
458 }
459
460 tp->max_queue = 65536;
461
462 for (i = 2; i < cf->args->nelts; i++) {
463
464 if (ngx_strncmp(value[i].data, "threads=", 8) == 0) {
465
466 tp->threads = ngx_atoi(value[i].data + 8, value[i].len - 8);
467
468 if (tp->threads == (ngx_uint_t) NGX_ERROR || tp->threads == 0) {
469 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
470 "invalid threads value \"%V\"", &value[i]);
471 return NGX_CONF_ERROR;
472 }
473
474 continue;
475 }
476
477 if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) {
478
479 tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10);
480
481 if (tp->max_queue == (ngx_uint_t) NGX_ERROR) {
482 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
483 "invalid max_queue value \"%V\"", &value[i]);
484 return NGX_CONF_ERROR;
485 }
486
487 continue;
488 }
489 }
490
491 if (tp->threads == 0) {
492 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
493 "\"%V\" must have \"threads\" parameter",
494 &cmd->name);
495 return NGX_CONF_ERROR;
496 }
497
498 return NGX_CONF_OK;
499 }
500
501
502 ngx_thread_pool_t *
503 ngx_thread_pool_add(ngx_conf_t *cf, ngx_str_t *name)
504 {
505 ngx_thread_pool_t *tp, **tpp;
506 ngx_thread_pool_conf_t *tcf;
507
508 if (name == NULL) {
509 name = &ngx_thread_pool_default;
510 }
511
512 tp = ngx_thread_pool_get(cf->cycle, name);
513
514 if (tp) {
515 return tp;
516 }
517
518 tp = ngx_pcalloc(cf->pool, sizeof(ngx_thread_pool_t));
519 if (tp == NULL) {
520 return NULL;
521 }
522
523 tp->name = *name;
524 tp->file = cf->conf_file->file.name.data;
525 tp->line = cf->conf_file->line;
526
527 tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,
528 ngx_thread_pool_module);
529
530 tpp = ngx_array_push(&tcf->pools);
531 if (tpp == NULL) {
532 return NULL;
533 }
534
535 *tpp = tp;
536
537 return tp;
538 }
539
540
541 ngx_thread_pool_t *
542 ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name)
543 {
544 ngx_uint_t i;
545 ngx_thread_pool_t **tpp;
546 ngx_thread_pool_conf_t *tcf;
547
548 tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
549 ngx_thread_pool_module);
550
551 tpp = tcf->pools.elts;
552
553 for (i = 0; i < tcf->pools.nelts; i++) {
554
555 if (tpp[i]->name.len == name->len
556 && ngx_strncmp(tpp[i]->name.data, name->data, name->len) == 0)
557 {
558 return tpp[i];
559 }
560 }
561
562 return NULL;
563 }
564
565
566 static ngx_int_t
567 ngx_thread_pool_init_worker(ngx_cycle_t *cycle)
568 {
569 ngx_uint_t i;
570 ngx_thread_pool_t **tpp;
571 ngx_thread_pool_conf_t *tcf;
572
573 if (ngx_process != NGX_PROCESS_WORKER
574 && ngx_process != NGX_PROCESS_SINGLE)
575 {
576 return NGX_OK;
577 }
578
579 tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
580 ngx_thread_pool_module);
581
582 if (tcf == NULL) {
583 return NGX_OK;
584 }
585
586 if (ngx_thread_pool_queue_init(&ngx_thread_pool_done, cycle->log)
587 != NGX_OK)
588 {
589 return NGX_ERROR;
590 }
591
592 tpp = tcf->pools.elts;
593
594 for (i = 0; i < tcf->pools.nelts; i++) {
595 if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {
596 return NGX_ERROR;
597 }
598 }
599
600 return NGX_OK;
601 }
602
603
604 static void
605 ngx_thread_pool_exit_worker(ngx_cycle_t *cycle)
606 {
607 ngx_uint_t i;
608 ngx_thread_pool_t **tpp;
609 ngx_thread_pool_conf_t *tcf;
610
611 if (ngx_process != NGX_PROCESS_WORKER
612 && ngx_process != NGX_PROCESS_SINGLE)
613 {
614 return;
615 }
616
617 tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
618 ngx_thread_pool_module);
619
620 if (tcf == NULL) {
621 return;
622 }
623
624 tpp = tcf->pools.elts;
625
626 for (i = 0; i < tcf->pools.nelts; i++) {
627 ngx_thread_pool_destroy(tpp[i]);
628 }
629
630 (void) ngx_thread_pool_queue_destroy(&ngx_thread_pool_done, cycle->log);
631 }