Fix SocketsHttpHandler streams to do sync I/O in sync methods (dotnet/corefx#36946)
authorStephen Toub <stoub@microsoft.com>
Thu, 18 Apr 2019 04:01:04 +0000 (00:01 -0400)
committerGitHub <noreply@github.com>
Thu, 18 Apr 2019 04:01:04 +0000 (00:01 -0400)
* Fix SocketsHttpHandler response streams to do sync I/O in sync methods

SocketsHttpHandler hands back response Streams for reading response body content.  While we encourage developers to use the async Stream APIs, Stream does expose synchronous APIs, yet the current implementations are just wrapping the async ones and doing sync-over-async.

This fixes the response stream synchronous APIs to be sync down to the underlying networking stream.

It also fixes a couple other minor issues, e.g. Flush{Async} on read-only stream should be nops rather than throwing, we should include an error message about a stream being read-only when trying to write to it, etc.

* Address PR feedback

Commit migrated from https://github.com/dotnet/corefx/commit/315fd00cb2465390900209919cdb2bf6bc44ae0e

13 files changed:
src/libraries/System.Net.Http/src/System.Net.Http.csproj
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ChunkedEncodingReadStream.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectionCloseReadStream.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ContentLengthReadStream.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/EmptyReadStream.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Stream.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpBaseStream.cs [moved from src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/BaseAsyncStream.cs with 75% similarity]
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpContentReadStream.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpContentStream.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpContentWriteStream.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/RawConnectionStream.cs
src/libraries/System.Net.Http/tests/FunctionalTests/ResponseStreamTest.cs

index 2de08e8..70ad122 100644 (file)
     <Compile Include="System\Net\Http\SocketsHttpHandler\AuthenticationHelper.cs" />
     <Compile Include="System\Net\Http\SocketsHttpHandler\AuthenticationHelper.Digest.cs" />
     <Compile Include="System\Net\Http\SocketsHttpHandler\AuthenticationHelper.NtAuth.cs" />
-    <Compile Include="System\Net\Http\SocketsHttpHandler\BaseAsyncStream.cs" />
     <Compile Include="System\Net\Http\SocketsHttpHandler\CancellationHelper.cs" />
     <Compile Include="System\Net\Http\SocketsHttpHandler\ChunkedEncodingReadStream.cs" />
     <Compile Include="System\Net\Http\SocketsHttpHandler\ChunkedEncodingWriteStream.cs" />
     <Compile Include="System\Net\Http\SocketsHttpHandler\Http2Connection.cs" />
     <Compile Include="System\Net\Http\SocketsHttpHandler\Http2Stream.cs" />
     <Compile Include="System\Net\Http\SocketsHttpHandler\HttpAuthenticatedConnectionHandler.cs" />
+    <Compile Include="System\Net\Http\SocketsHttpHandler\HttpBaseStream.cs" />
     <Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnection.cs" />
     <Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnectionBase.cs" />
     <Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnectionHandler.cs" />
index d96546c..13de7c6 100644 (file)
@@ -5,7 +5,6 @@
 using System.Buffers.Text;
 using System.Diagnostics;
 using System.IO;
-using System.Net.Http.Headers;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -36,6 +35,65 @@ namespace System.Net.Http
                 _response = response;
             }
 
