comparison src/event/quic/ngx_event_quic_streams.c @ 8750:41807e581de9 quic

QUIC: separate files for stream related processing.
author Vladimir Homutov <vl@nginx.com>
date Tue, 13 Apr 2021 14:40:00 +0300
parents
children 915c2f7092ed
comparison
equal deleted inserted replaced
8749:660c4a2f95f3 8750:41807e581de9
1
2 /*
3 * Copyright (C) Nginx, Inc.
4 */
5
6
7 #include <ngx_config.h>
8 #include <ngx_core.h>
9 #include <ngx_event.h>
10 #include <ngx_event_quic_transport.h>
11 #include <ngx_event_quic_connection.h>
12 #include <ngx_event_quic_streams.h>
13
14
15 #define NGX_QUIC_STREAM_GONE (void *) -1
16
17
18 static ngx_quic_stream_t *ngx_quic_create_client_stream(ngx_connection_t *c,
19 uint64_t id);
20 static ngx_quic_stream_t *ngx_quic_create_stream(ngx_connection_t *c,
21 uint64_t id, size_t rcvbuf_size);
22 static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf,
23 size_t size);
24 static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf,
25 size_t size);
26 static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
27 ngx_chain_t *in, off_t limit);
28 static size_t ngx_quic_max_stream_flow(ngx_connection_t *c);
29 static void ngx_quic_stream_cleanup_handler(void *data);
30
31
32 ngx_connection_t *
33 ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi)
34 {
35 size_t rcvbuf_size;
36 uint64_t id;
37 ngx_quic_stream_t *qs, *sn;
38 ngx_quic_connection_t *qc;
39
40 qs = c->quic;
41 qc = ngx_quic_get_connection(qs->parent);
42
43 if (bidi) {
44 if (qc->streams.server_streams_bidi
45 >= qc->streams.server_max_streams_bidi)
46 {
47 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
48 "quic too many server bidi streams:%uL",
49 qc->streams.server_streams_bidi);
50 return NULL;
51 }
52
53 id = (qc->streams.server_streams_bidi << 2)
54 | NGX_QUIC_STREAM_SERVER_INITIATED;
55
56 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
57 "quic creating server bidi stream"
58 " streams:%uL max:%uL id:0x%xL",
59 qc->streams.server_streams_bidi,
60 qc->streams.server_max_streams_bidi, id);
61
62 qc->streams.server_streams_bidi++;
63 rcvbuf_size = qc->tp.initial_max_stream_data_bidi_local;
64
65 } else {
66 if (qc->streams.server_streams_uni
67 >= qc->streams.server_max_streams_uni)
68 {
69 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
70 "quic too many server uni streams:%uL",
71 qc->streams.server_streams_uni);
72 return NULL;
73 }
74
75 id = (qc->streams.server_streams_uni << 2)
76 | NGX_QUIC_STREAM_SERVER_INITIATED
77 | NGX_QUIC_STREAM_UNIDIRECTIONAL;
78
79 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
80 "quic creating server uni stream"
81 " streams:%uL max:%uL id:0x%xL",
82 qc->streams.server_streams_uni,
83 qc->streams.server_max_streams_uni, id);
84
85 qc->streams.server_streams_uni++;
86 rcvbuf_size = 0;
87 }
88
89 sn = ngx_quic_create_stream(qs->parent, id, rcvbuf_size);
90 if (sn == NULL) {
91 return NULL;
92 }
93
94 return sn->c;
95 }
96
97
98 void
99 ngx_quic_rbtree_insert_stream(ngx_rbtree_node_t *temp,
100 ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel)
101 {
102 ngx_rbtree_node_t **p;
103 ngx_quic_stream_t *qn, *qnt;
104
105 for ( ;; ) {
106 qn = (ngx_quic_stream_t *) node;
107 qnt = (ngx_quic_stream_t *) temp;
108
109 p = (qn->id < qnt->id) ? &temp->left : &temp->right;
110
111 if (*p == sentinel) {
112 break;
113 }
114
115 temp = *p;
116 }
117
118 *p = node;
119 node->parent = temp;
120 node->left = sentinel;
121 node->right = sentinel;
122 ngx_rbt_red(node);
123 }
124
125
126 ngx_quic_stream_t *
127 ngx_quic_find_stream(ngx_rbtree_t *rbtree, uint64_t id)
128 {
129 ngx_rbtree_node_t *node, *sentinel;
130 ngx_quic_stream_t *qn;
131
132 node = rbtree->root;
133 sentinel = rbtree->sentinel;
134
135 while (node != sentinel) {
136 qn = (ngx_quic_stream_t *) node;
137
138 if (id == qn->id) {
139 return qn;
140 }
141
142 node = (id < qn->id) ? node->left : node->right;
143 }
144
145 return NULL;
146 }
147
148
149 ngx_int_t
150 ngx_quic_close_streams(ngx_connection_t *c, ngx_quic_connection_t *qc)
151 {
152 ngx_event_t *rev, *wev;
153 ngx_rbtree_t *tree;
154 ngx_rbtree_node_t *node;
155 ngx_quic_stream_t *qs;
156
157 #if (NGX_DEBUG)
158 ngx_uint_t ns;
159 #endif
160
161 tree = &qc->streams.tree;
162
163 if (tree->root == tree->sentinel) {
164 return NGX_OK;
165 }
166
167 #if (NGX_DEBUG)
168 ns = 0;
169 #endif
170
171 for (node = ngx_rbtree_min(tree->root, tree->sentinel);
172 node;
173 node = ngx_rbtree_next(tree, node))
174 {
175 qs = (ngx_quic_stream_t *) node;
176
177 rev = qs->c->read;
178 rev->error = 1;
179 rev->ready = 1;
180
181 wev = qs->c->write;
182 wev->error = 1;
183 wev->ready = 1;
184
185 ngx_post_event(rev, &ngx_posted_events);
186
187 if (rev->timer_set) {
188 ngx_del_timer(rev);
189 }
190
191 #if (NGX_DEBUG)
192 ns++;
193 #endif
194 }
195
196 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
197 "quic connection has %ui active streams", ns);
198
199 return NGX_AGAIN;
200 }
201
202
203 ngx_int_t
204 ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
205 {
206 ngx_event_t *wev;
207 ngx_connection_t *pc;
208 ngx_quic_frame_t *frame;
209 ngx_quic_stream_t *qs;
210 ngx_quic_connection_t *qc;
211
212 qs = c->quic;
213 pc = qs->parent;
214 qc = ngx_quic_get_connection(pc);
215
216 frame = ngx_quic_alloc_frame(pc);
217 if (frame == NULL) {
218 return NGX_ERROR;
219 }
220
221 frame->level = ssl_encryption_application;
222 frame->type = NGX_QUIC_FT_RESET_STREAM;
223 frame->u.reset_stream.id = qs->id;
224 frame->u.reset_stream.error_code = err;
225 frame->u.reset_stream.final_size = c->sent;
226
227 ngx_quic_queue_frame(qc, frame);
228
229 wev = c->write;
230 wev->error = 1;
231 wev->ready = 1;
232
233 return NGX_OK;
234 }
235
236
237 static ngx_quic_stream_t *
238 ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id)
239 {
240 size_t n;
241 uint64_t min_id;
242 ngx_quic_stream_t *sn;
243 ngx_quic_connection_t *qc;
244
245 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
246 "quic stream id:0x%xL is new", id);
247
248 qc = ngx_quic_get_connection(c);
249
250 if (qc->shutdown) {
251 return NGX_QUIC_STREAM_GONE;
252 }
253
254 if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
255
256 if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
257 if ((id >> 2) < qc->streams.server_streams_uni) {
258 return NGX_QUIC_STREAM_GONE;
259 }
260
261 qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
262 return NULL;
263 }
264
265 if ((id >> 2) < qc->streams.client_streams_uni) {
266 return NGX_QUIC_STREAM_GONE;
267 }
268
269 if ((id >> 2) >= qc->streams.client_max_streams_uni) {
270 qc->error = NGX_QUIC_ERR_STREAM_LIMIT_ERROR;
271 return NULL;
272 }
273
274 min_id = (qc->streams.client_streams_uni << 2)
275 | NGX_QUIC_STREAM_UNIDIRECTIONAL;
276 qc->streams.client_streams_uni = (id >> 2) + 1;
277 n = qc->tp.initial_max_stream_data_uni;
278
279 } else {
280
281 if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
282 if ((id >> 2) < qc->streams.server_streams_bidi) {
283 return NGX_QUIC_STREAM_GONE;
284 }
285
286 qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
287 return NULL;
288 }
289
290 if ((id >> 2) < qc->streams.client_streams_bidi) {
291 return NGX_QUIC_STREAM_GONE;
292 }
293
294 if ((id >> 2) >= qc->streams.client_max_streams_bidi) {
295 qc->error = NGX_QUIC_ERR_STREAM_LIMIT_ERROR;
296 return NULL;
297 }
298
299 min_id = (qc->streams.client_streams_bidi << 2);
300 qc->streams.client_streams_bidi = (id >> 2) + 1;
301 n = qc->tp.initial_max_stream_data_bidi_remote;
302 }
303
304 if (n < NGX_QUIC_STREAM_BUFSIZE) {
305 n = NGX_QUIC_STREAM_BUFSIZE;
306 }
307
308 /*
309 * 2.1. Stream Types and Identifiers
310 *
311 * Within each type, streams are created with numerically increasing
312 * stream IDs. A stream ID that is used out of order results in all
313 * streams of that type with lower-numbered stream IDs also being
314 * opened.
315 */
316
317 for ( /* void */ ; min_id < id; min_id += 0x04) {
318
319 sn = ngx_quic_create_stream(c, min_id, n);
320 if (sn == NULL) {
321 return NULL;
322 }
323
324 sn->c->listening->handler(sn->c);
325
326 if (qc->shutdown) {
327 return NGX_QUIC_STREAM_GONE;
328 }
329 }
330
331 return ngx_quic_create_stream(c, id, n);
332 }
333
334
335 static ngx_quic_stream_t *
336 ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
337 {
338 ngx_log_t *log;
339 ngx_pool_t *pool;
340 ngx_quic_stream_t *sn;
341 ngx_pool_cleanup_t *cln;
342 ngx_quic_connection_t *qc;
343
344 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
345 "quic stream id:0x%xL create", id);
346
347 qc = ngx_quic_get_connection(c);
348
349 pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
350 if (pool == NULL) {
351 return NULL;
352 }
353
354 sn = ngx_pcalloc(pool, sizeof(ngx_quic_stream_t));
355 if (sn == NULL) {
356 ngx_destroy_pool(pool);
357 return NULL;
358 }
359
360 sn->node.key = id;
361 sn->parent = c;
362 sn->id = id;
363
364 sn->b = ngx_create_temp_buf(pool, rcvbuf_size);
365 if (sn->b == NULL) {
366 ngx_destroy_pool(pool);
367 return NULL;
368 }
369
370 ngx_queue_init(&sn->fs.frames);
371
372 log = ngx_palloc(pool, sizeof(ngx_log_t));
373 if (log == NULL) {
374 ngx_destroy_pool(pool);
375 return NULL;
376 }
377
378 *log = *c->log;
379 pool->log = log;
380
381 sn->c = ngx_get_connection(-1, log);
382 if (sn->c == NULL) {
383 ngx_destroy_pool(pool);
384 return NULL;
385 }
386
387 sn->c->quic = sn;
388 sn->c->type = SOCK_STREAM;
389 sn->c->pool = pool;
390 sn->c->ssl = c->ssl;
391 sn->c->sockaddr = c->sockaddr;
392 sn->c->listening = c->listening;
393 sn->c->addr_text = c->addr_text;
394 sn->c->local_sockaddr = c->local_sockaddr;
395 sn->c->local_socklen = c->local_socklen;
396 sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
397
398 sn->c->recv = ngx_quic_stream_recv;
399 sn->c->send = ngx_quic_stream_send;
400 sn->c->send_chain = ngx_quic_stream_send_chain;
401
402 sn->c->read->log = log;
403 sn->c->write->log = log;
404
405 log->connection = sn->c->number;
406
407 if ((id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0
408 || (id & NGX_QUIC_STREAM_SERVER_INITIATED))
409 {
410 sn->c->write->ready = 1;
411 }
412
413 if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
414 if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
415 sn->send_max_data = qc->ctp.initial_max_stream_data_uni;
416 }
417
418 } else {
419 if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
420 sn->send_max_data = qc->ctp.initial_max_stream_data_bidi_remote;
421 } else {
422 sn->send_max_data = qc->ctp.initial_max_stream_data_bidi_local;
423 }
424 }
425
426 cln = ngx_pool_cleanup_add(pool, 0);
427 if (cln == NULL) {
428 ngx_close_connection(sn->c);
429 ngx_destroy_pool(pool);
430 return NULL;
431 }
432
433 cln->handler = ngx_quic_stream_cleanup_handler;
434 cln->data = sn->c;
435
436 ngx_rbtree_insert(&qc->streams.tree, &sn->node);
437
438 return sn;
439 }
440
441
442 static ssize_t
443 ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
444 {
445 ssize_t len;
446 ngx_buf_t *b;
447 ngx_event_t *rev;
448 ngx_connection_t *pc;
449 ngx_quic_frame_t *frame;
450 ngx_quic_stream_t *qs;
451 ngx_quic_connection_t *qc;
452
453 qs = c->quic;
454 b = qs->b;
455 pc = qs->parent;
456 qc = ngx_quic_get_connection(pc);
457 rev = c->read;
458
459 if (rev->error) {
460 return NGX_ERROR;
461 }
462
463 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
464 "quic stream recv id:0x%xL eof:%d avail:%z",
465 qs->id, rev->pending_eof, b->last - b->pos);
466
467 if (b->pos == b->last) {
468 rev->ready = 0;
469
470 if (rev->pending_eof) {
471 rev->eof = 1;
472 return 0;
473 }
474
475 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
476 "quic stream id:0x%xL recv() not ready", qs->id);
477 return NGX_AGAIN;
478 }
479
480 len = ngx_min(b->last - b->pos, (ssize_t) size);
481
482 ngx_memcpy(buf, b->pos, len);
483
484 b->pos += len;
485 qc->streams.received += len;
486
487 if (b->pos == b->last) {
488 b->pos = b->start;
489 b->last = b->start;
490 rev->ready = rev->pending_eof;
491 }
492
493 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
494 "quic stream id:0x%xL recv len:%z of size:%uz",
495 qs->id, len, size);
496
497 if (!rev->pending_eof) {
498 frame = ngx_quic_alloc_frame(pc);
499 if (frame == NULL) {
500 return NGX_ERROR;
501 }
502
503 frame->level = ssl_encryption_application;
504 frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
505 frame->u.max_stream_data.id = qs->id;
506 frame->u.max_stream_data.limit = qs->fs.received + (b->pos - b->start)
507 + (b->end - b->last);
508
509 ngx_quic_queue_frame(qc, frame);
510 }
511
512 if ((qc->streams.recv_max_data / 2) < qc->streams.received) {
513
514 frame = ngx_quic_alloc_frame(pc);
515
516 if (frame == NULL) {
517 return NGX_ERROR;
518 }
519
520 qc->streams.recv_max_data *= 2;
521
522 frame->level = ssl_encryption_application;
523 frame->type = NGX_QUIC_FT_MAX_DATA;
524 frame->u.max_data.max_data = qc->streams.recv_max_data;
525
526 ngx_quic_queue_frame(qc, frame);
527
528 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
529 "quic stream id:0x%xL recv: increased max_data:%uL",
530 qs->id, qc->streams.recv_max_data);
531 }
532
533 return len;
534 }
535
536
537 static ssize_t
538 ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
539 {
540 ngx_buf_t b;
541 ngx_chain_t cl;
542
543 ngx_memzero(&b, sizeof(ngx_buf_t));
544
545 b.memory = 1;
546 b.pos = buf;
547 b.last = buf + size;
548
549 cl.buf = &b;
550 cl.next = NULL;
551
552 if (ngx_quic_stream_send_chain(c, &cl, 0) == NGX_CHAIN_ERROR) {
553 return NGX_ERROR;
554 }
555
556 if (b.pos == buf) {
557 return NGX_AGAIN;
558 }
559
560 return b.pos - buf;
561 }
562
563
564 static ngx_chain_t *
565 ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
566 {
567 size_t n, flow;
568 ngx_event_t *wev;
569 ngx_chain_t *cl;
570 ngx_connection_t *pc;
571 ngx_quic_frame_t *frame;
572 ngx_quic_stream_t *qs;
573 ngx_quic_connection_t *qc;
574
575 qs = c->quic;
576 pc = qs->parent;
577 qc = ngx_quic_get_connection(pc);
578 wev = c->write;
579
580 if (wev->error) {
581 return NGX_CHAIN_ERROR;
582 }
583
584 flow = ngx_quic_max_stream_flow(c);
585 if (flow == 0) {
586 wev->ready = 0;
587 return in;
588 }
589
590 n = (limit && (size_t) limit < flow) ? (size_t) limit : flow;
591
592 frame = ngx_quic_alloc_frame(pc);
593 if (frame == NULL) {
594 return NGX_CHAIN_ERROR;
595 }
596
597 frame->data = ngx_quic_copy_chain(pc, in, n);
598 if (frame->data == NGX_CHAIN_ERROR) {
599 return NGX_CHAIN_ERROR;
600 }
601
602 for (n = 0, cl = frame->data; cl; cl = cl->next) {
603 n += ngx_buf_size(cl->buf);
604 }
605
606 while (in && ngx_buf_size(in->buf) == 0) {
607 in = in->next;
608 }
609
610 frame->level = ssl_encryption_application;
611 frame->type = NGX_QUIC_FT_STREAM6; /* OFF=1 LEN=1 FIN=0 */
612 frame->u.stream.off = 1;
613 frame->u.stream.len = 1;
614 frame->u.stream.fin = 0;
615
616 frame->u.stream.type = frame->type;
617 frame->u.stream.stream_id = qs->id;
618 frame->u.stream.offset = c->sent;
619 frame->u.stream.length = n;
620
621 c->sent += n;
622 qc->streams.sent += n;
623
624 ngx_quic_queue_frame(qc, frame);
625
626 wev->ready = (n < flow) ? 1 : 0;
627
628 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
629 "quic send_chain sent:%uz", n);
630
631 return in;
632 }
633
634
635 static size_t
636 ngx_quic_max_stream_flow(ngx_connection_t *c)
637 {
638 size_t size;
639 uint64_t sent, unacked;
640 ngx_quic_stream_t *qs;
641 ngx_quic_connection_t *qc;
642
643 qs = c->quic;
644 qc = ngx_quic_get_connection(qs->parent);
645
646 size = NGX_QUIC_STREAM_BUFSIZE;
647 sent = c->sent;
648 unacked = sent - qs->acked;
649
650 if (qc->streams.send_max_data == 0) {
651 qc->streams.send_max_data = qc->ctp.initial_max_data;
652 }
653
654 if (unacked >= NGX_QUIC_STREAM_BUFSIZE) {
655 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
656 "quic send flow hit buffer size");
657 return 0;
658 }
659
660 if (unacked + size > NGX_QUIC_STREAM_BUFSIZE) {
661 size = NGX_QUIC_STREAM_BUFSIZE - unacked;
662 }
663
664 if (qc->streams.sent >= qc->streams.send_max_data) {
665 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
666 "quic send flow hit MAX_DATA");
667 return 0;
668 }
669
670 if (qc->streams.sent + size > qc->streams.send_max_data) {
671 size = qc->streams.send_max_data - qc->streams.sent;
672 }
673
674 if (sent >= qs->send_max_data) {
675 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
676 "quic send flow hit MAX_STREAM_DATA");
677 return 0;
678 }
679
680 if (sent + size > qs->send_max_data) {
681 size = qs->send_max_data - sent;
682 }
683
684 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
685 "quic send flow:%uz", size);
686
687 return size;
688 }
689
690
691 static void
692 ngx_quic_stream_cleanup_handler(void *data)
693 {
694 ngx_connection_t *c = data;
695
696 ngx_connection_t *pc;
697 ngx_quic_frame_t *frame;
698 ngx_quic_stream_t *qs;
699 ngx_quic_connection_t *qc;
700
701 qs = c->quic;
702 pc = qs->parent;
703 qc = ngx_quic_get_connection(pc);
704
705 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
706 "quic stream id:0x%xL cleanup", qs->id);
707
708 ngx_rbtree_delete(&qc->streams.tree, &qs->node);
709 ngx_quic_free_frames(pc, &qs->fs.frames);
710
711 if (qc->closing) {
712 /* schedule handler call to continue ngx_quic_close_connection() */
713 ngx_post_event(pc->read, &ngx_posted_events);
714 return;
715 }
716
717 if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0
718 || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0)
719 {
720 if (!c->read->pending_eof && !c->read->error) {
721 frame = ngx_quic_alloc_frame(pc);
722 if (frame == NULL) {
723 goto done;
724 }
725
726 frame->level = ssl_encryption_application;
727 frame->type = NGX_QUIC_FT_STOP_SENDING;
728 frame->u.stop_sending.id = qs->id;
729 frame->u.stop_sending.error_code = 0x100; /* HTTP/3 no error */
730
731 ngx_quic_queue_frame(qc, frame);
732 }
733 }
734
735 if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) {
736 frame = ngx_quic_alloc_frame(pc);
737 if (frame == NULL) {
738 goto done;
739 }
740
741 frame->level = ssl_encryption_application;
742 frame->type = NGX_QUIC_FT_MAX_STREAMS;
743
744 if (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
745 frame->u.max_streams.limit = ++qc->streams.client_max_streams_uni;
746 frame->u.max_streams.bidi = 0;
747
748 } else {
749 frame->u.max_streams.limit = ++qc->streams.client_max_streams_bidi;
750 frame->u.max_streams.bidi = 1;
751 }
752
753 ngx_quic_queue_frame(qc, frame);
754
755 if (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
756 /* do not send fin for client unidirectional streams */
757 goto done;
758 }
759 }
760
761 if (c->write->error) {
762 goto done;
763 }
764
765 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
766 "quic stream id:0x%xL send fin", qs->id);
767
768 frame = ngx_quic_alloc_frame(pc);
769 if (frame == NULL) {
770 goto done;
771 }
772
773 frame->level = ssl_encryption_application;
774 frame->type = NGX_QUIC_FT_STREAM7; /* OFF=1 LEN=1 FIN=1 */
775 frame->u.stream.off = 1;
776 frame->u.stream.len = 1;
777 frame->u.stream.fin = 1;
778
779 frame->u.stream.type = frame->type;
780 frame->u.stream.stream_id = qs->id;
781 frame->u.stream.offset = c->sent;
782 frame->u.stream.length = 0;
783
784 ngx_quic_queue_frame(qc, frame);
785
786 done:
787
788 (void) ngx_quic_output(pc);
789
790 if (qc->shutdown) {
791 ngx_quic_shutdown_quic(pc);
792 }
793 }
794
795
796 ngx_int_t
797 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
798 ngx_quic_frame_t *frame)
799 {
800 size_t window;
801 uint64_t last;
802 ngx_buf_t *b;
803 ngx_pool_t *pool;
804 ngx_connection_t *sc;
805 ngx_quic_stream_t *sn;
806 ngx_quic_connection_t *qc;
807 ngx_quic_stream_frame_t *f;
808 ngx_quic_frames_stream_t *fs;
809
810 qc = ngx_quic_get_connection(c);
811 f = &frame->u.stream;
812
813 if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
814 && (f->stream_id & NGX_QUIC_STREAM_SERVER_INITIATED))
815 {
816 qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
817 return NGX_ERROR;
818 }
819
820 /* no overflow since both values are 62-bit */
821 last = f->offset + f->length;
822
823 sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
824
825 if (sn == NULL) {
826 sn = ngx_quic_create_client_stream(c, f->stream_id);
827
828 if (sn == NULL) {
829 return NGX_ERROR;
830 }
831
832 if (sn == NGX_QUIC_STREAM_GONE) {
833 return NGX_OK;
834 }
835
836 sc = sn->c;
837 fs = &sn->fs;
838 b = sn->b;
839 window = b->end - b->last;
840
841 if (last > window) {
842 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
843 goto cleanup;
844 }
845
846 if (ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input,
847 sn)
848 != NGX_OK)
849 {
850 goto cleanup;
851 }
852
853 sc->listening->handler(sc);
854
855 return NGX_OK;
856 }
857
858 fs = &sn->fs;
859 b = sn->b;
860 window = (b->pos - b->start) + (b->end - b->last);
861
862 if (last > fs->received && last - fs->received > window) {
863 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
864 return NGX_ERROR;
865 }
866
867 return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input,
868 sn);
869
870 cleanup:
871
872 pool = sc->pool;
873
874 ngx_close_connection(sc);
875 ngx_destroy_pool(pool);
876
877 return NGX_ERROR;
878 }
879
880
881 ngx_int_t
882 ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame, void *data)
883 {
884 uint64_t id;
885 ngx_buf_t *b;
886 ngx_event_t *rev;
887 ngx_chain_t *cl;
888 ngx_quic_stream_t *sn;
889 ngx_quic_connection_t *qc;
890 ngx_quic_stream_frame_t *f;
891
892 qc = ngx_quic_get_connection(c);
893 sn = data;
894
895 f = &frame->u.stream;
896 id = f->stream_id;
897
898 b = sn->b;
899
900 if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
901 ngx_log_error(NGX_LOG_INFO, c->log, 0,
902 "quic no space in stream buffer");
903 return NGX_ERROR;
904 }
905
906 if ((size_t) (b->end - b->last) < f->length) {
907 b->last = ngx_movemem(b->start, b->pos, b->last - b->pos);
908 b->pos = b->start;
909 }
910
911 for (cl = frame->data; cl; cl = cl->next) {
912 b->last = ngx_cpymem(b->last, cl->buf->pos,
913 cl->buf->last - cl->buf->pos);
914 }
915
916 rev = sn->c->read;
917 rev->ready = 1;
918
919 if (f->fin) {
920 rev->pending_eof = 1;
921 }
922
923 if (rev->active) {
924 rev->handler(rev);
925 }
926
927 /* check if stream was destroyed by handler */
928 if (ngx_quic_find_stream(&qc->streams.tree, id) == NULL) {
929 return NGX_DONE;
930 }
931
932 return NGX_OK;
933 }
934
935
936 ngx_int_t
937 ngx_quic_handle_max_data_frame(ngx_connection_t *c,
938 ngx_quic_max_data_frame_t *f)
939 {
940 ngx_event_t *wev;
941 ngx_rbtree_t *tree;
942 ngx_rbtree_node_t *node;
943 ngx_quic_stream_t *qs;
944 ngx_quic_connection_t *qc;
945
946 qc = ngx_quic_get_connection(c);
947 tree = &qc->streams.tree;
948
949 if (f->max_data <= qc->streams.send_max_data) {
950 return NGX_OK;
951 }
952
953 if (qc->streams.sent >= qc->streams.send_max_data) {
954
955 for (node = ngx_rbtree_min(tree->root, tree->sentinel);
956 node;
957 node = ngx_rbtree_next(tree, node))
958 {
959 qs = (ngx_quic_stream_t *) node;
960 wev = qs->c->write;
961
962 if (wev->active) {
963 wev->ready = 1;
964 ngx_post_event(wev, &ngx_posted_events);
965 }
966 }
967 }
968
969 qc->streams.send_max_data = f->max_data;
970
971 return NGX_OK;
972 }
973
974
975 ngx_int_t
976 ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
977 ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f)
978 {
979 return NGX_OK;
980 }
981
982
983 ngx_int_t
984 ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
985 ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f)
986 {
987 size_t n;
988 ngx_buf_t *b;
989 ngx_quic_frame_t *frame;
990 ngx_quic_stream_t *sn;
991 ngx_quic_connection_t *qc;
992
993 qc = ngx_quic_get_connection(c);
994
995 if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
996 && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED))
997 {
998 qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
999 return NGX_ERROR;
1000 }
1001
1002 sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
1003
1004 if (sn == NULL) {
1005 sn = ngx_quic_create_client_stream(c, f->id);
1006
1007 if (sn == NULL) {
1008 return NGX_ERROR;
1009 }
1010
1011 if (sn == NGX_QUIC_STREAM_GONE) {
1012 return NGX_OK;
1013 }
1014
1015 b = sn->b;
1016 n = b->end - b->last;
1017
1018 sn->c->listening->handler(sn->c);
1019
1020 } else {
1021 b = sn->b;
1022 n = sn->fs.received + (b->pos - b->start) + (b->end - b->last);
1023 }
1024
1025 frame = ngx_quic_alloc_frame(c);
1026 if (frame == NULL) {
1027 return NGX_ERROR;
1028 }
1029
1030 frame->level = pkt->level;
1031 frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
1032 frame->u.max_stream_data.id = f->id;
1033 frame->u.max_stream_data.limit = n;
1034
1035 ngx_quic_queue_frame(qc, frame);
1036
1037 return NGX_OK;
1038 }
1039
1040
1041 ngx_int_t
1042 ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
1043 ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f)
1044 {
1045 uint64_t sent;
1046 ngx_event_t *wev;
1047 ngx_quic_stream_t *sn;
1048 ngx_quic_connection_t *qc;
1049
1050 qc = ngx_quic_get_connection(c);
1051
1052 if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
1053 && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0)
1054 {
1055 qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
1056 return NGX_ERROR;
1057 }
1058
1059 sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
1060
1061 if (sn == NULL) {
1062 sn = ngx_quic_create_client_stream(c, f->id);
1063
1064 if (sn == NULL) {
1065 return NGX_ERROR;
1066 }
1067
1068 if (sn == NGX_QUIC_STREAM_GONE) {
1069 return NGX_OK;
1070 }
1071
1072 if (f->limit > sn->send_max_data) {
1073 sn->send_max_data = f->limit;
1074 }
1075
1076 sn->c->listening->handler(sn->c);
1077
1078 return NGX_OK;
1079 }
1080
1081 if (f->limit <= sn->send_max_data) {
1082 return NGX_OK;
1083 }
1084
1085 sent = sn->c->sent;
1086
1087 if (sent >= sn->send_max_data) {
1088 wev = sn->c->write;
1089
1090 if (wev->active) {
1091 wev->ready = 1;
1092 ngx_post_event(wev, &ngx_posted_events);
1093 }
1094 }
1095
1096 sn->send_max_data = f->limit;
1097
1098 return NGX_OK;
1099 }
1100
1101
1102 ngx_int_t
1103 ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
1104 ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
1105 {
1106 ngx_event_t *rev;
1107 ngx_connection_t *sc;
1108 ngx_quic_stream_t *sn;
1109 ngx_quic_connection_t *qc;
1110
1111 qc = ngx_quic_get_connection(c);
1112
1113 if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
1114 && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED))
1115 {
1116 qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
1117 return NGX_ERROR;
1118 }
1119
1120 sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
1121
1122 if (sn == NULL) {
1123 sn = ngx_quic_create_client_stream(c, f->id);
1124
1125 if (sn == NULL) {
1126 return NGX_ERROR;
1127 }
1128
1129 if (sn == NGX_QUIC_STREAM_GONE) {
1130 return NGX_OK;
1131 }
1132
1133 sc = sn->c;
1134
1135 rev = sc->read;
1136 rev->error = 1;
1137 rev->ready = 1;
1138
1139 sc->listening->handler(sc);
1140
1141 return NGX_OK;
1142 }
1143
1144 rev = sn->c->read;
1145 rev->error = 1;
1146 rev->ready = 1;
1147
1148 if (rev->active) {
1149 rev->handler(rev);
1150 }
1151
1152 return NGX_OK;
1153 }
1154
1155
1156 ngx_int_t
1157 ngx_quic_handle_stop_sending_frame(ngx_connection_t *c,
1158 ngx_quic_header_t *pkt, ngx_quic_stop_sending_frame_t *f)
1159 {
1160 ngx_event_t *wev;
1161 ngx_connection_t *sc;
1162 ngx_quic_stream_t *sn;
1163 ngx_quic_connection_t *qc;
1164
1165 qc = ngx_quic_get_connection(c);
1166
1167 if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
1168 && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0)
1169 {
1170 qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
1171 return NGX_ERROR;
1172 }
1173
1174 sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
1175
1176 if (sn == NULL) {
1177 sn = ngx_quic_create_client_stream(c, f->id);
1178
1179 if (sn == NULL) {
1180 return NGX_ERROR;
1181 }
1182
1183 if (sn == NGX_QUIC_STREAM_GONE) {
1184 return NGX_OK;
1185 }
1186
1187 sc = sn->c;
1188
1189 wev = sc->write;
1190 wev->error = 1;
1191 wev->ready = 1;
1192
1193 sc->listening->handler(sc);
1194
1195 return NGX_OK;
1196 }
1197
1198 wev = sn->c->write;
1199 wev->error = 1;
1200 wev->ready = 1;
1201
1202 if (wev->active) {
1203 wev->handler(wev);
1204 }
1205
1206 return NGX_OK;
1207 }
1208
1209
1210 ngx_int_t
1211 ngx_quic_handle_max_streams_frame(ngx_connection_t *c,
1212 ngx_quic_header_t *pkt, ngx_quic_max_streams_frame_t *f)
1213 {
1214 ngx_quic_connection_t *qc;
1215
1216 qc = ngx_quic_get_connection(c);
1217
1218 if (f->bidi) {
1219 if (qc->streams.server_max_streams_bidi < f->limit) {
1220 qc->streams.server_max_streams_bidi = f->limit;
1221
1222 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
1223 "quic max_streams_bidi:%uL", f->limit);
1224 }
1225
1226 } else {
1227 if (qc->streams.server_max_streams_uni < f->limit) {
1228 qc->streams.server_max_streams_uni = f->limit;
1229
1230 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
1231 "quic max_streams_uni:%uL", f->limit);
1232 }
1233 }
1234
1235 return NGX_OK;
1236 }
1237
1238
1239 void
1240 ngx_quic_handle_stream_ack(ngx_connection_t *c, ngx_quic_frame_t *f)
1241 {
1242 uint64_t sent, unacked;
1243 ngx_event_t *wev;
1244 ngx_quic_stream_t *sn;
1245 ngx_quic_connection_t *qc;
1246
1247 qc = ngx_quic_get_connection(c);
1248
1249 sn = ngx_quic_find_stream(&qc->streams.tree, f->u.stream.stream_id);
1250 if (sn == NULL) {
1251 return;
1252 }
1253
1254 wev = sn->c->write;
1255 sent = sn->c->sent;
1256 unacked = sent - sn->acked;
1257
1258 if (unacked >= NGX_QUIC_STREAM_BUFSIZE && wev->active) {
1259 wev->ready = 1;
1260 ngx_post_event(wev, &ngx_posted_events);
1261 }
1262
1263 sn->acked += f->u.stream.length;
1264
1265 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, sn->c->log, 0,
1266 "quic stream ack len:%uL acked:%uL unacked:%uL",
1267 f->u.stream.length, sn->acked, sent - sn->acked);
1268 }