//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
-using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
-using System.Security;
namespace System.Threading.Tasks
{
public class ConcurrentExclusiveSchedulerPair
{
/// <summary>A dictionary mapping thread ID to a processing mode to denote what kinds of tasks are currently being processed on this thread.</summary>
- private readonly ConcurrentDictionary<int, ProcessingMode> m_threadProcessingMapping = new ConcurrentDictionary<int, ProcessingMode>();
+ private readonly ThreadLocal<ProcessingMode> m_threadProcessingMode = new ThreadLocal<ProcessingMode>(() => ProcessingMode.NotCurrentlyProcessing);
/// <summary>The scheduler used to queue and execute "concurrent" tasks that may run concurrently with other concurrent tasks.</summary>
private readonly ConcurrentExclusiveTaskScheduler m_concurrentTaskScheduler;
/// <summary>The scheduler used to queue and execute "exclusive" tasks that must run exclusively while no other tasks for this pair are running.</summary>
private static Int32 DefaultMaxConcurrencyLevel { get { return Environment.ProcessorCount; } }
/// <summary>Gets the sync obj used to protect all state on this instance.</summary>
- private object ValueLock { get { return m_threadProcessingMapping; } }
+ private object ValueLock { get { return m_threadProcessingMode; } }
/// <summary>
/// Initializes the ConcurrentExclusiveSchedulerPair.
cs.m_completionQueued = true;
ThreadPool.QueueUserWorkItem(state =>
{
- var localCs = (CompletionState)state; // don't use 'cs', as it'll force a closure
- Debug.Assert(!localCs.Task.IsCompleted, "Completion should only happen once.");
+ var localThis = (ConcurrentExclusiveSchedulerPair)state;
+ Debug.Assert(!localThis.m_completionState.Task.IsCompleted, "Completion should only happen once.");
- var exceptions = localCs.m_exceptions;
+ List<Exception> exceptions = localThis.m_completionState.m_exceptions;
bool success = (exceptions != null && exceptions.Count > 0) ?
- localCs.TrySetException(exceptions) :
- localCs.TrySetResult(default(VoidTaskResult));
+ localThis.m_completionState.TrySetException(exceptions) :
+ localThis.m_completionState.TrySetResult(default);
Debug.Assert(success, "Expected to complete completion task.");
- }, cs);
+
+ localThis.m_threadProcessingMode.Dispose();
+ }, this);
}
}
try
{
// Note that we're processing exclusive tasks on the current thread
- Debug.Assert(!m_threadProcessingMapping.ContainsKey(Thread.CurrentThread.ManagedThreadId),
+ Debug.Assert(m_threadProcessingMode.Value == ProcessingMode.NotCurrentlyProcessing,
"This thread should not yet be involved in this pair's processing.");
- m_threadProcessingMapping[Thread.CurrentThread.ManagedThreadId] = ProcessingMode.ProcessingExclusiveTask;
+ m_threadProcessingMode.Value = ProcessingMode.ProcessingExclusiveTask;
// Process up to the maximum number of items per task allowed
for (int i = 0; i < m_maxItemsPerTask; i++)
finally
{
// We're no longer processing exclusive tasks on the current thread
- ProcessingMode currentMode;
- m_threadProcessingMapping.TryRemove(Thread.CurrentThread.ManagedThreadId, out currentMode);
- Debug.Assert(currentMode == ProcessingMode.ProcessingExclusiveTask,
- "Somehow we ended up escaping exclusive mode.");
+ Debug.Assert(m_threadProcessingMode.Value == ProcessingMode.ProcessingExclusiveTask, "Somehow we ended up escaping exclusive mode.");
+ m_threadProcessingMode.Value = ProcessingMode.NotCurrentlyProcessing;
lock (ValueLock)
{
try
{
// Note that we're processing concurrent tasks on the current thread
- Debug.Assert(!m_threadProcessingMapping.ContainsKey(Thread.CurrentThread.ManagedThreadId),
+ Debug.Assert(m_threadProcessingMode.Value == ProcessingMode.NotCurrentlyProcessing,
"This thread should not yet be involved in this pair's processing.");
- m_threadProcessingMapping[Thread.CurrentThread.ManagedThreadId] = ProcessingMode.ProcessingConcurrentTasks;
+ m_threadProcessingMode.Value = ProcessingMode.ProcessingConcurrentTasks;
// Process up to the maximum number of items per task allowed
for (int i = 0; i < m_maxItemsPerTask; i++)
finally
{
// We're no longer processing concurrent tasks on the current thread
- ProcessingMode currentMode;
- m_threadProcessingMapping.TryRemove(Thread.CurrentThread.ManagedThreadId, out currentMode);
- Debug.Assert(currentMode == ProcessingMode.ProcessingConcurrentTasks,
- "Somehow we ended up escaping concurrent mode.");
+ Debug.Assert(m_threadProcessingMode.Value == ProcessingMode.ProcessingConcurrentTasks, "Somehow we ended up escaping concurrent mode.");
+ m_threadProcessingMode.Value = ProcessingMode.NotCurrentlyProcessing;
lock (ValueLock)
{
}
}
-#if PRENET45
- /// <summary>
- /// Type used with TaskCompletionSource(Of TResult) as the TResult
- /// to ensure that the resulting task can't be upcast to something
- /// that in the future could lead to compat problems.
- /// </summary>
- [SuppressMessage("Microsoft.Performance", "CA1812:AvoidUninstantiatedInternalClasses")]
- [DebuggerNonUserCode]
- private struct VoidTaskResult { }
-#endif
-
/// <summary>
/// Holder for lazily-initialized state about the completion of a scheduler pair.
/// Completion is only triggered either by rare exceptional conditions or by
// If a task is already running on this thread, allow inline execution to proceed.
// If there's already a task from this scheduler running on the current thread, we know it's safe
// to run this task, in effect temporarily taking that task's count allocation.
- ProcessingMode currentThreadMode;
- if (m_pair.m_threadProcessingMapping.TryGetValue(Thread.CurrentThread.ManagedThreadId, out currentThreadMode) &&
- currentThreadMode == m_processingMode)
+ if (m_pair.m_threadProcessingMode.Value == m_processingMode)
{
// If we're targeting the default scheduler and taskWasPreviouslyQueued is false,
// we know the default scheduler will allow it, so we can just execute it here.
internal static void ContractAssertMonitorStatus(object syncObj, bool held)
{
Debug.Assert(syncObj != null, "The monitor object to check must be provided.");
-#if PRENET45
-#if DEBUG
- // This check is expensive,
- // which is why it's protected by ShouldCheckMonitorStatus and controlled by an environment variable DEBUGSYNC.
- if (ShouldCheckMonitorStatus)
- {
- bool exceptionThrown;
- try
- {
- Monitor.Pulse(syncObj); // throws a SynchronizationLockException if the monitor isn't held by this thread
- exceptionThrown = false;
- }
- catch (SynchronizationLockException) { exceptionThrown = true; }
- Debug.Assert(held == !exceptionThrown, "The locking scheme was not correctly followed.");
- }
-#endif
-#else
Debug.Assert(Monitor.IsEntered(syncObj) == held, "The locking scheme was not correctly followed.");
-#endif
}
/// <summary>Gets the options to use for tasks.</summary>
/// <returns>The options to use.</returns>
internal static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica = false)
{
- TaskCreationOptions options =
-#if PRENET45
- TaskCreationOptions.None;
-#else
- TaskCreationOptions.DenyChildAttach;
-#endif
+ TaskCreationOptions options = TaskCreationOptions.DenyChildAttach;
if (isReplacementReplica) options |= TaskCreationOptions.PreferFairness;
return options;
}