From 015162f2b1950fddaa2850b2e6938bcb884a81c3 Mon Sep 17 00:00:00 2001 From: James Ko Date: Sun, 1 Jan 2017 20:40:39 -0500 Subject: [PATCH] Delete unused Threading types/members (dotnet/coreclr#8766) Commit migrated from https://github.com/dotnet/coreclr/commit/0d99e34c932ba91440665335e8e7d48260c4e0be --- .../src/mscorlib/mscorlib.shared.sources.props | 4 - .../Threading/Tasks/BeginEndAwaitableAdapter.cs | 157 - .../src/System/Threading/Tasks/Parallel.cs | 3593 -------------------- .../System/Threading/Tasks/ParallelLoopState.cs | 641 ---- .../System/Threading/Tasks/ParallelRangeManager.cs | 279 -- .../mscorlib/src/System/Threading/Tasks/Task.cs | 96 - .../src/mscorlib/src/System/Threading/Thread.cs | 211 -- 7 files changed, 4981 deletions(-) delete mode 100644 src/coreclr/src/mscorlib/src/System/Threading/Tasks/BeginEndAwaitableAdapter.cs delete mode 100644 src/coreclr/src/mscorlib/src/System/Threading/Tasks/Parallel.cs delete mode 100644 src/coreclr/src/mscorlib/src/System/Threading/Tasks/ParallelLoopState.cs delete mode 100644 src/coreclr/src/mscorlib/src/System/Threading/Tasks/ParallelRangeManager.cs diff --git a/src/coreclr/src/mscorlib/mscorlib.shared.sources.props b/src/coreclr/src/mscorlib/mscorlib.shared.sources.props index ee6d6fd..0ff31ea 100644 --- a/src/coreclr/src/mscorlib/mscorlib.shared.sources.props +++ b/src/coreclr/src/mscorlib/mscorlib.shared.sources.props @@ -752,9 +752,6 @@ - - - @@ -768,7 +765,6 @@ - diff --git a/src/coreclr/src/mscorlib/src/System/Threading/Tasks/BeginEndAwaitableAdapter.cs b/src/coreclr/src/mscorlib/src/System/Threading/Tasks/BeginEndAwaitableAdapter.cs deleted file mode 100644 index 71eb787..0000000 --- a/src/coreclr/src/mscorlib/src/System/Threading/Tasks/BeginEndAwaitableAdapter.cs +++ /dev/null @@ -1,157 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. -// See the LICENSE file in the project root for more information. - -using System; -using System.Diagnostics; -using System.Diagnostics.Contracts; -using System.IO; -using System.Runtime.CompilerServices; -using System.Security; -using System.Threading; -using System.Threading.Tasks; - -namespace System.Threading.Tasks { - -/// -/// Provides an adapter to make Begin/End pairs awaitable. -/// In general, Task.Factory.FromAsync should be used for this purpose. -/// However, for cases where absolute minimal overhead is required, this type -/// may be used to making APM pairs awaitable while minimizing overhead. -/// (APM = Asynchronous Programming Model or the Begin/End pattern.) -/// -/// -/// This instance may be reused repeatedly. However, it must only be used -/// by a single APM invocation at a time. It's state will automatically be reset -/// when the await completes. -/// -/// -/// Usage sample: -/// -/// static async Task CopyStreamAsync(Stream source, Stream dest) { -/// -/// BeginEndAwaitableAdapter adapter = new BeginEndAwaitableAdapter(); -/// Byte[] buffer = new Byte[0x1000]; -/// -/// while (true) { -/// -/// source.BeginRead(buffer, 0, buffer.Length, BeginEndAwaitableAdapter.Callback, adapter); -/// Int32 numRead = source.EndRead(await adapter); -/// if (numRead == 0) -/// break; -/// -/// dest.BeginWrite(buffer, 0, numRead, BeginEndAwaitableAdapter.Callback, adapter); -/// dest.EndWrite(await adapter); -/// } -/// } -/// -/// -internal sealed class BeginEndAwaitableAdapter : ICriticalNotifyCompletion { - - /// A sentinel marker used to communicate between OnCompleted and the APM callback - /// that the callback has already run, and thus OnCompleted needs to execute the callback. - private readonly static Action CALLBACK_RAN = () => { }; - - /// The IAsyncResult for the APM operation. - private IAsyncResult _asyncResult; - - /// The continuation delegate provided to the awaiter. - private Action _continuation; - - - /// A callback to be passed as the AsyncCallback to an APM pair. - /// It expects that an BeginEndAwaitableAdapter instance was supplied to the APM Begin method as the object state. - public readonly static AsyncCallback Callback = (asyncResult) => { - - Debug.Assert(asyncResult != null); - Debug.Assert(asyncResult.IsCompleted); - Debug.Assert(asyncResult.AsyncState is BeginEndAwaitableAdapter); - - // Get the adapter object supplied as the "object state" to the Begin method - BeginEndAwaitableAdapter adapter = (BeginEndAwaitableAdapter) asyncResult.AsyncState; - - // Store the IAsyncResult into it so that it's available to the awaiter - adapter._asyncResult = asyncResult; - - // If the _continuation has already been set to the actual continuation by OnCompleted, then invoke the continuation. - // Set _continuation to the CALLBACK_RAN sentinel so that IsCompleted returns true and OnCompleted sees the sentinel - // and knows to invoke the callback. - // Due to some known incorrect implementations of IAsyncResult in the Framework where CompletedSynchronously is lazily - // set to true if it is first invoked after IsCompleted is true, we cannot rely here on CompletedSynchronously for - // synchronization between the caller and the callback, and thus do not use CompletedSynchronously at all. - Action continuation = Interlocked.Exchange(ref adapter._continuation, CALLBACK_RAN); - if (continuation != null) { - - Debug.Assert(continuation != CALLBACK_RAN); - continuation(); - } - }; - - - /// Gets an awaiter. - /// Returns itself as the awaiter. - public BeginEndAwaitableAdapter GetAwaiter() { - - return this; - } - - - /// Gets whether the awaited APM operation completed. - public bool IsCompleted { - get { - - // We are completed if the callback was called and it set the continuation to the CALLBACK_RAN sentinel. - // If the operation completes asynchronously, there's still a chance we'll see CALLBACK_RAN here, in which - // case we're still good to keep running synchronously. - return (_continuation == CALLBACK_RAN); - } - } - - /// Schedules the continuation to run when the operation completes. - /// The continuation. - public void UnsafeOnCompleted(Action continuation) { - - Debug.Assert(continuation != null); - OnCompleted(continuation); - } - - - /// Schedules the continuation to run when the operation completes. - /// The continuation. - public void OnCompleted(Action continuation) { - - Debug.Assert(continuation != null); - - // If the continuation field is null, then set it to be the target continuation - // so that when the operation completes, it'll invoke the continuation. If it's non-null, - // it was already set to the CALLBACK_RAN-sentinel by the Callback, in which case we hit a very rare race condition - // where the operation didn't complete synchronously but completed asynchronously between our - // calls to IsCompleted and OnCompleted... in that case, just schedule a task to run the continuation. - if (_continuation == CALLBACK_RAN - || Interlocked.CompareExchange(ref _continuation, continuation, null) == CALLBACK_RAN) { - - Task.Run(continuation); // must run async at this point, or else we'd risk stack diving - } - } - - - /// Gets the IAsyncResult for the APM operation after the operation completes, and then resets the adapter. - /// The IAsyncResult for the operation. - public IAsyncResult GetResult() { - - Debug.Assert(_asyncResult != null && _asyncResult.IsCompleted); - - // Get the IAsyncResult - IAsyncResult result = _asyncResult; - - // Reset the adapter - _asyncResult = null; - _continuation = null; - - // Return the result - return result; - } - -} // class BeginEndAwaitableAdapter - -} // namespace diff --git a/src/coreclr/src/mscorlib/src/System/Threading/Tasks/Parallel.cs b/src/coreclr/src/mscorlib/src/System/Threading/Tasks/Parallel.cs deleted file mode 100644 index 7808943..0000000 --- a/src/coreclr/src/mscorlib/src/System/Threading/Tasks/Parallel.cs +++ /dev/null @@ -1,3593 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. -// See the LICENSE file in the project root for more information. - -// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ -// -// -// -// A helper class that contains parallel versions of various looping constructs. This -// internally uses the task parallel library, but takes care to expose very little -// evidence of this infrastructure being used. -// -// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - -using System; -using System.Diagnostics; -using System.Collections; -using System.Collections.Generic; -using System.Collections.Concurrent; -using System.Security.Permissions; -using System.Threading; -using System.Threading.Tasks; -using System.Diagnostics.Contracts; - - -namespace System.Threading.Tasks -{ - /// - /// Stores options that configure the operation of methods on the - /// Parallel class. - /// - /// - /// By default, methods on the Parallel class attempt to utilize all available processors, are non-cancelable, and target - /// the default TaskScheduler (TaskScheduler.Default). enables - /// overriding these defaults. - /// - public class ParallelOptions - { - private TaskScheduler m_scheduler; - private int m_maxDegreeOfParallelism; - private CancellationToken m_cancellationToken; - - /// - /// Initializes a new instance of the class. - /// - /// - /// This constructor initializes the instance with default values. - /// is initialized to -1, signifying that there is no upper bound set on how much parallelism should - /// be employed. is initialized to a non-cancelable token, - /// and is initialized to the default scheduler (TaskScheduler.Default). - /// All of these defaults may be overwritten using the property set accessors on the instance. - /// - public ParallelOptions() - { - m_scheduler = TaskScheduler.Default; - m_maxDegreeOfParallelism = -1; - m_cancellationToken = CancellationToken.None; - } - - /// - /// Gets or sets the TaskScheduler - /// associated with this instance. Setting this property to null - /// indicates that the current scheduler should be used. - /// - public TaskScheduler TaskScheduler - { - get { return m_scheduler; } - set { m_scheduler = value; } - } - - // Convenience property used by TPL logic - internal TaskScheduler EffectiveTaskScheduler - { - get - { - if (m_scheduler == null) return TaskScheduler.Current; - else return m_scheduler; - } - } - - /// - /// Gets or sets the maximum degree of parallelism enabled by this ParallelOptions instance. - /// - /// - /// The limits the number of concurrent operations run by Parallel method calls that are passed this - /// ParallelOptions instance to the set value, if it is positive. If is -1, then there is no limit placed on the number of concurrently - /// running operations. - /// - /// - /// The exception that is thrown when this is set to 0 or some - /// value less than -1. - /// - public int MaxDegreeOfParallelism - { - get { return m_maxDegreeOfParallelism; } - set - { - if ((value == 0) || (value < -1)) - throw new ArgumentOutOfRangeException(nameof(MaxDegreeOfParallelism)); - m_maxDegreeOfParallelism = value; - } - } - - /// - /// Gets or sets the CancellationToken - /// associated with this instance. - /// - /// - /// Providing a CancellationToken - /// to a Parallel method enables the operation to be - /// exited early. Code external to the operation may cancel the token, and if the operation observes the - /// token being set, it may exit early by throwing an - /// . - /// - public CancellationToken CancellationToken - { - get { return m_cancellationToken; } - set { m_cancellationToken = value; } - } - - internal int EffectiveMaxConcurrencyLevel - { - get - { - int rval = MaxDegreeOfParallelism; - int schedulerMax = EffectiveTaskScheduler.MaximumConcurrencyLevel; - if ((schedulerMax > 0) && (schedulerMax != Int32.MaxValue)) - { - rval = (rval == -1) ? schedulerMax : Math.Min(schedulerMax, rval); - } - return rval; - } - } - } - - /// - /// Provides support for parallel loops and regions. - /// - /// - /// The class provides library-based data parallel replacements - /// for common operations such as for loops, for each loops, and execution of a set of statements. - /// - public static class Parallel - { - // static counter for generating unique Fork/Join Context IDs to be used in ETW events - internal static int s_forkJoinContextID; - - // We use a stride for loops to amortize the frequency of interlocked operations. - internal const int DEFAULT_LOOP_STRIDE = 16; - - // Static variable to hold default parallel options - internal static ParallelOptions s_defaultParallelOptions = new ParallelOptions(); - - /// - /// Executes each of the provided actions, possibly in parallel. - /// - /// An array of Actions to execute. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// array contains a null element. - /// The exception that is thrown when any - /// action in the array throws an exception. - /// - /// This method can be used to execute a set of operations, potentially in parallel. - /// No guarantees are made about the order in which the operations execute or whether - /// they execute in parallel. This method does not return until each of the - /// provided operations has completed, regardless of whether completion - /// occurs due to normal or exceptional termination. - /// - public static void Invoke(params Action[] actions) - { - Invoke(s_defaultParallelOptions, actions); - } - - /// - /// Executes each of the provided actions, possibly in parallel. - /// - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// An array of Actions to execute. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// array contains a null element. - /// The exception that is thrown when - /// the CancellationToken in the - /// is set. - /// The exception that is thrown when any - /// action in the array throws an exception. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// - /// This method can be used to execute a set of operations, potentially in parallel. - /// No guarantees are made about the order in which the operations execute or whether - /// the they execute in parallel. This method does not return until each of the - /// provided operations has completed, regardless of whether completion - /// occurs due to normal or exceptional termination. - /// - public static void Invoke(ParallelOptions parallelOptions, params Action[] actions) - { - if (actions == null) - { - throw new ArgumentNullException(nameof(actions)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - // Throw an ODE if we're passed a disposed CancellationToken. - if (parallelOptions.CancellationToken.CanBeCanceled - && AppContextSwitches.ThrowExceptionIfDisposedCancellationTokenSource) - { - parallelOptions.CancellationToken.ThrowIfSourceDisposed(); - } - // Quit early if we're already canceled -- avoid a bunch of work. - if (parallelOptions.CancellationToken.IsCancellationRequested) - throw new OperationCanceledException(parallelOptions.CancellationToken); - - // We must validate that the actions array contains no null elements, and also - // make a defensive copy of the actions array. - Action[] actionsCopy = new Action[actions.Length]; - for (int i = 0; i < actionsCopy.Length; i++) - { - actionsCopy[i] = actions[i]; - if (actionsCopy[i] == null) - { - throw new ArgumentException(Environment.GetResourceString("Parallel_Invoke_ActionNull")); - } - } - - // ETW event for Parallel Invoke Begin - int forkJoinContextID = 0; - Task callerTask = null; - if (TplEtwProvider.Log.IsEnabled()) - { - forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID); - callerTask = Task.InternalCurrent; - TplEtwProvider.Log.ParallelInvokeBegin((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0), - forkJoinContextID, TplEtwProvider.ForkJoinOperationType.ParallelInvoke, - actionsCopy.Length); - } - -#if DEBUG - actions = null; // Ensure we don't accidentally use this below. -#endif - - // If we have no work to do, we are done. - if (actionsCopy.Length < 1) return; - - // In the algorithm below, if the number of actions is greater than this, we automatically - // use Parallel.For() to handle the actions, rather than the Task-per-Action strategy. - const int SMALL_ACTIONCOUNT_LIMIT = 10; - - try - { - // If we've gotten this far, it's time to process the actions. - if ((actionsCopy.Length > SMALL_ACTIONCOUNT_LIMIT) || - (parallelOptions.MaxDegreeOfParallelism != -1 && parallelOptions.MaxDegreeOfParallelism < actionsCopy.Length)) - { - // Used to hold any exceptions encountered during action processing - ConcurrentQueue exceptionQ = null; // will be lazily initialized if necessary - - // This is more efficient for a large number of actions, or for enforcing MaxDegreeOfParallelism. - try - { - // Launch a self-replicating task to handle the execution of all actions. - // The use of a self-replicating task allows us to use as many cores - // as are available, and no more. The exception to this rule is - // that, in the case of a blocked action, the ThreadPool may inject - // extra threads, which means extra tasks can run. - int actionIndex = 0; - ParallelForReplicatingTask rootTask = new ParallelForReplicatingTask(parallelOptions, delegate - { - // Each for-task will pull an action at a time from the list - int myIndex = Interlocked.Increment(ref actionIndex); // = index to use + 1 - while (myIndex <= actionsCopy.Length) - { - // Catch and store any exceptions. If we don't catch them, the self-replicating - // task will exit, and that may cause other SR-tasks to exit. - // And (absent cancellation) we want all actions to execute. - try - { - actionsCopy[myIndex - 1](); - } - catch (Exception e) - { - LazyInitializer.EnsureInitialized>(ref exceptionQ, () => { return new ConcurrentQueue(); }); - exceptionQ.Enqueue(e); - } - - // Check for cancellation. If it is encountered, then exit the delegate. - if (parallelOptions.CancellationToken.IsCancellationRequested) - throw new OperationCanceledException(parallelOptions.CancellationToken); - - // You're still in the game. Grab your next action index. - myIndex = Interlocked.Increment(ref actionIndex); - } - }, TaskCreationOptions.None, InternalTaskOptions.SelfReplicating); - - rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler); - rootTask.Wait(); - } - catch (Exception e) - { - LazyInitializer.EnsureInitialized>(ref exceptionQ, () => { return new ConcurrentQueue(); }); - - // Since we're consuming all action exceptions, there are very few reasons that - // we would see an exception here. Two that come to mind: - // (1) An OCE thrown by one or more actions (AggregateException thrown) - // (2) An exception thrown from the ParallelForReplicatingTask constructor - // (regular exception thrown). - // We'll need to cover them both. - AggregateException ae = e as AggregateException; - if (ae != null) - { - // Strip off outer container of an AggregateException, because downstream - // logic needs OCEs to be at the top level. - foreach (Exception exc in ae.InnerExceptions) exceptionQ.Enqueue(exc); - } - else - { - exceptionQ.Enqueue(e); - } - } - - // If we have encountered any exceptions, then throw. - if ((exceptionQ != null) && (exceptionQ.Count > 0)) - { - ThrowIfReducableToSingleOCE(exceptionQ, parallelOptions.CancellationToken); - throw new AggregateException(exceptionQ); - } - } - else - { - // This is more efficient for a small number of actions and no DOP support - - // Initialize our array of tasks, one per action. - Task[] tasks = new Task[actionsCopy.Length]; - - // One more check before we begin... - if (parallelOptions.CancellationToken.IsCancellationRequested) - throw new OperationCanceledException(parallelOptions.CancellationToken); - - // Launch all actions as tasks - for (int i = 1; i < tasks.Length; i++) - { - tasks[i] = Task.Factory.StartNew(actionsCopy[i], parallelOptions.CancellationToken, TaskCreationOptions.None, - InternalTaskOptions.None, parallelOptions.EffectiveTaskScheduler); - } - - // Optimization: Use current thread to run something before we block waiting for all tasks. - tasks[0] = new Task(actionsCopy[0]); - tasks[0].RunSynchronously(parallelOptions.EffectiveTaskScheduler); - - // Now wait for the tasks to complete. This will not unblock until all of - // them complete, and it will throw an exception if one or more of them also - // threw an exception. We let such exceptions go completely unhandled. - try - { - if (tasks.Length <= 4) - { - // for 4 or less tasks, the sequential waitall version is faster - Task.FastWaitAll(tasks); - } - else - { - // otherwise we revert to the regular WaitAll which delegates the multiple wait to the cooperative event. - Task.WaitAll(tasks); - } - } - catch (AggregateException aggExp) - { - // see if we can combine it into a single OCE. If not propagate the original exception - ThrowIfReducableToSingleOCE(aggExp.InnerExceptions, parallelOptions.CancellationToken); - throw; - } - finally - { - for (int i = 0; i < tasks.Length; i++) - { - if (tasks[i].IsCompleted) tasks[i].Dispose(); - } - } - } - } - finally - { - // ETW event for Parallel Invoke End - if (TplEtwProvider.Log.IsEnabled()) - { - TplEtwProvider.Log.ParallelInvokeEnd((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0), - forkJoinContextID); - } - } - } - - /// - /// Executes a for loop in which iterations may run in parallel. - /// - /// The start index, inclusive. - /// The end index, exclusive. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// The delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the iteration count (an Int32) as a parameter. - /// - public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action body) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return ForWorker( - fromInclusive, toExclusive, - s_defaultParallelOptions, - body, null, null, null, null); - } - - /// - /// Executes a for loop in which iterations may run in parallel. - /// - /// The start index, inclusive. - /// The end index, exclusive. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// The delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the iteration count (an Int64) as a parameter. - /// - public static ParallelLoopResult For(long fromInclusive, long toExclusive, Action body) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return ForWorker64( - fromInclusive, toExclusive, s_defaultParallelOptions, - body, null, null, null, null); - } - - /// - /// Executes a for loop in which iterations may run in parallel. - /// - /// The start index, inclusive. - /// The end index, exclusive. - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// CancellationToken in the - /// argument is set. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// The delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the iteration count (an Int32) as a parameter. - /// - public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action body) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForWorker( - fromInclusive, toExclusive, parallelOptions, - body, null, null, null, null); - } - - /// - /// Executes a for loop in which iterations may run in parallel. - /// - /// The start index, inclusive. - /// The end index, exclusive. - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// CancellationToken in the - /// argument is set. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// The delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the iteration count (an Int64) as a parameter. - /// - public static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action body) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForWorker64( - fromInclusive, toExclusive, parallelOptions, - body, null, null, null, null); - } - - /// - /// Executes a for loop in which iterations may run in parallel. - /// - /// The start index, inclusive. - /// The end index, exclusive. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int32), - /// and a ParallelLoopState instance that may be - /// used to break out of the loop prematurely. - /// - /// - /// Calling ParallelLoopState.Break() - /// informs the For operation that iterations after the current one need not - /// execute. However, all iterations before the current one will still need to be executed if they have not already. - /// Therefore, calling Break is similar to using a break operation within a - /// conventional for loop in a language like C#, but it is not a perfect substitute: for example, there is no guarantee that iterations - /// after the current one will definitely not execute. - /// - /// - /// If executing all iterations before the current one is not necessary, - /// ParallelLoopState.Stop() - /// should be preferred to using Break. Calling Stop informs the For loop that it may abandon all remaining - /// iterations, regardless of whether they're for interations above or below the current, - /// since all required work has already been completed. As with Break, however, there are no guarantees regarding - /// which other iterations will not execute. - /// - /// - /// When a loop is ended prematurely, the that's returned will contain - /// relevant information about the loop's completion. - /// - /// - public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action body) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return ForWorker( - fromInclusive, toExclusive, s_defaultParallelOptions, - null, body, null, null, null); - } - - /// - /// Executes a for loop in which iterations may run in parallel. - /// - /// The start index, inclusive. - /// The end index, exclusive. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// The delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int64), - /// and a ParallelLoopState instance that may be - /// used to break out of the loop prematurely. - /// - public static ParallelLoopResult For(long fromInclusive, long toExclusive, Action body) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return ForWorker64( - fromInclusive, toExclusive, s_defaultParallelOptions, - null, body, null, null, null); - } - - /// - /// Executes a for loop in which iterations may run in parallel. - /// - /// The start index, inclusive. - /// The end index, exclusive. - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// CancellationToken in the - /// argument is set. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// The delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int32), - /// and a ParallelLoopState instance that may be - /// used to break out of the loop prematurely. - /// - public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action body) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForWorker( - fromInclusive, toExclusive, parallelOptions, - null, body, null, null, null); - } - - /// - /// Executes a for loop in which iterations may run in parallel. - /// - /// The start index, inclusive. - /// The end index, exclusive. - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// CancellationToken in the - /// argument is set. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// The delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int64), - /// and a ParallelLoopState instance that may be - /// used to break out of the loop prematurely. - /// - public static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, - Action body) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForWorker64( - fromInclusive, toExclusive, parallelOptions, - null, body, null, null, null); - } - - /// - /// Executes a for loop in which iterations may run in parallel. - /// - /// The type of the thread-local data. - /// The start index, inclusive. - /// The end index, exclusive. - /// The function delegate that returns the initial state of the local data - /// for each thread. - /// The delegate that is invoked once per iteration. - /// The delegate that performs a final action on the local state of each - /// thread. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int32), - /// a ParallelLoopState instance that may be - /// used to break out of the loop prematurely, and some local state that may be shared amongst iterations - /// that execute on the same thread. - /// - /// - /// The delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// - /// - public static ParallelLoopResult For( - int fromInclusive, int toExclusive, - Func localInit, - Func body, - Action localFinally) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - - return ForWorker( - fromInclusive, toExclusive, s_defaultParallelOptions, - null, null, body, localInit, localFinally); - } - - /// - /// Executes a for loop in which iterations may run in parallel. Supports 64-bit indices. - /// - /// The type of the thread-local data. - /// The start index, inclusive. - /// The end index, exclusive. - /// The function delegate that returns the initial state of the local data - /// for each thread. - /// The delegate that is invoked once per iteration. - /// The delegate that performs a final action on the local state of each - /// thread. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int64), - /// a ParallelLoopState instance that may be - /// used to break out of the loop prematurely, and some local state that may be shared amongst iterations - /// that execute on the same thread. - /// - /// - /// The delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// - /// - public static ParallelLoopResult For( - long fromInclusive, long toExclusive, - Func localInit, - Func body, - Action localFinally) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - - return ForWorker64( - fromInclusive, toExclusive, s_defaultParallelOptions, - null, null, body, localInit, localFinally); - } - - /// - /// Executes a for loop in which iterations may run in parallel. - /// - /// The type of the thread-local data. - /// The start index, inclusive. - /// The end index, exclusive. - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// The function delegate that returns the initial state of the local data - /// for each thread. - /// The delegate that is invoked once per iteration. - /// The delegate that performs a final action on the local state of each - /// thread. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// CancellationToken in the - /// argument is set. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int32), - /// a ParallelLoopState instance that may be - /// used to break out of the loop prematurely, and some local state that may be shared amongst iterations - /// that execute on the same thread. - /// - /// - /// The delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// - /// - public static ParallelLoopResult For( - int fromInclusive, int toExclusive, ParallelOptions parallelOptions, - Func localInit, - Func body, - Action localFinally) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForWorker( - fromInclusive, toExclusive, parallelOptions, - null, null, body, localInit, localFinally); - } - - /// - /// Executes a for loop in which iterations may run in parallel. - /// - /// The type of the thread-local data. - /// The start index, inclusive. - /// The end index, exclusive. - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// The function delegate that returns the initial state of the local data - /// for each thread. - /// The delegate that is invoked once per iteration. - /// The delegate that performs a final action on the local state of each - /// thread. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// CancellationToken in the - /// argument is set. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int64), - /// a ParallelLoopState instance that may be - /// used to break out of the loop prematurely, and some local state that may be shared amongst iterations - /// that execute on the same thread. - /// - /// - /// The delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// - /// - public static ParallelLoopResult For( - long fromInclusive, long toExclusive, ParallelOptions parallelOptions, - Func localInit, - Func body, - Action localFinally) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - - return ForWorker64( - fromInclusive, toExclusive, parallelOptions, - null, null, body, localInit, localFinally); - } - - - - - - - - /// - /// Performs the major work of the parallel for loop. It assumes that argument validation has already - /// been performed by the caller. This function's whole purpose in life is to enable as much reuse of - /// common implementation details for the various For overloads we offer. Without it, we'd end up - /// with lots of duplicate code. It handles: (1) simple for loops, (2) for loops that depend on - /// ParallelState, and (3) for loops with thread local data. - /// - /// - /// The type of the local data. - /// The loop's start index, inclusive. - /// The loop's end index, exclusive. - /// A ParallelOptions instance. - /// The simple loop body. - /// The loop body for ParallelState overloads. - /// The loop body for thread local state overloads. - /// A selector function that returns new thread local state. - /// A cleanup function to destroy thread local state. - /// Only one of the body arguments may be supplied (i.e. they are exclusive). - /// A structure. - private static ParallelLoopResult ForWorker( - int fromInclusive, int toExclusive, - ParallelOptions parallelOptions, - Action body, - Action bodyWithState, - Func bodyWithLocal, - Func localInit, Action localFinally) - { - Debug.Assert(((body == null ? 0 : 1) + (bodyWithState == null ? 0 : 1) + (bodyWithLocal == null ? 0 : 1)) == 1, - "expected exactly one body function to be supplied"); - Debug.Assert(bodyWithLocal != null || (localInit == null && localFinally == null), - "thread local functions should only be supplied for loops w/ thread local bodies"); - - // Instantiate our result. Specifics will be filled in later. - ParallelLoopResult result = new ParallelLoopResult(); - - // We just return immediately if 'to' is smaller (or equal to) 'from'. - if (toExclusive <= fromInclusive) - { - result.m_completed = true; - return result; - } - - // For all loops we need a shared flag even though we don't have a body with state, - // because the shared flag contains the exceptional bool, which triggers other workers - // to exit their loops if one worker catches an exception - ParallelLoopStateFlags32 sharedPStateFlags = new ParallelLoopStateFlags32(); - - TaskCreationOptions creationOptions = TaskCreationOptions.None; - InternalTaskOptions internalOptions = InternalTaskOptions.SelfReplicating; - - // Before getting started, do a quick peek to see if we have been canceled already - if (parallelOptions.CancellationToken.IsCancellationRequested) - { - throw new OperationCanceledException(parallelOptions.CancellationToken); - } - - // initialize ranges with passed in loop arguments and expected number of workers - int numExpectedWorkers = (parallelOptions.EffectiveMaxConcurrencyLevel == -1) ? - PlatformHelper.ProcessorCount : - parallelOptions.EffectiveMaxConcurrencyLevel; - RangeManager rangeManager = new RangeManager(fromInclusive, toExclusive, 1, numExpectedWorkers); - - // Keep track of any cancellations - OperationCanceledException oce = null; - - CancellationTokenRegistration ctr = new CancellationTokenRegistration(); - - // if cancellation is enabled, we need to register a callback to stop the loop when it gets signaled - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr = parallelOptions.CancellationToken.InternalRegisterWithoutEC((o) => - { - // Cause processing to stop - sharedPStateFlags.Cancel(); - // Record our cancellation - oce = new OperationCanceledException(parallelOptions.CancellationToken); - }, null); - } - - // ETW event for Parallel For begin - int forkJoinContextID = 0; - Task callingTask = null; - if (TplEtwProvider.Log.IsEnabled()) - { - forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID); - callingTask = Task.InternalCurrent; - TplEtwProvider.Log.ParallelLoopBegin((callingTask != null ? callingTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callingTask != null ? callingTask.Id : 0), - forkJoinContextID, TplEtwProvider.ForkJoinOperationType.ParallelFor, - fromInclusive, toExclusive); - } - - ParallelForReplicatingTask rootTask = null; - - try - { - // this needs to be in try-block because it can throw in BuggyScheduler.MaxConcurrencyLevel - rootTask = new ParallelForReplicatingTask( - parallelOptions, - delegate - { - // - // first thing we do upon enterying the task is to register as a new "RangeWorker" with the - // shared RangeManager instance. - // - // If this call returns a RangeWorker struct which wraps the state needed by this task - // - // We need to call FindNewWork32() on it to see whether there's a chunk available. - // - - - // Cache some information about the current task - Task currentWorkerTask = Task.InternalCurrent; - bool bIsRootTask = (currentWorkerTask == rootTask); - - RangeWorker currentWorker = new RangeWorker(); - Object savedStateFromPreviousReplica = currentWorkerTask.SavedStateFromPreviousReplica; - - if (savedStateFromPreviousReplica is RangeWorker) - currentWorker = (RangeWorker)savedStateFromPreviousReplica; - else - currentWorker = rangeManager.RegisterNewWorker(); - - - - // These are the local index values to be used in the sequential loop. - // Their values filled in by FindNewWork32 - int nFromInclusiveLocal; - int nToExclusiveLocal; - - if (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) == false || - sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal) == true) - { - return; // no need to run - } - - // ETW event for ParallelFor Worker Fork - if (TplEtwProvider.Log.IsEnabled()) - { - TplEtwProvider.Log.ParallelFork((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0), - forkJoinContextID); - } - - TLocal localValue = default(TLocal); - bool bLocalValueInitialized = false; // Tracks whether localInit ran without exceptions, so that we can skip localFinally if it wasn't - - try - { - // Create a new state object that references the shared "stopped" and "exceptional" flags - // If needed, it will contain a new instance of thread-local state by invoking the selector. - ParallelLoopState32 state = null; - - if (bodyWithState != null) - { - Debug.Assert(sharedPStateFlags != null); - state = new ParallelLoopState32(sharedPStateFlags); - } - else if (bodyWithLocal != null) - { - Debug.Assert(sharedPStateFlags != null); - state = new ParallelLoopState32(sharedPStateFlags); - if (localInit != null) - { - localValue = localInit(); - bLocalValueInitialized = true; - } - } - - // initialize a loop timer which will help us decide whether we should exit early - LoopTimer loopTimer = new LoopTimer(rootTask.ActiveChildCount); - - // Now perform the loop itself. - do - { - if (body != null) - { - for (int j = nFromInclusiveLocal; - j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline - || !sharedPStateFlags.ShouldExitLoop()); // the no-arg version is used since we have no state - j += 1) - { - - body(j); - } - } - else if (bodyWithState != null) - { - for (int j = nFromInclusiveLocal; - j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline - || !sharedPStateFlags.ShouldExitLoop(j)); - j += 1) - { - - state.CurrentIteration = j; - bodyWithState(j, state); - } - } - else - { - for (int j = nFromInclusiveLocal; - j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline - || !sharedPStateFlags.ShouldExitLoop(j)); - j += 1) - { - state.CurrentIteration = j; - localValue = bodyWithLocal(j, state, localValue); - } - } - - // Cooperative multitasking workaround for AppDomain fairness. - // Check if allowed loop time is exceeded, if so save current state and return. The self replicating task logic - // will detect this, and queue up a replacement task. Note that we don't do this on the root task. - if (!bIsRootTask && loopTimer.LimitExceeded()) - { - currentWorkerTask.SavedStateForNextReplica = (object)currentWorker; - break; - } - - } - // Exit if we can't find new work, or if the loop was stoppped. - while (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) && - ((sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE) || - !sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal))); - } - catch - { - // if we catch an exception in a worker, we signal the other workers to exit the loop, and we rethrow - sharedPStateFlags.SetExceptional(); - throw; - } - finally - { - // If a cleanup function was specified, call it. Otherwise, if the type is - // IDisposable, we will invoke Dispose on behalf of the user. - if (localFinally != null && bLocalValueInitialized) - { - localFinally(localValue); - } - - // ETW event for ParallelFor Worker Join - if (TplEtwProvider.Log.IsEnabled()) - { - TplEtwProvider.Log.ParallelJoin((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0), - forkJoinContextID); - } - } - }, - creationOptions, internalOptions); - - rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler); // might throw TSE - rootTask.Wait(); - - // If we made a cancellation registration, we need to clean it up now before observing the OCE - // Otherwise we could be caught in the middle of a callback, and observe PLS_STOPPED, but oce = null - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - - // If we got through that with no exceptions, and we were canceled, then - // throw our cancellation exception - if (oce != null) throw oce; - } - catch (AggregateException aggExp) - { - // if we made a cancellation registration, and rootTask.Wait threw, we need to clean it up here - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - - // see if we can combine it into a single OCE. If not propagate the original exception - ThrowIfReducableToSingleOCE(aggExp.InnerExceptions, parallelOptions.CancellationToken); - throw; - } - catch (TaskSchedulerException) - { - // if we made a cancellation registration, and rootTask.RunSynchronously threw, we need to clean it up here - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - throw; - } - finally - { - int sb_status = sharedPStateFlags.LoopStateFlags; - result.m_completed = (sb_status == ParallelLoopStateFlags.PLS_NONE); - if ((sb_status & ParallelLoopStateFlags.PLS_BROKEN) != 0) - { - result.m_lowestBreakIteration = sharedPStateFlags.LowestBreakIteration; - } - - if ((rootTask != null) && rootTask.IsCompleted) rootTask.Dispose(); - - // ETW event for Parallel For End - if (TplEtwProvider.Log.IsEnabled()) - { - int nTotalIterations = 0; - - // calculate how many iterations we ran in total - if (sb_status == ParallelLoopStateFlags.PLS_NONE) - nTotalIterations = toExclusive - fromInclusive; - else if ((sb_status & ParallelLoopStateFlags.PLS_BROKEN) != 0) - nTotalIterations = sharedPStateFlags.LowestBreakIteration - fromInclusive; - else - nTotalIterations = -1; //PLS_STOPPED! We can't determine this if we were stopped.. - - TplEtwProvider.Log.ParallelLoopEnd((callingTask != null ? callingTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callingTask != null ? callingTask.Id : 0), - forkJoinContextID, nTotalIterations); - } - } - - return result; - } - - /// - /// Performs the major work of the 64-bit parallel for loop. It assumes that argument validation has already - /// been performed by the caller. This function's whole purpose in life is to enable as much reuse of - /// common implementation details for the various For overloads we offer. Without it, we'd end up - /// with lots of duplicate code. It handles: (1) simple for loops, (2) for loops that depend on - /// ParallelState, and (3) for loops with thread local data. - /// - /// - /// The type of the local data. - /// The loop's start index, inclusive. - /// The loop's end index, exclusive. - /// A ParallelOptions instance. - /// The simple loop body. - /// The loop body for ParallelState overloads. - /// The loop body for thread local state overloads. - /// A selector function that returns new thread local state. - /// A cleanup function to destroy thread local state. - /// Only one of the body arguments may be supplied (i.e. they are exclusive). - /// A structure. - private static ParallelLoopResult ForWorker64( - long fromInclusive, long toExclusive, - ParallelOptions parallelOptions, - Action body, - Action bodyWithState, - Func bodyWithLocal, - Func localInit, Action localFinally) - { - Debug.Assert(((body == null ? 0 : 1) + (bodyWithState == null ? 0 : 1) + (bodyWithLocal == null ? 0 : 1)) == 1, - "expected exactly one body function to be supplied"); - Debug.Assert(bodyWithLocal != null || (localInit == null && localFinally == null), - "thread local functions should only be supplied for loops w/ thread local bodies"); - - // Instantiate our result. Specifics will be filled in later. - ParallelLoopResult result = new ParallelLoopResult(); - - // We just return immediately if 'to' is smaller (or equal to) 'from'. - if (toExclusive <= fromInclusive) - { - result.m_completed = true; - return result; - } - - // For all loops we need a shared flag even though we don't have a body with state, - // because the shared flag contains the exceptional bool, which triggers other workers - // to exit their loops if one worker catches an exception - ParallelLoopStateFlags64 sharedPStateFlags = new ParallelLoopStateFlags64(); - - TaskCreationOptions creationOptions = TaskCreationOptions.None; - InternalTaskOptions internalOptions = InternalTaskOptions.SelfReplicating; - - // Before getting started, do a quick peek to see if we have been canceled already - if (parallelOptions.CancellationToken.IsCancellationRequested) - { - throw new OperationCanceledException(parallelOptions.CancellationToken); - } - - // initialize ranges with passed in loop arguments and expected number of workers - int numExpectedWorkers = (parallelOptions.EffectiveMaxConcurrencyLevel == -1) ? - PlatformHelper.ProcessorCount : - parallelOptions.EffectiveMaxConcurrencyLevel; - RangeManager rangeManager = new RangeManager(fromInclusive, toExclusive, 1, numExpectedWorkers); - - // Keep track of any cancellations - OperationCanceledException oce = null; - - CancellationTokenRegistration ctr = new CancellationTokenRegistration(); - - // if cancellation is enabled, we need to register a callback to stop the loop when it gets signaled - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr = parallelOptions.CancellationToken.InternalRegisterWithoutEC((o) => - { - // Cause processing to stop - sharedPStateFlags.Cancel(); - // Record our cancellation - oce = new OperationCanceledException(parallelOptions.CancellationToken); - }, null); - } - - // ETW event for Parallel For begin - Task callerTask = null; - int forkJoinContextID = 0; - if (TplEtwProvider.Log.IsEnabled()) - { - forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID); - callerTask = Task.InternalCurrent; - TplEtwProvider.Log.ParallelLoopBegin((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0), - forkJoinContextID, TplEtwProvider.ForkJoinOperationType.ParallelFor, - fromInclusive, toExclusive); - } - - ParallelForReplicatingTask rootTask = null; - - try - { - // this needs to be in try-block because it can throw in BuggyScheduler.MaxConcurrencyLevel - rootTask = new ParallelForReplicatingTask( - parallelOptions, - delegate - { - // - // first thing we do upon enterying the task is to register as a new "RangeWorker" with the - // shared RangeManager instance. - // - // If this call returns a RangeWorker struct which wraps the state needed by this task - // - // We need to call FindNewWork() on it to see whether there's a chunk available. - // - - // Cache some information about the current task - Task currentWorkerTask = Task.InternalCurrent; - bool bIsRootTask = (currentWorkerTask == rootTask); - - RangeWorker currentWorker = new RangeWorker(); - Object savedStateFromPreviousReplica = currentWorkerTask.SavedStateFromPreviousReplica; - - if (savedStateFromPreviousReplica is RangeWorker) - currentWorker = (RangeWorker)savedStateFromPreviousReplica; - else - currentWorker = rangeManager.RegisterNewWorker(); - - - // These are the local index values to be used in the sequential loop. - // Their values filled in by FindNewWork - long nFromInclusiveLocal; - long nToExclusiveLocal; - - if (currentWorker.FindNewWork(out nFromInclusiveLocal, out nToExclusiveLocal) == false || - sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal) == true) - { - return; // no need to run - } - - // ETW event for ParallelFor Worker Fork - if (TplEtwProvider.Log.IsEnabled()) - { - TplEtwProvider.Log.ParallelFork((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0), - forkJoinContextID); - } - - TLocal localValue = default(TLocal); - bool bLocalValueInitialized = false; // Tracks whether localInit ran without exceptions, so that we can skip localFinally if it wasn't - - try - { - - // Create a new state object that references the shared "stopped" and "exceptional" flags - // If needed, it will contain a new instance of thread-local state by invoking the selector. - ParallelLoopState64 state = null; - - if (bodyWithState != null) - { - Debug.Assert(sharedPStateFlags != null); - state = new ParallelLoopState64(sharedPStateFlags); - } - else if (bodyWithLocal != null) - { - Debug.Assert(sharedPStateFlags != null); - state = new ParallelLoopState64(sharedPStateFlags); - - // If a thread-local selector was supplied, invoke it. Otherwise, use the default. - if (localInit != null) - { - localValue = localInit(); - bLocalValueInitialized = true; - } - } - - // initialize a loop timer which will help us decide whether we should exit early - LoopTimer loopTimer = new LoopTimer(rootTask.ActiveChildCount); - - // Now perform the loop itself. - do - { - if (body != null) - { - for (long j = nFromInclusiveLocal; - j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline - || !sharedPStateFlags.ShouldExitLoop()); // the no-arg version is used since we have no state - j += 1) - { - body(j); - } - } - else if (bodyWithState != null) - { - for (long j = nFromInclusiveLocal; - j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline - || !sharedPStateFlags.ShouldExitLoop(j)); - j += 1) - { - state.CurrentIteration = j; - bodyWithState(j, state); - } - } - else - { - for (long j = nFromInclusiveLocal; - j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline - || !sharedPStateFlags.ShouldExitLoop(j)); - j += 1) - { - state.CurrentIteration = j; - localValue = bodyWithLocal(j, state, localValue); - } - } - - // Cooperative multitasking workaround for AppDomain fairness. - // Check if allowed loop time is exceeded, if so save current state and return. The self replicating task logic - // will detect this, and queue up a replacement task. Note that we don't do this on the root task. - if (!bIsRootTask && loopTimer.LimitExceeded()) - { - currentWorkerTask.SavedStateForNextReplica = (object)currentWorker; - break; - } - } - // Exit if we can't find new work, or if the loop was stoppped. - while (currentWorker.FindNewWork(out nFromInclusiveLocal, out nToExclusiveLocal) && - ((sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE) || - !sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal))); - } - catch - { - // if we catch an exception in a worker, we signal the other workers to exit the loop, and we rethrow - sharedPStateFlags.SetExceptional(); - throw; - } - finally - { - // If a cleanup function was specified, call it. Otherwise, if the type is - // IDisposable, we will invoke Dispose on behalf of the user. - if (localFinally != null && bLocalValueInitialized) - { - localFinally(localValue); - } - - // ETW event for ParallelFor Worker Join - if (TplEtwProvider.Log.IsEnabled()) - { - TplEtwProvider.Log.ParallelJoin((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0), - forkJoinContextID); - } - } - }, - creationOptions, internalOptions); - - rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler); // might throw TSE - rootTask.Wait(); - - // If we made a cancellation registration, we need to clean it up now before observing the OCE - // Otherwise we could be caught in the middle of a callback, and observe PLS_STOPPED, but oce = null - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - - // If we got through that with no exceptions, and we were canceled, then - // throw our cancellation exception - if (oce != null) throw oce; - } - catch (AggregateException aggExp) - { - // if we made a cancellation registration, and rootTask.Wait threw, we need to clean it up here - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - - // see if we can combine it into a single OCE. If not propagate the original exception - ThrowIfReducableToSingleOCE(aggExp.InnerExceptions, parallelOptions.CancellationToken); - throw; - } - catch (TaskSchedulerException) - { - // if we made a cancellation registration, and rootTask.RunSynchronously threw, we need to clean it up here - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - throw; - } - finally - { - int sb_status = sharedPStateFlags.LoopStateFlags; - result.m_completed = (sb_status == ParallelLoopStateFlags.PLS_NONE); - if ((sb_status & ParallelLoopStateFlags.PLS_BROKEN) != 0) - { - result.m_lowestBreakIteration = sharedPStateFlags.LowestBreakIteration; - } - - if ((rootTask != null) && rootTask.IsCompleted) rootTask.Dispose(); - - // ETW event for Parallel For End - if (TplEtwProvider.Log.IsEnabled()) - { - long nTotalIterations = 0; - - // calculate how many iterations we ran in total - if (sb_status == ParallelLoopStateFlags.PLS_NONE) - nTotalIterations = toExclusive - fromInclusive; - else if ((sb_status & ParallelLoopStateFlags.PLS_BROKEN) != 0) - nTotalIterations = sharedPStateFlags.LowestBreakIteration - fromInclusive; - else - nTotalIterations = -1; //PLS_STOPPED! We can't determine this if we were stopped.. - - TplEtwProvider.Log.ParallelLoopEnd((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0), - forkJoinContextID, nTotalIterations); - } - } - - return result; - } - - - /// - /// Executes a for each operation on an - /// in which iterations may run in parallel. - /// - /// The type of the data in the source. - /// An enumerable data source. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// The delegate is invoked once for each element in the - /// enumerable. It is provided with the current element as a parameter. - /// - public static ParallelLoopResult ForEach(IEnumerable source, Action body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return ForEachWorker( - source, s_defaultParallelOptions, body, null, null, null, null, null, null); - } - - /// - /// Executes a for each operation on an - /// in which iterations may run in parallel. - /// - /// The type of the data in the source. - /// An enumerable data source. - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// CancellationToken in the - /// argument is set - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// The delegate is invoked once for each element in the - /// enumerable. It is provided with the current element as a parameter. - /// - public static ParallelLoopResult ForEach(IEnumerable source, ParallelOptions parallelOptions, Action body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForEachWorker( - source, parallelOptions, body, null, null, null, null, null, null); - } - - /// - /// Executes a for each operation on an - /// in which iterations may run in parallel. - /// - /// The type of the data in the source. - /// An enumerable data source. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// The delegate is invoked once for each element in the - /// enumerable. It is provided with the following parameters: the current element, - /// and a ParallelLoopState instance that may be - /// used to break out of the loop prematurely. - /// - public static ParallelLoopResult ForEach(IEnumerable source, Action body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return ForEachWorker( - source, s_defaultParallelOptions, null, body, null, null, null, null, null); - } - - /// - /// Executes a for each operation on an - /// in which iterations may run in parallel. - /// - /// The type of the data in the source. - /// An enumerable data source. - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// CancellationToken in the - /// argument is set - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// The delegate is invoked once for each element in the - /// enumerable. It is provided with the following parameters: the current element, - /// and a ParallelLoopState instance that may be - /// used to break out of the loop prematurely. - /// - public static ParallelLoopResult ForEach(IEnumerable source, ParallelOptions parallelOptions, Action body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForEachWorker( - source, parallelOptions, null, body, null, null, null, null, null); - } - - /// - /// Executes a for each operation on an - /// in which iterations may run in parallel. - /// - /// The type of the data in the source. - /// An enumerable data source. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// The delegate is invoked once for each element in the - /// enumerable. It is provided with the following parameters: the current element, - /// a ParallelLoopState instance that may be - /// used to break out of the loop prematurely, and the current element's index (an Int64). - /// - public static ParallelLoopResult ForEach(IEnumerable source, Action body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return ForEachWorker( - source, s_defaultParallelOptions, null, null, body, null, null, null, null); - } - - /// - /// Executes a for each operation on an - /// in which iterations may run in parallel. - /// - /// The type of the data in the source. - /// An enumerable data source. - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// CancellationToken in the - /// argument is set - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// The delegate is invoked once for each element in the - /// enumerable. It is provided with the following parameters: the current element, - /// a ParallelLoopState instance that may be - /// used to break out of the loop prematurely, and the current element's index (an Int64). - /// - public static ParallelLoopResult ForEach(IEnumerable source, ParallelOptions parallelOptions, Action body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForEachWorker( - source, parallelOptions, null, null, body, null, null, null, null); - } - - /// - /// Executes a for each operation on an - /// in which iterations may run in parallel. - /// - /// The type of the data in the source. - /// The type of the thread-local data. - /// An enumerable data source. - /// The function delegate that returns the initial state of the local data - /// for each thread. - /// The delegate that is invoked once per iteration. - /// The delegate that performs a final action on the local state of each - /// thread. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The delegate is invoked once for each element in the - /// enumerable. It is provided with the following parameters: the current element, - /// a ParallelLoopState instance that may be - /// used to break out of the loop prematurely, and some local state that may be shared amongst iterations - /// that execute on the same thread. - /// - /// - /// The delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// - /// - public static ParallelLoopResult ForEach(IEnumerable source, Func localInit, - Func body, Action localFinally) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - - return ForEachWorker( - source, s_defaultParallelOptions, null, null, null, body, null, localInit, localFinally); - } - - /// - /// Executes a for each operation on an - /// in which iterations may run in parallel. - /// - /// The type of the data in the source. - /// The type of the thread-local data. - /// An enumerable data source. - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// The function delegate that returns the initial state of the local data - /// for each thread. - /// The delegate that is invoked once per iteration. - /// The delegate that performs a final action on the local state of each - /// thread. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// CancellationToken in the - /// argument is set - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The delegate is invoked once for each element in the - /// enumerable. It is provided with the following parameters: the current element, - /// a ParallelLoopState instance that may be - /// used to break out of the loop prematurely, and some local state that may be shared amongst iterations - /// that execute on the same thread. - /// - /// - /// The delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// - /// - public static ParallelLoopResult ForEach(IEnumerable source, - ParallelOptions parallelOptions, Func localInit, - Func body, Action localFinally) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForEachWorker( - source, parallelOptions, null, null, null, body, null, localInit, localFinally); - } - - /// - /// Executes a for each operation on an - /// in which iterations may run in parallel. - /// - /// The type of the data in the source. - /// The type of the thread-local data. - /// An enumerable data source. - /// The function delegate that returns the initial state of the local data - /// for each thread. - /// The delegate that is invoked once per iteration. - /// The delegate that performs a final action on the local state of each - /// thread. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The delegate is invoked once for each element in the - /// enumerable. It is provided with the following parameters: the current element, - /// a ParallelLoopState instance that may be - /// used to break out of the loop prematurely, the current element's index (an Int64), and some local - /// state that may be shared amongst iterations that execute on the same thread. - /// - /// - /// The delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// - /// - public static ParallelLoopResult ForEach(IEnumerable source, Func localInit, - Func body, Action localFinally) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - - return ForEachWorker( - source, s_defaultParallelOptions, null, null, null, null, body, localInit, localFinally); - } - - /// - /// Executes a for each operation on an - /// in which iterations may run in parallel. - /// - /// The type of the data in the source. - /// The type of the thread-local data. - /// An enumerable data source. - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// The function delegate that returns the initial state of the local data - /// for each thread. - /// The delegate that is invoked once per iteration. - /// The delegate that performs a final action on the local state of each - /// thread. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// CancellationToken in the - /// argument is set - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The delegate is invoked once for each element in the - /// enumerable. It is provided with the following parameters: the current element, - /// a ParallelLoopState instance that may be - /// used to break out of the loop prematurely, the current element's index (an Int64), and some local - /// state that may be shared amongst iterations that execute on the same thread. - /// - /// - /// The delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// - /// - public static ParallelLoopResult ForEach(IEnumerable source, ParallelOptions parallelOptions, Func localInit, - Func body, Action localFinally) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForEachWorker( - source, parallelOptions, null, null, null, null, body, localInit, localFinally); - } - - - /// - /// Performs the major work of the parallel foreach loop. It assumes that argument validation has - /// already been performed by the caller. This function's whole purpose in life is to enable as much - /// reuse of common implementation details for the various For overloads we offer. Without it, we'd - /// end up with lots of duplicate code. It handles: (1) simple foreach loops, (2) foreach loops that - /// depend on ParallelState, and (3) foreach loops that access indices, (4) foreach loops with thread - /// local data, and any necessary permutations thereof. - /// - /// - /// The type of the source data. - /// The type of the local data. - /// An enumerable data source. - /// ParallelOptions instance to use with this ForEach-loop - /// The simple loop body. - /// The loop body for ParallelState overloads. - /// The loop body for ParallelState/indexed overloads. - /// The loop body for ParallelState/thread local state overloads. - /// The loop body for ParallelState/indexed/thread local state overloads. - /// A selector function that returns new thread local state. - /// A cleanup function to destroy thread local state. - /// Only one of the bodyXX arguments may be supplied (i.e. they are exclusive). - /// A structure. - private static ParallelLoopResult ForEachWorker( - IEnumerable source, - ParallelOptions parallelOptions, - Action body, - Action bodyWithState, - Action bodyWithStateAndIndex, - Func bodyWithStateAndLocal, - Func bodyWithEverything, - Func localInit, Action localFinally) - { - Debug.Assert(((body == null ? 0 : 1) + (bodyWithState == null ? 0 : 1) + - (bodyWithStateAndIndex == null ? 0 : 1) + (bodyWithStateAndLocal == null ? 0 : 1) + (bodyWithEverything == null ? 0 : 1)) == 1, - "expected exactly one body function to be supplied"); - Debug.Assert((bodyWithStateAndLocal != null) || (bodyWithEverything != null) || (localInit == null && localFinally == null), - "thread local functions should only be supplied for loops w/ thread local bodies"); - - // Before getting started, do a quick peek to see if we have been canceled already - if (parallelOptions.CancellationToken.IsCancellationRequested) - { - throw new OperationCanceledException(parallelOptions.CancellationToken); - } - - // If it's an array, we can use a fast-path that uses ldelems in the IL. - TSource[] sourceAsArray = source as TSource[]; - if (sourceAsArray != null) - { - return ForEachWorker( - sourceAsArray, parallelOptions, body, bodyWithState, bodyWithStateAndIndex, bodyWithStateAndLocal, - bodyWithEverything, localInit, localFinally); - } - - // If we can index into the list, we can use a faster code-path that doesn't result in - // contention for the single, shared enumerator object. - IList sourceAsList = source as IList; - if (sourceAsList != null) - { - return ForEachWorker( - sourceAsList, parallelOptions, body, bodyWithState, bodyWithStateAndIndex, bodyWithStateAndLocal, - bodyWithEverything, localInit, localFinally); - } - - // This is an honest-to-goodness IEnumerable. Wrap it in a Partitioner and defer to our - // ForEach(Partitioner) logic. - return PartitionerForEachWorker(Partitioner.Create(source), parallelOptions, body, bodyWithState, - bodyWithStateAndIndex, bodyWithStateAndLocal, bodyWithEverything, localInit, localFinally); - - } - - /// - /// A fast path for the more general ForEachWorker method above. This uses ldelem instructions to - /// access the individual elements of the array, which will be faster. - /// - /// The type of the source data. - /// The type of the local data. - /// An array data source. - /// The options to use for execution. - /// The simple loop body. - /// The loop body for ParallelState overloads. - /// The loop body for indexed/ParallelLoopState overloads. - /// The loop body for local/ParallelLoopState overloads. - /// The loop body for the most generic overload. - /// A selector function that returns new thread local state. - /// A cleanup function to destroy thread local state. - /// A structure. - private static ParallelLoopResult ForEachWorker( - TSource[] array, - ParallelOptions parallelOptions, - Action body, - Action bodyWithState, - Action bodyWithStateAndIndex, - Func bodyWithStateAndLocal, - Func bodyWithEverything, - Func localInit, Action localFinally) - { - Debug.Assert(array != null); - Debug.Assert(parallelOptions != null, "ForEachWorker(array): parallelOptions is null"); - - int from = array.GetLowerBound(0); - int to = array.GetUpperBound(0) + 1; - - if (body != null) - { - return ForWorker( - from, to, parallelOptions, (i) => body(array[i]), null, null, null, null); - } - else if (bodyWithState != null) - { - return ForWorker( - from, to, parallelOptions, null, (i, state) => bodyWithState(array[i], state), null, null, null); - } - else if (bodyWithStateAndIndex != null) - { - return ForWorker( - from, to, parallelOptions, null, (i, state) => bodyWithStateAndIndex(array[i], state, i), null, null, null); - } - else if (bodyWithStateAndLocal != null) - { - return ForWorker( - from, to, parallelOptions, null, null, (i, state, local) => bodyWithStateAndLocal(array[i], state, local), localInit, localFinally); - } - else - { - return ForWorker( - from, to, parallelOptions, null, null, (i, state, local) => bodyWithEverything(array[i], state, i, local), localInit, localFinally); - } - } - - /// - /// A fast path for the more general ForEachWorker method above. This uses IList<T>'s indexer - /// capabilities to access the individual elements of the list rather than an enumerator. - /// - /// The type of the source data. - /// The type of the local data. - /// A list data source. - /// The options to use for execution. - /// The simple loop body. - /// The loop body for ParallelState overloads. - /// The loop body for indexed/ParallelLoopState overloads. - /// The loop body for local/ParallelLoopState overloads. - /// The loop body for the most generic overload. - /// A selector function that returns new thread local state. - /// A cleanup function to destroy thread local state. - /// A structure. - private static ParallelLoopResult ForEachWorker( - IList list, - ParallelOptions parallelOptions, - Action body, - Action bodyWithState, - Action bodyWithStateAndIndex, - Func bodyWithStateAndLocal, - Func bodyWithEverything, - Func localInit, Action localFinally) - { - Debug.Assert(list != null); - Debug.Assert(parallelOptions != null, "ForEachWorker(list): parallelOptions is null"); - - if (body != null) - { - return ForWorker( - 0, list.Count, parallelOptions, (i) => body(list[i]), null, null, null, null); - } - else if (bodyWithState != null) - { - return ForWorker( - 0, list.Count, parallelOptions, null, (i, state) => bodyWithState(list[i], state), null, null, null); - } - else if (bodyWithStateAndIndex != null) - { - return ForWorker( - 0, list.Count, parallelOptions, null, (i, state) => bodyWithStateAndIndex(list[i], state, i), null, null, null); - } - else if (bodyWithStateAndLocal != null) - { - return ForWorker( - 0, list.Count, parallelOptions, null, null, (i, state, local) => bodyWithStateAndLocal(list[i], state, local), localInit, localFinally); - } - else - { - return ForWorker( - 0, list.Count, parallelOptions, null, null, (i, state, local) => bodyWithEverything(list[i], state, i, local), localInit, localFinally); - } - } - - - - /// - /// Executes a for each operation on a - /// Partitioner in which iterations may run in parallel. - /// - /// The type of the elements in . - /// The Partitioner that contains the original data source. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// SupportsDynamicPartitions property in the Partitioner returns - /// false. - /// The exception that is thrown when any - /// methods in the Partitioner return null. - /// The exception that is thrown when the - /// GetPartitions() method in the Partitioner does not return - /// the correct number of partitions. - /// The exception that is thrown when the - /// GetPartitions() method in the Partitioner returns an IList - /// with at least one null value. - /// The exception that is thrown when the - /// GetDynamicPartitions() method in the Partitioner returns an - /// IEnumerable whose GetEnumerator() method returns null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The Partitioner is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an - /// OrderablePartitioner. - /// - /// - /// The delegate is invoked once for each element in the - /// Partitioner. It is provided with the current element as a parameter. - /// - /// - public static ParallelLoopResult ForEach( - Partitioner source, - Action body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return PartitionerForEachWorker(source, s_defaultParallelOptions, body, null, null, null, null, null, null); - } - - /// - /// Executes a for each operation on a - /// Partitioner in which iterations may run in parallel. - /// - /// The type of the elements in . - /// The Partitioner that contains the original data source. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// SupportsDynamicPartitions property in the Partitioner returns - /// false. - /// The exception that is thrown when any - /// methods in the Partitioner return null. - /// The exception that is thrown when the - /// GetPartitions() method in the Partitioner does not return - /// the correct number of partitions. - /// The exception that is thrown when the - /// GetPartitions() method in the Partitioner returns an IList - /// with at least one null value. - /// The exception that is thrown when the - /// GetDynamicPartitions() method in the Partitioner returns an - /// IEnumerable whose GetEnumerator() method returns null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The Partitioner is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an - /// OrderablePartitioner. - /// - /// - /// The delegate is invoked once for each element in the - /// Partitioner. It is provided with the following parameters: the current element, - /// and a ParallelLoopState instance that may be - /// used to break out of the loop prematurely. - /// - /// - public static ParallelLoopResult ForEach( - Partitioner source, - Action body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return PartitionerForEachWorker(source, s_defaultParallelOptions, null, body, null, null, null, null, null); - } - - /// - /// Executes a for each operation on a - /// OrderablePartitioner in which iterations may run in parallel. - /// - /// The type of the elements in . - /// The OrderablePartitioner that contains the original data source. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// SupportsDynamicPartitions property in the OrderablePartitioner returns - /// false. - /// The exception that is thrown when the - /// KeysNormalized property in the OrderablePartitioner returns - /// false. - /// The exception that is thrown when any - /// methods in the OrderablePartitioner return null. - /// The exception that is thrown when the - /// GetPartitions() or GetOrderablePartitions() methods in the - /// OrderablePartitioner do not return the correct number of partitions. - /// The exception that is thrown when the - /// GetPartitions() or GetOrderablePartitions() methods in the - /// OrderablePartitioner return an IList with at least one null value. - /// The exception that is thrown when the - /// GetDynamicPartitions() or GetDynamicOrderablePartitions() methods in the - /// OrderablePartitioner return an IEnumerable whose GetEnumerator() method returns null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The Partitioner is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an - /// OrderablePartitioner. - /// - /// - /// The delegate is invoked once for each element in the - /// Partitioner. It is provided with the following parameters: the current element, - /// a ParallelLoopState instance that may be - /// used to break out of the loop prematurely, and the current element's index (an Int64). - /// - /// - public static ParallelLoopResult ForEach( - OrderablePartitioner source, - Action body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - if (!source.KeysNormalized) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_OrderedPartitionerKeysNotNormalized")); - } - - return PartitionerForEachWorker(source, s_defaultParallelOptions, null, null, body, null, null, null, null); - } - - /// - /// Executes a for each operation on a - /// Partitioner in which iterations may run in parallel. - /// - /// The type of the elements in . - /// The type of the thread-local data. - /// The Partitioner that contains the original data source. - /// The function delegate that returns the initial state of the local data - /// for each thread. - /// The delegate that is invoked once per iteration. - /// The delegate that performs a final action on the local state of each - /// thread. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// SupportsDynamicPartitions property in the Partitioner returns - /// false. - /// The exception that is thrown when any - /// methods in the Partitioner return null. - /// The exception that is thrown when the - /// GetPartitions() method in the Partitioner does not return - /// the correct number of partitions. - /// The exception that is thrown when the - /// GetPartitions() method in the Partitioner returns an IList - /// with at least one null value. - /// The exception that is thrown when the - /// GetDynamicPartitions() method in the Partitioner returns an - /// IEnumerable whose GetEnumerator() method returns null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The Partitioner is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an - /// OrderablePartitioner. - /// - /// - /// The delegate is invoked once for each element in the - /// Partitioner. It is provided with the following parameters: the current element, - /// a ParallelLoopState instance that may be - /// used to break out of the loop prematurely, and some local state that may be shared amongst iterations - /// that execute on the same thread. - /// - /// - /// The delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// - /// - public static ParallelLoopResult ForEach( - Partitioner source, - Func localInit, - Func body, - Action localFinally) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - - return PartitionerForEachWorker(source, s_defaultParallelOptions, null, null, null, body, null, localInit, localFinally); - } - - /// - /// Executes a for each operation on a - /// OrderablePartitioner in which iterations may run in parallel. - /// - /// The type of the elements in . - /// The type of the thread-local data. - /// The OrderablePartitioner that contains the original data source. - /// The function delegate that returns the initial state of the local data - /// for each thread. - /// The delegate that is invoked once per iteration. - /// The delegate that performs a final action on the local state of each - /// thread. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// SupportsDynamicPartitions property in the OrderablePartitioner returns - /// false. - /// The exception that is thrown when the - /// KeysNormalized property in the OrderablePartitioner returns - /// false. - /// The exception that is thrown when any - /// methods in the OrderablePartitioner return null. - /// The exception that is thrown when the - /// GetPartitions() or GetOrderablePartitions() methods in the - /// OrderablePartitioner do not return the correct number of partitions. - /// The exception that is thrown when the - /// GetPartitions() or GetOrderablePartitions() methods in the - /// OrderablePartitioner return an IList with at least one null value. - /// The exception that is thrown when the - /// GetDynamicPartitions() or GetDynamicOrderablePartitions() methods in the - /// OrderablePartitioner return an IEnumerable whose GetEnumerator() method returns null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The Partitioner is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an - /// OrderablePartitioner. - /// - /// - /// The delegate is invoked once for each element in the - /// Partitioner. It is provided with the following parameters: the current element, - /// a ParallelLoopState instance that may be - /// used to break out of the loop prematurely, the current element's index (an Int64), and some local - /// state that may be shared amongst iterations that execute on the same thread. - /// - /// - /// The delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// - /// - public static ParallelLoopResult ForEach( - OrderablePartitioner source, - Func localInit, - Func body, - Action localFinally) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - - if (!source.KeysNormalized) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_OrderedPartitionerKeysNotNormalized")); - } - - return PartitionerForEachWorker(source, s_defaultParallelOptions, null, null, null, null, body, localInit, localFinally); - } - - /// - /// Executes a for each operation on a - /// Partitioner in which iterations may run in parallel. - /// - /// The type of the elements in . - /// The Partitioner that contains the original data source. - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// CancellationToken in the - /// argument is set - /// The exception that is thrown when the - /// SupportsDynamicPartitions property in the Partitioner returns - /// false. - /// The exception that is thrown when any - /// methods in the Partitioner return null. - /// The exception that is thrown when the - /// GetPartitions() method in the Partitioner does not return - /// the correct number of partitions. - /// The exception that is thrown when the - /// GetPartitions() method in the Partitioner returns an IList - /// with at least one null value. - /// The exception that is thrown when the - /// GetDynamicPartitions() method in the Partitioner returns an - /// IEnumerable whose GetEnumerator() method returns null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The Partitioner is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an - /// OrderablePartitioner. - /// - /// - /// The delegate is invoked once for each element in the - /// Partitioner. It is provided with the current element as a parameter. - /// - /// - public static ParallelLoopResult ForEach( - Partitioner source, - ParallelOptions parallelOptions, - Action body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return PartitionerForEachWorker(source, parallelOptions, body, null, null, null, null, null, null); - } - - /// - /// Executes a for each operation on a - /// Partitioner in which iterations may run in parallel. - /// - /// The type of the elements in . - /// The Partitioner that contains the original data source. - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// CancellationToken in the - /// argument is set - /// The exception that is thrown when the - /// SupportsDynamicPartitions property in the Partitioner returns - /// false. - /// The exception that is thrown when any - /// methods in the Partitioner return null. - /// The exception that is thrown when the - /// GetPartitions() method in the Partitioner does not return - /// the correct number of partitions. - /// The exception that is thrown when the - /// GetPartitions() method in the Partitioner returns an IList - /// with at least one null value. - /// The exception that is thrown when the - /// GetDynamicPartitions() method in the Partitioner returns an - /// IEnumerable whose GetEnumerator() method returns null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The Partitioner is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an - /// OrderablePartitioner. - /// - /// - /// The delegate is invoked once for each element in the - /// Partitioner. It is provided with the following parameters: the current element, - /// and a ParallelLoopState instance that may be - /// used to break out of the loop prematurely. - /// - /// - public static ParallelLoopResult ForEach( - Partitioner source, - ParallelOptions parallelOptions, - Action body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return PartitionerForEachWorker(source, parallelOptions, null, body, null, null, null, null, null); - } - - /// - /// Executes a for each operation on a - /// OrderablePartitioner in which iterations may run in parallel. - /// - /// The type of the elements in . - /// The OrderablePartitioner that contains the original data source. - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// The delegate that is invoked once per iteration. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// CancellationToken in the - /// argument is set - /// The exception that is thrown when the - /// SupportsDynamicPartitions property in the OrderablePartitioner returns - /// false. - /// The exception that is thrown when the - /// KeysNormalized property in the OrderablePartitioner returns - /// false. - /// The exception that is thrown when any - /// methods in the OrderablePartitioner return null. - /// The exception that is thrown when the - /// GetPartitions() or GetOrderablePartitions() methods in the - /// OrderablePartitioner do not return the correct number of partitions. - /// The exception that is thrown when the - /// GetPartitions() or GetOrderablePartitions() methods in the - /// OrderablePartitioner return an IList with at least one null value. - /// The exception that is thrown when the - /// GetDynamicPartitions() or GetDynamicOrderablePartitions() methods in the - /// OrderablePartitioner return an IEnumerable whose GetEnumerator() method returns null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The Partitioner is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an - /// OrderablePartitioner. - /// - /// - /// The delegate is invoked once for each element in the - /// Partitioner. It is provided with the following parameters: the current element, - /// a ParallelLoopState instance that may be - /// used to break out of the loop prematurely, and the current element's index (an Int64). - /// - /// - public static ParallelLoopResult ForEach( - OrderablePartitioner source, - ParallelOptions parallelOptions, - Action body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - if (!source.KeysNormalized) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_OrderedPartitionerKeysNotNormalized")); - } - - return PartitionerForEachWorker(source, parallelOptions, null, null, body, null, null, null, null); - } - - /// - /// Executes a for each operation on a - /// Partitioner in which iterations may run in parallel. - /// - /// The type of the elements in . - /// The type of the thread-local data. - /// The Partitioner that contains the original data source. - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// The function delegate that returns the initial state of the local data - /// for each thread. - /// The delegate that is invoked once per iteration. - /// The delegate that performs a final action on the local state of each - /// thread. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// CancellationToken in the - /// argument is set - /// The exception that is thrown when the - /// SupportsDynamicPartitions property in the Partitioner returns - /// false. - /// The exception that is thrown when any - /// methods in the Partitioner return null. - /// The exception that is thrown when the - /// GetPartitions() method in the Partitioner does not return - /// the correct number of partitions. - /// The exception that is thrown when the - /// GetPartitions() method in the Partitioner returns an IList - /// with at least one null value. - /// The exception that is thrown when the - /// GetDynamicPartitions() method in the Partitioner returns an - /// IEnumerable whose GetEnumerator() method returns null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The Partitioner is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an - /// OrderablePartitioner. - /// - /// - /// The delegate is invoked once for each element in the - /// Partitioner. It is provided with the following parameters: the current element, - /// a ParallelLoopState instance that may be - /// used to break out of the loop prematurely, and some local state that may be shared amongst iterations - /// that execute on the same thread. - /// - /// - /// The delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// - /// - public static ParallelLoopResult ForEach( - Partitioner source, - ParallelOptions parallelOptions, - Func localInit, - Func body, - Action localFinally) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return PartitionerForEachWorker(source, parallelOptions, null, null, null, body, null, localInit, localFinally); - } - - /// - /// Executes a for each operation on a - /// OrderablePartitioner in which iterations may run in parallel. - /// - /// The type of the elements in . - /// The type of the thread-local data. - /// The OrderablePartitioner that contains the original data source. - /// A ParallelOptions - /// instance that configures the behavior of this operation. - /// The function delegate that returns the initial state of the local data - /// for each thread. - /// The delegate that is invoked once per iteration. - /// The delegate that performs a final action on the local state of each - /// thread. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// argument is null. - /// The exception that is thrown when the - /// CancellationToken in the - /// argument is set - /// The exception that is thrown when the - /// SupportsDynamicPartitions property in the OrderablePartitioner returns - /// false. - /// The exception that is thrown when the - /// KeysNormalized property in the OrderablePartitioner returns - /// false. - /// The exception that is thrown when any - /// methods in the OrderablePartitioner return null. - /// The exception that is thrown when the - /// GetPartitions() or GetOrderablePartitions() methods in the - /// OrderablePartitioner do not return the correct number of partitions. - /// The exception that is thrown when the - /// GetPartitions() or GetOrderablePartitions() methods in the - /// OrderablePartitioner return an IList with at least one null value. - /// The exception that is thrown when the - /// GetDynamicPartitions() or GetDynamicOrderablePartitions() methods in the - /// OrderablePartitioner return an IEnumerable whose GetEnumerator() method returns null. - /// The exception that is thrown to contain an exception - /// thrown from one of the specified delegates. - /// The exception that is thrown when the - /// the CancellationTokenSource associated with the - /// the CancellationToken in the - /// has been disposed. - /// A ParallelLoopResult structure - /// that contains information on what portion of the loop completed. - /// - /// - /// The Partitioner is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an - /// OrderablePartitioner. - /// - /// - /// The delegate is invoked once for each element in the - /// Partitioner. It is provided with the following parameters: the current element, - /// a ParallelLoopState instance that may be - /// used to break out of the loop prematurely, the current element's index (an Int64), and some local - /// state that may be shared amongst iterations that execute on the same thread. - /// - /// - /// The delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// - /// - public static ParallelLoopResult ForEach( - OrderablePartitioner source, - ParallelOptions parallelOptions, - Func localInit, - Func body, - Action localFinally) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - if (!source.KeysNormalized) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_OrderedPartitionerKeysNotNormalized")); - } - - return PartitionerForEachWorker(source, parallelOptions, null, null, null, null, body, localInit, localFinally); - } - - // Main worker method for Parallel.ForEach() calls w/ Partitioners. - private static ParallelLoopResult PartitionerForEachWorker( - Partitioner source, // Might be OrderablePartitioner - ParallelOptions parallelOptions, - Action simpleBody, - Action bodyWithState, - Action bodyWithStateAndIndex, - Func bodyWithStateAndLocal, - Func bodyWithEverything, - Func localInit, - Action localFinally) - { - Debug.Assert(((simpleBody == null ? 0 : 1) + (bodyWithState == null ? 0 : 1) + - (bodyWithStateAndIndex == null ? 0 : 1) + (bodyWithStateAndLocal == null ? 0 : 1) + (bodyWithEverything == null ? 0 : 1)) == 1, - "PartitionForEach: expected exactly one body function to be supplied"); - Debug.Assert((bodyWithStateAndLocal != null) || (bodyWithEverything != null) || (localInit == null && localFinally == null), - "PartitionForEach: thread local functions should only be supplied for loops w/ thread local bodies"); - - OrderablePartitioner orderedSource = source as OrderablePartitioner; - Debug.Assert((orderedSource != null) || (bodyWithStateAndIndex == null && bodyWithEverything == null), - "PartitionForEach: bodies with indices are only allowable for OrderablePartitioner"); - - if (!source.SupportsDynamicPartitions) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_PartitionerNotDynamic")); - } - - // Before getting started, do a quick peek to see if we have been canceled already - if (parallelOptions.CancellationToken.IsCancellationRequested) - { - throw new OperationCanceledException(parallelOptions.CancellationToken); - } - - // ETW event for Parallel For begin - int forkJoinContextID = 0; - Task callerTask = null; - if (TplEtwProvider.Log.IsEnabled()) - { - forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID); - callerTask = Task.InternalCurrent; - TplEtwProvider.Log.ParallelLoopBegin((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0), - forkJoinContextID, TplEtwProvider.ForkJoinOperationType.ParallelForEach, - 0, 0); - } - - // For all loops we need a shared flag even though we don't have a body with state, - // because the shared flag contains the exceptional bool, which triggers other workers - // to exit their loops if one worker catches an exception - ParallelLoopStateFlags64 sharedPStateFlags = new ParallelLoopStateFlags64(); - - // Instantiate our result. Specifics will be filled in later. - ParallelLoopResult result = new ParallelLoopResult(); - - // Keep track of any cancellations - OperationCanceledException oce = null; - - CancellationTokenRegistration ctr = new CancellationTokenRegistration(); - - // if cancellation is enabled, we need to register a callback to stop the loop when it gets signaled - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr = parallelOptions.CancellationToken.InternalRegisterWithoutEC((o) => - { - // Cause processing to stop - sharedPStateFlags.Cancel(); - // Record our cancellation - oce = new OperationCanceledException(parallelOptions.CancellationToken); - }, null); - } - - // Get our dynamic partitioner -- depends on whether source is castable to OrderablePartitioner - // Also, do some error checking. - IEnumerable partitionerSource = null; - IEnumerable> orderablePartitionerSource = null; - if (orderedSource != null) - { - orderablePartitionerSource = orderedSource.GetOrderableDynamicPartitions(); - if (orderablePartitionerSource == null) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_PartitionerReturnedNull")); - } - } - else - { - partitionerSource = source.GetDynamicPartitions(); - if (partitionerSource == null) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_PartitionerReturnedNull")); - } - } - - ParallelForReplicatingTask rootTask = null; - - // This is the action that will be run by each replicable task. - Action partitionAction = delegate - { - Task currentWorkerTask = Task.InternalCurrent; - - // ETW event for ParallelForEach Worker Fork - if (TplEtwProvider.Log.IsEnabled()) - { - TplEtwProvider.Log.ParallelFork((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0), - forkJoinContextID); - } - - TLocal localValue = default(TLocal); - bool bLocalValueInitialized = false; // Tracks whether localInit ran without exceptions, so that we can skip localFinally if it wasn't - IDisposable myPartitionToDispose = null; - - try - { - // Create a new state object that references the shared "stopped" and "exceptional" flags. - // If needed, it will contain a new instance of thread-local state by invoking the selector. - ParallelLoopState64 state = null; - - if (bodyWithState != null || bodyWithStateAndIndex != null) - { - state = new ParallelLoopState64(sharedPStateFlags); - } - else if (bodyWithStateAndLocal != null || bodyWithEverything != null) - { - state = new ParallelLoopState64(sharedPStateFlags); - // If a thread-local selector was supplied, invoke it. Otherwise, stick with the default. - if (localInit != null) - { - localValue = localInit(); - bLocalValueInitialized = true; - } - } - - - bool bIsRootTask = (rootTask == currentWorkerTask); - - // initialize a loop timer which will help us decide whether we should exit early - LoopTimer loopTimer = new LoopTimer(rootTask.ActiveChildCount); - - if (orderedSource != null) - { - // Use this path for OrderablePartitioner - - - // first check if there's saved state from a previous replica that we might be replacing. - // the only state to be passed down in such a transition is the enumerator - IEnumerator> myPartition = currentWorkerTask.SavedStateFromPreviousReplica as IEnumerator>; - if (myPartition == null) - { - // apparently we're a brand new replica, get a fresh enumerator from the partitioner - myPartition = orderablePartitionerSource.GetEnumerator(); - if (myPartition == null) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_NullEnumerator")); - } - } - myPartitionToDispose = myPartition; - - while (myPartition.MoveNext()) - { - KeyValuePair kvp = myPartition.Current; - long index = kvp.Key; - TSource value = kvp.Value; - - // Update our iteration index - if (state != null) state.CurrentIteration = index; - - if (simpleBody != null) - simpleBody(value); - else if (bodyWithState != null) - bodyWithState(value, state); - else if (bodyWithStateAndIndex != null) - bodyWithStateAndIndex(value, state, index); - else if (bodyWithStateAndLocal != null) - localValue = bodyWithStateAndLocal(value, state, localValue); - else - localValue = bodyWithEverything(value, state, index, localValue); - - if (sharedPStateFlags.ShouldExitLoop(index)) break; - - // Cooperative multitasking workaround for AppDomain fairness. - // Check if allowed loop time is exceeded, if so save current state and return. The self replicating task logic - // will detect this, and queue up a replacement task. Note that we don't do this on the root task. - if (!bIsRootTask && loopTimer.LimitExceeded()) - { - currentWorkerTask.SavedStateForNextReplica = myPartition; - myPartitionToDispose = null; - break; - } - } - - } - else - { - // Use this path for Partitioner that is not OrderablePartitioner - - // first check if there's saved state from a previous replica that we might be replacing. - // the only state to be passed down in such a transition is the enumerator - IEnumerator myPartition = currentWorkerTask.SavedStateFromPreviousReplica as IEnumerator; - if (myPartition == null) - { - // apparently we're a brand new replica, get a fresh enumerator from the partitioner - myPartition = partitionerSource.GetEnumerator(); - if (myPartition == null) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_NullEnumerator")); - } - } - myPartitionToDispose = myPartition; - - // I'm not going to try to maintain this - if (state != null) - state.CurrentIteration = 0; - - while (myPartition.MoveNext()) - { - TSource t = myPartition.Current; - - if (simpleBody != null) - simpleBody(t); - else if (bodyWithState != null) - bodyWithState(t, state); - else if (bodyWithStateAndLocal != null) - localValue = bodyWithStateAndLocal(t, state, localValue); - else - Debug.Assert(false, "PartitionerForEach: illegal body type in Partitioner handler"); - - - // Any break, stop or exception causes us to halt - // We don't have the global indexing information to discriminate whether or not - // we are before or after a break point. - if (sharedPStateFlags.LoopStateFlags != ParallelLoopStateFlags.PLS_NONE) - break; - - // Cooperative multitasking workaround for AppDomain fairness. - // Check if allowed loop time is exceeded, if so save current state and return. The self replicating task logic - // will detect this, and queue up a replacement task. Note that we don't do this on the root task. - if (!bIsRootTask && loopTimer.LimitExceeded()) - { - currentWorkerTask.SavedStateForNextReplica = myPartition; - myPartitionToDispose = null; - break; - } - } - } - } - catch - { - // Inform other tasks of the exception, then rethrow - sharedPStateFlags.SetExceptional(); - throw; - } - finally - { - if (localFinally != null && bLocalValueInitialized) - { - localFinally(localValue); - } - - if (myPartitionToDispose != null) - { - myPartitionToDispose.Dispose(); - } - - // ETW event for ParallelFor Worker Join - if (TplEtwProvider.Log.IsEnabled()) - { - TplEtwProvider.Log.ParallelJoin((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0), - forkJoinContextID); - } - } - }; - - try - { - // Create and start the self-replicating task. - // This needs to be in try-block because it can throw in BuggyScheduler.MaxConcurrencyLevel - rootTask = new ParallelForReplicatingTask(parallelOptions, partitionAction, TaskCreationOptions.None, - InternalTaskOptions.SelfReplicating); - - // And process it's completion... - // Moved inside try{} block because faulty scheduler may throw here. - rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler); - - rootTask.Wait(); - - // If we made a cancellation registration, we need to clean it up now before observing the OCE - // Otherwise we could be caught in the middle of a callback, and observe PLS_STOPPED, but oce = null - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - - // If we got through that with no exceptions, and we were canceled, then - // throw our cancellation exception - if (oce != null) throw oce; - } - catch (AggregateException aggExp) - { - // if we made a cancellation registration, and rootTask.Wait threw, we need to clean it up here - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - - // see if we can combine it into a single OCE. If not propagate the original exception - ThrowIfReducableToSingleOCE(aggExp.InnerExceptions, parallelOptions.CancellationToken); - throw; - } - catch (TaskSchedulerException) - { - // if we made a cancellation registration, and either we threw an exception constructing rootTask or - // rootTask.RunSynchronously threw, we need to clean it up here. - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - throw; - } - finally - { - int sb_status = sharedPStateFlags.LoopStateFlags; - result.m_completed = (sb_status == ParallelLoopStateFlags.PLS_NONE); - if ((sb_status & ParallelLoopStateFlags.PLS_BROKEN) != 0) - { - result.m_lowestBreakIteration = sharedPStateFlags.LowestBreakIteration; - } - - if ((rootTask != null) && rootTask.IsCompleted) rootTask.Dispose(); - - //dispose the partitioner source if it implements IDisposable - IDisposable d = null; - if (orderablePartitionerSource != null) - { - d = orderablePartitionerSource as IDisposable; - } - else - { - d = partitionerSource as IDisposable; - } - - if (d != null) - { - d.Dispose(); - } - - // ETW event for Parallel For End - if (TplEtwProvider.Log.IsEnabled()) - { - TplEtwProvider.Log.ParallelLoopEnd((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0), - forkJoinContextID, 0); - } - } - - return result; - } - - /// - /// Internal utility function that implements the OCE filtering behavior for all Parallel.* APIs. - /// Throws a single OperationCancelledException object with the token if the Exception collection only contains - /// OperationCancelledExceptions with the given CancellationToken. - /// - /// - /// The exception collection to filter - /// The CancellationToken expected on all inner exceptions - /// - internal static void ThrowIfReducableToSingleOCE(IEnumerable excCollection, CancellationToken ct) - { - bool bCollectionNotZeroLength = false; - if (ct.IsCancellationRequested) - { - foreach (Exception e in excCollection) - { - bCollectionNotZeroLength = true; - OperationCanceledException oce = e as OperationCanceledException; - if (oce == null || oce.CancellationToken != ct) - { - // mismatch found - return; - } - } - - // all exceptions are OCEs with this token, let's throw it - if (bCollectionNotZeroLength) throw new OperationCanceledException(ct); - } - } - - internal struct LoopTimer - { - public LoopTimer(int nWorkerTaskIndex) - { - // This logic ensures that we have a diversity of timeouts across worker tasks (100, 150, 200, 250, 100, etc) - // Otherwise all worker will try to timeout at precisely the same point, which is bad if the work is just about to finish - int timeOut = s_BaseNotifyPeriodMS + (nWorkerTaskIndex % PlatformHelper.ProcessorCount) * s_NotifyPeriodIncrementMS; - - m_timeLimit = Environment.TickCount + timeOut; - } - - public bool LimitExceeded() - { - Debug.Assert(m_timeLimit != 0, "Probably the default initializer for LoopTimer was used somewhere"); - - // comparing against the next expected time saves an addition operation here - // Also we omit the comparison for wrap around here. The only side effect is one extra early yield every 38 days. - return (Environment.TickCount > m_timeLimit); - } - - const int s_BaseNotifyPeriodMS = 100; - const int s_NotifyPeriodIncrementMS = 50; - - private int m_timeLimit; - } - - } - -} diff --git a/src/coreclr/src/mscorlib/src/System/Threading/Tasks/ParallelLoopState.cs b/src/coreclr/src/mscorlib/src/System/Threading/Tasks/ParallelLoopState.cs deleted file mode 100644 index 6a62cf8..0000000 --- a/src/coreclr/src/mscorlib/src/System/Threading/Tasks/ParallelLoopState.cs +++ /dev/null @@ -1,641 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. -// See the LICENSE file in the project root for more information. - -// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ -// -// ParallelState.cs -// -// -// A non-generic and generic parallel state class, used by the Parallel helper class -// for parallel loop management. -// -// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - -using System.Diagnostics; -using System.Security.Permissions; -using System.Diagnostics.Contracts; - -// Prevents compiler warnings/errors regarding the use of ref params in Interlocked methods -#pragma warning disable 0420 - -namespace System.Threading.Tasks -{ - - /// - /// Enables iterations of loops to interact with - /// other iterations. - /// - [DebuggerDisplay("ShouldExitCurrentIteration = {ShouldExitCurrentIteration}")] - public class ParallelLoopState - { - // Derived classes will track a ParallelStateFlags32 or ParallelStateFlags64. - // So this is slightly redundant, but it enables us to implement some - // methods in this base class. - private ParallelLoopStateFlags m_flagsBase; - - internal ParallelLoopState(ParallelLoopStateFlags fbase) - { - m_flagsBase = fbase; - } - - /// - /// Internal/virtual support for ShouldExitCurrentIteration. - /// - internal virtual bool InternalShouldExitCurrentIteration - { - get - { - Debug.Assert(false); - throw new NotSupportedException( - Environment.GetResourceString("ParallelState_NotSupportedException_UnsupportedMethod")); - } - } - - /// - /// Gets whether the current iteration of the loop should exit based - /// on requests made by this or other iterations. - /// - /// - /// When an iteration of a loop calls or , or - /// when one throws an exception, or when the loop is canceled, the class will proactively - /// attempt to prohibit additional iterations of the loop from starting execution. - /// However, there may be cases where it is unable to prevent additional iterations from starting. - /// It may also be the case that a long-running iteration has already begun execution. In such - /// cases, iterations may explicitly check the property and - /// cease execution if the property returns true. - /// - public bool ShouldExitCurrentIteration - { - get - { - return InternalShouldExitCurrentIteration; - } - } - - /// - /// Gets whether any iteration of the loop has called . - /// - public bool IsStopped - { - get - { - return ((m_flagsBase.LoopStateFlags & ParallelLoopStateFlags.PLS_STOPPED) != 0); - } - } - - /// - /// Gets whether any iteration of the loop has thrown an exception that went unhandled by that - /// iteration. - /// - public bool IsExceptional - { - get - { - return ((m_flagsBase.LoopStateFlags & ParallelLoopStateFlags.PLS_EXCEPTIONAL) != 0); - } - } - - /// - /// Internal/virtual support for LowestBreakIteration. - /// - internal virtual long? InternalLowestBreakIteration - { - get - { - Debug.Assert(false); - throw new NotSupportedException( - Environment.GetResourceString("ParallelState_NotSupportedException_UnsupportedMethod")); - } - } - - /// - /// Gets the lowest iteration of the loop from which was called. - /// - /// - /// If no iteration of the loop called , this property will return null. - /// - public long? LowestBreakIteration - { - get - { - return InternalLowestBreakIteration; - } - } - - /// - /// Communicates that the loop should cease execution at the system's earliest - /// convenience. - /// - /// - /// The method was previously called. and may not be used in combination by iterations of the same loop. - /// - /// - /// - /// may be used to communicate to the loop that no other iterations need be run. - /// For long-running iterations that may already be executing, causes - /// to return true for all other iterations of the loop, such that another iteration may check and exit early if it's observed to be true. - /// - /// - /// is typically employed in search-based algorithms, where once a result is found, - /// no other iterations need be executed. - /// - /// - public void Stop() - { - m_flagsBase.Stop(); - } - - // Internal/virtual support for Break(). - internal virtual void InternalBreak() - { - Debug.Assert(false); - throw new NotSupportedException( - Environment.GetResourceString("ParallelState_NotSupportedException_UnsupportedMethod")); - } - - /// - /// Communicates that the loop should cease execution at the system's earliest - /// convenience of iterations beyond the current iteration. - /// - /// - /// The method was previously called. and - /// may not be used in combination by iterations of the same loop. - /// - /// - /// - /// may be used to communicate to the loop that no other iterations after the - /// current iteration need be run. For example, if is called from the 100th - /// iteration of a for loop iterating in parallel from 0 to 1000, all iterations less than 100 should - /// still be run, but the iterations from 101 through to 1000 are not necessary. - /// - /// - /// For long-running iterations that may already be executing, causes - /// to be set to the current iteration's index if the current index is less than the current value of - /// . - /// - /// - /// is typically employed in search-based algorithms where an ordering is - /// present in the data source. - /// - /// - public void Break() - { - InternalBreak(); - } - - // Helper method to avoid repeating Break() logic between ParallelState32 and ParallelState32 - internal static void Break(int iteration, ParallelLoopStateFlags32 pflags) - { - int oldValue = ParallelLoopStateFlags.PLS_NONE; - - // Attempt to change state from "not stopped or broken or canceled or exceptional" to "broken". - if (!pflags.AtomicLoopStateUpdate(ParallelLoopStateFlags.PLS_BROKEN, - ParallelLoopStateFlags.PLS_STOPPED | ParallelLoopStateFlags.PLS_EXCEPTIONAL | ParallelLoopStateFlags.PLS_CANCELED, - ref oldValue)) - { - - // If we were already stopped, we have a problem - if ((oldValue & ParallelLoopStateFlags.PLS_STOPPED) != 0) - { - throw new InvalidOperationException( - Environment.GetResourceString("ParallelState_Break_InvalidOperationException_BreakAfterStop")); - } - else - { - // Apparently we previously got cancelled or became exceptional. No action necessary - return; - } - } - - // replace shared LowestBreakIteration with CurrentIteration, but only if CurrentIteration - // is less than LowestBreakIteration. - int oldLBI = pflags.m_lowestBreakIteration; - if (iteration < oldLBI) - { - SpinWait wait = new SpinWait(); - while (Interlocked.CompareExchange( - ref pflags.m_lowestBreakIteration, - iteration, - oldLBI) != oldLBI) - { - wait.SpinOnce(); - oldLBI = pflags.m_lowestBreakIteration; - if (iteration > oldLBI) break; - } - } - - } - - // Helper method to avoid repeating Break() logic between ParallelState64 and ParallelState64 - internal static void Break(long iteration, ParallelLoopStateFlags64 pflags) - { - int oldValue = ParallelLoopStateFlags.PLS_NONE; - - // Attempt to change state from "not stopped or broken or canceled or exceptional" to "broken". - if (!pflags.AtomicLoopStateUpdate(ParallelLoopStateFlags.PLS_BROKEN, - ParallelLoopStateFlags.PLS_STOPPED | ParallelLoopStateFlags.PLS_EXCEPTIONAL | ParallelLoopStateFlags.PLS_CANCELED, - ref oldValue)) - { - - // If we were already stopped, we have a problem - if ((oldValue & ParallelLoopStateFlags.PLS_STOPPED) != 0) - { - throw new InvalidOperationException( - Environment.GetResourceString("ParallelState_Break_InvalidOperationException_BreakAfterStop")); - } - else - { - // Apparently we previously got cancelled or became exceptional. No action necessary - return; - } - } - - // replace shared LowestBreakIteration with CurrentIteration, but only if CurrentIteration - // is less than LowestBreakIteration. - long oldLBI = pflags.LowestBreakIteration; - if (iteration < oldLBI) - { - SpinWait wait = new SpinWait(); - while (Interlocked.CompareExchange( - ref pflags.m_lowestBreakIteration, - iteration, - oldLBI) != oldLBI) - { - wait.SpinOnce(); - oldLBI = pflags.LowestBreakIteration; - if (iteration > oldLBI) break; - } - } - - } - } - - internal class ParallelLoopState32 : ParallelLoopState - { - private ParallelLoopStateFlags32 m_sharedParallelStateFlags; - private int m_currentIteration = 0; - - /// - /// Internal constructor to ensure an instance isn't created by users. - /// - /// A flag shared among all threads participating - /// in the execution of a certain loop. - internal ParallelLoopState32(ParallelLoopStateFlags32 sharedParallelStateFlags) - : base(sharedParallelStateFlags) - { - m_sharedParallelStateFlags = sharedParallelStateFlags; - } - - /// - /// Tracks the current loop iteration for the owning task. - /// This is used to compute whether or not the task should - /// terminate early due to a Break() call. - /// - internal int CurrentIteration { - get { return m_currentIteration; } - set { m_currentIteration = value; } - } - - /// - /// Returns true if we should be exiting from the current iteration - /// due to Stop(), Break() or exception. - /// - internal override bool InternalShouldExitCurrentIteration - { - get { return m_sharedParallelStateFlags.ShouldExitLoop(CurrentIteration); } - } - - /// - /// Returns the lowest iteration at which Break() has been called, or - /// null if Break() has not yet been called. - /// - internal override long? InternalLowestBreakIteration - { - get {return m_sharedParallelStateFlags.NullableLowestBreakIteration; } - } - - /// - /// Communicates that parallel tasks should stop when they reach a specified iteration element. - /// (which is CurrentIteration of the caller). - /// - /// Break() called after Stop(). - /// - /// This is shared with all other concurrent threads in the system which are participating in the - /// loop's execution. After calling Break(), no additional iterations will be executed on - /// the current thread, and other worker threads will execute once they get beyond the calling iteration. - /// - internal override void InternalBreak() - { - ParallelLoopState.Break(CurrentIteration, m_sharedParallelStateFlags); - } - } - - /// - /// Allows independent iterations of a parallel loop to interact with other iterations. - /// - internal class ParallelLoopState64 : ParallelLoopState - { - private ParallelLoopStateFlags64 m_sharedParallelStateFlags; - private long m_currentIteration = 0; - - /// - /// Internal constructor to ensure an instance isn't created by users. - /// - /// A flag shared among all threads participating - /// in the execution of a certain loop. - internal ParallelLoopState64(ParallelLoopStateFlags64 sharedParallelStateFlags) - : base(sharedParallelStateFlags) - { - m_sharedParallelStateFlags = sharedParallelStateFlags; - } - - /// - /// Tracks the current loop iteration for the owning task. - /// This is used to compute whether or not the task should - /// terminate early due to a Break() call. - /// - internal long CurrentIteration - { - // No interlocks needed, because this value is only accessed in a single thread. - get {return m_currentIteration;} - set {m_currentIteration = value; } - } - - /// - /// Returns true if we should be exiting from the current iteration - /// due to Stop(), Break() or exception. - /// - internal override bool InternalShouldExitCurrentIteration - { - get { return m_sharedParallelStateFlags.ShouldExitLoop(CurrentIteration); } - } - - /// - /// Returns the lowest iteration at which Break() has been called, or - /// null if Break() has not yet been called. - /// - internal override long? InternalLowestBreakIteration - { - // We don't need to worry about torn read/write here because - // ParallelStateFlags64.LowestBreakIteration property is protected - // by an Interlocked.Read(). - get { return m_sharedParallelStateFlags.NullableLowestBreakIteration; } - } - - /// - /// Communicates that parallel tasks should stop when they reach a specified iteration element. - /// (which is CurrentIteration of the caller). - /// - /// Break() called after Stop(). - /// - /// Atomically sets shared StoppedBroken flag to BROKEN, then atomically sets shared - /// LowestBreakIteration to CurrentIteration, but only if CurrentIteration is less than - /// LowestBreakIteration. - /// - internal override void InternalBreak() - { - ParallelLoopState.Break(CurrentIteration, m_sharedParallelStateFlags); - } - - } - - /// - /// State information that is common between ParallelStateFlags class - /// and ParallelStateFlags64 class. - /// - internal class ParallelLoopStateFlags - { - internal static int PLS_NONE; - internal static int PLS_EXCEPTIONAL = 1; - internal static int PLS_BROKEN = 2; - internal static int PLS_STOPPED = 4; - internal static int PLS_CANCELED = 8; - - private volatile int m_LoopStateFlags = PLS_NONE; - - internal int LoopStateFlags - { - get { return m_LoopStateFlags; } - } - - internal bool AtomicLoopStateUpdate(int newState, int illegalStates) - { - int oldState = 0; - return AtomicLoopStateUpdate(newState, illegalStates, ref oldState); - } - - internal bool AtomicLoopStateUpdate(int newState, int illegalStates, ref int oldState) - { - SpinWait sw = new SpinWait(); - - do - { - oldState = m_LoopStateFlags; - if ((oldState & illegalStates) != 0) return false; - if (Interlocked.CompareExchange(ref m_LoopStateFlags, oldState | newState, oldState) == oldState) - { - return true; - } - sw.SpinOnce(); - } while (true); - - } - - internal void SetExceptional() - { - // we can set the exceptional flag regardless of the state of other bits. - AtomicLoopStateUpdate(PLS_EXCEPTIONAL, PLS_NONE); - } - - internal void Stop() - { - // disallow setting of PLS_STOPPED bit only if PLS_BROKEN was already set - if (!AtomicLoopStateUpdate(PLS_STOPPED, PLS_BROKEN)) - { - throw new InvalidOperationException( - Environment.GetResourceString("ParallelState_Stop_InvalidOperationException_StopAfterBreak")); - } - } - - // Returns true if StoppedBroken is updated to PLS_CANCELED. - internal bool Cancel() - { - // we can set the canceled flag regardless of the state of other bits. - return (AtomicLoopStateUpdate(PLS_CANCELED, PLS_NONE)); - } - } - - /// - /// An internal class used to share accounting information in 32-bit versions - /// of For()/ForEach() loops. - /// - internal class ParallelLoopStateFlags32 : ParallelLoopStateFlags - { - // Records the lowest iteration at which a Break() has been called, - // or Int32.MaxValue if no break has been called. Used directly - // by Break(). - internal volatile int m_lowestBreakIteration = Int32.MaxValue; - - // Not strictly necessary, but maintains consistency with ParallelStateFlags64 - internal int LowestBreakIteration - { - get { return m_lowestBreakIteration; } - } - - // Does some processing to convert m_lowestBreakIteration to a long?. - internal long? NullableLowestBreakIteration - { - get - { - if (m_lowestBreakIteration == Int32.MaxValue) return null; - else - { - // protect against torn read of 64-bit value - long rval = m_lowestBreakIteration; - if (IntPtr.Size >= 8) return rval; - else return Interlocked.Read(ref rval); - } - } - } - - - /// - /// Lets the caller know whether or not to prematurely exit the For/ForEach loop. - /// If this returns true, then exit the loop. Otherwise, keep going. - /// - /// The caller's current iteration point - /// in the loop. - /// - /// The loop should exit on any one of the following conditions: - /// (1) Stop() has been called by one or more tasks. - /// (2) An exception has been raised by one or more tasks. - /// (3) Break() has been called by one or more tasks, and - /// CallerIteration exceeds the (lowest) iteration at which - /// Break() was called. - /// (4) The loop was canceled. - /// - internal bool ShouldExitLoop(int CallerIteration) - { - int flags = LoopStateFlags; - return (flags != PLS_NONE && ( - ((flags & (PLS_EXCEPTIONAL | PLS_STOPPED | PLS_CANCELED)) != 0) || - (((flags & PLS_BROKEN) != 0) && (CallerIteration > LowestBreakIteration)))); - } - - // This lighter version of ShouldExitLoop will be used when the body type doesn't contain a state. - // Since simpler bodies cannot stop or break, we can safely skip checks for those flags here. - internal bool ShouldExitLoop() - { - int flags = LoopStateFlags; - return ((flags != PLS_NONE) && ((flags & (PLS_EXCEPTIONAL | PLS_CANCELED)) != 0)); - } - } - - /// - /// An internal class used to share accounting information in 64-bit versions - /// of For()/ForEach() loops. - /// - internal class ParallelLoopStateFlags64 : ParallelLoopStateFlags - { - // Records the lowest iteration at which a Break() has been called, - // or Int64.MaxValue if no break has been called. Used directly - // by Break(). - internal long m_lowestBreakIteration = Int64.MaxValue; - - // Performs a conditionally interlocked read of m_lowestBreakIteration. - internal long LowestBreakIteration - { - get - { - if (IntPtr.Size >= 8) return m_lowestBreakIteration; - else return Interlocked.Read(ref m_lowestBreakIteration); - } - } - - // Does some processing to convert m_lowestBreakIteration to a long?. - internal long? NullableLowestBreakIteration - { - get - { - if (m_lowestBreakIteration == Int64.MaxValue) return null; - else - { - if (IntPtr.Size >= 8) return m_lowestBreakIteration; - else return Interlocked.Read(ref m_lowestBreakIteration); - } - } - } - - /// - /// Lets the caller know whether or not to prematurely exit the For/ForEach loop. - /// If this returns true, then exit the loop. Otherwise, keep going. - /// - /// The caller's current iteration point - /// in the loop. - /// - /// The loop should exit on any one of the following conditions: - /// (1) Stop() has been called by one or more tasks. - /// (2) An exception has been raised by one or more tasks. - /// (3) Break() has been called by one or more tasks, and - /// CallerIteration exceeds the (lowest) iteration at which - /// Break() was called. - /// (4) The loop has been canceled. - /// - internal bool ShouldExitLoop(long CallerIteration) - { - int flags = LoopStateFlags; - return (flags != PLS_NONE && ( - ((flags & (PLS_EXCEPTIONAL | PLS_STOPPED | PLS_CANCELED)) != 0) || - (((flags & PLS_BROKEN) != 0) && (CallerIteration > LowestBreakIteration)))); - } - - // This lighter version of ShouldExitLoop will be used when the body type doesn't contain a state. - // Since simpler bodies cannot stop or break, we can safely skip checks for those flags here. - internal bool ShouldExitLoop() - { - int flags = LoopStateFlags; - return ((flags != PLS_NONE) && ((flags & (PLS_EXCEPTIONAL | PLS_CANCELED)) != 0)); - } - } - - /// - /// Provides completion status on the execution of a loop. - /// - /// - /// If returns true, then the loop ran to completion, such that all iterations - /// of the loop were executed. If returns false and returns null, a call to was used to end the loop prematurely. If returns false and returns a non-null integral - /// value, was used to end the loop prematurely. - /// - public struct ParallelLoopResult - { - internal bool m_completed; - internal long? m_lowestBreakIteration; - - /// - /// Gets whether the loop ran to completion, such that all iterations of the loop were executed - /// and the loop didn't receive a request to end prematurely. - /// - public bool IsCompleted { get { return m_completed; } } - - /// - /// Gets the index of the lowest iteration from which - /// was called. - /// - /// - /// If was not employed, this property will - /// return null. - /// - public long? LowestBreakIteration { get { return m_lowestBreakIteration; } } - } - -} - -#pragma warning restore 0420 diff --git a/src/coreclr/src/mscorlib/src/System/Threading/Tasks/ParallelRangeManager.cs b/src/coreclr/src/mscorlib/src/System/Threading/Tasks/ParallelRangeManager.cs deleted file mode 100644 index 49f61a6..0000000 --- a/src/coreclr/src/mscorlib/src/System/Threading/Tasks/ParallelRangeManager.cs +++ /dev/null @@ -1,279 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. -// See the LICENSE file in the project root for more information. - -// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ -// -// -// -// Implements the algorithm for distributing loop indices to parallel loop workers -// -// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - -using System; -using System.Threading; -using System.Diagnostics; -using System.Diagnostics.Contracts; - -#pragma warning disable 0420 - -namespace System.Threading.Tasks -{ - /// - /// Represents an index range - /// - internal struct IndexRange - { - // the From and To values for this range. These do not change. - internal long m_nFromInclusive; - internal long m_nToExclusive; - - // The shared index, stored as the offset from nFromInclusive. Using an offset rather than the actual - // value saves us from overflows that can happen due to multiple workers racing to increment this. - // All updates to this field need to be interlocked. - internal volatile Shared m_nSharedCurrentIndexOffset; - - // to be set to 1 by the worker that finishes this range. It's OK to do a non-interlocked write here. - internal int m_bRangeFinished; - } - - - /// - /// The RangeWorker struct wraps the state needed by a task that services the parallel loop - /// - internal struct RangeWorker - { - // reference to the IndexRange array allocated by the range manager - internal readonly IndexRange[] m_indexRanges; - - // index of the current index range that this worker is grabbing chunks from - internal int m_nCurrentIndexRange; - - // the step for this loop. Duplicated here for quick access (rather than jumping to rangemanager) - internal long m_nStep; - - // increment value is the current amount that this worker will use - // to increment the shared index of the range it's working on - internal long m_nIncrementValue; - - // the increment value is doubled each time this worker finds work, and is capped at this value - internal readonly long m_nMaxIncrementValue; - - /// - /// Initializes a RangeWorker struct - /// - internal RangeWorker(IndexRange[] ranges, int nInitialRange, long nStep) - { - m_indexRanges = ranges; - m_nCurrentIndexRange = nInitialRange; - m_nStep = nStep; - - m_nIncrementValue = nStep; - - m_nMaxIncrementValue = Parallel.DEFAULT_LOOP_STRIDE * nStep; - } - - /// - /// Implements the core work search algorithm that will be used for this range worker. - /// - /// - /// Usage pattern is: - /// 1) the thread associated with this rangeworker calls FindNewWork - /// 2) if we return true, the worker uses the nFromInclusiveLocal and nToExclusiveLocal values - /// to execute the sequential loop - /// 3) if we return false it means there is no more work left. It's time to quit. - /// - internal bool FindNewWork(out long nFromInclusiveLocal, out long nToExclusiveLocal) - { - // since we iterate over index ranges circularly, we will use the - // count of visited ranges as our exit condition - int numIndexRangesToVisit = m_indexRanges.Length; - - do - { - // local snap to save array access bounds checks in places where we only read fields - IndexRange currentRange = m_indexRanges[m_nCurrentIndexRange]; - - if (currentRange.m_bRangeFinished == 0) - { - if (m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset == null) - { - Interlocked.CompareExchange(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset, new Shared(0), null); - } - - // this access needs to be on the array slot - long nMyOffset = Interlocked.Add(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset.Value, - m_nIncrementValue) - m_nIncrementValue; - - if (currentRange.m_nToExclusive - currentRange.m_nFromInclusive > nMyOffset) - { - // we found work - - nFromInclusiveLocal = currentRange.m_nFromInclusive + nMyOffset; - nToExclusiveLocal = nFromInclusiveLocal + m_nIncrementValue; - - // Check for going past end of range, or wrapping - if ( (nToExclusiveLocal > currentRange.m_nToExclusive) || (nToExclusiveLocal < currentRange.m_nFromInclusive) ) - { - nToExclusiveLocal = currentRange.m_nToExclusive; - } - - // We will double our unit of increment until it reaches the maximum. - if (m_nIncrementValue < m_nMaxIncrementValue) - { - m_nIncrementValue *= 2; - if (m_nIncrementValue > m_nMaxIncrementValue) - { - m_nIncrementValue = m_nMaxIncrementValue; - } - } - - return true; - } - else - { - // this index range is completed, mark it so that others can skip it quickly - Interlocked.Exchange(ref m_indexRanges[m_nCurrentIndexRange].m_bRangeFinished, 1); - } - } - - // move on to the next index range, in circular order. - m_nCurrentIndexRange = (m_nCurrentIndexRange + 1) % m_indexRanges.Length; - numIndexRangesToVisit--; - - } while (numIndexRangesToVisit > 0); - // we've visited all index ranges possible => there's no work remaining - - nFromInclusiveLocal = 0; - nToExclusiveLocal = 0; - - return false; - } - - - /// - /// 32 bit integer version of FindNewWork. Assumes the ranges were initialized with 32 bit values. - /// - internal bool FindNewWork32(out int nFromInclusiveLocal32, out int nToExclusiveLocal32) - { - long nFromInclusiveLocal; - long nToExclusiveLocal; - - bool bRetVal = FindNewWork(out nFromInclusiveLocal, out nToExclusiveLocal); - - Debug.Assert((nFromInclusiveLocal <= Int32.MaxValue) && (nFromInclusiveLocal >= Int32.MinValue) && - (nToExclusiveLocal <= Int32.MaxValue) && (nToExclusiveLocal >= Int32.MinValue)); - - // convert to 32 bit before returning - nFromInclusiveLocal32 = (int)nFromInclusiveLocal; - nToExclusiveLocal32 = (int)nToExclusiveLocal; - - return bRetVal; - } - } - - - /// - /// Represents the entire loop operation, keeping track of workers and ranges. - /// - /// - /// The usage pattern is: - /// 1) The Parallel loop entry function (ForWorker) creates an instance of this class - /// 2) Every thread joining to service the parallel loop calls RegisterWorker to grab a - /// RangeWorker struct to wrap the state it will need to find and execute work, - /// and they keep interacting with that struct until the end of the loop - internal class RangeManager - { - internal readonly IndexRange[] m_indexRanges; - - internal int m_nCurrentIndexRangeToAssign; - internal long m_nStep; - - /// - /// Initializes a RangeManager with the given loop parameters, and the desired number of outer ranges - /// - internal RangeManager(long nFromInclusive, long nToExclusive, long nStep, int nNumExpectedWorkers) - { - m_nCurrentIndexRangeToAssign = 0; - m_nStep = nStep; - - // Our signed math breaks down w/ nNumExpectedWorkers == 1. So change it to 2. - if (nNumExpectedWorkers == 1) - nNumExpectedWorkers = 2; - - // - // calculate the size of each index range - // - - ulong uSpan = (ulong)(nToExclusive - nFromInclusive); - ulong uRangeSize = uSpan / (ulong) nNumExpectedWorkers; // rough estimate first - - uRangeSize -= uRangeSize % (ulong) nStep; // snap to multiples of nStep - // otherwise index range transitions will derail us from nStep - - if (uRangeSize == 0) - { - uRangeSize = (ulong) nStep; - } - - // - // find the actual number of index ranges we will need - // - Debug.Assert((uSpan / uRangeSize) < Int32.MaxValue); - - int nNumRanges = (int)(uSpan / uRangeSize); - - if (uSpan % uRangeSize != 0) - { - nNumRanges++; - } - - - // Convert to signed so the rest of the logic works. - // Should be fine so long as uRangeSize < Int64.MaxValue, which we guaranteed by setting #workers >= 2. - long nRangeSize = (long)uRangeSize; - - // allocate the array of index ranges - m_indexRanges = new IndexRange[nNumRanges]; - - long nCurrentIndex = nFromInclusive; - for (int i = 0; i < nNumRanges; i++) - { - // the fromInclusive of the new index range is always on nCurrentIndex - m_indexRanges[i].m_nFromInclusive = nCurrentIndex; - m_indexRanges[i].m_nSharedCurrentIndexOffset = null; - m_indexRanges[i].m_bRangeFinished = 0; - - // now increment it to find the toExclusive value for our range - nCurrentIndex += nRangeSize; - - // detect integer overflow or range overage and snap to nToExclusive - if (nCurrentIndex < nCurrentIndex - nRangeSize || - nCurrentIndex > nToExclusive) - { - // this should only happen at the last index - Debug.Assert(i == nNumRanges - 1); - - nCurrentIndex = nToExclusive; - } - - // now that the end point of the new range is calculated, assign it. - m_indexRanges[i].m_nToExclusive = nCurrentIndex; - } - } - - /// - /// The function that needs to be called by each new worker thread servicing the parallel loop - /// in order to get a RangeWorker struct that wraps the state for finding and executing indices - /// - internal RangeWorker RegisterNewWorker() - { - Debug.Assert(m_indexRanges != null && m_indexRanges.Length != 0); - - int nInitialRange = (Interlocked.Increment(ref m_nCurrentIndexRangeToAssign) - 1) % m_indexRanges.Length; - - return new RangeWorker(m_indexRanges, nInitialRange, m_nStep); - } - } -} -#pragma warning restore 0420 diff --git a/src/coreclr/src/mscorlib/src/System/Threading/Tasks/Task.cs b/src/coreclr/src/mscorlib/src/System/Threading/Tasks/Task.cs index cf081f7..24a57fa 100644 --- a/src/coreclr/src/mscorlib/src/System/Threading/Tasks/Task.cs +++ b/src/coreclr/src/mscorlib/src/System/Threading/Tasks/Task.cs @@ -6693,102 +6693,6 @@ namespace System.Threading.Tasks public TaskStatus Status { get { return m_task.Status; } } } - // Special purpose derivation of Task that supports limited replication through - // overriding the ShouldReplicate() method. This is used by the Parallel.For/ForEach - // methods. - internal class ParallelForReplicatingTask : Task - { - // Member variables - private int m_replicationDownCount; // downcounter to control replication - - // - // Constructors - // - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // Methods containing StackCrawlMark local var have to be marked non-inlineable - internal ParallelForReplicatingTask( - ParallelOptions parallelOptions, Action action, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions) - : base(action, null, Task.InternalCurrent, default(CancellationToken), creationOptions, internalOptions | InternalTaskOptions.SelfReplicating, null) - { - // Compute the down count based on scheduler/DOP info in parallelOptions. - m_replicationDownCount = parallelOptions.EffectiveMaxConcurrencyLevel; - - StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller; - PossiblyCaptureContext(ref stackMark); - } - - - // Controls degree of replication. If downcounter is initialized to -1, then - // replication will be allowed to "run wild". Otherwise, this method decrements - // the downcounter each time it is called, calling false when it is called with - // a zero downcounter. This method returning false effectively ends the replication - // of the associated ParallelForReplicatingTask. - internal override bool ShouldReplicate() - { - if (m_replicationDownCount == -1) return true; // "run wild" - - if (m_replicationDownCount > 0) // Decrement and return true if not called with 0 downcount - { - m_replicationDownCount--; - return true; - } - - return false; // We're done replicating - } - - internal override Task CreateReplicaTask(Action taskReplicaDelegate, Object stateObject, Task parentTask, TaskScheduler taskScheduler, - TaskCreationOptions creationOptionsForReplica, InternalTaskOptions internalOptionsForReplica) - { - return new ParallelForReplicaTask(taskReplicaDelegate, stateObject, parentTask, taskScheduler, - creationOptionsForReplica, internalOptionsForReplica); - } - - - } - - internal class ParallelForReplicaTask : Task - { - internal object m_stateForNextReplica; // some replicas may quit prematurely, in which case they will use this variable - // to save state they want to be picked up by the next replica queued to the same thread - - internal object m_stateFromPreviousReplica; // some replicas may quit prematurely, in which case they will use this variable - // to save state they want to be picked up by the next replica queued to the same thread - - internal Task m_handedOverChildReplica; // some replicas may quit prematurely, in which case they will use this variable - // to hand over the child replica they had queued to the next task that will replace them - - internal ParallelForReplicaTask(Action taskReplicaDelegate, Object stateObject, Task parentTask, TaskScheduler taskScheduler, - TaskCreationOptions creationOptionsForReplica, InternalTaskOptions internalOptionsForReplica) : - base(taskReplicaDelegate, stateObject, parentTask, default(CancellationToken), creationOptionsForReplica, internalOptionsForReplica, taskScheduler) - { - } - - // Allows internal deriving classes to support replicas that exit prematurely and want to pass on state to the next replica - internal override Object SavedStateForNextReplica - { - get { return m_stateForNextReplica; } - - set { m_stateForNextReplica = value; } - } - - // Allows internal deriving classes to support replicas that exit prematurely and want to pass on state to the next replica - internal override Object SavedStateFromPreviousReplica - { - get { return m_stateFromPreviousReplica; } - - set { m_stateFromPreviousReplica = value; } - } - - // Allows internal deriving classes to support replicas that exit prematurely and want to hand over the child replica that they - // had queued, so that the replacement replica can work with that child task instead of queuing up yet another one - internal override Task HandedOverChildReplica - { - get { return m_handedOverChildReplica; } - - set { m_handedOverChildReplica = value; } - } - } - /// /// Specifies flags that control optional behavior for the creation and execution of tasks. /// diff --git a/src/coreclr/src/mscorlib/src/System/Threading/Thread.cs b/src/coreclr/src/mscorlib/src/System/Threading/Thread.cs index 8294c20..b2c559d 100644 --- a/src/coreclr/src/mscorlib/src/System/Threading/Thread.cs +++ b/src/coreclr/src/mscorlib/src/System/Threading/Thread.cs @@ -915,217 +915,6 @@ namespace System.Threading { set { SetAbortReason(value); } } - /*========================================================================= - ** Volatile Read & Write and MemoryBarrier methods. - ** Provides the ability to read and write values ensuring that the values - ** are read/written each time they are accessed. - =========================================================================*/ - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static byte VolatileRead(ref byte address) - { - byte ret = address; - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - return ret; - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static short VolatileRead(ref short address) - { - short ret = address; - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - return ret; - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static int VolatileRead(ref int address) - { - int ret = address; - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - return ret; - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static long VolatileRead(ref long address) - { - long ret = address; - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - return ret; - } - - [CLSCompliant(false)] - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static sbyte VolatileRead(ref sbyte address) - { - sbyte ret = address; - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - return ret; - } - - [CLSCompliant(false)] - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static ushort VolatileRead(ref ushort address) - { - ushort ret = address; - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - return ret; - } - - [CLSCompliant(false)] - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static uint VolatileRead(ref uint address) - { - uint ret = address; - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - return ret; - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static IntPtr VolatileRead(ref IntPtr address) - { - IntPtr ret = address; - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - return ret; - } - - [CLSCompliant(false)] - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static UIntPtr VolatileRead(ref UIntPtr address) - { - UIntPtr ret = address; - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - return ret; - } - - [CLSCompliant(false)] - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static ulong VolatileRead(ref ulong address) - { - ulong ret = address; - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - return ret; - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static float VolatileRead(ref float address) - { - float ret = address; - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - return ret; - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static double VolatileRead(ref double address) - { - double ret = address; - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - return ret; - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static Object VolatileRead(ref Object address) - { - Object ret = address; - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - return ret; - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static void VolatileWrite(ref byte address, byte value) - { - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - address = value; - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static void VolatileWrite(ref short address, short value) - { - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - address = value; - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static void VolatileWrite(ref int address, int value) - { - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - address = value; - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static void VolatileWrite(ref long address, long value) - { - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - address = value; - } - - [CLSCompliant(false)] - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static void VolatileWrite(ref sbyte address, sbyte value) - { - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - address = value; - } - - [CLSCompliant(false)] - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static void VolatileWrite(ref ushort address, ushort value) - { - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - address = value; - } - - [CLSCompliant(false)] - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static void VolatileWrite(ref uint address, uint value) - { - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - address = value; - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static void VolatileWrite(ref IntPtr address, IntPtr value) - { - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - address = value; - } - - [CLSCompliant(false)] - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static void VolatileWrite(ref UIntPtr address, UIntPtr value) - { - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - address = value; - } - - [CLSCompliant(false)] - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static void VolatileWrite(ref ulong address, ulong value) - { - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - address = value; - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static void VolatileWrite(ref float address, float value) - { - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - address = value; - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static void VolatileWrite(ref double address, double value) - { - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - address = value; - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // disable optimizations - public static void VolatileWrite(ref Object address, Object value) - { - MemoryBarrier(); // Call MemoryBarrier to ensure the proper semantic in a portable way. - address = value; - } - [MethodImplAttribute(MethodImplOptions.InternalCall)] public static extern void MemoryBarrier(); -- 2.7.4