Use asynchronous lock in Parallel.ForEachAsync with synchronous enumerable (#82501)
author Stephen Toub <stoub@microsoft.com>
Thu, 23 Feb 2023 20:54:44 +0000 (15:54 -0500)
committer GitHub <noreply@github.com>
Thu, 23 Feb 2023 20:54:44 +0000 (15:54 -0500)
* Use asynchronous lock in Parallel.ForEachAsync with synchronous enumerable

Avoid blocking threads while waiting for access to the enumerator in the case of a slower MoveNext.

* Update src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs

Co-authored-by: Tanner Gooding <tagoo@outlook.com>
---------

Co-authored-by: Tanner Gooding <tagoo@outlook.com>
src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs

index 619cd14..b13157a 100644 (file)
@@ -93,17 +93,23 @@ namespace System.Threading.Tasks
                     // Continue to loop while there are more elements to be processed.
                     while (!state.Cancellation.IsCancellationRequested)
                     {
-                        // Get the next element from the enumerator.  This requires asynchronously locking around MoveNextAsync/Current.
+                        // Get the next element from the enumerator.  This requires asynchronously locking around MoveNext/Current.
                         TSource element;
-                        lock (state)
+                        await state.AcquireLock();
+                        try
                         {
-                            if (!state.Enumerator.MoveNext())
+                            if (state.Cancellation.IsCancellationRequested || // check now that the lock has been acquired
+                                !state.Enumerator.MoveNext())
                             {
                                 break;
                             }
 
                             element = state.Enumerator.Current;
                         }
+                        finally
+                        {
+                            state.ReleaseLock();
+                        }
 
                         // If the remaining dop allows it and we've not yet queued the next worker, do so now.  We wait
                         // until after we've grabbed an item from the enumerator to a) avoid unnecessary contention on the
@@ -249,20 +255,11 @@ namespace System.Threading.Tasks
                     {
                         // Get the next element from the enumerator.  This requires asynchronously locking around MoveNextAsync/Current.
                         TSource element;
+                        await state.AcquireLock();
                         try
                         {
-                            // TODO https://github.com/dotnet/runtime/issues/22144:
-                            // Use a no-throwing await if/when one is available built-in.
-                            await state.Lock.WaitAsync(state.Cancellation.Token);
-                        }
-                        catch (OperationCanceledException)
-                        {
-                            break;
-                        }
-
-                        try
-                        {
-                            if (!await state.Enumerator.MoveNextAsync())
+                            if (state.Cancellation.IsCancellationRequested || // check now that the lock has been acquired
+                                !await state.Enumerator.MoveNextAsync())
                             {
                                 break;
                             }
@@ -271,7 +268,7 @@ namespace System.Threading.Tasks
                         }
                         finally
                         {
-                            state.Lock.Release();
+                            state.ReleaseLock();
                         }
 
                         // If the remaining dop allows it and we've not yet queued the next worker, do so now.  We wait
@@ -354,6 +351,8 @@ namespace System.Threading.Tasks
             private readonly TaskScheduler _scheduler;
             /// <summary>The <see cref="ExecutionContext"/> present at the time of the ForEachAsync invocation.  This is only used if on the default scheduler.</summary>
             private readonly ExecutionContext? _executionContext;
+            /// <summary>Semaphore used to provide exclusive access to the enumerator.</summary>
+            private readonly SemaphoreSlim _lock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
 
             /// <summary>The number of outstanding workers.  When this hits 0, the operation has completed.</summary>
             private int _completionRefCount;
@@ -417,6 +416,21 @@ namespace System.Threading.Tasks
             /// <returns>true if this is the last worker to complete iterating; otherwise, false.</returns>
             public bool SignalWorkerCompletedIterating() => Interlocked.Decrement(ref _completionRefCount) == 0;
 
+            /// <summary>Asynchronously acquires exclusive access to the enumerator.</summary>
+            public Task AcquireLock() =>
+                // We explicitly don't pass this.Cancellation to WaitAsync.  Doing so adds overhead, and it isn't actually
+                // necessary. All of the operations that monitor the lock are part of the same ForEachAsync operation, and the Task
+                // returned from ForEachAsync can't complete until all of the constituent operations have completed, including whoever
+                // holds the lock while this worker is waiting on the lock.  Thus, the lock will need to be released for the overall
+                // operation to complete.  Passing the token would allow the overall operation to potentially complete a bit faster in
+                // the face of cancellation, in exchange for making it a bit slower / more overhead in the common case of cancellation
+                // not being requested.  We want to optimize for the latter.  This also then avoids an exception throw / catch when
+                // cancellation is requested.
+                _lock.WaitAsync(CancellationToken.None);
+
+            /// <summary>Relinquishes exclusive access to the enumerator.</summary>
+            public void ReleaseLock() => _lock.Release();
+
             /// <summary>Stores an exception and triggers cancellation in order to alert all workers to stop as soon as possible.</summary>
             /// <param name="e">The exception.</param>
             public void RecordException(Exception e)
@@ -444,6 +458,7 @@ namespace System.Threading.Tasks
                 else if (_exceptions is null)
                 {
                     // Everything completed successfully.
+                    Debug.Assert(!Cancellation.IsCancellationRequested);
                     taskSet = TrySetResult();
                 }
                 else
@@ -500,7 +515,6 @@ namespace System.Threading.Tasks
         /// <typeparam name="TSource">Specifies the type of data being enumerated.</typeparam>
         private sealed class AsyncForEachAsyncState<TSource> : ForEachAsyncState<TSource>, IAsyncDisposable
         {
-            public readonly SemaphoreSlim Lock = new SemaphoreSlim(1, 1);
             public readonly IAsyncEnumerator<TSource> Enumerator;
 
             public AsyncForEachAsyncState(