try
{
BufferSegment? segment = _readHead;
+ int segmentIndex = _readIndex;
+
try
{
while (segment != null)
{
- FlushResult flushResult = await destination.WriteAsync(segment.Memory, tokenSource.Token).ConfigureAwait(false);
+ FlushResult flushResult = await destination.WriteAsync(segment.Memory.Slice(segmentIndex), tokenSource.Token).ConfigureAwait(false);
if (flushResult.IsCanceled)
{
}
segment = segment.NextSegment;
+ segmentIndex = 0;
if (flushResult.IsCompleted)
{
try
{
BufferSegment? segment = _readHead;
+ int segmentIndex = _readIndex;
+
try
{
while (segment != null)
{
- await destination.WriteAsync(segment.Memory, tokenSource.Token).ConfigureAwait(false);
+ await destination.WriteAsync(segment.Memory.Slice(segmentIndex), tokenSource.Token).ConfigureAwait(false);
segment = segment.NextSegment;
+ segmentIndex = 0;
}
}
finally
Assert.True(startPosition.Equals(wrappedPipeReader.LastConsumed));
Assert.True(startPosition.Equals(wrappedPipeReader.LastExamined));
}
+
+ [Fact]
+ public async Task CopyToAsyncStreamCopiesRemainderAfterReadingSome()
+ {
+ var buffer = Encoding.UTF8.GetBytes("Hello World");
+ await Pipe.Writer.WriteAsync(buffer);
+ Pipe.Writer.Complete();
+
+ var result = await PipeReader.ReadAsync();
+ Assert.Equal(result.Buffer.ToArray(), buffer);
+ // Consume Hello
+ PipeReader.AdvanceTo(result.Buffer.GetPosition(5));
+
+ var ms = new MemoryStream();
+ await PipeReader.CopyToAsync(ms);
+
+ Assert.Equal(buffer.AsMemory(5).ToArray(), ms.ToArray());
+ }
+
+ [Fact]
+ public async Task CopyToAsyncPipeWriterCopiesRemainderAfterReadingSome()
+ {
+ var buffer = Encoding.UTF8.GetBytes("Hello World");
+ await Pipe.Writer.WriteAsync(buffer);
+ Pipe.Writer.Complete();
+
+ var result = await PipeReader.ReadAsync();
+ Assert.Equal(result.Buffer.ToArray(), buffer);
+ // Consume Hello
+ PipeReader.AdvanceTo(result.Buffer.GetPosition(5));
+
+ var ms = new MemoryStream();
+ await PipeReader.CopyToAsync(PipeWriter.Create(ms));
+
+ Assert.Equal(buffer.AsMemory(5).ToArray(), ms.ToArray());
+ }
}
}