Add span-based CopyTo and CopyToAsync methods (dotnet/coreclr#27639)
authorEmmanuel André <2341261+manandre@users.noreply.github.com>
Wed, 13 Nov 2019 19:47:18 +0000 (20:47 +0100)
committerStephen Toub <stoub@microsoft.com>
Wed, 13 Nov 2019 19:47:18 +0000 (14:47 -0500)
* Add span-based CopyTo and CopyToAsync methods

* Update according to feedback

* Add span-based CopyTo overrides for MemoryStream

* Improve span-based CopyTo arguments validation

To avoid code duplication

* Update according to second review

Stream API is changed

* Resolve InternalReadSpan/Memory inlining

* Refactor ValidateCopyToArgs

* Update according to third review

* Update after fourth review

* Override span CopyTo for UnmanagedMemoryStream

* Apply suggestions from code review

Co-Authored-By: Stephen Toub <stoub@microsoft.com>
* Update after fifth review

* Add cross sync/async support for span-based CopyTo

* Call sync action directly in async context

* Rework cross sync/async support for span-based CopyTo

Co-Authored-By: Stephen Toub <stoub@microsoft.com>
Commit migrated from https://github.com/dotnet/coreclr/commit/6abc099d4a798dc9a267d04bc025efbcbf148473

src/libraries/System.Private.CoreLib/src/System/IO/MemoryStream.cs
src/libraries/System.Private.CoreLib/src/System/IO/Stream.cs
src/libraries/System.Private.CoreLib/src/System/IO/StreamHelpers.CopyValidation.cs
src/libraries/System.Private.CoreLib/src/System/IO/UnmanagedMemoryStream.cs

index c7aaefa..b8ff126 100644 (file)
@@ -2,6 +2,7 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
+using System.Buffers;
 using System.Diagnostics;
 using System.Runtime.CompilerServices;
 using System.Runtime.InteropServices;
@@ -516,13 +517,13 @@ namespace System.IO
             if (GetType() != typeof(MemoryStream))
                 return base.CopyToAsync(destination, bufferSize, cancellationToken);
 
-            // If cancelled - return fast:
+            // If canceled - return fast:
             if (cancellationToken.IsCancellationRequested)
                 return Task.FromCanceled(cancellationToken);
 
             // Avoid copying data from this buffer into a temp buffer:
-            //   (require that InternalEmulateRead does not throw,
-            //    otherwise it needs to be wrapped into try-catch-Task.FromException like memStrDest.Write below)
+            // (require that InternalEmulateRead does not throw,
+            // otherwise it needs to be wrapped into try-catch-Task.FromException like memStrDest.Write below)
 
             int pos = _position;
             int n = InternalEmulateRead(_length - _position);
@@ -547,6 +548,51 @@ namespace System.IO
             }
         }
 
+        public override void CopyTo(ReadOnlySpanAction<byte, object?> callback, object? state, int bufferSize)
+        {
+            // If we have been inherited into a subclass, the following implementation could be incorrect
+            // since it does not call through to Read() which a subclass might have overridden.
+            // To be safe we will only use this implementation in cases where we know it is safe to do so,
+            // and delegate to our base class (which will call into Read) when we are not sure.
+            if (GetType() != typeof(MemoryStream))
+            {
+                base.CopyTo(callback, state, bufferSize);
+                return;
+            }
+
+            StreamHelpers.ValidateCopyToArgs(this, callback, bufferSize);
+
+            // Retrieve a span until the end of the MemoryStream.
+            ReadOnlySpan<byte> span = new ReadOnlySpan<byte>(_buffer, _position, _length - _position);
+            _position = _length;
+
+            // Invoke the callback, using our internal span and avoiding any
+            // intermediary allocations.
+            callback(span, state);
+        }
+
+        public override Task CopyToAsync(Func<ReadOnlyMemory<byte>, object?, CancellationToken, ValueTask> callback, object? state, int bufferSize, CancellationToken cancellationToken)
+        {
+            // If we have been inherited into a subclass, the following implementation could be incorrect
+            // since it does not call through to ReadAsync() which a subclass might have overridden.
+            // To be safe we will only use this implementation in cases where we know it is safe to do so,
+            // and delegate to our base class (which will call into ReadAsync) when we are not sure.
+            if (GetType() != typeof(MemoryStream))
+                return base.CopyToAsync(callback, state, bufferSize, cancellationToken);
+
+            StreamHelpers.ValidateCopyToArgs(this, callback, bufferSize);
+
+            // If canceled - return fast:
+            if (cancellationToken.IsCancellationRequested)
+                return Task.FromCanceled(cancellationToken);
+
+            // Avoid copying data from this buffer into a temp buffer
+            ReadOnlyMemory<byte> memory = new ReadOnlyMemory<byte>(_buffer, _position, _length - _position);
+            _position = _length;
+
+            return callback(memory, state, cancellationToken).AsTask();
+        }
+
         public override long Seek(long offset, SeekOrigin loc)
         {
             EnsureNotClosed();
index f9e2ecb..cf05e66 100644 (file)
@@ -189,6 +189,99 @@ namespace System.IO
             return bufferSize;
         }
 
