added v_reduce_sum4() universal intrinsic; corrected number of threads in cv::getNumT...
[platform/upstream/opencv.git] / modules / core / src / parallel.cpp
1 /*M///////////////////////////////////////////////////////////////////////////////////////
2 //
3 //  IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING.
4 //
5 //  By downloading, copying, installing or using the software you agree to this license.
6 //  If you do not agree to this license, do not download, install,
7 //  copy or use the software.
8 //
9 //
10 //                           License Agreement
11 //                For Open Source Computer Vision Library
12 //
13 // Copyright (C) 2000-2008, Intel Corporation, all rights reserved.
14 // Copyright (C) 2009-2011, Willow Garage Inc., all rights reserved.
15 // Third party copyrights are property of their respective owners.
16 //
17 // Redistribution and use in source and binary forms, with or without modification,
18 // are permitted provided that the following conditions are met:
19 //
20 //   * Redistribution's of source code must retain the above copyright notice,
21 //     this list of conditions and the following disclaimer.
22 //
23 //   * Redistribution's in binary form must reproduce the above copyright notice,
24 //     this list of conditions and the following disclaimer in the documentation
25 //     and/or other materials provided with the distribution.
26 //
27 //   * The name of the copyright holders may not be used to endorse or promote products
28 //     derived from this software without specific prior written permission.
29 //
30 // This software is provided by the copyright holders and contributors "as is" and
31 // any express or implied warranties, including, but not limited to, the implied
32 // warranties of merchantability and fitness for a particular purpose are disclaimed.
33 // In no event shall the Intel Corporation or contributors be liable for any direct,
34 // indirect, incidental, special, exemplary, or consequential damages
35 // (including, but not limited to, procurement of substitute goods or services;
36 // loss of use, data, or profits; or business interruption) however caused
37 // and on any theory of liability, whether in contract, strict liability,
38 // or tort (including negligence or otherwise) arising in any way out of
39 // the use of this software, even if advised of the possibility of such damage.
40 //
41 //M*/
42
43 #include "precomp.hpp"
44
45 #if defined WIN32 || defined WINCE
46     #include <windows.h>
47     #undef small
48     #undef min
49     #undef max
50     #undef abs
51 #endif
52
53 #if defined __linux__ || defined __APPLE__
54     #include <unistd.h>
55     #include <stdio.h>
56     #include <sys/types.h>
57     #if defined ANDROID
58         #include <sys/sysconf.h>
59     #elif defined __APPLE__
60         #include <sys/sysctl.h>
61     #endif
62 #endif
63
64 #ifdef _OPENMP
65     #define HAVE_OPENMP
66 #endif
67
68 #ifdef __APPLE__
69     #define HAVE_GCD
70 #endif
71
72 #if defined _MSC_VER && _MSC_VER >= 1600
73     #define HAVE_CONCURRENCY
74 #endif
75
/* IMPORTANT: every backend-selection #if/#elif chain in this file must test the backends in this same priority order
   1. HAVE_TBB         - 3rdparty library, should be explicitly enabled
   2. HAVE_CSTRIPES    - 3rdparty library, should be explicitly enabled
   3. HAVE_OPENMP      - built into the compiler, should be explicitly enabled
   4. HAVE_GCD         - system wide, used automatically        (APPLE only)
   5. WINRT            - system wide, used automatically        (Windows RT only)
   6. HAVE_CONCURRENCY - part of the MSVC runtime, used automatically (Windows only - MSVS 10 (2010) and newer)
   7. HAVE_PTHREADS_PF - pthreads if available
*/
85
86 #if defined HAVE_TBB
87     #include "tbb/tbb.h"
88     #include "tbb/task.h"
89     #include "tbb/tbb_stddef.h"
90     #if TBB_INTERFACE_VERSION >= 8000
91         #include "tbb/task_arena.h"
92     #endif
93     #undef min
94     #undef max
95 #elif defined HAVE_CSTRIPES
96     #include "C=.h"
97     #undef shared
98 #elif defined HAVE_OPENMP
99     #include <omp.h>
100 #elif defined HAVE_GCD
101     #include <dispatch/dispatch.h>
102     #include <pthread.h>
103 #elif defined WINRT && _MSC_VER < 1900
104     #include <ppltasks.h>
105 #elif defined HAVE_CONCURRENCY
106     #include <ppl.h>
107 #endif
108
109
110 #if defined HAVE_TBB
111 #  define CV_PARALLEL_FRAMEWORK "tbb"
112 #elif defined HAVE_CSTRIPES
113 #  define CV_PARALLEL_FRAMEWORK "cstripes"
114 #elif defined HAVE_OPENMP
115 #  define CV_PARALLEL_FRAMEWORK "openmp"
116 #elif defined HAVE_GCD
117 #  define CV_PARALLEL_FRAMEWORK "gcd"
118 #elif defined WINRT
119 #  define CV_PARALLEL_FRAMEWORK "winrt-concurrency"
120 #elif defined HAVE_CONCURRENCY
121 #  define CV_PARALLEL_FRAMEWORK "ms-concurrency"
122 #elif defined HAVE_PTHREADS_PF
123 #  define CV_PARALLEL_FRAMEWORK "pthreads"
124 #endif
125
namespace cv
{
    // Out-of-line definition of ParallelLoopBody's destructor.
    ParallelLoopBody::~ParallelLoopBody() {}
#ifdef HAVE_PTHREADS_PF
    // Entry points of the pthreads-based backend. They are only declared here;
    // their definitions are not in this file.
    void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes);
    size_t parallel_pthreads_get_threads_num();
    void parallel_pthreads_set_threads_num(int num);
