changeset 6692:56fc55e32f23

Stream: filters.
author Roman Arutyunyan <arut@nginx.com>
date Thu, 15 Sep 2016 14:55:46 +0300
parents 4bce3edfac2c
children 3908156a51fa
files auto/modules auto/sources src/event/modules/ngx_iocp_module.c src/event/ngx_event.h src/event/ngx_event_accept.c src/event/ngx_event_connect.c src/os/unix/ngx_darwin_init.c src/os/unix/ngx_freebsd_init.c src/os/unix/ngx_linux_init.c src/os/unix/ngx_os.h src/os/unix/ngx_posix_init.c src/os/unix/ngx_solaris_init.c src/os/unix/ngx_udp_sendmsg_chain.c src/os/win32/ngx_os.h src/os/win32/ngx_win32_init.c src/stream/ngx_stream.c src/stream/ngx_stream.h src/stream/ngx_stream_handler.c src/stream/ngx_stream_proxy_module.c src/stream/ngx_stream_return_module.c src/stream/ngx_stream_upstream.h src/stream/ngx_stream_write_filter_module.c
diffstat 22 files changed, 742 insertions(+), 106 deletions(-) [+]
line wrap: on
line diff
--- a/auto/modules
+++ b/auto/modules
@@ -973,7 +973,8 @@ if [ $STREAM != NO ]; then
                      ngx_stream_core_module \
                      ngx_stream_log_module \
                      ngx_stream_proxy_module \
-                     ngx_stream_upstream_module"
+                     ngx_stream_upstream_module \
+                     ngx_stream_write_filter_module"
     ngx_module_incs="src/stream"
     ngx_module_deps="src/stream/ngx_stream.h \
                      src/stream/ngx_stream_variables.h \
@@ -988,7 +989,8 @@ if [ $STREAM != NO ]; then
                      src/stream/ngx_stream_log_module.c \
                      src/stream/ngx_stream_proxy_module.c \
                      src/stream/ngx_stream_upstream.c \
-                     src/stream/ngx_stream_upstream_round_robin.c"
+                     src/stream/ngx_stream_upstream_round_robin.c \
+                     src/stream/ngx_stream_write_filter_module.c"
 
     . auto/module
 
--- a/auto/sources
+++ b/auto/sources
@@ -167,6 +167,7 @@ UNIX_SRCS="$CORE_SRCS $EVENT_SRCS \
             src/os/unix/ngx_send.c \
             src/os/unix/ngx_writev_chain.c \
             src/os/unix/ngx_udp_send.c \
+            src/os/unix/ngx_udp_sendmsg_chain.c \
             src/os/unix/ngx_channel.c \
             src/os/unix/ngx_shmem.c \
             src/os/unix/ngx_process.c \
--- a/src/event/modules/ngx_iocp_module.c
+++ b/src/event/modules/ngx_iocp_module.c
@@ -93,6 +93,8 @@ ngx_os_io_t ngx_iocp_io = {
     NULL,
     ngx_udp_overlapped_wsarecv,
     NULL,
+    NULL,
+    NULL,
     ngx_overlapped_wsasend_chain,
     0
 };
--- a/src/event/ngx_event.h
+++ b/src/event/ngx_event.h
@@ -430,6 +430,7 @@ extern ngx_os_io_t  ngx_io;
 #define ngx_send             ngx_io.send
 #define ngx_send_chain       ngx_io.send_chain
 #define ngx_udp_send         ngx_io.udp_send
+#define ngx_udp_send_chain   ngx_io.udp_send_chain
 
 
 #define NGX_EVENT_MODULE      0x544E5645  /* "EVNT" */
--- a/src/event/ngx_event_accept.c
+++ b/src/event/ngx_event_accept.c
@@ -467,6 +467,7 @@ ngx_event_recvmsg(ngx_event_t *ev)
         *log = ls->log;
 
         c->send = ngx_udp_send;
+        c->send_chain = ngx_udp_send_chain;
 
         c->log = log;
         c->pool->log = log;
--- a/src/event/ngx_event_connect.c
+++ b/src/event/ngx_event_connect.c
@@ -166,6 +166,7 @@ ngx_event_connect_peer(ngx_peer_connecti
     } else { /* type == SOCK_DGRAM */
         c->recv = ngx_udp_recv;
         c->send = ngx_send;
+        c->send_chain = ngx_udp_send_chain;
     }
 
     c->log_error = pc->log_error;
