Add Parallel.ForAsync (#84804)
authorStephen Toub <stoub@microsoft.com>
Tue, 25 Apr 2023 02:51:18 +0000 (22:51 -0400)
committerGitHub <noreply@github.com>
Tue, 25 Apr 2023 02:51:18 +0000 (22:51 -0400)
* Add Parallel.ForAsync

* Apply suggestions from code review

Co-authored-by: Carlos Sánchez López <1175054+carlossanlop@users.noreply.github.com>
---------

Co-authored-by: Carlos Sánchez López <1175054+carlossanlop@users.noreply.github.com>
src/libraries/System.Threading.Tasks.Parallel/ref/System.Threading.Tasks.Parallel.cs
src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs
src/libraries/System.Threading.Tasks.Parallel/tests/ParallelForEachAsyncTests.cs

index 41be9b17b088cc79c9f1990a7be85852ced640c4..abe73d4db72e2231a7a2d836b5fd5400944c5a56 100644 (file)
@@ -4,6 +4,8 @@
 // Changes to this file must follow the https://aka.ms/api-review process.
 // ------------------------------------------------------------------------------
 
+using System.Numerics;
+
 namespace System.Threading.Tasks
 {
     public static partial class Parallel
@@ -16,6 +18,9 @@ namespace System.Threading.Tasks
         public static System.Threading.Tasks.ParallelLoopResult For(long fromInclusive, long toExclusive, System.Action<long> body) { throw null; }
         public static System.Threading.Tasks.ParallelLoopResult For(long fromInclusive, long toExclusive, System.Threading.Tasks.ParallelOptions parallelOptions, System.Action<long, System.Threading.Tasks.ParallelLoopState> body) { throw null; }
         public static System.Threading.Tasks.ParallelLoopResult For(long fromInclusive, long toExclusive, System.Threading.Tasks.ParallelOptions parallelOptions, System.Action<long> body) { throw null; }
+        public static System.Threading.Tasks.Task ForAsync<T>(T fromInclusive, T toExclusive, System.Func<T, System.Threading.CancellationToken, System.Threading.Tasks.ValueTask> body) where T : notnull, System.Numerics.IBinaryInteger<T> { throw null; }
+        public static System.Threading.Tasks.Task ForAsync<T>(T fromInclusive, T toExclusive, System.Threading.CancellationToken cancellationToken, System.Func<T, System.Threading.CancellationToken, System.Threading.Tasks.ValueTask> body) where T : notnull, System.Numerics.IBinaryInteger<T> { throw null; }
+        public static System.Threading.Tasks.Task ForAsync<T>(T fromInclusive, T toExclusive, System.Threading.Tasks.ParallelOptions parallelOptions, System.Func<T, System.Threading.CancellationToken, System.Threading.Tasks.ValueTask> body) where T : notnull, System.Numerics.IBinaryInteger<T> { throw null; }
         public static System.Threading.Tasks.ParallelLoopResult ForEach<TSource>(System.Collections.Concurrent.OrderablePartitioner<TSource> source, System.Action<TSource, System.Threading.Tasks.ParallelLoopState, long> body) { throw null; }
         public static System.Threading.Tasks.ParallelLoopResult ForEach<TSource>(System.Collections.Concurrent.OrderablePartitioner<TSource> source, System.Threading.Tasks.ParallelOptions parallelOptions, System.Action<TSource, System.Threading.Tasks.ParallelLoopState, long> body) { throw null; }
         public static System.Threading.Tasks.ParallelLoopResult ForEach<TSource>(System.Collections.Concurrent.Partitioner<TSource> source, System.Action<TSource, System.Threading.Tasks.ParallelLoopState> body) { throw null; }
index 7862243c1ae4f0d0e9974b59b9feb8a0dbaedcd7..b3c6d50cafe2e7d974c359ae5e140a2d709649c3 100644 (file)
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Numerics;
+using System.Runtime.CompilerServices;
 
 namespace System.Threading.Tasks
 {
     public static partial class Parallel
     {
-        /// <summary>Executes a for each operation on an <see cref="System.Collections.Generic.IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+        /// <summary>Executes a for loop in which iterations may run in parallel.</summary>
+        /// <param name="fromInclusive">The start index, inclusive.</param>
+        /// <param name="toExclusive">The end index, exclusive.</param>
+        /// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
+        /// <exception cref="ArgumentNullException">The <paramref name="body"/> argument is <see langword="null"/>.</exception>
+        /// <returns>A task that represents the entire for each operation.</returns>
+        /// <remarks>The operation will execute at most <see cref="Environment.ProcessorCount"/> operations in parallel.</remarks>
+        public static Task ForAsync<T>(T fromInclusive, T toExclusive, Func<T, CancellationToken, ValueTask> body)
+            where T : notnull, IBinaryInteger<T>
+        {
+            if (fromInclusive is null) throw new ArgumentNullException(nameof(fromInclusive));
+            if (toExclusive is null) throw new ArgumentNullException(nameof(toExclusive));
+            ArgumentNullException.ThrowIfNull(body);
+
+            return ForAsync(fromInclusive, toExclusive, DefaultDegreeOfParallelism, TaskScheduler.Default, default, body);
+        }
+
+        /// <summary>Executes a for loop in which iterations may run in parallel.</summary>
+        /// <param name="fromInclusive">The start index, inclusive.</param>
+        /// <param name="toExclusive">The end index, exclusive.</param>
+        /// <param name="cancellationToken">A cancellation token that may be used to cancel the for each operation.</param>
+        /// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
+        /// <exception cref="ArgumentNullException">The <paramref name="body"/> argument is <see langword="null"/>.</exception>
+        /// <returns>A task that represents the entire for each operation.</returns>
+        /// <remarks>The operation will execute at most <see cref="Environment.ProcessorCount"/> operations in parallel.</remarks>
+        public static Task ForAsync<T>(T fromInclusive, T toExclusive, CancellationToken cancellationToken, Func<T, CancellationToken, ValueTask> body)
+            where T : notnull, IBinaryInteger<T>
+        {
+            if (fromInclusive is null) throw new ArgumentNullException(nameof(fromInclusive));
+            if (toExclusive is null) throw new ArgumentNullException(nameof(toExclusive));
+            ArgumentNullException.ThrowIfNull(body);
+
+            return ForAsync(fromInclusive, toExclusive, DefaultDegreeOfParallelism, TaskScheduler.Default, cancellationToken, body);
+        }
+
+        /// <summary>Executes a for loop in which iterations may run in parallel.</summary>
+        /// <param name="fromInclusive">The start index, inclusive.</param>
+        /// <param name="toExclusive">The end index, exclusive.</param>
+        /// <param name="parallelOptions">An object that configures the behavior of this operation.</param>
+        /// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
+        /// <exception cref="ArgumentNullException">The <paramref name="body"/> argument is <see langword="null"/>.</exception>
+        /// <returns>A task that represents the entire for each operation.</returns>
+        /// <remarks>The operation will execute at most <see cref="Environment.ProcessorCount"/> operations in parallel.</remarks>
+        public static Task ForAsync<T>(T fromInclusive, T toExclusive, ParallelOptions parallelOptions, Func<T, CancellationToken, ValueTask> body)
+            where T : notnull, IBinaryInteger<T>
+        {
+            if (fromInclusive is null) throw new ArgumentNullException(nameof(fromInclusive));
+            if (toExclusive is null) throw new ArgumentNullException(nameof(toExclusive));
+            ArgumentNullException.ThrowIfNull(parallelOptions);
+            ArgumentNullException.ThrowIfNull(body);
+
+            return ForAsync(fromInclusive, toExclusive, parallelOptions.EffectiveMaxConcurrencyLevel, parallelOptions.EffectiveTaskScheduler, parallelOptions.CancellationToken, body);
+        }
+
+        /// <summary>Executes a for each operation on an <see cref="IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+        /// <typeparam name="T">The type of the data in the source.</typeparam>
+        /// <param name="fromInclusive">The start index, inclusive.</param>
+        /// <param name="toExclusive">The end index, exclusive.</param>
+        /// <param name="dop">The degree of parallelism, or the number of operations to allow to run in parallel.</param>
+        /// <param name="scheduler">The task scheduler on which all code should execute.</param>
+        /// <param name="cancellationToken">A cancellation token that may be used to cancel the for each operation.</param>
+        /// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
+        /// <exception cref="ArgumentNullException">The <paramref name="body"/> argument is <see langword="null"/>.</exception>
+        /// <returns>A task that represents the entire for each operation.</returns>
+        private static Task ForAsync<T>(T fromInclusive, T toExclusive, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func<T, CancellationToken, ValueTask> body)
+            where T : notnull, IBinaryInteger<T>
+        {
+            Debug.Assert(fromInclusive != null);
+            Debug.Assert(toExclusive != null);
+            Debug.Assert(scheduler != null);
+            Debug.Assert(body != null);
+
+            if (cancellationToken.IsCancellationRequested)
+            {
+                return Task.FromCanceled(cancellationToken);
+            }
+
+            if (fromInclusive >= toExclusive)
+            {
+                return Task.CompletedTask;
+            }
+
+            [MethodImpl(MethodImplOptions.AggressiveInlining)]
+            static bool Interlockable() =>
+                typeof(T) == typeof(int) ||
+                typeof(T) == typeof(uint) ||
+                typeof(T) == typeof(long) ||
+                typeof(T) == typeof(ulong) ||
+                typeof(T) == typeof(nint) ||
+                typeof(T) == typeof(nuint);
+
+            [MethodImpl(MethodImplOptions.AggressiveInlining)]
+            static bool CompareExchange(ref T location, T value, T comparand) =>
+                typeof(T) == typeof(int) ? Interlocked.CompareExchange(ref Unsafe.As<T, int>(ref location), Unsafe.As<T, int>(ref value), Unsafe.As<T, int>(ref comparand)) == Unsafe.As<T, int>(ref comparand) :
+                typeof(T) == typeof(uint) ? Interlocked.CompareExchange(ref Unsafe.As<T, uint>(ref location), Unsafe.As<T, uint>(ref value), Unsafe.As<T, uint>(ref comparand)) == Unsafe.As<T, uint>(ref comparand) :
+                typeof(T) == typeof(long) ? Interlocked.CompareExchange(ref Unsafe.As<T, long>(ref location), Unsafe.As<T, long>(ref value), Unsafe.As<T, long>(ref comparand)) == Unsafe.As<T, long>(ref comparand) :
+                typeof(T) == typeof(ulong) ? Interlocked.CompareExchange(ref Unsafe.As<T, ulong>(ref location), Unsafe.As<T, ulong>(ref value), Unsafe.As<T, ulong>(ref comparand)) == Unsafe.As<T, ulong>(ref comparand) :
+                typeof(T) == typeof(nint) ? Interlocked.CompareExchange(ref Unsafe.As<T, nint>(ref location), Unsafe.As<T, nint>(ref value), Unsafe.As<T, nint>(ref comparand)) == Unsafe.As<T, nint>(ref comparand) :
+                typeof(T) == typeof(nuint) ? Interlocked.CompareExchange(ref Unsafe.As<T, nuint>(ref location), Unsafe.As<T, nuint>(ref value), Unsafe.As<T, nuint>(ref comparand)) == Unsafe.As<T, nuint>(ref comparand) :
+                throw new UnreachableException();
+
+            // The worker body. Each worker will execute this same body.
+            Func<object, Task> taskBody = static async o =>
+            {
+                var state = (ForEachState<T>)o;
+                bool launchedNext = false;
+
+#pragma warning disable CA2007 // Explicitly don't use ConfigureAwait, as we want to perform all work on the specified scheduler that's now current
+                try
+                {
+                    // Continue to loop while there are more elements to be processed.
+                    while (!state.Cancellation.IsCancellationRequested)
+                    {
+                        // Get the next element from the enumerator. For some types, we can get the next element with just
+                        // interlocked operations, avoiding the need to take a lock.  For other types, we need to take a lock.
+                        T element;
+                        if (Interlockable())
+                        {
+                            TryAgain:
+                            element = state.NextAvailable;
+                            if (element >= state.ToExclusive)
+                            {
+                                break;
+                            }
+
+                            if (!CompareExchange(ref state.NextAvailable, element + T.One, element))
+                            {
+                                goto TryAgain;
+                            }
+                        }
+                        else
+                        {
+                            await state.AcquireLock();
+                            try
+                            {
+                                if (state.Cancellation.IsCancellationRequested || // check now that the lock has been acquired
+                                    state.NextAvailable >= state.ToExclusive)
+                                {
+                                    break;
+                                }
+
+                                element = state.NextAvailable;
+                                state.NextAvailable++;
+                            }
+                            finally
+                            {
+                                state.ReleaseLock();
+                            }
+                        }
+
+                        // If the remaining dop allows it and we've not yet queued the next worker, do so now.  We wait
+                        // until after we've grabbed an item from the enumerator to a) avoid unnecessary contention on the
+                        // serialized resource, and b) avoid queueing another work if there aren't any more items.  Each worker
+                        // is responsible only for creating the next worker, which in turn means there can't be any contention
+                        // on creating workers (though it's possible one worker could be executing while we're creating the next).
+                        if (!launchedNext)
+                        {
+                            launchedNext = true;
+                            state.QueueWorkerIfDopAvailable();
+                        }
+
+                        // Process the loop body.
+                        await state.LoopBody(element, state.Cancellation.Token);
+                    }
+                }
+                catch (Exception e)
+                {
+                    // Record the failure and then don't let the exception propagate.  The last worker to complete
+                    // will propagate exceptions as is appropriate to the top-level task.
+                    state.RecordException(e);
+                }
+                finally
+                {
+                    // If we're the last worker to complete, complete the operation.
+                    if (state.SignalWorkerCompletedIterating())
+                    {
+                        state.Complete();
+                    }
+                }
+#pragma warning restore CA2007
+            };
+
+            try
+            {
+                // Construct a state object that encapsulates all state to be passed and shared between
+                // the workers, and queues the first worker.
+                var state = new ForEachState<T>(fromInclusive, toExclusive, taskBody, !Interlockable(), dop, scheduler, cancellationToken, body);
+                state.QueueWorkerIfDopAvailable();
+                return state.Task;
+            }
+            catch (Exception e)
+            {
+                return Task.FromException(e);
+            }
+        }
+
+        /// <summary>Executes a for each operation on an <see cref="IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
         /// <typeparam name="TSource">The type of the data in the source.</typeparam>
         /// <param name="source">An enumerable data source.</param>
         /// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
-        /// <exception cref="System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> argument or <paramref name="body"/> argument is null.</exception>
+        /// <exception cref="ArgumentNullException">The <paramref name="source"/> argument or <paramref name="body"/> argument is <see langword="null"/>.</exception>
         /// <returns>A task that represents the entire for each operation.</returns>
         /// <remarks>The operation will execute at most <see cref="Environment.ProcessorCount"/> operations in parallel.</remarks>
         public static Task ForEachAsync<TSource>(IEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask> body)
@@ -23,12 +221,12 @@ namespace System.Threading.Tasks
             return ForEachAsync(source, DefaultDegreeOfParallelism, TaskScheduler.Default, default(CancellationToken), body);
         }
 
-        /// <summary>Executes a for each operation on an <see cref="System.Collections.Generic.IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+        /// <summary>Executes a for each operation on an <see cref="IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
         /// <typeparam name="TSource">The type of the data in the source.</typeparam>
         /// <param name="source">An enumerable data source.</param>
         /// <param name="cancellationToken">A cancellation token that may be used to cancel the for each operation.</param>
         /// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
-        /// <exception cref="System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> argument or <paramref name="body"/> argument is null.</exception>
+        /// <exception cref="ArgumentNullException">The <paramref name="source"/> argument or <paramref name="body"/> argument is <see langword="null"/>.</exception>
         /// <returns>A task that represents the entire for each operation.</returns>
         /// <remarks>The operation will execute at most <see cref="Environment.ProcessorCount"/> operations in parallel.</remarks>
         public static Task ForEachAsync<TSource>(IEnumerable<TSource> source, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
@@ -39,12 +237,12 @@ namespace System.Threading.Tasks
             return ForEachAsync(source, DefaultDegreeOfParallelism, TaskScheduler.Default, cancellationToken, body);
         }
 
-        /// <summary>Executes a for each operation on an <see cref="System.Collections.Generic.IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+        /// <summary>Executes a for each operation on an <see cref="IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
         /// <typeparam name="TSource">The type of the data in the source.</typeparam>
         /// <param name="source">An enumerable data source.</param>
         /// <param name="parallelOptions">An object that configures the behavior of this operation.</param>
         /// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
-        /// <exception cref="System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> argument or <paramref name="body"/> argument is null.</exception>
+        /// <exception cref="ArgumentNullException">The <paramref name="source"/> argument or <paramref name="body"/> argument is <see langword="null"/>.</exception>
         /// <returns>A task that represents the entire for each operation.</returns>
         public static Task ForEachAsync<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Func<TSource, CancellationToken, ValueTask> body)
         {
@@ -55,14 +253,14 @@ namespace System.Threading.Tasks
             return ForEachAsync(source, parallelOptions.EffectiveMaxConcurrencyLevel, parallelOptions.EffectiveTaskScheduler, parallelOptions.CancellationToken, body);
         }
 
-        /// <summary>Executes a for each operation on an <see cref="System.Collections.Generic.IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+        /// <summary>Executes a for each operation on an <see cref="IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
         /// <typeparam name="TSource">The type of the data in the source.</typeparam>
         /// <param name="source">An enumerable data source.</param>
         /// <param name="dop">A integer indicating how many operations to allow to run in parallel.</param>
         /// <param name="scheduler">The task scheduler on which all code should execute.</param>
         /// <param name="cancellationToken">A cancellation token that may be used to cancel the for each operation.</param>
         /// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
-        /// <exception cref="System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> argument or <paramref name="body"/> argument is null.</exception>
+        /// <exception cref="ArgumentNullException">The<paramref name="source"/> argument or <paramref name="body"/> argument is <see langword="null"/>.</exception>
         /// <returns>A task that represents the entire for each operation.</returns>
         private static Task ForEachAsync<TSource>(IEnumerable<TSource> source, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
         {
@@ -76,11 +274,6 @@ namespace System.Threading.Tasks
                 return Task.FromCanceled(cancellationToken);
             }
 
-            if (dop < 0)
-            {
-                dop = DefaultDegreeOfParallelism;
-            }
-
             // The worker body. Each worker will execute this same body.
             Func<object, Task> taskBody = static async o =>
             {
@@ -168,11 +361,11 @@ namespace System.Threading.Tasks
             }
         }
 
-        /// <summary>Executes a for each operation on an <see cref="System.Collections.Generic.IAsyncEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+        /// <summary>Executes a for each operation on an <see cref="IAsyncEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
         /// <typeparam name="TSource">The type of the data in the source.</typeparam>
         /// <param name="source">An asynchronous enumerable data source.</param>
         /// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
-        /// <exception cref="System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> argument or <paramref name="body"/> argument is null.</exception>
+        /// <exception cref="ArgumentNullException">The <paramref name="source"/> argument or <paramref name="body"/> argument is <see langword="null"/>.</exception>
         /// <returns>A task that represents the entire for each operation.</returns>
         /// <remarks>The operation will execute at most <see cref="Environment.ProcessorCount"/> operations in parallel.</remarks>
         public static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask> body)
@@ -183,12 +376,12 @@ namespace System.Threading.Tasks
             return ForEachAsync(source, DefaultDegreeOfParallelism, TaskScheduler.Default, default(CancellationToken), body);
         }
 
