Add Memory-based Stream overloads to coreclr (#13769)
authorStephen Toub <stoub@microsoft.com>
Wed, 6 Sep 2017 15:48:01 +0000 (11:48 -0400)
committerGitHub <noreply@github.com>
Wed, 6 Sep 2017 15:48:01 +0000 (11:48 -0400)
Includes adding the virtuals to Stream and then overriding on the various streams implemented in coreclr.

src/mscorlib/shared/System/IO/FileStream.Unix.cs
src/mscorlib/shared/System/IO/FileStream.Windows.cs
src/mscorlib/shared/System/IO/FileStream.cs
src/mscorlib/shared/System/IO/FileStreamCompletionSource.Win32.cs
src/mscorlib/shared/System/IO/UnmanagedMemoryStream.cs
src/mscorlib/shared/System/IO/UnmanagedMemoryStreamWrapper.cs
src/mscorlib/src/System/IO/MemoryStream.cs
src/mscorlib/src/System/IO/Stream.cs
src/mscorlib/src/System/Threading/CancellationTokenRegistration.cs

index 58b71ac..8499595 100644 (file)
@@ -472,84 +472,79 @@ namespace System.IO
         /// Asynchronously reads a sequence of bytes from the current stream and advances
         /// the position within the stream by the number of bytes read.
         /// </summary>
-        /// <param name="buffer">The buffer to write the data into.</param>
-        /// <param name="offset">The byte offset in buffer at which to begin writing data from the stream.</param>
-        /// <param name="count">The maximum number of bytes to read.</param>
+        /// <param name="destination">The buffer to write the data into.</param>
         /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
+        /// <param name="synchronousResult">If the operation completes synchronously, the number of bytes read.</param>
         /// <returns>A task that represents the asynchronous read operation.</returns>
-        private Task<int> ReadAsyncInternal(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+        private Task<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken, out int synchronousResult)
         {
-            if (_useAsyncIO)
+            Debug.Assert(_useAsyncIO);
+
+            if (!CanRead) // match Windows behavior; this gets thrown synchronously
             {
-                if (!CanRead) // match Windows behavior; this gets thrown synchronously
-                {
-                    throw Error.GetReadNotSupported();
-                }
+                throw Error.GetReadNotSupported();
+            }
 
-                // Serialize operations using the semaphore.
-                Task waitTask = _asyncState.WaitAsync();
+            // Serialize operations using the semaphore.
+            Task waitTask = _asyncState.WaitAsync();
 
-                // If we got ownership immediately, and if there's enough data in our buffer
-                // to satisfy the full request of the caller, hand back the buffered data.
-                // While it would be a legal implementation of the Read contract, we don't
-                // hand back here less than the amount requested so as to match the behavior
-                // in ReadCore that will make a native call to try to fulfill the remainder
-                // of the request.
-                if (waitTask.Status == TaskStatus.RanToCompletion)
+            // If we got ownership immediately, and if there's enough data in our buffer
+            // to satisfy the full request of the caller, hand back the buffered data.
+            // While it would be a legal implementation of the Read contract, we don't
+            // hand back here less than the amount requested so as to match the behavior
+            // in ReadCore that will make a native call to try to fulfill the remainder
+            // of the request.
+            if (waitTask.Status == TaskStatus.RanToCompletion)
+            {
+                int numBytesAvailable = _readLength - _readPos;
+                if (numBytesAvailable >= destination.Length)
                 {
-                    int numBytesAvailable = _readLength - _readPos;
-                    if (numBytesAvailable >= count)
+                    try
                     {
-                        try
-                        {
-                            PrepareForReading();
-
-                            Buffer.BlockCopy(GetBuffer(), _readPos, buffer, offset, count);
-                            _readPos += count;
-
-                            return _asyncState._lastSuccessfulReadTask != null && _asyncState._lastSuccessfulReadTask.Result == count ?
-                                _asyncState._lastSuccessfulReadTask :
-                                (_asyncState._lastSuccessfulReadTask = Task.FromResult(count));
-                        }
-                        catch (Exception exc)
-                        {
-                            return Task.FromException<int>(exc);
-                        }
-                        finally
-                        {
-                            _asyncState.Release();
-                        }
-                    }
-                }
+                        PrepareForReading();
 
-                // Otherwise, issue the whole request asynchronously.
-                _asyncState.Update(buffer, offset, count);
-                return waitTask.ContinueWith((t, s) =>
-                {
-                    // The options available on Unix for writing asynchronously to an arbitrary file 
-                    // handle typically amount to just using another thread to do the synchronous write, 
-                    // which is exactly  what this implementation does. This does mean there are subtle
-                    // differences in certain FileStream behaviors between Windows and Unix when multiple 
-                    // asynchronous operations are issued against the stream to execute concurrently; on 
-                    // Unix the operations will be serialized due to the usage of a semaphore, but the 
-                    // position /length information won't be updated until after the write has completed, 
-                    // whereas on Windows it may happen before the write has completed.
-
-                    Debug.Assert(t.Status == TaskStatus.RanToCompletion);
-                    var thisRef = (FileStream)s;
-                    try
+                        new Span<byte>(GetBuffer(), _readPos, destination.Length).CopyTo(destination.Span);
+                        _readPos += destination.Length;
+
+                        synchronousResult = destination.Length;
+                        return null;
+                    }
+                    catch (Exception exc)
                     {
-                        byte[] b = thisRef._asyncState._buffer;
-                        thisRef._asyncState._buffer = null; // remove reference to user's buffer
-                        return thisRef.ReadSpan(new Span<byte>(b, thisRef._asyncState._offset, thisRef._asyncState._count));
+                        synchronousResult = 0;
+                        return Task.FromException<int>(exc);
                     }
-                    finally { thisRef._asyncState.Release(); }
-                }, this, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
+                    finally
+                    {
+                        _asyncState.Release();
+                    }
+                }
             }
-            else
+
+            // Otherwise, issue the whole request asynchronously.
+            synchronousResult = 0;
+            _asyncState.Memory = destination;
+            return waitTask.ContinueWith((t, s) =>
             {
-                return base.ReadAsync(buffer, offset, count, cancellationToken);
-            }
+                // The options available on Unix for writing asynchronously to an arbitrary file 
+                // handle typically amount to just using another thread to do the synchronous write, 
+                // which is exactly  what this implementation does. This does mean there are subtle
+                // differences in certain FileStream behaviors between Windows and Unix when multiple 
+                // asynchronous operations are issued against the stream to execute concurrently; on 
+                // Unix the operations will be serialized due to the usage of a semaphore, but the 
+                // position /length information won't be updated until after the write has completed, 
+                // whereas on Windows it may happen before the write has completed.
+
+                Debug.Assert(t.Status == TaskStatus.RanToCompletion);
+                var thisRef = (FileStream)s;
+                try
+                {
+                    Memory<byte> memory = thisRef._asyncState.Memory;
+                    thisRef._asyncState.Memory = default(Memory<byte>);
+                    return thisRef.ReadSpan(memory.Span);
+                }
+                finally { thisRef._asyncState.Release(); }
+            }, this, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
         }
 
         /// <summary>Reads from the file handle into the buffer, overwriting anything in it.</summary>
