comparison src/event/ngx_event_quic.c @ 7811:72d20158c814 quic

Added reordering support for STREAM frames. Each stream node now includes an incoming frames queue and sent/received counters for tracking offsets. The stream's own sent counter is not used; c->sent is used instead, unlike crypto buffers, which have no connections.
author Vladimir Homutov <vl@nginx.com>
date Wed, 15 Apr 2020 11:11:54 +0300
parents 167d32476737
children 76839f55bc48
comparison
equal deleted inserted replaced
7810:167d32476737 7811:72d20158c814
66 uint64_t largest_ack; /* number received from peer */ 66 uint64_t largest_ack; /* number received from peer */
67 67
68 ngx_queue_t frames; 68 ngx_queue_t frames;
69 ngx_queue_t sent; 69 ngx_queue_t sent;
70 } ngx_quic_send_ctx_t; 70 } ngx_quic_send_ctx_t;
71
72
73 /* ordered frames stream context */
74 typedef struct {
75 uint64_t sent;
76 uint64_t received;
77 ngx_queue_t frames;
78 size_t total; /* size of buffered data */
79 } ngx_quic_frames_stream_t;
80 71
81 72
82 struct ngx_quic_connection_s { 73 struct ngx_quic_connection_s {
83 ngx_str_t scid; 74 ngx_str_t scid;
84 ngx_str_t dcid; 75 ngx_str_t dcid;
175 ngx_quic_frame_handler_pt handler); 166 ngx_quic_frame_handler_pt handler);
176 167
177 static ngx_int_t ngx_quic_crypto_input(ngx_connection_t *c, 168 static ngx_int_t ngx_quic_crypto_input(ngx_connection_t *c,
178 ngx_quic_frame_t *frame); 169 ngx_quic_frame_t *frame);
179 static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c, 170 static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c,
180 ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *frame); 171 ngx_quic_header_t *pkt, ngx_quic_frame_t *frame);
172 static ngx_int_t ngx_quic_stream_input(ngx_connection_t *c,
173 ngx_quic_frame_t *frame);
174 static ngx_quic_stream_t *ngx_quic_add_stream(ngx_connection_t *c,
175 ngx_quic_stream_frame_t *f);
176
181 static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c); 177 static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c);
182 static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c, 178 static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
183 ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f); 179 ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f);
184 static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c, 180 static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
185 ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f); 181 ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f);
737 ngx_quic_close_connection(ngx_connection_t *c) 733 ngx_quic_close_connection(ngx_connection_t *c)
738 { 734 {
739 #if (NGX_DEBUG) 735 #if (NGX_DEBUG)
740 ngx_uint_t ns; 736 ngx_uint_t ns;
741 #endif 737 #endif
738 ngx_uint_t i;
742 ngx_pool_t *pool; 739 ngx_pool_t *pool;
743 ngx_event_t *rev; 740 ngx_event_t *rev;
744 ngx_rbtree_t *tree; 741 ngx_rbtree_t *tree;
745 ngx_rbtree_node_t *node; 742 ngx_rbtree_node_t *node;
746 ngx_quic_stream_t *qs; 743 ngx_quic_stream_t *qs;
747 ngx_quic_connection_t *qc; 744 ngx_quic_connection_t *qc;
748 745
749 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "close quic connection"); 746 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "close quic connection");
750 747
751 // TODO: free frames from reorder queue if any
752
753 qc = c->quic; 748 qc = c->quic;
754 749
755 if (qc) { 750 if (qc) {
751
752 for (i = 0; i < NGX_QUIC_ENCRYPTION_LAST; i++) {
753 ngx_quic_free_frames(c, &qc->crypto[i].frames);
754 }
755
756 qc->closing = 1; 756 qc->closing = 1;
757 tree = &qc->streams.tree; 757 tree = &qc->streams.tree;
758 758
759 if (tree->root != tree->sentinel) { 759 if (tree->root != tree->sentinel) {
760 if (c->read->timer_set) { 760 if (c->read->timer_set) {
1199 case NGX_QUIC_FT_STREAM4: 1199 case NGX_QUIC_FT_STREAM4:
1200 case NGX_QUIC_FT_STREAM5: 1200 case NGX_QUIC_FT_STREAM5:
1201 case NGX_QUIC_FT_STREAM6: 1201 case NGX_QUIC_FT_STREAM6:
1202 case NGX_QUIC_FT_STREAM7: 1202 case NGX_QUIC_FT_STREAM7:
1203 1203
1204 if (ngx_quic_handle_stream_frame(c, pkt, &frame.u.stream) 1204 if (ngx_quic_handle_stream_frame(c, pkt, &frame) != NGX_OK) {
1205 != NGX_OK)
1206 {
1207 return NGX_ERROR; 1205 return NGX_ERROR;
1208 } 1206 }
1209 1207
1210 ack_this = 1; 1208 ack_this = 1;
1211 break; 1209 break;
1439 static ngx_int_t 1437 static ngx_int_t
1440 ngx_quic_handle_ordered_frame(ngx_connection_t *c, ngx_quic_frames_stream_t *fs, 1438 ngx_quic_handle_ordered_frame(ngx_connection_t *c, ngx_quic_frames_stream_t *fs,
1441 ngx_quic_frame_t *frame, ngx_quic_frame_handler_pt handler) 1439 ngx_quic_frame_t *frame, ngx_quic_frame_handler_pt handler)
1442 { 1440 {
1443 size_t full_len; 1441 size_t full_len;
1442 ngx_int_t rc;
1444 ngx_queue_t *q; 1443 ngx_queue_t *q;
1445 ngx_quic_ordered_frame_t *f; 1444 ngx_quic_ordered_frame_t *f;
1446 1445
1447 f = &frame->u.ord; 1446 f = &frame->u.ord;
1448 1447
1466 /* intersecting data range, frame modified */ 1465 /* intersecting data range, frame modified */
1467 } 1466 }
1468 1467
1469 /* f->offset == fs->received */ 1468 /* f->offset == fs->received */
1470 1469
1471 if (handler(c, frame) != NGX_OK) { 1470 rc = handler(c, frame);
1472 return NGX_ERROR; 1471 if (rc == NGX_ERROR) {
1473 } 1472 return NGX_ERROR;
1473
1474 } else if (rc == NGX_DONE) {
1475 /* handler destroyed stream, queue no longer exists */
1476 return NGX_OK;
1477 }
1478
1479 /* rc == NGX_OK */
1474 1480
1475 fs->received += f->length; 1481 fs->received += f->length;
1476 1482
1477 /* now check the queue if we can continue with buffered frames */ 1483 /* now check the queue if we can continue with buffered frames */
1478 1484
1510 /* frame was adjusted, proceed to input */ 1516 /* frame was adjusted, proceed to input */
1511 } 1517 }
1512 1518
1513 /* f->offset == fs->received */ 1519 /* f->offset == fs->received */
1514 1520
1515 if (handler(c, frame) != NGX_OK) { 1521 rc = handler(c, frame);
1522
1523 if (rc == NGX_ERROR) {
1516 return NGX_ERROR; 1524 return NGX_ERROR;
1525
1526 } else if (rc == NGX_DONE) {
1527 /* handler destroyed stream, queue no longer exists */
1528 return NGX_OK;
1517 } 1529 }
1518 1530
1519 fs->received += f->length; 1531 fs->received += f->length;
1520 fs->total -= full_len; 1532 fs->total -= full_len;
1521 1533
1719 return NGX_OK; 1731 return NGX_OK;
1720 } 1732 }
1721 1733
1722 1734
1723 static ngx_int_t 1735 static ngx_int_t
1724 ngx_quic_handle_stream_frame(ngx_connection_t *c, 1736 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
1725 ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f) 1737 ngx_quic_frame_t *frame)
1726 { 1738 {
1727 size_t n; 1739 ngx_quic_stream_t *sn;
1728 ngx_buf_t *b; 1740 ngx_quic_connection_t *qc;
1729 ngx_event_t *rev; 1741 ngx_quic_stream_frame_t *f;
1730 ngx_quic_stream_t *sn; 1742 ngx_quic_frames_stream_t *fs;
1731 ngx_quic_connection_t *qc;
1732 1743
1733 qc = c->quic; 1744 qc = c->quic;
1745 f = &frame->u.stream;
1734 1746
1735 sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id); 1747 sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
1736 1748
1737 if (sn) { 1749 if (sn == NULL) {
1750 sn = ngx_quic_add_stream(c, f);
1751 if (sn == NULL) {
1752 return NGX_ERROR;
1753 }
1754 }
1755
1756 fs = &sn->fs;
1757
1758 return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input);
1759 }
1760
1761
1762 static ngx_int_t
1763 ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame)
1764 {
1765 ngx_buf_t *b;
1766 ngx_event_t *rev;
1767 ngx_quic_stream_t *sn;
1768 ngx_quic_connection_t *qc;
1769 ngx_quic_stream_frame_t *f;
1770
1771 qc = c->quic;
1772
1773 f = &frame->u.stream;
1774
1775 sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
1776
1777 if (sn == NULL) {
1778 // TODO: possible?
1779 // deleted while stream is in reordering queue?
1780 return NGX_ERROR;
1781 }
1782
1783 if (sn->fs.received != 0) {
1738 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream"); 1784 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream");
1739 b = sn->b; 1785 b = sn->b;
1740 1786
1741 if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) { 1787 if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
1742 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer"); 1788 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
1759 1805
1760 if (rev->active) { 1806 if (rev->active) {
1761 rev->handler(rev); 1807 rev->handler(rev);
1762 } 1808 }
1763 1809
1810 /* check if stream was destroyed */
1811 if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) {
1812 return NGX_DONE;
1813 }
1814
1764 return NGX_OK; 1815 return NGX_OK;
1765 } 1816 }
1817
1818 b = sn->b;
1819 b->last = ngx_cpymem(b->last, f->data, f->length);
1820
1821 rev = sn->c->read;
1822 rev->ready = 1;
1823
1824 if (f->fin) {
1825 rev->pending_eof = 1;
1826 }
1827
1828 if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) {
1829 ngx_quic_handle_max_streams(c);
1830 }
1831
1832 qc->streams.handler(sn->c);
1833
1834 /* check if stream was destroyed */
1835 if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) {
1836 return NGX_DONE;
1837 }
1838
1839 return NGX_OK;
1840 }
1841
1842
1843 static ngx_quic_stream_t *
1844 ngx_quic_add_stream(ngx_connection_t *c, ngx_quic_stream_frame_t *f)
1845 {
1846 size_t n;
1847 ngx_quic_stream_t *sn;
1848 ngx_quic_connection_t *qc;
1849
1850 qc = c->quic;
1851
1852 // TODO: check increasing IDs
1766 1853
1767 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new"); 1854 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new");
1768 1855
1769 n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) 1856 n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
1770 ? qc->tp.initial_max_stream_data_uni 1857 ? qc->tp.initial_max_stream_data_uni
1774 n = NGX_QUIC_STREAM_BUFSIZE; 1861 n = NGX_QUIC_STREAM_BUFSIZE;
1775 } 1862 }
1776 1863
1777 if (n < f->length) { 1864 if (n < f->length) {
1778 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer"); 1865 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
1779 return NGX_ERROR; 1866 return NULL;
1780 } 1867 }
1781 1868
1782 sn = ngx_quic_create_stream(c, f->stream_id, n); 1869 sn = ngx_quic_create_stream(c, f->stream_id, n);
1783 if (sn == NULL) { 1870 if (sn == NULL) {
1784 return NGX_ERROR; 1871 return NULL;
1785 } 1872 }
1786 1873
1787 b = sn->b; 1874 return sn;
1788 b->last = ngx_cpymem(b->last, f->data, f->length);
1789
1790 rev = sn->c->read;
1791 rev->ready = 1;
1792
1793 if (f->fin) {
1794 rev->pending_eof = 1;
1795 }
1796
1797 if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) {
1798 ngx_quic_handle_max_streams(c);
1799 }
1800
1801 qc->streams.handler(sn->c);
1802
1803 return NGX_OK;
1804 } 1875 }
1805 1876
1806 1877
1807 static ngx_int_t 1878 static ngx_int_t
1808 ngx_quic_handle_max_streams(ngx_connection_t *c) 1879 ngx_quic_handle_max_streams(ngx_connection_t *c)
2022 2093
2023 } else { 2094 } else {
2024 return NGX_ERROR; 2095 return NGX_ERROR;
2025 } 2096 }
2026 2097
2027
2028 } while (q != ngx_queue_sentinel(&ctx->frames)); 2098 } while (q != ngx_queue_sentinel(&ctx->frames));
2029 2099
2030 return NGX_OK; 2100 return NGX_OK;
2031 } 2101 }
2032 2102
2035 ngx_quic_free_frames(ngx_connection_t *c, ngx_queue_t *frames) 2105 ngx_quic_free_frames(ngx_connection_t *c, ngx_queue_t *frames)
2036 { 2106 {
2037 ngx_queue_t *q; 2107 ngx_queue_t *q;
2038 ngx_quic_frame_t *f; 2108 ngx_quic_frame_t *f;
2039 2109
2040 q = ngx_queue_head(frames);
2041
2042 do { 2110 do {
2111 q = ngx_queue_head(frames);
2112
2113 if (q == ngx_queue_sentinel(frames)) {
2114 break;
2115 }
2116
2117 ngx_queue_remove(q);
2118
2043 f = ngx_queue_data(q, ngx_quic_frame_t, queue); 2119 f = ngx_queue_data(q, ngx_quic_frame_t, queue);
2044 q = ngx_queue_next(q);
2045 2120
2046 ngx_quic_free_frame(c, f); 2121 ngx_quic_free_frame(c, f);
2047 2122 } while (1);
2048 } while (q != ngx_queue_sentinel(frames));
2049 } 2123 }
2050 2124
2051 2125
2052 /* pack a group of frames [start; end) into memory p and send as single packet */ 2126 /* pack a group of frames [start; end) into memory p and send as single packet */
2053 static ngx_int_t 2127 static ngx_int_t
2235 2309
2236 2310
2237 static void 2311 static void
2238 ngx_quic_push_handler(ngx_event_t *ev) 2312 ngx_quic_push_handler(ngx_event_t *ev)
2239 { 2313 {
2240 ngx_connection_t *c; 2314 ngx_connection_t *c;
2241 2315
2242 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "push timer"); 2316 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "push timer");
2243 2317
2244 c = ev->data; 2318 c = ev->data;
2245 2319
2427 sn->b = ngx_create_temp_buf(pool, rcvbuf_size); 2501 sn->b = ngx_create_temp_buf(pool, rcvbuf_size);
2428 if (sn->b == NULL) { 2502 if (sn->b == NULL) {
2429 ngx_destroy_pool(pool); 2503 ngx_destroy_pool(pool);
2430 return NULL; 2504 return NULL;
2431 } 2505 }
2506
2507 ngx_queue_init(&sn->fs.frames);
2432 2508
2433 log = ngx_palloc(pool, sizeof(ngx_log_t)); 2509 log = ngx_palloc(pool, sizeof(ngx_log_t));
2434 if (log == NULL) { 2510 if (log == NULL) {
2435 ngx_destroy_pool(pool); 2511 ngx_destroy_pool(pool);
2436 return NULL; 2512 return NULL;
2593 if (qc->closing) { 2669 if (qc->closing) {
2594 ngx_post_event(pc->read, &ngx_posted_events); 2670 ngx_post_event(pc->read, &ngx_posted_events);
2595 return; 2671 return;
2596 } 2672 }
2597 2673
2674 ngx_quic_free_frames(pc, &qs->fs.frames);
2675
2598 if ((qs->id & 0x03) == NGX_QUIC_STREAM_UNIDIRECTIONAL) { 2676 if ((qs->id & 0x03) == NGX_QUIC_STREAM_UNIDIRECTIONAL) {
2599 /* do not send fin for client unidirectional streams */ 2677 /* do not send fin for client unidirectional streams */
2600 return; 2678 return;
2601 } 2679 }
2602 2680