view src/http/v3/ngx_http_v3_streams.c @ 8902:925572184d4a quic

HTTP/3: adjusted QUIC connection finalization. When an HTTP/3 function returns an error in the context of a QUIC stream, it is now this function's responsibility to finalize the entire QUIC connection with the right code, if required. Previously, QUIC connection finalization could be done both outside and inside such functions. The new rule follows a similar rule for logging, leads to cleaner code, and allows providing more details about the error. While here, a few error cases are no longer treated as fatal, and the QUIC connection is no longer finalized in these cases. A few other cases now lead to a stream reset instead of connection finalization.
author Roman Arutyunyan <arut@nginx.com>
date Mon, 18 Oct 2021 15:22:33 +0300
parents 72b304f6207c
children 0d3bf08eaac0
line wrap: on
line source


/*
 * Copyright (C) Roman Arutyunyan
 * Copyright (C) Nginx, Inc.
 */


#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>


/*
 * Per-stream state stored in c->data of a unidirectional stream connection.
 */
typedef struct {
    /* parser state handed to ngx_http_v3_parse_uni() by the read handler */
    ngx_http_v3_parse_uni_t         parse;
    /*
     * slot of this stream in the session's known_streams[] array,
     * or -1 when the stream does not occupy a slot
     */
    ngx_int_t                       index;
} ngx_http_v3_uni_stream_t;


/*
 * Bookkeeping record for one push stream.  It is allocated as the data area
 * of a pool cleanup registered on the push stream's pool and linked into the
 * session's "pushing" queue.
 */
typedef struct {
    /*
     * link in h3c->pushing; must remain the first member because
     * ngx_http_v3_cancel_push() casts the queue link to this type
     */
    ngx_queue_t                     queue;
    /* push id passed to ngx_http_v3_create_push_stream() */
    uint64_t                        id;
    /* connection of the push stream */
    ngx_connection_t               *connection;
    /* points at h3c->npushing; decremented by the cleanup handler */
    ngx_uint_t                     *npushing;
} ngx_http_v3_push_t;


/* file-local helpers; all of them are defined further down in this file */
static void ngx_http_v3_close_uni_stream(ngx_connection_t *c);
static void ngx_http_v3_uni_read_handler(ngx_event_t *rev);
static void ngx_http_v3_dummy_write_handler(ngx_event_t *wev);
static void ngx_http_v3_push_cleanup(void *data);
static ngx_connection_t *ngx_http_v3_get_uni_stream(ngx_connection_t *c,
    ngx_uint_t type);


/*
 * Initializes a unidirectional stream connection.
 *
 * The stream id with its two low bits dropped is checked against the
 * configured max_uni_streams limit.  On success the stream is marked
 * cancelable, per-stream state (index -1, i.e. no known_streams[] slot yet)
 * is attached to c->data, the read and write handlers are installed and
 * the read handler is invoked once.
 *
 * On any failure the whole QUIC connection is finalized with an HTTP/3
 * error code before the stream connection is closed.
 */
void
ngx_http_v3_init_uni_stream(ngx_connection_t *c)
{
    uint64_t                   n;
    ngx_http_v3_srv_conf_t    *h3scf;
    ngx_http_v3_uni_stream_t  *us;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http3 init uni stream");

    h3scf = ngx_http_v3_get_module_srv_conf(c, ngx_http_v3_module);

    n = c->quic->id >> 2;

    if (n >= h3scf->max_uni_streams) {
        ngx_http_v3_finalize_connection(c,
                                      NGX_HTTP_V3_ERR_STREAM_CREATION_ERROR,
                                      "reached maximum number of uni streams");
        ngx_http_close_connection(c);
        return;
    }

    c->quic->cancelable = 1;

    us = ngx_pcalloc(c->pool, sizeof(ngx_http_v3_uni_stream_t));
    if (us == NULL) {
        /*
         * finalize the connection here as well, so that this failure path
         * reports an error code like every other failure path in this file
         */
        ngx_http_v3_finalize_connection(c,
                                        NGX_HTTP_V3_ERR_INTERNAL_ERROR,
                                        "memory allocation error");
        ngx_http_close_connection(c);
        return;
    }

    us->index = -1;

    c->data = us;

    c->read->handler = ngx_http_v3_uni_read_handler;
    c->write->handler = ngx_http_v3_dummy_write_handler;

    ngx_http_v3_uni_read_handler(c->read);
}