-        /// <summary>Executes a for each operation on an <see cref="System.Collections.Generic.IAsyncEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+        /// <summary>Executes a for each operation on an <see cref="IAsyncEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
         /// <typeparam name="TSource">The type of the data in the source.</typeparam>
         /// <param name="source">An asynchronous enumerable data source.</param>
         /// <param name="cancellationToken">A cancellation token that may be used to cancel the for each operation.</param>
         /// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
-        /// <exception cref="System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> argument or <paramref name="body"/> argument is null.</exception>
+        /// <exception cref="ArgumentNullException">The <paramref name="source"/> argument or <paramref name="body"/> argument is <see langword="null"/>.</exception>
         /// <returns>A task that represents the entire for each operation.</returns>
         /// <remarks>The operation will execute at most <see cref="Environment.ProcessorCount"/> operations in parallel.</remarks>
         public static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
@@ -199,12 +392,12 @@ namespace System.Threading.Tasks
             return ForEachAsync(source, DefaultDegreeOfParallelism, TaskScheduler.Default, cancellationToken, body);
         }
 
-        /// <summary>Executes a for each operation on an <see cref="System.Collections.Generic.IAsyncEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+        /// <summary>Executes a for each operation on an <see cref="IAsyncEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
         /// <typeparam name="TSource">The type of the data in the source.</typeparam>
         /// <param name="source">An asynchronous enumerable data source.</param>
         /// <param name="parallelOptions">An object that configures the behavior of this operation.</param>
         /// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
