Mercurial > hg > nginx
comparison src/event/ngx_event_quic.c @ 8334:72d20158c814 quic
Added reordering support for STREAM frames.
Each stream node now includes an incoming frames queue and sent/received counters
for tracking offsets. The sent counter is not used here; c->sent is used instead,
unlike in crypto buffers, which have no connections.
author | Vladimir Homutov <vl@nginx.com> |
---|---|
date | Wed, 15 Apr 2020 11:11:54 +0300 |
parents | 167d32476737 |
children | 76839f55bc48 |
comparison
equal
deleted
inserted
replaced
8333:167d32476737 | 8334:72d20158c814 |
---|---|
66 uint64_t largest_ack; /* number received from peer */ | 66 uint64_t largest_ack; /* number received from peer */ |
67 | 67 |
68 ngx_queue_t frames; | 68 ngx_queue_t frames; |
69 ngx_queue_t sent; | 69 ngx_queue_t sent; |
70 } ngx_quic_send_ctx_t; | 70 } ngx_quic_send_ctx_t; |
71 | |
72 | |
73 /* ordered frames stream context */ | |
74 typedef struct { | |
75 uint64_t sent; | |
76 uint64_t received; | |
77 ngx_queue_t frames; | |
78 size_t total; /* size of buffered data */ | |
79 } ngx_quic_frames_stream_t; | |
80 | 71 |
81 | 72 |
82 struct ngx_quic_connection_s { | 73 struct ngx_quic_connection_s { |
83 ngx_str_t scid; | 74 ngx_str_t scid; |
84 ngx_str_t dcid; | 75 ngx_str_t dcid; |
175 ngx_quic_frame_handler_pt handler); | 166 ngx_quic_frame_handler_pt handler); |
176 | 167 |
177 static ngx_int_t ngx_quic_crypto_input(ngx_connection_t *c, | 168 static ngx_int_t ngx_quic_crypto_input(ngx_connection_t *c, |
178 ngx_quic_frame_t *frame); | 169 ngx_quic_frame_t *frame); |
179 static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c, | 170 static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c, |
180 ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *frame); | 171 ngx_quic_header_t *pkt, ngx_quic_frame_t *frame); |
172 static ngx_int_t ngx_quic_stream_input(ngx_connection_t *c, | |
173 ngx_quic_frame_t *frame); | |
174 static ngx_quic_stream_t *ngx_quic_add_stream(ngx_connection_t *c, | |
175 ngx_quic_stream_frame_t *f); | |
176 | |
181 static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c); | 177 static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c); |
182 static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c, | 178 static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c, |
183 ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f); | 179 ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f); |
184 static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c, | 180 static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c, |
185 ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f); | 181 ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f); |
737 ngx_quic_close_connection(ngx_connection_t *c) | 733 ngx_quic_close_connection(ngx_connection_t *c) |
738 { | 734 { |
739 #if (NGX_DEBUG) | 735 #if (NGX_DEBUG) |
740 ngx_uint_t ns; | 736 ngx_uint_t ns; |
741 #endif | 737 #endif |
738 ngx_uint_t i; | |
742 ngx_pool_t *pool; | 739 ngx_pool_t *pool; |
743 ngx_event_t *rev; | 740 ngx_event_t *rev; |
744 ngx_rbtree_t *tree; | 741 ngx_rbtree_t *tree; |
745 ngx_rbtree_node_t *node; | 742 ngx_rbtree_node_t *node; |
746 ngx_quic_stream_t *qs; | 743 ngx_quic_stream_t *qs; |
747 ngx_quic_connection_t *qc; | 744 ngx_quic_connection_t *qc; |
748 | 745 |
749 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "close quic connection"); | 746 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "close quic connection"); |
750 | 747 |
751 // TODO: free frames from reorder queue if any | |
752 | |
753 qc = c->quic; | 748 qc = c->quic; |
754 | 749 |
755 if (qc) { | 750 if (qc) { |
751 | |
752 for (i = 0; i < NGX_QUIC_ENCRYPTION_LAST; i++) { | |
753 ngx_quic_free_frames(c, &qc->crypto[i].frames); | |
754 } | |
755 | |
756 qc->closing = 1; | 756 qc->closing = 1; |
757 tree = &qc->streams.tree; | 757 tree = &qc->streams.tree; |
758 | 758 |
759 if (tree->root != tree->sentinel) { | 759 if (tree->root != tree->sentinel) { |
760 if (c->read->timer_set) { | 760 if (c->read->timer_set) { |
1199 case NGX_QUIC_FT_STREAM4: | 1199 case NGX_QUIC_FT_STREAM4: |
1200 case NGX_QUIC_FT_STREAM5: | 1200 case NGX_QUIC_FT_STREAM5: |
1201 case NGX_QUIC_FT_STREAM6: | 1201 case NGX_QUIC_FT_STREAM6: |
1202 case NGX_QUIC_FT_STREAM7: | 1202 case NGX_QUIC_FT_STREAM7: |
1203 | 1203 |
1204 if (ngx_quic_handle_stream_frame(c, pkt, &frame.u.stream) | 1204 if (ngx_quic_handle_stream_frame(c, pkt, &frame) != NGX_OK) { |
1205 != NGX_OK) | |
1206 { | |
1207 return NGX_ERROR; | 1205 return NGX_ERROR; |
1208 } | 1206 } |
1209 | 1207 |
1210 ack_this = 1; | 1208 ack_this = 1; |
1211 break; | 1209 break; |
1439 static ngx_int_t | 1437 static ngx_int_t |
1440 ngx_quic_handle_ordered_frame(ngx_connection_t *c, ngx_quic_frames_stream_t *fs, | 1438 ngx_quic_handle_ordered_frame(ngx_connection_t *c, ngx_quic_frames_stream_t *fs, |
1441 ngx_quic_frame_t *frame, ngx_quic_frame_handler_pt handler) | 1439 ngx_quic_frame_t *frame, ngx_quic_frame_handler_pt handler) |
1442 { | 1440 { |
1443 size_t full_len; | 1441 size_t full_len; |
1442 ngx_int_t rc; | |
1444 ngx_queue_t *q; | 1443 ngx_queue_t *q; |
1445 ngx_quic_ordered_frame_t *f; | 1444 ngx_quic_ordered_frame_t *f; |
1446 | 1445 |
1447 f = &frame->u.ord; | 1446 f = &frame->u.ord; |
1448 | 1447 |
1466 /* intersecting data range, frame modified */ | 1465 /* intersecting data range, frame modified */ |
1467 } | 1466 } |
1468 | 1467 |
1469 /* f->offset == fs->received */ | 1468 /* f->offset == fs->received */ |
1470 | 1469 |
1471 if (handler(c, frame) != NGX_OK) { | 1470 rc = handler(c, frame); |
1472 return NGX_ERROR; | 1471 if (rc == NGX_ERROR) { |
1473 } | 1472 return NGX_ERROR; |
1473 | |
1474 } else if (rc == NGX_DONE) { | |
1475 /* handler destroyed stream, queue no longer exists */ | |
1476 return NGX_OK; | |
1477 } | |
1478 | |
1479 /* rc == NGX_OK */ | |
1474 | 1480 |
1475 fs->received += f->length; | 1481 fs->received += f->length; |
1476 | 1482 |
1477 /* now check the queue if we can continue with buffered frames */ | 1483 /* now check the queue if we can continue with buffered frames */ |
1478 | 1484 |
1510 /* frame was adjusted, proceed to input */ | 1516 /* frame was adjusted, proceed to input */ |
1511 } | 1517 } |
1512 | 1518 |
1513 /* f->offset == fs->received */ | 1519 /* f->offset == fs->received */ |
1514 | 1520 |
1515 if (handler(c, frame) != NGX_OK) { | 1521 rc = handler(c, frame); |
1522 | |
1523 if (rc == NGX_ERROR) { | |
1516 return NGX_ERROR; | 1524 return NGX_ERROR; |
1525 | |
1526 } else if (rc == NGX_DONE) { | |
1527 /* handler destroyed stream, queue no longer exists */ | |
1528 return NGX_OK; | |
1517 } | 1529 } |
1518 | 1530 |
1519 fs->received += f->length; | 1531 fs->received += f->length; |
1520 fs->total -= full_len; | 1532 fs->total -= full_len; |
1521 | 1533 |
1719 return NGX_OK; | 1731 return NGX_OK; |
1720 } | 1732 } |
1721 | 1733 |
1722 | 1734 |
1723 static ngx_int_t | 1735 static ngx_int_t |
1724 ngx_quic_handle_stream_frame(ngx_connection_t *c, | 1736 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, |
1725 ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f) | 1737 ngx_quic_frame_t *frame) |
1726 { | 1738 { |
1727 size_t n; | 1739 ngx_quic_stream_t *sn; |
1728 ngx_buf_t *b; | 1740 ngx_quic_connection_t *qc; |
1729 ngx_event_t *rev; | 1741 ngx_quic_stream_frame_t *f; |
1730 ngx_quic_stream_t *sn; | 1742 ngx_quic_frames_stream_t *fs; |
1731 ngx_quic_connection_t *qc; | |
1732 | 1743 |
1733 qc = c->quic; | 1744 qc = c->quic; |
1745 f = &frame->u.stream; | |
1734 | 1746 |
1735 sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id); | 1747 sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id); |
1736 | 1748 |
1737 if (sn) { | 1749 if (sn == NULL) { |
1750 sn = ngx_quic_add_stream(c, f); | |
1751 if (sn == NULL) { | |
1752 return NGX_ERROR; | |
1753 } | |
1754 } | |
1755 | |
1756 fs = &sn->fs; | |
1757 | |
1758 return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input); | |
1759 } | |
1760 | |
1761 | |
1762 static ngx_int_t | |
1763 ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame) | |
1764 { | |
1765 ngx_buf_t *b; | |
1766 ngx_event_t *rev; | |
1767 ngx_quic_stream_t *sn; | |
1768 ngx_quic_connection_t *qc; | |
1769 ngx_quic_stream_frame_t *f; | |
1770 | |
1771 qc = c->quic; | |
1772 | |
1773 f = &frame->u.stream; | |
1774 | |
1775 sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id); | |
1776 | |
1777 if (sn == NULL) { | |
1778 // TODO: possible? | |
1779 // deleted while stream is in reordering queue? | |
1780 return NGX_ERROR; | |
1781 } | |
1782 | |
1783 if (sn->fs.received != 0) { | |
1738 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream"); | 1784 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream"); |
1739 b = sn->b; | 1785 b = sn->b; |
1740 | 1786 |
1741 if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) { | 1787 if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) { |
1742 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer"); | 1788 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer"); |
1759 | 1805 |
1760 if (rev->active) { | 1806 if (rev->active) { |
1761 rev->handler(rev); | 1807 rev->handler(rev); |
1762 } | 1808 } |
1763 | 1809 |
1810 /* check if stream was destroyed */ | |
1811 if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) { | |
1812 return NGX_DONE; | |
1813 } | |
1814 | |
1764 return NGX_OK; | 1815 return NGX_OK; |
1765 } | 1816 } |
1817 | |
1818 b = sn->b; | |
1819 b->last = ngx_cpymem(b->last, f->data, f->length); | |
1820 | |
1821 rev = sn->c->read; | |
1822 rev->ready = 1; | |
1823 | |
1824 if (f->fin) { | |
1825 rev->pending_eof = 1; | |
1826 } | |
1827 | |
1828 if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) { | |
1829 ngx_quic_handle_max_streams(c); | |
1830 } | |
1831 | |
1832 qc->streams.handler(sn->c); | |
1833 | |
1834 /* check if stream was destroyed */ | |
1835 if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) { | |
1836 return NGX_DONE; | |
1837 } | |
1838 | |
1839 return NGX_OK; | |
1840 } | |
1841 | |
1842 | |
1843 static ngx_quic_stream_t * | |
1844 ngx_quic_add_stream(ngx_connection_t *c, ngx_quic_stream_frame_t *f) | |
1845 { | |
1846 size_t n; | |
1847 ngx_quic_stream_t *sn; | |
1848 ngx_quic_connection_t *qc; | |
1849 | |
1850 qc = c->quic; | |
1851 | |
1852 // TODO: check increasing IDs | |
1766 | 1853 |
1767 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new"); | 1854 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new"); |
1768 | 1855 |
1769 n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) | 1856 n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) |
1770 ? qc->tp.initial_max_stream_data_uni | 1857 ? qc->tp.initial_max_stream_data_uni |
1774 n = NGX_QUIC_STREAM_BUFSIZE; | 1861 n = NGX_QUIC_STREAM_BUFSIZE; |
1775 } | 1862 } |
1776 | 1863 |
1777 if (n < f->length) { | 1864 if (n < f->length) { |
1778 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer"); | 1865 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer"); |
1779 return NGX_ERROR; | 1866 return NULL; |
1780 } | 1867 } |
1781 | 1868 |
1782 sn = ngx_quic_create_stream(c, f->stream_id, n); | 1869 sn = ngx_quic_create_stream(c, f->stream_id, n); |
1783 if (sn == NULL) { | 1870 if (sn == NULL) { |
1784 return NGX_ERROR; | 1871 return NULL; |
1785 } | 1872 } |
1786 | 1873 |
1787 b = sn->b; | 1874 return sn; |
1788 b->last = ngx_cpymem(b->last, f->data, f->length); | |
1789 | |
1790 rev = sn->c->read; | |
1791 rev->ready = 1; | |
1792 | |
1793 if (f->fin) { | |
1794 rev->pending_eof = 1; | |
1795 } | |
1796 | |
1797 if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) { | |
1798 ngx_quic_handle_max_streams(c); | |
1799 } | |
1800 | |
1801 qc->streams.handler(sn->c); | |
1802 | |
1803 return NGX_OK; | |
1804 } | 1875 } |
1805 | 1876 |
1806 | 1877 |
1807 static ngx_int_t | 1878 static ngx_int_t |
1808 ngx_quic_handle_max_streams(ngx_connection_t *c) | 1879 ngx_quic_handle_max_streams(ngx_connection_t *c) |
2022 | 2093 |
2023 } else { | 2094 } else { |
2024 return NGX_ERROR; | 2095 return NGX_ERROR; |
2025 } | 2096 } |
2026 | 2097 |
2027 | |
2028 } while (q != ngx_queue_sentinel(&ctx->frames)); | 2098 } while (q != ngx_queue_sentinel(&ctx->frames)); |
2029 | 2099 |
2030 return NGX_OK; | 2100 return NGX_OK; |
2031 } | 2101 } |
2032 | 2102 |
2035 ngx_quic_free_frames(ngx_connection_t *c, ngx_queue_t *frames) | 2105 ngx_quic_free_frames(ngx_connection_t *c, ngx_queue_t *frames) |
2036 { | 2106 { |
2037 ngx_queue_t *q; | 2107 ngx_queue_t *q; |
2038 ngx_quic_frame_t *f; | 2108 ngx_quic_frame_t *f; |
2039 | 2109 |
2040 q = ngx_queue_head(frames); | |
2041 | |
2042 do { | 2110 do { |
2111 q = ngx_queue_head(frames); | |
2112 | |
2113 if (q == ngx_queue_sentinel(frames)) { | |
2114 break; | |
2115 } | |
2116 | |
2117 ngx_queue_remove(q); | |
2118 | |
2043 f = ngx_queue_data(q, ngx_quic_frame_t, queue); | 2119 f = ngx_queue_data(q, ngx_quic_frame_t, queue); |
2044 q = ngx_queue_next(q); | |
2045 | 2120 |
2046 ngx_quic_free_frame(c, f); | 2121 ngx_quic_free_frame(c, f); |
2047 | 2122 } while (1); |
2048 } while (q != ngx_queue_sentinel(frames)); | |
2049 } | 2123 } |
2050 | 2124 |
2051 | 2125 |
2052 /* pack a group of frames [start; end) into memory p and send as single packet */ | 2126 /* pack a group of frames [start; end) into memory p and send as single packet */ |
2053 static ngx_int_t | 2127 static ngx_int_t |
2235 | 2309 |
2236 | 2310 |
2237 static void | 2311 static void |
2238 ngx_quic_push_handler(ngx_event_t *ev) | 2312 ngx_quic_push_handler(ngx_event_t *ev) |
2239 { | 2313 { |
2240 ngx_connection_t *c; | 2314 ngx_connection_t *c; |
2241 | 2315 |
2242 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "push timer"); | 2316 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "push timer"); |
2243 | 2317 |
2244 c = ev->data; | 2318 c = ev->data; |
2245 | 2319 |
2427 sn->b = ngx_create_temp_buf(pool, rcvbuf_size); | 2501 sn->b = ngx_create_temp_buf(pool, rcvbuf_size); |
2428 if (sn->b == NULL) { | 2502 if (sn->b == NULL) { |
2429 ngx_destroy_pool(pool); | 2503 ngx_destroy_pool(pool); |
2430 return NULL; | 2504 return NULL; |
2431 } | 2505 } |
2506 | |
2507 ngx_queue_init(&sn->fs.frames); | |
2432 | 2508 |
2433 log = ngx_palloc(pool, sizeof(ngx_log_t)); | 2509 log = ngx_palloc(pool, sizeof(ngx_log_t)); |
2434 if (log == NULL) { | 2510 if (log == NULL) { |
2435 ngx_destroy_pool(pool); | 2511 ngx_destroy_pool(pool); |
2436 return NULL; | 2512 return NULL; |
2593 if (qc->closing) { | 2669 if (qc->closing) { |
2594 ngx_post_event(pc->read, &ngx_posted_events); | 2670 ngx_post_event(pc->read, &ngx_posted_events); |
2595 return; | 2671 return; |
2596 } | 2672 } |
2597 | 2673 |
2674 ngx_quic_free_frames(pc, &qs->fs.frames); | |
2675 | |
2598 if ((qs->id & 0x03) == NGX_QUIC_STREAM_UNIDIRECTIONAL) { | 2676 if ((qs->id & 0x03) == NGX_QUIC_STREAM_UNIDIRECTIONAL) { |
2599 /* do not send fin for client unidirectional streams */ | 2677 /* do not send fin for client unidirectional streams */ |
2600 return; | 2678 return; |
2601 } | 2679 } |
2602 | 2680 |