@@ -636,84 +631,77 @@ namespace System.IO
         /// the current position within this stream by the number of bytes written, and
         /// monitors cancellation requests.
         /// </summary>
-        /// <param name="buffer">The buffer to write data from.</param>
-        /// <param name="offset">The zero-based byte offset in buffer from which to begin copying bytes to the stream.</param>
-        /// <param name="count">The maximum number of bytes to write.</param>
+        /// <param name="source">The buffer to write data from.</param>
         /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
         /// <returns>A task that represents the asynchronous write operation.</returns>
-        private Task WriteAsyncInternal(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+        private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
         {
+            Debug.Assert(_useAsyncIO);
+
             if (cancellationToken.IsCancellationRequested)
                 return Task.FromCanceled(cancellationToken);
 
             if (_fileHandle.IsClosed)
                 throw Error.GetFileNotOpen();
 
-            if (_useAsyncIO)
+            if (!CanWrite) // match Windows behavior; this gets thrown synchronously
             {
-                if (!CanWrite) // match Windows behavior; this gets thrown synchronously
-                {
-                    throw Error.GetWriteNotSupported();
-                }
+                throw Error.GetWriteNotSupported();
+            }
 
-                // Serialize operations using the semaphore.
-                Task waitTask = _asyncState.WaitAsync();
+            // Serialize operations using the semaphore.
+            Task waitTask = _asyncState.WaitAsync();
 
-                // If we got ownership immediately, and if there's enough space in our buffer
-                // to buffer the entire write request, then do so and we're done.
-                if (waitTask.Status == TaskStatus.RanToCompletion)
+            // If we got ownership immediately, and if there's enough space in our buffer
+            // to buffer the entire write request, then do so and we're done.
+            if (waitTask.Status == TaskStatus.RanToCompletion)
+            {
+                int spaceRemaining = _bufferLength - _writePos;
+                if (spaceRemaining >= source.Length)
                 {
-                    int spaceRemaining = _bufferLength - _writePos;
-                    if (spaceRemaining >= count)
+                    try
                     {
-                        try
-                        {
-                            PrepareForWriting();
-
-                            Buffer.BlockCopy(buffer, offset, GetBuffer(), _writePos, count);
-                            _writePos += count;
-
-                            return Task.CompletedTask;
-                        }
-                        catch (Exception exc)
-                        {
-                            return Task.FromException(exc);
-                        }
-                        finally
-                        {
-                            _asyncState.Release();
-                        }
-                    }
-                }
+                        PrepareForWriting();
 
-                // Otherwise, issue the whole request asynchronously.
-                _asyncState.Update(buffer, offset, count);
-                return waitTask.ContinueWith((t, s) =>
-                {
-                    // The options available on Unix for writing asynchronously to an arbitrary file 
-                    // handle typically amount to just using another thread to do the synchronous write, 
-                    // which is exactly  what this implementation does. This does mean there are subtle
-                    // differences in certain FileStream behaviors between Windows and Unix when multiple 
-                    // asynchronous operations are issued against the stream to execute concurrently; on 
-                    // Unix the operations will be serialized due to the usage of a semaphore, but the 
-                    // position/length information won't be updated until after the write has completed, 
-                    // whereas on Windows it may happen before the write has completed.
-
-                    Debug.Assert(t.Status == TaskStatus.RanToCompletion);
-                    var thisRef = (FileStream)s;
-                    try
+                        source.Span.CopyTo(new Span<byte>(GetBuffer(), _writePos, source.Length));
+                        _writePos += source.Length;
+
+                        return Task.CompletedTask;
+                    }
+                    catch (Exception exc)
                     {
-                        byte[] b = thisRef._asyncState._buffer;
-                        thisRef._asyncState._buffer = null; // remove reference to user's buffer
-                        thisRef.WriteSpan(new ReadOnlySpan<byte>(b, thisRef._asyncState._offset, thisRef._asyncState._count));
+                        return Task.FromException(exc);
                     }
-                    finally { thisRef._asyncState.Release(); }
-                }, this, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
+                    finally
+                    {
+                        _asyncState.Release();
+                    }
+                }
             }
-            else
+
+            // Otherwise, issue the whole request asynchronously.
+            _asyncState.ReadOnlyMemory = source;
+            return waitTask.ContinueWith((t, s) =>
             {
-                return base.WriteAsync(buffer, offset, count, cancellationToken);
-            }
+                // The options available on Unix for writing asynchronously to an arbitrary file 
+                // handle typically amount to just using another thread to do the synchronous write, 
+                // which is exactly  what this implementation does. This does mean there are subtle
+                // differences in certain FileStream behaviors between Windows and Unix when multiple 
+                // asynchronous operations are issued against the stream to execute concurrently; on 
+                // Unix the operations will be serialized due to the usage of a semaphore, but the 
+                // position/length information won't be updated until after the write has completed, 
+                // whereas on Windows it may happen before the write has completed.
+
+                Debug.Assert(t.Status == TaskStatus.RanToCompletion);
+                var thisRef = (FileStream)s;
+                try
+                {
+                    ReadOnlyMemory<byte> readOnlyMemory = thisRef._asyncState.ReadOnlyMemory;
+                    thisRef._asyncState.ReadOnlyMemory = default(ReadOnlyMemory<byte>);
+                    thisRef.WriteSpan(readOnlyMemory.Span);
+                }
+                finally { thisRef._asyncState.Release(); }
+            }, this, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
         }
 
         /// <summary>Sets the current position of this stream to the given value.</summary>
@@ -814,25 +802,11 @@ namespace System.IO
         /// <summary>State used when the stream is in async mode.</summary>
         private sealed class AsyncState : SemaphoreSlim
         {
-            /// <summary>The caller's buffer currently being used by the active async operation.</summary>
-            internal byte[] _buffer;
-            /// <summary>The caller's offset currently being used by the active async operation.</summary>
-            internal int _offset;
-            /// <summary>The caller's count currently being used by the active async operation.</summary>
-            internal int _count;
-            /// <summary>The last task successfully, synchronously returned task from ReadAsync.</summary>
-            internal Task<int> _lastSuccessfulReadTask;
+            internal ReadOnlyMemory<byte> ReadOnlyMemory;
+            internal Memory<byte> Memory;
 
             /// <summary>Initialize the AsyncState.</summary>
             internal AsyncState() : base(initialCount: 1, maxCount: 1) { }
-
-            /// <summary>Sets the active buffer, offset, and count.</summary>
-            internal void Update(byte[] buffer, int offset, int count)
-            {
-                _buffer = buffer;
-                _offset = offset;
-                _count = count;
-            }
         }
     }
 }
