# HG changeset patch # User Andrey Zelenkov # Date 1437393969 -10800 # Node ID 77359b849cd57244ba8cdc079d8390b9fe8d88d1 # Parent 9f5f604a840e7748160e9bd21bcd4d162b4466ea Tests: stream package. diff --git a/lib/Test/Nginx/Stream.pm b/lib/Test/Nginx/Stream.pm new file mode 100644 --- /dev/null +++ b/lib/Test/Nginx/Stream.pm @@ -0,0 +1,115 @@ +package Test::Nginx::Stream; + +# (C) Andrey Zelenkov +# (C) Nginx, Inc. + +# Module for nginx stream tests. + +############################################################################### + +use warnings; +use strict; + +use base qw/ Exporter /; +our @EXPORT_OK = qw/ stream /; + +use Test::More qw//; +use IO::Select; +use IO::Socket; + +use Test::Nginx; + +sub stream { + return Test::Nginx::Stream->new(@_); +} + +sub new { + my $self = {}; + bless $self, shift @_; + + unshift(@_, "PeerAddr") if @_ == 1; + + $self->{_socket} = IO::Socket::INET->new( + Proto => "tcp", + PeerAddr => '127.0.0.1:8080', + @_ + ) + or die "Can't connect to nginx: $!\n"; + + if ({@_}->{'SSL'}) { + require IO::Socket::SSL; + IO::Socket::SSL->start_SSL($self->{_socket}, @_) + or die $IO::Socket::SSL::SSL_ERROR . "\n"; + } + + $self->{_socket}->autoflush(1); + + return $self; +} + +sub write { + my ($self, $message) = @_; + my $s = $self->{_socket}; + + local $SIG{PIPE} = 'IGNORE'; + + $s->blocking(0); + while (IO::Select->new($s)->can_write(1.5)) { + my $n = $s->syswrite($message); + log_out(substr($message, 0, $n)); + last unless $n; + + $message = substr($message, $n); + last unless length $message; + } + + if (length $message) { + $s->close(); + } +} + +sub read { + my ($self) = @_; + my ($s, $buf); + + $s = $self->{_socket}; + + $s->blocking(0); + if (IO::Select->new($s)->can_read(5)) { + $s->sysread($buf, 1024); + }; + + log_in($buf); + return $buf; +} + +sub io { + my $self = shift; + + my ($data, %extra) = @_; + my $length = $extra{length}; + + $self->write($data); + + $data = ''; + while (1) { + my $buf = $self->read(); + last unless length($buf); + + $data .= $buf; + last if defined $length && length($data) >= $length; + } + + return $data; +} + +sub sockport { + my $self = shift; + return $self->{_socket}->sockport(); +} + +############################################################################### + +1; + +############################################################################### diff --git a/stream_access.t b/stream_access.t --- a/stream_access.t +++ b/stream_access.t @@ -12,12 +12,11 @@ use strict; use Test::More; -use IO::Select; - BEGIN { use FindBin; chdir($FindBin::Bin); } use lib 'lib'; use Test::Nginx; +use Test::Nginx::Stream qw/ stream /; ############################################################################### @@ -159,97 +158,39 @@ my $str = 'SEE-THIS'; # allow all -is(stream_get($str, '127.0.0.1:8081'), $str, 'inet allow all'); -is(stream_get($str, '127.0.0.1:8082'), $str, 'inet6 allow all'); -is(stream_get($str, '127.0.0.1:8083'), $str, 'unix allow all'); +is(stream('127.0.0.1:8081')->io($str), $str, 'inet allow all'); +is(stream('127.0.0.1:8082')->io($str), $str, 'inet6 allow all'); +is(stream('127.0.0.1:8083')->io($str), $str, 'unix allow all'); # deny all -is(stream_get($str, '127.0.0.1:8084'), '', 'inet deny all'); -is(stream_get($str, '127.0.0.1:8085'), '', 'inet6 deny all'); -is(stream_get($str, '127.0.0.1:8086'), '', 'unix deny all'); +is(stream('127.0.0.1:8084')->io($str), '', 'inet deny all'); +is(stream('127.0.0.1:8085')->io($str), '', 'inet6 deny all'); +is(stream('127.0.0.1:8086')->io($str), '', 'unix deny all'); # allow unix -is(stream_get($str, '127.0.0.1:8087'), $str, 'inet allow unix'); -is(stream_get($str, '127.0.0.1:8088'), $str, 'inet6 allow unix'); -is(stream_get($str, '127.0.0.1:8089'), $str, 'unix allow unix'); +is(stream('127.0.0.1:8087')->io($str), $str, 'inet allow unix'); +is(stream('127.0.0.1:8088')->io($str), $str, 'inet6 allow unix'); +is(stream('127.0.0.1:8089')->io($str), $str, 'unix allow unix'); # deny inet -is(stream_get($str, '127.0.0.1:8090'), '', 'inet deny inet'); -is(stream_get($str, '127.0.0.1:8091'), $str, 'inet6 deny inet'); -is(stream_get($str, '127.0.0.1:8092'), $str, 'unix deny inet'); +is(stream('127.0.0.1:8090')->io($str), '', 'inet deny inet'); +is(stream('127.0.0.1:8091')->io($str), $str, 'inet6 deny inet'); +is(stream('127.0.0.1:8092')->io($str), $str, 'unix deny inet'); # deny inet6 -is(stream_get($str, '127.0.0.1:8093'), $str, 'inet deny inet6'); -is(stream_get($str, '127.0.0.1:8094'), '', 'inet6 deny inet6'); -is(stream_get($str, '127.0.0.1:8095'), $str, 'unix deny inet6'); +is(stream('127.0.0.1:8093')->io($str), $str, 'inet deny inet6'); +is(stream('127.0.0.1:8094')->io($str), '', 'inet6 deny inet6'); +is(stream('127.0.0.1:8095')->io($str), $str, 'unix deny inet6'); # deny unix -is(stream_get($str, '127.0.0.1:8096'), $str, 'inet deny unix'); -is(stream_get($str, '127.0.0.1:8097'), $str, 'inet6 deny unix'); -is(stream_get($str, '127.0.0.1:8098'), '', 'unix deny unix'); - -############################################################################### - -sub stream_get { - my ($data, $peer) = @_; - - my $s = stream_connect($peer); - stream_write($s, $data); - - $data = ''; - while (my $buf = stream_read($s)) { - $data .= $buf; - } - - return $data; -} - -sub stream_connect { - my $peer = shift; - my $s = IO::Socket::INET->new( - Proto => 'tcp', - PeerAddr => $peer || '127.0.0.1:8080' - ) - or die "Can't connect to nginx: $!\n"; - - return $s; -} - -sub stream_write { - my ($s, $message) = @_; - - local $SIG{PIPE} = 'IGNORE'; - - $s->blocking(0); - while (IO::Select->new($s)->can_write(1.5)) { - my $n = $s->syswrite($message); - last unless $n; - $message = substr($message, $n); - last unless length $message; - } - - if (length $message) { - $s->close(); - } -} - -sub stream_read { - my ($s) = @_; - my ($buf); - - $s->blocking(0); - if (IO::Select->new($s)->can_read(5)) { - $s->sysread($buf, 1024); - }; - - log_in($buf); - return $buf; -} +is(stream('127.0.0.1:8096')->io($str), $str, 'inet deny unix'); +is(stream('127.0.0.1:8097')->io($str), $str, 'inet6 deny unix'); +is(stream('127.0.0.1:8098')->io($str), '', 'unix deny unix'); ############################################################################### diff --git a/stream_error_log.t b/stream_error_log.t --- a/stream_error_log.t +++ b/stream_error_log.t @@ -18,6 +18,7 @@ BEGIN { use FindBin; chdir($FindBin::Bin use lib 'lib'; use Test::Nginx; +use Test::Nginx::Stream qw/ stream /; ############################################################################### @@ -87,7 +88,7 @@ open STDERR, ">&", \*OLDERR; ############################################################################### -stream_get('data'); +stream()->io('data'); # error_log levels @@ -163,62 +164,6 @@ sub levels { return \%levels_hash; } -sub stream_get { - my ($data, $peer) = @_; - - $peer = '127.0.0.1:8080' unless defined $peer; - my $s = stream_connect($peer); - stream_write($s, $data); - - $data = ''; - while (my $buf = stream_read($s)) { - $data .= $buf; - } - return $data; -} - -sub stream_connect { - my $peer = shift; - my $s = IO::Socket::INET->new( - Proto => 'tcp', - PeerAddr => $peer || '127.0.0.1:8080' - ) - or die "Can't connect to nginx: $!\n"; - - return $s; -} - -sub stream_write { - my ($s, $message) = @_; - - local $SIG{PIPE} = 'IGNORE'; - - $s->blocking(0); - while (IO::Select->new($s)->can_write(1.5)) { - my $n = $s->syswrite($message); - last unless $n; - $message = substr($message, $n); - last unless length $message; - } - - if (length $message) { - $s->close(); - } -} - -sub stream_read { - my ($s) = @_; - my ($buf); - - $s->blocking(0); - if (IO::Select->new($s)->can_read(5)) { - $s->sysread($buf, 1024); - }; - - log_in($buf); - return $buf; -} - sub get_syslog { my ($data, $peer, $port) = @_; my ($s); @@ -242,7 +187,7 @@ sub get_syslog { return undef; } - stream_get($data, $peer); + stream($peer)->io($data); $data = ''; IO::Select->new($s)->can_read(1.5); diff --git a/stream_limit_rate.t b/stream_limit_rate.t --- a/stream_limit_rate.t +++ b/stream_limit_rate.t @@ -18,6 +18,7 @@ BEGIN { use FindBin; chdir($FindBin::Bin use lib 'lib'; use Test::Nginx; +use Test::Nginx::Stream qw/ stream /; ############################################################################### @@ -95,10 +96,10 @@ EOF my $str = '1234567890' x 100; -my %r = stream_get($str, peer => '127.0.0.1:8081'); +my %r = response($str, peer => '127.0.0.1:8081'); is($r{'data'}, $str, 'exact limit'); -%r = stream_get($str, peer => '127.0.0.1:8082'); +%r = response($str, peer => '127.0.0.1:8082'); is($r{'data'}, $str, 'unlimited'); SKIP: { @@ -107,10 +108,10 @@ skip 'unsafe on VM', 2 unless $ENV{TEST_ # if interaction between backend and client is slow then proxy can add extra # bytes to upload/download data -%r = stream_get($str, peer => '127.0.0.1:8083', readonce => 1); +%r = response($str, peer => '127.0.0.1:8083', readonce => 1); is($r{'data'}, '1', 'download - one byte'); -%r = stream_get($str, peer => '127.0.0.1:8084'); +%r = response($str, peer => '127.0.0.1:8084'); is($r{'data'}, '1', 'upload - one byte'); } @@ -119,75 +120,37 @@ is($r{'data'}, '1', 'upload - one byte') # the first four chunks are quarters of test string # and the fifth one is some extra data from backend. -%r = stream_get($str, peer => '127.0.0.1:8085'); +%r = response($str, peer => '127.0.0.1:8085'); my $diff = time() - $r{'time'}; cmp_ok($diff, '>=', 4, 'download - time'); is($r{'data'}, $str, 'download - data'); my $time = time(); -%r = stream_get($str . 'close', peer => '127.0.0.1:8086'); +%r = response($str . 'close', peer => '127.0.0.1:8086'); $diff = time() - $time; cmp_ok($diff, '>=', 4, 'upload - time'); is($r{'data'}, $str . 'close', 'upload - data'); ############################################################################### -sub stream_get { - my ($data, %extra) = @_; +sub response { + my ($data, %extra) = @_; - my $s = stream_connect($extra{'peer'}); - stream_write($s, $data); + my $s = stream($extra{peer}); + $s->write($data); $data = ''; - while (my $buf = stream_read($s)) { + while (1) { + my $buf = $s->read(); + last unless length($buf); + $data .= $buf; + last if $extra{'readonce'}; } $data =~ /([\S]*)\s?(\d+)?/; - return ('data' => $1, 'time' => $2); -} - -sub stream_connect { - my $peer = shift; - my $s = IO::Socket::INET->new( - Proto => 'tcp', - PeerAddr => $peer - ) - or die "Can't connect to nginx: $!\n"; - - return $s; -} - -sub stream_write { - my ($s, $message) = @_; - - local $SIG{PIPE} = 'IGNORE'; - - $s->blocking(0); - while (IO::Select->new($s)->can_write(1.5)) { - my $n = $s->syswrite($message); - last unless $n; - $message = substr($message, $n); - last unless length $message; - } - - if (length $message) { - $s->close(); - } -} - -sub stream_read { - my ($s) = @_; - my ($buf); - - $s->blocking(0); - if (IO::Select->new($s)->can_read(5)) { - $s->sysread($buf, 1024); - }; - - log_in($buf); - return $buf; + return ('data' => $1, 'time' => $2) } ############################################################################### diff --git a/stream_proxy.t b/stream_proxy.t --- a/stream_proxy.t +++ b/stream_proxy.t @@ -18,6 +18,7 @@ BEGIN { use FindBin; chdir($FindBin::Bin use lib 'lib'; use Test::Nginx; +use Test::Nginx::Stream qw/ stream /; ############################################################################### @@ -49,70 +50,18 @@ EOF ############################################################################### -my $s = stream_connect(); - -stream_write($s, 'foo1'); -is(stream_read($s), 'bar1', 'proxy connection'); - -stream_write($s, 'foo3'); -is(stream_read($s), 'bar3', 'proxy connection again'); +my $s = stream(); -stream_write($s, 'close'); -is(stream_read($s), 'close', 'proxy connection close'); +is($s->io('foo1', length => 4), 'bar1', 'proxy connection'); +is($s->io('foo3', length => 4), 'bar3', 'proxy connection again'); +is($s->io('close'), 'close', 'proxy connection close'); +is($s->io('test'), '', 'proxy connection closed'); -stream_write($s, 'test'); -is(stream_read($s), '', 'proxy connection closed'); - -$s = stream_connect(); +$s = stream(); sleep 3; -stream_write($s, 'foo'); -is(stream_read($s), 'bar', 'proxy connect timeout'); - -############################################################################### - -sub stream_connect { - my $peer = shift; - my $s = IO::Socket::INET->new( - Proto => 'tcp', - PeerAddr => $peer || '127.0.0.1:8080' - ) - or die "Can't connect to nginx: $!\n"; - - return $s; -} - -sub stream_write { - my ($s, $message) = @_; - - local $SIG{PIPE} = 'IGNORE'; - - $s->blocking(0); - while (IO::Select->new($s)->can_write(1.5)) { - my $n = $s->syswrite($message); - last unless $n; - $message = substr($message, $n); - last unless length $message; - } - - if (length $message) { - $s->close(); - } -} - -sub stream_read { - my ($s) = @_; - my ($buf); - - $s->blocking(0); - if (IO::Select->new($s)->can_read(5)) { - $s->sysread($buf, 1024); - }; - - log_in($buf); - return $buf; -} +is($s->io('foo', length => 3), 'bar', 'proxy connect timeout'); ############################################################################### diff --git a/stream_proxy_next_upstream.t b/stream_proxy_next_upstream.t --- a/stream_proxy_next_upstream.t +++ b/stream_proxy_next_upstream.t @@ -12,12 +12,11 @@ use strict; use Test::More; -use IO::Select; - BEGIN { use FindBin; chdir($FindBin::Bin); } use lib 'lib'; use Test::Nginx; +use Test::Nginx::Stream qw/ stream /; ############################################################################### @@ -76,69 +75,12 @@ EOF ############################################################################### -is(stream_get('.', '127.0.0.1:8081'), '', 'next upstream off'); -is(stream_get('.', '127.0.0.1:8082'), 'SEE-THIS', 'next upstream on'); +is(stream('127.0.0.1:8081')->io('.'), '', 'next off'); +is(stream('127.0.0.1:8082')->io('.'), 'SEE-THIS', 'next on'); # make sure backup is not tried -is(stream_get('.', '127.0.0.1:8083'), '', 'next upstream tries'); - -############################################################################### - -sub stream_get { - my ($data, $peer) = @_; - - my $s = stream_connect($peer); - stream_write($s, $data); - - $data = ''; - while (my $buf = stream_read($s)) { - $data .= $buf; - } - return $data; -} - -sub stream_connect { - my $peer = shift; - my $s = IO::Socket::INET->new( - Proto => 'tcp', - PeerAddr => $peer - ) - or die "Can't connect to nginx: $!\n"; - - return $s; -} - -sub stream_write { - my ($s, $message) = @_; - - local $SIG{PIPE} = 'IGNORE'; - - $s->blocking(0); - while (IO::Select->new($s)->can_write(1.5)) { - my $n = $s->syswrite($message); - last unless $n; - $message = substr($message, $n); - last unless length $message; - } - - if (length $message) { - $s->close(); - } -} - -sub stream_read { - my ($s) = @_; - my ($buf); - - $s->blocking(0); - if (IO::Select->new($s)->can_read(5)) { - $s->sysread($buf, 1024); - }; - - log_in($buf); - return $buf; -} +is(stream('127.0.0.1:8083')->io('.'), '', 'next tries'); ############################################################################### diff --git a/stream_proxy_protocol.t b/stream_proxy_protocol.t --- a/stream_proxy_protocol.t +++ b/stream_proxy_protocol.t @@ -13,12 +13,13 @@ use strict; use Test::More; use IO::Select; -use Socket qw/ CRLF /; +use Socket qw/ $CRLF /; BEGIN { use FindBin; chdir($FindBin::Bin); } use lib 'lib'; use Test::Nginx; +use Test::Nginx::Stream qw/ stream /; ############################################################################### @@ -58,71 +59,12 @@ EOF ############################################################################### -my %r = stream_get('close'); -is($r{'data'}, "PROXY TCP4 127.0.0.1 127.0.0.1 $r{'sp'} 8080" . CRLF . "close", - 'protocol on'); - -%r = stream_get('close', '127.0.0.1:8082'); -is($r{'data'}, 'close', 'protocol off'); - -############################################################################### - -sub stream_get { - my ($data, $peer) = @_; - - my $s = stream_connect($peer); - my $sockport = $s->sockport(); - stream_write($s, $data); - - $data = ''; - while (my $buf = stream_read($s)) { - $data .= $buf; - } - - return ('data' => $data, 'sp' => $sockport); -} - -sub stream_connect { - my $peer = shift; - my $s = IO::Socket::INET->new( - Proto => 'tcp', - PeerAddr => $peer || '127.0.0.1:8080' - ) - or die "Can't connect to nginx: $!\n"; +my $s = stream(); +my $data = $s->io('close'); +my $sp = $s->sockport(); +is($data, "PROXY TCP4 127.0.0.1 127.0.0.1 $sp 8080${CRLF}close", 'protocol on'); - return $s; -} - -sub stream_write { - my ($s, $message) = @_; - - local $SIG{PIPE} = 'IGNORE'; - - $s->blocking(0); - while (IO::Select->new($s)->can_write(1.5)) { - my $n = $s->syswrite($message); - last unless $n; - $message = substr($message, $n); - last unless length $message; - } - - if (length $message) { - $s->close(); - } -} - -sub stream_read { - my ($s) = @_; - my ($buf); - - $s->blocking(0); - if (IO::Select->new($s)->can_read(5)) { - $s->sysread($buf, 1024); - }; - - log_in($buf); - return $buf; -} +is(stream('127.0.0.1:8082')->io('close'), 'close', 'protocol off'); ############################################################################### diff --git a/stream_proxy_protocol_ipv6.t b/stream_proxy_protocol_ipv6.t --- a/stream_proxy_protocol_ipv6.t +++ b/stream_proxy_protocol_ipv6.t @@ -19,6 +19,7 @@ BEGIN { use FindBin; chdir($FindBin::Bin use lib 'lib'; use Test::Nginx; +use Test::Nginx::Stream qw/ stream /; ############################################################################### @@ -66,66 +67,9 @@ EOF ############################################################################### -like(stream_get('close'), qr/PROXY TCP6 ::1 ::1 \d+ 8080$CRLF/, 'protocol on'); -unlike(stream_get('close', '127.0.0.1:8081'), qr/PROXY/, 'protocol off'); - -############################################################################### - -sub stream_get { - my ($data, $peer) = @_; - - my $s = stream_connect($peer); - stream_write($s, $data); - - $data = ''; - while (my $buf = stream_read($s)) { - $data .= $buf; - } - - return $data; -} - -sub stream_connect { - my $peer = shift; - my $s = IO::Socket::INET->new( - Proto => 'tcp', - PeerAddr => $peer || '127.0.0.1:8080' - ) - or die "Can't connect to nginx: $!\n"; - - return $s; -} - -sub stream_write { - my ($s, $message) = @_; - - local $SIG{PIPE} = 'IGNORE'; - - $s->blocking(0); - while (IO::Select->new($s)->can_write(1.5)) { - my $n = $s->syswrite($message); - last unless $n; - $message = substr($message, $n); - last unless length $message; - } - - if (length $message) { - $s->close(); - } -} - -sub stream_read { - my ($s) = @_; - my ($buf); - - $s->blocking(0); - if (IO::Select->new($s)->can_read(5)) { - $s->sysread($buf, 1024); - }; - - log_in($buf); - return $buf; -} +like(stream()->io('close'), qr/PROXY TCP6 ::1 ::1 \d+ 8080$CRLF/, + 'protocol on'); +unlike(stream('127.0.0.1:8081')->io('close'), qr/PROXY/, 'protocol off'); ############################################################################### diff --git a/stream_tcp_nodelay.t b/stream_tcp_nodelay.t --- a/stream_tcp_nodelay.t +++ b/stream_tcp_nodelay.t @@ -19,6 +19,7 @@ BEGIN { use FindBin; chdir($FindBin::Bin use lib 'lib'; use Test::Nginx; +use Test::Nginx::Stream qw/ stream /; ############################################################################### @@ -62,68 +63,12 @@ EOF ############################################################################### my $str = '1234567890' x 10 . 'F'; - -is(stream_get($str, '127.0.0.1:8081'), $str, 'tcp_nodelay off'); -is(stream_get($str, '127.0.0.1:8082'), $str, 'tcp_nodelay on'); - -############################################################################### - -sub stream_get { - my ($data, $peer) = @_; - my $data_length = length $data; - - my $s = stream_connect($peer); - stream_write($s, $data); - - $data = ''; - while (length $data < $data_length) { - my $buf = stream_read($s); - $data .= $buf; - } - return $data; -} - -sub stream_connect { - my $peer = shift; - my $s = IO::Socket::INET->new( - Proto => 'tcp', - PeerAddr => $peer || '127.0.0.1:8080' - ) - or die "Can't connect to nginx: $!\n"; +my $length = length($str); - return $s; -} - -sub stream_write { - my ($s, $message) = @_; - - local $SIG{PIPE} = 'IGNORE'; - - $s->blocking(0); - while (IO::Select->new($s)->can_write(1.5)) { - my $n = $s->syswrite($message); - last unless $n; - $message = substr($message, $n); - last unless length $message; - } - - if (length $message) { - $s->close(); - } -} - -sub stream_read { - my ($s) = @_; - my ($buf); - - $s->blocking(0); - if (IO::Select->new($s)->can_read(5)) { - $s->sysread($buf, 1024); - }; - - log_in($buf); - return $buf; -} +is(stream('127.0.0.1:8081')->io($str, length => $length), $str, + 'tcp_nodelay off'); +is(stream('127.0.0.1:8082')->io($str, length => $length), $str, + 'tcp_nodelay on'); ############################################################################### diff --git a/stream_upstream.t b/stream_upstream.t --- a/stream_upstream.t +++ b/stream_upstream.t @@ -18,6 +18,7 @@ BEGIN { use FindBin; chdir($FindBin::Bin use lib 'lib'; use Test::Nginx; +use Test::Nginx::Stream qw/ stream /; ############################################################################### @@ -105,7 +106,7 @@ sub many { $peer = $opts{peer}; for (1 .. $count) { - if (stream_get($data, $peer) =~ /(\d+)/) { + if (stream($peer)->io($data) =~ /(\d+)/) { $ports{$1} = 0 unless defined $ports{$1}; $ports{$1}++; } @@ -114,61 +115,6 @@ sub many { return join ', ', map { $_ . ": " . $ports{$_} } sort keys %ports; } -sub stream_get { - my ($data, $peer) = @_; - - my $s = stream_connect($peer); - stream_write($s, $data); - - $data = ''; - while (my $buf = stream_read($s)) { - $data .= $buf; - } - return $data; -} - -sub stream_connect { - my $peer = shift; - my $s = IO::Socket::INET->new( - Proto => 'tcp', - PeerAddr => $peer - ) - or die "Can't connect to nginx: $!\n"; - - return $s; -} - -sub stream_write { - my ($s, $message) = @_; - - local $SIG{PIPE} = 'IGNORE'; - - $s->blocking(0); - while (IO::Select->new($s)->can_write(1.5)) { - my $n = $s->syswrite($message); - last unless $n; - $message = substr($message, $n); - last unless length $message; - } - - if (length $message) { - $s->close(); - } -} - -sub stream_read { - my ($s) = @_; - my ($buf); - - $s->blocking(0); - if (IO::Select->new($s)->can_read(5)) { - $s->sysread($buf, 1024); - }; - - log_in($buf); - return $buf; -} - ############################################################################### sub stream_daemon { diff --git a/stream_upstream_hash.t b/stream_upstream_hash.t --- a/stream_upstream_hash.t +++ b/stream_upstream_hash.t @@ -18,6 +18,7 @@ BEGIN { use FindBin; chdir($FindBin::Bin use lib 'lib'; use Test::Nginx; +use Test::Nginx::Stream qw/ stream /; ############################################################################### @@ -82,7 +83,7 @@ sub many { $peer = $opts{peer}; for (1 .. $count) { - if (stream_get($data, $peer) =~ /(\d+)/) { + if (stream($peer)->io($data) =~ /(\d+)/) { $ports{$1} = 0 unless defined $ports{$1}; $ports{$1}++; } @@ -91,61 +92,6 @@ sub many { return join ', ', map { $_ . ": " . $ports{$_} } sort keys %ports; } -sub stream_get { - my ($data, $peer) = @_; - - my $s = stream_connect($peer); - stream_write($s, $data); - - $data = ''; - while (my $buf = stream_read($s)) { - $data .= $buf; - } - return $data; -} - -sub stream_connect { - my $peer = shift; - my $s = IO::Socket::INET->new( - Proto => 'tcp', - PeerAddr => $peer || '127.0.0.1:8080' - ) - or die "Can't connect to nginx: $!\n"; - - return $s; -} - -sub stream_write { - my ($s, $message) = @_; - - local $SIG{PIPE} = 'IGNORE'; - - $s->blocking(0); - while (IO::Select->new($s)->can_write(1.5)) { - my $n = $s->syswrite($message); - last unless $n; - $message = substr($message, $n); - last unless length $message; - } - - if (length $message) { - $s->close(); - } -} - -sub stream_read { - my ($s) = @_; - my ($buf); - - $s->blocking(0); - if (IO::Select->new($s)->can_read(5)) { - $s->sysread($buf, 1024); - }; - - log_in($buf); - return $buf; -} - ############################################################################### sub stream_daemon { diff --git a/stream_upstream_least_conn.t b/stream_upstream_least_conn.t --- a/stream_upstream_least_conn.t +++ b/stream_upstream_least_conn.t @@ -18,6 +18,7 @@ BEGIN { use FindBin; chdir($FindBin::Bin use lib 'lib'; use Test::Nginx; +use Test::Nginx::Stream qw/ stream /; ############################################################################### @@ -62,8 +63,8 @@ is(many('.', 10), '8081: 5, 8082: 5', 'b my @sockets; for (1 .. 2) { - my $s = stream_connect(); - stream_write($s, 'w'); + my $s = stream(); + $s->write('w'); push @sockets, $s; } @@ -75,12 +76,10 @@ is(many('.', 10), '8082: 10', 'least_con sub many { my ($data, $count, %opts) = @_; - my (%ports, $peer); - - $peer = $opts{peer}; + my (%ports); for (1 .. $count) { - if (stream_get($data, $peer) =~ /(\d+)/) { + if (stream()->io($data) =~ /(\d+)/) { $ports{$1} = 0 unless defined $ports{$1}; $ports{$1}++; } @@ -89,61 +88,6 @@ sub many { return join ', ', map { $_ . ": " . $ports{$_} } sort keys %ports; } -sub stream_get { - my ($data, $peer) = @_; - - my $s = stream_connect($peer); - stream_write($s, $data); - - $data = ''; - while (my $buf = stream_read($s)) { - $data .= $buf; - } - return $data; -} - -sub stream_connect { - my $peer = shift; - my $s = IO::Socket::INET->new( - Proto => 'tcp', - PeerAddr => $peer || '127.0.0.1:8080' - ) - or die "Can't connect to nginx: $!\n"; - - return $s; -} - -sub stream_write { - my ($s, $message) = @_; - - local $SIG{PIPE} = 'IGNORE'; - - $s->blocking(0); - while (IO::Select->new($s)->can_write(1.5)) { - my $n = $s->syswrite($message); - last unless $n; - $message = substr($message, $n); - last unless length $message; - } - - if (length $message) { - $s->close(); - } -} - -sub stream_read { - my ($s) = @_; - my ($buf); - - $s->blocking(0); - if (IO::Select->new($s)->can_read(5)) { - $s->sysread($buf, 1024); - }; - - log_in($buf); - return $buf; -} - ############################################################################### sub stream_daemon {