changeset 816:77359b849cd5

Tests: stream package.
author Andrey Zelenkov <zelenkov@nginx.com>
date Mon, 20 Jul 2015 15:06:09 +0300
parents 9f5f604a840e
children ada7d1ad985b
files lib/Test/Nginx/Stream.pm stream_access.t stream_error_log.t stream_limit_rate.t stream_proxy.t stream_proxy_next_upstream.t stream_proxy_protocol.t stream_proxy_protocol_ipv6.t stream_tcp_nodelay.t stream_upstream.t stream_upstream_hash.t stream_upstream_least_conn.t
diffstat 12 files changed, 192 insertions(+), 670 deletions(-) [+]
line wrap: on
line diff
new file mode 100644
--- /dev/null
+++ b/lib/Test/Nginx/Stream.pm
@@ -0,0 +1,115 @@
+package Test::Nginx::Stream;
+
+# (C) Andrey Zelenkov
+# (C) Nginx, Inc.
+
+# Module for nginx stream tests.
+
+###############################################################################
+
+use warnings;
+use strict;
+
+use base qw/ Exporter /;
+our @EXPORT_OK = qw/ stream /;
+
+use Test::More qw//;
+use IO::Select;
+use IO::Socket;
+
+use Test::Nginx;
+
+sub stream {
+	return Test::Nginx::Stream->new(@_);
+}
+
+sub new {
+	my $self = {};
+	bless $self, shift @_;
+
+	unshift(@_, "PeerAddr") if @_ == 1;
+
+	$self->{_socket} = IO::Socket::INET->new(
+		Proto => "tcp",
+		PeerAddr => '127.0.0.1:8080',
+		@_
+	)
+		or die "Can't connect to nginx: $!\n";
+
+	if ({@_}->{'SSL'}) {
+		require IO::Socket::SSL;
+		IO::Socket::SSL->start_SSL($self->{_socket}, @_)
+			or die $IO::Socket::SSL::SSL_ERROR . "\n";
+	}
+
+	$self->{_socket}->autoflush(1);
+
+	return $self;
+}
+
+sub write {
+	my ($self, $message) = @_;
+	my $s = $self->{_socket};
+
+	local $SIG{PIPE} = 'IGNORE';
+
+	$s->blocking(0);
+	while (IO::Select->new($s)->can_write(1.5)) {
+		my $n = $s->syswrite($message);
+		log_out(substr($message, 0, $n));
+		last unless $n;
+
+		$message = substr($message, $n);
+		last unless length $message;
+	}
+
+	if (length $message) {
+		$s->close();
+	}
+}
+
+sub read {
+	my ($self) = @_;
+	my ($s, $buf);
+
+	$s = $self->{_socket};
+
+	$s->blocking(0);
+	if (IO::Select->new($s)->can_read(5)) {
+		$s->sysread($buf, 1024);
+	};
+
+	log_in($buf);
+	return $buf;
+}
+
+sub io {
+	my $self = shift;
+
+	my ($data, %extra) = @_;
+	my $length = $extra{length};
+
+	$self->write($data);
+
+	$data = '';
+	while (1) {
+		my $buf = $self->read();
+		last unless length($buf);
+
+		$data .= $buf;
+		last if defined $length && length($data) >= $length;
+	}
+
+	return $data;
+}
+
+sub sockport {
+	my $self = shift;
+	return $self->{_socket}->sockport();
+}
+
+###############################################################################
+
+1;
+
+###############################################################################
--- a/stream_access.t
+++ b/stream_access.t
@@ -12,12 +12,11 @@ use strict;
 
 use Test::More;
 
-use IO::Select;
-
 BEGIN { use FindBin; chdir($FindBin::Bin); }
 
 use lib 'lib';
 use Test::Nginx;
+use Test::Nginx::Stream qw/ stream /;
 
 ###############################################################################
 
@@ -159,97 +158,39 @@ my $str = 'SEE-THIS';
 
 # allow all
 
-is(stream_get($str, '127.0.0.1:8081'), $str, 'inet allow all');
-is(stream_get($str, '127.0.0.1:8082'), $str, 'inet6 allow all');
-is(stream_get($str, '127.0.0.1:8083'), $str, 'unix allow all');
+is(stream('127.0.0.1:8081')->io($str), $str, 'inet allow all');
+is(stream('127.0.0.1:8082')->io($str), $str, 'inet6 allow all');
+is(stream('127.0.0.1:8083')->io($str), $str, 'unix allow all');
 
 # deny all
 
