Add PipeWriter CanGetUnflushedBytes and UnflushedBytes properties (#54164)
authorWraith <wraith2@gmail.com>
Thu, 17 Jun 2021 13:14:16 +0000 (14:14 +0100)
committerGitHub <noreply@github.com>
Thu, 17 Jun 2021 13:14:16 +0000 (06:14 -0700)
* Add PipeWriter CanGetUnflushedBytes and UnflushedBytes properties and covering tests

* address feedback

src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs
src/libraries/System.IO.Pipelines/src/Resources/Strings.resx
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.DefaultPipeWriter.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs
src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs
src/libraries/System.IO.Pipelines/tests/StreamPipeWriterTests.cs
src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj
src/libraries/System.IO.Pipelines/tests/UnflushedBytesTests.cs [new file with mode: 0644]

index 28a8f6d..fda8e71 100644 (file)
@@ -71,6 +71,7 @@ namespace System.IO.Pipelines
         public abstract void Advance(int bytes);
         public virtual System.IO.Stream AsStream(bool leaveOpen = false) { throw null; }
         public abstract void CancelPendingFlush();
+        public virtual bool CanGetUnflushedBytes => throw null;
         public abstract void Complete(System.Exception? exception = null);
         public virtual System.Threading.Tasks.ValueTask CompleteAsync(System.Exception? exception = null) { throw null; }
         protected internal virtual System.Threading.Tasks.Task CopyFromAsync(System.IO.Stream source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
@@ -80,6 +81,7 @@ namespace System.IO.Pipelines
         public abstract System.Span<byte> GetSpan(int sizeHint = 0);
         [System.ObsoleteAttribute("OnReaderCompleted may not be invoked on all implementations of PipeWriter. This will be removed in a future release.")]
         public virtual void OnReaderCompleted(System.Action<System.Exception?, object?> callback, object? state) { }
+        public virtual long UnflushedBytes => throw null;
         public virtual System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> WriteAsync(System.ReadOnlyMemory<byte> source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
     }
     public readonly partial struct ReadResult
index 457f167..ab5264d 100644 (file)
   <data name="WritingAfterCompleted" xml:space="preserve">
     <value>Writing is not allowed after writer was completed.</value>
   </data>
+  <data name="UnflushedBytesNotSupported" xml:space="preserve">
+    <value>UnflushedBytes is not supported.</value>
+  </data>
 </root>
\ No newline at end of file
index d566d7b..f4ec8a3 100644 (file)
@@ -23,6 +23,8 @@ namespace System.IO.Pipelines
 
             public override void CancelPendingFlush() => _pipe.CancelPendingFlush();
 
+            public override bool CanGetUnflushedBytes => true;
+
 #pragma warning disable CS0672 // Member overrides obsolete member
             public override void OnReaderCompleted(Action<Exception?, object?> callback, object? state) => _pipe.OnReaderCompleted(callback, state);
 #pragma warning restore CS0672 // Member overrides obsolete member
@@ -41,6 +43,8 @@ namespace System.IO.Pipelines
 
             public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _pipe.OnFlushAsyncCompleted(continuation, state, flags);
 
+            public override long UnflushedBytes => _pipe.GetUnflushedBytes();
+
             public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
             {
                 return _pipe.WriteAsync(source, cancellationToken);
index bea6289..9a0f580 100644 (file)
@@ -1009,6 +1009,8 @@ namespace System.IO.Pipelines
             return result;
         }
 
+        internal long GetUnflushedBytes() => _unflushedBytes;
+
         private void GetFlushResult(ref FlushResult result)
         {
             // Change the state from to be canceled -> observed
index 23eaf2d..f89b50e 100644 (file)
@@ -36,6 +36,10 @@ namespace System.IO.Pipelines
         /// <remarks>The canceled <see cref="System.IO.Pipelines.PipeWriter.FlushAsync(System.Threading.CancellationToken)" /> or <see cref="System.IO.Pipelines.PipeWriter.WriteAsync(System.ReadOnlyMemory{byte},System.Threading.CancellationToken)" /> operation returns a <see cref="System.IO.Pipelines.FlushResult" /> where <see cref="System.IO.Pipelines.FlushResult.IsCanceled" /> is <see langword="true" />.</remarks>
         public abstract void CancelPendingFlush();
 
+        /// <summary>Gets a value that indicates whether the current <see cref="System.IO.Pipelines.PipeWriter" /> supports reporting the count of unflushed bytes.</summary>
+        /// <value><see langword="true" />If a class derived from <see cref="System.IO.Pipelines.PipeWriter" /> does not support getting the unflushed bytes, calls to <see cref="System.IO.Pipelines.PipeWriter.UnflushedBytes" /> throw <see cref="System.NotImplementedException" />.</value>
+        public virtual bool CanGetUnflushedBytes => false;
+
         /// <summary>Registers a callback that executes when the <see cref="System.IO.Pipelines.PipeReader" /> side of the pipe is completed.</summary>
         /// <param name="callback">The callback to register.</param>
         /// <param name="state">The state object to pass to <paramref name="callback" /> when it's invoked.</param>
@@ -143,5 +147,11 @@ namespace System.IO.Pipelines
                 }
             }
         }
