changeset 8334:72d20158c814 quic

Added reordering support for STREAM frames. Each stream node now includes incoming frames queue and sent/received counters for tracking offset. The sent counter is not used, c->sent is used, not like in crypto buffers, which have no connections.
author Vladimir Homutov <vl@nginx.com>
date Wed, 15 Apr 2020 11:11:54 +0300
parents 167d32476737
children 76839f55bc48
files src/event/ngx_event_quic.c src/event/ngx_event_quic.h
diffstat 2 files changed, 159 insertions(+), 72 deletions(-) [+]
line wrap: on
line diff
--- a/src/event/ngx_event_quic.c
+++ b/src/event/ngx_event_quic.c
@@ -70,15 +70,6 @@ typedef struct {
 } ngx_quic_send_ctx_t;
 
 
-/* ordered frames stream context */
-typedef struct {
-    uint64_t                          sent;
-    uint64_t                          received;
-    ngx_queue_t                       frames;
-    size_t                            total; /* size of buffered data */
-} ngx_quic_frames_stream_t;
-
-
 struct ngx_quic_connection_s {
     ngx_str_t                         scid;
     ngx_str_t                         dcid;
@@ -177,7 +168,12 @@ static ngx_int_t ngx_quic_handle_ordered
 static ngx_int_t ngx_quic_crypto_input(ngx_connection_t *c,
     ngx_quic_frame_t *frame);
 static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c,
-    ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *frame);
+    ngx_quic_header_t *pkt, ngx_quic_frame_t *frame);
+static ngx_int_t ngx_quic_stream_input(ngx_connection_t *c,
+    ngx_quic_frame_t *frame);
+static ngx_quic_stream_t *ngx_quic_add_stream(ngx_connection_t *c,
+    ngx_quic_stream_frame_t *f);
+
 static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c);
 static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
     ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f);
