changeset 301:744965ec6275

nginx-0.0.3-2004-03-31-19:26:46 import
author Igor Sysoev <igor@sysoev.ru>
date Wed, 31 Mar 2004 15:26:46 +0000
parents 502b03d9d2a3
children 1526e7686b20
files src/core/nginx.c src/core/ngx_atomic.h src/core/ngx_file.c src/event/modules/ngx_kqueue_module.c src/event/ngx_event.c src/event/ngx_event.h src/event/ngx_event_accept.c src/http/ngx_http_request.c src/os/unix/ngx_process_cycle.c
diffstat 9 files changed, 265 insertions(+), 50 deletions(-) [+]
line wrap: on
line diff
--- a/src/core/nginx.c
+++ b/src/core/nginx.c
@@ -174,10 +174,6 @@ int main(int argc, char *const *argv)
         }
     }
 
-    if (ccf->worker_processes == NGX_CONF_UNSET) {
-        ccf->worker_processes = 1;
-    }
-
     if (ccf->pid.len == 0) {
         ccf->pid.len = sizeof(NGINX_PID) - 1;
         ccf->pid.data = NGINX_PID;
--- a/src/core/ngx_atomic.h
+++ b/src/core/ngx_atomic.h
@@ -25,6 +25,7 @@ static ngx_inline uint32_t ngx_atomic_in
 
         NGX_SMP_LOCK
     "   xaddl  %0, %2;   "
+    "   incl   %0;       "
 
     : "=q" (old) : "0" (1), "m" (*value));
 
@@ -40,6 +41,7 @@ static ngx_inline uint32_t ngx_atomic_de
 
         NGX_SMP_LOCK
     "   xaddl  %0, %1;   "
+    "   decl   %0;       "
 
     : "=q" (old) : "0" (-1), "m" (*value));
 
@@ -65,6 +67,15 @@ static ngx_inline uint32_t ngx_atomic_cm
     return res;
 }
 
+
+#elif (WIN32)
+
+#define ngx_atomic_inc(x)                    InterlockedIncrement
+#define ngx_atomic_dec(x)                    InterlockedDecrement
+#define ngx_atomic_cmp_set(lock, old, set)                                   \
+                                  InterlockedCompareExchange(lock, set, old)
+
+
 #else
 
 typedef volatile uint32_t  ngx_atomic_t;
--- a/src/core/ngx_file.c
+++ b/src/core/ngx_file.c
@@ -56,7 +56,7 @@ int ngx_create_temp_file(ngx_file_t *fil
 
         ngx_create_hashed_filename(file, path);
 
-#if 0
+#if 1
         file->fd = ngx_open_tempfile(file->name.data, persistent);
 #else
         file->fd = ngx_open_tempfile(file->name.data, 1);
--- a/src/event/modules/ngx_kqueue_module.c
+++ b/src/event/modules/ngx_kqueue_module.c
@@ -21,7 +21,7 @@ static void ngx_kqueue_done(ngx_cycle_t 
 static int ngx_kqueue_add_event(ngx_event_t *ev, int event, u_int flags);
 static int ngx_kqueue_del_event(ngx_event_t *ev, int event, u_int flags);
 static int ngx_kqueue_set_event(ngx_event_t *ev, int filter, u_int flags);
-static int ngx_kqueue_process_events(ngx_log_t *log);
+static int ngx_kqueue_process_events(ngx_cycle_t *cycle);
 #if (NGX_THREADS)
 static void ngx_kqueue_thread_handler(ngx_event_t *ev);
 #endif
@@ -343,10 +343,10 @@ static int ngx_kqueue_set_event(ngx_even
 }
 
 
-static ngx_int_t ngx_kqueue_process_events(ngx_log_t *log)
+static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle)
 {
     int                events;
-    ngx_int_t          instance, i;
+    ngx_int_t          i, instance;
     ngx_err_t          err;
     ngx_msec_t         timer;
     ngx_event_t       *ev;
@@ -370,6 +370,18 @@ static ngx_int_t ngx_kqueue_process_even
 
     ngx_old_elapsed_msec = ngx_elapsed_msec;
 
+    if (ngx_accept_mutex) {
+        if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
+            return NGX_ERROR;
+        }
+
+#if 1
+        if (ngx_accept_token == 0 && timer == 0) {
+            /* STUB */ timer = 500;
+        }
+#endif
+    }
+
     if (timer) {
         ts.tv_sec = timer / 1000;
         ts.tv_nsec = (timer % 1000) * 1000000;
@@ -379,7 +391,8 @@ static ngx_int_t ngx_kqueue_process_even
         tp = NULL;
     }
 
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, log, 0, "kevent timer: %d", timer);
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+                   "kevent timer: %d", timer);
 
     events = kevent(ngx_kqueue, change_list, nchanges, event_list, nevents, tp);
 
