Mercurial > hg > nginx
diff src/event/quic/ngx_event_quic_streams.c @ 8990:b42a041d23a2 quic
QUIC: introduced explicit stream states.
This allows to eliminate the usage of stream connection event flags for tracking
stream state.
author | Roman Arutyunyan <arut@nginx.com> |
---|---|
date | Mon, 31 Jan 2022 09:46:02 +0300 |
parents | 6434160b4b78 |
children | 7626aa7a2156 |
line wrap: on
line diff
--- a/src/event/quic/ngx_event_quic_streams.c +++ b/src/event/quic/ngx_event_quic_streams.c @@ -192,12 +192,13 @@ ngx_quic_close_streams(ngx_connection_t { qs = (ngx_quic_stream_t *) node; + qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD; + qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT; + rev = qs->connection->read; - rev->error = 1; rev->ready = 1; wev = qs->connection->write; - wev->error = 1; wev->ready = 1; ngx_post_event(rev, &ngx_posted_events); @@ -221,19 +222,22 @@ ngx_quic_close_streams(ngx_connection_t ngx_int_t ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err) { - ngx_event_t *wev; ngx_connection_t *pc; ngx_quic_frame_t *frame; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; - wev = c->write; + qs = c->quic; - if (wev->error) { + if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD + || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT + || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD) + { return NGX_OK; } - qs = c->quic; + qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT; + pc = qs->parent; qc = ngx_quic_get_connection(pc); @@ -250,9 +254,6 @@ ngx_quic_reset_stream(ngx_connection_t * ngx_quic_queue_frame(qc, frame); - wev->error = 1; - wev->ready = 1; - return NGX_OK; } @@ -260,27 +261,15 @@ ngx_quic_reset_stream(ngx_connection_t * ngx_int_t ngx_quic_shutdown_stream(ngx_connection_t *c, int how) { - ngx_quic_stream_t *qs; - - qs = c->quic; - if (how == NGX_RDWR_SHUTDOWN || how == NGX_WRITE_SHUTDOWN) { - if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) - || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) - { - if (ngx_quic_shutdown_stream_send(c) != NGX_OK) { - return NGX_ERROR; - } + if (ngx_quic_shutdown_stream_send(c) != NGX_OK) { + return NGX_ERROR; } } if (how == NGX_RDWR_SHUTDOWN || how == NGX_READ_SHUTDOWN) { - if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0 - || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) - { - if (ngx_quic_shutdown_stream_recv(c) != NGX_OK) { - return NGX_ERROR; - } + if (ngx_quic_shutdown_stream_recv(c) != NGX_OK) { + return NGX_ERROR; } } @@ -291,19 +280,21 @@ ngx_quic_shutdown_stream(ngx_connection_ static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c) { - ngx_event_t *wev; ngx_connection_t *pc; ngx_quic_frame_t *frame; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; - wev = c->write; + qs = c->quic; - if (wev->error) { + if (qs->send_state != NGX_QUIC_STREAM_SEND_READY + && qs->send_state != NGX_QUIC_STREAM_SEND_SEND) + { return NGX_OK; } - qs = c->quic; + qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT; + pc = qs->parent; qc = ngx_quic_get_connection(pc); @@ -327,8 +318,6 @@ ngx_quic_shutdown_stream_send(ngx_connec ngx_quic_queue_frame(qc, frame); - wev->error = 1; - return NGX_OK; } @@ -336,19 +325,19 @@ ngx_quic_shutdown_stream_send(ngx_connec static ngx_int_t ngx_quic_shutdown_stream_recv(ngx_connection_t *c) { - ngx_event_t *rev; ngx_connection_t *pc; ngx_quic_frame_t *frame; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; - rev = c->read; + qs = c->quic; - if (rev->pending_eof || rev->error) { + if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV + && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN) + { return NGX_OK; } - qs = c->quic; pc = qs->parent; qc = ngx_quic_get_connection(pc); @@ -371,8 +360,6 @@ ngx_quic_shutdown_stream_recv(ngx_connec ngx_quic_queue_frame(qc, frame); - rev->error = 1; - return NGX_OK; } @@ -690,9 +677,13 @@ ngx_quic_create_stream(ngx_connection_t if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) { if (id & NGX_QUIC_STREAM_SERVER_INITIATED) { qs->send_max_data = qc->ctp.initial_max_stream_data_uni; + qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ; + qs->send_state = NGX_QUIC_STREAM_SEND_READY; } else { qs->recv_max_data = qc->tp.initial_max_stream_data_uni; + qs->recv_state = NGX_QUIC_STREAM_RECV_RECV; + qs->send_state = NGX_QUIC_STREAM_SEND_DATA_RECVD; } } else { @@ -704,6 +695,9 @@ ngx_quic_create_stream(ngx_connection_t qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_local; qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_remote; } + + qs->recv_state = NGX_QUIC_STREAM_RECV_RECV; + qs->send_state = NGX_QUIC_STREAM_SEND_READY; } qs->recv_window = qs->recv_max_data; @@ -744,25 +738,19 @@ ngx_quic_stream_recv(ngx_connection_t *c pc = qs->parent; rev = c->read; - if (rev->error) { + if (qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_RECVD + || qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_READ) + { + qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_READ; + rev->error = 1; return NGX_ERROR; } - ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic stream id:0x%xL recv eof:%d buf:%uz", - qs->id, rev->pending_eof, size); - - if (qs->in == NULL || qs->in->buf->sync) { - rev->ready = 0; + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, + "quic stream id:0x%xL recv buf:%uz", qs->id, size); - if (qs->recv_offset == qs->final_size) { - rev->eof = 1; - return 0; - } - - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic stream id:0x%xL recv() not ready", qs->id); - return NGX_AGAIN; + if (size == 0) { + return 0; } in = ngx_quic_read_chain(pc, &qs->in, size); @@ -780,8 +768,23 @@ ngx_quic_stream_recv(ngx_connection_t *c ngx_quic_free_chain(pc, in); - if (qs->in == NULL) { - rev->ready = rev->pending_eof; + if (len == 0) { + rev->ready = 0; + + if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN + && qs->recv_offset == qs->final_size) + { + qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ; + } + + if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_READ) { + rev->eof = 1; + return 0; + } + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, + "quic stream id:0x%xL recv() not ready", qs->id); + return NGX_AGAIN; } ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, @@ -839,10 +842,15 @@ ngx_quic_stream_send_chain(ngx_connectio qc = ngx_quic_get_connection(pc); wev = c->write; - if (wev->error) { + if (qs->send_state != NGX_QUIC_STREAM_SEND_READY + && qs->send_state != NGX_QUIC_STREAM_SEND_SEND) + { + wev->error = 1; return NGX_CHAIN_ERROR; } + qs->send_state = NGX_QUIC_STREAM_SEND_SEND; + flow = ngx_quic_max_stream_flow(c); if (flow == 0) { wev->ready = 0; @@ -1051,9 +1059,9 @@ ngx_quic_handle_stream_frame(ngx_connect sc = qs->connection; - rev = sc->read; - - if (rev->error) { + if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV + && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN) + { return NGX_OK; } @@ -1086,8 +1094,8 @@ ngx_quic_handle_stream_frame(ngx_connect return NGX_ERROR; } - rev->pending_eof = 1; qs->final_size = last; + qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN; } if (ngx_quic_write_chain(c, &qs->in, frame->data, f->length, @@ -1098,6 +1106,7 @@ ngx_quic_handle_stream_frame(ngx_connect } if (f->offset == qs->recv_offset) { + rev = sc->read; rev->ready = 1; if (rev->active) { @@ -1273,11 +1282,15 @@ ngx_quic_handle_reset_stream_frame(ngx_c return NGX_OK; } - sc = qs->connection; + if (qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_RECVD + || qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_READ) + { + return NGX_OK; + } - rev = sc->read; - rev->error = 1; - rev->ready = 1; + qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD; + + sc = qs->connection; if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) { return NGX_ERROR; @@ -1299,6 +1312,9 @@ ngx_quic_handle_reset_stream_frame(ngx_c return NGX_ERROR; } + rev = sc->read; + rev->ready = 1; + if (rev->active) { ngx_post_event(rev, &ngx_posted_events); } @@ -1341,6 +1357,7 @@ ngx_quic_handle_stop_sending_frame(ngx_c wev = qs->connection->write; if (wev->active) { + wev->ready = 1; ngx_post_event(wev, &ngx_posted_events); } @@ -1413,11 +1430,9 @@ static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last) { uint64_t len; - ngx_event_t *rev; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; - rev = c->read; qs = c->quic; qc = ngx_quic_get_connection(qs->parent); @@ -1434,7 +1449,9 @@ ngx_quic_control_flow(ngx_connection_t * qs->recv_last += len; - if (!rev->error && qs->recv_last > qs->recv_max_data) { + if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV + && qs->recv_last > qs->recv_max_data) + { qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; return NGX_ERROR; } @@ -1454,12 +1471,10 @@ static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last) { uint64_t len; - ngx_event_t *rev; ngx_connection_t *pc; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; - rev = c->read; qs = c->quic; pc = qs->parent; qc = ngx_quic_get_connection(pc); @@ -1475,9 +1490,7 @@ ngx_quic_update_flow(ngx_connection_t *c qs->recv_offset += len; - if (!rev->pending_eof && !rev->error - && qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) - { + if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) { if (ngx_quic_update_max_stream_data(c) != NGX_OK) { return NGX_ERROR; } @@ -1510,6 +1523,10 @@ ngx_quic_update_max_stream_data(ngx_conn pc = qs->parent; qc = ngx_quic_get_connection(pc); + if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV) { + return NGX_OK; + } + recv_max_data = qs->recv_offset + qs->recv_window; if (qs->recv_max_data == recv_max_data) {