using System.Collections.Generic;
using System.Diagnostics;
+using System.Numerics;
+using System.Runtime.CompilerServices;
namespace System.Threading.Tasks
{
public static partial class Parallel
{
- /// <summary>Executes a for each operation on an <see cref="System.Collections.Generic.IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+ /// <summary>Executes a for loop in which iterations may run in parallel.</summary>
+ /// <param name="fromInclusive">The start index, inclusive.</param>
+ /// <param name="toExclusive">The end index, exclusive.</param>
+ /// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
+ /// <exception cref="ArgumentNullException">The <paramref name="body"/> argument is <see langword="null"/>.</exception>
+ /// <returns>A task that represents the entire for each operation.</returns>
+ /// <remarks>The operation will execute at most <see cref="Environment.ProcessorCount"/> operations in parallel.</remarks>
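+ /// <example>
+ /// A minimal illustrative sketch (the delay stands in for real per-iteration work):
+ /// <code>
+ /// await Parallel.ForAsync(0, 100, async (i, cancellationToken) =>
+ /// {
+ ///     await Task.Delay(10, cancellationToken); // stand-in for per-iteration work
+ /// });
+ /// </code>
+ /// </example>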
+ public static Task ForAsync<T>(T fromInclusive, T toExclusive, Func<T, CancellationToken, ValueTask> body)
+ where T : notnull, IBinaryInteger<T>
+ {
+ if (fromInclusive is null) throw new ArgumentNullException(nameof(fromInclusive));
+ if (toExclusive is null) throw new ArgumentNullException(nameof(toExclusive));
+ ArgumentNullException.ThrowIfNull(body);
+
+ return ForAsync(fromInclusive, toExclusive, DefaultDegreeOfParallelism, TaskScheduler.Default, default, body);
+ }
+
+ /// <summary>Executes a for loop in which iterations may run in parallel.</summary>
+ /// <param name="fromInclusive">The start index, inclusive.</param>
+ /// <param name="toExclusive">The end index, exclusive.</param>
+ /// <param name="cancellationToken">A cancellation token that may be used to cancel the for each operation.</param>
+ /// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
+ /// <exception cref="ArgumentNullException">The <paramref name="body"/> argument is <see langword="null"/>.</exception>
+ /// <returns>A task that represents the entire for each operation.</returns>
+ /// <remarks>The operation will execute at most <see cref="Environment.ProcessorCount"/> operations in parallel.</remarks>
+ public static Task ForAsync<T>(T fromInclusive, T toExclusive, CancellationToken cancellationToken, Func<T, CancellationToken, ValueTask> body)
+ where T : notnull, IBinaryInteger<T>
+ {
+ if (fromInclusive is null) throw new ArgumentNullException(nameof(fromInclusive));
+ if (toExclusive is null) throw new ArgumentNullException(nameof(toExclusive));
+ ArgumentNullException.ThrowIfNull(body);
+
+ return ForAsync(fromInclusive, toExclusive, DefaultDegreeOfParallelism, TaskScheduler.Default, cancellationToken, body);
+ }
+
+ /// <summary>Executes a for loop in which iterations may run in parallel.</summary>
+ /// <param name="fromInclusive">The start index, inclusive.</param>
+ /// <param name="toExclusive">The end index, exclusive.</param>
+ /// <param name="parallelOptions">An object that configures the behavior of this operation.</param>
+ /// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
+ /// <exception cref="ArgumentNullException">The <paramref name="body"/> argument is <see langword="null"/>.</exception>
+ /// <returns>A task that represents the entire for each operation.</returns>
+ /// <remarks>The operation will execute at most <see cref="Environment.ProcessorCount"/> operations in parallel.</remarks>
+ public static Task ForAsync<T>(T fromInclusive, T toExclusive, ParallelOptions parallelOptions, Func<T, CancellationToken, ValueTask> body)
+ where T : notnull, IBinaryInteger<T>
+ {
+ if (fromInclusive is null) throw new ArgumentNullException(nameof(fromInclusive));
+ if (toExclusive is null) throw new ArgumentNullException(nameof(toExclusive));
+ ArgumentNullException.ThrowIfNull(parallelOptions);
+ ArgumentNullException.ThrowIfNull(body);
+
+ return ForAsync(fromInclusive, toExclusive, parallelOptions.EffectiveMaxConcurrencyLevel, parallelOptions.EffectiveTaskScheduler, parallelOptions.CancellationToken, body);
+ }
+
+ /// <summary>Executes a for loop in which iterations may run in parallel.</summary>
+ /// <typeparam name="T">The integer type of the loop index.</typeparam>
+ /// <param name="fromInclusive">The start index, inclusive.</param>
+ /// <param name="toExclusive">The end index, exclusive.</param>
+ /// <param name="dop">The degree of parallelism, or the number of operations to allow to run in parallel.</param>
+ /// <param name="scheduler">The task scheduler on which all code should execute.</param>
+ /// <param name="cancellationToken">A cancellation token that may be used to cancel the for each operation.</param>
+ /// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
+ /// <exception cref="ArgumentNullException">The <paramref name="body"/> argument is <see langword="null"/>.</exception>
+ /// <returns>A task that represents the entire for each operation.</returns>
+ private static Task ForAsync<T>(T fromInclusive, T toExclusive, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func<T, CancellationToken, ValueTask> body)
+ where T : notnull, IBinaryInteger<T>
+ {
+ Debug.Assert(fromInclusive != null);
+ Debug.Assert(toExclusive != null);
+ Debug.Assert(scheduler != null);
+ Debug.Assert(body != null);
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return Task.FromCanceled(cancellationToken);
+ }
+
+ if (fromInclusive >= toExclusive)
+ {
+ return Task.CompletedTask;
+ }
+
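+ // For the built-in integer types checked below, the typeof(T) comparisons are evaluated by the JIT as
+ // constants for a given value-type T, so each helper folds down to just the relevant branch.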
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ static bool Interlockable() =>
+ typeof(T) == typeof(int) ||
+ typeof(T) == typeof(uint) ||
+ typeof(T) == typeof(long) ||
+ typeof(T) == typeof(ulong) ||
+ typeof(T) == typeof(nint) ||
+ typeof(T) == typeof(nuint);
+
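+ // Performs an interlocked compare-exchange on the shared position by reinterpreting the T reference
+ // as the corresponding primitive via Unsafe.As. This is only reached for the types accepted by
+ // Interlockable(), where T is exactly that primitive, so the reinterpretation is safe.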
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ static bool CompareExchange(ref T location, T value, T comparand) =>
+ typeof(T) == typeof(int) ? Interlocked.CompareExchange(ref Unsafe.As<T, int>(ref location), Unsafe.As<T, int>(ref value), Unsafe.As<T, int>(ref comparand)) == Unsafe.As<T, int>(ref comparand) :
+ typeof(T) == typeof(uint) ? Interlocked.CompareExchange(ref Unsafe.As<T, uint>(ref location), Unsafe.As<T, uint>(ref value), Unsafe.As<T, uint>(ref comparand)) == Unsafe.As<T, uint>(ref comparand) :
+ typeof(T) == typeof(long) ? Interlocked.CompareExchange(ref Unsafe.As<T, long>(ref location), Unsafe.As<T, long>(ref value), Unsafe.As<T, long>(ref comparand)) == Unsafe.As<T, long>(ref comparand) :
+ typeof(T) == typeof(ulong) ? Interlocked.CompareExchange(ref Unsafe.As<T, ulong>(ref location), Unsafe.As<T, ulong>(ref value), Unsafe.As<T, ulong>(ref comparand)) == Unsafe.As<T, ulong>(ref comparand) :
+ typeof(T) == typeof(nint) ? Interlocked.CompareExchange(ref Unsafe.As<T, nint>(ref location), Unsafe.As<T, nint>(ref value), Unsafe.As<T, nint>(ref comparand)) == Unsafe.As<T, nint>(ref comparand) :
+ typeof(T) == typeof(nuint) ? Interlocked.CompareExchange(ref Unsafe.As<T, nuint>(ref location), Unsafe.As<T, nuint>(ref value), Unsafe.As<T, nuint>(ref comparand)) == Unsafe.As<T, nuint>(ref comparand) :
+ throw new UnreachableException();
+
+ // The worker body. Each worker will execute this same body.
+ Func<object, Task> taskBody = static async o =>
+ {
+ var state = (ForEachState<T>)o;
+ bool launchedNext = false;
+
+#pragma warning disable CA2007 // Explicitly don't use ConfigureAwait, as we want to perform all work on the specified scheduler that's now current
+ try
+ {
+ // Continue to loop while there are more elements to be processed.
+ while (!state.Cancellation.IsCancellationRequested)
+ {
+ // Get the next value from the range. For some types, we can advance the shared position with just
+ // interlocked operations, avoiding the need to take a lock. For other types, we need to take a lock.
+ T element;
+ if (Interlockable())
+ {
+ TryAgain:
+ element = state.NextAvailable;
+ if (element >= state.ToExclusive)
+ {
+ break;
+ }
+
+ if (!CompareExchange(ref state.NextAvailable, element + T.One, element))
+ {
+ goto TryAgain;
+ }
+ }
+ else
+ {
+ await state.AcquireLock();
+ try
+ {
+ if (state.Cancellation.IsCancellationRequested || // check now that the lock has been acquired
+ state.NextAvailable >= state.ToExclusive)
+ {
+ break;
+ }
+
+ element = state.NextAvailable;
+ state.NextAvailable++;
+ }
+ finally
+ {
+ state.ReleaseLock();
+ }
+ }
+
+ // If the remaining dop allows it and we've not yet queued the next worker, do so now. We wait
+ // until after we've grabbed the next value to a) avoid unnecessary contention on the shared
+ // position, and b) avoid queueing another worker if there aren't any more values. Each worker
+ // is responsible only for creating the next worker, which in turn means there can't be any contention
+ // on creating workers (though it's possible one worker could be executing while we're creating the next).
+ if (!launchedNext)
+ {
+ launchedNext = true;
+ state.QueueWorkerIfDopAvailable();
+ }
+
+ // Process the loop body.
+ await state.LoopBody(element, state.Cancellation.Token);
+ }
+ }
+ catch (Exception e)
+ {
+ // Record the failure and then don't let the exception propagate. The last worker to complete
+ // will propagate exceptions as is appropriate to the top-level task.
+ state.RecordException(e);
+ }
+ finally
+ {
+ // If we're the last worker to complete, complete the operation.
+ if (state.SignalWorkerCompletedIterating())
+ {
+ state.Complete();
+ }
+ }
+#pragma warning restore CA2007
+ };
+
+ try
+ {
+ // Construct a state object that encapsulates all state to be passed and shared between
+ // the workers, and queues the first worker.
+ var state = new ForEachState<T>(fromInclusive, toExclusive, taskBody, !Interlockable(), dop, scheduler, cancellationToken, body);
+ state.QueueWorkerIfDopAvailable();
+ return state.Task;
+ }
+ catch (Exception e)
+ {
+ return Task.FromException(e);
+ }
+ }
+
+ /// <summary>Executes a for each operation on an <see cref="IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
- /// <exception cref="System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> argument or <paramref name="body"/> argument is null.</exception>
+ /// <exception cref="ArgumentNullException">The <paramref name="source"/> argument or <paramref name="body"/> argument is <see langword="null"/>.</exception>
/// <returns>A task that represents the entire for each operation.</returns>
/// <remarks>The operation will execute at most <see cref="Environment.ProcessorCount"/> operations in parallel.</remarks>
public static Task ForEachAsync<TSource>(IEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask> body)
return ForEachAsync(source, DefaultDegreeOfParallelism, TaskScheduler.Default, default(CancellationToken), body);
}
- /// <summary>Executes a for each operation on an <see cref="System.Collections.Generic.IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+ /// <summary>Executes a for each operation on an <see cref="IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="cancellationToken">A cancellation token that may be used to cancel the for each operation.</param>
/// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
- /// <exception cref="System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> argument or <paramref name="body"/> argument is null.</exception>
+ /// <exception cref="ArgumentNullException">The <paramref name="source"/> argument or <paramref name="body"/> argument is <see langword="null"/>.</exception>
/// <returns>A task that represents the entire for each operation.</returns>
/// <remarks>The operation will execute at most <see cref="Environment.ProcessorCount"/> operations in parallel.</remarks>
public static Task ForEachAsync<TSource>(IEnumerable<TSource> source, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
return ForEachAsync(source, DefaultDegreeOfParallelism, TaskScheduler.Default, cancellationToken, body);
}
- /// <summary>Executes a for each operation on an <see cref="System.Collections.Generic.IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+ /// <summary>Executes a for each operation on an <see cref="IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="parallelOptions">An object that configures the behavior of this operation.</param>
/// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
- /// <exception cref="System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> argument or <paramref name="body"/> argument is null.</exception>
+ /// <exception cref="ArgumentNullException">The <paramref name="source"/> argument or <paramref name="body"/> argument is <see langword="null"/>.</exception>
/// <returns>A task that represents the entire for each operation.</returns>
public static Task ForEachAsync<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Func<TSource, CancellationToken, ValueTask> body)
{
return ForEachAsync(source, parallelOptions.EffectiveMaxConcurrencyLevel, parallelOptions.EffectiveTaskScheduler, parallelOptions.CancellationToken, body);
}
- /// <summary>Executes a for each operation on an <see cref="System.Collections.Generic.IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+ /// <summary>Executes a for each operation on an <see cref="IEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="dop">A integer indicating how many operations to allow to run in parallel.</param>
/// <param name="scheduler">The task scheduler on which all code should execute.</param>
/// <param name="cancellationToken">A cancellation token that may be used to cancel the for each operation.</param>
/// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
- /// <exception cref="System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> argument or <paramref name="body"/> argument is null.</exception>
+ /// <exception cref="ArgumentNullException">The<paramref name="source"/> argument or <paramref name="body"/> argument is <see langword="null"/>.</exception>
/// <returns>A task that represents the entire for each operation.</returns>
private static Task ForEachAsync<TSource>(IEnumerable<TSource> source, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
{
return Task.FromCanceled(cancellationToken);
}
- if (dop < 0)
- {
- dop = DefaultDegreeOfParallelism;
- }
-
// The worker body. Each worker will execute this same body.
Func<object, Task> taskBody = static async o =>
{
}
}
- /// <summary>Executes a for each operation on an <see cref="System.Collections.Generic.IAsyncEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+ /// <summary>Executes a for each operation on an <see cref="IAsyncEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <param name="source">An asynchronous enumerable data source.</param>
/// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
- /// <exception cref="System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> argument or <paramref name="body"/> argument is null.</exception>
+ /// <exception cref="ArgumentNullException">The <paramref name="source"/> argument or <paramref name="body"/> argument is <see langword="null"/>.</exception>
/// <returns>A task that represents the entire for each operation.</returns>
/// <remarks>The operation will execute at most <see cref="Environment.ProcessorCount"/> operations in parallel.</remarks>
public static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask> body)
return ForEachAsync(source, DefaultDegreeOfParallelism, TaskScheduler.Default, default(CancellationToken), body);
}
- /// <summary>Executes a for each operation on an <see cref="System.Collections.Generic.IAsyncEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+ /// <summary>Executes a for each operation on an <see cref="IAsyncEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <param name="source">An asynchronous enumerable data source.</param>
/// <param name="cancellationToken">A cancellation token that may be used to cancel the for each operation.</param>
/// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
- /// <exception cref="System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> argument or <paramref name="body"/> argument is null.</exception>
+ /// <exception cref="ArgumentNullException">The <paramref name="source"/> argument or <paramref name="body"/> argument is <see langword="null"/>.</exception>
/// <returns>A task that represents the entire for each operation.</returns>
/// <remarks>The operation will execute at most <see cref="Environment.ProcessorCount"/> operations in parallel.</remarks>
public static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
return ForEachAsync(source, DefaultDegreeOfParallelism, TaskScheduler.Default, cancellationToken, body);
}
- /// <summary>Executes a for each operation on an <see cref="System.Collections.Generic.IAsyncEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+ /// <summary>Executes a for each operation on an <see cref="IAsyncEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <param name="source">An asynchronous enumerable data source.</param>
/// <param name="parallelOptions">An object that configures the behavior of this operation.</param>
/// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
- /// <exception cref="System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> argument or <paramref name="body"/> argument is null.</exception>
+ /// <exception cref="ArgumentNullException">The <paramref name="source"/> argument or <paramref name="body"/> argument is <see langword="null"/>.</exception>
/// <returns>A task that represents the entire for each operation.</returns>
public static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, ParallelOptions parallelOptions, Func<TSource, CancellationToken, ValueTask> body)
{
return ForEachAsync(source, parallelOptions.EffectiveMaxConcurrencyLevel, parallelOptions.EffectiveTaskScheduler, parallelOptions.CancellationToken, body);
}
- /// <summary>Executes a for each operation on an <see cref="System.Collections.Generic.IAsyncEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
+ /// <summary>Executes a for each operation on an <see cref="IAsyncEnumerable{TSource}"/> in which iterations may run in parallel.</summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <param name="source">An asynchronous enumerable data source.</param>
/// <param name="dop">A integer indicating how many operations to allow to run in parallel.</param>
/// <param name="scheduler">The task scheduler on which all code should execute.</param>
/// <param name="cancellationToken">A cancellation token that may be used to cancel the for each operation.</param>
/// <param name="body">An asynchronous delegate that is invoked once per element in the data source.</param>
- /// <exception cref="System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> argument or <paramref name="body"/> argument is null.</exception>
+ /// <exception cref="ArgumentNullException">The <paramref name="source"/> argument or <paramref name="body"/> argument is <see langword="null"/>.</exception>
/// <returns>A task that represents the entire for each operation.</returns>
private static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
{
return Task.FromCanceled(cancellationToken);
}
- if (dop < 0)
- {
- dop = DefaultDegreeOfParallelism;
- }
-
// The worker body. Each worker will execute this same body.
Func<object, Task> taskBody = static async o =>
{
/// <summary>The <see cref="ExecutionContext"/> present at the time of the ForEachAsync invocation. This is only used if on the default scheduler.</summary>
private readonly ExecutionContext? _executionContext;
/// <summary>Semaphore used to provide exclusive access to the enumerator.</summary>
- private readonly SemaphoreSlim _lock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
+ private readonly SemaphoreSlim? _lock;
/// <summary>The number of outstanding workers. When this hits 0, the operation has completed.</summary>
private int _completionRefCount;
public readonly CancellationTokenSource Cancellation = new CancellationTokenSource();
/// <summary>Initializes the state object.</summary>
- protected ForEachAsyncState(Func<object, Task> taskBody, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
+ protected ForEachAsyncState(Func<object, Task> taskBody, bool needsLock, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
{
_taskBody = taskBody;
- _remainingDop = dop;
+ _lock = needsLock ? new SemaphoreSlim(initialCount: 1, maxCount: 1) : null;
+ _remainingDop = dop < 0 ? DefaultDegreeOfParallelism : dop;
LoopBody = body;
_scheduler = scheduler;
if (scheduler == TaskScheduler.Default)
public bool SignalWorkerCompletedIterating() => Interlocked.Decrement(ref _completionRefCount) == 0;
/// <summary>Asynchronously acquires exclusive access to the enumerator.</summary>
- public Task AcquireLock() =>
+ public Task AcquireLock()
+ {
// We explicitly don't pass this.Cancellation to WaitAsync. Doing so adds overhead, and it isn't actually
// necessary. All of the operations that monitor the lock are part of the same ForEachAsync operation, and the Task
// returned from ForEachAsync can't complete until all of the constituent operations have completed, including whoever
// the face of cancellation, in exchange for making it a bit slower / more overhead in the common case of cancellation
// not being requested. We want to optimize for the latter. This also then avoids an exception throw / catch when
// cancellation is requested.
- _lock.WaitAsync(CancellationToken.None);
+ Debug.Assert(_lock is not null, "Should only be invoked when _lock is non-null");
+ return _lock.WaitAsync(CancellationToken.None);
+ }
/// <summary>Relinquishes exclusive access to the enumerator.</summary>
- public void ReleaseLock() => _lock.Release();
+ public void ReleaseLock()
+ {
+ Debug.Assert(_lock is not null, "Should only be invoked when _lock is non-null");
+ _lock.Release();
+ }
/// <summary>Stores an exception and triggers cancellation in order to alert all workers to stop as soon as possible.</summary>
/// <param name="e">The exception.</param>
IEnumerable<TSource> source, Func<object, Task> taskBody,
int dop, TaskScheduler scheduler, CancellationToken cancellationToken,
Func<TSource, CancellationToken, ValueTask> body) :
- base(taskBody, dop, scheduler, cancellationToken, body)
+ base(taskBody, needsLock: true, dop, scheduler, cancellationToken, body)
{
Enumerator = source.GetEnumerator() ?? throw new InvalidOperationException(SR.Parallel_ForEach_NullEnumerator);
}
IAsyncEnumerable<TSource> source, Func<object, Task> taskBody,
int dop, TaskScheduler scheduler, CancellationToken cancellationToken,
Func<TSource, CancellationToken, ValueTask> body) :
- base(taskBody, dop, scheduler, cancellationToken, body)
+ base(taskBody, needsLock: true, dop, scheduler, cancellationToken, body)
{
Enumerator = source.GetAsyncEnumerator(Cancellation.Token) ?? throw new InvalidOperationException(SR.Parallel_ForEach_NullEnumerator);
}
return Enumerator.DisposeAsync();
}
}
+
+ /// <summary>Stores the state associated with a ForAsync operation, shared between all its workers.</summary>
+ /// <typeparam name="T">Specifies the type of data being enumerated.</typeparam>
+ private sealed class ForEachState<T> : ForEachAsyncState<T>
+ {
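+ /// <summary>The next value to be dispensed to a worker; updated via interlocked operations or under the lock, depending on T.</summary>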
+ public T NextAvailable;
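+ /// <summary>The exclusive upper bound of the range being iterated.</summary>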
+ public readonly T ToExclusive;
+
+ public ForEachState(
+ T fromInclusive, T toExclusive, Func<object, Task> taskBody,
+ bool needsLock, int dop, TaskScheduler scheduler, CancellationToken cancellationToken,
+ Func<T, CancellationToken, ValueTask> body) :
+ base(taskBody, needsLock, dop, scheduler, cancellationToken, body)
+ {
+ NextAvailable = fromInclusive;
+ ToExclusive = toExclusive;
+ }
+ }
}
}
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
+using System.Numerics;
using System.Runtime.CompilerServices;
using Xunit;
AssertExtensions.Throws<ArgumentNullException>("source", () => { Parallel.ForEachAsync((IAsyncEnumerable<int>)null, CancellationToken.None, (item, cancellationToken) => default); });
AssertExtensions.Throws<ArgumentNullException>("source", () => { Parallel.ForEachAsync((IAsyncEnumerable<int>)null, new ParallelOptions(), (item, cancellationToken) => default); });
+ AssertExtensions.Throws<ArgumentNullException>("parallelOptions", () => { Parallel.ForAsync(1, 10, null, (item, cancellationToken) => default); });
AssertExtensions.Throws<ArgumentNullException>("parallelOptions", () => { Parallel.ForEachAsync(Enumerable.Range(1, 10), null, (item, cancellationToken) => default); });
AssertExtensions.Throws<ArgumentNullException>("parallelOptions", () => { Parallel.ForEachAsync(EnumerableRangeAsync(1, 10), null, (item, cancellationToken) => default); });
+ AssertExtensions.Throws<ArgumentNullException>("body", () => { Parallel.ForAsync(1, 10, null); });
+ AssertExtensions.Throws<ArgumentNullException>("body", () => { Parallel.ForAsync(1, 10, CancellationToken.None, null); });
+ AssertExtensions.Throws<ArgumentNullException>("body", () => { Parallel.ForAsync(1, 10, new ParallelOptions(), null); });
+
AssertExtensions.Throws<ArgumentNullException>("body", () => { Parallel.ForEachAsync(Enumerable.Range(1, 10), null); });
AssertExtensions.Throws<ArgumentNullException>("body", () => { Parallel.ForEachAsync(Enumerable.Range(1, 10), CancellationToken.None, null); });
AssertExtensions.Throws<ArgumentNullException>("body", () => { Parallel.ForEachAsync(Enumerable.Range(1, 10), new ParallelOptions(), null); });
return default;
};
+ AssertCanceled(Parallel.ForAsync(1, 10, cts.Token, body));
AssertCanceled(Parallel.ForEachAsync(MarkStart(box), cts.Token, body));
AssertCanceled(Parallel.ForEachAsync(MarkStartAsync(box), cts.Token, body));
+ AssertCanceled(Parallel.ForAsync(1, 10, new ParallelOptions { CancellationToken = cts.Token }, body));
AssertCanceled(Parallel.ForEachAsync(MarkStart(box), new ParallelOptions { CancellationToken = cts.Token }, body));
AssertCanceled(Parallel.ForEachAsync(MarkStartAsync(box), new ParallelOptions { CancellationToken = cts.Token }, body));
}
}
+ [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ [InlineData(-1)]
+ [InlineData(1)]
+ [InlineData(2)]
+ [InlineData(4)]
+ [InlineData(128)]
+ public async Task Dop_WorkersCreatedRespectingLimit_For(int dop)
+ {
+ bool exit = false;
+
+ int activeWorkers = 0;
+ var block = new TaskCompletionSource();
+
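+ // Use an effectively unbounded range so the loop is still running when the workers are released and told to exit.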
+ Task t = Parallel.ForAsync(long.MinValue, long.MaxValue, new ParallelOptions { MaxDegreeOfParallelism = dop }, async (item, cancellationToken) =>
+ {
+ Interlocked.Increment(ref activeWorkers);
+ await block.Task;
+ if (Volatile.Read(ref exit))
+ {
+ throw new FormatException();
+ }
+ });
+ Assert.False(t.IsCompleted);
+
+ await Task.Delay(20); // give the loop some time to run
+
+ Volatile.Write(ref exit, true);
+ block.SetResult();
+ await Assert.ThrowsAsync<FormatException>(() => t);
+
+ Assert.InRange(activeWorkers, 0, dop == -1 ? Environment.ProcessorCount : dop);
+ }
+
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[InlineData(-1)]
[InlineData(1)]
Assert.InRange(activeWorkers, 0, dop == -1 ? Environment.ProcessorCount : dop);
}
+ [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ [InlineData(-1)]
+ [InlineData(1)]
+ [InlineData(2)]
+ [InlineData(4)]
+ [InlineData(128)]
+ public async Task Dop_WorkersCreatedRespectingLimitAndTaskScheduler_For(int dop)
+ {
+ bool exit = false;
+ int activeWorkers = 0;
+ var block = new TaskCompletionSource();
+
+ int MaxSchedulerLimit = Math.Min(2, Environment.ProcessorCount);
+
+ Task t = Parallel.ForAsync(long.MinValue, long.MaxValue, new ParallelOptions { MaxDegreeOfParallelism = dop, TaskScheduler = new MaxConcurrencyLevelPassthroughTaskScheduler(MaxSchedulerLimit) }, async (item, cancellationToken) =>
+ {
+ Interlocked.Increment(ref activeWorkers);
+ await block.Task;
+ if (Volatile.Read(ref exit))
+ {
+ throw new FormatException();
+ }
+ });
+ Assert.False(t.IsCompleted);
+
+ await Task.Delay(20); // give the loop some time to run
+
+ Volatile.Write(ref exit, true);
+ block.SetResult();
+ await Assert.ThrowsAsync<FormatException>(() => t);
+
+ Assert.InRange(activeWorkers, 0, Math.Min(MaxSchedulerLimit, dop == -1 ? Environment.ProcessorCount : dop));
+ }
+
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[InlineData(-1)]
[InlineData(1)]
Assert.InRange(activeWorkers, 0, Math.Min(MaxSchedulerLimit, dop == -1 ? Environment.ProcessorCount : dop));
}
+ [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ public async Task Dop_NegativeTaskSchedulerLimitTreatedAsDefault_For()
+ {
+ bool exit = false;
+ int activeWorkers = 0;
+ var block = new TaskCompletionSource();
+
+ Task t = Parallel.ForAsync(long.MinValue, long.MaxValue, new ParallelOptions { TaskScheduler = new MaxConcurrencyLevelPassthroughTaskScheduler(-42) }, async (item, cancellationToken) =>
+ {
+ Interlocked.Increment(ref activeWorkers);
+ await block.Task;
+ if (Volatile.Read(ref exit))
+ {
+ throw new FormatException();
+ }
+ });
+ Assert.False(t.IsCompleted);
+
+ await Task.Delay(20); // give the loop some time to run
+
+ Volatile.Write(ref exit, true);
+ block.SetResult();
+ await Assert.ThrowsAsync<FormatException>(() => t);
+
+ Assert.InRange(activeWorkers, 0, Environment.ProcessorCount);
+ }
+
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task Dop_NegativeTaskSchedulerLimitTreatedAsDefault_Sync()
{
Assert.InRange(activeWorkers, 0, Environment.ProcessorCount);
}
+ [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ public async Task RunsAsynchronously_For()
+ {
+ var cts = new CancellationTokenSource();
+
+ Task t = Parallel.ForAsync(long.MinValue, long.MaxValue, cts.Token, (item, cancellationToken) => default);
+ Assert.False(t.IsCompleted);
+
+ cts.Cancel();
+
+ await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+ }
+
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task RunsAsynchronously_EvenForEntirelySynchronousWork_Sync()
{
Assert.InRange(activeWorkers, 0, dop == -1 ? Environment.ProcessorCount : dop);
}
+ [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ public void EmptyRange_For()
+ {
+ int counter = 0;
+ Task t = Parallel.ForAsync(10, 10, (item, cancellationToken) =>
+ {
+ Interlocked.Increment(ref counter);
+ return default;
+ });
+ Assert.True(t.IsCompletedSuccessfully);
+
+ Assert.Equal(0, counter);
+ }
+
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task EmptySource_Sync()
{
Assert.Equal(0, counter);
}
+ [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ [InlineData(false)]
+ [InlineData(true)]
+ public async Task AllItemsEnumeratedOnce_For(bool yield)
+ {
+ await Test<int>(yield);
+ await Test<uint>(yield);
+ await Test<long>(yield);
+ await Test<ulong>(yield);
+ await Test<short>(yield);
+ await Test<ushort>(yield);
+ await Test<nint>(yield);
+ await Test<nuint>(yield);
+ await Test<Int128>(yield);
+ await Test<UInt128>(yield);
+ await Test<BigInteger>(yield);
+
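+ // Runs the loop over [Start, Start + Count) for a single integer type T, recording every value seen
+ // and then verifying each value in the range appears exactly once and nothing outside it does.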
+ async Task Test<T>(bool yield) where T : IBinaryInteger<T>
+ {
+ const int Start = 10, Count = 10_000;
+
+ var set = new HashSet<T>();
+
+ await Parallel.ForAsync(T.CreateTruncating(Start), T.CreateTruncating(Start + Count), async (item, cancellationToken) =>
+ {
+ lock (set)
+ {
+ Assert.True(set.Add(item));
+ }
+
+ if (yield)
+ {
+ await Task.Yield();
+ }
+ });
+
+ Assert.False(set.Contains(T.CreateTruncating(Start - 1)));
+ for (int i = Start; i < Start + Count; i++)
+ {
+ Assert.True(set.Contains(T.CreateTruncating(i)));
+ }
+ Assert.False(set.Contains(T.CreateTruncating(Start + Count + 1)));
+ }
+ }
+
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[InlineData(false)]
[InlineData(true)]
}
});
+ Assert.False(set.Contains(Start - 1));
for (int i = Start; i < Start + Count; i++)
{
Assert.True(set.Contains(i));
}
+ Assert.False(set.Contains(Start + Count + 1));
}
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
}
});
+ Assert.False(set.Contains(Start - 1));
for (int i = Start; i < Start + Count; i++)
{
Assert.True(set.Contains(i));
}
+ Assert.False(set.Contains(Start + Count + 1));
+ }
+
+ [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ [InlineData(false)]
+ [InlineData(true)]
+ public async Task TaskScheduler_AllCodeExecutedOnCorrectScheduler_For(bool defaultScheduler)
+ {
+ TaskScheduler scheduler = defaultScheduler ?
+ TaskScheduler.Default :
+ new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler;
+
+ TaskScheduler otherScheduler = new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler;
+
+ var cq = new ConcurrentQueue<int>();
+
+ await Parallel.ForAsync(1, 101, new ParallelOptions { TaskScheduler = scheduler }, async (item, cancellationToken) =>
+ {
+ Assert.Same(scheduler, TaskScheduler.Current);
+ await Task.Yield();
+ cq.Enqueue(item);
+
+ if (item % 10 == 0)
+ {
+ await new SwitchTo(otherScheduler);
+ }
+ });
+
+ Assert.Equal(Enumerable.Range(1, 100), cq.OrderBy(i => i));
}
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
Assert.Equal(Enumerable.Range(1, 100), cq.OrderBy(i => i));
}
+ [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ public async Task Cancellation_CancelsIterationAndReturnsCanceledTask_For()
+ {
+ using var cts = new CancellationTokenSource(10);
+ OperationCanceledException oce = await Assert.ThrowsAnyAsync<OperationCanceledException>(() => Parallel.ForAsync(long.MinValue, long.MaxValue, cts.Token, async (item, cancellationToken) =>
+ {
+ await Task.Yield();
+ }));
+ Assert.Equal(cts.Token, oce.CancellationToken);
+ }
+
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task Cancellation_CancelsIterationAndReturnsCanceledTask_Sync()
{
});
}
+ [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ public async Task Cancellation_SameTokenPassedToEveryInvocation_For()
+ {
+ var cq = new ConcurrentQueue<CancellationToken>();
+
+ await Parallel.ForAsync(1, 101, async (item, cancellationToken) =>
+ {
+ cq.Enqueue(cancellationToken);
+ await Task.Yield();
+ });
+
+ Assert.Equal(100, cq.Count);
+ Assert.Equal(1, cq.Distinct().Count());
+ }
+
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task Cancellation_SameTokenPassedToEveryInvocation_Sync()
{
Assert.Equal(1, cq.Distinct().Count());
}
+ [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ public async Task Cancellation_HasPriorityOverExceptions_For()
+ {
+ var tcs = new TaskCompletionSource();
+ var cts = new CancellationTokenSource();
+
+ Task t = Parallel.ForAsync(0, long.MaxValue, new ParallelOptions { CancellationToken = cts.Token, MaxDegreeOfParallelism = 2 }, async (item, cancellationToken) =>
+ {
+ if (item == 0)
+ {
+ await tcs.Task;
+ cts.Cancel();
+ throw new FormatException();
+ }
+ else
+ {
+ tcs.TrySetResult();
+ await Task.Yield();
+ }
+ });
+
+ OperationCanceledException oce = await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+ Assert.Equal(cts.Token, oce.CancellationToken);
+ Assert.True(t.IsCanceled);
+ }
+
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task Cancellation_HasPriorityOverExceptions_Sync()
{
Assert.True(t.IsCanceled);
}
+ [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ [InlineData(false)]
+ [InlineData(true)]
+ public async Task Cancellation_FaultsForOceForNonCancellation_For(bool internalToken)
+ {
+ var cts = new CancellationTokenSource();
+
+ Task t = Parallel.ForAsync(long.MinValue, long.MaxValue, new ParallelOptions { CancellationToken = cts.Token }, (item, cancellationToken) =>
+ {
+ throw new OperationCanceledException(internalToken ? cancellationToken : cts.Token);
+ });
+
+ await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+ Assert.True(t.IsFaulted);
+ }
+
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[InlineData(false)]
[InlineData(true)]
Assert.True(t.IsFaulted);
}
+ [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ [InlineData(0, 4)]
+ [InlineData(1, 4)]
+ [InlineData(2, 4)]
+ [InlineData(3, 4)]
+ [InlineData(4, 4)]
+ public async Task Cancellation_InternalCancellationExceptionsArentFilteredOut_For(int numThrowingNonCanceledOce, int total)
+ {
+ var cts = new CancellationTokenSource();
+
+ var barrier = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ int remainingCount = total;
+
+ Task t = Parallel.ForAsync(0, total, new ParallelOptions { CancellationToken = cts.Token, MaxDegreeOfParallelism = total }, async (item, cancellationToken) =>
+ {
+ // Wait for all operations to be started
+ if (Interlocked.Decrement(ref remainingCount) == 0)
+ {
+ barrier.SetResult();
+ }
+ await barrier.Task;
+
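+ // Some workers throw an OCE carrying the loop's internal token while the rest throw FormatException;
+ // since the caller's token was never canceled, every exception must surface in the aggregate.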
+ throw item < numThrowingNonCanceledOce ?
+ new OperationCanceledException(cancellationToken) :
+ (Exception)new FormatException();
+ });
+
+ await Assert.ThrowsAnyAsync<Exception>(() => t);
+ Assert.Equal(total, t.Exception.InnerExceptions.Count);
+ Assert.Equal(numThrowingNonCanceledOce, t.Exception.InnerExceptions.Count(e => e is OperationCanceledException));
+ }
+
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[InlineData(0, 4)]
[InlineData(1, 4)]
Assert.True(t.IsFaulted);
}
+ [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ public async Task Exception_FromLoopBody_For()
+ {
+ var barrier = new Barrier(2);
+ Task t = Parallel.ForAsync(1, 3, new ParallelOptions { MaxDegreeOfParallelism = barrier.ParticipantCount }, (item, cancellationToken) =>
+ {
+ barrier.SignalAndWait();
+ throw item switch
+ {
+ 1 => new FormatException(),
+ 2 => new InvalidTimeZoneException(),
+ _ => new Exception()
+ };
+ });
+ await Assert.ThrowsAnyAsync<Exception>(() => t);
+ Assert.True(t.IsFaulted);
+ Assert.Equal(2, t.Exception.InnerExceptions.Count);
+ Assert.Contains(t.Exception.InnerExceptions, e => e is FormatException);
+ Assert.Contains(t.Exception.InnerExceptions, e => e is InvalidTimeZoneException);
+ }
+
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task Exception_FromLoopBody_Sync()
{
Assert.Contains(t.Exception.InnerExceptions, e => e is InvalidTimeZoneException);
}
+ [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ public async Task Exception_ImplicitlyCancelsOtherWorkers_For()
+ {
+ await Assert.ThrowsAsync<Exception>(() => Parallel.ForAsync(0, long.MaxValue, async (item, cancellationToken) =>
+ {
+ await Task.Yield();
+ if (item == 1000)
+ {
+ throw new Exception();
+ }
+ }));
+
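+ // With the parallelism capped at 2, item 0 faults immediately; item 1 completes only once that failure
+ // implicitly cancels the loop and its registration fires, so only the FormatException should surface.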
+ await Assert.ThrowsAsync<FormatException>(() => Parallel.ForAsync(0, long.MaxValue, new ParallelOptions { MaxDegreeOfParallelism = 2 }, async (item, cancellationToken) =>
+ {
+ if (item == 0)
+ {
+ throw new FormatException();
+ }
+ else
+ {
+ Assert.Equal(1, item);
+ var tcs = new TaskCompletionSource();
+ cancellationToken.Register(() => tcs.SetResult());
+ await tcs.Task;
+ }
+ }));
+ }
+
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task Exception_ImplicitlyCancelsOtherWorkers_Sync()
{
Assert.IsType<FormatException>(ae.InnerException);
}
+ [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ [InlineData(false)]
+ [InlineData(true)]
+ public async Task ExecutionContext_FlowsToWorkerBodies_For(bool defaultScheduler)
+ {
+ TaskScheduler scheduler = defaultScheduler ?
+ TaskScheduler.Default :
+ new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler;
+
+ var al = new AsyncLocal<int>();
+ al.Value = 42;
+ await Parallel.ForAsync(0, 100, new ParallelOptions { TaskScheduler = scheduler }, async (item, cancellationToken) =>
+ {
+ await Task.Yield();
+ Assert.Equal(42, al.Value);
+ });
+ }
+
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[InlineData(false)]
[InlineData(true)]