Throw an exception if examined < consumed and reworked lastExamined (dotnet/corefx...
authorDavid Fowler <davidfowl@gmail.com>
Mon, 11 Mar 2019 21:35:27 +0000 (14:35 -0700)
committerGitHub <noreply@github.com>
Mon, 11 Mar 2019 21:35:27 +0000 (14:35 -0700)
- Store a single absolute position instead of the segment
- Fixed some typos and added tests for null segments
- Added more tests

Commit migrated from https://github.com/dotnet/corefx/commit/203bc37eddbae8fb14531d305554a662c0428279

src/libraries/System.IO.Pipelines/src/Resources/Strings.resx
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs
src/libraries/System.IO.Pipelines/tests/PipeLengthTests.cs

index a47b9ba..9030a1e 100644 (file)
@@ -1,4 +1,5 @@
-<root>
+<?xml version="1.0" encoding="utf-8"?>
+<root>
   <!-- 
     Microsoft ResX Schema 
     
   <data name="GetResultBeforeCompleted" xml:space="preserve">
     <value>Can't GetResult unless awaiter is completed.</value>
   </data>
+  <data name="InvalidExaminedOrConsumedPosition" xml:space="preserve">
+    <value>The examined position must be greater than or equal to the consumed position.</value>
+  </data>
+  <data name="InvalidExaminedPosition" xml:space="preserve">
+    <value>The examined position cannot be less than the previously examined position.</value>
+  </data>
   <data name="InvalidZeroByteRead" xml:space="preserve">
     <value>The PipeReader returned 0 bytes when the ReadResult was not completed or canceled.</value>
   </data>
index ee98e61..dda00f0 100644 (file)
@@ -62,8 +62,7 @@ namespace System.IO.Pipelines
 
         // Stores the last examined position, used to calculate how many bytes were to release
         // for back pressure management
-        private BufferSegment _lastExamined;
-        private int _lastExaminedIndex;
+        private long _lastExaminedIndex = -1;
 
         // The read head which is the extent of the PipeReader's consumed bytes
         private BufferSegment _readHead;
@@ -130,7 +129,7 @@ namespace System.IO.Pipelines
             _writerAwaitable = new PipeAwaitable(completed: true, _useSynchronizationContext);
             _readTailIndex = 0;
             _readHeadIndex = 0;
-            _lastExaminedIndex = 0;
+            _lastExaminedIndex = -1;
             _currentWriteLength = 0;
             _length = 0;
         }
@@ -193,7 +192,8 @@ namespace System.IO.Pipelines
                     BufferSegment newSegment = AllocateSegment(sizeHint);
 
                     // Set all the pointers
