changeset 1785:2c08bfffe112

Tests: added stream js test for s.send() in async context.
author Dmitry Volyntsev <xeioex@nginx.com>
date Tue, 23 Aug 2022 19:40:07 -0700
parents 898598722ad4
children cadf15e2e2b3
files stream_js_fetch.t
diffstat 1 files changed, 93 insertions(+), 20 deletions(-) [+]
line wrap: on
line diff
--- a/stream_js_fetch.t
+++ b/stream_js_fetch.t
@@ -67,6 +67,12 @@ stream {
         js_preread  test.preread_verify;
         proxy_pass  127.0.0.1:8090;
     }
+
+    server {
+        listen      127.0.0.1:8082;
+        js_filter   test.filter_verify;
+        proxy_pass  127.0.0.1:8091;
+    }
 }
 
 EOF
@@ -103,13 +109,37 @@ my $p = port(8080);
         });
     }
 
-    export default {njs: test_njs, validate, preread_verify}
+    function filter_verify(s) {
+        var collect = Buffer.from([]);
+
+        s.on('upstream', async function (data, flags) {
+            collect = Buffer.concat([collect, data]);
+
+            if (collect.length >= 4 && collect.readUInt16BE(0) == 0xabcd) {
+                s.off('upstream');
+
+                let reply = await ngx.fetch('http://127.0.0.1:$p/validate',
+                                            {body: collect.slice(2,4),
+                                             headers: {Host:'aaa'}});
+
+                if (reply.status == 200) {
+                    s.send(collect.slice(4), flags);
+
+                } else {
+                    s.send("__CLOSE__", flags);
+                }
+            }
+        });
+    }
+
+    export default {njs: test_njs, validate, preread_verify, filter_verify};
 EOF
 
-$t->try_run('no stream njs available')->plan(4);
+$t->try_run('no stream njs available')->plan(7);
 
-$t->run_daemon(\&stream_daemon, port(8090));
+$t->run_daemon(\&stream_daemon, port(8090), port(8091));
 $t->waitforsocket('127.0.0.1:' . port(8090));
+$t->waitforsocket('127.0.0.1:' . port(8091));
 
 ###############################################################################
 
@@ -124,34 +154,77 @@ is(stream('127.0.0.1:' . port(8081))->io
 is(stream('127.0.0.1:' . port(8081))->io("\xAB\xCDQQ##"), '',
 	'preread validation failed');
 
+is(stream('127.0.0.1:' . port(8082))->io("\xAB\xCDQQ##"), '',
+	'filter validation failed');
+
+my $s = stream('127.0.0.1:' . port(8082));
+
+TODO: {
+local $TODO = 'not yet'
+	unless http_get('/njs') =~ /^([.0-9]+)$/m && $1 ge '0.7.7';
+
+is($s->io("\xAB\xCDQZ##", read => 1), '##', 'filter validated');
+
+}
+
+is($s->io("@@", read => 1), '@@', 'filter off');
+
 ###############################################################################
 
 sub stream_daemon {
-	my $server = IO::Socket::INET->new(
-		Proto => 'tcp',
-		LocalAddr => '127.0.0.1:' . port(8090),
-		Listen => 5,
-		Reuse => 1
-	)
-		or die "Can't create listening socket: $!\n";
+	my (@ports) = @_;
+	my (@socks, @clients);
+
+	for my $port (@ports) {
+		my $server = IO::Socket::INET->new(
+			Proto => 'tcp',
+			LocalAddr => "127.0.0.1:$port",
+			Listen => 5,
+			Reuse => 1
+		)
+			or die "Can't create listening socket: $!\n";
+		push @socks, $server;
+	}
+
+	my $sel = IO::Select->new(@socks);
 
 	local $SIG{PIPE} = 'IGNORE';
 
-	while (my $client = $server->accept()) {
-		$client->autoflush(1);
+	while (my @ready = $sel->can_read) {
+		foreach my $fh (@ready) {
+			if (grep $_ == $fh, @socks) {
+				my $new = $fh->accept;
+				$new->autoflush(1);
+				$sel->add($new);
 
-		log2c("(new connection $client)");
-
-		$client->sysread(my $buffer, 65536) or next;
+			} elsif (stream_handle_client($fh)
+				|| $fh->sockport() == port(8090))
+			{
+				$sel->remove($fh);
+				$fh->close;
+			}
+		}
+	}
+}
 
-		log2i("$client $buffer");
+sub stream_handle_client {
+	my ($client) = @_;
 
-		log2o("$client $buffer");
+	log2c("(new connection $client)");
+
+	$client->sysread(my $buffer, 65536) or return 1;
+
+	log2i("$client $buffer");
 
-		$client->syswrite($buffer);
+	if ($buffer eq "__CLOSE__") {
+		return 1;
+	}
 
-		close $client;
-	}
+	log2o("$client $buffer");
+
+	$client->syswrite($buffer);
+
+	return 0;
 }
 
 sub log2i { Test::Nginx::log_core('|| <<', @_); }