changeset 6693:3908156a51fa

Stream: phases.
author Roman Arutyunyan <arut@nginx.com>
date Thu, 15 Sep 2016 14:55:54 +0300
parents 56fc55e32f23
children ea9dfe2f62e7
files src/stream/ngx_stream.c src/stream/ngx_stream.h src/stream/ngx_stream_access_module.c src/stream/ngx_stream_core_module.c src/stream/ngx_stream_handler.c src/stream/ngx_stream_limit_conn_module.c src/stream/ngx_stream_log_module.c src/stream/ngx_stream_realip_module.c src/stream/ngx_stream_ssl_module.c
diffstat 9 files changed, 428 insertions(+), 186 deletions(-) [+]
line wrap: on
line diff
--- a/src/stream/ngx_stream.c
+++ b/src/stream/ngx_stream.c
@@ -12,6 +12,10 @@
 
 
 static char *ngx_stream_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
+static ngx_int_t ngx_stream_init_phases(ngx_conf_t *cf,
+    ngx_stream_core_main_conf_t *cmcf);
+static ngx_int_t ngx_stream_init_phase_handlers(ngx_conf_t *cf,
+    ngx_stream_core_main_conf_t *cmcf);
 static ngx_int_t ngx_stream_add_ports(ngx_conf_t *cf, ngx_array_t *ports,
     ngx_stream_listen_t *listen);
 static char *ngx_stream_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports);
@@ -219,6 +223,10 @@ ngx_stream_block(ngx_conf_t *cf, ngx_com
         }
     }
 
