diff src/event/ngx_event_pipe.c @ 6443:fc72784b1f52

Threads: writing via threads pools in event pipe. The "aio_write" directive is introduced, which enables use of aio for writing. Currently it is meaningful only with "aio threads". Note that aio operations can be done by both event pipe and output chain, so proper mapping between r->aio and p->aio is provided when calling ngx_event_pipe() and in output filter. In collaboration with Valentin Bartenev.
author Maxim Dounin <mdounin@mdounin.ru>
date Fri, 18 Mar 2016 06:44:49 +0300
parents d811f22033ad
children 2cd019520210
line wrap: on
line diff
--- a/src/event/ngx_event_pipe.c
+++ b/src/event/ngx_event_pipe.c
@@ -112,6 +112,14 @@ ngx_event_pipe_read_upstream(ngx_event_p
         return NGX_OK;
     }
 
+#if (NGX_THREADS)
+    if (p->aio) {
+        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
+                       "pipe read upstream: aio");
+        return NGX_AGAIN;
+    }
+#endif
+
     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    "pipe read upstream: %d", p->upstream->read->ready);
 
@@ -258,19 +266,6 @@ ngx_event_pipe_read_upstream(ngx_event_p
                     break;
                 }
 
-                if (rc == NGX_AGAIN) {
-                    if (ngx_event_flags & NGX_USE_LEVEL_EVENT
-                        && p->upstream->read->active
-                        && p->upstream->read->ready)
-                    {
-                        if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
-                            == NGX_ERROR)
-                        {
-                            return NGX_ABORT;
-                        }
-                    }
-                }
-
                 if (rc != NGX_OK) {
                     return rc;
                 }
@@ -475,8 +470,10 @@ ngx_event_pipe_read_upstream(ngx_event_p
         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                        "pipe write chain");
 
-        if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
-            return NGX_ABORT;
+        rc = ngx_event_pipe_write_chain_to_temp_file(p);
+
+        if (rc != NGX_OK) {
+            return rc;
         }
     }
 
@@ -499,6 +496,18 @@ ngx_event_pipe_write_to_downstream(ngx_e
     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    "pipe write downstream: %d", downstream->write->ready);
 
+#if (NGX_THREADS)
+
+    if (p->writing) {
+        rc = ngx_event_pipe_write_chain_to_temp_file(p);
+
+        if (rc == NGX_ABORT) {
+            return NGX_ABORT;
+        }
+    }
+
+#endif
+
     flushed = 0;
 
     for ( ;; ) {
@@ -532,6 +541,10 @@ ngx_event_pipe_write_to_downstream(ngx_e
                 p->out = NULL;
             }
 
+            if (p->writing) {
+                break;
+            }
+
             if (p->in) {
                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                                "pipe write downstream flush in");
@@ -608,7 +621,7 @@ ngx_event_pipe_write_to_downstream(ngx_e
 
                 p->out = p->out->next;
 
-            } else if (!p->cacheable && p->in) {
+            } else if (!p->cacheable && !p->writing && p->in) {
                 cl = p->in;
 
                 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
@@ -710,12 +723,38 @@ ngx_event_pipe_write_chain_to_temp_file(
     ssize_t       size, bsize, n;
     ngx_buf_t    *b;
     ngx_uint_t    prev_last_shadow;
-    ngx_chain_t  *cl, *tl, *next, *out, **ll, **last_out, **last_free, fl;
+    ngx_chain_t  *cl, *tl, *next, *out, **ll, **last_out, **last_free;
+
+#if (NGX_THREADS)
+
+    if (p->writing) {
+
+        if (p->aio) {
+            return NGX_AGAIN;
+        }
+
+        out = p->writing;
+        p->writing = NULL;
+
+        n = ngx_write_chain_to_temp_file(p->temp_file, NULL);
+
+        if (n == NGX_ERROR) {
+            return NGX_ABORT;
+        }
+
+        goto done;
+    }
+
+#endif
 
     if (p->buf_to_file) {
-        fl.buf = p->buf_to_file;
-        fl.next = p->in;
-        out = &fl;
+        out = ngx_alloc_chain_link(p->pool);
+        if (out == NULL) {
+            return NGX_ABORT;
+        }
+
+        out->buf = p->buf_to_file;
+        out->next = p->in;
 
     } else {
         out = p->in;
@@ -775,12 +814,31 @@ ngx_event_pipe_write_chain_to_temp_file(
         p->last_in = &p->in;
     }
 
+#if (NGX_THREADS)
+    p->temp_file->thread_write = p->thread_handler ? 1 : 0;
+    p->temp_file->file.thread_task = p->thread_task;
+    p->temp_file->file.thread_handler = p->thread_handler;
+    p->temp_file->file.thread_ctx = p->thread_ctx;
+#endif
+
     n = ngx_write_chain_to_temp_file(p->temp_file, out);
 
     if (n == NGX_ERROR) {
         return NGX_ABORT;
     }
 
+#if (NGX_THREADS)
+
+    if (n == NGX_AGAIN) {
+        p->writing = out;
+        p->thread_task = p->temp_file->file.thread_task;
+        return NGX_AGAIN;
+    }
+
+done:
+
+#endif
+
     if (p->buf_to_file) {
         p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
         n -= p->buf_to_file->last - p->buf_to_file->pos;