+        public virtual void CopyTo(ReadOnlySpanAction<byte, object?> callback, object? state, int bufferSize)
+        {
+            if (callback == null) throw new ArgumentNullException(nameof(callback));
+
+            CopyTo(new WriteCallbackStream(callback, state), bufferSize);
+        }
+
+        public virtual Task CopyToAsync(Func<ReadOnlyMemory<byte>, object?, CancellationToken, ValueTask> callback, object? state, int bufferSize, CancellationToken cancellationToken)
+        {
+            if (callback == null) throw new ArgumentNullException(nameof(callback));
+
+            return CopyToAsync(new WriteCallbackStream(callback, state), bufferSize, cancellationToken);
+        }
+
+        private sealed class WriteCallbackStream : Stream
+        {
+            private readonly ReadOnlySpanAction<byte, object?>? _action;
+            private readonly Func<ReadOnlyMemory<byte>, object?, CancellationToken, ValueTask>? _func;
+            private readonly object? _state;
+
+            public WriteCallbackStream(ReadOnlySpanAction<byte, object?> action, object? state)
+            {
+                _action = action;
+                _state = state;
+            }
+
+            public WriteCallbackStream(Func<ReadOnlyMemory<byte>, object?, CancellationToken, ValueTask> func, object? state)
+            {
+                _func = func;
+                _state = state;
+            }
+
+            public override void Write(byte[] buffer, int offset, int count)
+            {
+                Write(new ReadOnlySpan<byte>(buffer, offset, count));
+            }
+
+            public override void Write(ReadOnlySpan<byte> span)
+            {
+                if (_action != null)
+                {
+                    _action(span, _state);
+                    return;
+                }
+
+                // In case a poorly implemented CopyToAsync(Stream, ...) method decides to call
+                // the destination stream's Write rather than WriteAsync, we make it work, but this
+                // does not need to be efficient.
+                Debug.Assert(_func != null);
+                _func(span.ToArray(), _state, CancellationToken.None).AsTask().GetAwaiter().GetResult();
+
+            }
+
+            public override Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
+            {
+                return WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, length), cancellationToken).AsTask();
+            }
+
+            public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
+            {
+                if (_func != null)
+                {
+                    return _func(buffer, _state, cancellationToken);
+                }
+
+                // In case a poorly implemented CopyTo(Stream, ...) method decides to call
+                // the destination stream's WriteAsync rather than Write, we make it work,
+                // but this does not need to be efficient.
+                Debug.Assert(_action != null);
+                try
+                {
+                    cancellationToken.ThrowIfCancellationRequested();
+                    _action(buffer.Span, _state);
+                    return default;
+                }
+                catch (Exception e)
+                {
+                    return new ValueTask(Task.FromException(e));
+                }
+            }
+
+            public override bool CanRead => false;
+            public override bool CanSeek => false;
+            public override bool CanWrite => true;
+            public override void Flush() { }
+            public override Task FlushAsync(CancellationToken token) => Task.CompletedTask;
+            public override long Length => throw new NotSupportedException();
+            public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
+            public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
+            public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
+            public override void SetLength(long value) => throw new NotSupportedException();
+        }
+
         // Stream used to require that all cleanup logic went into Close(),
         // which was thought up before we invented IDisposable.  However, we
         // need to follow the IDisposable pattern so that users can write
@@ -887,6 +980,22 @@ namespace System.IO
                     Task.CompletedTask;
             }
 