@@ -394,40 +407,54 @@ static ngx_int_t ngx_kqueue_process_even
     ngx_gettimeofday(&tv);
     ngx_time_update(tv.tv_sec);
 
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, log, 0, "kevent events: %d", events);
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+                   "kevent events: %d", events);
 
     delta = ngx_elapsed_msec;
     ngx_elapsed_msec = tv.tv_sec * 1000 + tv.tv_usec / 1000 - ngx_start_msec;
 
     if (err) {
         ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT,
-                      log, err, "kevent() failed");
+                      cycle->log, err, "kevent() failed");
+
+        if (ngx_accept_token) {
+            *ngx_accept_mutex = 0;
+        }
+
         return NGX_ERROR;
     }
 
     if (timer) {
         delta = ngx_elapsed_msec - delta;
 
-        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, log, 0,
+        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                        "kevent timer: %d, delta: %d", timer, (int) delta);
 
     } else {
         if (events == 0) {
-            ngx_log_error(NGX_LOG_ALERT, log, 0,
+            ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                           "kevent() returned no events without timeout");
+
+            if (ngx_accept_token) {
+                *ngx_accept_mutex = 0;
+            }
+
             return NGX_ERROR;
         }
     }
 
-#if (NGX_THREADS0)
     if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
+
+        if (ngx_accept_token) {
+            *ngx_accept_mutex = 0;
+        }
+
         return NGX_ERROR;
     }
