comparison src/http/ngx_http_upstream.c @ 6443:fc72784b1f52

Threads: writing via threads pools in event pipe. The "aio_write" directive is introduced, which enables use of aio for writing. Currently it is meaningful only with "aio threads". Note that aio operations can be done by both event pipe and output chain, so proper mapping between r->aio and p->aio is provided when calling ngx_event_pipe() and in output filter. In collaboration with Valentin Bartenev.
author Maxim Dounin <mdounin@mdounin.ru>
date Fri, 18 Mar 2016 06:44:49 +0300
parents 81329f6a4254
children 984687f25998
comparison
equal deleted inserted replaced
6442:6e10518f95d8 6443:fc72784b1f52
74 ngx_http_upstream_process_non_buffered_request(ngx_http_request_t *r, 74 ngx_http_upstream_process_non_buffered_request(ngx_http_request_t *r,
75 ngx_uint_t do_write); 75 ngx_uint_t do_write);
76 static ngx_int_t ngx_http_upstream_non_buffered_filter_init(void *data); 76 static ngx_int_t ngx_http_upstream_non_buffered_filter_init(void *data);
77 static ngx_int_t ngx_http_upstream_non_buffered_filter(void *data, 77 static ngx_int_t ngx_http_upstream_non_buffered_filter(void *data,
78 ssize_t bytes); 78 ssize_t bytes);
79 #if (NGX_THREADS)
80 static ngx_int_t ngx_http_upstream_thread_handler(ngx_thread_task_t *task,
81 ngx_file_t *file);
82 static void ngx_http_upstream_thread_event_handler(ngx_event_t *ev);
83 #endif
84 static ngx_int_t ngx_http_upstream_output_filter(void *data,
85 ngx_chain_t *chain);
79 static void ngx_http_upstream_process_downstream(ngx_http_request_t *r); 86 static void ngx_http_upstream_process_downstream(ngx_http_request_t *r);
80 static void ngx_http_upstream_process_upstream(ngx_http_request_t *r, 87 static void ngx_http_upstream_process_upstream(ngx_http_request_t *r,
81 ngx_http_upstream_t *u); 88 ngx_http_upstream_t *u);
82 static void ngx_http_upstream_process_request(ngx_http_request_t *r, 89 static void ngx_http_upstream_process_request(ngx_http_request_t *r,
83 ngx_http_upstream_t *u); 90 ngx_http_upstream_t *u);
2868 2875
2869 #endif 2876 #endif
2870 2877
2871 p = u->pipe; 2878 p = u->pipe;
2872 2879
2873 p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter; 2880 p->output_filter = ngx_http_upstream_output_filter;
2874 p->output_ctx = r; 2881 p->output_ctx = r;
2875 p->tag = u->output.tag; 2882 p->tag = u->output.tag;
2876 p->bufs = u->conf->bufs; 2883 p->bufs = u->conf->bufs;
2877 p->busy_size = u->conf->busy_buffers_size; 2884 p->busy_size = u->conf->busy_buffers_size;
2878 p->upstream = u->peer.connection; 2885 p->upstream = u->peer.connection;
2911 } 2918 }
2912 2919
2913 p->max_temp_file_size = u->conf->max_temp_file_size; 2920 p->max_temp_file_size = u->conf->max_temp_file_size;
2914 p->temp_file_write_size = u->conf->temp_file_write_size; 2921 p->temp_file_write_size = u->conf->temp_file_write_size;
2915 2922
2923 #if (NGX_THREADS)
2924 if (clcf->aio == NGX_HTTP_AIO_THREADS && clcf->aio_write) {
2925 p->thread_handler = ngx_http_upstream_thread_handler;
2926 p->thread_ctx = r;
2927 }
2928 #endif
2929
2916 p->preread_bufs = ngx_alloc_chain_link(r->pool); 2930 p->preread_bufs = ngx_alloc_chain_link(r->pool);
2917 if (p->preread_bufs == NULL) { 2931 if (p->preread_bufs == NULL) {
2918 ngx_http_upstream_finalize_request(r, u, NGX_ERROR); 2932 ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
2919 return; 2933 return;
2920 } 2934 }
3485 3499
3486 return NGX_OK; 3500 return NGX_OK;
3487 } 3501 }
3488 3502
3489 3503
3504 #if (NGX_THREADS)
3505
3506 static ngx_int_t
3507 ngx_http_upstream_thread_handler(ngx_thread_task_t *task, ngx_file_t *file)
3508 {
3509 ngx_str_t name;
3510 ngx_event_pipe_t *p;
3511 ngx_thread_pool_t *tp;
3512 ngx_http_request_t *r;
3513 ngx_http_core_loc_conf_t *clcf;
3514
3515 r = file->thread_ctx;
3516 p = r->upstream->pipe;
3517
3518 clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
3519 tp = clcf->thread_pool;
3520
3521 if (tp == NULL) {
3522 if (ngx_http_complex_value(r, clcf->thread_pool_value, &name)
3523 != NGX_OK)
3524 {
3525 return NGX_ERROR;
3526 }
3527
3528 tp = ngx_thread_pool_get((ngx_cycle_t *) ngx_cycle, &name);
3529
3530 if (tp == NULL) {
3531 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
3532 "thread pool \"%V\" not found", &name);
3533 return NGX_ERROR;
3534 }
3535 }
3536
3537 task->event.data = r;
3538 task->event.handler = ngx_http_upstream_thread_event_handler;
3539
3540 if (ngx_thread_task_post(tp, task) != NGX_OK) {
3541 return NGX_ERROR;
3542 }
3543
3544 r->main->blocked++;
3545 r->aio = 1;
3546 p->aio = 1;
3547
3548 return NGX_OK;
3549 }
3550
3551
3552 static void
3553 ngx_http_upstream_thread_event_handler(ngx_event_t *ev)
3554 {
3555 ngx_connection_t *c;
3556 ngx_http_request_t *r;
3557
3558 r = ev->data;
3559 c = r->connection;
3560
3561 ngx_http_set_log_request(c->log, r);
3562
3563 ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
3564 "http upstream thread: \"%V?%V\"", &r->uri, &r->args);
3565
3566 r->main->blocked--;
3567 r->aio = 0;
3568
3569 r->write_event_handler(r);
3570
3571 ngx_http_run_posted_requests(c);
3572 }
3573
3574 #endif
3575
3576
3577 static ngx_int_t
3578 ngx_http_upstream_output_filter(void *data, ngx_chain_t *chain)
3579 {
3580 ngx_int_t rc;
3581 ngx_event_pipe_t *p;
3582 ngx_http_request_t *r;
3583
3584 r = data;
3585 p = r->upstream->pipe;
3586
3587 rc = ngx_http_output_filter(r, chain);
3588
3589 p->aio = r->aio;
3590
3591 return rc;
3592 }
3593
3594
3490 static void 3595 static void
3491 ngx_http_upstream_process_downstream(ngx_http_request_t *r) 3596 ngx_http_upstream_process_downstream(ngx_http_request_t *r)
3492 { 3597 {
3493 ngx_event_t *wev; 3598 ngx_event_t *wev;
3494 ngx_connection_t *c; 3599 ngx_connection_t *c;
3503 ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, 3608 ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
3504 "http upstream process downstream"); 3609 "http upstream process downstream");
3505 3610
3506 c->log->action = "sending to client"; 3611 c->log->action = "sending to client";
3507 3612
3613 #if (NGX_THREADS)
3614 p->aio = r->aio;
3615 #endif
3616
3508 if (wev->timedout) { 3617 if (wev->timedout) {
3509 3618
3510 if (wev->delayed) { 3619 if (wev->delayed) {
3511 3620
3512 wev->timedout = 0; 3621 wev->timedout = 0;
3631 { 3740 {
3632 ngx_temp_file_t *tf; 3741 ngx_temp_file_t *tf;
3633 ngx_event_pipe_t *p; 3742 ngx_event_pipe_t *p;
3634 3743
3635 p = u->pipe; 3744 p = u->pipe;
3745
3746 #if (NGX_THREADS)
3747 if (p->writing) {
3748 return;
3749 }
3750 #endif
3636 3751
3637 if (u->peer.connection) { 3752 if (u->peer.connection) {
3638 3753
3639 if (u->store) { 3754 if (u->store) {
3640 3755