Mercurial > hg > nginx
comparison src/core/ngx_thread_pool.c @ 6040:adaedab1e662
Thread pools: keep the waiting tasks mutex in ngx_thread_pool_t.
It's not needed for the completed tasks queue since the previous change.
No functional changes.
author | Valentin Bartenev <vbart@nginx.com> |
---|---|
date | Mon, 23 Mar 2015 17:51:21 +0300 |
parents | fc36690e7f44 |
children | 2097cd49a158 |
comparison
equal
deleted
inserted
replaced
6039:fc36690e7f44 | 6040:adaedab1e662 |
---|---|
15 ngx_array_t pools; | 15 ngx_array_t pools; |
16 } ngx_thread_pool_conf_t; | 16 } ngx_thread_pool_conf_t; |
17 | 17 |
18 | 18 |
19 typedef struct { | 19 typedef struct { |
20 ngx_thread_mutex_t mtx; | |
21 ngx_thread_task_t *first; | 20 ngx_thread_task_t *first; |
22 ngx_thread_task_t **last; | 21 ngx_thread_task_t **last; |
23 } ngx_thread_pool_queue_t; | 22 } ngx_thread_pool_queue_t; |
24 | 23 |
| | 24 #define ngx_thread_pool_queue_init(q) \ |
| | 25 (q)->first = NULL; \ |
| | 26 (q)->last = &(q)->first |
| | 27 |
25 | 28 |
26 struct ngx_thread_pool_s { | 29 struct ngx_thread_pool_s { |
| | 30 ngx_thread_mutex_t mtx; |
27 ngx_thread_pool_queue_t queue; | 31 ngx_thread_pool_queue_t queue; |
28 ngx_int_t waiting; | 32 ngx_int_t waiting; |
29 ngx_thread_cond_t cond; | 33 ngx_thread_cond_t cond; |
30 | 34 |
31 ngx_log_t *log; | 35 ngx_log_t *log; |
40 }; | 44 }; |
41 | 45 |
42 | 46 |
43 static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, | 47 static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, |
44 ngx_pool_t *pool); | 48 ngx_pool_t *pool); |
45 static ngx_int_t ngx_thread_pool_queue_init(ngx_thread_pool_queue_t *queue, | |
46 ngx_log_t *log); | |
47 static ngx_int_t ngx_thread_pool_queue_destroy(ngx_thread_pool_queue_t *queue, | |
48 ngx_log_t *log); | |
49 static void ngx_thread_pool_destroy(ngx_thread_pool_t *tp); | 49 static void ngx_thread_pool_destroy(ngx_thread_pool_t *tp); |
50 | 50 |
51 static void *ngx_thread_pool_cycle(void *data); | 51 static void *ngx_thread_pool_cycle(void *data); |
52 static void ngx_thread_pool_handler(ngx_event_t *ev); | 52 static void ngx_thread_pool_handler(ngx_event_t *ev); |
53 | 53 |
115 ngx_log_error(NGX_LOG_ALERT, log, 0, | 115 ngx_log_error(NGX_LOG_ALERT, log, 0, |
116 "the configured event method cannot be used with thread pools"); | 116 "the configured event method cannot be used with thread pools"); |
117 return NGX_ERROR; | 117 return NGX_ERROR; |
118 } | 118 } |
119 | 119 |
120 if (ngx_thread_pool_queue_init(&tp->queue, log) != NGX_OK) { | 120 ngx_thread_pool_queue_init(&tp->queue); |
| | 121 |
| | 122 if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) { |
121 return NGX_ERROR; | 123 return NGX_ERROR; |
122 } | 124 } |
123 | 125 |
124 if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) { | 126 if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) { |
125 (void) ngx_thread_pool_queue_destroy(&tp->queue, log); | 127 (void) ngx_thread_mutex_destroy(&tp->mtx, log); |
126 return NGX_ERROR; | 128 return NGX_ERROR; |
127 } | 129 } |
128 | 130 |
129 tp->log = log; | 131 tp->log = log; |
130 tp->pool = pool; | 132 tp->pool = pool; |
158 | 160 |
159 return NGX_OK; | 161 return NGX_OK; |
160 } | 162 } |
161 | 163 |
162 | 164 |
163 static ngx_int_t | |
164 ngx_thread_pool_queue_init(ngx_thread_pool_queue_t *queue, ngx_log_t *log) | |
165 { | |
166 queue->first = NULL; | |
167 queue->last = &queue->first; | |
168 | |
169 return ngx_thread_mutex_create(&queue->mtx, log); | |
170 } | |
171 | |
172 | |
173 static ngx_int_t | |
174 ngx_thread_pool_queue_destroy(ngx_thread_pool_queue_t *queue, ngx_log_t *log) | |
175 { | |
176 #if 0 | |
177 return ngx_thread_mutex_destroy(&queue->mtx, log); | |
178 #else | |
179 return NGX_OK; | |
180 #endif | |
181 } | |
182 | |
183 | |
184 static void | 165 static void |
185 ngx_thread_pool_destroy(ngx_thread_pool_t *tp) | 166 ngx_thread_pool_destroy(ngx_thread_pool_t *tp) |
186 { | 167 { |
187 /* TODO: exit threads */ | 168 /* TODO: exit threads */ |
188 | 169 |
189 #if 0 | 170 #if 0 |
190 (void) ngx_thread_cond_destroy(&tp->cond, tp->log); | 171 (void) ngx_thread_cond_destroy(&tp->cond, tp->log); |
191 #endif | 172 |
192 | 173 (void) ngx_thread_mutex_destroy(&tp->mtx, tp->log); |
193 (void) ngx_thread_pool_queue_destroy(&tp->queue, tp->log); | 174 #endif |
194 } | 175 } |
195 | 176 |
196 | 177 |
197 ngx_thread_task_t * | 178 ngx_thread_task_t * |
198 ngx_thread_task_alloc(ngx_pool_t *pool, size_t size) | 179 ngx_thread_task_alloc(ngx_pool_t *pool, size_t size) |
217 ngx_log_error(NGX_LOG_ALERT, tp->log, 0, | 198 ngx_log_error(NGX_LOG_ALERT, tp->log, 0, |
218 "task #%ui already active", task->id); | 199 "task #%ui already active", task->id); |
219 return NGX_ERROR; | 200 return NGX_ERROR; |
220 } | 201 } |
221 | 202 |
222 if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) { | 203 if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) { |
223 return NGX_ERROR; | 204 return NGX_ERROR; |
224 } | 205 } |
225 | 206 |
226 if (tp->waiting >= tp->max_queue) { | 207 if (tp->waiting >= tp->max_queue) { |
227 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); | 208 (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); |
228 | 209 |
229 ngx_log_error(NGX_LOG_ERR, tp->log, 0, | 210 ngx_log_error(NGX_LOG_ERR, tp->log, 0, |
230 "thread pool \"%V\" queue overflow: %i tasks waiting", | 211 "thread pool \"%V\" queue overflow: %i tasks waiting", |
231 &tp->name, tp->waiting); | 212 &tp->name, tp->waiting); |
232 return NGX_ERROR; | 213 return NGX_ERROR; |
236 | 217 |
237 task->id = ngx_thread_pool_task_id++; | 218 task->id = ngx_thread_pool_task_id++; |
238 task->next = NULL; | 219 task->next = NULL; |
239 | 220 |
240 if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) { | 221 if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) { |
241 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); | 222 (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); |
242 return NGX_ERROR; | 223 return NGX_ERROR; |
243 } | 224 } |
244 | 225 |
245 *tp->queue.last = task; | 226 *tp->queue.last = task; |
246 tp->queue.last = &task->next; | 227 tp->queue.last = &task->next; |
247 | 228 |
248 tp->waiting++; | 229 tp->waiting++; |
249 | 230 |
250 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); | 231 (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); |
251 | 232 |
252 ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, | 233 ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, |
253 "task #%ui added to thread pool \"%V\"", | 234 "task #%ui added to thread pool \"%V\"", |
254 task->id, &tp->name); | 235 task->id, &tp->name); |
255 | 236 |
285 ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed"); | 266 ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed"); |
286 return NULL; | 267 return NULL; |
287 } | 268 } |
288 | 269 |
289 for ( ;; ) { | 270 for ( ;; ) { |
290 if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) { | 271 if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) { |
291 return NULL; | 272 return NULL; |
292 } | 273 } |
293 | 274 |
294 /* the number may become negative */ | 275 /* the number may become negative */ |
295 tp->waiting--; | 276 tp->waiting--; |
296 | 277 |
297 while (tp->queue.first == NULL) { | 278 while (tp->queue.first == NULL) { |
298 if (ngx_thread_cond_wait(&tp->cond, &tp->queue.mtx, tp->log) | 279 if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log) |
299 != NGX_OK) | 280 != NGX_OK) |
300 { | 281 { |
301 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); | 282 (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); |
302 return NULL; | 283 return NULL; |
303 } | 284 } |
304 } | 285 } |
305 | 286 |
306 task = tp->queue.first; | 287 task = tp->queue.first; |
308 | 289 |
309 if (tp->queue.first == NULL) { | 290 if (tp->queue.first == NULL) { |
310 tp->queue.last = &tp->queue.first; | 291 tp->queue.last = &tp->queue.first; |
311 } | 292 } |
312 | 293 |
313 if (ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log) != NGX_OK) { | 294 if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) { |
314 return NULL; | 295 return NULL; |
315 } | 296 } |
316 | 297 |
317 #if 0 | 298 #if 0 |
318 ngx_time_update(); | 299 ngx_time_update(); |
576 | 557 |
577 if (tcf == NULL) { | 558 if (tcf == NULL) { |
578 return NGX_OK; | 559 return NGX_OK; |
579 } | 560 } |
580 | 561 |
581 if (ngx_thread_pool_queue_init(&ngx_thread_pool_done, cycle->log) | 562 ngx_thread_pool_queue_init(&ngx_thread_pool_done); |
582 != NGX_OK) | |
583 { | |
584 return NGX_ERROR; | |
585 } | |
586 | 563 |
587 tpp = tcf->pools.elts; | 564 tpp = tcf->pools.elts; |
588 | 565 |
589 for (i = 0; i < tcf->pools.nelts; i++) { | 566 for (i = 0; i < tcf->pools.nelts; i++) { |
590 if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) { | 567 if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) { |
619 tpp = tcf->pools.elts; | 596 tpp = tcf->pools.elts; |
620 | 597 |
621 for (i = 0; i < tcf->pools.nelts; i++) { | 598 for (i = 0; i < tcf->pools.nelts; i++) { |
622 ngx_thread_pool_destroy(tpp[i]); | 599 ngx_thread_pool_destroy(tpp[i]); |
623 } | 600 } |
624 | 601 } |
625 (void) ngx_thread_pool_queue_destroy(&ngx_thread_pool_done, cycle->log); | |
626 } |