# HG changeset patch # User Dmitry Volyntsev # Date 1661308807 25200 # Node ID 2c08bfffe112d53c20df2ab3cd0e27fb2fbe36d2 # Parent 898598722ad4e5eda501dca9ee4f0949a41c024a Tests: added stream js test for s.send() in async context. diff --git a/stream_js_fetch.t b/stream_js_fetch.t --- a/stream_js_fetch.t +++ b/stream_js_fetch.t @@ -67,6 +67,12 @@ stream { js_preread test.preread_verify; proxy_pass 127.0.0.1:8090; } + + server { + listen 127.0.0.1:8082; + js_filter test.filter_verify; + proxy_pass 127.0.0.1:8091; + } } EOF @@ -103,13 +109,37 @@ my $p = port(8080); }); } - export default {njs: test_njs, validate, preread_verify} + function filter_verify(s) { + var collect = Buffer.from([]); + + s.on('upstream', async function (data, flags) { + collect = Buffer.concat([collect, data]); + + if (collect.length >= 4 && collect.readUInt16BE(0) == 0xabcd) { + s.off('upstream'); + + let reply = await ngx.fetch('http://127.0.0.1:$p/validate', + {body: collect.slice(2,4), + headers: {Host:'aaa'}}); + + if (reply.status == 200) { + s.send(collect.slice(4), flags); + + } else { + s.send("__CLOSE__", flags); + } + } + }); + } + + export default {njs: test_njs, validate, preread_verify, filter_verify}; EOF -$t->try_run('no stream njs available')->plan(4); +$t->try_run('no stream njs available')->plan(7); -$t->run_daemon(\&stream_daemon, port(8090)); +$t->run_daemon(\&stream_daemon, port(8090), port(8091)); $t->waitforsocket('127.0.0.1:' . port(8090)); +$t->waitforsocket('127.0.0.1:' . port(8091)); ############################################################################### @@ -124,34 +154,77 @@ is(stream('127.0.0.1:' . port(8081))->io is(stream('127.0.0.1:' . port(8081))->io("\xAB\xCDQQ##"), '', 'preread validation failed'); +is(stream('127.0.0.1:' . port(8082))->io("\xAB\xCDQQ##"), '', + 'filter validation failed'); + +my $s = stream('127.0.0.1:' . port(8082)); + +TODO: { +local $TODO = 'not yet' + unless http_get('/njs') =~ /^([.0-9]+)$/m && $1 ge '0.7.7'; + +is($s->io("\xAB\xCDQZ##", read => 1), '##', 'filter validated'); + +} + +is($s->io("@@", read => 1), '@@', 'filter off'); + ############################################################################### sub stream_daemon { - my $server = IO::Socket::INET->new( - Proto => 'tcp', - LocalAddr => '127.0.0.1:' . port(8090), - Listen => 5, - Reuse => 1 - ) - or die "Can't create listening socket: $!\n"; + my (@ports) = @_; + my (@socks, @clients); + + for my $port (@ports) { + my $server = IO::Socket::INET->new( + Proto => 'tcp', + LocalAddr => "127.0.0.1:$port", + Listen => 5, + Reuse => 1 + ) + or die "Can't create listening socket: $!\n"; + push @socks, $server; + } + + my $sel = IO::Select->new(@socks); local $SIG{PIPE} = 'IGNORE'; - while (my $client = $server->accept()) { - $client->autoflush(1); + while (my @ready = $sel->can_read) { + foreach my $fh (@ready) { + if (grep $_ == $fh, @socks) { + my $new = $fh->accept; + $new->autoflush(1); + $sel->add($new); - log2c("(new connection $client)"); - - $client->sysread(my $buffer, 65536) or next; + } elsif (stream_handle_client($fh) + || $fh->sockport() == port(8090)) + { + $sel->remove($fh); + $fh->close; + } + } + } +} - log2i("$client $buffer"); +sub stream_handle_client { + my ($client) = @_; - log2o("$client $buffer"); + log2c("(new connection $client)"); + + $client->sysread(my $buffer, 65536) or return 1; + + log2i("$client $buffer"); - $client->syswrite($buffer); + if ($buffer eq "__CLOSE__") { + return 1; + } - close $client; - } + log2o("$client $buffer"); + + $client->syswrite($buffer); + + return 0; } sub log2i { Test::Nginx::log_core('|| <<', @_); }