From 481628fd1c9df04b2ad57e205f034cd3ad35e825 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Thu, 18 Feb 2021 17:00:38 -0500 Subject: [PATCH] Remove partitioning from CancellationTokenSource (#48251) When CancellationTokenSource was original created, the expectation was that a majority use case would be lots of threads in parallel registering and unregistering handlers. This led to a design where CTS internally partitions its registrations to minimize contention between threads contending on its internal data structures. While that certainly comes up in practice, a much more common case is just one thread registering and unregistering at a time as a CancellationToken unique to a particular operation (e.g. a linked token source) is passed down through it, with various levels of the chain registering and unregistering from that non-concurrently-used token source. And having such partitioning results in non-trivial allocation overheads, in particular for a short-lived CTS with which only one or a few registrations are employed in its lifetime. This change removes that partitioning scheme; all scenarios end up with less memory allocation, and non-concurrent scenarios end up measurably faster... scenarios where there is contention do take a measurable hit, but given that's the rare case, it's believed to be the right trade-off (when in doubt, it's also the simpler implementation). As long as I was refactoring a bunch of code, I fixed up a few other things along the way: - Avoided allocating while holding the instance's spin lock - Made WaitForCallbackAsync into a polling async method rather than an async-over-sync method - Changed the state values to be 0-based to avoid needing to initialize _state to something other than 0 in the common case - Used existing throw helpers in a few more cases - Renamed a few methods, and made a few others to be local functions --- .../src/System/Threading/CancellationToken.cs | 2 +- .../Threading/CancellationTokenRegistration.cs | 113 ++--- .../System/Threading/CancellationTokenSource.cs | 546 ++++++++++----------- 3 files changed, 321 insertions(+), 340 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationToken.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationToken.cs index c4ad9c6..ab03e12 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationToken.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationToken.cs @@ -288,7 +288,7 @@ namespace System.Threading CancellationTokenSource? source = _source; return source != null ? - source.InternalRegister(callback, state, useSynchronizationContext ? SynchronizationContext.Current : null, useExecutionContext ? ExecutionContext.Capture() : null) : + source.Register(callback, state, useSynchronizationContext ? SynchronizationContext.Current : null, useExecutionContext ? ExecutionContext.Capture() : null) : default; // Nothing to do for tokens than can never reach the canceled state. Give back a dummy registration. } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenRegistration.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenRegistration.cs index 59b210c..9e080d7 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenRegistration.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenRegistration.cs @@ -31,10 +31,31 @@ namespace System.Threading /// public void Dispose() { - CancellationTokenSource.CallbackNode node = _node; - if (node != null && !node.Partition.Unregister(_id, node)) + if (_node is CancellationTokenSource.CallbackNode node && !node.Registrations.Unregister(_id, node)) { - WaitForCallbackIfNecessary(); + WaitForCallbackIfNecessary(_id, node); + + static void WaitForCallbackIfNecessary(long id, CancellationTokenSource.CallbackNode node) + { + // We're a valid registration but we were unable to unregister, which means the callback wasn't in the list, + // which means either it already executed or it's currently executing. We guarantee that we will not return + // if the callback is being executed (assuming we are not currently called by the callback itself) + // We achieve this by the following rules: + // 1. If we are called in the context of an executing callback, no need to wait (determined by tracking callback-executor threadID) + // - if the currently executing callback is this CTR, then waiting would deadlock. (We choose to return rather than deadlock) + // - if not, then this CTR cannot be the one executing, hence no need to wait + // 2. If unregistration failed, and we are on a different thread, then the callback may be running under control of cts.Cancel() + // => poll until cts.ExecutingCallback is not the one we are trying to unregister. + CancellationTokenSource source = node.Registrations.Source; + if (source.IsCancellationRequested && // Running callbacks has commenced. + !source.IsCancellationCompleted && // Running callbacks hasn't finished. + node.Registrations.ThreadIDExecutingCallbacks != Environment.CurrentManagedThreadId) // The executing thread ID is not this thread's ID. + { + // Callback execution is in progress, the executing thread is different from this thread and has taken the callback for execution + // so observe and wait until this target callback is no longer the executing callback. + node.Registrations.WaitForCallbackToComplete(id); + } + } } } @@ -47,10 +68,27 @@ namespace System.Threading /// public ValueTask DisposeAsync() { - CancellationTokenSource.CallbackNode node = _node; - return node != null && !node.Partition.Unregister(_id, node) ? - WaitForCallbackIfNecessaryAsync() : + return _node is CancellationTokenSource.CallbackNode node && !node.Registrations.Unregister(_id, node) ? + WaitForCallbackIfNecessaryAsync(_id, node) : default; + + static ValueTask WaitForCallbackIfNecessaryAsync(long id, CancellationTokenSource.CallbackNode node) + { + // Same as WaitForCallbackIfNecessary, except returning a task that'll be completed when callbacks complete. + + CancellationTokenSource source = node.Registrations.Source; + if (source.IsCancellationRequested && // Running callbacks has commenced. + !source.IsCancellationCompleted && // Running callbacks hasn't finished. + node.Registrations.ThreadIDExecutingCallbacks != Environment.CurrentManagedThreadId) // The executing thread ID is not this thread's ID. + { + // Callback execution is in progress, the executing thread is different from this thread and has taken the callback for execution + // so get a task that'll complete when this target callback is no longer the executing callback. + return node.Registrations.WaitForCallbackToCompleteAsync(id); + } + + // Callback is either already completed, won't execute, or the callback itself is calling this. + return default; + } } /// Gets the with which this registration is associated. @@ -59,66 +97,17 @@ namespace System.Threading /// to on a token that already had cancellation requested), /// this will return a default token. /// - public CancellationToken Token - { - get - { - CancellationTokenSource.CallbackNode node = _node; - return node != null ? - new CancellationToken(node.Partition.Source) : // avoid CTS.Token, which throws after disposal - default; - } - } + public CancellationToken Token => + _node is CancellationTokenSource.CallbackNode node ? + new CancellationToken(node.Registrations.Source) : // avoid CTS.Token, which throws after disposal + default; /// /// Disposes of the registration and unregisters the target callback from the associated /// CancellationToken. /// - public bool Unregister() - { - CancellationTokenSource.CallbackNode node = _node; - return node != null && node.Partition.Unregister(_id, node); - } - - private void WaitForCallbackIfNecessary() - { - // We're a valid registration but we were unable to unregister, which means the callback wasn't in the list, - // which means either it already executed or it's currently executing. We guarantee that we will not return - // if the callback is being executed (assuming we are not currently called by the callback itself) - // We achieve this by the following rules: - // 1. If we are called in the context of an executing callback, no need to wait (determined by tracking callback-executor threadID) - // - if the currently executing callback is this CTR, then waiting would deadlock. (We choose to return rather than deadlock) - // - if not, then this CTR cannot be the one executing, hence no need to wait - // 2. If unregistration failed, and we are on a different thread, then the callback may be running under control of cts.Cancel() - // => poll until cts.ExecutingCallback is not the one we are trying to unregister. - CancellationTokenSource source = _node.Partition.Source; - if (source.IsCancellationRequested && // Running callbacks has commenced. - !source.IsCancellationCompleted && // Running callbacks hasn't finished. - source.ThreadIDExecutingCallbacks != Environment.CurrentManagedThreadId) // The executing thread ID is not this thread's ID. - { - // Callback execution is in progress, the executing thread is different from this thread and has taken the callback for execution - // so observe and wait until this target callback is no longer the executing callback. - source.WaitForCallbackToComplete(_id); - } - } - - private ValueTask WaitForCallbackIfNecessaryAsync() - { - // Same as WaitForCallbackIfNecessary, except returning a task that'll be completed when callbacks complete. - - CancellationTokenSource source = _node.Partition.Source; - if (source.IsCancellationRequested && // Running callbacks has commenced. - !source.IsCancellationCompleted && // Running callbacks hasn't finished. - source.ThreadIDExecutingCallbacks != Environment.CurrentManagedThreadId) // The executing thread ID is not this thread's ID. - { - // Callback execution is in progress, the executing thread is different from this thread and has taken the callback for execution - // so get a task that'll complete when this target callback is no longer the executing callback. - return source.WaitForCallbackToCompleteAsync(_id); - } - - // Callback is either already completed, won't execute, or the callback itself is calling this. - return default; - } + public bool Unregister() => + _node is CancellationTokenSource.CallbackNode node && node.Registrations.Unregister(_id, node); /// /// Determines whether two CancellationToken. /// - public override bool Equals([NotNullWhen(true)] object? obj) => obj is CancellationTokenRegistration && Equals((CancellationTokenRegistration)obj); + public override bool Equals([NotNullWhen(true)] object? obj) => obj is CancellationTokenRegistration other && Equals(other); /// /// Determines whether the current CancellationToken instance is equal to the diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs index 19357cd..4b03075 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; using System.Threading.Tasks; namespace System.Threading @@ -35,35 +36,22 @@ namespace System.Threading ((CancellationTokenSource)obj).NotifyCancellation(throwOnFirstException: false); // skip ThrowIfDisposed() check in Cancel() }; - /// The number of callback partitions to use in a . Must be a power of 2. - private static readonly int s_numPartitions = GetPartitionCount(); - /// - 1, used to quickly mod into . - private static readonly int s_numPartitionsMask = s_numPartitions - 1; - /// The current state of the CancellationTokenSource. private volatile int _state; - /// The ID of the thread currently executing the main body of CTS.Cancel() - /// - /// This helps us to know if a call to ctr.Dispose() is running 'within' a cancellation callback. - /// This is updated as we move between the main thread calling cts.Cancel() and any syncContexts - /// that are used to actually run the callbacks. - /// - private volatile int _threadIDExecutingCallbacks = -1; - /// Tracks the running callback to assist ctr.Dispose() to wait for the target callback to complete. - private long _executingCallbackId; - /// Partitions of callbacks. Split into multiple partitions to help with scalability of registering/unregistering; each is protected by its own lock. - private volatile CallbackPartition?[]? _callbackPartitions; + /// Whether this has been disposed. + private bool _disposed; /// TimerQueueTimer used by CancelAfter and Timer-related ctors. Used instead of Timer to avoid extra allocations and because the rooted behavior is desired. private volatile TimerQueueTimer? _timer; /// lazily initialized and returned from . private volatile ManualResetEvent? _kernelEvent; - /// Whether this has been disposed. - private bool _disposed; + /// Registration state for the source. + /// Lazily-initialized, also serving as the lock to protect its contained state. + private Registrations? _registrations; // legal values for _state - private const int NotCanceledState = 1; - private const int NotifyingState = 2; - private const int NotifyingCompleteState = 3; + private const int NotCanceledState = 0; // default value of _state + private const int NotifyingState = 1; + private const int NotifyingCompleteState = 2; /// Gets whether cancellation has been requested for this . /// Whether cancellation has been requested for this . @@ -85,16 +73,6 @@ namespace System.Threading /// A simple helper to determine whether cancellation has finished. internal bool IsCancellationCompleted => _state == NotifyingCompleteState; - /// A simple helper to determine whether disposal has occurred. - internal bool IsDisposed => _disposed; - - /// The ID of the thread that is running callbacks. - internal int ThreadIDExecutingCallbacks - { - get => _threadIDExecutingCallbacks; - set => _threadIDExecutingCallbacks = value; - } - /// Gets the associated with this . /// The associated with this . /// The token source has been disposed. @@ -139,12 +117,8 @@ namespace System.Threading } } - - /// Gets the ID of the currently executing callback. - internal long ExecutingCallback => Volatile.Read(ref _executingCallbackId); - /// Initializes the . - public CancellationTokenSource() => _state = NotCanceledState; + public CancellationTokenSource() { } /// /// Constructs a that will be canceled after a specified time span. @@ -170,7 +144,7 @@ namespace System.Threading long totalMilliseconds = (long)delay.TotalMilliseconds; if (totalMilliseconds < -1 || totalMilliseconds > Timer.MaxSupportedTimeout) { - throw new ArgumentOutOfRangeException(nameof(delay)); + ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.delay); } InitializeWithTimer((uint)totalMilliseconds); @@ -199,7 +173,7 @@ namespace System.Threading { if (millisecondsDelay < -1) { - throw new ArgumentOutOfRangeException(nameof(millisecondsDelay)); + ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.millisecondsDelay); } InitializeWithTimer((uint)millisecondsDelay); @@ -217,13 +191,12 @@ namespace System.Threading } else { - _state = NotCanceledState; _timer = new TimerQueueTimer(s_timerCallback, this, millisecondsDelay, Timeout.UnsignedInfinite, flowExecutionContext: false); // The timer roots this CTS instance while it's scheduled. That is by design, so // that code like: - // CancellationToken ct = new CancellationTokenSource(timeout).Token; - // will successfully cancel the token after the timeout. + // new CancellationTokenSource(timeout).Token.Register(() => ...); + // will successfully invoke the delegate after the timeout. } } @@ -394,8 +367,6 @@ namespace System.Threading } } - - /// Releases the resources used by this . /// This method is not thread-safe for any other concurrent calls. public void Dispose() @@ -440,7 +411,7 @@ namespace System.Threading timer.Close(); // TimerQueueTimer.Close is thread-safe } - _callbackPartitions = null; // free for GC; Cancel correctly handles a null field + _registrations = null; // allow the GC to clean up registrations // If a kernel event was created via WaitHandle, we'd like to Dispose of it. However, // we only want to do so if it's not being used by Cancel concurrently. First, we @@ -466,101 +437,106 @@ namespace System.Threading if (_disposed) { ThrowObjectDisposedException(); + + [DoesNotReturn] + static void ThrowObjectDisposedException() => throw new ObjectDisposedException(null, SR.CancellationTokenSource_Disposed); } } - /// Throws an . Separated out from ThrowIfDisposed to help with inlining. - [DoesNotReturn] - private static void ThrowObjectDisposedException() => - throw new ObjectDisposedException(null, SR.CancellationTokenSource_Disposed); - /// /// Registers a callback object. If cancellation has already occurred, the /// callback will have been run by the time this method returns. /// - internal CancellationTokenRegistration InternalRegister( + internal CancellationTokenRegistration Register( Delegate callback, object? stateForCallback, SynchronizationContext? syncContext, ExecutionContext? executionContext) { Debug.Assert(this != s_neverCanceledSource, "This source should never be exposed via a CancellationToken."); Debug.Assert(callback is Action || callback is Action); // If not canceled, register the handler; if canceled already, run the callback synchronously. - // This also ensures that during ExecuteCallbackHandlers() there will be no mutation of the _callbackPartitions. if (!IsCancellationRequested) { - // In order to enable code to not leak too many handlers, we allow Dispose to be called concurrently - // with Register. While this is not a recommended practice, consumers can and do use it this way. - // We don't make any guarantees about whether the CTS will hold onto the supplied callback if the CTS - // has already been disposed when the callback is registered, but we try not to while at the same time - // not paying any non-negligible overhead. The simple compromise is to check whether we're disposed - // (not volatile), and if we see we are, to return an empty registration. If there's a race and _disposed - // is false even though it's been disposed, or if the disposal request comes in after this line, we simply - // run the minor risk of having _callbackPartitions reinitialized (after it was cleared to null during Dispose). + // We allow Dispose to be called concurrently with Register. While this is not a recommended practice, + // consumers can and do use it this way. if (_disposed) { return default; } - // Get the partitions... - CallbackPartition?[]? partitions = _callbackPartitions; - if (partitions == null) + // Get the registrations object. It's lazily initialized to keep the size of a CTS smaller for situations + // where all operations associated with the CTS complete synchronously and never actually need to register, + // or all only poll. + Registrations? registrations = Volatile.Read(ref _registrations); + if (registrations is null) { - partitions = new CallbackPartition[s_numPartitions]; - partitions = Interlocked.CompareExchange(ref _callbackPartitions, partitions, null) ?? partitions; + registrations = new Registrations(this); + registrations = Interlocked.CompareExchange(ref _registrations, registrations, null) ?? registrations; } - // ...and determine which partition to use. - int partitionIndex = Environment.CurrentManagedThreadId & s_numPartitionsMask; - Debug.Assert(partitionIndex < partitions.Length, $"Expected {partitionIndex} to be less than {partitions.Length}"); - CallbackPartition? partition = partitions[partitionIndex]; - if (partition == null) + // If it looks like there's a node in the freelist we could grab, grab the lock and try to get, configure, + // and register the node. + CallbackNode? node = null; + long id = 0; + if (registrations.FreeNodeList is not null) { - partition = new CallbackPartition(this); - partition = Interlocked.CompareExchange(ref partitions[partitionIndex], partition, null) ?? partition; - } - - // Store the callback information into the callback arrays. - long id; - CallbackNode? node; - bool lockTaken = false; - partition.Lock.Enter(ref lockTaken); - try - { - // Assign the next available unique ID. - id = partition.NextAvailableId++; - Debug.Assert(id != 0, "IDs should never be the reserved value 0."); - - // Get a node, from the free list if possible or else a new one. - node = partition.FreeNodeList; - if (node != null) + registrations.EnterLock(); + try { - partition.FreeNodeList = node.Next; - Debug.Assert(node.Prev == null, "Nodes in the free list should all have a null Prev"); - // node.Next will be overwritten below so no need to set it here. + // Try to take a free node. If we're able to, configure the node and register it. + node = registrations.FreeNodeList; + if (node is not null) + { + Debug.Assert(node.Prev == null, "Nodes in the free list should all have a null Prev"); + registrations.FreeNodeList = node.Next; + + node.Id = id = registrations.NextAvailableId++; + node.Callback = callback; + node.CallbackState = stateForCallback; + node.ExecutionContext = executionContext; + node.SynchronizationContext = syncContext; + node.Next = registrations.Callbacks; + registrations.Callbacks = node; + if (node.Next != null) + { + node.Next.Prev = node; + } + } } - else + finally { - node = new CallbackNode(partition); + registrations.ExitLock(); } + } + + // If we were unsuccessful in using a free node, create a new one, configure it, and register it. + if (node is null) + { + // Allocate the node if we couldn't get one from the free list. We avoid + // doing this while holding the spin lock, to avoid a potentially arbitrary + // amount of GC-related work under the lock, which we aim to keep very tight, + // just a few assignments. + node = new CallbackNode(registrations); - // Configure the node. - node.Id = id; node.Callback = callback; node.CallbackState = stateForCallback; node.ExecutionContext = executionContext; node.SynchronizationContext = syncContext; - // Add it to the callbacks list. - node.Next = partition.Callbacks; - if (node.Next != null) + registrations.EnterLock(); + try { - node.Next.Prev = node; + node.Id = id = registrations.NextAvailableId++; + node.Next = registrations.Callbacks; + if (node.Next != null) + { + node.Next.Prev = node; + } + registrations.Callbacks = node; + } + finally + { + registrations.ExitLock(); } - partition.Callbacks = node; - } - finally - { - partition.Lock.Exit(useMemoryBarrier: false); // no check on lockTaken needed without thread aborts } // If cancellation hasn't been requested, return the registration. @@ -568,10 +544,10 @@ namespace System.Threading // ourselves, but if we can't unregister it (e.g. the thread running Cancel snagged // our callback for execution), return the registration so that the caller can wait // for callback completion in ctr.Dispose(). - var ctr = new CancellationTokenRegistration(id, node); - if (!IsCancellationRequested || !partition.Unregister(id, node)) + Debug.Assert(id != 0, "IDs should never be the reserved value 0."); + if (!IsCancellationRequested || !registrations.Unregister(id, node)) { - return ctr; + return new CancellationTokenRegistration(id, node); } } @@ -614,110 +590,101 @@ namespace System.Threading { Debug.Assert(IsCancellationRequested, "ExecuteCallbackHandlers should only be called after setting IsCancellationRequested->true"); - // Record the threadID being used for running the callbacks. - ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId; - // If there are no callbacks to run, we can safely exit. Any race conditions to lazy initialize it // will see IsCancellationRequested and will then run the callback themselves. - CallbackPartition?[]? partitions = Interlocked.Exchange(ref _callbackPartitions, null); - if (partitions == null) + Registrations? registrations = Interlocked.Exchange(ref _registrations, null); + if (registrations is null) { Interlocked.Exchange(ref _state, NotifyingCompleteState); return; } + // Record the threadID being used for running the callbacks. + registrations.ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId; + List? exceptionList = null; try { - // For each partition, and each callback in that partition, execute the associated handler. // We call the delegates in LIFO order on each partition so that callbacks fire 'deepest first'. // This is intended to help with nesting scenarios so that child enlisters cancel before their parents. - foreach (CallbackPartition? partition in partitions) - { - if (partition == null) - { - // Uninitialized partition. Nothing to do. - continue; - } - // Iterate through all nodes in the partition. We remove each node prior - // to processing it. This allows for unregistration of subsequent registrations - // to still be effective even as other registrations are being invoked. - while (true) + // Iterate through all nodes in the partition. We remove each node prior + // to processing it. This allows for unregistration of subsequent registrations + // to still be effective even as other registrations are being invoked. + while (true) + { + CallbackNode? node; + registrations.EnterLock(); + try { - CallbackNode? node; - bool lockTaken = false; - partition.Lock.Enter(ref lockTaken); - try + // Pop the next registration from the callbacks list. + node = registrations.Callbacks; + if (node == null) { - // Pop the next registration from the callbacks list. - node = partition.Callbacks; - if (node == null) - { - // No more registrations to process. - break; - } - else - { - Debug.Assert(node.Prev == null); - if (node.Next != null) node.Next.Prev = null; - partition.Callbacks = node.Next; - } - - // Publish the intended callback ID, to ensure ctr.Dispose can tell if a wait is necessary. - // This write happens while the lock is held so that Dispose is either able to successfully - // unregister or is guaranteed to see an accurate executing callback ID, since it takes - // the same lock to remove the node from the callback list. - _executingCallbackId = node.Id; - - // Now that we've grabbed the Id, reset the node's Id to 0. This signals - // to code unregistering that the node is no longer associated with a callback. - node.Id = 0; + // No more registrations to process. + break; } - finally + + Debug.Assert(node.Registrations.Source == this); + Debug.Assert(node.Prev == null); + if (node.Next != null) { - partition.Lock.Exit(useMemoryBarrier: false); // no check on lockTaken needed without thread aborts + node.Next.Prev = null; } + registrations.Callbacks = node.Next; + + // Publish the intended callback ID, to ensure ctr.Dispose can tell if a wait is necessary. + // This write happens while the lock is held so that Dispose is either able to successfully + // unregister or is guaranteed to see an accurate executing callback ID, since it takes + // the same lock to remove the node from the callback list. + registrations.ExecutingCallbackId = node.Id; - // Invoke the callback on this thread if there's no sync context or on the - // target sync context if there is one. - try + // Now that we've grabbed the Id, reset the node's Id to 0. This signals + // to code unregistering that the node is no longer associated with a callback. + node.Id = 0; + } + finally + { + registrations.ExitLock(); + } + + // Invoke the callback on this thread if there's no sync context or on the + // target sync context if there is one. + try + { + if (node.SynchronizationContext != null) { - if (node.SynchronizationContext != null) + // Transition to the target syncContext and continue there. + node.SynchronizationContext.Send(static s => { - // Transition to the target syncContext and continue there. - node.SynchronizationContext.Send(static s => - { - var n = (CallbackNode)s!; - n.Partition.Source.ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId; - n.ExecuteCallback(); - }, node); - ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId; // above may have altered ThreadIDExecutingCallbacks, so reset it - } - else - { - node.ExecuteCallback(); - } + var n = (CallbackNode)s!; + n.Registrations.ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId; + n.ExecuteCallback(); + }, node); + registrations.ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId; // above may have altered ThreadIDExecutingCallbacks, so reset it } - catch (Exception ex) when (!throwOnFirstException) + else { - // Store the exception and continue - (exceptionList ??= new List()).Add(ex); + node.ExecuteCallback(); } - - // Drop the node. While we could add it to the free list, doing so has cost (we'd need to take the lock again) - // and very limited value. Since a source can only be canceled once, and after it's canceled registrations don't - // need nodes, the only benefit to putting this on the free list would be if Register raced with cancellation - // occurring, such that it could have used this free node but would instead need to allocate a new node (if - // there wasn't another free node available). } + catch (Exception ex) when (!throwOnFirstException) + { + // Store the exception and continue + (exceptionList ??= new List()).Add(ex); + } + + // Drop the node. While we could add it to the free list, doing so has cost (we'd need to take the lock again) + // and very limited value. Since a source can only be canceled once, and after it's canceled registrations don't + // need nodes, the only benefit to putting this on the free list would be if Register raced with cancellation + // occurring, such that it could have used this free node but would instead need to allocate a new node (if + // there wasn't another free node available). } } finally { _state = NotifyingCompleteState; - Volatile.Write(ref _executingCallbackId, 0); - Interlocked.MemoryBarrier(); // for safety, prevent reorderings crossing this point and seeing inconsistent state. + Interlocked.Exchange(ref registrations.ExecutingCallbackId, 0); // for safety, prevent reorderings crossing this point and seeing inconsistent state. } if (exceptionList != null) @@ -727,21 +694,6 @@ namespace System.Threading } } - /// Gets the number of callback partitions to use based on the number of cores. - /// A power of 2 representing the number of partitions to use. - private static int GetPartitionCount() - { - int procs = Environment.ProcessorCount; - int count = - procs > 8 ? 16 : // capped at 16 to limit memory usage on larger machines - procs > 4 ? 8 : - procs > 2 ? 4 : - procs > 1 ? 2 : - 1; - Debug.Assert(count > 0 && (count & (count - 1)) == 0, $"Got {count}, but expected a power of 2"); - return count; - } - /// /// Creates a that will be in the canceled state /// when any of the source tokens are in the canceled state. @@ -790,47 +742,6 @@ namespace System.Threading }; } - /// - /// Wait for a single callback to complete (or, more specifically, to not be running). - /// It is ok to call this method if the callback has already finished. - /// Calling this method before the target callback has been selected for execution would be an error. - /// - internal void WaitForCallbackToComplete(long id) - { - SpinWait sw = default; - while (ExecutingCallback == id) - { - sw.SpinOnce(); // spin, as we assume callback execution is fast and that this situation is rare. - } - } - - /// - /// Asynchronously wait for a single callback to complete (or, more specifically, to not be running). - /// It is ok to call this method if the callback has already finished. - /// Calling this method before the target callback has been selected for execution would be an error. - /// - internal ValueTask WaitForCallbackToCompleteAsync(long id) - { - // If the currently executing callback is not the target one, then the target one has already - // completed and we can simply return. This should be the most common case, as the caller - // calls if we're currently canceling but doesn't know what callback is running, if any. - if (ExecutingCallback != id) - { - return default; - } - - // The specified callback is actually running: queue a task that'll poll for the currently - // executing callback to complete. In general scheduling such a work item that polls is a really - // unfortunate thing to do. However, we expect this to be a rare case (disposing while the associated - // callback is running), and brief when it happens (so the polling will be minimal), and making - // this work with a callback mechanism will add additional cost to other more common cases. - return new ValueTask(Task.Factory.StartNew(static s => - { - var state = (TupleSlim)s!; - state.Item1.WaitForCallbackToComplete(state.Item2); - }, new TupleSlim(this, id), CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default)); - } - private sealed class Linked1CancellationTokenSource : CancellationTokenSource { private readonly CancellationTokenRegistration _reg1; @@ -885,7 +796,7 @@ namespace System.Threading }; private CancellationTokenRegistration[]? _linkingRegistrations; - internal LinkedNCancellationTokenSource(params CancellationToken[] tokens) + internal LinkedNCancellationTokenSource(CancellationToken[] tokens) { _linkingRegistrations = new CancellationTokenRegistration[tokens.Length]; @@ -922,28 +833,59 @@ namespace System.Threading } } - internal sealed class CallbackPartition + private static void Invoke(Delegate d, object? state, CancellationTokenSource source) { - /// The associated source that owns this partition. + Debug.Assert(d is Action || d is Action); + + if (d is Action actionWithState) + { + actionWithState(state); + } + else + { + ((Action)d)(state, new CancellationToken(source)); + } + } + + /// Set of all the registrations in the token source. + /// + /// Separated out into a separate instance to keep CancellationTokenSource smaller for the case where one is created but nothing is registered with it. + /// This happens not infrequently, in particular when one is created for an operation that ends up completing synchronously / quickly. + /// + internal sealed class Registrations + { + /// The associated source. public readonly CancellationTokenSource Source; - /// Lock that protects all state in the partition. - public SpinLock Lock = new SpinLock(enableThreadOwnerTracking: false); // mutable struct; do not make this readonly - /// Doubly-linked list of callbacks registered with the partition. Callbacks are removed during unregistration and as they're invoked. + /// Doubly-linked list of callbacks registered with the source. Callbacks are removed during unregistration and as they're invoked. public CallbackNode? Callbacks; /// Singly-linked list of free nodes that can be used for subsequent callback registrations. public CallbackNode? FreeNodeList; /// Every callback is assigned a unique, never-reused ID. This defines the next available ID. public long NextAvailableId = 1; // avoid using 0, as that's the default long value and used to represent an empty node - - public CallbackPartition(CancellationTokenSource source) - { - Debug.Assert(source != null, "Expected non-null source"); - Source = source; - } - - internal bool Unregister(long id, CallbackNode node) + /// Tracks the running callback to assist ctr.Dispose() to wait for the target callback to complete. + public long ExecutingCallbackId; + /// The ID of the thread currently executing the main body of CTS.Cancel() + /// + /// This helps us to know if a call to ctr.Dispose() is running 'within' a cancellation callback. + /// This is updated as we move between the main thread calling cts.Cancel() and any syncContexts + /// that are used to actually run the callbacks. + /// + public volatile int ThreadIDExecutingCallbacks = -1; + /// Spin lock that protects state in the instance. + private int _lock; + + /// Initializes the instance. + /// The associated source. + public Registrations(CancellationTokenSource source) => Source = source; + + /// Unregisters a callback. + /// The expected id of the registration. + /// The callback node. + /// true if the node was found and removed; false if it couldn't be found or didn't match the provided id. + public bool Unregister(long id, CallbackNode node) { Debug.Assert(node != null, "Expected non-null node"); + Debug.Assert(node.Registrations == this, "Expected node to come from this registrations instance"); if (id == 0) { @@ -953,8 +895,7 @@ namespace System.Threading return false; } - bool lockTaken = false; - Lock.Enter(ref lockTaken); + EnterLock(); try { if (node.Id != id) @@ -987,9 +928,8 @@ namespace System.Threading } // Clear out the now unused node and put it on the singly-linked free list. - // The only field we don't clear out is the associated Partition, as that's fixed - // throughout the nodes lifetime, regardless of how many times its reused by - // the same partition (it's never used on a different partition). + // The only field we don't clear out is the associated Source, as that's fixed + // throughout the nodes lifetime. node.Id = 0; node.Callback = null; node.CallbackState = null; @@ -1003,15 +943,82 @@ namespace System.Threading } finally { - Lock.Exit(useMemoryBarrier: false); // no check on lockTaken needed without thread aborts + ExitLock(); + } + } + + /// + /// Wait for a single callback to complete (or, more specifically, to not be running). + /// It is ok to call this method if the callback has already finished. + /// Calling this method before the target callback has been selected for execution would be an error. + /// + public void WaitForCallbackToComplete(long id) + { + SpinWait sw = default; + while (Volatile.Read(ref ExecutingCallbackId) == id) + { + sw.SpinOnce(); // spin, as we assume callback execution is fast and that this situation is rare. + } + } + + /// + /// Asynchronously wait for a single callback to complete (or, more specifically, to not be running). + /// It is ok to call this method if the callback has already finished. + /// Calling this method before the target callback has been selected for execution would be an error. + /// + public ValueTask WaitForCallbackToCompleteAsync(long id) + { + // If the currently executing callback is not the target one, then the target one has already + // completed and we can simply return. This should be the most common case, as the caller + // calls if we're currently canceling but doesn't know what callback is running, if any. + if (Volatile.Read(ref ExecutingCallbackId) != id) + { + return default; + } + + // The specified callback is actually running: queue an async loop that'll poll for the currently executing + // callback to complete. While such polling isn't ideal, we expect this to be a rare case (disposing while + // the associated callback is running), and brief when it happens (so the polling will be minimal), and making + // this work with a callback mechanism will add additional cost to other more common cases. + return new ValueTask(Task.Factory.StartNew(static async s => + { + var state = (TupleSlim)s!; + while (Volatile.Read(ref state.Item1.ExecutingCallbackId) == state.Item2) + { + await Task.Yield(); + } + }, new TupleSlim(this, id), CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default).Unwrap()); + } + + /// Enters the lock for this instance. The current thread must not be holding the lock, but that is not validated. + public void EnterLock() + { + ref int value = ref _lock; + if (Interlocked.Exchange(ref value, 1) != 0) + { + Contention(ref value); + + [MethodImpl(MethodImplOptions.NoInlining)] + static void Contention(ref int value) + { + SpinWait sw = default; + do { sw.SpinOnce(); } while (Interlocked.Exchange(ref value, 1) == 1); + } } } + + /// Exits the lock for this instance. The current thread must be holding the lock, but that is not validated. + public void ExitLock() + { + Debug.Assert(_lock == 1); + Volatile.Write(ref _lock, 0); + } } /// All of the state associated a registered callback, in a node that's part of a linked list of registered callbacks. internal sealed class CallbackNode { - public readonly CallbackPartition Partition; + public readonly Registrations Registrations; public CallbackNode? Prev; public CallbackNode? Next; @@ -1021,10 +1028,10 @@ namespace System.Threading public ExecutionContext? ExecutionContext; public SynchronizationContext? SynchronizationContext; - public CallbackNode(CallbackPartition partition) + public CallbackNode(Registrations registrations) { - Debug.Assert(partition != null, "Expected non-null partition"); - Partition = partition; + Debug.Assert(registrations != null, "Expected non-null parent registrations"); + Registrations = registrations; } public void ExecuteCallback() @@ -1033,7 +1040,7 @@ namespace System.Threading if (context is null) { Debug.Assert(Callback != null); - Invoke(Callback, CallbackState, Partition.Source); + Invoke(Callback, CallbackState, Registrations.Source); } else { @@ -1041,25 +1048,10 @@ namespace System.Threading { var node = (CallbackNode)s!; Debug.Assert(node.Callback != null); - Invoke(node.Callback, node.CallbackState, node.Partition.Source); + Invoke(node.Callback, node.CallbackState, node.Registrations.Source); }, this); - } } } - - private static void Invoke(Delegate d, object? state, CancellationTokenSource source) - { - Debug.Assert(d is Action || d is Action); - - if (d is Action actionWithState) - { - actionWithState(state); - } - else - { - ((Action)d)(state, new CancellationToken(source)); - } - } } } -- 2.7.4