// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
+using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
if (GetType() != typeof(MemoryStream))
return base.CopyToAsync(destination, bufferSize, cancellationToken);
- // If cancelled - return fast:
+ // If canceled - return fast:
if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled(cancellationToken);
// Avoid copying data from this buffer into a temp buffer:
- // (require that InternalEmulateRead does not throw,
- // otherwise it needs to be wrapped into try-catch-Task.FromException like memStrDest.Write below)
+ // (require that InternalEmulateRead does not throw,
+ // otherwise it needs to be wrapped into try-catch-Task.FromException like memStrDest.Write below)
int pos = _position;
int n = InternalEmulateRead(_length - _position);
}
}
+ public override void CopyTo(ReadOnlySpanAction<byte, object?> callback, object? state, int bufferSize)
+ {
+ // If we have been inherited into a subclass, the following implementation could be incorrect
+ // since it does not call through to Read() which a subclass might have overridden.
+ // To be safe we will only use this implementation in cases where we know it is safe to do so,
+ // and delegate to our base class (which will call into Read) when we are not sure.
+ if (GetType() != typeof(MemoryStream))
+ {
+ base.CopyTo(callback, state, bufferSize);
+ return;
+ }
+
+ StreamHelpers.ValidateCopyToArgs(this, callback, bufferSize);
+
+ // Retrieve a span until the end of the MemoryStream.
+ ReadOnlySpan<byte> span = new ReadOnlySpan<byte>(_buffer, _position, _length - _position);
+ _position = _length;
+
+ // Invoke the callback, using our internal span and avoiding any
+ // intermediary allocations.
+ callback(span, state);
+ }
+
+ public override Task CopyToAsync(Func<ReadOnlyMemory<byte>, object?, CancellationToken, ValueTask> callback, object? state, int bufferSize, CancellationToken cancellationToken)
+ {
+ // If we have been inherited into a subclass, the following implementation could be incorrect
+ // since it does not call through to ReadAsync() which a subclass might have overridden.
+ // To be safe we will only use this implementation in cases where we know it is safe to do so,
+ // and delegate to our base class (which will call into ReadAsync) when we are not sure.
+ if (GetType() != typeof(MemoryStream))
+ return base.CopyToAsync(callback, state, bufferSize, cancellationToken);
+
+ StreamHelpers.ValidateCopyToArgs(this, callback, bufferSize);
+
+ // If canceled - return fast:
+ if (cancellationToken.IsCancellationRequested)
+ return Task.FromCanceled(cancellationToken);
+
+ // Avoid copying data from this buffer into a temp buffer
+ ReadOnlyMemory<byte> memory = new ReadOnlyMemory<byte>(_buffer, _position, _length - _position);
+ _position = _length;
+
+ return callback(memory, state, cancellationToken).AsTask();
+ }
+
public override long Seek(long offset, SeekOrigin loc)
{
EnsureNotClosed();
return bufferSize;
}
+ public virtual void CopyTo(ReadOnlySpanAction<byte, object?> callback, object? state, int bufferSize)
+ {
+ if (callback == null) throw new ArgumentNullException(nameof(callback));
+
+ CopyTo(new WriteCallbackStream(callback, state), bufferSize);
+ }
+
+ public virtual Task CopyToAsync(Func<ReadOnlyMemory<byte>, object?, CancellationToken, ValueTask> callback, object? state, int bufferSize, CancellationToken cancellationToken)
+ {
+ if (callback == null) throw new ArgumentNullException(nameof(callback));
+
+ return CopyToAsync(new WriteCallbackStream(callback, state), bufferSize, cancellationToken);
+ }
+
+ private sealed class WriteCallbackStream : Stream
+ {
+ private readonly ReadOnlySpanAction<byte, object?>? _action;
+ private readonly Func<ReadOnlyMemory<byte>, object?, CancellationToken, ValueTask>? _func;
+ private readonly object? _state;
+
+ public WriteCallbackStream(ReadOnlySpanAction<byte, object?> action, object? state)
+ {
+ _action = action;
+ _state = state;
+ }
+
+ public WriteCallbackStream(Func<ReadOnlyMemory<byte>, object?, CancellationToken, ValueTask> func, object? state)
+ {
+ _func = func;
+ _state = state;
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ Write(new ReadOnlySpan<byte>(buffer, offset, count));
+ }
+
+ public override void Write(ReadOnlySpan<byte> span)
+ {
+ if (_action != null)
+ {
+ _action(span, _state);
+ return;
+ }
+
+ // In case a poorly implemented CopyToAsync(Stream, ...) method decides to call
+ // the destination stream's Write rather than WriteAsync, we make it work, but this
+ // does not need to be efficient.
+ Debug.Assert(_func != null);
+ _func(span.ToArray(), _state, CancellationToken.None).AsTask().GetAwaiter().GetResult();
+
+ }
+
+ public override Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
+ {
+ return WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, length), cancellationToken).AsTask();
+ }
+
+ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
+ {
+ if (_func != null)
+ {
+ return _func(buffer, _state, cancellationToken);
+ }
+
+ // In case a poorly implemented CopyTo(Stream, ...) method decides to call
+ // the destination stream's WriteAsync rather than Write, we make it work,
+ // but this does not need to be efficient.
+ Debug.Assert(_action != null);
+ try
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ _action(buffer.Span, _state);
+ return default;
+ }
+ catch (Exception e)
+ {
+ return new ValueTask(Task.FromException(e));
+ }
+ }
+
+ public override bool CanRead => false;
+ public override bool CanSeek => false;
+ public override bool CanWrite => true;
+ public override void Flush() { }
+ public override Task FlushAsync(CancellationToken token) => Task.CompletedTask;
+ public override long Length => throw new NotSupportedException();
+ public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
+ public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
+ public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
+ public override void SetLength(long value) => throw new NotSupportedException();
+ }
+
// Stream used to require that all cleanup logic went into Close(),
// which was thought up before we invented IDisposable. However, we
// need to follow the IDisposable pattern so that users can write
Task.CompletedTask;
}
+ public override void CopyTo(ReadOnlySpanAction<byte, object?> callback, object? state, int bufferSize)
+ {
+ StreamHelpers.ValidateCopyToArgs(this, callback, bufferSize);
+
+ // After we validate arguments this is a nop.
+ }
+
+ public override Task CopyToAsync(Func<ReadOnlyMemory<byte>, object?, CancellationToken, ValueTask> callback, object? state, int bufferSize, CancellationToken cancellationToken)
+ {
+ StreamHelpers.ValidateCopyToArgs(this, callback, bufferSize);
+
+ return cancellationToken.IsCancellationRequested ?
+ Task.FromCanceled(cancellationToken) :
+ Task.CompletedTask;
+ }
+
protected override void Dispose(bool disposing)
{
// Do nothing - we don't want NullStream singleton (static) to be closable
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
+using System.Buffers;
+using System.Threading;
+using System.Threading.Tasks;
+
namespace System.IO
{
/// <summary>Provides methods to help in the implementation of Stream-derived types.</summary>
throw new NotSupportedException(SR.NotSupported_UnwritableStream);
}
}
+
+ public static void ValidateCopyToArgs(Stream source, Delegate callback, int bufferSize)
+ {
+ if (callback == null)
+ {
+ throw new ArgumentNullException(nameof(callback));
+ }
+
+ if (bufferSize <= 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(bufferSize), bufferSize, SR.ArgumentOutOfRange_NeedPosNum);
+ }
+
+ if (!source.CanRead)
+ {
+ throw source.CanWrite ? (Exception)
+ new NotSupportedException(SR.NotSupported_UnreadableStream) :
+ new ObjectDisposedException(null, SR.ObjectDisposed_StreamClosed);
+ }
+ }
}
}
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
+using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
public override bool CanWrite => _isOpen && (_access & FileAccess.Write) != 0;
/// <summary>
+ /// Calls the given callback with a span of the memory stream data
+ /// </summary>
+ /// <param name="callback">the callback to be called</param>
+ /// <param name="state">A user-defined state, passed to the callback</param>
+ /// <param name="bufferSize">the maximum size of the memory span</param>
+ public override void CopyTo(ReadOnlySpanAction<byte, object?> callback, object? state, int bufferSize)
+ {
+ // If we have been inherited into a subclass, the following implementation could be incorrect
+ // since it does not call through to Read() which a subclass might have overridden.
+ // To be safe we will only use this implementation in cases where we know it is safe to do so,
+ // and delegate to our base class (which will call into Read) when we are not sure.
+ if (GetType() != typeof(UnmanagedMemoryStream))
+ {
+ base.CopyTo(callback, state, bufferSize);
+ return;
+ }
+
+ if (callback == null) throw new ArgumentNullException(nameof(callback));
+
+ EnsureNotClosed();
+ EnsureReadable();
+
+ // Use a local variable to avoid a race where another thread
+ // changes our position after we decide we can read some bytes.
+ long pos = Interlocked.Read(ref _position);
+ long len = Interlocked.Read(ref _length);
+ long n = len - pos;
+ if (n <= 0)
+ {
+ return;
+ }
+
+ int nInt = (int)n; // Safe because n <= count, which is an Int32
+ if (nInt < 0)
+ {
+ return; // _position could be beyond EOF
+ }
+
+ unsafe
+ {
+ if (_buffer != null)
+ {
+ byte* pointer = null;
+
+ RuntimeHelpers.PrepareConstrainedRegions();
+ try
+ {
+ _buffer.AcquirePointer(ref pointer);
+ ReadOnlySpan<byte> span = new ReadOnlySpan<byte>(pointer + pos + _offset, nInt);
+ Interlocked.Exchange(ref _position, pos + n);
+ callback(span, state);
+ }
+ finally
+ {
+ if (pointer != null)
+ {
+ _buffer.ReleasePointer();
+ }
+ }
+ }
+ else
+ {
+ ReadOnlySpan<byte> span = new ReadOnlySpan<byte>(_mem + pos, nInt);
+ Interlocked.Exchange(ref _position, pos + n);
+ callback(span, state);
+ }
+ }
+ }
+
+ /// <summary>
/// Closes the stream. The stream's memory needs to be dealt with separately.
/// </summary>
/// <param name="disposing"></param>