changeset 348:68ff8000a974

nginx-0.0.3-2004-06-09-11:45:27 import
author Igor Sysoev <igor@sysoev.ru>
date Wed, 09 Jun 2004 07:45:27 +0000
parents f48d579daf78
children a958aa92f9a5
files src/core/ngx_buf.h src/event/modules/ngx_rtsig_module.c
diffstat 2 files changed, 176 insertions(+), 54 deletions(-) [+]
line wrap: on
line diff
--- a/src/core/ngx_buf.h
+++ b/src/core/ngx_buf.h
@@ -136,7 +136,7 @@ typedef struct {
 #define ngx_buf_in_memory(b)        (b->temporary || b->memory || b->mmap)
 #define ngx_buf_in_memory_only(b)   (ngx_buf_in_memory(b) && !b->in_file)
 #define ngx_buf_special(b)                                                   \
-        ((b->flush || b->last) && !ngx_buf_in_memory(b) && !b->in_file)
+        ((b->flush || b->last_buf) && !ngx_buf_in_memory(b) && !b->in_file)
 
 #define ngx_buf_size(b)                                                      \
         (ngx_buf_in_memory(b) ? (size_t) (b->last - b->pos):                 \
--- a/src/event/modules/ngx_rtsig_module.c
+++ b/src/event/modules/ngx_rtsig_module.c
@@ -11,9 +11,11 @@
 
 #if (TEST_BUILD_RTSIG)
 
-#define F_SETSIG  10
-#define SIGRTMIN  33
-#define si_fd     __spare__[0]
+#define F_SETSIG       10
+#define SIGRTMIN       33
+#define si_fd          __spare__[0]
+#define KERN_RTSIGNR   30
+#define KERN_RTSIGMAX  31
 
 int sigtimedwait(const sigset_t *set, siginfo_t *info,
                  const struct timespec *timeout)
@@ -31,18 +33,19 @@ typedef struct {
 
 extern ngx_event_module_t  ngx_poll_module_ctx;
 
-static int ngx_rtsig_init(ngx_cycle_t *cycle);
+static ngx_int_t ngx_rtsig_init(ngx_cycle_t *cycle);
 static void ngx_rtsig_done(ngx_cycle_t *cycle);
-static int ngx_rtsig_add_connection(ngx_connection_t *c);
-static int ngx_rtsig_del_connection(ngx_connection_t *c, u_int flags);
-static int ngx_rtsig_process_events(ngx_cycle_t *cycle);
-static int ngx_rtsig_process_overflow(ngx_cycle_t *cycle);
+static ngx_int_t ngx_rtsig_add_connection(ngx_connection_t *c);
+static ngx_int_t ngx_rtsig_del_connection(ngx_connection_t *c, u_int flags);
+static ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle);
+static ngx_int_t ngx_rtsig_process_overflow(ngx_cycle_t *cycle);
 
 static void *ngx_rtsig_create_conf(ngx_cycle_t *cycle);
 static char *ngx_rtsig_init_conf(ngx_cycle_t *cycle, void *conf);
 
 
-static sigset_t  set;
+static sigset_t    set;
+static ngx_uint_t  overflow, current;
 
 
 static ngx_str_t      rtsig_name = ngx_string("rtsig");
@@ -89,7 +92,7 @@ ngx_module_t  ngx_rtsig_module = {
 };
 
 
-static int ngx_rtsig_init(ngx_cycle_t *cycle)
+static ngx_int_t ngx_rtsig_init(ngx_cycle_t *cycle)
 {
     ngx_rtsig_conf_t  *rtscf;
 
@@ -128,7 +131,7 @@ static void ngx_rtsig_done(ngx_cycle_t *
 }
 
 
-static int ngx_rtsig_add_connection(ngx_connection_t *c)
+static ngx_int_t ngx_rtsig_add_connection(ngx_connection_t *c)
 {
     int                signo;
     ngx_rtsig_conf_t  *rtscf;
@@ -173,7 +176,7 @@ static int ngx_rtsig_add_connection(ngx_
 }
 
 
-static int ngx_rtsig_del_connection(ngx_connection_t *c, u_int flags)
+static ngx_int_t ngx_rtsig_del_connection(ngx_connection_t *c, u_int flags)
 {
     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
                    "rtsig del connection: fd:%d", c->fd);
@@ -193,7 +196,7 @@ static int ngx_rtsig_del_connection(ngx_
 }
 
 
-int ngx_rtsig_process_events(ngx_cycle_t *cycle)
+ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle)
 {
     int                 signo;
     ngx_int_t           instance, i;
@@ -209,38 +212,44 @@ int ngx_rtsig_process_events(ngx_cycle_t
     ngx_connection_t   *c;
     ngx_rtsig_conf_t   *rtscf;
 
-    for ( ;; ) {
-        timer = ngx_event_find_timer();
+    if (overflow) {
+        timer = 0;
+        expire = 0;
+
+    } else {
+        for ( ;; ) {
+            timer = ngx_event_find_timer();
 
-        if (timer != 0) {
-            break;
+            if (timer != 0) {
+                break;
+            }
+
+            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+                           "rtsig expired timer");
+
+            ngx_event_expire_timers((ngx_msec_t)
+                                    (ngx_elapsed_msec - ngx_old_elapsed_msec));
         }
 
-        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
-                       "rtsig expired timer");
+        expire = 1;
 
-        ngx_event_expire_timers((ngx_msec_t)
-                                    (ngx_elapsed_msec - ngx_old_elapsed_msec));
-    }
-
-    expire = 1;
-
-    if (ngx_accept_mutex) {
-        if (ngx_accept_disabled > 0) {
-            ngx_accept_disabled--;
+        if (ngx_accept_mutex) {
+            if (ngx_accept_disabled > 0) {
+                ngx_accept_disabled--;
 
-        } else {
-            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
-                return NGX_ERROR;
-            }
+            } else {
+                if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
+                    return NGX_ERROR;
+                }
 
-            if (ngx_accept_mutex_held == 0
-                && (timer == NGX_TIMER_INFINITE
-                    || timer > ngx_accept_mutex_delay))
-            {
-                timer = ngx_accept_mutex_delay;
-                expire = 0;
-            } 
+                if (ngx_accept_mutex_held == 0
+                    && (timer == NGX_TIMER_INFINITE
+                        || timer > ngx_accept_mutex_delay))
+                {
+                    timer = ngx_accept_mutex_delay;
+                    expire = 0;
+                } 
+            }
         }
     }
 
@@ -265,6 +274,19 @@ int ngx_rtsig_process_events(ngx_cycle_t
 
     if (signo == -1) {
         err = ngx_errno;
+
+        if (err == NGX_EAGAIN) {
+
+            if (timer == NGX_TIMER_INFINITE) {
+                ngx_accept_mutex_unlock();
+                ngx_log_error(NGX_LOG_ALERT, cycle->log, err,
+                              "sigtimedwait() returned EAGAIN without timeout");
+                return NGX_ERROR;
+            }
+
+            err = 0;
+        }
+
     } else {
         err = 0;
     }
@@ -296,6 +318,10 @@ int ngx_rtsig_process_events(ngx_cycle_t
 
     if (signo == rtscf->signo || signo == rtscf->signo + 1) {
 
+        if (overflow && (ngx_uint_t) si.si_fd > current) {
+            return NGX_OK;
+        }
+
         /* TODO: old_cycles */
 
         c = &ngx_cycle->connections[si.si_fd];
@@ -374,29 +400,32 @@ int ngx_rtsig_process_events(ngx_cycle_t
     } else if (signo == SIGIO) {
         ngx_accept_mutex_unlock();
 
-        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
-                      "signal queue overflowed: "
-                      "SIGIO, fd:%d, band:%X", si.si_fd, si.si_band);
+        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
+                      "rt signal queue overflowed");
 
-        /* TODO: flush all the used RT signals */
+        /* flush the RT signal queue */
 
         ngx_memzero(&sa, sizeof(struct sigaction));
         sa.sa_handler = SIG_DFL;
         sigemptyset(&sa.sa_mask);
+
         if (sigaction(rtscf->signo, &sa, NULL) == -1) {
             ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                           "sigaction(%d, SIG_DFL) failed", rtscf->signo);
         }
 
-        ngx_event_actions = ngx_poll_module_ctx.actions;
-        ngx_event_actions.process = ngx_rtsig_process_overflow;
-        ngx_event_flags = NGX_OVERFLOW_EVENT
-                          |NGX_USE_LEVEL_EVENT|NGX_USE_ONESHOT_EVENT;
+        if (sigaction(rtscf->signo + 1, &sa, NULL) == -1) {
+            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
+                          "sigaction(%d, SIG_DFL) failed", rtscf->signo + 1);
+        }
 
-        /* STUB: add events. WHAT to do with fcntl()s ? */
+        overflow = 1;
+        current = 0;
+        ngx_event_actions.process = ngx_rtsig_process_overflow;
 
+        return NGX_OK;
 
-    } else {
+    } else if (signo != -1) {
         ngx_accept_mutex_unlock();
 
         ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
@@ -419,13 +448,106 @@ int ngx_rtsig_process_events(ngx_cycle_t
 }
 
 
-static int ngx_rtsig_process_overflow(ngx_cycle_t *cycle)
+static ngx_int_t ngx_rtsig_process_overflow(ngx_cycle_t *cycle)
 {
-    if (ngx_poll_module_ctx.actions.process(cycle) == NGX_OK) {
-        ngx_event_actions = ngx_rtsig_module_ctx.actions;
-        ngx_event_flags = NGX_USE_RTSIG_EVENT;
+    int                name[2], len, rtsig_max, rtsig_nr;
+    ngx_uint_t         i;
+    ngx_connection_t  *c;
+
+    /* TODO: old cylces */
+
+    c = cycle->connections;
+    for (current = 0; current < cycle->connection_n; current++) {
+
+        i = current;
+
+        if (c[i].fd == -1) {
+            continue;
+        }
+
+        if (c[i].read->active && c[i].read->event_handler) {
+            c[i].read->ready = 1;
+
+            if (!ngx_threaded) {
+                c[i].read->event_handler(c[i].read); 
+
+            } else {
+                if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
+                    return NGX_ERROR;
+                }
+
+                ngx_post_event(c[i].read);
+
+                ngx_mutex_unlock(ngx_posted_events_mutex);
+            }
+        }
+
+        if (c[i].write->active && c[i].write->event_handler) {
+            c[i].write->ready = 1;
+    
+            if (!ngx_threaded) {
+                c[i].write->event_handler(c[i].write);
+
+            } else {
+                if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
+                    return NGX_ERROR;
+                } 
+
+                ngx_post_event(c[i].write);
+
+                ngx_mutex_unlock(ngx_posted_events_mutex);
+            }
+        }
+
+        if (i && (i % 100 == 0)) {
+
+            /*
+             * Check the current rt queue length to prevent the new overflow.
+             *
+             * Learn the /proc/sys/kernel/rtsig-max value because
+             * it can be changed sisnce the last checking
+             */
+
+            name[0] = CTL_KERN;
+            name[1] = KERN_RTSIGMAX;
+            len = sizeof(rtsig_max);
+            if (sysctl(name, sizeof(name), &rtsig_max, &len, NULL, 0) == -1) {
+                ngx_log_error(NGX_LOG_ALERT, cycle->log, errno,
+                              "sysctl(KERN_RTSIGMAX) failed");
+                return NGX_ERROR;
+            }
+
+            name[0] = CTL_KERN;
+            name[1] = KERN_RTSIGNR;
+            len = sizeof(rtsig_nr);
+            if (sysctl(name, sizeof(name), &rtsig_nr, &len, NULL, 0) == -1) {
+                ngx_log_error(NGX_LOG_ALERT, cycle->log, errno,
+                              "sysctl(KERN_RTSIGNR) failed");
+                return NGX_ERROR;
+            }
+
+            /*
+             * drain rt signal queue if the /proc/sys/kernel/rtsig-nr
+             * is bigger then "/proc/sys/kernel/rtsig-max / 4"
+             */
+
+            while (rtsig_max / 4 < rtsig_nr) {
+                ngx_rtsig_process_events(cycle);
+                rtsig_nr--;
+            }
+        }
     }
 
+    if (!ngx_threaded) {
+        ngx_event_process_posted(cycle);
+    }
+
+    ngx_log_error(NGX_LOG_INFO, cycle->log, 0,
+                  "rt signal queue overflow recovered");
+
+    overflow = 0;
+    ngx_event_actions.process = ngx_rtsig_process_events;
+
     return NGX_OK;
 }