/*
 * Closes a unidirectional stream connection and destroys its pool.
 *
 * If per-stream state is attached and the stream occupies a slot in the
 * session's known_streams[] array, the slot is cleared first.
 *
 * c->data is checked for NULL because the failure paths of
 * ngx_http_v3_get_uni_stream() and ngx_http_v3_create_push_stream() call
 * this function on streams that never had an ngx_http_v3_uni_stream_t
 * attached.
 * NOTE(review): this assumes c->data of a freshly opened stream is NULL;
 * confirm against ngx_quic_open_stream().
 */
static void
ngx_http_v3_close_uni_stream(ngx_connection_t *c)
{
    ngx_pool_t                *pool;
    ngx_http_v3_session_t     *h3c;
    ngx_http_v3_uni_stream_t  *us;

    us = c->data;
    h3c = ngx_http_v3_get_session(c);

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http3 close stream");

    if (us != NULL && us->index >= 0) {
        h3c->known_streams[us->index] = NULL;
    }

    c->destroyed = 1;

    /* remember the pool: it is destroyed after the connection is closed */
    pool = c->pool;

    ngx_close_connection(c);

    ngx_destroy_pool(pool);
}


/*
 * Registers a client unidirectional stream of the given type.
 *
 * Encoder, decoder and control streams each own a slot in the session's
 * known_streams[] array; registering a second stream for an occupied slot
 * is an error.  A stream of any other type takes no slot, but is accepted
 * only when all three client slots are already filled.
 *
 * Returns NGX_OK on success or NGX_HTTP_V3_ERR_STREAM_CREATION_ERROR.
 */
ngx_int_t
ngx_http_v3_register_uni_stream(ngx_connection_t *c, uint64_t type)
{
    ngx_int_t                  slot;
    ngx_http_v3_session_t     *session;
    ngx_http_v3_uni_stream_t  *state;

    session = ngx_http_v3_get_session(c);

    switch (type) {

    case NGX_HTTP_V3_STREAM_ENCODER:
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "http3 encoder stream");
        slot = NGX_HTTP_V3_STREAM_CLIENT_ENCODER;
        break;

    case NGX_HTTP_V3_STREAM_DECODER:
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "http3 decoder stream");
        slot = NGX_HTTP_V3_STREAM_CLIENT_DECODER;
        break;

    case NGX_HTTP_V3_STREAM_CONTROL:
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "http3 control stream");
        slot = NGX_HTTP_V3_STREAM_CLIENT_CONTROL;
        break;

    default:
        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "http3 stream 0x%02xL", type);

        if (session->known_streams[NGX_HTTP_V3_STREAM_CLIENT_ENCODER] != NULL
            && session->known_streams[NGX_HTTP_V3_STREAM_CLIENT_DECODER] != NULL
            && session->known_streams[NGX_HTTP_V3_STREAM_CLIENT_CONTROL] != NULL)
        {
            /* other stream types occupy no slot: nothing to record */
            return NGX_OK;
        }

        ngx_log_error(NGX_LOG_INFO, c->log, 0, "missing mandatory stream");
        return NGX_HTTP_V3_ERR_STREAM_CREATION_ERROR;
    }

    /* only the three slotted stream types get here */

    if (session->known_streams[slot] != NULL) {
        ngx_log_error(NGX_LOG_INFO, c->log, 0, "stream exists");
        return NGX_HTTP_V3_ERR_STREAM_CREATION_ERROR;
    }

    session->known_streams[slot] = c;

    state = c->data;
    state->index = slot;

    return NGX_OK;
}


