diff src/event/modules/ngx_kqueue_module.c @ 380:5ce6561246a5

nginx-0.0.7-2004-07-07-10:15:04 import
author Igor Sysoev <igor@sysoev.ru>
date Wed, 07 Jul 2004 06:15:04 +0000
parents 73688d5d7fc3
children 02a511569afb
line wrap: on
line diff
--- a/src/event/modules/ngx_kqueue_module.c
+++ b/src/event/modules/ngx_kqueue_module.c
@@ -21,6 +21,7 @@ static void ngx_kqueue_done(ngx_cycle_t 
 static ngx_int_t ngx_kqueue_add_event(ngx_event_t *ev, int event, u_int flags);
 static ngx_int_t ngx_kqueue_del_event(ngx_event_t *ev, int event, u_int flags);
 static ngx_int_t ngx_kqueue_set_event(ngx_event_t *ev, int filter, u_int flags);
+static ngx_int_t ngx_kqueue_process_changes(ngx_cycle_t *cycle, ngx_uint_t try);
 static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle);
 static ngx_inline void ngx_kqueue_dump_event(ngx_log_t *log,
                                              struct kevent *kev);
@@ -31,9 +32,23 @@ static char *ngx_kqueue_init_conf(ngx_cy
 
 int                    ngx_kqueue = -1;
 
-static struct kevent  *change_list, *event_list;
+/*
+ * The "change_list" should be declared as ngx_thread_volatile.
+ * However, the use of the change_list is localized in kqueue functions and
+ * is protected by the mutex so even the "icc -ipo" should not build the code
+ * with the race condition.  Thus we avoid the declaration to make a more
+ * readable code.
+ */
+
+static struct kevent  *change_list, *change_list0, *change_list1;
+static struct kevent  *event_list;
 static int             max_changes, nchanges, nevents;
 
+#if (NGX_THREADS)
+static ngx_mutex_t    *ngx_kqueue_mutex;
+#endif
+
+
 
 static ngx_str_t      kqueue_name = ngx_string("kqueue");
 
@@ -69,6 +84,7 @@ ngx_event_module_t  ngx_kqueue_module_ct
         ngx_kqueue_del_event,              /* disable an event */
         NULL,                              /* add an connection */
         NULL,                              /* delete an connection */
+        ngx_kqueue_process_changes,        /* process the changes */
         ngx_kqueue_process_events,         /* process the events */
         ngx_kqueue_init,                   /* init the events */
         ngx_kqueue_done                    /* done the events */
@@ -82,7 +98,7 @@ ngx_module_t  ngx_kqueue_module = {
     ngx_kqueue_commands,                   /* module directives */
     NGX_EVENT_MODULE,                      /* module type */
     NULL,                                  /* init module */
-    NULL                                   /* init child */
+    NULL                                   /* init process */
 };
 
 
@@ -102,6 +118,12 @@ static ngx_int_t ngx_kqueue_init(ngx_cyc
                           "kqueue() failed");
             return NGX_ERROR;
         }
