Override BufferedStream.Read/WriteAsync(Memory)
authorStephen Toub <stoub@microsoft.com>
Tue, 12 Sep 2017 20:57:38 +0000 (13:57 -0700)
committerStephen Toub <stoub@microsoft.com>
Thu, 14 Sep 2017 04:51:07 +0000 (21:51 -0700)
Commit migrated from https://github.com/dotnet/corefx/commit/d8939807cd72df458665503716a832dc79b8a338

src/libraries/System.IO/tests/BufferedStream/BufferedStreamTests.cs
src/libraries/System.IO/tests/BufferedStream/BufferedStreamTests.netcoreapp.cs
src/libraries/System.Runtime.Extensions/src/System/IO/BufferedStream.cs

index 259f82b..17d4f2d 100644 (file)
@@ -29,7 +29,7 @@ namespace System.IO.Tests
             {
                 tasks[i] = stream.WriteAsync(data, 250 * i, 250);
             }
-            Assert.False(tasks.All(t => t.IsCompleted));
+            Assert.All(tasks, t => Assert.Equal(TaskStatus.WaitingForActivation, t.Status));
 
             mcaos.Release();
             await Task.WhenAll(tasks);
@@ -264,13 +264,22 @@ namespace System.IO.Tests
         }
     }
 
-    internal sealed class ManuallyReleaseAsyncOperationsStream : MemoryStream
+    internal sealed class ManuallyReleaseAsyncOperationsStream : Stream
     {
+        private readonly MemoryStream _stream = new MemoryStream();
         private readonly TaskCompletionSource<bool> _tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
         private bool _canSeek = true;
 
         public override bool CanSeek => _canSeek;
 
+        public override bool CanRead => _stream.CanRead;
+
+        public override bool CanWrite => _stream.CanWrite;
+
+        public override long Length => _stream.Length;
+
+        public override long Position { get => _stream.Position; set => _stream.Position = value; }
+
         public void SetCanSeek(bool canSeek) => _canSeek = canSeek;
 
         public void Release() { _tcs.SetResult(true); }
@@ -278,38 +287,44 @@ namespace System.IO.Tests
         public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
         {
             await _tcs.Task;
-            return await base.ReadAsync(buffer, offset, count, cancellationToken);
+            return await _stream.ReadAsync(buffer, offset, count, cancellationToken);
         }
 
         public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
         {
             await _tcs.Task;
-            await base.WriteAsync(buffer, offset, count, cancellationToken);
+            await _stream.WriteAsync(buffer, offset, count, cancellationToken);
         }
 
         public override async Task FlushAsync(CancellationToken cancellationToken)
         {
             await _tcs.Task;
-            await base.FlushAsync(cancellationToken);
+            await _stream.FlushAsync(cancellationToken);
         }
+
+        public override void Flush() => _stream.Flush();
+        public override int Read(byte[] buffer, int offset, int count) => _stream.Read(buffer, offset, count);
+        public override long Seek(long offset, SeekOrigin origin) => _stream.Seek(offset, origin);
+        public override void SetLength(long value) => _stream.SetLength(value);
+        public override void Write(byte[] buffer, int offset, int count) => _stream.Write(buffer, offset, count);
     }
 
     internal sealed class ThrowsExceptionFromAsyncOperationsStream : MemoryStream
     {
-        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
-        {
+        public override int Read(byte[] buffer, int offset, int count) =>
             throw new InvalidOperationException("Exception from ReadAsync");
-        }
 
-        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
-        {
+        public override void Write(byte[] buffer, int offset, int count) =>
+            throw new InvalidOperationException("Exception from ReadAsync");
+
+        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
+            throw new InvalidOperationException("Exception from ReadAsync");
+
+        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
             throw new InvalidOperationException("Exception from WriteAsync");
-        }
 
-        public override Task FlushAsync(CancellationToken cancellationToken)
-        {
+        public override Task FlushAsync(CancellationToken cancellationToken) =>
             throw new InvalidOperationException("Exception from FlushAsync");
-        }
     }
 
     public class BufferedStream_NS17
