changeset 8229:cfc429911c0d quic

Implemented creation of server unidirectional streams. The ngx_quic_create_stream() function is a generic function extracted from the ngx_quic_handle_stream_frame() function.
author Vladimir Homutov <vl@nginx.com>
date Wed, 18 Mar 2020 13:49:39 +0300
parents ac41c53e446d
children 31f7c697b6d9
files src/event/ngx_event_quic.c
diffstat 1 files changed, 108 insertions(+), 59 deletions(-) [+]
line wrap: on
line diff
--- a/src/event/ngx_event_quic.c
+++ b/src/event/ngx_event_quic.c
@@ -22,6 +22,8 @@ typedef struct {
     ngx_rbtree_node_t                 sentinel;
     ngx_msec_t                        timeout;
     ngx_connection_handler_pt         handler;
+
+    ngx_uint_t                        id_counter;
 } ngx_quic_streams_t;
 
 
@@ -101,6 +103,8 @@ static void ngx_quic_rbtree_insert_strea
     ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
 static ngx_quic_stream_node_t *ngx_quic_find_stream(ngx_rbtree_t *rbtree,
     ngx_uint_t key);
+static ngx_quic_stream_node_t *ngx_quic_create_stream(ngx_connection_t *c,
+    ngx_uint_t id);
 static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf,
     size_t size);
 static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf,
@@ -916,9 +920,6 @@ ngx_quic_handle_stream_frame(ngx_connect
     ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f)
 {
     ngx_buf_t               *b;
-    ngx_log_t               *log;
-    ngx_pool_t              *pool;
-    ngx_event_t             *rev, *wev;
     ngx_quic_connection_t   *qc;
     ngx_quic_stream_node_t  *sn;
 
@@ -945,69 +946,16 @@ ngx_quic_handle_stream_frame(ngx_connect
 
     ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new");
 
-    sn = ngx_pcalloc(c->pool, sizeof(ngx_quic_stream_node_t));
+    sn = ngx_quic_create_stream(c, f->stream_id);
     if (sn == NULL) {
         return NGX_ERROR;
     }
 
-    sn->c = ngx_get_connection(-1, c->log); // TODO: free on connection termination
-    if (sn->c == NULL) {
-        return NGX_ERROR;
-    }
-
-    pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
-    if (pool == NULL) {
-        /* XXX free connection */
-        return NGX_ERROR;
-    }
-
-    log = ngx_palloc(pool, sizeof(ngx_log_t));
-    if (log == NULL) {
-        /* XXX free pool and connection */
-        return NGX_ERROR;
-    }
-
-    *log = *c->log;
-    pool->log = log;
-
-    sn->c->log = log;
-    sn->c->pool = pool;
-
-    sn->c->listening = c->listening;
-    sn->c->sockaddr = c->sockaddr;
-    sn->c->local_sockaddr = c->local_sockaddr;
-
-    rev = sn->c->read;
-    wev = sn->c->write;
-
-    rev->ready = 1;
-
-    rev->log = c->log;
-    wev->log = c->log;
-
-    sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
-
-    sn->node.key = f->stream_id;
-    sn->b = ngx_create_temp_buf(pool, 16 * 1024); // XXX enough for everyone
-    if (sn->b == NULL) {
-        return NGX_ERROR;
-    }
     b = sn->b;
 
     ngx_memcpy(b->start, f->data, f->length);
     b->last = b->start + f->length;
 
-    ngx_rbtree_insert(&qc->streams.tree, &sn->node);
-
-    sn->s.id = f->stream_id;
-    sn->s.unidirectional = (sn->s.id & 0x02) ? 1 : 0;
-    sn->s.parent = c;
-    sn->c->qs = &sn->s;
-
-    sn->c->recv = ngx_quic_stream_recv;
-    sn->c->send = ngx_quic_stream_send;
-    sn->c->send_chain = ngx_quic_stream_send_chain;
-
     qc->streams.handler(sn->c);
 
     return NGX_OK;
@@ -1184,8 +1132,34 @@ ngx_quic_send_packet(ngx_connection_t *c
 ngx_connection_t *
 ngx_quic_create_uni_stream(ngx_connection_t *c)
 {
-    /* XXX */
-    return NULL;
+    ngx_uint_t               id;
+    ngx_quic_stream_t       *qs;
+    ngx_quic_connection_t   *qc;
+    ngx_quic_stream_node_t  *sn;
+
+    qs = c->qs;
+    qc = qs->parent->quic;
+
+    /*
+     * A stream ID is a 62-bit integer that is unique for all streams
+     * on a connection.
+     *
+     * 0x3  | Server-Initiated, Unidirectional
+     */
+    id = (qc->streams.id_counter << 2) | 0x3;
+
+    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "creating server uni stream #%ui id %ui",
+                   qc->streams.id_counter, id);
+
+    qc->streams.id_counter++;
+
+    sn = ngx_quic_create_stream(qs->parent, id);
+    if (sn == NULL) {
+        return NULL;
+    }
+
+    return sn->c;
 }
 
 
@@ -1254,6 +1228,81 @@ ngx_quic_find_stream(ngx_rbtree_t *rbtre
 }
 
 
+static ngx_quic_stream_node_t *
+ngx_quic_create_stream(ngx_connection_t *c, ngx_uint_t id)
+{
+    ngx_log_t               *log;
+    ngx_pool_t              *pool;
+    ngx_event_t             *rev, *wev;
+    ngx_quic_connection_t   *qc;
+    ngx_quic_stream_node_t  *sn;
+
+    qc = c->quic;
+
+    sn = ngx_pcalloc(c->pool, sizeof(ngx_quic_stream_node_t));
+    if (sn == NULL) {
+        return NULL;
+    }
+
+    sn->c = ngx_get_connection(-1, c->log); // TODO: free on connection termination
+    if (sn->c == NULL) {
+        return NULL;
+    }
+
+    pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
+    if (pool == NULL) {
+        /* XXX free connection */
+        // TODO: add pool cleanup handdler
+        return NULL;
+    }
+
+    log = ngx_palloc(pool, sizeof(ngx_log_t));
+    if (log == NULL) {
+        /* XXX free pool and connection */
+        return NULL;
+    }
+
+    *log = *c->log;
+    pool->log = log;
+
+    sn->c->log = log;
+    sn->c->pool = pool;
+
+    sn->c->listening = c->listening;
+    sn->c->sockaddr = c->sockaddr;
+    sn->c->local_sockaddr = c->local_sockaddr;
+
+    rev = sn->c->read;
+    wev = sn->c->write;
+
+    rev->ready = 1;
+
+    rev->log = c->log;
+    wev->log = c->log;
+
+    sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
+
+    sn->node.key =id;
+    sn->b = ngx_create_temp_buf(pool, 16 * 1024); // XXX enough for everyone
+    if (sn->b == NULL) {
+        return NULL;
+    }
+
+    ngx_rbtree_insert(&qc->streams.tree, &sn->node);
+
+    sn->s.id = id;
+    sn->s.unidirectional = (sn->s.id & 0x02) ? 1 : 0;
+    sn->s.parent = c;
+    sn->c->qs = &sn->s;
+
+    sn->c->recv = ngx_quic_stream_recv;
+    sn->c->send = ngx_quic_stream_send;
+    sn->c->send_chain = ngx_quic_stream_send_chain;
+
+    return sn;
+}
+
+
 static ssize_t
 ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
 {