+
+        /// <summary>
+        /// When overridden in a derived class, gets the count of unflushed bytes within the current writer.
+        /// </summary>
+        /// <exception cref="System.NotImplementedException">The <see cref="System.IO.Pipelines.PipeWriter"/> does not support getting the unflushed byte count.</exception>
+        public virtual long UnflushedBytes => throw ThrowHelper.CreateNotSupportedException_UnflushedBytes();
     }
 }
index 80c4d77..38de6ff 100644 (file)
@@ -206,6 +206,9 @@ namespace System.IO.Pipelines
         }
 
         /// <inheritdoc />
+        public override bool CanGetUnflushedBytes => true;
+
+        /// <inheritdoc />
         public override void Complete(Exception? exception = null)
         {
             if (_isCompleted)
@@ -259,6 +262,9 @@ namespace System.IO.Pipelines
             return FlushAsyncInternal(writeToStream: true, data: Memory<byte>.Empty, cancellationToken);
         }
 
+        /// <inheritdoc />
+        public override long UnflushedBytes => _bytesBuffered;
+
         public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
         {
             return FlushAsyncInternal(writeToStream: true, data: source, cancellationToken);
index ff3ce69..0fb3168 100644 (file)
@@ -83,6 +83,11 @@ namespace System.IO.Pipelines
         public static void ThrowInvalidOperationException_InvalidZeroByteRead() => throw CreateInvalidOperationException_InvalidZeroByteRead();
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateInvalidOperationException_InvalidZeroByteRead() => new InvalidOperationException(SR.InvalidZeroByteRead);
+
+        [DoesNotReturn]
+        public static void ThrowNotSupported_UnflushedBytes() => throw CreateNotSupportedException_UnflushedBytes();
+        [MethodImpl(MethodImplOptions.NoInlining)]
+        public static Exception CreateNotSupportedException_UnflushedBytes() => new NotSupportedException(SR.UnflushedBytesNotSupported);
     }
 
     internal enum ExceptionArgument
index 0abdfa2..8d94e72 100644 (file)
@@ -197,7 +197,7 @@ namespace System.IO.Pipelines.Tests
 
             await writer.FlushAsync();
             writer.Complete();
-
+            Assert.Equal(0, writer.UnflushedBytes);
             ReadResult readResult = await pipe.Reader.ReadAsync();
             Assert.Equal(bytes, readResult.Buffer.ToArray());
             pipe.Reader.AdvanceTo(readResult.Buffer.End);
@@ -220,7 +220,7 @@ namespace System.IO.Pipelines.Tests
 
             await writer.FlushAsync();
             writer.Complete();
