# HG changeset patch # User Roman Arutyunyan # Date 1643611562 -10800 # Node ID b42a041d23a2226ec6def395bd0b084889b85473 # Parent 81a3429db8b00ec9fc476d3687d1cd18088f3365 QUIC: introduced explicit stream states. This allows to eliminate the usage of stream connection event flags for tracking stream state. diff --git a/src/event/quic/ngx_event_quic.h b/src/event/quic/ngx_event_quic.h --- a/src/event/quic/ngx_event_quic.h +++ b/src/event/quic/ngx_event_quic.h @@ -28,6 +28,26 @@ #define NGX_QUIC_STREAM_UNIDIRECTIONAL 0x02 +typedef enum { + NGX_QUIC_STREAM_SEND_READY = 0, + NGX_QUIC_STREAM_SEND_SEND, + NGX_QUIC_STREAM_SEND_DATA_SENT, + NGX_QUIC_STREAM_SEND_DATA_RECVD, + NGX_QUIC_STREAM_SEND_RESET_SENT, + NGX_QUIC_STREAM_SEND_RESET_RECVD +} ngx_quic_stream_send_state_e; + + +typedef enum { + NGX_QUIC_STREAM_RECV_RECV = 0, + NGX_QUIC_STREAM_RECV_SIZE_KNOWN, + NGX_QUIC_STREAM_RECV_DATA_RECVD, + NGX_QUIC_STREAM_RECV_DATA_READ, + NGX_QUIC_STREAM_RECV_RESET_RECVD, + NGX_QUIC_STREAM_RECV_RESET_READ +} ngx_quic_stream_recv_state_e; + + typedef struct { ngx_ssl_t *ssl; @@ -66,6 +86,8 @@ struct ngx_quic_stream_s { ngx_chain_t *in; ngx_chain_t *out; ngx_uint_t cancelable; /* unsigned cancelable:1; */ + ngx_quic_stream_send_state_e send_state; + ngx_quic_stream_recv_state_e recv_state; }; diff --git a/src/event/quic/ngx_event_quic_ack.c b/src/event/quic/ngx_event_quic_ack.c --- a/src/event/quic/ngx_event_quic_ack.c +++ b/src/event/quic/ngx_event_quic_ack.c @@ -617,10 +617,13 @@ ngx_quic_resend_frames(ngx_connection_t case NGX_QUIC_FT_STREAM: qs = ngx_quic_find_stream(&qc->streams.tree, f->u.stream.stream_id); - if (qs && qs->connection->write->error) { - /* RESET_STREAM was sent */ - ngx_quic_free_frame(c, f); - break; + if (qs) { + if (qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT + || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD) + { + ngx_quic_free_frame(c, f); + break; + } } /* fall through */ diff --git a/src/event/quic/ngx_event_quic_streams.c b/src/event/quic/ngx_event_quic_streams.c --- a/src/event/quic/ngx_event_quic_streams.c +++ b/src/event/quic/ngx_event_quic_streams.c @@ -192,12 +192,13 @@ ngx_quic_close_streams(ngx_connection_t { qs = (ngx_quic_stream_t *) node; + qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD; + qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT; + rev = qs->connection->read; - rev->error = 1; rev->ready = 1; wev = qs->connection->write; - wev->error = 1; wev->ready = 1; ngx_post_event(rev, &ngx_posted_events); @@ -221,19 +222,22 @@ ngx_quic_close_streams(ngx_connection_t ngx_int_t ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err) { - ngx_event_t *wev; ngx_connection_t *pc; ngx_quic_frame_t *frame; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; - wev = c->write; + qs = c->quic; - if (wev->error) { + if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD + || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT + || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD) + { return NGX_OK; } - qs = c->quic; + qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT; + pc = qs->parent; qc = ngx_quic_get_connection(pc); @@ -250,9 +254,6 @@ ngx_quic_reset_stream(ngx_connection_t * ngx_quic_queue_frame(qc, frame); - wev->error = 1; - wev->ready = 1; - return NGX_OK; } @@ -260,27 +261,15 @@ ngx_quic_reset_stream(ngx_connection_t * ngx_int_t ngx_quic_shutdown_stream(ngx_connection_t *c, int how) { - ngx_quic_stream_t *qs; - - qs = c->quic; - if (how == NGX_RDWR_SHUTDOWN || how == NGX_WRITE_SHUTDOWN) { - if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) - || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) - { - if (ngx_quic_shutdown_stream_send(c) != NGX_OK) { - return NGX_ERROR; - } + if (ngx_quic_shutdown_stream_send(c) != NGX_OK) { + return NGX_ERROR; } } if (how == NGX_RDWR_SHUTDOWN || how == NGX_READ_SHUTDOWN) { - if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0 - || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) - { - if (ngx_quic_shutdown_stream_recv(c) != NGX_OK) { - return NGX_ERROR; - } + if (ngx_quic_shutdown_stream_recv(c) != NGX_OK) { + return NGX_ERROR; } } @@ -291,19 +280,21 @@ ngx_quic_shutdown_stream(ngx_connection_ static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c) { - ngx_event_t *wev; ngx_connection_t *pc; ngx_quic_frame_t *frame; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; - wev = c->write; + qs = c->quic; - if (wev->error) { + if (qs->send_state != NGX_QUIC_STREAM_SEND_READY + && qs->send_state != NGX_QUIC_STREAM_SEND_SEND) + { return NGX_OK; } - qs = c->quic; + qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT; + pc = qs->parent; qc = ngx_quic_get_connection(pc); @@ -327,8 +318,6 @@ ngx_quic_shutdown_stream_send(ngx_connec ngx_quic_queue_frame(qc, frame); - wev->error = 1; - return NGX_OK; } @@ -336,19 +325,19 @@ ngx_quic_shutdown_stream_send(ngx_connec static ngx_int_t ngx_quic_shutdown_stream_recv(ngx_connection_t *c) { - ngx_event_t *rev; ngx_connection_t *pc; ngx_quic_frame_t *frame; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; - rev = c->read; + qs = c->quic; - if (rev->pending_eof || rev->error) { + if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV + && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN) + { return NGX_OK; } - qs = c->quic; pc = qs->parent; qc = ngx_quic_get_connection(pc); @@ -371,8 +360,6 @@ ngx_quic_shutdown_stream_recv(ngx_connec ngx_quic_queue_frame(qc, frame); - rev->error = 1; - return NGX_OK; } @@ -690,9 +677,13 @@ ngx_quic_create_stream(ngx_connection_t if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) { if (id & NGX_QUIC_STREAM_SERVER_INITIATED) { qs->send_max_data = qc->ctp.initial_max_stream_data_uni; + qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ; + qs->send_state = NGX_QUIC_STREAM_SEND_READY; } else { qs->recv_max_data = qc->tp.initial_max_stream_data_uni; + qs->recv_state = NGX_QUIC_STREAM_RECV_RECV; + qs->send_state = NGX_QUIC_STREAM_SEND_DATA_RECVD; } } else { @@ -704,6 +695,9 @@ ngx_quic_create_stream(ngx_connection_t qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_local; qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_remote; } + + qs->recv_state = NGX_QUIC_STREAM_RECV_RECV; + qs->send_state = NGX_QUIC_STREAM_SEND_READY; } qs->recv_window = qs->recv_max_data; @@ -744,25 +738,19 @@ ngx_quic_stream_recv(ngx_connection_t *c pc = qs->parent; rev = c->read; - if (rev->error) { + if (qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_RECVD + || qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_READ) + { + qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_READ; + rev->error = 1; return NGX_ERROR; } - ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic stream id:0x%xL recv eof:%d buf:%uz", - qs->id, rev->pending_eof, size); - - if (qs->in == NULL || qs->in->buf->sync) { - rev->ready = 0; + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, + "quic stream id:0x%xL recv buf:%uz", qs->id, size); - if (qs->recv_offset == qs->final_size) { - rev->eof = 1; - return 0; - } - - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic stream id:0x%xL recv() not ready", qs->id); - return NGX_AGAIN; + if (size == 0) { + return 0; } in = ngx_quic_read_chain(pc, &qs->in, size); @@ -780,8 +768,23 @@ ngx_quic_stream_recv(ngx_connection_t *c ngx_quic_free_chain(pc, in); - if (qs->in == NULL) { - rev->ready = rev->pending_eof; + if (len == 0) { + rev->ready = 0; + + if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN + && qs->recv_offset == qs->final_size) + { + qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ; + } + + if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_READ) { + rev->eof = 1; + return 0; + } + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, + "quic stream id:0x%xL recv() not ready", qs->id); + return NGX_AGAIN; } ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, @@ -839,10 +842,15 @@ ngx_quic_stream_send_chain(ngx_connectio qc = ngx_quic_get_connection(pc); wev = c->write; - if (wev->error) { + if (qs->send_state != NGX_QUIC_STREAM_SEND_READY + && qs->send_state != NGX_QUIC_STREAM_SEND_SEND) + { + wev->error = 1; return NGX_CHAIN_ERROR; } + qs->send_state = NGX_QUIC_STREAM_SEND_SEND; + flow = ngx_quic_max_stream_flow(c); if (flow == 0) { wev->ready = 0; @@ -1051,9 +1059,9 @@ ngx_quic_handle_stream_frame(ngx_connect sc = qs->connection; - rev = sc->read; - - if (rev->error) { + if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV + && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN) + { return NGX_OK; } @@ -1086,8 +1094,8 @@ ngx_quic_handle_stream_frame(ngx_connect return NGX_ERROR; } - rev->pending_eof = 1; qs->final_size = last; + qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN; } if (ngx_quic_write_chain(c, &qs->in, frame->data, f->length, @@ -1098,6 +1106,7 @@ ngx_quic_handle_stream_frame(ngx_connect } if (f->offset == qs->recv_offset) { + rev = sc->read; rev->ready = 1; if (rev->active) { @@ -1273,11 +1282,15 @@ ngx_quic_handle_reset_stream_frame(ngx_c return NGX_OK; } - sc = qs->connection; + if (qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_RECVD + || qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_READ) + { + return NGX_OK; + } - rev = sc->read; - rev->error = 1; - rev->ready = 1; + qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD; + + sc = qs->connection; if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) { return NGX_ERROR; @@ -1299,6 +1312,9 @@ ngx_quic_handle_reset_stream_frame(ngx_c return NGX_ERROR; } + rev = sc->read; + rev->ready = 1; + if (rev->active) { ngx_post_event(rev, &ngx_posted_events); } @@ -1341,6 +1357,7 @@ ngx_quic_handle_stop_sending_frame(ngx_c wev = qs->connection->write; if (wev->active) { + wev->ready = 1; ngx_post_event(wev, &ngx_posted_events); } @@ -1413,11 +1430,9 @@ static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last) { uint64_t len; - ngx_event_t *rev; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; - rev = c->read; qs = c->quic; qc = ngx_quic_get_connection(qs->parent); @@ -1434,7 +1449,9 @@ ngx_quic_control_flow(ngx_connection_t * qs->recv_last += len; - if (!rev->error && qs->recv_last > qs->recv_max_data) { + if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV + && qs->recv_last > qs->recv_max_data) + { qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; return NGX_ERROR; } @@ -1454,12 +1471,10 @@ static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last) { uint64_t len; - ngx_event_t *rev; ngx_connection_t *pc; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; - rev = c->read; qs = c->quic; pc = qs->parent; qc = ngx_quic_get_connection(pc); @@ -1475,9 +1490,7 @@ ngx_quic_update_flow(ngx_connection_t *c qs->recv_offset += len; - if (!rev->pending_eof && !rev->error - && qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) - { + if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) { if (ngx_quic_update_max_stream_data(c) != NGX_OK) { return NGX_ERROR; } @@ -1510,6 +1523,10 @@ ngx_quic_update_max_stream_data(ngx_conn pc = qs->parent; qc = ngx_quic_get_connection(pc); + if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV) { + return NGX_OK; + } + recv_max_data = qs->recv_offset + qs->recv_window; if (qs->recv_max_data == recv_max_data) {