+
+#if (NGX_THREADS)
+        if (!(ngx_kqueue_mutex = ngx_mutex_init(cycle->log, 0))) {
+            return NGX_ERROR;
+        }
+#endif
     }
 
     if (max_changes < kcf->changes) {
@@ -117,15 +139,27 @@ static ngx_int_t ngx_kqueue_init(ngx_cyc
             nchanges = 0;
         }
 
-        if (change_list) {
-            ngx_free(change_list);
+        if (change_list0) {
+            ngx_free(change_list0);
+        }
+
+        change_list0 = ngx_alloc(kcf->changes * sizeof(struct kevent),
+                                 cycle->log);
+        if (change_list0 == NULL) {
+            return NGX_ERROR;
         }
 
-        change_list = ngx_alloc(kcf->changes * sizeof(struct kevent),
-                                cycle->log);
-        if (change_list == NULL) {
+        if (change_list1) {
+            ngx_free(change_list1);
+        }
+
+        change_list1 = ngx_alloc(kcf->changes * sizeof(struct kevent),
+                                 cycle->log);
+        if (change_list1 == NULL) {
             return NGX_ERROR;
         }
+
+        change_list = change_list0;
     }
 
     max_changes = kcf->changes;
@@ -135,8 +169,7 @@ static ngx_int_t ngx_kqueue_init(ngx_cyc
             ngx_free(event_list);
         }
 
-        event_list = ngx_alloc(kcf->events * sizeof(struct kevent),
-                                cycle->log);
+        event_list = ngx_alloc(kcf->events * sizeof(struct kevent), cycle->log);
         if (event_list == NULL) {
             return NGX_ERROR;
         }
@@ -172,9 +205,14 @@ static void ngx_kqueue_done(ngx_cycle_t 
 
     ngx_kqueue = -1;
 
-    ngx_free(change_list);
+    ngx_mutex_destroy(ngx_kqueue_mutex);
+
+    ngx_free(change_list1);
+    ngx_free(change_list0);
     ngx_free(event_list);
 
+    change_list1 = NULL;
+    change_list0 = NULL;
     change_list = NULL;
     event_list = NULL;
     max_changes = 0;
@@ -185,6 +223,7 @@ static void ngx_kqueue_done(ngx_cycle_t 
 
 static ngx_int_t ngx_kqueue_add_event(ngx_event_t *ev, int event, u_int flags)
 {
+    ngx_int_t          rc;
     ngx_event_t       *e;
     ngx_connection_t  *c;
 
@@ -192,8 +231,11 @@ static ngx_int_t ngx_kqueue_add_event(ng
     ev->disabled = 0;
     ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
 
-    if (ngx_thread_main()
-        && nchanges > 0
+    if (ngx_mutex_lock(ngx_kqueue_mutex) == NGX_ERROR) {
+        return NGX_ERROR;
+    }
+
+    if (nchanges > 0
         && ev->index < (u_int) nchanges
         && ((uintptr_t) change_list[ev->index].udata & (uintptr_t) ~1)
                                                              == (uintptr_t) ev)
@@ -215,29 +257,42 @@ static ngx_int_t ngx_kqueue_add_event(ng
                 e->index = ev->index;
             }
 
+            ngx_mutex_unlock(ngx_kqueue_mutex);
+
             return NGX_OK;
         }
 
         c = ev->data;
+
         ngx_log_error(NGX_LOG_ALERT, ev->log, 0,
                       "previous event on #%d were not passed in kernel", c->fd);
 
+        ngx_mutex_unlock(ngx_kqueue_mutex);
+
         return NGX_ERROR;
     }
 
-    return ngx_kqueue_set_event(ev, event, EV_ADD|EV_ENABLE|flags);
+    rc = ngx_kqueue_set_event(ev, event, EV_ADD|EV_ENABLE|flags);
+
+    ngx_mutex_unlock(ngx_kqueue_mutex);
+
+    return rc;
 }
 
 
 static ngx_int_t ngx_kqueue_del_event(ngx_event_t *ev, int event, u_int flags)
 {
+    ngx_int_t     rc;
     ngx_event_t  *e;
 
     ev->active = 0;
     ev->disabled = 0;
 
-    if (ngx_thread_main()
-        && nchanges > 0
+    if (ngx_mutex_lock(ngx_kqueue_mutex) == NGX_ERROR) {
+        return NGX_ERROR;
+    }
+
+    if (nchanges > 0
         && ev->index < (u_int) nchanges
         && ((uintptr_t) change_list[ev->index].udata & (uintptr_t) ~1)
                                                              == (uintptr_t) ev)
@@ -254,6 +309,8 @@ static ngx_int_t ngx_kqueue_del_event(ng
             e->index = ev->index;
         }
 
+        ngx_mutex_unlock(ngx_kqueue_mutex);
+
         return NGX_OK;
     }
 
@@ -264,6 +321,7 @@ static ngx_int_t ngx_kqueue_del_event(ng
      */
 
     if (flags & NGX_CLOSE_EVENT) {
+        ngx_mutex_unlock(ngx_kqueue_mutex);
         return NGX_OK;
     }
 
@@ -271,14 +329,18 @@ static ngx_int_t ngx_kqueue_del_event(ng
         ev->disabled = 1;
     }
 
-    return ngx_kqueue_set_event(ev, event,
+    rc = ngx_kqueue_set_event(ev, event,
                            flags & NGX_DISABLE_EVENT ? EV_DISABLE : EV_DELETE);
+
+    ngx_mutex_unlock(ngx_kqueue_mutex);
+
+    return rc;
 }
 
 
 static ngx_int_t ngx_kqueue_set_event(ngx_event_t *ev, int filter, u_int flags)
 {
-    struct kevent     *kev, kv;
+    struct kevent     *kev;
     struct timespec    ts;
     ngx_connection_t  *c;
 
@@ -288,7 +350,7 @@ static ngx_int_t ngx_kqueue_set_event(ng
                    "kevent set event: %d: ft:%d fl:%04X",
                    c->fd, filter, flags);
 
-    if (ngx_thread_main() && nchanges >= max_changes) {
+    if (nchanges >= max_changes) {
         ngx_log_error(NGX_LOG_WARN, ev->log, 0,
                       "kqueue change list is filled up");
 
@@ -303,7 +365,7 @@ static ngx_int_t ngx_kqueue_set_event(ng
         nchanges = 0;
     }
 
-    kev = ngx_thread_main() ? &change_list[nchanges] : &kv;
+    kev = &change_list[nchanges];
 
     kev->ident = c->fd;
     kev->filter = filter;
@@ -336,19 +398,8 @@ static ngx_int_t ngx_kqueue_set_event(ng
 #endif
     }
 
-    if (ngx_thread_main()) {
-        ev->index = nchanges;
-        nchanges++;
-
-    } else {
-        ts.tv_sec = 0;
-        ts.tv_nsec = 0;
-
-        if (kevent(ngx_kqueue, &kv, 1, NULL, 0, &ts) == -1) {
-            ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "kevent() failed");
-            return NGX_ERROR;
-        }
-    }
+    ev->index = nchanges;
+    nchanges++;
 
     return NGX_OK;
 }
@@ -356,7 +407,7 @@ static ngx_int_t ngx_kqueue_set_event(ng
 
 static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle)
 {
-    int                events;
+    int                events, n;
     ngx_int_t          i, instance;
     ngx_uint_t         lock, accept_lock, expire;
     ngx_err_t          err;
@@ -419,6 +470,19 @@ static ngx_int_t ngx_kqueue_process_even
         }
     }
 
+    if (ngx_threaded) {
+        if (ngx_kqueue_process_changes(cycle, 0) == NGX_ERROR) {
+            ngx_accept_mutex_unlock();
+            return NGX_ERROR;
+        }
+
+        n = 0;
+
+    } else {
+        n = nchanges;
+        nchanges = 0;
+    }
+
     if (timer == NGX_TIMER_INFINITE) {
         tp = NULL;
         expire = 0;
@@ -429,10 +493,10 @@ static ngx_int_t ngx_kqueue_process_even
         tp = &ts;
     }
 
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
-                   "kevent timer: %d", timer);
+    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+                   "kevent timer: %d, changes: %d", timer, n);
 
-    events = kevent(ngx_kqueue, change_list, nchanges, event_list, nevents, tp);
+    events = kevent(ngx_kqueue, change_list, n, event_list, nevents, tp);
 
     if (events == -1) {
         err = ngx_errno;
@@ -440,8 +504,6 @@ static ngx_int_t ngx_kqueue_process_even
         err = 0;
     }
 
-    nchanges = 0;
-
     ngx_gettimeofday(&tv);
     ngx_time_update(tv.tv_sec);
 
@@ -629,6 +691,63 @@ static ngx_int_t ngx_kqueue_process_even
 }
 
 
+static ngx_int_t ngx_kqueue_process_changes(ngx_cycle_t *cycle, ngx_uint_t try)
+{
+    int              n;
+    ngx_int_t        rc;
+    ngx_err_t        err;
+    struct timespec  ts;
+    struct kevent   *changes;
+
+    if (try) {
+        rc = ngx_mutex_trylock(ngx_kqueue_mutex);
+        if (rc != NGX_OK) {
+            return rc;
+        }
+
+    } else {
+        if (ngx_mutex_lock(ngx_kqueue_mutex) == NGX_ERROR) {
+            return NGX_ERROR;
+        }
+    }
+
+    if (nchanges == 0) {
+        ngx_mutex_unlock(ngx_kqueue_mutex);
+        return NGX_OK;
+    }
+
+    changes = (struct kevent *) change_list;
+    if (change_list == change_list0) {
+        change_list = change_list1;
+    } else {
+        change_list = change_list0;
+    }
+
+    n = nchanges;
+    nchanges = 0;
+
+    ts.tv_sec = 0;
+    ts.tv_nsec = 0;
+
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+                   "kevent changes: %d", n);
+
+    if (kevent(ngx_kqueue, changes, n, NULL, 0, &ts) == -1) {
+        err = ngx_errno;
+        ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT,
+                      cycle->log, err, "kevent() failed");
+        rc = NGX_ERROR;
+
+    } else {
+        rc = NGX_OK;
+    }
+
+    ngx_mutex_unlock(ngx_kqueue_mutex);
+
+    return rc;
+}
+
+
 static ngx_inline void ngx_kqueue_dump_event(ngx_log_t *log, struct kevent *kev)
 {
     ngx_log_debug6(NGX_LOG_DEBUG_EVENT, log, 0,