-is(stream_get($str, '127.0.0.1:8084'), '', 'inet deny all');
-is(stream_get($str, '127.0.0.1:8085'), '', 'inet6 deny all');
-is(stream_get($str, '127.0.0.1:8086'), '', 'unix deny all');
+is(stream('127.0.0.1:8084')->io($str), '', 'inet deny all');
+is(stream('127.0.0.1:8085')->io($str), '', 'inet6 deny all');
+is(stream('127.0.0.1:8086')->io($str), '', 'unix deny all');
 
 # allow unix
 
-is(stream_get($str, '127.0.0.1:8087'), $str, 'inet allow unix');
-is(stream_get($str, '127.0.0.1:8088'), $str, 'inet6 allow unix');
-is(stream_get($str, '127.0.0.1:8089'), $str, 'unix allow unix');
+is(stream('127.0.0.1:8087')->io($str), $str, 'inet allow unix');
+is(stream('127.0.0.1:8088')->io($str), $str, 'inet6 allow unix');
+is(stream('127.0.0.1:8089')->io($str), $str, 'unix allow unix');
 
 # deny inet
 
-is(stream_get($str, '127.0.0.1:8090'), '', 'inet deny inet');
-is(stream_get($str, '127.0.0.1:8091'), $str, 'inet6 deny inet');
-is(stream_get($str, '127.0.0.1:8092'), $str, 'unix deny inet');
+is(stream('127.0.0.1:8090')->io($str), '', 'inet deny inet');
+is(stream('127.0.0.1:8091')->io($str), $str, 'inet6 deny inet');
+is(stream('127.0.0.1:8092')->io($str), $str, 'unix deny inet');
 
 # deny inet6
 
-is(stream_get($str, '127.0.0.1:8093'), $str, 'inet deny inet6');
-is(stream_get($str, '127.0.0.1:8094'), '', 'inet6 deny inet6');
-is(stream_get($str, '127.0.0.1:8095'), $str, 'unix deny inet6');
+is(stream('127.0.0.1:8093')->io($str), $str, 'inet deny inet6');
+is(stream('127.0.0.1:8094')->io($str), '', 'inet6 deny inet6');
+is(stream('127.0.0.1:8095')->io($str), $str, 'unix deny inet6');
 
 # deny unix
 
-is(stream_get($str, '127.0.0.1:8096'), $str, 'inet deny unix');
-is(stream_get($str, '127.0.0.1:8097'), $str, 'inet6 deny unix');
-is(stream_get($str, '127.0.0.1:8098'), '', 'unix deny unix');
-
-###############################################################################
-
-sub stream_get {
-	my ($data, $peer) = @_;
-
-	my $s = stream_connect($peer);
-	stream_write($s, $data);
-
-	$data = '';
-	while (my $buf = stream_read($s)) {
-		$data .= $buf;
-	}
-
-	return $data;
-}
-
-sub stream_connect {
-	my $peer = shift;
-	my $s = IO::Socket::INET->new(
-		Proto => 'tcp',
-		PeerAddr => $peer || '127.0.0.1:8080'
-	)
-		or die "Can't connect to nginx: $!\n";
-
-	return $s;
-}
-
-sub stream_write {
-	my ($s, $message) = @_;
-
-	local $SIG{PIPE} = 'IGNORE';
-
-	$s->blocking(0);
-	while (IO::Select->new($s)->can_write(1.5)) {
-		my $n = $s->syswrite($message);
-		last unless $n;
-		$message = substr($message, $n);
-		last unless length $message;
-	}
-
-	if (length $message) {
-		$s->close();
-	}
-}
-
-sub stream_read {
-	my ($s) = @_;
-	my ($buf);
-
-	$s->blocking(0);
-	if (IO::Select->new($s)->can_read(5)) {
-		$s->sysread($buf, 1024);
-	};
-
-	log_in($buf);
-	return $buf;
-}
+is(stream('127.0.0.1:8096')->io($str), $str, 'inet deny unix');
+is(stream('127.0.0.1:8097')->io($str), $str, 'inet6 deny unix');
+is(stream('127.0.0.1:8098')->io($str), '', 'unix deny unix');
 
 ###############################################################################
 