-        /// <exception cref="System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> argument or <paramref name="body"/> argument is null.</exception>
+        /// <exception cref="ArgumentNullException">The <paramref name="source"/> argument or <paramref name="body"/> argument is <see langword="null"/>.</exception>
         /// <returns>A task that represents the entire for each operation.</returns>
         public static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, ParallelOptions parallelOptions, Func<TSource, CancellationToken, ValueTask> body)
         {
@@ -215,14 +408,14 @@ namespace System.Threading.Tasks
             return ForEachAsync(source, parallelOptions.EffectiveMaxConcurrencyLevel, parallelOptions.EffectiveTaskScheduler, parallelOptions.CancellationToken, body);
         }
 
-        /// <summary>Executes a for each operation on an <see cref="System.Collections.Generic.IAsyncEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+        /// <summary>Executes a for each operation on an <see cref="IAsyncEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
         /// <typeparam name="TSource">The type of the data in the source.</typeparam>
         /// <param name="source">An asynchronous enumerable data source.</param>
         /// <param name="dop">A integer indicating how many operations to allow to run in parallel.</param>
         /// <param name="scheduler">The task scheduler on which all code should execute.</param>
         /// <param name="cancellationToken">A cancellation token that may be used to cancel the for each operation.</param>
         /// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
-        /// <exception cref="System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> argument or <paramref name="body"/> argument is null.</exception>
+        /// <exception cref="ArgumentNullException">The <paramref name="source"/> argument or <paramref name="body"/> argument is <see langword="null"/>.</exception>
         /// <returns>A task that represents the entire for each operation.</returns>
         private static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
         {
@@ -236,11 +429,6 @@ namespace System.Threading.Tasks
                 return Task.FromCanceled(cancellationToken);
             }
 
