comparison src/core/ngx_thread_pool.c @ 6026:25fda43e3bcb

Thread pools: fixed the waiting tasks accounting. The accounting now behaves like POSIX semaphores: if N worker threads are waiting for tasks, at least N tasks should be allowed to be put into the queue.
author Ruslan Ermilov <ru@nginx.com>
date Thu, 19 Mar 2015 13:00:48 +0300
parents 32099b107191
children 67717d4e4f47
comparison
equal deleted inserted replaced
6025:32099b107191 6026:25fda43e3bcb
23 } ngx_thread_pool_queue_t; 23 } ngx_thread_pool_queue_t;
24 24
25 25
26 struct ngx_thread_pool_s { 26 struct ngx_thread_pool_s {
27 ngx_thread_pool_queue_t queue; 27 ngx_thread_pool_queue_t queue;
28 ngx_uint_t waiting; 28 ngx_int_t waiting;
29 ngx_thread_cond_t cond; 29 ngx_thread_cond_t cond;
30 30
31 ngx_log_t *log; 31 ngx_log_t *log;
32 ngx_pool_t *pool; 32 ngx_pool_t *pool;
33 33
34 ngx_str_t name; 34 ngx_str_t name;
35 ngx_uint_t threads; 35 ngx_uint_t threads;
36 ngx_uint_t max_queue; 36 ngx_int_t max_queue;
37 37
38 u_char *file; 38 u_char *file;
39 ngx_uint_t line; 39 ngx_uint_t line;
40 }; 40 };
41 41
217 217
218 if (tp->waiting >= tp->max_queue) { 218 if (tp->waiting >= tp->max_queue) {
219 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); 219 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
220 220
221 ngx_log_error(NGX_LOG_ERR, tp->log, 0, 221 ngx_log_error(NGX_LOG_ERR, tp->log, 0,
222 "thread pool \"%V\" queue overflow: %ui tasks waiting", 222 "thread pool \"%V\" queue overflow: %i tasks waiting",
223 &tp->name, tp->waiting); 223 &tp->name, tp->waiting);
224 return NGX_ERROR; 224 return NGX_ERROR;
225 } 225 }
226 226
227 task->event.active = 1; 227 task->event.active = 1;
281 for ( ;; ) { 281 for ( ;; ) {
282 if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) { 282 if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) {
283 return NULL; 283 return NULL;
284 } 284 }
285 285
286 while (tp->waiting == 0) { 286 /* the number may become negative */
287 tp->waiting--;
288
289 while (tp->queue.first == NULL) {
287 if (ngx_thread_cond_wait(&tp->cond, &tp->queue.mtx, tp->log) 290 if (ngx_thread_cond_wait(&tp->cond, &tp->queue.mtx, tp->log)
288 != NGX_OK) 291 != NGX_OK)
289 { 292 {
290 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); 293 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
291 return NULL; 294 return NULL;
292 } 295 }
293 } 296 }
294
295 tp->waiting--;
296 297
297 task = tp->queue.first; 298 task = tp->queue.first;
298 tp->queue.first = task->next; 299 tp->queue.first = task->next;
299 300
300 if (tp->queue.first == NULL) { 301 if (tp->queue.first == NULL) {
474 475
475 if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) { 476 if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) {
476 477
477 tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10); 478 tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10);
478 479
479 if (tp->max_queue == (ngx_uint_t) NGX_ERROR) { 480 if (tp->max_queue == NGX_ERROR) {
480 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, 481 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
481 "invalid max_queue value \"%V\"", &value[i]); 482 "invalid max_queue value \"%V\"", &value[i]);
482 return NGX_CONF_ERROR; 483 return NGX_CONF_ERROR;
483 } 484 }
484 485