Several Stream.Read/WriteAsync improvements
authorstephentoub <stoub@microsoft.com>
Mon, 18 Jan 2016 15:44:35 +0000 (10:44 -0500)
committerstephentoub <stoub@microsoft.com>
Tue, 19 Jan 2016 12:18:31 +0000 (07:18 -0500)
Stream.XxAsync are currently implemented as a wrapper around Stream.Begin/EndXx.  When a derived class overrides XxAsync to do its own async implementation, there's no issue.  When a derived class overrides Begin/EndXx but not XxAsync, there's no issue (the base implementation does what it needs to do, wrapping Begin/EndXx).  However, if the derived implementation doesn't override either XxAsync or Begin/EndXx, there are a few issues.

First, there's unnecessary cost.  The base Begin/EndXx methods queue a Task to call the corresponding Read/Write method.  But then the XxAsync method create another Task to wrap the Begin/End method invocation.  This means we're allocating and completing two tasks, when we only needed to do one.

Second, task wait inlining is affected.  Because Read/WriteAsync aren't returning the original queued delegate-backed task but rather a promise task, Wait()'ing on the returned task blocks until the operation completes on another thread.  If the original delegate-backed task were returned, then Wait()'ing on the task could potentially "inline" its execution onto the current thread.

Third, there's unnecessary blocking if there are other outstanding async operations on the instance.  Since Begin/EndXx were introduced, they track whether an operation is in progress, and subsequent calls to BeginXx while an operation is in progress blocks synchronously.  Since Read/WriteAsync just wrap the virtual Begin/End methods, they inherit this behavior.

This commit addresses all three issues for CoreCLR.  The Begin/EndXx methods aren't exposed from Stream in the new contracts, and as a result outside of mscorlib we don't need to be concerned about these methods being overridden.  Thus, the commit adds an optimized path that simply returns the original delegate-backed task rather than wrapping it.  This avoids the unnecessary task overheads and duplication, and it enables wait inlining if someone happens to wait on it.  Further, since we're no longer subject to the behaviors of Begin/End, we can change the serialization to be done asynchronously rather than synchronously.

src/mscorlib/src/System/IO/FileStream.cs
src/mscorlib/src/System/IO/Stream.cs

index 3258940b53501e9ada23fe6faf66737f73141baf..0952300dfb6ff8048943df7ec09ce3b7b0571679 100644 (file)
@@ -1778,6 +1778,9 @@ namespace System.IO {
             return;
         }
 
+#if FEATURE_CORECLR
+        internal override bool OverridesBeginEnd { get { return true; } }
+#endif
 
         [System.Security.SecuritySafeCritical]  // auto-generated
         [HostProtection(ExternalThreading = true)]
index a39e438281854c6911ed6ecbc68187717b51feb4..d62bcbd85542b39d3f19b9844f0429d82591b195 100644 (file)
@@ -285,15 +285,29 @@ namespace System.IO {
             return new ManualResetEvent(false);
         }
 
-        [HostProtection(ExternalThreading=true)]
+        internal virtual bool OverridesBeginEnd 
+        { 
+            get 
+            {
+#if FEATURE_CORECLR
+                return false; // methods aren't exposed outside of mscorlib
+#else
+                return true; // have to assume they are overridden as they're exposed
+#endif
+            }
+        }
+
+            [HostProtection(ExternalThreading=true)]
         public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
         {
             Contract.Ensures(Contract.Result<IAsyncResult>() != null);
-            return BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false);
+            return BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true);
         }
 
         [HostProtection(ExternalThreading = true)]
