comparison src/event/ngx_event_accept.c @ 6436:8f038068f4bc

Stream: UDP proxy.
author Roman Arutyunyan <arut@nginx.com>
date Wed, 20 Jan 2016 19:52:12 +0300
parents 4f6efabcb09b
children 2f98b5709d79
comparison
equal deleted inserted replaced
6435:d1c791479bbb 6436:8f038068f4bc
11 11
12 12
13 static ngx_int_t ngx_enable_accept_events(ngx_cycle_t *cycle); 13 static ngx_int_t ngx_enable_accept_events(ngx_cycle_t *cycle);
14 static ngx_int_t ngx_disable_accept_events(ngx_cycle_t *cycle, ngx_uint_t all); 14 static ngx_int_t ngx_disable_accept_events(ngx_cycle_t *cycle, ngx_uint_t all);
15 static void ngx_close_accepted_connection(ngx_connection_t *c); 15 static void ngx_close_accepted_connection(ngx_connection_t *c);
16 #if (NGX_DEBUG)
17 static void ngx_debug_accepted_connection(ngx_event_conf_t *ecf,
18 ngx_connection_t *c);
19 #endif
16 20
17 21
18 void 22 void
19 ngx_event_accept(ngx_event_t *ev) 23 ngx_event_accept(ngx_event_t *ev)
20 { 24 {
147 } 151 }
148 152
149 return; 153 return;
150 } 154 }
151 155
156 c->type = SOCK_STREAM;
157
152 #if (NGX_STAT_STUB) 158 #if (NGX_STAT_STUB)
153 (void) ngx_atomic_fetch_add(ngx_stat_active, 1); 159 (void) ngx_atomic_fetch_add(ngx_stat_active, 1);
154 #endif 160 #endif
155 161
156 c->pool = ngx_create_pool(ls->pool_size, ev->log); 162 c->pool = ngx_create_pool(ls->pool_size, ev->log);
274 } 280 }
275 } 281 }
276 282
277 #if (NGX_DEBUG) 283 #if (NGX_DEBUG)
278 { 284 {
279 285 ngx_str_t addr;
280 ngx_str_t addr; 286 u_char text[NGX_SOCKADDR_STRLEN];
281 struct sockaddr_in *sin; 287
282 ngx_cidr_t *cidr; 288 ngx_debug_accepted_connection(ecf, c);
283 ngx_uint_t i;
284 u_char text[NGX_SOCKADDR_STRLEN];
285 #if (NGX_HAVE_INET6)
286 struct sockaddr_in6 *sin6;
287 ngx_uint_t n;
288 #endif
289
290 cidr = ecf->debug_connection.elts;
291 for (i = 0; i < ecf->debug_connection.nelts; i++) {
292 if (cidr[i].family != (ngx_uint_t) c->sockaddr->sa_family) {
293 goto next;
294 }
295
296 switch (cidr[i].family) {
297
298 #if (NGX_HAVE_INET6)
299 case AF_INET6:
300 sin6 = (struct sockaddr_in6 *) c->sockaddr;
301 for (n = 0; n < 16; n++) {
302 if ((sin6->sin6_addr.s6_addr[n]
303 & cidr[i].u.in6.mask.s6_addr[n])
304 != cidr[i].u.in6.addr.s6_addr[n])
305 {
306 goto next;
307 }
308 }
309 break;
310 #endif
311
312 #if (NGX_HAVE_UNIX_DOMAIN)
313 case AF_UNIX:
314 break;
315 #endif
316
317 default: /* AF_INET */
318 sin = (struct sockaddr_in *) c->sockaddr;
319 if ((sin->sin_addr.s_addr & cidr[i].u.in.mask)
320 != cidr[i].u.in.addr)
321 {
322 goto next;
323 }
324 break;
325 }
326
327 log->log_level = NGX_LOG_DEBUG_CONNECTION|NGX_LOG_DEBUG_ALL;
328 break;
329
330 next:
331 continue;
332 }
333 289
334 if (log->log_level & NGX_LOG_DEBUG_EVENT) { 290 if (log->log_level & NGX_LOG_DEBUG_EVENT) {
335 addr.data = text; 291 addr.data = text;
336 addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text, 292 addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text,
337 NGX_SOCKADDR_STRLEN, 1); 293 NGX_SOCKADDR_STRLEN, 1);
359 ev->available--; 315 ev->available--;
360 } 316 }
361 317
362 } while (ev->available); 318 } while (ev->available);
363 } 319 }
320
321
322 #if !(NGX_WIN32)
323
324 void
325 ngx_event_recvmsg(ngx_event_t *ev)
326 {
327 ssize_t n;
328 ngx_log_t *log;
329 ngx_err_t err;
330 ngx_event_t *rev, *wev;
331 struct iovec iov[1];
332 struct msghdr msg;
333 ngx_listening_t *ls;
334 ngx_event_conf_t *ecf;
335 ngx_connection_t *c, *lc;
336 u_char sa[NGX_SOCKADDRLEN];
337 static u_char buffer[65535];
338
339 #if (NGX_HAVE_MSGHDR_MSG_CONTROL)
340
341 #if (NGX_HAVE_IP_RECVDSTADDR)
342 u_char msg_control[CMSG_SPACE(sizeof(struct in_addr))];
343 #elif (NGX_HAVE_IP_PKTINFO)
344 u_char msg_control[CMSG_SPACE(sizeof(struct in_pktinfo))];
345 #endif
346
347 #if (NGX_HAVE_INET6 && NGX_HAVE_IPV6_RECVPKTINFO)
348 u_char msg_control6[CMSG_SPACE(sizeof(struct in6_pktinfo))];
349 #endif
350
351 #endif
352
353 if (ev->timedout) {
354 if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {
355 return;
356 }
357
358 ev->timedout = 0;
359 }
360
361 ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);
362
363 if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
364 ev->available = ecf->multi_accept;
365 }
366
367 lc = ev->data;
368 ls = lc->listening;
369 ev->ready = 0;
370
371 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
372 "recvmsg on %V, ready: %d", &ls->addr_text, ev->available);
373
374 do {
375 ngx_memzero(&msg, sizeof(struct msghdr));
376
377 iov[0].iov_base = (void *) buffer;
378 iov[0].iov_len = sizeof(buffer);
379
380 msg.msg_name = &sa;
381 msg.msg_namelen = sizeof(sa);
382 msg.msg_iov = iov;
383 msg.msg_iovlen = 1;
384
385 #if (NGX_HAVE_MSGHDR_MSG_CONTROL)
386
387 if (ls->wildcard) {
388
389 #if (NGX_HAVE_IP_RECVDSTADDR || NGX_HAVE_IP_PKTINFO)
390 if (ls->sockaddr->sa_family == AF_INET) {
391 msg.msg_control = &msg_control;
392 msg.msg_controllen = sizeof(msg_control);
393 }
394 #endif
395
396 #if (NGX_HAVE_INET6 && NGX_HAVE_IPV6_RECVPKTINFO)
397 if (ls->sockaddr->sa_family == AF_INET6) {
398 msg.msg_control = &msg_control6;
399 msg.msg_controllen = sizeof(msg_control6);
400 }
401 #endif
402 }
403
404 #endif
405
406 n = recvmsg(lc->fd, &msg, 0);
407
408 if (n == -1) {
409 err = ngx_socket_errno;
410
411 if (err == NGX_EAGAIN) {
412 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err,
413 "recvmsg() not ready");
414 return;
415 }
416
417 ngx_log_error(NGX_LOG_ALERT, ev->log, err, "recvmsg() failed");
418
419 return;
420 }
421
422 #if (NGX_STAT_STUB)
423 (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1);
424 #endif
425
426 #if (NGX_HAVE_MSGHDR_MSG_CONTROL)
427 if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC)) {
428 ngx_log_error(NGX_LOG_ALERT, ev->log, 0,
429 "recvmsg() truncated data");
430 continue;
431 }
432 #endif
433
434 ngx_accept_disabled = ngx_cycle->connection_n / 8
435 - ngx_cycle->free_connection_n;
436
437 c = ngx_get_connection(lc->fd, ev->log);
438 if (c == NULL) {
439 return;
440 }
441
442 c->shared = 1;
443 c->type = SOCK_DGRAM;
444 c->socklen = msg.msg_namelen;
445
446 #if (NGX_STAT_STUB)
447 (void) ngx_atomic_fetch_add(ngx_stat_active, 1);
448 #endif
449
450 c->pool = ngx_create_pool(ls->pool_size, ev->log);
451 if (c->pool == NULL) {
452 ngx_close_accepted_connection(c);
453 return;
454 }
455
456 c->sockaddr = ngx_palloc(c->pool, c->socklen);
457 if (c->sockaddr == NULL) {
458 ngx_close_accepted_connection(c);
459 return;
460 }
461
462 ngx_memcpy(c->sockaddr, msg.msg_name, c->socklen);
463
464 log = ngx_palloc(c->pool, sizeof(ngx_log_t));
465 if (log == NULL) {
466 ngx_close_accepted_connection(c);
467 return;
468 }
469
470 *log = ls->log;
471
472 c->send = ngx_udp_send;
473
474 c->log = log;
475 c->pool->log = log;
476
477 c->listening = ls;
478 c->local_sockaddr = ls->sockaddr;
479 c->local_socklen = ls->socklen;
480
481 #if (NGX_HAVE_MSGHDR_MSG_CONTROL)
482
483 if (ls->wildcard) {
484 struct cmsghdr *cmsg;
485 struct sockaddr *sockaddr;
486
487 sockaddr = ngx_palloc(c->pool, c->local_socklen);
488 if (sockaddr == NULL) {
489 ngx_close_accepted_connection(c);
490 return;
491 }
492
493 ngx_memcpy(sockaddr, c->local_sockaddr, c->local_socklen);
494 c->local_sockaddr = sockaddr;
495
496 for (cmsg = CMSG_FIRSTHDR(&msg);
497 cmsg != NULL;
498 cmsg = CMSG_NXTHDR(&msg, cmsg))
499 {
500
501 #if (NGX_HAVE_IP_RECVDSTADDR)
502
503 if (cmsg->cmsg_level == IPPROTO_IP
504 && cmsg->cmsg_type == IP_RECVDSTADDR
505 && sockaddr->sa_family == AF_INET)
506 {
507 struct in_addr *addr;
508 struct sockaddr_in *sin;
509
510 addr = (struct in_addr *) CMSG_DATA(cmsg);
511 sin = (struct sockaddr_in *) sockaddr;
512 sin->sin_addr = *addr;
513
514 break;
515 }
516
517 #elif (NGX_HAVE_IP_PKTINFO)
518
519 if (cmsg->cmsg_level == IPPROTO_IP
520 && cmsg->cmsg_type == IP_PKTINFO
521 && sockaddr->sa_family == AF_INET)
522 {
523 struct in_pktinfo *pkt;
524 struct sockaddr_in *sin;
525
526 pkt = (struct in_pktinfo *) CMSG_DATA(cmsg);
527 sin = (struct sockaddr_in *) sockaddr;
528 sin->sin_addr = pkt->ipi_addr;
529
530 break;
531 }
532
533 #endif
534
535 #if (NGX_HAVE_INET6 && NGX_HAVE_IPV6_RECVPKTINFO)
536
537 if (cmsg->cmsg_level == IPPROTO_IPV6
538 && cmsg->cmsg_type == IPV6_PKTINFO
539 && sockaddr->sa_family == AF_INET6)
540 {
541 struct in6_pktinfo *pkt6;
542 struct sockaddr_in6 *sin6;
543
544 pkt6 = (struct in6_pktinfo *) CMSG_DATA(cmsg);
545 sin6 = (struct sockaddr_in6 *) sockaddr;
546 sin6->sin6_addr = pkt6->ipi6_addr;
547
548 break;
549 }
550
551 #endif
552
553 }
554 }
555
556 #endif
557
558 c->buffer = ngx_create_temp_buf(c->pool, n);
559 if (c->buffer == NULL) {
560 ngx_close_accepted_connection(c);
561 return;
562 }
563
564 c->buffer->last = ngx_cpymem(c->buffer->last, buffer, n);
565
566 rev = c->read;
567 wev = c->write;
568
569 wev->ready = 1;
570
571 rev->log = log;
572 wev->log = log;
573
574 /*
575 * TODO: MT: - ngx_atomic_fetch_add()
576 * or protection by critical section or light mutex
577 *
578 * TODO: MP: - allocated in a shared memory
579 * - ngx_atomic_fetch_add()
580 * or protection by critical section or light mutex
581 */
582
583 c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
584
585 #if (NGX_STAT_STUB)
586 (void) ngx_atomic_fetch_add(ngx_stat_handled, 1);
587 #endif
588
589 if (ls->addr_ntop) {
590 c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
591 if (c->addr_text.data == NULL) {
592 ngx_close_accepted_connection(c);
593 return;
594 }
595
596 c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
597 c->addr_text.data,
598 ls->addr_text_max_len, 0);
599 if (c->addr_text.len == 0) {
600 ngx_close_accepted_connection(c);
601 return;
602 }
603 }
604
605 #if (NGX_DEBUG)
606 {
607 ngx_str_t addr;
608 u_char text[NGX_SOCKADDR_STRLEN];
609
610 ngx_debug_accepted_connection(ecf, c);
611
612 if (log->log_level & NGX_LOG_DEBUG_EVENT) {
613 addr.data = text;
614 addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text,
615 NGX_SOCKADDR_STRLEN, 1);
616
617 ngx_log_debug4(NGX_LOG_DEBUG_EVENT, log, 0,
618 "*%uA recvmsg: %V fd:%d n:%z",
619 c->number, &addr, c->fd, n);
620 }
621
622 }
623 #endif
624
625 log->data = NULL;
626 log->handler = NULL;
627
628 ls->handler(c);
629
630 if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
631 ev->available -= n;
632 }
633
634 } while (ev->available);
635 }
636
637 #endif
364 638
365 639
366 ngx_int_t 640 ngx_int_t
367 ngx_trylock_accept_mutex(ngx_cycle_t *cycle) 641 ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
368 { 642 {
474 ngx_free_connection(c); 748 ngx_free_connection(c);
475 749
476 fd = c->fd; 750 fd = c->fd;
477 c->fd = (ngx_socket_t) -1; 751 c->fd = (ngx_socket_t) -1;
478 752
479 if (ngx_close_socket(fd) == -1) { 753 if (!c->shared && ngx_close_socket(fd) == -1) {
480 ngx_log_error(NGX_LOG_ALERT, c->log, ngx_socket_errno, 754 ngx_log_error(NGX_LOG_ALERT, c->log, ngx_socket_errno,
481 ngx_close_socket_n " failed"); 755 ngx_close_socket_n " failed");
482 } 756 }
483 757
484 if (c->pool) { 758 if (c->pool) {
495 ngx_accept_log_error(ngx_log_t *log, u_char *buf, size_t len) 769 ngx_accept_log_error(ngx_log_t *log, u_char *buf, size_t len)
496 { 770 {
497 return ngx_snprintf(buf, len, " while accepting new connection on %V", 771 return ngx_snprintf(buf, len, " while accepting new connection on %V",
498 log->data); 772 log->data);
499 } 773 }
774
775
776 #if (NGX_DEBUG)
777
778 static void
779 ngx_debug_accepted_connection(ngx_event_conf_t *ecf, ngx_connection_t *c)
780 {
781 struct sockaddr_in *sin;
782 ngx_cidr_t *cidr;
783 ngx_uint_t i;
784 #if (NGX_HAVE_INET6)
785 struct sockaddr_in6 *sin6;
786 ngx_uint_t n;
787 #endif
788
789 cidr = ecf->debug_connection.elts;
790 for (i = 0; i < ecf->debug_connection.nelts; i++) {
791 if (cidr[i].family != (ngx_uint_t) c->sockaddr->sa_family) {
792 goto next;
793 }
794
795 switch (cidr[i].family) {
796
797 #if (NGX_HAVE_INET6)
798 case AF_INET6:
799 sin6 = (struct sockaddr_in6 *) c->sockaddr;
800 for (n = 0; n < 16; n++) {
801 if ((sin6->sin6_addr.s6_addr[n]
802 & cidr[i].u.in6.mask.s6_addr[n])
803 != cidr[i].u.in6.addr.s6_addr[n])
804 {
805 goto next;
806 }
807 }
808 break;
809 #endif
810
811 #if (NGX_HAVE_UNIX_DOMAIN)
812 case AF_UNIX:
813 break;
814 #endif
815
816 default: /* AF_INET */
817 sin = (struct sockaddr_in *) c->sockaddr;
818 if ((sin->sin_addr.s_addr & cidr[i].u.in.mask)
819 != cidr[i].u.in.addr)
820 {
821 goto next;
822 }
823 break;
824 }
825
826 c->log->log_level = NGX_LOG_DEBUG_CONNECTION|NGX_LOG_DEBUG_ALL;
827 break;
828
829 next:
830 continue;
831 }
832 }
833
834 #endif