Mercurial > hg > nginx
comparison src/http/ngx_http_upstream.c @ 6443:fc72784b1f52
Threads: writing via threads pools in event pipe.
The "aio_write" directive is introduced, which enables use of aio
for writing. Currently it is meaningful only with "aio threads".
Note that aio operations can be done by both event pipe and output
chain, so proper mapping between r->aio and p->aio is provided when
calling ngx_event_pipe() and in output filter.
In collaboration with Valentin Bartenev.
author | Maxim Dounin <mdounin@mdounin.ru> |
---|---|
date | Fri, 18 Mar 2016 06:44:49 +0300 |
parents | 81329f6a4254 |
children | 984687f25998 |
comparison
equal
deleted
inserted
replaced
6442:6e10518f95d8 | 6443:fc72784b1f52 |
---|---|
74 ngx_http_upstream_process_non_buffered_request(ngx_http_request_t *r, | 74 ngx_http_upstream_process_non_buffered_request(ngx_http_request_t *r, |
75 ngx_uint_t do_write); | 75 ngx_uint_t do_write); |
76 static ngx_int_t ngx_http_upstream_non_buffered_filter_init(void *data); | 76 static ngx_int_t ngx_http_upstream_non_buffered_filter_init(void *data); |
77 static ngx_int_t ngx_http_upstream_non_buffered_filter(void *data, | 77 static ngx_int_t ngx_http_upstream_non_buffered_filter(void *data, |
78 ssize_t bytes); | 78 ssize_t bytes); |
79 #if (NGX_THREADS) | |
80 static ngx_int_t ngx_http_upstream_thread_handler(ngx_thread_task_t *task, | |
81 ngx_file_t *file); | |
82 static void ngx_http_upstream_thread_event_handler(ngx_event_t *ev); | |
83 #endif | |
84 static ngx_int_t ngx_http_upstream_output_filter(void *data, | |
85 ngx_chain_t *chain); | |
79 static void ngx_http_upstream_process_downstream(ngx_http_request_t *r); | 86 static void ngx_http_upstream_process_downstream(ngx_http_request_t *r); |
80 static void ngx_http_upstream_process_upstream(ngx_http_request_t *r, | 87 static void ngx_http_upstream_process_upstream(ngx_http_request_t *r, |
81 ngx_http_upstream_t *u); | 88 ngx_http_upstream_t *u); |
82 static void ngx_http_upstream_process_request(ngx_http_request_t *r, | 89 static void ngx_http_upstream_process_request(ngx_http_request_t *r, |
83 ngx_http_upstream_t *u); | 90 ngx_http_upstream_t *u); |
2868 | 2875 |
2869 #endif | 2876 #endif |
2870 | 2877 |
2871 p = u->pipe; | 2878 p = u->pipe; |
2872 | 2879 |
2873 p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter; | 2880 p->output_filter = ngx_http_upstream_output_filter; |
2874 p->output_ctx = r; | 2881 p->output_ctx = r; |
2875 p->tag = u->output.tag; | 2882 p->tag = u->output.tag; |
2876 p->bufs = u->conf->bufs; | 2883 p->bufs = u->conf->bufs; |
2877 p->busy_size = u->conf->busy_buffers_size; | 2884 p->busy_size = u->conf->busy_buffers_size; |
2878 p->upstream = u->peer.connection; | 2885 p->upstream = u->peer.connection; |
2911 } | 2918 } |
2912 | 2919 |
2913 p->max_temp_file_size = u->conf->max_temp_file_size; | 2920 p->max_temp_file_size = u->conf->max_temp_file_size; |
2914 p->temp_file_write_size = u->conf->temp_file_write_size; | 2921 p->temp_file_write_size = u->conf->temp_file_write_size; |
2915 | 2922 |
2923 #if (NGX_THREADS) | |
2924 if (clcf->aio == NGX_HTTP_AIO_THREADS && clcf->aio_write) { | |
2925 p->thread_handler = ngx_http_upstream_thread_handler; | |
2926 p->thread_ctx = r; | |
2927 } | |
2928 #endif | |
2929 | |
2916 p->preread_bufs = ngx_alloc_chain_link(r->pool); | 2930 p->preread_bufs = ngx_alloc_chain_link(r->pool); |
2917 if (p->preread_bufs == NULL) { | 2931 if (p->preread_bufs == NULL) { |
2918 ngx_http_upstream_finalize_request(r, u, NGX_ERROR); | 2932 ngx_http_upstream_finalize_request(r, u, NGX_ERROR); |
2919 return; | 2933 return; |
2920 } | 2934 } |
3485 | 3499 |
3486 return NGX_OK; | 3500 return NGX_OK; |
3487 } | 3501 } |
3488 | 3502 |
3489 | 3503 |
3504 #if (NGX_THREADS) | |
3505 | |
3506 static ngx_int_t | |
3507 ngx_http_upstream_thread_handler(ngx_thread_task_t *task, ngx_file_t *file) | |
3508 { | |
3509 ngx_str_t name; | |
3510 ngx_event_pipe_t *p; | |
3511 ngx_thread_pool_t *tp; | |
3512 ngx_http_request_t *r; | |
3513 ngx_http_core_loc_conf_t *clcf; | |
3514 | |
3515 r = file->thread_ctx; | |
3516 p = r->upstream->pipe; | |
3517 | |
3518 clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); | |
3519 tp = clcf->thread_pool; | |
3520 | |
3521 if (tp == NULL) { | |
3522 if (ngx_http_complex_value(r, clcf->thread_pool_value, &name) | |
3523 != NGX_OK) | |
3524 { | |
3525 return NGX_ERROR; | |
3526 } | |
3527 | |
3528 tp = ngx_thread_pool_get((ngx_cycle_t *) ngx_cycle, &name); | |
3529 | |
3530 if (tp == NULL) { | |
3531 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, | |
3532 "thread pool \"%V\" not found", &name); | |
3533 return NGX_ERROR; | |
3534 } | |
3535 } | |
3536 | |
3537 task->event.data = r; | |
3538 task->event.handler = ngx_http_upstream_thread_event_handler; | |
3539 | |
3540 if (ngx_thread_task_post(tp, task) != NGX_OK) { | |
3541 return NGX_ERROR; | |
3542 } | |
3543 | |
3544 r->main->blocked++; | |
3545 r->aio = 1; | |
3546 p->aio = 1; | |
3547 | |
3548 return NGX_OK; | |
3549 } | |
3550 | |
3551 | |
3552 static void | |
3553 ngx_http_upstream_thread_event_handler(ngx_event_t *ev) | |
3554 { | |
3555 ngx_connection_t *c; | |
3556 ngx_http_request_t *r; | |
3557 | |
3558 r = ev->data; | |
3559 c = r->connection; | |
3560 | |
3561 ngx_http_set_log_request(c->log, r); | |
3562 | |
3563 ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, | |
3564 "http upstream thread: \"%V?%V\"", &r->uri, &r->args); | |
3565 | |
3566 r->main->blocked--; | |
3567 r->aio = 0; | |
3568 | |
3569 r->write_event_handler(r); | |
3570 | |
3571 ngx_http_run_posted_requests(c); | |
3572 } | |
3573 | |
3574 #endif | |
3575 | |
3576 | |
3577 static ngx_int_t | |
3578 ngx_http_upstream_output_filter(void *data, ngx_chain_t *chain) | |
3579 { | |
3580 ngx_int_t rc; | |
3581 ngx_event_pipe_t *p; | |
3582 ngx_http_request_t *r; | |
3583 | |
3584 r = data; | |
3585 p = r->upstream->pipe; | |
3586 | |
3587 rc = ngx_http_output_filter(r, chain); | |
3588 | |
3589 p->aio = r->aio; | |
3590 | |
3591 return rc; | |
3592 } | |
3593 | |
3594 | |
3490 static void | 3595 static void |
3491 ngx_http_upstream_process_downstream(ngx_http_request_t *r) | 3596 ngx_http_upstream_process_downstream(ngx_http_request_t *r) |
3492 { | 3597 { |
3493 ngx_event_t *wev; | 3598 ngx_event_t *wev; |
3494 ngx_connection_t *c; | 3599 ngx_connection_t *c; |
3503 ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, | 3608 ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, |
3504 "http upstream process downstream"); | 3609 "http upstream process downstream"); |
3505 | 3610 |
3506 c->log->action = "sending to client"; | 3611 c->log->action = "sending to client"; |
3507 | 3612 |
3613 #if (NGX_THREADS) | |
3614 p->aio = r->aio; | |
3615 #endif | |
3616 | |
3508 if (wev->timedout) { | 3617 if (wev->timedout) { |
3509 | 3618 |
3510 if (wev->delayed) { | 3619 if (wev->delayed) { |
3511 | 3620 |
3512 wev->timedout = 0; | 3621 wev->timedout = 0; |
3631 { | 3740 { |
3632 ngx_temp_file_t *tf; | 3741 ngx_temp_file_t *tf; |
3633 ngx_event_pipe_t *p; | 3742 ngx_event_pipe_t *p; |
3634 | 3743 |
3635 p = u->pipe; | 3744 p = u->pipe; |
3745 | |
3746 #if (NGX_THREADS) | |
3747 if (p->writing) { | |
3748 return; | |
3749 } | |
3750 #endif | |
3636 | 3751 |
3637 if (u->peer.connection) { | 3752 if (u->peer.connection) { |
3638 | 3753 |
3639 if (u->store) { | 3754 if (u->store) { |
3640 | 3755 |