@@ -739,6 +735,7 @@ ngx_quic_close_connection(ngx_connection
 #if (NGX_DEBUG)
     ngx_uint_t              ns;
 #endif
+    ngx_uint_t              i;
     ngx_pool_t             *pool;
     ngx_event_t            *rev;
     ngx_rbtree_t           *tree;
@@ -748,11 +745,14 @@ ngx_quic_close_connection(ngx_connection
 
     ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "close quic connection");
 
-    // TODO: free frames from reorder queue if any
-
     qc = c->quic;
 
     if (qc) {
+
+        for (i = 0; i < NGX_QUIC_ENCRYPTION_LAST; i++) {
+            ngx_quic_free_frames(c, &qc->crypto[i].frames);
+        }
+
         qc->closing = 1;
         tree = &qc->streams.tree;
 
@@ -1201,9 +1201,7 @@ ngx_quic_payload_handler(ngx_connection_
         case NGX_QUIC_FT_STREAM6:
         case NGX_QUIC_FT_STREAM7:
 
-            if (ngx_quic_handle_stream_frame(c, pkt, &frame.u.stream)
-                != NGX_OK)
-            {
+            if (ngx_quic_handle_stream_frame(c, pkt, &frame) != NGX_OK) {
                 return NGX_ERROR;
             }
 
@@ -1441,6 +1439,7 @@ ngx_quic_handle_ordered_frame(ngx_connec
     ngx_quic_frame_t *frame, ngx_quic_frame_handler_pt handler)
 {
     size_t                     full_len;
+    ngx_int_t                  rc;
     ngx_queue_t               *q;
     ngx_quic_ordered_frame_t  *f;
 
@@ -1468,10 +1467,17 @@ ngx_quic_handle_ordered_frame(ngx_connec
 
     /* f->offset == fs->received */
 
-    if (handler(c, frame) != NGX_OK) {
+    rc = handler(c, frame);
+    if (rc == NGX_ERROR) {
         return NGX_ERROR;
+
+    } else if (rc == NGX_DONE) {
+        /* handler destroyed stream, queue no longer exists */
+        return NGX_OK;
     }
 
+    /* rc == NGX_OK */
+
     fs->received += f->length;
 
     /* now check the queue if we can continue with buffered frames */
@@ -1512,8 +1518,14 @@ ngx_quic_handle_ordered_frame(ngx_connec
 
         /* f->offset == fs->received */
 
-        if (handler(c, frame) != NGX_OK) {
+        rc = handler(c, frame);
+
+        if (rc == NGX_ERROR) {
             return NGX_ERROR;
+
+        } else if (rc == NGX_DONE) {
+            /* handler destroyed stream, queue no longer exists */
+            return NGX_OK;
         }
 
         fs->received += f->length;
@@ -1721,20 +1733,54 @@ ngx_quic_crypto_input(ngx_connection_t *
 
 
 static ngx_int_t
-ngx_quic_handle_stream_frame(ngx_connection_t *c,
-    ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f)
+ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
+    ngx_quic_frame_t *frame)
 {
-    size_t                  n;
-    ngx_buf_t              *b;
-    ngx_event_t            *rev;
-    ngx_quic_stream_t      *sn;
-    ngx_quic_connection_t  *qc;
+    ngx_quic_stream_t         *sn;
+    ngx_quic_connection_t     *qc;
+    ngx_quic_stream_frame_t   *f;
+    ngx_quic_frames_stream_t  *fs;
 
     qc = c->quic;
+    f = &frame->u.stream;
 
     sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
 
-    if (sn) {
+    if (sn == NULL) {
+        sn = ngx_quic_add_stream(c, f);
+        if (sn == NULL) {
+            return NGX_ERROR;
+        }
+    }
+
+    fs = &sn->fs;
+
+    return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input);
+}
+
+
+static ngx_int_t
+ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame)
+{
+    ngx_buf_t                *b;
+    ngx_event_t              *rev;
+    ngx_quic_stream_t        *sn;
+    ngx_quic_connection_t    *qc;
+    ngx_quic_stream_frame_t  *f;
+
+    qc = c->quic;
+
+    f = &frame->u.stream;
+
+    sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
+
+    if (sn == NULL) {
+        // TODO: possible?
+        // deleted while stream is in reordering queue?
+        return NGX_ERROR;
+    }
+
+    if (sn->fs.received != 0) {
         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream");
         b = sn->b;
 
@@ -1761,29 +1807,14 @@ ngx_quic_handle_stream_frame(ngx_connect
             rev->handler(rev);
         }
 
+        /* check if stream was destroyed */
+        if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) {
+            return NGX_DONE;
+        }
+
         return NGX_OK;
     }
 
-    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new");
-
-    n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
-        ? qc->tp.initial_max_stream_data_uni
-        : qc->tp.initial_max_stream_data_bidi_remote;
-
-    if (n < NGX_QUIC_STREAM_BUFSIZE) {
-        n = NGX_QUIC_STREAM_BUFSIZE;
-    }
-
-    if (n < f->length) {
-        ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
-        return NGX_ERROR;
-    }
-
-    sn = ngx_quic_create_stream(c, f->stream_id, n);
-    if (sn == NULL) {
-        return NGX_ERROR;
-    }
-
     b = sn->b;
     b->last = ngx_cpymem(b->last, f->data, f->length);
 
@@ -1800,10 +1831,50 @@ ngx_quic_handle_stream_frame(ngx_connect
 
     qc->streams.handler(sn->c);
 
+    /* check if stream was destroyed */
+    if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) {
+        return NGX_DONE;
+    }
+
     return NGX_OK;
 }
 
 
+static ngx_quic_stream_t *
+ngx_quic_add_stream(ngx_connection_t *c, ngx_quic_stream_frame_t *f)
+{
+    size_t                  n;
+    ngx_quic_stream_t      *sn;
+    ngx_quic_connection_t  *qc;
+
+    qc = c->quic;
+
+    // TODO: check increasing IDs
+
+    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new");
+
+    n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
+        ? qc->tp.initial_max_stream_data_uni
+        : qc->tp.initial_max_stream_data_bidi_remote;
+
+    if (n < NGX_QUIC_STREAM_BUFSIZE) {
+        n = NGX_QUIC_STREAM_BUFSIZE;
+    }
+
+    if (n < f->length) {
+        ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
+        return NULL;
+    }
+
+    sn = ngx_quic_create_stream(c, f->stream_id, n);
+    if (sn == NULL) {
+        return NULL;
+    }
+
+    return sn;
+}
+
+
 static ngx_int_t
 ngx_quic_handle_max_streams(ngx_connection_t *c)
 {
@@ -2024,7 +2095,6 @@ ngx_quic_output_frames(ngx_connection_t 
             return NGX_ERROR;
         }
 
-
     } while (q != ngx_queue_sentinel(&ctx->frames));
 
     return NGX_OK;
@@ -2037,15 +2107,19 @@ ngx_quic_free_frames(ngx_connection_t *c
     ngx_queue_t       *q;
     ngx_quic_frame_t  *f;
 
-    q = ngx_queue_head(frames);
-
     do {
+        q = ngx_queue_head(frames);
+
+        if (q == ngx_queue_sentinel(frames)) {
+            break;
+        }
+
+        ngx_queue_remove(q);
+
         f = ngx_queue_data(q, ngx_quic_frame_t, queue);
-        q = ngx_queue_next(q);
 
         ngx_quic_free_frame(c, f);
-
-    } while (q != ngx_queue_sentinel(frames));
+    } while (1);
 }
 
 
@@ -2237,7 +2311,7 @@ ngx_quic_retransmit_handler(ngx_event_t 
 static void
 ngx_quic_push_handler(ngx_event_t *ev)
 {
-    ngx_connection_t       *c;
+    ngx_connection_t  *c;
 
     ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "push timer");
 
@@ -2430,6 +2504,8 @@ ngx_quic_create_stream(ngx_connection_t 
         return NULL;
     }
 
+    ngx_queue_init(&sn->fs.frames);
+
     log = ngx_palloc(pool, sizeof(ngx_log_t));
     if (log == NULL) {
         ngx_destroy_pool(pool);
@@ -2595,6 +2671,8 @@ ngx_quic_stream_cleanup_handler(void *da
         return;
     }
 
+    ngx_quic_free_frames(pc, &qs->fs.frames);
+
     if ((qs->id & 0x03) == NGX_QUIC_STREAM_UNIDIRECTIONAL) {
         /* do not send fin for client unidirectional streams */
         return;
--- a/src/event/ngx_event_quic.h
+++ b/src/event/ngx_event_quic.h
@@ -29,33 +29,42 @@
 
 typedef struct {
     /* configurable */
-    ngx_msec_t          max_idle_timeout;
-    ngx_msec_t          max_ack_delay;
+    ngx_msec_t                 max_idle_timeout;
+    ngx_msec_t                 max_ack_delay;
 
-    ngx_uint_t          max_packet_size;
-    ngx_uint_t          initial_max_data;
-    ngx_uint_t          initial_max_stream_data_bidi_local;
-    ngx_uint_t          initial_max_stream_data_bidi_remote;
-    ngx_uint_t          initial_max_stream_data_uni;
-    ngx_uint_t          initial_max_streams_bidi;
-    ngx_uint_t          initial_max_streams_uni;
-    ngx_uint_t          ack_delay_exponent;
-    ngx_uint_t          disable_active_migration;
-    ngx_uint_t          active_connection_id_limit;
+    ngx_uint_t                 max_packet_size;
+    ngx_uint_t                 initial_max_data;
+    ngx_uint_t                 initial_max_stream_data_bidi_local;
+    ngx_uint_t                 initial_max_stream_data_bidi_remote;
+    ngx_uint_t                 initial_max_stream_data_uni;
+    ngx_uint_t                 initial_max_streams_bidi;
+    ngx_uint_t                 initial_max_streams_uni;
+    ngx_uint_t                 ack_delay_exponent;
+    ngx_uint_t                 disable_active_migration;
+    ngx_uint_t                 active_connection_id_limit;
 
     /* TODO */
-    ngx_uint_t          original_connection_id;
-    u_char              stateless_reset_token[16];
-    void               *preferred_address;
+    ngx_uint_t                 original_connection_id;
+    u_char                     stateless_reset_token[16];
+    void                      *preferred_address;
 } ngx_quic_tp_t;
 
 
+typedef struct {
+    uint64_t                   sent;
+    uint64_t                   received;
+    ngx_queue_t                frames;   /* reorder queue */
+    size_t                     total;    /* size of buffered data */
+} ngx_quic_frames_stream_t;
+
+
 struct ngx_quic_stream_s {
-    ngx_rbtree_node_t   node;
-    ngx_connection_t   *parent;
-    ngx_connection_t   *c;
-    uint64_t            id;
-    ngx_buf_t          *b;
+    ngx_rbtree_node_t          node;
+    ngx_connection_t          *parent;
+    ngx_connection_t          *c;
+    uint64_t                   id;
+    ngx_buf_t                 *b;
+    ngx_quic_frames_stream_t   fs;
 };