-            if (dop < 0)
-            {
-                dop = DefaultDegreeOfParallelism;
-            }
-
             // The worker body. Each worker will execute this same body.
             Func<object, Task> taskBody = static async o =>
             {
@@ -352,7 +540,7 @@ namespace System.Threading.Tasks
             /// <summary>The <see cref="ExecutionContext"/> present at the time of the ForEachAsync invocation.  This is only used if on the default scheduler.</summary>
             private readonly ExecutionContext? _executionContext;
             /// <summary>Semaphore used to provide exclusive access to the enumerator.</summary>
-            private readonly SemaphoreSlim _lock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
+            private readonly SemaphoreSlim? _lock;
 
             /// <summary>The number of outstanding workers.  When this hits 0, the operation has completed.</summary>
             private int _completionRefCount;
@@ -367,10 +555,11 @@ namespace System.Threading.Tasks
             public readonly CancellationTokenSource Cancellation = new CancellationTokenSource();
 
             /// <summary>Initializes the state object.</summary>
-            protected ForEachAsyncState(Func<object, Task> taskBody, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
+            protected ForEachAsyncState(Func<object, Task> taskBody, bool needsLock, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
             {
                 _taskBody = taskBody;
-                _remainingDop = dop;
+                _lock = needsLock ? new SemaphoreSlim(initialCount: 1, maxCount: 1) : null;
+                _remainingDop = dop < 0 ? DefaultDegreeOfParallelism : dop;
                 LoopBody = body;
                 _scheduler = scheduler;
                 if (scheduler == TaskScheduler.Default)
@@ -417,7 +606,8 @@ namespace System.Threading.Tasks
             public bool SignalWorkerCompletedIterating() => Interlocked.Decrement(ref _completionRefCount) == 0;
 
             /// <summary>Asynchronously acquires exclusive access to the enumerator.</summary>
-            public Task AcquireLock() =>
+            public Task AcquireLock()
+            {
                 // We explicitly don't pass this.Cancellation to WaitAsync.  Doing so adds overhead, and it isn't actually
                 // necessary. All of the operations that monitor the lock are part of the same ForEachAsync operation, and the Task
                 // returned from ForEachAsync can't complete until all of the constituent operations have completed, including whoever
@@ -426,10 +616,16 @@ namespace System.Threading.Tasks
                 // the face of cancellation, in exchange for making it a bit slower / more overhead in the common case of cancellation
                 // not being requested.  We want to optimize for the latter.  This also then avoids an exception throw / catch when
                 // cancellation is requested.
-                _lock.WaitAsync(CancellationToken.None);
+                Debug.Assert(_lock is not null, "Should only be invoked when _lock is non-null");
+                return _lock.WaitAsync(CancellationToken.None);
+            }
 
             /// <summary>Relinquishes exclusive access to the enumerator.</summary>
-            public void ReleaseLock() => _lock.Release();
+            public void ReleaseLock()
+            {
+                Debug.Assert(_lock is not null, "Should only be invoked when _lock is non-null");
+                _lock.Release();
+            }
 
             /// <summary>Stores an exception and triggers cancellation in order to alert all workers to stop as soon as possible.</summary>
             /// <param name="e">The exception.</param>
@@ -513,7 +709,7 @@ namespace System.Threading.Tasks
                 IEnumerable<TSource> source, Func<object, Task> taskBody,
                 int dop, TaskScheduler scheduler, CancellationToken cancellationToken,
                 Func<TSource, CancellationToken, ValueTask> body) :
-                base(taskBody, dop, scheduler, cancellationToken, body)
+                base(taskBody, needsLock: true, dop, scheduler, cancellationToken, body)
             {
                 Enumerator = source.GetEnumerator() ?? throw new InvalidOperationException(SR.Parallel_ForEach_NullEnumerator);
             }
@@ -535,7 +731,7 @@ namespace System.Threading.Tasks
                 IAsyncEnumerable<TSource> source, Func<object, Task> taskBody,
                 int dop, TaskScheduler scheduler, CancellationToken cancellationToken,
                 Func<TSource, CancellationToken, ValueTask> body) :
-                base(taskBody, dop, scheduler, cancellationToken, body)
+                base(taskBody, needsLock: true, dop, scheduler, cancellationToken, body)
             {
                 Enumerator = source.GetAsyncEnumerator(Cancellation.Token) ?? throw new InvalidOperationException(SR.Parallel_ForEach_NullEnumerator);
             }
@@ -546,5 +742,23 @@ namespace System.Threading.Tasks
                 return Enumerator.DisposeAsync();
             }
         }
