changeset 380:5ce6561246a5

nginx-0.0.7-2004-07-07-10:15:04 import
author Igor Sysoev <igor@sysoev.ru>
date Wed, 07 Jul 2004 06:15:04 +0000
parents 73688d5d7fc3
children 02a511569afb
files src/core/ngx_connection.h src/event/modules/ngx_aio_module.c src/event/modules/ngx_devpoll_module.c src/event/modules/ngx_epoll_module.c src/event/modules/ngx_kqueue_module.c src/event/modules/ngx_poll_module.c src/event/modules/ngx_rtsig_module.c src/event/modules/ngx_select_module.c src/event/ngx_event.c src/event/ngx_event.h src/event/ngx_event_accept.c src/event/ngx_event_connect.c src/event/ngx_event_posted.c src/http/modules/proxy/ngx_http_proxy_upstream.c src/http/ngx_http_request.c src/os/unix/ngx_freebsd_rfork_thread.h src/os/unix/ngx_process_cycle.c src/os/unix/ngx_thread.h
diffstat 18 files changed, 241 insertions(+), 70 deletions(-) [+]
line wrap: on
line diff
--- a/src/core/ngx_connection.h
+++ b/src/core/ngx_connection.h
@@ -101,6 +101,7 @@ struct ngx_connection_s {
 
     unsigned          log_error:2;  /* ngx_connection_log_error_e */
 
+    unsigned          single_connection:1;
     unsigned          pipeline:1;
     unsigned          unexpected_eof:1;
     unsigned          timedout:1;
--- a/src/event/modules/ngx_aio_module.c
+++ b/src/event/modules/ngx_aio_module.c
@@ -40,6 +40,7 @@ ngx_event_module_t  ngx_aio_module_ctx =
         NULL,                              /* disable an event */
         NULL,                              /* add an connection */
         ngx_aio_del_connection,            /* delete an connection */
+        NULL,                              /* process the changes */
         ngx_aio_process_events,            /* process the events */
         ngx_aio_init,                      /* init the events */
         ngx_aio_done                       /* done the events */
@@ -139,7 +140,7 @@ static int ngx_aio_del_connection(ngx_co
 
 static int ngx_aio_process_events(ngx_cycle_t *cycle)
 {
-    return ngx_kqueue_module_ctx.actions.process(cycle);
+    return ngx_kqueue_module_ctx.actions.process_events(cycle);
 }
 
 #endif /* HAVE_KQUEUE */
--- a/src/event/modules/ngx_devpoll_module.c
+++ b/src/event/modules/ngx_devpoll_module.c
@@ -82,6 +82,7 @@ ngx_event_module_t  ngx_devpoll_module_c
         ngx_devpoll_del_event,             /* disable an event */
         NULL,                              /* add an connection */
         NULL,                              /* delete an connection */
+        NULL,                              /* process the changes */
         ngx_devpoll_process_events,        /* process the events */
         ngx_devpoll_init,                  /* init the events */
         ngx_devpoll_done,                  /* done the events */
@@ -95,7 +96,7 @@ ngx_module_t  ngx_devpoll_module = {
     ngx_devpoll_commands,                  /* module directives */
     NGX_EVENT_MODULE,                      /* module type */
     NULL,                                  /* init module */
-    NULL                                   /* init child */
+    NULL                                   /* init process */
 };
 
 
--- a/src/event/modules/ngx_epoll_module.c
+++ b/src/event/modules/ngx_epoll_module.c
@@ -113,6 +113,7 @@ ngx_event_module_t  ngx_epoll_module_ctx
         ngx_epoll_del_event,             /* disable an event */
         NULL,                            /* add an connection */
         NULL,                            /* delete an connection */
+        NULL,                            /* process the changes */
         ngx_epoll_process_events,        /* process the events */
         ngx_epoll_init,                  /* init the events */
         ngx_epoll_done,                  /* done the events */
@@ -125,7 +126,7 @@ ngx_module_t  ngx_epoll_module = {
     ngx_epoll_commands,                  /* module directives */
     NGX_EVENT_MODULE,                      /* module type */
     NULL,                                  /* init module */
-    NULL                                   /* init child */
+    NULL                                   /* init process */
 };
 
 
--- a/src/event/modules/ngx_kqueue_module.c
+++ b/src/event/modules/ngx_kqueue_module.c
@@ -21,6 +21,7 @@ static void ngx_kqueue_done(ngx_cycle_t 
 static ngx_int_t ngx_kqueue_add_event(ngx_event_t *ev, int event, u_int flags);
 static ngx_int_t ngx_kqueue_del_event(ngx_event_t *ev, int event, u_int flags);
 static ngx_int_t ngx_kqueue_set_event(ngx_event_t *ev, int filter, u_int flags);
+static ngx_int_t ngx_kqueue_process_changes(ngx_cycle_t *cycle, ngx_uint_t try);
 static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle);
 static ngx_inline void ngx_kqueue_dump_event(ngx_log_t *log,
                                              struct kevent *kev);
@@ -31,9 +32,23 @@ static char *ngx_kqueue_init_conf(ngx_cy
 
 int                    ngx_kqueue = -1;
 
-static struct kevent  *change_list, *event_list;
+/*
+ * The "change_list" should be declared as ngx_thread_volatile.
+ * However, the use of the change_list is localized in kqueue functions and
+ * is protected by the mutex so even the "icc -ipo" should not build the code
+ * with the race condition.  Thus we avoid the declaration to make a more
+ * readable code.
+ */
+
+static struct kevent  *change_list, *change_list0, *change_list1;
+static struct kevent  *event_list;
 static int             max_changes, nchanges, nevents;
 
+#if (NGX_THREADS)
+static ngx_mutex_t    *ngx_kqueue_mutex;
+#endif
+
+
 
 static ngx_str_t      kqueue_name = ngx_string("kqueue");
 
@@ -69,6 +84,7 @@ ngx_event_module_t  ngx_kqueue_module_ct
         ngx_kqueue_del_event,              /* disable an event */
         NULL,                              /* add an connection */
         NULL,                              /* delete an connection */
+        ngx_kqueue_process_changes,        /* process the changes */
         ngx_kqueue_process_events,         /* process the events */
         ngx_kqueue_init,                   /* init the events */
         ngx_kqueue_done                    /* done the events */
@@ -82,7 +98,7 @@ ngx_module_t  ngx_kqueue_module = {
     ngx_kqueue_commands,                   /* module directives */
     NGX_EVENT_MODULE,                      /* module type */
     NULL,                                  /* init module */
-    NULL                                   /* init child */
+    NULL                                   /* init process */
 };
 
 
@@ -102,6 +118,12 @@ static ngx_int_t ngx_kqueue_init(ngx_cyc
                           "kqueue() failed");
             return NGX_ERROR;
         }
+
+#if (NGX_THREADS)
+        if (!(ngx_kqueue_mutex = ngx_mutex_init(cycle->log, 0))) {
+            return NGX_ERROR;
+        }
+#endif
     }
 
     if (max_changes < kcf->changes) {
@@ -117,15 +139,27 @@ static ngx_int_t ngx_kqueue_init(ngx_cyc
             nchanges = 0;
         }
 
-        if (change_list) {
-            ngx_free(change_list);
+        if (change_list0) {
+            ngx_free(change_list0);
+        }
+
+        change_list0 = ngx_alloc(kcf->changes * sizeof(struct kevent),
+                                 cycle->log);
+        if (change_list0 == NULL) {
+            return NGX_ERROR;
         }
 
-        change_list = ngx_alloc(kcf->changes * sizeof(struct kevent),
-                                cycle->log);
-        if (change_list == NULL) {
+        if (change_list1) {
+            ngx_free(change_list1);
+        }
+
+        change_list1 = ngx_alloc(kcf->changes * sizeof(struct kevent),
+                                 cycle->log);
+        if (change_list1 == NULL) {
             return NGX_ERROR;
         }
+
+        change_list = change_list0;
     }
 
     max_changes = kcf->changes;
@@ -135,8 +169,7 @@ static ngx_int_t ngx_kqueue_init(ngx_cyc
             ngx_free(event_list);
         }
 
-        event_list = ngx_alloc(kcf->events * sizeof(struct kevent),
-                                cycle->log);
+        event_list = ngx_alloc(kcf->events * sizeof(struct kevent), cycle->log);
         if (event_list == NULL) {
             return NGX_ERROR;
         }
@@ -172,9 +205,14 @@ static void ngx_kqueue_done(ngx_cycle_t 
 
     ngx_kqueue = -1;
 
-    ngx_free(change_list);
+    ngx_mutex_destroy(ngx_kqueue_mutex);
+
+    ngx_free(change_list1);
+    ngx_free(change_list0);
     ngx_free(event_list);
 
+    change_list1 = NULL;
+    change_list0 = NULL;
     change_list = NULL;
     event_list = NULL;
     max_changes = 0;
@@ -185,6 +223,7 @@ static void ngx_kqueue_done(ngx_cycle_t 
 
 static ngx_int_t ngx_kqueue_add_event(ngx_event_t *ev, int event, u_int flags)
 {
+    ngx_int_t          rc;
     ngx_event_t       *e;
     ngx_connection_t  *c;
 
@@ -192,8 +231,11 @@ static ngx_int_t ngx_kqueue_add_event(ng
     ev->disabled = 0;
     ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
 
-    if (ngx_thread_main()
-        && nchanges > 0
+    if (ngx_mutex_lock(ngx_kqueue_mutex) == NGX_ERROR) {
+        return NGX_ERROR;
+    }
+
+    if (nchanges > 0
         && ev->index < (u_int) nchanges
         && ((uintptr_t) change_list[ev->index].udata & (uintptr_t) ~1)
                                                              == (uintptr_t) ev)
@@ -215,29 +257,42 @@ static ngx_int_t ngx_kqueue_add_event(ng
                 e->index = ev->index;
             }
 
+            ngx_mutex_unlock(ngx_kqueue_mutex);
+
             return NGX_OK;
         }
 
         c = ev->data;
+
         ngx_log_error(NGX_LOG_ALERT, ev->log, 0,
                       "previous event on #%d were not passed in kernel", c->fd);
 
+        ngx_mutex_unlock(ngx_kqueue_mutex);
+
         return NGX_ERROR;
     }
 
-    return ngx_kqueue_set_event(ev, event, EV_ADD|EV_ENABLE|flags);
+    rc = ngx_kqueue_set_event(ev, event, EV_ADD|EV_ENABLE|flags);
+
+    ngx_mutex_unlock(ngx_kqueue_mutex);
+
+    return rc;
 }
 
 
 static ngx_int_t ngx_kqueue_del_event(ngx_event_t *ev, int event, u_int flags)
 {
+    ngx_int_t     rc;
     ngx_event_t  *e;
 
     ev->active = 0;
     ev->disabled = 0;
 
-    if (ngx_thread_main()
-        && nchanges > 0
+    if (ngx_mutex_lock(ngx_kqueue_mutex) == NGX_ERROR) {
+        return NGX_ERROR;
+    }
+
+    if (nchanges > 0
         && ev->index < (u_int) nchanges
         && ((uintptr_t) change_list[ev->index].udata & (uintptr_t) ~1)
                                                              == (uintptr_t) ev)
@@ -254,6 +309,8 @@ static ngx_int_t ngx_kqueue_del_event(ng
             e->index = ev->index;
         }
 
+        ngx_mutex_unlock(ngx_kqueue_mutex);
+
         return NGX_OK;
     }
 
@@ -264,6 +321,7 @@ static ngx_int_t ngx_kqueue_del_event(ng
      */
 
     if (flags & NGX_CLOSE_EVENT) {
+        ngx_mutex_unlock(ngx_kqueue_mutex);
         return NGX_OK;
     }
 
@@ -271,14 +329,18 @@ static ngx_int_t ngx_kqueue_del_event(ng
         ev->disabled = 1;
     }
 
-    return ngx_kqueue_set_event(ev, event,
+    rc = ngx_kqueue_set_event(ev, event,
                            flags & NGX_DISABLE_EVENT ? EV_DISABLE : EV_DELETE);
+
+    ngx_mutex_unlock(ngx_kqueue_mutex);
+
+    return rc;
 }
 
 
 static ngx_int_t ngx_kqueue_set_event(ngx_event_t *ev, int filter, u_int flags)
 {
-    struct kevent     *kev, kv;
+    struct kevent     *kev;
     struct timespec    ts;
     ngx_connection_t  *c;
 
@@ -288,7 +350,7 @@ static ngx_int_t ngx_kqueue_set_event(ng
                    "kevent set event: %d: ft:%d fl:%04X",
                    c->fd, filter, flags);
 
-    if (ngx_thread_main() && nchanges >= max_changes) {
+    if (nchanges >= max_changes) {
         ngx_log_error(NGX_LOG_WARN, ev->log, 0,
                       "kqueue change list is filled up");
 
@@ -303,7 +365,7 @@ static ngx_int_t ngx_kqueue_set_event(ng
         nchanges = 0;
     }
 
-    kev = ngx_thread_main() ? &change_list[nchanges] : &kv;
+    kev = &change_list[nchanges];
 
     kev->ident = c->fd;
     kev->filter = filter;
@@ -336,19 +398,8 @@ static ngx_int_t ngx_kqueue_set_event(ng
 #endif
     }
 
-    if (ngx_thread_main()) {
-        ev->index = nchanges;
-        nchanges++;
-
-    } else {
-        ts.tv_sec = 0;
-        ts.tv_nsec = 0;
-
-        if (kevent(ngx_kqueue, &kv, 1, NULL, 0, &ts) == -1) {
-            ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "kevent() failed");
-            return NGX_ERROR;
-        }
-    }
+    ev->index = nchanges;
+    nchanges++;
 
     return NGX_OK;
 }
@@ -356,7 +407,7 @@ static ngx_int_t ngx_kqueue_set_event(ng
 
 static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle)
 {
-    int                events;
+    int                events, n;
     ngx_int_t          i, instance;
     ngx_uint_t         lock, accept_lock, expire;
     ngx_err_t          err;
@@ -419,6 +470,19 @@ static ngx_int_t ngx_kqueue_process_even
         }
     }
 
+    if (ngx_threaded) {
+        if (ngx_kqueue_process_changes(cycle, 0) == NGX_ERROR) {
+            ngx_accept_mutex_unlock();
+            return NGX_ERROR;
+        }
+
+        n = 0;
+
+    } else {
+        n = nchanges;
+        nchanges = 0;
+    }
+
     if (timer == NGX_TIMER_INFINITE) {
         tp = NULL;
         expire = 0;
@@ -429,10 +493,10 @@ static ngx_int_t ngx_kqueue_process_even
         tp = &ts;
     }
 
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
-                   "kevent timer: %d", timer);
+    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+                   "kevent timer: %d, changes: %d", timer, n);
 
-    events = kevent(ngx_kqueue, change_list, nchanges, event_list, nevents, tp);
+    events = kevent(ngx_kqueue, change_list, n, event_list, nevents, tp);
 
     if (events == -1) {
         err = ngx_errno;
@@ -440,8 +504,6 @@ static ngx_int_t ngx_kqueue_process_even
         err = 0;
     }
 
-    nchanges = 0;
-
     ngx_gettimeofday(&tv);
     ngx_time_update(tv.tv_sec);
 
@@ -629,6 +691,63 @@ static ngx_int_t ngx_kqueue_process_even
 }
 
 
+static ngx_int_t ngx_kqueue_process_changes(ngx_cycle_t *cycle, ngx_uint_t try)
+{
+    int              n;
+    ngx_int_t        rc;
+    ngx_err_t        err;
+    struct timespec  ts;
+    struct kevent   *changes;
+
+    if (try) {
+        rc = ngx_mutex_trylock(ngx_kqueue_mutex);
+        if (rc != NGX_OK) {
+            return rc;
+        }
+
+    } else {
+        if (ngx_mutex_lock(ngx_kqueue_mutex) == NGX_ERROR) {
+            return NGX_ERROR;
+        }
+    }
+
+    if (nchanges == 0) {
+        ngx_mutex_unlock(ngx_kqueue_mutex);
+        return NGX_OK;
+    }
+
+    changes = (struct kevent *) change_list;
+    if (change_list == change_list0) {
+        change_list = change_list1;
+    } else {
+        change_list = change_list0;
+    }
+
+    n = nchanges;
+    nchanges = 0;
+
+    ts.tv_sec = 0;
+    ts.tv_nsec = 0;
+
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+                   "kevent changes: %d", n);
+
+    if (kevent(ngx_kqueue, changes, n, NULL, 0, &ts) == -1) {
+        err = ngx_errno;
+        ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT,
+                      cycle->log, err, "kevent() failed");
+        rc = NGX_ERROR;
+
+    } else {
+        rc = NGX_OK;
+    }
+
+    ngx_mutex_unlock(ngx_kqueue_mutex);
+
+    return rc;
+}
+
+
 static ngx_inline void ngx_kqueue_dump_event(ngx_log_t *log, struct kevent *kev)
 {
     ngx_log_debug6(NGX_LOG_DEBUG_EVENT, log, 0,
--- a/src/event/modules/ngx_poll_module.c
+++ b/src/event/modules/ngx_poll_module.c
@@ -40,6 +40,7 @@ ngx_event_module_t  ngx_poll_module_ctx 
         ngx_poll_del_event,                /* disable an event */
         NULL,                              /* add an connection */
         NULL,                              /* delete an connection */
+        NULL,                              /* process the changes */
         ngx_poll_process_events,           /* process the events */
         ngx_poll_init,                     /* init the events */
         ngx_poll_done                      /* done the events */
--- a/src/event/modules/ngx_rtsig_module.c
+++ b/src/event/modules/ngx_rtsig_module.c
@@ -105,6 +105,7 @@ ngx_event_module_t  ngx_rtsig_module_ctx
         NULL,                            /* disable an event */
         ngx_rtsig_add_connection,        /* add an connection */
         ngx_rtsig_del_connection,        /* delete an connection */
+        NULL,                            /* process the changes */
         ngx_rtsig_process_events,        /* process the events */
         ngx_rtsig_init,                  /* init the events */
         ngx_rtsig_done,                  /* done the events */
@@ -118,7 +119,7 @@ ngx_module_t  ngx_rtsig_module = {
     ngx_rtsig_commands,                  /* module directives */
     NGX_EVENT_MODULE,                      /* module type */
     NULL,                                  /* init module */
-    NULL                                   /* init child */
+    NULL                                   /* init process */
 };
 
 
@@ -492,7 +493,7 @@ ngx_int_t ngx_rtsig_process_events(ngx_c
 
         overflow = 1;
         overflow_current = 0;
-        ngx_event_actions.process = ngx_rtsig_process_overflow;
+        ngx_event_actions.process_events = ngx_rtsig_process_overflow;
 
         return NGX_ERROR;
 
@@ -690,7 +691,7 @@ static ngx_int_t ngx_rtsig_process_overf
                   "rt signal queue overflow recovered");
 
     overflow = 0;
-    ngx_event_actions.process = ngx_rtsig_process_events;
+    ngx_event_actions.process_events = ngx_rtsig_process_events;
 
     return NGX_OK;
 }
--- a/src/event/modules/ngx_select_module.c
+++ b/src/event/modules/ngx_select_module.c
@@ -55,6 +55,7 @@ ngx_event_module_t  ngx_select_module_ct
         ngx_select_del_event,              /* disable an event */
         NULL,                              /* add an connection */
         NULL,                              /* delete an connection */
+        NULL,                              /* process the changes */
         ngx_select_process_events,         /* process the events */
         ngx_select_init,                   /* init the events */
         ngx_select_done                    /* done the events */
@@ -68,7 +69,7 @@ ngx_module_t  ngx_select_module = {
     NULL,                                  /* module directives */
     NGX_EVENT_MODULE,                      /* module type */
     NULL,                                  /* init module */
-    NULL                                   /* init child */
+    NULL                                   /* init process */
 };
 
 
--- a/src/event/ngx_event.c
+++ b/src/event/ngx_event.c
@@ -150,7 +150,7 @@ ngx_event_module_t  ngx_event_core_modul
     ngx_event_create_conf,                 /* create configuration */
     ngx_event_init_conf,                   /* init configuration */
 
-    { NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL }
+    { NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL }
 };
 
 
@@ -281,6 +281,7 @@ static ngx_int_t ngx_event_process_init(
         rev[i].closed = 1;
 #if (NGX_THREADS)
         rev[i].lock = &c[i].lock;
+        rev[i].own_lock = &c[i].lock;
 #endif
     }
 
@@ -295,6 +296,7 @@ static ngx_int_t ngx_event_process_init(
         wev[i].closed = 1;
 #if (NGX_THREADS)
         wev[i].lock = &c[i].lock;
+        wev[i].own_lock = &c[i].lock;
 #endif
     }
 
--- a/src/event/ngx_event.h
+++ b/src/event/ngx_event.h
@@ -161,6 +161,7 @@ struct ngx_event_s {
 #endif
 
     ngx_atomic_t    *lock;
+    ngx_atomic_t    *own_lock;
 
 #endif
 
@@ -201,7 +202,9 @@ typedef struct {
     ngx_int_t  (*add_conn)(ngx_connection_t *c);
     ngx_int_t  (*del_conn)(ngx_connection_t *c, u_int flags);
 
-    ngx_int_t  (*process)(ngx_cycle_t *cycle);
+    ngx_int_t  (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t try);
+    ngx_int_t  (*process_events)(ngx_cycle_t *cycle);
+
     ngx_int_t  (*init)(ngx_cycle_t *cycle);
     void       (*done)(ngx_cycle_t *cycle);
 } ngx_event_actions_t;
@@ -378,7 +381,10 @@ extern ngx_event_actions_t   ngx_event_a
 #endif
 
 
-#define ngx_process_events   ngx_event_actions.process
+#define ngx_process_changes  ngx_event_actions.process_changes
+#define ngx_process_events   ngx_event_actions.process_events
+#define ngx_done_events      ngx_event_actions.done
+
 #define ngx_add_event        ngx_event_actions.add
 #define ngx_del_event        ngx_event_actions.del
 #define ngx_add_conn         ngx_event_actions.add_conn
--- a/src/event/ngx_event_accept.c
+++ b/src/event/ngx_event_accept.c
@@ -211,12 +211,14 @@ void ngx_event_accept(ngx_event_t *ev)
         winstance = wev->returned_instance;
 
 #if (NGX_THREADS)
-        if (*(rev->lock)) {
+
+        if (*(&c->lock)) {
             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, ev->log, 0,
-                           "spinlock event " PTR_FMT " in accept", rev);
-            ngx_spinlock(rev->lock, 1000);
-            ngx_unlock(rev->lock);
+                           "spinlock in accept, fd:%", s);
+            ngx_spinlock(&c->lock, 1000);
+            ngx_unlock(&c->lock);
         }
+
 #endif
 
         ngx_memzero(rev, sizeof(ngx_event_t));
@@ -290,6 +292,8 @@ void ngx_event_accept(ngx_event_t *ev)
 #if (NGX_THREADS)
         rev->lock = &c->lock;
         wev->lock = &c->lock;
+        rev->own_lock = &c->lock;
+        wev->own_lock = &c->lock;
 #endif
 
         ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
--- a/src/event/ngx_event_connect.c
+++ b/src/event/ngx_event_connect.c
@@ -184,12 +184,14 @@ int ngx_event_connect_peer(ngx_peer_conn
     winstance = wev->returned_instance;
 
 #if (NGX_THREADS)
-    if (*(rev->lock)) {
+
+    if (*(&c->lock)) {
         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
-                       "spinlock event " PTR_FMT " in connect", rev);
-        ngx_spinlock(rev->lock, 1000);
-        ngx_unlock(rev->lock);
+                       "spinlock in connect, fd:%d", s);
+        ngx_spinlock(&c->lock, 1000);
+        ngx_unlock(&c->lock);
     }
+
 #endif
 
     ngx_memzero(c, sizeof(ngx_connection_t));
@@ -240,6 +242,8 @@ int ngx_event_connect_peer(ngx_peer_conn
 #if (NGX_THREADS)
     rev->lock = pc->lock;
     wev->lock = pc->lock;
+    rev->own_lock = &c->lock;
+    wev->own_lock = &c->lock;
 #endif
 
     if (ngx_add_conn) {
--- a/src/event/ngx_event_posted.c
+++ b/src/event/ngx_event_posted.c
@@ -100,6 +100,18 @@ ngx_int_t ngx_event_thread_process_poste
                 continue;
             }
 
+            if (ev->lock != ev->own_lock) {
+                if (*(ev->own_lock)) {
+                    ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
+                                  "the own lock of the posted event "
+                                  PTR_FMT " is busy", ev);
+                    ngx_unlock(ev->lock);
+                    ev = ev->next;
+                    continue;
+                }
+                *(ev->own_lock) = 1;
+            }
+
             ngx_delete_posted_event(ev);
 
             ev->locked = 1;
@@ -134,6 +146,10 @@ ngx_int_t ngx_event_thread_process_poste
 
             if (ev->locked) {
                 ngx_unlock(ev->lock);
+
+                if (ev->lock != ev->own_lock) {
+                    ngx_unlock(ev->own_lock);
+                }
             }
 
             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
--- a/src/http/modules/proxy/ngx_http_proxy_upstream.c
+++ b/src/http/modules/proxy/ngx_http_proxy_upstream.c
@@ -573,6 +573,8 @@ static void ngx_http_proxy_connect(ngx_h
 
     p->action = "connecting to upstream";
 
+    p->request->connection->single_connection = 0;
+
     rc = ngx_event_connect_peer(&p->upstream->peer);
 
     ngx_log_debug1(NGX_LOG_DEBUG_HTTP, p->request->connection->log, 0,
--- a/src/http/ngx_http_request.c
+++ b/src/http/ngx_http_request.c
@@ -290,6 +290,7 @@ static void ngx_http_init_request(ngx_ev
         return;
     }
 
+    c->single_connection = 1;
     r->connection = c;
     r->pipeline = c->pipeline;
     r->header_in = c->buffer;
@@ -1760,9 +1761,11 @@ void ngx_http_close_connection(ngx_conne
         c->read->closed = 1;
         c->write->closed = 1;
 
-        ngx_unlock(&c->lock);
-        c->read->locked = 0;
-        c->write->locked = 0;
+        if (c->single_connection) {
+            ngx_unlock(&c->lock);
+            c->read->locked = 0;
+            c->write->locked = 0;
+        }
 
         ngx_mutex_unlock(ngx_posted_events_mutex);
     }
--- a/src/os/unix/ngx_freebsd_rfork_thread.h
+++ b/src/os/unix/ngx_freebsd_rfork_thread.h
@@ -85,7 +85,6 @@ static inline int ngx_gettid()
 
 
 ngx_tid_t ngx_thread_self();
-#define ngx_thread_main()   (ngx_gettid() == 0)
 
 
 #define ngx_mutex_trylock(m)  ngx_mutex_dolock(m, 1)
--- a/src/os/unix/ngx_process_cycle.c
+++ b/src/os/unix/ngx_process_cycle.c
@@ -793,7 +793,7 @@ static void ngx_wakeup_worker_threads(ng
             if (ngx_threads[i].state < NGX_THREAD_EXIT) {
                 ngx_cond_signal(ngx_threads[i].cv);
 
-                if (ngx_threads[i].cv->tid == -1) {
+                if (ngx_threads[i].cv->tid == (ngx_tid_t) -1) {
                     ngx_threads[i].state = NGX_THREAD_DONE;
                 } else {
                     live = 1;
@@ -811,6 +811,7 @@ static void ngx_wakeup_worker_threads(ng
                            "all worker threads are joined");
 
             /* STUB */
+            ngx_done_events(cycle);
             ngx_mutex_destroy(ngx_event_timer_mutex);
             ngx_mutex_destroy(ngx_posted_events_mutex);
 
@@ -829,10 +830,13 @@ static void* ngx_worker_thread_cycle(voi
     sigset_t          set;
     ngx_err_t         err;
     ngx_tls_t        *tls;
+    ngx_cycle_t      *cycle;
     struct timeval    tv;
 
     thr->cv->tid = ngx_thread_self();
 
+    cycle = (ngx_cycle_t *) ngx_cycle;
+
     sigemptyset(&set);
     sigaddset(&set, ngx_signal_value(NGX_RECONFIGURE_SIGNAL));
     sigaddset(&set, ngx_signal_value(NGX_REOPEN_SIGNAL));
@@ -840,23 +844,23 @@ static void* ngx_worker_thread_cycle(voi
 
     err = ngx_thread_sigmask(SIG_BLOCK, &set, NULL);
     if (err) {
-        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err,
+        ngx_log_error(NGX_LOG_ALERT, cycle->log, err,
                       ngx_thread_sigmask_n " failed");
         return (void *) 1;
     }
 
-    ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, ngx_errno,
+    ngx_log_debug1(NGX_LOG_DEBUG_CORE, cycle->log, ngx_errno,
                    "thread " TID_T_FMT " started", ngx_thread_self());
 
     ngx_setthrtitle("worker thread");
 
-    if (!(tls = ngx_calloc(sizeof(ngx_tls_t), ngx_cycle->log))) {
+    if (!(tls = ngx_calloc(sizeof(ngx_tls_t), cycle->log))) {
         return (void *) 1;
     }
 
     err = ngx_thread_create_tls();
     if (err != 0) {
-        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err,
+        ngx_log_error(NGX_LOG_ALERT, cycle->log, err,
                       ngx_thread_create_tls_n " failed");
         return (void *) 1;
     }
@@ -879,7 +883,7 @@ static void* ngx_worker_thread_cycle(voi
 
             ngx_mutex_unlock(ngx_posted_events_mutex);
 
-            ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, ngx_errno,
+            ngx_log_debug1(NGX_LOG_DEBUG_CORE, cycle->log, ngx_errno,
                            "thread %d is done", ngx_thread_self());
 
             return (void *) 0;
@@ -887,9 +891,15 @@ static void* ngx_worker_thread_cycle(voi
 
         thr->state = NGX_THREAD_BUSY;
 
-        if (ngx_event_thread_process_posted((ngx_cycle_t *) ngx_cycle)
-                                                                  == NGX_ERROR)
-        {
+        if (ngx_event_thread_process_posted(cycle) == NGX_ERROR) {
+            return (void *) 1;
+        }
+
+        if (ngx_event_thread_process_posted(cycle) == NGX_ERROR) {
+            return (void *) 1;
+        }
+
+        if (ngx_process_changes(cycle, 1) == NGX_ERROR) {
             return (void *) 1;
         }
     }
--- a/src/os/unix/ngx_thread.h
+++ b/src/os/unix/ngx_thread.h
@@ -21,7 +21,6 @@
 typedef pthread_t  ngx_tid_t;
 
 #define ngx_thread_self()   pthread_self()
-#define ngx_thread_main()   pthread_main_np()
 #define ngx_log_tid         (int) ngx_thread_self()
 
 #define TID_T_FMT           PTR_FMT