/*
 * Read event handler of a unidirectional stream.
 *
 * While the read event is ready, data is received in chunks of up to
 * 128 bytes, accounted in the session's total_bytes, checked with
 * ngx_http_v3_check_flood() and fed to ngx_http_v3_parse_uni().
 *
 * Outcomes:
 *   - EOF on a stream without a known_streams[] slot, a failed flood check,
 *     or NGX_DONE from the parser: only this stream is closed;
 *   - receive error, EOF on a slotted stream, a parser error, or a failure
 *     to re-register the read event: the connection is finalized with an
 *     HTTP/3 error code and the stream is closed.
 */
static void
ngx_http_v3_uni_read_handler(ngx_event_t *rev)
{
    u_char                     data[128];
    ssize_t                    size;
    ngx_buf_t                  chunk;
    ngx_int_t                  rc;
    ngx_connection_t          *c;
    ngx_http_v3_session_t     *h3c;
    ngx_http_v3_uni_stream_t  *us;

    c = rev->data;
    us = c->data;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http3 read handler");

    ngx_memzero(&chunk, sizeof(ngx_buf_t));

    while (rev->ready) {

        size = c->recv(c, data, sizeof(data));

        if (size == NGX_AGAIN) {
            break;
        }

        if (size == NGX_ERROR) {
            rc = NGX_HTTP_V3_ERR_INTERNAL_ERROR;
            goto failed;
        }

        if (size == 0) {

            if (us->index < 0) {
                ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http3 read eof");
                ngx_http_v3_close_uni_stream(c);
                return;
            }

            /* a stream holding a known_streams[] slot must not be closed */
            rc = NGX_HTTP_V3_ERR_CLOSED_CRITICAL_STREAM;
            goto failed;
        }

        chunk.pos = data;
        chunk.last = data + size;

        h3c = ngx_http_v3_get_session(c);
        h3c->total_bytes += size;

        if (ngx_http_v3_check_flood(c) != NGX_OK) {
            ngx_http_v3_close_uni_stream(c);
            return;
        }

        rc = ngx_http_v3_parse_uni(c, &us->parse, &chunk);

        if (rc == NGX_AGAIN) {
            /* the parser wants more input */
            continue;
        }

        if (rc == NGX_DONE) {
            ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                           "http3 read done");
            ngx_http_v3_close_uni_stream(c);
            return;
        }

        /*
         * a positive value is used as the error code as is;
         * anything else is reported as a general protocol error
         */
        if (rc <= 0) {
            rc = NGX_HTTP_V3_ERR_GENERAL_PROTOCOL_ERROR;
        }

        goto failed;
    }

    if (ngx_handle_read_event(rev, 0) == NGX_OK) {
        return;
    }

    rc = NGX_HTTP_V3_ERR_INTERNAL_ERROR;

failed:

    ngx_http_v3_finalize_connection(c, rc, "stream error");
    ngx_http_v3_close_uni_stream(c);
}


/*
 * Write event handler installed on unidirectional streams.  It does no
 * writing itself: it only calls ngx_handle_write_event() and, if that
 * fails, finalizes the connection with an internal error and closes
 * the stream.
 */
static void
ngx_http_v3_dummy_write_handler(ngx_event_t *wev)
{
    ngx_connection_t  *stream;

    stream = wev->data;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, stream->log, 0,
                   "http3 dummy write handler");

    if (ngx_handle_write_event(wev, 0) == NGX_OK) {
        return;
    }

    ngx_http_v3_finalize_connection(stream, NGX_HTTP_V3_ERR_INTERNAL_ERROR,
                                    NULL);
    ngx_http_v3_close_uni_stream(stream);
}


/* XXX async & buffered stream writes */

/*
 * Opens a new stream for the given push id and sends the stream header
 * (the NGX_HTTP_V3_STREAM_PUSH type followed by the push id, both as
 * variable-length integers).
 *
 * On success a push record is registered as a cleanup on the stream's pool,
 * linked into h3c->pushing, and h3c->npushing is incremented; the stream
 * connection is returned.
 *
 * On failure the error is logged, the connection is finalized with
 * NGX_HTTP_V3_ERR_STREAM_CREATION_ERROR, the stream (if it was opened) is
 * closed, and NULL is returned.
 */
