Mercurial > hg > nginx-quic
comparison src/core/ngx_thread_pool.c @ 6026:25fda43e3bcb
Thread pools: fixed the waiting tasks accounting.
Behave like POSIX semaphores. If N worker threads are waiting for tasks,
at least that number of tasks should be allowed to be put into the queue.
author | Ruslan Ermilov <ru@nginx.com> |
---|---|
date | Thu, 19 Mar 2015 13:00:48 +0300 |
parents | 32099b107191 |
children | 67717d4e4f47 |
comparison legend:
- equal
- deleted
- inserted
- replaced
6025:32099b107191 | 6026:25fda43e3bcb |
---|---|
23 } ngx_thread_pool_queue_t; | 23 } ngx_thread_pool_queue_t; |
24 | 24 |
25 | 25 |
26 struct ngx_thread_pool_s { | 26 struct ngx_thread_pool_s { |
27 ngx_thread_pool_queue_t queue; | 27 ngx_thread_pool_queue_t queue; |
28 ngx_uint_t waiting; | 28 ngx_int_t waiting; |
29 ngx_thread_cond_t cond; | 29 ngx_thread_cond_t cond; |
30 | 30 |
31 ngx_log_t *log; | 31 ngx_log_t *log; |
32 ngx_pool_t *pool; | 32 ngx_pool_t *pool; |
33 | 33 |
34 ngx_str_t name; | 34 ngx_str_t name; |
35 ngx_uint_t threads; | 35 ngx_uint_t threads; |
36 ngx_uint_t max_queue; | 36 ngx_int_t max_queue; |
37 | 37 |
38 u_char *file; | 38 u_char *file; |
39 ngx_uint_t line; | 39 ngx_uint_t line; |
40 }; | 40 }; |
41 | 41 |
217 | 217 |
218 if (tp->waiting >= tp->max_queue) { | 218 if (tp->waiting >= tp->max_queue) { |
219 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); | 219 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); |
220 | 220 |
221 ngx_log_error(NGX_LOG_ERR, tp->log, 0, | 221 ngx_log_error(NGX_LOG_ERR, tp->log, 0, |
222 "thread pool \"%V\" queue overflow: %ui tasks waiting", | 222 "thread pool \"%V\" queue overflow: %i tasks waiting", |
223 &tp->name, tp->waiting); | 223 &tp->name, tp->waiting); |
224 return NGX_ERROR; | 224 return NGX_ERROR; |
225 } | 225 } |
226 | 226 |
227 task->event.active = 1; | 227 task->event.active = 1; |
281 for ( ;; ) { | 281 for ( ;; ) { |
282 if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) { | 282 if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) { |
283 return NULL; | 283 return NULL; |
284 } | 284 } |
285 | 285 |
286 while (tp->waiting == 0) { | 286 /* the number may become negative */ |
287 tp->waiting--; | |
288 | |
289 while (tp->queue.first == NULL) { | |
287 if (ngx_thread_cond_wait(&tp->cond, &tp->queue.mtx, tp->log) | 290 if (ngx_thread_cond_wait(&tp->cond, &tp->queue.mtx, tp->log) |
288 != NGX_OK) | 291 != NGX_OK) |
289 { | 292 { |
290 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); | 293 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); |
291 return NULL; | 294 return NULL; |
292 } | 295 } |
293 } | 296 } |
294 | |
295 tp->waiting--; | |
296 | 297 |
297 task = tp->queue.first; | 298 task = tp->queue.first; |
298 tp->queue.first = task->next; | 299 tp->queue.first = task->next; |
299 | 300 |
300 if (tp->queue.first == NULL) { | 301 if (tp->queue.first == NULL) { |
474 | 475 |
475 if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) { | 476 if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) { |
476 | 477 |
477 tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10); | 478 tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10); |
478 | 479 |
479 if (tp->max_queue == (ngx_uint_t) NGX_ERROR) { | 480 if (tp->max_queue == NGX_ERROR) { |
480 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, | 481 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, |
481 "invalid max_queue value \"%V\"", &value[i]); | 482 "invalid max_queue value \"%V\"", &value[i]); |
482 return NGX_CONF_ERROR; | 483 return NGX_CONF_ERROR; |
483 } | 484 } |
484 | 485 |