comparison src/stream/ngx_stream_proxy_module.c @ 6692:56fc55e32f23

Stream: filters.
author Roman Arutyunyan <arut@nginx.com>
date Thu, 15 Sep 2016 14:55:46 +0300
parents c02290241cbe
children edcd9303a4d3
comparison
equal deleted inserted replaced
6691:4bce3edfac2c 6692:56fc55e32f23
82 void *child); 82 void *child);
83 static char *ngx_stream_proxy_pass(ngx_conf_t *cf, ngx_command_t *cmd, 83 static char *ngx_stream_proxy_pass(ngx_conf_t *cf, ngx_command_t *cmd,
84 void *conf); 84 void *conf);
85 static char *ngx_stream_proxy_bind(ngx_conf_t *cf, ngx_command_t *cmd, 85 static char *ngx_stream_proxy_bind(ngx_conf_t *cf, ngx_command_t *cmd,
86 void *conf); 86 void *conf);
87
88 #if (NGX_STREAM_SSL)
89
87 static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s); 90 static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s);
88
89 #if (NGX_STREAM_SSL)
90
91 static char *ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf, 91 static char *ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf,
92 ngx_command_t *cmd, void *conf); 92 ngx_command_t *cmd, void *conf);
93 static void ngx_stream_proxy_ssl_init_connection(ngx_stream_session_t *s); 93 static void ngx_stream_proxy_ssl_init_connection(ngx_stream_session_t *s);
94 static void ngx_stream_proxy_ssl_handshake(ngx_connection_t *pc); 94 static void ngx_stream_proxy_ssl_handshake(ngx_connection_t *pc);
95 static ngx_int_t ngx_stream_proxy_ssl_name(ngx_stream_session_t *s); 95 static ngx_int_t ngx_stream_proxy_ssl_name(ngx_stream_session_t *s);
383 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); 383 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
384 return; 384 return;
385 } 385 }
386 386
387 u->peer.type = c->type; 387 u->peer.type = c->type;
388
389 u->proxy_protocol = pscf->proxy_protocol;
390 u->start_sec = ngx_time(); 388 u->start_sec = ngx_time();
391 389
392 c->write->handler = ngx_stream_proxy_downstream_handler; 390 c->write->handler = ngx_stream_proxy_downstream_handler;
393 c->read->handler = ngx_stream_proxy_downstream_handler; 391 c->read->handler = ngx_stream_proxy_downstream_handler;
394 392
408 406
409 u->downstream_buf.start = p; 407 u->downstream_buf.start = p;
410 u->downstream_buf.end = p + pscf->buffer_size; 408 u->downstream_buf.end = p + pscf->buffer_size;
411 u->downstream_buf.pos = p; 409 u->downstream_buf.pos = p;
412 u->downstream_buf.last = p; 410 u->downstream_buf.last = p;
413
414 if (u->proxy_protocol
415 #if (NGX_STREAM_SSL)
416 && pscf->ssl == NULL
417 #endif
418 && pscf->buffer_size >= NGX_PROXY_PROTOCOL_MAX_HEADER)
419 {
420 /* optimization for a typical case */
421
422 ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
423 "stream proxy send PROXY protocol header");
424
425 p = ngx_proxy_protocol_write(c, u->downstream_buf.last,
426 u->downstream_buf.end);
427 if (p == NULL) {
428 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
429 return;
430 }
431
432 u->downstream_buf.last = p;
433 u->proxy_protocol = 0;
434 }
435 411
436 if (c->read->ready) { 412 if (c->read->ready) {
437 ngx_post_event(c->read, &ngx_posted_events); 413 ngx_post_event(c->read, &ngx_posted_events);
438 } 414 }
439 } 415 }
680 656
681 c = s->connection; 657 c = s->connection;
682 658
683 c->log->action = "connecting to upstream"; 659 c->log->action = "connecting to upstream";
684 660
661 pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
662
685 u = s->upstream; 663 u = s->upstream;
664
665 u->connected = 0;
666 u->proxy_protocol = pscf->proxy_protocol;
686 667
687 if (u->state) { 668 if (u->state) {
688 u->state->response_time = ngx_current_msec - u->state->response_time; 669 u->state->response_time = ngx_current_msec - u->state->response_time;
689 } 670 }
690 671
738 } 719 }
739 720
740 pc->read->handler = ngx_stream_proxy_connect_handler; 721 pc->read->handler = ngx_stream_proxy_connect_handler;
741 pc->write->handler = ngx_stream_proxy_connect_handler; 722 pc->write->handler = ngx_stream_proxy_connect_handler;
742 723
743 pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
744
745 ngx_add_timer(pc->write, pscf->connect_timeout); 724 ngx_add_timer(pc->write, pscf->connect_timeout);
746 } 725 }
747 726
748 727
749 static void 728 static void
750 ngx_stream_proxy_init_upstream(ngx_stream_session_t *s) 729 ngx_stream_proxy_init_upstream(ngx_stream_session_t *s)
751 { 730 {
752 int tcp_nodelay; 731 int tcp_nodelay;
753 u_char *p; 732 u_char *p;
733 ngx_chain_t *cl;
754 ngx_connection_t *c, *pc; 734 ngx_connection_t *c, *pc;
755 ngx_log_handler_pt handler; 735 ngx_log_handler_pt handler;
756 ngx_stream_upstream_t *u; 736 ngx_stream_upstream_t *u;
757 ngx_stream_core_srv_conf_t *cscf; 737 ngx_stream_core_srv_conf_t *cscf;
758 ngx_stream_proxy_srv_conf_t *pscf; 738 ngx_stream_proxy_srv_conf_t *pscf;
780 } 760 }
781 761
782 pc->tcp_nodelay = NGX_TCP_NODELAY_SET; 762 pc->tcp_nodelay = NGX_TCP_NODELAY_SET;
783 } 763 }
784 764
785 if (u->proxy_protocol) { 765 pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
786 if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) { 766
767 #if (NGX_STREAM_SSL)
768
769 if (pc->type == SOCK_STREAM && pscf->ssl) {
770
771 if (u->proxy_protocol) {
772 if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) {
773 return;
774 }
775
776 u->proxy_protocol = 0;
777 }
778
779 if (pc->ssl == NULL) {
780 ngx_stream_proxy_ssl_init_connection(s);
787 return; 781 return;
788 } 782 }
789 783 }
790 u->proxy_protocol = 0; 784
791 }
792
793 pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
794
795 #if (NGX_STREAM_SSL)
796 if (pc->type == SOCK_STREAM && pscf->ssl && pc->ssl == NULL) {
797 ngx_stream_proxy_ssl_init_connection(s);
798 return;
799 }
800 #endif 785 #endif
801 786
802 c = s->connection; 787 c = s->connection;
803 788
804 if (c->log->log_level >= NGX_LOG_INFO) { 789 if (c->log->log_level >= NGX_LOG_INFO) {
836 u->upstream_buf.end = p + pscf->buffer_size; 821 u->upstream_buf.end = p + pscf->buffer_size;
837 u->upstream_buf.pos = p; 822 u->upstream_buf.pos = p;
838 u->upstream_buf.last = p; 823 u->upstream_buf.last = p;
839 } 824 }
840 825
841 if (c->type == SOCK_DGRAM) { 826 if (c->buffer && c->buffer->pos < c->buffer->last) {
842 s->received = c->buffer->last - c->buffer->pos; 827 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
843 u->downstream_buf = *c->buffer; 828 "stream proxy add preread buffer: %uz",
844 829 c->buffer->last - c->buffer->pos);
845 if (pscf->responses == 0) { 830
846 pc->read->ready = 0; 831 cl = ngx_chain_get_free_buf(c->pool, &u->free);
847 pc->read->eof = 1; 832 if (cl == NULL) {
848 } 833 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
834 return;
835 }
836
837 *cl->buf = *c->buffer;
838
839 cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
840 cl->buf->flush = 1;
841 cl->buf->last_buf = (c->type == SOCK_DGRAM);
842
843 cl->next = u->upstream_out;
844 u->upstream_out = cl;
845 }
846
847 if (u->proxy_protocol) {
848 ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
849 "stream proxy add PROXY protocol header");
850
851 cl = ngx_chain_get_free_buf(c->pool, &u->free);
852 if (cl == NULL) {
853 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
854 return;
855 }
856
857 p = ngx_pnalloc(c->pool, NGX_PROXY_PROTOCOL_MAX_HEADER);
858 if (p == NULL) {
859 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
860 return;
861 }
862
863 cl->buf->pos = p;
864
865 p = ngx_proxy_protocol_write(c, p, p + NGX_PROXY_PROTOCOL_MAX_HEADER);
866 if (p == NULL) {
867 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
868 return;
869 }
870
871 cl->buf->last = p;
872 cl->buf->temporary = 1;
873 cl->buf->flush = 0;
874 cl->buf->last_buf = 0;
875 cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
876
877 cl->next = u->upstream_out;
878 u->upstream_out = cl;
879
880 u->proxy_protocol = 0;
881 }
882
883 if (c->type == SOCK_DGRAM && pscf->responses == 0) {
884 pc->read->ready = 0;
885 pc->read->eof = 1;
849 } 886 }
850 887
851 u->connected = 1; 888 u->connected = 1;
852 889
853 pc->read->handler = ngx_stream_proxy_upstream_handler; 890 pc->read->handler = ngx_stream_proxy_upstream_handler;
858 } 895 }
859 896
860 ngx_stream_proxy_process(s, 0, 1); 897 ngx_stream_proxy_process(s, 0, 1);
861 } 898 }
862 899
900
901 #if (NGX_STREAM_SSL)
863 902
864 static ngx_int_t 903 static ngx_int_t
865 ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s) 904 ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s)
866 { 905 {
867 u_char *p; 906 u_char *p;
929 968
930 return NGX_OK; 969 return NGX_OK;
931 } 970 }
932 971
933 972
934 #if (NGX_STREAM_SSL)
935
936 static char * 973 static char *
937 ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd, 974 ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd,
938 void *conf) 975 void *conf)
939 { 976 {
940 ngx_stream_proxy_srv_conf_t *pscf = conf; 977 ngx_stream_proxy_srv_conf_t *pscf = conf;
1410 { 1447 {
1411 off_t *received, limit; 1448 off_t *received, limit;
1412 size_t size, limit_rate; 1449 size_t size, limit_rate;
1413 ssize_t n; 1450 ssize_t n;
1414 ngx_buf_t *b; 1451 ngx_buf_t *b;
1452 ngx_int_t rc;
1415 ngx_uint_t flags; 1453 ngx_uint_t flags;
1416 ngx_msec_t delay; 1454 ngx_msec_t delay;
1455 ngx_chain_t *cl, **ll, **out, **busy;
1417 ngx_connection_t *c, *pc, *src, *dst; 1456 ngx_connection_t *c, *pc, *src, *dst;
1418 ngx_log_handler_pt handler; 1457 ngx_log_handler_pt handler;
1419 ngx_stream_upstream_t *u; 1458 ngx_stream_upstream_t *u;
1420 ngx_stream_proxy_srv_conf_t *pscf; 1459 ngx_stream_proxy_srv_conf_t *pscf;
1421 1460
1445 src = pc; 1484 src = pc;
1446 dst = c; 1485 dst = c;
1447 b = &u->upstream_buf; 1486 b = &u->upstream_buf;
1448 limit_rate = pscf->download_rate; 1487 limit_rate = pscf->download_rate;
1449 received = &u->received; 1488 received = &u->received;
1489 out = &u->downstream_out;
1490 busy = &u->downstream_busy;
1450 1491
1451 } else { 1492 } else {
1452 src = c; 1493 src = c;
1453 dst = pc; 1494 dst = pc;
1454 b = &u->downstream_buf; 1495 b = &u->downstream_buf;
1455 limit_rate = pscf->upload_rate; 1496 limit_rate = pscf->upload_rate;
1456 received = &s->received; 1497 received = &s->received;
1498 out = &u->upstream_out;
1499 busy = &u->upstream_busy;
1457 } 1500 }
1458 1501
1459 for ( ;; ) { 1502 for ( ;; ) {
1460 1503
1461 if (do_write) { 1504 if (do_write && dst) {
1462 1505
1463 size = b->last - b->pos; 1506 if (*out || *busy || dst->buffered) {
1464 1507 rc = ngx_stream_top_filter(s, *out, from_upstream);
1465 if (size && dst && dst->write->ready) { 1508
1466 1509 if (rc == NGX_ERROR) {
1467 n = dst->send(dst, b->pos, size);
1468
1469 if (n == NGX_AGAIN && dst->shared) {
1470 /* cannot wait on a shared socket */
1471 n = NGX_ERROR;
1472 }
1473
1474 if (n == NGX_ERROR) {
1475 if (c->type == SOCK_DGRAM && !from_upstream) { 1510 if (c->type == SOCK_DGRAM && !from_upstream) {
1476 ngx_stream_proxy_next_upstream(s); 1511 ngx_stream_proxy_next_upstream(s);
1477 return; 1512 return;
1478 } 1513 }
1479 1514
1480 ngx_stream_proxy_finalize(s, NGX_STREAM_OK); 1515 ngx_stream_proxy_finalize(s, NGX_STREAM_OK);
1481 return; 1516 return;
1482 } 1517 }
1483 1518
1484 if (n > 0) { 1519 ngx_chain_update_chains(c->pool, &u->free, busy, out,
1485 b->pos += n; 1520 (ngx_buf_tag_t) &ngx_stream_proxy_module);
1486 1521
1487 if (b->pos == b->last) { 1522 if (*busy == NULL) {
1488 b->pos = b->start; 1523 b->pos = b->start;
1489 b->last = b->start; 1524 b->last = b->start;
1490 }
1491 } 1525 }
1492 } 1526 }
1493 } 1527 }
1494 1528
1495 size = b->end - b->last; 1529 size = b->end - b->last;
1512 } 1546 }
1513 } 1547 }
1514 1548
1515 n = src->recv(src, b->last, size); 1549 n = src->recv(src, b->last, size);
1516 1550
1517 if (n == NGX_AGAIN || n == 0) { 1551 if (n == NGX_AGAIN) {
1518 break; 1552 break;
1519 } 1553 }
1520 1554
1521 if (n > 0) { 1555 if (n == NGX_ERROR) {
1556 if (c->type == SOCK_DGRAM && u->received == 0) {
1557 ngx_stream_proxy_next_upstream(s);
1558 return;
1559 }
1560
1561 src->read->eof = 1;
1562 n = 0;
1563 }
1564
1565 if (n >= 0) {
1522 if (limit_rate) { 1566 if (limit_rate) {
1523 delay = (ngx_msec_t) (n * 1000 / limit_rate); 1567 delay = (ngx_msec_t) (n * 1000 / limit_rate);
1524 1568
1525 if (delay > 0) { 1569 if (delay > 0) {
1526 src->read->delayed = 1; 1570 src->read->delayed = 1;
1539 { 1583 {
1540 src->read->ready = 0; 1584 src->read->ready = 0;
1541 src->read->eof = 1; 1585 src->read->eof = 1;
1542 } 1586 }
1543 1587
1588 for (ll = out; *ll; ll = &(*ll)->next) { /* void */ }
1589
1590 cl = ngx_chain_get_free_buf(c->pool, &u->free);
1591 if (cl == NULL) {
1592 ngx_stream_proxy_finalize(s,
1593 NGX_STREAM_INTERNAL_SERVER_ERROR);
1594 return;
1595 }
1596
1597 *ll = cl;
1598
1599 cl->buf->pos = b->last;
1600 cl->buf->last = b->last + n;
1601 cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
1602
1603 cl->buf->temporary = (n ? 1 : 0);
1604 cl->buf->last_buf = src->read->eof;
1605 cl->buf->flush = 1;
1606
1544 *received += n; 1607 *received += n;
1545 b->last += n; 1608 b->last += n;
1546 do_write = 1; 1609 do_write = 1;
1547 1610
1548 continue; 1611 continue;
1549 } 1612 }
1550
1551 if (n == NGX_ERROR) {
1552 if (c->type == SOCK_DGRAM && u->received == 0) {
1553 ngx_stream_proxy_next_upstream(s);
1554 return;
1555 }
1556
1557 src->read->eof = 1;
1558 }
1559 } 1613 }
1560 1614
1561 break; 1615 break;
1562 } 1616 }
1563 1617
1564 if (src->read->eof && (b->pos == b->last || (dst && dst->read->eof))) { 1618 if (src->read->eof && dst && (dst->read->eof || !dst->buffered)) {
1565 handler = c->log->handler; 1619 handler = c->log->handler;
1566 c->log->handler = NULL; 1620 c->log->handler = NULL;
1567 1621
1568 ngx_log_error(NGX_LOG_INFO, c->log, 0, 1622 ngx_log_error(NGX_LOG_INFO, c->log, 0,
1569 "%s%s disconnected" 1623 "%s%s disconnected"
1612 1666
1613 ngx_log_debug0(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, 1667 ngx_log_debug0(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
1614 "stream proxy next upstream"); 1668 "stream proxy next upstream");
1615 1669
1616 u = s->upstream; 1670 u = s->upstream;
1671 pc = u->peer.connection;
1672
1673 if (u->upstream_out || u->upstream_busy || (pc && pc->buffered)) {
1674 ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
1675 "pending buffers on next upstream");
1676 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
1677 return;
1678 }
1617 1679
1618 if (u->peer.sockaddr) { 1680 if (u->peer.sockaddr) {
1619 u->peer.free(&u->peer, u->peer.data, NGX_PEER_FAILED); 1681 u->peer.free(&u->peer, u->peer.data, NGX_PEER_FAILED);
1620 u->peer.sockaddr = NULL; 1682 u->peer.sockaddr = NULL;
1621 } 1683 }
1629 || (timeout && ngx_current_msec - u->peer.start_time >= timeout)) 1691 || (timeout && ngx_current_msec - u->peer.start_time >= timeout))
1630 { 1692 {
1631 ngx_stream_proxy_finalize(s, NGX_STREAM_BAD_GATEWAY); 1693 ngx_stream_proxy_finalize(s, NGX_STREAM_BAD_GATEWAY);
1632 return; 1694 return;
1633 } 1695 }
1634
1635 pc = u->peer.connection;
1636 1696
1637 if (pc) { 1697 if (pc) {
1638 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, 1698 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
1639 "close proxy upstream connection: %d", pc->fd); 1699 "close proxy upstream connection: %d", pc->fd);
1640 1700