#endif
}
135
136
namespace
{
#ifdef CV_PARALLEL_FRAMEWORK
#ifdef ENABLE_INSTRUMENTATION
    // Merges the per-thread tick counters of pNode, and recursively of its
    // children, into the node itself: every thread's ticks are subtracted from
    // the node total and only the largest one is added back, so the node is
    // accounted by its slowest thread rather than by the sum over threads.
    // Per-thread counters are reset to 0, and m_threads keeps the highest
    // number of contributing threads seen so far.
    static void SyncNodes(cv::instr::InstrNode *pNode)
    {
        std::vector<cv::instr::NodeDataTls*> data;
        pNode->m_payload.m_tls.gather(data);

        uint64 ticksMax = 0;
        int    threads  = 0;
        for(size_t i = 0; i < data.size(); i++)
        {
            if(data[i] && data[i]->m_ticksTotal)
            {
                ticksMax = MAX(ticksMax, data[i]->m_ticksTotal);
                pNode->m_payload.m_ticksTotal -= data[i]->m_ticksTotal;
                data[i]->m_ticksTotal = 0;
                threads++;
            }
        }
        pNode->m_payload.m_ticksTotal += ticksMax;
        pNode->m_payload.m_threads = MAX(pNode->m_payload.m_threads, threads);

        for(size_t i = 0; i < pNode->m_childs.size(); i++)
            SyncNodes(pNode->m_childs[i]);
    }
#endif

