Mercurial > hg > nginx-quic
comparison src/event/ngx_event_quic.c @ 7814:ab443e80d9e4 quic
Create new stream immediately on receiving new stream id.
Before the patch, full STREAM frame handling was delayed until the frame with
zero offset is received. Only node in the streams tree was created.
This lead to problems when such stream was deleted, in particular, it had no
handlers set for read events.
This patch creates new stream immediately, but delays data delivery until
the proper offset will arrive. This is somewhat similar to how accept()
operation works.
The ngx_quic_add_stream() function is no longer needed and merged into stream
handler. The ngx_quic_stream_input() now only handles frames for existing
streams and does not deal with stream creation.
author | Vladimir Homutov <vl@nginx.com> |
---|---|
date | Wed, 15 Apr 2020 14:29:00 +0300 |
parents | 739f018225af |
children | 0f9e9786b90d |
comparison
equal
deleted
inserted
replaced
7813:739f018225af | 7814:ab443e80d9e4 |
---|---|
170 ngx_quic_frame_t *frame); | 170 ngx_quic_frame_t *frame); |
171 static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c, | 171 static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c, |
172 ngx_quic_header_t *pkt, ngx_quic_frame_t *frame); | 172 ngx_quic_header_t *pkt, ngx_quic_frame_t *frame); |
173 static ngx_int_t ngx_quic_stream_input(ngx_connection_t *c, | 173 static ngx_int_t ngx_quic_stream_input(ngx_connection_t *c, |
174 ngx_quic_frame_t *frame); | 174 ngx_quic_frame_t *frame); |
175 static ngx_quic_stream_t *ngx_quic_add_stream(ngx_connection_t *c, | |
176 ngx_quic_stream_frame_t *f); | |
177 | 175 |
178 static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c); | 176 static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c); |
179 static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c, | 177 static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c, |
180 ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f); | 178 ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f); |
181 static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c, | 179 static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c, |
1740 | 1738 |
1741 static ngx_int_t | 1739 static ngx_int_t |
1742 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, | 1740 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, |
1743 ngx_quic_frame_t *frame) | 1741 ngx_quic_frame_t *frame) |
1744 { | 1742 { |
1743 size_t n; | |
1744 ngx_buf_t *b; | |
1745 ngx_event_t *rev; | |
1745 ngx_quic_stream_t *sn; | 1746 ngx_quic_stream_t *sn; |
1746 ngx_quic_connection_t *qc; | 1747 ngx_quic_connection_t *qc; |
1747 ngx_quic_stream_frame_t *f; | 1748 ngx_quic_stream_frame_t *f; |
1748 ngx_quic_frames_stream_t *fs; | 1749 ngx_quic_frames_stream_t *fs; |
1749 | 1750 |
1751 f = &frame->u.stream; | 1752 f = &frame->u.stream; |
1752 | 1753 |
1753 sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id); | 1754 sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id); |
1754 | 1755 |
1755 if (sn == NULL) { | 1756 if (sn == NULL) { |
1756 sn = ngx_quic_add_stream(c, f); | 1757 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new"); |
1758 | |
1759 n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) | |
1760 ? qc->tp.initial_max_stream_data_uni | |
1761 : qc->tp.initial_max_stream_data_bidi_remote; | |
1762 | |
1763 if (n < NGX_QUIC_STREAM_BUFSIZE) { | |
1764 n = NGX_QUIC_STREAM_BUFSIZE; | |
1765 } | |
1766 | |
1767 if (n < f->length) { | |
1768 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer"); | |
1769 return NGX_ERROR; | |
1770 } | |
1771 | |
1772 /* | |
1773 * TODO: check IDs are increasing ? create all lower-numbered? | |
1774 * | |
1775 * 2.1. Stream Types and Identifiers | |
1776 * | |
1777 * Within each type, streams are created with numerically increasing | |
1778 * stream IDs. A stream ID that is used out of order results in all | |
1779 * streams of that type with lower-numbered stream IDs also being | |
1780 * opened. | |
1781 */ | |
1782 sn = ngx_quic_create_stream(c, f->stream_id, n); | |
1757 if (sn == NULL) { | 1783 if (sn == NULL) { |
1758 return NGX_ERROR; | 1784 return NGX_ERROR; |
1759 } | 1785 } |
1786 | |
1787 rev = sn->c->read; | |
1788 | |
1789 if (f->offset == 0) { | |
1790 | |
1791 b = sn->b; | |
1792 b->last = ngx_cpymem(b->last, f->data, f->length); | |
1793 | |
1794 sn->fs.received += f->length; | |
1795 | |
1796 rev->ready = 1; | |
1797 | |
1798 if (f->fin) { | |
1799 rev->pending_eof = 1; | |
1800 } | |
1801 | |
1802 } else { | |
1803 rev->ready = 0; | |
1804 } | |
1805 | |
1806 if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) { | |
1807 ngx_quic_handle_max_streams(c); | |
1808 } | |
1809 | |
1810 qc->streams.handler(sn->c); | |
1811 | |
1812 if (f->offset == 0) { | |
1813 return NGX_OK; | |
1814 } | |
1815 | |
1816 /* out-of-order stream: proceed to buffering */ | |
1760 } | 1817 } |
1761 | 1818 |
1762 fs = &sn->fs; | 1819 fs = &sn->fs; |
1763 | 1820 |
1764 return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input); | 1821 return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input); |
1777 qc = c->quic; | 1834 qc = c->quic; |
1778 | 1835 |
1779 f = &frame->u.stream; | 1836 f = &frame->u.stream; |
1780 | 1837 |
1781 sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id); | 1838 sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id); |
1782 | |
1783 if (sn == NULL) { | 1839 if (sn == NULL) { |
1784 // TODO: possible? | 1840 // TODO: possible? |
1785 // deleted while stream is in reordering queue? | 1841 // stream was deleted while in reordering queue ? |
1786 return NGX_ERROR; | 1842 return NGX_ERROR; |
1787 } | 1843 } |
1788 | 1844 |
1789 if (sn->fs.received != 0) { | 1845 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream"); |
1790 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream"); | |
1791 b = sn->b; | |
1792 | |
1793 if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) { | |
1794 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer"); | |
1795 return NGX_ERROR; | |
1796 } | |
1797 | |
1798 if ((size_t) (b->end - b->last) < f->length) { | |
1799 b->last = ngx_movemem(b->start, b->pos, b->last - b->pos); | |
1800 b->pos = b->start; | |
1801 } | |
1802 | |
1803 b->last = ngx_cpymem(b->last, f->data, f->length); | |
1804 | |
1805 rev = sn->c->read; | |
1806 rev->ready = 1; | |
1807 | |
1808 if (f->fin) { | |
1809 rev->pending_eof = 1; | |
1810 } | |
1811 | |
1812 if (rev->active) { | |
1813 rev->handler(rev); | |
1814 } | |
1815 | |
1816 /* check if stream was destroyed */ | |
1817 if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) { | |
1818 return NGX_DONE; | |
1819 } | |
1820 | |
1821 return NGX_OK; | |
1822 } | |
1823 | 1846 |
1824 b = sn->b; | 1847 b = sn->b; |
1848 | |
1849 if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) { | |
1850 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer"); | |
1851 return NGX_ERROR; | |
1852 } | |
1853 | |
1854 if ((size_t) (b->end - b->last) < f->length) { | |
1855 b->last = ngx_movemem(b->start, b->pos, b->last - b->pos); | |
1856 b->pos = b->start; | |
1857 } | |
1858 | |
1825 b->last = ngx_cpymem(b->last, f->data, f->length); | 1859 b->last = ngx_cpymem(b->last, f->data, f->length); |
1826 | 1860 |
1827 rev = sn->c->read; | 1861 rev = sn->c->read; |
1828 rev->ready = 1; | 1862 rev->ready = 1; |
1829 | 1863 |
1830 if (f->fin) { | 1864 if (f->fin) { |
1831 rev->pending_eof = 1; | 1865 rev->pending_eof = 1; |
1832 } | 1866 } |
1833 | 1867 |
1834 if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) { | 1868 if (rev->active) { |
1835 ngx_quic_handle_max_streams(c); | 1869 rev->handler(rev); |
1836 } | 1870 } |
1837 | 1871 |
1838 qc->streams.handler(sn->c); | 1872 /* check if stream was destroyed by handler */ |
1839 | |
1840 /* check if stream was destroyed */ | |
1841 if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) { | 1873 if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) { |
1842 return NGX_DONE; | 1874 return NGX_DONE; |
1843 } | 1875 } |
1844 | 1876 |
1845 return NGX_OK; | 1877 return NGX_OK; |
1846 } | |
1847 | |
1848 | |
1849 static ngx_quic_stream_t * | |
1850 ngx_quic_add_stream(ngx_connection_t *c, ngx_quic_stream_frame_t *f) | |
1851 { | |
1852 size_t n; | |
1853 ngx_quic_stream_t *sn; | |
1854 ngx_quic_connection_t *qc; | |
1855 | |
1856 qc = c->quic; | |
1857 | |
1858 // TODO: check increasing IDs | |
1859 | |
1860 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new"); | |
1861 | |
1862 n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) | |
1863 ? qc->tp.initial_max_stream_data_uni | |
1864 : qc->tp.initial_max_stream_data_bidi_remote; | |
1865 | |
1866 if (n < NGX_QUIC_STREAM_BUFSIZE) { | |
1867 n = NGX_QUIC_STREAM_BUFSIZE; | |
1868 } | |
1869 | |
1870 if (n < f->length) { | |
1871 ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer"); | |
1872 return NULL; | |
1873 } | |
1874 | |
1875 sn = ngx_quic_create_stream(c, f->stream_id, n); | |
1876 if (sn == NULL) { | |
1877 return NULL; | |
1878 } | |
1879 | |
1880 return sn; | |
1881 } | 1878 } |
1882 | 1879 |
1883 | 1880 |
1884 static ngx_int_t | 1881 static ngx_int_t |
1885 ngx_quic_handle_max_streams(ngx_connection_t *c) | 1882 ngx_quic_handle_max_streams(ngx_connection_t *c) |