changeset 8337:ab443e80d9e4 quic

Create new stream immediately on receiving new stream id. Before the patch, full STREAM frame handling was delayed until the frame with zero offset is received. Only node in the streams tree was created. This lead to problems when such stream was deleted, in particular, it had no handlers set for read events. This patch creates new stream immediately, but delays data delivery until the proper offset will arrive. This is somewhat similar to how accept() operation works. The ngx_quic_add_stream() function is no longer needed and merged into stream handler. The ngx_quic_stream_input() now only handles frames for existing streams and does not deal with stream creation.
author Vladimir Homutov <vl@nginx.com>
date Wed, 15 Apr 2020 14:29:00 +0300
parents 739f018225af
children 0f9e9786b90d
files src/event/ngx_event_quic.c
diffstat 1 files changed, 76 insertions(+), 79 deletions(-) [+]
line wrap: on
line diff
--- a/src/event/ngx_event_quic.c
+++ b/src/event/ngx_event_quic.c
@@ -172,8 +172,6 @@ static ngx_int_t ngx_quic_handle_stream_
     ngx_quic_header_t *pkt, ngx_quic_frame_t *frame);
 static ngx_int_t ngx_quic_stream_input(ngx_connection_t *c,
     ngx_quic_frame_t *frame);
-static ngx_quic_stream_t *ngx_quic_add_stream(ngx_connection_t *c,
-    ngx_quic_stream_frame_t *f);
 
 static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c);
 static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
@@ -1742,6 +1740,9 @@ static ngx_int_t
 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
     ngx_quic_frame_t *frame)
 {
+    size_t                     n;
+    ngx_buf_t                *b;
+    ngx_event_t              *rev;
     ngx_quic_stream_t         *sn;
     ngx_quic_connection_t     *qc;
     ngx_quic_stream_frame_t   *f;
@@ -1753,10 +1754,66 @@ ngx_quic_handle_stream_frame(ngx_connect
     sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
 
     if (sn == NULL) {
-        sn = ngx_quic_add_stream(c, f);
+        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new");
+
+        n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
+            ? qc->tp.initial_max_stream_data_uni
+            : qc->tp.initial_max_stream_data_bidi_remote;
+
+        if (n < NGX_QUIC_STREAM_BUFSIZE) {
+            n = NGX_QUIC_STREAM_BUFSIZE;
+        }
+
+        if (n < f->length) {
+            ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
+            return NGX_ERROR;
+        }
+
+        /*
+         *   TODO: check IDs are increasing ? create all lower-numbered?
+         *
+         *   2.1.  Stream Types and Identifiers
+         *
+         *   Within each type, streams are created with numerically increasing
+         *   stream IDs.  A stream ID that is used out of order results in all
+         *   streams of that type with lower-numbered stream IDs also being
+         *   opened.
+         */
+        sn = ngx_quic_create_stream(c, f->stream_id, n);
         if (sn == NULL) {
             return NGX_ERROR;
         }
+
+        rev = sn->c->read;
+
+        if (f->offset == 0) {
+
+            b = sn->b;
+            b->last = ngx_cpymem(b->last, f->data, f->length);
+
+            sn->fs.received += f->length;
+
+            rev->ready = 1;
+
+            if (f->fin) {
+                rev->pending_eof = 1;
+            }
+
+        } else {
+            rev->ready = 0;
+        }
+
+        if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) {
+            ngx_quic_handle_max_streams(c);
+        }
+
+        qc->streams.handler(sn->c);
+
+        if (f->offset == 0) {
+            return NGX_OK;
+        }
+
+        /* out-of-order stream: proceed to buffering */
     }
 
     fs = &sn->fs;
@@ -1779,49 +1836,26 @@ ngx_quic_stream_input(ngx_connection_t *
     f = &frame->u.stream;
 
     sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
-
     if (sn == NULL) {
         // TODO: possible?
-        // deleted while stream is in reordering queue?
+        // stream was deleted while in reordering queue ?
         return NGX_ERROR;
     }
 
-    if (sn->fs.received != 0) {
-        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream");
-        b = sn->b;
-
-        if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
-            ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
-            return NGX_ERROR;
-        }
-
-        if ((size_t) (b->end - b->last) < f->length) {
-            b->last = ngx_movemem(b->start, b->pos, b->last - b->pos);
-            b->pos = b->start;
-        }
-
-        b->last = ngx_cpymem(b->last, f->data, f->length);
-
-        rev = sn->c->read;
-        rev->ready = 1;
-
-        if (f->fin) {
-            rev->pending_eof = 1;
-        }
-
-        if (rev->active) {
-            rev->handler(rev);
-        }
-
-        /* check if stream was destroyed */
-        if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) {
-            return NGX_DONE;
-        }
-
-        return NGX_OK;
-    }
+    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream");
 
     b = sn->b;
+
+    if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
+        ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
+        return NGX_ERROR;
+    }
+
+    if ((size_t) (b->end - b->last) < f->length) {
+        b->last = ngx_movemem(b->start, b->pos, b->last - b->pos);
+        b->pos = b->start;
+    }
+
     b->last = ngx_cpymem(b->last, f->data, f->length);
 
     rev = sn->c->read;
@@ -1831,13 +1865,11 @@ ngx_quic_stream_input(ngx_connection_t *
         rev->pending_eof = 1;
     }
 
-    if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) {
-        ngx_quic_handle_max_streams(c);
+    if (rev->active) {
+        rev->handler(rev);
     }
 
-    qc->streams.handler(sn->c);
-
-    /* check if stream was destroyed */
+    /* check if stream was destroyed by handler */
     if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) {
         return NGX_DONE;
     }
@@ -1846,41 +1878,6 @@ ngx_quic_stream_input(ngx_connection_t *
 }
 
 
-static ngx_quic_stream_t *
-ngx_quic_add_stream(ngx_connection_t *c, ngx_quic_stream_frame_t *f)
-{
-    size_t                  n;
-    ngx_quic_stream_t      *sn;
-    ngx_quic_connection_t  *qc;
-
-    qc = c->quic;
-
-    // TODO: check increasing IDs
-
-    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new");
-
-    n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
-        ? qc->tp.initial_max_stream_data_uni
-        : qc->tp.initial_max_stream_data_bidi_remote;
-
-    if (n < NGX_QUIC_STREAM_BUFSIZE) {
-        n = NGX_QUIC_STREAM_BUFSIZE;
-    }
-
-    if (n < f->length) {
-        ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
-        return NULL;
-    }
-
-    sn = ngx_quic_create_stream(c, f->stream_id, n);
-    if (sn == NULL) {
-        return NULL;
-    }
-
-    return sn;
-}
-
-
 static ngx_int_t
 ngx_quic_handle_max_streams(ngx_connection_t *c)
 {