index 53111e8..7ead9aa 100644 (file)
@@ -2,7 +2,8 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
-using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
 using Xunit;
 
 namespace System.IO.Tests
@@ -49,5 +50,41 @@ namespace System.IO.Tests
             }
             Assert.Equal(data, result.ToArray());
         }
+
+        [Theory]
+        [InlineData(1, 1)]
+        [InlineData(1, 2)]
+        [InlineData(1024, 4096)]
+        [InlineData(4096, 4097)]
+        [InlineData(4096, 1)]
+        [InlineData(2047, 4096)]
+        public async Task ReadMemory_WriteMemory_AllDataCopied(int spanSize, int bufferSize)
+        {
+            byte[] data = new byte[80000];
+            new Random(42).NextBytes(data);
+
+            var result = new MemoryStream();
+            using (var output = new BufferedStream(result, bufferSize))
+            using (var input = new BufferedStream(new MemoryStream(data), bufferSize))
+            {
+                Memory<byte> memory = new byte[spanSize];
+                int bytesRead;
+                while ((bytesRead = await input.ReadAsync(memory)) != 0)
+                {
+                    await output.WriteAsync(memory.Slice(0, bytesRead));
+                }
+            }
+            Assert.Equal(data, result.ToArray());
+        }
+
+        [Fact]
+        public void ReadWriteMemory_Precanceled_Throws()
+        {
+            using (var bs = new BufferedStream(new MemoryStream()))
+            {
+                Assert.Equal(TaskStatus.Canceled, bs.ReadAsync(new byte[1], new CancellationToken(true)).AsTask().Status);
+                Assert.Equal(TaskStatus.Canceled, bs.WriteAsync(new byte[1], new CancellationToken(true)).Status);
+            }
+        }
     }
 }
index f038e80..212f821 100644 (file)
@@ -589,7 +589,6 @@ namespace System.IO
 
         public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
         {
-
             if (buffer == null)
                 throw new ArgumentNullException(nameof(buffer), SR.ArgumentNull_Buffer);
             if (offset < 0)
@@ -645,26 +644,58 @@ namespace System.IO
             }
 
             // Delegate to the async implementation.
-            return ReadFromUnderlyingStreamAsync(buffer, offset + bytesFromBuffer, count - bytesFromBuffer, cancellationToken,
-                                                 bytesFromBuffer, semaphoreLockTask);
+            return ReadFromUnderlyingStreamAsync(
+                new Memory<byte>(buffer, offset + bytesFromBuffer, count - bytesFromBuffer),
+                cancellationToken, bytesFromBuffer, semaphoreLockTask).AsTask();
+        }
+
+        public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
+        {
+            if (cancellationToken.IsCancellationRequested)
+            {
+                return new ValueTask<int>(Task.FromCanceled<int>(cancellationToken));
+            }
+
+            EnsureNotClosed();
+            EnsureCanRead();
+
+            int bytesFromBuffer = 0;
+            SemaphoreSlim sem = LazyEnsureAsyncActiveSemaphoreInitialized();
+            Task semaphoreLockTask = sem.WaitAsync();
+            if (semaphoreLockTask.IsCompletedSuccessfully)
+            {
+                bool completeSynchronously = true;
+                try
+                {
+                    bytesFromBuffer = ReadFromBuffer(destination.Span);
+                    completeSynchronously = bytesFromBuffer == destination.Length;
+                    if (completeSynchronously)
+                    {
+                        // If we satisfied enough data from the buffer, we can complete synchronously.
+                        return new ValueTask<int>(bytesFromBuffer);
+                    }
+                }
+                finally
+                {
+                    if (completeSynchronously)  // if this is FALSE, we will be entering ReadFromUnderlyingStreamAsync and releasing there.
+                    {
+                        sem.Release();
+                    }
+                }
+            }
+
+            // Delegate to the async implementation.
+            return ReadFromUnderlyingStreamAsync(destination.Slice(bytesFromBuffer), cancellationToken, bytesFromBuffer, semaphoreLockTask);
         }
 
         /// <summary>BufferedStream should be as thin a wrapper as possible. We want ReadAsync to delegate to
         /// ReadAsync of the underlying _stream rather than calling the base Stream which implements the one in terms of the other.
         /// This allows BufferedStream to affect the semantics of the stream it wraps as little as possible. </summary>
         /// <returns>-2 if _bufferSize was set to 0 while waiting on the semaphore; otherwise num of bytes read.</returns>
