2 #include "LinearMath/btMinMax.h"
3 #include "LinearMath/btAlignedObjectArray.h"
4 #include "LinearMath/btThreads.h"
5 #include "LinearMath/btQuickprof.h"
11 #include "btThreadSupportInterface.h"
15 #define WIN32_LEAN_AND_MEAN
21 typedef unsigned long long btU64;
22 static const int kCacheLineSize = 64;
// Status of a worker thread, published (under the worker's ThreadLocalStorage
// mutex) so the scheduler can see which workers are busy, idle, or asleep.
struct WorkerThreadStatus
{
	enum Type
	{
		kInvalid,
		kWaitingForWork,  // spinning, watching the job queue
		kWorking,         // currently executing a job
		kSleeping,        // suspended; must be woken via runTask()
	};
};
42 ATTRIBUTE_ALIGNED64(class)
43 WorkerThreadDirectives
45 static const int kMaxThreadCount = BT_MAX_THREAD_COUNT;
46 // directives for all worker threads packed into a single cacheline
47 char m_threadDirs[kMaxThreadCount];
53 kGoToSleep, // go to sleep
54 kStayAwakeButIdle, // wait for not checking job queue
55 kScanForJobs, // actively scan job queue for jobs
57 WorkerThreadDirectives()
59 for (int i = 0; i < kMaxThreadCount; ++i)
65 Type getDirective(int threadId)
67 btAssert(threadId < kMaxThreadCount);
68 return static_cast<Type>(m_threadDirs[threadId]);
71 void setDirectiveByRange(int threadBegin, int threadEnd, Type dir)
73 btAssert(threadBegin < threadEnd);
74 btAssert(threadEnd <= kMaxThreadCount);
75 char dirChar = static_cast<char>(dir);
76 for (int i = threadBegin; i < threadEnd; ++i)
78 m_threadDirs[i] = dirChar;
85 ATTRIBUTE_ALIGNED64(struct)
89 WorkerThreadStatus::Type m_status;
90 int m_numJobsFinished;
93 WorkerThreadDirectives* m_directive;
96 unsigned int m_cooldownTime;
// Abstract unit of work. Jobs are placement-new'd into JobQueue's raw buffer
// and never deleted through this interface (the buffer is freed wholesale),
// so no virtual destructor is required.
struct IJob
{
	virtual void executeJob(int threadId) = 0;
};
104 class ParallelForJob : public IJob
106 const btIParallelForBody* m_body;
111 ParallelForJob(int iBegin, int iEnd, const btIParallelForBody& body)
117 virtual void executeJob(int threadId) BT_OVERRIDE
119 BT_PROFILE("executeJob");
121 // call the functor body to do the work
122 m_body->forLoop(m_begin, m_end);
126 class ParallelSumJob : public IJob
128 const btIParallelSumBody* m_body;
129 ThreadLocalStorage* m_threadLocalStoreArray;
134 ParallelSumJob(int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalStorage* tls)
137 m_threadLocalStoreArray = tls;
141 virtual void executeJob(int threadId) BT_OVERRIDE
143 BT_PROFILE("executeJob");
145 // call the functor body to do the work
146 btScalar val = m_body->sumLoop(m_begin, m_end);
147 #if BT_PARALLEL_SUM_DETERMINISTISM
148 // by truncating bits of the result, we can make the parallelSum deterministic (at the expense of precision)
149 const float TRUNC_SCALE = float(1 << 19);
150 val = floor(val * TRUNC_SCALE + 0.5f) / TRUNC_SCALE; // truncate some bits
152 m_threadLocalStoreArray[threadId].m_sumResult += val;
156 ATTRIBUTE_ALIGNED64(class)
159 btThreadSupportInterface* m_threadSupport;
160 btCriticalSection* m_queueLock;
163 btAlignedObjectArray<IJob*> m_jobQueue;
171 btAlignedObjectArray<JobQueue*> m_neighborContexts;
172 char m_cachePadding[kCacheLineSize]; // prevent false sharing
179 btAlignedFree(m_jobMem);
183 void resizeJobMem(int newSize)
185 if (newSize > m_jobMemSize)
188 m_jobMem = static_cast<char*>(btAlignedAlloc(newSize, kCacheLineSize));
189 m_jobMemSize = newSize;
198 m_threadSupport = NULL;
202 m_useSpinMutex = false;
211 if (m_queueLock && m_threadSupport)
213 m_threadSupport->deleteCriticalSection(m_queueLock);
219 void init(btThreadSupportInterface * threadSup, btAlignedObjectArray<JobQueue> * contextArray)
221 m_threadSupport = threadSup;
224 m_queueLock = m_threadSupport->createCriticalSection();
226 setupJobStealing(contextArray, contextArray->size());
228 void setupJobStealing(btAlignedObjectArray<JobQueue> * contextArray, int numActiveContexts)
230 btAlignedObjectArray<JobQueue>& contexts = *contextArray;
232 for (int i = 0; i < contexts.size(); ++i)
234 if (this == &contexts[i])
240 int numNeighbors = btMin(2, contexts.size() - 1);
241 int neighborOffsets[] = {-1, 1, -2, 2, -3, 3};
242 int numOffsets = sizeof(neighborOffsets) / sizeof(neighborOffsets[0]);
243 m_neighborContexts.reserve(numNeighbors);
244 m_neighborContexts.resizeNoInitialize(0);
245 for (int i = 0; i < numOffsets && m_neighborContexts.size() < numNeighbors; i++)
247 int neighborIndex = selfIndex + neighborOffsets[i];
248 if (neighborIndex >= 0 && neighborIndex < numActiveContexts)
250 m_neighborContexts.push_back(&contexts[neighborIndex]);
255 bool isQueueEmpty() const { return m_queueIsEmpty; }
275 m_queueLock->unlock();
278 void clearQueue(int jobCount, int jobSize)
284 m_queueIsEmpty = true;
285 int jobBufSize = jobSize * jobCount;
286 // make sure we have enough memory allocated to store jobs
287 if (jobBufSize > m_jobMemSize)
289 resizeJobMem(jobBufSize);
291 // make sure job queue is big enough
292 if (jobCount > m_jobQueue.capacity())
294 m_jobQueue.reserve(jobCount);
297 m_jobQueue.resizeNoInitialize(0);
299 void* allocJobMem(int jobSize)
301 btAssert(m_jobMemSize >= (m_allocSize + jobSize));
302 void* jobMem = &m_jobMem[m_allocSize];
303 m_allocSize += jobSize;
306 void submitJob(IJob * job)
308 btAssert(reinterpret_cast<char*>(job) >= &m_jobMem[0] && reinterpret_cast<char*>(job) < &m_jobMem[0] + m_allocSize);
309 m_jobQueue.push_back(job);
312 m_queueIsEmpty = false;
315 IJob* consumeJobFromOwnQueue()
319 // lock free path. even if this is taken erroneously it isn't harmful
326 job = m_jobQueue[m_headIndex++];
327 btAssert(reinterpret_cast<char*>(job) >= &m_jobMem[0] && reinterpret_cast<char*>(job) < &m_jobMem[0] + m_allocSize);
328 if (m_headIndex == m_tailIndex)
330 m_queueIsEmpty = true;
338 if (IJob* job = consumeJobFromOwnQueue())
342 // own queue is empty, try to steal from neighbor
343 for (int i = 0; i < m_neighborContexts.size(); ++i)
345 JobQueue* otherContext = m_neighborContexts[i];
346 if (IJob* job = otherContext->consumeJobFromOwnQueue())
355 static void WorkerThreadFunc(void* userPtr)
357 BT_PROFILE("WorkerThreadFunc");
358 ThreadLocalStorage* localStorage = (ThreadLocalStorage*)userPtr;
359 JobQueue* jobQueue = localStorage->m_queue;
361 bool shouldSleep = false;
362 int threadId = localStorage->m_threadId;
366 localStorage->m_mutex.lock();
367 while (IJob* job = jobQueue->consumeJob())
369 localStorage->m_status = WorkerThreadStatus::kWorking;
370 job->executeJob(threadId);
371 localStorage->m_numJobsFinished++;
373 localStorage->m_status = WorkerThreadStatus::kWaitingForWork;
374 localStorage->m_mutex.unlock();
375 btU64 clockStart = localStorage->m_clock->getTimeMicroseconds();
376 // while queue is empty,
377 while (jobQueue->isQueueEmpty())
379 // todo: spin wait a bit to avoid hammering the empty queue
381 if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kGoToSleep)
386 // if jobs are incoming,
387 if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kScanForJobs)
389 clockStart = localStorage->m_clock->getTimeMicroseconds(); // reset clock
393 for (int i = 0; i < 50; ++i)
399 if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kScanForJobs || !jobQueue->isQueueEmpty())
404 // if no jobs incoming and queue has been empty for the cooldown time, sleep
405 btU64 timeElapsed = localStorage->m_clock->getTimeMicroseconds() - clockStart;
406 if (timeElapsed > localStorage->m_cooldownTime)
417 localStorage->m_mutex.lock();
418 localStorage->m_status = WorkerThreadStatus::kSleeping;
419 localStorage->m_mutex.unlock();
423 class btTaskSchedulerDefault : public btITaskScheduler
425 btThreadSupportInterface* m_threadSupport;
426 WorkerThreadDirectives* m_workerDirective;
427 btAlignedObjectArray<JobQueue> m_jobQueues;
428 btAlignedObjectArray<JobQueue*> m_perThreadJobQueues;
429 btAlignedObjectArray<ThreadLocalStorage> m_threadLocalStorage;
430 btSpinMutex m_antiNestingLock; // prevent nested parallel-for
433 int m_numWorkerThreads;
434 int m_numActiveJobQueues;
437 static const int kFirstWorkerThreadId = 1;
440 btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport")
442 m_threadSupport = NULL;
443 m_workerDirective = NULL;
446 virtual ~btTaskSchedulerDefault()
448 waitForWorkersToSleep();
450 for (int i = 0; i < m_jobQueues.size(); ++i)
452 m_jobQueues[i].exit();
457 delete m_threadSupport;
458 m_threadSupport = NULL;
460 if (m_workerDirective)
462 btAlignedFree(m_workerDirective);
463 m_workerDirective = NULL;
469 btThreadSupportInterface::ConstructionInfo constructionInfo("TaskScheduler", WorkerThreadFunc);
470 m_threadSupport = btThreadSupportInterface::create(constructionInfo);
471 m_workerDirective = static_cast<WorkerThreadDirectives*>(btAlignedAlloc(sizeof(*m_workerDirective), 64));
473 m_numWorkerThreads = m_threadSupport->getNumWorkerThreads();
474 m_maxNumThreads = m_threadSupport->getNumWorkerThreads() + 1;
475 m_numThreads = m_maxNumThreads;
476 // ideal to have one job queue for each physical processor (except for the main thread which needs no queue)
477 int numThreadsPerQueue = m_threadSupport->getLogicalToPhysicalCoreRatio();
478 int numJobQueues = (numThreadsPerQueue == 1) ? (m_maxNumThreads - 1) : (m_maxNumThreads / numThreadsPerQueue);
479 m_jobQueues.resize(numJobQueues);
480 m_numActiveJobQueues = numJobQueues;
481 for (int i = 0; i < m_jobQueues.size(); ++i)
483 m_jobQueues[i].init(m_threadSupport, &m_jobQueues);
485 m_perThreadJobQueues.resize(m_numThreads);
486 for (int i = 0; i < m_numThreads; i++)
489 // only worker threads get a job queue
492 if (numThreadsPerQueue == 1)
494 // one queue per worker thread
495 jq = &m_jobQueues[i - kFirstWorkerThreadId];
499 // 2 threads share each queue
500 jq = &m_jobQueues[i / numThreadsPerQueue];
503 m_perThreadJobQueues[i] = jq;
505 m_threadLocalStorage.resize(m_numThreads);
506 for (int i = 0; i < m_numThreads; i++)
508 ThreadLocalStorage& storage = m_threadLocalStorage[i];
509 storage.m_threadId = i;
510 storage.m_directive = m_workerDirective;
511 storage.m_status = WorkerThreadStatus::kSleeping;
512 storage.m_cooldownTime = 100; // 100 microseconds, threads go to sleep after this long if they have nothing to do
513 storage.m_clock = &m_clock;
514 storage.m_queue = m_perThreadJobQueues[i];
516 setWorkerDirectives(WorkerThreadDirectives::kGoToSleep); // no work for them yet
517 setNumThreads(m_threadSupport->getCacheFriendlyNumThreads());
520 void setWorkerDirectives(WorkerThreadDirectives::Type dir)
522 m_workerDirective->setDirectiveByRange(kFirstWorkerThreadId, m_numThreads, dir);
525 virtual int getMaxNumThreads() const BT_OVERRIDE
527 return m_maxNumThreads;
530 virtual int getNumThreads() const BT_OVERRIDE
535 virtual void setNumThreads(int numThreads) BT_OVERRIDE
537 m_numThreads = btMax(btMin(numThreads, int(m_maxNumThreads)), 1);
538 m_numWorkerThreads = m_numThreads - 1;
539 m_numActiveJobQueues = 0;
540 // if there is at least 1 worker,
541 if (m_numWorkerThreads > 0)
543 // re-setup job stealing between queues to avoid attempting to steal from an inactive job queue
544 JobQueue* lastActiveContext = m_perThreadJobQueues[m_numThreads - 1];
545 int iLastActiveContext = lastActiveContext - &m_jobQueues[0];
546 m_numActiveJobQueues = iLastActiveContext + 1;
547 for (int i = 0; i < m_jobQueues.size(); ++i)
549 m_jobQueues[i].setupJobStealing(&m_jobQueues, m_numActiveJobQueues);
552 m_workerDirective->setDirectiveByRange(m_numThreads, BT_MAX_THREAD_COUNT, WorkerThreadDirectives::kGoToSleep);
557 BT_PROFILE("waitJobs");
558 // have the main thread work until the job queues are empty
559 int numMainThreadJobsFinished = 0;
560 for (int i = 0; i < m_numActiveJobQueues; ++i)
562 while (IJob* job = m_jobQueues[i].consumeJob())
565 numMainThreadJobsFinished++;
569 // done with jobs for now, tell workers to rest (but not sleep)
570 setWorkerDirectives(WorkerThreadDirectives::kStayAwakeButIdle);
572 btU64 clockStart = m_clock.getTimeMicroseconds();
573 // wait for workers to finish any jobs in progress
576 int numWorkerJobsFinished = 0;
577 for (int iThread = kFirstWorkerThreadId; iThread < m_numThreads; ++iThread)
579 ThreadLocalStorage* storage = &m_threadLocalStorage[iThread];
580 storage->m_mutex.lock();
581 numWorkerJobsFinished += storage->m_numJobsFinished;
582 storage->m_mutex.unlock();
584 if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs)
588 btU64 timeElapsed = m_clock.getTimeMicroseconds() - clockStart;
589 btAssert(timeElapsed < 1000);
590 if (timeElapsed > 100000)
598 void wakeWorkers(int numWorkersToWake)
600 BT_PROFILE("wakeWorkers");
601 btAssert(m_workerDirective->getDirective(1) == WorkerThreadDirectives::kScanForJobs);
602 int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads);
603 int numActiveWorkers = 0;
604 for (int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker)
606 // note this count of active workers is not necessarily totally reliable, because a worker thread could be
607 // just about to put itself to sleep. So we may on occasion fail to wake up all the workers. It should be rare.
608 ThreadLocalStorage& storage = m_threadLocalStorage[kFirstWorkerThreadId + iWorker];
609 if (storage.m_status != WorkerThreadStatus::kSleeping)
614 for (int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker)
616 ThreadLocalStorage& storage = m_threadLocalStorage[kFirstWorkerThreadId + iWorker];
617 if (storage.m_status == WorkerThreadStatus::kSleeping)
619 m_threadSupport->runTask(iWorker, &storage);
625 void waitForWorkersToSleep()
627 BT_PROFILE("waitForWorkersToSleep");
628 setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);
629 m_threadSupport->waitForAllTasks();
630 for (int i = kFirstWorkerThreadId; i < m_numThreads; i++)
632 ThreadLocalStorage& storage = m_threadLocalStorage[i];
633 btAssert(storage.m_status == WorkerThreadStatus::kSleeping);
637 virtual void sleepWorkerThreadsHint() BT_OVERRIDE
639 BT_PROFILE("sleepWorkerThreadsHint");
640 // hint the task scheduler that we may not be using these threads for a little while
641 setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);
644 void prepareWorkerThreads()
646 for (int i = kFirstWorkerThreadId; i < m_numThreads; ++i)
648 ThreadLocalStorage& storage = m_threadLocalStorage[i];
649 storage.m_mutex.lock();
650 storage.m_numJobsFinished = 0;
651 storage.m_mutex.unlock();
653 setWorkerDirectives(WorkerThreadDirectives::kScanForJobs);
656 virtual void parallelFor(int iBegin, int iEnd, int grainSize, const btIParallelForBody& body) BT_OVERRIDE
658 BT_PROFILE("parallelFor_ThreadSupport");
659 btAssert(iEnd >= iBegin);
660 btAssert(grainSize >= 1);
661 int iterationCount = iEnd - iBegin;
662 if (iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock())
664 typedef ParallelForJob JobType;
665 int jobCount = (iterationCount + grainSize - 1) / grainSize;
666 m_numJobs = jobCount;
667 btAssert(jobCount >= 2); // need more than one job for multithreading
668 int jobSize = sizeof(JobType);
670 for (int i = 0; i < m_numActiveJobQueues; ++i)
672 m_jobQueues[i].clearQueue(jobCount, jobSize);
674 // prepare worker threads for incoming work
675 prepareWorkerThreads();
676 // submit all of the jobs
678 int iThread = kFirstWorkerThreadId; // first worker thread
679 for (int i = iBegin; i < iEnd; i += grainSize)
681 btAssert(iJob < jobCount);
682 int iE = btMin(i + grainSize, iEnd);
683 JobQueue* jq = m_perThreadJobQueues[iThread];
685 btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
686 void* jobMem = jq->allocJobMem(jobSize);
687 JobType* job = new (jobMem) ParallelForJob(i, iE, body); // placement new
691 if (iThread >= m_numThreads)
693 iThread = kFirstWorkerThreadId; // first worker thread
696 wakeWorkers(jobCount - 1);
698 // put the main thread to work on emptying the job queue and then wait for all workers to finish
700 m_antiNestingLock.unlock();
704 BT_PROFILE("parallelFor_mainThread");
705 // just run on main thread
706 body.forLoop(iBegin, iEnd);
709 virtual btScalar parallelSum(int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body) BT_OVERRIDE
711 BT_PROFILE("parallelSum_ThreadSupport");
712 btAssert(iEnd >= iBegin);
713 btAssert(grainSize >= 1);
714 int iterationCount = iEnd - iBegin;
715 if (iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock())
717 typedef ParallelSumJob JobType;
718 int jobCount = (iterationCount + grainSize - 1) / grainSize;
719 m_numJobs = jobCount;
720 btAssert(jobCount >= 2); // need more than one job for multithreading
721 int jobSize = sizeof(JobType);
722 for (int i = 0; i < m_numActiveJobQueues; ++i)
724 m_jobQueues[i].clearQueue(jobCount, jobSize);
727 // initialize summation
728 for (int iThread = 0; iThread < m_numThreads; ++iThread)
730 m_threadLocalStorage[iThread].m_sumResult = btScalar(0);
733 // prepare worker threads for incoming work
734 prepareWorkerThreads();
735 // submit all of the jobs
737 int iThread = kFirstWorkerThreadId; // first worker thread
738 for (int i = iBegin; i < iEnd; i += grainSize)
740 btAssert(iJob < jobCount);
741 int iE = btMin(i + grainSize, iEnd);
742 JobQueue* jq = m_perThreadJobQueues[iThread];
744 btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
745 void* jobMem = jq->allocJobMem(jobSize);
746 JobType* job = new (jobMem) ParallelSumJob(i, iE, body, &m_threadLocalStorage[0]); // placement new
750 if (iThread >= m_numThreads)
752 iThread = kFirstWorkerThreadId; // first worker thread
755 wakeWorkers(jobCount - 1);
757 // put the main thread to work on emptying the job queue and then wait for all workers to finish
760 // add up all the thread sums
761 btScalar sum = btScalar(0);
762 for (int iThread = 0; iThread < m_numThreads; ++iThread)
764 sum += m_threadLocalStorage[iThread].m_sumResult;
766 m_antiNestingLock.unlock();
771 BT_PROFILE("parallelSum_mainThread");
772 // just run on main thread
773 return body.sumLoop(iBegin, iEnd);
778 btITaskScheduler* btCreateDefaultTaskScheduler()
780 btTaskSchedulerDefault* ts = new btTaskSchedulerDefault();
785 #else // #if BT_THREADSAFE
787 btITaskScheduler* btCreateDefaultTaskScheduler()
792 #endif // #else // #if BT_THREADSAFE