comparison src/event/ngx_event_pipe.c @ 6443:fc72784b1f52

Threads: writing via threads pools in event pipe. The "aio_write" directive is introduced, which enables use of aio for writing. Currently it is meaningful only with "aio threads". Note that aio operations can be done by both event pipe and output chain, so proper mapping between r->aio and p->aio is provided when calling ngx_event_pipe() and in output filter. In collaboration with Valentin Bartenev.
author Maxim Dounin <mdounin@mdounin.ru>
date Fri, 18 Mar 2016 06:44:49 +0300
parents d811f22033ad
children 2cd019520210
comparison
equal deleted inserted replaced
6442:6e10518f95d8 6443:fc72784b1f52
110 110
111 if (p->upstream_eof || p->upstream_error || p->upstream_done) { 111 if (p->upstream_eof || p->upstream_error || p->upstream_done) {
112 return NGX_OK; 112 return NGX_OK;
113 } 113 }
114 114
115 #if (NGX_THREADS)
116 if (p->aio) {
117 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
118 "pipe read upstream: aio");
119 return NGX_AGAIN;
120 }
121 #endif
122
115 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, 123 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
116 "pipe read upstream: %d", p->upstream->read->ready); 124 "pipe read upstream: %d", p->upstream->read->ready);
117 125
118 for ( ;; ) { 126 for ( ;; ) {
119 127
254 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, 262 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
255 "pipe temp offset: %O", p->temp_file->offset); 263 "pipe temp offset: %O", p->temp_file->offset);
256 264
257 if (rc == NGX_BUSY) { 265 if (rc == NGX_BUSY) {
258 break; 266 break;
259 }
260
261 if (rc == NGX_AGAIN) {
262 if (ngx_event_flags & NGX_USE_LEVEL_EVENT
263 && p->upstream->read->active
264 && p->upstream->read->ready)
265 {
266 if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
267 == NGX_ERROR)
268 {
269 return NGX_ABORT;
270 }
271 }
272 } 267 }
273 268
274 if (rc != NGX_OK) { 269 if (rc != NGX_OK) {
275 return rc; 270 return rc;
276 } 271 }
473 if (p->cacheable && (p->in || p->buf_to_file)) { 468 if (p->cacheable && (p->in || p->buf_to_file)) {
474 469
475 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, 470 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
476 "pipe write chain"); 471 "pipe write chain");
477 472
478 if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) { 473 rc = ngx_event_pipe_write_chain_to_temp_file(p);
479 return NGX_ABORT; 474
475 if (rc != NGX_OK) {
476 return rc;
480 } 477 }
481 } 478 }
482 479
483 return NGX_OK; 480 return NGX_OK;
484 } 481 }
497 downstream = p->downstream; 494 downstream = p->downstream;
498 495
499 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, 496 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
500 "pipe write downstream: %d", downstream->write->ready); 497 "pipe write downstream: %d", downstream->write->ready);
501 498
499 #if (NGX_THREADS)
500
501 if (p->writing) {
502 rc = ngx_event_pipe_write_chain_to_temp_file(p);
503
504 if (rc == NGX_ABORT) {
505 return NGX_ABORT;
506 }
507 }
508
509 #endif
510
502 flushed = 0; 511 flushed = 0;
503 512
504 for ( ;; ) { 513 for ( ;; ) {
505 if (p->downstream_error) { 514 if (p->downstream_error) {
506 return ngx_event_pipe_drain_chains(p); 515 return ngx_event_pipe_drain_chains(p);
528 p->downstream_error = 1; 537 p->downstream_error = 1;
529 return ngx_event_pipe_drain_chains(p); 538 return ngx_event_pipe_drain_chains(p);
530 } 539 }
531 540
532 p->out = NULL; 541 p->out = NULL;
542 }
543
544 if (p->writing) {
545 break;
533 } 546 }
534 547
535 if (p->in) { 548 if (p->in) {
536 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, 549 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
537 "pipe write downstream flush in"); 550 "pipe write downstream flush in");
606 "recycled buffer in pipe out chain"); 619 "recycled buffer in pipe out chain");
607 } 620 }
608 621
609 p->out = p->out->next; 622 p->out = p->out->next;
610 623
611 } else if (!p->cacheable && p->in) { 624 } else if (!p->cacheable && !p->writing && p->in) {
612 cl = p->in; 625 cl = p->in;
613 626
614 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0, 627 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
615 "pipe write buf ls:%d %p %z", 628 "pipe write buf ls:%d %p %z",
616 cl->buf->last_shadow, 629 cl->buf->last_shadow,
708 ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p) 721 ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
709 { 722 {
710 ssize_t size, bsize, n; 723 ssize_t size, bsize, n;
711 ngx_buf_t *b; 724 ngx_buf_t *b;
712 ngx_uint_t prev_last_shadow; 725 ngx_uint_t prev_last_shadow;
713 ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free, fl; 726 ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free;
727
728 #if (NGX_THREADS)
729
730 if (p->writing) {
731
732 if (p->aio) {
733 return NGX_AGAIN;
734 }
735
736 out = p->writing;
737 p->writing = NULL;
738
739 n = ngx_write_chain_to_temp_file(p->temp_file, NULL);
740
741 if (n == NGX_ERROR) {
742 return NGX_ABORT;
743 }
744
745 goto done;
746 }
747
748 #endif
714 749
715 if (p->buf_to_file) { 750 if (p->buf_to_file) {
716 fl.buf = p->buf_to_file; 751 out = ngx_alloc_chain_link(p->pool);
717 fl.next = p->in; 752 if (out == NULL) {
718 out = &fl; 753 return NGX_ABORT;
754 }
755
756 out->buf = p->buf_to_file;
757 out->next = p->in;
719 758
720 } else { 759 } else {
721 out = p->in; 760 out = p->in;
722 } 761 }
723 762
773 } else { 812 } else {
774 p->in = NULL; 813 p->in = NULL;
775 p->last_in = &p->in; 814 p->last_in = &p->in;
776 } 815 }
777 816
817 #if (NGX_THREADS)
818 p->temp_file->thread_write = p->thread_handler ? 1 : 0;
819 p->temp_file->file.thread_task = p->thread_task;
820 p->temp_file->file.thread_handler = p->thread_handler;
821 p->temp_file->file.thread_ctx = p->thread_ctx;
822 #endif
823
778 n = ngx_write_chain_to_temp_file(p->temp_file, out); 824 n = ngx_write_chain_to_temp_file(p->temp_file, out);
779 825
780 if (n == NGX_ERROR) { 826 if (n == NGX_ERROR) {
781 return NGX_ABORT; 827 return NGX_ABORT;
782 } 828 }
829
830 #if (NGX_THREADS)
831
832 if (n == NGX_AGAIN) {
833 p->writing = out;
834 p->thread_task = p->temp_file->file.thread_task;
835 return NGX_AGAIN;
836 }
837
838 done:
839
840 #endif
783 841
784 if (p->buf_to_file) { 842 if (p->buf_to_file) {
785 p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos; 843 p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
786 n -= p->buf_to_file->last - p->buf_to_file->pos; 844 n -= p->buf_to_file->last - p->buf_to_file->pos;
787 p->buf_to_file = NULL; 845 p->buf_to_file = NULL;