Mercurial > hg > nginx
comparison src/event/quic/ngx_event_quic_streams.c @ 8782:b3f6ad181df4 quic
QUIC: refactored CRYPTO and STREAM buffer ordering.
A generic function ngx_quic_order_bufs() is introduced. This function creates
and maintains a chain of buffers with holes. Holes are marked with the b->sync
flag. Several buffers and holes in this chain may share the same underlying
memory buffer.
When processing STREAM frames with this function, frame data is copied only
once to the right place in the stream input chain. Previously data could
be copied twice: first when buffering out-of-order frame data, and then
when filling the stream buffer from the ordered frame queue. Now there's only one
data chain for both tasks.
author | Roman Arutyunyan <arut@nginx.com> |
---|---|
date | Tue, 25 May 2021 13:55:12 +0300 |
parents | f52a2b77d406 |
children | 60c6e8d8d3ae |
comparison
equal
deleted
inserted
replaced
8781:81d491f0dc8c | 8782:b3f6ad181df4 |
---|---|
347 } | 347 } |
348 | 348 |
349 qs->node.key = id; | 349 qs->node.key = id; |
350 qs->parent = c; | 350 qs->parent = c; |
351 qs->id = id; | 351 qs->id = id; |
352 | 352 qs->final_size = (uint64_t) -1; |
353 qs->fs = ngx_pcalloc(pool, sizeof(ngx_quic_frames_stream_t)); | |
354 if (qs->fs == NULL) { | |
355 ngx_destroy_pool(pool); | |
356 return NULL; | |
357 } | |
358 | |
359 ngx_queue_init(&qs->fs->frames); | |
360 | 353 |
361 log = ngx_palloc(pool, sizeof(ngx_log_t)); | 354 log = ngx_palloc(pool, sizeof(ngx_log_t)); |
362 if (log == NULL) { | 355 if (log == NULL) { |
363 ngx_destroy_pool(pool); | 356 ngx_destroy_pool(pool); |
364 return NULL; | 357 return NULL; |
455 | 448 |
456 if (rev->error) { | 449 if (rev->error) { |
457 return NGX_ERROR; | 450 return NGX_ERROR; |
458 } | 451 } |
459 | 452 |
460 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, | 453 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0, |
461 "quic stream recv id:0x%xL eof:%d", | 454 "quic stream id:0x%xL recv eof:%d buf:%uz", |
462 qs->id, rev->pending_eof); | 455 qs->id, rev->pending_eof, size); |
463 | 456 |
464 if (qs->in == NULL) { | 457 if (qs->in == NULL || qs->in->buf->sync) { |
465 rev->ready = 0; | 458 rev->ready = 0; |
466 | 459 |
467 if (rev->pending_eof) { | 460 if (qs->recv_offset == qs->final_size) { |
468 rev->eof = 1; | 461 rev->eof = 1; |
469 return 0; | 462 return 0; |
470 } | 463 } |
471 | 464 |
472 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, | 465 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, |
478 cl = qs->in; | 471 cl = qs->in; |
479 | 472 |
480 for (ll = &cl; *ll; ll = &(*ll)->next) { | 473 for (ll = &cl; *ll; ll = &(*ll)->next) { |
481 b = (*ll)->buf; | 474 b = (*ll)->buf; |
482 | 475 |
476 if (b->sync) { | |
477 /* hole */ | |
478 break; | |
479 } | |
480 | |
483 n = ngx_min(b->last - b->pos, (ssize_t) size); | 481 n = ngx_min(b->last - b->pos, (ssize_t) size); |
484 buf = ngx_cpymem(buf, b->pos, n); | 482 buf = ngx_cpymem(buf, b->pos, n); |
485 | 483 |
486 len += n; | 484 len += n; |
487 size -= n; | 485 size -= n; |
497 | 495 |
498 ngx_quic_free_bufs(pc, cl); | 496 ngx_quic_free_bufs(pc, cl); |
499 | 497 |
500 qc->streams.received += len; | 498 qc->streams.received += len; |
501 qs->recv_max_data += len; | 499 qs->recv_max_data += len; |
500 qs->recv_offset += len; | |
502 | 501 |
503 if (qs->in == NULL) { | 502 if (qs->in == NULL) { |
504 rev->ready = rev->pending_eof; | 503 rev->ready = rev->pending_eof; |
505 } | 504 } |
506 | 505 |
507 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0, | 506 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, |
508 "quic stream id:0x%xL recv len:%z of size:%uz", | 507 "quic stream id:0x%xL recv len:%z", qs->id, len); |
509 qs->id, len, size); | |
510 | 508 |
511 if (!rev->pending_eof) { | 509 if (!rev->pending_eof) { |
512 frame = ngx_quic_alloc_frame(pc); | 510 frame = ngx_quic_alloc_frame(pc); |
513 if (frame == NULL) { | 511 if (frame == NULL) { |
514 return NGX_ERROR; | 512 return NGX_ERROR; |
717 | 715 |
718 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, | 716 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, |
719 "quic stream id:0x%xL cleanup", qs->id); | 717 "quic stream id:0x%xL cleanup", qs->id); |
720 | 718 |
721 ngx_rbtree_delete(&qc->streams.tree, &qs->node); | 719 ngx_rbtree_delete(&qc->streams.tree, &qs->node); |
722 ngx_quic_free_frames(pc, &qs->fs->frames); | |
723 ngx_quic_free_bufs(pc, qs->in); | 720 ngx_quic_free_bufs(pc, qs->in); |
724 | 721 |
725 if (qc->closing) { | 722 if (qc->closing) { |
726 /* schedule handler call to continue ngx_quic_close_connection() */ | 723 /* schedule handler call to continue ngx_quic_close_connection() */ |
727 ngx_post_event(pc->read, &ngx_posted_events); | 724 ngx_post_event(pc->read, &ngx_posted_events); |
813 | 810 |
814 ngx_int_t | 811 ngx_int_t |
815 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, | 812 ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, |
816 ngx_quic_frame_t *frame) | 813 ngx_quic_frame_t *frame) |
817 { | 814 { |
818 uint64_t last; | 815 uint64_t last; |
819 ngx_pool_t *pool; | 816 ngx_pool_t *pool; |
820 ngx_connection_t *sc; | 817 ngx_connection_t *sc; |
821 ngx_quic_stream_t *qs; | 818 ngx_quic_stream_t *qs; |
822 ngx_quic_connection_t *qc; | 819 ngx_quic_connection_t *qc; |
823 ngx_quic_stream_frame_t *f; | 820 ngx_quic_stream_frame_t *f; |
824 ngx_quic_frames_stream_t *fs; | |
825 | 821 |
826 qc = ngx_quic_get_connection(c); | 822 qc = ngx_quic_get_connection(c); |
827 f = &frame->u.stream; | 823 f = &frame->u.stream; |
828 | 824 |
829 if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) | 825 if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) |
848 if (qs == NGX_QUIC_STREAM_GONE) { | 844 if (qs == NGX_QUIC_STREAM_GONE) { |
849 return NGX_OK; | 845 return NGX_OK; |
850 } | 846 } |
851 | 847 |
852 sc = qs->connection; | 848 sc = qs->connection; |
853 fs = qs->fs; | |
854 | 849 |
855 if (last > qs->recv_max_data) { | 850 if (last > qs->recv_max_data) { |
856 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; | 851 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; |
857 goto cleanup; | 852 goto cleanup; |
858 } | 853 } |
859 | 854 |
860 if (ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input, | 855 if (f->fin) { |
861 qs) | 856 sc->read->pending_eof = 1; |
862 != NGX_OK) | 857 qs->final_size = last; |
863 { | 858 } |
859 | |
860 if (f->offset == 0) { | |
861 sc->read->ready = 1; | |
862 } | |
863 | |
864 if (ngx_quic_order_bufs(c, &qs->in, frame->data, f->offset) != NGX_OK) { | |
864 goto cleanup; | 865 goto cleanup; |
865 } | 866 } |
866 | 867 |
867 sc->listening->handler(sc); | 868 sc->listening->handler(sc); |
868 | 869 |
869 return NGX_OK; | 870 return NGX_OK; |
870 } | 871 } |
871 | |
872 fs = qs->fs; | |
873 | 872 |
874 if (last > qs->recv_max_data) { | 873 if (last > qs->recv_max_data) { |
875 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; | 874 qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR; |
876 return NGX_ERROR; | 875 return NGX_ERROR; |
877 } | 876 } |
878 | 877 |
879 return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input, | 878 if (qs->final_size != (uint64_t) -1 && last > qs->final_size) { |
880 qs); | 879 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; |
880 return NGX_ERROR; | |
881 } | |
882 | |
883 if (last <= qs->recv_offset) { | |
884 return NGX_OK; | |
885 } | |
886 | |
887 if (f->offset < qs->recv_offset) { | |
888 ngx_quic_trim_bufs(frame->data, qs->recv_offset - f->offset); | |
889 f->offset = qs->recv_offset; | |
890 } | |
891 | |
892 if (f->offset == qs->recv_offset) { | |
893 qs->connection->read->ready = 1; | |
894 } | |
895 | |
896 if (f->fin) { | |
897 if (qs->final_size != (uint64_t) -1 && qs->final_size != last) { | |
898 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; | |
899 return NGX_ERROR; | |
900 } | |
901 | |
902 qs->connection->read->pending_eof = 1; | |
903 qs->final_size = last; | |
904 } | |
905 | |
906 return ngx_quic_order_bufs(c, &qs->in, frame->data, | |
907 f->offset - qs->recv_offset); | |
881 | 908 |
882 cleanup: | 909 cleanup: |
883 | 910 |
884 pool = sc->pool; | 911 pool = sc->pool; |
885 | 912 |
886 ngx_close_connection(sc); | 913 ngx_close_connection(sc); |
887 ngx_destroy_pool(pool); | 914 ngx_destroy_pool(pool); |
888 | 915 |
889 return NGX_ERROR; | 916 return NGX_ERROR; |
890 } | |
891 | |
892 | |
893 ngx_int_t | |
894 ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame, void *data) | |
895 { | |
896 ssize_t n; | |
897 uint64_t id; | |
898 ngx_buf_t *b; | |
899 ngx_event_t *rev; | |
900 ngx_chain_t *cl, **ll; | |
901 ngx_quic_stream_t *qs; | |
902 ngx_quic_connection_t *qc; | |
903 ngx_quic_stream_frame_t *f; | |
904 | |
905 qc = ngx_quic_get_connection(c); | |
906 qs = data; | |
907 | |
908 f = &frame->u.stream; | |
909 id = f->stream_id; | |
910 cl = frame->data; | |
911 | |
912 for (ll = &qs->in; *ll; ll = &(*ll)->next) { | |
913 if ((*ll)->next) { | |
914 continue; | |
915 } | |
916 | |
917 /* append to last buffer */ | |
918 | |
919 b = (*ll)->buf; | |
920 | |
921 while (cl && b->last != b->end) { | |
922 n = ngx_min(cl->buf->last - cl->buf->pos, b->end - b->last); | |
923 b->last = ngx_cpymem(b->last, cl->buf->pos, n); | |
924 cl->buf->pos += n; | |
925 | |
926 if (cl->buf->pos == cl->buf->last) { | |
927 cl = cl->next; | |
928 } | |
929 } | |
930 } | |
931 | |
932 cl = ngx_quic_copy_chain(c, cl, 0); | |
933 if (cl == NGX_CHAIN_ERROR) { | |
934 return NGX_ERROR; | |
935 } | |
936 | |
937 *ll = cl; | |
938 | |
939 rev = qs->connection->read; | |
940 rev->ready = 1; | |
941 | |
942 if (f->fin) { | |
943 rev->pending_eof = 1; | |
944 } | |
945 | |
946 if (rev->active) { | |
947 rev->handler(rev); | |
948 } | |
949 | |
950 /* check if stream was destroyed by handler */ | |
951 if (ngx_quic_find_stream(&qc->streams.tree, id) == NULL) { | |
952 return NGX_DONE; | |
953 } | |
954 | |
955 return NGX_OK; | |
956 } | 917 } |
957 | 918 |
958 | 919 |
959 ngx_int_t | 920 ngx_int_t |
960 ngx_quic_handle_max_data_frame(ngx_connection_t *c, | 921 ngx_quic_handle_max_data_frame(ngx_connection_t *c, |
1148 | 1109 |
1149 if (qs == NGX_QUIC_STREAM_GONE) { | 1110 if (qs == NGX_QUIC_STREAM_GONE) { |
1150 return NGX_OK; | 1111 return NGX_OK; |
1151 } | 1112 } |
1152 | 1113 |
1114 qs->final_size = f->final_size; | |
1115 | |
1153 sc = qs->connection; | 1116 sc = qs->connection; |
1154 | 1117 |
1155 rev = sc->read; | 1118 rev = sc->read; |
1156 rev->error = 1; | 1119 rev->error = 1; |
1157 rev->ready = 1; | 1120 rev->ready = 1; |
1158 | 1121 |
1159 sc->listening->handler(sc); | 1122 sc->listening->handler(sc); |
1160 | 1123 |
1161 return NGX_OK; | 1124 return NGX_OK; |
1162 } | 1125 } |
1126 | |
1127 if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) { | |
1128 qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; | |
1129 return NGX_ERROR; | |
1130 } | |
1131 | |
1132 qs->final_size = f->final_size; | |
1163 | 1133 |
1164 rev = qs->connection->read; | 1134 rev = qs->connection->read; |
1165 rev->error = 1; | 1135 rev->error = 1; |
1166 rev->ready = 1; | 1136 rev->ready = 1; |
1167 | 1137 |