# HG changeset patch # User Maxim Dounin # Date 1361195452 0 # Node ID 7fa7e60a7f66f62b6ff6d071d9425d5b7a454f19 # Parent e14b49c12a73d0586357d1d8a1b3c55618e14c13 Proxy: support for connection upgrade (101 Switching Protocols). This allows to proxy WebSockets by using configuration like this: location /chat/ { proxy_pass http://backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; } Connection upgrade is allowed as long as it was requested by a client via the Upgrade request header. diff --git a/src/http/modules/ngx_http_chunked_filter_module.c b/src/http/modules/ngx_http_chunked_filter_module.c --- a/src/http/modules/ngx_http_chunked_filter_module.c +++ b/src/http/modules/ngx_http_chunked_filter_module.c @@ -62,6 +62,7 @@ ngx_http_chunked_header_filter(ngx_http_ if (r->headers_out.status == NGX_HTTP_NOT_MODIFIED || r->headers_out.status == NGX_HTTP_NO_CONTENT + || r->headers_out.status < NGX_HTTP_OK || r != r->main || (r->method & NGX_HTTP_HEAD)) { diff --git a/src/http/modules/ngx_http_proxy_module.c b/src/http/modules/ngx_http_proxy_module.c --- a/src/http/modules/ngx_http_proxy_module.c +++ b/src/http/modules/ngx_http_proxy_module.c @@ -1474,6 +1474,14 @@ ngx_http_proxy_process_header(ngx_http_r u->keepalive = !u->headers_in.connection_close; } + if (u->headers_in.status_n == NGX_HTTP_SWITCHING_PROTOCOLS) { + u->keepalive = 0; + + if (r->headers_in.upgrade) { + u->upgrade = 1; + } + } + return NGX_OK; } diff --git a/src/http/ngx_http_header_filter_module.c b/src/http/ngx_http_header_filter_module.c --- a/src/http/ngx_http_header_filter_module.c +++ b/src/http/ngx_http_header_filter_module.c @@ -379,7 +379,10 @@ ngx_http_header_filter(ngx_http_request_ len += sizeof("Transfer-Encoding: chunked" CRLF) - 1; } - if (r->keepalive) { + if (r->headers_out.status == NGX_HTTP_SWITCHING_PROTOCOLS) { + len += sizeof("Connection: upgrade" CRLF) - 1; + + } else if (r->keepalive) { len += sizeof("Connection: keep-alive" CRLF) - 1; /* @@ -548,7 +551,11 @@ ngx_http_header_filter(ngx_http_request_ sizeof("Transfer-Encoding: chunked" CRLF) - 1); } - if (r->keepalive) { + if (r->headers_out.status == NGX_HTTP_SWITCHING_PROTOCOLS) { + b->last = ngx_cpymem(b->last, "Connection: upgrade" CRLF, + sizeof("Connection: upgrade" CRLF) - 1); + + } else if (r->keepalive) { b->last = ngx_cpymem(b->last, "Connection: keep-alive" CRLF, sizeof("Connection: keep-alive" CRLF) - 1); diff --git a/src/http/ngx_http_request.c b/src/http/ngx_http_request.c --- a/src/http/ngx_http_request.c +++ b/src/http/ngx_http_request.c @@ -130,6 +130,10 @@ ngx_http_header_t ngx_http_headers_in[] offsetof(ngx_http_headers_in_t, expect), ngx_http_process_unique_header_line }, + { ngx_string("Upgrade"), + offsetof(ngx_http_headers_in_t, upgrade), + ngx_http_process_header_line }, + #if (NGX_HTTP_GZIP) { ngx_string("Accept-Encoding"), offsetof(ngx_http_headers_in_t, accept_encoding), diff --git a/src/http/ngx_http_request.h b/src/http/ngx_http_request.h --- a/src/http/ngx_http_request.h +++ b/src/http/ngx_http_request.h @@ -64,6 +64,10 @@ #define NGX_HTTP_LOG_UNSAFE 8 +#define NGX_HTTP_CONTINUE 100 +#define NGX_HTTP_SWITCHING_PROTOCOLS 101 +#define NGX_HTTP_PROCESSING 102 + #define NGX_HTTP_OK 200 #define NGX_HTTP_CREATED 201 #define NGX_HTTP_ACCEPTED 202 @@ -184,6 +188,7 @@ typedef struct { ngx_table_elt_t *transfer_encoding; ngx_table_elt_t *expect; + ngx_table_elt_t *upgrade; #if (NGX_HTTP_GZIP) ngx_table_elt_t *accept_encoding; diff --git a/src/http/ngx_http_upstream.c b/src/http/ngx_http_upstream.c --- a/src/http/ngx_http_upstream.c +++ b/src/http/ngx_http_upstream.c @@ -46,6 +46,16 @@ static void ngx_http_upstream_process_bo ngx_http_upstream_t *u); static void ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u); +static void ngx_http_upstream_upgrade(ngx_http_request_t *r, + ngx_http_upstream_t *u); +static void ngx_http_upstream_upgraded_read_downstream(ngx_http_request_t *r); +static void ngx_http_upstream_upgraded_write_downstream(ngx_http_request_t *r); +static void ngx_http_upstream_upgraded_read_upstream(ngx_http_request_t *r, + ngx_http_upstream_t *u); +static void ngx_http_upstream_upgraded_write_upstream(ngx_http_request_t *r, + ngx_http_upstream_t *u); +static void ngx_http_upstream_process_upgraded(ngx_http_request_t *r, + ngx_uint_t from_upstream); static void ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r); static void @@ -1327,6 +1337,7 @@ ngx_http_upstream_reinit(ngx_http_reques } u->keepalive = 0; + u->upgrade = 0; ngx_memzero(&u->headers_in, sizeof(ngx_http_upstream_headers_in_t)); u->headers_in.content_length_n = -1; @@ -2078,6 +2089,11 @@ ngx_http_upstream_send_response(ngx_http return; } + if (u->upgrade) { + ngx_http_upstream_upgrade(r, u); + return; + } + c = r->connection; if (r->header_only) { @@ -2361,6 +2377,278 @@ ngx_http_upstream_send_response(ngx_http static void +ngx_http_upstream_upgrade(ngx_http_request_t *r, ngx_http_upstream_t *u) +{ + int tcp_nodelay; + ngx_connection_t *c; + ngx_http_core_loc_conf_t *clcf; + + c = r->connection; + clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); + + /* TODO: prevent upgrade if not requested or not possible */ + + r->keepalive = 0; + c->log->action = "proxying upgraded connection"; + + u->read_event_handler = ngx_http_upstream_upgraded_read_upstream; + u->write_event_handler = ngx_http_upstream_upgraded_write_upstream; + r->read_event_handler = ngx_http_upstream_upgraded_read_downstream; + r->write_event_handler = ngx_http_upstream_upgraded_write_downstream; + + if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay"); + + tcp_nodelay = 1; + + if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, + (const void *) &tcp_nodelay, sizeof(int)) == -1) + { + ngx_connection_error(c, ngx_socket_errno, + "setsockopt(TCP_NODELAY) failed"); + ngx_http_upstream_finalize_request(r, u, 0); + return; + } + + c->tcp_nodelay = NGX_TCP_NODELAY_SET; + + if (setsockopt(u->peer.connection->fd, IPPROTO_TCP, TCP_NODELAY, + (const void *) &tcp_nodelay, sizeof(int)) == -1) + { + ngx_connection_error(u->peer.connection, ngx_socket_errno, + "setsockopt(TCP_NODELAY) failed"); + ngx_http_upstream_finalize_request(r, u, 0); + return; + } + + u->peer.connection->tcp_nodelay = NGX_TCP_NODELAY_SET; + } + + if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) { + ngx_http_upstream_finalize_request(r, u, 0); + return; + } + + if (u->peer.connection->read->ready + || u->buffer.pos != u->buffer.last) + { + ngx_http_upstream_process_upgraded(r, 1); + } + + if (c->read->ready + || r->header_in->pos != r->header_in->last) + { + ngx_http_upstream_process_upgraded(r, 0); + } +} + + +static void +ngx_http_upstream_upgraded_read_downstream(ngx_http_request_t *r) +{ + ngx_http_upstream_process_upgraded(r, 0); +} + + +static void +ngx_http_upstream_upgraded_write_downstream(ngx_http_request_t *r) +{ + ngx_http_upstream_process_upgraded(r, 1); +} + + +static void +ngx_http_upstream_upgraded_read_upstream(ngx_http_request_t *r, + ngx_http_upstream_t *u) +{ + ngx_http_upstream_process_upgraded(r, 1); +} + + +static void +ngx_http_upstream_upgraded_write_upstream(ngx_http_request_t *r, + ngx_http_upstream_t *u) +{ + ngx_http_upstream_process_upgraded(r, 0); +} + + +static void +ngx_http_upstream_process_upgraded(ngx_http_request_t *r, + ngx_uint_t from_upstream) +{ + size_t size; + ssize_t n; + ngx_buf_t *b; + ngx_uint_t do_write; + ngx_connection_t *c, *downstream, *upstream, *dst, *src; + ngx_http_upstream_t *u; + ngx_http_core_loc_conf_t *clcf; + + c = r->connection; + u = r->upstream; + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http upstream process upgraded, fu:%ui", from_upstream); + + downstream = c; + upstream = u->peer.connection; + + if (downstream->write->timedout) { + c->timedout = 1; + ngx_connection_error(c, NGX_ETIMEDOUT, "client timed out"); + ngx_http_upstream_finalize_request(r, u, NGX_HTTP_REQUEST_TIME_OUT); + return; + } + + if (upstream->read->timedout || upstream->write->timedout) { + ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out"); + ngx_http_upstream_finalize_request(r, u, 0); + return; + } + + if (from_upstream) { + src = upstream; + dst = downstream; + b = &u->buffer; + + } else { + src = downstream; + dst = upstream; + b = &u->from_client; + + if (r->header_in->last > r->header_in->pos) { + b = r->header_in; + b->end = b->last; + do_write = 1; + } + + if (b->start == NULL) { + b->start = ngx_palloc(r->pool, u->conf->buffer_size); + if (b->start == NULL) { + ngx_http_upstream_finalize_request(r, u, 0); + return; + } + + b->pos = b->start; + b->last = b->start; + b->end = b->start + u->conf->buffer_size; + b->temporary = 1; + b->tag = u->output.tag; + } + } + + for ( ;; ) { + + if (do_write) { + + size = b->last - b->pos; + + if (size && dst->write->ready) { + + n = dst->send(dst, b->pos, size); + + if (n == NGX_ERROR) { + ngx_http_upstream_finalize_request(r, u, 0); + return; + } + + if (n > 0) { + b->pos += n; + + if (b->pos == b->last) { + b->pos = b->start; + b->last = b->start; + } + } + } + } + + size = b->end - b->last; + + if (size && src->read->ready) { + + n = src->recv(src, b->last, size); + + if (n == NGX_AGAIN || n == 0) { + break; + } + + if (n > 0) { + do_write = 1; + b->last += n; + + continue; + } + + if (n == NGX_ERROR) { + src->read->eof = 1; + } + } + + break; + } + + if ((upstream->read->eof && u->buffer.pos == u->buffer.last) + || (downstream->read->eof && u->from_client.pos == u->from_client.last) + || (downstream->read->eof && upstream->read->eof)) + { + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http upstream upgraded done"); + ngx_http_upstream_finalize_request(r, u, 0); + return; + } + + clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); + + if (ngx_handle_write_event(upstream->write, u->conf->send_lowat) + != NGX_OK) + { + ngx_http_upstream_finalize_request(r, u, 0); + return; + } + + if (upstream->write->active && !upstream->write->ready) { + ngx_add_timer(upstream->write, u->conf->send_timeout); + + } else if (upstream->write->timer_set) { + ngx_del_timer(upstream->write); + } + + if (ngx_handle_read_event(upstream->read, 0) != NGX_OK) { + ngx_http_upstream_finalize_request(r, u, 0); + return; + } + + if (upstream->read->active && !upstream->read->ready) { + ngx_add_timer(upstream->read, u->conf->read_timeout); + + } else if (upstream->read->timer_set) { + ngx_del_timer(upstream->read); + } + + if (ngx_handle_write_event(downstream->write, clcf->send_lowat) + != NGX_OK) + { + ngx_http_upstream_finalize_request(r, u, 0); + return; + } + + if (ngx_handle_read_event(downstream->read, 0) != NGX_OK) { + ngx_http_upstream_finalize_request(r, u, 0); + return; + } + + if (downstream->write->active && !downstream->write->ready) { + ngx_add_timer(downstream->write, clcf->send_timeout); + + } else if (downstream->write->timer_set) { + ngx_del_timer(downstream->write); + } +} + + +static void ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r) { ngx_event_t *wev; diff --git a/src/http/ngx_http_upstream.h b/src/http/ngx_http_upstream.h --- a/src/http/ngx_http_upstream.h +++ b/src/http/ngx_http_upstream.h @@ -284,6 +284,8 @@ struct ngx_http_upstream_s { ngx_http_upstream_resolved_t *resolved; + ngx_buf_t from_client; + ngx_buf_t buffer; off_t length; @@ -329,6 +331,7 @@ struct ngx_http_upstream_s { unsigned buffering:1; unsigned keepalive:1; + unsigned upgrade:1; unsigned request_sent:1; unsigned header_sent:1; diff --git a/src/http/ngx_http_variables.c b/src/http/ngx_http_variables.c --- a/src/http/ngx_http_variables.c +++ b/src/http/ngx_http_variables.c @@ -1747,7 +1747,11 @@ ngx_http_variable_sent_connection(ngx_ht size_t len; char *p; - if (r->keepalive) { + if (r->headers_out.status == NGX_HTTP_SWITCHING_PROTOCOLS) { + len = sizeof("upgrade") - 1; + p = "upgrade"; + + } else if (r->keepalive) { len = sizeof("keep-alive") - 1; p = "keep-alive";