changeset 7349:f6047a579ca1

gRPC: improved keepalive handling. The code is now able to parse additional control frames after the response is received, and can send control frames as well. This fixes keepalive problems as observed with grpc-c, which can send window update and ping frames after the response, see http://mailman.nginx.org/pipermail/nginx/2018-August/056620.html.
author Maxim Dounin <mdounin@mdounin.ru>
date Mon, 03 Sep 2018 19:34:01 +0300
parents f6e7831a17d4
children 67c6cb7f477c
files src/http/modules/ngx_http_grpc_module.c
diffstat 1 files changed, 67 insertions(+), 33 deletions(-) [+]
line wrap: on
line diff
--- a/src/http/modules/ngx_http_grpc_module.c
+++ b/src/http/modules/ngx_http_grpc_module.c
@@ -111,6 +111,7 @@ typedef struct {
     unsigned                   output_closed:1;
     unsigned                   parsing_headers:1;
     unsigned                   end_stream:1;
+    unsigned                   done:1;
     unsigned                   status:1;
 
     ngx_http_request_t        *request;
@@ -1074,6 +1075,7 @@ ngx_http_grpc_reinit_request(ngx_http_re
     ctx->output_closed = 0;
     ctx->parsing_headers = 0;
     ctx->end_stream = 0;
+    ctx->done = 0;
     ctx->status = 0;
     ctx->connection = NULL;
 
@@ -1093,6 +1095,7 @@ ngx_http_grpc_body_output_filter(void *d
     ngx_int_t               rc;
     ngx_uint_t              next, last;
     ngx_chain_t            *cl, *out, **ll;
+    ngx_http_upstream_t    *u;
     ngx_http_grpc_ctx_t    *ctx;
     ngx_http_grpc_frame_t  *f;
 
@@ -1407,6 +1410,28 @@ ngx_http_grpc_body_output_filter(void *d
         rc = NGX_AGAIN;
     }
 
+    if (ctx->done) {
+
+        /*
+         * We have already got the response and were sending some additional
+         * control frames.  Even if there is still something unsent, stop
+         * here anyway.
+         */
+
+        u = r->upstream;
+        u->length = 0;
+
+        if (ctx->in == NULL
+            && ctx->out == NULL
+            && ctx->output_closed
+            && ctx->state == ngx_http_grpc_st_start)
+        {
+            u->keepalive = 1;
+        }
+
+        ngx_post_event(u->peer.connection->read, &ngx_posted_events);
+    }
+
     return rc;
 }
 
@@ -1832,6 +1857,33 @@ ngx_http_grpc_filter(void *data, ssize_t
             rc = ngx_http_grpc_parse_frame(r, ctx, b);
 
             if (rc == NGX_AGAIN) {
+
+                if (ctx->done) {
+
+                    /*
+                     * We have finished parsing the response and the
+                     * remaining control frames.  If there are unsent
+                     * control frames, post a write event to send them.
+                     */
+
+                    if (ctx->out) {
+                        ngx_post_event(u->peer.connection->write,
+                                       &ngx_posted_events);
+                        return NGX_AGAIN;
+                    }
+
+                    u->length = 0;
+
+                    if (ctx->in == NULL
+                        && ctx->output_closed
+                        && ctx->state == ngx_http_grpc_st_start)
+                    {
+                        u->keepalive = 1;
+                    }
+
+                    break;
+                }
+
                 return NGX_AGAIN;
             }
 
@@ -1898,6 +1950,13 @@ ngx_http_grpc_filter(void *data, ssize_t
                 return NGX_ERROR;
             }
 
+            if (ctx->stream_id && ctx->done) {
+                ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+                              "upstream sent frame for closed stream %ui",
+                              ctx->stream_id);
+                return NGX_ERROR;
+            }
+
             ctx->padding = 0;
         }
 
@@ -1914,17 +1973,7 @@ ngx_http_grpc_filter(void *data, ssize_t
             ctx->state = ngx_http_grpc_st_start;
 
             if (ctx->flags & NGX_HTTP_V2_END_STREAM_FLAG) {
-                u->length = 0;
-
-                if (ctx->in == NULL
-                    && ctx->out == NULL
-                    && ctx->output_closed
-                    && b->last == b->pos)
-                {
-                    u->keepalive = 1;
-                }
-
-                break;
+                ctx->done = 1;
             }
 
             continue;
@@ -2094,17 +2143,8 @@ ngx_http_grpc_filter(void *data, ssize_t
                                    "grpc trailer done");
 
                     if (ctx->end_stream) {
-                        u->length = 0;
-
-                        if (ctx->in == NULL
-                            && ctx->out == NULL
-                            && ctx->output_closed
-                            && b->last == b->pos)
-                        {
-                            u->keepalive = 1;
-                        }
-
-                        return NGX_OK;
+                        ctx->done = 1;
+                        break;
                     }
 
                     ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
@@ -2121,6 +2161,10 @@ ngx_http_grpc_filter(void *data, ssize_t
                 return NGX_ERROR;
             }
 
+            if (rc == NGX_HTTP_PARSE_HEADER_DONE) {
+                continue;
+            }
+
             /* rc == NGX_AGAIN */
 
             if (ctx->rest == 0) {
@@ -2237,17 +2281,7 @@ ngx_http_grpc_filter(void *data, ssize_t
         ctx->state = ngx_http_grpc_st_start;
 
         if (ctx->flags & NGX_HTTP_V2_END_STREAM_FLAG) {
-            u->length = 0;
-
-            if (ctx->in == NULL
-                && ctx->out == NULL
-                && ctx->output_closed
-                && b->last == b->pos)
-            {
-                u->keepalive = 1;
-            }
-
-            break;
+            ctx->done = 1;
         }
     }