changeset 8791:af33d1ef1c3c quic

QUIC: stream flow control refactored. - Function ngx_quic_control_flow() is introduced. This functions does both MAX_DATA and MAX_STREAM_DATA flow controls. The function is called from STREAM and RESET_STREAM frame handlers. Previously, flow control was only accounted for STREAM. Also, MAX_DATA flow control was not accounted at all. - Function ngx_quic_update_flow() is introduced. This function advances flow control windows and sends MAX_DATA/MAX_STREAM_DATA. The function is called from RESET_STREAM frame handler, stream cleanup handler and stream recv() handler.
author Roman Arutyunyan <arut@nginx.com>
date Mon, 07 Jun 2021 10:12:46 +0300
parents ac0398da8f23
children 004172345bdc
files src/event/quic/ngx_event_quic.c src/event/quic/ngx_event_quic.h src/event/quic/ngx_event_quic_connection.h src/event/quic/ngx_event_quic_streams.c
diffstat 4 files changed, 179 insertions(+), 64 deletions(-) [+]
line wrap: on
line diff
--- a/src/event/quic/ngx_event_quic.c
+++ b/src/event/quic/ngx_event_quic.c
@@ -303,6 +303,7 @@ ngx_quic_new_connection(ngx_connection_t
     ctp->active_connection_id_limit = 2;
 
     qc->streams.recv_max_data = qc->tp.initial_max_data;
+    qc->streams.recv_window = qc->streams.recv_max_data;
 
     qc->streams.client_max_streams_uni = qc->tp.initial_max_streams_uni;
     qc->streams.client_max_streams_bidi = qc->tp.initial_max_streams_bidi;
--- a/src/event/quic/ngx_event_quic.h
+++ b/src/event/quic/ngx_event_quic.h
@@ -75,6 +75,7 @@ struct ngx_quic_stream_s {
     uint64_t                   send_max_data;
     uint64_t                   recv_max_data;
     uint64_t                   recv_offset;
+    uint64_t                   recv_window;
     uint64_t                   recv_last;
     uint64_t                   final_size;
     ngx_chain_t               *in;
--- a/src/event/quic/ngx_event_quic_connection.h
+++ b/src/event/quic/ngx_event_quic_connection.h
@@ -115,8 +115,10 @@ typedef struct {
     ngx_rbtree_t                      tree;
     ngx_rbtree_node_t                 sentinel;
 
-    uint64_t                          received;
     uint64_t                          sent;
+    uint64_t                          recv_offset;
+    uint64_t                          recv_window;
+    uint64_t                          recv_last;
     uint64_t                          recv_max_data;
     uint64_t                          send_max_data;
 
--- a/src/event/quic/ngx_event_quic_streams.c
+++ b/src/event/quic/ngx_event_quic_streams.c
@@ -25,6 +25,8 @@ static ngx_chain_t *ngx_quic_stream_send
     ngx_chain_t *in, off_t limit);
 static size_t ngx_quic_max_stream_flow(ngx_connection_t *c);
 static void ngx_quic_stream_cleanup_handler(void *data);
+static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last);
+static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last);
 
 
 ngx_connection_t *
@@ -413,6 +415,8 @@ ngx_quic_create_stream(ngx_connection_t 
         }
     }
 
