comparison src/core/ngx_thread_pool.c @ 6040:adaedab1e662

Thread pools: keep waiting tasks mutex in ngx_thread_pool_t. It's not needed for completed tasks queue since the previous change. No functional changes.
author Valentin Bartenev <vbart@nginx.com>
date Mon, 23 Mar 2015 17:51:21 +0300
parents fc36690e7f44
children 2097cd49a158
comparison
equal deleted inserted replaced
6039:fc36690e7f44 6040:adaedab1e662
15 ngx_array_t pools; 15 ngx_array_t pools;
16 } ngx_thread_pool_conf_t; 16 } ngx_thread_pool_conf_t;
17 17
18 18
19 typedef struct { 19 typedef struct {
20 ngx_thread_mutex_t mtx;
21 ngx_thread_task_t *first; 20 ngx_thread_task_t *first;
22 ngx_thread_task_t **last; 21 ngx_thread_task_t **last;
23 } ngx_thread_pool_queue_t; 22 } ngx_thread_pool_queue_t;
24 23
24 #define ngx_thread_pool_queue_init(q) \
25 (q)->first = NULL; \
26 (q)->last = &(q)->first
27
25 28
26 struct ngx_thread_pool_s { 29 struct ngx_thread_pool_s {
30 ngx_thread_mutex_t mtx;
27 ngx_thread_pool_queue_t queue; 31 ngx_thread_pool_queue_t queue;
28 ngx_int_t waiting; 32 ngx_int_t waiting;
29 ngx_thread_cond_t cond; 33 ngx_thread_cond_t cond;
30 34
31 ngx_log_t *log; 35 ngx_log_t *log;
40 }; 44 };
41 45
42 46
43 static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, 47 static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log,
44 ngx_pool_t *pool); 48 ngx_pool_t *pool);
45 static ngx_int_t ngx_thread_pool_queue_init(ngx_thread_pool_queue_t *queue,
46 ngx_log_t *log);
47 static ngx_int_t ngx_thread_pool_queue_destroy(ngx_thread_pool_queue_t *queue,
48 ngx_log_t *log);
49 static void ngx_thread_pool_destroy(ngx_thread_pool_t *tp); 49 static void ngx_thread_pool_destroy(ngx_thread_pool_t *tp);
50 50
51 static void *ngx_thread_pool_cycle(void *data); 51 static void *ngx_thread_pool_cycle(void *data);
52 static void ngx_thread_pool_handler(ngx_event_t *ev); 52 static void ngx_thread_pool_handler(ngx_event_t *ev);
53 53
115 ngx_log_error(NGX_LOG_ALERT, log, 0, 115 ngx_log_error(NGX_LOG_ALERT, log, 0,
116 "the configured event method cannot be used with thread pools"); 116 "the configured event method cannot be used with thread pools");
117 return NGX_ERROR; 117 return NGX_ERROR;
118 } 118 }
119 119
120 if (ngx_thread_pool_queue_init(&tp->queue, log) != NGX_OK) { 120 ngx_thread_pool_queue_init(&tp->queue);
121
122 if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {
121 return NGX_ERROR; 123 return NGX_ERROR;
122 } 124 }
123 125
124 if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) { 126 if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
125 (void) ngx_thread_pool_queue_destroy(&tp->queue, log); 127 (void) ngx_thread_mutex_destroy(&tp->mtx, log);
126 return NGX_ERROR; 128 return NGX_ERROR;
127 } 129 }
128 130
129 tp->log = log; 131 tp->log = log;
130 tp->pool = pool; 132 tp->pool = pool;
158 160
159 return NGX_OK; 161 return NGX_OK;
160 } 162 }
161 163
162 164
163 static ngx_int_t
164 ngx_thread_pool_queue_init(ngx_thread_pool_queue_t *queue, ngx_log_t *log)
165 {
166 queue->first = NULL;
167 queue->last = &queue->first;
168
169 return ngx_thread_mutex_create(&queue->mtx, log);
170 }
171
172
173 static ngx_int_t
174 ngx_thread_pool_queue_destroy(ngx_thread_pool_queue_t *queue, ngx_log_t *log)
175 {
176 #if 0
177 return ngx_thread_mutex_destroy(&queue->mtx, log);
178 #else
179 return NGX_OK;
180 #endif
181 }
182
183
184 static void 165 static void
185 ngx_thread_pool_destroy(ngx_thread_pool_t *tp) 166 ngx_thread_pool_destroy(ngx_thread_pool_t *tp)
186 { 167 {
187 /* TODO: exit threads */ 168 /* TODO: exit threads */
188 169
189 #if 0 170 #if 0
190 (void) ngx_thread_cond_destroy(&tp->cond, tp->log); 171 (void) ngx_thread_cond_destroy(&tp->cond, tp->log);
191 #endif 172
192 173 (void) ngx_thread_mutex_destroy(&tp->mtx, tp->log);
193 (void) ngx_thread_pool_queue_destroy(&tp->queue, tp->log); 174 #endif
194 } 175 }
195 176
196 177
197 ngx_thread_task_t * 178 ngx_thread_task_t *
198 ngx_thread_task_alloc(ngx_pool_t *pool, size_t size) 179 ngx_thread_task_alloc(ngx_pool_t *pool, size_t size)
217 ngx_log_error(NGX_LOG_ALERT, tp->log, 0, 198 ngx_log_error(NGX_LOG_ALERT, tp->log, 0,
218 "task #%ui already active", task->id); 199 "task #%ui already active", task->id);
219 return NGX_ERROR; 200 return NGX_ERROR;
220 } 201 }
221 202
222 if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) { 203 if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
223 return NGX_ERROR; 204 return NGX_ERROR;
224 } 205 }
225 206
226 if (tp->waiting >= tp->max_queue) { 207 if (tp->waiting >= tp->max_queue) {
227 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); 208 (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
228 209
229 ngx_log_error(NGX_LOG_ERR, tp->log, 0, 210 ngx_log_error(NGX_LOG_ERR, tp->log, 0,
230 "thread pool \"%V\" queue overflow: %i tasks waiting", 211 "thread pool \"%V\" queue overflow: %i tasks waiting",
231 &tp->name, tp->waiting); 212 &tp->name, tp->waiting);
232 return NGX_ERROR; 213 return NGX_ERROR;
236 217
237 task->id = ngx_thread_pool_task_id++; 218 task->id = ngx_thread_pool_task_id++;
238 task->next = NULL; 219 task->next = NULL;
239 220
240 if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) { 221 if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
241 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); 222 (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
242 return NGX_ERROR; 223 return NGX_ERROR;
243 } 224 }
244 225
245 *tp->queue.last = task; 226 *tp->queue.last = task;
246 tp->queue.last = &task->next; 227 tp->queue.last = &task->next;
247 228
248 tp->waiting++; 229 tp->waiting++;
249 230
250 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); 231 (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
251 232
252 ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, 233 ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
253 "task #%ui added to thread pool \"%V\"", 234 "task #%ui added to thread pool \"%V\"",
254 task->id, &tp->name); 235 task->id, &tp->name);
255 236
285 ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed"); 266 ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed");
286 return NULL; 267 return NULL;
287 } 268 }
288 269
289 for ( ;; ) { 270 for ( ;; ) {
290 if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) { 271 if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
291 return NULL; 272 return NULL;
292 } 273 }
293 274
294 /* the number may become negative */ 275 /* the number may become negative */
295 tp->waiting--; 276 tp->waiting--;
296 277
297 while (tp->queue.first == NULL) { 278 while (tp->queue.first == NULL) {
298 if (ngx_thread_cond_wait(&tp->cond, &tp->queue.mtx, tp->log) 279 if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)
299 != NGX_OK) 280 != NGX_OK)
300 { 281 {
301 (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); 282 (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
302 return NULL; 283 return NULL;
303 } 284 }
304 } 285 }
305 286
306 task = tp->queue.first; 287 task = tp->queue.first;
308 289
309 if (tp->queue.first == NULL) { 290 if (tp->queue.first == NULL) {
310 tp->queue.last = &tp->queue.first; 291 tp->queue.last = &tp->queue.first;
311 } 292 }
312 293
313 if (ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log) != NGX_OK) { 294 if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
314 return NULL; 295 return NULL;
315 } 296 }
316 297
317 #if 0 298 #if 0
318 ngx_time_update(); 299 ngx_time_update();
576 557
577 if (tcf == NULL) { 558 if (tcf == NULL) {
578 return NGX_OK; 559 return NGX_OK;
579 } 560 }
580 561
581 if (ngx_thread_pool_queue_init(&ngx_thread_pool_done, cycle->log) 562 ngx_thread_pool_queue_init(&ngx_thread_pool_done);
582 != NGX_OK)
583 {
584 return NGX_ERROR;
585 }
586 563
587 tpp = tcf->pools.elts; 564 tpp = tcf->pools.elts;
588 565
589 for (i = 0; i < tcf->pools.nelts; i++) { 566 for (i = 0; i < tcf->pools.nelts; i++) {
590 if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) { 567 if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {
619 tpp = tcf->pools.elts; 596 tpp = tcf->pools.elts;
620 597
621 for (i = 0; i < tcf->pools.nelts; i++) { 598 for (i = 0; i < tcf->pools.nelts; i++) {
622 ngx_thread_pool_destroy(tpp[i]); 599 ngx_thread_pool_destroy(tpp[i]);
623 } 600 }
624 601 }
625 (void) ngx_thread_pool_queue_destroy(&ngx_thread_pool_done, cycle->log);
626 }