* Add PipeWriter CanGetUnflushedBytes and UnflushedBytes properties and covering tests
* address feedback
public abstract void Advance(int bytes);
public virtual System.IO.Stream AsStream(bool leaveOpen = false) { throw null; }
public abstract void CancelPendingFlush();
+ public virtual bool CanGetUnflushedBytes => throw null;
public abstract void Complete(System.Exception? exception = null);
public virtual System.Threading.Tasks.ValueTask CompleteAsync(System.Exception? exception = null) { throw null; }
protected internal virtual System.Threading.Tasks.Task CopyFromAsync(System.IO.Stream source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public abstract System.Span<byte> GetSpan(int sizeHint = 0);
[System.ObsoleteAttribute("OnReaderCompleted may not be invoked on all implementations of PipeWriter. This will be removed in a future release.")]
public virtual void OnReaderCompleted(System.Action<System.Exception?, object?> callback, object? state) { }
+ public virtual long UnflushedBytes => throw null;
public virtual System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> WriteAsync(System.ReadOnlyMemory<byte> source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
public readonly partial struct ReadResult
<data name="WritingAfterCompleted" xml:space="preserve">
<value>Writing is not allowed after writer was completed.</value>
</data>
+ <data name="UnflushedBytesNotSupported" xml:space="preserve">
+ <value>UnflushedBytes is not supported.</value>
+ </data>
</root>
\ No newline at end of file
public override void CancelPendingFlush() => _pipe.CancelPendingFlush();
+ public override bool CanGetUnflushedBytes => true;
+
#pragma warning disable CS0672 // Member overrides obsolete member
public override void OnReaderCompleted(Action<Exception?, object?> callback, object? state) => _pipe.OnReaderCompleted(callback, state);
#pragma warning restore CS0672 // Member overrides obsolete member
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _pipe.OnFlushAsyncCompleted(continuation, state, flags);
+ public override long UnflushedBytes => _pipe.GetUnflushedBytes();
+
public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
return _pipe.WriteAsync(source, cancellationToken);
return result;
}
+ internal long GetUnflushedBytes() => _unflushedBytes;
+
private void GetFlushResult(ref FlushResult result)
{
// Change the state from to be canceled -> observed
/// <remarks>The canceled <see cref="System.IO.Pipelines.PipeWriter.FlushAsync(System.Threading.CancellationToken)" /> or <see cref="System.IO.Pipelines.PipeWriter.WriteAsync(System.ReadOnlyMemory{byte},System.Threading.CancellationToken)" /> operation returns a <see cref="System.IO.Pipelines.FlushResult" /> where <see cref="System.IO.Pipelines.FlushResult.IsCanceled" /> is <see langword="true" />.</remarks>
public abstract void CancelPendingFlush();
+ /// <summary>Gets a value that indicates whether the current <see cref="System.IO.Pipelines.PipeWriter" /> supports reporting the count of unflushed bytes.</summary>
+ /// <value><see langword="true" />If a class derived from <see cref="System.IO.Pipelines.PipeWriter" /> does not support getting the unflushed bytes, calls to <see cref="System.IO.Pipelines.PipeWriter.UnflushedBytes" /> throw <see cref="System.NotImplementedException" />.</value>
+ public virtual bool CanGetUnflushedBytes => false;
+
/// <summary>Registers a callback that executes when the <see cref="System.IO.Pipelines.PipeReader" /> side of the pipe is completed.</summary>
/// <param name="callback">The callback to register.</param>
/// <param name="state">The state object to pass to <paramref name="callback" /> when it's invoked.</param>
}
}
}
+
+ /// <summary>
+ /// When overridden in a derived class, gets the count of unflushed bytes within the current writer.
+ /// </summary>
+ /// <exception cref="System.NotImplementedException">The <see cref="System.IO.Pipelines.PipeWriter"/> does not support getting the unflushed byte count.</exception>
+ public virtual long UnflushedBytes => throw ThrowHelper.CreateNotSupportedException_UnflushedBytes();
}
}
}
/// <inheritdoc />
+ public override bool CanGetUnflushedBytes => true;
+
+ /// <inheritdoc />
public override void Complete(Exception? exception = null)
{
if (_isCompleted)
return FlushAsyncInternal(writeToStream: true, data: Memory<byte>.Empty, cancellationToken);
}
+ /// <inheritdoc />
+ public override long UnflushedBytes => _bytesBuffered;
+
public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
return FlushAsyncInternal(writeToStream: true, data: source, cancellationToken);
public static void ThrowInvalidOperationException_InvalidZeroByteRead() => throw CreateInvalidOperationException_InvalidZeroByteRead();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_InvalidZeroByteRead() => new InvalidOperationException(SR.InvalidZeroByteRead);
+
+ [DoesNotReturn]
+ public static void ThrowNotSupported_UnflushedBytes() => throw CreateNotSupportedException_UnflushedBytes();
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ public static Exception CreateNotSupportedException_UnflushedBytes() => new NotSupportedException(SR.UnflushedBytesNotSupported);
}
internal enum ExceptionArgument
await writer.FlushAsync();
writer.Complete();
-
+ Assert.Equal(0, writer.UnflushedBytes);
ReadResult readResult = await pipe.Reader.ReadAsync();
Assert.Equal(bytes, readResult.Buffer.ToArray());
pipe.Reader.AdvanceTo(readResult.Buffer.End);
await writer.FlushAsync();
writer.Complete();
-
+ Assert.Equal(0, writer.UnflushedBytes);
ReadResult readResult = await pipe.Reader.ReadAsync();
Assert.Equal(bytes, readResult.Buffer.ToArray());
pipe.Reader.AdvanceTo(readResult.Buffer.End);
Assert.Equal(1, pool.CurrentlyRentedBlocks);
pipe.Writer.Complete();
Assert.Equal(0, pool.CurrentlyRentedBlocks);
+ Assert.Equal(0, Pipe.Writer.UnflushedBytes);
}
}
}
Assert.Equal(0, stream.Length);
writer.Complete();
+
+
}
[Fact]
writer.Complete();
+ Assert.Equal(0, writer.UnflushedBytes);
Assert.Equal(bytes.Length, stream.Length);
Assert.Equal("Hello World", Encoding.ASCII.GetString(stream.ToArray()));
}
Assert.Equal(bytes.Length, stream.Length);
Assert.Equal("Hello World", Encoding.ASCII.GetString(stream.ToArray()));
+ Assert.Equal(0, writer.UnflushedBytes);
}
[Fact]
Assert.Equal("Hello World", Encoding.ASCII.GetString(stream.ToArray()));
writer.Complete();
+
+ Assert.Equal(0, writer.UnflushedBytes);
}
[Fact]
Assert.False(stream.FlushAsyncCalled);
writer.Complete();
+ Assert.Equal(0, writer.UnflushedBytes);
}
[Fact]
await writer.FlushAsync();
Assert.Equal(bytes, stream.ToArray());
writer.Complete();
+ Assert.Equal(0, writer.UnflushedBytes);
}
[Fact]
await writer.FlushAsync();
Assert.Equal(bytes, stream.ToArray());
writer.Complete();
+ Assert.Equal(0, writer.UnflushedBytes);
}
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
Assert.Equal(1, pool.DisposedBlocks);
writer.Complete();
+ Assert.Equal(0, writer.UnflushedBytes);
Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(3, pool.DisposedBlocks);
}
Assert.Equal(0, stream.Length);
writer.Complete();
+ Assert.Equal(0, writer.UnflushedBytes);
Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(1, pool.DisposedBlocks);
}
Assert.Equal(0, pool.DisposedBlocks);
writer.Complete();
-
+ Assert.Equal(0, writer.UnflushedBytes);
Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(1, pool.DisposedBlocks);
}
Assert.Equal(0, pool.DisposedBlocks);
writer.Complete();
-
+ Assert.Equal(0, writer.UnflushedBytes);
Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
}
await Assert.ThrowsAsync<OperationCanceledException>(async () => await writer.WriteAsync(new byte[1]));
}
+ [Fact]
+ public void UnflushedBytesWorks()
+ {
+ byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
+ var stream = new MemoryStream();
+ PipeWriter writer = PipeWriter.Create(stream);
+
+ Assert.True(writer.CanGetUnflushedBytes);
+
+ bytes.AsSpan().CopyTo(writer.GetSpan(bytes.Length));
+ writer.Advance(bytes.Length);
+
+ Assert.Equal(bytes.Length, writer.UnflushedBytes);
+
+ writer.Complete();
+
+ Assert.Equal(0, writer.UnflushedBytes);
+ }
+
private class ThrowsOperationCanceledExceptionStream : WriteOnlyStream
{
public override void Write(byte[] buffer, int offset, int count)
<Compile Include="Infrastructure\TestMemoryPool.cs" />
<Compile Include="StreamPipeWriterTests.cs" />
<Compile Include="Infrastructure\ThrowAfterNWritesStream.cs" />
+ <Compile Include="UnflushedBytesTests.cs" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == '$(NetCoreAppCurrent)'">
<Compile Include="PipeLengthTests.cs" />
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Buffers;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.IO.Pipelines.Tests
+{
+ public class UnflushedBytesTests : PipeTest
+ {
+ internal class MinimalPipeWriter : PipeWriter
+ {
+ public override void Advance(int bytes) => throw new NotImplementedException();
+ public override void CancelPendingFlush() => throw new NotImplementedException();
+ public override void Complete(Exception? exception = null) => throw new NotImplementedException();
+ public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default) => throw new NotImplementedException();
+ public override Memory<byte> GetMemory(int sizeHint = 0) => throw new NotImplementedException();
+ public override Span<byte> GetSpan(int sizeHint = 0) => throw new NotImplementedException();
+ }
+
+ public UnflushedBytesTests() : base(0, 0)
+ {
+ }
+
+ [Fact]
+ public void NonOverriddenUnflushedBytesThrows()
+ {
+ MinimalPipeWriter writer = new MinimalPipeWriter();
+ Assert.False(writer.CanGetUnflushedBytes);
+ _ = Assert.Throws<NotSupportedException>(() => { long value = writer.UnflushedBytes; }); ;
+ }
+
+ [Fact]
+ public void UnflushedBytesWorks()
+ {
+ byte[] bytes = Encoding.ASCII.GetBytes("abcdefghijklmnopqrstuvwzyz");
+ Pipe.Writer.Write(bytes);
+ Assert.True(Pipe.Writer.CanGetUnflushedBytes);
+ Assert.Equal(bytes.Length,Pipe.Writer.UnflushedBytes);
+ _ = Pipe.Writer.FlushAsync().GetAwaiter().GetResult();
+ Assert.Equal(0, Pipe.Writer.UnflushedBytes);
+ }
+ }
+}