--- a/stream_error_log.t
+++ b/stream_error_log.t
@@ -18,6 +18,7 @@ BEGIN { use FindBin; chdir($FindBin::Bin
 
 use lib 'lib';
 use Test::Nginx;
+use Test::Nginx::Stream qw/ stream /;
 
 ###############################################################################
 
@@ -87,7 +88,7 @@ open STDERR, ">&", \*OLDERR;
 
 ###############################################################################
 
-stream_get('data');
+stream()->io('data');
 
 # error_log levels
 
@@ -163,62 +164,6 @@ sub levels {
 	return \%levels_hash;
 }
 
-sub stream_get {
-	my ($data, $peer) = @_;
-
-	$peer = '127.0.0.1:8080' unless defined $peer;
-	my $s = stream_connect($peer);
-	stream_write($s, $data);
-
-	$data = '';
-	while (my $buf = stream_read($s)) {
-		$data .= $buf;
-	}
-	return $data;
-}
-
-sub stream_connect {
-	my $peer = shift;
-	my $s = IO::Socket::INET->new(
-		Proto => 'tcp',
-		PeerAddr => $peer || '127.0.0.1:8080'
-	)
-		or die "Can't connect to nginx: $!\n";
-
-	return $s;
-}
-
-sub stream_write {
-	my ($s, $message) = @_;
-
-	local $SIG{PIPE} = 'IGNORE';
-
-	$s->blocking(0);
-	while (IO::Select->new($s)->can_write(1.5)) {
-		my $n = $s->syswrite($message);
-		last unless $n;
-		$message = substr($message, $n);
-		last unless length $message;
-	}
-
-	if (length $message) {
-		$s->close();
-	}
-}
-
-sub stream_read {
-	my ($s) = @_;
-	my ($buf);
-
-	$s->blocking(0);
-	if (IO::Select->new($s)->can_read(5)) {
-		$s->sysread($buf, 1024);
-	};
-
-	log_in($buf);
-	return $buf;
-}
-
 sub get_syslog {
 	my ($data, $peer, $port) = @_;
 	my ($s);
@@ -242,7 +187,7 @@ sub get_syslog {
 		return undef;
 	}
 
-	stream_get($data, $peer);
+	stream($peer)->io($data);
 	$data = '';
 
 	IO::Select->new($s)->can_read(1.5);
--- a/stream_limit_rate.t
+++ b/stream_limit_rate.t
@@ -18,6 +18,7 @@ BEGIN { use FindBin; chdir($FindBin::Bin
 
 use lib 'lib';
 use Test::Nginx;
+use Test::Nginx::Stream qw/ stream /;
 
 ###############################################################################
 
@@ -95,10 +96,10 @@ EOF
 
 my $str = '1234567890' x 100;
 
-my %r = stream_get($str, peer => '127.0.0.1:8081');
+my %r = response($str, peer => '127.0.0.1:8081');
 is($r{'data'}, $str, 'exact limit');
 
-%r = stream_get($str, peer => '127.0.0.1:8082');
+%r = response($str, peer => '127.0.0.1:8082');
 is($r{'data'}, $str, 'unlimited');
 
 SKIP: {
@@ -107,10 +108,10 @@ skip 'unsafe on VM', 2 unless $ENV{TEST_
 # if interaction between backend and client is slow then proxy can add extra
 # bytes to upload/download data
 
-%r = stream_get($str, peer => '127.0.0.1:8083', readonce => 1);
+%r = response($str, peer => '127.0.0.1:8083', readonce => 1);
 is($r{'data'}, '1', 'download - one byte');
 
-%r = stream_get($str, peer =>  '127.0.0.1:8084');
+%r = response($str, peer =>  '127.0.0.1:8084');
 is($r{'data'}, '1', 'upload - one byte');
 
 }
@@ -119,75 +120,37 @@ is($r{'data'}, '1', 'upload - one byte')
 # the first four chunks are quarters of test string
 # and the fifth one is some extra data from backend.
 
-%r = stream_get($str, peer =>  '127.0.0.1:8085');
+%r = response($str, peer =>  '127.0.0.1:8085');
 my $diff = time() - $r{'time'};
 cmp_ok($diff, '>=', 4, 'download - time');
 is($r{'data'}, $str, 'download - data');
 
 my $time = time();
-%r = stream_get($str . 'close', peer => '127.0.0.1:8086');
+%r = response($str . 'close', peer => '127.0.0.1:8086');
 $diff = time() - $time;
 cmp_ok($diff, '>=', 4, 'upload - time');
 is($r{'data'}, $str . 'close', 'upload - data');
 
 ###############################################################################
 
-sub stream_get {
-	my ($data, %extra) = @_;
+sub response {
+ 	my ($data, %extra) = @_;
 
-	my $s = stream_connect($extra{'peer'});
-	stream_write($s, $data);
+	my $s = stream($extra{peer});
+	$s->write($data);
 
 	$data = '';
-	while (my $buf = stream_read($s)) {
+	while (1) {
+		my $buf = $s->read();
+		last unless length($buf);
+
 		$data .= $buf;
+
 		last if $extra{'readonce'};
 	}
 	$data =~ /([\S]*)\s?(\d+)?/;
 
-	return ('data' => $1, 'time' => $2);
-}
-
-sub stream_connect {
-	my $peer = shift;
-	my $s = IO::Socket::INET->new(
-		Proto => 'tcp',
-		PeerAddr => $peer
-	)
-		or die "Can't connect to nginx: $!\n";
-
-	return $s;
-}
-
-sub stream_write {
-	my ($s, $message) = @_;
-
-	local $SIG{PIPE} = 'IGNORE';
-
-	$s->blocking(0);
-	while (IO::Select->new($s)->can_write(1.5)) {
-		my $n = $s->syswrite($message);
-		last unless $n;
-		$message = substr($message, $n);
-		last unless length $message;
-	}
-
-	if (length $message) {
-		$s->close();
-	}
-}
-
-sub stream_read {
-	my ($s) = @_;
-	my ($buf);
-
-	$s->blocking(0);
-	if (IO::Select->new($s)->can_read(5)) {
-		$s->sysread($buf, 1024);
-	};
-
-	log_in($buf);
-	return $buf;
+	return ('data' => $1, 'time' => $2)
 }
 
 ###############################################################################
--- a/stream_proxy.t
+++ b/stream_proxy.t
@@ -18,6 +18,7 @@ BEGIN { use FindBin; chdir($FindBin::Bin
 
 use lib 'lib';
 use Test::Nginx;
+use Test::Nginx::Stream qw/ stream /;
 
 ###############################################################################
 
@@ -49,70 +50,18 @@ EOF
 
 ###############################################################################
 
-my $s = stream_connect();
-
-stream_write($s, 'foo1');
-is(stream_read($s), 'bar1', 'proxy connection');
-
-stream_write($s, 'foo3');
-is(stream_read($s), 'bar3', 'proxy connection again');
+my $s = stream();
 
-stream_write($s, 'close');
-is(stream_read($s), 'close', 'proxy connection close');
+is($s->io('foo1', length => 4), 'bar1', 'proxy connection');
+is($s->io('foo3', length => 4), 'bar3', 'proxy connection again');
+is($s->io('close'), 'close', 'proxy connection close');
+is($s->io('test'), '', 'proxy connection closed');
 
-stream_write($s, 'test');
-is(stream_read($s), '', 'proxy connection closed');
-
-$s = stream_connect();
+$s = stream();
 
 sleep 3;
 
-stream_write($s, 'foo');
-is(stream_read($s), 'bar', 'proxy connect timeout');
-
-###############################################################################
-
-sub stream_connect {
-	my $peer = shift;
-	my $s = IO::Socket::INET->new(
-		Proto => 'tcp',
-		PeerAddr => $peer || '127.0.0.1:8080'
-	)
-		or die "Can't connect to nginx: $!\n";
-
-	return $s;
-}
-
-sub stream_write {
-	my ($s, $message) = @_;
-
-	local $SIG{PIPE} = 'IGNORE';
-
-	$s->blocking(0);
-	while (IO::Select->new($s)->can_write(1.5)) {
-		my $n = $s->syswrite($message);
-		last unless $n;
-		$message = substr($message, $n);
-		last unless length $message;
-	}
-
-	if (length $message) {
-		$s->close();
-	}
-}
-
-sub stream_read {
-	my ($s) = @_;
-	my ($buf);
-
-	$s->blocking(0);
-	if (IO::Select->new($s)->can_read(5)) {
-		$s->sysread($buf, 1024);
-	};
-
-	log_in($buf);
-	return $buf;
-}
+is($s->io('foo', length => 3), 'bar', 'proxy connect timeout');
 
 ###############################################################################
 
--- a/stream_proxy_next_upstream.t
+++ b/stream_proxy_next_upstream.t
@@ -12,12 +12,11 @@ use strict;
 
 use Test::More;
 
-use IO::Select;
-
 BEGIN { use FindBin; chdir($FindBin::Bin); }
 
 use lib 'lib';
 use Test::Nginx;
+use Test::Nginx::Stream qw/ stream /;
 
 ###############################################################################
 
@@ -76,69 +75,12 @@ EOF
 
 ###############################################################################
 
-is(stream_get('.', '127.0.0.1:8081'), '', 'next upstream off');
-is(stream_get('.', '127.0.0.1:8082'), 'SEE-THIS', 'next upstream on');
+is(stream('127.0.0.1:8081')->io('.'), '', 'next off');
+is(stream('127.0.0.1:8082')->io('.'), 'SEE-THIS', 'next on');
 
 # make sure backup is not tried
 
-is(stream_get('.', '127.0.0.1:8083'), '', 'next upstream tries');
-
-###############################################################################
-
-sub stream_get {
-	my ($data, $peer) = @_;
-
-	my $s = stream_connect($peer);
-	stream_write($s, $data);
-
-	$data = '';
-	while (my $buf = stream_read($s)) {
-		$data .= $buf;
-	}
-	return $data;
-}
-
-sub stream_connect {
-	my $peer = shift;
-	my $s = IO::Socket::INET->new(
-		Proto => 'tcp',
-		PeerAddr => $peer
-	)
-		or die "Can't connect to nginx: $!\n";
-
-	return $s;
-}
-
-sub stream_write {
-	my ($s, $message) = @_;
-
-	local $SIG{PIPE} = 'IGNORE';
-
-	$s->blocking(0);
-	while (IO::Select->new($s)->can_write(1.5)) {
-		my $n = $s->syswrite($message);
-		last unless $n;
-		$message = substr($message, $n);
-		last unless length $message;
-	}
-
-	if (length $message) {
-		$s->close();
-	}
-}
-
-sub stream_read {
-	my ($s) = @_;
-	my ($buf);
-
-	$s->blocking(0);
-	if (IO::Select->new($s)->can_read(5)) {
-		$s->sysread($buf, 1024);
-	};
-
-	log_in($buf);
-	return $buf;
-}
+is(stream('127.0.0.1:8083')->io('.'), '', 'next tries');
 
 ###############################################################################
 
--- a/stream_proxy_protocol.t
+++ b/stream_proxy_protocol.t
@@ -13,12 +13,13 @@ use strict;
 use Test::More;
 
 use IO::Select;
-use Socket qw/ CRLF /;
+use Socket qw/ $CRLF /;
 
 BEGIN { use FindBin; chdir($FindBin::Bin); }
 
 use lib 'lib';
 use Test::Nginx;
+use Test::Nginx::Stream qw/ stream /;
 
 ###############################################################################
 
@@ -58,71 +59,12 @@ EOF
 
 ###############################################################################
 
-my %r = stream_get('close');
-is($r{'data'}, "PROXY TCP4 127.0.0.1 127.0.0.1 $r{'sp'} 8080" . CRLF . "close",
-	'protocol on');
-
-%r = stream_get('close', '127.0.0.1:8082');
-is($r{'data'}, 'close', 'protocol off');
-
-###############################################################################
-
-sub stream_get {
-	my ($data, $peer) = @_;
-
-	my $s = stream_connect($peer);
-	my $sockport = $s->sockport();
-	stream_write($s, $data);
-
-	$data = '';
-	while (my $buf = stream_read($s)) {
-		$data .= $buf;
-	}
-
-	return ('data' => $data, 'sp' => $sockport);
-}
-
-sub stream_connect {
-	my $peer = shift;
-	my $s = IO::Socket::INET->new(
-		Proto => 'tcp',
-		PeerAddr => $peer || '127.0.0.1:8080'
-	)
-		or die "Can't connect to nginx: $!\n";
+my $s = stream();
+my $data = $s->io('close');
+my $sp = $s->sockport();
+is($data, "PROXY TCP4 127.0.0.1 127.0.0.1 $sp 8080${CRLF}close", 'protocol on');
 
-	return $s;
-}
-
-sub stream_write {
-	my ($s, $message) = @_;
-
-	local $SIG{PIPE} = 'IGNORE';
-
-	$s->blocking(0);
-	while (IO::Select->new($s)->can_write(1.5)) {
-		my $n = $s->syswrite($message);
-		last unless $n;
-		$message = substr($message, $n);
-		last unless length $message;
-	}
-
-	if (length $message) {
-		$s->close();
-	}
-}
-
-sub stream_read {
-	my ($s) = @_;
-	my ($buf);
-
-	$s->blocking(0);
-	if (IO::Select->new($s)->can_read(5)) {
-		$s->sysread($buf, 1024);
-	};
-
-	log_in($buf);
-	return $buf;
-}
+is(stream('127.0.0.1:8082')->io('close'), 'close', 'protocol off');
 
 ###############################################################################
 
--- a/stream_proxy_protocol_ipv6.t
+++ b/stream_proxy_protocol_ipv6.t
@@ -19,6 +19,7 @@ BEGIN { use FindBin; chdir($FindBin::Bin
 
 use lib 'lib';
 use Test::Nginx;
+use Test::Nginx::Stream qw/ stream /;
 
 ###############################################################################
 
@@ -66,66 +67,9 @@ EOF
 
 ###############################################################################
 
-like(stream_get('close'), qr/PROXY TCP6 ::1 ::1 \d+ 8080$CRLF/, 'protocol on');
-unlike(stream_get('close', '127.0.0.1:8081'), qr/PROXY/, 'protocol off');
-
-###############################################################################
-
-sub stream_get {
-	my ($data, $peer) = @_;
-
-	my $s = stream_connect($peer);
-	stream_write($s, $data);
-
-	$data = '';
-	while (my $buf = stream_read($s)) {
-		$data .= $buf;
-	}
-
-	return $data;
-}
-
-sub stream_connect {
-	my $peer = shift;
-	my $s = IO::Socket::INET->new(
-		Proto => 'tcp',
-		PeerAddr => $peer || '127.0.0.1:8080'
-	)
-		or die "Can't connect to nginx: $!\n";
-
-	return $s;
-}
-
-sub stream_write {
-	my ($s, $message) = @_;
-
-	local $SIG{PIPE} = 'IGNORE';
-
-	$s->blocking(0);
-	while (IO::Select->new($s)->can_write(1.5)) {
-		my $n = $s->syswrite($message);
-		last unless $n;
-		$message = substr($message, $n);
-		last unless length $message;
-	}
-
-	if (length $message) {
-		$s->close();
-	}
-}
-
-sub stream_read {
-	my ($s) = @_;
-	my ($buf);
-
-	$s->blocking(0);
-	if (IO::Select->new($s)->can_read(5)) {
-		$s->sysread($buf, 1024);
-	};
-
-	log_in($buf);
-	return $buf;
-}
+like(stream()->io('close'), qr/PROXY TCP6 ::1 ::1 \d+ 8080$CRLF/,
+	'protocol on');
+unlike(stream('127.0.0.1:8081')->io('close'), qr/PROXY/, 'protocol off');
 
 ###############################################################################
 
--- a/stream_tcp_nodelay.t
+++ b/stream_tcp_nodelay.t
@@ -19,6 +19,7 @@ BEGIN { use FindBin; chdir($FindBin::Bin
 
 use lib 'lib';
 use Test::Nginx;
+use Test::Nginx::Stream qw/ stream /;
 
 ###############################################################################
 
@@ -62,68 +63,12 @@ EOF
 ###############################################################################
 
 my $str = '1234567890' x 10 . 'F';
-
-is(stream_get($str, '127.0.0.1:8081'), $str, 'tcp_nodelay off');
-is(stream_get($str, '127.0.0.1:8082'), $str, 'tcp_nodelay on');
-
-###############################################################################
-
-sub stream_get {
-	my ($data, $peer) = @_;
-	my $data_length = length $data;
-
-	my $s = stream_connect($peer);
-	stream_write($s, $data);
-
-	$data = '';
-	while (length $data < $data_length) {
-		my $buf = stream_read($s);
-		$data .= $buf;
-	}
-	return $data;
-}
-
-sub stream_connect {
-	my $peer = shift;
-	my $s = IO::Socket::INET->new(
-		Proto => 'tcp',
-		PeerAddr => $peer || '127.0.0.1:8080'
-	)
-		or die "Can't connect to nginx: $!\n";
+my $length = length($str);
 
-	return $s;
-}
-
-sub stream_write {
-	my ($s, $message) = @_;
-
-	local $SIG{PIPE} = 'IGNORE';
-
-	$s->blocking(0);
-	while (IO::Select->new($s)->can_write(1.5)) {
-		my $n = $s->syswrite($message);
-		last unless $n;
-		$message = substr($message, $n);
-		last unless length $message;
-	}
-
-	if (length $message) {
-		$s->close();
-	}
-}
-
-sub stream_read {
-	my ($s) = @_;
-	my ($buf);
-
-	$s->blocking(0);
-	if (IO::Select->new($s)->can_read(5)) {
-		$s->sysread($buf, 1024);
-	};
-
-	log_in($buf);
-	return $buf;
-}
+is(stream('127.0.0.1:8081')->io($str, length => $length), $str,
+	'tcp_nodelay off');
+is(stream('127.0.0.1:8082')->io($str, length => $length), $str,
+	'tcp_nodelay on');
 
 ###############################################################################
 
--- a/stream_upstream.t
+++ b/stream_upstream.t
@@ -18,6 +18,7 @@ BEGIN { use FindBin; chdir($FindBin::Bin
 
 use lib 'lib';
 use Test::Nginx;
+use Test::Nginx::Stream qw/ stream /;
 
 ###############################################################################
 
@@ -105,7 +106,7 @@ sub many {
 	$peer = $opts{peer};
 
 	for (1 .. $count) {
-		if (stream_get($data, $peer) =~ /(\d+)/) {
+		if (stream($peer)->io($data) =~ /(\d+)/) {
 			$ports{$1} = 0 unless defined $ports{$1};
 			$ports{$1}++;
 		}
@@ -114,61 +115,6 @@ sub many {
 	return join ', ', map { $_ . ": " . $ports{$_} } sort keys %ports;
 }
 
-sub stream_get {
-	my ($data, $peer) = @_;
-
-	my $s = stream_connect($peer);
-	stream_write($s, $data);
-
-	$data = '';
-	while (my $buf = stream_read($s)) {
-		$data .= $buf;
-	}
-	return $data;
-}
-
-sub stream_connect {
-	my $peer = shift;
-	my $s = IO::Socket::INET->new(
-		Proto => 'tcp',
-		PeerAddr => $peer
-	)
-		or die "Can't connect to nginx: $!\n";
-
-	return $s;
-}
-
-sub stream_write {
-	my ($s, $message) = @_;
-
-	local $SIG{PIPE} = 'IGNORE';
-
-	$s->blocking(0);
-	while (IO::Select->new($s)->can_write(1.5)) {
-		my $n = $s->syswrite($message);
-		last unless $n;
-		$message = substr($message, $n);
-		last unless length $message;
-	}
-
-	if (length $message) {
-		$s->close();
-	}
-}
-
-sub stream_read {
-	my ($s) = @_;
-	my ($buf);
-
-	$s->blocking(0);
-	if (IO::Select->new($s)->can_read(5)) {
-		$s->sysread($buf, 1024);
-	};
-
-	log_in($buf);
-	return $buf;
-}
-
 ###############################################################################
 
 sub stream_daemon {
--- a/stream_upstream_hash.t
+++ b/stream_upstream_hash.t
@@ -18,6 +18,7 @@ BEGIN { use FindBin; chdir($FindBin::Bin
 
 use lib 'lib';
 use Test::Nginx;
+use Test::Nginx::Stream qw/ stream /;
 
 ###############################################################################
 
@@ -82,7 +83,7 @@ sub many {
 	$peer = $opts{peer};
 
 	for (1 .. $count) {
-		if (stream_get($data, $peer) =~ /(\d+)/) {
+		if (stream($peer)->io($data) =~ /(\d+)/) {
 			$ports{$1} = 0 unless defined $ports{$1};
 			$ports{$1}++;
 		}
@@ -91,61 +92,6 @@ sub many {
 	return join ', ', map { $_ . ": " . $ports{$_} } sort keys %ports;
 }
 
-sub stream_get {
-	my ($data, $peer) = @_;
-
-	my $s = stream_connect($peer);
-	stream_write($s, $data);
-
-	$data = '';
-	while (my $buf = stream_read($s)) {
-		$data .= $buf;
-	}
-	return $data;
-}
-
-sub stream_connect {
-	my $peer = shift;
-	my $s = IO::Socket::INET->new(
-		Proto => 'tcp',
-		PeerAddr => $peer || '127.0.0.1:8080'
-	)
-		or die "Can't connect to nginx: $!\n";
-
-	return $s;
-}
-
-sub stream_write {
-	my ($s, $message) = @_;
-
-	local $SIG{PIPE} = 'IGNORE';
-
-	$s->blocking(0);
-	while (IO::Select->new($s)->can_write(1.5)) {
-		my $n = $s->syswrite($message);
-		last unless $n;
-		$message = substr($message, $n);
-		last unless length $message;
-	}
-
-	if (length $message) {
-		$s->close();
-	}
-}
-
-sub stream_read {
-	my ($s) = @_;
-	my ($buf);
-
-	$s->blocking(0);
-	if (IO::Select->new($s)->can_read(5)) {
-		$s->sysread($buf, 1024);
-	};
-
-	log_in($buf);
-	return $buf;
-}
-
 ###############################################################################
 
 sub stream_daemon {
--- a/stream_upstream_least_conn.t
+++ b/stream_upstream_least_conn.t
@@ -18,6 +18,7 @@ BEGIN { use FindBin; chdir($FindBin::Bin
 
 use lib 'lib';
 use Test::Nginx;
+use Test::Nginx::Stream qw/ stream /;
 
 ###############################################################################
 
@@ -62,8 +63,8 @@ is(many('.', 10), '8081: 5, 8082: 5', 'b
 
 my @sockets;
 for (1 .. 2) {
-	my $s = stream_connect();
-	stream_write($s, 'w');
+	my $s = stream();
+	$s->write('w');
 	push @sockets, $s;
 }
 
@@ -75,12 +76,10 @@ is(many('.', 10), '8082: 10', 'least_con
 
 sub many {
 	my ($data, $count, %opts) = @_;
-	my (%ports, $peer);
-
-	$peer = $opts{peer};
+	my (%ports);
 
 	for (1 .. $count) {
-		if (stream_get($data, $peer) =~ /(\d+)/) {
+		if (stream()->io($data) =~ /(\d+)/) {
 			$ports{$1} = 0 unless defined $ports{$1};
 			$ports{$1}++;
 		}
@@ -89,61 +88,6 @@ sub many {
 	return join ', ', map { $_ . ": " . $ports{$_} } sort keys %ports;
 }
 
-sub stream_get {
-	my ($data, $peer) = @_;
-
-	my $s = stream_connect($peer);
-	stream_write($s, $data);
-
-	$data = '';
-	while (my $buf = stream_read($s)) {
-		$data .= $buf;
-	}
-	return $data;
-}
-
-sub stream_connect {
-	my $peer = shift;
-	my $s = IO::Socket::INET->new(
-		Proto => 'tcp',
-		PeerAddr => $peer || '127.0.0.1:8080'
-	)
-		or die "Can't connect to nginx: $!\n";
-
-	return $s;
-}
-
-sub stream_write {
-	my ($s, $message) = @_;
-
-	local $SIG{PIPE} = 'IGNORE';
-
-	$s->blocking(0);
-	while (IO::Select->new($s)->can_write(1.5)) {
-		my $n = $s->syswrite($message);
-		last unless $n;
-		$message = substr($message, $n);
-		last unless length $message;
-	}
-
-	if (length $message) {
-		$s->close();
-	}
-}
-
-sub stream_read {
-	my ($s) = @_;
-	my ($buf);
-
-	$s->blocking(0);
-	if (IO::Select->new($s)->can_read(5)) {
-		$s->sysread($buf, 1024);
-	};
-
-	log_in($buf);
-	return $buf;
-}
-
 ###############################################################################
 
 sub stream_daemon {