+            public override void CopyTo(ReadOnlySpanAction<byte, object?> callback, object? state, int bufferSize)
+            {
+                StreamHelpers.ValidateCopyToArgs(this, callback, bufferSize);
+
+                // After we validate arguments this is a nop.
+            }
+
+            public override Task CopyToAsync(Func<ReadOnlyMemory<byte>, object?, CancellationToken, ValueTask> callback, object? state, int bufferSize, CancellationToken cancellationToken)
+            {
+                StreamHelpers.ValidateCopyToArgs(this, callback, bufferSize);
+
+                return cancellationToken.IsCancellationRequested ?
+                    Task.FromCanceled(cancellationToken) :
+                    Task.CompletedTask;
+            }
+
             protected override void Dispose(bool disposing)
             {
                 // Do nothing - we don't want NullStream singleton (static) to be closable
index 45bbd81..1d120e5 100644 (file)
@@ -2,6 +2,10 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
+using System.Buffers;
+using System.Threading;
+using System.Threading.Tasks;
+
 namespace System.IO
 {
     /// <summary>Provides methods to help in the implementation of Stream-derived types.</summary>
@@ -42,5 +46,25 @@ namespace System.IO
                 throw new NotSupportedException(SR.NotSupported_UnwritableStream);
             }
         }
+
+        public static void ValidateCopyToArgs(Stream source, Delegate callback, int bufferSize)
+        {
+            if (callback == null)
+            {
+                throw new ArgumentNullException(nameof(callback));
+            }
+
+            if (bufferSize <= 0)
+            {
+                throw new ArgumentOutOfRangeException(nameof(bufferSize), bufferSize, SR.ArgumentOutOfRange_NeedPosNum);
+            }
+
+            if (!source.CanRead)
+            {
+                throw source.CanWrite ? (Exception)
+                    new NotSupportedException(SR.NotSupported_UnreadableStream) :
+                    new ObjectDisposedException(null, SR.ObjectDisposed_StreamClosed);
+            }
+        }
     }
 }
index de03abd..4dbd34a 100644 (file)
@@ -2,6 +2,7 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
+using System.Buffers;
 using System.Diagnostics;
 using System.Runtime.CompilerServices;
 using System.Runtime.InteropServices;
@@ -215,6 +216,76 @@ namespace System.IO
         public override bool CanWrite => _isOpen && (_access & FileAccess.Write) != 0;
 
         /// <summary>
+        /// Calls the given callback with a span of the memory stream data
+        /// </summary>
+        /// <param name="callback">the callback to be called</param>
+        /// <param name="state">A user-defined state, passed to the callback</param>
+        /// <param name="bufferSize">the maximum size of the memory span</param>
+        public override void CopyTo(ReadOnlySpanAction<byte, object?> callback, object? state, int bufferSize)
+        {
+            // If we have been inherited into a subclass, the following implementation could be incorrect
+            // since it does not call through to Read() which a subclass might have overridden.
+            // To be safe we will only use this implementation in cases where we know it is safe to do so,
+            // and delegate to our base class (which will call into Read) when we are not sure.
+            if (GetType() != typeof(UnmanagedMemoryStream))
+            {
+                base.CopyTo(callback, state, bufferSize);
+                return;
+            }
+
+            if (callback == null) throw new ArgumentNullException(nameof(callback));
+
+            EnsureNotClosed();
+            EnsureReadable();
+
+            // Use a local variable to avoid a race where another thread
+            // changes our position after we decide we can read some bytes.
+            long pos = Interlocked.Read(ref _position);
+            long len = Interlocked.Read(ref _length);
+            long n = len - pos;
+            if (n <= 0)
+            {
+                return;
+            }
+
+            int nInt = (int)n; // Safe because n <= count, which is an Int32
+            if (nInt < 0)
+            {
+                return;  // _position could be beyond EOF
+            }
+
+            unsafe
+            {
+                if (_buffer != null)
+                {
+                    byte* pointer = null;
+
+                    RuntimeHelpers.PrepareConstrainedRegions();
+                    try
+                    {
+                        _buffer.AcquirePointer(ref pointer);
+                        ReadOnlySpan<byte> span = new ReadOnlySpan<byte>(pointer + pos + _offset, nInt);
+                        Interlocked.Exchange(ref _position, pos + n);
+                        callback(span, state);
+                    }
+                    finally
+                    {
+                        if (pointer != null)
+                        {
+                            _buffer.ReleasePointer();
+                        }
+                    }
+                }
+                else
+                {
+                    ReadOnlySpan<byte> span = new ReadOnlySpan<byte>(_mem + pos, nInt);
+                    Interlocked.Exchange(ref _position, pos + n);
+                    callback(span, state);
+                }
+            }
+        }
+
+        /// <summary>
         /// Closes the stream. The stream's memory needs to be dealt with separately.
         /// </summary>
         /// <param name="disposing"></param>