ngx_connection_t *
ngx_http_v3_create_push_stream(ngx_connection_t *c, uint64_t push_id)
{
    u_char                 *last, hdr[NGX_HTTP_V3_VARLEN_INT_LEN * 2];
    size_t                  len;
    ngx_connection_t       *stream;
    ngx_pool_cleanup_t     *cln;
    ngx_http_v3_push_t     *entry;
    ngx_http_v3_session_t  *h3c;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http3 create push stream id:%uL", push_id);

    stream = ngx_quic_open_stream(c, 0);
    if (stream == NULL) {
        goto failed;
    }

    /* stream header: stream type, then push id */

    last = (u_char *) ngx_http_v3_encode_varlen_int(hdr,
                                                    NGX_HTTP_V3_STREAM_PUSH);
    last = (u_char *) ngx_http_v3_encode_varlen_int(last, push_id);
    len = last - hdr;

    h3c = ngx_http_v3_get_session(c);
    h3c->total_bytes += len;

    if (stream->send(stream, hdr, len) != (ssize_t) len) {
        goto failed;
    }

    cln = ngx_pool_cleanup_add(stream->pool, sizeof(ngx_http_v3_push_t));
    if (cln == NULL) {
        goto failed;
    }

    cln->handler = ngx_http_v3_push_cleanup;

    entry = cln->data;
    entry->id = push_id;
    entry->connection = stream;
    entry->npushing = &h3c->npushing;

    h3c->npushing++;
    ngx_queue_insert_tail(&h3c->pushing, &entry->queue);

    return stream;

failed:

    ngx_log_error(NGX_LOG_ERR, c->log, 0, "failed to create push stream");

    ngx_http_v3_finalize_connection(c, NGX_HTTP_V3_ERR_STREAM_CREATION_ERROR,
                                    "failed to create push stream");

    if (stream != NULL) {
        ngx_http_v3_close_uni_stream(stream);
    }

    return NULL;
}


/*
 * Pool cleanup handler of a push stream: unlinks the push record from the
 * queue it was inserted into and decrements the counter it points at.
 */
static void
ngx_http_v3_push_cleanup(void *data)
{
    ngx_http_v3_push_t  *entry;

    entry = data;

    ngx_queue_remove(&entry->queue);

    *entry->npushing -= 1;
}


/*
 * Returns the server unidirectional stream of the given type, creating it
 * when necessary.
 *
 * Encoder, decoder and control streams are cached in the session's
 * known_streams[] array and reused on later calls; any other type always
 * results in a new stream.  A new stream is marked cancelable, gets
 * per-stream state and the uni-stream event handlers, and has its type sent
 * as a variable-length integer.
 *
 * On failure the error is logged, the connection is finalized with
 * NGX_HTTP_V3_ERR_STREAM_CREATION_ERROR, the stream (if it was opened) is
 * closed, and NULL is returned.
 */