+            public override int Read(Span<byte> buffer)
+            {
+                if (_connection == null || buffer.Length == 0)
+                {
+                    // Response body fully consumed or the caller didn't ask for any data.
+                    return 0;
+                }
+
+                // Try to consume from data we already have in the buffer.
+                int bytesRead = ReadChunksFromConnectionBuffer(buffer, cancellationRegistration: default);
+                if (bytesRead > 0)
+                {
+                    return bytesRead;
+                }
+
+                // Nothing available to consume.  Fall back to I/O.
+                while (true)
+                {
+                    if (_connection == null)
+                    {
+                        // Fully consumed the response in ReadChunksFromConnectionBuffer.
+                        return 0;
+                    }
+
+                    if (_state == ParsingState.ExpectChunkData &&
+                        buffer.Length >= _connection.ReadBufferSize &&
+                        _chunkBytesRemaining >= (ulong)_connection.ReadBufferSize)
+                    {
+                        // As an optimization, we skip going through the connection's read buffer if both
+                        // the remaining chunk data and the buffer are both at least as large
+                        // as the connection buffer.  That avoids an unnecessary copy while still reading
+                        // the maximum amount we'd otherwise read at a time.
+                        Debug.Assert(_connection.RemainingBuffer.Length == 0);
+                        bytesRead = _connection.Read(buffer.Slice(0, (int)Math.Min((ulong)buffer.Length, _chunkBytesRemaining)));
+                        if (bytesRead == 0)
+                        {
+                            throw new IOException(SR.net_http_invalid_response);
+                        }
+                        _chunkBytesRemaining -= (ulong)bytesRead;
+                        if (_chunkBytesRemaining == 0)
+                        {
+                            _state = ParsingState.ExpectChunkTerminator;
+                        }
+                        return bytesRead;
+                    }
+
+                    // We're only here if we need more data to make forward progress.
+                    _connection.Fill();
+
+                    // Now that we have more, see if we can get any response data, and if
+                    // we can we're done.
+                    int bytesCopied = ReadChunksFromConnectionBuffer(buffer, cancellationRegistration: default);
+                    if (bytesCopied > 0)
+                    {
+                        return bytesCopied;
+                    }
+                }
+            }
+
             public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
             {
                 if (cancellationToken.IsCancellationRequested)
index 1506cdf..8960ad2 100644 (file)
@@ -16,6 +16,25 @@ namespace System.Net.Http
             {
             }
 
+            public override int Read(Span<byte> buffer)
+            {
+                if (_connection == null || buffer.Length == 0)
+                {
+                    // Response body fully consumed or the caller didn't ask for any data
+                    return 0;
+                }
+
+                int bytesRead = _connection.Read(buffer);
+                if (bytesRead == 0)
+                {
+                    // We cannot reuse this connection, so close it.
+                    _connection.Dispose();
+                    _connection = null;
+                }
+
+                return bytesRead;
+            }
+
             public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
             {
                 CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
@@ -62,7 +81,6 @@ namespace System.Net.Http
                     // We cannot reuse this connection, so close it.
                     _connection.Dispose();
                     _connection = null;
-                    return 0;
                 }
 
                 return bytesRead;
index bd2727f..39e1440 100644 (file)
@@ -21,6 +21,40 @@ namespace System.Net.Http
                 _contentBytesRemaining = contentLength;
             }
 
+            public override int Read(Span<byte> buffer)
+            {
+                if (_connection == null || buffer.Length == 0)
+                {
+                    // Response body fully consumed or the caller didn't ask for any data.
+                    return 0;
+                }
+
+                Debug.Assert(_contentBytesRemaining > 0);
+                if ((ulong)buffer.Length > _contentBytesRemaining)
+                {
+                    buffer = buffer.Slice(0, (int)_contentBytesRemaining);
+                }
+
+                int bytesRead = _connection.Read(buffer);
+                if (bytesRead <= 0)
+                {
+                    // Unexpected end of response stream.
+                    throw new IOException(SR.net_http_invalid_response);
+                }
+
+                Debug.Assert((ulong)bytesRead <= _contentBytesRemaining);
+                _contentBytesRemaining -= (ulong)bytesRead;
+
+                if (_contentBytesRemaining == 0)
+                {
+                    // End of response body
+                    _connection.CompleteResponse();
+                    _connection = null;
+                }
+
+                return bytesRead;
+            }
+
             public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
             {
                 CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
index aa08a5b..6d154ea 100644 (file)
@@ -2,12 +2,13 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
+using System.IO;
 using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Net.Http
 {
-    internal sealed class EmptyReadStream : BaseAsyncStream
+    internal sealed class EmptyReadStream : HttpBaseStream
     {
         internal static EmptyReadStream Instance { get; } = new EmptyReadStream();
 
@@ -19,16 +20,20 @@ namespace System.Net.Http
         protected override void Dispose(bool disposing) {  /* nop */ }
         public override void Close() { /* nop */ }
 
-        public override int ReadByte() => -1;
-
         public override int Read(Span<byte> buffer) => 0;
 
         public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken) =>
             cancellationToken.IsCancellationRequested ? new ValueTask<int>(Task.FromCanceled<int>(cancellationToken)) :
             new ValueTask<int>(0);
 
-        public override ValueTask WriteAsync(ReadOnlyMemory<byte> destination, CancellationToken cancellationToken) => throw new NotSupportedException();
+        public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
+        {
+            ValidateCopyToArgs(this, destination, bufferSize);
+            return NopAsync(cancellationToken);
+        }
 
-        public override Task FlushAsync(CancellationToken cancellationToken) => throw new NotSupportedException();
+        public override void Write(ReadOnlySpan<byte> buffer) => throw new NotSupportedException(SR.net_http_content_readonly_stream);
+
+        public override ValueTask WriteAsync(ReadOnlyMemory<byte> destination, CancellationToken cancellationToken) => throw new NotSupportedException();
     }
 } 