-
+            Assert.Equal(0, writer.UnflushedBytes);
             ReadResult readResult = await pipe.Reader.ReadAsync();
             Assert.Equal(bytes, readResult.Buffer.ToArray());
             pipe.Reader.AdvanceTo(readResult.Buffer.End);
@@ -290,6 +290,7 @@ namespace System.IO.Pipelines.Tests
             Assert.Equal(1, pool.CurrentlyRentedBlocks);
             pipe.Writer.Complete();
             Assert.Equal(0, pool.CurrentlyRentedBlocks);
+            Assert.Equal(0, Pipe.Writer.UnflushedBytes);
         }
     }
 }
index 9128999..dcd0704 100644 (file)
@@ -26,6 +26,8 @@ namespace System.IO.Pipelines.Tests
             Assert.Equal(0, stream.Length);
 
             writer.Complete();
+
+
         }
 
         [Fact]
@@ -42,6 +44,7 @@ namespace System.IO.Pipelines.Tests
 
             writer.Complete();
 
+            Assert.Equal(0, writer.UnflushedBytes);
             Assert.Equal(bytes.Length, stream.Length);
             Assert.Equal("Hello World", Encoding.ASCII.GetString(stream.ToArray()));
         }
@@ -62,6 +65,7 @@ namespace System.IO.Pipelines.Tests
 
             Assert.Equal(bytes.Length, stream.Length);
             Assert.Equal("Hello World", Encoding.ASCII.GetString(stream.ToArray()));
+            Assert.Equal(0, writer.UnflushedBytes);
         }
 
         [Fact]
@@ -132,6 +136,8 @@ namespace System.IO.Pipelines.Tests
             Assert.Equal("Hello World", Encoding.ASCII.GetString(stream.ToArray()));
 
             writer.Complete();
+
+            Assert.Equal(0, writer.UnflushedBytes);
         }
 
         [Fact]
@@ -143,6 +149,7 @@ namespace System.IO.Pipelines.Tests
 
             Assert.False(stream.FlushAsyncCalled);
             writer.Complete();
+            Assert.Equal(0, writer.UnflushedBytes);
         }
 
         [Fact]
@@ -162,6 +169,7 @@ namespace System.IO.Pipelines.Tests
             await writer.FlushAsync();
             Assert.Equal(bytes, stream.ToArray());
             writer.Complete();
+            Assert.Equal(0, writer.UnflushedBytes);
         }
 
         [Fact]
@@ -181,6 +189,7 @@ namespace System.IO.Pipelines.Tests
             await writer.FlushAsync();
             Assert.Equal(bytes, stream.ToArray());
             writer.Complete();
+            Assert.Equal(0, writer.UnflushedBytes);
         }
 
         [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
@@ -397,6 +406,7 @@ namespace System.IO.Pipelines.Tests
                 Assert.Equal(1, pool.DisposedBlocks);
 
                 writer.Complete();
+                Assert.Equal(0, writer.UnflushedBytes);
                 Assert.Equal(0, pool.CurrentlyRentedBlocks);
                 Assert.Equal(3, pool.DisposedBlocks);
             }
@@ -441,6 +451,7 @@ namespace System.IO.Pipelines.Tests
                 Assert.Equal(0, stream.Length);
 
                 writer.Complete();
+                Assert.Equal(0, writer.UnflushedBytes);
                 Assert.Equal(0, pool.CurrentlyRentedBlocks);
                 Assert.Equal(1, pool.DisposedBlocks);
             }
@@ -462,7 +473,7 @@ namespace System.IO.Pipelines.Tests
                 Assert.Equal(0, pool.DisposedBlocks);
 
                 writer.Complete();
-
+                Assert.Equal(0, writer.UnflushedBytes);
                 Assert.Equal(0, pool.CurrentlyRentedBlocks);
                 Assert.Equal(1, pool.DisposedBlocks);
             }