-        private async Task<int> ReadFromUnderlyingStreamAsync(byte[] array, int offset, int count,
-                                                                CancellationToken cancellationToken,
-                                                                int bytesAlreadySatisfied,
-                                                                Task semaphoreLockTask)
+        private async ValueTask<int> ReadFromUnderlyingStreamAsync(
+            Memory<byte> buffer, CancellationToken cancellationToken, int bytesAlreadySatisfied, Task semaphoreLockTask)
         {
-
             // Same conditions validated with exceptions in ReadAsync:
-            // (These should be Debug.Requires(..) but that method had some issues in async methods; using Assert(..) for now.)
-            Debug.Assert(array != null);
-            Debug.Assert(offset >= 0);
-            Debug.Assert(count >= 0);
-            Debug.Assert(array.Length - offset >= count);
             Debug.Assert(_stream != null);
             Debug.Assert(_stream.CanRead);
             Debug.Assert(_bufferSize > 0);
@@ -674,17 +705,17 @@ namespace System.IO
             await semaphoreLockTask.ConfigureAwait(false);
             try
             {
-
                 // The buffer might have been changed by another async task while we were waiting on the semaphore.
                 // Check it now again.            
-                int bytesFromBuffer = ReadFromBuffer(array, offset, count);
-                if (bytesFromBuffer == count)
+                int bytesFromBuffer = ReadFromBuffer(buffer.Span);
+                if (bytesFromBuffer == buffer.Length)
+                {
                     return bytesAlreadySatisfied + bytesFromBuffer;
+                }
 
                 if (bytesFromBuffer > 0)
                 {
-                    count -= bytesFromBuffer;
-                    offset += bytesFromBuffer;
+                    buffer = buffer.Slice(bytesFromBuffer);
                     bytesAlreadySatisfied += bytesFromBuffer;
                 }
 
@@ -693,21 +724,22 @@ namespace System.IO
 
                 // If there was anything in the write buffer, clear it.
                 if (_writePos > 0)
+                {
                     await FlushWriteAsync(cancellationToken).ConfigureAwait(false);  // no Begin-End read version for Flush. Use Async.            
+                }
 
                 // If the requested read is larger than buffer size, avoid the buffer and still use a single read:
-                if (count >= _bufferSize)
+                if (buffer.Length >= _bufferSize)
                 {
-                    return bytesAlreadySatisfied + await _stream.ReadAsync(array, offset, count, cancellationToken).ConfigureAwait(false);
+                    return bytesAlreadySatisfied + await _stream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
                 }
 
                 // Ok. We can fill the buffer:
                 EnsureBufferAllocated();
                 _readLen = await _stream.ReadAsync(_buffer, 0, _bufferSize, cancellationToken).ConfigureAwait(false);
 
-                bytesFromBuffer = ReadFromBuffer(array, offset, count);
+                bytesFromBuffer = ReadFromBuffer(buffer.Span);
                 return bytesAlreadySatisfied + bytesFromBuffer;
-
             }
             finally
             {
@@ -769,7 +801,7 @@ namespace System.IO
             offset += bytesToWrite;
         }
 
-        private void WriteToBuffer(ref ReadOnlySpan<byte> source)
+        private int WriteToBuffer(ReadOnlySpan<byte> source)
         {
             int bytesToWrite = Math.Min(_bufferSize - _writePos, source.Length);
             if (bytesToWrite > 0)
@@ -777,8 +809,8 @@ namespace System.IO
                 EnsureBufferAllocated();
                 source.Slice(0, bytesToWrite).CopyTo(new Span<byte>(_buffer, _writePos, bytesToWrite));
                 _writePos += bytesToWrite;
-                source = source.Slice(bytesToWrite);
             }
+            return bytesToWrite;
         }
 
         private void WriteToBuffer(byte[] array, ref int offset, ref int count, out Exception error)