static ngx_connection_t *
ngx_http_v3_get_uni_stream(ngx_connection_t *c, ngx_uint_t type)
{
    u_char                     hdr[NGX_HTTP_V3_VARLEN_INT_LEN];
    size_t                     len;
    ngx_int_t                  slot;
    ngx_connection_t          *stream;
    ngx_http_v3_session_t     *h3c;
    ngx_http_v3_uni_stream_t  *state;

    if (type == NGX_HTTP_V3_STREAM_ENCODER) {
        slot = NGX_HTTP_V3_STREAM_SERVER_ENCODER;

    } else if (type == NGX_HTTP_V3_STREAM_DECODER) {
        slot = NGX_HTTP_V3_STREAM_SERVER_DECODER;

    } else if (type == NGX_HTTP_V3_STREAM_CONTROL) {
        slot = NGX_HTTP_V3_STREAM_SERVER_CONTROL;

    } else {
        slot = -1;
    }

    h3c = ngx_http_v3_get_session(c);

    if (slot >= 0 && h3c->known_streams[slot] != NULL) {
        /* already created by an earlier call */
        return h3c->known_streams[slot];
    }

    stream = ngx_quic_open_stream(c, 0);
    if (stream == NULL) {
        goto failed;
    }

    stream->quic->cancelable = 1;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http3 create uni stream, type:%ui", type);

    state = ngx_pcalloc(stream->pool, sizeof(ngx_http_v3_uni_stream_t));
    if (state == NULL) {
        goto failed;
    }

    state->index = slot;
    stream->data = state;

    stream->read->handler = ngx_http_v3_uni_read_handler;
    stream->write->handler = ngx_http_v3_dummy_write_handler;

    if (slot >= 0) {
        h3c->known_streams[slot] = stream;
    }

    len = (u_char *) ngx_http_v3_encode_varlen_int(hdr, type) - hdr;

    /*
     * NOTE(review): the session was already looked up above; this second
     * lookup looks redundant
     */
    h3c = ngx_http_v3_get_session(c);
    h3c->total_bytes += len;

    if (stream->send(stream, hdr, len) != (ssize_t) len) {
        goto failed;
    }

    return stream;

failed:

    ngx_log_error(NGX_LOG_ERR, c->log, 0, "failed to create server stream");

    ngx_http_v3_finalize_connection(c, NGX_HTTP_V3_ERR_STREAM_CREATION_ERROR,
                                    "failed to create server stream");

    if (stream != NULL) {
        ngx_http_v3_close_uni_stream(stream);
    }

    return NULL;
}


/*
 * Sends a SETTINGS frame on the server control stream.
 *
 * The frame carries two parameters taken from the HTTP/3 server
 * configuration: NGX_HTTP_V3_PARAM_MAX_TABLE_CAPACITY with
 * max_table_capacity and NGX_HTTP_V3_PARAM_BLOCKED_STREAMS with
 * max_blocked_streams.
 *
 * Returns NGX_OK on success.  Returns NGX_ERROR when the control stream is
 * unavailable, or when the frame could not be sent in full; in the latter
 * case the connection is finalized with NGX_HTTP_V3_ERR_EXCESSIVE_LOAD and
 * the control stream is closed.
 */
ngx_int_t
ngx_http_v3_send_settings(ngx_connection_t *c)
{
    u_char                  *last, frame[NGX_HTTP_V3_VARLEN_INT_LEN * 6];
    size_t                   len;
    ngx_connection_t        *ctrl;
    ngx_http_v3_session_t   *h3c;
    ngx_http_v3_srv_conf_t  *h3scf;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http3 send settings");

    ctrl = ngx_http_v3_get_uni_stream(c, NGX_HTTP_V3_STREAM_CONTROL);
    if (ctrl == NULL) {
        return NGX_ERROR;
    }

    h3scf = ngx_http_v3_get_module_srv_conf(c, ngx_http_v3_module);

    /* first pass with a NULL destination: size of the frame payload */

    len = ngx_http_v3_encode_varlen_int(NULL,
                                        NGX_HTTP_V3_PARAM_MAX_TABLE_CAPACITY);
    len += ngx_http_v3_encode_varlen_int(NULL, h3scf->max_table_capacity);
    len += ngx_http_v3_encode_varlen_int(NULL,
                                         NGX_HTTP_V3_PARAM_BLOCKED_STREAMS);
    len += ngx_http_v3_encode_varlen_int(NULL, h3scf->max_blocked_streams);

    /* second pass: frame type, payload length, then the payload itself */

    last = (u_char *) ngx_http_v3_encode_varlen_int(frame,
                                                    NGX_HTTP_V3_FRAME_SETTINGS);
    last = (u_char *) ngx_http_v3_encode_varlen_int(last, len);
    last = (u_char *) ngx_http_v3_encode_varlen_int(last,
                                         NGX_HTTP_V3_PARAM_MAX_TABLE_CAPACITY);
    last = (u_char *) ngx_http_v3_encode_varlen_int(last,
                                                    h3scf->max_table_capacity);
    last = (u_char *) ngx_http_v3_encode_varlen_int(last,
                                            NGX_HTTP_V3_PARAM_BLOCKED_STREAMS);
    last = (u_char *) ngx_http_v3_encode_varlen_int(last,
                                                   h3scf->max_blocked_streams);

    /* from here on len is the size of the whole frame */
    len = last - frame;

    h3c = ngx_http_v3_get_session(c);
    h3c->total_bytes += len;

    if (ctrl->send(ctrl, frame, len) == (ssize_t) len) {
        return NGX_OK;
    }

    ngx_log_error(NGX_LOG_ERR, c->log, 0, "failed to send settings");

    ngx_http_v3_finalize_connection(c, NGX_HTTP_V3_ERR_EXCESSIVE_LOAD,
                                    "failed to send settings");
    ngx_http_v3_close_uni_stream(ctrl);

    return NGX_ERROR;
}


