2 // Copyright (c) Microsoft. All rights reserved.
3 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
15 This module is the header file for thread pools using Win32 APIs.
22 #ifndef _WIN32THREADPOOL_H
23 #define _WIN32THREADPOOL_H
25 #include "delegateinfo.h"
27 #include "nativeoverlapped.h"
28 #include "hillclimbing.h"
30 #define MAX_WAITHANDLES 64
32 #define MAX_CACHED_EVENTS 40 // upper limit on number of wait events cached
34 #define WAIT_REGISTERED 0x01
35 #define WAIT_ACTIVE 0x02
36 #define WAIT_DELETE 0x04
38 #define TIMER_REGISTERED 0x01
39 #define TIMER_ACTIVE 0x02
40 #define TIMER_DELETE 0x04
42 #define WAIT_SINGLE_EXECUTION 0x00000001
43 #define WAIT_FREE_CONTEXT 0x00000002
44 #define WAIT_INTERNAL_COMPLETION 0x00000004
46 #define QUEUE_ONLY 0x00000000 // do not attempt to call on the thread
47 #define CALL_OR_QUEUE 0x00000001 // call on the same thread if not too busy, else queue
49 const int MaxLimitThreadsPerCPU=250; // upper limit on number of cp threads per CPU
50 const int MaxFreeCPThreadsPerCPU=2; // upper limit on number of free cp threads per CPU
52 const int CpuUtilizationHigh=95; // remove threads when above this
53 const int CpuUtilizationLow =80; // inject more threads if below this
56 extern HANDLE (WINAPI *g_pufnCreateIoCompletionPort)(HANDLE FileHandle,
57 HANDLE ExistingCompletionPort,
58 ULONG_PTR CompletionKey,
59 DWORD NumberOfConcurrentThreads);
61 extern int (WINAPI *g_pufnNtQueryInformationThread) (HANDLE ThreadHandle,
62 THREADINFOCLASS ThreadInformationClass,
63 PVOID ThreadInformation,
64 ULONG ThreadInformationLength,
67 extern int (WINAPI * g_pufnNtQuerySystemInformation) (SYSTEM_INFORMATION_CLASS SystemInformationClass,
68 PVOID SystemInformation,
69 ULONG SystemInformationLength,
70 PULONG ReturnLength OPTIONAL);
71 #endif // !FEATURE_PAL
// Reinterpret a FILETIME as a 64-bit integer value.
// NOTE(review): this type-puns via a pointer cast — a long-standing CLR idiom;
// left as-is since all Windows callers pass properly aligned FILETIMEs.
#define FILETIME_TO_INT64(t) (*(__int64*)&(t))
// Convert from milliseconds to 100-nanosecond units (FILETIME resolution).
// The argument is parenthesized so that expressions such as
// MILLI_TO_100NANO(a + b) expand correctly.
#define MILLI_TO_100NANO(x) ((x) * 10000)
77 * This type is supposed to be private to ThreadpoolMgr.
78 * It's at global scope because Strike needs to be able to access its
83 LPTHREAD_START_ROUTINE Function;
88 typedef struct _IOCompletionContext
91 DWORD numBytesTransferred;
92 LPOVERLAPPED lpOverlapped;
94 } IOCompletionContext, *PIOCompletionContext;
96 typedef DPTR(WorkRequest) PTR_WorkRequest;
99 friend class ClrDataAccess;
100 friend struct DelegateInfo;
101 friend class ThreadPoolNative;
102 friend class TimerNative;
103 friend class UnManagedPerAppDomainTPCount;
104 friend class ManagedPerAppDomainTPCount;
105 friend class PerAppDomainTPCountList;
106 friend class HillClimbing;
107 friend struct _DacGlobals;
110 // UnfairSemaphore is a more scalable semaphore than CLRSemaphore. It prefers to release threads that have more recently begun waiting,
111 // to preserve locality. Additionally, very recently-waiting threads can be released without an additional kernel transition to unblock
112 // them, which reduces latency.
114 // UnfairSemaphore is only appropriate in scenarios where the order of unblocking threads is not important, and where threads frequently
115 // need to be woken. This is true of the ThreadPool's "worker semaphore", but not, for example, of the "retired worker semaphore" which is
116 // only rarely signalled.
118 // A further optimization that could be done here would be to replace CLRSemaphore with a Win32 IO Completion Port. Completion ports
119 // unblock threads in LIFO order, unlike the roughly-FIFO ordering of ordinary semaphores, and that would help to keep the "warm" threads warm.
120 // We did not do this in CLR 4.0 because hosts currently have no way of intercepting calls to IO Completion Ports (other than THE completion port
121 // behind the I/O thread pool), and we did not have time to explore the implications of this. Also, completion ports are not available on the Mac,
122 // though Snow Leopard has something roughly similar (and a regular Semaphore would do on the Mac in a pinch).
124 class UnfairSemaphore
128 // padding to ensure we get our own cache line
132 // We track everything we care about in a single 64-bit struct to allow us to
133 // do CompareExchanges on this for atomic updates.
// The four 16-bit fields below pack into one 64-bit value; signed so that a
// bad transition would show up as a negative value in UpdateCounts' asserts.
139 int spinners : 16; //how many threads are currently spin-waiting for this semaphore?
140 int countForSpinners : 16; //how much of the semaphore's count is available to spinners?
141 int waiters : 16; //how many threads are blocked in the OS waiting for this semaphore?
142 int countForWaiters : 16; //how much count is available to waiters?
150 CLRSemaphore m_sem; //waiters wait on this
152 // padding to ensure we get our own cache line
// Debug-only: the cap passed to the constructor, used to validate invariants.
155 INDEBUG(int m_maxCount;)
// Atomically publish newCounts iff the shared m_counts still equals
// currentCounts (single 64-bit compare-exchange). Returns true on success;
// on failure the caller must re-read the counts and retry.
157 bool UpdateCounts(Counts newCounts, Counts currentCounts)
159 LIMITED_METHOD_CONTRACT;
161 oldCounts.asLongLong = FastInterlockCompareExchangeLong(&m_counts.asLongLong, newCounts.asLongLong, currentCounts.asLongLong);
162 if (oldCounts.asLongLong == currentCounts.asLongLong)
164 // we successfully updated the counts. Now validate what we put in.
165 // Note: we can't validate these unless the CompareExchange succeeds, because
166 // on x86 a VolatileLoad of m_counts is not atomic; we could end up getting inconsistent
167 // values. It's not until we've successfully stored the new values that we know for sure
168 // that the old values were correct (because if they were not, the CompareExchange would have
170 _ASSERTE(newCounts.spinners >= 0);
171 _ASSERTE(newCounts.countForSpinners >= 0);
172 _ASSERTE(newCounts.waiters >= 0);
173 _ASSERTE(newCounts.countForWaiters >= 0);
174 _ASSERTE(newCounts.countForSpinners + newCounts.countForWaiters <= m_maxCount);
180 // we lost a race with some other thread, and will need to try again.
// Construct with zero initial count; maxCount caps the total count and must
// fit in the signed 16-bit count fields of Counts.
187 UnfairSemaphore(int maxCount)
197 _ASSERTE(maxCount <= 0x7fff); //counts need to fit in signed 16-bit ints
198 INDEBUG(m_maxCount = maxCount;)
// Kernel semaphore starts empty; it is only signalled for blocked waiters.
200 m_counts.asLongLong = 0;
201 m_sem.Create(0, maxCount);
205 // no destructor - CLRSemaphore will close itself in its own destructor.
// Release countToRelease units of count. Distribution order: first satisfy
// current spinners (no kernel transition), then blocked waiters (kernel
// release on m_sem), and finally bank any remainder for future spinners.
212 void Release(int countToRelease)
216 Counts currentCounts, newCounts;
217 currentCounts.asLongLong = VolatileLoad(&m_counts.asLongLong);
218 newCounts = currentCounts;
220 int remainingCount = countToRelease;
222 // First, prefer to release existing spinners,
223 // because a) they're hot, and b) we don't need a kernel
224 // transition to release them.
225 int spinnersToRelease = max(0, min(remainingCount, currentCounts.spinners - currentCounts.countForSpinners));
226 newCounts.countForSpinners += spinnersToRelease;
227 remainingCount -= spinnersToRelease;
229 // Next, prefer to release existing waiters
230 int waitersToRelease = max(0, min(remainingCount, currentCounts.waiters - currentCounts.countForWaiters));
231 newCounts.countForWaiters += waitersToRelease;
232 remainingCount -= waitersToRelease;
234 // Finally, release any future spinners that might come our way
235 newCounts.countForSpinners += remainingCount;
237 // Try to commit the transaction
238 if (UpdateCounts(newCounts, currentCounts))
240 // Now we need to release the waiters we promised to release
241 if (waitersToRelease > 0)
244 INDEBUG(BOOL success =) m_sem.Release((LONG)waitersToRelease, &previousCount);
// Wait up to 'timeout' ms for one unit of count. Fast path: consume
// countForSpinners directly. Otherwise register as a spinner and spin-poll
// (the per-thread spin budget shrinks as more threads spin); if the budget is
// exhausted, demote to a waiter and block on m_sem. Returns true iff count
// was obtained (false means timeout).
253 bool Wait(DWORD timeout)
257 Counts currentCounts, newCounts;
258 currentCounts.asLongLong = VolatileLoad(&m_counts.asLongLong);
259 newCounts = currentCounts;
261 // First, just try to grab some count.
262 if (currentCounts.countForSpinners > 0)
264 newCounts.countForSpinners--;
265 if (UpdateCounts(newCounts, currentCounts))
270 // No count available, become a spinner
271 newCounts.spinners++;
272 if (UpdateCounts(newCounts, currentCounts))
278 // Now we're a spinner.
281 const int spinLimitPerProcessor = 50;
284 Counts currentCounts, newCounts;
286 currentCounts.asLongLong = VolatileLoad(&m_counts.asLongLong);
287 newCounts = currentCounts;
// Count became available while spinning: take it and stop being a spinner.
289 if (currentCounts.countForSpinners > 0)
291 newCounts.countForSpinners--;
292 newCounts.spinners--;
293 if (UpdateCounts(newCounts, currentCounts))
// Scale the spin budget down by the number of concurrent spinners, so total
// spinning stays roughly constant per processor.
298 double spinnersPerProcessor = (double)currentCounts.spinners / ThreadpoolMgr::NumberOfProcessors;
299 int spinLimit = (int)((spinLimitPerProcessor / spinnersPerProcessor) + 0.5);
300 if (numSpins >= spinLimit)
302 newCounts.spinners--;
304 if (UpdateCounts(newCounts, currentCounts))
310 // We yield to other threads using SleepEx rather than the more traditional SwitchToThread.
311 // This is because SwitchToThread does not yield to threads currently scheduled to run on other
312 // processors. On a 4-core machine, for example, this means that SwitchToThread is only ~25% likely
313 // to yield to the correct thread in some scenarios.
314 // SleepEx has the disadvantage of not yielding to lower-priority threads. However, this is ok because
315 // once we've called this a few times we'll become a "waiter" and wait on the CLRSemaphore, and that will
316 // yield to anything that is runnable.
318 ClrSleepEx(0, FALSE);
325 // Now we're a waiter
327 DWORD result = m_sem.Wait(timeout, FALSE);
328 _ASSERTE(WAIT_OBJECT_0 == result || WAIT_TIMEOUT == result);
332 Counts currentCounts, newCounts;
334 currentCounts.asLongLong = VolatileLoad(&m_counts.asLongLong);
335 newCounts = currentCounts;
// Only consume count on a successful wait; on timeout just deregister.
339 if (result == WAIT_OBJECT_0)
340 newCounts.countForWaiters--;
342 if (UpdateCounts(newCounts, currentCounts))
343 return (result == WAIT_OBJECT_0);
351 static const int MaxPossibleCount = 0x7fff;
358 // Note: these are signed rather than unsigned to allow us to detect under/overflow.
360 int MaxWorking : 16; //Determined by HillClimbing; adjusted elsewhere for timeouts, etc.
361 int NumActive : 16; //Active means working or waiting on WorkerSemaphore. These are "warm/hot" threads.
362 int NumWorking : 16; //Trying to get work from various queues. Not waiting on either semaphore.
363 int NumRetired : 16; //Not trying to get work; waiting on RetiredWorkerSemaphore. These are "cold" threads.
365 // Note: the only reason we need "retired" threads at all is that it allows some threads to eventually time out
366 // even if other threads are getting work. If we ever make WorkerSemaphore a true LIFO semaphore, we will no longer
367 // need the concept of "retirement" - instead, the very "coldest" threads will naturally be the first to time out.
372 bool operator==(Counts other) {LIMITED_METHOD_CONTRACT; return AsLongLong == other.AsLongLong;}
// Returns a validated, atomic snapshot of the counts. The compare-exchange
// with (0, 0) is used purely as an atomic 64-bit *read* (safe on 32-bit
// platforms where a plain load of 64 bits is not atomic).
376 Counts GetCleanCounts()
378 LIMITED_METHOD_CONTRACT;
380 #ifndef DACCESS_COMPILE
381 result.AsLongLong = FastInterlockCompareExchangeLong(&counts.AsLongLong, 0, 0);
382 ValidateCounts(result);
384 result.AsLongLong = 0; //prevents prefast warning for DAC builds
390 // This does a non-atomic read of the counts. The returned value is suitable only
391 // for use inside of a read-compare-exchange loop, where the compare-exchange must succeed
392 // before any action is taken. Use GetCleanWorkerCounts for other needs, but keep in mind
395 Counts DangerousGetDirtyCounts()
397 LIMITED_METHOD_CONTRACT;
399 #ifndef DACCESS_COMPILE
// Plain volatile load: may observe a torn/inconsistent value on 32-bit; the
// subsequent CompareExchangeCounts is what makes acting on it safe.
400 result.AsLongLong = VolatileLoad(&counts.AsLongLong);
402 result.AsLongLong = 0; //prevents prefast warning for DAC builds
// Atomically replace the counts with newCounts iff they still equal
// oldCounts; returns the value observed before the exchange (== oldCounts on
// success, per the interlocked compare-exchange contract).
408 Counts CompareExchangeCounts(Counts newCounts, Counts oldCounts)
410 LIMITED_METHOD_CONTRACT;
412 #ifndef DACCESS_COMPILE
413 result.AsLongLong = FastInterlockCompareExchangeLong(&counts.AsLongLong, newCounts.AsLongLong, oldCounts.AsLongLong);
414 if (result == oldCounts)
416 // can only do validation on success; if we failed, it may have been due to a previous
417 // dirty read, which may contain invalid values.
418 ValidateCounts(result);
419 ValidateCounts(newCounts);
422 result.AsLongLong = 0; //prevents prefast warning for DAC builds
// Debug-only sanity checks of the counter invariants; call only on values
// known to be consistent (i.e. obtained via a successful atomic read/CAS).
428 static void ValidateCounts(Counts counts)
430 LIMITED_METHOD_CONTRACT;
431 _ASSERTE(counts.MaxWorking > 0);
432 _ASSERTE(counts.NumActive >= 0);
433 _ASSERTE(counts.NumWorking >= 0);
434 _ASSERTE(counts.NumRetired >= 0);
// Every working thread is by definition also active.
435 _ASSERTE(counts.NumWorking <= counts.NumActive);
441 static void ReportThreadStatus(bool isWorking);
443 // enumeration of different kinds of memory blocks that are recycled
446 MEMTYPE_AsyncCallback = 0,
447 MEMTYPE_DelegateInfo = 1,
448 MEMTYPE_WorkRequest = 2,
449 MEMTYPE_PostRequest = 3,
453 static BOOL Initialize();
455 static BOOL SetMaxThreadsHelper(DWORD MaxWorkerThreads,
456 DWORD MaxIOCompletionThreads);
458 static BOOL SetMaxThreads(DWORD MaxWorkerThreads,
459 DWORD MaxIOCompletionThreads);
461 static BOOL GetMaxThreads(DWORD* MaxWorkerThreads,
462 DWORD* MaxIOCompletionThreads);
464 static BOOL SetMinThreads(DWORD MinWorkerThreads,
465 DWORD MinIOCompletionThreads);
467 static BOOL GetMinThreads(DWORD* MinWorkerThreads,
468 DWORD* MinIOCompletionThreads);
470 static BOOL GetAvailableThreads(DWORD* AvailableWorkerThreads,
471 DWORD* AvailableIOCompletionThreads);
473 static BOOL QueueUserWorkItem(LPTHREAD_START_ROUTINE Function,
476 BOOL UnmanagedTPRequest=TRUE);
478 static BOOL PostQueuedCompletionStatus(LPOVERLAPPED lpOverlapped,
479 LPOVERLAPPED_COMPLETION_ROUTINE Function);
481 inline static BOOL IsCompletionPortInitialized()
483 LIMITED_METHOD_CONTRACT;
484 return GlobalCompletionPort != NULL;
487 static BOOL DrainCompletionPortQueue();
489 static BOOL RegisterWaitForSingleObject(PHANDLE phNewWaitObject,
491 WAITORTIMERCALLBACK Callback,
496 static BOOL UnregisterWaitEx(HANDLE hWaitObject,HANDLE CompletionEvent);
497 static void WaitHandleCleanup(HANDLE hWaitObject);
499 static BOOL BindIoCompletionCallback(HANDLE FileHandle,
500 LPOVERLAPPED_COMPLETION_ROUTINE Function,
504 static void WaitIOCompletionCallback(DWORD dwErrorCode,
505 DWORD numBytesTransferred,
506 LPOVERLAPPED lpOverlapped);
508 static VOID CallbackForInitiateDrainageOfCompletionPortQueue(
510 DWORD dwNumberOfBytesTransfered,
511 LPOVERLAPPED lpOverlapped
514 static VOID CallbackForContinueDrainageOfCompletionPortQueue(
516 DWORD dwNumberOfBytesTransfered,
517 LPOVERLAPPED lpOverlapped
520 static BOOL SetAppDomainRequestsActive(BOOL UnmanagedTP = FALSE);
521 static void ClearAppDomainRequestsActive(BOOL UnmanagedTP = FALSE, BOOL AdUnloading = FALSE, LONG index = -1);
523 static inline void UpdateLastDequeueTime()
525 LIMITED_METHOD_CONTRACT;
526 LastDequeueTime = GetTickCount();
529 static BOOL CreateTimerQueueTimer(PHANDLE phNewTimer,
530 WAITORTIMERCALLBACK Callback,
536 static BOOL ChangeTimerQueueTimer(HANDLE Timer,
539 static BOOL DeleteTimerQueueTimer(HANDLE Timer,
540 HANDLE CompletionEvent);
542 static void RecycleMemory(LPVOID mem, enum MemType memType);
544 static void FlushQueueOfTimerInfos();
546 static BOOL HaveTimerInfosToFlush() { return TimerInfosToBeRecycled != NULL; }
548 inline static BOOL IsThreadPoolHosted()
550 #ifdef FEATURE_INCLUDE_ALL_INTERFACES
551 IHostThreadpoolManager *provider = CorHost2::GetHostThreadpoolManager();
560 static LPOVERLAPPED CompletionPortDispatchWorkWithinAppDomain(Thread* pThread, DWORD* pErrorCode, DWORD* pNumBytes, size_t* pKey, DWORD adid);
561 static void StoreOverlappedInfoInThread(Thread* pThread, DWORD dwErrorCode, DWORD dwNumBytes, size_t key, LPOVERLAPPED lpOverlapped);
562 #endif // !FEATURE_PAL
564 // Enable filtering of correlation ETW events for cases handled at a higher abstraction level
566 #ifndef DACCESS_COMPILE
567 static FORCEINLINE BOOL AreEtwQueueEventsSpeciallyHandled(LPTHREAD_START_ROUTINE Function)
569 // Timer events are handled at a higher abstraction level: in the managed Timer class
570 return (Function == ThreadpoolMgr::AsyncTimerCallbackCompletion);
573 static FORCEINLINE BOOL AreEtwIOQueueEventsSpeciallyHandled(LPOVERLAPPED_COMPLETION_ROUTINE Function)
575 // We ignore drainage events b/c they are uninteresting
576 // We handle registered waits at a higher abstraction level
577 return (Function == ThreadpoolMgr::CallbackForInitiateDrainageOfCompletionPortQueue
578 || Function == ThreadpoolMgr::CallbackForContinueDrainageOfCompletionPortQueue
579 || Function == ThreadpoolMgr::WaitIOCompletionCallback);
585 #ifndef DACCESS_COMPILE
587 inline static void FreeWorkRequest(WorkRequest* workRequest)
589 RecycleMemory( workRequest, MEMTYPE_WorkRequest ); //delete workRequest;
592 inline static WorkRequest* MakeWorkRequest(LPTHREAD_START_ROUTINE function, PVOID context)
602 WorkRequest* wr = (WorkRequest*) GetRecycledMemory(MEMTYPE_WorkRequest);
606 wr->Function = function;
607 wr->Context = context;
613 LPOVERLAPPED_COMPLETION_ROUTINE Function;
615 DWORD numBytesTransferred;
616 LPOVERLAPPED lpOverlapped;
620 inline static PostRequest* MakePostRequest(LPOVERLAPPED_COMPLETION_ROUTINE function, LPOVERLAPPED overlapped)
630 PostRequest* pr = (PostRequest*) GetRecycledMemory(MEMTYPE_PostRequest);
634 pr->Function = function;
636 pr->numBytesTransferred = 0;
637 pr->lpOverlapped = overlapped;
642 inline static void ReleasePostRequest(PostRequest *postRequest)
645 ThreadpoolMgr::RecycleMemory(postRequest, MEMTYPE_PostRequest);
648 typedef Wrapper< PostRequest *, DoNothing<PostRequest *>, ThreadpoolMgr::ReleasePostRequest > PostRequestHolder;
650 #endif // #ifndef DACCESS_COMPILE
655 LPOVERLAPPED pOverlapped;
659 typedef DPTR(struct _LIST_ENTRY) PTR_LIST_ENTRY;
660 typedef struct _LIST_ENTRY {
661 struct _LIST_ENTRY *Flink;
662 struct _LIST_ENTRY *Blink;
663 } LIST_ENTRY, *PLIST_ENTRY;
671 LONG NumWaitHandles; // number of wait objects registered to the thread <=64
672 LONG NumActiveWaits; // number of objects, thread is actually waiting on (this may be less than
673 // NumWaitHandles since the thread may not have activated some waits
674 HANDLE waitHandle[MAX_WAITHANDLES]; // array of wait handles (copied from waitInfo since
675 // we need them to be contiguous)
676 LIST_ENTRY waitPointer[MAX_WAITHANDLES]; // array of doubly linked list of corresponding waitinfo
681 ULONG startTime; // time at which wait was started
682 // endTime = startTime+timeout
683 ULONG remainingTime; // endTime - currentTime
687 LIST_ENTRY link; // Win9x does not allow duplicate waithandles, so we need to
688 // group all waits on a single waithandle using this linked list
690 WAITORTIMERCALLBACK Callback;
697 LONG refCount; // when this reaches 0, the waitInfo can be safely deleted
698 CLREvent PartialCompletionEvent; // used to synchronize deactivation of a wait
699 CLREvent InternalCompletionEvent; // only one of InternalCompletion or ExternalCompletion is used
700 // but I can't make a union since CLREvent has a non-default constructor
701 HANDLE ExternalCompletionEvent; // they are signalled when all callbacks have completed (refCount=0)
703 OBJECTHANDLE ExternalEventSafeHandle;
707 // structure used to maintain global information about wait threads. Protected by WaitThreadsCriticalSection
708 typedef struct WaitThreadTag {
714 struct AsyncCallback{
719 #ifndef DACCESS_COMPILE
722 AcquireAsyncCallback(AsyncCallback *pAsyncCB)
724 LIMITED_METHOD_CONTRACT;
728 ReleaseAsyncCallback(AsyncCallback *pAsyncCB)
738 WaitInfo *waitInfo = pAsyncCB->wait;
739 ThreadpoolMgr::RecycleMemory((LPVOID*)pAsyncCB, ThreadpoolMgr::MEMTYPE_AsyncCallback);
741 // if this was a single execution, we now need to stop rooting registeredWaitHandle
742 // in a GC handle. This will cause the finalizer to pick it up and call the cleanup
744 if ( (waitInfo->flag & WAIT_SINGLE_EXECUTION) && (waitInfo->flag & WAIT_FREE_CONTEXT))
747 DelegateInfo* pDelegate = (DelegateInfo*) waitInfo->Context;
749 _ASSERTE(pDelegate->m_registeredWaitHandle);
753 AppDomainFromIDHolder ad(pDelegate->m_appDomainId, TRUE);
754 if (!ad.IsUnloaded())
755 // if no domain then handle already gone or about to go.
756 StoreObjectInHandle(pDelegate->m_registeredWaitHandle, NULL);
760 if (InterlockedDecrement(&waitInfo->refCount) == 0)
761 ThreadpoolMgr::DeleteWait(waitInfo);
765 typedef Holder<AsyncCallback *, ThreadpoolMgr::AcquireAsyncCallback, ThreadpoolMgr::ReleaseAsyncCallback> AsyncCallbackHolder;
766 inline static AsyncCallback* MakeAsyncCallback()
769 return (AsyncCallback*) GetRecycledMemory(MEMTYPE_AsyncCallback);
772 static VOID ReleaseInfo(OBJECTHANDLE& hndSafeHandle,
774 HANDLE hndNativeHandle)
784 // Use of EX_TRY, GCPROTECT etc in the same function is causing prefast to complain about local variables with
785 // same name masking each other (#246). The error could not be suppressed with "#pragma PREFAST_SUPPRESS"
788 if (hndSafeHandle != NULL)
791 SAFEHANDLEREF refSH = NULL;
794 GCPROTECT_BEGIN(refSH);
799 ENTER_DOMAIN_ID(owningAD);
801 // Read the GC handle
802 refSH = (SAFEHANDLEREF) ObjectToOBJECTREF(ObjectFromHandle(hndSafeHandle));
804 // Destroy the GC handle
805 DestroyHandle(hndSafeHandle);
809 SafeHandleHolder h(&refSH);
811 HANDLE hEvent = refSH->GetHandle();
812 if (hEvent != INVALID_HANDLE_VALUE)
814 UnsafeSetEvent(hEvent);
818 END_DOMAIN_TRANSITION;
823 EX_END_CATCH(SwallowAllExceptions);
828 hndSafeHandle = NULL;
834 #endif // #ifndef DACCESS_COMPILE
843 LIST_ENTRY link; // doubly linked list of timers
844 ULONG FiringTime; // TickCount of when to fire next
845 WAITORTIMERCALLBACK Function; // Function to call when timer fires
846 PVOID Context; // Context to pass to function when timer fires
848 DWORD flag; // How do we deal with the context
851 HANDLE ExternalCompletionEvent; // only one of this is used, but cant do a union since CLREvent has a non-default constructor
852 CLREvent InternalCompletionEvent; // flags indicates which one is being used
853 OBJECTHANDLE ExternalEventSafeHandle;
857 static VOID AcquireWaitInfo(WaitInfo *pInfo)
860 static VOID ReleaseWaitInfo(WaitInfo *pInfo)
863 #ifndef DACCESS_COMPILE
864 ReleaseInfo(pInfo->ExternalEventSafeHandle,
865 pInfo->handleOwningAD,
866 pInfo->ExternalCompletionEvent);
869 static VOID AcquireTimerInfo(TimerInfo *pInfo)
872 static VOID ReleaseTimerInfo(TimerInfo *pInfo)
875 #ifndef DACCESS_COMPILE
876 ReleaseInfo(pInfo->ExternalEventSafeHandle,
877 pInfo->handleOwningAD,
878 pInfo->ExternalCompletionEvent);
882 typedef Holder<WaitInfo *, ThreadpoolMgr::AcquireWaitInfo, ThreadpoolMgr::ReleaseWaitInfo> WaitInfoHolder;
883 typedef Holder<TimerInfo *, ThreadpoolMgr::AcquireTimerInfo, ThreadpoolMgr::ReleaseTimerInfo> TimerInfoHolder;
886 TimerInfo* Timer; // timer to be updated
887 ULONG DueTime ; // new due time
888 ULONG Period ; // new period
891 // Definitions and data structures to support recycling of high-frequency
892 // memory blocks. We use a spin-lock to access the list
894 class RecycledListInfo
896 static const unsigned int MaxCachedEntries = 40;
903 Volatile<LONG> lock; // this is the spin lock
904 DWORD count; // count of number of elements in the list
905 Entry* root; // ptr to first element of recycled list
907 DWORD filler; // Pad the structure to a multiple of the 16.
915 LIMITED_METHOD_CONTRACT;
922 FORCEINLINE bool CanInsert()
924 LIMITED_METHOD_CONTRACT;
926 return count < MaxCachedEntries;
929 FORCEINLINE LPVOID Remove()
931 LIMITED_METHOD_CONTRACT;
933 if(root == NULL) return NULL; // No need for acquiring the lock, there's nothing to remove.
937 Entry* ret = (Entry*)root;
950 FORCEINLINE void Insert( LPVOID mem )
952 LIMITED_METHOD_CONTRACT;
956 Entry* entry = (Entry*)mem;
// Spin lock acquire using test-and-test-and-set: the cheap `lock != 0` read
// avoids hammering the bus with interlocked exchanges while the lock is held.
// Periodically yields the timeslice so the holder can make progress.
967 FORCEINLINE void AcquireLock()
969 LIMITED_METHOD_CONTRACT;
971 unsigned int rounds = 0;
973 DWORD dwSwitchCount = 0;
975 while(lock != 0 || FastInterlockExchange( &lock, 1 ) != 0)
977 YieldProcessor(); // indicate to the processor that we are spinning
// Every 32 spin rounds, yield to another runnable thread.
981 if((rounds % 32) == 0)
983 __SwitchToThread( 0, ++dwSwitchCount );
988 FORCEINLINE void ReleaseLock()
990 LIMITED_METHOD_CONTRACT;
997 // It's critical that we ensure these pointers are allocated by the linker away from
998 // variables that are modified a lot at runtime.
1000 // The use of the CacheGuard is a temporary solution,
1001 // the thread pool has to be refactor away from static variable and
1002 // toward a single global structure, where we can control the locality of variables.
1004 class RecycledListsWrapper
1006 DWORD CacheGuardPre[64/sizeof(DWORD)];
1008 RecycledListInfo (*pRecycledListPerProcessor)[MEMTYPE_COUNT]; // RecycledListInfo [numProc][MEMTYPE_COUNT]
1010 DWORD CacheGuardPost[64/sizeof(DWORD)];
1013 void Initialize( unsigned int numProcs );
1015 FORCEINLINE bool IsInitialized()
1017 LIMITED_METHOD_CONTRACT;
1019 return pRecycledListPerProcessor != NULL;
1022 FORCEINLINE RecycledListInfo& GetRecycleMemoryInfo( enum MemType memType )
1024 LIMITED_METHOD_CONTRACT;
1026 if (CPUGroupInfo::CanEnableGCCPUGroups() && CPUGroupInfo::CanEnableThreadUseAllCpuGroups())
1027 return pRecycledListPerProcessor[CPUGroupInfo::CalculateCurrentProcessorNumber()][memType];
1029 // Turns out GetCurrentProcessorNumber can return a value greater than the number of processors reported by
1030 // GetSystemInfo, if we're running in WOW64 on a machine with >32 processors.
1031 return pRecycledListPerProcessor[GetCurrentProcessorNumber()%NumberOfProcessors][memType];
1035 #define GATE_THREAD_STATUS_NOT_RUNNING 0 // There is no gate thread
1036 #define GATE_THREAD_STATUS_REQUESTED 1 // There is a gate thread, and someone has asked it to stick around recently
1037 #define GATE_THREAD_STATUS_WAITING_FOR_REQUEST 2 // There is a gate thread, but nobody has asked it to stay. It may die soon
1041 static DWORD __stdcall intermediateThreadProc(PVOID arg);
1044 LPTHREAD_START_ROUTINE lpThreadFunction;
1046 } intermediateThreadParam;
1048 static Thread* CreateUnimpersonatedThread(LPTHREAD_START_ROUTINE lpStartAddress, LPVOID lpArgs, BOOL *pIsCLRThread);
1050 static BOOL CreateWorkerThread();
1052 static void EnqueueWorkRequest(WorkRequest* wr);
1054 static WorkRequest* DequeueWorkRequest();
1056 static void ExecuteWorkRequest(bool* foundWork, bool* wasNotRecalled);
1058 static DWORD WINAPI ExecuteHostRequest(PVOID pArg);
1060 #ifndef DACCESS_COMPILE
// Append a work request at the tail of the singly linked FIFO queue
// (WorkRequestHead/WorkRequestTail).
// NOTE(review): not internally synchronized — presumably callers hold the
// queue's critical section; confirm at call sites.
1062 inline static void AppendWorkRequest(WorkRequest* entry)
1072 if (WorkRequestTail)
1074 _ASSERTE(WorkRequestHead != NULL);
1075 WorkRequestTail->next = entry;
1079 _ASSERTE(WorkRequestHead == NULL);
1080 WorkRequestHead = entry;
1083 WorkRequestTail = entry;
1084 _ASSERTE(WorkRequestTail->next == NULL);
// Pop and return the head of the work request queue, or NULL if the queue is
// empty; clears the tail pointer when the last entry is removed.
// NOTE(review): like AppendWorkRequest, assumes external synchronization.
1087 inline static WorkRequest* RemoveWorkRequest()
1097 WorkRequest* entry = NULL;
1098 if (WorkRequestHead)
1100 entry = WorkRequestHead;
1101 WorkRequestHead = entry->next;
1102 if (WorkRequestHead == NULL)
1103 WorkRequestTail = NULL;
1108 static void EnsureInitialized();
1109 static void InitPlatformVariables();
// True once the threadpool is fully initialized (-1 appears to be the
// "initialization complete" sentinel written by the init path — see
// EnsureInitialized/Initialize; confirm against the .cpp).
1111 inline static BOOL IsInitialized()
1113 LIMITED_METHOD_CONTRACT;
1114 return Initialization == -1;
1117 static void MaybeAddWorkingWorker();
1119 static void NotifyWorkItemCompleted()
1121 WRAPPER_NO_CONTRACT;
1122 if (!CLRThreadpoolHosted())
1124 Thread::IncrementThreadPoolCompletionCount();
1125 UpdateLastDequeueTime();
// Decide whether it is time to run the hill-climbing adjustment of the
// worker-thread count: enough wall-clock time must have elapsed since the
// last completed-work-requests sample, and there must be no excess of active
// workers beyond MaxWorking. Skipped entirely when the threadpool is hosted.
// Note: the DWORD subtractions are wraparound-safe for GetTickCount deltas.
1129 static bool ShouldAdjustMaxWorkersActive()
1131 WRAPPER_NO_CONTRACT;
1133 if (CLRThreadpoolHosted())
1136 DWORD requiredInterval = NextCompletedWorkRequestsTime - PriorCompletedWorkRequestsTime;
1137 DWORD elapsedInterval = GetTickCount() - PriorCompletedWorkRequestsTime;
1138 if (elapsedInterval >= requiredInterval)
1140 ThreadCounter::Counts counts = WorkerCounter.GetCleanCounts();
1141 if (counts.NumActive <= counts.MaxWorking)
1148 static void AdjustMaxWorkersActive();
1149 static bool ShouldWorkerKeepRunning();
1151 static BOOL SuspendProcessing();
1153 static DWORD SafeWait(CLREvent * ev, DWORD sleepTime, BOOL alertable);
1155 static DWORD __stdcall WorkerThreadStart(LPVOID lpArgs);
1157 static BOOL AddWaitRequest(HANDLE waitHandle, WaitInfo* waitInfo);
1160 static ThreadCB* FindWaitThread(); // returns a wait thread that can accommodate another wait request
1162 static BOOL CreateWaitThread();
1164 static void __stdcall InsertNewWaitForSelf(WaitInfo* pArg);
1166 static int FindWaitIndex(const ThreadCB* threadCB, const HANDLE waitHandle);
1168 static DWORD MinimumRemainingWait(LIST_ENTRY* waitInfo, unsigned int numWaits);
1170 static void ProcessWaitCompletion( WaitInfo* waitInfo,
1171 unsigned index, // array index
1174 static DWORD __stdcall WaitThreadStart(LPVOID lpArgs);
1176 static DWORD __stdcall AsyncCallbackCompletion(PVOID pArgs);
1178 static void QueueTimerInfoForRelease(TimerInfo *pTimerInfo);
1180 static DWORD __stdcall QUWIPostCompletion(PVOID pArgs);
1182 static void DeactivateWait(WaitInfo* waitInfo);
1183 static void DeactivateNthWait(WaitInfo* waitInfo, DWORD index);
1185 static void DeleteWait(WaitInfo* waitInfo);
1188 inline static void ShiftWaitArray( ThreadCB* threadCB,
1193 LIMITED_METHOD_CONTRACT;
1194 memcpy(&threadCB->waitHandle[DestIndex],
1195 &threadCB->waitHandle[SrcIndex],
1196 count * sizeof(HANDLE));
1197 memcpy(&threadCB->waitPointer[DestIndex],
1198 &threadCB->waitPointer[SrcIndex],
1199 count * sizeof(LIST_ENTRY));
1202 static void __stdcall DeregisterWait(WaitInfo* pArgs);
1205 // holds the aggregate of system cpu usage of all processors
1206 typedef struct _PROCESS_CPU_INFORMATION
1208 LARGE_INTEGER idleTime;
1209 LARGE_INTEGER kernelTime;
1210 LARGE_INTEGER userTime;
1211 DWORD_PTR affinityMask;
1212 int numberOfProcessors;
1213 SYSTEM_PROCESSOR_PERFORMANCE_INFORMATION* usageBuffer;
1214 int usageBufferSize;
1215 } PROCESS_CPU_INFORMATION;
1217 static int GetCPUBusyTime_NT(PROCESS_CPU_INFORMATION* pOldInfo);
1218 static BOOL CreateCompletionPortThread(LPVOID lpArgs);
1219 static DWORD __stdcall CompletionPortThreadStart(LPVOID lpArgs);
1221 inline static bool HaveNativeWork()
1223 LIMITED_METHOD_CONTRACT;
1224 return WorkRequestHead != NULL;
1227 static void GrowCompletionPortThreadpoolIfNeeded();
1228 static BOOL ShouldGrowCompletionPortThreadpool(ThreadCounter::Counts counts);
1230 static int GetCPUBusyTime_NT(PAL_IOCP_CPU_INFORMATION* pOldInfo);
1232 #endif // !FEATURE_PAL
1235 static BOOL IsIoPending();
1237 static BOOL CreateGateThread();
1238 static void EnsureGateThreadRunning();
1239 static bool ShouldGateThreadKeepRunning();
1240 static DWORD __stdcall GateThreadStart(LPVOID lpArgs);
1241 static BOOL SufficientDelaySinceLastSample(unsigned int LastThreadCreationTime,
1242 unsigned NumThreads, // total number of threads of that type (worker or CP)
1243 double throttleRate=0.0 // the delay is increased by this percentage for each extra thread
1245 static BOOL SufficientDelaySinceLastDequeue();
1247 static LPVOID GetRecycledMemory(enum MemType memType);
1249 static DWORD __stdcall TimerThreadStart(LPVOID args);
1250 static void TimerThreadFire(); // helper method used by TimerThreadStart
1251 static void __stdcall InsertNewTimer(TimerInfo* pArg);
1252 static DWORD FireTimers();
1253 static DWORD __stdcall AsyncTimerCallbackCompletion(PVOID pArgs);
1254 static void DeactivateTimer(TimerInfo* timerInfo);
1255 static DWORD __stdcall AsyncDeleteTimer(PVOID pArgs);
1256 static void DeleteTimer(TimerInfo* timerInfo);
1257 static void __stdcall UpdateTimer(TimerUpdateInfo* pArgs);
1259 static void __stdcall DeregisterTimer(TimerInfo* pArgs);
1261 inline static DWORD QueueDeregisterWait(HANDLE waitThread, WaitInfo* waitInfo)
1271 DWORD result = QueueUserAPC(reinterpret_cast<PAPCFUNC>(DeregisterWait), waitThread, reinterpret_cast<ULONG_PTR>(waitInfo));
1272 SetWaitThreadAPCPending();
1277 inline static void SetWaitThreadAPCPending() {IsApcPendingOnWaitThread = TRUE;}
1278 inline static void ResetWaitThreadAPCPending() {IsApcPendingOnWaitThread = FALSE;}
1279 inline static BOOL IsWaitThreadAPCPending() {return IsApcPendingOnWaitThread;}
1282 inline static DWORD GetTickCount()
1284 LIMITED_METHOD_CONTRACT;
1285 return ::GetTickCount() + TickCountAdjustment;
1289 #endif // #ifndef DACCESS_COMPILE
1290 // Private variables
1292 static LONG Initialization; // indicator of whether the threadpool is initialized.
1294 SVAL_DECL(LONG,MinLimitTotalWorkerThreads); // same as MinLimitTotalCPThreads
1295 SVAL_DECL(LONG,MaxLimitTotalWorkerThreads); // same as MaxLimitTotalCPThreads
1297 static Volatile<unsigned int> LastDequeueTime; // used to determine if work items are getting thread starved
1299 static HillClimbing HillClimbingInstance;
1301 static Volatile<LONG> PriorCompletedWorkRequests;
1302 static Volatile<DWORD> PriorCompletedWorkRequestsTime;
1303 static Volatile<DWORD> NextCompletedWorkRequestsTime;
1305 static LARGE_INTEGER CurrentSampleStartTime;
1307 static int ThreadAdjustmentInterval;
1309 SPTR_DECL(WorkRequest,WorkRequestHead); // Head of work request queue
1310 SPTR_DECL(WorkRequest,WorkRequestTail); // Head of work request queue
1312 static unsigned int LastCPThreadCreation; // last time a completion port thread was created
1313 static unsigned int NumberOfProcessors; // = NumberOfWorkerThreads - no. of blocked threads
1315 static BOOL IsApcPendingOnWaitThread; // Indicates if an APC is pending on the wait thread
1317 // This needs to be non-hosted, because worker threads can run prior to EE startup.
1318 static DangerousNonHostedSpinLock ThreadAdjustmentLock;
1321 static CrstStatic WorkerCriticalSection;
1324 static const DWORD WorkerTimeout = 20 * 1000;
1325 static const DWORD WorkerTimeoutAppX = 5 * 1000; // shorter timeout to allow threads to exit prior to app suspension
1327 SVAL_DECL(ThreadCounter,WorkerCounter);
1330 // WorkerSemaphore is an UnfairSemaphore because:
1331 // 1) Threads enter and exit this semaphore very frequently, and thus benefit greatly from the spinning done by UnfairSemaphore
1332 // 2) There is no functional reason why any particular thread should be preferred when waking workers. This only impacts performance,
1333 // and un-fairness helps performance in this case.
1335 static UnfairSemaphore* WorkerSemaphore;
1338 // RetiredWorkerSemaphore is a regular CLRSemaphore, not an UnfairSemaphore, because if a thread waits on this semaphore is it almost certainly
1339 // NOT going to be released soon, so the spinning done in UnfairSemaphore only burns valuable CPU time. However, if UnfairSemaphore is ever
1340 // implemented in terms of a Win32 IO Completion Port, we should reconsider this. The IOCP's LIFO unblocking behavior could help keep working set
1341 // down, by constantly re-using the same small set of retired workers rather than round-robining between all of them as CLRSemaphore will do.
1342 // If we go that route, we should add a "no-spin" option to UnfairSemaphore.Wait to avoid wasting CPU.
1344 static CLRSemaphore* RetiredWorkerSemaphore;
1346 static CLREvent * RetiredCPWakeupEvent;
1348 static CrstStatic WaitThreadsCriticalSection;
1349 static LIST_ENTRY WaitThreadsHead; // queue of wait threads, each thread can handle up to 64 waits
1351 static TimerInfo *TimerInfosToBeRecycled; // list of delegate infos associated with deleted timers
1352 static CrstStatic TimerQueueCriticalSection; // critical section to synchronize timer queue access
1353 SVAL_DECL(LIST_ENTRY,TimerQueue); // queue of timers
1354 static HANDLE TimerThread; // Currently we only have one timer thread
1355 static Thread* pTimerThread;
1356 static DWORD LastTickCount; // the count just before timer thread goes to sleep
1358 static BOOL InitCompletionPortThreadpool; // flag indicating whether completion port threadpool has been initialized
1359 static HANDLE GlobalCompletionPort; // used for binding io completions on file handles
1362 SVAL_DECL(ThreadCounter,CPThreadCounter);
1365 SVAL_DECL(LONG,MaxLimitTotalCPThreads); // = MaxLimitCPThreadsPerCPU * number of CPUS
1366 SVAL_DECL(LONG,MinLimitTotalCPThreads);
1367 SVAL_DECL(LONG,MaxFreeCPThreads); // = MaxFreeCPThreadsPerCPU * Number of CPUS
1369 static LONG GateThreadStatus; // See GateThreadStatus enumeration
1371 static Volatile<LONG> NumCPInfrastructureThreads; // number of threads currently busy handling draining cycle
1373 SVAL_DECL(LONG,cpuUtilization);
1374 static LONG cpuUtilizationAverage;
1376 static RecycledListsWrapper RecycledLists;
1379 static DWORD TickCountAdjustment; // add this value to value returned by GetTickCount
1382 static int offset_counter;
1383 static const int offset_multiplier = 128;
1389 #endif // _WIN32THREADPOOL_H