index 0d26082..eec11b4 100644 (file)
@@ -48,7 +48,6 @@ namespace System.IO
 
         private static unsafe IOCompletionCallback s_ioCallback = FileStreamCompletionSource.IOCallback;
 
-        private Task<int> _lastSynchronouslyCompletedTask = null;   // cached task for read ops that complete synchronously
         private Task _activeBufferOperation = null;                 // tracks in-progress async ops using the buffer
         private PreAllocatedOverlapped _preallocatedOverlapped;     // optimization for async ops to avoid per-op allocations
         private FileStreamCompletionSource _currentOverlappedOwner; // async op currently using the preallocated overlapped
@@ -313,7 +312,7 @@ namespace System.IO
             // If the buffer is already flushed, don't spin up the OS write
             if (_writePos == 0) return Task.CompletedTask;
 
-            Task flushTask = WriteInternalCoreAsync(GetBuffer(), 0, _writePos, cancellationToken);
+            Task flushTask = WriteAsyncInternalCore(new ReadOnlyMemory<byte>(GetBuffer(), 0, _writePos), cancellationToken);
             _writePos = 0;
 
             // Update the active buffer operation
@@ -485,20 +484,10 @@ namespace System.IO
             Debug.Assert(CanRead, "CanRead");
         }
 
-        [Conditional("DEBUG")]
-        private void AssertCanRead(byte[] buffer, int offset, int count)
-        {
-            AssertCanRead();
-            Debug.Assert(buffer != null, "buffer != null");
-            Debug.Assert(_writePos == 0, "_writePos == 0");
-            Debug.Assert(offset >= 0, "offset is negative");
-            Debug.Assert(count >= 0, "count is negative");
-        }
-
         /// <summary>Reads from the file handle into the buffer, overwriting anything in it.</summary>
         private int FillReadBufferForReadByte() =>
             _useAsyncIO ?
-                ReadNativeAsync(_buffer, 0, _bufferLength, 0, CancellationToken.None).GetAwaiter().GetResult() :
+                ReadNativeAsync(new Memory<byte>(_buffer), 0, CancellationToken.None).GetAwaiter().GetResult() :
                 ReadNative(_buffer);
 
         private unsafe int ReadNative(Span<byte> buffer)
@@ -586,7 +575,6 @@ namespace System.IO
                 {
                     if (_readPos > 0)
                     {
-                        //Console.WriteLine("Seek: seeked for 0, adjusting buffer back by: "+_readPos+"  _readLen: "+_readLen);
                         Buffer.BlockCopy(GetBuffer(), _readPos, GetBuffer(), 0, _readLength - _readPos);
                         _readLength -= _readPos;
                         _readPos = 0;
@@ -599,7 +587,6 @@ namespace System.IO
                 else if (oldPos - _readPos < pos && pos < oldPos + _readLength - _readPos)
                 {
                     int diff = (int)(pos - oldPos);
-                    //Console.WriteLine("Seek: diff was "+diff+", readpos was "+_readPos+"  adjusting buffer - shrinking by "+ (_readPos + diff));
                     Buffer.BlockCopy(GetBuffer(), _readPos + diff, GetBuffer(), 0, _readLength - (_readPos + diff));
                     _readLength -= (_readPos + diff);
                     _readPos = 0;
@@ -752,15 +739,9 @@ namespace System.IO
             return;
         }
 
-        private Task<int> ReadAsyncInternal(byte[] array, int offset, int numBytes, CancellationToken cancellationToken)
+        private Task<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken, out int synchronousResult)
         {
-            // If async IO is not supported on this platform or 
-            // if this Win32FileStream was not opened with FileOptions.Asynchronous.
-            if (!_useAsyncIO)
-            {
-                return base.ReadAsync(array, offset, numBytes, cancellationToken);
-            }
-
+            Debug.Assert(_useAsyncIO);
             if (!CanRead) throw Error.GetReadNotSupported();
 
             Debug.Assert((_readPos == 0 && _readLength == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLength), "We're either reading or writing, but not both.");
@@ -782,18 +763,17 @@ namespace System.IO
                 // pipes.  But don't completely ignore buffered data either.  
                 if (_readPos < _readLength)
                 {
-                    int n = _readLength - _readPos;
-                    if (n > numBytes) n = numBytes;
-                    Buffer.BlockCopy(GetBuffer(), _readPos, array, offset, n);
+                    int n = Math.Min(_readLength - _readPos, destination.Length);
+                    new Span<byte>(GetBuffer(), _readPos, n).CopyTo(destination.Span);
                     _readPos += n;
-
-                    // Return a completed task
-                    return TaskFromResultOrCache(n);
+                    synchronousResult = n;
+                    return null;
                 }
                 else
                 {
                     Debug.Assert(_writePos == 0, "Win32FileStream must not have buffered write data here!  Pipes should be unidirectional.");
-                    return ReadNativeAsync(array, offset, numBytes, 0, cancellationToken);
+                    synchronousResult = 0;
+                    return ReadNativeAsync(destination, 0, cancellationToken);
                 }
             }
 
@@ -814,17 +794,16 @@ namespace System.IO
                 // problem, and any async read less than 64K gets turned into a
                 // synchronous read by NT anyways...       -- 
 
-                if (numBytes < _bufferLength)
+                if (destination.Length < _bufferLength)
                 {
-                    Task<int> readTask = ReadNativeAsync(GetBuffer(), 0, _bufferLength, 0, cancellationToken);
+                    Task<int> readTask = ReadNativeAsync(new Memory<byte>(GetBuffer()), 0, cancellationToken);
                     _readLength = readTask.GetAwaiter().GetResult();
-                    int n = _readLength;
-                    if (n > numBytes) n = numBytes;
-                    Buffer.BlockCopy(GetBuffer(), 0, array, offset, n);
+                    int n = Math.Min(_readLength, destination.Length);
+                    new Span<byte>(GetBuffer(), 0, n).CopyTo(destination.Span);
                     _readPos = n;
 
-                    // Return a completed task (recycling the one above if possible)
-                    return (_readLength == n ? readTask : TaskFromResultOrCache(n));
+                    synchronousResult = n;
+                    return null;
                 }
                 else
                 {
@@ -832,20 +811,21 @@ namespace System.IO
                     // with our read buffer.  Throw away the read buffer's contents.
                     _readPos = 0;
                     _readLength = 0;
-                    return ReadNativeAsync(array, offset, numBytes, 0, cancellationToken);
+                    synchronousResult = 0;
+                    return ReadNativeAsync(destination, 0, cancellationToken);
                 }
             }
             else
             {
-                int n = _readLength - _readPos;
-                if (n > numBytes) n = numBytes;
-                Buffer.BlockCopy(GetBuffer(), _readPos, array, offset, n);
+                int n = Math.Min(_readLength - _readPos, destination.Length);
+                new Span<byte>(GetBuffer(), _readPos, n).CopyTo(destination.Span);
                 _readPos += n;
 
-                if (n >= numBytes)
+                if (n == destination.Length)
                 {
                     // Return a completed task
-                    return TaskFromResultOrCache(n);
+                    synchronousResult = n;
+                    return null;
                 }
                 else
                 {
@@ -856,18 +836,21 @@ namespace System.IO
                     // Throw away read buffer.
                     _readPos = 0;
                     _readLength = 0;
-                    return ReadNativeAsync(array, offset + n, numBytes - n, n, cancellationToken);
+                    synchronousResult = 0;
+                    return ReadNativeAsync(destination.Slice(n), n, cancellationToken);
                 }
             }
         }
 
