Mercurial > hg > nginx
diff src/event/modules/ngx_kqueue_module.c @ 380:5ce6561246a5
nginx-0.0.7-2004-07-07-10:15:04 import
author | Igor Sysoev <igor@sysoev.ru> |
---|---|
date | Wed, 07 Jul 2004 06:15:04 +0000 |
parents | 73688d5d7fc3 |
children | 02a511569afb |
line wrap: on
line diff
--- a/src/event/modules/ngx_kqueue_module.c +++ b/src/event/modules/ngx_kqueue_module.c @@ -21,6 +21,7 @@ static void ngx_kqueue_done(ngx_cycle_t static ngx_int_t ngx_kqueue_add_event(ngx_event_t *ev, int event, u_int flags); static ngx_int_t ngx_kqueue_del_event(ngx_event_t *ev, int event, u_int flags); static ngx_int_t ngx_kqueue_set_event(ngx_event_t *ev, int filter, u_int flags); +static ngx_int_t ngx_kqueue_process_changes(ngx_cycle_t *cycle, ngx_uint_t try); static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle); static ngx_inline void ngx_kqueue_dump_event(ngx_log_t *log, struct kevent *kev); @@ -31,9 +32,23 @@ static char *ngx_kqueue_init_conf(ngx_cy int ngx_kqueue = -1; -static struct kevent *change_list, *event_list; +/* + * The "change_list" should be declared as ngx_thread_volatile. + * However, the use of the change_list is localized in kqueue functions and + * is protected by the mutex so even the "icc -ipo" should not build the code + * with the race condition. Thus we avoid the declaration to make a more + * readable code. + */ + +static struct kevent *change_list, *change_list0, *change_list1; +static struct kevent *event_list; static int max_changes, nchanges, nevents; +#if (NGX_THREADS) +static ngx_mutex_t *ngx_kqueue_mutex; +#endif + + static ngx_str_t kqueue_name = ngx_string("kqueue"); @@ -69,6 +84,7 @@ ngx_event_module_t ngx_kqueue_module_ct ngx_kqueue_del_event, /* disable an event */ NULL, /* add an connection */ NULL, /* delete an connection */ + ngx_kqueue_process_changes, /* process the changes */ ngx_kqueue_process_events, /* process the events */ ngx_kqueue_init, /* init the events */ ngx_kqueue_done /* done the events */ @@ -82,7 +98,7 @@ ngx_module_t ngx_kqueue_module = { ngx_kqueue_commands, /* module directives */ NGX_EVENT_MODULE, /* module type */ NULL, /* init module */ - NULL /* init child */ + NULL /* init process */ }; @@ -102,6 +118,12 @@ static ngx_int_t ngx_kqueue_init(ngx_cyc "kqueue() failed"); return NGX_ERROR; } + +#if (NGX_THREADS) + if (!(ngx_kqueue_mutex = ngx_mutex_init(cycle->log, 0))) { + return NGX_ERROR; + } +#endif } if (max_changes < kcf->changes) { @@ -117,15 +139,27 @@ static ngx_int_t ngx_kqueue_init(ngx_cyc nchanges = 0; } - if (change_list) { - ngx_free(change_list); + if (change_list0) { + ngx_free(change_list0); + } + + change_list0 = ngx_alloc(kcf->changes * sizeof(struct kevent), + cycle->log); + if (change_list0 == NULL) { + return NGX_ERROR; } - change_list = ngx_alloc(kcf->changes * sizeof(struct kevent), - cycle->log); - if (change_list == NULL) { + if (change_list1) { + ngx_free(change_list1); + } + + change_list1 = ngx_alloc(kcf->changes * sizeof(struct kevent), + cycle->log); + if (change_list1 == NULL) { return NGX_ERROR; } + + change_list = change_list0; } max_changes = kcf->changes; @@ -135,8 +169,7 @@ static ngx_int_t ngx_kqueue_init(ngx_cyc ngx_free(event_list); } - event_list = ngx_alloc(kcf->events * sizeof(struct kevent), - cycle->log); + event_list = ngx_alloc(kcf->events * sizeof(struct kevent), cycle->log); if (event_list == NULL) { return NGX_ERROR; } @@ -172,9 +205,14 @@ static void ngx_kqueue_done(ngx_cycle_t ngx_kqueue = -1; - ngx_free(change_list); + ngx_mutex_destroy(ngx_kqueue_mutex); + + ngx_free(change_list1); + ngx_free(change_list0); ngx_free(event_list); + change_list1 = NULL; + change_list0 = NULL; change_list = NULL; event_list = NULL; max_changes = 0; @@ -185,6 +223,7 @@ static void ngx_kqueue_done(ngx_cycle_t static ngx_int_t ngx_kqueue_add_event(ngx_event_t *ev, int event, u_int flags) { + ngx_int_t rc; ngx_event_t *e; ngx_connection_t *c; @@ -192,8 +231,11 @@ static ngx_int_t ngx_kqueue_add_event(ng ev->disabled = 0; ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0; - if (ngx_thread_main() - && nchanges > 0 + if (ngx_mutex_lock(ngx_kqueue_mutex) == NGX_ERROR) { + return NGX_ERROR; + } + + if (nchanges > 0 && ev->index < (u_int) nchanges && ((uintptr_t) change_list[ev->index].udata & (uintptr_t) ~1) == (uintptr_t) ev) @@ -215,29 +257,42 @@ static ngx_int_t ngx_kqueue_add_event(ng e->index = ev->index; } + ngx_mutex_unlock(ngx_kqueue_mutex); + return NGX_OK; } c = ev->data; + ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "previous event on #%d were not passed in kernel", c->fd); + ngx_mutex_unlock(ngx_kqueue_mutex); + return NGX_ERROR; } - return ngx_kqueue_set_event(ev, event, EV_ADD|EV_ENABLE|flags); + rc = ngx_kqueue_set_event(ev, event, EV_ADD|EV_ENABLE|flags); + + ngx_mutex_unlock(ngx_kqueue_mutex); + + return rc; } static ngx_int_t ngx_kqueue_del_event(ngx_event_t *ev, int event, u_int flags) { + ngx_int_t rc; ngx_event_t *e; ev->active = 0; ev->disabled = 0; - if (ngx_thread_main() - && nchanges > 0 + if (ngx_mutex_lock(ngx_kqueue_mutex) == NGX_ERROR) { + return NGX_ERROR; + } + + if (nchanges > 0 && ev->index < (u_int) nchanges && ((uintptr_t) change_list[ev->index].udata & (uintptr_t) ~1) == (uintptr_t) ev) @@ -254,6 +309,8 @@ static ngx_int_t ngx_kqueue_del_event(ng e->index = ev->index; } + ngx_mutex_unlock(ngx_kqueue_mutex); + return NGX_OK; } @@ -264,6 +321,7 @@ static ngx_int_t ngx_kqueue_del_event(ng */ if (flags & NGX_CLOSE_EVENT) { + ngx_mutex_unlock(ngx_kqueue_mutex); return NGX_OK; } @@ -271,14 +329,18 @@ static ngx_int_t ngx_kqueue_del_event(ng ev->disabled = 1; } - return ngx_kqueue_set_event(ev, event, + rc = ngx_kqueue_set_event(ev, event, flags & NGX_DISABLE_EVENT ? EV_DISABLE : EV_DELETE); + + ngx_mutex_unlock(ngx_kqueue_mutex); + + return rc; } static ngx_int_t ngx_kqueue_set_event(ngx_event_t *ev, int filter, u_int flags) { - struct kevent *kev, kv; + struct kevent *kev; struct timespec ts; ngx_connection_t *c; @@ -288,7 +350,7 @@ static ngx_int_t ngx_kqueue_set_event(ng "kevent set event: %d: ft:%d fl:%04X", c->fd, filter, flags); - if (ngx_thread_main() && nchanges >= max_changes) { + if (nchanges >= max_changes) { ngx_log_error(NGX_LOG_WARN, ev->log, 0, "kqueue change list is filled up"); @@ -303,7 +365,7 @@ static ngx_int_t ngx_kqueue_set_event(ng nchanges = 0; } - kev = ngx_thread_main() ? &change_list[nchanges] : &kv; + kev = &change_list[nchanges]; kev->ident = c->fd; kev->filter = filter; @@ -336,19 +398,8 @@ static ngx_int_t ngx_kqueue_set_event(ng #endif } - if (ngx_thread_main()) { - ev->index = nchanges; - nchanges++; - - } else { - ts.tv_sec = 0; - ts.tv_nsec = 0; - - if (kevent(ngx_kqueue, &kv, 1, NULL, 0, &ts) == -1) { - ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "kevent() failed"); - return NGX_ERROR; - } - } + ev->index = nchanges; + nchanges++; return NGX_OK; } @@ -356,7 +407,7 @@ static ngx_int_t ngx_kqueue_set_event(ng static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle) { - int events; + int events, n; ngx_int_t i, instance; ngx_uint_t lock, accept_lock, expire; ngx_err_t err; @@ -419,6 +470,19 @@ static ngx_int_t ngx_kqueue_process_even } } + if (ngx_threaded) { + if (ngx_kqueue_process_changes(cycle, 0) == NGX_ERROR) { + ngx_accept_mutex_unlock(); + return NGX_ERROR; + } + + n = 0; + + } else { + n = nchanges; + nchanges = 0; + } + if (timer == NGX_TIMER_INFINITE) { tp = NULL; expire = 0; @@ -429,10 +493,10 @@ static ngx_int_t ngx_kqueue_process_even tp = &ts; } - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, - "kevent timer: %d", timer); + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, + "kevent timer: %d, changes: %d", timer, n); - events = kevent(ngx_kqueue, change_list, nchanges, event_list, nevents, tp); + events = kevent(ngx_kqueue, change_list, n, event_list, nevents, tp); if (events == -1) { err = ngx_errno; @@ -440,8 +504,6 @@ static ngx_int_t ngx_kqueue_process_even err = 0; } - nchanges = 0; - ngx_gettimeofday(&tv); ngx_time_update(tv.tv_sec); @@ -629,6 +691,63 @@ static ngx_int_t ngx_kqueue_process_even } +static ngx_int_t ngx_kqueue_process_changes(ngx_cycle_t *cycle, ngx_uint_t try) +{ + int n; + ngx_int_t rc; + ngx_err_t err; + struct timespec ts; + struct kevent *changes; + + if (try) { + rc = ngx_mutex_trylock(ngx_kqueue_mutex); + if (rc != NGX_OK) { + return rc; + } + + } else { + if (ngx_mutex_lock(ngx_kqueue_mutex) == NGX_ERROR) { + return NGX_ERROR; + } + } + + if (nchanges == 0) { + ngx_mutex_unlock(ngx_kqueue_mutex); + return NGX_OK; + } + + changes = (struct kevent *) change_list; + if (change_list == change_list0) { + change_list = change_list1; + } else { + change_list = change_list0; + } + + n = nchanges; + nchanges = 0; + + ts.tv_sec = 0; + ts.tv_nsec = 0; + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, + "kevent changes: %d", n); + + if (kevent(ngx_kqueue, changes, n, NULL, 0, &ts) == -1) { + err = ngx_errno; + ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT, + cycle->log, err, "kevent() failed"); + rc = NGX_ERROR; + + } else { + rc = NGX_OK; + } + + ngx_mutex_unlock(ngx_kqueue_mutex); + + return rc; +} + + static ngx_inline void ngx_kqueue_dump_event(ngx_log_t *log, struct kevent *kev) { ngx_log_debug6(NGX_LOG_DEBUG_EVENT, log, 0,