# HG changeset patch # User Vladimir Homutov # Date 1586938314 -10800 # Node ID 72d20158c814fe22552f93578bbf0e7da6d4ef26 # Parent 167d324767376de2e85dfcadb5a177ee139595f7 Added reordering support for STREAM frames. Each stream node now includes incoming frames queue and sent/received counters for tracking offset. The sent counter is not used, c->sent is used, not like in crypto buffers, which have no connections. diff --git a/src/event/ngx_event_quic.c b/src/event/ngx_event_quic.c --- a/src/event/ngx_event_quic.c +++ b/src/event/ngx_event_quic.c @@ -70,15 +70,6 @@ typedef struct { } ngx_quic_send_ctx_t; -/* ordered frames stream context */ -typedef struct { - uint64_t sent; - uint64_t received; - ngx_queue_t frames; - size_t total; /* size of buffered data */ -} ngx_quic_frames_stream_t; - - struct ngx_quic_connection_s { ngx_str_t scid; ngx_str_t dcid; @@ -177,7 +168,12 @@ static ngx_int_t ngx_quic_handle_ordered static ngx_int_t ngx_quic_crypto_input(ngx_connection_t *c, ngx_quic_frame_t *frame); static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c, - ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *frame); + ngx_quic_header_t *pkt, ngx_quic_frame_t *frame); +static ngx_int_t ngx_quic_stream_input(ngx_connection_t *c, + ngx_quic_frame_t *frame); +static ngx_quic_stream_t *ngx_quic_add_stream(ngx_connection_t *c, + ngx_quic_stream_frame_t *f); + static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c); static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f); @@ -739,6 +735,7 @@ ngx_quic_close_connection(ngx_connection #if (NGX_DEBUG) ngx_uint_t ns; #endif + ngx_uint_t i; ngx_pool_t *pool; ngx_event_t *rev; ngx_rbtree_t *tree; @@ -748,11 +745,14 @@ ngx_quic_close_connection(ngx_connection ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "close quic connection"); - // TODO: free frames from reorder queue if any - qc = c->quic; if (qc) { + + for (i = 0; i < NGX_QUIC_ENCRYPTION_LAST; i++) { + ngx_quic_free_frames(c, &qc->crypto[i].frames); + } + qc->closing = 1; tree = &qc->streams.tree; @@ -1201,9 +1201,7 @@ ngx_quic_payload_handler(ngx_connection_ case NGX_QUIC_FT_STREAM6: case NGX_QUIC_FT_STREAM7: - if (ngx_quic_handle_stream_frame(c, pkt, &frame.u.stream) - != NGX_OK) - { + if (ngx_quic_handle_stream_frame(c, pkt, &frame) != NGX_OK) { return NGX_ERROR; } @@ -1441,6 +1439,7 @@ ngx_quic_handle_ordered_frame(ngx_connec ngx_quic_frame_t *frame, ngx_quic_frame_handler_pt handler) { size_t full_len; + ngx_int_t rc; ngx_queue_t *q; ngx_quic_ordered_frame_t *f; @@ -1468,10 +1467,17 @@ ngx_quic_handle_ordered_frame(ngx_connec /* f->offset == fs->received */ - if (handler(c, frame) != NGX_OK) { + rc = handler(c, frame); + if (rc == NGX_ERROR) { return NGX_ERROR; + + } else if (rc == NGX_DONE) { + /* handler destroyed stream, queue no longer exists */ + return NGX_OK; } + /* rc == NGX_OK */ + fs->received += f->length; /* now check the queue if we can continue with buffered frames */ @@ -1512,8 +1518,14 @@ ngx_quic_handle_ordered_frame(ngx_connec /* f->offset == fs->received */ - if (handler(c, frame) != NGX_OK) { + rc = handler(c, frame); + + if (rc == NGX_ERROR) { return NGX_ERROR; + + } else if (rc == NGX_DONE) { + /* handler destroyed stream, queue no longer exists */ + return NGX_OK; } fs->received += f->length; @@ -1721,20 +1733,54 @@ ngx_quic_crypto_input(ngx_connection_t * static ngx_int_t -ngx_quic_handle_stream_frame(ngx_connection_t *c, - ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f) +ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, + ngx_quic_frame_t *frame) { - size_t n; - ngx_buf_t *b; - ngx_event_t *rev; - ngx_quic_stream_t *sn; - ngx_quic_connection_t *qc; + ngx_quic_stream_t *sn; + ngx_quic_connection_t *qc; + ngx_quic_stream_frame_t *f; + ngx_quic_frames_stream_t *fs; qc = c->quic; + f = &frame->u.stream; sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id); - if (sn) { + if (sn == NULL) { + sn = ngx_quic_add_stream(c, f); + if (sn == NULL) { + return NGX_ERROR; + } + } + + fs = &sn->fs; + + return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input); +} + + +static ngx_int_t +ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame) +{ + ngx_buf_t *b; + ngx_event_t *rev; + ngx_quic_stream_t *sn; + ngx_quic_connection_t *qc; + ngx_quic_stream_frame_t *f; + + qc = c->quic; + + f = &frame->u.stream; + + sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id); + + if (sn == NULL) { + // TODO: possible? + // deleted while stream is in reordering queue? + return NGX_ERROR; + } + + if (sn->fs.received != 0) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream"); b = sn->b; @@ -1761,29 +1807,14 @@ ngx_quic_handle_stream_frame(ngx_connect rev->handler(rev); } + /* check if stream was destroyed */ + if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) { + return NGX_DONE; + } + return NGX_OK; } - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new"); - - n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) - ? qc->tp.initial_max_stream_data_uni - : qc->tp.initial_max_stream_data_bidi_remote; - - if (n < NGX_QUIC_STREAM_BUFSIZE) { - n = NGX_QUIC_STREAM_BUFSIZE; - } - - if (n < f->length) { - ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer"); - return NGX_ERROR; - } - - sn = ngx_quic_create_stream(c, f->stream_id, n); - if (sn == NULL) { - return NGX_ERROR; - } - b = sn->b; b->last = ngx_cpymem(b->last, f->data, f->length); @@ -1800,10 +1831,50 @@ ngx_quic_handle_stream_frame(ngx_connect qc->streams.handler(sn->c); + /* check if stream was destroyed */ + if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) { + return NGX_DONE; + } + return NGX_OK; } +static ngx_quic_stream_t * +ngx_quic_add_stream(ngx_connection_t *c, ngx_quic_stream_frame_t *f) +{ + size_t n; + ngx_quic_stream_t *sn; + ngx_quic_connection_t *qc; + + qc = c->quic; + + // TODO: check increasing IDs + + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new"); + + n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) + ? qc->tp.initial_max_stream_data_uni + : qc->tp.initial_max_stream_data_bidi_remote; + + if (n < NGX_QUIC_STREAM_BUFSIZE) { + n = NGX_QUIC_STREAM_BUFSIZE; + } + + if (n < f->length) { + ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer"); + return NULL; + } + + sn = ngx_quic_create_stream(c, f->stream_id, n); + if (sn == NULL) { + return NULL; + } + + return sn; +} + + static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c) { @@ -2024,7 +2095,6 @@ ngx_quic_output_frames(ngx_connection_t return NGX_ERROR; } - } while (q != ngx_queue_sentinel(&ctx->frames)); return NGX_OK; @@ -2037,15 +2107,19 @@ ngx_quic_free_frames(ngx_connection_t *c ngx_queue_t *q; ngx_quic_frame_t *f; - q = ngx_queue_head(frames); - do { + q = ngx_queue_head(frames); + + if (q == ngx_queue_sentinel(frames)) { + break; + } + + ngx_queue_remove(q); + f = ngx_queue_data(q, ngx_quic_frame_t, queue); - q = ngx_queue_next(q); ngx_quic_free_frame(c, f); - - } while (q != ngx_queue_sentinel(frames)); + } while (1); } @@ -2237,7 +2311,7 @@ ngx_quic_retransmit_handler(ngx_event_t static void ngx_quic_push_handler(ngx_event_t *ev) { - ngx_connection_t *c; + ngx_connection_t *c; ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "push timer"); @@ -2430,6 +2504,8 @@ ngx_quic_create_stream(ngx_connection_t return NULL; } + ngx_queue_init(&sn->fs.frames); + log = ngx_palloc(pool, sizeof(ngx_log_t)); if (log == NULL) { ngx_destroy_pool(pool); @@ -2595,6 +2671,8 @@ ngx_quic_stream_cleanup_handler(void *da return; } + ngx_quic_free_frames(pc, &qs->fs.frames); + if ((qs->id & 0x03) == NGX_QUIC_STREAM_UNIDIRECTIONAL) { /* do not send fin for client unidirectional streams */ return; diff --git a/src/event/ngx_event_quic.h b/src/event/ngx_event_quic.h --- a/src/event/ngx_event_quic.h +++ b/src/event/ngx_event_quic.h @@ -29,33 +29,42 @@ typedef struct { /* configurable */ - ngx_msec_t max_idle_timeout; - ngx_msec_t max_ack_delay; + ngx_msec_t max_idle_timeout; + ngx_msec_t max_ack_delay; - ngx_uint_t max_packet_size; - ngx_uint_t initial_max_data; - ngx_uint_t initial_max_stream_data_bidi_local; - ngx_uint_t initial_max_stream_data_bidi_remote; - ngx_uint_t initial_max_stream_data_uni; - ngx_uint_t initial_max_streams_bidi; - ngx_uint_t initial_max_streams_uni; - ngx_uint_t ack_delay_exponent; - ngx_uint_t disable_active_migration; - ngx_uint_t active_connection_id_limit; + ngx_uint_t max_packet_size; + ngx_uint_t initial_max_data; + ngx_uint_t initial_max_stream_data_bidi_local; + ngx_uint_t initial_max_stream_data_bidi_remote; + ngx_uint_t initial_max_stream_data_uni; + ngx_uint_t initial_max_streams_bidi; + ngx_uint_t initial_max_streams_uni; + ngx_uint_t ack_delay_exponent; + ngx_uint_t disable_active_migration; + ngx_uint_t active_connection_id_limit; /* TODO */ - ngx_uint_t original_connection_id; - u_char stateless_reset_token[16]; - void *preferred_address; + ngx_uint_t original_connection_id; + u_char stateless_reset_token[16]; + void *preferred_address; } ngx_quic_tp_t; +typedef struct { + uint64_t sent; + uint64_t received; + ngx_queue_t frames; /* reorder queue */ + size_t total; /* size of buffered data */ +} ngx_quic_frames_stream_t; + + struct ngx_quic_stream_s { - ngx_rbtree_node_t node; - ngx_connection_t *parent; - ngx_connection_t *c; - uint64_t id; - ngx_buf_t *b; + ngx_rbtree_node_t node; + ngx_connection_t *parent; + ngx_connection_t *c; + uint64_t id; + ngx_buf_t *b; + ngx_quic_frames_stream_t fs; };