index c492381..3c9ad5e 100644 (file)
@@ -5,7 +5,6 @@
 using System.Diagnostics;
 using System.IO;
 using System.Net.Http.Headers;
-using System.Runtime.CompilerServices;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
@@ -407,6 +406,32 @@ namespace System.Net.Http
                 }
             }
 
+            public int ReadData(Span<byte> buffer)
+            {
+                if (buffer.Length == 0)
+                {
+                    return 0;
+                }
+
+                (bool wait, int bytesRead) = TryReadFromBuffer(buffer);
+                if (wait)
+                {
+                    // Synchronously block waiting for data to be produced.
+                    Debug.Assert(bytesRead == 0);
+                    GetWaiterTask().AsTask().GetAwaiter().GetResult();
+                    (wait, bytesRead) = TryReadFromBuffer(buffer);
+                    Debug.Assert(!wait);
+                }
+
+                if (bytesRead != 0)
+                {
+                    ExtendWindow(bytesRead);
+                    _connection.ExtendWindow(bytesRead);
+                }
+
+                return bytesRead;
+            }
+
             public async ValueTask<int> ReadDataAsync(Memory<byte> buffer, CancellationToken cancellationToken)
             {
                 if (buffer.Length == 0)
@@ -432,7 +457,7 @@ namespace System.Net.Http
                 return bytesRead;
             }
 
-            private async ValueTask SendDataAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
+            private async Task SendDataAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
             {
                 ReadOnlyMemory<byte> remaining = buffer;
 
@@ -490,7 +515,7 @@ namespace System.Net.Http
             void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _waitSource.OnCompleted(continuation, state, token, flags);
             void IValueTaskSource.GetResult(short token) => _waitSource.GetResult(token);
 
-            private sealed class Http2ReadStream : BaseAsyncStream
+            private sealed class Http2ReadStream : HttpBaseStream
             {
                 private Http2Stream _http2Stream;
 
@@ -519,6 +544,12 @@ namespace System.Net.Http
                 public override bool CanRead => true;
                 public override bool CanWrite => false;
 
+                public override int Read(Span<byte> destination)
+                {
+                    Http2Stream http2Stream = _http2Stream ?? throw new ObjectDisposedException(nameof(Http2ReadStream));
+                    return http2Stream.ReadData(destination);
+                }
+
                 public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken)
                 {
                     Http2Stream http2Stream = _http2Stream;
@@ -530,13 +561,12 @@ namespace System.Net.Http
                     return http2Stream.ReadDataAsync(destination, cancellationToken);
                 }
 
-                public override ValueTask WriteAsync(ReadOnlyMemory<byte> destination, CancellationToken cancellationToken) => throw new NotSupportedException();
+                public override void Write(ReadOnlySpan<byte> buffer) => throw new NotSupportedException(SR.net_http_content_readonly_stream);
 
-                public override Task FlushAsync(CancellationToken cancellationToken) => throw new NotSupportedException();
+                public override ValueTask WriteAsync(ReadOnlyMemory<byte> destination, CancellationToken cancellationToken) => throw new NotSupportedException();
             }
 
-
-            private sealed class Http2WriteStream : BaseAsyncStream
+            private sealed class Http2WriteStream : HttpBaseStream
             {
                 private Http2Stream _http2Stream;
 
@@ -560,6 +590,8 @@ namespace System.Net.Http
                 public override bool CanRead => false;
                 public override bool CanWrite => true;
 
+                public override int Read(Span<byte> buffer) => throw new NotSupportedException();
+
                 public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken) => throw new NotSupportedException();
 
                 public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
