byte[] rentedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
try
{
- Task<int> t = ReadAsync(new Memory<byte>(rentedBuffer, 0, buffer.Length)).AsTask();
- ((IAsyncResult)t).AsyncWaitHandle.WaitOne();
- int readLength = t.GetAwaiter().GetResult();
+ int readLength = ReadAsync(new Memory<byte>(rentedBuffer, 0, buffer.Length)).AsTask().GetAwaiter().GetResult();
rentedBuffer.AsSpan(0, readLength).CopyTo(buffer);
return readLength;
}
ThrowIfDisposed();
// TODO: optimize this.
- Task t = WriteAsync(buffer.ToArray()).AsTask();
- ((IAsyncResult)t).AsyncWaitHandle.WaitOne();
- t.GetAwaiter().GetResult();
+ WriteAsync(buffer.ToArray()).AsTask().GetAwaiter().GetResult();
}
// MsQuic doesn't support explicit flushing
get
{
_threadAdjustmentLock.VerifyIsLocked();
- return Math.Min(_separated.numThreadsGoal, TargetThreadsGoalForBlockingAdjustment);
+ return Math.Min(_separated.counts.NumThreadsGoal, TargetThreadsGoalForBlockingAdjustment);
}
}
Debug.Assert(_numBlockedThreads > 0);
if (_pendingBlockingAdjustment != PendingBlockingAdjustment.WithDelayIfNecessary &&
- _separated.numThreadsGoal < TargetThreadsGoalForBlockingAdjustment)
+ _separated.counts.NumThreadsGoal < TargetThreadsGoalForBlockingAdjustment)
{
if (_pendingBlockingAdjustment == PendingBlockingAdjustment.None)
{
if (_pendingBlockingAdjustment != PendingBlockingAdjustment.Immediately &&
_numThreadsAddedDueToBlocking > 0 &&
- _separated.numThreadsGoal > TargetThreadsGoalForBlockingAdjustment)
+ _separated.counts.NumThreadsGoal > TargetThreadsGoalForBlockingAdjustment)
{
wakeGateThread = true;
_pendingBlockingAdjustment = PendingBlockingAdjustment.Immediately;
addWorker = false;
short targetThreadsGoal = TargetThreadsGoalForBlockingAdjustment;
- short numThreadsGoal = _separated.numThreadsGoal;
+ ThreadCounts counts = _separated.counts;
+ short numThreadsGoal = counts.NumThreadsGoal;
if (numThreadsGoal == targetThreadsGoal)
{
return 0;
short toSubtract = Math.Min((short)(numThreadsGoal - targetThreadsGoal), _numThreadsAddedDueToBlocking);
_numThreadsAddedDueToBlocking -= toSubtract;
- _separated.numThreadsGoal = numThreadsGoal -= toSubtract;
+ numThreadsGoal -= toSubtract;
+ _separated.counts.InterlockedSetNumThreadsGoal(numThreadsGoal);
HillClimbing.ThreadPoolHillClimber.ForceChange(
numThreadsGoal,
HillClimbing.StateOrTransition.CooperativeBlocking);
{
// Calculate how many threads can be added without a delay. Threads that were already created but may be just
// waiting for work can be released for work without a delay, but creating a new thread may need a delay.
- ThreadCounts counts = _separated.counts;
short maxThreadsGoalWithoutDelay =
Math.Max(configuredMaxThreadsWithoutDelay, Math.Min(counts.NumExistingThreads, _maxThreads));
short targetThreadsGoalWithoutDelay = Math.Min(targetThreadsGoal, maxThreadsGoalWithoutDelay);
} while (false);
_numThreadsAddedDueToBlocking += (short)(newNumThreadsGoal - numThreadsGoal);
- _separated.numThreadsGoal = newNumThreadsGoal;
+ counts = _separated.counts.InterlockedSetNumThreadsGoal(newNumThreadsGoal);
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.CooperativeBlocking);
// of the number of existing threads, is compared with the goal. There may be alternative
// solutions, for now this is only to maintain consistency in behavior.
ThreadCounts counts = threadPoolInstance._separated.counts;
- if (counts.NumProcessingWork < threadPoolInstance._maxThreads &&
- counts.NumProcessingWork >= threadPoolInstance._separated.numThreadsGoal)
+ while (
+ counts.NumProcessingWork < threadPoolInstance._maxThreads &&
+ counts.NumProcessingWork >= counts.NumThreadsGoal)
{
if (debuggerBreakOnWorkStarvation)
{
Debugger.Break();
}
+ ThreadCounts newCounts = counts;
short newNumThreadsGoal = (short)(counts.NumProcessingWork + 1);
- threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal;
- HillClimbing.ThreadPoolHillClimber.ForceChange(
- newNumThreadsGoal,
- HillClimbing.StateOrTransition.Starvation);
- addWorker = true;
+ newCounts.NumThreadsGoal = newNumThreadsGoal;
+
+ ThreadCounts countsBeforeUpdate =
+ threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
+ if (countsBeforeUpdate == counts)
+ {
+ HillClimbing.ThreadPoolHillClimber.ForceChange(
+ newNumThreadsGoal,
+ HillClimbing.StateOrTransition.Starvation);
+ addWorker = true;
+ break;
+ }
+
+ counts = countsBeforeUpdate;
}
}
finally
}
else
{
- minimumDelay = (uint)threadPoolInstance._separated.numThreadsGoal * DequeueDelayThresholdMs;
+ minimumDelay = (uint)threadPoolInstance._separated.counts.NumThreadsGoal * DequeueDelayThresholdMs;
}
return delay > minimumDelay;
// SOS's ThreadPool command depends on this layout
private const byte NumProcessingWorkShift = 0;
private const byte NumExistingThreadsShift = 16;
+ private const byte NumThreadsGoalShift = 32;
- private uint _data; // SOS's ThreadPool command depends on this name
+ private ulong _data; // SOS's ThreadPool command depends on this name
- private ThreadCounts(uint data) => _data = data;
+ private ThreadCounts(ulong data) => _data = data;
private short GetInt16Value(byte shift) => (short)(_data >> shift);
private void SetInt16Value(short value, byte shift) =>
- _data = (_data & ~((uint)ushort.MaxValue << shift)) | ((uint)(ushort)value << shift);
+ _data = (_data & ~((ulong)ushort.MaxValue << shift)) | ((ulong)(ushort)value << shift);
/// <summary>
/// Number of threads processing work items.
Debug.Assert(value >= 0);
Debug.Assert(value <= NumProcessingWork);
- _data -= (uint)(ushort)value << NumProcessingWorkShift;
+ _data -= (ulong)(ushort)value << NumProcessingWorkShift;
}
public void InterlockedDecrementNumProcessingWork()
Debug.Assert(value >= 0);
Debug.Assert(value <= NumExistingThreads);
- _data -= (uint)(ushort)value << NumExistingThreadsShift;
+ _data -= (ulong)(ushort)value << NumExistingThreadsShift;
+ }
+
+ /// <summary>
+ /// Target (goal) number of thread pool worker threads. This is the value hill climbing and the
+ /// blocking-adjustment heuristics steer toward; it is clamped elsewhere by the configured max threads,
+ /// so it is not itself a maximum.
+ /// </summary>
+ public short NumThreadsGoal
+ {
+ get => GetInt16Value(NumThreadsGoalShift);
+ set
+ {
+ // A goal of zero or less is never valid; the pool always wants at least one worker.
+ Debug.Assert(value > 0);
+ SetInt16Value(value, NumThreadsGoalShift);
+ }
+ }
+
+ // Atomically updates only the NumThreadsGoal field of the packed counts, preserving the other
+ // fields even if they change concurrently, by retrying a CAS until it succeeds.
+ // Returns the counts value that was successfully installed.
+ public ThreadCounts InterlockedSetNumThreadsGoal(short value)
+ {
+ // Callers must hold the thread adjustment lock; goal changes are only made under it.
+ ThreadPoolInstance._threadAdjustmentLock.VerifyIsLocked();
+
+ ThreadCounts counts = this;
+ while (true)
+ {
+ ThreadCounts newCounts = counts;
+ newCounts.NumThreadsGoal = value;
+
+ ThreadCounts countsBeforeUpdate = InterlockedCompareExchange(newCounts, counts);
+ if (countsBeforeUpdate == counts)
+ {
+ return newCounts;
+ }
+
+ // Another thread changed the counts concurrently; retry against the latest value.
+ counts = countsBeforeUpdate;
+ }
}
public ThreadCounts VolatileRead() => new ThreadCounts(Volatile.Read(ref _data));
- public ThreadCounts InterlockedCompareExchange(ThreadCounts newCounts, ThreadCounts oldCounts) =>
- new ThreadCounts(Interlocked.CompareExchange(ref _data, newCounts._data, oldCounts._data));
+ // CAS over the full packed 64-bit counts value. Returns the value observed before the exchange
+ // (equal to oldCounts when the exchange succeeded).
+ public ThreadCounts InterlockedCompareExchange(ThreadCounts newCounts, ThreadCounts oldCounts)
+ {
+#if DEBUG
+ // In debug builds, verify the invariant that NumThreadsGoal is only ever changed while the
+ // thread adjustment lock is held; the other fields may be updated lock-free.
+ if (newCounts.NumThreadsGoal != oldCounts.NumThreadsGoal)
+ {
+ ThreadPoolInstance._threadAdjustmentLock.VerifyIsLocked();
+ }
+#endif
+
+ return new ThreadCounts(Interlocked.CompareExchange(ref _data, newCounts._data, oldCounts._data));
+ }
public static bool operator ==(ThreadCounts lhs, ThreadCounts rhs) => lhs._data == rhs._data;
public static bool operator !=(ThreadCounts lhs, ThreadCounts rhs) => lhs._data != rhs._data;
public override bool Equals([NotNullWhen(true)] object? obj) => obj is ThreadCounts other && _data == other._data;
- public override int GetHashCode() => (int)_data;
+ // _data is now 64-bit (it additionally packs NumThreadsGoal in the upper half), so fold both
+ // 32-bit halves into the hash instead of truncating.
+ public override int GetHashCode() => (int)_data + (int)(_data >> 32);
}
}
}
ThreadCounts newCounts = counts;
newCounts.SubtractNumExistingThreads(1);
short newNumExistingThreads = (short)(numExistingThreads - 1);
-
- ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
+ short newNumThreadsGoal =
+ Math.Max(
+ threadPoolInstance.MinThreadsGoal,
+ Math.Min(newNumExistingThreads, counts.NumThreadsGoal));
+ newCounts.NumThreadsGoal = newNumThreadsGoal;
+
+ ThreadCounts oldCounts =
+ threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (oldCounts == counts)
{
- short newNumThreadsGoal =
- Math.Max(
- threadPoolInstance.MinThreadsGoal,
- Math.Min(newNumExistingThreads, threadPoolInstance._separated.numThreadsGoal));
- if (threadPoolInstance._separated.numThreadsGoal != newNumThreadsGoal)
- {
- threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal;
- HillClimbing.ThreadPoolHillClimber.ForceChange(
- newNumThreadsGoal,
- HillClimbing.StateOrTransition.ThreadTimedOut);
- }
-
+ HillClimbing.ThreadPoolHillClimber.ForceChange(
+ newNumThreadsGoal,
+ HillClimbing.StateOrTransition.ThreadTimedOut);
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads);
while (true)
{
numProcessingWork = counts.NumProcessingWork;
- if (numProcessingWork >= threadPoolInstance._separated.numThreadsGoal)
+ if (numProcessingWork >= counts.NumThreadsGoal)
{
return;
}
// code from which this implementation was ported, which turns a processing thread into a retired thread
// and checks for pending requests like RemoveWorkingWorker. In this implementation there are
// no retired threads, so only the count of threads processing work is considered.
- if (counts.NumProcessingWork <= threadPoolInstance._separated.numThreadsGoal)
+ if (counts.NumProcessingWork <= counts.NumThreadsGoal)
{
return false;
}
{
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1)]
public ThreadCounts counts; // SOS's ThreadPool command depends on this name
- [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1 + sizeof(uint))]
- public short numThreadsGoal;
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 2)]
public int lastDequeueTime;
_maxThreads = _minThreads;
}
- _separated.numThreadsGoal = _minThreads;
+ _separated.counts.NumThreadsGoal = _minThreads;
}
public bool SetMinThreads(int workerThreads, int ioCompletionThreads)
wakeGateThread = true;
}
}
- else if (_separated.numThreadsGoal < newMinThreads)
+ else if (_separated.counts.NumThreadsGoal < newMinThreads)
{
- _separated.numThreadsGoal = newMinThreads;
+ _separated.counts.InterlockedSetNumThreadsGoal(newMinThreads);
if (_separated.numRequestedWorkers > 0)
{
addWorker = true;
short newMaxThreads = (short)Math.Min(workerThreads, MaxPossibleThreadCount);
_maxThreads = newMaxThreads;
- if (_separated.numThreadsGoal > newMaxThreads)
+ if (_separated.counts.NumThreadsGoal > newMaxThreads)
{
- _separated.numThreadsGoal = newMaxThreads;
+ _separated.counts.InterlockedSetNumThreadsGoal(newMaxThreads);
}
return true;
}
bool addWorker = false;
try
{
- // Skip hill climbing when there is a pending blocking adjustment. Hill climbing may otherwise bypass the
- // blocking adjustment heuristics and increase the thread count too quickly.
- if (_pendingBlockingAdjustment != PendingBlockingAdjustment.None)
+ // Repeated checks from ShouldAdjustMaxWorkersActive() inside the lock
+ ThreadCounts counts = _separated.counts;
+ if (counts.NumProcessingWork > counts.NumThreadsGoal ||
+ _pendingBlockingAdjustment != PendingBlockingAdjustment.None)
{
return;
}
+
long startTime = _currentSampleStartTime;
long endTime = Stopwatch.GetTimestamp();
long freq = Stopwatch.Frequency;
int totalNumCompletions = (int)_completionCounter.Count;
int numCompletions = totalNumCompletions - _separated.priorCompletionCount;
+ short oldNumThreadsGoal = counts.NumThreadsGoal;
int newNumThreadsGoal;
(newNumThreadsGoal, _threadAdjustmentIntervalMs) =
- HillClimbing.ThreadPoolHillClimber.Update(_separated.numThreadsGoal, elapsedSeconds, numCompletions);
- short oldNumThreadsGoal = _separated.numThreadsGoal;
+ HillClimbing.ThreadPoolHillClimber.Update(oldNumThreadsGoal, elapsedSeconds, numCompletions);
if (oldNumThreadsGoal != (short)newNumThreadsGoal)
{
- _separated.numThreadsGoal = (short)newNumThreadsGoal;
+ _separated.counts.InterlockedSetNumThreadsGoal((short)newNumThreadsGoal);
//
// If we're increasing the goal, inject a thread. If that thread finds work, it will inject
// threads processing work to stop in response to a decreased thread count goal. The logic here is a bit
// different from the original CoreCLR code from which this implementation was ported because in this
// implementation there are no retired threads, so only the count of threads processing work is considered.
- if (_separated.counts.NumProcessingWork > _separated.numThreadsGoal)
+ ThreadCounts counts = _separated.counts;
+ if (counts.NumProcessingWork > counts.NumThreadsGoal)
{
return false;
}
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Generic;
+using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
}).Dispose();
}
+ // Stress test: repeatedly schedules work items that block on a TaskCompletionSource completed by
+ // another work item, while also re-queuing themselves, to race cooperative blocking adjustments
+ // against processing-thread/goal-thread accounting and worker injection.
+ // NOTE(review): presumably written to reproduce a race fixed by moving NumThreadsGoal into the
+ // atomically-updated ThreadCounts — confirm against the associated change.
+ [ConditionalFact(nameof(IsThreadingAndRemoteExecutorSupported))]
+ public static void CooperativeBlockingWithProcessingThreadsAndGoalThreadsAndAddWorkerRaceTest()
+ {
+ // Avoid contaminating the main process' environment
+ RemoteExecutor.Invoke(() =>
+ {
+ try
+ {
+ // The test is run affinitized to at most 2 processors for more frequent repros. The actual test process below
+ // will inherit the affinity.
+ Process testParentProcess = Process.GetCurrentProcess();
+ testParentProcess.ProcessorAffinity = (nint)testParentProcess.ProcessorAffinity & 0x3;
+ }
+ catch (PlatformNotSupportedException)
+ {
+ // Processor affinity is not supported on some platforms, try to run the test anyway
+ }
+
+ RemoteExecutor.Invoke(() =>
+ {
+ const uint TestDurationMs = 4000;
+
+ var done = new ManualResetEvent(false);
+ int startTimeMs = Environment.TickCount;
+ // Completes the TaskCompletionSource a sibling work item is blocked on.
+ Action<object> completingTask = data => ((TaskCompletionSource<int>)data).SetResult(0);
+ Action repeatingTask = null;
+ repeatingTask = () =>
+ {
+ // Unsigned subtraction handles Environment.TickCount wraparound.
+ if ((uint)(Environment.TickCount - startTimeMs) >= TestDurationMs)
+ {
+ done.Set();
+ return;
+ }
+
+ Task.Run(repeatingTask);
+
+ // Block this pool thread until another work item completes the TCS, exercising the
+ // cooperative blocking (thread injection) path.
+ var tcs = new TaskCompletionSource<int>();
+ Task.Factory.StartNew(completingTask, tcs);
+ tcs.Task.Wait();
+ };
+
+ // Seed one repeating chain per processor to keep the pool saturated.
+ for (int i = 0; i < Environment.ProcessorCount; ++i)
+ {
+ Task.Run(repeatingTask);
+ }
+
+ done.CheckedWait();
+ }).Dispose();
+ }).Dispose();
+ }
+
+ // Gate for tests that need both real threading support and out-of-process execution via RemoteExecutor.
public static bool IsThreadingAndRemoteExecutorSupported =>
PlatformDetection.IsThreadingSupported && RemoteExecutor.IsSupported;
}