changeset 8937:0692355a3519 quic

QUIC: simplified stream initialization. After creation, a client stream is added to qc->streams.uninitialized queue. After initialization it's removed from the queue. If a stream is never initialized, it is freed in ngx_quic_close_streams(). Stream initializer is now set as read event handler in stream connection. Previously qc->streams.uninitialized was used only for delayed stream initialization. The change makes it possible not to handle separately the case of a new stream in stream-related frame handlers. It makes these handlers simpler since new streams and existing streams are now handled by the same code.
author Roman Arutyunyan <arut@nginx.com>
date Fri, 10 Dec 2021 19:43:50 +0300
parents 86f5a738ac2a
children 23880e4ad3e2
files src/event/quic/ngx_event_quic_streams.c
diffstat 1 files changed, 62 insertions(+), 172 deletions(-) [+]
line wrap: on
line diff
--- a/src/event/quic/ngx_event_quic_streams.c
+++ b/src/event/quic/ngx_event_quic_streams.c
@@ -13,10 +13,9 @@
 #define NGX_QUIC_STREAM_GONE     (void *) -1
 
 
-static ngx_quic_stream_t *ngx_quic_create_client_stream(ngx_connection_t *c,
-    uint64_t id);
+static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t id);
 static ngx_int_t ngx_quic_reject_stream(ngx_connection_t *c, uint64_t id);
-static ngx_int_t ngx_quic_init_stream(ngx_quic_stream_t *qs);
+static void ngx_quic_init_stream_handler(ngx_event_t *ev);
 static void ngx_quic_init_streams_handler(ngx_connection_t *c);
 static ngx_quic_stream_t *ngx_quic_create_stream(ngx_connection_t *c,
     uint64_t id);