+    if (ngx_stream_init_phases(cf, cmcf) != NGX_OK) {
+        return NGX_CONF_ERROR;
+    }
+
     for (m = 0; cf->cycle->modules[m]; m++) {
         if (cf->cycle->modules[m]->type != NGX_STREAM_MODULE) {
             continue;
@@ -239,6 +247,9 @@ ngx_stream_block(ngx_conf_t *cf, ngx_com
 
     *cf = pcf;
 
+    if (ngx_stream_init_phase_handlers(cf, cmcf) != NGX_OK) {
+        return NGX_CONF_ERROR;
+    }
 
     if (ngx_array_init(&ports, cf->temp_pool, 4, sizeof(ngx_stream_conf_port_t))
         != NGX_OK)
@@ -259,6 +270,105 @@ ngx_stream_block(ngx_conf_t *cf, ngx_com
 
 
 static ngx_int_t
+ngx_stream_init_phases(ngx_conf_t *cf, ngx_stream_core_main_conf_t *cmcf)
+{
+    if (ngx_array_init(&cmcf->phases[NGX_STREAM_POST_ACCEPT_PHASE].handlers,
+                       cf->pool, 1, sizeof(ngx_stream_handler_pt))
+        != NGX_OK)
+    {
+        return NGX_ERROR;
+    }
+
+    if (ngx_array_init(&cmcf->phases[NGX_STREAM_PREACCESS_PHASE].handlers,
+                       cf->pool, 1, sizeof(ngx_stream_handler_pt))
+        != NGX_OK)
+    {
+        return NGX_ERROR;
+    }
+
+    if (ngx_array_init(&cmcf->phases[NGX_STREAM_ACCESS_PHASE].handlers,
+                       cf->pool, 1, sizeof(ngx_stream_handler_pt))
+        != NGX_OK)
+    {
+        return NGX_ERROR;
+    }
+
+#if (NGX_STREAM_SSL)
+    if (ngx_array_init(&cmcf->phases[NGX_STREAM_SSL_PHASE].handlers,
+                       cf->pool, 1, sizeof(ngx_stream_handler_pt))
+        != NGX_OK)
+    {
+        return NGX_ERROR;
+    }
+#endif
+
+    if (ngx_array_init(&cmcf->phases[NGX_STREAM_LOG_PHASE].handlers,
+                       cf->pool, 1, sizeof(ngx_stream_handler_pt))
+        != NGX_OK)
+    {
+        return NGX_ERROR;
+    }
+
+    return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_init_phase_handlers(ngx_conf_t *cf,
+    ngx_stream_core_main_conf_t *cmcf)
+{
+    ngx_int_t                     j;
+    ngx_uint_t                    i, n;
+    ngx_stream_handler_pt        *h;
+    ngx_stream_phase_handler_t   *ph;
+    ngx_stream_phase_handler_pt   checker;
+
+    n = 1 /* content phase */;
+
+    for (i = 0; i < NGX_STREAM_LOG_PHASE; i++) {
+        n += cmcf->phases[i].handlers.nelts;
+    }
+
+    ph = ngx_pcalloc(cf->pool,
+                     n * sizeof(ngx_stream_phase_handler_t) + sizeof(void *));
+    if (ph == NULL) {
+        return NGX_ERROR;
+    }
+
+    cmcf->phase_engine.handlers = ph;
+    n = 0;
+
+    for (i = 0; i < NGX_STREAM_LOG_PHASE; i++) {
+        h = cmcf->phases[i].handlers.elts;
+
+        switch (i) {
+
+        case NGX_STREAM_CONTENT_PHASE:
+            ph->checker = ngx_stream_core_content_phase;
+            n++;
+            ph++;
+
+            continue;
+
+        default:
+            checker = ngx_stream_core_generic_phase;
+        }
+
+        n += cmcf->phases[i].handlers.nelts;
+
+        for (j = cmcf->phases[i].handlers.nelts - 1; j >= 0; j--) {
+            ph->checker = checker;
+            ph->handler = h[j];
+            ph->next = n;
+            ph++;
+        }
+    }
+
+    return NGX_OK;
+}
+
+
+static ngx_int_t
 ngx_stream_add_ports(ngx_conf_t *cf, ngx_array_t *ports,
     ngx_stream_listen_t *listen)
 {
--- a/src/stream/ngx_stream.h
+++ b/src/stream/ngx_stream.h
@@ -115,17 +115,48 @@ typedef struct {
 } ngx_stream_conf_addr_t;
 
 
-typedef ngx_int_t (*ngx_stream_access_pt)(ngx_stream_session_t *s);
+typedef enum {
+    NGX_STREAM_POST_ACCEPT_PHASE = 0,
+    NGX_STREAM_PREACCESS_PHASE,
+    NGX_STREAM_ACCESS_PHASE,
+#if (NGX_STREAM_SSL)
+    NGX_STREAM_SSL_PHASE,
+#endif
+    NGX_STREAM_CONTENT_PHASE,
+    NGX_STREAM_LOG_PHASE
+} ngx_stream_phases;
+
+
+typedef struct ngx_stream_phase_handler_s  ngx_stream_phase_handler_t;
+
+typedef ngx_int_t (*ngx_stream_phase_handler_pt)(ngx_stream_session_t *s,
+    ngx_stream_phase_handler_t *ph);
+typedef ngx_int_t (*ngx_stream_handler_pt)(ngx_stream_session_t *s);
+typedef void (*ngx_stream_content_handler_pt)(ngx_stream_session_t *s);
+
+
+struct ngx_stream_phase_handler_s {
+    ngx_stream_phase_handler_pt    checker;
+    ngx_stream_handler_pt          handler;
+    ngx_uint_t                     next;
+};
+
+
+typedef struct {
+    ngx_stream_phase_handler_t    *handlers;
+} ngx_stream_phase_engine_t;
+
+
+typedef struct {
+    ngx_array_t                    handlers;
+} ngx_stream_phase_t;
 
 
 typedef struct {
     ngx_array_t                    servers;     /* ngx_stream_core_srv_conf_t */
     ngx_array_t                    listen;      /* ngx_stream_listen_t */
 
-    ngx_stream_access_pt           realip_handler;
-    ngx_stream_access_pt           limit_conn_handler;
-    ngx_stream_access_pt           access_handler;
-    ngx_stream_access_pt           access_log_handler;
+    ngx_stream_phase_engine_t      phase_engine;
 
     ngx_hash_t                     variables_hash;
 
@@ -136,14 +167,13 @@ typedef struct {
     ngx_uint_t                     variables_hash_bucket_size;
 
     ngx_hash_keys_arrays_t        *variables_keys;
+
+    ngx_stream_phase_t             phases[NGX_STREAM_LOG_PHASE + 1];
 } ngx_stream_core_main_conf_t;
 
 
-typedef void (*ngx_stream_handler_pt)(ngx_stream_session_t *s);
-
-
 typedef struct {
-    ngx_stream_handler_pt          handler;
+    ngx_stream_content_handler_pt  handler;
 
     ngx_stream_conf_ctx_t         *ctx;
 
@@ -189,6 +219,7 @@ struct ngx_stream_session_s {
     u_char                        *captures_data;
 #endif
 
+    ngx_int_t                      phase_handler;
     ngx_uint_t                     status;
 
 #if (NGX_STREAM_SSL)
@@ -246,7 +277,15 @@ typedef struct {
 #define NGX_STREAM_WRITE_BUFFERED  0x10
 
 
+void ngx_stream_core_run_phases(ngx_stream_session_t *s);
+ngx_int_t ngx_stream_core_generic_phase(ngx_stream_session_t *s,
+    ngx_stream_phase_handler_t *ph);
+ngx_int_t ngx_stream_core_content_phase(ngx_stream_session_t *s,
+    ngx_stream_phase_handler_t *ph);
+
+
 void ngx_stream_init_connection(ngx_connection_t *c);
+void ngx_stream_session_handler(ngx_event_t *rev);
 void ngx_stream_finalize_session(ngx_stream_session_t *s, ngx_uint_t rc);
 
 
--- a/src/stream/ngx_stream_access_module.c
+++ b/src/stream/ngx_stream_access_module.c
@@ -275,7 +275,7 @@ ngx_stream_access_found(ngx_stream_sessi
     if (deny) {
         ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                       "access forbidden by rule");
-        return NGX_ABORT;
+        return NGX_STREAM_FORBIDDEN;
     }
 
     return NGX_OK;
@@ -443,10 +443,17 @@ ngx_stream_access_merge_srv_conf(ngx_con
 static ngx_int_t
 ngx_stream_access_init(ngx_conf_t *cf)
 {
+    ngx_stream_handler_pt        *h;
     ngx_stream_core_main_conf_t  *cmcf;
 
     cmcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_core_module);
-    cmcf->access_handler = ngx_stream_access_handler;
+
+    h = ngx_array_push(&cmcf->phases[NGX_STREAM_ACCESS_PHASE].handlers);
+    if (h == NULL) {
+        return NGX_ERROR;
+    }
+
+    *h = ngx_stream_access_handler;
 
     return NGX_OK;
 }
--- a/src/stream/ngx_stream_core_module.c
+++ b/src/stream/ngx_stream_core_module.c
@@ -123,6 +123,108 @@ ngx_module_t  ngx_stream_core_module = {
 };
 
 
+void
+ngx_stream_core_run_phases(ngx_stream_session_t *s)
+{
+    ngx_int_t                     rc;
+    ngx_stream_phase_handler_t   *ph;
+    ngx_stream_core_main_conf_t  *cmcf;
+
+    cmcf = ngx_stream_get_module_main_conf(s, ngx_stream_core_module);
+
+    ph = cmcf->phase_engine.handlers;
+
+    while (ph[s->phase_handler].checker) {
+
+        rc = ph[s->phase_handler].checker(s, &ph[s->phase_handler]);
+
+        if (rc == NGX_OK) {
+            return;
+        }
+    }
+}
+
+
+ngx_int_t
+ngx_stream_core_generic_phase(ngx_stream_session_t *s,
+    ngx_stream_phase_handler_t *ph)
+{
+    ngx_int_t  rc;
+
+    /*
+     * generic phase checker,
+     * used by all phases, except for content
+     */
+
+    ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+                   "generic phase: %ui", s->phase_handler);
+
+    rc = ph->handler(s);
+
+    if (rc == NGX_OK) {
+        s->phase_handler = ph->next;
+        return NGX_AGAIN;
+    }
+
+    if (rc == NGX_DECLINED) {
+        s->phase_handler++;
+        return NGX_AGAIN;
+    }
+
+    if (rc == NGX_AGAIN || rc == NGX_DONE) {
+        return NGX_OK;
+    }
+
+    if (rc == NGX_ERROR) {
+        rc = NGX_STREAM_INTERNAL_SERVER_ERROR;
+    }
+
+    ngx_stream_finalize_session(s, rc);
+
+    return NGX_OK;
+}
+
+
+ngx_int_t
+ngx_stream_core_content_phase(ngx_stream_session_t *s,
+    ngx_stream_phase_handler_t *ph)
+{
+    int                          tcp_nodelay;
+    ngx_connection_t            *c;
+    ngx_stream_core_srv_conf_t  *cscf;
+
+    c = s->connection;
+
+    c->log->action = NULL;
+
+    cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);
+
+    if (c->type == SOCK_STREAM
+        && cscf->tcp_nodelay
+        && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET)
+    {
+        ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, "tcp_nodelay");
+
+        tcp_nodelay = 1;
+
+        if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
+                       (const void *) &tcp_nodelay, sizeof(int)) == -1)
+        {
+            ngx_connection_error(c, ngx_socket_errno,
+                                 "setsockopt(TCP_NODELAY) failed");
+            ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+            return NGX_OK;
+        }
+
+        c->tcp_nodelay = NGX_TCP_NODELAY_SET;
+    }
+
+    cscf->handler(s);
+
+    return NGX_OK;
+}
+
+
 static ngx_int_t
 ngx_stream_core_preconfiguration(ngx_conf_t *cf)
 {
--- a/src/stream/ngx_stream_handler.c
+++ b/src/stream/ngx_stream_handler.c
@@ -11,15 +11,10 @@
 #include <ngx_stream.h>
 
 
+static void ngx_stream_log_session(ngx_stream_session_t *s);
 static void ngx_stream_close_connection(ngx_connection_t *c);
 static u_char *ngx_stream_log_error(ngx_log_t *log, u_char *buf, size_t len);
 static void ngx_stream_proxy_protocol_handler(ngx_event_t *rev);
-static void ngx_stream_init_session_handler(ngx_event_t *rev);
-
-#if (NGX_STREAM_SSL)
-static void ngx_stream_ssl_init_connection(ngx_ssl_t *ssl, ngx_connection_t *c);
-static void ngx_stream_ssl_handshake_handler(ngx_connection_t *c);
-#endif
 
 
 void
@@ -154,7 +149,7 @@ ngx_stream_init_connection(ngx_connectio
     c->log->connection = c->number;
     c->log->handler = ngx_stream_log_error;
     c->log->data = s;
-    c->log->action = "initializing connection";
+    c->log->action = "initializing session";
     c->log_error = NGX_ERROR_INFO;
 
     s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_stream_max_module);
@@ -179,7 +174,7 @@ ngx_stream_init_connection(ngx_connectio
     s->start_msec = tp->msec;
 
     rev = c->read;
-    rev->handler = ngx_stream_init_session_handler;
+    rev->handler = ngx_stream_session_handler;
 
     if (addr_conf->proxy_protocol) {
         c->log->action = "reading PROXY protocol";
@@ -279,189 +274,54 @@ ngx_stream_proxy_protocol_handler(ngx_ev
         return;
     }
 
-    ngx_stream_init_session_handler(rev);
+    c->log->action = "initializing session";
+
+    ngx_stream_session_handler(rev);
 }
 
 
-static void
-ngx_stream_init_session_handler(ngx_event_t *rev)
+void
+ngx_stream_session_handler(ngx_event_t *rev)
 {
-    int                           tcp_nodelay;
-    ngx_int_t                     rc;
-    ngx_connection_t             *c;
-    ngx_stream_session_t         *s;
-    ngx_stream_core_srv_conf_t   *cscf;
-    ngx_stream_core_main_conf_t  *cmcf;
+    ngx_connection_t      *c;
+    ngx_stream_session_t  *s;
 
     c = rev->data;
     s = c->data;
 
-    c->log->action = "initializing session";
-
-    cmcf = ngx_stream_get_module_main_conf(s, ngx_stream_core_module);
-
-    if (cmcf->realip_handler) {
-        rc = cmcf->realip_handler(s);
-
-        if (rc == NGX_ERROR) {
-            ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
-            return;
-        }
-    }
-
-    if (cmcf->limit_conn_handler) {
-        rc = cmcf->limit_conn_handler(s);
-
-        if (rc == NGX_ERROR) {
-            ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
-            return;
-        }
-
-        if (rc == NGX_ABORT) {
-            ngx_stream_finalize_session(s, NGX_STREAM_SERVICE_UNAVAILABLE);
-            return;
-        }
-    }
-
-    if (cmcf->access_handler) {
-        rc = cmcf->access_handler(s);
-
-        if (rc == NGX_ERROR) {
-            ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
-            return;
-        }
-
-        if (rc == NGX_ABORT) {
-            ngx_stream_finalize_session(s, NGX_STREAM_FORBIDDEN);
-            return;
-        }
-    }
-
-    cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);
-
-    if (c->type == SOCK_STREAM
-        && cscf->tcp_nodelay
-        && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET)
-    {
-        ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, "tcp_nodelay");
-
-        tcp_nodelay = 1;
-
-        if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
-                       (const void *) &tcp_nodelay, sizeof(int)) == -1)
-        {
-            ngx_connection_error(c, ngx_socket_errno,
-                                 "setsockopt(TCP_NODELAY) failed");
-            ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
-            return;
-        }
-
-        c->tcp_nodelay = NGX_TCP_NODELAY_SET;
-    }
-
-
-#if (NGX_STREAM_SSL)
-    {
-    ngx_stream_ssl_conf_t  *sslcf;
-
-    sslcf = ngx_stream_get_module_srv_conf(s, ngx_stream_ssl_module);
-
-    if (s->ssl) {
-        c->log->action = "SSL handshaking";
-
-        if (sslcf->ssl.ctx == NULL) {
-            ngx_log_error(NGX_LOG_ERR, c->log, 0,
-                          "no \"ssl_certificate\" is defined "
-                          "in server listening on SSL port");
-            ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
-            return;
-        }
-
-        ngx_stream_ssl_init_connection(&sslcf->ssl, c);
-        return;
-    }
-    }
-#endif
-
-    c->log->action = "handling client connection";
-
-    cscf->handler(s);
+    ngx_stream_core_run_phases(s);
 }
 
 
-#if (NGX_STREAM_SSL)
-
-static void
-ngx_stream_ssl_init_connection(ngx_ssl_t *ssl, ngx_connection_t *c)
-{
-    ngx_stream_session_t   *s;
-    ngx_stream_ssl_conf_t  *sslcf;
-
-    s = c->data;
-
-    if (ngx_ssl_create_connection(ssl, c, 0) == NGX_ERROR) {
-        ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
-        return;
-    }
-
-    if (ngx_ssl_handshake(c) == NGX_AGAIN) {
-        sslcf = ngx_stream_get_module_srv_conf(s, ngx_stream_ssl_module);
-
-        ngx_add_timer(c->read, sslcf->handshake_timeout);
-
-        c->ssl->handler = ngx_stream_ssl_handshake_handler;
-
-        return;
-    }
-
-    ngx_stream_ssl_handshake_handler(c);
-}
-
-
-static void
-ngx_stream_ssl_handshake_handler(ngx_connection_t *c)
-{
-    ngx_stream_session_t        *s;
-    ngx_stream_core_srv_conf_t  *cscf;
-
-    if (!c->ssl->handshaked) {
-        ngx_stream_finalize_session(c->data, NGX_STREAM_INTERNAL_SERVER_ERROR);
-        return;
-    }
-
-    if (c->read->timer_set) {
-        ngx_del_timer(c->read);
-    }
-
-    c->log->action = "handling client connection";
-
-    s = c->data;
-
-    cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);
-
-    cscf->handler(s);
-}
-
-#endif
-
-
 void
 ngx_stream_finalize_session(ngx_stream_session_t *s, ngx_uint_t rc)
 {
-    ngx_stream_core_main_conf_t  *cmcf;
-
     ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
                    "finalize stream session: %i", rc);
 
     s->status = rc;
 
+    ngx_stream_log_session(s);
+
+    ngx_stream_close_connection(s->connection);
+}
+
+
+static void
+ngx_stream_log_session(ngx_stream_session_t *s)
+{
+    ngx_uint_t                    i, n;
+    ngx_stream_handler_pt        *log_handler;
+    ngx_stream_core_main_conf_t  *cmcf;
+
     cmcf = ngx_stream_get_module_main_conf(s, ngx_stream_core_module);
 
-    if (cmcf->access_log_handler) {
-        (void) cmcf->access_log_handler(s);
+    log_handler = cmcf->phases[NGX_STREAM_LOG_PHASE].handlers.elts;
+    n = cmcf->phases[NGX_STREAM_LOG_PHASE].handlers.nelts;
+
+    for (i = 0; i < n; i++) {
+        log_handler[i](s);
     }
-
-    ngx_stream_close_connection(s->connection);
 }
 
 
--- a/src/stream/ngx_stream_limit_conn_module.c
+++ b/src/stream/ngx_stream_limit_conn_module.c
@@ -178,7 +178,7 @@ ngx_stream_limit_conn_handler(ngx_stream
             if (node == NULL) {
                 ngx_shmtx_unlock(&shpool->mutex);
                 ngx_stream_limit_conn_cleanup_all(s->connection->pool);
-                return NGX_ABORT;
+                return NGX_STREAM_SERVICE_UNAVAILABLE;
             }
 
             lc = (ngx_stream_limit_conn_node_t *) &node->color;
@@ -203,7 +203,7 @@ ngx_stream_limit_conn_handler(ngx_stream
                               &limits[i].shm_zone->shm.name);
 
                 ngx_stream_limit_conn_cleanup_all(s->connection->pool);
-                return NGX_ABORT;
+                return NGX_STREAM_SERVICE_UNAVAILABLE;
             }
 
             lc->conn++;
@@ -630,11 +630,17 @@ ngx_stream_limit_conn(ngx_conf_t *cf, ng
 static ngx_int_t
 ngx_stream_limit_conn_init(ngx_conf_t *cf)
 {
+    ngx_stream_handler_pt        *h;
     ngx_stream_core_main_conf_t  *cmcf;
 
     cmcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_core_module);
 
-    cmcf->limit_conn_handler = ngx_stream_limit_conn_handler;
+    h = ngx_array_push(&cmcf->phases[NGX_STREAM_PREACCESS_PHASE].handlers);
+    if (h == NULL) {
+        return NGX_ERROR;
+    }
+
+    *h = ngx_stream_limit_conn_handler;
 
     return NGX_OK;
 }
--- a/src/stream/ngx_stream_log_module.c
+++ b/src/stream/ngx_stream_log_module.c
@@ -1464,11 +1464,17 @@ ngx_stream_log_open_file_cache(ngx_conf_
 static ngx_int_t
 ngx_stream_log_init(ngx_conf_t *cf)
 {
+    ngx_stream_handler_pt        *h;
     ngx_stream_core_main_conf_t  *cmcf;
 
     cmcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_core_module);
 
-    cmcf->access_log_handler = ngx_stream_log_handler;
+    h = ngx_array_push(&cmcf->phases[NGX_STREAM_LOG_PHASE].handlers);
+    if (h == NULL) {
+        return NGX_ERROR;
+    }
+
+    *h = ngx_stream_log_handler;
 
     return NGX_OK;
 }
--- a/src/stream/ngx_stream_realip_module.c
+++ b/src/stream/ngx_stream_realip_module.c
@@ -279,11 +279,17 @@ ngx_stream_realip_add_variables(ngx_conf
 static ngx_int_t
 ngx_stream_realip_init(ngx_conf_t *cf)
 {
+    ngx_stream_handler_pt        *h;
     ngx_stream_core_main_conf_t  *cmcf;
 
     cmcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_core_module);
 
-    cmcf->realip_handler = ngx_stream_realip_handler;
+    h = ngx_array_push(&cmcf->phases[NGX_STREAM_POST_ACCEPT_PHASE].handlers);
+    if (h == NULL) {
+        return NGX_ERROR;
+    }
+
+    *h = ngx_stream_realip_handler;
 
     return NGX_OK;
 }
--- a/src/stream/ngx_stream_ssl_module.c
+++ b/src/stream/ngx_stream_ssl_module.c
@@ -18,6 +18,10 @@ typedef ngx_int_t (*ngx_ssl_variable_han
 #define NGX_DEFAULT_ECDH_CURVE  "auto"
 
 
+static ngx_int_t ngx_stream_ssl_handler(ngx_stream_session_t *s);
+static ngx_int_t ngx_stream_ssl_init_connection(ngx_ssl_t *ssl,
+    ngx_connection_t *c);
+static void ngx_stream_ssl_handshake_handler(ngx_connection_t *c);
 static ngx_int_t ngx_stream_ssl_static_variable(ngx_stream_session_t *s,
     ngx_stream_variable_value_t *v, uintptr_t data);
 static ngx_int_t ngx_stream_ssl_variable(ngx_stream_session_t *s,
@@ -32,6 +36,7 @@ static char *ngx_stream_ssl_password_fil
     void *conf);
 static char *ngx_stream_ssl_session_cache(ngx_conf_t *cf, ngx_command_t *cmd,
     void *conf);
+static ngx_int_t ngx_stream_ssl_init(ngx_conf_t *cf);
 
 
 static ngx_conf_bitmask_t  ngx_stream_ssl_protocols[] = {
@@ -143,7 +148,7 @@ static ngx_command_t  ngx_stream_ssl_com
 
 static ngx_stream_module_t  ngx_stream_ssl_module_ctx = {
     ngx_stream_ssl_add_variables,          /* preconfiguration */
-    NULL,                                  /* postconfiguration */
+    ngx_stream_ssl_init,                   /* postconfiguration */
 
     NULL,                                  /* create main configuration */
     NULL,                                  /* init main configuration */
@@ -194,6 +199,88 @@ static ngx_str_t ngx_stream_ssl_sess_id_
 
 
 static ngx_int_t
+ngx_stream_ssl_handler(ngx_stream_session_t *s)
+{
+    ngx_connection_t       *c;
+    ngx_stream_ssl_conf_t  *sslcf;
+
+    c = s->connection;
+
+    sslcf = ngx_stream_get_module_srv_conf(s, ngx_stream_ssl_module);
+
+    if (s->ssl && c->ssl == NULL) {
+        c->log->action = "SSL handshaking";
+
+        if (sslcf->ssl.ctx == NULL) {
+            ngx_log_error(NGX_LOG_ERR, c->log, 0,
+                          "no \"ssl_certificate\" is defined "
+                          "in server listening on SSL port");
+            return NGX_ERROR;
+        }
+
+        return ngx_stream_ssl_init_connection(&sslcf->ssl, c);
+    }
+
+    return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_ssl_init_connection(ngx_ssl_t *ssl, ngx_connection_t *c)
+{
+    ngx_int_t               rc;
+    ngx_stream_session_t   *s;
+    ngx_stream_ssl_conf_t  *sslcf;
+
+    s = c->data;
+
+    if (ngx_ssl_create_connection(ssl, c, 0) == NGX_ERROR) {
+        return NGX_ERROR;
+    }
+
+    rc = ngx_ssl_handshake(c);
+
+    if (rc == NGX_ERROR) {
+        return NGX_ERROR;
+    }
+
+    if (rc == NGX_AGAIN) {
+        sslcf = ngx_stream_get_module_srv_conf(s, ngx_stream_ssl_module);
+
+        ngx_add_timer(c->read, sslcf->handshake_timeout);
+
+        c->ssl->handler = ngx_stream_ssl_handshake_handler;
+
+        return NGX_AGAIN;
+    }
+
+    /* rc == NGX_OK */
+
+    return NGX_OK;
+}
+
+
+static void
+ngx_stream_ssl_handshake_handler(ngx_connection_t *c)
+{
+    ngx_stream_session_t  *s;
+
+    s = c->data;
+
+    if (!c->ssl->handshaked) {
+        ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+        return;
+    }
+
+    if (c->read->timer_set) {
+        ngx_del_timer(c->read);
+    }
+
+    ngx_stream_core_run_phases(s);
+}
+
+
+static ngx_int_t
 ngx_stream_ssl_static_variable(ngx_stream_session_t *s,
     ngx_stream_variable_value_t *v, uintptr_t data)
 {
@@ -565,3 +652,22 @@ invalid:
 
     return NGX_CONF_ERROR;
 }
+
+
+static ngx_int_t
+ngx_stream_ssl_init(ngx_conf_t *cf)
+{
+    ngx_stream_handler_pt        *h;
+    ngx_stream_core_main_conf_t  *cmcf;
+
+    cmcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_core_module);
+
+    h = ngx_array_push(&cmcf->phases[NGX_STREAM_SSL_PHASE].handlers);
+    if (h == NULL) {
+        return NGX_ERROR;
+    }
+
+    *h = ngx_stream_ssl_handler;
+
+    return NGX_OK;
+}