Make PipeReader/Writer.AsStream().Dispose complete the reader/writer (dotnet/corefx...
authorStephen Toub <stoub@microsoft.com>
Tue, 19 Mar 2019 20:32:51 +0000 (16:32 -0400)
committerGitHub <noreply@github.com>
Tue, 19 Mar 2019 20:32:51 +0000 (16:32 -0400)
Commit migrated from https://github.com/dotnet/corefx/commit/4edea1b6b817a3b7e9116e8cc20ccab55f7d7c9a

src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReaderStream.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriterStream.cs
src/libraries/System.IO.Pipelines/tests/PipeReaderStreamTests.nonnetstandard.cs
src/libraries/System.IO.Pipelines/tests/PipeWriterStreamTests.nonnetstandard.cs

index 6b449db..b552c6b 100644 (file)
@@ -20,6 +20,12 @@ namespace System.IO.Pipelines
             _pipeReader = pipeReader;
         }
 
+        protected override void Dispose(bool disposing)
+        {
+            _pipeReader.Complete();
+            base.Dispose(disposing);
+        }
+
         public override bool CanRead => true;
 
         public override bool CanSeek => false;
index cdab755..ac69c31 100644 (file)
@@ -18,6 +18,12 @@ namespace System.IO.Pipelines
             _pipeWriter = pipeWriter;
         }
 
+        protected override void Dispose(bool disposing)
+        {
+            _pipeWriter.Complete();
+            base.Dispose(disposing);
+        }
+
         public override bool CanRead => false;
 
         public override bool CanSeek => false;
index a379d72..8043b4d 100644 (file)
@@ -17,6 +17,40 @@ namespace System.IO.Pipelines.Tests
         public delegate Task<int> ReadAsyncDelegate(Stream stream, byte[] data);
 
         [Theory]
+        [InlineData(false)]
+        [InlineData(true)]
+        public async Task DisposingPipeReaderStreamCompletesPipeReader(bool dataInPipe)
+        {
+            var pipe = new Pipe();
+            Stream s = pipe.Reader.AsStream();
+
+            if (dataInPipe)
+            {
+                await pipe.Writer.WriteAsync(new byte[42]);
+                await pipe.Writer.FlushAsync();
+            }
+
+            var readerCompletedTask = new TaskCompletionSource<bool>();
+            pipe.Writer.OnReaderCompleted(delegate { readerCompletedTask.SetResult(true); }, null);
+
+            // Call Dispose{Async} multiple times; all should succeed.
+            for (int i = 0; i < 2; i++)
+            {
+                s.Dispose();
+                await s.DisposeAsync();
+            }
+
+            // Make sure OnReaderCompleted was invoked.
+            await readerCompletedTask.Task;
+
+            // Unable to read after disposing.
+            await Assert.ThrowsAsync<InvalidOperationException>(async () => await s.ReadAsync(new byte[1]));
+
+            // Writes still work.
+            await pipe.Writer.WriteAsync(new byte[1]);
+        }
+
+        [Theory]
         [MemberData(nameof(ReadCalls))]
         public async Task ReadingFromPipeReaderStreamReadsFromUnderlyingPipeReader(ReadAsyncDelegate readAsync)
         {
@@ -209,6 +243,41 @@ namespace System.IO.Pipelines.Tests
             Assert.Same(stream, pipeReader.AsStream());
         }
 
+        [Fact]
+        public async Task PipeWriterStreamProducesToConsumingPipeReaderStream()
+        {
+            var pipe = new Pipe();
+
+            int consumedSum = 0, producedSum = 0;
+            Task consumer = Task.Run(() =>
+            {
+                using (Stream reader = pipe.Reader.AsStream())
+                {
+                    int b;
+                    while ((b = reader.ReadByte()) != -1)
+                    {
+                        consumedSum += b;
+                    }
+
+                    Assert.Equal(-1, reader.ReadByte());
+                }
+            });
+
+            var rand = new Random();
+            using (Stream writer = pipe.Writer.AsStream())
+            {
+                for (int i = 0; i < 1000; i++)
+                {
+                    byte b = (byte)rand.Next(256);
+                    writer.WriteByte(b);
+                    producedSum += b;
+                }
+            }
+
+            await consumer;
+            Assert.Equal(producedSum, consumedSum);
+        }
+
         public class BuggyPipeReader : PipeReader
         {
             public override void AdvanceTo(SequencePosition consumed)
index fa5de13..50094ea 100644 (file)
@@ -16,6 +16,34 @@ namespace System.IO.Pipelines.Tests
     {
         public delegate Task WriteAsyncDelegate(Stream stream, byte[] data);
 
+        [Fact]
+        public async Task DisposingPipeWriterStreamCompletesPipeWriter()
+        {
+            var pipe = new Pipe();
+            Stream s = pipe.Writer.AsStream();
+
+            var writerCompletedTask = new TaskCompletionSource<bool>();
+            pipe.Reader.OnWriterCompleted(delegate { writerCompletedTask.SetResult(true); }, null);
+
+            // Call Dispose{Async} multiple times; all should succeed.
+            for (int i = 0; i < 2; i++)
+            {
+                s.Dispose();
+                await s.DisposeAsync();
+            }
+
+            // Make sure OnWriterCompleted was invoked.
+            await writerCompletedTask.Task;
+
+            // Unable to write after disposing.
+            await Assert.ThrowsAsync<InvalidOperationException>(async () => await s.WriteAsync(new byte[1]));
+
+            // Reads still work and return 0.
+            ReadResult rr = await pipe.Reader.ReadAsync();
+            Assert.True(rr.IsCompleted);
+            Assert.Equal(0, rr.Buffer.Length);
+        }
+
         [Theory]
         [MemberData(nameof(WriteCalls))]
         public async Task WritingToPipeStreamWritesToUnderlyingPipeWriter(WriteAsyncDelegate writeAsync)