# HG changeset patch # User Roman Arutyunyan # Date 1473940546 -10800 # Node ID 56fc55e32f230cc3c74ba08c720770cf838ecae0 # Parent 4bce3edfac2c03c84685c78adc88a549251ae8e5 Stream: filters. diff --git a/auto/modules b/auto/modules --- a/auto/modules +++ b/auto/modules @@ -973,7 +973,8 @@ if [ $STREAM != NO ]; then ngx_stream_core_module \ ngx_stream_log_module \ ngx_stream_proxy_module \ - ngx_stream_upstream_module" + ngx_stream_upstream_module \ + ngx_stream_write_filter_module" ngx_module_incs="src/stream" ngx_module_deps="src/stream/ngx_stream.h \ src/stream/ngx_stream_variables.h \ @@ -988,7 +989,8 @@ if [ $STREAM != NO ]; then src/stream/ngx_stream_log_module.c \ src/stream/ngx_stream_proxy_module.c \ src/stream/ngx_stream_upstream.c \ - src/stream/ngx_stream_upstream_round_robin.c" + src/stream/ngx_stream_upstream_round_robin.c \ + src/stream/ngx_stream_write_filter_module.c" . auto/module diff --git a/auto/sources b/auto/sources --- a/auto/sources +++ b/auto/sources @@ -167,6 +167,7 @@ UNIX_SRCS="$CORE_SRCS $EVENT_SRCS \ src/os/unix/ngx_send.c \ src/os/unix/ngx_writev_chain.c \ src/os/unix/ngx_udp_send.c \ + src/os/unix/ngx_udp_sendmsg_chain.c \ src/os/unix/ngx_channel.c \ src/os/unix/ngx_shmem.c \ src/os/unix/ngx_process.c \ diff --git a/src/event/modules/ngx_iocp_module.c b/src/event/modules/ngx_iocp_module.c --- a/src/event/modules/ngx_iocp_module.c +++ b/src/event/modules/ngx_iocp_module.c @@ -93,6 +93,8 @@ ngx_os_io_t ngx_iocp_io = { NULL, ngx_udp_overlapped_wsarecv, NULL, + NULL, + NULL, ngx_overlapped_wsasend_chain, 0 }; diff --git a/src/event/ngx_event.h b/src/event/ngx_event.h --- a/src/event/ngx_event.h +++ b/src/event/ngx_event.h @@ -430,6 +430,7 @@ extern ngx_os_io_t ngx_io; #define ngx_send ngx_io.send #define ngx_send_chain ngx_io.send_chain #define ngx_udp_send ngx_io.udp_send +#define ngx_udp_send_chain ngx_io.udp_send_chain #define NGX_EVENT_MODULE 0x544E5645 /* "EVNT" */ diff --git a/src/event/ngx_event_accept.c b/src/event/ngx_event_accept.c --- a/src/event/ngx_event_accept.c +++ b/src/event/ngx_event_accept.c @@ -467,6 +467,7 @@ ngx_event_recvmsg(ngx_event_t *ev) *log = ls->log; c->send = ngx_udp_send; + c->send_chain = ngx_udp_send_chain; c->log = log; c->pool->log = log; diff --git a/src/event/ngx_event_connect.c b/src/event/ngx_event_connect.c --- a/src/event/ngx_event_connect.c +++ b/src/event/ngx_event_connect.c @@ -166,6 +166,7 @@ ngx_event_connect_peer(ngx_peer_connecti } else { /* type == SOCK_DGRAM */ c->recv = ngx_udp_recv; c->send = ngx_send; + c->send_chain = ngx_udp_send_chain; } c->log_error = pc->log_error; diff --git a/src/os/unix/ngx_darwin_init.c b/src/os/unix/ngx_darwin_init.c --- a/src/os/unix/ngx_darwin_init.c +++ b/src/os/unix/ngx_darwin_init.c @@ -24,6 +24,7 @@ static ngx_os_io_t ngx_darwin_io = { ngx_udp_unix_recv, ngx_unix_send, ngx_udp_unix_send, + ngx_udp_unix_sendmsg_chain, #if (NGX_HAVE_SENDFILE) ngx_darwin_sendfile_chain, NGX_IO_SENDFILE diff --git a/src/os/unix/ngx_freebsd_init.c b/src/os/unix/ngx_freebsd_init.c --- a/src/os/unix/ngx_freebsd_init.c +++ b/src/os/unix/ngx_freebsd_init.c @@ -33,6 +33,7 @@ static ngx_os_io_t ngx_freebsd_io = { ngx_udp_unix_recv, ngx_unix_send, ngx_udp_unix_send, + ngx_udp_unix_sendmsg_chain, #if (NGX_HAVE_SENDFILE) ngx_freebsd_sendfile_chain, NGX_IO_SENDFILE diff --git a/src/os/unix/ngx_linux_init.c b/src/os/unix/ngx_linux_init.c --- a/src/os/unix/ngx_linux_init.c +++ b/src/os/unix/ngx_linux_init.c @@ -19,6 +19,7 @@ static ngx_os_io_t ngx_linux_io = { ngx_udp_unix_recv, ngx_unix_send, ngx_udp_unix_send, + ngx_udp_unix_sendmsg_chain, #if (NGX_HAVE_SENDFILE) ngx_linux_sendfile_chain, NGX_IO_SENDFILE diff --git a/src/os/unix/ngx_os.h b/src/os/unix/ngx_os.h --- a/src/os/unix/ngx_os.h +++ b/src/os/unix/ngx_os.h @@ -29,6 +29,7 @@ typedef struct { ngx_recv_pt udp_recv; ngx_send_pt send; ngx_send_pt udp_send; + ngx_send_chain_pt udp_send_chain; ngx_send_chain_pt send_chain; ngx_uint_t flags; } ngx_os_io_t; @@ -49,6 +50,8 @@ ssize_t ngx_unix_send(ngx_connection_t * ngx_chain_t *ngx_writev_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit); ssize_t ngx_udp_unix_send(ngx_connection_t *c, u_char *buf, size_t size); +ngx_chain_t *ngx_udp_unix_sendmsg_chain(ngx_connection_t *c, ngx_chain_t *in, + off_t limit); #if (IOV_MAX > 64) diff --git a/src/os/unix/ngx_posix_init.c b/src/os/unix/ngx_posix_init.c --- a/src/os/unix/ngx_posix_init.c +++ b/src/os/unix/ngx_posix_init.c @@ -25,6 +25,7 @@ ngx_os_io_t ngx_os_io = { ngx_udp_unix_recv, ngx_unix_send, ngx_udp_unix_send, + ngx_udp_unix_sendmsg_chain, ngx_writev_chain, 0 }; diff --git a/src/os/unix/ngx_solaris_init.c b/src/os/unix/ngx_solaris_init.c --- a/src/os/unix/ngx_solaris_init.c +++ b/src/os/unix/ngx_solaris_init.c @@ -20,6 +20,7 @@ static ngx_os_io_t ngx_solaris_io = { ngx_udp_unix_recv, ngx_unix_send, ngx_udp_unix_send, + ngx_udp_unix_sendmsg_chain, #if (NGX_HAVE_SENDFILE) ngx_solaris_sendfilev_chain, NGX_IO_SENDFILE diff --git a/src/os/unix/ngx_udp_sendmsg_chain.c b/src/os/unix/ngx_udp_sendmsg_chain.c new file mode 100644 --- /dev/null +++ b/src/os/unix/ngx_udp_sendmsg_chain.c @@ -0,0 +1,245 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Nginx, Inc. + */ + + +#include +#include +#include + + +static ngx_chain_t *ngx_udp_output_chain_to_iovec(ngx_iovec_t *vec, + ngx_chain_t *in, ngx_log_t *log); +static ssize_t ngx_sendmsg(ngx_connection_t *c, ngx_iovec_t *vec); + + +ngx_chain_t * +ngx_udp_unix_sendmsg_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit) +{ + ssize_t n; + off_t send; + ngx_chain_t *cl; + ngx_event_t *wev; + ngx_iovec_t vec; + struct iovec iovs[NGX_IOVS_PREALLOCATE]; + + wev = c->write; + + if (!wev->ready) { + return in; + } + +#if (NGX_HAVE_KQUEUE) + + if ((ngx_event_flags & NGX_USE_KQUEUE_EVENT) && wev->pending_eof) { + (void) ngx_connection_error(c, wev->kq_errno, + "kevent() reported about an closed connection"); + wev->error = 1; + return NGX_CHAIN_ERROR; + } + +#endif + + /* the maximum limit size is the maximum size_t value - the page size */ + + if (limit == 0 || limit > (off_t) (NGX_MAX_SIZE_T_VALUE - ngx_pagesize)) { + limit = NGX_MAX_SIZE_T_VALUE - ngx_pagesize; + } + + send = 0; + + vec.iovs = iovs; + vec.nalloc = NGX_IOVS_PREALLOCATE; + + for ( ;; ) { + + /* create the iovec and coalesce the neighbouring bufs */ + + cl = ngx_udp_output_chain_to_iovec(&vec, in, c->log); + + if (cl == NGX_CHAIN_ERROR) { + return NGX_CHAIN_ERROR; + } + + if (cl && cl->buf->in_file) { + ngx_log_error(NGX_LOG_ALERT, c->log, 0, + "file buf in sendmsg " + "t:%d r:%d f:%d %p %p-%p %p %O-%O", + cl->buf->temporary, + cl->buf->recycled, + cl->buf->in_file, + cl->buf->start, + cl->buf->pos, + cl->buf->last, + cl->buf->file, + cl->buf->file_pos, + cl->buf->file_last); + + ngx_debug_point(); + + return NGX_CHAIN_ERROR; + } + + if (cl == in) { + return in; + } + + send += vec.size; + + n = ngx_sendmsg(c, &vec); + + if (n == NGX_ERROR) { + return NGX_CHAIN_ERROR; + } + + if (n == NGX_AGAIN) { + wev->ready = 0; + return in; + } + + c->sent += n; + + in = ngx_chain_update_sent(in, n); + + if (send >= limit || in == NULL) { + return in; + } + } +} + + +static ngx_chain_t * +ngx_udp_output_chain_to_iovec(ngx_iovec_t *vec, ngx_chain_t *in, ngx_log_t *log) +{ + size_t total, size; + u_char *prev; + ngx_uint_t n, flush; + ngx_chain_t *cl; + struct iovec *iov; + + cl = in; + iov = NULL; + prev = NULL; + total = 0; + n = 0; + flush = 0; + + for ( /* void */ ; in && !flush; in = in->next) { + + if (in->buf->flush || in->buf->last_buf) { + flush = 1; + } + + if (ngx_buf_special(in->buf)) { + continue; + } + + if (in->buf->in_file) { + break; + } + + if (!ngx_buf_in_memory(in->buf)) { + ngx_log_error(NGX_LOG_ALERT, log, 0, + "bad buf in output chain " + "t:%d r:%d f:%d %p %p-%p %p %O-%O", + in->buf->temporary, + in->buf->recycled, + in->buf->in_file, + in->buf->start, + in->buf->pos, + in->buf->last, + in->buf->file, + in->buf->file_pos, + in->buf->file_last); + + ngx_debug_point(); + + return NGX_CHAIN_ERROR; + } + + size = in->buf->last - in->buf->pos; + + if (prev == in->buf->pos) { + iov->iov_len += size; + + } else { + if (n == vec->nalloc) { + ngx_log_error(NGX_LOG_ALERT, log, 0, + "too many parts in a datagram"); + return NGX_CHAIN_ERROR; + } + + iov = &vec->iovs[n++]; + + iov->iov_base = (void *) in->buf->pos; + iov->iov_len = size; + } + + prev = in->buf->pos + size; + total += size; + } + + if (!flush) { +#if (NGX_SUPPRESS_WARN) + vec->size = 0; + vec->count = 0; +#endif + return cl; + } + + vec->count = n; + vec->size = total; + + return in; +} + + +static ssize_t +ngx_sendmsg(ngx_connection_t *c, ngx_iovec_t *vec) +{ + ssize_t n; + ngx_err_t err; + struct msghdr msg; + + ngx_memzero(&msg, sizeof(struct msghdr)); + + if (c->socklen) { + msg.msg_name = c->sockaddr; + msg.msg_namelen = c->socklen; + } + + msg.msg_iov = vec->iovs; + msg.msg_iovlen = vec->count; + +eintr: + + n = sendmsg(c->fd, &msg, 0); + + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, + "sendmsg: %z of %uz", n, vec->size); + + if (n == -1) { + err = ngx_errno; + + switch (err) { + case NGX_EAGAIN: + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, err, + "sendmsg() not ready"); + return NGX_AGAIN; + + case NGX_EINTR: + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, err, + "sendmsg() was interrupted"); + goto eintr; + + default: + c->write->error = 1; + ngx_connection_error(c, err, "sendmsg() failed"); + return NGX_ERROR; + } + } + + return n; +} diff --git a/src/os/win32/ngx_os.h b/src/os/win32/ngx_os.h --- a/src/os/win32/ngx_os.h +++ b/src/os/win32/ngx_os.h @@ -28,6 +28,8 @@ typedef struct { ngx_recv_chain_pt recv_chain; ngx_recv_pt udp_recv; ngx_send_pt send; + ngx_send_pt udp_send; + ngx_send_chain_pt udp_send_chain; ngx_send_chain_pt send_chain; ngx_uint_t flags; } ngx_os_io_t; diff --git a/src/os/win32/ngx_win32_init.c b/src/os/win32/ngx_win32_init.c --- a/src/os/win32/ngx_win32_init.c +++ b/src/os/win32/ngx_win32_init.c @@ -25,6 +25,8 @@ ngx_os_io_t ngx_os_io = { ngx_wsarecv_chain, ngx_udp_wsarecv, ngx_wsasend, + NULL, + NULL, ngx_wsasend_chain, 0 }; diff --git a/src/stream/ngx_stream.c b/src/stream/ngx_stream.c --- a/src/stream/ngx_stream.c +++ b/src/stream/ngx_stream.c @@ -27,6 +27,9 @@ static ngx_int_t ngx_stream_cmp_conf_add ngx_uint_t ngx_stream_max_module; +ngx_stream_filter_pt ngx_stream_top_filter; + + static ngx_command_t ngx_stream_commands[] = { { ngx_string("stream"), diff --git a/src/stream/ngx_stream.h b/src/stream/ngx_stream.h --- a/src/stream/ngx_stream.h +++ b/src/stream/ngx_stream.h @@ -243,6 +243,9 @@ typedef struct { NULL) +#define NGX_STREAM_WRITE_BUFFERED 0x10 + + void ngx_stream_init_connection(ngx_connection_t *c); void ngx_stream_finalize_session(ngx_stream_session_t *s, ngx_uint_t rc); @@ -252,4 +255,11 @@ extern ngx_uint_t ngx_stream_max_modu extern ngx_module_t ngx_stream_core_module; +typedef ngx_int_t (*ngx_stream_filter_pt)(ngx_stream_session_t *s, + ngx_chain_t *chain, ngx_uint_t from_upstream); + + +extern ngx_stream_filter_pt ngx_stream_top_filter; + + #endif /* _NGX_STREAM_H_INCLUDED_ */ diff --git a/src/stream/ngx_stream_handler.c b/src/stream/ngx_stream_handler.c --- a/src/stream/ngx_stream_handler.c +++ b/src/stream/ngx_stream_handler.c @@ -134,6 +134,10 @@ ngx_stream_init_connection(ngx_connectio s->ssl = addr_conf->ssl; #endif + if (c->buffer) { + s->received += c->buffer->last - c->buffer->pos; + } + s->connection = c; c->data = s; diff --git a/src/stream/ngx_stream_proxy_module.c b/src/stream/ngx_stream_proxy_module.c --- a/src/stream/ngx_stream_proxy_module.c +++ b/src/stream/ngx_stream_proxy_module.c @@ -84,10 +84,10 @@ static char *ngx_stream_proxy_pass(ngx_c void *conf); static char *ngx_stream_proxy_bind(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); -static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s); #if (NGX_STREAM_SSL) +static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s); static char *ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); static void ngx_stream_proxy_ssl_init_connection(ngx_stream_session_t *s); @@ -385,8 +385,6 @@ ngx_stream_proxy_handler(ngx_stream_sess } u->peer.type = c->type; - - u->proxy_protocol = pscf->proxy_protocol; u->start_sec = ngx_time(); c->write->handler = ngx_stream_proxy_downstream_handler; @@ -411,28 +409,6 @@ ngx_stream_proxy_handler(ngx_stream_sess u->downstream_buf.pos = p; u->downstream_buf.last = p; - if (u->proxy_protocol -#if (NGX_STREAM_SSL) - && pscf->ssl == NULL -#endif - && pscf->buffer_size >= NGX_PROXY_PROTOCOL_MAX_HEADER) - { - /* optimization for a typical case */ - - ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, - "stream proxy send PROXY protocol header"); - - p = ngx_proxy_protocol_write(c, u->downstream_buf.last, - u->downstream_buf.end); - if (p == NULL) { - ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); - return; - } - - u->downstream_buf.last = p; - u->proxy_protocol = 0; - } - if (c->read->ready) { ngx_post_event(c->read, &ngx_posted_events); } @@ -682,8 +658,13 @@ ngx_stream_proxy_connect(ngx_stream_sess c->log->action = "connecting to upstream"; + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); + u = s->upstream; + u->connected = 0; + u->proxy_protocol = pscf->proxy_protocol; + if (u->state) { u->state->response_time = ngx_current_msec - u->state->response_time; } @@ -740,8 +721,6 @@ ngx_stream_proxy_connect(ngx_stream_sess pc->read->handler = ngx_stream_proxy_connect_handler; pc->write->handler = ngx_stream_proxy_connect_handler; - pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); - ngx_add_timer(pc->write, pscf->connect_timeout); } @@ -751,6 +730,7 @@ ngx_stream_proxy_init_upstream(ngx_strea { int tcp_nodelay; u_char *p; + ngx_chain_t *cl; ngx_connection_t *c, *pc; ngx_log_handler_pt handler; ngx_stream_upstream_t *u; @@ -782,21 +762,26 @@ ngx_stream_proxy_init_upstream(ngx_strea pc->tcp_nodelay = NGX_TCP_NODELAY_SET; } - if (u->proxy_protocol) { - if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) { - return; - } - - u->proxy_protocol = 0; - } - pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); #if (NGX_STREAM_SSL) - if (pc->type == SOCK_STREAM && pscf->ssl && pc->ssl == NULL) { - ngx_stream_proxy_ssl_init_connection(s); - return; + + if (pc->type == SOCK_STREAM && pscf->ssl) { + + if (u->proxy_protocol) { + if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) { + return; + } + + u->proxy_protocol = 0; + } + + if (pc->ssl == NULL) { + ngx_stream_proxy_ssl_init_connection(s); + return; + } } + #endif c = s->connection; @@ -838,14 +823,66 @@ ngx_stream_proxy_init_upstream(ngx_strea u->upstream_buf.last = p; } - if (c->type == SOCK_DGRAM) { - s->received = c->buffer->last - c->buffer->pos; - u->downstream_buf = *c->buffer; - - if (pscf->responses == 0) { - pc->read->ready = 0; - pc->read->eof = 1; + if (c->buffer && c->buffer->pos < c->buffer->last) { + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, + "stream proxy add preread buffer: %uz", + c->buffer->last - c->buffer->pos); + + cl = ngx_chain_get_free_buf(c->pool, &u->free); + if (cl == NULL) { + ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); + return; + } + + *cl->buf = *c->buffer; + + cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module; + cl->buf->flush = 1; + cl->buf->last_buf = (c->type == SOCK_DGRAM); + + cl->next = u->upstream_out; + u->upstream_out = cl; + } + + if (u->proxy_protocol) { + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, + "stream proxy add PROXY protocol header"); + + cl = ngx_chain_get_free_buf(c->pool, &u->free); + if (cl == NULL) { + ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); + return; } + + p = ngx_pnalloc(c->pool, NGX_PROXY_PROTOCOL_MAX_HEADER); + if (p == NULL) { + ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); + return; + } + + cl->buf->pos = p; + + p = ngx_proxy_protocol_write(c, p, p + NGX_PROXY_PROTOCOL_MAX_HEADER); + if (p == NULL) { + ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); + return; + } + + cl->buf->last = p; + cl->buf->temporary = 1; + cl->buf->flush = 0; + cl->buf->last_buf = 0; + cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module; + + cl->next = u->upstream_out; + u->upstream_out = cl; + + u->proxy_protocol = 0; + } + + if (c->type == SOCK_DGRAM && pscf->responses == 0) { + pc->read->ready = 0; + pc->read->eof = 1; } u->connected = 1; @@ -861,6 +898,8 @@ ngx_stream_proxy_init_upstream(ngx_strea } +#if (NGX_STREAM_SSL) + static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s) { @@ -931,8 +970,6 @@ ngx_stream_proxy_send_proxy_protocol(ngx } -#if (NGX_STREAM_SSL) - static char * ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) @@ -1412,8 +1449,10 @@ ngx_stream_proxy_process(ngx_stream_sess size_t size, limit_rate; ssize_t n; ngx_buf_t *b; + ngx_int_t rc; ngx_uint_t flags; ngx_msec_t delay; + ngx_chain_t *cl, **ll, **out, **busy; ngx_connection_t *c, *pc, *src, *dst; ngx_log_handler_pt handler; ngx_stream_upstream_t *u; @@ -1447,6 +1486,8 @@ ngx_stream_proxy_process(ngx_stream_sess b = &u->upstream_buf; limit_rate = pscf->download_rate; received = &u->received; + out = &u->downstream_out; + busy = &u->downstream_busy; } else { src = c; @@ -1454,24 +1495,18 @@ ngx_stream_proxy_process(ngx_stream_sess b = &u->downstream_buf; limit_rate = pscf->upload_rate; received = &s->received; + out = &u->upstream_out; + busy = &u->upstream_busy; } for ( ;; ) { - if (do_write) { - - size = b->last - b->pos; - - if (size && dst && dst->write->ready) { - - n = dst->send(dst, b->pos, size); - - if (n == NGX_AGAIN && dst->shared) { - /* cannot wait on a shared socket */ - n = NGX_ERROR; - } - - if (n == NGX_ERROR) { + if (do_write && dst) { + + if (*out || *busy || dst->buffered) { + rc = ngx_stream_top_filter(s, *out, from_upstream); + + if (rc == NGX_ERROR) { if (c->type == SOCK_DGRAM && !from_upstream) { ngx_stream_proxy_next_upstream(s); return; @@ -1481,13 +1516,12 @@ ngx_stream_proxy_process(ngx_stream_sess return; } - if (n > 0) { - b->pos += n; - - if (b->pos == b->last) { - b->pos = b->start; - b->last = b->start; - } + ngx_chain_update_chains(c->pool, &u->free, busy, out, + (ngx_buf_tag_t) &ngx_stream_proxy_module); + + if (*busy == NULL) { + b->pos = b->start; + b->last = b->start; } } } @@ -1514,11 +1548,21 @@ ngx_stream_proxy_process(ngx_stream_sess n = src->recv(src, b->last, size); - if (n == NGX_AGAIN || n == 0) { + if (n == NGX_AGAIN) { break; } - if (n > 0) { + if (n == NGX_ERROR) { + if (c->type == SOCK_DGRAM && u->received == 0) { + ngx_stream_proxy_next_upstream(s); + return; + } + + src->read->eof = 1; + n = 0; + } + + if (n >= 0) { if (limit_rate) { delay = (ngx_msec_t) (n * 1000 / limit_rate); @@ -1541,27 +1585,37 @@ ngx_stream_proxy_process(ngx_stream_sess src->read->eof = 1; } + for (ll = out; *ll; ll = &(*ll)->next) { /* void */ } + + cl = ngx_chain_get_free_buf(c->pool, &u->free); + if (cl == NULL) { + ngx_stream_proxy_finalize(s, + NGX_STREAM_INTERNAL_SERVER_ERROR); + return; + } + + *ll = cl; + + cl->buf->pos = b->last; + cl->buf->last = b->last + n; + cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module; + + cl->buf->temporary = (n ? 1 : 0); + cl->buf->last_buf = src->read->eof; + cl->buf->flush = 1; + *received += n; b->last += n; do_write = 1; continue; } - - if (n == NGX_ERROR) { - if (c->type == SOCK_DGRAM && u->received == 0) { - ngx_stream_proxy_next_upstream(s); - return; - } - - src->read->eof = 1; - } } break; } - if (src->read->eof && (b->pos == b->last || (dst && dst->read->eof))) { + if (src->read->eof && dst && (dst->read->eof || !dst->buffered)) { handler = c->log->handler; c->log->handler = NULL; @@ -1614,6 +1668,14 @@ ngx_stream_proxy_next_upstream(ngx_strea "stream proxy next upstream"); u = s->upstream; + pc = u->peer.connection; + + if (u->upstream_out || u->upstream_busy || (pc && pc->buffered)) { + ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, + "pending buffers on next upstream"); + ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); + return; + } if (u->peer.sockaddr) { u->peer.free(&u->peer, u->peer.data, NGX_PEER_FAILED); @@ -1632,8 +1694,6 @@ ngx_stream_proxy_next_upstream(ngx_strea return; } - pc = u->peer.connection; - if (pc) { ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, "close proxy upstream connection: %d", pc->fd); diff --git a/src/stream/ngx_stream_return_module.c b/src/stream/ngx_stream_return_module.c --- a/src/stream/ngx_stream_return_module.c +++ b/src/stream/ngx_stream_return_module.c @@ -11,12 +11,12 @@ typedef struct { - ngx_stream_complex_value_t text; + ngx_stream_complex_value_t text; } ngx_stream_return_srv_conf_t; typedef struct { - ngx_buf_t buf; + ngx_chain_t *out; } ngx_stream_return_ctx_t; @@ -72,6 +72,7 @@ static void ngx_stream_return_handler(ngx_stream_session_t *s) { ngx_str_t text; + ngx_buf_t *b; ngx_connection_t *c; ngx_stream_return_ctx_t *ctx; ngx_stream_return_srv_conf_t *rscf; @@ -103,8 +104,25 @@ ngx_stream_return_handler(ngx_stream_ses ngx_stream_set_ctx(s, ctx, ngx_stream_return_module); - ctx->buf.pos = text.data; - ctx->buf.last = text.data + text.len; + b = ngx_calloc_buf(c->pool); + if (b == NULL) { + ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR); + return; + } + + b->memory = 1; + b->pos = text.data; + b->last = text.data + text.len; + b->last_buf = 1; + + ctx->out = ngx_alloc_chain_link(c->pool); + if (ctx->out == NULL) { + ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR); + return; + } + + ctx->out->buf = b; + ctx->out->next = NULL; c->write->handler = ngx_stream_return_write_handler; @@ -115,8 +133,6 @@ ngx_stream_return_handler(ngx_stream_ses static void ngx_stream_return_write_handler(ngx_event_t *ev) { - ssize_t n; - ngx_buf_t *b; ngx_connection_t *c; ngx_stream_session_t *s; ngx_stream_return_ctx_t *ctx; @@ -130,25 +146,20 @@ ngx_stream_return_write_handler(ngx_even return; } - if (ev->ready) { - ctx = ngx_stream_get_module_ctx(s, ngx_stream_return_module); + ctx = ngx_stream_get_module_ctx(s, ngx_stream_return_module); - b = &ctx->buf; + if (ngx_stream_top_filter(s, ctx->out, 1) == NGX_ERROR) { + ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR); + return; + } - n = c->send(c, b->pos, b->last - b->pos); - if (n == NGX_ERROR) { - ngx_stream_finalize_session(s, NGX_STREAM_OK); - return; - } + ctx->out = NULL; - if (n > 0) { - b->pos += n; - - if (b->pos == b->last) { - ngx_stream_finalize_session(s, NGX_STREAM_OK); - return; - } - } + if (!c->buffered) { + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, + "stream return done sending"); + ngx_stream_finalize_session(s, NGX_STREAM_OK); + return; } if (ngx_handle_write_event(ev, 0) != NGX_OK) { diff --git a/src/stream/ngx_stream_upstream.h b/src/stream/ngx_stream_upstream.h --- a/src/stream/ngx_stream_upstream.h +++ b/src/stream/ngx_stream_upstream.h @@ -106,14 +106,24 @@ typedef struct { typedef struct { ngx_peer_connection_t peer; + ngx_buf_t downstream_buf; ngx_buf_t upstream_buf; + + ngx_chain_t *free; + ngx_chain_t *upstream_out; + ngx_chain_t *upstream_busy; + ngx_chain_t *downstream_out; + ngx_chain_t *downstream_busy; + off_t received; time_t start_sec; ngx_uint_t responses; + #if (NGX_STREAM_SSL) ngx_str_t ssl_name; #endif + ngx_stream_upstream_resolved_t *resolved; ngx_stream_upstream_state_t *state; unsigned connected:1; diff --git a/src/stream/ngx_stream_write_filter_module.c b/src/stream/ngx_stream_write_filter_module.c new file mode 100644 --- /dev/null +++ b/src/stream/ngx_stream_write_filter_module.c @@ -0,0 +1,273 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Nginx, Inc. + */ + + +#include +#include +#include + + +typedef struct { + ngx_chain_t *from_upstream; + ngx_chain_t *from_downstream; +} ngx_stream_write_filter_ctx_t; + + +static ngx_int_t ngx_stream_write_filter(ngx_stream_session_t *s, + ngx_chain_t *in, ngx_uint_t from_upstream); +static ngx_int_t ngx_stream_write_filter_init(ngx_conf_t *cf); + + +static ngx_stream_module_t ngx_stream_write_filter_module_ctx = { + NULL, /* preconfiguration */ + ngx_stream_write_filter_init, /* postconfiguration */ + + NULL, /* create main configuration */ + NULL, /* init main configuration */ + + NULL, /* create server configuration */ + NULL /* merge server configuration */ +}; + + +ngx_module_t ngx_stream_write_filter_module = { + NGX_MODULE_V1, + &ngx_stream_write_filter_module_ctx, /* module context */ + NULL, /* module directives */ + NGX_STREAM_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static ngx_int_t +ngx_stream_write_filter(ngx_stream_session_t *s, ngx_chain_t *in, + ngx_uint_t from_upstream) +{ + off_t size; + ngx_uint_t last, flush, sync; + ngx_chain_t *cl, *ln, **ll, **out, *chain; + ngx_connection_t *c; + ngx_stream_write_filter_ctx_t *ctx; + + ctx = ngx_stream_get_module_ctx(s, ngx_stream_write_filter_module); + + if (ctx == NULL) { + ctx = ngx_pcalloc(s->connection->pool, + sizeof(ngx_stream_write_filter_ctx_t)); + if (ctx == NULL) { + return NGX_ERROR; + } + + ngx_stream_set_ctx(s, ctx, ngx_stream_write_filter_module); + } + + if (from_upstream) { + c = s->connection; + out = &ctx->from_upstream; + + } else { + c = s->upstream->peer.connection; + out = &ctx->from_downstream; + } + + if (c->error) { + return NGX_ERROR; + } + + size = 0; + flush = 0; + sync = 0; + last = 0; + ll = out; + + /* find the size, the flush point and the last link of the saved chain */ + + for (cl = *out; cl; cl = cl->next) { + ll = &cl->next; + + ngx_log_debug7(NGX_LOG_DEBUG_EVENT, c->log, 0, + "write old buf t:%d f:%d %p, pos %p, size: %z " + "file: %O, size: %O", + cl->buf->temporary, cl->buf->in_file, + cl->buf->start, cl->buf->pos, + cl->buf->last - cl->buf->pos, + cl->buf->file_pos, + cl->buf->file_last - cl->buf->file_pos); + +#if 1 + if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) { + ngx_log_error(NGX_LOG_ALERT, c->log, 0, + "zero size buf in writer " + "t:%d r:%d f:%d %p %p-%p %p %O-%O", + cl->buf->temporary, + cl->buf->recycled, + cl->buf->in_file, + cl->buf->start, + cl->buf->pos, + cl->buf->last, + cl->buf->file, + cl->buf->file_pos, + cl->buf->file_last); + + ngx_debug_point(); + return NGX_ERROR; + } +#endif + + size += ngx_buf_size(cl->buf); + + if (cl->buf->flush || cl->buf->recycled) { + flush = 1; + } + + if (cl->buf->sync) { + sync = 1; + } + + if (cl->buf->last_buf) { + last = 1; + } + } + + /* add the new chain to the existent one */ + + for (ln = in; ln; ln = ln->next) { + cl = ngx_alloc_chain_link(c->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + cl->buf = ln->buf; + *ll = cl; + ll = &cl->next; + + ngx_log_debug7(NGX_LOG_DEBUG_EVENT, c->log, 0, + "write new buf t:%d f:%d %p, pos %p, size: %z " + "file: %O, size: %O", + cl->buf->temporary, cl->buf->in_file, + cl->buf->start, cl->buf->pos, + cl->buf->last - cl->buf->pos, + cl->buf->file_pos, + cl->buf->file_last - cl->buf->file_pos); + +#if 1 + if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) { + ngx_log_error(NGX_LOG_ALERT, c->log, 0, + "zero size buf in writer " + "t:%d r:%d f:%d %p %p-%p %p %O-%O", + cl->buf->temporary, + cl->buf->recycled, + cl->buf->in_file, + cl->buf->start, + cl->buf->pos, + cl->buf->last, + cl->buf->file, + cl->buf->file_pos, + cl->buf->file_last); + + ngx_debug_point(); + return NGX_ERROR; + } +#endif + + size += ngx_buf_size(cl->buf); + + if (cl->buf->flush || cl->buf->recycled) { + flush = 1; + } + + if (cl->buf->sync) { + sync = 1; + } + + if (cl->buf->last_buf) { + last = 1; + } + } + + *ll = NULL; + + ngx_log_debug3(NGX_LOG_DEBUG_STREAM, c->log, 0, + "stream write filter: l:%ui f:%ui s:%O", last, flush, size); + + if (size == 0 + && !(c->buffered & NGX_LOWLEVEL_BUFFERED) + && !(last && c->need_last_buf)) + { + if (last || flush || sync) { + for (cl = *out; cl; /* void */) { + ln = cl; + cl = cl->next; + ngx_free_chain(c->pool, ln); + } + + *out = NULL; + c->buffered &= ~NGX_STREAM_WRITE_BUFFERED; + + return NGX_OK; + } + + ngx_log_error(NGX_LOG_ALERT, c->log, 0, + "the stream output chain is empty"); + + ngx_debug_point(); + + return NGX_ERROR; + } + + chain = c->send_chain(c, *out, 0); + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, + "stream write filter %p", chain); + + if (chain == NGX_CHAIN_ERROR) { + c->error = 1; + return NGX_ERROR; + } + + for (cl = *out; cl && cl != chain; /* void */) { + ln = cl; + cl = cl->next; + ngx_free_chain(c->pool, ln); + } + + *out = chain; + + if (chain) { + if (c->shared) { + ngx_log_error(NGX_LOG_ALERT, c->log, 0, + "shared connection is busy"); + return NGX_ERROR; + } + + c->buffered |= NGX_STREAM_WRITE_BUFFERED; + return NGX_AGAIN; + } + + c->buffered &= ~NGX_STREAM_WRITE_BUFFERED; + + if (c->buffered & NGX_LOWLEVEL_BUFFERED) { + return NGX_AGAIN; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_stream_write_filter_init(ngx_conf_t *cf) +{ + ngx_stream_top_filter = ngx_stream_write_filter; + + return NGX_OK; +}