Mercurial > hg > nginx-quic
comparison src/stream/ngx_stream_proxy_module.c @ 7286:d27aa9060c95
Stream: udp streams.
Previously, only one client packet could be processed in a udp stream session
even though multiple response packets were supported. Now multiple packets
coming from the same client address and port are delivered to the same stream
session.
If it's required to maintain a single stream of data, nginx should be
configured in a way that all packets from a client are delivered to the same
worker. On Linux and DragonFly BSD the "reuseport" parameter should be
specified for this. Other systems do not currently provide appropriate
mechanisms. For these systems a single stream of udp packets is only
guaranteed in single-worker configurations.
The proxy_responses directive now specifies how many packets are expected in
response to a single client packet.
author | Roman Arutyunyan <arut@nginx.com> |
---|---|
date | Mon, 04 Jun 2018 19:50:00 +0300 |
parents | ec4d95eed062 |
children | 696df3ac27ac |
comparison
equal
deleted
inserted
replaced
7285:88a624c9b491 | 7286:d27aa9060c95 |
---|---|
375 | 375 |
376 s->upstream = u; | 376 s->upstream = u; |
377 | 377 |
378 s->log_handler = ngx_stream_proxy_log_error; | 378 s->log_handler = ngx_stream_proxy_log_error; |
379 | 379 |
380 u->requests = 1; | |
381 | |
380 u->peer.log = c->log; | 382 u->peer.log = c->log; |
381 u->peer.log_error = NGX_ERROR_ERR; | 383 u->peer.log_error = NGX_ERROR_ERR; |
382 | 384 |
383 if (ngx_stream_proxy_set_local(s, u, pscf->local) != NGX_OK) { | 385 if (ngx_stream_proxy_set_local(s, u, pscf->local) != NGX_OK) { |
384 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); | 386 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); |
396 if (s->upstream_states == NULL) { | 398 if (s->upstream_states == NULL) { |
397 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); | 399 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); |
398 return; | 400 return; |
399 } | 401 } |
400 | 402 |
401 if (c->type == SOCK_STREAM) { | 403 p = ngx_pnalloc(c->pool, pscf->buffer_size); |
402 p = ngx_pnalloc(c->pool, pscf->buffer_size); | 404 if (p == NULL) { |
403 if (p == NULL) { | 405 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); |
404 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); | 406 return; |
405 return; | 407 } |
406 } | 408 |
407 | 409 u->downstream_buf.start = p; |
408 u->downstream_buf.start = p; | 410 u->downstream_buf.end = p + pscf->buffer_size; |
409 u->downstream_buf.end = p + pscf->buffer_size; | 411 u->downstream_buf.pos = p; |
410 u->downstream_buf.pos = p; | 412 u->downstream_buf.last = p; |
411 u->downstream_buf.last = p; | 413 |
412 | 414 if (c->read->ready) { |
413 if (c->read->ready) { | 415 ngx_post_event(c->read, &ngx_posted_events); |
414 ngx_post_event(c->read, &ngx_posted_events); | |
415 } | |
416 } | 416 } |
417 | 417 |
418 if (pscf->upstream_value) { | 418 if (pscf->upstream_value) { |
419 if (ngx_stream_proxy_eval(s, pscf) != NGX_OK) { | 419 if (ngx_stream_proxy_eval(s, pscf) != NGX_OK) { |
420 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); | 420 ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); |
827 | 827 |
828 *cl->buf = *c->buffer; | 828 *cl->buf = *c->buffer; |
829 | 829 |
830 cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module; | 830 cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module; |
831 cl->buf->flush = 1; | 831 cl->buf->flush = 1; |
832 cl->buf->last_buf = (c->type == SOCK_DGRAM); | |
833 | 832 |
834 cl->next = u->upstream_out; | 833 cl->next = u->upstream_out; |
835 u->upstream_out = cl; | 834 u->upstream_out = cl; |
836 } | 835 } |
837 | 836 |
869 u->upstream_out = cl; | 868 u->upstream_out = cl; |
870 | 869 |
871 u->proxy_protocol = 0; | 870 u->proxy_protocol = 0; |
872 } | 871 } |
873 | 872 |
874 if (c->type == SOCK_DGRAM && pscf->responses == 0) { | |
875 pc->read->ready = 0; | |
876 pc->read->eof = 1; | |
877 } | |
878 | |
879 u->connected = 1; | 873 u->connected = 1; |
880 | 874 |
881 pc->read->handler = ngx_stream_proxy_upstream_handler; | 875 pc->read->handler = ngx_stream_proxy_upstream_handler; |
882 pc->write->handler = ngx_stream_proxy_upstream_handler; | 876 pc->write->handler = ngx_stream_proxy_upstream_handler; |
883 | 877 |
884 if (pc->read->ready || pc->read->eof) { | 878 if (pc->read->ready) { |
885 ngx_post_event(pc->read, &ngx_posted_events); | 879 ngx_post_event(pc->read, &ngx_posted_events); |
886 } | 880 } |
887 | 881 |
888 ngx_stream_proxy_process(s, 0, 1); | 882 ngx_stream_proxy_process(s, 0, 1); |
889 } | 883 } |
1278 | 1272 |
1279 static void | 1273 static void |
1280 ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream) | 1274 ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream) |
1281 { | 1275 { |
1282 ngx_connection_t *c, *pc; | 1276 ngx_connection_t *c, *pc; |
1277 ngx_log_handler_pt handler; | |
1283 ngx_stream_session_t *s; | 1278 ngx_stream_session_t *s; |
1284 ngx_stream_upstream_t *u; | 1279 ngx_stream_upstream_t *u; |
1285 ngx_stream_proxy_srv_conf_t *pscf; | 1280 ngx_stream_proxy_srv_conf_t *pscf; |
1286 | 1281 |
1287 c = ev->data; | 1282 c = ev->data; |
1326 /* | 1321 /* |
1327 * successfully terminate timed out UDP session | 1322 * successfully terminate timed out UDP session |
1328 * with unspecified number of responses | 1323 * with unspecified number of responses |
1329 */ | 1324 */ |
1330 | 1325 |
1331 pc->read->ready = 0; | 1326 handler = c->log->handler; |
1332 pc->read->eof = 1; | 1327 c->log->handler = NULL; |
1333 | 1328 |
1334 ngx_stream_proxy_process(s, 1, 0); | 1329 ngx_log_error(NGX_LOG_INFO, c->log, 0, |
1330 "udp timed out" | |
1331 ", packets from/to client:%ui/%ui" | |
1332 ", bytes from/to client:%O/%O" | |
1333 ", bytes from/to upstream:%O/%O", | |
1334 u->requests, u->responses, | |
1335 s->received, c->sent, u->received, | |
1336 pc ? pc->sent : 0); | |
1337 | |
1338 c->log->handler = handler; | |
1339 | |
1340 ngx_stream_proxy_finalize(s, NGX_STREAM_OK); | |
1335 return; | 1341 return; |
1336 } | 1342 } |
1337 | 1343 |
1338 ngx_connection_error(pc, NGX_ETIMEDOUT, "upstream timed out"); | 1344 ngx_connection_error(pc, NGX_ETIMEDOUT, "upstream timed out"); |
1339 | 1345 |
1340 if (u->received == 0) { | 1346 pc->read->error = 1; |
1341 ngx_stream_proxy_next_upstream(s); | 1347 |
1342 return; | 1348 ngx_stream_proxy_finalize(s, NGX_STREAM_BAD_GATEWAY); |
1343 } | 1349 |
1344 | 1350 return; |
1345 } else { | |
1346 ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out"); | |
1347 } | 1351 } |
1348 | 1352 |
1353 ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out"); | |
1354 | |
1349 ngx_stream_proxy_finalize(s, NGX_STREAM_OK); | 1355 ngx_stream_proxy_finalize(s, NGX_STREAM_OK); |
1356 | |
1350 return; | 1357 return; |
1351 } | 1358 } |
1352 | 1359 |
1353 } else if (ev->delayed) { | 1360 } else if (ev->delayed) { |
1354 | 1361 |
1451 off_t *received, limit; | 1458 off_t *received, limit; |
1452 size_t size, limit_rate; | 1459 size_t size, limit_rate; |
1453 ssize_t n; | 1460 ssize_t n; |
1454 ngx_buf_t *b; | 1461 ngx_buf_t *b; |
1455 ngx_int_t rc; | 1462 ngx_int_t rc; |
1456 ngx_uint_t flags; | 1463 ngx_uint_t flags, *packets; |
1457 ngx_msec_t delay; | 1464 ngx_msec_t delay; |
1458 ngx_chain_t *cl, **ll, **out, **busy; | 1465 ngx_chain_t *cl, **ll, **out, **busy; |
1459 ngx_connection_t *c, *pc, *src, *dst; | 1466 ngx_connection_t *c, *pc, *src, *dst; |
1460 ngx_log_handler_pt handler; | 1467 ngx_log_handler_pt handler; |
1461 ngx_stream_upstream_t *u; | 1468 ngx_stream_upstream_t *u; |
1487 src = pc; | 1494 src = pc; |
1488 dst = c; | 1495 dst = c; |
1489 b = &u->upstream_buf; | 1496 b = &u->upstream_buf; |
1490 limit_rate = pscf->download_rate; | 1497 limit_rate = pscf->download_rate; |
1491 received = &u->received; | 1498 received = &u->received; |
1499 packets = &u->responses; | |
1492 out = &u->downstream_out; | 1500 out = &u->downstream_out; |
1493 busy = &u->downstream_busy; | 1501 busy = &u->downstream_busy; |
1494 recv_action = "proxying and reading from upstream"; | 1502 recv_action = "proxying and reading from upstream"; |
1495 send_action = "proxying and sending to client"; | 1503 send_action = "proxying and sending to client"; |
1496 | 1504 |
1498 src = c; | 1506 src = c; |
1499 dst = pc; | 1507 dst = pc; |
1500 b = &u->downstream_buf; | 1508 b = &u->downstream_buf; |
1501 limit_rate = pscf->upload_rate; | 1509 limit_rate = pscf->upload_rate; |
1502 received = &s->received; | 1510 received = &s->received; |
1511 packets = &u->requests; | |
1503 out = &u->upstream_out; | 1512 out = &u->upstream_out; |
1504 busy = &u->upstream_busy; | 1513 busy = &u->upstream_busy; |
1505 recv_action = "proxying and reading from client"; | 1514 recv_action = "proxying and reading from client"; |
1506 send_action = "proxying and sending to upstream"; | 1515 send_action = "proxying and sending to upstream"; |
1507 } | 1516 } |
1514 c->log->action = send_action; | 1523 c->log->action = send_action; |
1515 | 1524 |
1516 rc = ngx_stream_top_filter(s, *out, from_upstream); | 1525 rc = ngx_stream_top_filter(s, *out, from_upstream); |
1517 | 1526 |
1518 if (rc == NGX_ERROR) { | 1527 if (rc == NGX_ERROR) { |
1519 if (c->type == SOCK_DGRAM && !from_upstream) { | |
1520 ngx_stream_proxy_next_upstream(s); | |
1521 return; | |
1522 } | |
1523 | |
1524 ngx_stream_proxy_finalize(s, NGX_STREAM_OK); | 1528 ngx_stream_proxy_finalize(s, NGX_STREAM_OK); |
1525 return; | 1529 return; |
1526 } | 1530 } |
1527 | 1531 |
1528 ngx_chain_update_chains(c->pool, &u->free, busy, out, | 1532 ngx_chain_update_chains(c->pool, &u->free, busy, out, |
1563 if (n == NGX_AGAIN) { | 1567 if (n == NGX_AGAIN) { |
1564 break; | 1568 break; |
1565 } | 1569 } |
1566 | 1570 |
1567 if (n == NGX_ERROR) { | 1571 if (n == NGX_ERROR) { |
1568 if (c->type == SOCK_DGRAM && u->received == 0) { | |
1569 ngx_stream_proxy_next_upstream(s); | |
1570 return; | |
1571 } | |
1572 | |
1573 src->read->eof = 1; | 1572 src->read->eof = 1; |
1574 n = 0; | 1573 n = 0; |
1575 } | 1574 } |
1576 | 1575 |
1577 if (n >= 0) { | 1576 if (n >= 0) { |
1589 u->state->first_byte_time = ngx_current_msec | 1588 u->state->first_byte_time = ngx_current_msec |
1590 - u->state->response_time; | 1589 - u->state->response_time; |
1591 } | 1590 } |
1592 } | 1591 } |
1593 | 1592 |
1594 if (c->type == SOCK_DGRAM && ++u->responses == pscf->responses) | |
1595 { | |
1596 src->read->ready = 0; | |
1597 src->read->eof = 1; | |
1598 } | |
1599 | |
1600 for (ll = out; *ll; ll = &(*ll)->next) { /* void */ } | 1593 for (ll = out; *ll; ll = &(*ll)->next) { /* void */ } |
1601 | 1594 |
1602 cl = ngx_chain_get_free_buf(c->pool, &u->free); | 1595 cl = ngx_chain_get_free_buf(c->pool, &u->free); |
1603 if (cl == NULL) { | 1596 if (cl == NULL) { |
1604 ngx_stream_proxy_finalize(s, | 1597 ngx_stream_proxy_finalize(s, |
1614 | 1607 |
1615 cl->buf->temporary = (n ? 1 : 0); | 1608 cl->buf->temporary = (n ? 1 : 0); |
1616 cl->buf->last_buf = src->read->eof; | 1609 cl->buf->last_buf = src->read->eof; |
1617 cl->buf->flush = 1; | 1610 cl->buf->flush = 1; |
1618 | 1611 |
1612 (*packets)++; | |
1619 *received += n; | 1613 *received += n; |
1620 b->last += n; | 1614 b->last += n; |
1621 do_write = 1; | 1615 do_write = 1; |
1622 | 1616 |
1623 continue; | 1617 continue; |
1627 break; | 1621 break; |
1628 } | 1622 } |
1629 | 1623 |
1630 c->log->action = "proxying connection"; | 1624 c->log->action = "proxying connection"; |
1631 | 1625 |
1632 if (src->read->eof && dst && (dst->read->eof || !dst->buffered)) { | 1626 if (c->type == SOCK_DGRAM |
1627 && pscf->responses != NGX_MAX_INT32_VALUE | |
1628 && u->responses >= pscf->responses * u->requests | |
1629 && !src->buffered && dst && !dst->buffered) | |
1630 { | |
1633 handler = c->log->handler; | 1631 handler = c->log->handler; |
1634 c->log->handler = NULL; | 1632 c->log->handler = NULL; |
1635 | 1633 |
1636 ngx_log_error(NGX_LOG_INFO, c->log, 0, | 1634 ngx_log_error(NGX_LOG_INFO, c->log, 0, |
1637 "%s%s disconnected" | 1635 "udp done" |
1636 ", packets from/to client:%ui/%ui" | |
1638 ", bytes from/to client:%O/%O" | 1637 ", bytes from/to client:%O/%O" |
1639 ", bytes from/to upstream:%O/%O", | 1638 ", bytes from/to upstream:%O/%O", |
1640 src->type == SOCK_DGRAM ? "udp " : "", | 1639 u->requests, u->responses, |
1640 s->received, c->sent, u->received, pc ? pc->sent : 0); | |
1641 | |
1642 c->log->handler = handler; | |
1643 | |
1644 ngx_stream_proxy_finalize(s, NGX_STREAM_OK); | |
1645 return; | |
1646 } | |
1647 | |
1648 if (c->type == SOCK_STREAM | |
1649 && src->read->eof && dst && (dst->read->eof || !dst->buffered)) | |
1650 { | |
1651 handler = c->log->handler; | |
1652 c->log->handler = NULL; | |
1653 | |
1654 ngx_log_error(NGX_LOG_INFO, c->log, 0, | |
1655 "%s disconnected" | |
1656 ", bytes from/to client:%O/%O" | |
1657 ", bytes from/to upstream:%O/%O", | |
1641 from_upstream ? "upstream" : "client", | 1658 from_upstream ? "upstream" : "client", |
1642 s->received, c->sent, u->received, pc ? pc->sent : 0); | 1659 s->received, c->sent, u->received, pc ? pc->sent : 0); |
1643 | 1660 |
1644 c->log->handler = handler; | 1661 c->log->handler = handler; |
1645 | 1662 |
1737 | 1754 |
1738 | 1755 |
1739 static void | 1756 static void |
1740 ngx_stream_proxy_finalize(ngx_stream_session_t *s, ngx_uint_t rc) | 1757 ngx_stream_proxy_finalize(ngx_stream_session_t *s, ngx_uint_t rc) |
1741 { | 1758 { |
1759 ngx_uint_t state; | |
1742 ngx_connection_t *pc; | 1760 ngx_connection_t *pc; |
1743 ngx_stream_upstream_t *u; | 1761 ngx_stream_upstream_t *u; |
1744 | 1762 |
1745 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, | 1763 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, |
1746 "finalize stream proxy: %i", rc); | 1764 "finalize stream proxy: %i", rc); |
1766 u->state->bytes_sent = pc->sent; | 1784 u->state->bytes_sent = pc->sent; |
1767 } | 1785 } |
1768 } | 1786 } |
1769 | 1787 |
1770 if (u->peer.free && u->peer.sockaddr) { | 1788 if (u->peer.free && u->peer.sockaddr) { |
1771 u->peer.free(&u->peer, u->peer.data, 0); | 1789 state = 0; |
1790 | |
1791 if (pc && pc->type == SOCK_DGRAM | |
1792 && (pc->read->error || pc->write->error)) | |
1793 { | |
1794 state = NGX_PEER_FAILED; | |
1795 } | |
1796 | |
1797 u->peer.free(&u->peer, u->peer.data, state); | |
1772 u->peer.sockaddr = NULL; | 1798 u->peer.sockaddr = NULL; |
1773 } | 1799 } |
1774 | 1800 |
1775 if (pc) { | 1801 if (pc) { |
1776 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, | 1802 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, |