@@ -570,10 +602,8 @@ namespace System.Net.Http
                         return new ValueTask(Task.FromException(new ObjectDisposedException(nameof(Http2WriteStream))));
                     }
 
-                    return http2Stream.SendDataAsync(buffer, cancellationToken);
+                    return new ValueTask(http2Stream.SendDataAsync(buffer, cancellationToken));
                 }
-
-                public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask;
             }
         }
     }
@@ -3,12 +3,13 @@
 // See the LICENSE file in the project root for more information.
 
 using System.IO;
+using System.Runtime.InteropServices;
 using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Net.Http
 {
-    internal abstract class BaseAsyncStream : Stream
+    internal abstract class HttpBaseStream : Stream
     {
         public sealed override bool CanSeek => false;
 
@@ -78,10 +79,16 @@ namespace System.Net.Http
             }
         }
 
+        public sealed override int ReadByte()
+        {
+            byte b = 0;
+            return Read(MemoryMarshal.CreateSpan(ref b, 1)) == 1 ? b : -1;
+        }
+
         public sealed override int Read(byte[] buffer, int offset, int count)
         {
             ValidateBufferArgs(buffer, offset, count);
-            return ReadAsync(new Memory<byte>(buffer, offset, count), CancellationToken.None).GetAwaiter().GetResult();
+            return Read(buffer.AsSpan(offset, count));
         }
 
         public sealed override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
@@ -90,30 +97,39 @@ namespace System.Net.Http
             return ReadAsync(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
         }
 
-        public sealed override void Write(byte[] buffer, int offset, int count)
+        public override void Write(byte[] buffer, int offset, int count)
         {
+            // This does sync-over-async, but it also should only end up being used in strange
+            // situations.  Either a derived stream overrides this anyway, so the implementation won't be used,
+            // or it's being called as part of HttpContent.SerializeToStreamAsync, which means custom
+            // content is explicitly choosing to make a synchronous call as part of an asynchronous method.
             ValidateBufferArgs(buffer, offset, count);
             WriteAsync(new Memory<byte>(buffer, offset, count), CancellationToken.None).GetAwaiter().GetResult();
         }
 
+        public sealed override void WriteByte(byte value) =>
+            Write(MemoryMarshal.CreateReadOnlySpan(ref value, 1));
+
         public sealed override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
         {
             ValidateBufferArgs(buffer, offset, count);
             return WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();
         }
 
-        public sealed override void Flush() => FlushAsync().GetAwaiter().GetResult();
+        public override void Flush() { }
 
-        public sealed override void CopyTo(Stream destination, int bufferSize) =>
-            CopyToAsync(destination, bufferSize, CancellationToken.None).GetAwaiter().GetResult();
+        public override Task FlushAsync(CancellationToken cancellationToken) => NopAsync(cancellationToken);
 
+        protected static Task NopAsync(CancellationToken cancellationToken) =>
+            cancellationToken.IsCancellationRequested ? Task.FromCanceled(cancellationToken) :
+            Task.CompletedTask;
 
         //
         // Methods which must be implemented by derived classes
         //
 
+        public abstract override int Read(Span<byte> buffer);
         public abstract override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken);
-        public abstract override ValueTask WriteAsync(ReadOnlyMemory<byte> destination, CancellationToken cancellationToken);
-        public abstract override Task FlushAsync(CancellationToken cancellationToken);
+        public abstract override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken);
     }
 }
index c27762f..ff10f69 100644 (file)
@@ -972,6 +972,13 @@ namespace System.Net.Http
 
         private static bool IsDigit(byte c) => (uint)(c - '0') <= '9' - '0';
 
+        private void WriteToBuffer(ReadOnlySpan<byte> source)
+        {
+            Debug.Assert(source.Length <= _writeBuffer.Length - _writeOffset);
+            source.CopyTo(new Span<byte>(_writeBuffer, _writeOffset, source.Length));
+            _writeOffset += source.Length;
+        }
+
         private void WriteToBuffer(ReadOnlyMemory<byte> source)
         {
             Debug.Assert(source.Length <= _writeBuffer.Length - _writeOffset);
@@ -1010,6 +1017,30 @@ namespace System.Net.Http
             }
         }
 
