Includes adding the virtuals to Stream and then overriding on the various streams implemented in coreclr.
/// Asynchronously reads a sequence of bytes from the current stream and advances
/// the position within the stream by the number of bytes read.
/// </summary>
- /// <param name="buffer">The buffer to write the data into.</param>
- /// <param name="offset">The byte offset in buffer at which to begin writing data from the stream.</param>
- /// <param name="count">The maximum number of bytes to read.</param>
+ /// <param name="destination">The buffer to write the data into.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
+ /// <param name="synchronousResult">If the operation completes synchronously, the number of bytes read.</param>
/// <returns>A task that represents the asynchronous read operation.</returns>
- private Task<int> ReadAsyncInternal(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ private Task<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken, out int synchronousResult)
{
- if (_useAsyncIO)
+ Debug.Assert(_useAsyncIO);
+
+ if (!CanRead) // match Windows behavior; this gets thrown synchronously
{
- if (!CanRead) // match Windows behavior; this gets thrown synchronously
- {
- throw Error.GetReadNotSupported();
- }
+ throw Error.GetReadNotSupported();
+ }
- // Serialize operations using the semaphore.
- Task waitTask = _asyncState.WaitAsync();
+ // Serialize operations using the semaphore.
+ Task waitTask = _asyncState.WaitAsync();
- // If we got ownership immediately, and if there's enough data in our buffer
- // to satisfy the full request of the caller, hand back the buffered data.
- // While it would be a legal implementation of the Read contract, we don't
- // hand back here less than the amount requested so as to match the behavior
- // in ReadCore that will make a native call to try to fulfill the remainder
- // of the request.
- if (waitTask.Status == TaskStatus.RanToCompletion)
+ // If we got ownership immediately, and if there's enough data in our buffer
+ // to satisfy the full request of the caller, hand back the buffered data.
+ // While it would be a legal implementation of the Read contract, we don't
+ // hand back here less than the amount requested so as to match the behavior
+ // in ReadCore that will make a native call to try to fulfill the remainder
+ // of the request.
+ if (waitTask.Status == TaskStatus.RanToCompletion)
+ {
+ int numBytesAvailable = _readLength - _readPos;
+ if (numBytesAvailable >= destination.Length)
{
- int numBytesAvailable = _readLength - _readPos;
- if (numBytesAvailable >= count)
+ try
{
- try
- {
- PrepareForReading();
-
- Buffer.BlockCopy(GetBuffer(), _readPos, buffer, offset, count);
- _readPos += count;
-
- return _asyncState._lastSuccessfulReadTask != null && _asyncState._lastSuccessfulReadTask.Result == count ?
- _asyncState._lastSuccessfulReadTask :
- (_asyncState._lastSuccessfulReadTask = Task.FromResult(count));
- }
- catch (Exception exc)
- {
- return Task.FromException<int>(exc);
- }
- finally
- {
- _asyncState.Release();
- }
- }
- }
+ PrepareForReading();
- // Otherwise, issue the whole request asynchronously.
- _asyncState.Update(buffer, offset, count);
- return waitTask.ContinueWith((t, s) =>
- {
- // The options available on Unix for writing asynchronously to an arbitrary file
- // handle typically amount to just using another thread to do the synchronous write,
- // which is exactly what this implementation does. This does mean there are subtle
- // differences in certain FileStream behaviors between Windows and Unix when multiple
- // asynchronous operations are issued against the stream to execute concurrently; on
- // Unix the operations will be serialized due to the usage of a semaphore, but the
- // position /length information won't be updated until after the write has completed,
- // whereas on Windows it may happen before the write has completed.
-
- Debug.Assert(t.Status == TaskStatus.RanToCompletion);
- var thisRef = (FileStream)s;
- try
+ new Span<byte>(GetBuffer(), _readPos, destination.Length).CopyTo(destination.Span);
+ _readPos += destination.Length;
+
+ synchronousResult = destination.Length;
+ return null;
+ }
+ catch (Exception exc)
{
- byte[] b = thisRef._asyncState._buffer;
- thisRef._asyncState._buffer = null; // remove reference to user's buffer
- return thisRef.ReadSpan(new Span<byte>(b, thisRef._asyncState._offset, thisRef._asyncState._count));
+ synchronousResult = 0;
+ return Task.FromException<int>(exc);
}
- finally { thisRef._asyncState.Release(); }
- }, this, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
+ finally
+ {
+ _asyncState.Release();
+ }
+ }
}
- else
+
+ // Otherwise, issue the whole request asynchronously.
+ synchronousResult = 0;
+ _asyncState.Memory = destination;
+ return waitTask.ContinueWith((t, s) =>
{
- return base.ReadAsync(buffer, offset, count, cancellationToken);
- }
+ // The options available on Unix for writing asynchronously to an arbitrary file
+ // handle typically amount to just using another thread to do the synchronous write,
+ // which is exactly what this implementation does. This does mean there are subtle
+ // differences in certain FileStream behaviors between Windows and Unix when multiple
+ // asynchronous operations are issued against the stream to execute concurrently; on
+ // Unix the operations will be serialized due to the usage of a semaphore, but the
+ // position /length information won't be updated until after the write has completed,
+ // whereas on Windows it may happen before the write has completed.
+
+ Debug.Assert(t.Status == TaskStatus.RanToCompletion);
+ var thisRef = (FileStream)s;
+ try
+ {
+ Memory<byte> memory = thisRef._asyncState.Memory;
+ thisRef._asyncState.Memory = default(Memory<byte>);
+ return thisRef.ReadSpan(memory.Span);
+ }
+ finally { thisRef._asyncState.Release(); }
+ }, this, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
}
/// <summary>Reads from the file handle into the buffer, overwriting anything in it.</summary>
/// the current position within this stream by the number of bytes written, and
/// monitors cancellation requests.
/// </summary>
- /// <param name="buffer">The buffer to write data from.</param>
- /// <param name="offset">The zero-based byte offset in buffer from which to begin copying bytes to the stream.</param>
- /// <param name="count">The maximum number of bytes to write.</param>
+ /// <param name="source">The buffer to write data from.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
- private Task WriteAsyncInternal(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
+ Debug.Assert(_useAsyncIO);
+
if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled(cancellationToken);
if (_fileHandle.IsClosed)
throw Error.GetFileNotOpen();
- if (_useAsyncIO)
+ if (!CanWrite) // match Windows behavior; this gets thrown synchronously
{
- if (!CanWrite) // match Windows behavior; this gets thrown synchronously
- {
- throw Error.GetWriteNotSupported();
- }
+ throw Error.GetWriteNotSupported();
+ }
- // Serialize operations using the semaphore.
- Task waitTask = _asyncState.WaitAsync();
+ // Serialize operations using the semaphore.
+ Task waitTask = _asyncState.WaitAsync();
- // If we got ownership immediately, and if there's enough space in our buffer
- // to buffer the entire write request, then do so and we're done.
- if (waitTask.Status == TaskStatus.RanToCompletion)
+ // If we got ownership immediately, and if there's enough space in our buffer
+ // to buffer the entire write request, then do so and we're done.
+ if (waitTask.Status == TaskStatus.RanToCompletion)
+ {
+ int spaceRemaining = _bufferLength - _writePos;
+ if (spaceRemaining >= source.Length)
{
- int spaceRemaining = _bufferLength - _writePos;
- if (spaceRemaining >= count)
+ try
{
- try
- {
- PrepareForWriting();
-
- Buffer.BlockCopy(buffer, offset, GetBuffer(), _writePos, count);
- _writePos += count;
-
- return Task.CompletedTask;
- }
- catch (Exception exc)
- {
- return Task.FromException(exc);
- }
- finally
- {
- _asyncState.Release();
- }
- }
- }
+ PrepareForWriting();
- // Otherwise, issue the whole request asynchronously.
- _asyncState.Update(buffer, offset, count);
- return waitTask.ContinueWith((t, s) =>
- {
- // The options available on Unix for writing asynchronously to an arbitrary file
- // handle typically amount to just using another thread to do the synchronous write,
- // which is exactly what this implementation does. This does mean there are subtle
- // differences in certain FileStream behaviors between Windows and Unix when multiple
- // asynchronous operations are issued against the stream to execute concurrently; on
- // Unix the operations will be serialized due to the usage of a semaphore, but the
- // position/length information won't be updated until after the write has completed,
- // whereas on Windows it may happen before the write has completed.
-
- Debug.Assert(t.Status == TaskStatus.RanToCompletion);
- var thisRef = (FileStream)s;
- try
+ source.Span.CopyTo(new Span<byte>(GetBuffer(), _writePos, source.Length));
+ _writePos += source.Length;
+
+ return Task.CompletedTask;
+ }
+ catch (Exception exc)
{
- byte[] b = thisRef._asyncState._buffer;
- thisRef._asyncState._buffer = null; // remove reference to user's buffer
- thisRef.WriteSpan(new ReadOnlySpan<byte>(b, thisRef._asyncState._offset, thisRef._asyncState._count));
+ return Task.FromException(exc);
}
- finally { thisRef._asyncState.Release(); }
- }, this, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
+ finally
+ {
+ _asyncState.Release();
+ }
+ }
}
- else
+
+ // Otherwise, issue the whole request asynchronously.
+ _asyncState.ReadOnlyMemory = source;
+ return waitTask.ContinueWith((t, s) =>
{
- return base.WriteAsync(buffer, offset, count, cancellationToken);
- }
+ // The options available on Unix for writing asynchronously to an arbitrary file
+ // handle typically amount to just using another thread to do the synchronous write,
+ // which is exactly what this implementation does. This does mean there are subtle
+ // differences in certain FileStream behaviors between Windows and Unix when multiple
+ // asynchronous operations are issued against the stream to execute concurrently; on
+ // Unix the operations will be serialized due to the usage of a semaphore, but the
+ // position/length information won't be updated until after the write has completed,
+ // whereas on Windows it may happen before the write has completed.
+
+ Debug.Assert(t.Status == TaskStatus.RanToCompletion);
+ var thisRef = (FileStream)s;
+ try
+ {
+ ReadOnlyMemory<byte> readOnlyMemory = thisRef._asyncState.ReadOnlyMemory;
+ thisRef._asyncState.ReadOnlyMemory = default(ReadOnlyMemory<byte>);
+ thisRef.WriteSpan(readOnlyMemory.Span);
+ }
+ finally { thisRef._asyncState.Release(); }
+ }, this, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
}
/// <summary>Sets the current position of this stream to the given value.</summary>
/// <summary>State used when the stream is in async mode.</summary>
private sealed class AsyncState : SemaphoreSlim
{
- /// <summary>The caller's buffer currently being used by the active async operation.</summary>
- internal byte[] _buffer;
- /// <summary>The caller's offset currently being used by the active async operation.</summary>
- internal int _offset;
- /// <summary>The caller's count currently being used by the active async operation.</summary>
- internal int _count;
- /// <summary>The last task successfully, synchronously returned task from ReadAsync.</summary>
- internal Task<int> _lastSuccessfulReadTask;
+ internal ReadOnlyMemory<byte> ReadOnlyMemory;
+ internal Memory<byte> Memory;
/// <summary>Initialize the AsyncState.</summary>
internal AsyncState() : base(initialCount: 1, maxCount: 1) { }
-
- /// <summary>Sets the active buffer, offset, and count.</summary>
- internal void Update(byte[] buffer, int offset, int count)
- {
- _buffer = buffer;
- _offset = offset;
- _count = count;
- }
}
}
}
private static unsafe IOCompletionCallback s_ioCallback = FileStreamCompletionSource.IOCallback;
- private Task<int> _lastSynchronouslyCompletedTask = null; // cached task for read ops that complete synchronously
private Task _activeBufferOperation = null; // tracks in-progress async ops using the buffer
private PreAllocatedOverlapped _preallocatedOverlapped; // optimization for async ops to avoid per-op allocations
private FileStreamCompletionSource _currentOverlappedOwner; // async op currently using the preallocated overlapped
// If the buffer is already flushed, don't spin up the OS write
if (_writePos == 0) return Task.CompletedTask;
- Task flushTask = WriteInternalCoreAsync(GetBuffer(), 0, _writePos, cancellationToken);
+ Task flushTask = WriteAsyncInternalCore(new ReadOnlyMemory<byte>(GetBuffer(), 0, _writePos), cancellationToken);
_writePos = 0;
// Update the active buffer operation
Debug.Assert(CanRead, "CanRead");
}
- [Conditional("DEBUG")]
- private void AssertCanRead(byte[] buffer, int offset, int count)
- {
- AssertCanRead();
- Debug.Assert(buffer != null, "buffer != null");
- Debug.Assert(_writePos == 0, "_writePos == 0");
- Debug.Assert(offset >= 0, "offset is negative");
- Debug.Assert(count >= 0, "count is negative");
- }
-
/// <summary>Reads from the file handle into the buffer, overwriting anything in it.</summary>
private int FillReadBufferForReadByte() =>
_useAsyncIO ?
- ReadNativeAsync(_buffer, 0, _bufferLength, 0, CancellationToken.None).GetAwaiter().GetResult() :
+ ReadNativeAsync(new Memory<byte>(_buffer), 0, CancellationToken.None).GetAwaiter().GetResult() :
ReadNative(_buffer);
private unsafe int ReadNative(Span<byte> buffer)
{
if (_readPos > 0)
{
- //Console.WriteLine("Seek: seeked for 0, adjusting buffer back by: "+_readPos+" _readLen: "+_readLen);
Buffer.BlockCopy(GetBuffer(), _readPos, GetBuffer(), 0, _readLength - _readPos);
_readLength -= _readPos;
_readPos = 0;
else if (oldPos - _readPos < pos && pos < oldPos + _readLength - _readPos)
{
int diff = (int)(pos - oldPos);
- //Console.WriteLine("Seek: diff was "+diff+", readpos was "+_readPos+" adjusting buffer - shrinking by "+ (_readPos + diff));
Buffer.BlockCopy(GetBuffer(), _readPos + diff, GetBuffer(), 0, _readLength - (_readPos + diff));
_readLength -= (_readPos + diff);
_readPos = 0;
return;
}
- private Task<int> ReadAsyncInternal(byte[] array, int offset, int numBytes, CancellationToken cancellationToken)
+ private Task<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken, out int synchronousResult)
{
- // If async IO is not supported on this platform or
- // if this Win32FileStream was not opened with FileOptions.Asynchronous.
- if (!_useAsyncIO)
- {
- return base.ReadAsync(array, offset, numBytes, cancellationToken);
- }
-
+ Debug.Assert(_useAsyncIO);
if (!CanRead) throw Error.GetReadNotSupported();
Debug.Assert((_readPos == 0 && _readLength == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLength), "We're either reading or writing, but not both.");
// pipes. But don't completely ignore buffered data either.
if (_readPos < _readLength)
{
- int n = _readLength - _readPos;
- if (n > numBytes) n = numBytes;
- Buffer.BlockCopy(GetBuffer(), _readPos, array, offset, n);
+ int n = Math.Min(_readLength - _readPos, destination.Length);
+ new Span<byte>(GetBuffer(), _readPos, n).CopyTo(destination.Span);
_readPos += n;
-
- // Return a completed task
- return TaskFromResultOrCache(n);
+ synchronousResult = n;
+ return null;
}
else
{
Debug.Assert(_writePos == 0, "Win32FileStream must not have buffered write data here! Pipes should be unidirectional.");
- return ReadNativeAsync(array, offset, numBytes, 0, cancellationToken);
+ synchronousResult = 0;
+ return ReadNativeAsync(destination, 0, cancellationToken);
}
}
// problem, and any async read less than 64K gets turned into a
// synchronous read by NT anyways... --
- if (numBytes < _bufferLength)
+ if (destination.Length < _bufferLength)
{
- Task<int> readTask = ReadNativeAsync(GetBuffer(), 0, _bufferLength, 0, cancellationToken);
+ Task<int> readTask = ReadNativeAsync(new Memory<byte>(GetBuffer()), 0, cancellationToken);
_readLength = readTask.GetAwaiter().GetResult();
- int n = _readLength;
- if (n > numBytes) n = numBytes;
- Buffer.BlockCopy(GetBuffer(), 0, array, offset, n);
+ int n = Math.Min(_readLength, destination.Length);
+ new Span<byte>(GetBuffer(), 0, n).CopyTo(destination.Span);
_readPos = n;
- // Return a completed task (recycling the one above if possible)
- return (_readLength == n ? readTask : TaskFromResultOrCache(n));
+ synchronousResult = n;
+ return null;
}
else
{
// with our read buffer. Throw away the read buffer's contents.
_readPos = 0;
_readLength = 0;
- return ReadNativeAsync(array, offset, numBytes, 0, cancellationToken);
+ synchronousResult = 0;
+ return ReadNativeAsync(destination, 0, cancellationToken);
}
}
else
{
- int n = _readLength - _readPos;
- if (n > numBytes) n = numBytes;
- Buffer.BlockCopy(GetBuffer(), _readPos, array, offset, n);
+ int n = Math.Min(_readLength - _readPos, destination.Length);
+ new Span<byte>(GetBuffer(), _readPos, n).CopyTo(destination.Span);
_readPos += n;
- if (n >= numBytes)
+ if (n == destination.Length)
{
// Return a completed task
- return TaskFromResultOrCache(n);
+ synchronousResult = n;
+ return null;
}
else
{
// Throw away read buffer.
_readPos = 0;
_readLength = 0;
- return ReadNativeAsync(array, offset + n, numBytes - n, n, cancellationToken);
+ synchronousResult = 0;
+ return ReadNativeAsync(destination.Slice(n), n, cancellationToken);
}
}
}
- unsafe private Task<int> ReadNativeAsync(byte[] bytes, int offset, int numBytes, int numBufferedBytesRead, CancellationToken cancellationToken)
+ unsafe private Task<int> ReadNativeAsync(Memory<byte> destination, int numBufferedBytesRead, CancellationToken cancellationToken)
{
- AssertCanRead(bytes, offset, numBytes);
+ AssertCanRead();
Debug.Assert(_useAsyncIO, "ReadNativeAsync doesn't work on synchronous file streams!");
// Create and store async stream class library specific data in the async result
- FileStreamCompletionSource completionSource = new FileStreamCompletionSource(this, numBufferedBytesRead, bytes, cancellationToken);
+ FileStreamCompletionSource completionSource = destination.TryGetArray(out ArraySegment<byte> memoryArray) ?
+ new FileStreamCompletionSource(this, numBufferedBytesRead, memoryArray.Array) :
+ new MemoryFileStreamCompletionSource(this, numBufferedBytesRead, destination);
NativeOverlapped* intOverlapped = completionSource.Overlapped;
// Calculate position in the file we should be at after the read is done
// Make sure we are reading from the position that we think we are
VerifyOSHandlePosition();
- if (_filePosition + numBytes > len)
+ if (_filePosition + destination.Length > len)
{
if (_filePosition <= len)
- numBytes = (int)(len - _filePosition);
+ {
+ destination = destination.Slice(0, (int)(len - _filePosition));
+ }
else
- numBytes = 0;
+ {
+ destination = default(Memory<byte>);
+ }
}
// Now set the position to read from in the NativeOverlapped struct
// the file pointer when writing to a UNC path!
// So changed the code below to seek to an absolute
// location, not a relative one. ReadFile seems consistent though.
- SeekCore(_fileHandle, numBytes, SeekOrigin.Current);
+ SeekCore(_fileHandle, destination.Length, SeekOrigin.Current);
}
// queue an async ReadFile operation and pass in a packed overlapped
int errorCode = 0;
- int r = ReadFileNative(_fileHandle, new Span<byte>(bytes, offset, numBytes), intOverlapped, out errorCode);
+ int r = ReadFileNative(_fileHandle, destination.Span, intOverlapped, out errorCode);
+
// ReadFile, the OS version, will return 0 on failure. But
// my ReadFileNative wrapper returns -1. My wrapper will return
// the following:
// read back from this call when using overlapped structures! You must
// not pass in a non-null lpNumBytesRead to ReadFile when using
// overlapped structures! This is by design NT behavior.
- if (r == -1 && numBytes != -1)
+ if (r == -1)
{
// For pipes, when they hit EOF, they will come here.
if (errorCode == ERROR_BROKEN_PIPE)
throw Win32Marshal.GetExceptionForWin32Error(errorCode);
}
}
- else
+ else if (cancellationToken.CanBeCanceled) // ERROR_IO_PENDING
{
// Only once the IO is pending do we register for cancellation
- completionSource.RegisterForCancellation();
+ completionSource.RegisterForCancellation(cancellationToken);
}
}
else
// synchronously or asynchronously. We absolutely must not
// set asyncResult._numBytes here, since will never have correct
// results.
- //Console.WriteLine("ReadFile returned: "+r+" (0x"+Int32.Format(r, "x")+") The IO completed synchronously, but the user callback was called on a separate thread");
}
return completionSource.Task;
}
- private Task WriteAsyncInternal(byte[] array, int offset, int numBytes, CancellationToken cancellationToken)
+ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
- // If async IO is not supported on this platform or
- // if this Win32FileStream was not opened with FileOptions.Asynchronous.
- if (!_useAsyncIO)
- {
- return base.WriteAsync(array, offset, numBytes, cancellationToken);
- }
-
- if (!CanWrite) throw Error.GetWriteNotSupported();
-
+ Debug.Assert(_useAsyncIO);
Debug.Assert((_readPos == 0 && _readLength == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLength), "We're either reading or writing, but not both.");
Debug.Assert(!_isPipe || (_readPos == 0 && _readLength == 0), "Win32FileStream must not have buffered data here! Pipes should be unidirectional.");
+ if (!CanWrite) throw Error.GetWriteNotSupported();
+
bool writeDataStoredInBuffer = false;
if (!_isPipe) // avoid async buffering with pipes, as doing so can lead to deadlocks (see comments in ReadInternalAsyncCore)
{
// - There's no active flush operation, such that we don't have to worry about the existing buffer being in use.
// - And the data we're trying to write fits in the buffer, meaning it wasn't already filled by previous writes.
// In that case, just store it in the buffer.
- if (numBytes < _bufferLength && !HasActiveBufferOperation && numBytes <= remainingBuffer)
+ if (source.Length < _bufferLength && !HasActiveBufferOperation && source.Length <= remainingBuffer)
{
- Buffer.BlockCopy(array, offset, GetBuffer(), _writePos, numBytes);
- _writePos += numBytes;
+ source.Span.CopyTo(new Span<byte>(GetBuffer(), _writePos, source.Length));
+ _writePos += source.Length;
writeDataStoredInBuffer = true;
// There is one special-but-common case, common because devs often use
// then we're done and can return a completed task now. But if we filled the buffer
// completely, we want to do the asynchronous flush/write as part of this operation
// rather than waiting until the next write that fills the buffer.
- if (numBytes != remainingBuffer)
+ if (source.Length != remainingBuffer)
return Task.CompletedTask;
Debug.Assert(_writePos == _bufferLength);
// Finally, issue the write asynchronously, and return a Task that logically
// represents the write operation, including any flushing done.
- Task writeTask = WriteInternalCoreAsync(array, offset, numBytes, cancellationToken);
+ Task writeTask = WriteAsyncInternalCore(source, cancellationToken);
return
(flushTask == null || flushTask.Status == TaskStatus.RanToCompletion) ? writeTask :
(writeTask.Status == TaskStatus.RanToCompletion) ? flushTask :
Task.WhenAll(flushTask, writeTask);
}
- private unsafe Task WriteInternalCoreAsync(byte[] bytes, int offset, int numBytes, CancellationToken cancellationToken)
+ private unsafe Task WriteAsyncInternalCore(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed");
Debug.Assert(CanWrite, "_parent.CanWrite");
- Debug.Assert(bytes != null, "bytes != null");
Debug.Assert(_readPos == _readLength, "_readPos == _readLen");
Debug.Assert(_useAsyncIO, "WriteInternalCoreAsync doesn't work on synchronous file streams!");
- Debug.Assert(offset >= 0, "offset is negative");
- Debug.Assert(numBytes >= 0, "numBytes is negative");
// Create and store async stream class library specific data in the async result
- FileStreamCompletionSource completionSource = new FileStreamCompletionSource(this, 0, bytes, cancellationToken);
+ FileStreamCompletionSource completionSource = source.DangerousTryGetArray(out ArraySegment<byte> array) ?
+ new FileStreamCompletionSource(this, 0, array.Array) :
+ new MemoryFileStreamCompletionSource(this, 0, source);
NativeOverlapped* intOverlapped = completionSource.Overlapped;
if (CanSeek)
{
// Make sure we set the length of the file appropriately.
long len = Length;
- //Console.WriteLine("WriteInternalCoreAsync - Calculating end pos. pos: "+pos+" len: "+len+" numBytes: "+numBytes);
// Make sure we are writing to the position that we think we are
VerifyOSHandlePosition();
- if (_filePosition + numBytes > len)
+ if (_filePosition + source.Length > len)
{
- //Console.WriteLine("WriteInternalCoreAsync - Setting length to: "+(pos + numBytes));
- SetLengthCore(_filePosition + numBytes);
+ SetLengthCore(_filePosition + source.Length);
}
// Now set the position to read from in the NativeOverlapped struct
// When using overlapped IO, the OS is not supposed to
// touch the file pointer location at all. We will adjust it
// ourselves. This isn't threadsafe.
- SeekCore(_fileHandle, numBytes, SeekOrigin.Current);
+ SeekCore(_fileHandle, source.Length, SeekOrigin.Current);
}
- //Console.WriteLine("WriteInternalCoreAsync finishing. pos: "+pos+" numBytes: "+numBytes+" _pos: "+_pos+" Position: "+Position);
-
int errorCode = 0;
// queue an async WriteFile operation and pass in a packed overlapped
- int r = WriteFileNative(_fileHandle, new ReadOnlySpan<byte>(bytes, offset, numBytes), intOverlapped, out errorCode);
+ int r = WriteFileNative(_fileHandle, source.Span, intOverlapped, out errorCode);
// WriteFile, the OS version, will return 0 on failure. But
// my WriteFileNative wrapper returns -1. My wrapper will return
// written back from this call when using overlapped IO! You must
// not pass in a non-null lpNumBytesWritten to WriteFile when using
// overlapped structures! This is ByDesign NT behavior.
- if (r == -1 && numBytes != -1)
+ if (r == -1)
{
- //Console.WriteLine("WriteFile returned 0; Write will complete asynchronously (if errorCode==3e5) errorCode: 0x{0:x}", errorCode);
-
// For pipes, when they are closed on the other side, they will come here.
if (errorCode == ERROR_NO_DATA)
{
throw Win32Marshal.GetExceptionForWin32Error(errorCode);
}
}
- else // ERROR_IO_PENDING
+ else if (cancellationToken.CanBeCanceled) // ERROR_IO_PENDING
{
// Only once the IO is pending do we register for cancellation
- completionSource.RegisterForCancellation();
+ completionSource.RegisterForCancellation(cancellationToken);
}
}
else
// synchronously or asynchronously. We absolutely must not
// set asyncResult._numBytes here, since will never have correct
// results.
- //Console.WriteLine("WriteFile returned: "+r+" (0x"+Int32.Format(r, "x")+") The IO completed synchronously, but the user callback was called on another thread.");
}
return completionSource.Task;
}
}
- private Task<int> TaskFromResultOrCache(int result)
- {
- Task<int> completedTask = _lastSynchronouslyCompletedTask;
- Debug.Assert(completedTask == null || completedTask.Status == TaskStatus.RanToCompletion, "Cached task should have completed successfully");
-
- if ((completedTask == null) || (completedTask.Result != result))
- {
- completedTask = Task.FromResult(result);
- _lastSynchronouslyCompletedTask = completedTask;
- }
-
- return completedTask;
- }
-
private void LockInternal(long position, long length)
{
int positionLow = unchecked((int)(position));
/// </summary>
private readonly bool _useAsyncIO;
+ /// <summary>cached task for read ops that complete synchronously</summary>
+ private Task<int> _lastSynchronouslyCompletedTask = null;
+
/// <summary>
/// Currently cached position in the stream. This should always mirror the underlying file's actual position,
/// and should only ever be out of sync if another stream with access to this same file manipulates it, at which
{
ValidateReadWriteArgs(array, offset, count);
return _useAsyncIO ?
- ReadAsyncInternal(array, offset, count, CancellationToken.None).GetAwaiter().GetResult() :
+ ReadAsyncTask(array, offset, count, CancellationToken.None).GetAwaiter().GetResult() :
ReadSpan(new Span<byte>(array, offset, count));
}
throw new ArgumentException(SR.Argument_InvalidOffLen /*, no good single parameter name to pass*/);
// If we have been inherited into a subclass, the following implementation could be incorrect
- // since it does not call through to Read() or ReadAsync() which a subclass might have overridden.
+ // since it does not call through to Read() which a subclass might have overridden.
// To be safe we will only use this implementation in cases where we know it is safe to do so,
// and delegate to our base class (which will call into Read/ReadAsync) when we are not sure.
- if (GetType() != typeof(FileStream))
+ // Similarly, if we weren't opened for asynchronous I/O, call to the base implementation so that
+ // Read is invoked asynchronously.
+ if (GetType() != typeof(FileStream) || !_useAsyncIO)
return base.ReadAsync(buffer, offset, count, cancellationToken);
if (cancellationToken.IsCancellationRequested)
if (IsClosed)
throw Error.GetFileNotOpen();
- return ReadAsyncInternal(buffer, offset, count, cancellationToken);
+ return ReadAsyncTask(buffer, offset, count, cancellationToken);
+ }
+
+ public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ if (!_useAsyncIO || GetType() != typeof(FileStream))
+ {
+ // If we're not using async I/O, delegate to the base, which will queue a call to Read.
+ // Or if this isn't a concrete FileStream, a derived type may have overridden ReadAsync(byte[],...),
+ // which was introduced first, so delegate to the base which will delegate to that.
+ return base.ReadAsync(destination, cancellationToken);
+ }
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return new ValueTask<int>(Task.FromCanceled<int>(cancellationToken));
+ }
+
+ if (IsClosed)
+ {
+ throw Error.GetFileNotOpen();
+ }
+
+ Task<int> t = ReadAsyncInternal(destination, cancellationToken, out int synchronousResult);
+ return t != null ?
+ new ValueTask<int>(t) :
+ new ValueTask<int>(synchronousResult);
+ }
+
+ private Task<int> ReadAsyncTask(byte[] array, int offset, int count, CancellationToken cancellationToken)
+ {
+ Task<int> t = ReadAsyncInternal(new Memory<byte>(array, offset, count), cancellationToken, out int synchronousResult);
+
+ if (t == null)
+ {
+ t = _lastSynchronouslyCompletedTask;
+ Debug.Assert(t == null || t.IsCompletedSuccessfully, "Cached task should have completed successfully");
+
+ if (t == null || t.Result != synchronousResult)
+ {
+ _lastSynchronouslyCompletedTask = t = Task.FromResult(synchronousResult);
+ }
+ }
+
+ return t;
}
public override void Write(byte[] array, int offset, int count)
ValidateReadWriteArgs(array, offset, count);
if (_useAsyncIO)
{
- WriteAsyncInternal(array, offset, count, CancellationToken.None).GetAwaiter().GetResult();
+ WriteAsyncInternal(new ReadOnlyMemory<byte>(array, offset, count), CancellationToken.None).GetAwaiter().GetResult();
}
else
{
// since it does not call through to Write() or WriteAsync() which a subclass might have overridden.
// To be safe we will only use this implementation in cases where we know it is safe to do so,
// and delegate to our base class (which will call into Write/WriteAsync) when we are not sure.
- if (GetType() != typeof(FileStream))
+ if (!_useAsyncIO || GetType() != typeof(FileStream))
return base.WriteAsync(buffer, offset, count, cancellationToken);
if (cancellationToken.IsCancellationRequested)
if (IsClosed)
throw Error.GetFileNotOpen();
- return WriteAsyncInternal(buffer, offset, count, cancellationToken);
+ return WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
+ }
+
+ public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ if (!_useAsyncIO || GetType() != typeof(FileStream))
+ {
+ // If we're not using async I/O, delegate to the base, which will queue a call to Write.
+ // Or if this isn't a concrete FileStream, a derived type may have overridden WriteAsync(byte[],...),
+ // which was introduced first, so delegate to the base which will delegate to that.
+ return base.WriteAsync(source, cancellationToken);
+ }
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return Task.FromCanceled<int>(cancellationToken);
+ }
+
+ if (IsClosed)
+ {
+ throw Error.GetFileNotOpen();
+ }
+
+ return WriteAsyncInternal(source, cancellationToken);
}
/// <summary>
if (!IsAsync)
return base.BeginRead(array, offset, numBytes, callback, state);
else
- return TaskToApm.Begin(ReadAsyncInternal(array, offset, numBytes, CancellationToken.None), callback, state);
+ return TaskToApm.Begin(ReadAsyncTask(array, offset, numBytes, CancellationToken.None), callback, state);
}
public override IAsyncResult BeginWrite(byte[] array, int offset, int numBytes, AsyncCallback callback, object state)
if (!IsAsync)
return base.BeginWrite(array, offset, numBytes, callback, state);
else
- return TaskToApm.Begin(WriteAsyncInternal(array, offset, numBytes, CancellationToken.None), callback, state);
+ return TaskToApm.Begin(WriteAsyncInternal(new ReadOnlyMemory<byte>(array, offset, numBytes), CancellationToken.None), callback, state);
}
public override int EndRead(IAsyncResult asyncResult)
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
-using System.Security;
+using System.Buffers;
+using System.Diagnostics;
+using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
-using System.Runtime.InteropServices;
-using System.Diagnostics;
namespace System.IO
{
// This is an internal object extending TaskCompletionSource with fields
// for all of the relevant data necessary to complete the IO operation.
// This is used by IOCallback and all of the async methods.
- unsafe private sealed class FileStreamCompletionSource : TaskCompletionSource<int>
+ private unsafe class FileStreamCompletionSource : TaskCompletionSource<int>
{
private const long NoResult = 0;
private const long ResultSuccess = (long)1 << 32;
private readonly FileStream _stream;
private readonly int _numBufferedBytes;
- private readonly CancellationToken _cancellationToken;
private CancellationTokenRegistration _cancellationRegistration;
#if DEBUG
private bool _cancellationHasBeenRegistered;
private long _result; // Using long since this needs to be used in Interlocked APIs
// Using RunContinuationsAsynchronously for compat reasons (old API used Task.Factory.StartNew for continuations)
- internal FileStreamCompletionSource(FileStream stream, int numBufferedBytes, byte[] bytes, CancellationToken cancellationToken)
+ internal FileStreamCompletionSource(FileStream stream, int numBufferedBytes, byte[] bytes)
: base(TaskCreationOptions.RunContinuationsAsynchronously)
{
_numBufferedBytes = numBufferedBytes;
_stream = stream;
_result = NoResult;
- _cancellationToken = cancellationToken;
- // Create the native overlapped. We try to use the preallocated overlapped if possible:
- // it's possible if the byte buffer is the same one that's associated with the preallocated overlapped
- // and if no one else is currently using the preallocated overlapped. This is the fast-path for cases
- // where the user-provided buffer is smaller than the FileStream's buffer (such that the FileStream's
+ // Create the native overlapped. We try to use the preallocated overlapped if possible: it's possible if the byte
+ // buffer is null (there's nothing to pin) or the same one that's associated with the preallocated overlapped (and
+ // thus is already pinned) and if no one else is currently using the preallocated overlapped. This is the fast-path
+ // for cases where the user-provided buffer is smaller than the FileStream's buffer (such that the FileStream's
// buffer is used) and where operations on the FileStream are not being performed concurrently.
- _overlapped = ReferenceEquals(bytes, _stream._buffer) && _stream.CompareExchangeCurrentOverlappedOwner(this, null) == null ?
+ _overlapped = (bytes == null || ReferenceEquals(bytes, _stream._buffer)) && _stream.CompareExchangeCurrentOverlappedOwner(this, null) == null ?
_stream._fileHandle.ThreadPoolBinding.AllocateNativeOverlapped(_stream._preallocatedOverlapped) :
_stream._fileHandle.ThreadPoolBinding.AllocateNativeOverlapped(s_ioCallback, this, bytes);
Debug.Assert(_overlapped != null, "AllocateNativeOverlapped returned null");
TrySetResult(numBytes + _numBufferedBytes);
}
- public void RegisterForCancellation()
+ public void RegisterForCancellation(CancellationToken cancellationToken)
{
#if DEBUG
+ Debug.Assert(cancellationToken.CanBeCanceled);
Debug.Assert(!_cancellationHasBeenRegistered, "Cannot register for cancellation twice");
_cancellationHasBeenRegistered = true;
#endif
- // Quick check to make sure that the cancellation token supports cancellation, and that the IO hasn't completed
- if ((_cancellationToken.CanBeCanceled) && (_overlapped != null))
+ // Quick check to make sure the IO hasn't completed
+ if (_overlapped != null)
{
var cancelCallback = s_cancelCallback;
if (cancelCallback == null) s_cancelCallback = cancelCallback = Cancel;
long packedResult = Interlocked.CompareExchange(ref _result, RegisteringCancellation, NoResult);
if (packedResult == NoResult)
{
- _cancellationRegistration = _cancellationToken.Register(cancelCallback, this);
+ _cancellationRegistration = cancellationToken.Register(cancelCallback, this);
// Switch the result, just in case IO completed while we were setting the registration
packedResult = Interlocked.Exchange(ref _result, NoResult);
}
}
- internal void ReleaseNativeResource()
+ internal virtual void ReleaseNativeResource()
{
// Ensure that cancellation has been completed and cleaned up.
_cancellationRegistration.Dispose();
private void CompleteCallback(ulong packedResult)
{
// Free up the native resource and cancellation registration
+ CancellationToken cancellationToken = _cancellationRegistration.Token; // access before disposing registration
ReleaseNativeResource();
// Unpack the result and send it to the user
int errorCode = unchecked((int)(packedResult & uint.MaxValue));
if (errorCode == Interop.Errors.ERROR_OPERATION_ABORTED)
{
- TrySetCanceled(_cancellationToken.IsCancellationRequested ? _cancellationToken : new CancellationToken(true));
+ TrySetCanceled(cancellationToken.IsCancellationRequested ? cancellationToken : new CancellationToken(true));
}
else
{
}
}
}
+
+ /// <summary>
+ /// Extends <see cref="FileStreamCompletionSource"/> with to support disposing of a
+ /// <see cref="MemoryHandle"/> when the operation has completed. This should only be used
+ /// when memory doesn't wrap a byte[].
+ /// </summary>
+ private sealed class MemoryFileStreamCompletionSource : FileStreamCompletionSource
+ {
+ private MemoryHandle _handle; // mutable struct; do not make this readonly
+
+ internal MemoryFileStreamCompletionSource(FileStream stream, int numBufferedBytes, ReadOnlyMemory<byte> memory) :
+ base(stream, numBufferedBytes, bytes: null) // this type handles the pinning, so null is passed for bytes
+ {
+ Debug.Assert(!memory.DangerousTryGetArray(out ArraySegment<byte> array), "The base should be used directly if we can get the array.");
+ _handle = memory.Retain(pin: true);
+ }
+
+ internal override void ReleaseNativeResource()
+ {
+ _handle.Dispose();
+ base.ReleaseNativeResource();
+ }
+ }
}
}
}
/// <summary>
+ /// Reads bytes from stream and puts them into the buffer
+ /// </summary>
+ /// <param name="destination">Buffer to read the bytes to.</param>
+ /// <param name="cancellationToken">Token that can be used to cancel this operation.</param>
+ public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return new ValueTask<int>(Task.FromCanceled<int>(cancellationToken));
+ }
+
+ try
+ {
+ return new ValueTask<int>(Read(destination.Span));
+ }
+ catch (Exception ex)
+ {
+ return new ValueTask<int>(Task.FromException<int>(ex));
+ }
+ }
+
+ /// <summary>
/// Returns the byte at the stream current Position and advances the Position.
/// </summary>
/// <returns></returns>
}
/// <summary>
+ /// Writes buffer into the stream. The operation completes synchronously.
+ /// </summary>
+ /// <param name="buffer">Buffer that will be written.</param>
+ /// <param name="cancellationToken">Token that can be used to cancel the operation.</param>
+ public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return Task.FromCanceled(cancellationToken);
+ }
+
+ try
+ {
+ Write(source.Span);
+ return Task.CompletedTask;
+ }
+ catch (Exception ex)
+ {
+ return Task.FromException(ex);
+ }
+ }
+
+ /// <summary>
/// Writes a byte to the stream and advances the current Position.
/// </summary>
/// <param name="value"></param>
return _unmanagedStream.ReadAsync(buffer, offset, count, cancellationToken);
}
+ public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ return _unmanagedStream.ReadAsync(destination, cancellationToken);
+ }
+
public override Task WriteAsync(Byte[] buffer, Int32 offset, Int32 count, CancellationToken cancellationToken)
{
return _unmanagedStream.WriteAsync(buffer, offset, count, cancellationToken);
}
+
+ public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ return _unmanagedStream.WriteAsync(source, cancellationToken);
+ }
} // class UnmanagedMemoryStreamWrapper
} // namespace
}
}
+ public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return new ValueTask<int>(Task.FromCanceled<int>(cancellationToken));
+ }
+
+ try
+ {
+ return new ValueTask<int>(Read(destination.Span));
+ }
+ catch (OperationCanceledException oce)
+ {
+ return new ValueTask<int>(Task.FromCancellation<int>(oce));
+ }
+ catch (Exception exception)
+ {
+ return new ValueTask<int>(Task.FromException<int>(exception));
+ }
+ }
+
public override int ReadByte()
{
}
}
+ public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return Task.FromCanceled(cancellationToken);
+ }
+
+ try
+ {
+ Write(source.Span);
+ return Task.CompletedTask;
+ }
+ catch (OperationCanceledException oce)
+ {
+ return Task.FromCancellation<VoidTaskResult>(oce);
+ }
+ catch (Exception exception)
+ {
+ return Task.FromException(exception);
+ }
+ }
+
public override void WriteByte(byte value)
{
if (!_isOpen) __Error.StreamIsClosed();
: BeginEndReadAsync(buffer, offset, count);
}
+ public virtual ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ if (destination.TryGetArray(out ArraySegment<byte> array))
+ {
+ return new ValueTask<int>(ReadAsync(array.Array, array.Offset, array.Count, cancellationToken));
+ }
+ else
+ {
+ byte[] buffer = ArrayPool<byte>.Shared.Rent(destination.Length);
+ return FinishReadAsync(ReadAsync(buffer, 0, destination.Length, cancellationToken), buffer, destination);
+
+ async ValueTask<int> FinishReadAsync(Task<int> readTask, byte[] localBuffer, Memory<byte> localDestination)
+ {
+ try
+ {
+ int result = await readTask.ConfigureAwait(false);
+ new Span<byte>(localBuffer, 0, result).CopyTo(localDestination.Span);
+ return result;
+ }
+ finally
+ {
+ ArrayPool<byte>.Shared.Return(localBuffer);
+ }
+ }
+ }
+ }
+
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private extern bool HasOverriddenBeginEndRead();
return WriteAsync(buffer, offset, count, CancellationToken.None);
}
-
-
public virtual Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
// If cancellation was requested, bail early with an already completed task.
: BeginEndWriteAsync(buffer, offset, count);
}
+ public virtual Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ if (source.DangerousTryGetArray(out ArraySegment<byte> array))
+ {
+ return WriteAsync(array.Array, array.Offset, array.Count, cancellationToken);
+ }
+ else
+ {
+ byte[] buffer = ArrayPool<byte>.Shared.Rent(source.Length);
+ source.Span.CopyTo(buffer);
+ return FinishWriteAsync(WriteAsync(buffer, 0, source.Length, cancellationToken), buffer);
+
+ async Task FinishWriteAsync(Task writeTask, byte[] localBuffer)
+ {
+ try
+ {
+ await writeTask.ConfigureAwait(false);
+ }
+ finally
+ {
+ ArrayPool<byte>.Shared.Return(localBuffer);
+ }
+ }
+ }
+ }
+
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private extern bool HasOverriddenBeginEndWrite();
public override Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
- var nullReadTask = s_nullReadTask;
- if (nullReadTask == null)
- s_nullReadTask = nullReadTask = new Task<int>(false, 0, (TaskCreationOptions)InternalTaskOptions.DoNotDispose, CancellationToken.None); // benign race condition
- return nullReadTask;
+ return AsyncTaskMethodBuilder<int>.s_defaultResultTask;
+ }
+
+ public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ return new ValueTask<int>(0);
}
- private static Task<int> s_nullReadTask;
public override int ReadByte()
{
Task.CompletedTask;
}
+ public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ return cancellationToken.IsCancellationRequested ?
+ Task.FromCanceled(cancellationToken) :
+ Task.CompletedTask;
+ }
+
public override void WriteByte(byte value)
{
}
}
/// <summary>
+ /// Gets the <see cref="CancellationToken"/> with which this registration is associated. If the
+ /// registration isn't associated with a token (such as after the registration has been disposed),
+ /// this will return a default token.
+ /// </summary>
+ internal CancellationToken Token => _node?.Partition.Source.Token ?? default(CancellationToken);
+
+ /// <summary>
/// Disposes of the registration and unregisters the target callback from the associated
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see>.
/// </summary>