From 7dd7430eefcffcd114b7111498a815e367970db4 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Emmanuel=20Andr=C3=A9?= <2341261+manandre@users.noreply.github.com> Date: Wed, 13 Nov 2019 20:47:18 +0100 Subject: [PATCH] Add span-based CopyTo and CopyToAsync methods (dotnet/coreclr#27639) * Add span-based CopyTo and CopyToAsync methods * Update according to feedback * Add span-based CopyTo overrides for MemoryStream * Improve span-based CopyTo arguments validation To avoid code duplication * Update according to second review Stream API is changed * Resolve InternalReadSpan/Memory inlining * Refactor ValidateCopyToArgs * Update according to third review * Update after fourth review * Override span CopyTo for UnmanagedMemoryStream * Apply suggestions from code review Co-Authored-By: Stephen Toub * Update after fifth review * Add cross sync/async support for span-based CopyTo * Call sync action directly in async context * Rework cross sync/async support for span-based CopyTo Co-Authored-By: Stephen Toub Commit migrated from https://github.com/dotnet/coreclr/commit/6abc099d4a798dc9a267d04bc025efbcbf148473 --- .../src/System/IO/MemoryStream.cs | 52 +++++++++- .../System.Private.CoreLib/src/System/IO/Stream.cs | 109 +++++++++++++++++++++ .../src/System/IO/StreamHelpers.CopyValidation.cs | 24 +++++ .../src/System/IO/UnmanagedMemoryStream.cs | 71 ++++++++++++++ 4 files changed, 253 insertions(+), 3 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/MemoryStream.cs b/src/libraries/System.Private.CoreLib/src/System/IO/MemoryStream.cs index c7aaefa..b8ff126 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/MemoryStream.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/MemoryStream.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System.Buffers; using System.Diagnostics; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; @@ -516,13 +517,13 @@ namespace System.IO if (GetType() != typeof(MemoryStream)) return base.CopyToAsync(destination, bufferSize, cancellationToken); - // If cancelled - return fast: + // If canceled - return fast: if (cancellationToken.IsCancellationRequested) return Task.FromCanceled(cancellationToken); // Avoid copying data from this buffer into a temp buffer: - // (require that InternalEmulateRead does not throw, - // otherwise it needs to be wrapped into try-catch-Task.FromException like memStrDest.Write below) + // (require that InternalEmulateRead does not throw, + // otherwise it needs to be wrapped into try-catch-Task.FromException like memStrDest.Write below) int pos = _position; int n = InternalEmulateRead(_length - _position); @@ -547,6 +548,51 @@ namespace System.IO } } + public override void CopyTo(ReadOnlySpanAction callback, object? state, int bufferSize) + { + // If we have been inherited into a subclass, the following implementation could be incorrect + // since it does not call through to Read() which a subclass might have overridden. + // To be safe we will only use this implementation in cases where we know it is safe to do so, + // and delegate to our base class (which will call into Read) when we are not sure. + if (GetType() != typeof(MemoryStream)) + { + base.CopyTo(callback, state, bufferSize); + return; + } + + StreamHelpers.ValidateCopyToArgs(this, callback, bufferSize); + + // Retrieve a span until the end of the MemoryStream. + ReadOnlySpan span = new ReadOnlySpan(_buffer, _position, _length - _position); + _position = _length; + + // Invoke the callback, using our internal span and avoiding any + // intermediary allocations. + callback(span, state); + } + + public override Task CopyToAsync(Func, object?, CancellationToken, ValueTask> callback, object? state, int bufferSize, CancellationToken cancellationToken) + { + // If we have been inherited into a subclass, the following implementation could be incorrect + // since it does not call through to ReadAsync() which a subclass might have overridden. + // To be safe we will only use this implementation in cases where we know it is safe to do so, + // and delegate to our base class (which will call into ReadAsync) when we are not sure. + if (GetType() != typeof(MemoryStream)) + return base.CopyToAsync(callback, state, bufferSize, cancellationToken); + + StreamHelpers.ValidateCopyToArgs(this, callback, bufferSize); + + // If canceled - return fast: + if (cancellationToken.IsCancellationRequested) + return Task.FromCanceled(cancellationToken); + + // Avoid copying data from this buffer into a temp buffer + ReadOnlyMemory memory = new ReadOnlyMemory(_buffer, _position, _length - _position); + _position = _length; + + return callback(memory, state, cancellationToken).AsTask(); + } + public override long Seek(long offset, SeekOrigin loc) { EnsureNotClosed(); diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Stream.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Stream.cs index f9e2ecb..cf05e66 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Stream.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Stream.cs @@ -189,6 +189,99 @@ namespace System.IO return bufferSize; } + public virtual void CopyTo(ReadOnlySpanAction callback, object? state, int bufferSize) + { + if (callback == null) throw new ArgumentNullException(nameof(callback)); + + CopyTo(new WriteCallbackStream(callback, state), bufferSize); + } + + public virtual Task CopyToAsync(Func, object?, CancellationToken, ValueTask> callback, object? state, int bufferSize, CancellationToken cancellationToken) + { + if (callback == null) throw new ArgumentNullException(nameof(callback)); + + return CopyToAsync(new WriteCallbackStream(callback, state), bufferSize, cancellationToken); + } + + private sealed class WriteCallbackStream : Stream + { + private readonly ReadOnlySpanAction? _action; + private readonly Func, object?, CancellationToken, ValueTask>? _func; + private readonly object? _state; + + public WriteCallbackStream(ReadOnlySpanAction action, object? state) + { + _action = action; + _state = state; + } + + public WriteCallbackStream(Func, object?, CancellationToken, ValueTask> func, object? state) + { + _func = func; + _state = state; + } + + public override void Write(byte[] buffer, int offset, int count) + { + Write(new ReadOnlySpan(buffer, offset, count)); + } + + public override void Write(ReadOnlySpan span) + { + if (_action != null) + { + _action(span, _state); + return; + } + + // In case a poorly implemented CopyToAsync(Stream, ...) method decides to call + // the destination stream's Write rather than WriteAsync, we make it work, but this + // does not need to be efficient. + Debug.Assert(_func != null); + _func(span.ToArray(), _state, CancellationToken.None).AsTask().GetAwaiter().GetResult(); + + } + + public override Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + return WriteAsync(new ReadOnlyMemory(buffer, offset, length), cancellationToken).AsTask(); + } + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken) + { + if (_func != null) + { + return _func(buffer, _state, cancellationToken); + } + + // In case a poorly implemented CopyTo(Stream, ...) method decides to call + // the destination stream's WriteAsync rather than Write, we make it work, + // but this does not need to be efficient. + Debug.Assert(_action != null); + try + { + cancellationToken.ThrowIfCancellationRequested(); + _action(buffer.Span, _state); + return default; + } + catch (Exception e) + { + return new ValueTask(Task.FromException(e)); + } + } + + public override bool CanRead => false; + public override bool CanSeek => false; + public override bool CanWrite => true; + public override void Flush() { } + public override Task FlushAsync(CancellationToken token) => Task.CompletedTask; + public override long Length => throw new NotSupportedException(); + public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } + public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + public override void SetLength(long value) => throw new NotSupportedException(); + } + // Stream used to require that all cleanup logic went into Close(), // which was thought up before we invented IDisposable. However, we // need to follow the IDisposable pattern so that users can write @@ -887,6 +980,22 @@ namespace System.IO Task.CompletedTask; } + public override void CopyTo(ReadOnlySpanAction callback, object? state, int bufferSize) + { + StreamHelpers.ValidateCopyToArgs(this, callback, bufferSize); + + // After we validate arguments this is a nop. + } + + public override Task CopyToAsync(Func, object?, CancellationToken, ValueTask> callback, object? state, int bufferSize, CancellationToken cancellationToken) + { + StreamHelpers.ValidateCopyToArgs(this, callback, bufferSize); + + return cancellationToken.IsCancellationRequested ? + Task.FromCanceled(cancellationToken) : + Task.CompletedTask; + } + protected override void Dispose(bool disposing) { // Do nothing - we don't want NullStream singleton (static) to be closable diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/StreamHelpers.CopyValidation.cs b/src/libraries/System.Private.CoreLib/src/System/IO/StreamHelpers.CopyValidation.cs index 45bbd81..1d120e5 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/StreamHelpers.CopyValidation.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/StreamHelpers.CopyValidation.cs @@ -2,6 +2,10 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System.Buffers; +using System.Threading; +using System.Threading.Tasks; + namespace System.IO { /// Provides methods to help in the implementation of Stream-derived types. @@ -42,5 +46,25 @@ namespace System.IO throw new NotSupportedException(SR.NotSupported_UnwritableStream); } } + + public static void ValidateCopyToArgs(Stream source, Delegate callback, int bufferSize) + { + if (callback == null) + { + throw new ArgumentNullException(nameof(callback)); + } + + if (bufferSize <= 0) + { + throw new ArgumentOutOfRangeException(nameof(bufferSize), bufferSize, SR.ArgumentOutOfRange_NeedPosNum); + } + + if (!source.CanRead) + { + throw source.CanWrite ? (Exception) + new NotSupportedException(SR.NotSupported_UnreadableStream) : + new ObjectDisposedException(null, SR.ObjectDisposed_StreamClosed); + } + } } } diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/UnmanagedMemoryStream.cs b/src/libraries/System.Private.CoreLib/src/System/IO/UnmanagedMemoryStream.cs index de03abd..4dbd34a 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/UnmanagedMemoryStream.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/UnmanagedMemoryStream.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System.Buffers; using System.Diagnostics; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; @@ -215,6 +216,76 @@ namespace System.IO public override bool CanWrite => _isOpen && (_access & FileAccess.Write) != 0; /// + /// Calls the given callback with a span of the memory stream data + /// + /// the callback to be called + /// A user-defined state, passed to the callback + /// the maximum size of the memory span + public override void CopyTo(ReadOnlySpanAction callback, object? state, int bufferSize) + { + // If we have been inherited into a subclass, the following implementation could be incorrect + // since it does not call through to Read() which a subclass might have overridden. + // To be safe we will only use this implementation in cases where we know it is safe to do so, + // and delegate to our base class (which will call into Read) when we are not sure. + if (GetType() != typeof(UnmanagedMemoryStream)) + { + base.CopyTo(callback, state, bufferSize); + return; + } + + if (callback == null) throw new ArgumentNullException(nameof(callback)); + + EnsureNotClosed(); + EnsureReadable(); + + // Use a local variable to avoid a race where another thread + // changes our position after we decide we can read some bytes. + long pos = Interlocked.Read(ref _position); + long len = Interlocked.Read(ref _length); + long n = len - pos; + if (n <= 0) + { + return; + } + + int nInt = (int)n; // Safe because n <= count, which is an Int32 + if (nInt < 0) + { + return; // _position could be beyond EOF + } + + unsafe + { + if (_buffer != null) + { + byte* pointer = null; + + RuntimeHelpers.PrepareConstrainedRegions(); + try + { + _buffer.AcquirePointer(ref pointer); + ReadOnlySpan span = new ReadOnlySpan(pointer + pos + _offset, nInt); + Interlocked.Exchange(ref _position, pos + n); + callback(span, state); + } + finally + { + if (pointer != null) + { + _buffer.ReleasePointer(); + } + } + } + else + { + ReadOnlySpan span = new ReadOnlySpan(_mem + pos, nInt); + Interlocked.Exchange(ref _position, pos + n); + callback(span, state); + } + } + } + + /// /// Closes the stream. The stream's memory needs to be dealt with separately. /// /// -- 2.7.4