/*
 * Sends a GOAWAY frame carrying the given id on the server control stream.
 *
 * Returns NGX_OK on success.  Returns NGX_ERROR when the control stream is
 * unavailable, or when the frame could not be sent in full; in the latter
 * case the connection is finalized with NGX_HTTP_V3_ERR_EXCESSIVE_LOAD and
 * the control stream is closed.
 */
ngx_int_t
ngx_http_v3_send_goaway(ngx_connection_t *c, uint64_t id)
{
    u_char                 *last, frame[NGX_HTTP_V3_VARLEN_INT_LEN * 3];
    size_t                  len;
    ngx_connection_t       *ctrl;
    ngx_http_v3_session_t  *h3c;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, "http3 send goaway %uL", id);

    ctrl = ngx_http_v3_get_uni_stream(c, NGX_HTTP_V3_STREAM_CONTROL);
    if (ctrl == NULL) {
        return NGX_ERROR;
    }

    /* payload length: the encoded size of the id */
    len = ngx_http_v3_encode_varlen_int(NULL, id);

    /* frame type, payload length, id */
    last = (u_char *) ngx_http_v3_encode_varlen_int(frame,
                                                    NGX_HTTP_V3_FRAME_GOAWAY);
    last = (u_char *) ngx_http_v3_encode_varlen_int(last, len);
    last = (u_char *) ngx_http_v3_encode_varlen_int(last, id);

    /* from here on len is the size of the whole frame */
    len = last - frame;

    h3c = ngx_http_v3_get_session(c);
    h3c->total_bytes += len;

    if (ctrl->send(ctrl, frame, len) == (ssize_t) len) {
        return NGX_OK;
    }

    ngx_log_error(NGX_LOG_ERR, c->log, 0, "failed to send goaway");

    ngx_http_v3_finalize_connection(c, NGX_HTTP_V3_ERR_EXCESSIVE_LOAD,
                                    "failed to send goaway");
    ngx_http_v3_close_uni_stream(ctrl);

    return NGX_ERROR;
}


/*
 * Sends a section acknowledgement for stream_id on the server decoder
 * stream.  The instruction is the stream id written as a prefix integer
 * with a 7-bit prefix into a first byte preset to 0x80.
 *
 * Returns NGX_OK on success.  Returns NGX_ERROR when the decoder stream is
 * unavailable, or when the instruction could not be sent in full; in the
 * latter case the connection is finalized with
 * NGX_HTTP_V3_ERR_EXCESSIVE_LOAD and the decoder stream is closed.
 */