    // Adapts a user cv::ParallelLoopBody to the stripe-based interface shared
    // by all backends: a backend iterates over stripe indices [0, nstripes) and
    // operator() maps a run of stripes back onto the caller's wholeRange.
    // Each invocation starts from the constructing thread's RNG state; if the
    // body consumed random numbers, the destructor resets the destroying
    // thread's RNG to that saved state and advances it once.
    class ParallelLoopBodyWrapper : public cv::ParallelLoopBody
    {
    public:
        // _nstripes <= 0 : one stripe per element of _r
        // otherwise      : _nstripes clamped to [1, number of elements of _r], rounded
        ParallelLoopBodyWrapper(const cv::ParallelLoopBody& _body, const cv::Range& _r, double _nstripes) :
            is_rng_used(false)
        {

            body = &_body;
            wholeRange = _r;
            double len = wholeRange.end - wholeRange.start;
            nstripes = cvRound(_nstripes <= 0 ? len : MIN(MAX(_nstripes, 1.), len));

            // propagate main thread state
            rng = cv::theRNG();

#ifdef ENABLE_INSTRUMENTATION
            pThreadRoot = cv::instr::getInstrumentTLSStruct().pCurrentNode;
#endif
        }
        ~ParallelLoopBodyWrapper()
        {
#ifdef ENABLE_INSTRUMENTATION
            for(size_t i = 0; i < pThreadRoot->m_childs.size(); i++)
                SyncNodes(pThreadRoot->m_childs[i]);
#endif
            if (is_rng_used)
            {
                // Some parallel backends execute nested jobs in the main thread,
                // so we need to restore initial RNG state here.
                cv::theRNG() = rng;
                // We can't properly update RNG state based on RNG usage in worker threads,
                // so lets just change main thread RNG state to the next value.
                // Note: this behaviour is not equal to single-threaded mode.
                cv::theRNG().next();
            }
        }
        // Runs the user body on the part of wholeRange covered by stripes
        // [sr.start, sr.end). Boundaries are scaled with 64-bit arithmetic and
        // rounded to nearest; a run that reaches the last stripe ends exactly
        // at wholeRange.end.
        void operator()(const cv::Range& sr) const
        {
#ifdef ENABLE_INSTRUMENTATION
            {
                cv::instr::InstrTLSStruct *pInstrTLS = &cv::instr::getInstrumentTLSStruct();
                pInstrTLS->pCurrentNode = pThreadRoot; // Initialize TLS node for thread
            }
#endif
            CV_INSTRUMENT_REGION()

            // propagate main thread state
            cv::theRNG() = rng;

            cv::Range r;
            r.start = (int)(wholeRange.start +
                            ((uint64)sr.start*(wholeRange.end - wholeRange.start) + nstripes/2)/nstripes);
            r.end = sr.end >= nstripes ? wholeRange.end : (int)(wholeRange.start +
                            ((uint64)sr.end*(wholeRange.end - wholeRange.start) + nstripes/2)/nstripes);
            (*body)(r);

            if (!is_rng_used && !(cv::theRNG() == rng))
                is_rng_used = true;
        }
        // Stripe indices the backends iterate over: [0, nstripes).
        cv::Range stripeRange() const { return cv::Range(0, nstripes); }

    protected:
        const cv::ParallelLoopBody* body;   // user body (not owned)
        cv::Range wholeRange;               // range originally passed to parallel_for_
        int nstripes;                       // number of stripes wholeRange is split into
        cv::RNG rng;                        // RNG state of the constructing thread
        // Set by operator() when the body changed theRNG(); mutable because
        // operator() is const. NOTE(review): backends that copy the wrapper
        // (e.g. TBB's parallel_for) set it on their copies — confirm.
        mutable bool is_rng_used;
#ifdef ENABLE_INSTRUMENTATION
        cv::instr::InstrNode *pThreadRoot;  // instrumentation node of the constructing thread
#endif
    };

#if defined HAVE_TBB
    // TBB hands the body blocked_range<int> chunks of stripe indices.
    class ProxyLoopBody : public ParallelLoopBodyWrapper
    {
    public:
        ProxyLoopBody(const cv::ParallelLoopBody& _body, const cv::Range& _r, double _nstripes)
        : ParallelLoopBodyWrapper(_body, _r, _nstripes)
        {}

        void operator ()(const tbb::blocked_range<int>& range) const
        {
            this->ParallelLoopBodyWrapper::operator()(cv::Range(range.begin(), range.end()));
        }
    };
#elif defined HAVE_CSTRIPES || defined HAVE_OPENMP
    // C= and OpenMP call the wrapper with a cv::Range of stripes directly.
    typedef ParallelLoopBodyWrapper ProxyLoopBody;
#elif defined HAVE_GCD
    typedef ParallelLoopBodyWrapper ProxyLoopBody;
    // dispatch_apply_f callback: context is the ProxyLoopBody, index one stripe.
    static void block_function(void* context, size_t index)
    {
        ProxyLoopBody* ptr_body = static_cast<ProxyLoopBody*>(context);
        (*ptr_body)(cv::Range((int)index, (int)index + 1));
    }
#elif defined WINRT || defined HAVE_CONCURRENCY
    // Concurrency::parallel_for calls the body with one stripe index at a time.
    class ProxyLoopBody : public ParallelLoopBodyWrapper
    {
    public:
        ProxyLoopBody(const cv::ParallelLoopBody& _body, const cv::Range& _r, double _nstripes)
        : ParallelLoopBodyWrapper(_body, _r, _nstripes)
        {}

