{
tasks[i] = stream.WriteAsync(data, 250 * i, 250);
}
- Assert.False(tasks.All(t => t.IsCompleted));
+ Assert.All(tasks, t => Assert.Equal(TaskStatus.WaitingForActivation, t.Status));
mcaos.Release();
await Task.WhenAll(tasks);
}
}
- internal sealed class ManuallyReleaseAsyncOperationsStream : MemoryStream
+ internal sealed class ManuallyReleaseAsyncOperationsStream : Stream
{
+ private readonly MemoryStream _stream = new MemoryStream();
private readonly TaskCompletionSource<bool> _tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
private bool _canSeek = true;
public override bool CanSeek => _canSeek;
+ public override bool CanRead => _stream.CanRead;
+
+ public override bool CanWrite => _stream.CanWrite;
+
+ public override long Length => _stream.Length;
+
+ public override long Position { get => _stream.Position; set => _stream.Position = value; }
+
public void SetCanSeek(bool canSeek) => _canSeek = canSeek;
public void Release() { _tcs.SetResult(true); }
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await _tcs.Task;
- return await base.ReadAsync(buffer, offset, count, cancellationToken);
+ return await _stream.ReadAsync(buffer, offset, count, cancellationToken);
}
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await _tcs.Task;
- await base.WriteAsync(buffer, offset, count, cancellationToken);
+ await _stream.WriteAsync(buffer, offset, count, cancellationToken);
}
public override async Task FlushAsync(CancellationToken cancellationToken)
{
await _tcs.Task;
- await base.FlushAsync(cancellationToken);
+ await _stream.FlushAsync(cancellationToken);
}
+
+ public override void Flush() => _stream.Flush();
+ public override int Read(byte[] buffer, int offset, int count) => _stream.Read(buffer, offset, count);
+ public override long Seek(long offset, SeekOrigin origin) => _stream.Seek(offset, origin);
+ public override void SetLength(long value) => _stream.SetLength(value);
+ public override void Write(byte[] buffer, int offset, int count) => _stream.Write(buffer, offset, count);
}
internal sealed class ThrowsExceptionFromAsyncOperationsStream : MemoryStream
{
- public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
- {
+ public override int Read(byte[] buffer, int offset, int count) =>
throw new InvalidOperationException("Exception from ReadAsync");
- }
- public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
- {
+ public override void Write(byte[] buffer, int offset, int count) =>
+ throw new InvalidOperationException("Exception from ReadAsync");
+
+ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
+ throw new InvalidOperationException("Exception from ReadAsync");
+
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
throw new InvalidOperationException("Exception from WriteAsync");
- }
- public override Task FlushAsync(CancellationToken cancellationToken)
- {
+ public override Task FlushAsync(CancellationToken cancellationToken) =>
throw new InvalidOperationException("Exception from FlushAsync");
- }
}
public class BufferedStream_NS17
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
-
if (buffer == null)
throw new ArgumentNullException(nameof(buffer), SR.ArgumentNull_Buffer);
if (offset < 0)
}
// Delegate to the async implementation.
- return ReadFromUnderlyingStreamAsync(buffer, offset + bytesFromBuffer, count - bytesFromBuffer, cancellationToken,
- bytesFromBuffer, semaphoreLockTask);
+ return ReadFromUnderlyingStreamAsync(
+ new Memory<byte>(buffer, offset + bytesFromBuffer, count - bytesFromBuffer),
+ cancellationToken, bytesFromBuffer, semaphoreLockTask).AsTask();
+ }
+
+ public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return new ValueTask<int>(Task.FromCanceled<int>(cancellationToken));
+ }
+
+ EnsureNotClosed();
+ EnsureCanRead();
+
+ int bytesFromBuffer = 0;
+ SemaphoreSlim sem = LazyEnsureAsyncActiveSemaphoreInitialized();
+ Task semaphoreLockTask = sem.WaitAsync();
+ if (semaphoreLockTask.IsCompletedSuccessfully)
+ {
+ bool completeSynchronously = true;
+ try
+ {
+ bytesFromBuffer = ReadFromBuffer(destination.Span);
+ completeSynchronously = bytesFromBuffer == destination.Length;
+ if (completeSynchronously)
+ {
+ // If we satisfied enough data from the buffer, we can complete synchronously.
+ return new ValueTask<int>(bytesFromBuffer);
+ }
+ }
+ finally
+ {
+ if (completeSynchronously) // if this is FALSE, we will be entering ReadFromUnderlyingStreamAsync and releasing there.
+ {
+ sem.Release();
+ }
+ }
+ }
+
+ // Delegate to the async implementation.
+ return ReadFromUnderlyingStreamAsync(destination.Slice(bytesFromBuffer), cancellationToken, bytesFromBuffer, semaphoreLockTask);
}
/// <summary>BufferedStream should be as thin a wrapper as possible. We want ReadAsync to delegate to
/// ReadAsync of the underlying _stream rather than calling the base Stream which implements the one in terms of the other.
/// This allows BufferedStream to affect the semantics of the stream it wraps as little as possible. </summary>
/// <returns>-2 if _bufferSize was set to 0 while waiting on the semaphore; otherwise num of bytes read.</returns>
- private async Task<int> ReadFromUnderlyingStreamAsync(byte[] array, int offset, int count,
- CancellationToken cancellationToken,
- int bytesAlreadySatisfied,
- Task semaphoreLockTask)
+ private async ValueTask<int> ReadFromUnderlyingStreamAsync(
+ Memory<byte> buffer, CancellationToken cancellationToken, int bytesAlreadySatisfied, Task semaphoreLockTask)
{
-
// Same conditions validated with exceptions in ReadAsync:
- // (These should be Debug.Requires(..) but that method had some issues in async methods; using Assert(..) for now.)
- Debug.Assert(array != null);
- Debug.Assert(offset >= 0);
- Debug.Assert(count >= 0);
- Debug.Assert(array.Length - offset >= count);
Debug.Assert(_stream != null);
Debug.Assert(_stream.CanRead);
Debug.Assert(_bufferSize > 0);
await semaphoreLockTask.ConfigureAwait(false);
try
{
-
// The buffer might have been changed by another async task while we were waiting on the semaphore.
// Check it now again.
- int bytesFromBuffer = ReadFromBuffer(array, offset, count);
- if (bytesFromBuffer == count)
+ int bytesFromBuffer = ReadFromBuffer(buffer.Span);
+ if (bytesFromBuffer == buffer.Length)
+ {
return bytesAlreadySatisfied + bytesFromBuffer;
+ }
if (bytesFromBuffer > 0)
{
- count -= bytesFromBuffer;
- offset += bytesFromBuffer;
+ buffer = buffer.Slice(bytesFromBuffer);
bytesAlreadySatisfied += bytesFromBuffer;
}
// If there was anything in the write buffer, clear it.
if (_writePos > 0)
+ {
await FlushWriteAsync(cancellationToken).ConfigureAwait(false); // no Begin-End read version for Flush. Use Async.
+ }
// If the requested read is larger than buffer size, avoid the buffer and still use a single read:
- if (count >= _bufferSize)
+ if (buffer.Length >= _bufferSize)
{
- return bytesAlreadySatisfied + await _stream.ReadAsync(array, offset, count, cancellationToken).ConfigureAwait(false);
+ return bytesAlreadySatisfied + await _stream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
}
// Ok. We can fill the buffer:
EnsureBufferAllocated();
_readLen = await _stream.ReadAsync(_buffer, 0, _bufferSize, cancellationToken).ConfigureAwait(false);
- bytesFromBuffer = ReadFromBuffer(array, offset, count);
+ bytesFromBuffer = ReadFromBuffer(buffer.Span);
return bytesAlreadySatisfied + bytesFromBuffer;
-
}
finally
{
offset += bytesToWrite;
}
- private void WriteToBuffer(ref ReadOnlySpan<byte> source)
+ private int WriteToBuffer(ReadOnlySpan<byte> source)
{
int bytesToWrite = Math.Min(_bufferSize - _writePos, source.Length);
if (bytesToWrite > 0)
EnsureBufferAllocated();
source.Slice(0, bytesToWrite).CopyTo(new Span<byte>(_buffer, _writePos, bytesToWrite));
_writePos += bytesToWrite;
- source = source.Slice(bytesToWrite);
}
+ return bytesToWrite;
}
private void WriteToBuffer(byte[] array, ref int offset, ref int count, out Exception error)
{
ClearReadBufferBeforeWrite();
}
- Debug.Assert(_writePos < _bufferSize);
+ Debug.Assert(_writePos < _bufferSize, $"Expected {_writePos} < {_bufferSize}");
int totalUserbytes;
bool useBuffer;
{
// Copy as much data to the buffer as will fit. If there's still room in the buffer,
// everything must have fit.
- WriteToBuffer(ref source);
+ int bytesWritten = WriteToBuffer(source);
if (_writePos < _bufferSize)
{
- Debug.Assert(source.IsEmpty);
+ Debug.Assert(bytesWritten == source.Length);
return;
}
+ source = source.Slice(bytesWritten);
Debug.Assert(_writePos == _bufferSize);
Debug.Assert(_buffer != null);
_writePos = 0;
// Now write the remainder. It must fit, as we're only on this path if that's true.
- WriteToBuffer(ref source);
+ bytesWritten = WriteToBuffer(source);
+ Debug.Assert(bytesWritten == source.Length);
- Debug.Assert(source.IsEmpty);
Debug.Assert(_writePos < _bufferSize);
}
else // skip the buffer
if (buffer.Length - offset < count)
throw new ArgumentException(SR.Argument_InvalidOffLen);
+ return WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
+ }
+
+ public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
+ {
// Fast path check for cancellation already requested
if (cancellationToken.IsCancellationRequested)
+ {
return Task.FromCanceled<int>(cancellationToken);
+ }
EnsureNotClosed();
EnsureCanWrite();
- // Try to satisfy the request from the buffer synchronously. But still need a sem-lock in case that another
- // Async IO Task accesses the buffer concurrently. If we fail to acquire the lock without waiting, make this
- // an Async operation.
+ // Try to satisfy the request from the buffer synchronously.
SemaphoreSlim sem = LazyEnsureAsyncActiveSemaphoreInitialized();
Task semaphoreLockTask = sem.WaitAsync();
if (semaphoreLockTask.IsCompletedSuccessfully)
bool completeSynchronously = true;
try
{
-
if (_writePos == 0)
+ {
ClearReadBufferBeforeWrite();
+ }
Debug.Assert(_writePos < _bufferSize);
// If the write completely fits into the buffer, we can complete synchronously:
- completeSynchronously = (count < _bufferSize - _writePos);
-
+ completeSynchronously = source.Length < _bufferSize - _writePos;
if (completeSynchronously)
{
-
- Exception error;
- WriteToBuffer(buffer, ref offset, ref count, out error);
- Debug.Assert(count == 0);
-
- return (error == null)
- ? Task.CompletedTask
- : Task.FromException(error);
+ int bytesWritten = WriteToBuffer(source.Span);
+ Debug.Assert(bytesWritten == source.Length);
+ return Task.CompletedTask;
}
}
finally
}
// Delegate to the async implementation.
- return WriteToUnderlyingStreamAsync(buffer, offset, count, cancellationToken, semaphoreLockTask);
+ return WriteToUnderlyingStreamAsync(source, cancellationToken, semaphoreLockTask);
}
-
/// <summary>BufferedStream should be as thin a wrapper as possible. We want WriteAsync to delegate to
/// WriteAsync of the underlying _stream rather than calling the base Stream which implements the one
/// in terms of the other. This allows BufferedStream to affect the semantics of the stream it wraps as
/// little as possible.
/// </summary>
- private async Task WriteToUnderlyingStreamAsync(byte[] array, int offset, int count,
- CancellationToken cancellationToken,
- Task semaphoreLockTask)
+ private async Task WriteToUnderlyingStreamAsync(
+ ReadOnlyMemory<byte> source, CancellationToken cancellationToken, Task semaphoreLockTask)
{
- Debug.Assert(array != null);
- Debug.Assert(offset >= 0);
- Debug.Assert(count >= 0);
- Debug.Assert(array.Length - offset >= count);
Debug.Assert(_stream != null);
Debug.Assert(_stream.CanWrite);
Debug.Assert(_bufferSize > 0);
await semaphoreLockTask.ConfigureAwait(false);
try
{
-
// The buffer might have been changed by another async task while we were waiting on the semaphore.
// However, note that if we recalculate the sync completion condition to TRUE, then useBuffer will also be TRUE.
int totalUserBytes;
bool useBuffer;
checked
- { // We do not expect buffer sizes big enough for an overflow, but if it happens, lets fail early:
- totalUserBytes = _writePos + count;
- useBuffer = (totalUserBytes + count < (_bufferSize + _bufferSize));
+ {
+ // We do not expect buffer sizes big enough for an overflow, but if it happens, lets fail early:
+ totalUserBytes = _writePos + source.Length;
+ useBuffer = (totalUserBytes + source.Length < (_bufferSize + _bufferSize));
}
if (useBuffer)
{
- WriteToBuffer(array, ref offset, ref count);
+ source = source.Slice(WriteToBuffer(source.Span));
if (_writePos < _bufferSize)
{
- Debug.Assert(count == 0);
+ Debug.Assert(source.Length == 0);
return;
}
- Debug.Assert(count >= 0);
+ Debug.Assert(source.Length >= 0);
Debug.Assert(_writePos == _bufferSize);
Debug.Assert(_buffer != null);
-
await _stream.WriteAsync(_buffer, 0, _writePos, cancellationToken).ConfigureAwait(false);
_writePos = 0;
- WriteToBuffer(array, ref offset, ref count);
+ int bytesWritten = WriteToBuffer(source.Span);
+ Debug.Assert(bytesWritten == source.Length);
- Debug.Assert(count == 0);
Debug.Assert(_writePos < _bufferSize);
}
- else
- { // if (!useBuffer)
+ else // !useBuffer
+ {
// Write out the buffer if necessary.
if (_writePos > 0)
{
if (totalUserBytes <= (_bufferSize + _bufferSize) && totalUserBytes <= MaxShadowBufferSize)
{
EnsureShadowBufferAllocated();
- Buffer.BlockCopy(array, offset, _buffer, _writePos, count);
+ source.Span.CopyTo(new Span<byte>(_buffer, _writePos, source.Length));
await _stream.WriteAsync(_buffer, 0, totalUserBytes, cancellationToken).ConfigureAwait(false);
_writePos = 0;
}
// Write out user data.
- await _stream.WriteAsync(array, offset, count, cancellationToken).ConfigureAwait(false);
+ await _stream.WriteAsync(source, cancellationToken).ConfigureAwait(false);
}
}
finally