@@ -939,7 +971,7 @@ namespace System.IO
             {
                 ClearReadBufferBeforeWrite();
             }
-            Debug.Assert(_writePos < _bufferSize);
+            Debug.Assert(_writePos < _bufferSize, $"Expected {_writePos} < {_bufferSize}");
 
             int totalUserbytes;
             bool useBuffer;
@@ -954,12 +986,13 @@ namespace System.IO
             {
                 // Copy as much data to the buffer as will fit.  If there's still room in the buffer,
                 // everything must have fit.
-                WriteToBuffer(ref source);
+                int bytesWritten = WriteToBuffer(source);
                 if (_writePos < _bufferSize)
                 {
-                    Debug.Assert(source.IsEmpty);
+                    Debug.Assert(bytesWritten == source.Length);
                     return;
                 }
+                source = source.Slice(bytesWritten);
 
                 Debug.Assert(_writePos == _bufferSize);
                 Debug.Assert(_buffer != null);
@@ -969,9 +1002,9 @@ namespace System.IO
                 _writePos = 0;
 
                 // Now write the remainder.  It must fit, as we're only on this path if that's true.
-                WriteToBuffer(ref source);
+                bytesWritten = WriteToBuffer(source);
+                Debug.Assert(bytesWritten == source.Length);
 
-                Debug.Assert(source.IsEmpty);
                 Debug.Assert(_writePos < _bufferSize);
             }
             else // skip the buffer
@@ -1012,16 +1045,21 @@ namespace System.IO
             if (buffer.Length - offset < count)
                 throw new ArgumentException(SR.Argument_InvalidOffLen);
 
+            return WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
+        }
+
+        public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
+        {
             // Fast path check for cancellation already requested
             if (cancellationToken.IsCancellationRequested)
+            {
                 return Task.FromCanceled<int>(cancellationToken);
+            }
 
             EnsureNotClosed();
             EnsureCanWrite();
 
-            // Try to satisfy the request from the buffer synchronously. But still need a sem-lock in case that another
-            // Async IO Task accesses the buffer concurrently. If we fail to acquire the lock without waiting, make this 
-            // an Async operation.
+            // Try to satisfy the request from the buffer synchronously.
             SemaphoreSlim sem = LazyEnsureAsyncActiveSemaphoreInitialized();
             Task semaphoreLockTask = sem.WaitAsync();
             if (semaphoreLockTask.IsCompletedSuccessfully)
@@ -1029,25 +1067,20 @@ namespace System.IO
                 bool completeSynchronously = true;
                 try
                 {
-
                     if (_writePos == 0)
+                    {
                         ClearReadBufferBeforeWrite();
+                    }
 
                     Debug.Assert(_writePos < _bufferSize);
 
                     // If the write completely fits into the buffer, we can complete synchronously:
-                    completeSynchronously = (count < _bufferSize - _writePos);
-
+                    completeSynchronously = source.Length < _bufferSize - _writePos;
                     if (completeSynchronously)
                     {
-
-                        Exception error;
-                        WriteToBuffer(buffer, ref offset, ref count, out error);
-                        Debug.Assert(count == 0);
-
-                        return (error == null)
-                                    ? Task.CompletedTask
-                                    : Task.FromException(error);
+                        int bytesWritten = WriteToBuffer(source.Span);
+                        Debug.Assert(bytesWritten == source.Length);
+                        return Task.CompletedTask;
                     }
                 }
                 finally
@@ -1058,23 +1091,17 @@ namespace System.IO
             }
 
             // Delegate to the async implementation.
-            return WriteToUnderlyingStreamAsync(buffer, offset, count, cancellationToken, semaphoreLockTask);
+            return WriteToUnderlyingStreamAsync(source, cancellationToken, semaphoreLockTask);
         }
 
