From 03db03f5aa250f73bb111dd06cbc197ad496faf8 Mon Sep 17 00:00:00 2001 From: Pavel Savara Date: Thu, 19 Jan 2023 18:33:44 +0100 Subject: [PATCH] [browser][http] Fix blocking of streaming response and abort (#80693) - return bytes of streaming response as soon as available - fix unhandled error in reader.cancel() promise - return cancelable promise from http_wasm_get_streamed_response_bytes - unit test for slowly streamed chunks - unit test for streaming and default cancellation --- .../System/Net/Http/HttpClientHandlerTest.cs | 119 ++++++++++++++++-- src/mono/wasm/runtime/http.ts | 53 ++++---- 2 files changed, 136 insertions(+), 36 deletions(-) diff --git a/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.cs b/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.cs index f9f6de4931b..d5116fc0d10 100644 --- a/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.cs +++ b/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.cs @@ -984,13 +984,14 @@ namespace System.Net.Http.Functional.Tests } [Theory] - [InlineData(true, true)] - [InlineData(true, false)] - [InlineData(false, true)] - [InlineData(false, false)] - [InlineData(null, false)] + [InlineData(true, true, true)] + [InlineData(true, true, false)] + [InlineData(true, false, false)] + [InlineData(false, true, false)] + [InlineData(false, false, false)] + [InlineData(null, false, false)] [ActiveIssue("https://github.com/dotnet/runtime/issues/65429", typeof(PlatformDetection), nameof(PlatformDetection.IsNodeJS))] - public async Task ReadAsStreamAsync_HandlerProducesWellBehavedResponseStream(bool? chunked, bool enableWasmStreaming) + public async Task ReadAsStreamAsync_HandlerProducesWellBehavedResponseStream(bool? chunked, bool enableWasmStreaming, bool slowChunks) { if (IsWinHttpHandler && UseVersion >= HttpVersion20.Value) { @@ -1003,6 +1004,13 @@ namespace System.Net.Http.Functional.Tests return; } + if (enableWasmStreaming && !PlatformDetection.IsBrowser) + { + // enableWasmStreaming makes only sense on Browser platform + return; + } + + var tcs = new TaskCompletionSource(); await LoopbackServerFactory.CreateClientAndServerAsync(async uri => { var request = new HttpRequestMessage(HttpMethod.Get, uri) { Version = UseVersion }; @@ -1079,11 +1087,21 @@ namespace System.Net.Http.Functional.Tests // Various forms of reading var buffer = new byte[1]; + var buffer2 = new byte[2]; if (PlatformDetection.IsBrowser) { #if !NETFRAMEWORK - Assert.Equal('h', await responseStream.ReadByteAsync()); + if(slowChunks) + { + Assert.Equal(1, await responseStream.ReadAsync(new Memory(buffer2))); + Assert.Equal((byte)'h', buffer2[0]); + tcs.SetResult(true); + } + else + { + Assert.Equal('h', await responseStream.ReadByteAsync()); + } Assert.Equal('e', await responseStream.ReadByteAsync()); Assert.Equal(1, await responseStream.ReadAsync(new Memory(buffer))); Assert.Equal((byte)'l', buffer[0]); @@ -1184,7 +1202,18 @@ namespace System.Net.Http.Functional.Tests { case true: await connection.SendResponseAsync(HttpStatusCode.OK, headers: new HttpHeaderData[] { new HttpHeaderData("Transfer-Encoding", "chunked") }, isFinal: false); - await connection.SendResponseBodyAsync("3\r\nhel\r\n8\r\nlo world\r\n0\r\n\r\n"); + if(PlatformDetection.IsBrowser && slowChunks) + { + await connection.SendResponseBodyAsync("1\r\nh\r\n", false); + await tcs.Task; + await connection.SendResponseBodyAsync("2\r\nel\r\n", false); + await connection.SendResponseBodyAsync("8\r\nlo world\r\n", false); + await connection.SendResponseBodyAsync("0\r\n\r\n", true); + } + else + { + await connection.SendResponseBodyAsync("3\r\nhel\r\n8\r\nlo world\r\n0\r\n\r\n"); + } break; case false: @@ -1295,6 +1324,80 @@ namespace System.Net.Http.Functional.Tests server => server.AcceptConnectionSendResponseAndCloseAsync()); } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))] + [ActiveIssue("https://github.com/dotnet/runtime/issues/65429", typeof(PlatformDetection), nameof(PlatformDetection.IsNodeJS))] + public async Task ReadAsStreamAsync_StreamingCancellation() + { + var tcs = new TaskCompletionSource(); + var tcs2 = new TaskCompletionSource(); + await LoopbackServerFactory.CreateClientAndServerAsync(async uri => + { + var request = new HttpRequestMessage(HttpMethod.Get, uri) { Version = UseVersion }; +#if !NETFRAMEWORK + request.Options.Set(new HttpRequestOptionsKey("WebAssemblyEnableStreamingResponse"), true); +#endif + + var cts = new CancellationTokenSource(); + using (var client = new HttpMessageInvoker(CreateHttpClientHandler())) + using (HttpResponseMessage response = await client.SendAsync(TestAsync, request, CancellationToken.None)) + { + using (Stream responseStream = await response.Content.ReadAsStreamAsync(TestAsync)) + { + var buffer = new byte[1]; +#if !NETFRAMEWORK + Assert.Equal(1, await responseStream.ReadAsync(new Memory(buffer))); + Assert.Equal((byte)'h', buffer[0]); + var sizePromise = responseStream.ReadAsync(new Memory(buffer), cts.Token); + await tcs2.Task; // wait for the request and response header to be sent + cts.Cancel(); + await Assert.ThrowsAsync(async () => await sizePromise); + tcs.SetResult(true); +#endif + } + } + }, async server => + { + await server.AcceptConnectionAsync(async connection => + { + await connection.ReadRequestDataAsync(); + await connection.SendResponseAsync(HttpStatusCode.OK, headers: new HttpHeaderData[] { new HttpHeaderData("Transfer-Encoding", "chunked") }, isFinal: false); + await connection.SendResponseBodyAsync("1\r\nh\r\n", false); + tcs2.SetResult(true); + await tcs.Task; + }); + }); + } + + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))] + public async Task ReadAsStreamAsync_Cancellation() + { + var tcs = new TaskCompletionSource(); + var tcs2 = new TaskCompletionSource(); + await LoopbackServerFactory.CreateClientAndServerAsync(async uri => + { + var request = new HttpRequestMessage(HttpMethod.Get, uri) { Version = UseVersion }; + var cts = new CancellationTokenSource(); + using (var client = new HttpMessageInvoker(CreateHttpClientHandler())) + { + var responsePromise = client.SendAsync(TestAsync, request, cts.Token); + await tcs2.Task; // wait for the request to be sent + cts.Cancel(); + await Assert.ThrowsAsync(async () => await responsePromise); + tcs.SetResult(true); + } + }, async server => + { + await server.AcceptConnectionAsync(async connection => + { + await connection.ReadRequestDataAsync(); + tcs2.SetResult(true); + await connection.SendResponseAsync(HttpStatusCode.OK, headers: new HttpHeaderData[] { new HttpHeaderData("Transfer-Encoding", "chunked") }, isFinal: false); + await connection.SendResponseBodyAsync("1\r\nh\r\n", false); + await tcs.Task; + }); + }); + } + [Fact] public async Task Dispose_DisposingHandlerCancelsActiveOperationsWithoutResponses() { diff --git a/src/mono/wasm/runtime/http.ts b/src/mono/wasm/runtime/http.ts index 7404cefcb30..73365d34070 100644 --- a/src/mono/wasm/runtime/http.ts +++ b/src/mono/wasm/runtime/http.ts @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. import { wrap_as_cancelable_promise } from "./cancelable-promise"; +import { Module } from "./imports"; import { MemoryViewType, Span } from "./marshal"; import { mono_assert } from "./types"; import { VoidPtr } from "./types/emscripten"; @@ -21,7 +22,12 @@ export function http_wasm_abort_request(abort_controller: AbortController): void export function http_wasm_abort_response(res: ResponseExtension): void { res.__abort_controller.abort(); if (res.__reader) { - res.__reader.cancel(); + res.__reader.cancel().catch((err) => { + if (err && err.name !== "AbortError") { + Module.printErr("MONO_WASM: Error in http_wasm_abort_response: " + err); + } + // otherwise, it's expected + }); } } @@ -100,42 +106,33 @@ export function http_wasm_get_response_bytes(res: ResponseExtension, view: Span) return bytes_read; } -export async function http_wasm_get_streamed_response_bytes(res: ResponseExtension, bufferPtr: VoidPtr, bufferLength: number): Promise { +export function http_wasm_get_streamed_response_bytes(res: ResponseExtension, bufferPtr: VoidPtr, bufferLength: number): Promise { // the bufferPtr is pinned by the caller const view = new Span(bufferPtr, bufferLength, MemoryViewType.Byte); return wrap_as_cancelable_promise(async () => { - if (!res.__chunk && res.body) { - res.__reader = res.body.getReader(); + if (!res.__reader) { + res.__reader = res.body!.getReader(); + } + if (!res.__chunk) { res.__chunk = await res.__reader.read(); res.__source_offset = 0; } + if (res.__chunk.done) { + return 0; + } - let target_offset = 0; - let bytes_read = 0; - // loop until end of browser stream or end of C# buffer - while (res.__reader && res.__chunk && !res.__chunk.done) { - const remaining_source = res.__chunk.value.byteLength - res.__source_offset; - if (remaining_source === 0) { - res.__chunk = await res.__reader.read(); - res.__source_offset = 0; - continue;// are we done yet - } - - const remaining_target = view.byteLength - target_offset; - const bytes_copied = Math.min(remaining_source, remaining_target); - const source_view = res.__chunk.value.subarray(res.__source_offset, res.__source_offset + bytes_copied); - - // copy available bytes - view.set(source_view, target_offset); - target_offset += bytes_copied; - bytes_read += bytes_copied; - res.__source_offset += bytes_copied; + const remaining_source = res.__chunk.value.byteLength - res.__source_offset; + mono_assert(remaining_source > 0, "expected remaining_source to be greater than 0"); - if (target_offset == view.byteLength) { - return bytes_read; - } + const bytes_copied = Math.min(remaining_source, view.byteLength); + const source_view = res.__chunk.value.subarray(res.__source_offset, res.__source_offset + bytes_copied); + view.set(source_view, 0); + res.__source_offset += bytes_copied; + if (remaining_source == bytes_copied) { + res.__chunk = undefined; } - return bytes_read; + + return bytes_copied; }); } -- 2.34.1