-        unsafe private Task<int> ReadNativeAsync(byte[] bytes, int offset, int numBytes, int numBufferedBytesRead, CancellationToken cancellationToken)
+        unsafe private Task<int> ReadNativeAsync(Memory<byte> destination, int numBufferedBytesRead, CancellationToken cancellationToken)
         {
-            AssertCanRead(bytes, offset, numBytes);
+            AssertCanRead();
             Debug.Assert(_useAsyncIO, "ReadNativeAsync doesn't work on synchronous file streams!");
 
             // Create and store async stream class library specific data in the async result
-            FileStreamCompletionSource completionSource = new FileStreamCompletionSource(this, numBufferedBytesRead, bytes, cancellationToken);
+            FileStreamCompletionSource completionSource = destination.TryGetArray(out ArraySegment<byte> memoryArray) ?
+                new FileStreamCompletionSource(this, numBufferedBytesRead, memoryArray.Array) :
+                new MemoryFileStreamCompletionSource(this, numBufferedBytesRead, destination);
             NativeOverlapped* intOverlapped = completionSource.Overlapped;
 
             // Calculate position in the file we should be at after the read is done
@@ -878,12 +861,16 @@ namespace System.IO
                 // Make sure we are reading from the position that we think we are
                 VerifyOSHandlePosition();
 
-                if (_filePosition + numBytes > len)
+                if (_filePosition + destination.Length > len)
                 {
                     if (_filePosition <= len)
-                        numBytes = (int)(len - _filePosition);
+                    {
+                        destination = destination.Slice(0, (int)(len - _filePosition));
+                    }
                     else
-                        numBytes = 0;
+                    {
+                        destination = default(Memory<byte>);
+                    }
                 }
 
                 // Now set the position to read from in the NativeOverlapped struct
@@ -900,12 +887,13 @@ namespace System.IO
                 // the file pointer when writing to a UNC path!   
                 // So changed the code below to seek to an absolute 
                 // location, not a relative one.  ReadFile seems consistent though.
-                SeekCore(_fileHandle, numBytes, SeekOrigin.Current);
+                SeekCore(_fileHandle, destination.Length, SeekOrigin.Current);
             }
 
             // queue an async ReadFile operation and pass in a packed overlapped
             int errorCode = 0;
-            int r = ReadFileNative(_fileHandle, new Span<byte>(bytes, offset, numBytes), intOverlapped, out errorCode);
+            int r = ReadFileNative(_fileHandle, destination.Span, intOverlapped, out errorCode);
+
             // ReadFile, the OS version, will return 0 on failure.  But
             // my ReadFileNative wrapper returns -1.  My wrapper will return
             // the following:
@@ -916,7 +904,7 @@ namespace System.IO
             // read back from this call when using overlapped structures!  You must
             // not pass in a non-null lpNumBytesRead to ReadFile when using 
             // overlapped structures!  This is by design NT behavior.
-            if (r == -1 && numBytes != -1)
+            if (r == -1)
             {
                 // For pipes, when they hit EOF, they will come here.
                 if (errorCode == ERROR_BROKEN_PIPE)
@@ -947,10 +935,10 @@ namespace System.IO
                         throw Win32Marshal.GetExceptionForWin32Error(errorCode);
                     }
                 }
-                else
+                else if (cancellationToken.CanBeCanceled) // ERROR_IO_PENDING
                 {
                     // Only once the IO is pending do we register for cancellation
-                    completionSource.RegisterForCancellation();
+                    completionSource.RegisterForCancellation(cancellationToken);
                 }
             }
             else
@@ -962,26 +950,19 @@ namespace System.IO
                 // synchronously or asynchronously.  We absolutely must not
                 // set asyncResult._numBytes here, since will never have correct
                 // results.  
-                //Console.WriteLine("ReadFile returned: "+r+" (0x"+Int32.Format(r, "x")+")  The IO completed synchronously, but the user callback was called on a separate thread");
             }
 
             return completionSource.Task;
         }
 
