comparison src/event/ngx_event_udp.c @ 7286:d27aa9060c95

Stream: udp streams. Previously, only one client packet could be processed in a udp stream session even though multiple response packets were supported. Now multiple packets coming from the same client address and port are delivered to the same stream session. If it's required to maintain a single stream of data, nginx should be configured in a way that all packets from a client are delivered to the same worker. On Linux and DragonFly BSD the "reuseport" parameter should be specified for this. Other systems do not currently provide appropriate mechanisms. For these systems a single stream of udp packets is only guaranteed in single-worker configurations. The proxy_response directive now specifies how many packets are expected in response to a single client packet.
author Roman Arutyunyan <arut@nginx.com>
date Mon, 04 Jun 2018 19:50:00 +0300
parents 88a624c9b491
children 27559d4a5151
comparison
equal deleted inserted replaced
7285:88a624c9b491 7286:d27aa9060c95
10 #include <ngx_event.h> 10 #include <ngx_event.h>
11 11
12 12
13 #if !(NGX_WIN32) 13 #if !(NGX_WIN32)
14 14
15 struct ngx_udp_connection_s {
16 ngx_rbtree_node_t node;
17 ngx_connection_t *connection;
18 ngx_buf_t *buffer;
19 };
20
21
15 static void ngx_close_accepted_udp_connection(ngx_connection_t *c); 22 static void ngx_close_accepted_udp_connection(ngx_connection_t *c);
23 static ssize_t ngx_udp_shared_recv(ngx_connection_t *c, u_char *buf,
24 size_t size);
25 static ngx_int_t ngx_insert_udp_connection(ngx_connection_t *c);
26 static void ngx_delete_udp_connection(void *data);
27 static ngx_connection_t *ngx_lookup_udp_connection(ngx_listening_t *ls,
28 struct sockaddr *sockaddr, socklen_t socklen,
29 struct sockaddr *local_sockaddr, socklen_t local_socklen);
16 30
17 31
18 void 32 void
19 ngx_event_recvmsg(ngx_event_t *ev) 33 ngx_event_recvmsg(ngx_event_t *ev)
20 { 34 {
21 ssize_t n; 35 ssize_t n;
36 ngx_buf_t buf;
22 ngx_log_t *log; 37 ngx_log_t *log;
23 ngx_err_t err; 38 ngx_err_t err;
24 socklen_t socklen, local_socklen; 39 socklen_t socklen, local_socklen;
25 ngx_event_t *rev, *wev; 40 ngx_event_t *rev, *wev;
26 struct iovec iov[1]; 41 struct iovec iov[1];
213 } 228 }
214 } 229 }
215 230
216 #endif 231 #endif
217 232
233 c = ngx_lookup_udp_connection(ls, sockaddr, socklen, local_sockaddr,
234 local_socklen);
235
236 if (c) {
237
238 #if (NGX_DEBUG)
239 if (c->log->log_level & NGX_LOG_DEBUG_EVENT) {
240 ngx_log_handler_pt handler;
241
242 handler = c->log->handler;
243 c->log->handler = NULL;
244
245 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
246 "recvmsg: fd:%d n:%z", c->fd, n);
247
248 c->log->handler = handler;
249 }
250 #endif
251
252 ngx_memzero(&buf, sizeof(ngx_buf_t));
253
254 buf.pos = buffer;
255 buf.last = buffer + n;
256
257 rev = c->read;
258
259 c->udp->buffer = &buf;
260 rev->ready = 1;
261
262 rev->handler(rev);
263
264 c->udp->buffer = NULL;
265 rev->ready = 0;
266
267 goto next;
268 }
269
218 #if (NGX_STAT_STUB) 270 #if (NGX_STAT_STUB)
219 (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1); 271 (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1);
220 #endif 272 #endif
221 273
222 ngx_accept_disabled = ngx_cycle->connection_n / 8 274 ngx_accept_disabled = ngx_cycle->connection_n / 8
255 return; 307 return;
256 } 308 }
257 309
258 *log = ls->log; 310 *log = ls->log;
259 311
312 c->recv = ngx_udp_shared_recv;
260 c->send = ngx_udp_send; 313 c->send = ngx_udp_send;
261 c->send_chain = ngx_udp_send_chain; 314 c->send_chain = ngx_udp_send_chain;
262 315
263 c->log = log; 316 c->log = log;
264 c->pool->log = log; 317 c->pool->log = log;
342 } 395 }
343 396
344 } 397 }
345 #endif 398 #endif
346 399
400 if (ngx_insert_udp_connection(c) != NGX_OK) {
401 ngx_close_accepted_udp_connection(c);
402 return;
403 }
404
347 log->data = NULL; 405 log->data = NULL;
348 log->handler = NULL; 406 log->handler = NULL;
349 407
350 ls->handler(c); 408 ls->handler(c);
409
410 next:
351 411
352 if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { 412 if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
353 ev->available -= n; 413 ev->available -= n;
354 } 414 }
355 415
371 #if (NGX_STAT_STUB) 431 #if (NGX_STAT_STUB)
372 (void) ngx_atomic_fetch_add(ngx_stat_active, -1); 432 (void) ngx_atomic_fetch_add(ngx_stat_active, -1);
373 #endif 433 #endif
374 } 434 }
375 435
376 #endif 436
437 static ssize_t
438 ngx_udp_shared_recv(ngx_connection_t *c, u_char *buf, size_t size)
439 {
440 ssize_t n;
441 ngx_buf_t *b;
442
443 if (c->udp == NULL || c->udp->buffer == NULL) {
444 return NGX_AGAIN;
445 }
446
447 b = c->udp->buffer;
448
449 n = ngx_min(b->last - b->pos, (ssize_t) size);
450
451 ngx_memcpy(buf, b->pos, n);
452
453 c->udp->buffer = NULL;
454 c->read->ready = 0;
455
456 return n;
457 }
458
459
460 void
461 ngx_udp_rbtree_insert_value(ngx_rbtree_node_t *temp,
462 ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel)
463 {
464 ngx_int_t rc;
465 ngx_connection_t *c, *ct;
466 ngx_rbtree_node_t **p;
467 ngx_udp_connection_t *udp, *udpt;
468
469 for ( ;; ) {
470
471 if (node->key < temp->key) {
472
473 p = &temp->left;
474
475 } else if (node->key > temp->key) {
476
477 p = &temp->right;
478
479 } else { /* node->key == temp->key */
480
481 udp = (ngx_udp_connection_t *) node;
482 c = udp->connection;
483
484 udpt = (ngx_udp_connection_t *) temp;
485 ct = udpt->connection;
486
487 rc = ngx_cmp_sockaddr(c->sockaddr, c->socklen,
488 ct->sockaddr, ct->socklen, 1);
489
490 if (rc == 0 && c->listening->wildcard) {
491 rc = ngx_cmp_sockaddr(c->local_sockaddr, c->local_socklen,
492 ct->local_sockaddr, ct->local_socklen, 1);
493 }
494
495 p = (rc < 0) ? &temp->left : &temp->right;
496 }
497
498 if (*p == sentinel) {
499 break;
500 }
501
502 temp = *p;
503 }
504
505 *p = node;
506 node->parent = temp;
507 node->left = sentinel;
508 node->right = sentinel;
509 ngx_rbt_red(node);
510 }
511
512
513 static ngx_int_t
514 ngx_insert_udp_connection(ngx_connection_t *c)
515 {
516 uint32_t hash;
517 ngx_pool_cleanup_t *cln;
518 ngx_udp_connection_t *udp;
519
520 if (c->udp) {
521 return NGX_OK;
522 }
523
524 udp = ngx_pcalloc(c->pool, sizeof(ngx_udp_connection_t));
525 if (udp == NULL) {
526 return NGX_ERROR;
527 }
528
529 udp->connection = c;
530
531 ngx_crc32_init(hash);
532 ngx_crc32_update(&hash, (u_char *) c->sockaddr, c->socklen);
533
534 if (c->listening->wildcard) {
535 ngx_crc32_update(&hash, (u_char *) c->local_sockaddr, c->local_socklen);
536 }
537
538 ngx_crc32_final(hash);
539
540 udp->node.key = hash;
541
542 cln = ngx_pool_cleanup_add(c->pool, 0);
543 if (cln == NULL) {
544 return NGX_ERROR;
545 }
546
547 cln->data = c;
548 cln->handler = ngx_delete_udp_connection;
549
550 ngx_rbtree_insert(&c->listening->rbtree, &udp->node);
551
552 c->udp = udp;
553
554 return NGX_OK;
555 }
556
557
558 static void
559 ngx_delete_udp_connection(void *data)
560 {
561 ngx_connection_t *c = data;
562
563 ngx_rbtree_delete(&c->listening->rbtree, &c->udp->node);
564 }
565
566
567 static ngx_connection_t *
568 ngx_lookup_udp_connection(ngx_listening_t *ls, struct sockaddr *sockaddr,
569 socklen_t socklen, struct sockaddr *local_sockaddr, socklen_t local_socklen)
570 {
571 uint32_t hash;
572 ngx_int_t rc;
573 ngx_connection_t *c;
574 ngx_rbtree_node_t *node, *sentinel;
575 ngx_udp_connection_t *udp;
576
577 #if (NGX_HAVE_UNIX_DOMAIN)
578
579 if (sockaddr->sa_family == AF_UNIX) {
580 struct sockaddr_un *saun = (struct sockaddr_un *) sockaddr;
581
582 if (socklen <= (socklen_t) offsetof(struct sockaddr_un, sun_path)
583 || saun->sun_path[0] == '\0')
584 {
585 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ngx_cycle->log, 0,
586 "unbound unix socket");
587 return NULL;
588 }
589 }
590
591 #endif
592
593 node = ls->rbtree.root;
594 sentinel = ls->rbtree.sentinel;
595
596 ngx_crc32_init(hash);
597 ngx_crc32_update(&hash, (u_char *) sockaddr, socklen);
598
599 if (ls->wildcard) {
600 ngx_crc32_update(&hash, (u_char *) local_sockaddr, local_socklen);
601 }
602
603 ngx_crc32_final(hash);
604
605 while (node != sentinel) {
606
607 if (hash < node->key) {
608 node = node->left;
609 continue;
610 }
611
612 if (hash > node->key) {
613 node = node->right;
614 continue;
615 }
616
617 /* hash == node->key */
618
619 udp = (ngx_udp_connection_t *) node;
620
621 c = udp->connection;
622
623 rc = ngx_cmp_sockaddr(sockaddr, socklen,
624 c->sockaddr, c->socklen, 1);
625
626 if (rc == 0 && ls->wildcard) {
627 rc = ngx_cmp_sockaddr(local_sockaddr, local_socklen,
628 c->local_sockaddr, c->local_socklen, 1);
629 }
630
631 if (rc == 0) {
632 return c;
633 }
634
635 node = (rc < 0) ? node->left : node->right;
636 }
637
638 return NULL;
639 }
640
641 #endif