comparison src/http/ngx_http_upstream.c @ 5072:7fa7e60a7f66

Proxy: support for connection upgrade (101 Switching Protocols). This allows to proxy WebSockets by using configuration like this: location /chat/ { proxy_pass http://backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; } Connection upgrade is allowed as long as it was requested by a client via the Upgrade request header.
author Maxim Dounin <mdounin@mdounin.ru>
date Mon, 18 Feb 2013 13:50:52 +0000
parents fd84344f1df7
children 52eb762c68a9
comparison
equal deleted inserted replaced
5071:e14b49c12a73 5072:7fa7e60a7f66
44 ngx_http_upstream_t *u); 44 ngx_http_upstream_t *u);
45 static void ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r, 45 static void ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r,
46 ngx_http_upstream_t *u); 46 ngx_http_upstream_t *u);
47 static void ngx_http_upstream_send_response(ngx_http_request_t *r, 47 static void ngx_http_upstream_send_response(ngx_http_request_t *r,
48 ngx_http_upstream_t *u); 48 ngx_http_upstream_t *u);
49 static void ngx_http_upstream_upgrade(ngx_http_request_t *r,
50 ngx_http_upstream_t *u);
51 static void ngx_http_upstream_upgraded_read_downstream(ngx_http_request_t *r);
52 static void ngx_http_upstream_upgraded_write_downstream(ngx_http_request_t *r);
53 static void ngx_http_upstream_upgraded_read_upstream(ngx_http_request_t *r,
54 ngx_http_upstream_t *u);
55 static void ngx_http_upstream_upgraded_write_upstream(ngx_http_request_t *r,
56 ngx_http_upstream_t *u);
57 static void ngx_http_upstream_process_upgraded(ngx_http_request_t *r,
58 ngx_uint_t from_upstream);
49 static void 59 static void
50 ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r); 60 ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r);
51 static void 61 static void
52 ngx_http_upstream_process_non_buffered_upstream(ngx_http_request_t *r, 62 ngx_http_upstream_process_non_buffered_upstream(ngx_http_request_t *r,
53 ngx_http_upstream_t *u); 63 ngx_http_upstream_t *u);
1325 if (u->reinit_request(r) != NGX_OK) { 1335 if (u->reinit_request(r) != NGX_OK) {
1326 return NGX_ERROR; 1336 return NGX_ERROR;
1327 } 1337 }
1328 1338
1329 u->keepalive = 0; 1339 u->keepalive = 0;
1340 u->upgrade = 0;
1330 1341
1331 ngx_memzero(&u->headers_in, sizeof(ngx_http_upstream_headers_in_t)); 1342 ngx_memzero(&u->headers_in, sizeof(ngx_http_upstream_headers_in_t));
1332 u->headers_in.content_length_n = -1; 1343 u->headers_in.content_length_n = -1;
1333 1344
1334 if (ngx_list_init(&u->headers_in.headers, r->pool, 8, 1345 if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
2073 2084
2074 rc = ngx_http_send_header(r); 2085 rc = ngx_http_send_header(r);
2075 2086
2076 if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) { 2087 if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) {
2077 ngx_http_upstream_finalize_request(r, u, rc); 2088 ngx_http_upstream_finalize_request(r, u, rc);
2089 return;
2090 }
2091
2092 if (u->upgrade) {
2093 ngx_http_upstream_upgrade(r, u);
2078 return; 2094 return;
2079 } 2095 }
2080 2096
2081 c = r->connection; 2097 c = r->connection;
2082 2098
2359 ngx_http_upstream_process_upstream(r, u); 2375 ngx_http_upstream_process_upstream(r, u);
2360 } 2376 }
2361 2377
2362 2378
2363 static void 2379 static void
2380 ngx_http_upstream_upgrade(ngx_http_request_t *r, ngx_http_upstream_t *u)
2381 {
2382 int tcp_nodelay;
2383 ngx_connection_t *c;
2384 ngx_http_core_loc_conf_t *clcf;
2385
2386 c = r->connection;
2387 clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
2388
2389 /* TODO: prevent upgrade if not requested or not possible */
2390
2391 r->keepalive = 0;
2392 c->log->action = "proxying upgraded connection";
2393
2394 u->read_event_handler = ngx_http_upstream_upgraded_read_upstream;
2395 u->write_event_handler = ngx_http_upstream_upgraded_write_upstream;
2396 r->read_event_handler = ngx_http_upstream_upgraded_read_downstream;
2397 r->write_event_handler = ngx_http_upstream_upgraded_write_downstream;
2398
2399 if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
2400 ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay");
2401
2402 tcp_nodelay = 1;
2403
2404 if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
2405 (const void *) &tcp_nodelay, sizeof(int)) == -1)
2406 {
2407 ngx_connection_error(c, ngx_socket_errno,
2408 "setsockopt(TCP_NODELAY) failed");
2409 ngx_http_upstream_finalize_request(r, u, 0);
2410 return;
2411 }
2412
2413 c->tcp_nodelay = NGX_TCP_NODELAY_SET;
2414
2415 if (setsockopt(u->peer.connection->fd, IPPROTO_TCP, TCP_NODELAY,
2416 (const void *) &tcp_nodelay, sizeof(int)) == -1)
2417 {
2418 ngx_connection_error(u->peer.connection, ngx_socket_errno,
2419 "setsockopt(TCP_NODELAY) failed");
2420 ngx_http_upstream_finalize_request(r, u, 0);
2421 return;
2422 }
2423
2424 u->peer.connection->tcp_nodelay = NGX_TCP_NODELAY_SET;
2425 }
2426
2427 if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) {
2428 ngx_http_upstream_finalize_request(r, u, 0);
2429 return;
2430 }
2431
2432 if (u->peer.connection->read->ready
2433 || u->buffer.pos != u->buffer.last)
2434 {
2435 ngx_http_upstream_process_upgraded(r, 1);
2436 }
2437
2438 if (c->read->ready
2439 || r->header_in->pos != r->header_in->last)
2440 {
2441 ngx_http_upstream_process_upgraded(r, 0);
2442 }
2443 }
2444
2445
2446 static void
2447 ngx_http_upstream_upgraded_read_downstream(ngx_http_request_t *r)
2448 {
2449 ngx_http_upstream_process_upgraded(r, 0);
2450 }
2451
2452
2453 static void
2454 ngx_http_upstream_upgraded_write_downstream(ngx_http_request_t *r)
2455 {
2456 ngx_http_upstream_process_upgraded(r, 1);
2457 }
2458
2459
2460 static void
2461 ngx_http_upstream_upgraded_read_upstream(ngx_http_request_t *r,
2462 ngx_http_upstream_t *u)
2463 {
2464 ngx_http_upstream_process_upgraded(r, 1);
2465 }
2466
2467
2468 static void
2469 ngx_http_upstream_upgraded_write_upstream(ngx_http_request_t *r,
2470 ngx_http_upstream_t *u)
2471 {
2472 ngx_http_upstream_process_upgraded(r, 0);
2473 }
2474
2475
2476 static void
2477 ngx_http_upstream_process_upgraded(ngx_http_request_t *r,
2478 ngx_uint_t from_upstream)
2479 {
2480 size_t size;
2481 ssize_t n;
2482 ngx_buf_t *b;
2483 ngx_uint_t do_write;
2484 ngx_connection_t *c, *downstream, *upstream, *dst, *src;
2485 ngx_http_upstream_t *u;
2486 ngx_http_core_loc_conf_t *clcf;
2487
2488 c = r->connection;
2489 u = r->upstream;
2490
2491 ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
2492 "http upstream process upgraded, fu:%ui", from_upstream);
2493
2494 downstream = c;
2495 upstream = u->peer.connection;
2496
2497 if (downstream->write->timedout) {
2498 c->timedout = 1;
2499 ngx_connection_error(c, NGX_ETIMEDOUT, "client timed out");
2500 ngx_http_upstream_finalize_request(r, u, NGX_HTTP_REQUEST_TIME_OUT);
2501 return;
2502 }
2503
2504 if (upstream->read->timedout || upstream->write->timedout) {
2505 ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
2506 ngx_http_upstream_finalize_request(r, u, 0);
2507 return;
2508 }
2509
2510 if (from_upstream) {
2511 src = upstream;
2512 dst = downstream;
2513 b = &u->buffer;
2514
2515 } else {
2516 src = downstream;
2517 dst = upstream;
2518 b = &u->from_client;
2519
2520 if (r->header_in->last > r->header_in->pos) {
2521 b = r->header_in;
2522 b->end = b->last;
2523 do_write = 1;
2524 }
2525
2526 if (b->start == NULL) {
2527 b->start = ngx_palloc(r->pool, u->conf->buffer_size);
2528 if (b->start == NULL) {
2529 ngx_http_upstream_finalize_request(r, u, 0);
2530 return;
2531 }
2532
2533 b->pos = b->start;
2534 b->last = b->start;
2535 b->end = b->start + u->conf->buffer_size;
2536 b->temporary = 1;
2537 b->tag = u->output.tag;
2538 }
2539 }
2540
2541 for ( ;; ) {
2542
2543 if (do_write) {
2544
2545 size = b->last - b->pos;
2546
2547 if (size && dst->write->ready) {
2548
2549 n = dst->send(dst, b->pos, size);
2550
2551 if (n == NGX_ERROR) {
2552 ngx_http_upstream_finalize_request(r, u, 0);
2553 return;
2554 }
2555
2556 if (n > 0) {
2557 b->pos += n;
2558
2559 if (b->pos == b->last) {
2560 b->pos = b->start;
2561 b->last = b->start;
2562 }
2563 }
2564 }
2565 }
2566
2567 size = b->end - b->last;
2568
2569 if (size && src->read->ready) {
2570
2571 n = src->recv(src, b->last, size);
2572
2573 if (n == NGX_AGAIN || n == 0) {
2574 break;
2575 }
2576
2577 if (n > 0) {
2578 do_write = 1;
2579 b->last += n;
2580
2581 continue;
2582 }
2583
2584 if (n == NGX_ERROR) {
2585 src->read->eof = 1;
2586 }
2587 }
2588
2589 break;
2590 }
2591
2592 if ((upstream->read->eof && u->buffer.pos == u->buffer.last)
2593 || (downstream->read->eof && u->from_client.pos == u->from_client.last)
2594 || (downstream->read->eof && upstream->read->eof))
2595 {
2596 ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
2597 "http upstream upgraded done");
2598 ngx_http_upstream_finalize_request(r, u, 0);
2599 return;
2600 }
2601
2602 clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
2603
2604 if (ngx_handle_write_event(upstream->write, u->conf->send_lowat)
2605 != NGX_OK)
2606 {
2607 ngx_http_upstream_finalize_request(r, u, 0);
2608 return;
2609 }
2610
2611 if (upstream->write->active && !upstream->write->ready) {
2612 ngx_add_timer(upstream->write, u->conf->send_timeout);
2613
2614 } else if (upstream->write->timer_set) {
2615 ngx_del_timer(upstream->write);
2616 }
2617
2618 if (ngx_handle_read_event(upstream->read, 0) != NGX_OK) {
2619 ngx_http_upstream_finalize_request(r, u, 0);
2620 return;
2621 }
2622
2623 if (upstream->read->active && !upstream->read->ready) {
2624 ngx_add_timer(upstream->read, u->conf->read_timeout);
2625
2626 } else if (upstream->read->timer_set) {
2627 ngx_del_timer(upstream->read);
2628 }
2629
2630 if (ngx_handle_write_event(downstream->write, clcf->send_lowat)
2631 != NGX_OK)
2632 {
2633 ngx_http_upstream_finalize_request(r, u, 0);
2634 return;
2635 }
2636
2637 if (ngx_handle_read_event(downstream->read, 0) != NGX_OK) {
2638 ngx_http_upstream_finalize_request(r, u, 0);
2639 return;
2640 }
2641
2642 if (downstream->write->active && !downstream->write->ready) {
2643 ngx_add_timer(downstream->write, clcf->send_timeout);
2644
2645 } else if (downstream->write->timer_set) {
2646 ngx_del_timer(downstream->write);
2647 }
2648 }
2649
2650
2651 static void
2364 ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r) 2652 ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r)
2365 { 2653 {
2366 ngx_event_t *wev; 2654 ngx_event_t *wev;
2367 ngx_connection_t *c; 2655 ngx_connection_t *c;
2368 ngx_http_upstream_t *u; 2656 ngx_http_upstream_t *u;