+
+        /// <summary>Stores the state associated with an IAsyncEnumerable ForEachAsync operation, shared between all its workers.</summary>
+        /// <typeparam name="T">Specifies the type of data being enumerated.</typeparam>
+        private sealed class ForEachState<T> : ForEachAsyncState<T>
+        {
+            public T NextAvailable;
+            public readonly T ToExclusive;
+
+            public ForEachState(
+                T fromExclusive, T toExclusive, Func<object, Task> taskBody,
+                bool needsLock, int dop, TaskScheduler scheduler, CancellationToken cancellationToken,
+                Func<T, CancellationToken, ValueTask> body) :
+                base(taskBody, needsLock, dop, scheduler, cancellationToken, body)
+            {
+                NextAvailable = fromExclusive;
+                ToExclusive = toExclusive;
+            }
+        }
     }
 }
index b68b3d2c7b26bb655b8c1767b49b9cac46d4da7e..5708643c1d28f0930fc75bbc24bcd16f322cda49 100644 (file)
@@ -5,6 +5,7 @@ using System.Collections;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Linq;
+using System.Numerics;
 using System.Runtime.CompilerServices;
 using Xunit;
 
@@ -23,9 +24,14 @@ namespace System.Threading.Tasks.Tests
             AssertExtensions.Throws<ArgumentNullException>("source", () => { Parallel.ForEachAsync((IAsyncEnumerable<int>)null, CancellationToken.None, (item, cancellationToken) => default); });
             AssertExtensions.Throws<ArgumentNullException>("source", () => { Parallel.ForEachAsync((IAsyncEnumerable<int>)null, new ParallelOptions(), (item, cancellationToken) => default); });
 
+            AssertExtensions.Throws<ArgumentNullException>("parallelOptions", () => { Parallel.ForAsync(1, 10, null, (item, cancellationToken) => default); });
             AssertExtensions.Throws<ArgumentNullException>("parallelOptions", () => { Parallel.ForEachAsync(Enumerable.Range(1, 10), null, (item, cancellationToken) => default); });
             AssertExtensions.Throws<ArgumentNullException>("parallelOptions", () => { Parallel.ForEachAsync(EnumerableRangeAsync(1, 10), null, (item, cancellationToken) => default); });
 
