-<root>
+<?xml version="1.0" encoding="utf-8"?>
+<root>
<!--
Microsoft ResX Schema
<data name="GetResultBeforeCompleted" xml:space="preserve">
<value>Can't GetResult unless awaiter is completed.</value>
</data>
+ <data name="InvalidExaminedOrConsumedPosition" xml:space="preserve">
+ <value>The examined position must be greater than or equal to the consumed position.</value>
+ </data>
+ <data name="InvalidExaminedPosition" xml:space="preserve">
+ <value>The examined position cannot be less than the previously examined position.</value>
+ </data>
<data name="InvalidZeroByteRead" xml:space="preserve">
<value>The PipeReader returned 0 bytes when the ReadResult was not completed or canceled.</value>
</data>
// Stores the last examined position, used to calculate how many bytes were to release
// for back pressure management
- private BufferSegment _lastExamined;
- private int _lastExaminedIndex;
+ private long _lastExaminedIndex = -1;
// The read head which is the extent of the PipeReader's consumed bytes
private BufferSegment _readHead;
_writerAwaitable = new PipeAwaitable(completed: true, _useSynchronizationContext);
_readTailIndex = 0;
_readHeadIndex = 0;
- _lastExaminedIndex = 0;
+ _lastExaminedIndex = -1;
_currentWriteLength = 0;
_length = 0;
}
BufferSegment newSegment = AllocateSegment(sizeHint);
// Set all the pointers
- _writingHead = _readHead = _readTail = _lastExamined = newSegment;
+ _writingHead = _readHead = _readTail = newSegment;
+ _lastExaminedIndex = 0;
}
else
{
Debug.Assert(segment != _readHead, "Returning _readHead segment that's in use!");
Debug.Assert(segment != _readTail, "Returning _readTail segment that's in use!");
Debug.Assert(segment != _writingHead, "Returning _writingHead segment that's in use!");
- Debug.Assert(segment != _lastExamined, "Returning _lastExamined segment that's in use!");
if (_bufferSegmentPool.Count < MaxPoolSize)
{
private void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
{
+ // Throw if examined < consumed
+ if (consumedSegment != null && examinedSegment != null && GetLength(consumedSegment, consumedIndex, examinedSegment, examinedIndex) < 0)
+ {
+ ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition();
+ }
+
BufferSegment returnStart = null;
BufferSegment returnEnd = null;
examinedEverything = examinedIndex == _readTailIndex;
}
- if (examinedSegment != null && _lastExamined != null)
+ if (examinedSegment != null && _lastExaminedIndex >= 0)
{
- long examinedBytes = GetLength(_lastExamined, _lastExaminedIndex, examinedSegment, examinedIndex);
+ long examinedBytes = GetLength(_lastExaminedIndex, examinedSegment, examinedIndex);
long oldLength = _length;
if (examinedBytes < 0)
{
- ThrowHelper.ThrowInvalidOperationException_AdvanceToInvalidCursor();
+ ThrowHelper.ThrowInvalidOperationException_InvalidExaminedPosition();
}
_length -= examinedBytes;
- _lastExamined = examinedSegment;
- _lastExaminedIndex = examinedIndex;
+ // Store the absolute position
+ _lastExaminedIndex = examinedSegment.RunningIndex + examinedIndex;
Debug.Assert(_length >= 0, "Length has gone negative");
_readHead = nextBlock;
_readHeadIndex = 0;
- // Only update the last examined if the same as consumed
- if (consumedSegment == examinedSegment)
- {
- // The last examined index and the read head should be in sync
- _lastExamined = nextBlock;
- _lastExaminedIndex = 0;
- }
-
// Reset the writing head to null if it's the return block
// then null it out as we're about to reset that memory
if (_writingHead == returnEnd)
return (endSegment.RunningIndex + (uint)endIndex) - (startSegment.RunningIndex + (uint)startIndex);
}
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static long GetLength(long startPosition, BufferSegment endSegment, int endIndex)
+ {
+ return (endSegment.RunningIndex + (uint)endIndex) - startPosition;
+ }
+
internal void CompleteReader(Exception exception)
{
PipeCompletionCallbacks completionCallbacks;
_writingHead = null;
_readHead = null;
_readTail = null;
- _lastExamined = null;
+ _lastExaminedIndex = -1;
}
}
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_NoReadingAllowed() => new InvalidOperationException(SR.ReadingAfterCompleted);
+ public static void ThrowInvalidOperationException_InvalidExaminedPosition() => throw CreateInvalidOperationException_InvalidExaminedPosition();
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ public static Exception CreateInvalidOperationException_InvalidExaminedPosition() => new InvalidOperationException(SR.InvalidExaminedPosition);
+
+ public static void ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition() => throw CreateInvalidOperationException_InvalidExaminedOrConsumedPosition();
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ public static Exception CreateInvalidOperationException_InvalidExaminedOrConsumedPosition() => new InvalidOperationException(SR.InvalidExaminedOrConsumedPosition);
+
public static void ThrowInvalidOperationException_AdvanceToInvalidCursor() => throw CreateInvalidOperationException_AdvanceToInvalidCursor();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_AdvanceToInvalidCursor() => new InvalidOperationException(SR.AdvanceToInvalidCursor);
_pipe.Writer.WriteEmpty(_pool.MaxBufferSize);
await _pipe.Writer.FlushAsync();
+ result = await _pipe.Reader.ReadAsync();
+ _pipe.Reader.AdvanceTo(result.Buffer.Start);
+
+ Assert.Equal(8192, _pipe.Length);
+
+ result = await _pipe.Reader.ReadAsync();
+ _pipe.Reader.AdvanceTo(result.Buffer.End);
+
+ Assert.Equal(0, _pipe.Length);
+ }
+
+ [Fact]
+ public async Task PooledSegmentsDontAffectLastExaminedSegmentEmptyGapWithDifferentBlocks()
+ {
+ _pipe.Writer.WriteEmpty(_pool.MaxBufferSize);
+ _pipe.Writer.WriteEmpty(_pool.MaxBufferSize);
+ await _pipe.Writer.FlushAsync();
+
+ ReadResult result = await _pipe.Reader.ReadAsync();
+ // This gets the end of the first block
+ SequencePosition endOfFirstBlock = result.Buffer.Slice(result.Buffer.Start, _pool.MaxBufferSize).End;
+ // Start of the next block
+ SequencePosition startOfSecondBlock = result.Buffer.GetPosition(_pool.MaxBufferSize);
+
+ Assert.NotSame(endOfFirstBlock.GetObject(), startOfSecondBlock.GetObject());
+
+ // This should return the first segment
+ _pipe.Reader.AdvanceTo(startOfSecondBlock, endOfFirstBlock);
+
+ // One block remaining
+ Assert.Equal(4096, _pipe.Length);
+
+ // This should use the segment that was returned
+ _pipe.Writer.WriteEmpty(_pool.MaxBufferSize);
+ await _pipe.Writer.FlushAsync();
+
+ result = await _pipe.Reader.ReadAsync();
+ _pipe.Reader.AdvanceTo(result.Buffer.Start);
+
+ Assert.Equal(8192, _pipe.Length);
+
result = await _pipe.Reader.ReadAsync();
_pipe.Reader.AdvanceTo(result.Buffer.End);
result = await _pipe.Reader.ReadAsync();
Assert.Throws<InvalidOperationException>(() => _pipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.Start));
}
+
+ [Fact]
+ public async Task ConsumedGreatherThanExaminedThrows()
+ {
+ _pipe.Writer.WriteEmpty(10);
+ await _pipe.Writer.FlushAsync();
+
+ ReadResult result = await _pipe.Reader.ReadAsync();
+ Assert.Throws<InvalidOperationException>(() => _pipe.Reader.AdvanceTo(result.Buffer.End, result.Buffer.Start));
+ }
+
+ [Fact]
+ public async Task NullConsumedOrExaminedNoops()
+ {
+ _pipe.Writer.WriteEmpty(10);
+ await _pipe.Writer.FlushAsync();
+
+ ReadResult result = await _pipe.Reader.ReadAsync();
+ _pipe.Reader.AdvanceTo(default, result.Buffer.End);
+ }
+
+ [Fact]
+ public async Task NullExaminedNoops()
+ {
+ _pipe.Writer.WriteEmpty(10);
+ await _pipe.Writer.FlushAsync();
+
+ ReadResult result = await _pipe.Reader.ReadAsync();
+ _pipe.Reader.AdvanceTo(result.Buffer.Start, default);
+ }
+
+ [Fact]
+ public async Task NullExaminedAndConsumedNoops()
+ {
+ _pipe.Writer.WriteEmpty(10);
+ await _pipe.Writer.FlushAsync();
+
+ ReadResult result = await _pipe.Reader.ReadAsync();
+ _pipe.Reader.AdvanceTo(default, default);
+ }
}
}