-        private Task WriteAsyncInternal(byte[] array, int offset, int numBytes, CancellationToken cancellationToken)
+        private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
         {
-            // If async IO is not supported on this platform or 
-            // if this Win32FileStream was not opened with FileOptions.Asynchronous.
-            if (!_useAsyncIO)
-            {
-                return base.WriteAsync(array, offset, numBytes, cancellationToken);
-            }
-
-            if (!CanWrite) throw Error.GetWriteNotSupported();
-
+            Debug.Assert(_useAsyncIO);
             Debug.Assert((_readPos == 0 && _readLength == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLength), "We're either reading or writing, but not both.");
             Debug.Assert(!_isPipe || (_readPos == 0 && _readLength == 0), "Win32FileStream must not have buffered data here!  Pipes should be unidirectional.");
 
+            if (!CanWrite) throw Error.GetWriteNotSupported();
+
             bool writeDataStoredInBuffer = false;
             if (!_isPipe) // avoid async buffering with pipes, as doing so can lead to deadlocks (see comments in ReadInternalAsyncCore)
             {
@@ -1005,10 +986,10 @@ namespace System.IO
                 // - There's no active flush operation, such that we don't have to worry about the existing buffer being in use.
                 // - And the data we're trying to write fits in the buffer, meaning it wasn't already filled by previous writes.
                 // In that case, just store it in the buffer.
-                if (numBytes < _bufferLength && !HasActiveBufferOperation && numBytes <= remainingBuffer)
+                if (source.Length < _bufferLength && !HasActiveBufferOperation && source.Length <= remainingBuffer)
                 {
-                    Buffer.BlockCopy(array, offset, GetBuffer(), _writePos, numBytes);
-                    _writePos += numBytes;
+                    source.Span.CopyTo(new Span<byte>(GetBuffer(), _writePos, source.Length));
+                    _writePos += source.Length;
                     writeDataStoredInBuffer = true;
 
                     // There is one special-but-common case, common because devs often use
@@ -1017,7 +998,7 @@ namespace System.IO
                     // then we're done and can return a completed task now.  But if we filled the buffer
                     // completely, we want to do the asynchronous flush/write as part of this operation 
                     // rather than waiting until the next write that fills the buffer.
-                    if (numBytes != remainingBuffer)
+                    if (source.Length != remainingBuffer)
                         return Task.CompletedTask;
 
                     Debug.Assert(_writePos == _bufferLength);
@@ -1073,40 +1054,37 @@ namespace System.IO
 
             // Finally, issue the write asynchronously, and return a Task that logically
             // represents the write operation, including any flushing done.
-            Task writeTask = WriteInternalCoreAsync(array, offset, numBytes, cancellationToken);
+            Task writeTask = WriteAsyncInternalCore(source, cancellationToken);
             return
                 (flushTask == null || flushTask.Status == TaskStatus.RanToCompletion) ? writeTask :
                 (writeTask.Status == TaskStatus.RanToCompletion) ? flushTask :
                 Task.WhenAll(flushTask, writeTask);
         }
 
-        private unsafe Task WriteInternalCoreAsync(byte[] bytes, int offset, int numBytes, CancellationToken cancellationToken)
+        private unsafe Task WriteAsyncInternalCore(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
         {
             Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed");
             Debug.Assert(CanWrite, "_parent.CanWrite");
-            Debug.Assert(bytes != null, "bytes != null");
             Debug.Assert(_readPos == _readLength, "_readPos == _readLen");
             Debug.Assert(_useAsyncIO, "WriteInternalCoreAsync doesn't work on synchronous file streams!");
-            Debug.Assert(offset >= 0, "offset is negative");
-            Debug.Assert(numBytes >= 0, "numBytes is negative");
 
             // Create and store async stream class library specific data in the async result
-            FileStreamCompletionSource completionSource = new FileStreamCompletionSource(this, 0, bytes, cancellationToken);
+            FileStreamCompletionSource completionSource = source.DangerousTryGetArray(out ArraySegment<byte> array) ?
+                new FileStreamCompletionSource(this, 0, array.Array) :
+                new MemoryFileStreamCompletionSource(this, 0, source);
             NativeOverlapped* intOverlapped = completionSource.Overlapped;
 
             if (CanSeek)
             {
                 // Make sure we set the length of the file appropriately.
                 long len = Length;
-                //Console.WriteLine("WriteInternalCoreAsync - Calculating end pos.  pos: "+pos+"  len: "+len+"  numBytes: "+numBytes);
 
                 // Make sure we are writing to the position that we think we are
                 VerifyOSHandlePosition();
 
-                if (_filePosition + numBytes > len)
+                if (_filePosition + source.Length > len)
                 {
-                    //Console.WriteLine("WriteInternalCoreAsync - Setting length to: "+(pos + numBytes));
-                    SetLengthCore(_filePosition + numBytes);
+                    SetLengthCore(_filePosition + source.Length);
                 }
 
                 // Now set the position to read from in the NativeOverlapped struct
@@ -1117,14 +1095,12 @@ namespace System.IO
                 // When using overlapped IO, the OS is not supposed to 
                 // touch the file pointer location at all.  We will adjust it 
                 // ourselves.  This isn't threadsafe.
-                SeekCore(_fileHandle, numBytes, SeekOrigin.Current);
+                SeekCore(_fileHandle, source.Length, SeekOrigin.Current);
             }
 
-            //Console.WriteLine("WriteInternalCoreAsync finishing.  pos: "+pos+"  numBytes: "+numBytes+"  _pos: "+_pos+"  Position: "+Position);
-
             int errorCode = 0;
             // queue an async WriteFile operation and pass in a packed overlapped
-            int r = WriteFileNative(_fileHandle, new ReadOnlySpan<byte>(bytes, offset, numBytes), intOverlapped, out errorCode);
+            int r = WriteFileNative(_fileHandle, source.Span, intOverlapped, out errorCode);
 
             // WriteFile, the OS version, will return 0 on failure.  But
             // my WriteFileNative wrapper returns -1.  My wrapper will return
@@ -1136,10 +1112,8 @@ namespace System.IO
             // written back from this call when using overlapped IO!  You must
             // not pass in a non-null lpNumBytesWritten to WriteFile when using 
             // overlapped structures!  This is ByDesign NT behavior.
-            if (r == -1 && numBytes != -1)
+            if (r == -1)
             {
-                //Console.WriteLine("WriteFile returned 0;  Write will complete asynchronously (if errorCode==3e5)  errorCode: 0x{0:x}", errorCode);
-
                 // For pipes, when they are closed on the other side, they will come here.
                 if (errorCode == ERROR_NO_DATA)
                 {
@@ -1166,10 +1140,10 @@ namespace System.IO
                         throw Win32Marshal.GetExceptionForWin32Error(errorCode);
                     }
                 }
-                else // ERROR_IO_PENDING
+                else if (cancellationToken.CanBeCanceled) // ERROR_IO_PENDING
                 {
                     // Only once the IO is pending do we register for cancellation
-                    completionSource.RegisterForCancellation();
+                    completionSource.RegisterForCancellation(cancellationToken);
                 }
             }
             else
@@ -1181,7 +1155,6 @@ namespace System.IO
                 // synchronously or asynchronously.  We absolutely must not
                 // set asyncResult._numBytes here, since will never have correct
                 // results.  
-                //Console.WriteLine("WriteFile returned: "+r+" (0x"+Int32.Format(r, "x")+")  The IO completed synchronously, but the user callback was called on another thread.");
             }
 
             return completionSource.Task;
@@ -1646,20 +1619,6 @@ namespace System.IO
             }
         }
 
