# HG changeset patch # User Igor Sysoev # Date 1089010554 0 # Node ID d0451e402e278f88348b56a6673458ef00bad501 # Parent 744ccb59062d4558c040b98efafb1144dac44f81 nginx-0.0.7-2004-07-05-10:55:54 import diff --git a/auto/lib/md5/conf b/auto/lib/md5/conf --- a/auto/lib/md5/conf +++ b/auto/lib/md5/conf @@ -65,7 +65,7 @@ else ngx_lib="rsaref md library" ngx_lib_test="MD5_CTX md5; MD5Init(&md5)" - #ngx_libs=-lmd + ngx_libs=-lmd . auto/lib/test fi diff --git a/auto/os/freebsd b/auto/os/freebsd --- a/auto/os/freebsd +++ b/auto/os/freebsd @@ -54,8 +54,26 @@ if [ \( $version -lt 500000 -a $version -o $version -ge 500018 ] then echo " + using kqueue's NOTE_LOWAT" + have=HAVE_LOWAT_EVENT . auto/have +fi - have=HAVE_LOWAT_EVENT . auto/have + +if [ $USE_THREADS = "rfork" ]; then + + echo " + using rfork()" + + # kqueue's EVFILT_SIGNAL is safe + + if [ $version -gt 460101 ]; then + echo " + kqueue's EVFILT_SIGNAL is safe" + have=HAVE_SAFE_EVFILT_SIGNAL . auto/have + else + echo "$0: error: the kqueue's EVFILT_SIGNAL is unsafe on this" + echo "FreeBSD version, so --with-threads=rfork could not be used" + echo + + exit 1 + fi fi diff --git a/auto/sources b/auto/sources --- a/auto/sources +++ b/auto/sources @@ -135,9 +135,12 @@ POSIX_DEPS=src/os/unix/ngx_posix_config. FREEBSD_DEPS=src/os/unix/ngx_freebsd_config.h FREEBSD_SRCS=src/os/unix/ngx_freebsd_init.c FREEBSD_SENDFILE_SRCS=src/os/unix/ngx_freebsd_sendfile_chain.c +FREEBSD_RFORK_DEPS="src/os/unix/ngx_freebsd_rfork_thread.h" FREEBSD_RFORK_SRCS="src/os/unix/ngx_freebsd_rfork_thread.c" FREEBSD_RFORK_THREAD_SRCS="src/os/unix/rfork_thread.S" +PTHREAD_SRCS="src/os/unix/ngx_pthread_thread.c" + LINUX_DEPS=src/os/unix/ngx_linux_config.h LINUX_SRCS=src/os/unix/ngx_linux_init.c LINUX_SENDFILE_SRCS=src/os/unix/ngx_linux_sendfile_chain.c diff --git a/auto/threads b/auto/threads --- a/auto/threads +++ b/auto/threads @@ -1,13 +1,51 @@ -if [ $USE_THREADS = "rfork" ]; then +case $USE_THREADS in + rfork) + have=NGX_THREADS . auto/have + have=NGX_USE_RFORK . auto/have + CORE_DEPS="$CORE_DEPS $FREEBSD_RFORK_DEPS" + CORE_SRCS="$CORE_SRCS $FREEBSD_RFORK_SRCS" + + case $PLATFORM in + *:i386) + if [ \( $version -gt 500000 -a $version -lt 501000 \) \ + -o $version -lt 491000 ] + then + CORE_SRCS="$CORE_SRCS $FREEBSD_RFORK_THREAD_SRCS" + fi + ;; + esac + ;; + + pthread) + have=NGX_THREADS . auto/have + CORE_SRCS="$CORE_SRCS $PTHREAD_SRCS" + CORE_LIBS="$CORE_LIBS -lpthread" + ;; - have=NGX_THREADS . auto/have - have=NGX_USE_RFORK . auto/have - CORE_DEPS="$CORE_DEPS $UNIX_THREADS_DEPS" - CORE_SRCS="$CORE_SRCS $FREEBSD_RFORK_SRCS" + freebsd4) + have=NGX_THREADS . auto/have + CFLAGS="$CFLAGS -pthread" + CORE_SRCS="$CORE_SRCS $PTHREAD_SRCS" + CORE_LIBS="$CORE_LIBS -pthread" + ;; + + lc_r) + have=NGX_THREADS . auto/have + CORE_SRCS="$CORE_SRCS $PTHREAD_SRCS" + CORE_LIBS="$CORE_LIBS -lc_r" + ;; - if [ $version -lt 501000 ]; then - CORE_SRCS="$CORE_SRCS $FREEBSD_RFORK_THREAD_SRCS" - fi + lthr) + have=NGX_THREADS . auto/have + CORE_SRCS="$CORE_SRCS $PTHREAD_SRCS" + CORE_LIBS="$CORE_LIBS -lthr" + ;; -fi + lkse) + have=NGX_THREADS . auto/have + CORE_SRCS="$CORE_SRCS $PTHREAD_SRCS" + CORE_LIBS="$CORE_LIBS -lkse" + ;; + +esac diff --git a/src/core/nginx.c b/src/core/nginx.c --- a/src/core/nginx.c +++ b/src/core/nginx.c @@ -35,6 +35,24 @@ static ngx_command_t ngx_core_commands[ offsetof(ngx_core_conf_t, worker_processes), NULL }, +#if (NGX_THREADS) + + { ngx_string("worker_threads"), + NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1, + ngx_conf_set_num_slot, + 0, + offsetof(ngx_core_conf_t, worker_threads), + NULL }, + + { ngx_string("thread_stack_size"), + NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + 0, + offsetof(ngx_core_conf_t, thread_stack_size), + NULL }, + +#endif + { ngx_string("user"), NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE12, ngx_set_user, @@ -106,12 +124,6 @@ int main(int argc, char *const *argv) ctx.argc = argc; ctx.argv = argv; -#if (NGX_THREADS) - if (ngx_time_mutex_init(log) == NGX_ERROR) { - return 1; - } -#endif - if (ngx_getopt(&ctx, &init_cycle) == NGX_ERROR) { return 1; } @@ -341,6 +353,10 @@ static void *ngx_core_module_create_conf ccf->daemon = NGX_CONF_UNSET; ccf->master = NGX_CONF_UNSET; ccf->worker_processes = NGX_CONF_UNSET; +#if (NGX_THREADS) + ccf->worker_threads = NGX_CONF_UNSET; + ccf->thread_stack_size = NGX_CONF_UNSET; +#endif ccf->user = (ngx_uid_t) NGX_CONF_UNSET; ccf->group = (ngx_gid_t) NGX_CONF_UNSET; @@ -356,6 +372,12 @@ static char *ngx_core_module_init_conf(n ngx_conf_init_value(ccf->master, 1); ngx_conf_init_value(ccf->worker_processes, 1); +#if (NGX_THREADS) + ngx_conf_init_value(ccf->worker_threads, 0); + ngx_threads_n = ccf->worker_threads; + ngx_conf_init_size_value(ccf->thread_stack_size, 2 * 1024 * 1024); +#endif + #if !(WIN32) /* TODO: default "nobody" user */ diff --git a/src/core/ngx_cycle.h b/src/core/ngx_cycle.h --- a/src/core/ngx_cycle.h +++ b/src/core/ngx_cycle.h @@ -40,6 +40,12 @@ typedef struct { ngx_str_t pid; ngx_str_t newpid; + +#if (NGX_THREADS) + ngx_int_t worker_threads; + size_t thread_stack_size; +#endif + } ngx_core_conf_t; diff --git a/src/event/modules/ngx_kqueue_module.c b/src/event/modules/ngx_kqueue_module.c --- a/src/event/modules/ngx_kqueue_module.c +++ b/src/event/modules/ngx_kqueue_module.c @@ -279,9 +279,9 @@ static ngx_int_t ngx_kqueue_del_event(ng static ngx_int_t ngx_kqueue_set_event(ngx_event_t *ev, int filter, u_int flags) { + struct kevent *kev, kv; struct timespec ts; ngx_connection_t *c; - struct kevent *kev, kv; c = ev->data; @@ -370,7 +370,7 @@ static ngx_int_t ngx_kqueue_process_even for ( ;; ) { timer = ngx_event_find_timer(); -#if (NGX_THREADS) +#if (NGX_THREADS0) if (timer == NGX_TIMER_ERROR) { return NGX_ERROR; } @@ -621,7 +621,7 @@ static ngx_int_t ngx_kqueue_process_even if (ngx_posted_events) { if (ngx_threaded) { - ngx_cond_signal(ngx_posted_events_cv); + ngx_wakeup_worker_thread(cycle); } else { ngx_event_process_posted(cycle); diff --git a/src/event/ngx_event.c b/src/event/ngx_event.c --- a/src/event/ngx_event.c +++ b/src/event/ngx_event.c @@ -91,7 +91,7 @@ ngx_module_t ngx_events_module = { ngx_events_commands, /* module directives */ NGX_CORE_MODULE, /* module type */ NULL, /* init module */ - NULL /* init child */ + NULL /* init process */ }; diff --git a/src/event/ngx_event_posted.c b/src/event/ngx_event_posted.c --- a/src/event/ngx_event_posted.c +++ b/src/event/ngx_event_posted.c @@ -8,7 +8,6 @@ ngx_thread_volatile ngx_event_t *ngx_po #if (NGX_THREADS) ngx_mutex_t *ngx_posted_events_mutex; -ngx_cond_t *ngx_posted_events_cv; #endif @@ -57,6 +56,19 @@ void ngx_event_process_posted(ngx_cycle_ #if (NGX_THREADS) +void ngx_wakeup_worker_thread(ngx_cycle_t *cycle) +{ + ngx_int_t i; + + for (i = 0; i < ngx_threads_n; i++) { + if (ngx_threads[i].state == NGX_THREAD_FREE) { + ngx_cond_signal(ngx_threads[i].cv); + return; + } + } +} + + ngx_int_t ngx_event_thread_process_posted(ngx_cycle_t *cycle) { ngx_event_t *ev; @@ -71,7 +83,6 @@ ngx_int_t ngx_event_thread_process_poste "posted event " PTR_FMT, ev); if (ev == NULL) { - ngx_mutex_unlock(ngx_posted_events_mutex); return NGX_OK; } @@ -144,4 +155,10 @@ ngx_int_t ngx_event_thread_process_poste } } +#else + +void ngx_wakeup_worker_thread(ngx_cycle_t *cycle) +{ +} + #endif diff --git a/src/event/ngx_event_posted.h b/src/event/ngx_event_posted.h --- a/src/event/ngx_event_posted.h +++ b/src/event/ngx_event_posted.h @@ -26,6 +26,7 @@ void ngx_event_process_posted(ngx_cycle_t *cycle); +void ngx_wakeup_worker_thread(ngx_cycle_t *cycle); extern ngx_thread_volatile ngx_event_t *ngx_posted_events; @@ -34,7 +35,6 @@ extern ngx_thread_volatile ngx_event_t ngx_int_t ngx_event_thread_process_posted(ngx_cycle_t *cycle); extern ngx_mutex_t *ngx_posted_events_mutex; -extern ngx_cond_t *ngx_posted_events_cv; #endif diff --git a/src/os/unix/ngx_freebsd_rfork_thread.c b/src/os/unix/ngx_freebsd_rfork_thread.c --- a/src/os/unix/ngx_freebsd_rfork_thread.c +++ b/src/os/unix/ngx_freebsd_rfork_thread.c @@ -10,18 +10,20 @@ /* * The threads implementation uses the rfork(RFPROC|RFTHREAD|RFMEM) syscall * to create threads. All threads use the stacks of the same size mmap()ed - * below the main stack. Thus the current thread id is determinated through - * the stack pointer. + * below the main stack. Thus the current thread id is determinated via + * the stack pointer value. * * The mutex implementation uses the ngx_atomic_cmp_set() operation - * to acquire a mutex and the SysV semaphore to wait on a mutex or to wake up + * to acquire a mutex and the SysV semaphore to wait on a mutex and to wake up * the waiting threads. The light mutex does not use semaphore, so after * spinning in the lock the thread calls sched_yield(). However the light * mutecies are intended to be used with the "trylock" operation only. + * The SysV semop() is a cheap syscall, particularly if it has little sembuf's + * and does not use SEM_UNDO. * - * The condition variable implementation uses the SysV semaphore set of two - * semaphores. The first is used by the CV mutex, and the second is used - * by the CV to signal. + * The condition variable implementation uses signal #64. The signal handler + * is SIG_IGN so the kill() is a cheap syscall. The thread waits a signal + * in kevent(). The use of the EVFILT_SIGNAL is safe since FreeBSD 4.7. * * This threads implementation currently works on i386 (486+) and amd64 * platforms only. @@ -76,7 +78,7 @@ void _spinlock(ngx_atomic_t *lock) for ( ;; ) { if (*lock) { - if (ngx_freebsd_hw_ncpu > 1 && tries++ < 1000) { + if (ngx_ncpu > 1 && tries++ < 1000) { continue; } @@ -110,7 +112,7 @@ void _spinunlock(ngx_atomic_t *lock) #endif -int ngx_create_thread(ngx_tid_t *tid, int (*func)(void *arg), void *arg, +int ngx_create_thread(ngx_tid_t *tid, void* (*func)(void *arg), void *arg, ngx_log_t *log) { int id, err; @@ -144,15 +146,10 @@ int ngx_create_thread(ngx_tid_t *tid, in ngx_log_debug2(NGX_LOG_DEBUG_CORE, log, 0, "thread stack: " PTR_FMT "-" PTR_FMT, stack, stack_top); -#if 1 - id = rfork_thread(RFPROC|RFTHREAD|RFMEM, stack_top, func, arg); -#elif 1 - id = rfork_thread(RFPROC|RFMEM, stack_top, func, arg); -#elif 1 - id = rfork_thread(RFFDG|RFCFDG, stack_top, func, arg); -#else - id = rfork(RFFDG|RFCFDG); -#endif + ngx_set_errno(0); + + id = rfork_thread(RFPROC|RFTHREAD|RFMEM, stack_top, + (ngx_rfork_thread_func_pt) func, arg); err = ngx_errno; @@ -174,10 +171,23 @@ int ngx_create_thread(ngx_tid_t *tid, in ngx_int_t ngx_init_threads(int n, size_t size, ngx_cycle_t *cycle) { - size_t len; - char *red_zone, *zone; + char *red_zone, *zone; + size_t len; + ngx_int_t i; + struct sigaction sa; + + max_threads = n + 1; - max_threads = n; + for (i = 0; i < n; i++) { + ngx_memzero(&sa, sizeof(struct sigaction)); + sa.sa_handler = SIG_IGN; + sigemptyset(&sa.sa_mask); + if (sigaction(NGX_CV_SIGNAL, &sa, NULL) == -1) { + ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, + "sigaction(%d, SIG_IGN) failed", NGX_CV_SIGNAL); + return NGX_ERROR; + } + } len = sizeof(ngx_freebsd_kern_usrstack); if (sysctlbyname("kern.usrstack", &ngx_freebsd_kern_usrstack, &len, @@ -249,14 +259,6 @@ ngx_tid_t ngx_thread_self() return ngx_pid; } -#if 0 - if (tids[tid] == 0) { - pid = ngx_pid; - tids[tid] = pid; - return pid; - } -#endif - return tids[tid]; } @@ -301,7 +303,7 @@ ngx_mutex_t *ngx_mutex_init(ngx_log_t *l } -void ngx_mutex_done(ngx_mutex_t *m) +void ngx_mutex_destroy(ngx_mutex_t *m) { if (semctl(m->semid, 0, IPC_RMID) == -1) { ngx_log_error(NGX_LOG_ALERT, m->log, ngx_errno, @@ -538,43 +540,26 @@ ngx_int_t ngx_mutex_unlock(ngx_mutex_t * ngx_cond_t *ngx_cond_init(ngx_log_t *log) { - ngx_cond_t *cv; - union semun op; + ngx_cond_t *cv; if (!(cv = ngx_alloc(sizeof(ngx_cond_t), log))) { return NULL; } + cv->signo = NGX_CV_SIGNAL; + cv->tid = 0; cv->log = log; - - cv->semid = semget(IPC_PRIVATE, 2, SEM_R|SEM_A); - if (cv->semid == -1) { - ngx_log_error(NGX_LOG_ALERT, log, ngx_errno, "semget() failed"); - return NULL; - } - - op.val = 0; - - if (semctl(cv->semid, 0, SETVAL, op) == -1) { - ngx_log_error(NGX_LOG_ALERT, log, ngx_errno, "semctl(SETVAL) failed"); - - if (semctl(cv->semid, 0, IPC_RMID) == -1) { - ngx_log_error(NGX_LOG_ALERT, log, ngx_errno, - "semctl(IPC_RMID) failed"); - } - - return NULL; - } + cv->kq = -1; return cv; } -void ngx_cond_done(ngx_cond_t *cv) +void ngx_cond_destroy(ngx_cond_t *cv) { - if (semctl(cv->semid, 0, IPC_RMID) == -1) { + if (close(cv->kq) == -1) { ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno, - "semctl(IPC_RMID) failed"); + "kqueue close() failed"); } ngx_free(cv); @@ -583,19 +568,99 @@ void ngx_cond_done(ngx_cond_t *cv) ngx_int_t ngx_cond_wait(ngx_cond_t *cv, ngx_mutex_t *m) { - struct sembuf op; + int n; + ngx_err_t err; + struct kevent kev; + struct timespec ts; + + if (cv->kq == -1) { + + /* + * We have to add the EVFILT_SIGNAL filter in the rfork()ed thread. + * Otherwise the thread would not get a signal event. + * + * However, we have not to open the kqueue in the thread, + * it is simply handy do it together. + */ + + cv->kq = kqueue(); + if (cv->kq == -1) { + ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno, "kqueue() failed"); + return NGX_ERROR; + } - ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0, - "cv " PTR_FMT " wait", cv); + ngx_log_debug2(NGX_LOG_DEBUG_CORE, cv->log, 0, + "cv kq:%d signo:%d", cv->kq, cv->signo); + + kev.ident = cv->signo; + kev.filter = EVFILT_SIGNAL; + kev.flags = EV_ADD; + kev.fflags = 0; + kev.data = 0; + kev.udata = NULL; + + ts.tv_sec = 0; + ts.tv_nsec = 0; + + if (kevent(cv->kq, &kev, 1, NULL, 0, &ts) == -1) { + ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno, "kevent() failed"); + return NGX_ERROR; + } + } + + if (ngx_mutex_unlock(m) == NGX_ERROR) { + return NGX_ERROR; + } - op.sem_num = 0; - op.sem_op = -1; - op.sem_flg = 0; + ngx_log_debug3(NGX_LOG_DEBUG_CORE, cv->log, 0, + "cv " PTR_FMT " wait, kq:%d, signo:%d", + cv, cv->kq, cv->signo); + + for ( ;; ) { + n = kevent(cv->kq, NULL, 0, &kev, 1, NULL); + + ngx_log_debug2(NGX_LOG_DEBUG_CORE, cv->log, 0, + "cv " PTR_FMT " kevent: %d", cv, n); + + if (n == -1) { + err = ngx_errno; + ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT, + cv->log, ngx_errno, + "kevent() failed while waiting condition variable " + PTR_FMT, cv); + + if (err == NGX_EINTR) { + break; + } + + return NGX_ERROR; + } - if (semop(cv->semid, &op, 1) == -1) { - ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno, - "semop() failed while waiting on cv " PTR_FMT, cv); - return NGX_ERROR; + if (n == 0) { + ngx_log_error(NGX_LOG_ALERT, cv->log, 0, + "kevent() returned no events " + "while waiting condition variable " PTR_FMT, + cv); + continue; + } + + if (kev.filter != EVFILT_SIGNAL) { + ngx_log_error(NGX_LOG_ALERT, cv->log, 0, + "kevent() returned unexpected events: %d " + "while waiting condition variable " PTR_FMT, + kev.filter, cv); + continue; + } + + if (kev.ident != (uintptr_t) cv->signo) { + ngx_log_error(NGX_LOG_ALERT, cv->log, 0, + "kevent() returned unexpected signal: %d ", + "while waiting condition variable " PTR_FMT, + kev.ident, cv); + continue; + } + + break; } ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0, @@ -611,18 +676,14 @@ ngx_int_t ngx_cond_wait(ngx_cond_t *cv, ngx_int_t ngx_cond_signal(ngx_cond_t *cv) { - struct sembuf op; - - ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0, - "cv " PTR_FMT " to signal", cv); + ngx_log_debug3(NGX_LOG_DEBUG_CORE, cv->log, 0, + "cv " PTR_FMT " to signal " PID_T_FMT " %d", + cv, cv->tid, cv->signo); - op.sem_num = 0; - op.sem_op = 1; - op.sem_flg = 0; - - if (semop(cv->semid, &op, 1) == -1) { + if (kill(cv->tid, cv->signo) == -1) { ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno, - "semop() failed while signaling cv " PTR_FMT, cv); + "kill() failed while signaling condition variable " + PTR_FMT, cv); return NGX_ERROR; } diff --git a/src/os/unix/ngx_freebsd_rfork_thread.h b/src/os/unix/ngx_freebsd_rfork_thread.h new file mode 100644 --- /dev/null +++ b/src/os/unix/ngx_freebsd_rfork_thread.h @@ -0,0 +1,93 @@ +#ifndef _NGX_FREEBSD_RFORK_THREAD_H_INCLUDED_ +#define _NGX_FREEBSD_RFORK_THREAD_H_INCLUDED_ + + +#include +#include +#include + +typedef pid_t ngx_tid_t; + +#undef ngx_log_pid +#define ngx_log_pid ngx_thread_self() +#define ngx_log_tid 0 + +#define TID_T_FMT PID_T_FMT + + +#define NGX_MUTEX_LIGHT 1 + +#define NGX_MUTEX_LOCK_BUSY 0x80000000 + +typedef volatile struct { + ngx_atomic_t lock; + ngx_log_t *log; + int semid; +} ngx_mutex_t; + + +#define NGX_CV_SIGNAL 64 + +typedef struct { + int signo; + int kq; + ngx_tid_t tid; + ngx_log_t *log; +} ngx_cond_t; + + +#define ngx_thread_sigmask(how, set, oset) \ + (sigprocmask(how, set, oset) == -1) ? ngx_errno : 0 + +#define ngx_thread_sigmask_n "sigprocmask()" + +#define ngx_thread_join(t, p) + +#define ngx_setthrtitle(n) setproctitle(n) + + +extern char *ngx_freebsd_kern_usrstack; +extern size_t ngx_thread_stack_size; + + +static inline int ngx_gettid() +{ + char *sp; + + if (ngx_thread_stack_size == 0) { + return 0; + } + +#if ( __i386__ ) + + __asm__ volatile ("mov %%esp, %0" : "=q" (sp)); + +#elif ( __amd64__ ) + + __asm__ volatile ("mov %%rsp, %0" : "=q" (sp)); + +#else + +#error "rfork()ed threads are not supported on this platform" + +#endif + + return (ngx_freebsd_kern_usrstack - sp) / ngx_thread_stack_size; +} + + +ngx_tid_t ngx_thread_self(); +#define ngx_thread_main() (ngx_gettid() == 0) + + +#define ngx_mutex_trylock(m) ngx_mutex_dolock(m, 1) +#define ngx_mutex_lock(m) ngx_mutex_dolock(m, 0) +ngx_int_t ngx_mutex_dolock(ngx_mutex_t *m, ngx_int_t try); +ngx_int_t ngx_mutex_unlock(ngx_mutex_t *m); + + +typedef int (*ngx_rfork_thread_func_pt)(void *arg); + + + +#endif /* _NGX_FREEBSD_RFORK_THREAD_H_INCLUDED_ */ diff --git a/src/os/unix/ngx_posix_cycle.c b/src/os/unix/ngx_posix_cycle.c deleted file mode 100644 --- a/src/os/unix/ngx_posix_cycle.c +++ /dev/null @@ -1,231 +0,0 @@ - - -void ngx_posix_master_cycle(ngx_cycle_t *cycle) -{ - static ngx_int_t sent; - static ngx_msec_t delay = 125; - - if (ngx_process == NGX_PROCESS_MASTER) { - if (sent) { - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, - "sent signal cycle"); - - if (sigprocmask(SIG_UNBLOCK, &set, NULL) == -1) { - ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, - "sigprocmask() failed"); - continue; - } - - /* - * there is very big chance that the pending signals - * would be delivered right on the sigprocmask() return - */ - - if (!ngx_signal) { - - if (delay < 15000) { - delay *= 2; - } - - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, - "msleep %d", delay); - - ngx_msleep(delay); - - ngx_gettimeofday(&tv); - ngx_time_update(tv.tv_sec); - - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, - "wake up"); - } - - if (sigprocmask(SIG_BLOCK, &set, NULL) == -1) { - ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, - "sigprocmask() failed"); - } - - ngx_signal = 0; - - } else { - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, - "sigsuspend"); - - sigsuspend(&wset); - - ngx_gettimeofday(&tv); - ngx_time_update(tv.tv_sec); - - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, - "wake up"); - } - - } else { /* NGX_PROCESS_SINGLE */ - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, - "worker cycle"); - - ngx_process_events(cycle->log); - } - - if (ngx_reap) { - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, - "reap childs"); - - live = 0; - for (i = 0; i < ngx_last_process; i++) { - - ngx_log_debug6(NGX_LOG_DEBUG_EVENT, cycle->log, 0, - "child: " PID_T_FMT - " s:%d e:%d t:%d d:%d r:%d", - ngx_processes[i].pid, - ngx_processes[i].signal, - ngx_processes[i].exiting, - ngx_processes[i].exited, - ngx_processes[i].detached, - ngx_processes[i].respawn); - - if (ngx_processes[i].exited) { - - if (ngx_processes[i].respawn - && !ngx_processes[i].exiting - && !ngx_terminate - && !ngx_quit) - { - if (ngx_spawn_process(cycle, - ngx_processes[i].proc, - ngx_processes[i].data, - ngx_processes[i].name, i) - == NGX_ERROR) - { - ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, - "can not respawn %s", - ngx_processes[i].name); - } - - continue; - } - - if (ngx_processes[i].pid == ngx_new_binary) { - ngx_new_binary = 0; - } - - if (i != --ngx_last_process) { - ngx_processes[i--] = - ngx_processes[ngx_last_process]; - } - - } else if (!ngx_processes[i].detached - && (ngx_terminate || ngx_quit)) - { - live = 1; - - } else if (ngx_processes[i].exiting) { - live = 1; - } - } - - if (!live) { - if (ngx_terminate || ngx_quit) { - - if (ngx_inherited && getppid() > 1) { - name = ctx->pid.name.data; - - } else { - name = ctx->name; - } - - if (ngx_delete_file(name) == NGX_FILE_ERROR) { - ngx_log_error(NGX_LOG_ALERT, cycle->log, - ngx_errno, - ngx_delete_file_n - " \"%s\" failed", name); - } - - ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "exit"); - exit(0); - - } else { - sent = 0; - } - } - } - - if (ngx_terminate) { - if (delay > 10000) { - signo = SIGKILL; - } else { - signo = ngx_signal_value(NGX_TERMINATE_SIGNAL); - } - - } else if (ngx_quit) { - signo = ngx_signal_value(NGX_SHUTDOWN_SIGNAL); - - } else { - - if (ngx_noaccept) { - signo = ngx_signal_value(NGX_SHUTDOWN_SIGNAL); - } - - if (ngx_change_binary) { - ngx_change_binary = 0; - ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "changing binary"); - ngx_new_binary = ngx_exec_new_binary(cycle, ctx->argv); - } - - if (ngx_reconfigure) { - signo = ngx_signal_value(NGX_SHUTDOWN_SIGNAL); - ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "reconfiguring"); - } - - if (ngx_reopen) { - /* STUB */ - signo = ngx_signal_value(NGX_SHUTDOWN_SIGNAL); - - ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "reopening logs"); - ngx_reopen_files(cycle); - } - } - - if (signo) { - for (i = 0; i < ngx_last_process; i++) { - - if (!ngx_processes[i].detached) { - ngx_processes[i].signal = signo; - - ngx_log_debug2(NGX_LOG_DEBUG_EVENT, - cycle->log, 0, - "signal " PID_T_FMT " %d", - ngx_processes[i].pid, signo); - } - } - - delay = 125; - signo = 0; - } - - for (i = 0; i < ngx_last_process; i++) { - - if (ngx_processes[i].signal == 0) { - continue; - } - - if (ccf->kqueue_signal != 1) { - sent = 1; - } - - ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0, - "kill (" PID_T_FMT ", %d)" , - ngx_processes[i].pid, - ngx_processes[i].signal); - - if (kill(ngx_processes[i].pid, ngx_processes[i].signal) == -1) { - ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, - "kill(%d, %d) failed", - ngx_processes[i].pid, ngx_processes[i].signal); - continue; - } - - if (ngx_processes[i].signal != ngx_signal_value(NGX_REOPEN_SIGNAL)) { - ngx_processes[i].exiting = 1; - } - } -} diff --git a/src/os/unix/ngx_process_cycle.c b/src/os/unix/ngx_process_cycle.c --- a/src/os/unix/ngx_process_cycle.c +++ b/src/os/unix/ngx_process_cycle.c @@ -13,7 +13,8 @@ static void ngx_master_exit(ngx_cycle_t static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data); static void ngx_channel_handler(ngx_event_t *ev); #if (NGX_THREADS) -static int ngx_worker_thread_cycle(void *data); +static void ngx_wakeup_worker_threads(ngx_cycle_t *cycle); +static void *ngx_worker_thread_cycle(void *data); #endif @@ -40,6 +41,12 @@ ngx_uint_t ngx_noaccepting; ngx_uint_t ngx_restart; +#if (NGX_THREADS) +volatile ngx_thread_t ngx_threads[NGX_MAX_THREADS]; +ngx_int_t ngx_threads_n; +#endif + + u_char master_process[] = "master process"; @@ -524,9 +531,6 @@ static void ngx_worker_process_cycle(ngx ngx_listening_t *ls; ngx_core_conf_t *ccf; ngx_connection_t *c; -#if (NGX_THREADS) - ngx_tid_t tid; -#endif ngx_process = NGX_PROCESS_WORKER; @@ -620,23 +624,34 @@ static void ngx_worker_process_cycle(ngx #if (NGX_THREADS) - if (ngx_init_threads(5, 128 * 1024 * 1024, cycle) == NGX_ERROR) { + if (ngx_time_mutex_init(cycle->log) == NGX_ERROR) { /* fatal */ exit(2); } - if (!(ngx_posted_events_cv = ngx_cond_init(cycle->log))) { - /* fatal */ - exit(2); - } - - for (i = 0; i < 2; i++) { - if (ngx_create_thread(&tid, ngx_worker_thread_cycle, - cycle, cycle->log) != 0) + if (ngx_threads_n) { + if (ngx_init_threads(ngx_threads_n, + ccf->thread_stack_size, cycle) == NGX_ERROR) { /* fatal */ exit(2); } + + for (n = 0; n < ngx_threads_n; n++) { + + if (!(ngx_threads[n].cv = ngx_cond_init(cycle->log))) { + /* fatal */ + exit(2); + } + + if (ngx_create_thread((ngx_tid_t *) &ngx_threads[n].tid, + ngx_worker_thread_cycle, + (void *) &ngx_threads[n], cycle->log) != 0) + { + /* fatal */ + exit(2); + } + } } #endif @@ -646,6 +661,14 @@ static void ngx_worker_process_cycle(ngx && ngx_event_timer_rbtree == &ngx_event_timer_sentinel) { ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "exiting"); + + +#if (NGX_THREADS) + ngx_terminate = 1; + + ngx_wakeup_worker_threads(cycle); +#endif + /* * we do not destroy cycle->pool here because a signal handler * that uses cycle->log can be called at this point @@ -659,6 +682,11 @@ static void ngx_worker_process_cycle(ngx if (ngx_terminate) { ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "exiting"); + +#if (NGX_THREADS) + ngx_wakeup_worker_threads(cycle); +#endif + /* * we do not destroy cycle->pool here because a signal handler * that uses cycle->log can be called at this point @@ -752,14 +780,53 @@ static void ngx_channel_handler(ngx_even #if (NGX_THREADS) -int ngx_worker_thread_cycle(void *data) +static void ngx_wakeup_worker_threads(ngx_cycle_t *cycle) { - ngx_cycle_t *cycle = data; + ngx_int_t i; + ngx_uint_t live; + + for ( ;; ) { + + live = 0; + + for (i = 0; i < ngx_threads_n; i++) { + if (ngx_threads[i].state < NGX_THREAD_EXIT) { + ngx_cond_signal(ngx_threads[i].cv); + live = 1; + } + + if (ngx_threads[i].state == NGX_THREAD_EXIT) { + ngx_thread_join(ngx_threads[i].tid, NULL); + ngx_threads[i].state = NGX_THREAD_DONE; + } + } - ngx_err_t err; + if (live == 0) { + ngx_log_debug0(NGX_LOG_DEBUG_CORE, cycle->log, 0, + "all worker threads are joined"); + + /* STUB */ + ngx_mutex_destroy(ngx_event_timer_mutex); + ngx_mutex_destroy(ngx_posted_events_mutex); + + return; + } + + ngx_sched_yield(); + } +} + + +static void* ngx_worker_thread_cycle(void *data) +{ + ngx_thread_t *thr = data; + sigset_t set; + ngx_err_t err; struct timeval tv; + thr->cv->tid = ngx_thread_self(); + sigemptyset(&set); sigaddset(&set, ngx_signal_value(NGX_RECONFIGURE_SIGNAL)); sigaddset(&set, ngx_signal_value(NGX_REOPEN_SIGNAL)); @@ -767,32 +834,46 @@ int ngx_worker_thread_cycle(void *data) err = ngx_thread_sigmask(SIG_BLOCK, &set, NULL); if (err) { - ngx_log_error(NGX_LOG_ALERT, cycle->log, err, + ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err, ngx_thread_sigmask_n " failed"); - return 1; + return (void *) 1; } ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, ngx_errno, - "thread %d started", ngx_thread_self()); + "thread " TID_T_FMT " started", ngx_thread_self()); + + ngx_setthrtitle("worker thread"); - ngx_setproctitle("worker thread"); + if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { + return (void *) 1; + } for ( ;; ) { - if (ngx_cond_wait(ngx_posted_events_cv, ngx_posted_events_mutex) + thr->state = NGX_THREAD_FREE; + + if (ngx_cond_wait(thr->cv, ngx_posted_events_mutex) == NGX_ERROR) { + return (void *) 1; + } + + if (ngx_terminate) { + thr->state = NGX_THREAD_EXIT; + + ngx_mutex_unlock(ngx_posted_events_mutex); + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, ngx_errno, + "thread %d is done", ngx_thread_self()); + + return (void *) 0; + } + + thr->state = NGX_THREAD_BUSY; + + if (ngx_event_thread_process_posted((ngx_cycle_t *) ngx_cycle) == NGX_ERROR) { - return 1; - } - - if (ngx_event_thread_process_posted(cycle) == NGX_ERROR) { - return 1; + return (void *) 1; } } - - ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, ngx_errno, - "thread %d done", ngx_thread_self()); - - return 0; } #endif diff --git a/src/os/unix/ngx_process_cycle.h b/src/os/unix/ngx_process_cycle.h --- a/src/os/unix/ngx_process_cycle.h +++ b/src/os/unix/ngx_process_cycle.h @@ -14,8 +14,8 @@ typedef struct { - int argc; - char *const *argv; + int argc; + char *const *argv; } ngx_master_ctx_t; diff --git a/src/os/unix/ngx_pthread.c b/src/os/unix/ngx_pthread.c deleted file mode 100644 --- a/src/os/unix/ngx_pthread.c +++ /dev/null @@ -1,26 +0,0 @@ - - -#include - -#include -#include - - -int ngx_create_os_thread(ngx_os_tid_t *tid, void *stack, - ngx_thread_start_routine_t func, void *arg, - ngx_log_t log) -{ - int err; - pthread_attr_t *attr; - - attr = NULL; - - err = pthread_create(tid, attr, func, arg); - - if (err != 0) { - ngx_log_error(NGX_LOG_ERR, log, err, "pthread_create() failed"); - return NGX_ERROR; - } - - return NGX_OK; -} diff --git a/src/os/unix/ngx_pthread.h b/src/os/unix/ngx_pthread.h deleted file mode 100644 --- a/src/os/unix/ngx_pthread.h +++ /dev/null @@ -1,14 +0,0 @@ -#ifndef _NGX_OS_THREAD_H_INCLUDED_ -#define _NGX_OS_THREAD_H_INCLUDED_ - - -#include - - -typedef pthread_t ngx_os_tid_t; -typedef int ngx_tid_t; - -typedef void *(*)(void *) ngx_thread_start_routine_t - - -#endif /* _NGX_OS_THREAD_H_INCLUDED_ */ diff --git a/src/os/unix/ngx_pthread_thread.c b/src/os/unix/ngx_pthread_thread.c new file mode 100644 --- /dev/null +++ b/src/os/unix/ngx_pthread_thread.c @@ -0,0 +1,268 @@ + +/* + * Copyright (C) 2002-2004 Igor Sysoev, http://sysoev.ru/en/ + */ + + +#include +#include + + +static ngx_uint_t nthreads; +static ngx_uint_t max_threads; + + +static pthread_attr_t thr_attr; + + +int ngx_create_thread(ngx_tid_t *tid, void* (*func)(void *arg), void *arg, + ngx_log_t *log) +{ + int err; + + if (nthreads >= max_threads) { + ngx_log_error(NGX_LOG_CRIT, log, 0, + "no more than %d threads can be created", max_threads); + return NGX_ERROR; + } + + err = pthread_create(tid, &thr_attr, func, arg); + + if (err != 0) { + ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_create() failed"); + return err; + } + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0, + "thread is created: " TID_T_FMT, *tid); + + nthreads++; + + return err; +} + + +ngx_int_t ngx_init_threads(int n, size_t size, ngx_cycle_t *cycle) +{ + int err; + + max_threads = n; + + err = pthread_attr_init(&thr_attr); + + if (err != 0) { + ngx_log_error(NGX_LOG_ALERT, cycle->log, err, + "pthread_attr_init() failed"); + return NGX_ERROR; + } + + err = pthread_attr_setstacksize(&thr_attr, size); + + if (err != 0) { + ngx_log_error(NGX_LOG_ALERT, cycle->log, err, + "pthread_attr_setstacksize() failed"); + return NGX_ERROR; + } + + ngx_threaded = 1; + + return NGX_OK; +} + + +ngx_mutex_t *ngx_mutex_init(ngx_log_t *log, uint flags) +{ + int err; + ngx_mutex_t *m; + + if (!(m = ngx_alloc(sizeof(ngx_mutex_t), log))) { + return NULL; + } + + m->log = log; + + err = pthread_mutex_init(&m->mutex, NULL); + + if (err != 0) { + ngx_log_error(NGX_LOG_ALERT, m->log, err, + "pthread_mutex_init() failed"); + return NULL; + } + + return m; +} + + +void ngx_mutex_destroy(ngx_mutex_t *m) +{ + int err; + + err = pthread_mutex_destroy(&m->mutex); + + if (err != 0) { + ngx_log_error(NGX_LOG_ALERT, m->log, err, + "pthread_mutex_destroy(" PTR_FMT ") failed", m); + } + + ngx_free(m); +} + + +ngx_int_t ngx_mutex_lock(ngx_mutex_t *m) +{ + int err; + + if (!ngx_threaded) { + return NGX_OK; + } + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0, "lock mutex " PTR_FMT, m); + + err = pthread_mutex_lock(&m->mutex); + + if (err != 0) { + ngx_log_error(NGX_LOG_ALERT, m->log, err, + "pthread_mutex_lock(" PTR_FMT ") failed", m); + return NGX_ERROR; + } + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0, + "mutex " PTR_FMT " is locked", m); + + return NGX_OK; +} + + +ngx_int_t ngx_mutex_trylock(ngx_mutex_t *m) +{ + int err; + + if (!ngx_threaded) { + return NGX_OK; + } + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0, "try lock mutex " PTR_FMT, m); + + err = pthread_mutex_trylock(&m->mutex); + + if (err != 0) { + ngx_log_error(NGX_LOG_ALERT, m->log, err, + "pthread_mutex_trylock(" PTR_FMT ") failed", m); + return NGX_ERROR; + } + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0, + "mutex " PTR_FMT " is locked", m); + + return NGX_OK; +} + + +ngx_int_t ngx_mutex_unlock(ngx_mutex_t *m) +{ + int err; + + if (!ngx_threaded) { + return NGX_OK; + } + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0, "unlock mutex " PTR_FMT, m); + + err = pthread_mutex_unlock(&m->mutex); + + if (err != 0) { + ngx_log_error(NGX_LOG_ALERT, m->log, err, + "pthread_mutex_unlock(" PTR_FMT ") failed", m); + return NGX_ERROR; + } + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0, + "mutex " PTR_FMT " is unlocked", m); + + return NGX_OK; +} + + +ngx_cond_t *ngx_cond_init(ngx_log_t *log) +{ + int err; + ngx_cond_t *cv; + + if (!(cv = ngx_alloc(sizeof(ngx_cond_t), log))) { + return NULL; + } + + cv->log = log; + + err = pthread_cond_init(&cv->cond, NULL); + + if (err != 0) { + ngx_log_error(NGX_LOG_ALERT, cv->log, err, + "pthread_cond_init() failed"); + return NULL; + } + + return cv; +} + + +void ngx_cond_destroy(ngx_cond_t *cv) +{ + int err; + + err = pthread_cond_destroy(&cv->cond); + + if (err != 0) { + ngx_log_error(NGX_LOG_ALERT, cv->log, err, + "pthread_cond_destroy(" PTR_FMT ") failed", cv); + } + + ngx_free(cv); +} + + +ngx_int_t ngx_cond_wait(ngx_cond_t *cv, ngx_mutex_t *m) +{ + int err; + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0, + "cv " PTR_FMT " wait", cv); + + err = pthread_cond_wait(&cv->cond, &m->mutex); + + if (err != 0) { + ngx_log_error(NGX_LOG_ALERT, cv->log, err, + "pthread_cond_wait(" PTR_FMT ") failed", cv); + return NGX_ERROR; + } + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0, + "cv " PTR_FMT " is waked up", cv); + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0, + "mutex " PTR_FMT " is locked", m); + + return NGX_OK; +} + + +ngx_int_t ngx_cond_signal(ngx_cond_t *cv) +{ + int err; + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0, + "cv " PTR_FMT " to signal", cv); + + err = pthread_cond_signal(&cv->cond); + + if (err != 0) { + ngx_log_error(NGX_LOG_ALERT, cv->log, err, + "pthread_cond_signal(" PTR_FMT ") failed", cv); + return NGX_ERROR; + } + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0, + "cv " PTR_FMT " is signaled", cv); + + return NGX_OK; +} diff --git a/src/os/unix/ngx_thread.h b/src/os/unix/ngx_thread.h --- a/src/os/unix/ngx_thread.h +++ b/src/os/unix/ngx_thread.h @@ -7,111 +7,84 @@ #if (NGX_THREADS) -#define ngx_thread_volatile volatile +#define NGX_MAX_THREADS 128 #if (NGX_USE_RFORK) - -#include -#include -#include - -typedef pid_t ngx_tid_t; - -#undef ngx_log_pid -#define ngx_log_pid ngx_thread_self() -#define ngx_log_tid 0 - -#define TID_T_FMT PID_T_FMT - - -#define NGX_MUTEX_LIGHT 1 -#define NGX_MUTEX_CV 2 - -#define NGX_MUTEX_LOCK_BUSY 0x80000000 - -typedef volatile struct { - ngx_atomic_t lock; - ngx_log_t *log; - int semid; -} ngx_mutex_t; - - -typedef struct { - int semid; - ngx_log_t *log; -} ngx_cond_t; - - -#define ngx_thread_sigmask(how, set, oset) \ - (sigprocmask(how, set, oset) == -1) ? ngx_errno : 0 - -#define ngx_thread_sigmask_n "sigprocmask()" - - -extern char *ngx_freebsd_kern_usrstack; -extern size_t ngx_thread_stack_size; - -static inline int ngx_gettid() -{ - char *sp; - - if (ngx_thread_stack_size == 0) { - return 0; - } - -#if ( __i386__ ) - - __asm__ volatile ("mov %%esp, %0" : "=q" (sp)); - -#elif ( __amd64__ ) - - __asm__ volatile ("mov %%rsp, %0" : "=q" (sp)); - -#else - -#error "rfork()ed threads are not supported on this platform" - -#endif - - return (ngx_freebsd_kern_usrstack - sp) / ngx_thread_stack_size; -} - - -#define ngx_thread_main() (ngx_gettid() == 0) +#include #else /* use pthreads */ #include +#include typedef pthread_t ngx_tid_t; -#define ngx_gettid() ((ngx_int_t) pthread_getspecific(0)) -#define ngx_log_tid ngx_thread_self() +#define ngx_thread_self() pthread_self() +#define ngx_thread_main() pthread_main_np() +#define ngx_log_tid (int) ngx_thread_self() + +#define TID_T_FMT PTR_FMT + + +#define NGX_MUTEX_LIGHT 0 + +typedef struct { + pthread_mutex_t mutex; + ngx_log_t *log; +} ngx_mutex_t; + +typedef struct { + pthread_cond_t cond; + ngx_tid_t tid; + ngx_log_t *log; +} ngx_cond_t; #define ngx_thread_sigmask pthread_sigmask #define ngx_thread_sigmask_n "pthread_sigmask()" +#define ngx_thread_join(t, p) pthread_join(t, p) + +#define ngx_setthrtitle(n) + + + +ngx_int_t ngx_mutex_trylock(ngx_mutex_t *m); +ngx_int_t ngx_mutex_lock(ngx_mutex_t *m); +ngx_int_t ngx_mutex_unlock(ngx_mutex_t *m); + #endif +#define ngx_thread_volatile volatile + + +typedef struct { + ngx_tid_t tid; + ngx_cond_t *cv; + ngx_uint_t state; +} ngx_thread_t; + +#define NGX_THREAD_FREE 1 +#define NGX_THREAD_BUSY 2 +#define NGX_THREAD_EXIT 3 +#define NGX_THREAD_DONE 4 + +extern ngx_int_t ngx_threads_n; +extern volatile ngx_thread_t ngx_threads[NGX_MAX_THREADS]; + + ngx_int_t ngx_init_threads(int n, size_t size, ngx_cycle_t *cycle); -int ngx_create_thread(ngx_tid_t *tid, int (*func)(void *arg), void *arg, +int ngx_create_thread(ngx_tid_t *tid, void* (*func)(void *arg), void *arg, ngx_log_t *log); -ngx_tid_t ngx_thread_self(); ngx_mutex_t *ngx_mutex_init(ngx_log_t *log, uint flags); -void ngx_mutex_done(ngx_mutex_t *m); - -#define ngx_mutex_trylock(m) ngx_mutex_dolock(m, 1) -#define ngx_mutex_lock(m) ngx_mutex_dolock(m, 0) -ngx_int_t ngx_mutex_dolock(ngx_mutex_t *m, ngx_int_t try); -ngx_int_t ngx_mutex_unlock(ngx_mutex_t *m); +void ngx_mutex_destroy(ngx_mutex_t *m); ngx_cond_t *ngx_cond_init(ngx_log_t *log); -void ngx_cond_done(ngx_cond_t *cv); +void ngx_cond_destroy(ngx_cond_t *cv); ngx_int_t ngx_cond_wait(ngx_cond_t *cv, ngx_mutex_t *m); ngx_int_t ngx_cond_signal(ngx_cond_t *cv);