# HG changeset patch # User Igor Sysoev # Date 1080746806 0 # Node ID 744965ec62756ac0187069d4ad6642578073f628 # Parent 502b03d9d2a38b1b2c6860b3e49d9c4711985d18 nginx-0.0.3-2004-03-31-19:26:46 import diff --git a/src/core/nginx.c b/src/core/nginx.c --- a/src/core/nginx.c +++ b/src/core/nginx.c @@ -174,10 +174,6 @@ int main(int argc, char *const *argv) } } - if (ccf->worker_processes == NGX_CONF_UNSET) { - ccf->worker_processes = 1; - } - if (ccf->pid.len == 0) { ccf->pid.len = sizeof(NGINX_PID) - 1; ccf->pid.data = NGINX_PID; diff --git a/src/core/ngx_atomic.h b/src/core/ngx_atomic.h --- a/src/core/ngx_atomic.h +++ b/src/core/ngx_atomic.h @@ -25,6 +25,7 @@ static ngx_inline uint32_t ngx_atomic_in NGX_SMP_LOCK " xaddl %0, %2; " + " incl %0; " : "=q" (old) : "0" (1), "m" (*value)); @@ -40,6 +41,7 @@ static ngx_inline uint32_t ngx_atomic_de NGX_SMP_LOCK " xaddl %0, %1; " + " decl %0; " : "=q" (old) : "0" (-1), "m" (*value)); @@ -65,6 +67,15 @@ static ngx_inline uint32_t ngx_atomic_cm return res; } + +#elif (WIN32) + +#define ngx_atomic_inc(x) InterlockedIncrement +#define ngx_atomic_dec(x) InterlockedDecrement +#define ngx_atomic_cmp_set(lock, old, set) \ + InterlockedCompareExchange(lock, set, old) + + #else typedef volatile uint32_t ngx_atomic_t; diff --git a/src/core/ngx_file.c b/src/core/ngx_file.c --- a/src/core/ngx_file.c +++ b/src/core/ngx_file.c @@ -56,7 +56,7 @@ int ngx_create_temp_file(ngx_file_t *fil ngx_create_hashed_filename(file, path); -#if 0 +#if 1 file->fd = ngx_open_tempfile(file->name.data, persistent); #else file->fd = ngx_open_tempfile(file->name.data, 1); diff --git a/src/event/modules/ngx_kqueue_module.c b/src/event/modules/ngx_kqueue_module.c --- a/src/event/modules/ngx_kqueue_module.c +++ b/src/event/modules/ngx_kqueue_module.c @@ -21,7 +21,7 @@ static void ngx_kqueue_done(ngx_cycle_t static int ngx_kqueue_add_event(ngx_event_t *ev, int event, u_int flags); static int ngx_kqueue_del_event(ngx_event_t *ev, int event, u_int flags); static int ngx_kqueue_set_event(ngx_event_t *ev, int filter, u_int flags); -static int ngx_kqueue_process_events(ngx_log_t *log); +static int ngx_kqueue_process_events(ngx_cycle_t *cycle); #if (NGX_THREADS) static void ngx_kqueue_thread_handler(ngx_event_t *ev); #endif @@ -343,10 +343,10 @@ static int ngx_kqueue_set_event(ngx_even } -static ngx_int_t ngx_kqueue_process_events(ngx_log_t *log) +static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle) { int events; - ngx_int_t instance, i; + ngx_int_t i, instance; ngx_err_t err; ngx_msec_t timer; ngx_event_t *ev; @@ -370,6 +370,18 @@ static ngx_int_t ngx_kqueue_process_even ngx_old_elapsed_msec = ngx_elapsed_msec; + if (ngx_accept_mutex) { + if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { + return NGX_ERROR; + } + +#if 1 + if (ngx_accept_token == 0 && timer == 0) { + /* STUB */ timer = 500; + } +#endif + } + if (timer) { ts.tv_sec = timer / 1000; ts.tv_nsec = (timer % 1000) * 1000000; @@ -379,7 +391,8 @@ static ngx_int_t ngx_kqueue_process_even tp = NULL; } - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, log, 0, "kevent timer: %d", timer); + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, + "kevent timer: %d", timer); events = kevent(ngx_kqueue, change_list, nchanges, event_list, nevents, tp); @@ -394,40 +407,54 @@ static ngx_int_t ngx_kqueue_process_even ngx_gettimeofday(&tv); ngx_time_update(tv.tv_sec); - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, log, 0, "kevent events: %d", events); + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, + "kevent events: %d", events); delta = ngx_elapsed_msec; ngx_elapsed_msec = tv.tv_sec * 1000 + tv.tv_usec / 1000 - ngx_start_msec; if (err) { ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT, - log, err, "kevent() failed"); + cycle->log, err, "kevent() failed"); + + if (ngx_accept_token) { + *ngx_accept_mutex = 0; + } + return NGX_ERROR; } if (timer) { delta = ngx_elapsed_msec - delta; - ngx_log_debug2(NGX_LOG_DEBUG_EVENT, log, 0, + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "kevent timer: %d, delta: %d", timer, (int) delta); } else { if (events == 0) { - ngx_log_error(NGX_LOG_ALERT, log, 0, + ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "kevent() returned no events without timeout"); + + if (ngx_accept_token) { + *ngx_accept_mutex = 0; + } + return NGX_ERROR; } } -#if (NGX_THREADS0) if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { + + if (ngx_accept_token) { + *ngx_accept_mutex = 0; + } + return NGX_ERROR; } -#endif for (i = 0; i < events; i++) { - ngx_log_debug6(NGX_LOG_DEBUG_EVENT, log, 0, + ngx_log_debug6(NGX_LOG_DEBUG_EVENT, cycle->log, 0, (event_list[i].ident > 0x8000000 && event_list[i].ident != (unsigned) -1) ? @@ -440,7 +467,7 @@ static ngx_int_t ngx_kqueue_process_even event_list[i].data, event_list[i].udata); if (event_list[i].flags & EV_ERROR) { - ngx_log_error(NGX_LOG_ALERT, log, event_list[i].data, + ngx_log_error(NGX_LOG_ALERT, cycle->log, event_list[i].data, "kevent() error on %d", event_list[i].ident); continue; } @@ -454,15 +481,16 @@ static ngx_int_t ngx_kqueue_process_even instance = (uintptr_t) ev & 1; ev = (ngx_event_t *) ((uintptr_t) ev & (uintptr_t) ~1); + ev->returned_instance = instance; - if (ev->active == 0 || ev->instance != instance) { + if (!ev->active || ev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, log, 0, + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "kevent: stale event " PTR_FMT, ev); continue; } @@ -494,30 +522,29 @@ static ngx_int_t ngx_kqueue_process_even break; default: - ngx_log_error(NGX_LOG_ALERT, log, 0, + ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "unexpected kevent() filter %d", event_list[i].filter); continue; } -#if (NGX_THREADS0) - - if (ngx_threaded) { - - if (ev->light) { - /* - * The light events are the accept event, - * or the event that waits in the mutex queue - we need to - * remove it from the mutex queue before the inserting into - * the posted events queue. - */ +#if 0 + if (ngx_threaded || ngx_accept_token) { +#endif + if (ngx_accept_token) { + if (ev->accept) { ngx_mutex_unlock(ngx_posted_events_mutex); ev->event_handler(ev); if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { + + if (ngx_accept_token) { + *ngx_accept_mutex = 0; + } + return NGX_ERROR; } @@ -529,36 +556,55 @@ static ngx_int_t ngx_kqueue_process_even continue; } -#endif - ev->event_handler(ev); } -#if (NGX_THREADS0) ngx_mutex_unlock(ngx_posted_events_mutex); -#endif + + if (ngx_accept_token) { + *ngx_accept_mutex = 0; + } if (timer && delta) { ngx_event_expire_timers((ngx_msec_t) delta); } -#if (NGX_THREADS0) - if (!ngx_threaded) { +#if (NGX_THREADS) + if (ngx_threaded) { + return NGX_OK; } #endif - /* TODO: non-thread mode only */ - for ( ;; ) { ev = (ngx_event_t *) ngx_posted_events; + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, + "kevent: posted event " PTR_FMT, ev); + if (ev == NULL) { break; } ngx_posted_events = ev->next; + if ((!ev->posted && !ev->active) + || ev->instance != ev->returned_instance) + { + /* + * the stale event from a file descriptor + * that was just closed in this iteration + */ + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, + "kevent: stale event " PTR_FMT, ev); + continue; + } + + if (ev->posted) { + ev->posted = 0; + } + ev->event_handler(ev); } @@ -575,8 +621,9 @@ static void ngx_kqueue_thread_handler(ng instance = (uintptr_t) ev & 1; ev = (ngx_event_t *) ((uintptr_t) ev & (uintptr_t) ~1); - if (ev->active == 0 || ev->instance != instance) { - + if ((!ev->posted && !ev->active) + || ev->instance != ev->returned_instance) + { /* * the stale event from a file descriptor * that was just closed in this iteration @@ -587,6 +634,10 @@ static void ngx_kqueue_thread_handler(ng return; } + if (ev->posted) { + ev->posted = 0; + } + ev->event_handler(ev); } diff --git a/src/event/ngx_event.c b/src/event/ngx_event.c --- a/src/event/ngx_event.c +++ b/src/event/ngx_event.c @@ -233,6 +233,8 @@ static int ngx_event_init(ngx_cycle_t *c rev->available = 0; + rev->accept = 1; + #if (HAVE_DEFERRED_ACCEPT) rev->deferred_accept = s[i].deferred_accept; #endif @@ -273,7 +275,9 @@ static int ngx_event_init(ngx_cycle_t *c } else { rev->event_handler = &ngx_event_accept; - ngx_add_event(rev, NGX_READ_EVENT, 0); + if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) { + return NGX_ERROR; + } } #else @@ -281,9 +285,13 @@ static int ngx_event_init(ngx_cycle_t *c rev->event_handler = &ngx_event_accept; if (ngx_event_flags & NGX_USE_SIGIO_EVENT) { - ngx_add_conn(c); + if (ngx_add_conn(c) == NGX_ERROR) { + return NGX_ERROR; + } } else { - ngx_add_event(rev, NGX_READ_EVENT, 0); + if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) { + return NGX_ERROR; + } } #endif diff --git a/src/event/ngx_event.h b/src/event/ngx_event.h --- a/src/event/ngx_event.h +++ b/src/event/ngx_event.h @@ -66,6 +66,7 @@ struct ngx_event_s { /* used to detect the stale events in kqueue, rt signals and epoll */ unsigned char instance:1; + unsigned char returned_instance:1; /* * the event was passed or would be passed to a kernel; @@ -75,11 +76,13 @@ struct ngx_event_s { unsigned char disabled:1; + unsigned char posted:1; + /* the ready event; in aio mode 0 means that no operation can be posted */ unsigned char ready:1; /* aio operation is complete */ - unsigned char complete:1; + unsigned short complete:1; unsigned short eof:1; unsigned short error:1; @@ -93,6 +96,8 @@ struct ngx_event_s { unsigned short unexpected_eof:1; + unsigned short accept:1; + unsigned short deferred_accept:1; /* TODO: aio_eof and kq_eof can be the single pending_eof */ @@ -178,7 +183,7 @@ typedef struct { int (*add_conn)(ngx_connection_t *c); int (*del_conn)(ngx_connection_t *c, u_int flags); - int (*process)(ngx_log_t *log); + int (*process)(ngx_cycle_t *cycle); int (*init)(ngx_cycle_t *cycle); void (*done)(ngx_cycle_t *cycle); } ngx_event_actions_t; @@ -391,6 +396,9 @@ extern ngx_thread_volatile ngx_event_t #if (NGX_THREADS) extern ngx_mutex_t *ngx_posted_events_mutex; #endif +extern ngx_atomic_t *ngx_accept_mutex; +extern ngx_uint_t ngx_accept_token; + extern int ngx_event_flags; extern ngx_module_t ngx_events_module; @@ -403,6 +411,10 @@ extern ngx_module_t ngx_event_ void ngx_event_accept(ngx_event_t *ev); +ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle); +ngx_int_t ngx_disable_accept_events(ngx_cycle_t *cycle); +ngx_int_t ngx_enable_accept_events(ngx_cycle_t *cycle); + #if (WIN32) void ngx_event_acceptex(ngx_event_t *ev); diff --git a/src/event/ngx_event_accept.c b/src/event/ngx_event_accept.c --- a/src/event/ngx_event_accept.c +++ b/src/event/ngx_event_accept.c @@ -14,9 +14,13 @@ typedef struct { static size_t ngx_accept_log_error(void *data, char *buf, size_t len); +ngx_atomic_t *ngx_accept_mutex; +ngx_uint_t ngx_accept_token; + + void ngx_event_accept(ngx_event_t *ev) { - ngx_uint_t instance, accepted; + ngx_uint_t instance, rinstance, winstance, accepted; socklen_t len; struct sockaddr *sa; ngx_err_t err; @@ -205,6 +209,8 @@ void ngx_event_accept(ngx_event_t *ev) #endif instance = rev->instance; + rinstance = rev->returned_instance; + winstance = wev->returned_instance; ngx_memzero(rev, sizeof(ngx_event_t)); ngx_memzero(wev, sizeof(ngx_event_t)); @@ -217,7 +223,10 @@ void ngx_event_accept(ngx_event_t *ev) c->socklen = len; rev->instance = (u_char) !instance; + rev->returned_instance = (u_char) rinstance; + wev->instance = (u_char) !instance; + wev->returned_instance = (u_char) winstance; rev->index = NGX_INVALID_INDEX; wev->index = NGX_INVALID_INDEX; @@ -295,6 +304,102 @@ void ngx_event_accept(ngx_event_t *ev) } +ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle) +{ + if (*ngx_accept_mutex == 0 && ngx_atomic_cmp_set(ngx_accept_mutex, 0, 1)) { + + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, + "accept mutex locked"); + + if (!ngx_accept_token) { + if (ngx_enable_accept_events(cycle) == NGX_ERROR) { + return NGX_ERROR; + } + + ngx_accept_token = 1; + } + + return NGX_OK; + } + + if (ngx_accept_token) { + if (ngx_disable_accept_events(cycle) == NGX_ERROR) { + return NGX_ERROR; + } + + ngx_accept_token = 0; + } + + return NGX_OK; +} + + +ngx_int_t ngx_enable_accept_events(ngx_cycle_t *cycle) +{ + ngx_uint_t i; + ngx_listening_t *s; + + s = cycle->listening.elts; + for (i = 0; i < cycle->listening.nelts; i++) { + + /* + * we do not need to handle the Winsock sockets here (divde a socket + * number by 4) because this function would never called + * in the Winsock environment + */ + + if (ngx_event_flags & NGX_USE_SIGIO_EVENT) { + if (ngx_add_conn(&cycle->connections[s[i].fd]) == NGX_ERROR) { + return NGX_ERROR; + } + + } else { + if (ngx_add_event(&cycle->read_events[s[i].fd], NGX_READ_EVENT, 0) + == NGX_ERROR) + { + return NGX_ERROR; + } + } + } + + return NGX_OK; +} + + +ngx_int_t ngx_disable_accept_events(ngx_cycle_t *cycle) +{ + ngx_uint_t i; + ngx_listening_t *s; + + s = cycle->listening.elts; + for (i = 0; i < cycle->listening.nelts; i++) { + + /* + * we do not need to handle the Winsock sockets here (divde a socket + * number by 4) because this function would never called + * in the Winsock environment + */ + + if (ngx_event_flags & NGX_USE_SIGIO_EVENT) { + if (ngx_del_conn(&cycle->connections[s[i].fd], NGX_DISABLE_EVENT) + == NGX_ERROR) + { + return NGX_ERROR; + } + + } else { + if (ngx_del_event(&cycle->read_events[s[i].fd], NGX_READ_EVENT, + NGX_DISABLE_EVENT) == NGX_ERROR) + { + return NGX_ERROR; + } + } + } + + return NGX_OK; +} + + static size_t ngx_accept_log_error(void *data, char *buf, size_t len) { ngx_accept_log_ctx_t *ctx = data; diff --git a/src/http/ngx_http_request.c b/src/http/ngx_http_request.c --- a/src/http/ngx_http_request.c +++ b/src/http/ngx_http_request.c @@ -90,6 +90,20 @@ void ngx_http_init_connection(ngx_connec if (rev->ready) { /* deferred accept, aio, iocp */ + + if (*ngx_accept_mutex) { + if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { + ngx_http_close_connection(c); + return; + } + + rev->next = ngx_posted_events; + ngx_posted_events = rev; + + ngx_mutex_unlock(ngx_posted_events_mutex); + return; + } + ngx_http_init_request(rev); return; } diff --git a/src/os/unix/ngx_process_cycle.c b/src/os/unix/ngx_process_cycle.c --- a/src/os/unix/ngx_process_cycle.c +++ b/src/os/unix/ngx_process_cycle.c @@ -65,12 +65,26 @@ void ngx_master_process_cycle(ngx_cycle_ signo = 0; live = 0; + ngx_accept_mutex = mmap(NULL, sizeof(ngx_atomic_t), PROT_READ|PROT_WRITE, + MAP_ANON|MAP_SHARED, -1, 0); + + if (ngx_accept_mutex == NULL) { + ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, + "mmap(MAP_ANON|MAP_SHARED) failed"); + /* fatal */ + exit(2); + } + for ( ;; ) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "new cycle"); ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module); + if (ccf->worker_processes == NGX_CONF_UNSET) { + ccf->worker_processes = 1; + } + if (ngx_process == NGX_PROCESS_MASTER) { for (i = 0; i < (ngx_uint_t) ccf->worker_processes; i++) { ngx_spawn_process(cycle, ngx_worker_process_cycle, NULL, @@ -150,7 +164,7 @@ void ngx_master_process_cycle(ngx_cycle_ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle"); - ngx_process_events(cycle->log); + ngx_process_events(cycle); live = 0; } @@ -361,6 +375,10 @@ static void ngx_worker_process_cycle(ngx ngx_process = NGX_PROCESS_WORKER; ngx_last_process = 0; + if (ngx_accept_mutex) { + ngx_accept_token = 1; + } + ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module); if (ccf->group != (gid_t) NGX_CONF_UNSET) { @@ -442,7 +460,7 @@ static void ngx_worker_process_cycle(ngx for ( ;; ) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle"); - ngx_process_events(cycle->log); + ngx_process_events(cycle); if (ngx_terminate) { ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "exiting"); @@ -473,7 +491,7 @@ static void ngx_worker_process_cycle(ngx ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle"); - ngx_process_events(cycle->log); + ngx_process_events(cycle); if (ngx_reopen) { ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "reopen logs");