Mercurial > hg > nginx
comparison src/event/quic/ngx_event_quic_streams.c @ 9010:a5aebd51e4c7 quic
QUIC: stream lingering.
Now ngx_quic_stream_t is decoupled from ngx_connection_t so that it
can persist after the connection is closed by the application. During this period,
the server expects the stream's final size from the client for correct flow control.
Also, buffered output is sent to the client as more flow control credit is granted.
author | Roman Arutyunyan <arut@nginx.com> |
---|---|
date | Sat, 05 Feb 2022 12:54:54 +0300 |
parents | e56a05d6dbd1 |
children | f9c788f3f5cc |
comparison
equal
deleted
inserted
replaced
9009:e5f16d886c97 | 9010:a5aebd51e4c7 |
---|---|
11 | 11 |
12 | 12 |
13 #define NGX_QUIC_STREAM_GONE (void *) -1 | 13 #define NGX_QUIC_STREAM_GONE (void *) -1 |
14 | 14 |
15 | 15 |
16 static ngx_int_t ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, | |
17 ngx_uint_t err); | |
16 static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c); | 18 static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c); |
17 static ngx_int_t ngx_quic_shutdown_stream_recv(ngx_connection_t *c); | 19 static ngx_int_t ngx_quic_shutdown_stream_recv(ngx_connection_t *c); |
18 static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t id); | 20 static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t id); |
19 static ngx_int_t ngx_quic_reject_stream(ngx_connection_t *c, uint64_t id); | 21 static ngx_int_t ngx_quic_reject_stream(ngx_connection_t *c, uint64_t id); |
20 static void ngx_quic_init_stream_handler(ngx_event_t *ev); | 22 static void ngx_quic_init_stream_handler(ngx_event_t *ev); |
26 size_t size); | 28 size_t size); |
27 static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, | 29 static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, |
28 size_t size); | 30 size_t size); |
29 static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c, | 31 static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c, |
30 ngx_chain_t *in, off_t limit); | 32 ngx_chain_t *in, off_t limit); |
31 static size_t ngx_quic_max_stream_flow(ngx_connection_t *c); | 33 static ngx_int_t ngx_quic_stream_flush(ngx_quic_stream_t *qs); |
32 static void ngx_quic_stream_cleanup_handler(void *data); | 34 static void ngx_quic_stream_cleanup_handler(void *data); |
33 static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last); | 35 static ngx_int_t ngx_quic_close_stream(ngx_quic_stream_t *qs); |
34 static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last); | 36 static ngx_int_t ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last); |
35 static ngx_int_t ngx_quic_update_max_stream_data(ngx_connection_t *c); | 37 static ngx_int_t ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last); |
38 static ngx_int_t ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs); | |
36 static ngx_int_t ngx_quic_update_max_data(ngx_connection_t *c); | 39 static ngx_int_t ngx_quic_update_max_data(ngx_connection_t *c); |
37 static void ngx_quic_set_event(ngx_event_t *ev); | 40 static void ngx_quic_set_event(ngx_event_t *ev); |
38 | 41 |
39 | 42 |
40 ngx_connection_t * | 43 ngx_connection_t * |
184 | 187 |
185 #if (NGX_DEBUG) | 188 #if (NGX_DEBUG) |
186 ns = 0; | 189 ns = 0; |
187 #endif | 190 #endif |
188 | 191 |
189 for (node = ngx_rbtree_min(tree->root, tree->sentinel); | 192 node = ngx_rbtree_min(tree->root, tree->sentinel); |
190 node; | 193 |
191 node = ngx_rbtree_next(tree, node)) | 194 while (node) { |
192 { | |
193 qs = (ngx_quic_stream_t *) node; | 195 qs = (ngx_quic_stream_t *) node; |
196 node = ngx_rbtree_next(tree, node); | |
194 | 197 |
195 qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD; | 198 qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD; |
196 qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT; | 199 qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT; |
200 | |
201 if (qs->connection == NULL) { | |
202 ngx_quic_close_stream(qs); | |
203 continue; | |
204 } | |
197 | 205 |
198 ngx_quic_set_event(qs->connection->read); | 206 ngx_quic_set_event(qs->connection->read); |
199 ngx_quic_set_event(qs->connection->write); | 207 ngx_quic_set_event(qs->connection->write); |
200 | 208 |
201 #if (NGX_DEBUG) | 209 #if (NGX_DEBUG) |
211 | 219 |
212 | 220 |
213 ngx_int_t | 221 ngx_int_t |
214 ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err) | 222 ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err) |
215 { | 223 { |
224 return ngx_quic_do_reset_stream(c->quic, err); | |
225 } | |
226 | |
227 | |
228 static ngx_int_t | |
229 ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, ngx_uint_t err) | |
230 { | |
216 ngx_connection_t *pc; | 231 ngx_connection_t *pc; |
217 ngx_quic_frame_t *frame; | 232 ngx_quic_frame_t *frame; |
218 ngx_quic_stream_t *qs; | 233 ngx_quic_connection_t *qc; |
219 ngx_quic_connection_t *qc; | |
220 | |
221 qs = c->quic; | |
222 | 234 |
223 if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD | 235 if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD |
224 || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT | 236 || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT |
225 || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD) | 237 || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD) |
226 { | 238 { |
227 return NGX_OK; | 239 return NGX_OK; |
228 } | 240 } |
229 | 241 |
230 qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT; | 242 qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT; |
243 qs->send_final_size = qs->send_offset; | |
231 | 244 |
232 pc = qs->parent; | 245 pc = qs->parent; |
233 qc = ngx_quic_get_connection(pc); | 246 qc = ngx_quic_get_connection(pc); |
247 | |
248 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, | |
249 "quic stream id:0x%xL reset", qs->id); | |
234 | 250 |
235 frame = ngx_quic_alloc_frame(pc); | 251 frame = ngx_quic_alloc_frame(pc); |
236 if (frame == NULL) { | 252 if (frame == NULL) { |
237 return NGX_ERROR; | 253 return NGX_ERROR; |
238 } | 254 } |
239 | 255 |
240 frame->level = ssl_encryption_application; | 256 frame->level = ssl_encryption_application; |
241 frame->type = NGX_QUIC_FT_RESET_STREAM; | 257 frame->type = NGX_QUIC_FT_RESET_STREAM; |
242 frame->u.reset_stream.id = qs->id; | 258 frame->u.reset_stream.id = qs->id; |
243 frame->u.reset_stream.error_code = err; | 259 frame->u.reset_stream.error_code = err; |
244 frame->u.reset_stream.final_size = c->sent; | 260 frame->u.reset_stream.final_size = qs->send_offset; |
245 | 261 |
246 ngx_quic_queue_frame(qc, frame); | 262 ngx_quic_queue_frame(qc, frame); |
263 | |
264 ngx_quic_free_chain(pc, qs->out); | |
265 qs->out = NULL; | |
247 | 266 |
248 return NGX_OK; | 267 return NGX_OK; |
249 } | 268 } |
250 | 269 |
251 | 270 |
269 | 288 |
270 | 289 |
271 static ngx_int_t | 290 static ngx_int_t |
272 ngx_quic_shutdown_stream_send(ngx_connection_t *c) | 291 ngx_quic_shutdown_stream_send(ngx_connection_t *c) |
273 { | 292 { |
293 ngx_quic_stream_t *qs; | |
294 | |
295 qs = c->quic; | |
296 | |
297 if (qs->send_state != NGX_QUIC_STREAM_SEND_READY | |
298 && qs->send_state != NGX_QUIC_STREAM_SEND_SEND) | |
299 { | |
300 return NGX_OK; | |
301 } | |
302 | |
303 qs->send_state = NGX_QUIC_STREAM_SEND_SEND; | |
304 qs->send_final_size = c->sent; | |
305 | |
306 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0, | |
307 "quic stream id:0x%xL send shutdown", qs->id); | |
308 | |
309 return ngx_quic_stream_flush(qs); | |
310 } | |
311 | |
312 | |
313 static ngx_int_t | |
314 ngx_quic_shutdown_stream_recv(ngx_connection_t *c) | |
315 { | |
274 ngx_connection_t *pc; | 316 ngx_connection_t *pc; |
275 ngx_quic_frame_t *frame; | 317 ngx_quic_frame_t *frame; |
276 ngx_quic_stream_t *qs; | 318 ngx_quic_stream_t *qs; |
277 ngx_quic_connection_t *qc; | 319 ngx_quic_connection_t *qc; |
278 | 320 |
279 qs = c->quic; | 321 qs = c->quic; |
280 | 322 |
281 if (qs->send_state != NGX_QUIC_STREAM_SEND_READY | 323 if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV |
282 && qs->send_state != NGX_QUIC_STREAM_SEND_SEND) | 324 && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN) |
283 { | 325 { |
284 return NGX_OK; | 326 return NGX_OK; |
285 } | 327 } |
286 | |
287 qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT; | |
288 | 328 |
289 pc = qs->parent; | 329 pc = qs->parent; |
290 qc = ngx_quic_get_connection(pc); | 330 qc = ngx_quic_get_connection(pc); |
291 | 331 |
332 if (qc->conf->stream_close_code == 0) { | |
333 return NGX_OK; | |
334 } | |
335 | |
292 frame = ngx_quic_alloc_frame(pc); | 336 frame = ngx_quic_alloc_frame(pc); |
293 if (frame == NULL) { | 337 if (frame == NULL) { |
294 return NGX_ERROR; | 338 return NGX_ERROR; |
295 } | 339 } |
296 | 340 |
297 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, | 341 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, |
298 "quic stream id:0x%xL send shutdown", qs->id); | |
299 | |
300 frame->level = ssl_encryption_application; | |
301 frame->type = NGX_QUIC_FT_STREAM; | |
302 frame->u.stream.off = 1; | |
303 frame->u.stream.len = 1; | |
304 frame->u.stream.fin = 1; | |
305 | |
306 frame->u.stream.stream_id = qs->id; | |
307 frame->u.stream.offset = c->sent; | |
308 frame->u.stream.length = 0; | |
309 | |
310 ngx_quic_queue_frame(qc, frame); | |
311 | |
312 return NGX_OK; | |
313 } | |
314 | |
315 | |
316 static ngx_int_t | |
317 ngx_quic_shutdown_stream_recv(ngx_connection_t *c) | |
318 { | |
319 ngx_connection_t *pc; | |
320 ngx_quic_frame_t *frame; | |
321 ngx_quic_stream_t *qs; | |
322 ngx_quic_connection_t *qc; | |
323 | |
324 qs = c->quic; | |
325 | |
326 if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV | |
327 && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN) | |
328 { | |
329 return NGX_OK; | |
330 } | |
331 | |
332 pc = qs->parent; | |
333 qc = ngx_quic_get_connection(pc); | |
334 | |
335 if (qc->conf->stream_close_code == 0) { | |
336 return NGX_OK; | |
337 } | |
338 | |
339 frame = ngx_quic_alloc_frame(pc); | |
340 if (frame == NULL) { | |
341 return NGX_ERROR; | |
342 } | |
343 | |
344 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, | |
345 "quic stream id:0x%xL recv shutdown", qs->id); | 342 "quic stream id:0x%xL recv shutdown", qs->id); |
346 | 343 |
347 frame->level = ssl_encryption_application; | 344 frame->level = ssl_encryption_application; |
348 frame->type = NGX_QUIC_FT_STOP_SENDING; | 345 frame->type = NGX_QUIC_FT_STOP_SENDING; |
349 frame->u.stop_sending.id = qs->id; | 346 frame->u.stop_sending.id = qs->id; |
589 static ngx_quic_stream_t * | 586 static ngx_quic_stream_t * |
590 ngx_quic_create_stream(ngx_connection_t *c, uint64_t id) | 587 ngx_quic_create_stream(ngx_connection_t *c, uint64_t id) |
591 { | 588 { |
592 ngx_log_t *log; | 589 ngx_log_t *log; |
593 ngx_pool_t *pool; | 590 ngx_pool_t *pool; |
591 ngx_queue_t *q; | |
594 ngx_connection_t *sc; | 592 ngx_connection_t *sc; |
595 ngx_quic_stream_t *qs; | 593 ngx_quic_stream_t *qs; |
596 ngx_pool_cleanup_t *cln; | 594 ngx_pool_cleanup_t *cln; |
597 ngx_quic_connection_t *qc; | 595 ngx_quic_connection_t *qc; |
598 | 596 |
599 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, | 597 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, |
600 "quic stream id:0x%xL create", id); | 598 "quic stream id:0x%xL create", id); |
601 | 599 |
602 qc = ngx_quic_get_connection(c); | 600 qc = ngx_quic_get_connection(c); |
603 | 601 |
604 pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log); | 602 if (!ngx_queue_empty(&qc->streams.free)) { |
605 if (pool == NULL) { | 603 q = ngx_queue_head(&qc->streams.free); |
606 return NULL; | 604 qs = ngx_queue_data(q, ngx_quic_stream_t, queue); |
607 } | 605 ngx_queue_remove(&qs->queue); |
608 | 606 |
609 qs = ngx_pcalloc(pool, sizeof(ngx_quic_stream_t)); | 607 } else { |
610 if (qs == NULL) { | 608 /* |
611 ngx_destroy_pool(pool); | 609 * the number of streams is limited by transport |
612 return NULL; | 610 * parameters and application requirements |
613 } | 611 */ |
612 | |
613 qs = ngx_palloc(c->pool, sizeof(ngx_quic_stream_t)); | |
614 if (qs == NULL) { | |
615 return NULL; | |
616 } | |
617 } | |
618 | |
619 ngx_memzero(qs, sizeof(ngx_quic_stream_t)); | |
614 | 620 |
615 qs->node.key = id; | 621 qs->node.key = id; |
616 qs->parent = c; | 622 qs->parent = c; |
617 qs->id = id; | 623 qs->id = id; |
618 qs->final_size = (uint64_t) -1; | 624 qs->send_final_size = (uint64_t) -1; |
625 qs->recv_final_size = (uint64_t) -1; | |
626 | |
627 pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log); | |
628 if (pool == NULL) { | |
629 ngx_queue_insert_tail(&qc->streams.free, &qs->queue); | |
630 return NULL; | |
631 } | |
619 | 632 |
620 log = ngx_palloc(pool, sizeof(ngx_log_t)); | 633 log = ngx_palloc(pool, sizeof(ngx_log_t)); |
621 if (log == NULL) { | 634 if (log == NULL) { |
622 ngx_destroy_pool(pool); | 635 ngx_destroy_pool(pool); |
636 ngx_queue_insert_tail(&qc->streams.free, &qs->queue); | |
623 return NULL; | 637 return NULL; |
624 } | 638 } |
625 | 639 |
626 *log = *c->log; | 640 *log = *c->log; |
627 pool->log = log; | 641 pool->log = log; |
628 | 642 |
629 sc = ngx_get_connection(c->fd, log); | 643 sc = ngx_get_connection(c->fd, log); |
630 if (sc == NULL) { | 644 if (sc == NULL) { |
631 ngx_destroy_pool(pool); | 645 ngx_destroy_pool(pool); |
646 ngx_queue_insert_tail(&qc->streams.free, &qs->queue); | |
632 return NULL; | 647 return NULL; |
633 } | 648 } |
634 | 649 |
635 qs->connection = sc; | 650 qs->connection = sc; |
636 | 651 |
695 | 710 |
696 cln = ngx_pool_cleanup_add(pool, 0); | 711 cln = ngx_pool_cleanup_add(pool, 0); |
697 if (cln == NULL) { | 712 if (cln == NULL) { |
698 ngx_close_connection(sc); | 713 ngx_close_connection(sc); |
699 ngx_destroy_pool(pool); | 714 ngx_destroy_pool(pool); |
715 ngx_queue_insert_tail(&qc->streams.free, &qs->queue); | |
700 return NULL; | 716 return NULL; |
701 } | 717 } |
702 | 718 |
703 cln->handler = ngx_quic_stream_cleanup_handler; | 719 cln->handler = ngx_quic_stream_cleanup_handler; |
704 cln->data = sc; | 720 cln->data = sc; |
735 qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_READ; | 751 qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_READ; |
736 rev->error = 1; | 752 rev->error = 1; |
737 return NGX_ERROR; | 753 return NGX_ERROR; |
738 } | 754 } |
739 | 755 |
740 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, | 756 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, |
741 "quic stream id:0x%xL recv buf:%uz", qs->id, size); | 757 "quic stream id:0x%xL recv buf:%uz", qs->id, size); |
742 | 758 |
743 if (size == 0) { | 759 if (size == 0) { |
744 return 0; | 760 return 0; |
745 } | 761 } |
761 | 777 |
762 if (len == 0) { | 778 if (len == 0) { |
763 rev->ready = 0; | 779 rev->ready = 0; |
764 | 780 |
765 if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_RECVD | 781 if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_RECVD |
766 && qs->recv_offset == qs->final_size) | 782 && qs->recv_offset == qs->recv_final_size) |
767 { | 783 { |
768 qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ; | 784 qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ; |
769 } | 785 } |
770 | 786 |
771 if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_READ) { | 787 if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_READ) { |
779 } | 795 } |
780 | 796 |
781 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, | 797 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, |
782 "quic stream id:0x%xL recv len:%z", qs->id, len); | 798 "quic stream id:0x%xL recv len:%z", qs->id, len); |
783 | 799 |
784 if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) { | 800 if (ngx_quic_update_flow(qs, qs->recv_offset + len) != NGX_OK) { |
785 return NGX_ERROR; | 801 return NGX_ERROR; |
786 } | 802 } |
787 | 803 |
788 return len; | 804 return len; |
789 } | 805 } |
820 ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit) | 836 ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit) |
821 { | 837 { |
822 off_t flow; | 838 off_t flow; |
823 size_t n; | 839 size_t n; |
824 ngx_event_t *wev; | 840 ngx_event_t *wev; |
825 ngx_chain_t *out; | |
826 ngx_connection_t *pc; | 841 ngx_connection_t *pc; |
827 ngx_quic_frame_t *frame; | |
828 ngx_quic_stream_t *qs; | 842 ngx_quic_stream_t *qs; |
829 ngx_quic_connection_t *qc; | 843 ngx_quic_connection_t *qc; |
830 | 844 |
831 qs = c->quic; | 845 qs = c->quic; |
832 pc = qs->parent; | 846 pc = qs->parent; |
840 return NGX_CHAIN_ERROR; | 854 return NGX_CHAIN_ERROR; |
841 } | 855 } |
842 | 856 |
843 qs->send_state = NGX_QUIC_STREAM_SEND_SEND; | 857 qs->send_state = NGX_QUIC_STREAM_SEND_SEND; |
844 | 858 |
845 flow = ngx_quic_max_stream_flow(c); | 859 flow = qs->acked + qc->conf->stream_buffer_size - c->sent; |
860 | |
846 if (flow == 0) { | 861 if (flow == 0) { |
847 wev->ready = 0; | 862 wev->ready = 0; |
848 return in; | 863 return in; |
849 } | 864 } |
850 | 865 |
851 if (limit == 0 || limit > flow) { | 866 if (limit == 0 || limit > flow) { |
852 limit = flow; | 867 limit = flow; |
853 } | 868 } |
854 | 869 |
855 in = ngx_quic_write_chain(pc, &qs->out, in, limit, 0, &n); | 870 in = ngx_quic_write_chain(pc, &qs->out, in, limit, |
871 c->sent - qs->send_offset, &n); | |
856 if (in == NGX_CHAIN_ERROR) { | 872 if (in == NGX_CHAIN_ERROR) { |
857 return NGX_CHAIN_ERROR; | 873 return NGX_CHAIN_ERROR; |
858 } | 874 } |
859 | 875 |
860 out = ngx_quic_read_chain(pc, &qs->out, n); | 876 c->sent += n; |
877 qc->streams.sent += n; | |
878 | |
879 if (flow == (off_t) n) { | |
880 wev->ready = 0; | |
881 } | |
882 | |
883 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, | |
884 "quic send_chain sent:%uz", n); | |
885 | |
886 if (ngx_quic_stream_flush(qs) != NGX_OK) { | |
887 return NGX_CHAIN_ERROR; | |
888 } | |
889 | |
890 return in; | |
891 } | |
892 | |
893 | |
894 static ngx_int_t | |
895 ngx_quic_stream_flush(ngx_quic_stream_t *qs) | |
896 { | |
897 off_t limit; | |
898 size_t len; | |
899 ngx_uint_t last; | |
900 ngx_chain_t *out, *cl; | |
901 ngx_quic_frame_t *frame; | |
902 ngx_connection_t *pc; | |
903 ngx_quic_connection_t *qc; | |
904 | |
905 if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) { | |
906 return NGX_OK; | |
907 } | |
908 | |
909 pc = qs->parent; | |
910 qc = ngx_quic_get_connection(pc); | |
911 | |
912 if (qc->streams.send_max_data == 0) { | |
913 qc->streams.send_max_data = qc->ctp.initial_max_data; | |
914 } | |
915 | |
916 limit = ngx_min(qc->streams.send_max_data - qc->streams.send_offset, | |
917 qs->send_max_data - qs->send_offset); | |
918 | |
919 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, | |
920 "quic stream id:0x%xL flush limit:%O", qs->id, limit); | |
921 | |
922 out = ngx_quic_read_chain(pc, &qs->out, limit); | |
861 if (out == NGX_CHAIN_ERROR) { | 923 if (out == NGX_CHAIN_ERROR) { |
862 return NGX_CHAIN_ERROR; | 924 return NGX_ERROR; |
925 } | |
926 | |
927 len = 0; | |
928 last = 0; | |
929 | |
930 for (cl = out; cl; cl = cl->next) { | |
931 len += cl->buf->last - cl->buf->pos; | |
932 } | |
933 | |
934 if (qs->send_final_size != (uint64_t) -1 | |
935 && qs->send_final_size == qs->send_offset + len) | |
936 { | |
937 qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT; | |
938 last = 1; | |
939 } | |
940 | |
941 if (len == 0 && !last) { | |
942 return NGX_OK; | |
863 } | 943 } |
864 | 944 |
865 frame = ngx_quic_alloc_frame(pc); | 945 frame = ngx_quic_alloc_frame(pc); |
866 if (frame == NULL) { | 946 if (frame == NULL) { |
867 return NGX_CHAIN_ERROR; | 947 return NGX_ERROR; |
868 } | 948 } |
869 | 949 |
870 frame->level = ssl_encryption_application; | 950 frame->level = ssl_encryption_application; |
871 frame->type = NGX_QUIC_FT_STREAM; | 951 frame->type = NGX_QUIC_FT_STREAM; |
872 frame->data = out; | 952 frame->data = out; |
953 | |
873 frame->u.stream.off = 1; | 954 frame->u.stream.off = 1; |
874 frame->u.stream.len = 1; | 955 frame->u.stream.len = 1; |
875 frame->u.stream.fin = 0; | 956 frame->u.stream.fin = last; |
876 | 957 |
877 frame->u.stream.stream_id = qs->id; | 958 frame->u.stream.stream_id = qs->id; |
878 frame->u.stream.offset = c->sent; | 959 frame->u.stream.offset = qs->send_offset; |
879 frame->u.stream.length = n; | 960 frame->u.stream.length = len; |
880 | |
881 c->sent += n; | |
882 qc->streams.sent += n; | |
883 | 961 |
884 ngx_quic_queue_frame(qc, frame); | 962 ngx_quic_queue_frame(qc, frame); |
885 | 963 |
886 if (flow == (off_t) n) { | 964 qs->send_offset += len; |
887 wev->ready = 0; | 965 qc->streams.send_offset += len; |
888 } | 966 |
889 | 967 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0, |
890 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, | 968 "quic stream id:0x%xL flush len:%uz last:%ui", |
891 "quic send_chain sent:%uz", n); | 969 qs->id, len, last); |
892 | 970 |
893 return in; | 971 if (qs->connection == NULL) { |
894 } | 972 return ngx_quic_close_stream(qs); |
895 | 973 } |
896 | 974 |
897 static size_t | 975 return NGX_OK; |
898 ngx_quic_max_stream_flow(ngx_connection_t *c) | |
899 { | |
900 size_t size; | |
901 uint64_t sent, unacked; | |
902 ngx_quic_stream_t *qs; | |
903 ngx_quic_connection_t *qc; | |
904 | |
905 qs = c->quic; | |
906 qc = ngx_quic_get_connection(qs->parent); | |
907 | |
908 size = qc->conf->stream_buffer_size; | |
909 sent = c->sent; | |
910 unacked = sent - qs->acked; | |
911 | |
912 if (qc->streams.send_max_data == 0) { | |
913 qc->streams.send_max_data = qc->ctp.initial_max_data; | |
914 } | |
915 | |
916 if (unacked >= size) { | |
917 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, | |
918 "quic send flow hit buffer size"); | |
919 return 0; | |
920 } | |
921 | |
922 size -= unacked; | |
923 | |
924 if (qc->streams.sent >= qc->streams.send_max_data) { | |
925 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, | |
926 "quic send flow hit MAX_DATA"); | |
927 return 0; | |
928 } | |
929 | |
930 if (qc->streams.sent + size > qc->streams.send_max_data) { | |
931 size = qc->streams.send_max_data - qc->streams.sent; | |
932 } | |
933 | |
934 if (sent >= qs->send_max_data) { | |
935 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, | |
936 "quic send flow hit MAX_STREAM_DATA"); | |
937 return 0; | |
938 } | |
939 | |
940 if (sent + size > qs->send_max_data) { | |
941 size = qs->send_max_data - sent; | |
942 } | |
943 | |
944 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, | |
945 "quic send flow:%uz", size); | |
946 | |
947 return size; | |
948 } | 976 } |
949 | 977 |
950 | 978 |
951 static void | 979 static void |
952 ngx_quic_stream_cleanup_handler(void *data) | 980 ngx_quic_stream_cleanup_handler(void *data) |
953 { | 981 { |
954 ngx_connection_t *c = data; | 982 ngx_connection_t *c = data; |
955 | 983 |
984 ngx_quic_stream_t *qs; | |
985 | |
986 qs = c->quic; | |
987 | |
988 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0, | |
989 "quic stream id:0x%xL cleanup", qs->id); | |
990 | |
991 if (ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN) != NGX_OK) { | |
992 ngx_quic_close_connection(c, NGX_ERROR); | |
993 return; | |
994 } | |
995 | |
996 qs->connection = NULL; | |
997 | |
998 if (ngx_quic_close_stream(qs) != NGX_OK) { | |
999 ngx_quic_close_connection(c, NGX_ERROR); | |
1000 return; | |
1001 } | |
1002 } | |
1003 | |
1004 | |
1005 static ngx_int_t | |
1006 ngx_quic_close_stream(ngx_quic_stream_t *qs) | |
1007 { | |
956 ngx_connection_t *pc; | 1008 ngx_connection_t *pc; |
957 ngx_quic_frame_t *frame; | 1009 ngx_quic_frame_t *frame; |
958 ngx_quic_stream_t *qs; | 1010 ngx_quic_connection_t *qc; |
959 ngx_quic_connection_t *qc; | 1011 |
960 | |
961 qs = c->quic; | |
962 pc = qs->parent; | 1012 pc = qs->parent; |
963 qc = ngx_quic_get_connection(pc); | 1013 qc = ngx_quic_get_connection(pc); |
964 | 1014 |
965 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, | 1015 if (!qc->closing) { |
966 "quic stream id:0x%xL cleanup", qs->id); | 1016 /* make sure everything is sent and final size is received */ |
967 | 1017 |
968 ngx_rbtree_delete(&qc->streams.tree, &qs->node); | 1018 if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV |
1019 || qs->send_state == NGX_QUIC_STREAM_SEND_READY | |
1020 || qs->send_state == NGX_QUIC_STREAM_SEND_SEND) | |
1021 { | |
1022 return NGX_OK; | |
1023 } | |
1024 } | |
1025 | |
1026 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, | |
1027 "quic stream id:0x%xL close", qs->id); | |
1028 | |
969 ngx_quic_free_chain(pc, qs->in); | 1029 ngx_quic_free_chain(pc, qs->in); |
970 ngx_quic_free_chain(pc, qs->out); | 1030 ngx_quic_free_chain(pc, qs->out); |
1031 | |
1032 ngx_rbtree_delete(&qc->streams.tree, &qs->node); | |
1033 ngx_queue_insert_tail(&qc->streams.free, &qs->queue); | |
971 | 1034 |
972 if (qc->closing) { | 1035 if (qc->closing) { |
973 /* schedule handler call to continue ngx_quic_close_connection() */ | 1036 /* schedule handler call to continue ngx_quic_close_connection() */ |
974 ngx_post_event(pc->read, &ngx_posted_events); | 1037 ngx_post_event(pc->read, &ngx_posted_events); |
975 return; | 1038 return NGX_OK; |
976 } | 1039 } |
977 | |
978 if (qc->error) { | |
979 goto done; | |
980 } | |
981 | |
982 (void) ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN); | |
983 | |
984 (void) ngx_quic_update_flow(c, qs->recv_last); | |
985 | 1040 |
986 if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) { | 1041 if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) { |
987 frame = ngx_quic_alloc_frame(pc); | 1042 frame = ngx_quic_alloc_frame(pc); |
988 if (frame == NULL) { | 1043 if (frame == NULL) { |
989 goto done; | 1044 return NGX_ERROR; |
990 } | 1045 } |
991 | 1046 |
992 frame->level = ssl_encryption_application; | 1047 frame->level = ssl_encryption_application; |
993 frame->type = NGX_QUIC_FT_MAX_STREAMS; | 1048 frame->type = NGX_QUIC_FT_MAX_STREAMS; |
994 | 1049 |
1002 } | 1057 } |
1003 | 1058 |
1004 ngx_quic_queue_frame(qc, frame); | 1059 ngx_quic_queue_frame(qc, frame); |
1005 } | 1060 } |
1006 | 1061 |
1007 done: | |
1008 | |
1009 (void) ngx_quic_output(pc); | |
1010 | |
1011 if (qc->shutdown) { | 1062 if (qc->shutdown) { |
1012 ngx_post_event(pc->read, &ngx_posted_events); | 1063 ngx_post_event(pc->read, &ngx_posted_events); |
1013 } | 1064 } |
1065 | |
1066 return NGX_OK; | |
1014 } | 1067 } |
1015 | 1068 |
1016 | 1069 |
1017 ngx_int_t | 1070 ngx_int_t |
1018 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, | 1071 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, |
1019 ngx_quic_frame_t *frame) | 1072 ngx_quic_frame_t *frame) |
1020 { | 1073 { |
1021 size_t size; | 1074 size_t size; |
1022 uint64_t last; | 1075 uint64_t last; |
1023 ngx_connection_t *sc; | |
1024 ngx_quic_stream_t *qs; | 1076 ngx_quic_stream_t *qs; |
1025 ngx_quic_connection_t *qc; | 1077 ngx_quic_connection_t *qc; |
1026 ngx_quic_stream_frame_t *f; | 1078 ngx_quic_stream_frame_t *f; |
1027 | 1079 |
1028 qc = ngx_quic_get_connection(c); | 1080 qc = ngx_quic_get_connection(c); |
1046 | 1098 |
1047 if (qs == NGX_QUIC_STREAM_GONE) { | 1099 if (qs == NGX_QUIC_STREAM_GONE) { |
1048 return NGX_OK; | 1100 return NGX_OK; |
1049 } | 1101 } |
1050 | 1102 |
1051 sc = qs->connection; | |
1052 | |
1053 if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV | 1103 if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV |
1054 && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN) | 1104 && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN) |
1055 { | 1105 { |
1056 return NGX_OK; | 1106 return NGX_OK; |
1057 } | 1107 } |
1058 | 1108 |
1059 if (ngx_quic_control_flow(sc, last) != NGX_OK) { | 1109 if (ngx_quic_control_flow(qs, last) != NGX_OK) { |
1060 return NGX_ERROR; | 1110 return NGX_ERROR; |
1061 } | 1111 } |
1062 | 1112 |
1063 if (qs->final_size != (uint64_t) -1 && last > qs->final_size) { | 1113 if (qs->recv_final_size != (uint64_t) -1 && last > qs->recv_final_size) { |
1064 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; | 1114 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; |
1065 return NGX_ERROR; | 1115 return NGX_ERROR; |
1066 } | 1116 } |
1067 | 1117 |
1068 if (last < qs->recv_offset) { | 1118 if (last < qs->recv_offset) { |
1073 ngx_quic_trim_chain(frame->data, qs->recv_offset - f->offset); | 1123 ngx_quic_trim_chain(frame->data, qs->recv_offset - f->offset); |
1074 f->offset = qs->recv_offset; | 1124 f->offset = qs->recv_offset; |
1075 } | 1125 } |
1076 | 1126 |
1077 if (f->fin) { | 1127 if (f->fin) { |
1078 if (qs->final_size != (uint64_t) -1 && qs->final_size != last) { | 1128 if (qs->recv_final_size != (uint64_t) -1 && qs->recv_final_size != last) |
1129 { | |
1079 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; | 1130 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; |
1080 return NGX_ERROR; | 1131 return NGX_ERROR; |
1081 } | 1132 } |
1082 | 1133 |
1083 if (qs->recv_last > last) { | 1134 if (qs->recv_last > last) { |
1084 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; | 1135 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; |
1085 return NGX_ERROR; | 1136 return NGX_ERROR; |
1086 } | 1137 } |
1087 | 1138 |
1088 qs->final_size = last; | 1139 qs->recv_final_size = last; |
1089 qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN; | 1140 qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN; |
1090 } | 1141 } |
1091 | 1142 |
1092 if (ngx_quic_write_chain(c, &qs->in, frame->data, f->length, | 1143 if (ngx_quic_write_chain(c, &qs->in, frame->data, f->length, |
1093 f->offset - qs->recv_offset, &size) | 1144 f->offset - qs->recv_offset, &size) |
1097 } | 1148 } |
1098 | 1149 |
1099 qs->recv_size += size; | 1150 qs->recv_size += size; |
1100 | 1151 |
1101 if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN | 1152 if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN |
1102 && qs->recv_size == qs->final_size) | 1153 && qs->recv_size == qs->recv_final_size) |
1103 { | 1154 { |
1104 qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_RECVD; | 1155 qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_RECVD; |
1105 } | 1156 } |
1106 | 1157 |
1158 if (qs->connection == NULL) { | |
1159 return ngx_quic_close_stream(qs); | |
1160 } | |
1161 | |
1107 if (f->offset == qs->recv_offset) { | 1162 if (f->offset == qs->recv_offset) { |
1108 ngx_quic_set_event(sc->read); | 1163 ngx_quic_set_event(qs->connection->read); |
1109 } | 1164 } |
1110 | 1165 |
1111 return NGX_OK; | 1166 return NGX_OK; |
1112 } | 1167 } |
1113 | 1168 |
1126 | 1181 |
1127 if (f->max_data <= qc->streams.send_max_data) { | 1182 if (f->max_data <= qc->streams.send_max_data) { |
1128 return NGX_OK; | 1183 return NGX_OK; |
1129 } | 1184 } |
1130 | 1185 |
1131 if (tree->root != tree->sentinel | 1186 if (tree->root == tree->sentinel |
1132 && qc->streams.sent >= qc->streams.send_max_data) | 1187 || qc->streams.send_offset < qc->streams.send_max_data) |
1133 { | 1188 { |
1134 | 1189 /* not blocked on MAX_DATA */ |
1135 for (node = ngx_rbtree_min(tree->root, tree->sentinel); | 1190 qc->streams.send_max_data = f->max_data; |
1136 node; | 1191 return NGX_OK; |
1137 node = ngx_rbtree_next(tree, node)) | |
1138 { | |
1139 qs = (ngx_quic_stream_t *) node; | |
1140 ngx_quic_set_event(qs->connection->write); | |
1141 } | |
1142 } | 1192 } |
1143 | 1193 |
1144 qc->streams.send_max_data = f->max_data; | 1194 qc->streams.send_max_data = f->max_data; |
1195 node = ngx_rbtree_min(tree->root, tree->sentinel); | |
1196 | |
1197 while (node && qc->streams.send_offset < qc->streams.send_max_data) { | |
1198 | |
1199 qs = (ngx_quic_stream_t *) node; | |
1200 node = ngx_rbtree_next(tree, node); | |
1201 | |
1202 if (ngx_quic_stream_flush(qs) != NGX_OK) { | |
1203 return NGX_ERROR; | |
1204 } | |
1205 } | |
1145 | 1206 |
1146 return NGX_OK; | 1207 return NGX_OK; |
1147 } | 1208 } |
1148 | 1209 |
1149 | 1210 |
1187 | 1248 |
1188 if (qs == NGX_QUIC_STREAM_GONE) { | 1249 if (qs == NGX_QUIC_STREAM_GONE) { |
1189 return NGX_OK; | 1250 return NGX_OK; |
1190 } | 1251 } |
1191 | 1252 |
1192 return ngx_quic_update_max_stream_data(qs->connection); | 1253 return ngx_quic_update_max_stream_data(qs); |
1193 } | 1254 } |
1194 | 1255 |
1195 | 1256 |
1196 ngx_int_t | 1257 ngx_int_t |
1197 ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c, | 1258 ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c, |
1198 ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f) | 1259 ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f) |
1199 { | 1260 { |
1200 uint64_t sent; | |
1201 ngx_quic_stream_t *qs; | 1261 ngx_quic_stream_t *qs; |
1202 ngx_quic_connection_t *qc; | 1262 ngx_quic_connection_t *qc; |
1203 | 1263 |
1204 qc = ngx_quic_get_connection(c); | 1264 qc = ngx_quic_get_connection(c); |
1205 | 1265 |
1222 | 1282 |
1223 if (f->limit <= qs->send_max_data) { | 1283 if (f->limit <= qs->send_max_data) { |
1224 return NGX_OK; | 1284 return NGX_OK; |
1225 } | 1285 } |
1226 | 1286 |
1227 sent = qs->connection->sent; | 1287 if (qs->send_offset < qs->send_max_data) { |
1228 | 1288 /* not blocked on MAX_STREAM_DATA */ |
1229 if (sent >= qs->send_max_data) { | 1289 qs->send_max_data = f->limit; |
1230 ngx_quic_set_event(qs->connection->write); | 1290 return NGX_OK; |
1231 } | 1291 } |
1232 | 1292 |
1233 qs->send_max_data = f->limit; | 1293 qs->send_max_data = f->limit; |
1234 | 1294 |
1235 return NGX_OK; | 1295 return ngx_quic_stream_flush(qs); |
1236 } | 1296 } |
1237 | 1297 |
1238 | 1298 |
1239 ngx_int_t | 1299 ngx_int_t |
1240 ngx_quic_handle_reset_stream_frame(ngx_connection_t *c, | 1300 ngx_quic_handle_reset_stream_frame(ngx_connection_t *c, |
1241 ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f) | 1301 ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f) |
1242 { | 1302 { |
1243 ngx_connection_t *sc; | |
1244 ngx_quic_stream_t *qs; | 1303 ngx_quic_stream_t *qs; |
1245 ngx_quic_connection_t *qc; | 1304 ngx_quic_connection_t *qc; |
1246 | 1305 |
1247 qc = ngx_quic_get_connection(c); | 1306 qc = ngx_quic_get_connection(c); |
1248 | 1307 |
1269 return NGX_OK; | 1328 return NGX_OK; |
1270 } | 1329 } |
1271 | 1330 |
1272 qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD; | 1331 qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD; |
1273 | 1332 |
1274 sc = qs->connection; | 1333 if (ngx_quic_control_flow(qs, f->final_size) != NGX_OK) { |
1275 | 1334 return NGX_ERROR; |
1276 if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) { | 1335 } |
1277 return NGX_ERROR; | 1336 |
1278 } | 1337 if (qs->recv_final_size != (uint64_t) -1 |
1279 | 1338 && qs->recv_final_size != f->final_size) |
1280 if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) { | 1339 { |
1281 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; | 1340 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; |
1282 return NGX_ERROR; | 1341 return NGX_ERROR; |
1283 } | 1342 } |
1284 | 1343 |
1285 if (qs->recv_last > f->final_size) { | 1344 if (qs->recv_last > f->final_size) { |
1286 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; | 1345 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; |
1287 return NGX_ERROR; | 1346 return NGX_ERROR; |
1288 } | 1347 } |
1289 | 1348 |
1290 qs->final_size = f->final_size; | 1349 qs->recv_final_size = f->final_size; |
1291 | 1350 |
1292 if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) { | 1351 if (ngx_quic_update_flow(qs, qs->recv_final_size) != NGX_OK) { |
1293 return NGX_ERROR; | 1352 return NGX_ERROR; |
1353 } | |
1354 | |
1355 if (qs->connection == NULL) { | |
1356 return ngx_quic_close_stream(qs); | |
1294 } | 1357 } |
1295 | 1358 |
1296 ngx_quic_set_event(qs->connection->read); | 1359 ngx_quic_set_event(qs->connection->read); |
1297 | 1360 |
1298 return NGX_OK; | 1361 return NGX_OK; |
1323 | 1386 |
1324 if (qs == NGX_QUIC_STREAM_GONE) { | 1387 if (qs == NGX_QUIC_STREAM_GONE) { |
1325 return NGX_OK; | 1388 return NGX_OK; |
1326 } | 1389 } |
1327 | 1390 |
1328 if (ngx_quic_reset_stream(qs->connection, f->error_code) != NGX_OK) { | 1391 if (ngx_quic_do_reset_stream(qs, f->error_code) != NGX_OK) { |
1329 return NGX_ERROR; | 1392 return NGX_ERROR; |
1393 } | |
1394 | |
1395 if (qs->connection == NULL) { | |
1396 return ngx_quic_close_stream(qs); | |
1330 } | 1397 } |
1331 | 1398 |
1332 ngx_quic_set_event(qs->connection->write); | 1399 ngx_quic_set_event(qs->connection->write); |
1333 | 1400 |
1334 return NGX_OK; | 1401 return NGX_OK; |
1376 qs = ngx_quic_find_stream(&qc->streams.tree, f->u.stream.stream_id); | 1443 qs = ngx_quic_find_stream(&qc->streams.tree, f->u.stream.stream_id); |
1377 if (qs == NULL) { | 1444 if (qs == NULL) { |
1378 return; | 1445 return; |
1379 } | 1446 } |
1380 | 1447 |
1448 if (qs->connection == NULL) { | |
1449 qs->acked += f->u.stream.length; | |
1450 return; | |
1451 } | |
1452 | |
1381 sent = qs->connection->sent; | 1453 sent = qs->connection->sent; |
1382 unacked = sent - qs->acked; | 1454 unacked = sent - qs->acked; |
1383 | |
1384 if (unacked >= qc->conf->stream_buffer_size) { | |
1385 ngx_quic_set_event(qs->connection->write); | |
1386 } | |
1387 | |
1388 qs->acked += f->u.stream.length; | 1455 qs->acked += f->u.stream.length; |
1389 | 1456 |
1390 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0, | 1457 ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0, |
1391 "quic stream ack len:%uL acked:%uL unacked:%uL", | 1458 "quic stream id:0x%xL ack len:%uL acked:%uL unacked:%uL", |
1392 f->u.stream.length, qs->acked, sent - qs->acked); | 1459 qs->id, f->u.stream.length, qs->acked, sent - qs->acked); |
1460 | |
1461 if (unacked != qc->conf->stream_buffer_size) { | |
1462 /* not blocked on buffer size */ | |
1463 return; | |
1464 } | |
1465 | |
1466 ngx_quic_set_event(qs->connection->write); | |
1393 } | 1467 } |
1394 | 1468 |
1395 | 1469 |
1396 static ngx_int_t | 1470 static ngx_int_t |
1397 ngx_quic_control_flow(ngx_connection_t *c, uint64_t last) | 1471 ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last) |
1398 { | 1472 { |
1399 uint64_t len; | 1473 uint64_t len; |
1400 ngx_quic_stream_t *qs; | 1474 ngx_connection_t *pc; |
1401 ngx_quic_connection_t *qc; | 1475 ngx_quic_connection_t *qc; |
1402 | 1476 |
1403 qs = c->quic; | 1477 pc = qs->parent; |
1404 qc = ngx_quic_get_connection(qs->parent); | 1478 qc = ngx_quic_get_connection(pc); |
1405 | 1479 |
1406 if (last <= qs->recv_last) { | 1480 if (last <= qs->recv_last) { |
1407 return NGX_OK; | 1481 return NGX_OK; |
1408 } | 1482 } |
1409 | 1483 |
1410 len = last - qs->recv_last; | 1484 len = last - qs->recv_last; |
1411 | 1485 |
1412 ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0, | 1486 ngx_log_debug5(NGX_LOG_DEBUG_EVENT, pc->log, 0, |
1413 "quic flow control msd:%uL/%uL md:%uL/%uL", | 1487 "quic stream id:0x%xL flow control msd:%uL/%uL md:%uL/%uL", |
1414 last, qs->recv_max_data, qc->streams.recv_last + len, | 1488 qs->id, last, qs->recv_max_data, qc->streams.recv_last + len, |
1415 qc->streams.recv_max_data); | 1489 qc->streams.recv_max_data); |
1416 | 1490 |
1417 qs->recv_last += len; | 1491 qs->recv_last += len; |
1418 | 1492 |
1419 if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV | 1493 if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV |
1433 return NGX_OK; | 1507 return NGX_OK; |
1434 } | 1508 } |
1435 | 1509 |
1436 | 1510 |
1437 static ngx_int_t | 1511 static ngx_int_t |
1438 ngx_quic_update_flow(ngx_connection_t *c, uint64_t last) | 1512 ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last) |
1439 { | 1513 { |
1440 uint64_t len; | 1514 uint64_t len; |
1441 ngx_connection_t *pc; | 1515 ngx_connection_t *pc; |
1442 ngx_quic_stream_t *qs; | 1516 ngx_quic_connection_t *qc; |
1443 ngx_quic_connection_t *qc; | 1517 |
1444 | |
1445 qs = c->quic; | |
1446 pc = qs->parent; | 1518 pc = qs->parent; |
1447 qc = ngx_quic_get_connection(pc); | 1519 qc = ngx_quic_get_connection(pc); |
1448 | 1520 |
1449 if (last <= qs->recv_offset) { | 1521 if (last <= qs->recv_offset) { |
1450 return NGX_OK; | 1522 return NGX_OK; |
1451 } | 1523 } |
1452 | 1524 |
1453 len = last - qs->recv_offset; | 1525 len = last - qs->recv_offset; |
1454 | 1526 |
1455 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, | 1527 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, |
1456 "quic flow update %uL", last); | 1528 "quic stream id:0x%xL flow update %uL", qs->id, last); |
1457 | 1529 |
1458 qs->recv_offset += len; | 1530 qs->recv_offset += len; |
1459 | 1531 |
1460 if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) { | 1532 if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) { |
1461 if (ngx_quic_update_max_stream_data(c) != NGX_OK) { | 1533 if (ngx_quic_update_max_stream_data(qs) != NGX_OK) { |
1462 return NGX_ERROR; | 1534 return NGX_ERROR; |
1463 } | 1535 } |
1464 } | 1536 } |
1465 | 1537 |
1466 qc->streams.recv_offset += len; | 1538 qc->streams.recv_offset += len; |
1476 return NGX_OK; | 1548 return NGX_OK; |
1477 } | 1549 } |
1478 | 1550 |
1479 | 1551 |
1480 static ngx_int_t | 1552 static ngx_int_t |
1481 ngx_quic_update_max_stream_data(ngx_connection_t *c) | 1553 ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs) |
1482 { | 1554 { |
1483 uint64_t recv_max_data; | 1555 uint64_t recv_max_data; |
1484 ngx_connection_t *pc; | 1556 ngx_connection_t *pc; |
1485 ngx_quic_frame_t *frame; | 1557 ngx_quic_frame_t *frame; |
1486 ngx_quic_stream_t *qs; | 1558 ngx_quic_connection_t *qc; |
1487 ngx_quic_connection_t *qc; | 1559 |
1488 | |
1489 qs = c->quic; | |
1490 pc = qs->parent; | 1560 pc = qs->parent; |
1491 qc = ngx_quic_get_connection(pc); | 1561 qc = ngx_quic_get_connection(pc); |
1492 | 1562 |
1493 if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV) { | 1563 if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV) { |
1494 return NGX_OK; | 1564 return NGX_OK; |
1500 return NGX_OK; | 1570 return NGX_OK; |
1501 } | 1571 } |
1502 | 1572 |
1503 qs->recv_max_data = recv_max_data; | 1573 qs->recv_max_data = recv_max_data; |
1504 | 1574 |
1505 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, | 1575 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, |
1506 "quic flow update msd:%uL", qs->recv_max_data); | 1576 "quic stream id:0x%xL flow update msd:%uL", |
1577 qs->id, qs->recv_max_data); | |
1507 | 1578 |
1508 frame = ngx_quic_alloc_frame(pc); | 1579 frame = ngx_quic_alloc_frame(pc); |
1509 if (frame == NULL) { | 1580 if (frame == NULL) { |
1510 return NGX_ERROR; | 1581 return NGX_ERROR; |
1511 } | 1582 } |