+        private void WriteWithoutBuffering(ReadOnlySpan<byte> source)
+        {
+            if (_writeOffset != 0)
+            {
+                int remaining = _writeBuffer.Length - _writeOffset;
+                if (source.Length <= remaining)
+                {
+                    // There's something already in the write buffer, but the content
+                    // we're writing can also fit after it in the write buffer.  Copy
+                    // the content to the write buffer and then flush it, so that we
+                    // can do a single send rather than two.
+                    WriteToBuffer(source);
+                    Flush();
+                    return;
+                }
+
+                // There's data in the write buffer and the data we're writing doesn't fit after it.
+                // Do two writes, one to flush the buffer and then another to write the supplied content.
+                Flush();
+            }
+
+            WriteToStream(source);
+        }
+
         private ValueTask WriteWithoutBufferingAsync(ReadOnlyMemory<byte> source)
         {
             if (_writeOffset == 0)
@@ -1172,6 +1203,15 @@ namespace System.Net.Http
             }
         }
 
+        private void Flush()
+        {
+            if (_writeOffset > 0)
+            {
+                WriteToStream(new ReadOnlySpan<byte>(_writeBuffer, 0, _writeOffset));
+                _writeOffset = 0;
+            }
+        }
+
         private ValueTask FlushAsync()
         {
             if (_writeOffset > 0)
@@ -1183,6 +1223,12 @@ namespace System.Net.Http
             return default;
         }
 
+        private void WriteToStream(ReadOnlySpan<byte> source)
+        {
+            if (NetEventSource.IsEnabled) Trace($"Writing {source.Length} bytes.");
+            _stream.Write(source);
+        }
+
         private ValueTask WriteToStreamAsync(ReadOnlyMemory<byte> source)
         {
             if (NetEventSource.IsEnabled) Trace($"Writing {source.Length} bytes.");
@@ -1313,6 +1359,52 @@ namespace System.Net.Http
         }
 
         // Throws IOException on EOF.  This is only called when we expect more data.
+        private void Fill()
+        {
+            Debug.Assert(_readAheadTask == null);
+
+            int remaining = _readLength - _readOffset;
+            Debug.Assert(remaining >= 0);
+
+            if (remaining == 0)
+            {
+                // No data in the buffer.  Simply reset the offset and length to 0 to allow
+                // the whole buffer to be filled.
+                _readOffset = _readLength = 0;
+            }
+            else if (_readOffset > 0)
+            {
+                // There's some data in the buffer but it's not at the beginning.  Shift it
+                // down to make room for more.
+                Buffer.BlockCopy(_readBuffer, _readOffset, _readBuffer, 0, remaining);
+                _readOffset = 0;
+                _readLength = remaining;
+            }
+            else if (remaining == _readBuffer.Length)
+            {
+                // The whole buffer is full, but the caller is still requesting more data,
+                // so increase the size of the buffer.
+                Debug.Assert(_readOffset == 0);
+                Debug.Assert(_readLength == _readBuffer.Length);
+
+                var newReadBuffer = new byte[_readBuffer.Length * 2];
+                Buffer.BlockCopy(_readBuffer, 0, newReadBuffer, 0, remaining);
+                _readBuffer = newReadBuffer;
+                _readOffset = 0;
+                _readLength = remaining;
+            }
+
+            int bytesRead = _stream.Read(_readBuffer, _readLength, _readBuffer.Length - _readLength);
+            if (NetEventSource.IsEnabled) Trace($"Received {bytesRead} bytes.");
+            if (bytesRead == 0)
+            {
+                throw new IOException(SR.net_http_invalid_response);
+            }
+
+            _readLength += bytesRead;
+        }
+
+        // Throws IOException on EOF.  This is only called when we expect more data.
         private async Task FillAsync()
         {
             Debug.Assert(_readAheadTask == null);
@@ -1341,7 +1433,7 @@ namespace System.Net.Http
                 Debug.Assert(_readOffset == 0);
                 Debug.Assert(_readLength == _readBuffer.Length);
 
-                byte[] newReadBuffer = new byte[_readBuffer.Length * 2];
+                var newReadBuffer = new byte[_readBuffer.Length * 2];
                 Buffer.BlockCopy(_readBuffer, 0, newReadBuffer, 0, remaining);
                 _readBuffer = newReadBuffer;
                 _readOffset = 0;
@@ -1367,6 +1459,34 @@ namespace System.Net.Http
             _readOffset += buffer.Length;
         }
 