-#endif
 
     for (i = 0; i < events; i++) {
 
-        ngx_log_debug6(NGX_LOG_DEBUG_EVENT, log, 0,
+        ngx_log_debug6(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
 
                        (event_list[i].ident > 0x8000000
                         && event_list[i].ident != (unsigned) -1) ?
@@ -440,7 +467,7 @@ static ngx_int_t ngx_kqueue_process_even
                         event_list[i].data, event_list[i].udata);
 
         if (event_list[i].flags & EV_ERROR) {
-            ngx_log_error(NGX_LOG_ALERT, log, event_list[i].data,
+            ngx_log_error(NGX_LOG_ALERT, cycle->log, event_list[i].data,
                           "kevent() error on %d", event_list[i].ident);
             continue;
         }
@@ -454,15 +481,16 @@ static ngx_int_t ngx_kqueue_process_even
 
             instance = (uintptr_t) ev & 1;
             ev = (ngx_event_t *) ((uintptr_t) ev & (uintptr_t) ~1);
+            ev->returned_instance = instance;
 
-            if (ev->active == 0 || ev->instance != instance) {
+            if (!ev->active || ev->instance != instance) {
 
                 /*
                  * the stale event from a file descriptor
                  * that was just closed in this iteration
                  */
 
-                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, log, 0,
+                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                                "kevent: stale event " PTR_FMT, ev);
                 continue;
             }
@@ -494,30 +522,29 @@ static ngx_int_t ngx_kqueue_process_even
             break;
 
         default:
-            ngx_log_error(NGX_LOG_ALERT, log, 0,
+            ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                           "unexpected kevent() filter %d",
                           event_list[i].filter);
             continue;
         }
 
-#if (NGX_THREADS0)
-
-        if (ngx_threaded) {
-
-            if (ev->light) {
 
-                /*
-                 * The light events are the accept event,
-                 * or the event that waits in the mutex queue - we need to
-                 * remove it from the mutex queue before the inserting into
-                 * the posted events queue.
-                 */
+#if 0
+        if (ngx_threaded || ngx_accept_token) {
+#endif
+        if (ngx_accept_token) {
 
+            if (ev->accept) {
                 ngx_mutex_unlock(ngx_posted_events_mutex);
 
                 ev->event_handler(ev);
 
                 if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
+
+                    if (ngx_accept_token) {
+                        *ngx_accept_mutex = 0;
+                    }
+
                     return NGX_ERROR;
                 }
 
@@ -529,36 +556,55 @@ static ngx_int_t ngx_kqueue_process_even
             continue;
         }
 
-#endif
-
         ev->event_handler(ev);
     }
 
-#if (NGX_THREADS0)
     ngx_mutex_unlock(ngx_posted_events_mutex);
-#endif
+
+    if (ngx_accept_token) {
+        *ngx_accept_mutex = 0;
+    }
 
     if (timer && delta) {
         ngx_event_expire_timers((ngx_msec_t) delta);
     }
 
-#if (NGX_THREADS0)
-    if (!ngx_threaded) {
+#if (NGX_THREADS)
+    if (ngx_threaded) {
+        return NGX_OK;
     }
 #endif
 
-    /* TODO: non-thread mode only */
-
     for ( ;; ) {
 
         ev = (ngx_event_t *) ngx_posted_events;
 
+        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+                      "kevent: posted event " PTR_FMT, ev);
+
         if (ev == NULL) {
             break;
         }
 
         ngx_posted_events = ev->next;
 
+        if ((!ev->posted && !ev->active)
+            || ev->instance != ev->returned_instance)
+        {
+            /*
+             * the stale event from a file descriptor
+             * that was just closed in this iteration
+             */
+
+            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+                          "kevent: stale event " PTR_FMT, ev);
+            continue;
+        }
+
+        if (ev->posted) {
+            ev->posted = 0;
+        }
+
         ev->event_handler(ev);
     }
 
@@ -575,8 +621,9 @@ static void ngx_kqueue_thread_handler(ng
     instance = (uintptr_t) ev & 1;
     ev = (ngx_event_t *) ((uintptr_t) ev & (uintptr_t) ~1);
 
-    if (ev->active == 0 || ev->instance != instance) {
-
+    if ((!ev->posted && !ev->active)
+        || ev->instance != ev->returned_instance)
+    {
         /*
          * the stale event from a file descriptor
          * that was just closed in this iteration
@@ -587,6 +634,10 @@ static void ngx_kqueue_thread_handler(ng
         return;
     }
 
+    if (ev->posted) {
+        ev->posted = 0;
+    }
+
     ev->event_handler(ev);
 }
 
--- a/src/event/ngx_event.c
+++ b/src/event/ngx_event.c
@@ -233,6 +233,8 @@ static int ngx_event_init(ngx_cycle_t *c
 
         rev->available = 0;
 
+        rev->accept = 1;
+
 #if (HAVE_DEFERRED_ACCEPT)
         rev->deferred_accept = s[i].deferred_accept;
 #endif
@@ -273,7 +275,9 @@ static int ngx_event_init(ngx_cycle_t *c
 
         } else {
             rev->event_handler = &ngx_event_accept;
-            ngx_add_event(rev, NGX_READ_EVENT, 0);
+            if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
+                return NGX_ERROR;
+            }
         }
 
 #else
@@ -281,9 +285,13 @@ static int ngx_event_init(ngx_cycle_t *c
         rev->event_handler = &ngx_event_accept;
 
         if (ngx_event_flags & NGX_USE_SIGIO_EVENT) {
-            ngx_add_conn(c);
+            if (ngx_add_conn(c) == NGX_ERROR) {
+                return NGX_ERROR;
+            }
         } else {
-            ngx_add_event(rev, NGX_READ_EVENT, 0);
+            if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
+                return NGX_ERROR;
+            }
         }
 
 #endif
--- a/src/event/ngx_event.h
+++ b/src/event/ngx_event.h
@@ -66,6 +66,7 @@ struct ngx_event_s {
 
     /* used to detect the stale events in kqueue, rt signals and epoll */
     unsigned char    instance:1;
+    unsigned char    returned_instance:1;
 
     /*
      * the event was passed or would be passed to a kernel;
@@ -75,11 +76,13 @@ struct ngx_event_s {
 
     unsigned char    disabled:1;
 
+    unsigned char    posted:1;
+
     /* the ready event; in aio mode 0 means that no operation can be posted */
     unsigned char    ready:1;
 
     /* aio operation is complete */
-    unsigned char    complete:1;
+    unsigned short   complete:1;
 
     unsigned short   eof:1;
     unsigned short   error:1;
@@ -93,6 +96,8 @@ struct ngx_event_s {
 
     unsigned short   unexpected_eof:1;
 
+    unsigned short   accept:1;
+
     unsigned short   deferred_accept:1;
 
     /* TODO: aio_eof and kq_eof can be the single pending_eof */
@@ -178,7 +183,7 @@ typedef struct {
     int   (*add_conn)(ngx_connection_t *c);
     int   (*del_conn)(ngx_connection_t *c, u_int flags);
 
-    int   (*process)(ngx_log_t *log);
+    int   (*process)(ngx_cycle_t *cycle);
     int   (*init)(ngx_cycle_t *cycle);
     void  (*done)(ngx_cycle_t *cycle);
 } ngx_event_actions_t;
@@ -391,6 +396,9 @@ extern ngx_thread_volatile ngx_event_t  
 #if (NGX_THREADS)
 extern ngx_mutex_t           *ngx_posted_events_mutex;
 #endif
+extern ngx_atomic_t          *ngx_accept_mutex;
+extern ngx_uint_t             ngx_accept_token;
+
 
 extern int                    ngx_event_flags;
 extern ngx_module_t           ngx_events_module;
@@ -403,6 +411,10 @@ extern ngx_module_t           ngx_event_
 
 
 void ngx_event_accept(ngx_event_t *ev);
+ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle);
+ngx_int_t ngx_disable_accept_events(ngx_cycle_t *cycle);
+ngx_int_t ngx_enable_accept_events(ngx_cycle_t *cycle);
+
 
 #if (WIN32)
 void ngx_event_acceptex(ngx_event_t *ev);
--- a/src/event/ngx_event_accept.c
+++ b/src/event/ngx_event_accept.c
@@ -14,9 +14,13 @@ typedef struct {
 static size_t ngx_accept_log_error(void *data, char *buf, size_t len);
 
 
+ngx_atomic_t  *ngx_accept_mutex;
+ngx_uint_t     ngx_accept_token;
+
+
 void ngx_event_accept(ngx_event_t *ev)
 {
-    ngx_uint_t             instance, accepted;
+    ngx_uint_t             instance, rinstance, winstance, accepted;
     socklen_t              len;
     struct sockaddr       *sa;
     ngx_err_t              err;
@@ -205,6 +209,8 @@ void ngx_event_accept(ngx_event_t *ev)
 #endif
 
         instance = rev->instance;
+        rinstance = rev->returned_instance;
+        winstance = wev->returned_instance;
 
         ngx_memzero(rev, sizeof(ngx_event_t));
         ngx_memzero(wev, sizeof(ngx_event_t));
@@ -217,7 +223,10 @@ void ngx_event_accept(ngx_event_t *ev)
         c->socklen = len;
 
         rev->instance = (u_char) !instance;
+        rev->returned_instance = (u_char) rinstance;
+
         wev->instance = (u_char) !instance;
+        wev->returned_instance = (u_char) winstance;
 
         rev->index = NGX_INVALID_INDEX;
         wev->index = NGX_INVALID_INDEX;
@@ -295,6 +304,102 @@ void ngx_event_accept(ngx_event_t *ev)
 }
 
 
+ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
+{
+    if (*ngx_accept_mutex == 0 && ngx_atomic_cmp_set(ngx_accept_mutex, 0, 1)) {
+
+        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+                       "accept mutex locked");
+
+        if (!ngx_accept_token) {
+            if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
+                return NGX_ERROR;
+            }
+
+            ngx_accept_token = 1;
+        }
+
+        return NGX_OK;
+    }
+
+    if (ngx_accept_token) {
+        if (ngx_disable_accept_events(cycle) == NGX_ERROR) {
+            return NGX_ERROR;
+        }
+
+        ngx_accept_token = 0;
+    }
+
+    return NGX_OK;
+}
+
+
+ngx_int_t ngx_enable_accept_events(ngx_cycle_t *cycle)
+{
+    ngx_uint_t        i;
+    ngx_listening_t  *s;
+
+    s = cycle->listening.elts;
+    for (i = 0; i < cycle->listening.nelts; i++) {
+
+        /*
+         * we do not need to handle the Winsock sockets here (divde a socket
+         * number by 4) because this function would never called
+         * in the Winsock environment
+         */
+
+        if (ngx_event_flags & NGX_USE_SIGIO_EVENT) {
+            if (ngx_add_conn(&cycle->connections[s[i].fd]) == NGX_ERROR) {
+                return NGX_ERROR;
+            }
+
+        } else {
+            if (ngx_add_event(&cycle->read_events[s[i].fd], NGX_READ_EVENT, 0)
+                                                                  == NGX_ERROR)
+            {
+                return NGX_ERROR;
+            }
+        }
+    }
+
+    return NGX_OK;
+}
+
+
+ngx_int_t ngx_disable_accept_events(ngx_cycle_t *cycle)
+{
+    ngx_uint_t        i;
+    ngx_listening_t  *s;
+
+    s = cycle->listening.elts;
+    for (i = 0; i < cycle->listening.nelts; i++) {
+
+        /*
+         * we do not need to handle the Winsock sockets here (divde a socket
+         * number by 4) because this function would never called
+         * in the Winsock environment
+         */
+
+        if (ngx_event_flags & NGX_USE_SIGIO_EVENT) {
+            if (ngx_del_conn(&cycle->connections[s[i].fd], NGX_DISABLE_EVENT)
+                                                                  == NGX_ERROR)
+            {
+                return NGX_ERROR;
+            }
+
+        } else {
+            if (ngx_del_event(&cycle->read_events[s[i].fd], NGX_READ_EVENT,
+                                               NGX_DISABLE_EVENT) == NGX_ERROR)
+            {
+                return NGX_ERROR;
+            }
+        }
+    }
+
+    return NGX_OK;
+}
+
+
 static size_t ngx_accept_log_error(void *data, char *buf, size_t len)
 {
     ngx_accept_log_ctx_t  *ctx = data;
--- a/src/http/ngx_http_request.c
+++ b/src/http/ngx_http_request.c
@@ -90,6 +90,20 @@ void ngx_http_init_connection(ngx_connec
 
     if (rev->ready) {
         /* deferred accept, aio, iocp */
+
+        if (*ngx_accept_mutex) {
+            if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
+                ngx_http_close_connection(c);
+                return;
+            }
+
+            rev->next = ngx_posted_events;
+            ngx_posted_events = rev; 
+
+            ngx_mutex_unlock(ngx_posted_events_mutex);
+            return;
+        }
+
         ngx_http_init_request(rev);
         return;
     }
--- a/src/os/unix/ngx_process_cycle.c
+++ b/src/os/unix/ngx_process_cycle.c
@@ -65,12 +65,26 @@ void ngx_master_process_cycle(ngx_cycle_
     signo = 0;
     live = 0;
 
+    ngx_accept_mutex = mmap(NULL, sizeof(ngx_atomic_t), PROT_READ|PROT_WRITE,
+                            MAP_ANON|MAP_SHARED, -1, 0);
+
+    if (ngx_accept_mutex == NULL) {
+        ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
+                      "mmap(MAP_ANON|MAP_SHARED) failed");
+        /* fatal */
+        exit(2);
+    }
+
     for ( ;; ) {
         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "new cycle");
 
         ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx,
                                                ngx_core_module);
 
+        if (ccf->worker_processes == NGX_CONF_UNSET) {
+            ccf->worker_processes = 1;
+        }
+
         if (ngx_process == NGX_PROCESS_MASTER) {
             for (i = 0; i < (ngx_uint_t) ccf->worker_processes; i++) {
                 ngx_spawn_process(cycle, ngx_worker_process_cycle, NULL,
@@ -150,7 +164,7 @@ void ngx_master_process_cycle(ngx_cycle_
                     ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                                    "worker cycle");
 
-                    ngx_process_events(cycle->log);
+                    ngx_process_events(cycle);
                     live = 0;
                 }
 
@@ -361,6 +375,10 @@ static void ngx_worker_process_cycle(ngx
     ngx_process = NGX_PROCESS_WORKER;
     ngx_last_process = 0;
 
+    if (ngx_accept_mutex) {
+        ngx_accept_token = 1;
+    }
+
     ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
 
     if (ccf->group != (gid_t) NGX_CONF_UNSET) {
@@ -442,7 +460,7 @@ static void ngx_worker_process_cycle(ngx
     for ( ;; ) {
         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");
 
-        ngx_process_events(cycle->log);
+        ngx_process_events(cycle);
 
         if (ngx_terminate) {
             ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "exiting");
@@ -473,7 +491,7 @@ static void ngx_worker_process_cycle(ngx
 
         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");
 
-        ngx_process_events(cycle->log);
+        ngx_process_events(cycle);
 
         if (ngx_reopen) {
             ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "reopen logs");