# HG changeset patch # User Valentin Bartenev # Date 1426343827 -10800 # Node ID 466bd63b63d1b104121526681ee507de3ccb1fa5 # Parent 83d54192e97b4472217f5c34eab821e2f2e45117 Thread pools implementation. diff --git a/auto/configure b/auto/configure --- a/auto/configure +++ b/auto/configure @@ -58,6 +58,7 @@ if [ "$NGX_PLATFORM" != win32 ]; then . auto/unix fi +. auto/threads . auto/modules . auto/lib/conf diff --git a/auto/modules b/auto/modules --- a/auto/modules +++ b/auto/modules @@ -432,6 +432,12 @@ fi modules="$CORE_MODULES $EVENT_MODULES" +# thread pool module should be initialized after events +if [ $USE_THREADS = YES ]; then + modules="$modules $THREAD_POOL_MODULE" +fi + + if [ $USE_OPENSSL = YES ]; then modules="$modules $OPENSSL_MODULE" CORE_DEPS="$CORE_DEPS $OPENSSL_DEPS" diff --git a/auto/options b/auto/options --- a/auto/options +++ b/auto/options @@ -190,6 +190,8 @@ do --without-poll_module) EVENT_POLL=NONE ;; --with-aio_module) EVENT_AIO=YES ;; + --with-threads) USE_THREADS=YES ;; + --with-file-aio) NGX_FILE_AIO=YES ;; --with-ipv6) NGX_IPV6=YES ;; @@ -351,6 +353,8 @@ cat << END --with-poll_module enable poll module --without-poll_module disable poll module + --with-threads enable thread pool support + --with-file-aio enable file AIO support --with-ipv6 enable IPv6 support diff --git a/auto/sources b/auto/sources --- a/auto/sources +++ b/auto/sources @@ -193,6 +193,13 @@ UNIX_SRCS="$CORE_SRCS $EVENT_SRCS \ POSIX_DEPS=src/os/unix/ngx_posix_config.h +THREAD_POOL_MODULE=ngx_thread_pool_module +THREAD_POOL_DEPS=src/core/ngx_thread_pool.h +THREAD_POOL_SRCS="src/core/ngx_thread_pool.c + src/os/unix/ngx_thread_cond.c + src/os/unix/ngx_thread_mutex.c + src/os/unix/ngx_thread_id.c" + FREEBSD_DEPS="src/os/unix/ngx_freebsd_config.h src/os/unix/ngx_freebsd.h" FREEBSD_SRCS=src/os/unix/ngx_freebsd_init.c FREEBSD_SENDFILE_SRCS=src/os/unix/ngx_freebsd_sendfile_chain.c diff --git a/auto/summary b/auto/summary --- a/auto/summary +++ b/auto/summary @@ -7,6 +7,10 @@ echo echo "Configuration summary" +if [ $USE_THREADS = YES ]; then + echo " + using threads" +fi + if [ $USE_PCRE = DISABLED ]; then echo " + PCRE library is disabled" diff --git a/auto/threads b/auto/threads new file mode 100644 --- /dev/null +++ b/auto/threads @@ -0,0 +1,20 @@ + +# Copyright (C) Nginx, Inc. + + +if [ $USE_THREADS = YES ]; then + + if [ "$NGX_PLATFORM" = win32 ]; then + cat << END + +$0: --with-threads is not supported on Windows + +END + exit 1 + fi + + have=NGX_THREADS . auto/have + CORE_DEPS="$CORE_DEPS $THREAD_POOL_DEPS" + CORE_SRCS="$CORE_SRCS $THREAD_POOL_SRCS" + CORE_LIBS="$CORE_LIBS -lpthread" +fi diff --git a/src/core/ngx_core.h b/src/core/ngx_core.h --- a/src/core/ngx_core.h +++ b/src/core/ngx_core.h @@ -22,6 +22,10 @@ typedef struct ngx_event_s ngx_eve typedef struct ngx_event_aio_s ngx_event_aio_t; typedef struct ngx_connection_s ngx_connection_t; +#if (NGX_THREADS) +typedef struct ngx_thread_task_s ngx_thread_task_t; +#endif + typedef void (*ngx_event_handler_pt)(ngx_event_t *ev); typedef void (*ngx_connection_handler_pt)(ngx_connection_t *c); diff --git a/src/core/ngx_thread_pool.c b/src/core/ngx_thread_pool.c new file mode 100644 --- /dev/null +++ b/src/core/ngx_thread_pool.c @@ -0,0 +1,631 @@ + +/* + * Copyright (C) Nginx, Inc. + * Copyright (C) Valentin V. Bartenev + * Copyright (C) Ruslan Ermilov + */ + + +#include +#include +#include + + +typedef struct { + ngx_array_t pools; +} ngx_thread_pool_conf_t; + + +typedef struct { + ngx_thread_mutex_t mtx; + ngx_uint_t count; + ngx_thread_task_t *first; + ngx_thread_task_t **last; +} ngx_thread_pool_queue_t; + + +struct ngx_thread_pool_s { + ngx_thread_cond_t cond; + + ngx_thread_pool_queue_t queue; + + ngx_log_t *log; + ngx_pool_t *pool; + + ngx_str_t name; + ngx_uint_t threads; + ngx_uint_t max_queue; + + u_char *file; + ngx_uint_t line; +}; + + +static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, + ngx_pool_t *pool); +static ngx_int_t ngx_thread_pool_queue_init(ngx_thread_pool_queue_t *queue, + ngx_log_t *log); +static ngx_int_t ngx_thread_pool_queue_destroy(ngx_thread_pool_queue_t *queue, + ngx_log_t *log); +static void ngx_thread_pool_destroy(ngx_thread_pool_t *tp); + +static void *ngx_thread_pool_cycle(void *data); +static void ngx_thread_pool_handler(ngx_event_t *ev); + +static char *ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); + +static void *ngx_thread_pool_create_conf(ngx_cycle_t *cycle); +static char *ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf); + +static ngx_int_t ngx_thread_pool_init_worker(ngx_cycle_t *cycle); +static void ngx_thread_pool_exit_worker(ngx_cycle_t *cycle); + + +static ngx_command_t ngx_thread_pool_commands[] = { + + { ngx_string("thread_pool"), + NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE23, + ngx_thread_pool, + 0, + 0, + NULL }, + + ngx_null_command +}; + + +static ngx_core_module_t ngx_thread_pool_module_ctx = { + ngx_string("thread_pool"), + ngx_thread_pool_create_conf, + ngx_thread_pool_init_conf +}; + + +ngx_module_t ngx_thread_pool_module = { + NGX_MODULE_V1, + &ngx_thread_pool_module_ctx, /* module context */ + ngx_thread_pool_commands, /* module directives */ + NGX_CORE_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + ngx_thread_pool_init_worker, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + ngx_thread_pool_exit_worker, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static ngx_str_t ngx_thread_pool_default = ngx_string("default"); + +static ngx_uint_t ngx_thread_pool_task_id; +static ngx_thread_pool_queue_t ngx_thread_pool_done; + + +static ngx_int_t +ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool) +{ + int err; + pthread_t tid; + ngx_uint_t n; + pthread_attr_t attr; + + if (ngx_notify == NULL) { + ngx_log_error(NGX_LOG_ALERT, log, 0, + "the configured event method cannot be used with thread pools"); + return NGX_ERROR; + } + + if (ngx_thread_pool_queue_init(&tp->queue, log) != NGX_OK) { + return NGX_ERROR; + } + + if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) { + (void) ngx_thread_pool_queue_destroy(&tp->queue, log); + return NGX_ERROR; + } + + tp->log = log; + tp->pool = pool; + + err = pthread_attr_init(&attr); + if (err) { + ngx_log_error(NGX_LOG_ALERT, log, err, + "pthread_attr_init() failed"); + return NGX_ERROR; + } + +#if 0 + err = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN); + if (err) { + ngx_log_error(NGX_LOG_ALERT, log, err, + "pthread_attr_setstacksize() failed"); + return NGX_ERROR; + } +#endif + + for (n = 0; n < tp->threads; n++) { + err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp); + if (err) { + ngx_log_error(NGX_LOG_ALERT, log, err, + "pthread_create() failed"); + return NGX_ERROR; + } + } + + (void) pthread_attr_destroy(&attr); + + return NGX_OK; +} + + +static ngx_int_t +ngx_thread_pool_queue_init(ngx_thread_pool_queue_t *queue, ngx_log_t *log) +{ + queue->count = 0; + queue->first = NULL; + queue->last = &queue->first; + + return ngx_thread_mutex_create(&queue->mtx, log); +} + + +static ngx_int_t +ngx_thread_pool_queue_destroy(ngx_thread_pool_queue_t *queue, ngx_log_t *log) +{ + return ngx_thread_mutex_destroy(&queue->mtx, log); +} + + +static void +ngx_thread_pool_destroy(ngx_thread_pool_t *tp) +{ + /* TODO: exit threads */ + + (void) ngx_thread_cond_destroy(&tp->cond, tp->log); + (void) ngx_thread_pool_queue_destroy(&tp->queue, tp->log); +} + + +ngx_thread_task_t * +ngx_thread_task_alloc(ngx_pool_t *pool, size_t size) +{ + ngx_thread_task_t *task; + + task = ngx_pcalloc(pool, sizeof(ngx_thread_task_t) + size); + if (task == NULL) { + return NULL; + } + + task->ctx = task + 1; + + return task; +} + + +ngx_int_t +ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task) +{ + if (task->event.active) { + ngx_log_error(NGX_LOG_ALERT, tp->log, 0, + "task #%ui already active", task->id); + return NGX_ERROR; + } + + if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) { + return NGX_ERROR; + } + + if (tp->queue.count >= tp->max_queue) { + (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); + + ngx_log_error(NGX_LOG_ERR, tp->log, 0, + "thread pool \"%V\" queue overflow: %ui tasks waiting", + &tp->name, tp->queue.count); + return NGX_ERROR; + } + + task->event.active = 1; + + task->id = ngx_thread_pool_task_id++; + task->next = NULL; + + if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) { + (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); + return NGX_ERROR; + } + + *tp->queue.last = task; + tp->queue.last = &task->next; + + tp->queue.count++; + + (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); + + ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, + "task #%ui added to thread pool \"%V\"", + task->id, &tp->name); + + return NGX_OK; +} + + +static void * +ngx_thread_pool_cycle(void *data) +{ + ngx_thread_pool_t *tp = data; + + int err; + sigset_t set; + ngx_thread_task_t *task; + +#if 0 + ngx_time_update(); +#endif + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, tp->log, 0, + "thread in pool \"%V\" started", &tp->name); + + sigfillset(&set); + + sigdelset(&set, SIGILL); + sigdelset(&set, SIGFPE); + sigdelset(&set, SIGSEGV); + sigdelset(&set, SIGBUS); + + err = pthread_sigmask(SIG_BLOCK, &set, NULL); + if (err) { + ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed"); + return NULL; + } + + for ( ;; ) { + if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) { + return NULL; + } + + while (tp->queue.count == 0) { + if (ngx_thread_cond_wait(&tp->cond, &tp->queue.mtx, tp->log) + != NGX_OK) + { + (void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log); + return NULL; + } + } + + tp->queue.count--; + + task = tp->queue.first; + tp->queue.first = task->next; + + if (tp->queue.first == NULL) { + tp->queue.last = &tp->queue.first; + } + + if (ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log) != NGX_OK) { + return NULL; + } + +#if 0 + ngx_time_update(); +#endif + + ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, + "run task #%ui in thread pool \"%V\"", + task->id, &tp->name); + + task->handler(task->ctx, tp->log); + + ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, + "complete task #%ui in thread pool \"%V\"", + task->id, &tp->name); + + task->next = NULL; + + if (ngx_thread_mutex_lock(&ngx_thread_pool_done.mtx, tp->log) + != NGX_OK) + { + return NULL; + } + + *ngx_thread_pool_done.last = task; + ngx_thread_pool_done.last = &task->next; + + if (ngx_thread_mutex_unlock(&ngx_thread_pool_done.mtx, tp->log) + != NGX_OK) + { + return NULL; + } + + (void) ngx_notify(ngx_thread_pool_handler); + } +} + + +static void +ngx_thread_pool_handler(ngx_event_t *ev) +{ + ngx_event_t *event; + ngx_thread_task_t *task; + + ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "thread pool handler"); + + if (ngx_thread_mutex_lock(&ngx_thread_pool_done.mtx, ev->log) != NGX_OK) { + return; + } + + task = ngx_thread_pool_done.first; + ngx_thread_pool_done.first = NULL; + ngx_thread_pool_done.last = &ngx_thread_pool_done.first; + + if (ngx_thread_mutex_unlock(&ngx_thread_pool_done.mtx, ev->log) != NGX_OK) { + return; + } + + while (task) { + ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, + "run completion handler for task #%ui", task->id); + + event = &task->event; + task = task->next; + + event->complete = 1; + event->active = 0; + + event->handler(event); + } +} + + +static void * +ngx_thread_pool_create_conf(ngx_cycle_t *cycle) +{ + ngx_thread_pool_conf_t *tcf; + + tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t)); + if (tcf == NULL) { + return NULL; + } + + if (ngx_array_init(&tcf->pools, cycle->pool, 4, + sizeof(ngx_thread_pool_t *)) + != NGX_OK) + { + return NULL; + } + + return tcf; +} + + +static char * +ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf) +{ + ngx_thread_pool_conf_t *tcf = conf; + + ngx_uint_t i; + ngx_thread_pool_t **tpp; + + tpp = tcf->pools.elts; + + for (i = 0; i < tcf->pools.nelts; i++) { + + if (tpp[i]->threads) { + continue; + } + + if (tpp[i]->name.len == ngx_thread_pool_default.len + && ngx_strncmp(tpp[i]->name.data, ngx_thread_pool_default.data, + ngx_thread_pool_default.len) + == 0) + { + tpp[i]->threads = 32; + tpp[i]->max_queue = 65536; + continue; + } + + ngx_log_error(NGX_LOG_EMERG, cycle->log, 0, + "unknown thread pool \"%V\" in %s:%ui", + &tpp[i]->name, tpp[i]->file, tpp[i]->line); + + return NGX_CONF_ERROR; + } + + return NGX_CONF_OK; +} + + +static char * +ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_str_t *value; + ngx_uint_t i; + ngx_thread_pool_t *tp; + + value = cf->args->elts; + + tp = ngx_thread_pool_add(cf, &value[1]); + + if (tp == NULL) { + return NGX_CONF_ERROR; + } + + if (tp->threads) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "duplicate thread pool \"%V\"", &tp->name); + return NGX_CONF_ERROR; + } + + tp->max_queue = 65536; + + for (i = 2; i < cf->args->nelts; i++) { + + if (ngx_strncmp(value[i].data, "threads=", 8) == 0) { + + tp->threads = ngx_atoi(value[i].data + 8, value[i].len - 8); + + if (tp->threads == (ngx_uint_t) NGX_ERROR || tp->threads == 0) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid threads value \"%V\"", &value[i]); + return NGX_CONF_ERROR; + } + + continue; + } + + if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) { + + tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10); + + if (tp->max_queue == (ngx_uint_t) NGX_ERROR) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid max_queue value \"%V\"", &value[i]); + return NGX_CONF_ERROR; + } + + continue; + } + } + + if (tp->threads == 0) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "\"%V\" must have \"threads\" parameter", + &cmd->name); + return NGX_CONF_ERROR; + } + + return NGX_CONF_OK; +} + + +ngx_thread_pool_t * +ngx_thread_pool_add(ngx_conf_t *cf, ngx_str_t *name) +{ + ngx_thread_pool_t *tp, **tpp; + ngx_thread_pool_conf_t *tcf; + + if (name == NULL) { + name = &ngx_thread_pool_default; + } + + tp = ngx_thread_pool_get(cf->cycle, name); + + if (tp) { + return tp; + } + + tp = ngx_pcalloc(cf->pool, sizeof(ngx_thread_pool_t)); + if (tp == NULL) { + return NULL; + } + + tp->name = *name; + tp->file = cf->conf_file->file.name.data; + tp->line = cf->conf_file->line; + + tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, + ngx_thread_pool_module); + + tpp = ngx_array_push(&tcf->pools); + if (tpp == NULL) { + return NULL; + } + + *tpp = tp; + + return tp; +} + + +ngx_thread_pool_t * +ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name) +{ + ngx_uint_t i; + ngx_thread_pool_t **tpp; + ngx_thread_pool_conf_t *tcf; + + tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx, + ngx_thread_pool_module); + + tpp = tcf->pools.elts; + + for (i = 0; i < tcf->pools.nelts; i++) { + + if (tpp[i]->name.len == name->len + && ngx_strncmp(tpp[i]->name.data, name->data, name->len) == 0) + { + return tpp[i]; + } + } + + return NULL; +} + + +static ngx_int_t +ngx_thread_pool_init_worker(ngx_cycle_t *cycle) +{ + ngx_uint_t i; + ngx_thread_pool_t **tpp; + ngx_thread_pool_conf_t *tcf; + + if (ngx_process != NGX_PROCESS_WORKER + && ngx_process != NGX_PROCESS_SINGLE) + { + return NGX_OK; + } + + tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx, + ngx_thread_pool_module); + + if (tcf == NULL) { + return NGX_OK; + } + + if (ngx_thread_pool_queue_init(&ngx_thread_pool_done, cycle->log) + != NGX_OK) + { + return NGX_ERROR; + } + + tpp = tcf->pools.elts; + + for (i = 0; i < tcf->pools.nelts; i++) { + if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) { + return NGX_ERROR; + } + } + + return NGX_OK; +} + + +static void +ngx_thread_pool_exit_worker(ngx_cycle_t *cycle) +{ + ngx_uint_t i; + ngx_thread_pool_t **tpp; + ngx_thread_pool_conf_t *tcf; + + if (ngx_process != NGX_PROCESS_WORKER + && ngx_process != NGX_PROCESS_SINGLE) + { + return; + } + + tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx, + ngx_thread_pool_module); + + if (tcf == NULL) { + return; + } + + tpp = tcf->pools.elts; + + for (i = 0; i < tcf->pools.nelts; i++) { + ngx_thread_pool_destroy(tpp[i]); + } + + (void) ngx_thread_pool_queue_destroy(&ngx_thread_pool_done, cycle->log); +} diff --git a/src/core/ngx_thread_pool.h b/src/core/ngx_thread_pool.h new file mode 100644 --- /dev/null +++ b/src/core/ngx_thread_pool.h @@ -0,0 +1,36 @@ + +/* + * Copyright (C) Nginx, Inc. + * Copyright (C) Valentin V. Bartenev + */ + + +#ifndef _NGX_THREAD_POOL_H_INCLUDED_ +#define _NGX_THREAD_POOL_H_INCLUDED_ + + +#include +#include +#include + + +struct ngx_thread_task_s { + ngx_thread_task_t *next; + ngx_uint_t id; + void *ctx; + void (*handler)(void *data, ngx_log_t *log); + ngx_event_t event; +}; + + +typedef struct ngx_thread_pool_s ngx_thread_pool_t; + + +ngx_thread_pool_t *ngx_thread_pool_add(ngx_conf_t *cf, ngx_str_t *name); +ngx_thread_pool_t *ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name); + +ngx_thread_task_t *ngx_thread_task_alloc(ngx_pool_t *pool, size_t size); +ngx_int_t ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task); + + +#endif /* _NGX_THREAD_POOL_H_INCLUDED_ */ diff --git a/src/event/modules/ngx_aio_module.c b/src/event/modules/ngx_aio_module.c --- a/src/event/modules/ngx_aio_module.c +++ b/src/event/modules/ngx_aio_module.c @@ -48,6 +48,7 @@ ngx_event_module_t ngx_aio_module_ctx = NULL, /* disable an event */ NULL, /* add an connection */ ngx_aio_del_connection, /* delete an connection */ + NULL, /* trigger a notify */ NULL, /* process the changes */ ngx_aio_process_events, /* process the events */ ngx_aio_init, /* init the events */ diff --git a/src/event/modules/ngx_devpoll_module.c b/src/event/modules/ngx_devpoll_module.c --- a/src/event/modules/ngx_devpoll_module.c +++ b/src/event/modules/ngx_devpoll_module.c @@ -88,6 +88,7 @@ ngx_event_module_t ngx_devpoll_module_c ngx_devpoll_del_event, /* disable an event */ NULL, /* add an connection */ NULL, /* delete an connection */ + NULL, /* trigger a notify */ NULL, /* process the changes */ ngx_devpoll_process_events, /* process the events */ ngx_devpoll_init, /* init the events */ diff --git a/src/event/modules/ngx_epoll_module.c b/src/event/modules/ngx_epoll_module.c --- a/src/event/modules/ngx_epoll_module.c +++ b/src/event/modules/ngx_epoll_module.c @@ -164,6 +164,7 @@ ngx_event_module_t ngx_epoll_module_ctx ngx_epoll_del_event, /* disable an event */ ngx_epoll_add_connection, /* add an connection */ ngx_epoll_del_connection, /* delete an connection */ + NULL, /* trigger a notify */ NULL, /* process the changes */ ngx_epoll_process_events, /* process the events */ ngx_epoll_init, /* init the events */ diff --git a/src/event/modules/ngx_eventport_module.c b/src/event/modules/ngx_eventport_module.c --- a/src/event/modules/ngx_eventport_module.c +++ b/src/event/modules/ngx_eventport_module.c @@ -172,6 +172,7 @@ ngx_event_module_t ngx_eventport_module ngx_eventport_del_event, /* disable an event */ NULL, /* add an connection */ NULL, /* delete an connection */ + NULL, /* trigger a notify */ NULL, /* process the changes */ ngx_eventport_process_events, /* process the events */ ngx_eventport_init, /* init the events */ diff --git a/src/event/modules/ngx_iocp_module.c b/src/event/modules/ngx_iocp_module.c --- a/src/event/modules/ngx_iocp_module.c +++ b/src/event/modules/ngx_iocp_module.c @@ -64,6 +64,7 @@ ngx_event_module_t ngx_iocp_module_ctx NULL, /* disable an event */ NULL, /* add an connection */ ngx_iocp_del_connection, /* delete an connection */ + NULL, /* trigger a notify */ NULL, /* process the changes */ ngx_iocp_process_events, /* process the events */ ngx_iocp_init, /* init the events */ diff --git a/src/event/modules/ngx_kqueue_module.c b/src/event/modules/ngx_kqueue_module.c --- a/src/event/modules/ngx_kqueue_module.c +++ b/src/event/modules/ngx_kqueue_module.c @@ -89,6 +89,7 @@ ngx_event_module_t ngx_kqueue_module_ct ngx_kqueue_del_event, /* disable an event */ NULL, /* add an connection */ NULL, /* delete an connection */ + NULL, /* trigger a notify */ ngx_kqueue_process_changes, /* process the changes */ ngx_kqueue_process_events, /* process the events */ ngx_kqueue_init, /* init the events */ diff --git a/src/event/modules/ngx_poll_module.c b/src/event/modules/ngx_poll_module.c --- a/src/event/modules/ngx_poll_module.c +++ b/src/event/modules/ngx_poll_module.c @@ -39,6 +39,7 @@ ngx_event_module_t ngx_poll_module_ctx ngx_poll_del_event, /* disable an event */ NULL, /* add an connection */ NULL, /* delete an connection */ + NULL, /* trigger a notify */ NULL, /* process the changes */ ngx_poll_process_events, /* process the events */ ngx_poll_init, /* init the events */ diff --git a/src/event/modules/ngx_rtsig_module.c b/src/event/modules/ngx_rtsig_module.c --- a/src/event/modules/ngx_rtsig_module.c +++ b/src/event/modules/ngx_rtsig_module.c @@ -130,6 +130,7 @@ ngx_event_module_t ngx_rtsig_module_ctx NULL, /* disable an event */ ngx_rtsig_add_connection, /* add an connection */ ngx_rtsig_del_connection, /* delete an connection */ + NULL, /* trigger a notify */ NULL, /* process the changes */ ngx_rtsig_process_events, /* process the events */ ngx_rtsig_init, /* init the events */ diff --git a/src/event/modules/ngx_select_module.c b/src/event/modules/ngx_select_module.c --- a/src/event/modules/ngx_select_module.c +++ b/src/event/modules/ngx_select_module.c @@ -47,6 +47,7 @@ ngx_event_module_t ngx_select_module_ct ngx_select_del_event, /* disable an event */ NULL, /* add an connection */ NULL, /* delete an connection */ + NULL, /* trigger a notify */ NULL, /* process the changes */ ngx_select_process_events, /* process the events */ ngx_select_init, /* init the events */ diff --git a/src/event/modules/ngx_win32_select_module.c b/src/event/modules/ngx_win32_select_module.c --- a/src/event/modules/ngx_win32_select_module.c +++ b/src/event/modules/ngx_win32_select_module.c @@ -48,6 +48,7 @@ ngx_event_module_t ngx_select_module_ct ngx_select_del_event, /* disable an event */ NULL, /* add an connection */ NULL, /* delete an connection */ + NULL, /* trigger a notify */ NULL, /* process the changes */ ngx_select_process_events, /* process the events */ ngx_select_init, /* init the events */ diff --git a/src/event/ngx_event.c b/src/event/ngx_event.c --- a/src/event/ngx_event.c +++ b/src/event/ngx_event.c @@ -178,7 +178,7 @@ ngx_event_module_t ngx_event_core_modul ngx_event_core_create_conf, /* create configuration */ ngx_event_core_init_conf, /* init configuration */ - { NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL } + { NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL } }; diff --git a/src/event/ngx_event.h b/src/event/ngx_event.h --- a/src/event/ngx_event.h +++ b/src/event/ngx_event.h @@ -200,6 +200,8 @@ typedef struct { ngx_int_t (*add_conn)(ngx_connection_t *c); ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags); + ngx_int_t (*notify)(ngx_event_handler_pt handler); + ngx_int_t (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t nowait); ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags); @@ -422,6 +424,8 @@ extern ngx_event_actions_t ngx_event_a #define ngx_add_conn ngx_event_actions.add_conn #define ngx_del_conn ngx_event_actions.del_conn +#define ngx_notify ngx_event_actions.notify + #define ngx_add_timer ngx_event_add_timer #define ngx_del_timer ngx_event_del_timer diff --git a/src/os/unix/ngx_linux_config.h b/src/os/unix/ngx_linux_config.h --- a/src/os/unix/ngx_linux_config.h +++ b/src/os/unix/ngx_linux_config.h @@ -93,11 +93,11 @@ extern ssize_t sendfile(int s, int fd, i #endif -#if (NGX_HAVE_FILE_AIO) #if (NGX_HAVE_SYS_EVENTFD_H) #include #endif #include +#if (NGX_HAVE_FILE_AIO) #include typedef struct iocb ngx_aiocb_t; #endif diff --git a/src/os/unix/ngx_thread.h b/src/os/unix/ngx_thread.h --- a/src/os/unix/ngx_thread.h +++ b/src/os/unix/ngx_thread.h @@ -111,9 +111,61 @@ ngx_int_t ngx_cond_signal(ngx_cond_t *cv #define ngx_thread_volatile +#if (NGX_THREADS) + +#include + + +typedef pthread_mutex_t ngx_thread_mutex_t; + +ngx_int_t ngx_thread_mutex_create(ngx_thread_mutex_t *mtx, ngx_log_t *log); +ngx_int_t ngx_thread_mutex_destroy(ngx_thread_mutex_t *mtx, ngx_log_t *log); +ngx_int_t ngx_thread_mutex_lock(ngx_thread_mutex_t *mtx, ngx_log_t *log); +ngx_int_t ngx_thread_mutex_unlock(ngx_thread_mutex_t *mtx, ngx_log_t *log); + + +typedef pthread_cond_t ngx_thread_cond_t; + +ngx_int_t ngx_thread_cond_create(ngx_thread_cond_t *cond, ngx_log_t *log); +ngx_int_t ngx_thread_cond_destroy(ngx_thread_cond_t *cond, ngx_log_t *log); +ngx_int_t ngx_thread_cond_signal(ngx_thread_cond_t *cond, ngx_log_t *log); +ngx_int_t ngx_thread_cond_wait(ngx_thread_cond_t *cond, ngx_thread_mutex_t *mtx, + ngx_log_t *log); + + +#if (NGX_LINUX) + +typedef pid_t ngx_tid_t; +#define NGX_TID_T_FMT "%P" + +#elif (NGX_FREEBSD) + +typedef uint32_t ngx_tid_t; +#define NGX_TID_T_FMT "%uD" + +#elif (NGX_DARWIN) + +typedef uint64_t ngx_tid_t; +#define NGX_TID_T_FMT "%uA" + +#else + +typedef uint64_t ngx_tid_t; +#define NGX_TID_T_FMT "%uA" + +#endif + +ngx_tid_t ngx_thread_tid(void); + +#define ngx_log_tid ngx_thread_tid() + +#else + #define ngx_log_tid 0 #define NGX_TID_T_FMT "%d" +#endif + #define ngx_mutex_trylock(m) NGX_OK #define ngx_mutex_lock(m) #define ngx_mutex_unlock(m) diff --git a/src/os/unix/ngx_thread_cond.c b/src/os/unix/ngx_thread_cond.c new file mode 100644 --- /dev/null +++ b/src/os/unix/ngx_thread_cond.c @@ -0,0 +1,87 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Nginx, Inc. + */ + + +#include +#include + + +ngx_int_t +ngx_thread_cond_create(ngx_thread_cond_t *cond, ngx_log_t *log) +{ + ngx_err_t err; + + err = pthread_cond_init(cond, NULL); + if (err == 0) { + ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0, + "pthread_cond_init(%p)", cond); + return NGX_OK; + } + + ngx_log_error(NGX_LOG_EMERG, log, err, "pthread_cond_init() failed"); + return NGX_ERROR; +} + + +ngx_int_t +ngx_thread_cond_destroy(ngx_thread_cond_t *cond, ngx_log_t *log) +{ + ngx_err_t err; + + err = pthread_cond_destroy(cond); + if (err == 0) { + ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0, + "pthread_cond_destroy(%p)", cond); + return NGX_OK; + } + + ngx_log_error(NGX_LOG_EMERG, log, err, "pthread_cond_destroy() failed"); + return NGX_ERROR; +} + + +ngx_int_t +ngx_thread_cond_signal(ngx_thread_cond_t *cond, ngx_log_t *log) +{ + ngx_err_t err; + + err = pthread_cond_signal(cond); + if (err == 0) { + ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0, + "pthread_cond_signal(%p)", cond); + return NGX_OK; + } + + ngx_log_error(NGX_LOG_EMERG, log, err, "pthread_cond_signal() failed"); + return NGX_ERROR; +} + + +ngx_int_t +ngx_thread_cond_wait(ngx_thread_cond_t *cond, ngx_thread_mutex_t *mtx, + ngx_log_t *log) +{ + ngx_err_t err; + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0, + "pthread_cond_wait(%p) enter", cond); + + err = pthread_cond_wait(cond, mtx); + +#if 0 + ngx_time_update(); +#endif + + if (err == 0) { + ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0, + "pthread_cond_wait(%p) exit", cond); + return NGX_OK; + } + + ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_cond_wait() failed"); + + return NGX_ERROR; +} diff --git a/src/os/unix/ngx_thread_id.c b/src/os/unix/ngx_thread_id.c new file mode 100644 --- /dev/null +++ b/src/os/unix/ngx_thread_id.c @@ -0,0 +1,70 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Nginx, Inc. + */ + + +#include +#include +#include + + +#if (NGX_LINUX) + +/* + * Linux thread id is a pid of thread created by clone(2), + * glibc does not provide a wrapper for gettid(). + */ + +ngx_tid_t +ngx_thread_tid(void) +{ + return syscall(SYS_gettid); +} + +#elif (NGX_FREEBSD) && (__FreeBSD_version >= 900031) + +#include + +ngx_tid_t +ngx_thread_tid(void) +{ + return pthread_getthreadid_np(); +} + +#elif (NGX_DARWIN) + +/* + * MacOSX thread has two thread ids: + * + * 1) MacOSX 10.6 (Snow Leoprad) has pthread_threadid_np() returning + * an uint64_t value, which is obtained using the __thread_selfid() + * syscall. It is a number above 300,000. + */ + +ngx_tid_t +ngx_thread_tid(void) +{ + uint64_t tid; + + (void) pthread_threadid_np(NULL, &tid); + return tid; +} + +/* + * 2) Kernel thread mach_port_t returned by pthread_mach_thread_np(). + * It is a number in range 100-100,000. + * + * return pthread_mach_thread_np(pthread_self()); + */ + +#else + +ngx_tid_t +ngx_thread_tid(void) +{ + return (uint64_t) (uintptr_t) pthread_self(); +} + +#endif diff --git a/src/os/unix/ngx_thread_mutex.c b/src/os/unix/ngx_thread_mutex.c new file mode 100644 --- /dev/null +++ b/src/os/unix/ngx_thread_mutex.c @@ -0,0 +1,174 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Nginx, Inc. + */ + +#include +#include + + +/* + * All modern pthread mutex implementations try to acquire a lock + * atomically in userland before going to sleep in kernel. Some + * spins before the sleeping. + * + * In Solaris since version 8 all mutex types spin before sleeping. + * The default spin count is 1000. It can be overridden using + * _THREAD_ADAPTIVE_SPIN=100 environment variable. + * + * In MacOSX all mutex types spin to acquire a lock protecting a mutex's + * internals. If the mutex is busy, thread calls Mach semaphore_wait(). + * + * + * PTHREAD_MUTEX_NORMAL lacks deadlock detection and is the fastest + * mutex type. + * + * Linux: No spinning. The internal name PTHREAD_MUTEX_TIMED_NP + * remains from the times when pthread_mutex_timedlock() was + * non-standard extension. Alias name: PTHREAD_MUTEX_FAST_NP. + * FreeBSD: No spinning. + * + * + * PTHREAD_MUTEX_ERRORCHECK is usually as fast as PTHREAD_MUTEX_NORMAL + * yet has lightweight deadlock detection. + * + * Linux: No spinning. The internal name: PTHREAD_MUTEX_ERRORCHECK_NP. + * FreeBSD: No spinning. + * + * + * PTHREAD_MUTEX_RECURSIVE allows recursive locking. + * + * Linux: No spinning. The internal name: PTHREAD_MUTEX_RECURSIVE_NP. + * FreeBSD: No spinning. + * + * + * PTHREAD_MUTEX_ADAPTIVE_NP spins on SMP systems before sleeping. + * + * Linux: No deadlock detection. Dynamically changes a spin count + * for each mutex from 10 to 100 based on spin count taken + * previously. + * FreeBSD: Deadlock detection. The default spin count is 2000. + * It can be overriden using LIBPTHREAD_SPINLOOPS environment + * variable or by pthread_mutex_setspinloops_np(). If a lock + * is still busy, sched_yield() can be called on both UP and + * SMP systems. The default yield loop count is zero, but + * it can be set by LIBPTHREAD_YIELDLOOPS environment + * variable or by pthread_mutex_setyieldloops_np(). + * Solaris: No PTHREAD_MUTEX_ADAPTIVE_NP. + * MacOSX: No PTHREAD_MUTEX_ADAPTIVE_NP. + * + * + * PTHREAD_MUTEX_ELISION_NP is a Linux extension to elide locks using + * Intel Restricted Transactional Memory. It is the most suitable for + * rwlock pattern access because it allows simultaneous reads without lock. + * Supported since glibc 2.18. + * + * + * PTHREAD_MUTEX_DEFAULT is default mutex type. + * + * Linux: PTHREAD_MUTEX_NORMAL. + * FreeBSD: PTHREAD_MUTEX_ERRORCHECK. + * Solaris: PTHREAD_MUTEX_NORMAL. + * MacOSX: PTHREAD_MUTEX_NORMAL. + */ + + +ngx_int_t +ngx_thread_mutex_create(ngx_thread_mutex_t *mtx, ngx_log_t *log) +{ + ngx_err_t err; + pthread_mutexattr_t attr; + + err = pthread_mutexattr_init(&attr); + if (err != 0) { + ngx_log_error(NGX_LOG_EMERG, log, err, + "pthread_mutexattr_init() failed"); + return NGX_ERROR; + } + + err = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK); + if (err != 0) { + ngx_log_error(NGX_LOG_EMERG, log, err, + "pthread_mutexattr_settype" + "(PTHREAD_MUTEX_ERRORCHECK) failed"); + return NGX_ERROR; + } + + err = pthread_mutex_init(mtx, &attr); + if (err != 0) { + ngx_log_error(NGX_LOG_EMERG, log, err, + "pthread_mutex_init() failed"); + return NGX_ERROR; + } + + err = pthread_mutexattr_destroy(&attr); + if (err != 0) { + ngx_log_error(NGX_LOG_ALERT, log, err, + "pthread_mutexattr_destroy() failed"); + } + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0, + "pthread_mutex_init(%p)", mtx); + return NGX_OK; +} + + +ngx_int_t +ngx_thread_mutex_destroy(ngx_thread_mutex_t *mtx, ngx_log_t *log) +{ + ngx_err_t err; + + err = pthread_mutex_destroy(mtx); + if (err != 0) { + ngx_log_error(NGX_LOG_ALERT, log, err, + "pthread_mutex_destroy() failed"); + return NGX_ERROR; + } + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0, + "pthread_mutex_destroy(%p)", mtx); + return NGX_OK; +} + + +ngx_int_t +ngx_thread_mutex_lock(ngx_thread_mutex_t *mtx, ngx_log_t *log) +{ + ngx_err_t err; + + ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0, + "pthread_mutex_lock(%p) enter", mtx); + + err = pthread_mutex_lock(mtx); + if (err == 0) { + return NGX_OK; + } + + ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_mutex_lock() failed"); + + return NGX_ERROR; +} + + +ngx_int_t +ngx_thread_mutex_unlock(ngx_thread_mutex_t *mtx, ngx_log_t *log) +{ + ngx_err_t err; + + err = pthread_mutex_unlock(mtx); + +#if 0 + ngx_time_update(); +#endif + + if (err == 0) { + ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0, + "pthread_mutex_unlock(%p) exit", mtx); + return NGX_OK; + } + + ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_mutex_unlock() failed"); + + return NGX_ERROR; +}