# HG changeset patch # User Roman Arutyunyan # Date 1639154630 -10800 # Node ID 0692355a3519024ed9b3a71a7216dcf6fe7e31ca # Parent 86f5a738ac2a9c93d2b3ef66cf84baa83b107f44 QUIC: simplified stream initialization. After creation, a client stream is added to qc->streams.uninitialized queue. After initialization it's removed from the queue. If a stream is never initialized, it is freed in ngx_quic_close_streams(). Stream initializer is now set as read event handler in stream connection. Previously qc->streams.uninitialized was used only for delayed stream initialization. The change makes it possible not to handle separately the case of a new stream in stream-related frame handlers. It makes these handlers simpler since new streams and existing streams are now handled by the same code. diff --git a/src/event/quic/ngx_event_quic_streams.c b/src/event/quic/ngx_event_quic_streams.c --- a/src/event/quic/ngx_event_quic_streams.c +++ b/src/event/quic/ngx_event_quic_streams.c @@ -13,10 +13,9 @@ #define NGX_QUIC_STREAM_GONE (void *) -1 -static ngx_quic_stream_t *ngx_quic_create_client_stream(ngx_connection_t *c, - uint64_t id); +static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t id); static ngx_int_t ngx_quic_reject_stream(ngx_connection_t *c, uint64_t id); -static ngx_int_t ngx_quic_init_stream(ngx_quic_stream_t *qs); +static void ngx_quic_init_stream_handler(ngx_event_t *ev); static void ngx_quic_init_streams_handler(ngx_connection_t *c); static ngx_quic_stream_t *ngx_quic_create_stream(ngx_connection_t *c, uint64_t id); @@ -306,21 +305,28 @@ ngx_quic_shutdown_stream(ngx_connection_ static ngx_quic_stream_t * -ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id) +ngx_quic_get_stream(ngx_connection_t *c, uint64_t id) { uint64_t min_id; + ngx_event_t *rev; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic stream id:0x%xL is new", id); + qc = ngx_quic_get_connection(c); + + qs = ngx_quic_find_stream(&qc->streams.tree, id); - qc = ngx_quic_get_connection(c); + if (qs) { + return qs; + } if (qc->shutdown || qc->closing) { return NGX_QUIC_STREAM_GONE; } + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, + "quic stream id:0x%xL is missing", id); + if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) { if (id & NGX_QUIC_STREAM_SERVER_INITIATED) { @@ -377,7 +383,11 @@ ngx_quic_create_client_stream(ngx_connec * streams of that type with lower-numbered stream IDs also being opened. */ - for ( /* void */ ; min_id < id; min_id += 0x04) { +#if (NGX_SUPPRESS_WARN) + qs = NULL; +#endif + + for ( /* void */ ; min_id <= id; min_id += 0x04) { qs = ngx_quic_create_stream(c, min_id); @@ -389,22 +399,17 @@ ngx_quic_create_client_stream(ngx_connec continue; } - if (ngx_quic_init_stream(qs) != NGX_OK) { - return NULL; - } + ngx_queue_insert_tail(&qc->streams.uninitialized, &qs->queue); - if (qc->shutdown || qc->closing) { - return NGX_QUIC_STREAM_GONE; + rev = qs->connection->read; + rev->handler = ngx_quic_init_stream_handler; + + if (qc->streams.initialized) { + ngx_post_event(rev, &ngx_posted_events); } } - qs = ngx_quic_create_stream(c, id); - if (qs == NULL) { - if (ngx_quic_reject_stream(c, id) != NGX_OK) { - return NULL; - } - return NGX_QUIC_STREAM_GONE; } @@ -461,29 +466,20 @@ ngx_quic_reject_stream(ngx_connection_t } -static ngx_int_t -ngx_quic_init_stream(ngx_quic_stream_t *qs) +static void +ngx_quic_init_stream_handler(ngx_event_t *ev) { - ngx_connection_t *c; - ngx_quic_connection_t *qc; - - qc = ngx_quic_get_connection(qs->parent); + ngx_connection_t *c; + ngx_quic_stream_t *qs; - c = qs->connection; - - if (!qc->streams.initialized) { - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic postpone stream init"); - - ngx_queue_insert_tail(&qc->streams.uninitialized, &qs->queue); - return NGX_OK; - } + c = ev->data; + qs = c->quic; ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic init stream"); - c->listening->handler(c); + ngx_queue_remove(&qs->queue); - return NGX_OK; + c->listening->handler(c); } @@ -527,16 +523,12 @@ ngx_quic_init_streams_handler(ngx_connec qc = ngx_quic_get_connection(c); - while (!ngx_queue_empty(&qc->streams.uninitialized)) { - q = ngx_queue_head(&qc->streams.uninitialized); - ngx_queue_remove(q); - + for (q = ngx_queue_head(&qc->streams.uninitialized); + q != ngx_queue_sentinel(&qc->streams.uninitialized); + q = ngx_queue_next(q)) + { qs = ngx_queue_data(q, ngx_quic_stream_t, queue); - - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0, - "quic init postponed stream"); - - qs->connection->listening->handler(qs->connection); + ngx_post_event(qs->connection->read, &ngx_posted_events); } qc->streams.initialized = 1; @@ -1013,7 +1005,6 @@ ngx_quic_handle_stream_frame(ngx_connect ngx_quic_frame_t *frame) { uint64_t last; - ngx_pool_t *pool; ngx_event_t *rev; ngx_connection_t *sc; ngx_quic_stream_t *qs; @@ -1033,39 +1024,14 @@ ngx_quic_handle_stream_frame(ngx_connect /* no overflow since both values are 62-bit */ last = f->offset + f->length; - qs = ngx_quic_find_stream(&qc->streams.tree, f->stream_id); + qs = ngx_quic_get_stream(c, f->stream_id); if (qs == NULL) { - qs = ngx_quic_create_client_stream(c, f->stream_id); - - if (qs == NULL) { - return NGX_ERROR; - } - - if (qs == NGX_QUIC_STREAM_GONE) { - return NGX_OK; - } - - sc = qs->connection; + return NGX_ERROR; + } - if (ngx_quic_control_flow(sc, last) != NGX_OK) { - goto cleanup; - } - - if (f->fin) { - sc->read->pending_eof = 1; - qs->final_size = last; - } - - if (f->offset == 0) { - sc->read->ready = 1; - } - - if (ngx_quic_order_bufs(c, &qs->in, frame->data, f->offset) != NGX_OK) { - goto cleanup; - } - - return ngx_quic_init_stream(qs); + if (qs == NGX_QUIC_STREAM_GONE) { + return NGX_OK; } sc = qs->connection; @@ -1125,15 +1091,6 @@ ngx_quic_handle_stream_frame(ngx_connect } return NGX_OK; - -cleanup: - - pool = sc->pool; - - ngx_close_connection(sc); - ngx_destroy_pool(pool); - - return NGX_ERROR; } @@ -1210,20 +1167,14 @@ ngx_quic_handle_stream_data_blocked_fram return NGX_ERROR; } - qs = ngx_quic_find_stream(&qc->streams.tree, f->id); + qs = ngx_quic_get_stream(c, f->id); if (qs == NULL) { - qs = ngx_quic_create_client_stream(c, f->id); - - if (qs == NULL) { - return NGX_ERROR; - } + return NGX_ERROR; + } - if (qs == NGX_QUIC_STREAM_GONE) { - return NGX_OK; - } - - return ngx_quic_init_stream(qs); + if (qs == NGX_QUIC_STREAM_GONE) { + return NGX_OK; } return ngx_quic_update_max_stream_data(qs->connection); @@ -1248,24 +1199,14 @@ ngx_quic_handle_max_stream_data_frame(ng return NGX_ERROR; } - qs = ngx_quic_find_stream(&qc->streams.tree, f->id); + qs = ngx_quic_get_stream(c, f->id); if (qs == NULL) { - qs = ngx_quic_create_client_stream(c, f->id); - - if (qs == NULL) { - return NGX_ERROR; - } + return NGX_ERROR; + } - if (qs == NGX_QUIC_STREAM_GONE) { - return NGX_OK; - } - - if (f->limit > qs->send_max_data) { - qs->send_max_data = f->limit; - } - - return ngx_quic_init_stream(qs); + if (qs == NGX_QUIC_STREAM_GONE) { + return NGX_OK; } if (f->limit <= qs->send_max_data) { @@ -1293,7 +1234,6 @@ ngx_int_t ngx_quic_handle_reset_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f) { - ngx_pool_t *pool; ngx_event_t *rev; ngx_connection_t *sc; ngx_quic_stream_t *qs; @@ -1308,36 +1248,14 @@ ngx_quic_handle_reset_stream_frame(ngx_c return NGX_ERROR; } - qs = ngx_quic_find_stream(&qc->streams.tree, f->id); + qs = ngx_quic_get_stream(c, f->id); if (qs == NULL) { - qs = ngx_quic_create_client_stream(c, f->id); - - if (qs == NULL) { - return NGX_ERROR; - } - - if (qs == NGX_QUIC_STREAM_GONE) { - return NGX_OK; - } - - sc = qs->connection; + return NGX_ERROR; + } - rev = sc->read; - rev->error = 1; - rev->ready = 1; - - if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) { - goto cleanup; - } - - qs->final_size = f->final_size; - - if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) { - goto cleanup; - } - - return ngx_quic_init_stream(qs); + if (qs == NGX_QUIC_STREAM_GONE) { + return NGX_OK; } sc = qs->connection; @@ -1371,15 +1289,6 @@ ngx_quic_handle_reset_stream_frame(ngx_c } return NGX_OK; - -cleanup: - - pool = sc->pool; - - ngx_close_connection(sc); - ngx_destroy_pool(pool); - - return NGX_ERROR; } @@ -1387,9 +1296,7 @@ ngx_int_t ngx_quic_handle_stop_sending_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, ngx_quic_stop_sending_frame_t *f) { - ngx_pool_t *pool; ngx_event_t *wev; - ngx_connection_t *sc; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; @@ -1402,31 +1309,14 @@ ngx_quic_handle_stop_sending_frame(ngx_c return NGX_ERROR; } - qs = ngx_quic_find_stream(&qc->streams.tree, f->id); + qs = ngx_quic_get_stream(c, f->id); if (qs == NULL) { - qs = ngx_quic_create_client_stream(c, f->id); - - if (qs == NULL) { - return NGX_ERROR; - } - - if (qs == NGX_QUIC_STREAM_GONE) { - return NGX_OK; - } + return NGX_ERROR; + } - sc = qs->connection; - - if (ngx_quic_reset_stream(sc, f->error_code) != NGX_OK) { - pool = sc->pool; - - ngx_close_connection(sc); - ngx_destroy_pool(pool); - - return NGX_ERROR; - } - - return ngx_quic_init_stream(qs); + if (qs == NGX_QUIC_STREAM_GONE) { + return NGX_OK; } if (ngx_quic_reset_stream(qs->connection, f->error_code) != NGX_OK) {