comparison src/os/unix/ngx_freebsd_rfork_thread.c @ 266:5238e93961a1

nginx-0.0.2-2004-02-23-23:57:12 import
author Igor Sysoev <igor@sysoev.ru>
date Mon, 23 Feb 2004 20:57:12 +0000
parents 0b67be7d4489
children 83205e0b5522
comparison
equal deleted inserted replaced
265:6468241715e6 266:5238e93961a1
1 1
2 #include <ngx_config.h> 2 #include <ngx_config.h>
3 #include <ngx_core.h> 3 #include <ngx_core.h>
4 4
5 5 /*
6 extern int __isthreaded; 6 * The threads implementation uses rfork(RFPROC|RFTHREAD|RFMEM)
7 7 * to create threads. All threads use stacks of the same size, mmap()ed
8 8 * below the main stack. Thus the stack pointer is used to determine
9 typedef int ngx_tid_t; 9 * the current thread id.
10 *
11 * The mutex implementation uses the ngx_atomic_cmp_set() operation
12 * to acquire a mutex and a SysV semaphore to wait on the mutex or to wake up
13 * the waiting threads.
14 *
15 * The condition variable implementation uses a SysV semaphore set of two
16 * semaphores. The first is used by the CV mutex, and the second is used
17 * by the CV itself.
18 */
19
20
21 extern int __isthreaded;
10 22
11 23
12 static inline int ngx_gettid(); 24 static inline int ngx_gettid();
13 25
14 26
15 static char *usrstack; 27 static char *usrstack;
16 static int red_zone = 4096; 28 static size_t rz_size = /* STUB: PAGE_SIZE */ 4096;
17 29
18 static size_t stack_size; 30 static size_t stack_size;
19 static size_t usable_stack_size; 31 static size_t usable_stack_size;
20 static char *last_stack; 32 static char *last_stack;
21 33
22 static int threads; 34 static ngx_uint_t nthreads;
23 static int nthreads; 35 static ngx_uint_t max_threads;
24 static ngx_tid_t *tids; 36 static ngx_tid_t *tids; /* the threads tids array */
37
25 38
26 /* the thread-safe errno */ 39 /* the thread-safe errno */
27 40
28 static int errno0; /* the main thread's errno */ 41 static int errno0; /* the main thread's errno */
29 static int *errnos; 42 static int *errnos; /* the threads errno's array */
30 43
31 int *__error() 44 int *__error()
32 { 45 {
33 int tid; 46 int tid;
34 47
39 52
40 53
41 int ngx_create_thread(ngx_tid_t *tid, int (*func)(void *arg), void *arg, 54 int ngx_create_thread(ngx_tid_t *tid, int (*func)(void *arg), void *arg,
42 ngx_log_t *log) 55 ngx_log_t *log)
43 { 56 {
44 int id, err; 57 int id, err;
45 char *stack, *stack_top; 58 char *stack, *stack_top;
46 59
47 if (threads >= nthreads) { 60 if (nthreads >= max_threads) {
48 ngx_log_error(NGX_LOG_CRIT, log, 0, 61 ngx_log_error(NGX_LOG_CRIT, log, 0,
49 "no more than %d threads can be created", nthreads); 62 "no more than %d threads can be created", max_threads);
50 return NGX_ERROR; 63 return NGX_ERROR;
51 } 64 }
52 65
53 last_stack -= stack_size; 66 last_stack -= stack_size;
67
54 stack = mmap(last_stack, usable_stack_size, PROT_READ|PROT_WRITE, 68 stack = mmap(last_stack, usable_stack_size, PROT_READ|PROT_WRITE,
55 MAP_STACK, -1, 0); 69 MAP_STACK, -1, 0);
70
56 if (stack == MAP_FAILED) { 71 if (stack == MAP_FAILED) {
57 ngx_log_error(NGX_LOG_ALERT, log, errno, 72 ngx_log_error(NGX_LOG_ALERT, log, ngx_errno,
58 "mmap(%08X:%d, MAP_STACK) thread stack failed", 73 "mmap(%08X:%d, MAP_STACK) thread stack failed",
59 last_stack, usable_stack_size); 74 last_stack, usable_stack_size);
60 return NGX_ERROR; 75 return NGX_ERROR;
61 } 76 }
62 77
64 ngx_log_error(NGX_LOG_ALERT, log, 0, "stack address was changed"); 79 ngx_log_error(NGX_LOG_ALERT, log, 0, "stack address was changed");
65 } 80 }
66 81
67 stack_top = stack + usable_stack_size; 82 stack_top = stack + usable_stack_size;
68 83
69 printf("stack: %08X-%08X\n", stack, stack_top); 84 ngx_log_debug2(NGX_LOG_DEBUG_CORE, log, 0,
85 "thread stack: %08X-%08X", stack, stack_top);
70 86
71 #if 1 87 #if 1
72 id = rfork_thread(RFPROC|RFTHREAD|RFMEM, stack_top, func, arg); 88 id = rfork_thread(RFPROC|RFTHREAD|RFMEM, stack_top, func, arg);
73 #elif 1 89 #elif 1
74 id = rfork_thread(RFPROC|RFMEM, stack_top, func, arg); 90 id = rfork_thread(RFPROC|RFMEM, stack_top, func, arg);
76 id = rfork_thread(RFFDG|RFCFDG, stack_top, func, arg); 92 id = rfork_thread(RFFDG|RFCFDG, stack_top, func, arg);
77 #else 93 #else
78 id = rfork(RFFDG|RFCFDG); 94 id = rfork(RFFDG|RFCFDG);
79 #endif 95 #endif
80 96
81 err = errno; 97 err = ngx_errno;
82 98
83 if (id == -1) { 99 if (id == -1) {
84 ngx_log_error(NGX_LOG_ALERT, log, err, "rfork() failed"); 100 ngx_log_error(NGX_LOG_ALERT, log, err, "rfork() failed");
85 101
86 } else { 102 } else {
87 *tid = id; 103 *tid = id;
88 threads = (usrstack - stack_top) / stack_size; 104 nthreads = (usrstack - stack_top) / stack_size;
89 tids[threads] = id; 105 tids[nthreads] = id;
90 106
91 /* allow the spinlock in libc malloc() */ 107 ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0, "rfork()ed thread: %d", id);
92 __isthreaded = 1;
93 } 108 }
94 109
95 return err; 110 return err;
96 } 111 }
97 112
98 113
99 int ngx_init_thread_env(int n, size_t size, ngx_log_t *log) 114 ngx_int_t ngx_init_threads(int n, size_t size, ngx_log_t *log)
100 { 115 {
101 int len; 116 int len;
102 char *rz, *zone; 117 char *red_zone, *zone;
103 118
104 nthreads = n; 119 max_threads = n;
105 120
106 len = 4; 121 len = 4;
107 if (sysctlbyname("kern.usrstack", &usrstack, &len, NULL, 0) == -1) { 122 if (sysctlbyname("kern.usrstack", &usrstack, &len, NULL, 0) == -1) {
108 ngx_log_error(NGX_LOG_ALERT, log, errno, 123 ngx_log_error(NGX_LOG_ALERT, log, ngx_errno,
109 "sysctlbyname(kern.usrstack) failed"); 124 "sysctlbyname(kern.usrstack) failed");
110 return NGX_ERROR; 125 return NGX_ERROR;
111 } 126 }
112 127
113 printf("usrstack: %08X\n", usrstack); 128 /* the main thread stack red zone */
114 129 red_zone = usrstack - (size + rz_size);
115 /* red zone */ 130
116 rz = usrstack - (size + red_zone); 131 ngx_log_debug2(NGX_LOG_DEBUG_CORE, log, 0,
117 132 "usrstack: %08X, red zone: %08X", usrstack, red_zone);
118 printf("red zone: %08X\n", rz); 133
119 134 zone = mmap(red_zone, rz_size, PROT_NONE, MAP_ANON, -1, 0);
120 zone = mmap(rz, red_zone, PROT_NONE, MAP_ANON, -1, 0);
121 if (zone == MAP_FAILED) { 135 if (zone == MAP_FAILED) {
122 ngx_log_error(NGX_LOG_ALERT, log, errno, 136 ngx_log_error(NGX_LOG_ALERT, log, ngx_errno,
123 "mmap(%08X:%d, PROT_NONE, MAP_ANON) red zone failed", 137 "mmap(%08X:%d, PROT_NONE, MAP_ANON) red zone failed",
124 rz, red_zone); 138 red_zone, rz_size);
125 return NGX_ERROR; 139 return NGX_ERROR;
126 } 140 }
127 141
128 if (zone != rz) { 142 if (zone != red_zone) {
129 ngx_log_error(NGX_LOG_ALERT, log, 0, "red zone address was changed"); 143 ngx_log_error(NGX_LOG_ALERT, log, 0, "red zone address was changed");
130 } 144 }
131 145
132 /* create the thread errno array */ 146 /* create the threads errno array */
133 ngx_test_null(errnos, ngx_calloc(n * sizeof(int), log), NGX_ERROR); 147
134 148 if (!(errnos = ngx_calloc(n * sizeof(int), log))) {
135 /* create the thread tid array */ 149 return NGX_ERROR;
136 ngx_test_null(tids, ngx_calloc((n + 1) * sizeof(ngx_tid_t), log), 150 }
137 NGX_ERROR); 151
138 152 /* create the threads tid array */
139 tids[0] = ngx_getpid(); 153
140 threads = 1; 154 if (!(tids = ngx_calloc((n + 1) * sizeof(ngx_tid_t), log))) {
141 155 return NGX_ERROR;
142 last_stack = zone + red_zone; 156 }
157
158 tids[0] = ngx_pid;
159 nthreads = 1;
160
161 last_stack = zone + rz_size;
143 usable_stack_size = size; 162 usable_stack_size = size;
144 stack_size = size + red_zone; 163 stack_size = size + rz_size;
164
165 /* allow the spinlock in libc malloc() */
166 __isthreaded = 1;
145 167
146 return NGX_OK; 168 return NGX_OK;
169 }
170
171
172 static inline int ngx_gettid()
173 {
174 char *sp;
175
176 if (stack_size == 0) {
177 return 0;
178 }
179
180 __asm__ ("mov %%esp, %0" : "=q" (sp));
181
182 return (usrstack - sp) / stack_size;
147 } 183 }
148 184
149 185
150 ngx_tid_t ngx_thread_self() 186 ngx_tid_t ngx_thread_self()
151 { 187 {
152 int tid; 188 int tid;
153 ngx_tid_t pid; 189 ngx_tid_t pid;
154 190
155 tid = ngx_gettid(); 191 tid = ngx_gettid();
156 192
193 if (tids == NULL) {
194 return ngx_pid;
195 }
196
197 #if 0
157 if (tids[tid] == 0) { 198 if (tids[tid] == 0) {
158 pid = ngx_getpid(); 199 pid = ngx_pid;
159 tids[tid] = pid; 200 tids[tid] = pid;
160 return pid; 201 return pid;
161 } 202 }
203 #endif
162 204
163 return tids[tid]; 205 return tids[tid];
164 } 206 }
165 207
166 208
167 static inline int ngx_gettid() 209 ngx_mutex_t *ngx_mutex_init(ngx_log_t *log, uint flags)
168 { 210 {
169 char *sp; 211 int nsem, i;
170 212 ngx_mutex_t *m;
171 if (stack_size == 0) { 213 union semun op;
172 return 0; 214
173 } 215 if (!(m = ngx_alloc(sizeof(ngx_mutex_t), log))) {
174 216 return NULL;
175 __asm__ ("mov %%esp, %0" : "=q" (sp)); 217 }
176 218
177 return (usrstack - sp) / stack_size; 219 m->lock = 0;
178 } 220 m->log = log;
221
222 if (flags & NGX_MUTEX_LIGHT) {
223 m->semid = -1;
224 return m;
225 }
226
227 nsem = flags & NGX_MUTEX_CV ? 2 : 1;
228
229 m->semid = semget(IPC_PRIVATE, nsem, SEM_R|SEM_A);
230 if (m->semid == -1) {
231 ngx_log_error(NGX_LOG_ALERT, log, ngx_errno, "semget() failed");
232 return NULL;
233 }
234
235 op.val = 0;
236 for (i = 0; i < nsem; i++) {
237 if (semctl(m->semid, i, SETVAL, op) == -1) {
238 ngx_log_error(NGX_LOG_ALERT, log, ngx_errno,
239 "semctl(SETVAL) failed");
240
241 if (semctl(m->semid, 0, IPC_RMID) == -1) {
242 ngx_log_error(NGX_LOG_ALERT, m->log, ngx_errno,
243 "semctl(IPC_RMID) failed");
244 }
245
246 return NULL;
247 }
248 }
249
250 return m;
251 }
252
253
254 void ngx_mutex_done(ngx_mutex_t *m)
255 {
256 if (semctl(m->semid, 0, IPC_RMID) == -1) {
257 ngx_log_error(NGX_LOG_ALERT, m->log, ngx_errno,
258 "semctl(IPC_RMID) failed");
259 }
260
261 ngx_free(m);
262 }
263
264
265 ngx_int_t ngx_mutex_do_lock(ngx_mutex_t *m, ngx_int_t try)
266 {
267 uint32_t lock, new, old;
268 ngx_uint_t tries;
269 struct sembuf op;
270
271 #if (NGX_DEBUG)
272 if (try) {
273 ngx_log_debug2(NGX_LOG_DEBUG_CORE, m->log, 0,
274 "try lock mutex %08X lock:%X", m, m->lock);
275 } else {
276 ngx_log_debug2(NGX_LOG_DEBUG_CORE, m->log, 0,
277 "lock mutex %08X lock:%X", m, m->lock);
278 }
279 #endif
280
281 old = m->lock;
282 tries = 0;
283
284 for ( ;; ) {
285 if (old & NGX_MUTEX_LOCK_BUSY) {
286
287 if (try) {
288 return NGX_AGAIN;
289 }
290
291 if (ngx_freebsd_hw_ncpu > 1 && tries++ < 1000) {
292
293 /* the spinlock is used only on SMP systems */
294
295 old = m->lock;
296 continue;
297 }
298
299 if (m->semid == -1) {
300 sched_yield();
301
302 tries = 0;
303 old = m->lock;
304 continue;
305 }
306
307 ngx_log_debug2(NGX_LOG_DEBUG_CORE, m->log, 0,
308 "mutex %08X lock:%X", m, m->lock);
309
310 /*
311 * The mutex is locked so we increase the number
312 * of threads that are waiting on the mutex
313 */
314
315 lock = old + 1;
316
317 if ((lock & ~NGX_MUTEX_LOCK_BUSY) > nthreads) {
318 ngx_log_error(NGX_LOG_ALERT, m->log, ngx_errno,
319 "%d threads wait for mutex %0X, "
320 "while only %d threads are available",
321 lock & ~NGX_MUTEX_LOCK_BUSY, m, nthreads);
322 return NGX_ERROR;
323 }
324
325 if (ngx_atomic_cmp_set(&m->lock, old, lock)) {
326
327 ngx_log_debug2(NGX_LOG_DEBUG_CORE, m->log, 0,
328 "wait mutex %08X lock:%X", m, m->lock);
329
330 /*
331 * The number of waiting threads has been increased,
332 * so we now wait on the SysV semaphore.
333 * A semaphore should wake us up more efficiently than
334 * a simple usleep().
335 */
336
337 op.sem_num = 0;
338 op.sem_op = -1;
339 op.sem_flg = SEM_UNDO;
340
341 if (semop(m->semid, &op, 1) == -1) {
342 ngx_log_error(NGX_LOG_ALERT, m->log, ngx_errno,
343 "semop() failed while waiting "
344 "on mutex %08X", m);
345 return NGX_ERROR;
346 }
347
348 tries = 0;
349 old = m->lock;
350 continue;
351 }
352
353 old = m->lock;
354
355 } else {
356 lock = old | NGX_MUTEX_LOCK_BUSY;
357
358 if (ngx_atomic_cmp_set(&m->lock, old, lock)) {
359
360 /* we locked the mutex */
361
362 break;
363 }
364
365 old = m->lock;
366 }
367
368 if (tries++ > 1000) {
369
370 ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0,
371 "mutex %08X is contested", m);
372
373 /* the mutex is probably contested so we give up the CPU now */
374
375 sched_yield();
376
377 tries = 0;
378 old = m->lock;
379 }
380 }
381
382 ngx_log_debug2(NGX_LOG_DEBUG_CORE, m->log, 0,
383 "mutex %08X is locked, lock:%X", m, m->lock);
384
385 return NGX_OK;
386 }
387
388
389 ngx_int_t ngx_mutex_unlock(ngx_mutex_t *m)
390 {
391 uint32_t lock, new, old;
392 struct sembuf op;
393
394 old = m->lock;
395
396 if (!(old & NGX_MUTEX_LOCK_BUSY)) {
397 ngx_log_error(NGX_LOG_ALERT, m->log, ngx_errno,
398 "tring to unlock the free mutex %0X", m);
399 return NGX_ERROR;
400 }
401
402 /* free the mutex */
403
404 for ( ;; ) {
405 lock = old & ~NGX_MUTEX_LOCK_BUSY;
406
407 if (ngx_atomic_cmp_set(&m->lock, old, lock)) {
408 break;
409 }
410
411 old = m->lock;
412 }
413
414 if (m->semid == -1) {
415 ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0,
416 "mutex %08X is unlocked", m);
417
418 return NGX_OK;
419 }
420
421 /* check whether we need to wake up a waiting thread */
422
423 old = m->lock;
424
425 for ( ;; ) {
426 if (old & NGX_MUTEX_LOCK_BUSY) {
427
428 /* the mutex has just been locked by another thread */
429
430 break;
431 }
432
433 if (old == 0) {
434 break;
435 }
436
437 /* there are waiting threads */
438
439 lock = old - 1;
440
441 if (ngx_atomic_cmp_set(&m->lock, old, lock)) {
442
443 /* wake up a thread that waits on the semaphore */
444
445 op.sem_num = 0;
446 op.sem_op = 1;
447 op.sem_flg = SEM_UNDO;
448
449 if (semop(m->semid, &op, 1) == -1) {
450 ngx_log_error(NGX_LOG_ALERT, m->log, ngx_errno,
451 "semop() failed while waking up on mutex %08X",
452 m);
453 return NGX_ERROR;
454 }
455
456 break;
457 }
458
459 old = m->lock;
460 }
461
462 ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0,
463 "mutex %08X is unlocked", m);
464
465 return NGX_OK;
466 }