-                    _writingHead = _readHead = _readTail = _lastExamined = newSegment;
+                    _writingHead = _readHead = _readTail = newSegment;
+                    _lastExaminedIndex = 0;
                 }
                 else
                 {
@@ -266,7 +266,6 @@ namespace System.IO.Pipelines
             Debug.Assert(segment != _readHead, "Returning _readHead segment that's in use!");
             Debug.Assert(segment != _readTail, "Returning _readTail segment that's in use!");
             Debug.Assert(segment != _writingHead, "Returning _writingHead segment that's in use!");
-            Debug.Assert(segment != _lastExamined, "Returning _lastExamined segment that's in use!");
 
             if (_bufferSegmentPool.Count < MaxPoolSize)
             {
@@ -423,6 +422,12 @@ namespace System.IO.Pipelines
 
         private void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
         {
+            // Throw if examined < consumed
+            if (consumedSegment != null && examinedSegment != null && GetLength(consumedSegment, consumedIndex, examinedSegment, examinedIndex) < 0)
+            {
+                ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition();
+            }
+
             BufferSegment returnStart = null;
             BufferSegment returnEnd = null;
 
@@ -436,20 +441,20 @@ namespace System.IO.Pipelines
                     examinedEverything = examinedIndex == _readTailIndex;
                 }
 
-                if (examinedSegment != null && _lastExamined != null)
+                if (examinedSegment != null && _lastExaminedIndex >= 0)
                 {
-                    long examinedBytes = GetLength(_lastExamined, _lastExaminedIndex, examinedSegment, examinedIndex);
+                    long examinedBytes = GetLength(_lastExaminedIndex, examinedSegment, examinedIndex);
                     long oldLength = _length;
 
                     if (examinedBytes < 0)
                     {
-                        ThrowHelper.ThrowInvalidOperationException_AdvanceToInvalidCursor();
+                        ThrowHelper.ThrowInvalidOperationException_InvalidExaminedPosition();
                     }
 
                     _length -= examinedBytes;
 
-                    _lastExamined = examinedSegment;
-                    _lastExaminedIndex = examinedIndex;
+                    // Store the absolute position
+                    _lastExaminedIndex = examinedSegment.RunningIndex + examinedIndex;
 
                     Debug.Assert(_length >= 0, "Length has gone negative");
 
@@ -486,14 +491,6 @@ namespace System.IO.Pipelines
                         _readHead = nextBlock;
                         _readHeadIndex = 0;
 
-                        // Only update the last examined if the same as consumed
-                        if (consumedSegment == examinedSegment)
-                        {
-                            // The last examined index and the read head should be in sync
-                            _lastExamined = nextBlock;
-                            _lastExaminedIndex = 0;
-                        }
-
                         // Reset the writing head to null if it's the return block
                         // then null it out as we're about to reset that memory
                         if (_writingHead == returnEnd)
@@ -543,6 +540,12 @@ namespace System.IO.Pipelines
             return (endSegment.RunningIndex + (uint)endIndex) - (startSegment.RunningIndex + (uint)startIndex);
         }
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        private static long GetLength(long startPosition, BufferSegment endSegment, int endIndex)
+        {
+            return (endSegment.RunningIndex + (uint)endIndex) - startPosition;
+        }
+
         internal void CompleteReader(Exception exception)
         {
             PipeCompletionCallbacks completionCallbacks;
@@ -787,7 +790,7 @@ namespace System.IO.Pipelines
                 _writingHead = null;
                 _readHead = null;
                 _readTail = null;
-                _lastExamined = null;
+                _lastExaminedIndex = -1;
             }
         }
 
index 4df2021..0ef1fe1 100644 (file)
@@ -41,6 +41,14 @@ namespace System.IO.Pipelines
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateInvalidOperationException_NoReadingAllowed() => new InvalidOperationException(SR.ReadingAfterCompleted);
 
+        public static void ThrowInvalidOperationException_InvalidExaminedPosition() => throw CreateInvalidOperationException_InvalidExaminedPosition();
+        [MethodImpl(MethodImplOptions.NoInlining)]
+        public static Exception CreateInvalidOperationException_InvalidExaminedPosition() => new InvalidOperationException(SR.InvalidExaminedPosition);
+
+        public static void ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition() => throw CreateInvalidOperationException_InvalidExaminedOrConsumedPosition();
+        [MethodImpl(MethodImplOptions.NoInlining)]
+        public static Exception CreateInvalidOperationException_InvalidExaminedOrConsumedPosition() => new InvalidOperationException(SR.InvalidExaminedOrConsumedPosition);
+
         public static void ThrowInvalidOperationException_AdvanceToInvalidCursor() => throw CreateInvalidOperationException_AdvanceToInvalidCursor();
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateInvalidOperationException_AdvanceToInvalidCursor() => new InvalidOperationException(SR.AdvanceToInvalidCursor);
index 03c084a..acc447e 100644 (file)
@@ -147,6 +147,47 @@ namespace System.IO.Pipelines.Tests
             await _pipe.Writer.FlushAsync();
 
             result = await _pipe.Reader.ReadAsync();
+            _pipe.Reader.AdvanceTo(result.Buffer.Start);
+
+            Assert.Equal(8192, _pipe.Length);
+
+            result = await _pipe.Reader.ReadAsync();
+            _pipe.Reader.AdvanceTo(result.Buffer.End);
+
+            Assert.Equal(0, _pipe.Length);
+        }
+
+        [Fact]
+        public async Task PooledSegmentsDontAffectLastExaminedSegmentEmptyGapWithDifferentBlocks()
+        {
+            _pipe.Writer.WriteEmpty(_pool.MaxBufferSize);
+            _pipe.Writer.WriteEmpty(_pool.MaxBufferSize);
+            await _pipe.Writer.FlushAsync();
+
+            ReadResult result = await _pipe.Reader.ReadAsync();
+            // This gets the end of the first block
+            SequencePosition endOfFirstBlock = result.Buffer.Slice(result.Buffer.Start, _pool.MaxBufferSize).End;
+            // Start of the next block
+            SequencePosition startOfSecondBlock = result.Buffer.GetPosition(_pool.MaxBufferSize);
+
+            Assert.NotSame(endOfFirstBlock.GetObject(), startOfSecondBlock.GetObject());
+
+            // This should return the first segment
+            _pipe.Reader.AdvanceTo(startOfSecondBlock, endOfFirstBlock);
+
+            // One block remaining
+            Assert.Equal(4096, _pipe.Length);
+
+            // This should use the segment that was returned
+            _pipe.Writer.WriteEmpty(_pool.MaxBufferSize);
+            await _pipe.Writer.FlushAsync();
+
+            result = await _pipe.Reader.ReadAsync();
+            _pipe.Reader.AdvanceTo(result.Buffer.Start);
+
+            Assert.Equal(8192, _pipe.Length);
+
+            result = await _pipe.Reader.ReadAsync();
             _pipe.Reader.AdvanceTo(result.Buffer.End);
 
             Assert.Equal(0, _pipe.Length);
@@ -197,5 +238,45 @@ namespace System.IO.Pipelines.Tests
             result = await _pipe.Reader.ReadAsync();
             Assert.Throws<InvalidOperationException>(() => _pipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.Start));
         }
+
+        [Fact]
+        public async Task ConsumedGreatherThanExaminedThrows()
+        {
+            _pipe.Writer.WriteEmpty(10);
+            await _pipe.Writer.FlushAsync();
+
+            ReadResult result = await _pipe.Reader.ReadAsync();
+            Assert.Throws<InvalidOperationException>(() => _pipe.Reader.AdvanceTo(result.Buffer.End, result.Buffer.Start));
+        }
+
+        [Fact]
+        public async Task NullConsumedOrExaminedNoops()
+        {
+            _pipe.Writer.WriteEmpty(10);
+            await _pipe.Writer.FlushAsync();
+
+            ReadResult result = await _pipe.Reader.ReadAsync();
+            _pipe.Reader.AdvanceTo(default, result.Buffer.End);
+        }
+
+        [Fact]
+        public async Task NullExaminedNoops()
+        {
+            _pipe.Writer.WriteEmpty(10);
+            await _pipe.Writer.FlushAsync();
+
+            ReadResult result = await _pipe.Reader.ReadAsync();
+            _pipe.Reader.AdvanceTo(result.Buffer.Start, default);
+        }
+
+        [Fact]
+        public async Task NullExaminedAndConsumedNoops()
+        {
+            _pipe.Writer.WriteEmpty(10);
+            await _pipe.Writer.FlushAsync();
+
+            ReadResult result = await _pipe.Reader.ReadAsync();
+            _pipe.Reader.AdvanceTo(default, default);
+        }
     }
 }