[Pipelines] Noop Writes when Reader completed (#40905)
authorBrennan <brecon@microsoft.com>
Thu, 20 Aug 2020 02:42:27 +0000 (19:42 -0700)
committerGitHub <noreply@github.com>
Thu, 20 Aug 2020 02:42:27 +0000 (19:42 -0700)
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs

index a47a480..5637dd9 100644 (file)
@@ -319,6 +319,12 @@ namespace System.IO.Pipelines
                     ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
                 }
 
+                // If the reader is completed we no-op Advance but leave GetMemory and FlushAsync alone
+                if (_readerCompletion.IsCompleted)
+                {
+                    return;
+                }
+
                 AdvanceCore(bytes);
             }
         }
@@ -954,6 +960,11 @@ namespace System.IO.Pipelines
                 ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
             }
 
+            if (_readerCompletion.IsCompleted)
+            {
+                return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: true));
+            }
+
             CompletionData completionData;
             ValueTask<FlushResult> result;
 
index 2ab7e7b..e6a64d4 100644 (file)
@@ -256,5 +256,40 @@ namespace System.IO.Pipelines.Tests
 
             await task;
         }
+
+        [Fact]
+        public async Task WriteAsyncWithACompletedReaderNoops()
+        {
+            var pool = new DisposeTrackingBufferPool();
+            var pipe = new Pipe(new PipeOptions(pool));
+            pipe.Reader.Complete();
+
+            byte[] writeBuffer = new byte[100];
+            for (var i = 0; i < 10000; i++)
+            {
+                await pipe.Writer.WriteAsync(writeBuffer);
+            }
+
+            Assert.Equal(0, pool.CurrentlyRentedBlocks);
+        }
+
+        [Fact]
+        public async Task GetMemoryFlushWithACompletedReaderNoops()
+        {
+            var pool = new DisposeTrackingBufferPool();
+            var pipe = new Pipe(new PipeOptions(pool));
+            pipe.Reader.Complete();
+
+            for (var i = 0; i < 10000; i++)
+            {
+                var mem = pipe.Writer.GetMemory();
+                pipe.Writer.Advance(mem.Length);
+                await pipe.Writer.FlushAsync(default);
+            }
+
+            Assert.Equal(1, pool.CurrentlyRentedBlocks);
+            pipe.Writer.Complete();
+            Assert.Equal(0, pool.CurrentlyRentedBlocks);
+        }
     }
 }