-        private Task<int> TaskFromResultOrCache(int result)
-        {
-            Task<int> completedTask = _lastSynchronouslyCompletedTask;
-            Debug.Assert(completedTask == null || completedTask.Status == TaskStatus.RanToCompletion, "Cached task should have completed successfully");
-
-            if ((completedTask == null) || (completedTask.Result != result))
-            {
-                completedTask = Task.FromResult(result);
-                _lastSynchronouslyCompletedTask = completedTask;
-            }
-
-            return completedTask;
-        }
-
         private void LockInternal(long position, long length)
         {
             int positionLow = unchecked((int)(position));
index 6e77f09..65c63bc 100644 (file)
@@ -51,6 +51,9 @@ namespace System.IO
         /// </summary>
         private readonly bool _useAsyncIO;
 
+        /// <summary>cached task for read ops that complete synchronously</summary>
+        private Task<int> _lastSynchronouslyCompletedTask = null;
+
         /// <summary>
         /// Currently cached position in the stream.  This should always mirror the underlying file's actual position,
         /// and should only ever be out of sync if another stream with access to this same file manipulates it, at which
@@ -296,7 +299,7 @@ namespace System.IO
         {
             ValidateReadWriteArgs(array, offset, count);
             return _useAsyncIO ?
-                ReadAsyncInternal(array, offset, count, CancellationToken.None).GetAwaiter().GetResult() :
+                ReadAsyncTask(array, offset, count, CancellationToken.None).GetAwaiter().GetResult() :
                 ReadSpan(new Span<byte>(array, offset, count));
         }
 
@@ -334,10 +337,12 @@ namespace System.IO
                 throw new ArgumentException(SR.Argument_InvalidOffLen /*, no good single parameter name to pass*/);
 
             // If we have been inherited into a subclass, the following implementation could be incorrect
-            // since it does not call through to Read() or ReadAsync() which a subclass might have overridden.  
+            // since it does not call through to Read() which a subclass might have overridden.  
             // To be safe we will only use this implementation in cases where we know it is safe to do so,
             // and delegate to our base class (which will call into Read/ReadAsync) when we are not sure.
-            if (GetType() != typeof(FileStream))
+            // Similarly, if we weren't opened for asynchronous I/O, call to the base implementation so that
+            // Read is invoked asynchronously.
+            if (GetType() != typeof(FileStream) || !_useAsyncIO)
                 return base.ReadAsync(buffer, offset, count, cancellationToken);
 
             if (cancellationToken.IsCancellationRequested)
@@ -346,7 +351,51 @@ namespace System.IO
             if (IsClosed)
                 throw Error.GetFileNotOpen();
 
-            return ReadAsyncInternal(buffer, offset, count, cancellationToken);
+            return ReadAsyncTask(buffer, offset, count, cancellationToken);
+        }
+
+        public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
+        {
+            if (!_useAsyncIO || GetType() != typeof(FileStream))
+            {
+                // If we're not using async I/O, delegate to the base, which will queue a call to Read.
+                // Or if this isn't a concrete FileStream, a derived type may have overridden ReadAsync(byte[],...),
+                // which was introduced first, so delegate to the base which will delegate to that.
+                return base.ReadAsync(destination, cancellationToken);
+            }
+
+            if (cancellationToken.IsCancellationRequested)
+            {
+                return new ValueTask<int>(Task.FromCanceled<int>(cancellationToken));
+            }
+
+            if (IsClosed)
+            {
+                throw Error.GetFileNotOpen();
+            }
+
+            Task<int> t = ReadAsyncInternal(destination, cancellationToken, out int synchronousResult);
+            return t != null ?
+                new ValueTask<int>(t) :
+                new ValueTask<int>(synchronousResult);
+        }
+
+        private Task<int> ReadAsyncTask(byte[] array, int offset, int count, CancellationToken cancellationToken)
+        {
+            Task<int> t = ReadAsyncInternal(new Memory<byte>(array, offset, count), cancellationToken, out int synchronousResult);
+
+            if (t == null)
+            {
+                t = _lastSynchronouslyCompletedTask;
+                Debug.Assert(t == null || t.IsCompletedSuccessfully, "Cached task should have completed successfully");
+
+                if (t == null || t.Result != synchronousResult)
+                {
+                    _lastSynchronouslyCompletedTask = t = Task.FromResult(synchronousResult);
+                }
+            }
+
+            return t;
         }
 
         public override void Write(byte[] array, int offset, int count)
@@ -354,7 +403,7 @@ namespace System.IO
             ValidateReadWriteArgs(array, offset, count);
             if (_useAsyncIO)
             {
-                WriteAsyncInternal(array, offset, count, CancellationToken.None).GetAwaiter().GetResult();
+                WriteAsyncInternal(new ReadOnlyMemory<byte>(array, offset, count), CancellationToken.None).GetAwaiter().GetResult();
             }
             else
             {
@@ -399,7 +448,7 @@ namespace System.IO
             // since it does not call through to Write() or WriteAsync() which a subclass might have overridden.  
             // To be safe we will only use this implementation in cases where we know it is safe to do so,
             // and delegate to our base class (which will call into Write/WriteAsync) when we are not sure.
-            if (GetType() != typeof(FileStream))
+            if (!_useAsyncIO || GetType() != typeof(FileStream))
                 return base.WriteAsync(buffer, offset, count, cancellationToken);
 
             if (cancellationToken.IsCancellationRequested)
@@ -408,7 +457,30 @@ namespace System.IO
             if (IsClosed)
                 throw Error.GetFileNotOpen();
 
-            return WriteAsyncInternal(buffer, offset, count, cancellationToken);
+            return WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
+        }
+
+        public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
+        {
+            if (!_useAsyncIO || GetType() != typeof(FileStream))
+            {
+                // If we're not using async I/O, delegate to the base, which will queue a call to Write.
+                // Or if this isn't a concrete FileStream, a derived type may have overridden WriteAsync(byte[],...),
+                // which was introduced first, so delegate to the base which will delegate to that.
+                return base.WriteAsync(source, cancellationToken);
+            }
+
+            if (cancellationToken.IsCancellationRequested)
+            {
+                return Task.FromCanceled<int>(cancellationToken);
+            }
+
+            if (IsClosed)
+            {
+                throw Error.GetFileNotOpen();
+            }
+
+            return WriteAsyncInternal(source, cancellationToken);
         }
 
         /// <summary>
@@ -744,7 +816,7 @@ namespace System.IO
             if (!IsAsync)
                 return base.BeginRead(array, offset, numBytes, callback, state);
             else
-                return TaskToApm.Begin(ReadAsyncInternal(array, offset, numBytes, CancellationToken.None), callback, state);
+                return TaskToApm.Begin(ReadAsyncTask(array, offset, numBytes, CancellationToken.None), callback, state);
         }
 
         public override IAsyncResult BeginWrite(byte[] array, int offset, int numBytes, AsyncCallback callback, object state)
@@ -764,7 +836,7 @@ namespace System.IO
             if (!IsAsync)
                 return base.BeginWrite(array, offset, numBytes, callback, state);
             else
-                return TaskToApm.Begin(WriteAsyncInternal(array, offset, numBytes, CancellationToken.None), callback, state);
+                return TaskToApm.Begin(WriteAsyncInternal(new ReadOnlyMemory<byte>(array, offset, numBytes), CancellationToken.None), callback, state);
         }
 
         public override int EndRead(IAsyncResult asyncResult)
index 7dca133..e3871bc 100644 (file)
@@ -2,11 +2,11 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
-using System.Security;
+using System.Buffers;
+using System.Diagnostics;
+using System.Runtime.InteropServices;
 using System.Threading;
 using System.Threading.Tasks;