-        internal IAsyncResult BeginReadInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
+        internal IAsyncResult BeginReadInternal(
+            byte[] buffer, int offset, int count, AsyncCallback callback, Object state, 
+            bool serializeAsynchronously, bool apm)
         {
             Contract.Ensures(Contract.Result<IAsyncResult>() != null);
             if (!CanRead) __Error.ReadNotSupported();
@@ -326,7 +340,7 @@ namespace System.IO {
 
             // Create the task to asynchronously do a Read.  This task serves both
             // as the asynchronous work item and as the IAsyncResult returned to the user.
-            var asyncResult = new ReadWriteTask(true /*isRead*/, delegate
+            var asyncResult = new ReadWriteTask(true /*isRead*/, apm, delegate
             {
                 // The ReadWriteTask stores all of the parameters to pass to Read.
                 // As we're currently inside of it, we can get the current task
@@ -334,10 +348,23 @@ namespace System.IO {
                 var thisTask = Task.InternalCurrent as ReadWriteTask;
                 Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
 
-                // Do the Read and return the number of bytes read
-                var bytesRead = thisTask._stream.Read(thisTask._buffer, thisTask._offset, thisTask._count);
-                thisTask.ClearBeginState(); // just to help alleviate some memory pressure
-                return bytesRead;
+                try
+                {
+                    // Do the Read and return the number of bytes read
+                    return thisTask._stream.Read(thisTask._buffer, thisTask._offset, thisTask._count);
+                }
+                finally
+                {
+                    // If this implementation is part of Begin/EndXx, then the EndXx method will handle
+                    // finishing the async operation.  However, if this is part of XxAsync, then there won't
+                    // be an end method, and this task is responsible for cleaning up.
+                    if (!thisTask._apm)
+                    {
+                        thisTask._stream.FinishTrackingAsyncOperation();
+                    }
+      
+                    thisTask.ClearBeginState(); // just to help alleviate some memory pressure
+                }
             }, state, this, buffer, offset, count, callback);
 
             // Schedule it
@@ -388,9 +415,7 @@ namespace System.IO {
             }
             finally
             {
-                _activeReadWriteTask = null;
-                Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
-                _asyncActiveSemaphore.Release();
+                FinishTrackingAsyncOperation();
             }
 #endif
         }
@@ -414,7 +439,12 @@ namespace System.IO {
         }
 
         private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
-        {            
+        {
+            if (!OverridesBeginEnd)
+            {
+                return (Task<Int32>)BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
+            }
+
             return TaskFactory<Int32>.FromAsyncTrim(
                         this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
                         (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
@@ -434,11 +464,13 @@ namespace System.IO {
         public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
         {
             Contract.Ensures(Contract.Result<IAsyncResult>() != null);
-            return BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false);
+            return BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true);
         }
 
         [HostProtection(ExternalThreading = true)]
-        internal IAsyncResult BeginWriteInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
+        internal IAsyncResult BeginWriteInternal(
+            byte[] buffer, int offset, int count, AsyncCallback callback, Object state, 
+            bool serializeAsynchronously, bool apm)
         {
             Contract.Ensures(Contract.Result<IAsyncResult>() != null);
             if (!CanWrite) __Error.WriteNotSupported();
@@ -470,7 +502,7 @@ namespace System.IO {
 
             // Create the task to asynchronously do a Write.  This task serves both
             // as the asynchronous work item and as the IAsyncResult returned to the user.
-            var asyncResult = new ReadWriteTask(false /*isRead*/, delegate
+            var asyncResult = new ReadWriteTask(false /*isRead*/, apm, delegate
             {
                 // The ReadWriteTask stores all of the parameters to pass to Write.
                 // As we're currently inside of it, we can get the current task
@@ -478,10 +510,24 @@ namespace System.IO {
                 var thisTask = Task.InternalCurrent as ReadWriteTask;
                 Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
 
-                // Do the Write
-                thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);  
-                thisTask.ClearBeginState(); // just to help alleviate some memory pressure
-                return 0; // not used, but signature requires a value be returned
+                try
+                {
+                    // Do the Write
+                    thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);  
+                    return 0; // not used, but signature requires a value be returned
+                }
+                finally
+                {
+                    // If this implementation is part of Begin/EndXx, then the EndXx method will handle
+                    // finishing the async operation.  However, if this is part of XxAsync, then there won't
+                    // be an end method, and this task is responsible for cleaning up.
+                    if (!thisTask._apm)
+                    {
+                        thisTask._stream.FinishTrackingAsyncOperation();
+                    }
+
+                    thisTask.ClearBeginState(); // just to help alleviate some memory pressure
+                }
             }, state, this, buffer, offset, count, callback);
 
             // Schedule it
@@ -534,6 +580,13 @@ namespace System.IO {
             readWriteTask.m_taskScheduler = TaskScheduler.Default;
             readWriteTask.ScheduleAndStart(needsProtection: false);
         }
+
+        private void FinishTrackingAsyncOperation()
+        {
+             _activeReadWriteTask = null;
+            Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
+            _asyncActiveSemaphore.Release();
+        }
 #endif
 
         public virtual void EndWrite(IAsyncResult asyncResult)
@@ -574,9 +627,7 @@ namespace System.IO {
             }
             finally
             {
-                _activeReadWriteTask = null;
-                Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
-                _asyncActiveSemaphore.Release();
+                FinishTrackingAsyncOperation();
             }
 #endif
         }
@@ -600,7 +651,8 @@ namespace System.IO {
         // with a single allocation.
         private sealed class ReadWriteTask : Task<int>, ITaskCompletionAction
         {
-            internal readonly bool _isRead;
+            internal readonly bool _isRead; 
+            internal readonly bool _apm; // true if this is from Begin/EndXx; false if it's from XxAsync
             internal Stream _stream;
             internal byte [] _buffer;
             internal int _offset;
@@ -618,6 +670,7 @@ namespace System.IO {
             [MethodImpl(MethodImplOptions.NoInlining)]
             public ReadWriteTask(
                 bool isRead,
+                bool apm,
                 Func<object,int> function, object state,
                 Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback) :
                 base(function, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach)
@@ -631,6 +684,7 @@ namespace System.IO {
 
                 // Store the arguments
                 _isRead = isRead;
+                _apm = apm;
                 _stream = stream;
                 _buffer = buffer;
                 _offset = offset;
@@ -710,7 +764,12 @@ namespace System.IO {
 
 
         private Task BeginEndWriteAsync(Byte[] buffer, Int32 offset, Int32 count)
-        {            
+        {
+            if (!OverridesBeginEnd)
+            {
+                return (Task)BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
+            }
+
             return TaskFactory<VoidTaskResult>.FromAsyncTrim(
                         this, new ReadWriteParameters { Buffer=buffer, Offset=offset, Count=count },
                         (stream, args, callback, state) => stream.BeginWrite(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
@@ -1222,7 +1281,7 @@ namespace System.IO {
                     // _stream due to this call blocked while holding the lock.
                     return _overridesBeginRead.Value ?
                         _stream.BeginRead(buffer, offset, count, callback, state) :
-                        _stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true);
+                        _stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true, apm: true);
                 }
             }
         
@@ -1280,7 +1339,7 @@ namespace System.IO {
                     // _stream due to this call blocked while holding the lock.
                     return _overridesBeginWrite.Value ?
                         _stream.BeginWrite(buffer, offset, count, callback, state) :
-                        _stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true);
+                        _stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true, apm: true);
                 }
             }