@@ -482,7 +493,7 @@ namespace System.IO.Pipelines.Tests
                 Assert.Equal(0, pool.DisposedBlocks);
 
                 writer.Complete();
-
+                Assert.Equal(0, writer.UnflushedBytes);
                 Assert.Equal(0, pool.CurrentlyRentedBlocks);
                 Assert.Equal(0, pool.DisposedBlocks);
             }
@@ -592,6 +603,25 @@ namespace System.IO.Pipelines.Tests
             await Assert.ThrowsAsync<OperationCanceledException>(async () => await writer.WriteAsync(new byte[1]));
         }
 
+        [Fact]
+        public void UnflushedBytesWorks()
+        {
+            byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
+            var stream = new MemoryStream();
+            PipeWriter writer = PipeWriter.Create(stream);
+
+            Assert.True(writer.CanGetUnflushedBytes);
+
+            bytes.AsSpan().CopyTo(writer.GetSpan(bytes.Length));
+            writer.Advance(bytes.Length);
+
+            Assert.Equal(bytes.Length, writer.UnflushedBytes);
+
+            writer.Complete();
+
+            Assert.Equal(0, writer.UnflushedBytes);
+        }
+
         private class ThrowsOperationCanceledExceptionStream : WriteOnlyStream
         {
             public override void Write(byte[] buffer, int offset, int count)
index d5b3b44..6b8db4a 100644 (file)
@@ -42,6 +42,7 @@
     <Compile Include="Infrastructure\TestMemoryPool.cs" />
     <Compile Include="StreamPipeWriterTests.cs" />
     <Compile Include="Infrastructure\ThrowAfterNWritesStream.cs" />
+    <Compile Include="UnflushedBytesTests.cs" />
   </ItemGroup>
   <ItemGroup Condition="'$(TargetFramework)' == '$(NetCoreAppCurrent)'">
     <Compile Include="PipeLengthTests.cs" />
diff --git a/src/libraries/System.IO.Pipelines/tests/UnflushedBytesTests.cs b/src/libraries/System.IO.Pipelines/tests/UnflushedBytesTests.cs
new file mode 100644 (file)
index 0000000..1c2c15e
--- /dev/null
@@ -0,0 +1,50 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Buffers;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.IO.Pipelines.Tests
+{
+    public class UnflushedBytesTests : PipeTest
+    {
+        internal class MinimalPipeWriter : PipeWriter
+        {
+            public override void Advance(int bytes) => throw new NotImplementedException();
+            public override void CancelPendingFlush() => throw new NotImplementedException();
+            public override void Complete(Exception? exception = null) => throw new NotImplementedException();
+            public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default) => throw new NotImplementedException();
+            public override Memory<byte> GetMemory(int sizeHint = 0) => throw new NotImplementedException();
+            public override Span<byte> GetSpan(int sizeHint = 0) => throw new NotImplementedException();
+        }
+
+        public UnflushedBytesTests() : base(0, 0)
+        {
+        }
+
+        [Fact]
+        public void NonOverriddenUnflushedBytesThrows()
+        {
+            MinimalPipeWriter writer = new MinimalPipeWriter();
+            Assert.False(writer.CanGetUnflushedBytes);
+            _ = Assert.Throws<NotSupportedException>(() => { long value = writer.UnflushedBytes; }); ;
+        }
+
+        [Fact]
+        public void UnflushedBytesWorks()
+        {
+            byte[] bytes = Encoding.ASCII.GetBytes("abcdefghijklmnopqrstuvwzyz");
+            Pipe.Writer.Write(bytes);
+            Assert.True(Pipe.Writer.CanGetUnflushedBytes);
+            Assert.Equal(bytes.Length,Pipe.Writer.UnflushedBytes);
+            _ = Pipe.Writer.FlushAsync().GetAwaiter().GetResult();
+            Assert.Equal(0, Pipe.Writer.UnflushedBytes);
+        }
+    }
+}