From 59dfed72f2eba1f52b4f0f9712c5e1138d74f3f1 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Thu, 18 Apr 2019 00:01:04 -0400 Subject: [PATCH] Fix SocketsHttpHandler streams to do sync I/O in sync methods (dotnet/corefx#36946) * Fix SocketsHttpHandler response streams to do sync I/O in sync methods SocketsHttpHandler hands back response Streams for reading response body content. While we encourage developers to use the async Stream APIs, Stream does expose synchronous APIs, yet the current implementations are just wrapping the async ones and doing sync-over-async. This fixes the response stream synchronous APIs to be sync down to the underlying networking stream. It also fixes a couple other minor issues, e.g. Flush{Async} on read-only stream should be nops rather than throwing, we should include an error message about a stream being read-only when trying to write to it, etc. * Address PR feedback Commit migrated from https://github.com/dotnet/corefx/commit/315fd00cb2465390900209919cdb2bf6bc44ae0e --- .../System.Net.Http/src/System.Net.Http.csproj | 2 +- .../ChunkedEncodingReadStream.cs | 60 +++++++- .../ConnectionCloseReadStream.cs | 20 ++- .../SocketsHttpHandler/ContentLengthReadStream.cs | 34 +++++ .../Net/Http/SocketsHttpHandler/EmptyReadStream.cs | 15 +- .../Net/Http/SocketsHttpHandler/Http2Stream.cs | 50 +++++-- .../{BaseAsyncStream.cs => HttpBaseStream.cs} | 32 +++-- .../Net/Http/SocketsHttpHandler/HttpConnection.cs | 159 ++++++++++++++++++++- .../SocketsHttpHandler/HttpContentReadStream.cs | 5 +- .../Http/SocketsHttpHandler/HttpContentStream.cs | 2 +- .../SocketsHttpHandler/HttpContentWriteStream.cs | 10 +- .../Http/SocketsHttpHandler/RawConnectionStream.cs | 41 +++++- .../tests/FunctionalTests/ResponseStreamTest.cs | 18 +++ 13 files changed, 415 insertions(+), 33 deletions(-) rename src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/{BaseAsyncStream.cs => HttpBaseStream.cs} (75%) diff --git a/src/libraries/System.Net.Http/src/System.Net.Http.csproj b/src/libraries/System.Net.Http/src/System.Net.Http.csproj index 2de08e8..70ad122 100644 --- a/src/libraries/System.Net.Http/src/System.Net.Http.csproj +++ b/src/libraries/System.Net.Http/src/System.Net.Http.csproj @@ -127,7 +127,6 @@ - @@ -143,6 +142,7 @@ + diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ChunkedEncodingReadStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ChunkedEncodingReadStream.cs index d96546c..13de7c6 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ChunkedEncodingReadStream.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ChunkedEncodingReadStream.cs @@ -5,7 +5,6 @@ using System.Buffers.Text; using System.Diagnostics; using System.IO; -using System.Net.Http.Headers; using System.Threading; using System.Threading.Tasks; @@ -36,6 +35,65 @@ namespace System.Net.Http _response = response; } + public override int Read(Span buffer) + { + if (_connection == null || buffer.Length == 0) + { + // Response body fully consumed or the caller didn't ask for any data. + return 0; + } + + // Try to consume from data we already have in the buffer. + int bytesRead = ReadChunksFromConnectionBuffer(buffer, cancellationRegistration: default); + if (bytesRead > 0) + { + return bytesRead; + } + + // Nothing available to consume. Fall back to I/O. + while (true) + { + if (_connection == null) + { + // Fully consumed the response in ReadChunksFromConnectionBuffer. + return 0; + } + + if (_state == ParsingState.ExpectChunkData && + buffer.Length >= _connection.ReadBufferSize && + _chunkBytesRemaining >= (ulong)_connection.ReadBufferSize) + { + // As an optimization, we skip going through the connection's read buffer if both + // the remaining chunk data and the buffer are both at least as large + // as the connection buffer. That avoids an unnecessary copy while still reading + // the maximum amount we'd otherwise read at a time. + Debug.Assert(_connection.RemainingBuffer.Length == 0); + bytesRead = _connection.Read(buffer.Slice(0, (int)Math.Min((ulong)buffer.Length, _chunkBytesRemaining))); + if (bytesRead == 0) + { + throw new IOException(SR.net_http_invalid_response); + } + _chunkBytesRemaining -= (ulong)bytesRead; + if (_chunkBytesRemaining == 0) + { + _state = ParsingState.ExpectChunkTerminator; + } + return bytesRead; + } + + // We're only here if we need more data to make forward progress. + _connection.Fill(); + + // Now that we have more, see if we can get any response data, and if + // we can we're done. + int bytesCopied = ReadChunksFromConnectionBuffer(buffer, cancellationRegistration: default); + if (bytesCopied > 0) + { + return bytesCopied; + } + } + } + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectionCloseReadStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectionCloseReadStream.cs index 1506cdf..8960ad2 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectionCloseReadStream.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectionCloseReadStream.cs @@ -16,6 +16,25 @@ namespace System.Net.Http { } + public override int Read(Span buffer) + { + if (_connection == null || buffer.Length == 0) + { + // Response body fully consumed or the caller didn't ask for any data + return 0; + } + + int bytesRead = _connection.Read(buffer); + if (bytesRead == 0) + { + // We cannot reuse this connection, so close it. + _connection.Dispose(); + _connection = null; + } + + return bytesRead; + } + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken) { CancellationHelper.ThrowIfCancellationRequested(cancellationToken); @@ -62,7 +81,6 @@ namespace System.Net.Http // We cannot reuse this connection, so close it. _connection.Dispose(); _connection = null; - return 0; } return bytesRead; diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ContentLengthReadStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ContentLengthReadStream.cs index bd2727f..39e1440 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ContentLengthReadStream.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ContentLengthReadStream.cs @@ -21,6 +21,40 @@ namespace System.Net.Http _contentBytesRemaining = contentLength; } + public override int Read(Span buffer) + { + if (_connection == null || buffer.Length == 0) + { + // Response body fully consumed or the caller didn't ask for any data. + return 0; + } + + Debug.Assert(_contentBytesRemaining > 0); + if ((ulong)buffer.Length > _contentBytesRemaining) + { + buffer = buffer.Slice(0, (int)_contentBytesRemaining); + } + + int bytesRead = _connection.Read(buffer); + if (bytesRead <= 0) + { + // Unexpected end of response stream. + throw new IOException(SR.net_http_invalid_response); + } + + Debug.Assert((ulong)bytesRead <= _contentBytesRemaining); + _contentBytesRemaining -= (ulong)bytesRead; + + if (_contentBytesRemaining == 0) + { + // End of response body + _connection.CompleteResponse(); + _connection = null; + } + + return bytesRead; + } + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken) { CancellationHelper.ThrowIfCancellationRequested(cancellationToken); diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/EmptyReadStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/EmptyReadStream.cs index aa08a5b..6d154ea 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/EmptyReadStream.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/EmptyReadStream.cs @@ -2,12 +2,13 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System.IO; using System.Threading; using System.Threading.Tasks; namespace System.Net.Http { - internal sealed class EmptyReadStream : BaseAsyncStream + internal sealed class EmptyReadStream : HttpBaseStream { internal static EmptyReadStream Instance { get; } = new EmptyReadStream(); @@ -19,16 +20,20 @@ namespace System.Net.Http protected override void Dispose(bool disposing) { /* nop */ } public override void Close() { /* nop */ } - public override int ReadByte() => -1; - public override int Read(Span buffer) => 0; public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken) => cancellationToken.IsCancellationRequested ? new ValueTask(Task.FromCanceled(cancellationToken)) : new ValueTask(0); - public override ValueTask WriteAsync(ReadOnlyMemory destination, CancellationToken cancellationToken) => throw new NotSupportedException(); + public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + ValidateCopyToArgs(this, destination, bufferSize); + return NopAsync(cancellationToken); + } - public override Task FlushAsync(CancellationToken cancellationToken) => throw new NotSupportedException(); + public override void Write(ReadOnlySpan buffer) => throw new NotSupportedException(SR.net_http_content_readonly_stream); + + public override ValueTask WriteAsync(ReadOnlyMemory destination, CancellationToken cancellationToken) => throw new NotSupportedException(); } } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Stream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Stream.cs index c492381..3c9ad5e 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Stream.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Stream.cs @@ -5,7 +5,6 @@ using System.Diagnostics; using System.IO; using System.Net.Http.Headers; -using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -407,6 +406,32 @@ namespace System.Net.Http } } + public int ReadData(Span buffer) + { + if (buffer.Length == 0) + { + return 0; + } + + (bool wait, int bytesRead) = TryReadFromBuffer(buffer); + if (wait) + { + // Synchronously block waiting for data to be produced. + Debug.Assert(bytesRead == 0); + GetWaiterTask().AsTask().GetAwaiter().GetResult(); + (wait, bytesRead) = TryReadFromBuffer(buffer); + Debug.Assert(!wait); + } + + if (bytesRead != 0) + { + ExtendWindow(bytesRead); + _connection.ExtendWindow(bytesRead); + } + + return bytesRead; + } + public async ValueTask ReadDataAsync(Memory buffer, CancellationToken cancellationToken) { if (buffer.Length == 0) @@ -432,7 +457,7 @@ namespace System.Net.Http return bytesRead; } - private async ValueTask SendDataAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken) + private async Task SendDataAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken) { ReadOnlyMemory remaining = buffer; @@ -490,7 +515,7 @@ namespace System.Net.Http void IValueTaskSource.OnCompleted(Action continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _waitSource.OnCompleted(continuation, state, token, flags); void IValueTaskSource.GetResult(short token) => _waitSource.GetResult(token); - private sealed class Http2ReadStream : BaseAsyncStream + private sealed class Http2ReadStream : HttpBaseStream { private Http2Stream _http2Stream; @@ -519,6 +544,12 @@ namespace System.Net.Http public override bool CanRead => true; public override bool CanWrite => false; + public override int Read(Span destination) + { + Http2Stream http2Stream = _http2Stream ?? throw new ObjectDisposedException(nameof(Http2ReadStream)); + return http2Stream.ReadData(destination); + } + public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken) { Http2Stream http2Stream = _http2Stream; @@ -530,13 +561,12 @@ namespace System.Net.Http return http2Stream.ReadDataAsync(destination, cancellationToken); } - public override ValueTask WriteAsync(ReadOnlyMemory destination, CancellationToken cancellationToken) => throw new NotSupportedException(); + public override void Write(ReadOnlySpan buffer) => throw new NotSupportedException(SR.net_http_content_readonly_stream); - public override Task FlushAsync(CancellationToken cancellationToken) => throw new NotSupportedException(); + public override ValueTask WriteAsync(ReadOnlyMemory destination, CancellationToken cancellationToken) => throw new NotSupportedException(); } - - private sealed class Http2WriteStream : BaseAsyncStream + private sealed class Http2WriteStream : HttpBaseStream { private Http2Stream _http2Stream; @@ -560,6 +590,8 @@ namespace System.Net.Http public override bool CanRead => false; public override bool CanWrite => true; + public override int Read(Span buffer) => throw new NotSupportedException(); + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken) => throw new NotSupportedException(); public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken) @@ -570,10 +602,8 @@ namespace System.Net.Http return new ValueTask(Task.FromException(new ObjectDisposedException(nameof(Http2WriteStream)))); } - return http2Stream.SendDataAsync(buffer, cancellationToken); + return new ValueTask(http2Stream.SendDataAsync(buffer, cancellationToken)); } - - public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask; } } } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/BaseAsyncStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpBaseStream.cs similarity index 75% rename from src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/BaseAsyncStream.cs rename to src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpBaseStream.cs index fed8103..b9b2c42 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/BaseAsyncStream.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpBaseStream.cs @@ -3,12 +3,13 @@ // See the LICENSE file in the project root for more information. using System.IO; +using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; namespace System.Net.Http { - internal abstract class BaseAsyncStream : Stream + internal abstract class HttpBaseStream : Stream { public sealed override bool CanSeek => false; @@ -78,10 +79,16 @@ namespace System.Net.Http } } + public sealed override int ReadByte() + { + byte b = 0; + return Read(MemoryMarshal.CreateSpan(ref b, 1)) == 1 ? b : -1; + } + public sealed override int Read(byte[] buffer, int offset, int count) { ValidateBufferArgs(buffer, offset, count); - return ReadAsync(new Memory(buffer, offset, count), CancellationToken.None).GetAwaiter().GetResult(); + return Read(buffer.AsSpan(offset, count)); } public sealed override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) @@ -90,30 +97,39 @@ namespace System.Net.Http return ReadAsync(new Memory(buffer, offset, count), cancellationToken).AsTask(); } - public sealed override void Write(byte[] buffer, int offset, int count) + public override void Write(byte[] buffer, int offset, int count) { + // This does sync-over-async, but it also should only end up being used in strange + // situations. Either a derived stream overrides this anyway, so the implementation won't be used, + // or it's being called as part of HttpContent.SerializeToStreamAsync, which means custom + // content is explicitly choosing to make a synchronous call as part of an asynchronous method. ValidateBufferArgs(buffer, offset, count); WriteAsync(new Memory(buffer, offset, count), CancellationToken.None).GetAwaiter().GetResult(); } + public sealed override void WriteByte(byte value) => + Write(MemoryMarshal.CreateReadOnlySpan(ref value, 1)); + public sealed override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { ValidateBufferArgs(buffer, offset, count); return WriteAsync(new ReadOnlyMemory(buffer, offset, count), cancellationToken).AsTask(); } - public sealed override void Flush() => FlushAsync().GetAwaiter().GetResult(); + public override void Flush() { } - public sealed override void CopyTo(Stream destination, int bufferSize) => - CopyToAsync(destination, bufferSize, CancellationToken.None).GetAwaiter().GetResult(); + public override Task FlushAsync(CancellationToken cancellationToken) => NopAsync(cancellationToken); + protected static Task NopAsync(CancellationToken cancellationToken) => + cancellationToken.IsCancellationRequested ? Task.FromCanceled(cancellationToken) : + Task.CompletedTask; // // Methods which must be implemented by derived classes // + public abstract override int Read(Span buffer); public abstract override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken); - public abstract override ValueTask WriteAsync(ReadOnlyMemory destination, CancellationToken cancellationToken); - public abstract override Task FlushAsync(CancellationToken cancellationToken); + public abstract override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken); } } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs index c27762f..ff10f69 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs @@ -972,6 +972,13 @@ namespace System.Net.Http private static bool IsDigit(byte c) => (uint)(c - '0') <= '9' - '0'; + private void WriteToBuffer(ReadOnlySpan source) + { + Debug.Assert(source.Length <= _writeBuffer.Length - _writeOffset); + source.CopyTo(new Span(_writeBuffer, _writeOffset, source.Length)); + _writeOffset += source.Length; + } + private void WriteToBuffer(ReadOnlyMemory source) { Debug.Assert(source.Length <= _writeBuffer.Length - _writeOffset); @@ -1010,6 +1017,30 @@ namespace System.Net.Http } } + private void WriteWithoutBuffering(ReadOnlySpan source) + { + if (_writeOffset != 0) + { + int remaining = _writeBuffer.Length - _writeOffset; + if (source.Length <= remaining) + { + // There's something already in the write buffer, but the content + // we're writing can also fit after it in the write buffer. Copy + // the content to the write buffer and then flush it, so that we + // can do a single send rather than two. + WriteToBuffer(source); + Flush(); + return; + } + + // There's data in the write buffer and the data we're writing doesn't fit after it. + // Do two writes, one to flush the buffer and then another to write the supplied content. + Flush(); + } + + WriteToStream(source); + } + private ValueTask WriteWithoutBufferingAsync(ReadOnlyMemory source) { if (_writeOffset == 0) @@ -1172,6 +1203,15 @@ namespace System.Net.Http } } + private void Flush() + { + if (_writeOffset > 0) + { + WriteToStream(new ReadOnlySpan(_writeBuffer, 0, _writeOffset)); + _writeOffset = 0; + } + } + private ValueTask FlushAsync() { if (_writeOffset > 0) @@ -1183,6 +1223,12 @@ namespace System.Net.Http return default; } + private void WriteToStream(ReadOnlySpan source) + { + if (NetEventSource.IsEnabled) Trace($"Writing {source.Length} bytes."); + _stream.Write(source); + } + private ValueTask WriteToStreamAsync(ReadOnlyMemory source) { if (NetEventSource.IsEnabled) Trace($"Writing {source.Length} bytes."); @@ -1313,6 +1359,52 @@ namespace System.Net.Http } // Throws IOException on EOF. This is only called when we expect more data. + private void Fill() + { + Debug.Assert(_readAheadTask == null); + + int remaining = _readLength - _readOffset; + Debug.Assert(remaining >= 0); + + if (remaining == 0) + { + // No data in the buffer. Simply reset the offset and length to 0 to allow + // the whole buffer to be filled. + _readOffset = _readLength = 0; + } + else if (_readOffset > 0) + { + // There's some data in the buffer but it's not at the beginning. Shift it + // down to make room for more. + Buffer.BlockCopy(_readBuffer, _readOffset, _readBuffer, 0, remaining); + _readOffset = 0; + _readLength = remaining; + } + else if (remaining == _readBuffer.Length) + { + // The whole buffer is full, but the caller is still requesting more data, + // so increase the size of the buffer. + Debug.Assert(_readOffset == 0); + Debug.Assert(_readLength == _readBuffer.Length); + + var newReadBuffer = new byte[_readBuffer.Length * 2]; + Buffer.BlockCopy(_readBuffer, 0, newReadBuffer, 0, remaining); + _readBuffer = newReadBuffer; + _readOffset = 0; + _readLength = remaining; + } + + int bytesRead = _stream.Read(_readBuffer, _readLength, _readBuffer.Length - _readLength); + if (NetEventSource.IsEnabled) Trace($"Received {bytesRead} bytes."); + if (bytesRead == 0) + { + throw new IOException(SR.net_http_invalid_response); + } + + _readLength += bytesRead; + } + + // Throws IOException on EOF. This is only called when we expect more data. private async Task FillAsync() { Debug.Assert(_readAheadTask == null); @@ -1341,7 +1433,7 @@ namespace System.Net.Http Debug.Assert(_readOffset == 0); Debug.Assert(_readLength == _readBuffer.Length); - byte[] newReadBuffer = new byte[_readBuffer.Length * 2]; + var newReadBuffer = new byte[_readBuffer.Length * 2]; Buffer.BlockCopy(_readBuffer, 0, newReadBuffer, 0, remaining); _readBuffer = newReadBuffer; _readOffset = 0; @@ -1367,6 +1459,34 @@ namespace System.Net.Http _readOffset += buffer.Length; } + private int Read(Span destination) + { + // This is called when reading the response body. + + int remaining = _readLength - _readOffset; + if (remaining > 0) + { + // We have data in the read buffer. Return it to the caller. + if (destination.Length <= remaining) + { + ReadFromBuffer(destination); + return destination.Length; + } + else + { + ReadFromBuffer(destination.Slice(0, remaining)); + return remaining; + } + } + + // No data in read buffer. + // Do an unbuffered read directly against the underlying stream. + Debug.Assert(_readAheadTask == null, "Read ahead task should have been consumed as part of the headers."); + int count = _stream.Read(destination); + if (NetEventSource.IsEnabled) Trace($"Received {count} bytes."); + return count; + } + private async ValueTask ReadAsync(Memory destination) { // This is called when reading the response body. @@ -1395,6 +1515,43 @@ namespace System.Net.Http return count; } + private int ReadBuffered(Span destination) + { + // This is called when reading the response body. + Debug.Assert(destination.Length != 0); + + int remaining = _readLength - _readOffset; + if (remaining > 0) + { + // We have data in the read buffer. Return it to the caller. + if (destination.Length <= remaining) + { + ReadFromBuffer(destination); + return destination.Length; + } + else + { + ReadFromBuffer(destination.Slice(0, remaining)); + return remaining; + } + } + + // No data in read buffer. + _readOffset = _readLength = 0; + + // Do a buffered read directly against the underlying stream. + Debug.Assert(_readAheadTask == null, "Read ahead task should have been consumed as part of the headers."); + int bytesRead = _stream.Read(_readBuffer, 0, _readBuffer.Length); + if (NetEventSource.IsEnabled) Trace($"Received {bytesRead} bytes."); + _readLength = bytesRead; + + // Hand back as much data as we can fit. + int bytesToCopy = Math.Min(bytesRead, destination.Length); + _readBuffer.AsSpan(0, bytesToCopy).CopyTo(destination); + _readOffset = bytesToCopy; + return bytesToCopy; + } + private ValueTask ReadBufferedAsync(Memory destination) { // If the caller provided buffer, and thus the amount of data desired to be read, diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpContentReadStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpContentReadStream.cs index 080ff78..b7527be 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpContentReadStream.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpContentReadStream.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for more information. using System.Diagnostics; -using System.IO; using System.Threading; using System.Threading.Tasks; @@ -22,9 +21,9 @@ namespace System.Net.Http public sealed override bool CanRead => true; public sealed override bool CanWrite => false; - public sealed override ValueTask WriteAsync(ReadOnlyMemory destination, CancellationToken cancellationToken) => throw new NotSupportedException(); + public sealed override void Write(ReadOnlySpan buffer) => throw new NotSupportedException(SR.net_http_content_readonly_stream); - public override Task FlushAsync(CancellationToken cancellationToken) => throw new NotSupportedException(); + public sealed override ValueTask WriteAsync(ReadOnlyMemory destination, CancellationToken cancellationToken) => throw new NotSupportedException(); public virtual bool NeedsDrain => false; diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpContentStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpContentStream.cs index f3a0f01..b1b40ea 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpContentStream.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpContentStream.cs @@ -4,7 +4,7 @@ namespace System.Net.Http { - internal abstract class HttpContentStream : BaseAsyncStream + internal abstract class HttpContentStream : HttpBaseStream { protected HttpConnection _connection; diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpContentWriteStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpContentWriteStream.cs index 9dd513f..6802a07 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpContentWriteStream.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpContentWriteStream.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Diagnostics; +using System.IO; using System.Threading; using System.Threading.Tasks; @@ -18,12 +19,19 @@ namespace System.Net.Http public sealed override bool CanRead => false; public sealed override bool CanWrite => true; + public sealed override void Flush() => + _connection.Flush(); + public sealed override Task FlushAsync(CancellationToken ignored) => _connection.FlushAsync().AsTask(); - public abstract Task FinishAsync(); + public sealed override int Read(Span buffer) => throw new NotSupportedException(); public sealed override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken) => throw new NotSupportedException(); + + public sealed override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) => throw new NotSupportedException(); + + public abstract Task FinishAsync(); } } } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/RawConnectionStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/RawConnectionStream.cs index 52acc7f..81d1dfc 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/RawConnectionStream.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/RawConnectionStream.cs @@ -20,6 +20,25 @@ namespace System.Net.Http public sealed override bool CanRead => true; public sealed override bool CanWrite => true; + public override int Read(Span buffer) + { + if (_connection == null || buffer.Length == 0) + { + // Response body fully consumed or the caller didn't ask for any data + return 0; + } + + int bytesRead = _connection.ReadBuffered(buffer); + if (bytesRead == 0) + { + // We cannot reuse this connection, so close it. + _connection.Dispose(); + _connection = null; + } + + return bytesRead; + } + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken) { CancellationHelper.ThrowIfCancellationRequested(cancellationToken); @@ -61,7 +80,6 @@ namespace System.Net.Http // We cannot reuse this connection, so close it. _connection.Dispose(); _connection = null; - return 0; } return bytesRead; @@ -124,6 +142,25 @@ namespace System.Net.Http _connection = null; } + public override void Write(byte[] buffer, int offset, int count) + { + ValidateBufferArgs(buffer, offset, count); + Write(buffer.AsSpan(offset, count)); + } + + public override void Write(ReadOnlySpan buffer) + { + if (_connection == null) + { + throw new IOException(SR.net_http_io_write); + } + + if (buffer.Length != 0) + { + _connection.WriteWithoutBuffering(buffer); + } + } + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) @@ -147,6 +184,8 @@ namespace System.Net.Http new ValueTask(WaitWithConnectionCancellationAsync(writeTask, cancellationToken)); } + public override void Flush() => _connection?.Flush(); + public override Task FlushAsync(CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) diff --git a/src/libraries/System.Net.Http/tests/FunctionalTests/ResponseStreamTest.cs b/src/libraries/System.Net.Http/tests/FunctionalTests/ResponseStreamTest.cs index 57e8a22..b665177 100644 --- a/src/libraries/System.Net.Http/tests/FunctionalTests/ResponseStreamTest.cs +++ b/src/libraries/System.Net.Http/tests/FunctionalTests/ResponseStreamTest.cs @@ -27,6 +27,8 @@ namespace System.Net.Http.Functional.Tests [InlineData(3)] [InlineData(4)] [InlineData(5)] + [InlineData(6)] + [InlineData(7)] public async Task GetStreamAsync_ReadToEnd_Success(int readMode) { using (HttpClient client = CreateHttpClient()) @@ -82,6 +84,22 @@ namespace System.Net.Http.Functional.Tests break; case 5: + // ReadByte + int byteValue; + while ((byteValue = stream.ReadByte()) != -1) + { + ms.WriteByte((byte)byteValue); + } + responseBody = Encoding.UTF8.GetString(ms.ToArray()); + break; + + case 6: + // CopyTo + stream.CopyTo(ms); + responseBody = Encoding.UTF8.GetString(ms.ToArray()); + break; + + case 7: // CopyToAsync await stream.CopyToAsync(ms); responseBody = Encoding.UTF8.GetString(ms.ToArray()); -- 2.7.4