+        private int Read(Span<byte> destination)
+        {
+            // This is called when reading the response body.
+
+            int remaining = _readLength - _readOffset;
+            if (remaining > 0)
+            {
+                // We have data in the read buffer.  Return it to the caller.
+                if (destination.Length <= remaining)
+                {
+                    ReadFromBuffer(destination);
+                    return destination.Length;
+                }
+                else
+                {
+                    ReadFromBuffer(destination.Slice(0, remaining));
+                    return remaining;
+                }
+            }
+
+            // No data in read buffer. 
+            // Do an unbuffered read directly against the underlying stream.
+            Debug.Assert(_readAheadTask == null, "Read ahead task should have been consumed as part of the headers.");
+            int count = _stream.Read(destination);
+            if (NetEventSource.IsEnabled) Trace($"Received {count} bytes.");
+            return count;
+        }
+
         private async ValueTask<int> ReadAsync(Memory<byte> destination)
         {
             // This is called when reading the response body.
@@ -1395,6 +1515,43 @@ namespace System.Net.Http
             return count;
         }
 
+        private int ReadBuffered(Span<byte> destination)
+        {
+            // This is called when reading the response body.
+            Debug.Assert(destination.Length != 0);
+
+            int remaining = _readLength - _readOffset;
+            if (remaining > 0)
+            {
+                // We have data in the read buffer.  Return it to the caller.
+                if (destination.Length <= remaining)
+                {
+                    ReadFromBuffer(destination);
+                    return destination.Length;
+                }
+                else
+                {
+                    ReadFromBuffer(destination.Slice(0, remaining));
+                    return remaining;
+                }
+            }
+
+            // No data in read buffer. 
+            _readOffset = _readLength = 0;
+
+            // Do a buffered read directly against the underlying stream.
+            Debug.Assert(_readAheadTask == null, "Read ahead task should have been consumed as part of the headers.");
+            int bytesRead = _stream.Read(_readBuffer, 0, _readBuffer.Length);
+            if (NetEventSource.IsEnabled) Trace($"Received {bytesRead} bytes.");
+            _readLength = bytesRead;
+
+            // Hand back as much data as we can fit.
+            int bytesToCopy = Math.Min(bytesRead, destination.Length);
+            _readBuffer.AsSpan(0, bytesToCopy).CopyTo(destination);
+            _readOffset = bytesToCopy;
+            return bytesToCopy;
+        }
+
         private ValueTask<int> ReadBufferedAsync(Memory<byte> destination)
         {
             // If the caller provided buffer, and thus the amount of data desired to be read,
index 080ff78..b7527be 100644 (file)
@@ -3,7 +3,6 @@
 // See the LICENSE file in the project root for more information.
 
 using System.Diagnostics;
-using System.IO;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -22,9 +21,9 @@ namespace System.Net.Http
             public sealed override bool CanRead => true;
             public sealed override bool CanWrite => false;
 
-            public sealed override ValueTask WriteAsync(ReadOnlyMemory<byte> destination, CancellationToken cancellationToken) => throw new NotSupportedException();
+            public sealed override void Write(ReadOnlySpan<byte> buffer) => throw new NotSupportedException(SR.net_http_content_readonly_stream);
 
-            public override Task FlushAsync(CancellationToken cancellationToken) => throw new NotSupportedException();
+            public sealed override ValueTask WriteAsync(ReadOnlyMemory<byte> destination, CancellationToken cancellationToken) => throw new NotSupportedException();
 
             public virtual bool NeedsDrain => false;
 
index f3a0f01..b1b40ea 100644 (file)
@@ -4,7 +4,7 @@
 
 namespace System.Net.Http
 {
-    internal abstract class HttpContentStream : BaseAsyncStream
+    internal abstract class HttpContentStream : HttpBaseStream
     {
         protected HttpConnection _connection;
 
index 9dd513f..6802a07 100644 (file)
@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information.
 
 using System.Diagnostics;
+using System.IO;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -18,12 +19,19 @@ namespace System.Net.Http
             public sealed override bool CanRead => false;
             public sealed override bool CanWrite => true;
 
+            public sealed override void Flush() =>
+                _connection.Flush();
+
             public sealed override Task FlushAsync(CancellationToken ignored) =>
                 _connection.FlushAsync().AsTask();
 
-            public abstract Task FinishAsync();
+            public sealed override int Read(Span<byte> buffer) => throw new NotSupportedException();
 
             public sealed override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken) => throw new NotSupportedException();
+
+            public sealed override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) => throw new NotSupportedException();
+
+            public abstract Task FinishAsync();
         }
     }
 }