ngx_int_t
ngx_http_v3_send_ack_section(ngx_connection_t *c, ngx_uint_t stream_id)
{
    u_char                  insn[NGX_HTTP_V3_PREFIX_INT_LEN];
    size_t                  len;
    ngx_connection_t       *decoder;
    ngx_http_v3_session_t  *h3c;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http3 send section acknowledgement %ui", stream_id);

    decoder = ngx_http_v3_get_uni_stream(c, NGX_HTTP_V3_STREAM_DECODER);
    if (decoder == NULL) {
        return NGX_ERROR;
    }

    /* the first byte is preset before the prefix integer is written */
    insn[0] = 0x80;
    len = (u_char *) ngx_http_v3_encode_prefix_int(insn, stream_id, 7) - insn;

    h3c = ngx_http_v3_get_session(c);
    h3c->total_bytes += len;

    if (decoder->send(decoder, insn, len) == (ssize_t) len) {
        return NGX_OK;
    }

    ngx_log_error(NGX_LOG_ERR, c->log, 0,
                  "failed to send section acknowledgement");

    ngx_http_v3_finalize_connection(c, NGX_HTTP_V3_ERR_EXCESSIVE_LOAD,
                                    "failed to send section acknowledgement");
    ngx_http_v3_close_uni_stream(decoder);

    return NGX_ERROR;
}


/*
 * Sends a stream cancellation for stream_id on the server decoder stream.
 * The instruction is the stream id written as a prefix integer with a
 * 6-bit prefix into a first byte preset to 0x40.
 *
 * Returns NGX_OK on success.  Returns NGX_ERROR when the decoder stream is
 * unavailable, or when the instruction could not be sent in full; in the
 * latter case the connection is finalized with
 * NGX_HTTP_V3_ERR_EXCESSIVE_LOAD and the decoder stream is closed.
 */
ngx_int_t
ngx_http_v3_send_cancel_stream(ngx_connection_t *c, ngx_uint_t stream_id)
{
    u_char                  insn[NGX_HTTP_V3_PREFIX_INT_LEN];
    size_t                  len;
    ngx_connection_t       *decoder;
    ngx_http_v3_session_t  *h3c;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http3 send stream cancellation %ui", stream_id);

    decoder = ngx_http_v3_get_uni_stream(c, NGX_HTTP_V3_STREAM_DECODER);
    if (decoder == NULL) {
        return NGX_ERROR;
    }

    /* the first byte is preset before the prefix integer is written */
    insn[0] = 0x40;
    len = (u_char *) ngx_http_v3_encode_prefix_int(insn, stream_id, 6) - insn;

    h3c = ngx_http_v3_get_session(c);
    h3c->total_bytes += len;

    if (decoder->send(decoder, insn, len) == (ssize_t) len) {
        return NGX_OK;
    }

    ngx_log_error(NGX_LOG_ERR, c->log, 0, "failed to send stream cancellation");

    ngx_http_v3_finalize_connection(c, NGX_HTTP_V3_ERR_EXCESSIVE_LOAD,
                                    "failed to send stream cancellation");
    ngx_http_v3_close_uni_stream(decoder);

    return NGX_ERROR;
}


/*
 * Sends an insert count increment of inc on the server decoder stream.
 * The instruction is the increment written as a prefix integer with a
 * 6-bit prefix into a first byte preset to 0.
 *
 * Returns NGX_OK on success.  Returns NGX_ERROR when the decoder stream is
 * unavailable, or when the instruction could not be sent in full; in the
 * latter case the connection is finalized with
 * NGX_HTTP_V3_ERR_EXCESSIVE_LOAD and the decoder stream is closed.
 */
ngx_int_t
ngx_http_v3_send_inc_insert_count(ngx_connection_t *c, ngx_uint_t inc)
{
    u_char                  insn[NGX_HTTP_V3_PREFIX_INT_LEN];
    size_t                  len;
    ngx_connection_t       *decoder;
    ngx_http_v3_session_t  *h3c;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http3 send insert count increment %ui", inc);

    decoder = ngx_http_v3_get_uni_stream(c, NGX_HTTP_V3_STREAM_DECODER);
    if (decoder == NULL) {
        return NGX_ERROR;
    }

    /* the first byte is preset before the prefix integer is written */
    insn[0] = 0;
    len = (u_char *) ngx_http_v3_encode_prefix_int(insn, inc, 6) - insn;

    h3c = ngx_http_v3_get_session(c);
    h3c->total_bytes += len;

    if (decoder->send(decoder, insn, len) == (ssize_t) len) {
        return NGX_OK;
    }

    ngx_log_error(NGX_LOG_ERR, c->log, 0,
                  "failed to send insert count increment");

    ngx_http_v3_finalize_connection(c, NGX_HTTP_V3_ERR_EXCESSIVE_LOAD,
                                    "failed to send insert count increment");
    ngx_http_v3_close_uni_stream(decoder);

    return NGX_ERROR;
}