-using System.Runtime.InteropServices;
-using System.Diagnostics;
 
 namespace System.IO
 {
@@ -15,7 +15,7 @@ namespace System.IO
         // This is an internal object extending TaskCompletionSource with fields
         // for all of the relevant data necessary to complete the IO operation.
         // This is used by IOCallback and all of the async methods.
-        unsafe private sealed class FileStreamCompletionSource : TaskCompletionSource<int>
+        private unsafe class FileStreamCompletionSource : TaskCompletionSource<int>
         {
             private const long NoResult = 0;
             private const long ResultSuccess = (long)1 << 32;
@@ -28,7 +28,6 @@ namespace System.IO
 
             private readonly FileStream _stream;
             private readonly int _numBufferedBytes;
-            private readonly CancellationToken _cancellationToken;
             private CancellationTokenRegistration _cancellationRegistration;
 #if DEBUG
             private bool _cancellationHasBeenRegistered;
@@ -37,20 +36,19 @@ namespace System.IO
             private long _result; // Using long since this needs to be used in Interlocked APIs
 
             // Using RunContinuationsAsynchronously for compat reasons (old API used Task.Factory.StartNew for continuations)
-            internal FileStreamCompletionSource(FileStream stream, int numBufferedBytes, byte[] bytes, CancellationToken cancellationToken)
+            internal FileStreamCompletionSource(FileStream stream, int numBufferedBytes, byte[] bytes)
                 : base(TaskCreationOptions.RunContinuationsAsynchronously)
             {
                 _numBufferedBytes = numBufferedBytes;
                 _stream = stream;
                 _result = NoResult;
-                _cancellationToken = cancellationToken;
 
-                // Create the native overlapped. We try to use the preallocated overlapped if possible: 
-                // it's possible if the byte buffer is the same one that's associated with the preallocated overlapped 
-                // and if no one else is currently using the preallocated overlapped.  This is the fast-path for cases 
-                // where the user-provided buffer is smaller than the FileStream's buffer (such that the FileStream's 
+                // Create the native overlapped. We try to use the preallocated overlapped if possible: it's possible if the byte
+                // buffer is null (there's nothing to pin) or the same one that's associated with the preallocated overlapped (and
+                // thus is already pinned) and if no one else is currently using the preallocated overlapped.  This is the fast-path
+                // for cases where the user-provided buffer is smaller than the FileStream's buffer (such that the FileStream's
                 // buffer is used) and where operations on the FileStream are not being performed concurrently.
-                _overlapped = ReferenceEquals(bytes, _stream._buffer) && _stream.CompareExchangeCurrentOverlappedOwner(this, null) == null ?
+                _overlapped = (bytes == null || ReferenceEquals(bytes, _stream._buffer)) && _stream.CompareExchangeCurrentOverlappedOwner(this, null) == null ?
                     _stream._fileHandle.ThreadPoolBinding.AllocateNativeOverlapped(_stream._preallocatedOverlapped) :
                     _stream._fileHandle.ThreadPoolBinding.AllocateNativeOverlapped(s_ioCallback, this, bytes);
                 Debug.Assert(_overlapped != null, "AllocateNativeOverlapped returned null");
@@ -67,15 +65,16 @@ namespace System.IO
                 TrySetResult(numBytes + _numBufferedBytes);
             }
 
-            public void RegisterForCancellation()
+            public void RegisterForCancellation(CancellationToken cancellationToken)
             {
 #if DEBUG
+                Debug.Assert(cancellationToken.CanBeCanceled);
                 Debug.Assert(!_cancellationHasBeenRegistered, "Cannot register for cancellation twice");
                 _cancellationHasBeenRegistered = true;
 #endif
 
-                // Quick check to make sure that the cancellation token supports cancellation, and that the IO hasn't completed
-                if ((_cancellationToken.CanBeCanceled) && (_overlapped != null))
+                // Quick check to make sure the IO hasn't completed
+                if (_overlapped != null)
                 {
                     var cancelCallback = s_cancelCallback;
                     if (cancelCallback == null) s_cancelCallback = cancelCallback = Cancel;
@@ -84,7 +83,7 @@ namespace System.IO
                     long packedResult = Interlocked.CompareExchange(ref _result, RegisteringCancellation, NoResult);
                     if (packedResult == NoResult)
                     {
-                        _cancellationRegistration = _cancellationToken.Register(cancelCallback, this);
+                        _cancellationRegistration = cancellationToken.Register(cancelCallback, this);
 
                         // Switch the result, just in case IO completed while we were setting the registration
                         packedResult = Interlocked.Exchange(ref _result, NoResult);
@@ -104,7 +103,7 @@ namespace System.IO
                 }
             }
 
-            internal void ReleaseNativeResource()
+            internal virtual void ReleaseNativeResource()
             {
                 // Ensure that cancellation has been completed and cleaned up.
                 _cancellationRegistration.Dispose();
@@ -172,6 +171,7 @@ namespace System.IO
             private void CompleteCallback(ulong packedResult)
             {
                 // Free up the native resource and cancellation registration
+                CancellationToken cancellationToken = _cancellationRegistration.Token; // access before disposing registration
                 ReleaseNativeResource();
 
                 // Unpack the result and send it to the user
@@ -181,7 +181,7 @@ namespace System.IO
                     int errorCode = unchecked((int)(packedResult & uint.MaxValue));
                     if (errorCode == Interop.Errors.ERROR_OPERATION_ABORTED)
                     {
-                        TrySetCanceled(_cancellationToken.IsCancellationRequested ? _cancellationToken : new CancellationToken(true));
+                        TrySetCanceled(cancellationToken.IsCancellationRequested ? cancellationToken : new CancellationToken(true));
                     }
                     else
                     {
@@ -218,5 +218,28 @@ namespace System.IO
                 }
             }
         }
+
+        /// <summary>
+        /// Extends <see cref="FileStreamCompletionSource"/> with to support disposing of a
+        /// <see cref="MemoryHandle"/> when the operation has completed.  This should only be used
+        /// when memory doesn't wrap a byte[].
+        /// </summary>
+        private sealed class MemoryFileStreamCompletionSource : FileStreamCompletionSource
+        {
+            private MemoryHandle _handle; // mutable struct; do not make this readonly
+
+            internal MemoryFileStreamCompletionSource(FileStream stream, int numBufferedBytes, ReadOnlyMemory<byte> memory) :
+                base(stream, numBufferedBytes, bytes: null) // this type handles the pinning, so null is passed for bytes
+            {
+                Debug.Assert(!memory.DangerousTryGetArray(out ArraySegment<byte> array), "The base should be used directly if we can get the array.");
+                _handle = memory.Retain(pin: true);
+            }
+
+            internal override void ReleaseNativeResource()
+            {
+                _handle.Dispose();
+                base.ReleaseNativeResource();
+            }
+        }
     }
 }
index f808ab4..b899951 100644 (file)
@@ -471,6 +471,28 @@ namespace System.IO
         }
 
         /// <summary>
+        /// Reads bytes from stream and puts them into the buffer
+        /// </summary>
+        /// <param name="destination">Buffer to read the bytes to.</param>
+        /// <param name="cancellationToken">Token that can be used to cancel this operation.</param>
+        public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
+        {
+            if (cancellationToken.IsCancellationRequested)
+            {
+                return new ValueTask<int>(Task.FromCanceled<int>(cancellationToken));
+            }
+
+            try
+            {
+                return new ValueTask<int>(Read(destination.Span));
+            }
+            catch (Exception ex)
+            {
+                return new ValueTask<int>(Task.FromException<int>(ex));
+            }
+        }
+
+        /// <summary>
         /// Returns the byte at the stream current Position and advances the Position.
         /// </summary>
         /// <returns></returns>
@@ -729,6 +751,29 @@ namespace System.IO
         }
 
         /// <summary>
