comparison src/stream/ngx_stream_proxy_module.c @ 7286:d27aa9060c95

Stream: udp streams. Previously, only one client packet could be processed in a udp stream session even though multiple response packets were supported. Now multiple packets coming from the same client address and port are delivered to the same stream session. If it's required to maintain a single stream of data, nginx should be configured in a way that all packets from a client are delivered to the same worker. On Linux and DragonFly BSD the "reuseport" parameter should be specified for this. Other systems do not currently provide appropriate mechanisms. For these systems a single stream of udp packets is only guaranteed in single-worker configurations. The proxy_response directive now specifies how many packets are expected in response to a single client packet.
author Roman Arutyunyan <arut@nginx.com>
date Mon, 04 Jun 2018 19:50:00 +0300
parents ec4d95eed062
children 696df3ac27ac
comparison
equal deleted inserted replaced
7285:88a624c9b491 7286:d27aa9060c95
375 375
376 s->upstream = u; 376 s->upstream = u;
377 377
378 s->log_handler = ngx_stream_proxy_log_error; 378 s->log_handler = ngx_stream_proxy_log_error;
379 379
380 u->requests = 1;
381
380 u->peer.log = c->log; 382 u->peer.log = c->log;
381 u->peer.log_error = NGX_ERROR_ERR; 383 u->peer.log_error = NGX_ERROR_ERR;
382 384
383 if (ngx_stream_proxy_set_local(s, u, pscf->local) != NGX_OK) { 385 if (ngx_stream_proxy_set_local(s, u, pscf->local) != NGX_OK) {
384 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); 386 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
396 if (s->upstream_states == NULL) { 398 if (s->upstream_states == NULL) {
397 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); 399 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
398 return; 400 return;
399 } 401 }
400 402
401 if (c->type == SOCK_STREAM) { 403 p = ngx_pnalloc(c->pool, pscf->buffer_size);
402 p = ngx_pnalloc(c->pool, pscf->buffer_size); 404 if (p == NULL) {
403 if (p == NULL) { 405 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
404 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); 406 return;
405 return; 407 }
406 } 408
407 409 u->downstream_buf.start = p;
408 u->downstream_buf.start = p; 410 u->downstream_buf.end = p + pscf->buffer_size;
409 u->downstream_buf.end = p + pscf->buffer_size; 411 u->downstream_buf.pos = p;
410 u->downstream_buf.pos = p; 412 u->downstream_buf.last = p;
411 u->downstream_buf.last = p; 413
412 414 if (c->read->ready) {
413 if (c->read->ready) { 415 ngx_post_event(c->read, &ngx_posted_events);
414 ngx_post_event(c->read, &ngx_posted_events);
415 }
416 } 416 }
417 417
418 if (pscf->upstream_value) { 418 if (pscf->upstream_value) {
419 if (ngx_stream_proxy_eval(s, pscf) != NGX_OK) { 419 if (ngx_stream_proxy_eval(s, pscf) != NGX_OK) {
420 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); 420 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
827 827
828 *cl->buf = *c->buffer; 828 *cl->buf = *c->buffer;
829 829
830 cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module; 830 cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
831 cl->buf->flush = 1; 831 cl->buf->flush = 1;
832 cl->buf->last_buf = (c->type == SOCK_DGRAM);
833 832
834 cl->next = u->upstream_out; 833 cl->next = u->upstream_out;
835 u->upstream_out = cl; 834 u->upstream_out = cl;
836 } 835 }
837 836
869 u->upstream_out = cl; 868 u->upstream_out = cl;
870 869
871 u->proxy_protocol = 0; 870 u->proxy_protocol = 0;
872 } 871 }
873 872
874 if (c->type == SOCK_DGRAM && pscf->responses == 0) {
875 pc->read->ready = 0;
876 pc->read->eof = 1;
877 }
878
879 u->connected = 1; 873 u->connected = 1;
880 874
881 pc->read->handler = ngx_stream_proxy_upstream_handler; 875 pc->read->handler = ngx_stream_proxy_upstream_handler;
882 pc->write->handler = ngx_stream_proxy_upstream_handler; 876 pc->write->handler = ngx_stream_proxy_upstream_handler;
883 877
884 if (pc->read->ready || pc->read->eof) { 878 if (pc->read->ready) {
885 ngx_post_event(pc->read, &ngx_posted_events); 879 ngx_post_event(pc->read, &ngx_posted_events);
886 } 880 }
887 881
888 ngx_stream_proxy_process(s, 0, 1); 882 ngx_stream_proxy_process(s, 0, 1);
889 } 883 }
1278 1272
1279 static void 1273 static void
1280 ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream) 1274 ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream)
1281 { 1275 {
1282 ngx_connection_t *c, *pc; 1276 ngx_connection_t *c, *pc;
1277 ngx_log_handler_pt handler;
1283 ngx_stream_session_t *s; 1278 ngx_stream_session_t *s;
1284 ngx_stream_upstream_t *u; 1279 ngx_stream_upstream_t *u;
1285 ngx_stream_proxy_srv_conf_t *pscf; 1280 ngx_stream_proxy_srv_conf_t *pscf;
1286 1281
1287 c = ev->data; 1282 c = ev->data;
1326 /* 1321 /*
1327 * successfully terminate timed out UDP session 1322 * successfully terminate timed out UDP session
1328 * with unspecified number of responses 1323 * with unspecified number of responses
1329 */ 1324 */
1330 1325
1331 pc->read->ready = 0; 1326 handler = c->log->handler;
1332 pc->read->eof = 1; 1327 c->log->handler = NULL;
1333 1328
1334 ngx_stream_proxy_process(s, 1, 0); 1329 ngx_log_error(NGX_LOG_INFO, c->log, 0,
1330 "udp timed out"
1331 ", packets from/to client:%ui/%ui"
1332 ", bytes from/to client:%O/%O"
1333 ", bytes from/to upstream:%O/%O",
1334 u->requests, u->responses,
1335 s->received, c->sent, u->received,
1336 pc ? pc->sent : 0);
1337
1338 c->log->handler = handler;
1339
1340 ngx_stream_proxy_finalize(s, NGX_STREAM_OK);
1335 return; 1341 return;
1336 } 1342 }
1337 1343
1338 ngx_connection_error(pc, NGX_ETIMEDOUT, "upstream timed out"); 1344 ngx_connection_error(pc, NGX_ETIMEDOUT, "upstream timed out");
1339 1345
1340 if (u->received == 0) { 1346 pc->read->error = 1;
1341 ngx_stream_proxy_next_upstream(s); 1347
1342 return; 1348 ngx_stream_proxy_finalize(s, NGX_STREAM_BAD_GATEWAY);
1343 } 1349
1344 1350 return;
1345 } else {
1346 ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out");
1347 } 1351 }
1348 1352
1353 ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out");
1354
1349 ngx_stream_proxy_finalize(s, NGX_STREAM_OK); 1355 ngx_stream_proxy_finalize(s, NGX_STREAM_OK);
1356
1350 return; 1357 return;
1351 } 1358 }
1352 1359
1353 } else if (ev->delayed) { 1360 } else if (ev->delayed) {
1354 1361
1451 off_t *received, limit; 1458 off_t *received, limit;
1452 size_t size, limit_rate; 1459 size_t size, limit_rate;
1453 ssize_t n; 1460 ssize_t n;
1454 ngx_buf_t *b; 1461 ngx_buf_t *b;
1455 ngx_int_t rc; 1462 ngx_int_t rc;
1456 ngx_uint_t flags; 1463 ngx_uint_t flags, *packets;
1457 ngx_msec_t delay; 1464 ngx_msec_t delay;
1458 ngx_chain_t *cl, **ll, **out, **busy; 1465 ngx_chain_t *cl, **ll, **out, **busy;
1459 ngx_connection_t *c, *pc, *src, *dst; 1466 ngx_connection_t *c, *pc, *src, *dst;
1460 ngx_log_handler_pt handler; 1467 ngx_log_handler_pt handler;
1461 ngx_stream_upstream_t *u; 1468 ngx_stream_upstream_t *u;
1487 src = pc; 1494 src = pc;
1488 dst = c; 1495 dst = c;
1489 b = &u->upstream_buf; 1496 b = &u->upstream_buf;
1490 limit_rate = pscf->download_rate; 1497 limit_rate = pscf->download_rate;
1491 received = &u->received; 1498 received = &u->received;
1499 packets = &u->responses;
1492 out = &u->downstream_out; 1500 out = &u->downstream_out;
1493 busy = &u->downstream_busy; 1501 busy = &u->downstream_busy;
1494 recv_action = "proxying and reading from upstream"; 1502 recv_action = "proxying and reading from upstream";
1495 send_action = "proxying and sending to client"; 1503 send_action = "proxying and sending to client";
1496 1504
1498 src = c; 1506 src = c;
1499 dst = pc; 1507 dst = pc;
1500 b = &u->downstream_buf; 1508 b = &u->downstream_buf;
1501 limit_rate = pscf->upload_rate; 1509 limit_rate = pscf->upload_rate;
1502 received = &s->received; 1510 received = &s->received;
1511 packets = &u->requests;
1503 out = &u->upstream_out; 1512 out = &u->upstream_out;
1504 busy = &u->upstream_busy; 1513 busy = &u->upstream_busy;
1505 recv_action = "proxying and reading from client"; 1514 recv_action = "proxying and reading from client";
1506 send_action = "proxying and sending to upstream"; 1515 send_action = "proxying and sending to upstream";
1507 } 1516 }
1514 c->log->action = send_action; 1523 c->log->action = send_action;
1515 1524
1516 rc = ngx_stream_top_filter(s, *out, from_upstream); 1525 rc = ngx_stream_top_filter(s, *out, from_upstream);
1517 1526
1518 if (rc == NGX_ERROR) { 1527 if (rc == NGX_ERROR) {
1519 if (c->type == SOCK_DGRAM && !from_upstream) {
1520 ngx_stream_proxy_next_upstream(s);
1521 return;
1522 }
1523
1524 ngx_stream_proxy_finalize(s, NGX_STREAM_OK); 1528 ngx_stream_proxy_finalize(s, NGX_STREAM_OK);
1525 return; 1529 return;
1526 } 1530 }
1527 1531
1528 ngx_chain_update_chains(c->pool, &u->free, busy, out, 1532 ngx_chain_update_chains(c->pool, &u->free, busy, out,
1563 if (n == NGX_AGAIN) { 1567 if (n == NGX_AGAIN) {
1564 break; 1568 break;
1565 } 1569 }
1566 1570
1567 if (n == NGX_ERROR) { 1571 if (n == NGX_ERROR) {
1568 if (c->type == SOCK_DGRAM && u->received == 0) {
1569 ngx_stream_proxy_next_upstream(s);
1570 return;
1571 }
1572
1573 src->read->eof = 1; 1572 src->read->eof = 1;
1574 n = 0; 1573 n = 0;
1575 } 1574 }
1576 1575
1577 if (n >= 0) { 1576 if (n >= 0) {
1589 u->state->first_byte_time = ngx_current_msec 1588 u->state->first_byte_time = ngx_current_msec
1590 - u->state->response_time; 1589 - u->state->response_time;
1591 } 1590 }
1592 } 1591 }
1593 1592
1594 if (c->type == SOCK_DGRAM && ++u->responses == pscf->responses)
1595 {
1596 src->read->ready = 0;
1597 src->read->eof = 1;
1598 }
1599
1600 for (ll = out; *ll; ll = &(*ll)->next) { /* void */ } 1593 for (ll = out; *ll; ll = &(*ll)->next) { /* void */ }
1601 1594
1602 cl = ngx_chain_get_free_buf(c->pool, &u->free); 1595 cl = ngx_chain_get_free_buf(c->pool, &u->free);
1603 if (cl == NULL) { 1596 if (cl == NULL) {
1604 ngx_stream_proxy_finalize(s, 1597 ngx_stream_proxy_finalize(s,
1614 1607
1615 cl->buf->temporary = (n ? 1 : 0); 1608 cl->buf->temporary = (n ? 1 : 0);
1616 cl->buf->last_buf = src->read->eof; 1609 cl->buf->last_buf = src->read->eof;
1617 cl->buf->flush = 1; 1610 cl->buf->flush = 1;
1618 1611
1612 (*packets)++;
1619 *received += n; 1613 *received += n;
1620 b->last += n; 1614 b->last += n;
1621 do_write = 1; 1615 do_write = 1;
1622 1616
1623 continue; 1617 continue;
1627 break; 1621 break;
1628 } 1622 }
1629 1623
1630 c->log->action = "proxying connection"; 1624 c->log->action = "proxying connection";
1631 1625
1632 if (src->read->eof && dst && (dst->read->eof || !dst->buffered)) { 1626 if (c->type == SOCK_DGRAM
1627 && pscf->responses != NGX_MAX_INT32_VALUE
1628 && u->responses >= pscf->responses * u->requests
1629 && !src->buffered && dst && !dst->buffered)
1630 {
1633 handler = c->log->handler; 1631 handler = c->log->handler;
1634 c->log->handler = NULL; 1632 c->log->handler = NULL;
1635 1633
1636 ngx_log_error(NGX_LOG_INFO, c->log, 0, 1634 ngx_log_error(NGX_LOG_INFO, c->log, 0,
1637 "%s%s disconnected" 1635 "udp done"
1636 ", packets from/to client:%ui/%ui"
1638 ", bytes from/to client:%O/%O" 1637 ", bytes from/to client:%O/%O"
1639 ", bytes from/to upstream:%O/%O", 1638 ", bytes from/to upstream:%O/%O",
1640 src->type == SOCK_DGRAM ? "udp " : "", 1639 u->requests, u->responses,
1640 s->received, c->sent, u->received, pc ? pc->sent : 0);
1641
1642 c->log->handler = handler;
1643
1644 ngx_stream_proxy_finalize(s, NGX_STREAM_OK);
1645 return;
1646 }
1647
1648 if (c->type == SOCK_STREAM
1649 && src->read->eof && dst && (dst->read->eof || !dst->buffered))
1650 {
1651 handler = c->log->handler;
1652 c->log->handler = NULL;
1653
1654 ngx_log_error(NGX_LOG_INFO, c->log, 0,
1655 "%s disconnected"
1656 ", bytes from/to client:%O/%O"
1657 ", bytes from/to upstream:%O/%O",
1641 from_upstream ? "upstream" : "client", 1658 from_upstream ? "upstream" : "client",
1642 s->received, c->sent, u->received, pc ? pc->sent : 0); 1659 s->received, c->sent, u->received, pc ? pc->sent : 0);
1643 1660
1644 c->log->handler = handler; 1661 c->log->handler = handler;
1645 1662
1737 1754
1738 1755
1739 static void 1756 static void
1740 ngx_stream_proxy_finalize(ngx_stream_session_t *s, ngx_uint_t rc) 1757 ngx_stream_proxy_finalize(ngx_stream_session_t *s, ngx_uint_t rc)
1741 { 1758 {
1759 ngx_uint_t state;
1742 ngx_connection_t *pc; 1760 ngx_connection_t *pc;
1743 ngx_stream_upstream_t *u; 1761 ngx_stream_upstream_t *u;
1744 1762
1745 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, 1763 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
1746 "finalize stream proxy: %i", rc); 1764 "finalize stream proxy: %i", rc);
1766 u->state->bytes_sent = pc->sent; 1784 u->state->bytes_sent = pc->sent;
1767 } 1785 }
1768 } 1786 }
1769 1787
1770 if (u->peer.free && u->peer.sockaddr) { 1788 if (u->peer.free && u->peer.sockaddr) {
1771 u->peer.free(&u->peer, u->peer.data, 0); 1789 state = 0;
1790
1791 if (pc && pc->type == SOCK_DGRAM
1792 && (pc->read->error || pc->write->error))
1793 {
1794 state = NGX_PEER_FAILED;
1795 }
1796
1797 u->peer.free(&u->peer, u->peer.data, state);
1772 u->peer.sockaddr = NULL; 1798 u->peer.sockaddr = NULL;
1773 } 1799 }
1774 1800
1775 if (pc) { 1801 if (pc) {
1776 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, 1802 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,