/*
 * Records a new maximum push id for the session.
 *
 * The value (uint64_t) -1 in h3c->max_push_id is treated as "not set yet":
 * in that state any value is accepted.  Otherwise the new value must not be
 * lower than the stored one.
 *
 * Returns NGX_OK when the value is stored, NGX_HTTP_V3_ERR_ID_ERROR when it
 * is rejected.
 */
ngx_int_t
ngx_http_v3_set_max_push_id(ngx_connection_t *c, uint64_t max_push_id)
{
    ngx_http_v3_session_t  *session;

    session = ngx_http_v3_get_session(c);

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http3 MAX_PUSH_ID:%uL", max_push_id);

    if (session->max_push_id == (uint64_t) -1
        || max_push_id >= session->max_push_id)
    {
        session->max_push_id = max_push_id;
        return NGX_OK;
    }

    /* an attempt to lower the limit */
    return NGX_HTTP_V3_ERR_ID_ERROR;
}


/*
 * Handles a GOAWAY carrying push_id: the id is stored in the session's
 * goaway_push_id field.  Always returns NGX_OK.
 */
ngx_int_t
ngx_http_v3_goaway(ngx_connection_t *c, uint64_t push_id)
{
    ngx_http_v3_session_t  *session;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, "http3 GOAWAY:%uL", push_id);

    session = ngx_http_v3_get_session(c);
    session->goaway_push_id = push_id;

    return NGX_OK;
}


/*
 * Handles a CANCEL_PUSH for push_id.
 *
 * An id that is not below h3c->next_push_id is rejected with
 * NGX_HTTP_V3_ERR_ID_ERROR.  Otherwise the session's "pushing" queue is
 * searched for a push with this id; if one is found, its request is
 * finalized with NGX_HTTP_CLOSE.  NGX_OK is returned whether or not a
 * matching push was found.
 */
ngx_int_t
ngx_http_v3_cancel_push(ngx_connection_t *c, uint64_t push_id)
{
    ngx_queue_t            *q;
    ngx_http_request_t     *r;
    ngx_http_v3_push_t     *push;
    ngx_http_v3_session_t  *h3c;

    h3c = ngx_http_v3_get_session(c);

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http3 CANCEL_PUSH:%uL", push_id);

    if (push_id >= h3c->next_push_id) {
        return NGX_HTTP_V3_ERR_ID_ERROR;
    }

    /*
     * advance from the current node: stepping from the queue head instead
     * would return the first element forever and never reach the sentinel
     */

    for (q = ngx_queue_head(&h3c->pushing);
         q != ngx_queue_sentinel(&h3c->pushing);
         q = ngx_queue_next(q))
    {
        /* valid because "queue" is the first member of ngx_http_v3_push_t */
        push = (ngx_http_v3_push_t *) q;

        if (push->id != push_id) {
            continue;
        }

        r = push->connection->data;

        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "http3 cancel push");

        ngx_http_finalize_request(r, NGX_HTTP_CLOSE);

        break;
    }

    return NGX_OK;
}


/*
 * Handles a stream cancellation for stream_id.  The event is only written
 * to the debug log; no other action is taken and NGX_OK is always returned.
 */
ngx_int_t
ngx_http_v3_cancel_stream(ngx_connection_t *c, ngx_uint_t stream_id)
{
    /* dynamic tables are not used, so there is no state to release here */

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http3 cancel stream %ui",
                   stream_id);

    return NGX_OK;
}