+            AssertExtensions.Throws<ArgumentNullException>("body", () => { Parallel.ForAsync(1, 10, null); });
+            AssertExtensions.Throws<ArgumentNullException>("body", () => { Parallel.ForAsync(1, 10, CancellationToken.None, null); });
+            AssertExtensions.Throws<ArgumentNullException>("body", () => { Parallel.ForAsync(1, 10, new ParallelOptions(), null); });
+
             AssertExtensions.Throws<ArgumentNullException>("body", () => { Parallel.ForEachAsync(Enumerable.Range(1, 10), null); });
             AssertExtensions.Throws<ArgumentNullException>("body", () => { Parallel.ForEachAsync(Enumerable.Range(1, 10), CancellationToken.None, null); });
             AssertExtensions.Throws<ArgumentNullException>("body", () => { Parallel.ForEachAsync(Enumerable.Range(1, 10), new ParallelOptions(), null); });
@@ -54,9 +60,11 @@ namespace System.Threading.Tasks.Tests
                 return default;
             };
 
+            AssertCanceled(Parallel.ForAsync(1, 10, cts.Token, body));
             AssertCanceled(Parallel.ForEachAsync(MarkStart(box), cts.Token, body));
             AssertCanceled(Parallel.ForEachAsync(MarkStartAsync(box), cts.Token, body));
 
+            AssertCanceled(Parallel.ForAsync(1, 10, new ParallelOptions { CancellationToken = cts.Token }, body));
             AssertCanceled(Parallel.ForEachAsync(MarkStart(box), new ParallelOptions { CancellationToken = cts.Token }, body));
             AssertCanceled(Parallel.ForEachAsync(MarkStartAsync(box), new ParallelOptions { CancellationToken = cts.Token }, body));
 
@@ -79,6 +87,39 @@ namespace System.Threading.Tasks.Tests
             }
         }
 
