diff src/http/ngx_http_upstream.c @ 5072:7fa7e60a7f66

Proxy: support for connection upgrade (101 Switching Protocols). This allows to proxy WebSockets by using configuration like this: location /chat/ { proxy_pass http://backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; } Connection upgrade is allowed as long as it was requested by a client via the Upgrade request header.
author Maxim Dounin <mdounin@mdounin.ru>
date Mon, 18 Feb 2013 13:50:52 +0000
parents fd84344f1df7
children 52eb762c68a9
line wrap: on
line diff
--- a/src/http/ngx_http_upstream.c
+++ b/src/http/ngx_http_upstream.c
@@ -46,6 +46,16 @@ static void ngx_http_upstream_process_bo
     ngx_http_upstream_t *u);
 static void ngx_http_upstream_send_response(ngx_http_request_t *r,
     ngx_http_upstream_t *u);
+static void ngx_http_upstream_upgrade(ngx_http_request_t *r,
+    ngx_http_upstream_t *u);
+static void ngx_http_upstream_upgraded_read_downstream(ngx_http_request_t *r);
+static void ngx_http_upstream_upgraded_write_downstream(ngx_http_request_t *r);
+static void ngx_http_upstream_upgraded_read_upstream(ngx_http_request_t *r,
+    ngx_http_upstream_t *u);
+static void ngx_http_upstream_upgraded_write_upstream(ngx_http_request_t *r,
+    ngx_http_upstream_t *u);
+static void ngx_http_upstream_process_upgraded(ngx_http_request_t *r,
+    ngx_uint_t from_upstream);
 static void
     ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r);
 static void
@@ -1327,6 +1337,7 @@ ngx_http_upstream_reinit(ngx_http_reques
     }
 
     u->keepalive = 0;
+    u->upgrade = 0;
 
     ngx_memzero(&u->headers_in, sizeof(ngx_http_upstream_headers_in_t));
     u->headers_in.content_length_n = -1;
@@ -2078,6 +2089,11 @@ ngx_http_upstream_send_response(ngx_http
         return;
     }
 
