# HG changeset patch # User Valentin Bartenev # Date 1426343845 -10800 # Node ID 1fdba317ee6de901cb90957532b12d8ff3e84f93 # Parent 117c77b22db1d418c7fffbfc733efcb6f216dd04 Added support for offloading read() in thread pools. diff --git a/src/core/ngx_buf.h b/src/core/ngx_buf.h --- a/src/core/ngx_buf.h +++ b/src/core/ngx_buf.h @@ -90,15 +90,23 @@ struct ngx_output_chain_ctx_s { #endif unsigned need_in_memory:1; unsigned need_in_temp:1; -#if (NGX_HAVE_FILE_AIO) +#if (NGX_HAVE_FILE_AIO || NGX_THREADS) unsigned aio:1; +#endif +#if (NGX_HAVE_FILE_AIO) ngx_output_chain_aio_pt aio_handler; #if (NGX_HAVE_AIO_SENDFILE) ssize_t (*aio_preload)(ngx_buf_t *file); #endif #endif +#if (NGX_THREADS) + ngx_int_t (*thread_handler)(ngx_thread_task_t *task, + ngx_file_t *file); + ngx_thread_task_t *thread_task; +#endif + off_t alignment; ngx_pool_t *pool; diff --git a/src/core/ngx_file.h b/src/core/ngx_file.h --- a/src/core/ngx_file.h +++ b/src/core/ngx_file.h @@ -23,6 +23,12 @@ struct ngx_file_s { ngx_log_t *log; +#if (NGX_THREADS) + ngx_int_t (*thread_handler)(ngx_thread_task_t *task, + ngx_file_t *file); + void *thread_ctx; +#endif + #if (NGX_HAVE_FILE_AIO) ngx_event_aio_t *aio; #endif diff --git a/src/core/ngx_output_chain.c b/src/core/ngx_output_chain.c --- a/src/core/ngx_output_chain.c +++ b/src/core/ngx_output_chain.c @@ -50,7 +50,7 @@ ngx_output_chain(ngx_output_chain_ctx_t ngx_chain_t *cl, *out, **last_out; if (ctx->in == NULL && ctx->busy == NULL -#if (NGX_HAVE_FILE_AIO) +#if (NGX_HAVE_FILE_AIO || NGX_THREADS) && !ctx->aio #endif ) @@ -89,7 +89,7 @@ ngx_output_chain(ngx_output_chain_ctx_t for ( ;; ) { -#if (NGX_HAVE_FILE_AIO) +#if (NGX_HAVE_FILE_AIO || NGX_THREADS) if (ctx->aio) { return NGX_AGAIN; } @@ -233,6 +233,13 @@ ngx_output_chain_as_is(ngx_output_chain_ return 1; } +#if (NGX_THREADS) + if (buf->in_file) { + buf->file->thread_handler = ctx->thread_handler; + buf->file->thread_ctx = ctx->filter_ctx; + } +#endif + if (buf->in_file && buf->file->directio) { return 0; } @@ -559,7 +566,6 @@ ngx_output_chain_copy_buf(ngx_output_cha #endif #if (NGX_HAVE_FILE_AIO) - if (ctx->aio_handler) { n = ngx_file_aio_read(src->file, dst->pos, (size_t) size, src->file_pos, ctx->pool); @@ -568,15 +574,23 @@ ngx_output_chain_copy_buf(ngx_output_cha return NGX_AGAIN; } - } else { + } else +#endif +#if (NGX_THREADS) + if (src->file->thread_handler) { + n = ngx_thread_read(&ctx->thread_task, src->file, dst->pos, + (size_t) size, src->file_pos, ctx->pool); + if (n == NGX_AGAIN) { + ctx->aio = 1; + return NGX_AGAIN; + } + + } else +#endif + { n = ngx_read_file(src->file, dst->pos, (size_t) size, src->file_pos); } -#else - - n = ngx_read_file(src->file, dst->pos, (size_t) size, src->file_pos); - -#endif #if (NGX_HAVE_ALIGNED_DIRECTIO) diff --git a/src/http/ngx_http_copy_filter_module.c b/src/http/ngx_http_copy_filter_module.c --- a/src/http/ngx_http_copy_filter_module.c +++ b/src/http/ngx_http_copy_filter_module.c @@ -24,6 +24,11 @@ static ssize_t ngx_http_copy_aio_sendfil static void ngx_http_copy_aio_sendfile_event_handler(ngx_event_t *ev); #endif #endif +#if (NGX_THREADS) +static ngx_int_t ngx_http_copy_thread_handler(ngx_thread_task_t *task, + ngx_file_t *file); +static void ngx_http_copy_thread_event_handler(ngx_event_t *ev); +#endif static void *ngx_http_copy_filter_create_conf(ngx_conf_t *cf); static char *ngx_http_copy_filter_merge_conf(ngx_conf_t *cf, @@ -121,7 +126,7 @@ ngx_http_copy_filter(ngx_http_request_t ctx->filter_ctx = r; #if (NGX_HAVE_FILE_AIO) - if (ngx_file_aio && clcf->aio) { + if (ngx_file_aio && clcf->aio == NGX_HTTP_AIO_ON) { ctx->aio_handler = ngx_http_copy_aio_handler; #if (NGX_HAVE_AIO_SENDFILE) ctx->aio_preload = ngx_http_copy_aio_sendfile_preload; @@ -129,12 +134,18 @@ ngx_http_copy_filter(ngx_http_request_t } #endif +#if (NGX_THREADS) + if (clcf->aio == NGX_HTTP_AIO_THREADS) { + ctx->thread_handler = ngx_http_copy_thread_handler; + } +#endif + if (in && in->buf && ngx_buf_size(in->buf)) { r->request_output = 1; } } -#if (NGX_HAVE_FILE_AIO) +#if (NGX_HAVE_FILE_AIO || NGX_THREADS) ctx->aio = r->aio; #endif @@ -233,6 +244,67 @@ ngx_http_copy_aio_sendfile_event_handler #endif +#if (NGX_THREADS) + +static ngx_int_t +ngx_http_copy_thread_handler(ngx_thread_task_t *task, ngx_file_t *file) +{ + ngx_str_t name; + ngx_thread_pool_t *tp; + ngx_http_request_t *r; + ngx_http_core_loc_conf_t *clcf; + + r = file->thread_ctx; + + clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); + tp = clcf->thread_pool; + + if (tp == NULL) { + if (ngx_http_complex_value(r, clcf->thread_pool_value, &name) + != NGX_OK) + { + return NGX_ERROR; + } + + tp = ngx_thread_pool_get((ngx_cycle_t *) ngx_cycle, &name); + + if (tp == NULL) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "thread pool \"%V\" not found", &name); + return NGX_ERROR; + } + } + + task->event.data = r; + task->event.handler = ngx_http_copy_thread_event_handler; + + if (ngx_thread_task_post(tp, task) != NGX_OK) { + return NGX_ERROR; + } + + r->main->blocked++; + r->aio = 1; + + return NGX_OK; +} + + +static void +ngx_http_copy_thread_event_handler(ngx_event_t *ev) +{ + ngx_http_request_t *r; + + r = ev->data; + + r->main->blocked--; + r->aio = 0; + + r->connection->write->handler(r->connection->write); +} + +#endif + + static void * ngx_http_copy_filter_create_conf(ngx_conf_t *cf) { diff --git a/src/http/ngx_http_core_module.c b/src/http/ngx_http_core_module.c --- a/src/http/ngx_http_core_module.c +++ b/src/http/ngx_http_core_module.c @@ -3624,6 +3624,10 @@ ngx_http_core_create_loc_conf(ngx_conf_t clcf->sendfile = NGX_CONF_UNSET; clcf->sendfile_max_chunk = NGX_CONF_UNSET_SIZE; clcf->aio = NGX_CONF_UNSET; +#if (NGX_THREADS) + clcf->thread_pool = NGX_CONF_UNSET_PTR; + clcf->thread_pool_value = NGX_CONF_UNSET_PTR; +#endif clcf->read_ahead = NGX_CONF_UNSET_SIZE; clcf->directio = NGX_CONF_UNSET; clcf->directio_alignment = NGX_CONF_UNSET; @@ -3839,7 +3843,14 @@ ngx_http_core_merge_loc_conf(ngx_conf_t ngx_conf_merge_value(conf->sendfile, prev->sendfile, 0); ngx_conf_merge_size_value(conf->sendfile_max_chunk, prev->sendfile_max_chunk, 0); +#if (NGX_HAVE_FILE_AIO || NGX_THREADS) ngx_conf_merge_value(conf->aio, prev->aio, NGX_HTTP_AIO_OFF); +#endif +#if (NGX_THREADS) + ngx_conf_merge_ptr_value(conf->thread_pool, prev->thread_pool, NULL); + ngx_conf_merge_ptr_value(conf->thread_pool_value, prev->thread_pool_value, + NULL); +#endif ngx_conf_merge_size_value(conf->read_ahead, prev->read_ahead, 0); ngx_conf_merge_off_value(conf->directio, prev->directio, NGX_OPEN_FILE_DIRECTIO_OFF); @@ -4644,6 +4655,11 @@ ngx_http_core_set_aio(ngx_conf_t *cf, ng return "is duplicate"; } +#if (NGX_THREADS) + clcf->thread_pool = NULL; + clcf->thread_pool_value = NULL; +#endif + value = cf->args->elts; if (ngx_strcmp(value[1].data, "off") == 0) { @@ -4676,6 +4692,64 @@ ngx_http_core_set_aio(ngx_conf_t *cf, ng #endif + if (ngx_strncmp(value[1].data, "threads", 7) == 0 + && (value[1].len == 7 || value[1].data[7] == '=')) + { +#if (NGX_THREADS) + ngx_str_t name; + ngx_thread_pool_t *tp; + ngx_http_complex_value_t cv; + ngx_http_compile_complex_value_t ccv; + + clcf->aio = NGX_HTTP_AIO_THREADS; + + if (value[1].len >= 8) { + name.len = value[1].len - 8; + name.data = value[1].data + 8; + + ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t)); + + ccv.cf = cf; + ccv.value = &name; + ccv.complex_value = &cv; + + if (ngx_http_compile_complex_value(&ccv) != NGX_OK) { + return NGX_CONF_ERROR; + } + + if (cv.lengths != NULL) { + clcf->thread_pool_value = ngx_palloc(cf->pool, + sizeof(ngx_http_complex_value_t)); + if (clcf->thread_pool_value == NULL) { + return NGX_CONF_ERROR; + } + + *clcf->thread_pool_value = cv; + + return NGX_CONF_OK; + } + + tp = ngx_thread_pool_add(cf, &name); + + } else { + tp = ngx_thread_pool_add(cf, NULL); + } + + if (tp == NULL) { + return NGX_CONF_ERROR; + } + + clcf->thread_pool = tp; + + return NGX_CONF_OK; +#else + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "\"aio threads\" " + "is unsupported on this platform"); + return NGX_CONF_ERROR; +#endif + } + return "invalid value"; } diff --git a/src/http/ngx_http_core_module.h b/src/http/ngx_http_core_module.h --- a/src/http/ngx_http_core_module.h +++ b/src/http/ngx_http_core_module.h @@ -13,6 +13,10 @@ #include #include +#if (NGX_THREADS) +#include +#endif + #define NGX_HTTP_GZIP_PROXIED_OFF 0x0002 #define NGX_HTTP_GZIP_PROXIED_EXPIRED 0x0004 @@ -27,6 +31,7 @@ #define NGX_HTTP_AIO_OFF 0 #define NGX_HTTP_AIO_ON 1 +#define NGX_HTTP_AIO_THREADS 2 #define NGX_HTTP_SATISFY_ALL 0 @@ -421,6 +426,11 @@ struct ngx_http_core_loc_conf_s { #endif #endif +#if (NGX_THREADS) + ngx_thread_pool_t *thread_pool; + ngx_http_complex_value_t *thread_pool_value; +#endif + #if (NGX_HAVE_OPENAT) ngx_uint_t disable_symlinks; /* disable_symlinks */ ngx_http_complex_value_t *disable_symlinks_from; diff --git a/src/http/ngx_http_file_cache.c b/src/http/ngx_http_file_cache.c --- a/src/http/ngx_http_file_cache.c +++ b/src/http/ngx_http_file_cache.c @@ -646,7 +646,7 @@ ngx_http_file_cache_aio_read(ngx_http_re clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); - if (!clcf->aio) { + if (clcf->aio != NGX_HTTP_AIO_ON) { goto noaio; } diff --git a/src/os/unix/ngx_files.c b/src/os/unix/ngx_files.c --- a/src/os/unix/ngx_files.c +++ b/src/os/unix/ngx_files.c @@ -9,6 +9,12 @@ #include +#if (NGX_THREADS) +#include +static void ngx_thread_read_handler(void *data, ngx_log_t *log); +#endif + + #if (NGX_HAVE_FILE_AIO) ngx_uint_t ngx_file_aio = 1; @@ -64,6 +70,109 @@ ngx_read_file(ngx_file_t *file, u_char * } +#if (NGX_THREADS) + +typedef struct { + ngx_fd_t fd; + u_char *buf; + size_t size; + off_t offset; + + size_t read; + ngx_err_t err; +} ngx_thread_read_ctx_t; + + +ssize_t +ngx_thread_read(ngx_thread_task_t **taskp, ngx_file_t *file, u_char *buf, + size_t size, off_t offset, ngx_pool_t *pool) +{ + ngx_thread_task_t *task; + ngx_thread_read_ctx_t *ctx; + + ngx_log_debug4(NGX_LOG_DEBUG_CORE, file->log, 0, + "thread read: %d, %p, %uz, %O", + file->fd, buf, size, offset); + + task = *taskp; + + if (task == NULL) { + task = ngx_thread_task_alloc(pool, sizeof(ngx_thread_read_ctx_t)); + if (task == NULL) { + return NGX_ERROR; + } + + task->handler = ngx_thread_read_handler; + + *taskp = task; + } + + ctx = task->ctx; + + if (task->event.complete) { + task->event.complete = 0; + + if (ctx->err) { + ngx_log_error(NGX_LOG_CRIT, file->log, ctx->err, + "pread() \"%s\" failed", file->name.data); + return NGX_ERROR; + } + + return ctx->read; + } + + ctx->fd = file->fd; + ctx->buf = buf; + ctx->size = size; + ctx->offset = offset; + + if (file->thread_handler(task, file) != NGX_OK) { + return NGX_ERROR; + } + + return NGX_AGAIN; +} + + +#if (NGX_HAVE_PREAD) + +static void +ngx_thread_read_handler(void *data, ngx_log_t *log) +{ + ngx_thread_read_ctx_t *ctx = data; + + ssize_t n; + + ngx_log_debug0(NGX_LOG_DEBUG_CORE, log, 0, "thread read handler"); + + n = pread(ctx->fd, ctx->buf, ctx->size, ctx->offset); + + if (n == -1) { + ctx->err = ngx_errno; + + } else { + ctx->read = n; + ctx->err = 0; + } + +#if 0 + ngx_time_update(); +#endif + + ngx_log_debug4(NGX_LOG_DEBUG_CORE, log, 0, + "pread: %z (err: %i) of %uz @%O", + n, ctx->err, ctx->size, ctx->offset); +} + +#else + +#error pread() is required! + +#endif + +#endif /* NGX_THREADS */ + + ssize_t ngx_write_file(ngx_file_t *file, u_char *buf, size_t size, off_t offset) { diff --git a/src/os/unix/ngx_files.h b/src/os/unix/ngx_files.h --- a/src/os/unix/ngx_files.h +++ b/src/os/unix/ngx_files.h @@ -383,5 +383,10 @@ extern ngx_uint_t ngx_file_aio; #endif +#if (NGX_THREADS) +ssize_t ngx_thread_read(ngx_thread_task_t **taskp, ngx_file_t *file, + u_char *buf, size_t size, off_t offset, ngx_pool_t *pool); +#endif + #endif /* _NGX_FILES_H_INCLUDED_ */