-
         /// <summary>BufferedStream should be as thin a wrapper as possible. We want WriteAsync to delegate to
         /// WriteAsync of the underlying _stream rather than calling the base Stream which implements the one 
         /// in terms of the other. This allows BufferedStream to affect the semantics of the stream it wraps as 
         /// little as possible.
         /// </summary>
-        private async Task WriteToUnderlyingStreamAsync(byte[] array, int offset, int count,
-                                                        CancellationToken cancellationToken,
-                                                        Task semaphoreLockTask)
+        private async Task WriteToUnderlyingStreamAsync(
+            ReadOnlyMemory<byte> source, CancellationToken cancellationToken, Task semaphoreLockTask)
         {
-            Debug.Assert(array != null);
-            Debug.Assert(offset >= 0);
-            Debug.Assert(count >= 0);
-            Debug.Assert(array.Length - offset >= count);
             Debug.Assert(_stream != null);
             Debug.Assert(_stream.CanWrite);
             Debug.Assert(_bufferSize > 0);
@@ -1085,7 +1112,6 @@ namespace System.IO
             await semaphoreLockTask.ConfigureAwait(false);
             try
             {
-
                 // The buffer might have been changed by another async task while we were waiting on the semaphore.
                 // However, note that if we recalculate the sync completion condition to TRUE, then useBuffer will also be TRUE.
 
@@ -1095,37 +1121,37 @@ namespace System.IO
                 int totalUserBytes;
                 bool useBuffer;
                 checked
-                {  // We do not expect buffer sizes big enough for an overflow, but if it happens, lets fail early:
-                    totalUserBytes = _writePos + count;
-                    useBuffer = (totalUserBytes + count < (_bufferSize + _bufferSize));
+                {
+                    // We do not expect buffer sizes big enough for an overflow, but if it happens, lets fail early:
+                    totalUserBytes = _writePos + source.Length;
+                    useBuffer = (totalUserBytes + source.Length < (_bufferSize + _bufferSize));
                 }
 
                 if (useBuffer)
                 {
-                    WriteToBuffer(array, ref offset, ref count);
+                    source = source.Slice(WriteToBuffer(source.Span));
 
                     if (_writePos < _bufferSize)
                     {
-                        Debug.Assert(count == 0);
+                        Debug.Assert(source.Length == 0);
                         return;
                     }
 
-                    Debug.Assert(count >= 0);
+                    Debug.Assert(source.Length >= 0);
                     Debug.Assert(_writePos == _bufferSize);
                     Debug.Assert(_buffer != null);
-
                    
                     await _stream.WriteAsync(_buffer, 0, _writePos, cancellationToken).ConfigureAwait(false);
                     _writePos = 0;
 
-                    WriteToBuffer(array, ref offset, ref count);
+                    int bytesWritten = WriteToBuffer(source.Span);
+                    Debug.Assert(bytesWritten == source.Length);
 
-                    Debug.Assert(count == 0);
                     Debug.Assert(_writePos < _bufferSize);
 
                 }
-                else
-                {  // if (!useBuffer)
+                else // !useBuffer
+                {
                     // Write out the buffer if necessary.
                     if (_writePos > 0)
                     {
@@ -1136,7 +1162,7 @@ namespace System.IO
                         if (totalUserBytes <= (_bufferSize + _bufferSize) && totalUserBytes <= MaxShadowBufferSize)
                         {
                             EnsureShadowBufferAllocated();
-                            Buffer.BlockCopy(array, offset, _buffer, _writePos, count);
+                            source.Span.CopyTo(new Span<byte>(_buffer, _writePos, source.Length));
 
                             await _stream.WriteAsync(_buffer, 0, totalUserBytes, cancellationToken).ConfigureAwait(false);
                             _writePos = 0;
@@ -1148,7 +1174,7 @@ namespace System.IO
                     }
 
                     // Write out user data.
-                    await _stream.WriteAsync(array, offset, count, cancellationToken).ConfigureAwait(false);
+                    await _stream.WriteAsync(source, cancellationToken).ConfigureAwait(false);
                 }
             }
             finally