@@ -306,21 +305,28 @@ ngx_quic_shutdown_stream(ngx_connection_
 
 
 static ngx_quic_stream_t *
-ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id)
+ngx_quic_get_stream(ngx_connection_t *c, uint64_t id)
 {
     uint64_t                min_id;
+    ngx_event_t            *rev;
     ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
 
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic stream id:0x%xL is new", id);
+    qc = ngx_quic_get_connection(c);
+
+    qs = ngx_quic_find_stream(&qc->streams.tree, id);
 
-    qc = ngx_quic_get_connection(c);
+    if (qs) {
+        return qs;
+    }
 
     if (qc->shutdown || qc->closing) {
         return NGX_QUIC_STREAM_GONE;
     }
 
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic stream id:0x%xL is missing", id);
+
     if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
 
         if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
@@ -377,7 +383,11 @@ ngx_quic_create_client_stream(ngx_connec
      * streams of that type with lower-numbered stream IDs also being opened.
      */
 
-    for ( /* void */ ; min_id < id; min_id += 0x04) {
+#if (NGX_SUPPRESS_WARN)
+    qs = NULL;
+#endif
+
+    for ( /* void */ ; min_id <= id; min_id += 0x04) {
 
         qs = ngx_quic_create_stream(c, min_id);
 
@@ -389,22 +399,17 @@ ngx_quic_create_client_stream(ngx_connec
             continue;
         }
 
-        if (ngx_quic_init_stream(qs) != NGX_OK) {
-            return NULL;
-        }
+        ngx_queue_insert_tail(&qc->streams.uninitialized, &qs->queue);
 
-        if (qc->shutdown || qc->closing) {
-            return NGX_QUIC_STREAM_GONE;
+        rev = qs->connection->read;
+        rev->handler = ngx_quic_init_stream_handler;
+
+        if (qc->streams.initialized) {
+            ngx_post_event(rev, &ngx_posted_events);
         }
     }
 
-    qs = ngx_quic_create_stream(c, id);
-
     if (qs == NULL) {
-        if (ngx_quic_reject_stream(c, id) != NGX_OK) {
-            return NULL;
-        }
-
         return NGX_QUIC_STREAM_GONE;
     }
 
@@ -461,29 +466,20 @@ ngx_quic_reject_stream(ngx_connection_t 
 }
 
 
-static ngx_int_t
-ngx_quic_init_stream(ngx_quic_stream_t *qs)
+static void
+ngx_quic_init_stream_handler(ngx_event_t *ev)
 {
-    ngx_connection_t       *c;
-    ngx_quic_connection_t  *qc;
-
-    qc = ngx_quic_get_connection(qs->parent);
+    ngx_connection_t   *c;
+    ngx_quic_stream_t  *qs;
 
-    c = qs->connection;
-
-    if (!qc->streams.initialized) {
-        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic postpone stream init");
-
-        ngx_queue_insert_tail(&qc->streams.uninitialized, &qs->queue);
-        return NGX_OK;
-    }
+    c = ev->data;
+    qs = c->quic;
 
     ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic init stream");
 
-    c->listening->handler(c);
+    ngx_queue_remove(&qs->queue);
 
-    return NGX_OK;
+    c->listening->handler(c);
 }
 
 
@@ -527,16 +523,12 @@ ngx_quic_init_streams_handler(ngx_connec
 
     qc = ngx_quic_get_connection(c);
 
-    while (!ngx_queue_empty(&qc->streams.uninitialized)) {
-        q = ngx_queue_head(&qc->streams.uninitialized);
-        ngx_queue_remove(q);
-
+    for (q = ngx_queue_head(&qc->streams.uninitialized);
+         q != ngx_queue_sentinel(&qc->streams.uninitialized);
+         q = ngx_queue_next(q))
+    {
         qs = ngx_queue_data(q, ngx_quic_stream_t, queue);
-
-        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0,
-                       "quic init postponed stream");
-
-        qs->connection->listening->handler(qs->connection);
+        ngx_post_event(qs->connection->read, &ngx_posted_events);
     }
 
     qc->streams.initialized = 1;
@@ -1013,7 +1005,6 @@ ngx_quic_handle_stream_frame(ngx_connect
     ngx_quic_frame_t *frame)
 {
     uint64_t                  last;
-    ngx_pool_t               *pool;
     ngx_event_t              *rev;
     ngx_connection_t         *sc;
     ngx_quic_stream_t        *qs;
@@ -1033,39 +1024,14 @@ ngx_quic_handle_stream_frame(ngx_connect
     /* no overflow since both values are 62-bit */
     last = f->offset + f->length;
 
-    qs = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
+    qs = ngx_quic_get_stream(c, f->stream_id);
 
     if (qs == NULL) {
-        qs = ngx_quic_create_client_stream(c, f->stream_id);
-
-        if (qs == NULL) {
-            return NGX_ERROR;
-        }
-
-        if (qs == NGX_QUIC_STREAM_GONE) {
-            return NGX_OK;
-        }
-
-        sc = qs->connection;
+        return NGX_ERROR;
+    }
 
-        if (ngx_quic_control_flow(sc, last) != NGX_OK) {
-            goto cleanup;
-        }
-
-        if (f->fin) {
-            sc->read->pending_eof = 1;
-            qs->final_size = last;
-        }
-
-        if (f->offset == 0) {
-            sc->read->ready = 1;
-        }
-
-        if (ngx_quic_order_bufs(c, &qs->in, frame->data, f->offset) != NGX_OK) {
-            goto cleanup;
-        }
-
-        return ngx_quic_init_stream(qs);
+    if (qs == NGX_QUIC_STREAM_GONE) {
+        return NGX_OK;
     }
 
     sc = qs->connection;
@@ -1125,15 +1091,6 @@ ngx_quic_handle_stream_frame(ngx_connect
     }
 
     return NGX_OK;
-
-cleanup:
-
-    pool = sc->pool;
-
-    ngx_close_connection(sc);
-    ngx_destroy_pool(pool);
-
-    return NGX_ERROR;
 }
 
 
@@ -1210,20 +1167,14 @@ ngx_quic_handle_stream_data_blocked_fram
         return NGX_ERROR;
     }
 
-    qs = ngx_quic_find_stream(&qc->streams.tree, f->id);
+    qs = ngx_quic_get_stream(c, f->id);
 
     if (qs == NULL) {
-        qs = ngx_quic_create_client_stream(c, f->id);
-
-        if (qs == NULL) {
-            return NGX_ERROR;
-        }
+        return NGX_ERROR;
+    }
 
-        if (qs == NGX_QUIC_STREAM_GONE) {
-            return NGX_OK;
-        }
-
-        return ngx_quic_init_stream(qs);
+    if (qs == NGX_QUIC_STREAM_GONE) {
+        return NGX_OK;
     }
 
     return ngx_quic_update_max_stream_data(qs->connection);
@@ -1248,24 +1199,14 @@ ngx_quic_handle_max_stream_data_frame(ng
         return NGX_ERROR;
     }
 
-    qs = ngx_quic_find_stream(&qc->streams.tree, f->id);
+    qs = ngx_quic_get_stream(c, f->id);
 
     if (qs == NULL) {
-        qs = ngx_quic_create_client_stream(c, f->id);
-
-        if (qs == NULL) {
-            return NGX_ERROR;
-        }
+        return NGX_ERROR;
+    }
 
-        if (qs == NGX_QUIC_STREAM_GONE) {
-            return NGX_OK;
-        }
-
-        if (f->limit > qs->send_max_data) {
-            qs->send_max_data = f->limit;
-        }
-
-        return ngx_quic_init_stream(qs);
+    if (qs == NGX_QUIC_STREAM_GONE) {
+        return NGX_OK;
     }
 
     if (f->limit <= qs->send_max_data) {
@@ -1293,7 +1234,6 @@ ngx_int_t
 ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
     ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
 {
-    ngx_pool_t             *pool;
     ngx_event_t            *rev;
     ngx_connection_t       *sc;
     ngx_quic_stream_t      *qs;
@@ -1308,36 +1248,14 @@ ngx_quic_handle_reset_stream_frame(ngx_c
         return NGX_ERROR;
     }
 
-    qs = ngx_quic_find_stream(&qc->streams.tree, f->id);
+    qs = ngx_quic_get_stream(c, f->id);
 
     if (qs == NULL) {
-        qs = ngx_quic_create_client_stream(c, f->id);
-
-        if (qs == NULL) {
-            return NGX_ERROR;
-        }
-
-        if (qs == NGX_QUIC_STREAM_GONE) {
-            return NGX_OK;
-        }
-
-        sc = qs->connection;
+        return NGX_ERROR;
+    }
 
-        rev = sc->read;
-        rev->error = 1;
-        rev->ready = 1;
-
-        if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
-            goto cleanup;
-        }
-
-        qs->final_size = f->final_size;
-
-        if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
-            goto cleanup;
-        }
-
-        return ngx_quic_init_stream(qs);
+    if (qs == NGX_QUIC_STREAM_GONE) {
+        return NGX_OK;
     }
 
     sc = qs->connection;
@@ -1371,15 +1289,6 @@ ngx_quic_handle_reset_stream_frame(ngx_c
     }
 
     return NGX_OK;
-
-cleanup:
-
-    pool = sc->pool;
-
-    ngx_close_connection(sc);
-    ngx_destroy_pool(pool);
-
-    return NGX_ERROR;
 }
 
 
@@ -1387,9 +1296,7 @@ ngx_int_t
 ngx_quic_handle_stop_sending_frame(ngx_connection_t *c,
     ngx_quic_header_t *pkt, ngx_quic_stop_sending_frame_t *f)
 {
-    ngx_pool_t             *pool;
     ngx_event_t            *wev;
-    ngx_connection_t       *sc;
     ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
 
@@ -1402,31 +1309,14 @@ ngx_quic_handle_stop_sending_frame(ngx_c
         return NGX_ERROR;
     }
 
-    qs = ngx_quic_find_stream(&qc->streams.tree, f->id);
+    qs = ngx_quic_get_stream(c, f->id);
 
     if (qs == NULL) {
-        qs = ngx_quic_create_client_stream(c, f->id);
-
-        if (qs == NULL) {
-            return NGX_ERROR;
-        }
-
-        if (qs == NGX_QUIC_STREAM_GONE) {
-            return NGX_OK;
-        }
+        return NGX_ERROR;
+    }
 
-        sc = qs->connection;
-
-        if (ngx_quic_reset_stream(sc, f->error_code) != NGX_OK) {
-            pool = sc->pool;
-
-            ngx_close_connection(sc);
-            ngx_destroy_pool(pool);
-
-            return NGX_ERROR;
-        }
-
-        return ngx_quic_init_stream(qs);
+    if (qs == NGX_QUIC_STREAM_GONE) {
+        return NGX_OK;
     }
 
     if (ngx_quic_reset_stream(qs->connection, f->error_code) != NGX_OK) {