+    if (u->upgrade) {
+        ngx_http_upstream_upgrade(r, u);
+        return;
+    }
+
     c = r->connection;
 
     if (r->header_only) {
@@ -2361,6 +2377,278 @@ ngx_http_upstream_send_response(ngx_http
 
 
 static void
+ngx_http_upstream_upgrade(ngx_http_request_t *r, ngx_http_upstream_t *u)
+{
+    int                        tcp_nodelay;
+    ngx_connection_t          *c;
+    ngx_http_core_loc_conf_t  *clcf;
+
+    c = r->connection;
+    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
+
+    /* TODO: prevent upgrade if not requested or not possible */
+
+    r->keepalive = 0;
+    c->log->action = "proxying upgraded connection";
+
+    u->read_event_handler = ngx_http_upstream_upgraded_read_upstream;
+    u->write_event_handler = ngx_http_upstream_upgraded_write_upstream;
+    r->read_event_handler = ngx_http_upstream_upgraded_read_downstream;
+    r->write_event_handler = ngx_http_upstream_upgraded_write_downstream;
+
+    if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
+        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay");
+
+        tcp_nodelay = 1;
+
+        if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
+                       (const void *) &tcp_nodelay, sizeof(int)) == -1)
+        {
+            ngx_connection_error(c, ngx_socket_errno,
+                                 "setsockopt(TCP_NODELAY) failed");
+            ngx_http_upstream_finalize_request(r, u, 0);
+            return;
+        }
+
+        c->tcp_nodelay = NGX_TCP_NODELAY_SET;
+
+        if (setsockopt(u->peer.connection->fd, IPPROTO_TCP, TCP_NODELAY,
+                       (const void *) &tcp_nodelay, sizeof(int)) == -1)
+        {
+            ngx_connection_error(u->peer.connection, ngx_socket_errno,
+                                 "setsockopt(TCP_NODELAY) failed");
+            ngx_http_upstream_finalize_request(r, u, 0);
+            return;
+        }
+
+        u->peer.connection->tcp_nodelay = NGX_TCP_NODELAY_SET;
+    }
+
+    if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) {
+        ngx_http_upstream_finalize_request(r, u, 0);
+        return;
+    }
+
+    if (u->peer.connection->read->ready
+        || u->buffer.pos != u->buffer.last)
+    {
+        ngx_http_upstream_process_upgraded(r, 1);
+    }
+
+    if (c->read->ready
+        || r->header_in->pos != r->header_in->last)
+    {
+        ngx_http_upstream_process_upgraded(r, 0);
+    }
+}
+
+
+static void
+ngx_http_upstream_upgraded_read_downstream(ngx_http_request_t *r)
+{
+    ngx_http_upstream_process_upgraded(r, 0);
+}
+
+
+static void
+ngx_http_upstream_upgraded_write_downstream(ngx_http_request_t *r)
+{
+    ngx_http_upstream_process_upgraded(r, 1);
+}
+
+
+static void
+ngx_http_upstream_upgraded_read_upstream(ngx_http_request_t *r,
+    ngx_http_upstream_t *u)
+{
+    ngx_http_upstream_process_upgraded(r, 1);
+}
+
+
+static void
+ngx_http_upstream_upgraded_write_upstream(ngx_http_request_t *r,
+    ngx_http_upstream_t *u)
+{
+    ngx_http_upstream_process_upgraded(r, 0);
+}
+
+
+static void
+ngx_http_upstream_process_upgraded(ngx_http_request_t *r,
+    ngx_uint_t from_upstream)
+{
+    size_t                     size;
+    ssize_t                    n;
+    ngx_buf_t                 *b;
+    ngx_uint_t                 do_write;
+    ngx_connection_t          *c, *downstream, *upstream, *dst, *src;
+    ngx_http_upstream_t       *u;
+    ngx_http_core_loc_conf_t  *clcf;
+
+    c = r->connection;
+    u = r->upstream;
+
+    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
+                   "http upstream process upgraded, fu:%ui", from_upstream);
+
+    downstream = c;
+    upstream = u->peer.connection;
+
+    if (downstream->write->timedout) {
+        c->timedout = 1;
+        ngx_connection_error(c, NGX_ETIMEDOUT, "client timed out");
+        ngx_http_upstream_finalize_request(r, u, NGX_HTTP_REQUEST_TIME_OUT);
+        return;
+    }
+
+    if (upstream->read->timedout || upstream->write->timedout) {
+        ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
+        ngx_http_upstream_finalize_request(r, u, 0);
+        return;
+    }
+
+    if (from_upstream) {
+        src = upstream;
+        dst = downstream;
+        b = &u->buffer;
+
+    } else {
+        src = downstream;
+        dst = upstream;
+        b = &u->from_client;
+
+        if (r->header_in->last > r->header_in->pos) {
+            b = r->header_in;
+            b->end = b->last;
+            do_write = 1;
+        }
+
+        if (b->start == NULL) {
+            b->start = ngx_palloc(r->pool, u->conf->buffer_size);
+            if (b->start == NULL) {
+                ngx_http_upstream_finalize_request(r, u, 0);
+                return;
+            }
+
+            b->pos = b->start;
+            b->last = b->start;
+            b->end = b->start + u->conf->buffer_size;
+            b->temporary = 1;
+            b->tag = u->output.tag;
+        }
+    }
+
+    for ( ;; ) {
+
+        if (do_write) {
+
+            size = b->last - b->pos;
+
+            if (size && dst->write->ready) {
+
+                n = dst->send(dst, b->pos, size);
+
+                if (n == NGX_ERROR) {
+                    ngx_http_upstream_finalize_request(r, u, 0);
+                    return;
+                }
+
+                if (n > 0) {
+                    b->pos += n;
+
+                    if (b->pos == b->last) {
+                        b->pos = b->start;
+                        b->last = b->start;
+                    }
+                }
+            }
+        }
+
+        size = b->end - b->last;
+
+        if (size && src->read->ready) {
+
+            n = src->recv(src, b->last, size);
+
+            if (n == NGX_AGAIN || n == 0) {
+                break;
+            }
+
+            if (n > 0) {
+                do_write = 1;
+                b->last += n;
+
+                continue;
+            }
+
+            if (n == NGX_ERROR) {
+                src->read->eof = 1;
+            }
+        }
+
+        break;
+    }
+
+    if ((upstream->read->eof && u->buffer.pos == u->buffer.last)
+        || (downstream->read->eof && u->from_client.pos == u->from_client.last)
+        || (downstream->read->eof && upstream->read->eof))
+    {
+        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
+                       "http upstream upgraded done");
+        ngx_http_upstream_finalize_request(r, u, 0);
+        return;
+    }
+
+    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
+
+    if (ngx_handle_write_event(upstream->write, u->conf->send_lowat)
+        != NGX_OK)
+    {
+        ngx_http_upstream_finalize_request(r, u, 0);
+        return;
+    }
+
+    if (upstream->write->active && !upstream->write->ready) {
+        ngx_add_timer(upstream->write, u->conf->send_timeout);
+
+    } else if (upstream->write->timer_set) {
+        ngx_del_timer(upstream->write);
+    }
+
+    if (ngx_handle_read_event(upstream->read, 0) != NGX_OK) {
+        ngx_http_upstream_finalize_request(r, u, 0);
+        return;
+    }
+
+    if (upstream->read->active && !upstream->read->ready) {
+        ngx_add_timer(upstream->read, u->conf->read_timeout);
+
+    } else if (upstream->read->timer_set) {
+        ngx_del_timer(upstream->read);
+    }
+
+    if (ngx_handle_write_event(downstream->write, clcf->send_lowat)
+        != NGX_OK)
+    {
+        ngx_http_upstream_finalize_request(r, u, 0);
+        return;
+    }
+
+    if (ngx_handle_read_event(downstream->read, 0) != NGX_OK) {
+        ngx_http_upstream_finalize_request(r, u, 0);
+        return;
+    }
+
+    if (downstream->write->active && !downstream->write->ready) {
+        ngx_add_timer(downstream->write, clcf->send_timeout);
+
+    } else if (downstream->write->timer_set) {
+        ngx_del_timer(downstream->write);
+    }
+}
+
+
+static void
 ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r)
 {
     ngx_event_t          *wev;