Mercurial > hg > nginx-tests
comparison stream_js_fetch.t @ 1785:2c08bfffe112
Tests: added stream js test for s.send() in async context.
author | Dmitry Volyntsev <xeioex@nginx.com> |
---|---|
date | Tue, 23 Aug 2022 19:40:07 -0700 |
parents | 898598722ad4 |
children | cadf15e2e2b3 |
comparison
equal
deleted
inserted
replaced
1784:898598722ad4 | 1785:2c08bfffe112 |
---|---|
64 | 64 |
65 server { | 65 server { |
66 listen 127.0.0.1:8081; | 66 listen 127.0.0.1:8081; |
67 js_preread test.preread_verify; | 67 js_preread test.preread_verify; |
68 proxy_pass 127.0.0.1:8090; | 68 proxy_pass 127.0.0.1:8090; |
| 69 } |
| 70 |
| 71 server { |
| 72 listen 127.0.0.1:8082; |
| 73 js_filter test.filter_verify; |
| 74 proxy_pass 127.0.0.1:8091; |
69 } | 75 } |
70 } | 76 } |
71 | 77 |
72 EOF | 78 EOF |
73 | 79 |
101 s.deny(); | 107 s.deny(); |
102 } | 108 } |
103 }); | 109 }); |
104 } | 110 } |
105 | 111 |
106 export default {njs: test_njs, validate, preread_verify} | 112 function filter_verify(s) { |
| 113 var collect = Buffer.from([]); |
| 114 |
| 115 s.on('upstream', async function (data, flags) { |
| 116 collect = Buffer.concat([collect, data]); |
| 117 |
| 118 if (collect.length >= 4 && collect.readUInt16BE(0) == 0xabcd) { |
| 119 s.off('upstream'); |
| 120 |
| 121 let reply = await ngx.fetch('http://127.0.0.1:$p/validate', |
| 122 {body: collect.slice(2,4), |
| 123 headers: {Host:'aaa'}}); |
| 124 |
| 125 if (reply.status == 200) { |
| 126 s.send(collect.slice(4), flags); |
| 127 |
| 128 } else { |
| 129 s.send("__CLOSE__", flags); |
| 130 } |
| 131 } |
| 132 }); |
| 133 } |
| 134 |
| 135 export default {njs: test_njs, validate, preread_verify, filter_verify}; |
107 EOF | 136 EOF |
108 | 137 |
109 $t->try_run('no stream njs available')->plan(4); | 138 $t->try_run('no stream njs available')->plan(7); |
110 | 139 |
111 $t->run_daemon(\&stream_daemon, port(8090)); | 140 $t->run_daemon(\&stream_daemon, port(8090), port(8091)); |
112 $t->waitforsocket('127.0.0.1:' . port(8090)); | 141 $t->waitforsocket('127.0.0.1:' . port(8090)); |
| 142 $t->waitforsocket('127.0.0.1:' . port(8091)); |
113 | 143 |
114 ############################################################################### | 144 ############################################################################### |
115 | 145 |
116 local $TODO = 'not yet' | 146 local $TODO = 'not yet' |
117 unless http_get('/njs') =~ /^([.0-9]+)$/m && $1 ge '0.5.1'; | 147 unless http_get('/njs') =~ /^([.0-9]+)$/m && $1 ge '0.5.1'; |
122 is(stream('127.0.0.1:' . port(8081))->io("\xAC\xCDQZ##"), '', | 152 is(stream('127.0.0.1:' . port(8081))->io("\xAC\xCDQZ##"), '', |
123 'preread invalid magic'); | 153 'preread invalid magic'); |
124 is(stream('127.0.0.1:' . port(8081))->io("\xAB\xCDQQ##"), '', | 154 is(stream('127.0.0.1:' . port(8081))->io("\xAB\xCDQQ##"), '', |
125 'preread validation failed'); | 155 'preread validation failed'); |
126 | 156 |
| 157 is(stream('127.0.0.1:' . port(8082))->io("\xAB\xCDQQ##"), '', |
| 158 'filter validation failed'); |
| 159 |
| 160 my $s = stream('127.0.0.1:' . port(8082)); |
| 161 |
| 162 TODO: { |
| 163 local $TODO = 'not yet' |
| 164 unless http_get('/njs') =~ /^([.0-9]+)$/m && $1 ge '0.7.7'; |
| 165 |
| 166 is($s->io("\xAB\xCDQZ##", read => 1), '##', 'filter validated'); |
| 167 |
| 168 } |
| 169 |
| 170 is($s->io("@@", read => 1), '@@', 'filter off'); |
| 171 |
127 ############################################################################### | 172 ############################################################################### |
128 | 173 |
129 sub stream_daemon { | 174 sub stream_daemon { |
130 my $server = IO::Socket::INET->new( | 175 my (@ports) = @_; |
131 Proto => 'tcp', | 176 my (@socks, @clients); |
132 LocalAddr => '127.0.0.1:' . port(8090), | 177 |
133 Listen => 5, | 178 for my $port (@ports) { |
134 Reuse => 1 | 179 my $server = IO::Socket::INET->new( |
135 ) | 180 Proto => 'tcp', |
136 or die "Can't create listening socket: $!\n"; | 181 LocalAddr => "127.0.0.1:$port", |
| 182 Listen => 5, |
| 183 Reuse => 1 |
| 184 ) |
| 185 or die "Can't create listening socket: $!\n"; |
| 186 push @socks, $server; |
| 187 } |
| 188 |
| 189 my $sel = IO::Select->new(@socks); |
137 | 190 |
138 local $SIG{PIPE} = 'IGNORE'; | 191 local $SIG{PIPE} = 'IGNORE'; |
139 | 192 |
140 while (my $client = $server->accept()) { | 193 while (my @ready = $sel->can_read) { |
141 $client->autoflush(1); | 194 foreach my $fh (@ready) { |
142 | 195 if (grep $_ == $fh, @socks) { |
143 log2c("(new connection $client)"); | 196 my $new = $fh->accept; |
144 | 197 $new->autoflush(1); |
145 $client->sysread(my $buffer, 65536) or next; | 198 $sel->add($new); |
146 | 199 |
147 log2i("$client $buffer"); | 200 } elsif (stream_handle_client($fh) |
148 | 201 || $fh->sockport() == port(8090)) |
149 log2o("$client $buffer"); | 202 { |
150 | 203 $sel->remove($fh); |
151 $client->syswrite($buffer); | 204 $fh->close; |
152 | 205 } |
153 close $client; | 206 } |
154 } | 207 } |
| 208 } |
| 209 |
| 210 sub stream_handle_client { |
| 211 my ($client) = @_; |
| 212 |
| 213 log2c("(new connection $client)"); |
| 214 |
| 215 $client->sysread(my $buffer, 65536) or return 1; |
| 216 |
| 217 log2i("$client $buffer"); |
| 218 |
| 219 if ($buffer eq "__CLOSE__") { |
| 220 return 1; |
| 221 } |
| 222 |
| 223 log2o("$client $buffer"); |
| 224 |
| 225 $client->syswrite($buffer); |
| 226 |
| 227 return 0; |
155 } | 228 } |
156 | 229 |
157 sub log2i { Test::Nginx::log_core('|| <<', @_); } | 230 sub log2i { Test::Nginx::log_core('|| <<', @_); } |
158 sub log2o { Test::Nginx::log_core('|| >>', @_); } | 231 sub log2o { Test::Nginx::log_core('|| >>', @_); } |
159 sub log2c { Test::Nginx::log_core('||', @_); } | 232 sub log2c { Test::Nginx::log_core('||', @_); } |