<Compile Include="System\IO\Pipelines\FlushResult.cs" />
<Compile Include="System\IO\Pipelines\InlineScheduler.cs" />
<Compile Include="System\IO\Pipelines\IDuplexPipe.cs" />
+ <Compile Include="System\IO\Pipelines\BufferSegmentStack.cs" />
<Compile Include="System\IO\Pipelines\Pipe.DefaultPipeReader.cs" />
<Compile Include="System\IO\Pipelines\Pipe.DefaultPipeWriter.cs" />
<Compile Include="System\IO\Pipelines\Pipe.cs" />
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+using System.Text;
+
+namespace System.IO.Pipelines
+{
+ internal struct BufferSegmentStack
+ {
+ private BufferSegment[] _array;
+ private int _size;
+
+ public BufferSegmentStack(int size)
+ {
+ _array = new BufferSegment[size];
+ _size = 0;
+ }
+
+ public int Count => _size;
+
+ public bool TryPop(out BufferSegment result)
+ {
+ int size = _size - 1;
+ BufferSegment[] array = _array;
+
+ if ((uint)size >= (uint)array.Length)
+ {
+ result = default;
+ return false;
+ }
+
+ _size = size;
+ result = array[size];
+ array[size] = default;
+ return true;
+ }
+
+ // Pushes an item to the top of the stack.
+ public void Push(BufferSegment item)
+ {
+ int size = _size;
+ BufferSegment[] array = _array;
+
+ if ((uint)size < (uint)array.Length)
+ {
+ array[size] = item;
+ _size = size + 1;
+ }
+ else
+ {
+ PushWithResize(item);
+ }
+ }
+
+ // Non-inline from Stack.Push to improve its code quality as uncommon path
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ private void PushWithResize(BufferSegment item)
+ {
+ Array.Resize(ref _array, 2 * _array.Length);
+ _array[_size] = item;
+ _size++;
+ }
+ }
+}
public sealed partial class Pipe
{
internal const int InitialSegmentPoolSize = 16; // 65K
- internal const int MaxPoolSize = 256; // 1MB
+ internal const int MaxSegmentPoolSize = 256; // 1MB
private static readonly Action<object> s_signalReaderAwaitable = state => ((Pipe)state).ReaderCancellationRequested();
private static readonly Action<object> s_signalWriterAwaitable = state => ((Pipe)state).WriterCancellationRequested();
private readonly PipeScheduler _readerScheduler;
private readonly PipeScheduler _writerScheduler;
- private readonly Stack<BufferSegment> _bufferSegmentPool;
+ // Mutable struct! Don't make this readonly
+ private BufferSegmentStack _bufferSegmentPool;
private readonly DefaultPipeReader _reader;
private readonly DefaultPipeWriter _writer;
ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options);
}
- _bufferSegmentPool = new Stack<BufferSegment>(InitialSegmentPoolSize);
+ _bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize);
_operationState = default;
_readerCompletion = default;
private BufferSegment CreateSegmentUnsynchronized()
{
- if (_bufferSegmentPool.Count > 0)
+ if (_bufferSegmentPool.TryPop(out BufferSegment segment))
{
- return _bufferSegmentPool.Pop();
+ return segment;
}
return new BufferSegment();
Debug.Assert(segment != _readTail, "Returning _readTail segment that's in use!");
Debug.Assert(segment != _writingHead, "Returning _writingHead segment that's in use!");
- if (_bufferSegmentPool.Count < MaxPoolSize)
+ if (_bufferSegmentPool.Count < MaxSegmentPoolSize)
{
_bufferSegmentPool.Push(segment);
}
}
}
+ [Fact]
+ public async Task BufferSegmentsAreReused()
+ {
+ _pipe.Writer.WriteEmpty(_pool.MaxBufferSize);
+ await _pipe.Writer.FlushAsync();
+
+ ReadResult result = await _pipe.Reader.ReadAsync();
+ object oldSegment = result.Buffer.End.GetObject();
+
+ // This should return the first segment
+ _pipe.Reader.AdvanceTo(result.Buffer.End);
+
+ // One block remaining
+ Assert.Equal(0, _pipe.Length);
+
+ // This should use the segment that was returned
+ _pipe.Writer.WriteEmpty(_pool.MaxBufferSize);
+ await _pipe.Writer.FlushAsync();
+
+ result = await _pipe.Reader.ReadAsync();
+ object newSegment = result.Buffer.End.GetObject();
+ _pipe.Reader.AdvanceTo(result.Buffer.End);
+
+ Assert.Same(oldSegment, newSegment);
+
+ Assert.Equal(0, _pipe.Length);
+ }
+
[Fact]
public async Task PooledSegmentsDontAffectLastExaminedSegment()
{