comparison src/stream/ngx_stream_proxy_module.c @ 6436:8f038068f4bc

Stream: UDP proxy.
author Roman Arutyunyan <arut@nginx.com>
date Wed, 20 Jan 2016 19:52:12 +0300
parents d1c791479bbb
children a01e315b3a78
comparison
equal deleted inserted replaced
6435:d1c791479bbb 6436:8f038068f4bc
15 ngx_msec_t timeout; 15 ngx_msec_t timeout;
16 ngx_msec_t next_upstream_timeout; 16 ngx_msec_t next_upstream_timeout;
17 size_t buffer_size; 17 size_t buffer_size;
18 size_t upload_rate; 18 size_t upload_rate;
19 size_t download_rate; 19 size_t download_rate;
20 ngx_uint_t responses;
20 ngx_uint_t next_upstream_tries; 21 ngx_uint_t next_upstream_tries;
21 ngx_flag_t next_upstream; 22 ngx_flag_t next_upstream;
22 ngx_flag_t proxy_protocol; 23 ngx_flag_t proxy_protocol;
23 ngx_addr_t *local; 24 ngx_addr_t *local;
24 25
165 ngx_conf_set_size_slot, 166 ngx_conf_set_size_slot,
166 NGX_STREAM_SRV_CONF_OFFSET, 167 NGX_STREAM_SRV_CONF_OFFSET,
167 offsetof(ngx_stream_proxy_srv_conf_t, download_rate), 168 offsetof(ngx_stream_proxy_srv_conf_t, download_rate),
168 NULL }, 169 NULL },
169 170
171 { ngx_string("proxy_responses"),
172 NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
173 ngx_conf_set_num_slot,
174 NGX_STREAM_SRV_CONF_OFFSET,
175 offsetof(ngx_stream_proxy_srv_conf_t, responses),
176 NULL },
177
170 { ngx_string("proxy_next_upstream"), 178 { ngx_string("proxy_next_upstream"),
171 NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG, 179 NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG,
172 ngx_conf_set_flag_slot, 180 ngx_conf_set_flag_slot,
173 NGX_STREAM_SRV_CONF_OFFSET, 181 NGX_STREAM_SRV_CONF_OFFSET,
174 offsetof(ngx_stream_proxy_srv_conf_t, next_upstream), 182 offsetof(ngx_stream_proxy_srv_conf_t, next_upstream),
349 357
350 u->peer.log = c->log; 358 u->peer.log = c->log;
351 u->peer.log_error = NGX_ERROR_ERR; 359 u->peer.log_error = NGX_ERROR_ERR;
352 360
353 u->peer.local = pscf->local; 361 u->peer.local = pscf->local;
362 u->peer.type = c->type;
354 363
355 uscf = pscf->upstream; 364 uscf = pscf->upstream;
356 365
357 if (uscf->peer.init(s, uscf) != NGX_OK) { 366 if (uscf->peer.init(s, uscf) != NGX_OK) {
358 ngx_stream_proxy_finalize(s, NGX_ERROR); 367 ngx_stream_proxy_finalize(s, NGX_ERROR);
368 } 377 }
369 378
370 u->proxy_protocol = pscf->proxy_protocol; 379 u->proxy_protocol = pscf->proxy_protocol;
371 u->start_sec = ngx_time(); 380 u->start_sec = ngx_time();
372 381
382 c->write->handler = ngx_stream_proxy_downstream_handler;
383 c->read->handler = ngx_stream_proxy_downstream_handler;
384
385 if (c->type == SOCK_DGRAM) {
386 ngx_stream_proxy_connect(s);
387 return;
388 }
389
373 p = ngx_pnalloc(c->pool, pscf->buffer_size); 390 p = ngx_pnalloc(c->pool, pscf->buffer_size);
374 if (p == NULL) { 391 if (p == NULL) {
375 ngx_stream_proxy_finalize(s, NGX_ERROR); 392 ngx_stream_proxy_finalize(s, NGX_ERROR);
376 return; 393 return;
377 } 394 }
378 395
379 u->downstream_buf.start = p; 396 u->downstream_buf.start = p;
380 u->downstream_buf.end = p + pscf->buffer_size; 397 u->downstream_buf.end = p + pscf->buffer_size;
381 u->downstream_buf.pos = p; 398 u->downstream_buf.pos = p;
382 u->downstream_buf.last = p; 399 u->downstream_buf.last = p;
383
384 c->write->handler = ngx_stream_proxy_downstream_handler;
385 c->read->handler = ngx_stream_proxy_downstream_handler;
386 400
387 if (u->proxy_protocol 401 if (u->proxy_protocol
388 #if (NGX_STREAM_SSL) 402 #if (NGX_STREAM_SSL)
389 && pscf->ssl == NULL 403 && pscf->ssl == NULL
390 #endif 404 #endif
486 u = s->upstream; 500 u = s->upstream;
487 pc = u->peer.connection; 501 pc = u->peer.connection;
488 502
489 cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module); 503 cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);
490 504
491 if (cscf->tcp_nodelay && pc->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { 505 if (pc->type == SOCK_STREAM
506 && cscf->tcp_nodelay
507 && pc->tcp_nodelay == NGX_TCP_NODELAY_UNSET)
508 {
492 ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, "tcp_nodelay"); 509 ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, "tcp_nodelay");
493 510
494 tcp_nodelay = 1; 511 tcp_nodelay = 1;
495 512
496 if (setsockopt(pc->fd, IPPROTO_TCP, TCP_NODELAY, 513 if (setsockopt(pc->fd, IPPROTO_TCP, TCP_NODELAY,
514 } 531 }
515 532
516 pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); 533 pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
517 534
518 #if (NGX_STREAM_SSL) 535 #if (NGX_STREAM_SSL)
519 if (pscf->ssl && pc->ssl == NULL) { 536 if (pc->type == SOCK_STREAM && pscf->ssl && pc->ssl == NULL) {
520 ngx_stream_proxy_ssl_init_connection(s); 537 ngx_stream_proxy_ssl_init_connection(s);
521 return; 538 return;
522 } 539 }
523 #endif 540 #endif
524 541
542 } 559 }
543 } 560 }
544 561
545 c->log->action = "proxying connection"; 562 c->log->action = "proxying connection";
546 563
547 p = ngx_pnalloc(c->pool, pscf->buffer_size); 564 if (u->upstream_buf.start == NULL) {
548 if (p == NULL) { 565 p = ngx_pnalloc(c->pool, pscf->buffer_size);
549 ngx_stream_proxy_finalize(s, NGX_ERROR); 566 if (p == NULL) {
550 return; 567 ngx_stream_proxy_finalize(s, NGX_ERROR);
551 } 568 return;
552 569 }
553 u->upstream_buf.start = p; 570
554 u->upstream_buf.end = p + pscf->buffer_size; 571 u->upstream_buf.start = p;
555 u->upstream_buf.pos = p; 572 u->upstream_buf.end = p + pscf->buffer_size;
556 u->upstream_buf.last = p; 573 u->upstream_buf.pos = p;
574 u->upstream_buf.last = p;
575 }
576
577 if (c->type == SOCK_DGRAM) {
578 s->received = c->buffer->last - c->buffer->pos;
579 u->downstream_buf = *c->buffer;
580
581 if (pscf->responses == 0) {
582 pc->read->ready = 0;
583 pc->read->eof = 1;
584 }
585 }
557 586
558 u->connected = 1; 587 u->connected = 1;
559 588
560 pc->read->handler = ngx_stream_proxy_upstream_handler; 589 pc->read->handler = ngx_stream_proxy_upstream_handler;
561 pc->write->handler = ngx_stream_proxy_upstream_handler; 590 pc->write->handler = ngx_stream_proxy_upstream_handler;
562 591
563 if (pc->read->ready) { 592 if (pc->read->ready || pc->read->eof) {
564 ngx_post_event(pc->read, &ngx_posted_events); 593 ngx_post_event(pc->read, &ngx_posted_events);
565 } 594 }
566 595
567 ngx_stream_proxy_process(s, 0, 1); 596 ngx_stream_proxy_process(s, 0, 1);
568 } 597 }
892 921
893 c = ev->data; 922 c = ev->data;
894 s = c->data; 923 s = c->data;
895 u = s->upstream; 924 u = s->upstream;
896 925
926 c = s->connection;
927 pc = u->peer.connection;
928
929 pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
930
897 if (ev->timedout) { 931 if (ev->timedout) {
932 ev->timedout = 0;
898 933
899 if (ev->delayed) { 934 if (ev->delayed) {
900
901 ev->timedout = 0;
902 ev->delayed = 0; 935 ev->delayed = 0;
903 936
904 if (!ev->ready) { 937 if (!ev->ready) {
905 if (ngx_handle_read_event(ev, 0) != NGX_OK) { 938 if (ngx_handle_read_event(ev, 0) != NGX_OK) {
906 ngx_stream_proxy_finalize(s, NGX_ERROR); 939 ngx_stream_proxy_finalize(s, NGX_ERROR);
907 return; 940 return;
908 } 941 }
909 942
910 if (u->connected) { 943 if (u->connected && !c->read->delayed && !pc->read->delayed) {
911 pc = u->peer.connection; 944 ngx_add_timer(c->write, pscf->timeout);
912
913 if (!c->read->delayed && !pc->read->delayed) {
914 pscf = ngx_stream_get_module_srv_conf(s,
915 ngx_stream_proxy_module);
916 ngx_add_timer(c->write, pscf->timeout);
917 }
918 } 945 }
919 946
920 return; 947 return;
921 } 948 }
922 949
923 } else { 950 } else {
951 if (s->connection->type == SOCK_DGRAM) {
952 if (pscf->responses == NGX_MAX_INT32_VALUE) {
953
954 /*
955 * successfully terminate timed out UDP session
956 * with unspecified number of responses
957 */
958
959 pc->read->ready = 0;
960 pc->read->eof = 1;
961
962 ngx_stream_proxy_process(s, 1, 0);
963 return;
964 }
965
966 if (u->received == 0) {
967 ngx_stream_proxy_next_upstream(s);
968 return;
969 }
970 }
971
924 ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out"); 972 ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out");
925 ngx_stream_proxy_finalize(s, NGX_DECLINED); 973 ngx_stream_proxy_finalize(s, NGX_DECLINED);
926 return; 974 return;
927 } 975 }
928 976
1037 u = s->upstream; 1085 u = s->upstream;
1038 1086
1039 c = s->connection; 1087 c = s->connection;
1040 pc = u->connected ? u->peer.connection : NULL; 1088 pc = u->connected ? u->peer.connection : NULL;
1041 1089
1090 if (c->type == SOCK_DGRAM && (ngx_terminate || ngx_exiting)) {
1091
1092 /* socket is already closed on worker shutdown */
1093
1094 handler = c->log->handler;
1095 c->log->handler = NULL;
1096
1097 ngx_log_error(NGX_LOG_INFO, c->log, 0, "disconnected on shutdown");
1098
1099 c->log->handler = handler;
1100
1101 ngx_stream_proxy_finalize(s, NGX_OK);
1102 return;
1103 }
1104
1042 pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); 1105 pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
1043 1106
1044 if (from_upstream) { 1107 if (from_upstream) {
1045 src = pc; 1108 src = pc;
1046 dst = c; 1109 dst = c;
1064 1127
1065 if (size && dst && dst->write->ready) { 1128 if (size && dst && dst->write->ready) {
1066 1129
1067 n = dst->send(dst, b->pos, size); 1130 n = dst->send(dst, b->pos, size);
1068 1131
1132 if (n == NGX_AGAIN && dst->shared) {
1133 /* cannot wait on a shared socket */
1134 n = NGX_ERROR;
1135 }
1136
1069 if (n == NGX_ERROR) { 1137 if (n == NGX_ERROR) {
1138 if (c->type == SOCK_DGRAM && !from_upstream) {
1139 ngx_stream_proxy_next_upstream(s);
1140 return;
1141 }
1142
1070 ngx_stream_proxy_finalize(s, NGX_DECLINED); 1143 ngx_stream_proxy_finalize(s, NGX_DECLINED);
1071 return; 1144 return;
1072 } 1145 }
1073 1146
1074 if (n > 0) { 1147 if (n > 0) {
1116 src->read->delayed = 1; 1189 src->read->delayed = 1;
1117 ngx_add_timer(src->read, delay); 1190 ngx_add_timer(src->read, delay);
1118 } 1191 }
1119 } 1192 }
1120 1193
1194 if (c->type == SOCK_DGRAM && ++u->responses == pscf->responses)
1195 {
1196 src->read->ready = 0;
1197 src->read->eof = 1;
1198 }
1199
1121 *received += n; 1200 *received += n;
1122 b->last += n; 1201 b->last += n;
1123 do_write = 1; 1202 do_write = 1;
1124 1203
1125 continue; 1204 continue;
1126 } 1205 }
1127 1206
1128 if (n == NGX_ERROR) { 1207 if (n == NGX_ERROR) {
1208 if (c->type == SOCK_DGRAM && u->received == 0) {
1209 ngx_stream_proxy_next_upstream(s);
1210 return;
1211 }
1212
1129 src->read->eof = 1; 1213 src->read->eof = 1;
1130 } 1214 }
1131 } 1215 }
1132 1216
1133 break; 1217 break;
1150 return; 1234 return;
1151 } 1235 }
1152 1236
1153 flags = src->read->eof ? NGX_CLOSE_EVENT : 0; 1237 flags = src->read->eof ? NGX_CLOSE_EVENT : 0;
1154 1238
1155 if (ngx_handle_read_event(src->read, flags) != NGX_OK) { 1239 if (!src->shared && ngx_handle_read_event(src->read, flags) != NGX_OK) {
1156 ngx_stream_proxy_finalize(s, NGX_ERROR); 1240 ngx_stream_proxy_finalize(s, NGX_ERROR);
1157 return; 1241 return;
1158 } 1242 }
1159 1243
1160 if (dst) { 1244 if (dst) {
1161 if (ngx_handle_write_event(dst->write, 0) != NGX_OK) { 1245 if (!dst->shared && ngx_handle_write_event(dst->write, 0) != NGX_OK) {
1162 ngx_stream_proxy_finalize(s, NGX_ERROR); 1246 ngx_stream_proxy_finalize(s, NGX_ERROR);
1163 return; 1247 return;
1164 } 1248 }
1165 1249
1166 if (!c->read->delayed && !pc->read->delayed) { 1250 if (!c->read->delayed && !pc->read->delayed) {
1329 conf->timeout = NGX_CONF_UNSET_MSEC; 1413 conf->timeout = NGX_CONF_UNSET_MSEC;
1330 conf->next_upstream_timeout = NGX_CONF_UNSET_MSEC; 1414 conf->next_upstream_timeout = NGX_CONF_UNSET_MSEC;
1331 conf->buffer_size = NGX_CONF_UNSET_SIZE; 1415 conf->buffer_size = NGX_CONF_UNSET_SIZE;
1332 conf->upload_rate = NGX_CONF_UNSET_SIZE; 1416 conf->upload_rate = NGX_CONF_UNSET_SIZE;
1333 conf->download_rate = NGX_CONF_UNSET_SIZE; 1417 conf->download_rate = NGX_CONF_UNSET_SIZE;
1418 conf->responses = NGX_CONF_UNSET_UINT;
1334 conf->next_upstream_tries = NGX_CONF_UNSET_UINT; 1419 conf->next_upstream_tries = NGX_CONF_UNSET_UINT;
1335 conf->next_upstream = NGX_CONF_UNSET; 1420 conf->next_upstream = NGX_CONF_UNSET;
1336 conf->proxy_protocol = NGX_CONF_UNSET; 1421 conf->proxy_protocol = NGX_CONF_UNSET;
1337 conf->local = NGX_CONF_UNSET_PTR; 1422 conf->local = NGX_CONF_UNSET_PTR;
1338 1423
1371 prev->upload_rate, 0); 1456 prev->upload_rate, 0);
1372 1457
1373 ngx_conf_merge_size_value(conf->download_rate, 1458 ngx_conf_merge_size_value(conf->download_rate,
1374 prev->download_rate, 0); 1459 prev->download_rate, 0);
1375 1460
1461 ngx_conf_merge_uint_value(conf->responses,
1462 prev->responses, NGX_MAX_INT32_VALUE);
1463
1376 ngx_conf_merge_uint_value(conf->next_upstream_tries, 1464 ngx_conf_merge_uint_value(conf->next_upstream_tries,
1377 prev->next_upstream_tries, 0); 1465 prev->next_upstream_tries, 0);
1378 1466
1379 ngx_conf_merge_value(conf->next_upstream, prev->next_upstream, 1); 1467 ngx_conf_merge_value(conf->next_upstream, prev->next_upstream, 1);
1380 1468