comparison src/event/ngx_event_quic.c @ 8337:ab443e80d9e4 quic

Create new stream immediately on receiving new stream id. Before the patch, full STREAM frame handling was delayed until the frame with zero offset was received. Only a node in the streams tree was created. This led to problems when such a stream was deleted; in particular, it had no handlers set for read events. This patch creates the new stream immediately, but delays data delivery until the proper offset arrives. This is somewhat similar to how the accept() operation works. The ngx_quic_add_stream() function is no longer needed and is merged into the stream handler. The ngx_quic_stream_input() function now only handles frames for existing streams and does not deal with stream creation.
author Vladimir Homutov <vl@nginx.com>
date Wed, 15 Apr 2020 14:29:00 +0300
parents 739f018225af
children 0f9e9786b90d
comparison
equal deleted inserted replaced
8336:739f018225af 8337:ab443e80d9e4
170 ngx_quic_frame_t *frame); 170 ngx_quic_frame_t *frame);
171 static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c, 171 static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c,
172 ngx_quic_header_t *pkt, ngx_quic_frame_t *frame); 172 ngx_quic_header_t *pkt, ngx_quic_frame_t *frame);
173 static ngx_int_t ngx_quic_stream_input(ngx_connection_t *c, 173 static ngx_int_t ngx_quic_stream_input(ngx_connection_t *c,
174 ngx_quic_frame_t *frame); 174 ngx_quic_frame_t *frame);
175 static ngx_quic_stream_t *ngx_quic_add_stream(ngx_connection_t *c,
176 ngx_quic_stream_frame_t *f);
177 175
178 static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c); 176 static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c);
179 static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c, 177 static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
180 ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f); 178 ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f);
181 static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c, 179 static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
1740 1738
1741 static ngx_int_t 1739 static ngx_int_t
1742 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, 1740 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
1743 ngx_quic_frame_t *frame) 1741 ngx_quic_frame_t *frame)
1744 { 1742 {
1743 size_t n;
1744 ngx_buf_t *b;
1745 ngx_event_t *rev;
1745 ngx_quic_stream_t *sn; 1746 ngx_quic_stream_t *sn;
1746 ngx_quic_connection_t *qc; 1747 ngx_quic_connection_t *qc;
1747 ngx_quic_stream_frame_t *f; 1748 ngx_quic_stream_frame_t *f;
1748 ngx_quic_frames_stream_t *fs; 1749 ngx_quic_frames_stream_t *fs;
1749 1750
1751 f = &frame->u.stream; 1752 f = &frame->u.stream;
1752 1753
1753 sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id); 1754 sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
1754 1755
1755 if (sn == NULL) { 1756 if (sn == NULL) {
1756 sn = ngx_quic_add_stream(c, f); 1757 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new");
1758
1759 n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
1760 ? qc->tp.initial_max_stream_data_uni
1761 : qc->tp.initial_max_stream_data_bidi_remote;
1762
1763 if (n < NGX_QUIC_STREAM_BUFSIZE) {
1764 n = NGX_QUIC_STREAM_BUFSIZE;
1765 }
1766
1767 if (n < f->length) {
1768 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
1769 return NGX_ERROR;
1770 }
1771
1772 /*
1773 * TODO: check IDs are increasing ? create all lower-numbered?
1774 *
1775 * 2.1. Stream Types and Identifiers
1776 *
1777 * Within each type, streams are created with numerically increasing
1778 * stream IDs. A stream ID that is used out of order results in all
1779 * streams of that type with lower-numbered stream IDs also being
1780 * opened.
1781 */
1782 sn = ngx_quic_create_stream(c, f->stream_id, n);
1757 if (sn == NULL) { 1783 if (sn == NULL) {
1758 return NGX_ERROR; 1784 return NGX_ERROR;
1759 } 1785 }
1786
1787 rev = sn->c->read;
1788
1789 if (f->offset == 0) {
1790
1791 b = sn->b;
1792 b->last = ngx_cpymem(b->last, f->data, f->length);
1793
1794 sn->fs.received += f->length;
1795
1796 rev->ready = 1;
1797
1798 if (f->fin) {
1799 rev->pending_eof = 1;
1800 }
1801
1802 } else {
1803 rev->ready = 0;
1804 }
1805
1806 if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) {
1807 ngx_quic_handle_max_streams(c);
1808 }
1809
1810 qc->streams.handler(sn->c);
1811
1812 if (f->offset == 0) {
1813 return NGX_OK;
1814 }
1815
1816 /* out-of-order stream: proceed to buffering */
1760 } 1817 }
1761 1818
1762 fs = &sn->fs; 1819 fs = &sn->fs;
1763 1820
1764 return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input); 1821 return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input);
1777 qc = c->quic; 1834 qc = c->quic;
1778 1835
1779 f = &frame->u.stream; 1836 f = &frame->u.stream;
1780 1837
1781 sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id); 1838 sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
1782
1783 if (sn == NULL) { 1839 if (sn == NULL) {
1784 // TODO: possible? 1840 // TODO: possible?
1785 // deleted while stream is in reordering queue? 1841 // stream was deleted while in reordering queue ?
1786 return NGX_ERROR; 1842 return NGX_ERROR;
1787 } 1843 }
1788 1844
1789 if (sn->fs.received != 0) { 1845 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream");
1790 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream");
1791 b = sn->b;
1792
1793 if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
1794 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
1795 return NGX_ERROR;
1796 }
1797
1798 if ((size_t) (b->end - b->last) < f->length) {
1799 b->last = ngx_movemem(b->start, b->pos, b->last - b->pos);
1800 b->pos = b->start;
1801 }
1802
1803 b->last = ngx_cpymem(b->last, f->data, f->length);
1804
1805 rev = sn->c->read;
1806 rev->ready = 1;
1807
1808 if (f->fin) {
1809 rev->pending_eof = 1;
1810 }
1811
1812 if (rev->active) {
1813 rev->handler(rev);
1814 }
1815
1816 /* check if stream was destroyed */
1817 if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) {
1818 return NGX_DONE;
1819 }
1820
1821 return NGX_OK;
1822 }
1823 1846
1824 b = sn->b; 1847 b = sn->b;
1848
1849 if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
1850 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
1851 return NGX_ERROR;
1852 }
1853
1854 if ((size_t) (b->end - b->last) < f->length) {
1855 b->last = ngx_movemem(b->start, b->pos, b->last - b->pos);
1856 b->pos = b->start;
1857 }
1858
1825 b->last = ngx_cpymem(b->last, f->data, f->length); 1859 b->last = ngx_cpymem(b->last, f->data, f->length);
1826 1860
1827 rev = sn->c->read; 1861 rev = sn->c->read;
1828 rev->ready = 1; 1862 rev->ready = 1;
1829 1863
1830 if (f->fin) { 1864 if (f->fin) {
1831 rev->pending_eof = 1; 1865 rev->pending_eof = 1;
1832 } 1866 }
1833 1867
1834 if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) { 1868 if (rev->active) {
1835 ngx_quic_handle_max_streams(c); 1869 rev->handler(rev);
1836 } 1870 }
1837 1871
1838 qc->streams.handler(sn->c); 1872 /* check if stream was destroyed by handler */
1839
1840 /* check if stream was destroyed */
1841 if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) { 1873 if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) {
1842 return NGX_DONE; 1874 return NGX_DONE;
1843 } 1875 }
1844 1876
1845 return NGX_OK; 1877 return NGX_OK;
1846 }
1847
1848
1849 static ngx_quic_stream_t *
1850 ngx_quic_add_stream(ngx_connection_t *c, ngx_quic_stream_frame_t *f)
1851 {
1852 size_t n;
1853 ngx_quic_stream_t *sn;
1854 ngx_quic_connection_t *qc;
1855
1856 qc = c->quic;
1857
1858 // TODO: check increasing IDs
1859
1860 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new");
1861
1862 n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
1863 ? qc->tp.initial_max_stream_data_uni
1864 : qc->tp.initial_max_stream_data_bidi_remote;
1865
1866 if (n < NGX_QUIC_STREAM_BUFSIZE) {
1867 n = NGX_QUIC_STREAM_BUFSIZE;
1868 }
1869
1870 if (n < f->length) {
1871 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
1872 return NULL;
1873 }
1874
1875 sn = ngx_quic_create_stream(c, f->stream_id, n);
1876 if (sn == NULL) {
1877 return NULL;
1878 }
1879
1880 return sn;
1881 } 1878 }
1882 1879
1883 1880
1884 static ngx_int_t 1881 static ngx_int_t
1885 ngx_quic_handle_max_streams(ngx_connection_t *c) 1882 ngx_quic_handle_max_streams(ngx_connection_t *c)