Don't swallow OCE unconditionally (dotnet/corefx#38473)
authorDavid Fowler <davidfowl@gmail.com>
Wed, 12 Jun 2019 16:09:54 +0000 (09:09 -0700)
committerGitHub <noreply@github.com>
Wed, 12 Jun 2019 16:09:54 +0000 (09:09 -0700)
- Today we swallow OperationCanceledExceptions to avoid throwing if CancelPendingRead/CancelPendingFlush, this unintentionally swallows all exceptions that derive from OperationCancelled that were triggered by the ReadAsync call itself. This change rethrows the error unless we're in that specific case of having called one of those Cancel* methods.
- Added tests and did some other small test cleanup.

Commit migrated from https://github.com/dotnet/corefx/commit/b3d3267abca4ae5bd8d4542a02c08a3173ae1ab9

src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs
src/libraries/System.IO.Pipelines/tests/StreamPipeReaderTests.cs
src/libraries/System.IO.Pipelines/tests/StreamPipeWriterTests.cs

index 2c95245..58e3e5c 100644 (file)
@@ -241,12 +241,16 @@ namespace System.IO.Pipelines
                 {
                     ClearCancellationToken();
 
-                    if (cancellationToken.IsCancellationRequested)
+                    if (tokenSource.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
+                    {
+                        // Catch cancellation and translate it into setting isCanceled = true
+                        isCanceled = true;
+                    }
+                    else
                     {
                         throw;
                     }
 
-                    isCanceled = true;
                 }
 
                 return new ReadResult(GetCurrentReadOnlySequence(), isCanceled, _isStreamCompleted);
index fcba542..474cb00 100644 (file)
@@ -306,13 +306,13 @@ namespace System.IO.Pipelines
                         _internalTokenSource = null;
                     }
 
-                    if (cancellationToken.IsCancellationRequested)
+                    if (localToken.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
                     {
-                        throw;
+                        // Catch cancellation and translate it into setting isCanceled = true
+                        return new FlushResult(isCanceled: true, isCompleted: false);
                     }
 
-                    // Catch any cancellation and translate it into setting isCanceled = true
-                    return new FlushResult(isCanceled: true, isCompleted: false);
+                    throw;
                 }
             }
         }
index 12c5631..f60680c 100644 (file)
@@ -224,7 +224,7 @@ namespace System.IO.Pipelines.Tests
         [Fact]
         public async Task ThrowsOnReadAfterCompleteReader()
         {
-            var reader = PipeReader.Create(Stream.Null);
+            PipeReader reader = PipeReader.Create(Stream.Null);
 
             reader.Complete();
             await Assert.ThrowsAsync<InvalidOperationException>(async () => await reader.ReadAsync());
@@ -233,7 +233,7 @@ namespace System.IO.Pipelines.Tests
         [Fact]
         public void TryReadAfterCancelPendingReadReturnsTrue()
         {
-            var reader = PipeReader.Create(Stream.Null);
+            PipeReader reader = PipeReader.Create(Stream.Null);
 
             reader.CancelPendingRead();
 
@@ -482,7 +482,7 @@ namespace System.IO.Pipelines.Tests
         }
 
         [Fact]
-        public void OnWriterCompletedThrowsNotSupportedException()
+        public void OnWriterCompletedNoops()
         {
             bool fired = false;
             PipeReader reader = PipeReader.Create(Stream.Null);
@@ -534,13 +534,21 @@ namespace System.IO.Pipelines.Tests
         public void LeaveUnderlyingStreamOpen()
         {
             var stream = new MemoryStream();
-            var reader = PipeReader.Create(stream, new StreamPipeReaderOptions(leaveOpen: true));
+            PipeReader reader = PipeReader.Create(stream, new StreamPipeReaderOptions(leaveOpen: true));
 
             reader.Complete();
 
             Assert.True(stream.CanRead);
         }
 
+        [Fact]
+        public async Task OperationCancelledExceptionNotSwallowedIfNotThrownFromSpecifiedToken()
+        {
+            PipeReader reader = PipeReader.Create(new ThrowsOperationCanceledExceptionStream());
+
+            await Assert.ThrowsAsync<OperationCanceledException>(async () => await reader.ReadAsync());
+        }
+
         private static async Task<string> ReadFromPipeAsString(PipeReader reader)
         {
             ReadResult readResult = await reader.ReadAsync();
@@ -568,6 +576,25 @@ namespace System.IO.Pipelines.Tests
             return new object[] { bytesInBuffer, bufferSize, minimumReadSize, readSizes };
         }
 
+        private class ThrowsOperationCanceledExceptionStream : ReadOnlyStream
+        {
+            public override int Read(byte[] buffer, int offset, int count)
+            {
+                throw new OperationCanceledException();
+            }
+
+            public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+            {
+                throw new OperationCanceledException();
+            }
+#if netcoreapp
+            public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+            {
+                throw new OperationCanceledException();
+            }
+#endif
+        }
+
         private class ThrowAfterZeroByteReadStream : MemoryStream
         {
             public ThrowAfterZeroByteReadStream()
index a5488ca..5b527c0 100644 (file)
@@ -451,7 +451,7 @@ namespace System.IO.Pipelines.Tests
         }
 
         [Fact]
-        public void OnReaderCompletedThrowsNotSupported()
+        public void OnReaderCompletedNoops()
         {
             bool fired = false;
             PipeWriter writer = PipeWriter.Create(Stream.Null);
@@ -464,13 +464,47 @@ namespace System.IO.Pipelines.Tests
         public void LeaveUnderlyingStreamOpen()
         {
             var stream = new MemoryStream();
-            var writer = PipeWriter.Create(stream, new StreamPipeWriterOptions(leaveOpen: true));
+            PipeWriter writer = PipeWriter.Create(stream, new StreamPipeWriterOptions(leaveOpen: true));
 
             writer.Complete();
 
             Assert.True(stream.CanRead);
         }
 
+        [Fact]
+        public async Task OperationCancelledExceptionNotSwallowedIfNotThrownFromSpecifiedToken()
+        {
+            PipeWriter writer = PipeWriter.Create(new ThrowsOperationCanceledExceptionStream());
+
+            await Assert.ThrowsAsync<OperationCanceledException>(async () => await writer.WriteAsync(new byte[1]));
+            await Assert.ThrowsAsync<OperationCanceledException>(async () => await writer.FlushAsync());
+        }
+
+        private class ThrowsOperationCanceledExceptionStream : WriteOnlyStream
+        {
+            public override void Write(byte[] buffer, int offset, int count)
+            {
+                throw new OperationCanceledException();
+            }
+
+            public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+            {
+                throw new OperationCanceledException();
+            }
+
+            public override Task FlushAsync(CancellationToken cancellationToken)
+            {
+                throw new OperationCanceledException();
+            }
+
+#if netcoreapp
+            public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+            {
+                throw new OperationCanceledException();
+            }
+#endif
+        }
+
         private class FlushAsyncAwareStream : WriteOnlyStream
         {
             public bool FlushAsyncCalled { get; set; }