+        /// Writes buffer into the stream. The operation completes synchronously.
+        /// </summary>
+        /// <param name="buffer">Buffer that will be written.</param>
+        /// <param name="cancellationToken">Token that can be used to cancel the operation.</param>
+        public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
+        {
+            if (cancellationToken.IsCancellationRequested)
+            {
+                return Task.FromCanceled(cancellationToken);
+            }
+
+            try
+            {
+                Write(source.Span);
+                return Task.CompletedTask;
+            }
+            catch (Exception ex)
+            {
+                return Task.FromException(ex);
+            }
+        }
+
+        /// <summary>
         /// Writes a byte to the stream and advances the current Position.
         /// </summary>
         /// <param name="value"></param>
index f3e743a..2699912 100644 (file)
@@ -210,11 +210,21 @@ namespace System.IO
             return _unmanagedStream.ReadAsync(buffer, offset, count, cancellationToken);
         }
 
+        public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
+        {
+            return _unmanagedStream.ReadAsync(destination, cancellationToken);
+        }
+
 
         public override Task WriteAsync(Byte[] buffer, Int32 offset, Int32 count, CancellationToken cancellationToken)
         {
             return _unmanagedStream.WriteAsync(buffer, offset, count, cancellationToken);
         }
+
+        public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
+        {
+            return _unmanagedStream.WriteAsync(source, cancellationToken);
+        }
     }  // class UnmanagedMemoryStreamWrapper
 }  // namespace
 
index 91662c5..d97d3f4 100644 (file)
@@ -456,6 +456,27 @@ namespace System.IO
             }
         }
 
+        public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
+        {
+            if (cancellationToken.IsCancellationRequested)
+            {
+                return new ValueTask<int>(Task.FromCanceled<int>(cancellationToken));
+            }
+
+            try
+            {
+                return new ValueTask<int>(Read(destination.Span));
+            }
+            catch (OperationCanceledException oce)
+            {
+                return new ValueTask<int>(Task.FromCancellation<int>(oce));
+            }
+            catch (Exception exception)
+            {
+                return new ValueTask<int>(Task.FromException<int>(exception));
+            }
+        }
+
 
         public override int ReadByte()
         {
@@ -742,6 +763,28 @@ namespace System.IO
             }
         }
 
+        public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
+        {
+            if (cancellationToken.IsCancellationRequested)
+            {
+                return Task.FromCanceled(cancellationToken);
+            }
+
+            try
+            {
+                Write(source.Span);
+                return Task.CompletedTask;
+            }
+            catch (OperationCanceledException oce)
+            {
+                return Task.FromCancellation<VoidTaskResult>(oce);
+            }
+            catch (Exception exception)
+            {
+                return Task.FromException(exception);
+            }
+        }
+
         public override void WriteByte(byte value)
         {
             if (!_isOpen) __Error.StreamIsClosed();
index 428dd5e..ae8ca51 100644 (file)
@@ -416,6 +416,33 @@ namespace System.IO
                         : BeginEndReadAsync(buffer, offset, count);
         }
 
+        public virtual ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
+        {
+            if (destination.TryGetArray(out ArraySegment<byte> array))
+            {
+                return new ValueTask<int>(ReadAsync(array.Array, array.Offset, array.Count, cancellationToken));
+            }
+            else
+            {
+                byte[] buffer = ArrayPool<byte>.Shared.Rent(destination.Length);
+                return FinishReadAsync(ReadAsync(buffer, 0, destination.Length, cancellationToken), buffer, destination);
+
+                async ValueTask<int> FinishReadAsync(Task<int> readTask, byte[] localBuffer, Memory<byte> localDestination)
+                {
+                    try
+                    {
+                        int result = await readTask.ConfigureAwait(false);
+                        new Span<byte>(localBuffer, 0, result).CopyTo(localDestination.Span);
+                        return result;
+                    }
+                    finally
+                    {
+                        ArrayPool<byte>.Shared.Return(localBuffer);
+                    }
+                }
+            }
+        }
+
         [MethodImplAttribute(MethodImplOptions.InternalCall)]
         private extern bool HasOverriddenBeginEndRead();
 
@@ -694,8 +721,6 @@ namespace System.IO
             return WriteAsync(buffer, offset, count, CancellationToken.None);
         }
 
-
-
         public virtual Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
         {
             // If cancellation was requested, bail early with an already completed task.
@@ -705,6 +730,32 @@ namespace System.IO
                         : BeginEndWriteAsync(buffer, offset, count);
         }
 
+        public virtual Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
+        {
+            if (source.DangerousTryGetArray(out ArraySegment<byte> array))
+            {
+                return WriteAsync(array.Array, array.Offset, array.Count, cancellationToken);
+            }
+            else
+            {
+                byte[] buffer = ArrayPool<byte>.Shared.Rent(source.Length);
+                source.Span.CopyTo(buffer);
+                return FinishWriteAsync(WriteAsync(buffer, 0, source.Length, cancellationToken), buffer);
+
+                async Task FinishWriteAsync(Task writeTask, byte[] localBuffer)
+                {
+                    try
+                    {
+                        await writeTask.ConfigureAwait(false);
+                    }
+                    finally
+                    {
+                        ArrayPool<byte>.Shared.Return(localBuffer);
+                    }
+                }
+            }
+        }
+
         [MethodImplAttribute(MethodImplOptions.InternalCall)]
         private extern bool HasOverriddenBeginEndWrite();
 
@@ -991,12 +1042,13 @@ namespace System.IO
 
             public override Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
             {
-                var nullReadTask = s_nullReadTask;
-                if (nullReadTask == null)
-                    s_nullReadTask = nullReadTask = new Task<int>(false, 0, (TaskCreationOptions)InternalTaskOptions.DoNotDispose, CancellationToken.None); // benign race condition
-                return nullReadTask;
+                return AsyncTaskMethodBuilder<int>.s_defaultResultTask;
+            }
+
+            public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
+            {
+                return new ValueTask<int>(0);
             }
-            private static Task<int> s_nullReadTask;
 
             public override int ReadByte()
             {
@@ -1018,6 +1070,13 @@ namespace System.IO
                     Task.CompletedTask;
             }
 
+            public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
+            {
+                return cancellationToken.IsCancellationRequested ?
+                    Task.FromCanceled(cancellationToken) :
+                    Task.CompletedTask;
+            }
+
             public override void WriteByte(byte value)
             {
             }
index 61dc38d..e0a5258 100644 (file)
@@ -37,6 +37,13 @@ namespace System.Threading
         }
 
         /// <summary>
+        /// Gets the <see cref="CancellationToken"/> with which this registration is associated.  If the
+        /// registration isn't associated with a token (such as after the registration has been disposed),
+        /// this will return a default token.
+        /// </summary>
+        internal CancellationToken Token => _node?.Partition.Source.Token ?? default(CancellationToken);
+
+        /// <summary>
         /// Disposes of the registration and unregisters the target callback from the associated 
         /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see>.
         /// </summary>