        void operator ()(int i) const
        {
            this->ParallelLoopBodyWrapper::operator()(cv::Range(i, i + 1));
        }
    };
#else
    typedef ParallelLoopBodyWrapper ProxyLoopBody;
#endif

// Thread count requested through setNumThreads(): -1 until it is first called,
// 0 disables parallel execution in parallel_for_, > 0 is the requested count,
// other negative values select the backend default.
static int numThreads = -1;

#if defined HAVE_TBB
// Deferred TBB scheduler: activated by setNumThreads() only for a positive count.
static tbb::task_scheduler_init tbbScheduler(tbb::task_scheduler_init::deferred);
#elif defined HAVE_CSTRIPES
// nothing for C=
#elif defined HAVE_OPENMP
// OpenMP thread limit read during static initialization; used whenever
// numThreads is not positive.
static int numThreadsMax = omp_get_max_threads();
#elif defined HAVE_GCD
// nothing for GCD
#elif defined WINRT
// nothing for WINRT
#elif defined HAVE_CONCURRENCY

// Holds the custom Concurrency Runtime scheduler installed by setNumThreads()
// (null means "use the current/default scheduler"). Assigning a new pointer
// releases the previously held scheduler. The destructor does not release it
// (NOTE(review): presumably to avoid calling into ConcRT during static
// destruction — confirm).
class SchedPtr
{
    Concurrency::Scheduler* sched_;

    // Non-copyable: a copy would share sched_ and could Release() it twice.
    SchedPtr(const SchedPtr&);
    SchedPtr& operator=(const SchedPtr&);
public:
    Concurrency::Scheduler* operator->() { return sched_; }
    operator Concurrency::Scheduler*() { return sched_; }

    void operator=(Concurrency::Scheduler* sched)
    {
        // Re-assigning the pointer already held must not release it first,
        // otherwise sched_ would be left dangling.
        if (sched == sched_)
            return;
        if (sched_) sched_->Release();
        sched_ = sched;
    }

