# HG changeset patch # User Vladimir Homutov # Date 1584021283 -10800 # Node ID 4ae9ac69ab93d974bd93e16a98337e0e09f9ce43 # Parent cc8d211cb45c66d7907361406c16db9e67131225 HTTP/QUIC interface reworked. - events handling moved into src/event/ngx_event_quic.c - http invokes once ngx_quic_run() and passes stream callback (diff to original http_request.c is now minimal) - streams are stored in rbtree using ID as a key - when a new stream is registered, appropriate callback is called - ngx_quic_stream_t type represents STREAM and stored in c->qs diff --git a/src/core/ngx_connection.h b/src/core/ngx_connection.h --- a/src/core/ngx_connection.h +++ b/src/core/ngx_connection.h @@ -151,6 +151,7 @@ struct ngx_connection_s { #if (NGX_SSL || NGX_COMPAT) ngx_quic_connection_t *quic; + ngx_quic_stream_t *qs; ngx_ssl_connection_t *ssl; #endif diff --git a/src/core/ngx_core.h b/src/core/ngx_core.h --- a/src/core/ngx_core.h +++ b/src/core/ngx_core.h @@ -28,6 +28,7 @@ typedef struct ngx_thread_task_s ng typedef struct ngx_ssl_s ngx_ssl_t; typedef struct ngx_proxy_protocol_s ngx_proxy_protocol_t; typedef struct ngx_quic_connection_s ngx_quic_connection_t; +typedef struct ngx_quic_stream_s ngx_quic_stream_t; typedef struct ngx_ssl_connection_s ngx_ssl_connection_t; typedef struct ngx_udp_connection_s ngx_udp_connection_t; diff --git a/src/event/ngx_event_quic.c b/src/event/ngx_event_quic.c --- a/src/event/ngx_event_quic.c +++ b/src/event/ngx_event_quic.c @@ -6,6 +6,7 @@ #include #include +#include #define quic_version 0xff000018 @@ -228,10 +229,24 @@ struct ngx_quic_connection_s { ngx_quic_secret_t server_in; ngx_quic_secret_t server_hs; ngx_quic_secret_t server_ad; + + /* streams */ + ngx_rbtree_t stree; + ngx_rbtree_node_t stree_sentinel; + ngx_msec_t stream_timeout; + ngx_connection_handler_pt stream_handler; }; typedef struct { + ngx_rbtree_node_t node; + ngx_buf_t *b; + ngx_connection_t *c; + ngx_quic_stream_t s; +} ngx_quic_stream_node_t; + + +typedef struct { ngx_quic_secret_t *secret; ngx_uint_t type; ngx_uint_t *number; @@ -259,7 +274,14 @@ typedef struct { static ngx_int_t ngx_quic_new_connection(ngx_connection_t *c, ngx_ssl_t *ssl, ngx_quic_header_t *pkt); - +static void ngx_quic_close_connection(ngx_connection_t *c); + +static ngx_quic_stream_node_t *ngx_quic_stream_lookup(ngx_rbtree_t *rbtree, + ngx_uint_t key); +static void ngx_quic_rbtree_insert_stream(ngx_rbtree_node_t *temp, + ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel); + +static void ngx_quic_handshake_handler(ngx_event_t *rev); static ngx_int_t ngx_quic_handshake_input(ngx_connection_t *c, ngx_quic_header_t *pkt); static ngx_int_t ngx_quic_app_input(ngx_connection_t *c, @@ -348,20 +370,134 @@ ngx_quic_init_ssl_methods(SSL_CTX* ctx) } +void +ngx_quic_run(ngx_connection_t *c, ngx_ssl_t *ssl, ngx_msec_t timeout, + ngx_connection_handler_pt handler) +{ + ngx_buf_t *b; + ngx_quic_header_t pkt; + + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic handshake"); + + c->log->action = "QUIC handshaking"; + + ngx_memzero(&pkt, sizeof(ngx_quic_header_t)); + + b = c->buffer; + + pkt.raw = b; + pkt.data = b->start; + pkt.len = b->last - b->start; + + if (ngx_quic_new_connection(c, ssl, &pkt) != NGX_OK) { + ngx_quic_close_connection(c); + return; + } + + // we don't need stream handler for initial packet processing + c->quic->stream_handler = handler; + c->quic->stream_timeout = timeout; + + ngx_add_timer(c->read, timeout); + + c->read->handler = ngx_quic_handshake_handler; + + return; +} + + +static void +ngx_quic_handshake_handler(ngx_event_t *rev) +{ + ssize_t n; + ngx_connection_t *c; + u_char buf[512]; + ngx_buf_t b; + + b.start = buf; + b.end = buf + 512; + b.pos = b.last = b.start; + + c = rev->data; + + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, rev->log, 0, "quic handshake handler"); + + if (rev->timedout) { + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out"); + ngx_quic_close_connection(c); + return; + } + + if (c->close) { + ngx_quic_close_connection(c); + return; + } + + n = c->recv(c, b.start, b.end - b.start); + + if (n == NGX_AGAIN) { + return; + } + + if (n == NGX_ERROR) { + c->read->eof = 1; + ngx_quic_close_connection(c); + return; + } + + b.last += n; + + if (ngx_quic_input(c, NULL, &b) != NGX_OK) { + ngx_quic_close_connection(c); + return; + } +} + + +static void +ngx_quic_close_connection(ngx_connection_t *c) +{ + ngx_pool_t *pool; + + /* XXX wait for all streams to close */ + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, + "close quic connection: %d", c->fd); + + (void) ngx_ssl_shutdown(c); + +#if (NGX_STAT_STUB) + (void) ngx_atomic_fetch_add(ngx_stat_active, -1); +#endif + + c->destroyed = 1; + + pool = c->pool; + + ngx_close_connection(c); + + ngx_destroy_pool(pool); +} + + +ngx_connection_t * +ngx_quic_create_uni_stream(ngx_connection_t *c) +{ + /* XXX */ + return NULL; +} + + ngx_int_t ngx_quic_input(ngx_connection_t *c, ngx_ssl_t *ssl, ngx_buf_t *b) { u_char *p; ngx_quic_header_t pkt; - ngx_memzero(&pkt, sizeof(ngx_quic_header_t)); - - pkt.raw = b; - pkt.data = b->start; - pkt.len = b->last - b->start; - if (c->quic == NULL) { - return ngx_quic_new_connection(c, ssl, &pkt); + // XXX: possible? + ngx_log_error(NGX_LOG_INFO, c->log, 0, "BUG: no QUIC in connection"); + return NGX_ERROR; } p = b->start; @@ -1649,11 +1785,13 @@ ngx_quic_init_connection(ngx_connection_ static ngx_int_t ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt) { - u_char *end, *p; - ssize_t len; - ngx_uint_t ack_this; - ngx_quic_frame_t frame, *ack_frame; - ngx_quic_connection_t *qc; + u_char *end, *p; + ssize_t len; + ngx_buf_t *b; + ngx_uint_t ack_this; + ngx_quic_frame_t frame, *ack_frame; + ngx_quic_connection_t *qc; + ngx_quic_stream_node_t *sn; qc = c->quic; @@ -1735,6 +1873,55 @@ ngx_quic_payload_handler(ngx_connection_ frame.u.stream.offset, frame.u.stream.length); + + sn = ngx_quic_stream_lookup(&qc->stree, frame.u.stream.stream_id); + if (sn == NULL) { + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new"); + + sn = ngx_pcalloc(c->pool, sizeof(ngx_quic_stream_node_t)); + if (sn == NULL) { + return NGX_ERROR; + } + + sn->c = ngx_get_connection(-1, c->log); // TODO: free on connection termination + if (sn->c == NULL) { + return NGX_ERROR; + } + + sn->node.key = frame.u.stream.stream_id; + sn->b = ngx_create_temp_buf(c->pool, 16 * 1024); // XXX enough for everyone + if (sn->b == NULL) { + return NGX_ERROR; + } + b = sn->b; + + ngx_memcpy(b->start, frame.u.stream.data, frame.u.stream.length); + b->last = b->start + frame.u.stream.length; + + ngx_rbtree_insert(&qc->stree, &sn->node); + + sn->s.id = frame.u.stream.stream_id; + sn->s.parent = c; + sn->c->qs = &sn->s; + + qc->stream_handler(sn->c); + + } else { + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream"); + b = sn->b; + + if ((size_t) (b->end - b->pos) < frame.u.stream.length) { + ngx_log_error(NGX_LOG_INFO, c->log, 0, + "no space in stream buffer"); + return NGX_ERROR; + } + + ngx_memcpy(b->pos, frame.u.stream.data, frame.u.stream.length); + b->pos += frame.u.stream.length; + + // TODO: ngx_post_event(&c->read, &ngx_posted_events) ??? + } + ngx_quic_hexdump0(c->log, "STREAM.data", frame.u.stream.data, frame.u.stream.length); break; @@ -1777,6 +1964,71 @@ ngx_quic_payload_handler(ngx_connection_ } +static void +ngx_quic_rbtree_insert_stream(ngx_rbtree_node_t *temp, + ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel) +{ + ngx_rbtree_node_t **p; + ngx_quic_stream_node_t *qn, *qnt; + + for ( ;; ) { + + if (node->key < temp->key) { + + p = &temp->left; + + } else if (node->key > temp->key) { + + p = &temp->right; + + } else { /* node->key == temp->key */ + + qn = (ngx_quic_stream_node_t *) &node->color; + qnt = (ngx_quic_stream_node_t *) &temp->color; + + if (qn->c < qnt->c) { + p = &temp->left; + } else { + p = &temp->right; + } + } + + if (*p == sentinel) { + break; + } + + temp = *p; + } + + *p = node; + node->parent = temp; + node->left = sentinel; + node->right = sentinel; + ngx_rbt_red(node); +} + + +static ngx_quic_stream_node_t * +ngx_quic_stream_lookup(ngx_rbtree_t *rbtree, ngx_uint_t key) +{ + ngx_rbtree_node_t *node, *sentinel; + + node = rbtree->root; + sentinel = rbtree->sentinel; + + while (node != sentinel) { + + if (key == node->key) { + return (ngx_quic_stream_node_t *) node; + } + + node = (key < node->key) ? node->left : node->right; + } + + return NULL; +} + + static ngx_int_t ngx_quic_new_connection(ngx_connection_t *c, ngx_ssl_t *ssl, ngx_quic_header_t *pkt) @@ -1807,6 +2059,9 @@ ngx_quic_new_connection(ngx_connection_t return NGX_ERROR; } + ngx_rbtree_init(&qc->stree, &qc->stree_sentinel, + ngx_quic_rbtree_insert_stream); + c->quic = qc; qc->ssl = ssl; diff --git a/src/event/ngx_event_quic.h b/src/event/ngx_event_quic.h --- a/src/event/ngx_event_quic.h +++ b/src/event/ngx_event_quic.h @@ -10,11 +10,20 @@ #include +struct ngx_quic_stream_s { + uint64_t id; + ngx_uint_t unidirectional:1; + ngx_connection_t *parent; +}; + /* TODO: get rid somehow of ssl argument? */ ngx_int_t ngx_quic_input(ngx_connection_t *c, ngx_ssl_t *ssl, ngx_buf_t *b); ngx_int_t ngx_quic_output(ngx_connection_t *c); void ngx_quic_init_ssl_methods(SSL_CTX* ctx); +void ngx_quic_run(ngx_connection_t *c, ngx_ssl_t *ssl, ngx_msec_t timeout, + ngx_connection_handler_pt handler); +ngx_connection_t *ngx_quic_create_uni_stream(ngx_connection_t *c); #endif /* _NGX_EVENT_QUIC_H_INCLUDED_ */ diff --git a/src/http/ngx_http_request.c b/src/http/ngx_http_request.c --- a/src/http/ngx_http_request.c +++ b/src/http/ngx_http_request.c @@ -62,11 +62,9 @@ static u_char *ngx_http_log_error_handle #if (NGX_HTTP_SSL) static void ngx_http_ssl_handshake(ngx_event_t *rev); static void ngx_http_ssl_handshake_handler(ngx_connection_t *c); - -static void ngx_http_quic_handshake(ngx_event_t *rev); -static void ngx_http_quic_handshake_handler(ngx_event_t *rev); #endif +static void ngx_http_quic_stream_handler(ngx_connection_t *c); static char *ngx_http_client_errors[] = { @@ -333,9 +331,15 @@ ngx_http_init_connection(ngx_connection_ #if (NGX_HTTP_SSL) if (hc->addr_conf->http3) { + ngx_http_ssl_srv_conf_t *sscf; + hc->quic = 1; - c->log->action = "QUIC handshaking"; - rev->handler = ngx_http_quic_handshake; + + sscf = ngx_http_get_module_srv_conf(hc->conf_ctx, ngx_http_ssl_module); + + ngx_quic_run(c, &sscf->ssl, c->listening->post_accept_timeout, + ngx_http_quic_stream_handler); + return; } #endif @@ -387,6 +391,15 @@ ngx_http_init_connection(ngx_connection_ static void +ngx_http_quic_stream_handler(ngx_connection_t *c) +{ + ngx_quic_stream_t *qs = c->qs; + + printf("quic stream: 0x%lx\n", qs->id); +} + + +static void ngx_http_wait_request_handler(ngx_event_t *rev) { u_char *p; @@ -401,10 +414,6 @@ ngx_http_wait_request_handler(ngx_event_ ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http wait request handler"); - if (c->shared) { - goto request; - } - if (rev->timedout) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out"); ngx_http_close_connection(c); @@ -505,8 +514,6 @@ ngx_http_wait_request_handler(ngx_event_ } } -request: - c->log->action = "reading client request line"; ngx_reusable_connection(c, 0); @@ -659,82 +666,6 @@ ngx_http_alloc_request(ngx_connection_t #if (NGX_HTTP_SSL) static void -ngx_http_quic_handshake(ngx_event_t *rev) -{ - ngx_connection_t *c; - ngx_http_connection_t *hc; - ngx_http_ssl_srv_conf_t *sscf; - - ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0, "quic handshake"); - - c = rev->data; - hc = c->data; - - sscf = ngx_http_get_module_srv_conf(hc->conf_ctx, ngx_http_ssl_module); - - if (ngx_quic_input(c, &sscf->ssl, c->buffer) != NGX_OK) { - ngx_http_close_connection(c); - return; - } - - if (!rev->timer_set) { - ngx_add_timer(rev, c->listening->post_accept_timeout); - } - - rev->handler = ngx_http_quic_handshake_handler; - return; -} - - -static void -ngx_http_quic_handshake_handler(ngx_event_t *rev) -{ - ssize_t n; - ngx_connection_t *c; - u_char buf[512]; - ngx_buf_t b; - - b.start = buf; - b.end = buf + 512; - b.pos = b.last = b.start; - - c = rev->data; - - ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0, "quic handshake handler"); - - if (rev->timedout) { - ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out"); - ngx_http_close_connection(c); - return; - } - - if (c->close) { - ngx_http_close_connection(c); - return; - } - - n = c->recv(c, b.start, b.end - b.start); - - if (n == NGX_AGAIN) { - return; - } - - if (n == NGX_ERROR) { - c->read->eof = 1; - ngx_http_close_connection(c); - return; - } - - b.last += n; - - if (ngx_quic_input(c, NULL, &b) != NGX_OK) { - ngx_http_close_connection(c); - return; - } -} - - -static void ngx_http_ssl_handshake(ngx_event_t *rev) { u_char *p, buf[NGX_PROXY_PROTOCOL_MAX_HEADER + 1];