[release/6.0-rc1] Fixed StreamPipeReader.CopyToAsync (#57966)
authorgithub-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Tue, 24 Aug 2021 16:17:36 +0000 (10:17 -0600)
committerGitHub <noreply@github.com>
Tue, 24 Aug 2021 16:17:36 +0000 (10:17 -0600)
* Fixed StreamPipeReader.CopyToAsync - Take the segment index into account when copying buffered data. This handles the case where ReadAsync has consumed a partial segment and then the same PipeReader instance is used to copy to a Stream and PipeWriter. - Added tests

* Always slice

Co-authored-by: David Fowler <davidfowl@gmail.com>
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs
src/libraries/System.IO.Pipelines/tests/PipeReaderCopyToAsyncTests.cs

index 7b33539..cc457bf 100644 (file)
@@ -383,11 +383,13 @@ namespace System.IO.Pipelines
                 try
                 {
                     BufferSegment? segment = _readHead;
+                    int segmentIndex = _readIndex;
+
                     try
                     {
                         while (segment != null)
                         {
-                            FlushResult flushResult = await destination.WriteAsync(segment.Memory, tokenSource.Token).ConfigureAwait(false);
+                            FlushResult flushResult = await destination.WriteAsync(segment.Memory.Slice(segmentIndex), tokenSource.Token).ConfigureAwait(false);
 
                             if (flushResult.IsCanceled)
                             {
@@ -395,6 +397,7 @@ namespace System.IO.Pipelines
                             }
 
                             segment = segment.NextSegment;
+                            segmentIndex = 0;
 
                             if (flushResult.IsCompleted)
                             {
@@ -451,13 +454,16 @@ namespace System.IO.Pipelines
                 try
                 {
                     BufferSegment? segment = _readHead;
+                    int segmentIndex = _readIndex;
+
                     try
                     {
                         while (segment != null)
                         {
-                            await destination.WriteAsync(segment.Memory, tokenSource.Token).ConfigureAwait(false);
+                            await destination.WriteAsync(segment.Memory.Slice(segmentIndex), tokenSource.Token).ConfigureAwait(false);
 
                             segment = segment.NextSegment;
+                            segmentIndex = 0;
                         }
                     }
                     finally
index e58e5af..adf547a 100644 (file)
@@ -286,5 +286,41 @@ namespace System.IO.Pipelines.Tests
             Assert.True(startPosition.Equals(wrappedPipeReader.LastConsumed));
             Assert.True(startPosition.Equals(wrappedPipeReader.LastExamined));
         }
+
+        [Fact]
+        public async Task CopyToAsyncStreamCopiesRemainderAfterReadingSome()
+        {
+            var buffer = Encoding.UTF8.GetBytes("Hello World");
+            await Pipe.Writer.WriteAsync(buffer);
+            Pipe.Writer.Complete();
+
+            var result = await PipeReader.ReadAsync();
+            Assert.Equal(result.Buffer.ToArray(), buffer);
+            // Consume Hello
+            PipeReader.AdvanceTo(result.Buffer.GetPosition(5));
+
+            var ms = new MemoryStream();
+            await PipeReader.CopyToAsync(ms);
+
+            Assert.Equal(buffer.AsMemory(5).ToArray(), ms.ToArray());
+        }
+
+        [Fact]
+        public async Task CopyToAsyncPipeWriterCopiesRemainderAfterReadingSome()
+        {
+            var buffer = Encoding.UTF8.GetBytes("Hello World");
+            await Pipe.Writer.WriteAsync(buffer);
+            Pipe.Writer.Complete();
+
+            var result = await PipeReader.ReadAsync();
+            Assert.Equal(result.Buffer.ToArray(), buffer);
+            // Consume Hello
+            PipeReader.AdvanceTo(result.Buffer.GetPosition(5));
+
+            var ms = new MemoryStream();
+            await PipeReader.CopyToAsync(PipeWriter.Create(ms));
+
+            Assert.Equal(buffer.AsMemory(5).ToArray(), ms.ToArray());
+        }
     }
 }