--- a/src/os/unix/ngx_darwin_init.c
+++ b/src/os/unix/ngx_darwin_init.c
@@ -24,6 +24,7 @@ static ngx_os_io_t ngx_darwin_io = {
     ngx_udp_unix_recv,
     ngx_unix_send,
     ngx_udp_unix_send,
+    ngx_udp_unix_sendmsg_chain,
 #if (NGX_HAVE_SENDFILE)
     ngx_darwin_sendfile_chain,
     NGX_IO_SENDFILE
--- a/src/os/unix/ngx_freebsd_init.c
+++ b/src/os/unix/ngx_freebsd_init.c
@@ -33,6 +33,7 @@ static ngx_os_io_t ngx_freebsd_io = {
     ngx_udp_unix_recv,
     ngx_unix_send,
     ngx_udp_unix_send,
+    ngx_udp_unix_sendmsg_chain,
 #if (NGX_HAVE_SENDFILE)
     ngx_freebsd_sendfile_chain,
     NGX_IO_SENDFILE
--- a/src/os/unix/ngx_linux_init.c
+++ b/src/os/unix/ngx_linux_init.c
@@ -19,6 +19,7 @@ static ngx_os_io_t ngx_linux_io = {
     ngx_udp_unix_recv,
     ngx_unix_send,
     ngx_udp_unix_send,
+    ngx_udp_unix_sendmsg_chain,
 #if (NGX_HAVE_SENDFILE)
     ngx_linux_sendfile_chain,
     NGX_IO_SENDFILE
--- a/src/os/unix/ngx_os.h
+++ b/src/os/unix/ngx_os.h
@@ -29,6 +29,7 @@ typedef struct {
     ngx_recv_pt        udp_recv;
     ngx_send_pt        send;
     ngx_send_pt        udp_send;
+    ngx_send_chain_pt  udp_send_chain;
     ngx_send_chain_pt  send_chain;
     ngx_uint_t         flags;
 } ngx_os_io_t;
@@ -49,6 +50,8 @@ ssize_t ngx_unix_send(ngx_connection_t *
 ngx_chain_t *ngx_writev_chain(ngx_connection_t *c, ngx_chain_t *in,
     off_t limit);
 ssize_t ngx_udp_unix_send(ngx_connection_t *c, u_char *buf, size_t size);
+ngx_chain_t *ngx_udp_unix_sendmsg_chain(ngx_connection_t *c, ngx_chain_t *in,
+    off_t limit);
 
 
 #if (IOV_MAX > 64)
--- a/src/os/unix/ngx_posix_init.c
+++ b/src/os/unix/ngx_posix_init.c
@@ -25,6 +25,7 @@ ngx_os_io_t ngx_os_io = {
     ngx_udp_unix_recv,
     ngx_unix_send,
     ngx_udp_unix_send,
+    ngx_udp_unix_sendmsg_chain,
     ngx_writev_chain,
     0
 };
--- a/src/os/unix/ngx_solaris_init.c
+++ b/src/os/unix/ngx_solaris_init.c
@@ -20,6 +20,7 @@ static ngx_os_io_t ngx_solaris_io = {
     ngx_udp_unix_recv,
     ngx_unix_send,
     ngx_udp_unix_send,
+    ngx_udp_unix_sendmsg_chain,
 #if (NGX_HAVE_SENDFILE)
     ngx_solaris_sendfilev_chain,
     NGX_IO_SENDFILE
new file mode 100644
--- /dev/null
+++ b/src/os/unix/ngx_udp_sendmsg_chain.c
@@ -0,0 +1,245 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_event.h>
+
+
+static ngx_chain_t *ngx_udp_output_chain_to_iovec(ngx_iovec_t *vec,
+    ngx_chain_t *in, ngx_log_t *log);
+static ssize_t ngx_sendmsg(ngx_connection_t *c, ngx_iovec_t *vec);
+
+
+ngx_chain_t *
+ngx_udp_unix_sendmsg_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
+{
+    ssize_t        n;
+    off_t          send;
+    ngx_chain_t   *cl;
+    ngx_event_t   *wev;
+    ngx_iovec_t    vec;
+    struct iovec   iovs[NGX_IOVS_PREALLOCATE];
+
+    wev = c->write;
+
+    if (!wev->ready) {
+        return in;
+    }
+
+#if (NGX_HAVE_KQUEUE)
+
+    if ((ngx_event_flags & NGX_USE_KQUEUE_EVENT) && wev->pending_eof) {
+        (void) ngx_connection_error(c, wev->kq_errno,
+                               "kevent() reported about an closed connection");
+        wev->error = 1;
+        return NGX_CHAIN_ERROR;
+    }
+
+#endif
+
+    /* the maximum limit size is the maximum size_t value - the page size */
+
+    if (limit == 0 || limit > (off_t) (NGX_MAX_SIZE_T_VALUE - ngx_pagesize)) {
+        limit = NGX_MAX_SIZE_T_VALUE - ngx_pagesize;
+    }
+
+    send = 0;
+
+    vec.iovs = iovs;
+    vec.nalloc = NGX_IOVS_PREALLOCATE;
+
+    for ( ;; ) {
+
+        /* create the iovec and coalesce the neighbouring bufs */
+
+        cl = ngx_udp_output_chain_to_iovec(&vec, in, c->log);
+
+        if (cl == NGX_CHAIN_ERROR) {
+            return NGX_CHAIN_ERROR;
+        }
+
+        if (cl && cl->buf->in_file) {
+            ngx_log_error(NGX_LOG_ALERT, c->log, 0,
+                          "file buf in sendmsg "
+                          "t:%d r:%d f:%d %p %p-%p %p %O-%O",
+                          cl->buf->temporary,
+                          cl->buf->recycled,
+                          cl->buf->in_file,
+                          cl->buf->start,
+                          cl->buf->pos,
+                          cl->buf->last,
+                          cl->buf->file,
+                          cl->buf->file_pos,
+                          cl->buf->file_last);
+
+            ngx_debug_point();
+
+            return NGX_CHAIN_ERROR;
+        }
+
+        if (cl == in) {
+            return in;
+        }
+
+        send += vec.size;
+
+        n = ngx_sendmsg(c, &vec);
+
+        if (n == NGX_ERROR) {
+            return NGX_CHAIN_ERROR;
+        }
+
+        if (n == NGX_AGAIN) {
+            wev->ready = 0;
+            return in;
+        }
+
+        c->sent += n;
+
+        in = ngx_chain_update_sent(in, n);
+
+        if (send >= limit || in == NULL) {
+            return in;
+        }
+    }
+}
+
+
+static ngx_chain_t *
+ngx_udp_output_chain_to_iovec(ngx_iovec_t *vec, ngx_chain_t *in, ngx_log_t *log)
+{
+    size_t         total, size;
+    u_char        *prev;
+    ngx_uint_t     n, flush;
+    ngx_chain_t   *cl;
+    struct iovec  *iov;
+
+    cl = in;
+    iov = NULL;
+    prev = NULL;
+    total = 0;
+    n = 0;
+    flush = 0;
+
+    for ( /* void */ ; in && !flush; in = in->next) {
+
+        if (in->buf->flush || in->buf->last_buf) {
+            flush = 1;
+        }
+
+        if (ngx_buf_special(in->buf)) {
+            continue;
+        }
+
+        if (in->buf->in_file) {
+            break;
+        }
+
+        if (!ngx_buf_in_memory(in->buf)) {
+            ngx_log_error(NGX_LOG_ALERT, log, 0,
+                          "bad buf in output chain "
+                          "t:%d r:%d f:%d %p %p-%p %p %O-%O",
+                          in->buf->temporary,
+                          in->buf->recycled,
+                          in->buf->in_file,
+                          in->buf->start,
+                          in->buf->pos,
+                          in->buf->last,
+                          in->buf->file,
+                          in->buf->file_pos,
+                          in->buf->file_last);
+
+            ngx_debug_point();
+
+            return NGX_CHAIN_ERROR;
+        }
+
+        size = in->buf->last - in->buf->pos;
+
+        if (prev == in->buf->pos) {
+            iov->iov_len += size;
+
+        } else {
+            if (n == vec->nalloc) {
+                ngx_log_error(NGX_LOG_ALERT, log, 0,
+                              "too many parts in a datagram");
+                return NGX_CHAIN_ERROR;
+            }
+
+            iov = &vec->iovs[n++];
+
+            iov->iov_base = (void *) in->buf->pos;
+            iov->iov_len = size;
+        }
+
+        prev = in->buf->pos + size;
+        total += size;
+    }
+
+    if (!flush) {
+#if (NGX_SUPPRESS_WARN)
+        vec->size = 0;
+        vec->count = 0;
+#endif
+        return cl;
+    }
+
+    vec->count = n;
+    vec->size = total;
+
+    return in;
+}
+
+
+static ssize_t
+ngx_sendmsg(ngx_connection_t *c, ngx_iovec_t *vec)
+{
+    ssize_t        n;
+    ngx_err_t      err;
+    struct msghdr  msg;
+
+    ngx_memzero(&msg, sizeof(struct msghdr));
+
+    if (c->socklen) {
+        msg.msg_name = c->sockaddr;
+        msg.msg_namelen = c->socklen;
+    }
+
+    msg.msg_iov = vec->iovs;
+    msg.msg_iovlen = vec->count;
+
+eintr:
+
+    n = sendmsg(c->fd, &msg, 0);
+
+    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "sendmsg: %z of %uz", n, vec->size);
+
+    if (n == -1) {
+        err = ngx_errno;
+
+        switch (err) {
+        case NGX_EAGAIN:
+            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, err,
+                           "sendmsg() not ready");
+            return NGX_AGAIN;
+
+        case NGX_EINTR:
+            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, err,
+                           "sendmsg() was interrupted");
+            goto eintr;
+
+        default:
+            c->write->error = 1;
+            ngx_connection_error(c, err, "sendmsg() failed");
+            return NGX_ERROR;
+        }
+    }
+
+    return n;
+}
--- a/src/os/win32/ngx_os.h
+++ b/src/os/win32/ngx_os.h
@@ -28,6 +28,8 @@ typedef struct {
     ngx_recv_chain_pt  recv_chain;
     ngx_recv_pt        udp_recv;
     ngx_send_pt        send;
+    ngx_send_pt        udp_send;
+    ngx_send_chain_pt  udp_send_chain;
     ngx_send_chain_pt  send_chain;
     ngx_uint_t         flags;
 } ngx_os_io_t;
--- a/src/os/win32/ngx_win32_init.c
+++ b/src/os/win32/ngx_win32_init.c
@@ -25,6 +25,8 @@ ngx_os_io_t ngx_os_io = {
     ngx_wsarecv_chain,
     ngx_udp_wsarecv,
     ngx_wsasend,
+    NULL,
+    NULL,
     ngx_wsasend_chain,
     0
 };
--- a/src/stream/ngx_stream.c
+++ b/src/stream/ngx_stream.c
@@ -27,6 +27,9 @@ static ngx_int_t ngx_stream_cmp_conf_add
 ngx_uint_t  ngx_stream_max_module;
 
 
+ngx_stream_filter_pt  ngx_stream_top_filter;
+
+
 static ngx_command_t  ngx_stream_commands[] = {
 
     { ngx_string("stream"),
--- a/src/stream/ngx_stream.h
+++ b/src/stream/ngx_stream.h
@@ -243,6 +243,9 @@ typedef struct {
         NULL)
 
 
+#define NGX_STREAM_WRITE_BUFFERED  0x10
+
+
 void ngx_stream_init_connection(ngx_connection_t *c);
 void ngx_stream_finalize_session(ngx_stream_session_t *s, ngx_uint_t rc);
 
@@ -252,4 +255,11 @@ extern ngx_uint_t    ngx_stream_max_modu
 extern ngx_module_t  ngx_stream_core_module;
 
 
+typedef ngx_int_t (*ngx_stream_filter_pt)(ngx_stream_session_t *s,
+    ngx_chain_t *chain, ngx_uint_t from_upstream);
+
+
+extern ngx_stream_filter_pt  ngx_stream_top_filter;
+
+
 #endif /* _NGX_STREAM_H_INCLUDED_ */
--- a/src/stream/ngx_stream_handler.c
+++ b/src/stream/ngx_stream_handler.c
@@ -134,6 +134,10 @@ ngx_stream_init_connection(ngx_connectio
     s->ssl = addr_conf->ssl;
 #endif
 
+    if (c->buffer) {
+        s->received += c->buffer->last - c->buffer->pos;
+    }
+
     s->connection = c;
     c->data = s;
 
--- a/src/stream/ngx_stream_proxy_module.c
+++ b/src/stream/ngx_stream_proxy_module.c
@@ -84,10 +84,10 @@ static char *ngx_stream_proxy_pass(ngx_c
     void *conf);
 static char *ngx_stream_proxy_bind(ngx_conf_t *cf, ngx_command_t *cmd,
     void *conf);
-static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s);
 
 #if (NGX_STREAM_SSL)
 
+static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s);
 static char *ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf,
     ngx_command_t *cmd, void *conf);
 static void ngx_stream_proxy_ssl_init_connection(ngx_stream_session_t *s);
@@ -385,8 +385,6 @@ ngx_stream_proxy_handler(ngx_stream_sess
     }
 
     u->peer.type = c->type;
-
-    u->proxy_protocol = pscf->proxy_protocol;
     u->start_sec = ngx_time();
 
     c->write->handler = ngx_stream_proxy_downstream_handler;
@@ -411,28 +409,6 @@ ngx_stream_proxy_handler(ngx_stream_sess
         u->downstream_buf.pos = p;
         u->downstream_buf.last = p;
 
-        if (u->proxy_protocol
-#if (NGX_STREAM_SSL)
-            && pscf->ssl == NULL
-#endif
-            && pscf->buffer_size >= NGX_PROXY_PROTOCOL_MAX_HEADER)
-        {
-            /* optimization for a typical case */
-
-            ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
-                           "stream proxy send PROXY protocol header");
-
-            p = ngx_proxy_protocol_write(c, u->downstream_buf.last,
-                                         u->downstream_buf.end);
-            if (p == NULL) {
-                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
-                return;
-            }
-
-            u->downstream_buf.last = p;
-            u->proxy_protocol = 0;
-        }
-
         if (c->read->ready) {
             ngx_post_event(c->read, &ngx_posted_events);
         }
@@ -682,8 +658,13 @@ ngx_stream_proxy_connect(ngx_stream_sess
 
     c->log->action = "connecting to upstream";
 
+    pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
+
     u = s->upstream;
 
+    u->connected = 0;
+    u->proxy_protocol = pscf->proxy_protocol;
+
     if (u->state) {
         u->state->response_time = ngx_current_msec - u->state->response_time;
     }
@@ -740,8 +721,6 @@ ngx_stream_proxy_connect(ngx_stream_sess
     pc->read->handler = ngx_stream_proxy_connect_handler;
     pc->write->handler = ngx_stream_proxy_connect_handler;
 
-    pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
-
     ngx_add_timer(pc->write, pscf->connect_timeout);
 }
 
@@ -751,6 +730,7 @@ ngx_stream_proxy_init_upstream(ngx_strea
 {
     int                           tcp_nodelay;
     u_char                       *p;
+    ngx_chain_t                  *cl;
     ngx_connection_t             *c, *pc;
     ngx_log_handler_pt            handler;
     ngx_stream_upstream_t        *u;
@@ -782,21 +762,26 @@ ngx_stream_proxy_init_upstream(ngx_strea
         pc->tcp_nodelay = NGX_TCP_NODELAY_SET;
     }
 
-    if (u->proxy_protocol) {
-        if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) {
-            return;
-        }
-
-        u->proxy_protocol = 0;
-    }
-
     pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
 
 #if (NGX_STREAM_SSL)
-    if (pc->type == SOCK_STREAM && pscf->ssl && pc->ssl == NULL) {
-        ngx_stream_proxy_ssl_init_connection(s);
-        return;
+
+    if (pc->type == SOCK_STREAM && pscf->ssl) {
+
+        if (u->proxy_protocol) {
+            if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) {
+                return;
+            }
+
+            u->proxy_protocol = 0;
+        }
+
+        if (pc->ssl == NULL) {
+            ngx_stream_proxy_ssl_init_connection(s);
+            return;
+        }
     }
+
 #endif
 
     c = s->connection;
@@ -838,14 +823,66 @@ ngx_stream_proxy_init_upstream(ngx_strea
         u->upstream_buf.last = p;
     }
 
-    if (c->type == SOCK_DGRAM) {
-        s->received = c->buffer->last - c->buffer->pos;
-        u->downstream_buf = *c->buffer;
-
-        if (pscf->responses == 0) {
-            pc->read->ready = 0;
-            pc->read->eof = 1;
+    if (c->buffer && c->buffer->pos < c->buffer->last) {
+        ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
+                       "stream proxy add preread buffer: %uz",
+                       c->buffer->last - c->buffer->pos);
+
+        cl = ngx_chain_get_free_buf(c->pool, &u->free);
+        if (cl == NULL) {
+            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+            return;
+        }
+
+        *cl->buf = *c->buffer;
+
+        cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
+        cl->buf->flush = 1;
+        cl->buf->last_buf = (c->type == SOCK_DGRAM);
+
+        cl->next = u->upstream_out;
+        u->upstream_out = cl;
+    }
+
+    if (u->proxy_protocol) {
+        ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+                       "stream proxy add PROXY protocol header");
+
+        cl = ngx_chain_get_free_buf(c->pool, &u->free);
+        if (cl == NULL) {
+            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+            return;
         }
+
+        p = ngx_pnalloc(c->pool, NGX_PROXY_PROTOCOL_MAX_HEADER);
+        if (p == NULL) {
+            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+            return;
+        }
+
+        cl->buf->pos = p;
+
+        p = ngx_proxy_protocol_write(c, p, p + NGX_PROXY_PROTOCOL_MAX_HEADER);
+        if (p == NULL) {
+            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+            return;
+        }
+
+        cl->buf->last = p;
+        cl->buf->temporary = 1;
+        cl->buf->flush = 0;
+        cl->buf->last_buf = 0;
+        cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
+
+        cl->next = u->upstream_out;
+        u->upstream_out = cl;
+
+        u->proxy_protocol = 0;
+    }
+
+    if (c->type == SOCK_DGRAM && pscf->responses == 0) {
+        pc->read->ready = 0;
+        pc->read->eof = 1;
     }
 
     u->connected = 1;
@@ -861,6 +898,8 @@ ngx_stream_proxy_init_upstream(ngx_strea
 }
 
 
+#if (NGX_STREAM_SSL)
+
 static ngx_int_t
 ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s)
 {
@@ -931,8 +970,6 @@ ngx_stream_proxy_send_proxy_protocol(ngx
 }
 
 
-#if (NGX_STREAM_SSL)
-
 static char *
 ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd,
     void *conf)
@@ -1412,8 +1449,10 @@ ngx_stream_proxy_process(ngx_stream_sess
     size_t                        size, limit_rate;
     ssize_t                       n;
     ngx_buf_t                    *b;
+    ngx_int_t                     rc;
     ngx_uint_t                    flags;
     ngx_msec_t                    delay;
+    ngx_chain_t                  *cl, **ll, **out, **busy;
     ngx_connection_t             *c, *pc, *src, *dst;
     ngx_log_handler_pt            handler;
     ngx_stream_upstream_t        *u;
@@ -1447,6 +1486,8 @@ ngx_stream_proxy_process(ngx_stream_sess
         b = &u->upstream_buf;
         limit_rate = pscf->download_rate;
         received = &u->received;
+        out = &u->downstream_out;
+        busy = &u->downstream_busy;
 
     } else {
         src = c;
@@ -1454,24 +1495,18 @@ ngx_stream_proxy_process(ngx_stream_sess
         b = &u->downstream_buf;
         limit_rate = pscf->upload_rate;
         received = &s->received;
+        out = &u->upstream_out;
+        busy = &u->upstream_busy;
     }
 
     for ( ;; ) {
 
-        if (do_write) {
-
-            size = b->last - b->pos;
-
-            if (size && dst && dst->write->ready) {
-
-                n = dst->send(dst, b->pos, size);
-
-                if (n == NGX_AGAIN && dst->shared) {
-                    /* cannot wait on a shared socket */
-                    n = NGX_ERROR;
-                }
-
-                if (n == NGX_ERROR) {
+        if (do_write && dst) {
+
+            if (*out || *busy || dst->buffered) {
+                rc = ngx_stream_top_filter(s, *out, from_upstream);
+
+                if (rc == NGX_ERROR) {
                     if (c->type == SOCK_DGRAM && !from_upstream) {
                         ngx_stream_proxy_next_upstream(s);
                         return;
@@ -1481,13 +1516,12 @@ ngx_stream_proxy_process(ngx_stream_sess
                     return;
                 }
 
-                if (n > 0) {
-                    b->pos += n;
-
-                    if (b->pos == b->last) {
-                        b->pos = b->start;
-                        b->last = b->start;
-                    }
+                ngx_chain_update_chains(c->pool, &u->free, busy, out,
+                                      (ngx_buf_tag_t) &ngx_stream_proxy_module);
+
+                if (*busy == NULL) {
+                    b->pos = b->start;
+                    b->last = b->start;
                 }
             }
         }
@@ -1514,11 +1548,21 @@ ngx_stream_proxy_process(ngx_stream_sess
 
             n = src->recv(src, b->last, size);
 
-            if (n == NGX_AGAIN || n == 0) {
+            if (n == NGX_AGAIN) {
                 break;
             }
 
-            if (n > 0) {
+            if (n == NGX_ERROR) {
+                if (c->type == SOCK_DGRAM && u->received == 0) {
+                    ngx_stream_proxy_next_upstream(s);
+                    return;
+                }
+
+                src->read->eof = 1;
+                n = 0;
+            }
+
+            if (n >= 0) {
                 if (limit_rate) {
                     delay = (ngx_msec_t) (n * 1000 / limit_rate);
 
@@ -1541,27 +1585,37 @@ ngx_stream_proxy_process(ngx_stream_sess
                     src->read->eof = 1;
                 }
 
+                for (ll = out; *ll; ll = &(*ll)->next) { /* void */ }
+
+                cl = ngx_chain_get_free_buf(c->pool, &u->free);
+                if (cl == NULL) {
+                    ngx_stream_proxy_finalize(s,
+                                              NGX_STREAM_INTERNAL_SERVER_ERROR);
+                    return;
+                }
+
+                *ll = cl;
+
+                cl->buf->pos = b->last;
+                cl->buf->last = b->last + n;
+                cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
+
+                cl->buf->temporary = (n ? 1 : 0);
+                cl->buf->last_buf = src->read->eof;
+                cl->buf->flush = 1;
+
                 *received += n;
                 b->last += n;
                 do_write = 1;
 
                 continue;
             }
-
-            if (n == NGX_ERROR) {
-                if (c->type == SOCK_DGRAM && u->received == 0) {
-                    ngx_stream_proxy_next_upstream(s);
-                    return;
-                }
-
-                src->read->eof = 1;
-            }
         }
 
         break;
     }
 
-    if (src->read->eof && (b->pos == b->last || (dst && dst->read->eof))) {
+    if (src->read->eof && dst && (dst->read->eof || !dst->buffered)) {
         handler = c->log->handler;
         c->log->handler = NULL;
 
@@ -1614,6 +1668,14 @@ ngx_stream_proxy_next_upstream(ngx_strea
                    "stream proxy next upstream");
 
     u = s->upstream;
+    pc = u->peer.connection;
+
+    if (u->upstream_out || u->upstream_busy || (pc && pc->buffered)) {
+        ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
+                      "pending buffers on next upstream");
+        ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+        return;
+    }
 
     if (u->peer.sockaddr) {
         u->peer.free(&u->peer, u->peer.data, NGX_PEER_FAILED);
@@ -1632,8 +1694,6 @@ ngx_stream_proxy_next_upstream(ngx_strea
         return;
     }
 
-    pc = u->peer.connection;
-
     if (pc) {
         ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
                        "close proxy upstream connection: %d", pc->fd);
--- a/src/stream/ngx_stream_return_module.c
+++ b/src/stream/ngx_stream_return_module.c
@@ -11,12 +11,12 @@
 
 
 typedef struct {
-    ngx_stream_complex_value_t  text;
+    ngx_stream_complex_value_t   text;
 } ngx_stream_return_srv_conf_t;
 
 
 typedef struct {
-    ngx_buf_t                   buf;
+    ngx_chain_t                 *out;
 } ngx_stream_return_ctx_t;
 
 
@@ -72,6 +72,7 @@ static void
 ngx_stream_return_handler(ngx_stream_session_t *s)
 {
     ngx_str_t                      text;
+    ngx_buf_t                     *b;
     ngx_connection_t              *c;
     ngx_stream_return_ctx_t       *ctx;
     ngx_stream_return_srv_conf_t  *rscf;
@@ -103,8 +104,25 @@ ngx_stream_return_handler(ngx_stream_ses
 
     ngx_stream_set_ctx(s, ctx, ngx_stream_return_module);
 
-    ctx->buf.pos = text.data;
-    ctx->buf.last = text.data + text.len;
+    b = ngx_calloc_buf(c->pool);
+    if (b == NULL) {
+        ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+        return;
+    }
+
+    b->memory = 1;
+    b->pos = text.data;
+    b->last = text.data + text.len;
+    b->last_buf = 1;
+
+    ctx->out = ngx_alloc_chain_link(c->pool);
+    if (ctx->out == NULL) {
+        ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+        return;
+    }
+
+    ctx->out->buf = b;
+    ctx->out->next = NULL;
 
     c->write->handler = ngx_stream_return_write_handler;
 
@@ -115,8 +133,6 @@ ngx_stream_return_handler(ngx_stream_ses
 static void
 ngx_stream_return_write_handler(ngx_event_t *ev)
 {
-    ssize_t                   n;
-    ngx_buf_t                *b;
     ngx_connection_t         *c;
     ngx_stream_session_t     *s;
     ngx_stream_return_ctx_t  *ctx;
@@ -130,25 +146,20 @@ ngx_stream_return_write_handler(ngx_even
         return;
     }
 
-    if (ev->ready) {
-        ctx = ngx_stream_get_module_ctx(s, ngx_stream_return_module);
+    ctx = ngx_stream_get_module_ctx(s, ngx_stream_return_module);
 
-        b = &ctx->buf;
+    if (ngx_stream_top_filter(s, ctx->out, 1) == NGX_ERROR) {
+        ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+        return;
+    }
 
-        n = c->send(c, b->pos, b->last - b->pos);
-        if (n == NGX_ERROR) {
-            ngx_stream_finalize_session(s, NGX_STREAM_OK);
-            return;
-        }
+    ctx->out = NULL;
 
-        if (n > 0) {
-            b->pos += n;
-
-            if (b->pos == b->last) {
-                ngx_stream_finalize_session(s, NGX_STREAM_OK);
-                return;
-            }
-        }
+    if (!c->buffered) {
+        ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+                       "stream return done sending");
+        ngx_stream_finalize_session(s, NGX_STREAM_OK);
+        return;
     }
 
     if (ngx_handle_write_event(ev, 0) != NGX_OK) {
--- a/src/stream/ngx_stream_upstream.h
+++ b/src/stream/ngx_stream_upstream.h
@@ -106,14 +106,24 @@ typedef struct {
 
 typedef struct {
     ngx_peer_connection_t              peer;
+
     ngx_buf_t                          downstream_buf;
     ngx_buf_t                          upstream_buf;
+
+    ngx_chain_t                       *free;
+    ngx_chain_t                       *upstream_out;
+    ngx_chain_t                       *upstream_busy;
+    ngx_chain_t                       *downstream_out;
+    ngx_chain_t                       *downstream_busy;
+
     off_t                              received;
     time_t                             start_sec;
     ngx_uint_t                         responses;
+
 #if (NGX_STREAM_SSL)
     ngx_str_t                          ssl_name;
 #endif
+
     ngx_stream_upstream_resolved_t    *resolved;
     ngx_stream_upstream_state_t       *state;
     unsigned                           connected:1;
new file mode 100644
--- /dev/null
+++ b/src/stream/ngx_stream_write_filter_module.c
@@ -0,0 +1,273 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_stream.h>
+
+
+typedef struct {
+    ngx_chain_t  *from_upstream;
+    ngx_chain_t  *from_downstream;
+} ngx_stream_write_filter_ctx_t;
+
+
+static ngx_int_t ngx_stream_write_filter(ngx_stream_session_t *s,
+    ngx_chain_t *in, ngx_uint_t from_upstream);
+static ngx_int_t ngx_stream_write_filter_init(ngx_conf_t *cf);
+
+
+static ngx_stream_module_t  ngx_stream_write_filter_module_ctx = {
+    NULL,                                  /* preconfiguration */
+    ngx_stream_write_filter_init,          /* postconfiguration */
+
+    NULL,                                  /* create main configuration */
+    NULL,                                  /* init main configuration */
+
+    NULL,                                  /* create server configuration */
+    NULL                                   /* merge server configuration */
+};
+
+
+ngx_module_t  ngx_stream_write_filter_module = {
+    NGX_MODULE_V1,
+    &ngx_stream_write_filter_module_ctx,   /* module context */
+    NULL,                                  /* module directives */
+    NGX_STREAM_MODULE,                     /* module type */
+    NULL,                                  /* init master */
+    NULL,                                  /* init module */
+    NULL,                                  /* init process */
+    NULL,                                  /* init thread */
+    NULL,                                  /* exit thread */
+    NULL,                                  /* exit process */
+    NULL,                                  /* exit master */
+    NGX_MODULE_V1_PADDING
+};
+
+
+static ngx_int_t
+ngx_stream_write_filter(ngx_stream_session_t *s, ngx_chain_t *in,
+    ngx_uint_t from_upstream)
+{
+    off_t                           size;
+    ngx_uint_t                      last, flush, sync;
+    ngx_chain_t                    *cl, *ln, **ll, **out, *chain;
+    ngx_connection_t               *c;
+    ngx_stream_write_filter_ctx_t  *ctx;
+
+    ctx = ngx_stream_get_module_ctx(s, ngx_stream_write_filter_module);
+
+    if (ctx == NULL) {
+        ctx = ngx_pcalloc(s->connection->pool,
+                          sizeof(ngx_stream_write_filter_ctx_t));
+        if (ctx == NULL) {
+            return NGX_ERROR;
+        }
+
+        ngx_stream_set_ctx(s, ctx, ngx_stream_write_filter_module);
+    }
+
+    if (from_upstream) {
+        c = s->connection;
+        out = &ctx->from_upstream;
+
+    } else {
+        c = s->upstream->peer.connection;
+        out = &ctx->from_downstream;
+    }
+
+    if (c->error) {
+        return NGX_ERROR;
+    }
+
+    size = 0;
+    flush = 0;
+    sync = 0;
+    last = 0;
+    ll = out;
+
+    /* find the size, the flush point and the last link of the saved chain */
+
+    for (cl = *out; cl; cl = cl->next) {
+        ll = &cl->next;
+
+        ngx_log_debug7(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "write old buf t:%d f:%d %p, pos %p, size: %z "
+                       "file: %O, size: %O",
+                       cl->buf->temporary, cl->buf->in_file,
+                       cl->buf->start, cl->buf->pos,
+                       cl->buf->last - cl->buf->pos,
+                       cl->buf->file_pos,
+                       cl->buf->file_last - cl->buf->file_pos);
+
+#if 1
+        if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
+            ngx_log_error(NGX_LOG_ALERT, c->log, 0,
+                          "zero size buf in writer "
+                          "t:%d r:%d f:%d %p %p-%p %p %O-%O",
+                          cl->buf->temporary,
+                          cl->buf->recycled,
+                          cl->buf->in_file,
+                          cl->buf->start,
+                          cl->buf->pos,
+                          cl->buf->last,
+                          cl->buf->file,
+                          cl->buf->file_pos,
+                          cl->buf->file_last);
+
+            ngx_debug_point();
+            return NGX_ERROR;
+        }
+#endif
+
+        size += ngx_buf_size(cl->buf);
+
+        if (cl->buf->flush || cl->buf->recycled) {
+            flush = 1;
+        }
+
+        if (cl->buf->sync) {
+            sync = 1;
+        }
+
+        if (cl->buf->last_buf) {
+            last = 1;
+        }
+    }
+
+    /* add the new chain to the existent one */
+
+    for (ln = in; ln; ln = ln->next) {
+        cl = ngx_alloc_chain_link(c->pool);
+        if (cl == NULL) {
+            return NGX_ERROR;
+        }
+
+        cl->buf = ln->buf;
+        *ll = cl;
+        ll = &cl->next;
+
+        ngx_log_debug7(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "write new buf t:%d f:%d %p, pos %p, size: %z "
+                       "file: %O, size: %O",
+                       cl->buf->temporary, cl->buf->in_file,
+                       cl->buf->start, cl->buf->pos,
+                       cl->buf->last - cl->buf->pos,
+                       cl->buf->file_pos,
+                       cl->buf->file_last - cl->buf->file_pos);
+
+#if 1
+        if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
+            ngx_log_error(NGX_LOG_ALERT, c->log, 0,
+                          "zero size buf in writer "
+                          "t:%d r:%d f:%d %p %p-%p %p %O-%O",
+                          cl->buf->temporary,
+                          cl->buf->recycled,
+                          cl->buf->in_file,
+                          cl->buf->start,
+                          cl->buf->pos,
+                          cl->buf->last,
+                          cl->buf->file,
+                          cl->buf->file_pos,
+                          cl->buf->file_last);
+
+            ngx_debug_point();
+            return NGX_ERROR;
+        }
+#endif
+
+        size += ngx_buf_size(cl->buf);
+
+        if (cl->buf->flush || cl->buf->recycled) {
+            flush = 1;
+        }
+
+        if (cl->buf->sync) {
+            sync = 1;
+        }
+
+        if (cl->buf->last_buf) {
+            last = 1;
+        }
+    }
+
+    *ll = NULL;
+
+    ngx_log_debug3(NGX_LOG_DEBUG_STREAM, c->log, 0,
+                   "stream write filter: l:%ui f:%ui s:%O", last, flush, size);
+
+    if (size == 0
+        && !(c->buffered & NGX_LOWLEVEL_BUFFERED)
+        && !(last && c->need_last_buf))
+    {
+        if (last || flush || sync) {
+            for (cl = *out; cl; /* void */) {
+                ln = cl;
+                cl = cl->next;
+                ngx_free_chain(c->pool, ln);
+            }
+
+            *out = NULL;
+            c->buffered &= ~NGX_STREAM_WRITE_BUFFERED;
+
+            return NGX_OK;
+        }
+
+        ngx_log_error(NGX_LOG_ALERT, c->log, 0,
+                      "the stream output chain is empty");
+
+        ngx_debug_point();
+
+        return NGX_ERROR;
+    }
+
+    chain = c->send_chain(c, *out, 0);
+
+    ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
+                   "stream write filter %p", chain);
+
+    if (chain == NGX_CHAIN_ERROR) {
+        c->error = 1;
+        return NGX_ERROR;
+    }
+
+    for (cl = *out; cl && cl != chain; /* void */) {
+        ln = cl;
+        cl = cl->next;
+        ngx_free_chain(c->pool, ln);
+    }
+
+    *out = chain;
+
+    if (chain) {
+        if (c->shared) {
+            ngx_log_error(NGX_LOG_ALERT, c->log, 0,
+                          "shared connection is busy");
+            return NGX_ERROR;
+        }
+
+        c->buffered |= NGX_STREAM_WRITE_BUFFERED;
+        return NGX_AGAIN;
+    }
+
+    c->buffered &= ~NGX_STREAM_WRITE_BUFFERED;
+
+    if (c->buffered & NGX_LOWLEVEL_BUFFERED) {
+        return NGX_AGAIN;
+    }
+
+    return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_write_filter_init(ngx_conf_t *cf)
+{
+    ngx_stream_top_filter = ngx_stream_write_filter;
+
+    return NGX_OK;
+}