index 52acc7f..81d1dfc 100644 (file)
@@ -20,6 +20,25 @@ namespace System.Net.Http
             public sealed override bool CanRead => true;
             public sealed override bool CanWrite => true;
 
+            public override int Read(Span<byte> buffer)
+            {
+                if (_connection == null || buffer.Length == 0)
+                {
+                    // Response body fully consumed or the caller didn't ask for any data
+                    return 0;
+                }
+
+                int bytesRead = _connection.ReadBuffered(buffer);
+                if (bytesRead == 0)
+                {
+                    // We cannot reuse this connection, so close it.
+                    _connection.Dispose();
+                    _connection = null;
+                }
+
+                return bytesRead;
+            }
+
             public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
             {
                 CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
@@ -61,7 +80,6 @@ namespace System.Net.Http
                     // We cannot reuse this connection, so close it.
                     _connection.Dispose();
                     _connection = null;
-                    return 0;
                 }
 
                 return bytesRead;
@@ -124,6 +142,25 @@ namespace System.Net.Http
                 _connection = null;
             }
 
+            public override void Write(byte[] buffer, int offset, int count)
+            {
+                ValidateBufferArgs(buffer, offset, count);
+                Write(buffer.AsSpan(offset, count));
+            }
+
+            public override void Write(ReadOnlySpan<byte> buffer)
+            {
+                if (_connection == null)
+                {
+                    throw new IOException(SR.net_http_io_write);
+                }
+
+                if (buffer.Length != 0)
+                {
+                    _connection.WriteWithoutBuffering(buffer);
+                }
+            }
+
             public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
             {
                 if (cancellationToken.IsCancellationRequested)
@@ -147,6 +184,8 @@ namespace System.Net.Http
                     new ValueTask(WaitWithConnectionCancellationAsync(writeTask, cancellationToken));
             }
 
+            public override void Flush() => _connection?.Flush();
+
             public override Task FlushAsync(CancellationToken cancellationToken)
             {
                 if (cancellationToken.IsCancellationRequested)
index 57e8a22..b665177 100644 (file)
@@ -27,6 +27,8 @@ namespace System.Net.Http.Functional.Tests
         [InlineData(3)]
         [InlineData(4)]
         [InlineData(5)]
+        [InlineData(6)]
+        [InlineData(7)]
         public async Task GetStreamAsync_ReadToEnd_Success(int readMode)
         {
             using (HttpClient client = CreateHttpClient())
@@ -82,6 +84,22 @@ namespace System.Net.Http.Functional.Tests
                             break;
 
                         case 5:
+                            // ReadByte
+                            int byteValue;
+                            while ((byteValue = stream.ReadByte()) != -1)
+                            {
+                                ms.WriteByte((byte)byteValue);
+                            }
+                            responseBody = Encoding.UTF8.GetString(ms.ToArray());
+                            break;
+
+                        case 6:
+                            // CopyTo
+                            stream.CopyTo(ms);
+                            responseBody = Encoding.UTF8.GetString(ms.ToArray());
+                            break;
+
+                        case 7:
                             // CopyToAsync
                             await stream.CopyToAsync(ms);
                             responseBody = Encoding.UTF8.GetString(ms.ToArray());