+    qs->recv_window = qs->recv_max_data;
+
     cln = ngx_pool_cleanup_add(pool, 0);
     if (cln == NULL) {
         ngx_close_connection(sc);
@@ -432,18 +436,15 @@ ngx_quic_create_stream(ngx_connection_t 
 static ssize_t
 ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
 {
-    ssize_t                 len, n;
-    ngx_buf_t              *b;
-    ngx_chain_t            *cl, **ll;
-    ngx_event_t            *rev;
-    ngx_connection_t       *pc;
-    ngx_quic_frame_t       *frame;
-    ngx_quic_stream_t      *qs;
-    ngx_quic_connection_t  *qc;
+    ssize_t             len, n;
+    ngx_buf_t          *b;
+    ngx_chain_t        *cl, **ll;
+    ngx_event_t        *rev;
+    ngx_connection_t   *pc;
+    ngx_quic_stream_t  *qs;
 
     qs = c->quic;
     pc = qs->parent;
-    qc = ngx_quic_get_connection(pc);
     rev = c->read;
 
     if (rev->error) {
@@ -495,10 +496,6 @@ ngx_quic_stream_recv(ngx_connection_t *c
 
     ngx_quic_free_bufs(pc, cl);
 
-    qc->streams.received += len;
-    qs->recv_max_data += len;
-    qs->recv_offset += len;
-
     if (qs->in == NULL) {
         rev->ready = rev->pending_eof;
     }
@@ -506,39 +503,8 @@ ngx_quic_stream_recv(ngx_connection_t *c
     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
                    "quic stream id:0x%xL recv len:%z", qs->id, len);
 
-    if (!rev->pending_eof) {
-        frame = ngx_quic_alloc_frame(pc);
-        if (frame == NULL) {
-            return NGX_ERROR;
-        }
-
-        frame->level = ssl_encryption_application;
-        frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
-        frame->u.max_stream_data.id = qs->id;
-        frame->u.max_stream_data.limit = qs->recv_max_data;
-
-        ngx_quic_queue_frame(qc, frame);
-    }
-
-    if ((qc->streams.recv_max_data / 2) < qc->streams.received) {
-
-        frame = ngx_quic_alloc_frame(pc);
-
-        if (frame == NULL) {
-            return NGX_ERROR;
-        }
-
-        qc->streams.recv_max_data *= 2;
-
-        frame->level = ssl_encryption_application;
-        frame->type = NGX_QUIC_FT_MAX_DATA;
-        frame->u.max_data.max_data = qc->streams.recv_max_data;
-
-        ngx_quic_queue_frame(qc, frame);
-
-        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic stream id:0x%xL recv: increased max_data:%uL",
-                       qs->id, qc->streams.recv_max_data);
+    if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) {
+        return NGX_ERROR;
     }
 
     return len;
@@ -729,6 +695,10 @@ ngx_quic_stream_cleanup_handler(void *da
         goto done;
     }
 
+    c->read->pending_eof = 1;
+
+    (void) ngx_quic_update_flow(c, qs->recv_last);
+
     if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0
         || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0)
     {
@@ -848,8 +818,7 @@ ngx_quic_handle_stream_frame(ngx_connect
 
         sc = qs->connection;
 
-        if (last > qs->recv_max_data) {
-            qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
+        if (ngx_quic_control_flow(sc, last) != NGX_OK) {
             goto cleanup;
         }
 
@@ -858,8 +827,6 @@ ngx_quic_handle_stream_frame(ngx_connect
             qs->final_size = last;
         }
 
-        qs->recv_last = last;
-
         if (f->offset == 0) {
             sc->read->ready = 1;
         }
@@ -873,8 +840,15 @@ ngx_quic_handle_stream_frame(ngx_connect
         return NGX_OK;
     }
 
-    if (last > qs->recv_max_data) {
-        qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
+    sc = qs->connection;
+
+    rev = sc->read;
+
+    if (rev->error) {
+        return NGX_OK;
+    }
+
+    if (ngx_quic_control_flow(sc, last) != NGX_OK) {
         return NGX_ERROR;
     }
 
@@ -887,17 +861,11 @@ ngx_quic_handle_stream_frame(ngx_connect
         return NGX_OK;
     }
 
-    if (qs->recv_last < last) {
-        qs->recv_last = last;
-    }
-
     if (f->offset < qs->recv_offset) {
         ngx_quic_trim_bufs(frame->data, qs->recv_offset - f->offset);
         f->offset = qs->recv_offset;
     }
 
-    rev = qs->connection->read;
-
     if (f->fin) {
         if (qs->final_size != (uint64_t) -1 && qs->final_size != last) {
             qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
@@ -1108,6 +1076,7 @@ ngx_int_t
 ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
     ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
 {
+    ngx_pool_t             *pool;
     ngx_event_t            *rev;
     ngx_connection_t       *sc;
     ngx_quic_stream_t      *qs;
@@ -1135,19 +1104,37 @@ ngx_quic_handle_reset_stream_frame(ngx_c
             return NGX_OK;
         }
 
-        qs->final_size = f->final_size;
-
         sc = qs->connection;
 
         rev = sc->read;
         rev->error = 1;
         rev->ready = 1;
 
+        if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
+            goto cleanup;
+        }
+
+        qs->final_size = f->final_size;
+
+        if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
+            goto cleanup;
+        }
+
         sc->listening->handler(sc);
 
         return NGX_OK;
     }
 
+    sc = qs->connection;
+
+    rev = sc->read;
+    rev->error = 1;
+    rev->ready = 1;
+
+    if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
+        return NGX_ERROR;
+    }
+
     if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) {
         qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
         return NGX_ERROR;
@@ -1160,15 +1147,24 @@ ngx_quic_handle_reset_stream_frame(ngx_c
 
     qs->final_size = f->final_size;
 
-    rev = qs->connection->read;
-    rev->error = 1;
-    rev->ready = 1;
+    if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
+        return NGX_ERROR;
+    }
 
     if (rev->active) {
         rev->handler(rev);
     }
 
     return NGX_OK;
+
+cleanup:
+
+    pool = sc->pool;
+
+    ngx_close_connection(sc);
+    ngx_destroy_pool(pool);
+
+    return NGX_ERROR;
 }
 
 
@@ -1285,3 +1281,118 @@ ngx_quic_handle_stream_ack(ngx_connectio
                    "quic stream ack len:%uL acked:%uL unacked:%uL",
                    f->u.stream.length, qs->acked, sent - qs->acked);
 }
+
+
+static ngx_int_t
+ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
+{
+    uint64_t                len;
+    ngx_event_t            *rev;
+    ngx_quic_stream_t      *qs;
+    ngx_quic_connection_t  *qc;
+
+    rev = c->read;
+    qs = c->quic;
+    qc = ngx_quic_get_connection(qs->parent);
+
+    if (last <= qs->recv_last) {
+        return NGX_OK;
+    }
+
+    len = last - qs->recv_last;
+
+    ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic flow control msd:%uL/%uL md:%uL/%uL",
+                   last, qs->recv_max_data, qc->streams.recv_last + len,
+                   qc->streams.recv_max_data);
+
+    qs->recv_last += len;
+
+    if (!rev->error && qs->recv_last > qs->recv_max_data) {
+        qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
+        return NGX_ERROR;
+    }
+
+    qc->streams.recv_last += len;
+
+    if (qc->streams.recv_last > qc->streams.recv_max_data) {
+        qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
+        return NGX_ERROR;
+    }
+
+    return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
+{
+    uint64_t                len;
+    ngx_event_t            *rev;
+    ngx_connection_t       *pc;
+    ngx_quic_frame_t       *frame;
+    ngx_quic_stream_t      *qs;
+    ngx_quic_connection_t  *qc;
+
+    rev = c->read;
+    qs = c->quic;
+    pc = qs->parent;
+    qc = ngx_quic_get_connection(pc);
+
+    if (last <= qs->recv_offset) {
+        return NGX_OK;
+    }
+
+    len = last - qs->recv_offset;
+
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic flow update %uL", last);
+
+    qs->recv_offset += len;
+
+    if (!rev->pending_eof && !rev->error
+        && qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2)
+    {
+        qs->recv_max_data = qs->recv_offset + qs->recv_window;
+
+        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic flow update msd:%uL", qs->recv_max_data);
+
+        frame = ngx_quic_alloc_frame(pc);
+        if (frame == NULL) {
+            return NGX_ERROR;
+        }
+
+        frame->level = ssl_encryption_application;
+        frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
+        frame->u.max_stream_data.id = qs->id;
+        frame->u.max_stream_data.limit = qs->recv_max_data;
+
+        ngx_quic_queue_frame(qc, frame);
+    }
+
+    qc->streams.recv_offset += len;
+
+    if (qc->streams.recv_max_data
+        <= qc->streams.recv_offset + qc->streams.recv_window / 2)
+    {
+        qc->streams.recv_max_data = qc->streams.recv_offset
+                                    + qc->streams.recv_window;
+
+        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
+                       "quic flow update md:%uL", qc->streams.recv_max_data);
+
+        frame = ngx_quic_alloc_frame(pc);
+        if (frame == NULL) {
+            return NGX_ERROR;
+        }
+
+        frame->level = ssl_encryption_application;
+        frame->type = NGX_QUIC_FT_MAX_DATA;
+        frame->u.max_data.max_data = qc->streams.recv_max_data;
+
+        ngx_quic_queue_frame(qc, frame);
+    }
+
+    return NGX_OK;
+}