+        [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        [InlineData(-1)]
+        [InlineData(1)]
+        [InlineData(2)]
+        [InlineData(4)]
+        [InlineData(128)]
+        public async Task Dop_WorkersCreatedRespectingLimit_For(int dop)
+        {
+            bool exit = false;
+
+            int activeWorkers = 0;
+            var block = new TaskCompletionSource();
+
+            Task t = Parallel.ForAsync(long.MinValue, long.MaxValue, new ParallelOptions { MaxDegreeOfParallelism = dop }, async (item, cancellationToken) =>
+            {
+                Interlocked.Increment(ref activeWorkers);
+                await block.Task;
+                if (Volatile.Read(ref exit))
+                {
+                    throw new FormatException();
+                }
+            });
+            Assert.False(t.IsCompleted);
+
+            await Task.Delay(20); // give the loop some time to run
+
+            Volatile.Write(ref exit, true);
+            block.SetResult();
+            await Assert.ThrowsAsync<FormatException>(() => t);
+
+            Assert.InRange(activeWorkers, 0, dop == -1 ? Environment.ProcessorCount : dop);
+        }
+
         [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
         [InlineData(-1)]
         [InlineData(1)]
@@ -117,6 +158,40 @@ namespace System.Threading.Tasks.Tests
             Assert.InRange(activeWorkers, 0, dop == -1 ? Environment.ProcessorCount : dop);
         }
 
+        [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        [InlineData(-1)]
+        [InlineData(1)]
+        [InlineData(2)]
+        [InlineData(4)]
+        [InlineData(128)]
+        public async Task Dop_WorkersCreatedRespectingLimitAndTaskScheduler_For(int dop)
+        {
+            bool exit = false;
+            int activeWorkers = 0;
+            var block = new TaskCompletionSource();
+
+            int MaxSchedulerLimit = Math.Min(2, Environment.ProcessorCount);
+
+            Task t = Parallel.ForAsync(long.MinValue, long.MaxValue, new ParallelOptions { MaxDegreeOfParallelism = dop, TaskScheduler = new MaxConcurrencyLevelPassthroughTaskScheduler(MaxSchedulerLimit) }, async (item, cancellationToken) =>
+            {
+                Interlocked.Increment(ref activeWorkers);
+                await block.Task;
+                if (Volatile.Read(ref exit))
+                {
+                    throw new FormatException();
+                }
+            });
+            Assert.False(t.IsCompleted);
+
+            await Task.Delay(20); // give the loop some time to run
+
+            Volatile.Write(ref exit, true);
+            block.SetResult();
+            await Assert.ThrowsAsync<FormatException>(() => t);
+
+            Assert.InRange(activeWorkers, 0, Math.Min(MaxSchedulerLimit, dop == -1 ? Environment.ProcessorCount : dop));
+        }
+
         [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
         [InlineData(-1)]
         [InlineData(1)]
@@ -157,6 +232,33 @@ namespace System.Threading.Tasks.Tests
             Assert.InRange(activeWorkers, 0, Math.Min(MaxSchedulerLimit, dop == -1 ? Environment.ProcessorCount : dop));
         }
 
+        [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        public async Task Dop_NegativeTaskSchedulerLimitTreatedAsDefault_For()
+        {
+            bool exit = false;
+            int activeWorkers = 0;
+            var block = new TaskCompletionSource();
+
+            Task t = Parallel.ForAsync(long.MinValue, long.MaxValue, new ParallelOptions { TaskScheduler = new MaxConcurrencyLevelPassthroughTaskScheduler(-42) }, async (item, cancellationToken) =>
+            {
+                Interlocked.Increment(ref activeWorkers);
+                await block.Task;
+                if (Volatile.Read(ref exit))
+                {
+                    throw new FormatException();
+                }
+            });
+            Assert.False(t.IsCompleted);
+
+            await Task.Delay(20); // give the loop some time to run
+
+            Volatile.Write(ref exit, true);
+            block.SetResult();
+            await Assert.ThrowsAsync<FormatException>(() => t);
+
+            Assert.InRange(activeWorkers, 0, Environment.ProcessorCount);
+        }
+
         [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
         public async Task Dop_NegativeTaskSchedulerLimitTreatedAsDefault_Sync()
         {
@@ -224,6 +326,19 @@ namespace System.Threading.Tasks.Tests
             Assert.InRange(activeWorkers, 0, Environment.ProcessorCount);
         }
 
+        [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        public async Task RunsAsynchronously_For()
+        {
+            var cts = new CancellationTokenSource();
+
+            Task t = Parallel.ForAsync(long.MinValue, long.MaxValue, cts.Token, (item, cancellationToken) => default);
+            Assert.False(t.IsCompleted);
+
+            cts.Cancel();
+
+            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+        }
+
         [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
         public async Task RunsAsynchronously_EvenForEntirelySynchronousWork_Sync()
         {
@@ -301,6 +416,20 @@ namespace System.Threading.Tasks.Tests
             Assert.InRange(activeWorkers, 0, dop == -1 ? Environment.ProcessorCount : dop);
         }
 
+        [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        public void EmptyRange_For()
+        {
+            int counter = 0;
+            Task t = Parallel.ForAsync(10, 10, (item, cancellationToken) =>
+            {
+                Interlocked.Increment(ref counter);
+                return default;
+            });
+            Assert.True(t.IsCompletedSuccessfully);
+
+            Assert.Equal(0, counter);
+        }
+
         [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
         public async Task EmptySource_Sync()
         {
@@ -327,6 +456,51 @@ namespace System.Threading.Tasks.Tests
             Assert.Equal(0, counter);
         }
 
+        [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        [InlineData(false)]
+        [InlineData(true)]
+        public async Task AllItemsEnumeratedOnce_For(bool yield)
+        {
+            await Test<int>(yield);
+            await Test<uint>(yield);
+            await Test<long>(yield);
+            await Test<ulong>(yield);
+            await Test<short>(yield);
+            await Test<ushort>(yield);
+            await Test<nint>(yield);
+            await Test<nuint>(yield);
+            await Test<Int128>(yield);
+            await Test<UInt128>(yield);
+            await Test<BigInteger>(yield);
+
+            async Task Test<T>(bool yield) where T : IBinaryInteger<T>
+            {
+                const int Start = 10, Count = 10_000;
+
+                var set = new HashSet<T>();
+
+                await Parallel.ForAsync(T.CreateTruncating(Start), T.CreateTruncating(Start + Count), async (item, cancellationToken) =>
+                {
+                    lock (set)
+                    {
+                        Assert.True(set.Add(item));
+                    }
+
+                    if (yield)
+                    {
+                        await Task.Yield();
+                    }
+                });
+
+                Assert.False(set.Contains(T.CreateTruncating(Start - 1)));
+                for (int i = Start; i < Start + Count; i++)
+                {
+                    Assert.True(set.Contains(T.CreateTruncating(i)));
+                }
+                Assert.False(set.Contains(T.CreateTruncating(Start + Count + 1)));
+            }
+        }
+
         [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
         [InlineData(false)]
         [InlineData(true)]
@@ -349,10 +523,12 @@ namespace System.Threading.Tasks.Tests
                 }
             });
 
+            Assert.False(set.Contains(Start - 1));
             for (int i = Start; i < Start + Count; i++)
             {
                 Assert.True(set.Contains(i));
             }
+            Assert.False(set.Contains(Start + Count + 1));
         }
 
         [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
@@ -377,10 +553,40 @@ namespace System.Threading.Tasks.Tests
                 }
             });
 
+            Assert.False(set.Contains(Start - 1));
             for (int i = Start; i < Start + Count; i++)
             {
                 Assert.True(set.Contains(i));
             }
+            Assert.False(set.Contains(Start + Count + 1));
+        }
+
+        [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        [InlineData(false)]
+        [InlineData(true)]
+        public async Task TaskScheduler_AllCodeExecutedOnCorrectScheduler_For(bool defaultScheduler)
+        {
+            TaskScheduler scheduler = defaultScheduler ?
+                TaskScheduler.Default :
+                new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler;
+
+            TaskScheduler otherScheduler = new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler;
+
+            var cq = new ConcurrentQueue<int>();
+
+            await Parallel.ForAsync(1, 101, new ParallelOptions { TaskScheduler = scheduler }, async (item, cancellationToken) =>
+            {
+                Assert.Same(scheduler, TaskScheduler.Current);
+                await Task.Yield();
+                cq.Enqueue(item);
+
+                if (item % 10 == 0)
+                {
+                    await new SwitchTo(otherScheduler);
+                }
+            });
+
+            Assert.Equal(Enumerable.Range(1, 100), cq.OrderBy(i => i));
         }
 
         [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
@@ -460,6 +666,17 @@ namespace System.Threading.Tasks.Tests
             Assert.Equal(Enumerable.Range(1, 100), cq.OrderBy(i => i));
         }
 
+        [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        public async Task Cancellation_CancelsIterationAndReturnsCanceledTask_For()
+        {
+            using var cts = new CancellationTokenSource(10);
+            OperationCanceledException oce = await Assert.ThrowsAnyAsync<OperationCanceledException>(() => Parallel.ForAsync(long.MinValue, long.MaxValue, cts.Token, async (item, cancellationToken) =>
+            {
+                await Task.Yield();
+            }));
+            Assert.Equal(cts.Token, oce.CancellationToken);
+        }
+
         [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
         public async Task Cancellation_CancelsIterationAndReturnsCanceledTask_Sync()
         {
@@ -518,6 +735,21 @@ namespace System.Threading.Tasks.Tests
             });
         }
 
+        [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        public async Task Cancellation_SameTokenPassedToEveryInvocation_For()
+        {
+            var cq = new ConcurrentQueue<CancellationToken>();
+
+            await Parallel.ForAsync(1, 101, async (item, cancellationToken) =>
+            {
+                cq.Enqueue(cancellationToken);
+                await Task.Yield();
+            });
+
+            Assert.Equal(100, cq.Count);
+            Assert.Equal(1, cq.Distinct().Count());
+        }
+
         [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
         public async Task Cancellation_SameTokenPassedToEveryInvocation_Sync()
         {
@@ -548,6 +780,32 @@ namespace System.Threading.Tasks.Tests
             Assert.Equal(1, cq.Distinct().Count());
         }
 
+        [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        public async Task Cancellation_HasPriorityOverExceptions_For()
+        {
+            var tcs = new TaskCompletionSource();
+            var cts = new CancellationTokenSource();
+
+            Task t = Parallel.ForAsync(0, long.MaxValue, new ParallelOptions { CancellationToken = cts.Token, MaxDegreeOfParallelism = 2 }, async (item, cancellationToken) =>
+            {
+                if (item == 0)
+                {
+                    await tcs.Task;
+                    cts.Cancel();
+                    throw new FormatException();
+                }
+                else
+                {
+                    tcs.TrySetResult();
+                    await Task.Yield();
+                }
+            });
+
+            OperationCanceledException oce = await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+            Assert.Equal(cts.Token, oce.CancellationToken);
+            Assert.True(t.IsCanceled);
+        }
+
         [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
         public async Task Cancellation_HasPriorityOverExceptions_Sync()
         {
@@ -616,6 +874,22 @@ namespace System.Threading.Tasks.Tests
             Assert.True(t.IsCanceled);
         }
 
+        [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        [InlineData(false)]
+        [InlineData(true)]
+        public async Task Cancellation_FaultsForOceForNonCancellation_For(bool internalToken)
+        {
+            var cts = new CancellationTokenSource();
+
+            Task t = Parallel.ForAsync(long.MinValue, long.MaxValue, new ParallelOptions { CancellationToken = cts.Token }, (item, cancellationToken) =>
+            {
+                throw new OperationCanceledException(internalToken ? cancellationToken : cts.Token);
+            });
+
+            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+            Assert.True(t.IsFaulted);
+        }
+
         [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
         [InlineData(false)]
         [InlineData(true)]
@@ -642,6 +916,38 @@ namespace System.Threading.Tasks.Tests
             Assert.True(t.IsFaulted);
         }
 
+        [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        [InlineData(0, 4)]
+        [InlineData(1, 4)]
+        [InlineData(2, 4)]
+        [InlineData(3, 4)]
+        [InlineData(4, 4)]
+        public async Task Cancellation_InternalCancellationExceptionsArentFilteredOut_For(int numThrowingNonCanceledOce, int total)
+        {
+            var cts = new CancellationTokenSource();
+
+            var barrier = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+            int remainingCount = total;
+
+            Task t = Parallel.ForAsync(0, total, new ParallelOptions { CancellationToken = cts.Token, MaxDegreeOfParallelism = total }, async (item, cancellationToken) =>
+            {
+                // Wait for all operations to be started
+                if (Interlocked.Decrement(ref remainingCount) == 0)
+                {
+                    barrier.SetResult();
+                }
+                await barrier.Task;
+
+                throw item < numThrowingNonCanceledOce ?
+                    new OperationCanceledException(cancellationToken) :
+                    throw new FormatException();
+            });
+
+            await Assert.ThrowsAnyAsync<Exception>(() => t);
+            Assert.Equal(total, t.Exception.InnerExceptions.Count);
+            Assert.Equal(numThrowingNonCanceledOce, t.Exception.InnerExceptions.Count(e => e is OperationCanceledException));
+        }
+
         [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
         [InlineData(0, 4)]
         [InlineData(1, 4)]
@@ -751,6 +1057,27 @@ namespace System.Threading.Tasks.Tests
             Assert.True(t.IsFaulted);
         }
 
+        [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        public async Task Exception_FromLoopBody_For()
+        {
+            var barrier = new Barrier(2);
+            Task t = Parallel.ForAsync(1, 3, new ParallelOptions { MaxDegreeOfParallelism = barrier.ParticipantCount }, (item, cancellationToken) =>
+            {
+                barrier.SignalAndWait();
+                throw item switch
+                {
+                    1 => new FormatException(),
+                    2 => new InvalidTimeZoneException(),
+                    _ => new Exception()
+                };
+            });
+            await Assert.ThrowsAnyAsync<Exception>(() => t);
+            Assert.True(t.IsFaulted);
+            Assert.Equal(2, t.Exception.InnerExceptions.Count);
+            Assert.Contains(t.Exception.InnerExceptions, e => e is FormatException);
+            Assert.Contains(t.Exception.InnerExceptions, e => e is InvalidTimeZoneException);
+        }
+
         [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
         public async Task Exception_FromLoopBody_Sync()
         {
@@ -853,6 +1180,34 @@ namespace System.Threading.Tasks.Tests
             Assert.Contains(t.Exception.InnerExceptions, e => e is InvalidTimeZoneException);
         }
 
+        [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        public async Task Exception_ImplicitlyCancelsOtherWorkers_For()
+        {
+            await Assert.ThrowsAsync<Exception>(() => Parallel.ForAsync(0, long.MaxValue, async (item, cancellationToken) =>
+            {
+                await Task.Yield();
+                if (item == 1000)
+                {
+                    throw new Exception();
+                }
+            }));
+
+            await Assert.ThrowsAsync<FormatException>(() => Parallel.ForAsync(0, long.MaxValue, new ParallelOptions { MaxDegreeOfParallelism = 2 }, async (item, cancellationToken) =>
+            {
+                if (item == 0)
+                {
+                    throw new FormatException();
+                }
+                else
+                {
+                    Assert.Equal(1, item);
+                    var tcs = new TaskCompletionSource();
+                    cancellationToken.Register(() => tcs.SetResult());
+                    await tcs.Task;
+                }
+            }));
+        }
+
         [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
         public async Task Exception_ImplicitlyCancelsOtherWorkers_Sync()
         {
@@ -958,6 +1313,24 @@ namespace System.Threading.Tasks.Tests
             Assert.IsType<FormatException>(ae.InnerException);
         }
 
+        [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        [InlineData(false)]
+        [InlineData(true)]
+        public async Task ExecutionContext_FlowsToWorkerBodies_For(bool defaultScheduler)
+        {
+            TaskScheduler scheduler = defaultScheduler ?
+                TaskScheduler.Default :
+                new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler;
+
+            var al = new AsyncLocal<int>();
+            al.Value = 42;
+            await Parallel.ForAsync(0, 100, async (item, cancellationToken) =>
+            {
+                await Task.Yield();
+                Assert.Equal(42, al.Value);
+            });
+        }
+
         [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
         [InlineData(false)]
         [InlineData(true)]