    SchedPtr() : sched_(0) {}
    ~SchedPtr() {}
};
static SchedPtr pplScheduler;

#endif

#endif // CV_PARALLEL_FRAMEWORK

} //namespace
314
315 /* ================================   parallel_for_  ================================ */
316
317 void cv::parallel_for_(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
318 {
319     CV_INSTRUMENT_REGION_MT_FORK()
320     if (range.empty())
321         return;
322
323 #ifdef CV_PARALLEL_FRAMEWORK
324
325     if(numThreads != 0)
326     {
327         ProxyLoopBody pbody(body, range, nstripes);
328         cv::Range stripeRange = pbody.stripeRange();
329         if( stripeRange.end - stripeRange.start == 1 )
330         {
331             body(range);
332             return;
333         }
334
335 #if defined HAVE_TBB
336
337         tbb::parallel_for(tbb::blocked_range<int>(stripeRange.start, stripeRange.end), pbody);
338
339 #elif defined HAVE_CSTRIPES
340
341         parallel(MAX(0, numThreads))
342         {
343             int offset = stripeRange.start;
344             int len = stripeRange.end - offset;
345             Range r(offset + CPX_RANGE_START(len), offset + CPX_RANGE_END(len));
346             pbody(r);
347             barrier();
348         }
349
350 #elif defined HAVE_OPENMP
351
352         #pragma omp parallel for schedule(dynamic) num_threads(numThreads > 0 ? numThreads : numThreadsMax)
353         for (int i = stripeRange.start; i < stripeRange.end; ++i)
354             pbody(Range(i, i + 1));
355
356 #elif defined HAVE_GCD
357
358         dispatch_queue_t concurrent_queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
359         dispatch_apply_f(stripeRange.end - stripeRange.start, concurrent_queue, &pbody, block_function);
360
361 #elif defined WINRT
362
363         Concurrency::parallel_for(stripeRange.start, stripeRange.end, pbody);
364
365 #elif defined HAVE_CONCURRENCY
366
367         if(!pplScheduler || pplScheduler->Id() == Concurrency::CurrentScheduler::Id())
368         {
369             Concurrency::parallel_for(stripeRange.start, stripeRange.end, pbody);
370         }
371         else
372         {
373             pplScheduler->Attach();
374             Concurrency::parallel_for(stripeRange.start, stripeRange.end, pbody);
375             Concurrency::CurrentScheduler::Detach();
376         }
377
378 #elif defined HAVE_PTHREADS_PF
379
380         parallel_for_pthreads(pbody.stripeRange(), pbody, pbody.stripeRange().size());
381
382 #else
383
384 #error You have hacked and compiling with unsupported parallel framework
385
386 #endif
387
388     }
389     else
390
391 #endif // CV_PARALLEL_FRAMEWORK
392     {
393         (void)nstripes;
394         body(range);
395     }
396 }
397
398 int cv::getNumThreads(void)
399 {
400 #ifdef CV_PARALLEL_FRAMEWORK
401
402     if(numThreads == 0)
403         return 1;
404
405 #endif
406
407 #if defined HAVE_TBB
408
409     return tbbScheduler.is_active()
410            ? numThreads
411            : tbb::task_scheduler_init::default_num_threads();
412
413 #elif defined HAVE_CSTRIPES
414
415     return numThreads > 0
416             ? numThreads
417             : cv::getNumberOfCPUs();
418
419 #elif defined HAVE_OPENMP
420
421     return numThreads > 0
422            ? numThreads
423            : numThreadsMax;
424
425
426 #elif defined HAVE_GCD
427
428     return cv::getNumberOfCPUs(); // the GCD thread pool limit
429
430 #elif defined WINRT
431
432     return 0;
433
434 #elif defined HAVE_CONCURRENCY
435
436     return 1 + (pplScheduler == 0
437         ? Concurrency::CurrentScheduler::Get()->GetNumberOfVirtualProcessors()
438         : pplScheduler->GetNumberOfVirtualProcessors());
439
440 #elif defined HAVE_PTHREADS_PF
441
442         return parallel_pthreads_get_threads_num();
443
444 #else
445
446     return 1;
447
448 #endif
449 }
450
451 void cv::setNumThreads( int threads )
452 {
453     (void)threads;
454 #ifdef CV_PARALLEL_FRAMEWORK
455     numThreads = threads;
456 #endif
457
458 #ifdef HAVE_TBB
459
460     if(tbbScheduler.is_active()) tbbScheduler.terminate();
461     if(threads > 0) tbbScheduler.initialize(threads);
462
463 #elif defined HAVE_CSTRIPES
464
465     return; // nothing needed
466
467 #elif defined HAVE_OPENMP
468
469     return; // nothing needed as num_threads clause is used in #pragma omp parallel for
470
471 #elif defined HAVE_GCD
472
473     // unsupported
474     // there is only private dispatch_queue_set_width() and only for desktop
475
476 #elif defined WINRT
477
478     return;
479
480 #elif defined HAVE_CONCURRENCY
481
482     if (threads <= 0)
483     {
484         pplScheduler = 0;
485     }
486     else if (threads == 1)
487     {
488         // Concurrency always uses >=2 threads, so we just disable it if 1 thread is requested
489         numThreads = 0;
490     }
491     else if (pplScheduler == 0 || 1 + pplScheduler->GetNumberOfVirtualProcessors() != (unsigned int)threads)
492     {
493         pplScheduler = Concurrency::Scheduler::Create(Concurrency::SchedulerPolicy(2,
494                        Concurrency::MinConcurrency, threads-1,
495                        Concurrency::MaxConcurrency, threads-1));
496     }
497
498 #elif defined HAVE_PTHREADS_PF
499
500     parallel_pthreads_set_threads_num(threads);
501
502 #endif
503 }
504
505
506 int cv::getThreadNum(void)
507 {
508 #if defined HAVE_TBB
509     #if TBB_INTERFACE_VERSION >= 9100
510         return tbb::this_task_arena::current_thread_index();
511     #elif TBB_INTERFACE_VERSION >= 8000
512         return tbb::task_arena::current_thread_index();
513     #else
514         return 0;
515     #endif
516 #elif defined HAVE_CSTRIPES
517     return pix();
518 #elif defined HAVE_OPENMP
519     return omp_get_thread_num();
520 #elif defined HAVE_GCD
521     return (int)(size_t)(void*)pthread_self(); // no zero-based indexing
522 #elif defined WINRT
523     return 0;
524 #elif defined HAVE_CONCURRENCY
525     return std::max(0, (int)Concurrency::Context::VirtualProcessorId()); // zero for master thread, unique number for others but not necessary 1,2,3,...
526 #elif defined HAVE_PTHREADS_PF
527     return (int)(size_t)(void*)pthread_self(); // no zero-based indexing
528 #else
529     return 0;
530 #endif
531 }
532
#ifdef ANDROID
// Counts the CPUs listed in /sys/devices/system/cpu/possible, a cpulist such
// as "0-1,3,5-7,10,13-15". Tokens that do not parse as a CPU index or as a
// non-decreasing "a-b" range are ignored. Returns at least 1, including when
// the file cannot be opened or read.
static inline int getNumberOfCPUsImpl()
{
    FILE* cpuPossible = fopen("/sys/devices/system/cpu/possible", "r");
    if (!cpuPossible)
        return 1;

    char buf[2000]; // big enough for 1000 CPUs in worst possible configuration
    char* pbuf = fgets(buf, sizeof(buf), cpuPossible);
    fclose(cpuPossible);
    if (!pbuf)
        return 1;

    int cpusAvailable = 0;

    while (*pbuf)
    {
        // Cut the next comma-separated token in place.
        const char* token = pbuf;
        bool isRange = false;
        while (*pbuf && *pbuf != ',')
        {
            if (*pbuf == '-') isRange = true;
            ++pbuf;
        }
        if (*pbuf) *pbuf++ = 0;

        int rstart = 0, rend = 0;
        if (!isRange)
        {
            // A single CPU index; empty/whitespace-only tokens do not count.
            if (sscanf(token, "%d", &rstart) == 1)
                ++cpusAvailable;
        }
        else if (sscanf(token, "%d-%d", &rstart, &rend) == 2 && rend >= rstart)
        {
            cpusAvailable += rend - rstart + 1;
        }
    }
    return cpusAvailable > 0 ? cpusAvailable : 1;
}
#endif
572
573 int cv::getNumberOfCPUs(void)
574 {
575 #if defined WIN32 || defined _WIN32
576     SYSTEM_INFO sysinfo;
577 #if (defined(_M_ARM) || defined(_M_X64) || defined(WINRT)) && _WIN32_WINNT >= 0x501
578     GetNativeSystemInfo( &sysinfo );
579 #else
580     GetSystemInfo( &sysinfo );
581 #endif
582
583     return (int)sysinfo.dwNumberOfProcessors;
584 #elif defined ANDROID
585     static int ncpus = getNumberOfCPUsImpl();
586     return ncpus;
587 #elif defined __linux__
588     return (int)sysconf( _SC_NPROCESSORS_ONLN );
589 #elif defined __APPLE__
590     int numCPU=0;
591     int mib[4];
592     size_t len = sizeof(numCPU);
593
594     /* set the mib for hw.ncpu */
595     mib[0] = CTL_HW;
596     mib[1] = HW_AVAILCPU;  // alternatively, try HW_NCPU;
597
598     /* get the number of CPUs from the system */
599     sysctl(mib, 2, &numCPU, &len, NULL, 0);
600
601     if( numCPU < 1 )
602     {
603         mib[1] = HW_NCPU;
604         sysctl( mib, 2, &numCPU, &len, NULL, 0 );
605
606         if( numCPU < 1 )
607             numCPU = 1;
608     }
609
610     return (int)numCPU;
611 #else
612     return 1;
613 #endif
614 }
615
616 const char* cv::currentParallelFramework() {
617 #ifdef CV_PARALLEL_FRAMEWORK
618     return CV_PARALLEL_FRAMEWORK;
619 #else
620     return NULL;
621 #endif
622 }
623
// C API wrapper: forwards the requested thread count to cv::setNumThreads().
CV_IMPL void cvSetNumThreads(int nt)
{
    cv::setNumThreads(nt);
}
628
// C API wrapper: returns cv::getNumThreads().
CV_IMPL int cvGetNumThreads()
{
    return cv::getNumThreads();
}
633
// C API wrapper: returns cv::getThreadNum() for the calling thread.
CV_IMPL int cvGetThreadNum()
{
    return cv::getThreadNum();
}