Publishing 2020.1 content
[platform/upstream/dldt.git] / inference-engine / include / ie_parallel.hpp
1 // Copyright (C) 2018-2020 Intel Corporation
2 // SPDX-License-Identifier: Apache-2.0
3 //
4
5 /**
6  * @brief Contains declarations and definitions for sequential and multi-threading implementations.
7  *
8  * Multi-threading support is implemented in two variants: using the Threading Building Blocks library and OpenMP*
9  * product. To build a particular implementation, use the corresponding identifier: IE_THREAD_TBB, IE_THREAD_TBB_AUTO,
10  * IE_THREAD_OMP or IE_THREAD_SEQ.
11  *
12  *  @file ie_parallel.hpp
13  */
14
15 #pragma once
16
17 #include <cstddef>
18
19 #define IE_THREAD_TBB 0
20 #define IE_THREAD_OMP 1
21 #define IE_THREAD_SEQ 2
22 #define IE_THREAD_TBB_AUTO 3
23
24 #if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
25 #define TBB_PREVIEW_LOCAL_OBSERVER 1
26 #ifndef TBB_PREVIEW_NUMA_SUPPORT
27 #define TBB_PREVIEW_NUMA_SUPPORT 1
28 #endif
29 #include "tbb/blocked_range.h"
30 #include "tbb/blocked_range2d.h"
31 #include "tbb/blocked_range3d.h"
32 #include "tbb/parallel_for.h"
33 #include "tbb/parallel_reduce.h"
34 #include "tbb/parallel_sort.h"
35 #include "tbb/task_arena.h"
36 #include "tbb/task_scheduler_observer.h"
37
38 inline int parallel_get_max_threads() {
39     return tbb::this_task_arena::max_concurrency();
40 }
41 inline int parallel_get_num_threads() {
42     return parallel_get_max_threads();
43 }
44 inline int parallel_get_thread_num() {
45     return tbb::this_task_arena::current_thread_index();
46 }
47 inline void parallel_set_num_threads(int n) {
48     return;
49 }
50 inline int parallel_get_env_threads() {
51     return 0;
52 }
53 #if IE_THREAD == IE_THREAD_TBB
54 #define PARTITIONING , tbb::static_partitioner()
55 #else
56 #define PARTITIONING
57 #endif
58 #elif IE_THREAD == IE_THREAD_OMP
59 #include <omp.h>
60
61 #include <algorithm>
62 #include <cstdlib>
63 #include <string>
64
65 /* MSVC still supports omp 2.0 only */
66 #if defined(_MSC_VER) && !defined(__INTEL_COMPILER)
67 #define collapse(x)
68 #endif  // defined(_MSC_VER) && !defined(__INTEL_COMPILER)
69 inline int parallel_get_max_threads() {
70     return omp_get_max_threads();
71 }
72 inline int parallel_get_num_threads() {
73     return omp_get_num_threads();
74 }
75 inline int parallel_get_thread_num() {
76     return omp_get_thread_num();
77 }
78 inline void parallel_set_num_threads(int n) {
79     omp_set_num_threads(n);
80 }
81 inline int parallel_get_env_threads() {
82     int env_cores = 0;
83     if (getenv("OMP_NUM_THREADS") != nullptr) {
84         try {
85             env_cores = std::stoi(getenv("OMP_NUM_THREADS"));
86         } catch (const std::exception&) {
87             env_cores = 0;
88         }
89     }
90     return env_cores;
91 }
92
93 #elif IE_THREAD == IE_THREAD_SEQ
94 #include <algorithm>  // NOLINT
95 inline int parallel_get_env_threads() {
96     return 1;
97 }
98 inline int parallel_get_max_threads() {
99     return 1;
100 }
101 inline int parallel_get_num_threads() {
102     return 1;
103 }
104 inline int parallel_get_thread_num() {
105     return 0;
106 }
107 inline void parallel_set_num_threads(int n) {
108     return;
109 }
110 #endif
111
112 namespace InferenceEngine {
113
114 template <typename F>
115 void parallel_nt(int nthr, const F& func) {
116 #if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
117     if (nthr == 0) nthr = parallel_get_max_threads();
118     if (nthr == 1) {
119         func(0, 1);
120         return;
121     }
122
123     tbb::parallel_for(0, nthr, [&](int ithr) {
124         func(ithr, nthr);
125     });
126 #elif IE_THREAD == IE_THREAD_OMP
127     if (nthr == 1) {
128         func(0, 1);
129         return;
130     }
131
132 #pragma omp parallel num_threads(nthr)
133     func(parallel_get_thread_num(), parallel_get_num_threads());
134 #elif IE_THREAD == IE_THREAD_SEQ
135     func(0, 1);
136 #endif
137 }
138
139 template <typename F>
140 void parallel_nt_static(int nthr, const F& func) {
141 #if IE_THREAD == IE_THREAD_SEQ
142     const bool serial = true;
143 #else
144     const bool serial = false;
145 #endif
146
147     if (serial || nthr == 1) {
148         func(0, 1);
149         return;
150     }
151
152     if (nthr == 0) nthr = parallel_get_max_threads();
153 #if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
154     tbb::parallel_for(
155         0, nthr,
156         [&](int ithr) {
157             func(ithr, nthr);
158         },
159         tbb::static_partitioner {});
160
161 #elif IE_THREAD == IE_THREAD_OMP
162
163 #pragma omp parallel num_threads(nthr)
164     { func(parallel_get_thread_num(), parallel_get_num_threads()); }
165 #endif
166 }
167
168 template <typename I, typename F>
169 void parallel_sort(I begin, I end, const F& comparator) {
170 #if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
171     tbb::parallel_sort(begin, end, comparator);
172 #elif IE_THREAD == IE_THREAD_OMP
173     // TODO: propose OpenMP version
174     std::sort(begin, end, comparator);
175 #elif IE_THREAD == IE_THREAD_SEQ
176     std::sort(begin, end, comparator);
177 #endif
178 }
179
180 template <typename T0, typename R, typename F>
181 R parallel_sum(const T0& D0, const R& input, const F& func) {
182 #if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
183     return tbb::parallel_reduce(
184         tbb::blocked_range<T0>(0, D0), input,
185         [&](const tbb::blocked_range<T0>& r, R init) -> R {
186             R sum = init;
187             for (T0 dim1 = r.begin(); dim1 < r.end(); ++dim1) sum += func(dim1);
188             return sum;
189         },
190         [](R x, R y) -> R {
191             return x + y;
192         } PARTITIONING);
193 #else
194     R sum = input;
195
196 #ifdef _MSC_VER
197     using T0_IT = typename std::make_signed<T0>::type;
198 #else
199     using T0_IT = T0;
200 #endif
201
202 #if IE_THREAD == IE_THREAD_OMP
203 #pragma omp parallel for reduction(+ : sum) schedule(static)
204 #endif
205     for (T0_IT dim1 = 0; dim1 < static_cast<T0_IT>(D0); dim1++) {
206         sum += static_cast<R>(func(dim1));
207     }
208     return sum;
209 #endif
210 }
211
212 template <typename T0, typename T1, typename R, typename F>
213 R parallel_sum2d(const T0& D0, const T1& D1, const R& input, const F& func) {
214 #if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
215     return tbb::parallel_reduce(
216         tbb::blocked_range2d<T0, T1>(0, D0, 0, D1), input,
217         [&](const tbb::blocked_range2d<T0, T1>& r, R init) -> R {
218             R sum = init;
219             for (T0 dim2 = r.rows().begin(); dim2 < r.rows().end(); dim2++) {
220                 for (T1 dim1 = r.cols().begin(); dim1 < r.cols().end(); dim1++) {
221                     sum += func(dim2, dim1);
222                 }
223             }
224             return sum;
225         },
226         [](R x, R y) -> R {
227             return x + y;
228         } PARTITIONING);
229 #else
230     R sum = input;
231
232 #ifdef _MSC_VER
233     using T0_IT = typename std::make_signed<T0>::type;
234     using T1_IT = typename std::make_signed<T1>::type;
235 #else
236     using T0_IT = T0;
237     using T1_IT = T1;
238 #endif
239
240 #if IE_THREAD == IE_THREAD_OMP
241 #pragma omp parallel for collapse(2) reduction(+ : sum) schedule(static)
242 #endif
243     for (T0_IT dim2 = 0; dim2 < D0; dim2++) {
244         for (T1_IT dim1 = 0; dim1 < D1; dim1++) {
245             sum += func(dim2, dim1);
246         }
247     }
248     return sum;
249 #endif
250 }
251 template <typename T0, typename T1, typename T2, typename R, typename F>
252 R parallel_sum3d(const T0& D0, const T1& D1, const T2& D2, const R& input, const F& func) {
253 #if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
254     return tbb::parallel_reduce(
255         tbb::blocked_range3d<T0, T1, T2>(0, D0, 0, D1, 0, D2), input,
256         [&](const tbb::blocked_range3d<T0, T1, T2>& r, R init) -> R {
257             R sum = init;
258             for (T0 dim1 = r.pages().begin(); dim1 < r.pages().end(); dim1++) {
259                 for (T1 dim2 = r.rows().begin(); dim2 < r.rows().end(); dim2++) {
260                     for (T2 dim3 = r.cols().begin(); dim3 < r.cols().end(); dim3++) {
261                         sum += func(dim1, dim2, dim3);
262                     }
263                 }
264             }
265             return sum;
266         },
267         [](R x, R y) -> R {
268             return x + y;
269         } PARTITIONING);
270 #else
271     R sum = input;
272
273 #ifdef _MSC_VER
274     using T0_IT = typename std::make_signed<T0>::type;
275     using T1_IT = typename std::make_signed<T1>::type;
276     using T2_IT = typename std::make_signed<T2>::type;
277 #else
278     using T0_IT = T0;
279     using T1_IT = T1;
280     using T2_IT = T2;
281 #endif
282
283 #if IE_THREAD == IE_THREAD_OMP
284 #pragma omp parallel for collapse(3) reduction(+ : sum) schedule(static)
285 #endif
286     for (T0_IT dim1 = 0; dim1 < static_cast<T0_IT>(D0); dim1++) {
287         for (T1_IT dim2 = 0; dim2 < static_cast<T1_IT>(D1); dim2++) {
288             for (T2_IT dim3 = 0; dim3 < static_cast<T2_IT>(D2); dim3++) {
289                 sum += func(dim1, dim2, dim3);
290             }
291         }
292     }
293     return sum;
294 #endif
295 }
296
297 template <typename T>
298 inline T parallel_it_init(T start) {
299     return start;
300 }
301 template <typename T, typename Q, typename R, typename... Args>
302 inline T parallel_it_init(T start, Q& x, const R& X, Args&&... tuple) {
303     start = parallel_it_init(start, static_cast<Args>(tuple)...);
304     x = start % X;
305     return start / X;
306 }
307
308 inline bool parallel_it_step() {
309     return true;
310 }
311 template <typename Q, typename R, typename... Args>
312 inline bool parallel_it_step(Q& x, const R& X, Args&&... tuple) {
313     if (parallel_it_step(static_cast<Args>(tuple)...)) {
314         x = (x + 1) % X;
315         return x == 0;
316     }
317     return false;
318 }
319
320 template <typename T, typename Q>
321 inline void splitter(const T& n, const Q& team, const Q& tid, T& n_start, T& n_end) {
322     if (team <= 1 || n == 0) {
323         n_start = 0;
324         n_end = n;
325     } else {
326         T n1 = (n + (T)team - 1) / (T)team;
327         T n2 = n1 - 1;
328         T T1 = n - n2 * (T)team;
329         n_end = (T)tid < T1 ? n1 : n2;
330         n_start = (T)tid <= T1 ? tid * n1 : T1 * n1 + ((T)tid - T1) * n2;
331     }
332
333     n_end += n_start;
334 }
335
336 template <typename T0, typename F>
337 void for_1d(const int& ithr, const int& nthr, const T0& D0, const F& func) {
338     T0 d0 {0}, end {0};
339     splitter(D0, nthr, ithr, d0, end);
340     for (; d0 < end; ++d0) func(d0);
341 }
342
343 template <typename T0, typename F>
344 void parallel_for(const T0& D0, const F& func) {
345 #if IE_THREAD == IE_THREAD_TBB
346     auto work_amount = static_cast<size_t>(D0);
347     int nthr = parallel_get_max_threads();
348     if (static_cast<size_t>(nthr) > work_amount) nthr = static_cast<int>(work_amount);
349     if (nthr == 1) {
350         for_1d(0, 1, D0, func);
351     } else {
352         tbb::parallel_for(
353             0, nthr,
354             [&](int ithr) {
355                 for_1d(ithr, nthr, D0, func);
356             },
357             tbb::static_partitioner());
358     }
359 #elif IE_THREAD == IE_THREAD_TBB_AUTO
360     const int nthr = parallel_get_max_threads();
361     tbb::parallel_for(0, nthr, [&](int ithr) {
362         for_1d(ithr, nthr, D0, func);
363     });
364 #elif IE_THREAD == IE_THREAD_OMP
365 #pragma omp parallel
366     for_1d(parallel_get_thread_num(), parallel_get_num_threads(), D0, func);
367 #elif IE_THREAD == IE_THREAD_SEQ
368     for_1d(0, 1, D0, func);
369 #endif
370 }
371
372 template <typename T0, typename T1, typename F>
373 void for_2d(const int& ithr, const int& nthr, const T0& D0, const T1& D1, const F& func) {
374     const size_t work_amount = (size_t)D0 * D1;
375     if (work_amount == 0) return;
376     size_t start {0}, end {0};
377     splitter(work_amount, nthr, ithr, start, end);
378
379     T0 d0 {0};
380     T1 d1 {0};
381     parallel_it_init(start, d0, D0, d1, D1);
382     for (size_t iwork = start; iwork < end; ++iwork) {
383         func(d0, d1);
384         parallel_it_step(d0, D0, d1, D1);
385     }
386 }
387
388 template <typename T0, typename T1, typename F>
389 void parallel_for2d(const T0& D0, const T1& D1, const F& func) {
390 #if IE_THREAD == IE_THREAD_TBB
391     auto work_amount = static_cast<size_t>(D0 * D1);
392     int nthr = parallel_get_max_threads();
393     if (static_cast<size_t>(nthr) > work_amount) nthr = static_cast<int>(work_amount);
394     if (nthr == 1) {
395         for_2d(0, 1, D0, D1, func);
396     } else {
397         tbb::parallel_for(
398             0, nthr,
399             [&](int ithr) {
400                 for_2d(ithr, nthr, D0, D1, func);
401             },
402             tbb::static_partitioner());
403     }
404 #elif IE_THREAD == IE_THREAD_TBB_AUTO
405     const int nthr = parallel_get_max_threads();
406     tbb::parallel_for(0, nthr, [&](int ithr) {
407         for_2d(ithr, nthr, D0, D1, func);
408     });
409 #elif IE_THREAD == IE_THREAD_OMP
410 #pragma omp parallel
411     for_2d(parallel_get_thread_num(), parallel_get_num_threads(), D0, D1, func);
412 #elif IE_THREAD == IE_THREAD_SEQ
413     for_2d(0, 1, D0, D1, func);
414 #endif
415 }
416
417 template <typename T0, typename T1, typename T2, typename F>
418 void for_3d(const int& ithr, const int& nthr, const T0& D0, const T1& D1, const T2& D2, const F& func) {
419     const size_t work_amount = (size_t)D0 * D1 * D2;
420     if (work_amount == 0) return;
421     size_t start {0}, end {0};
422     splitter(work_amount, nthr, ithr, start, end);
423
424     T0 d0 {0};
425     T1 d1 {0};
426     T2 d2 {0};
427     parallel_it_init(start, d0, D0, d1, D1, d2, D2);
428     for (size_t iwork = start; iwork < end; ++iwork) {
429         func(d0, d1, d2);
430         parallel_it_step(d0, D0, d1, D1, d2, D2);
431     }
432 }
433
434 template <typename T0, typename T1, typename T2, typename F>
435 void parallel_for3d(const T0& D0, const T1& D1, const T2& D2, const F& func) {
436 #if IE_THREAD == IE_THREAD_TBB
437     auto work_amount = static_cast<size_t>(D0 * D1 * D2);
438     int nthr = parallel_get_max_threads();
439     if (static_cast<size_t>(nthr) > work_amount) nthr = static_cast<int>(work_amount);
440     if (nthr == 1) {
441         for_3d(0, 1, D0, D1, D2, func);
442     } else {
443         tbb::parallel_for(
444             0, nthr,
445             [&](int ithr) {
446                 for_3d(ithr, nthr, D0, D1, D2, func);
447             },
448             tbb::static_partitioner());
449     }
450 #elif IE_THREAD == IE_THREAD_TBB_AUTO
451     const int nthr = parallel_get_max_threads();
452     tbb::parallel_for(0, nthr, [&](int ithr) {
453         for_3d(ithr, nthr, D0, D1, D2, func);
454     });
455 #elif IE_THREAD == IE_THREAD_OMP
456 #pragma omp parallel
457     for_3d(parallel_get_thread_num(), parallel_get_num_threads(), D0, D1, D2, func);
458 #elif IE_THREAD == IE_THREAD_SEQ
459     for_3d(0, 1, D0, D1, D2, func);
460 #endif
461 }
462
463 template <typename T0, typename T1, typename T2, typename T3, typename F>
464 void for_4d(const int& ithr, const int& nthr, const T0& D0, const T1& D1, const T2& D2, const T3& D3, const F& func) {
465     const size_t work_amount = (size_t)D0 * D1 * D2 * D3;
466     if (work_amount == 0) return;
467     size_t start {0}, end {0};
468     splitter(work_amount, nthr, ithr, start, end);
469
470     T0 d0 {0};
471     T1 d1 {0};
472     T2 d2 {0};
473     T3 d3 {0};
474     parallel_it_init(start, d0, D0, d1, D1, d2, D2, d3, D3);
475     for (size_t iwork = start; iwork < end; ++iwork) {
476         func(d0, d1, d2, d3);
477         parallel_it_step(d0, D0, d1, D1, d2, D2, d3, D3);
478     }
479 }
480
481 template <typename T0, typename T1, typename T2, typename T3, typename F>
482 void parallel_for4d(const T0& D0, const T1& D1, const T2& D2, const T3& D3, const F& func) {
483 #if IE_THREAD == IE_THREAD_TBB
484     auto work_amount = static_cast<size_t>(D0 * D1 * D2 * D3);
485     int nthr = parallel_get_max_threads();
486     if (static_cast<size_t>(nthr) > work_amount) nthr = static_cast<int>(work_amount);
487     if (nthr == 1) {
488         for_4d(0, 1, D0, D1, D2, D3, func);
489     } else {
490         tbb::parallel_for(
491             0, nthr,
492             [&](int ithr) {
493                 for_4d(ithr, nthr, D0, D1, D2, D3, func);
494             },
495             tbb::static_partitioner());
496     }
497 #elif IE_THREAD == IE_THREAD_TBB_AUTO
498     const int nthr = parallel_get_max_threads();
499     tbb::parallel_for(0, nthr, [&](int ithr) {
500         for_4d(ithr, nthr, D0, D1, D2, D3, func);
501     });
502 #elif IE_THREAD == IE_THREAD_OMP
503 #pragma omp parallel
504     for_4d(parallel_get_thread_num(), parallel_get_num_threads(), D0, D1, D2, D3, func);
505 #elif IE_THREAD == IE_THREAD_SEQ
506     for_4d(0, 1, D0, D1, D2, D3, func);
507 #endif
508 }
509
510 template <typename T0, typename T1, typename T2, typename T3, typename T4, typename F>
511 void for_5d(const int& ithr, const int& nthr, const T0& D0, const T1& D1, const T2& D2, const T3& D3, const T4& D4,
512             const F& func) {
513     const size_t work_amount = (size_t)D0 * D1 * D2 * D3 * D4;
514     if (work_amount == 0) return;
515     size_t start {0}, end {0};
516     splitter(work_amount, nthr, ithr, start, end);
517
518     T0 d0 {0};
519     T1 d1 {0};
520     T2 d2 {0};
521     T3 d3 {0};
522     T4 d4 {0};
523     parallel_it_init(start, d0, D0, d1, D1, d2, D2, d3, D3, d4, D4);
524     for (size_t iwork = start; iwork < end; ++iwork) {
525         func(d0, d1, d2, d3, d4);
526         parallel_it_step(d0, D0, d1, D1, d2, D2, d3, D3, d4, D4);
527     }
528 }
529
530 template <typename T0, typename T1, typename T2, typename T3, typename T4, typename F>
531 void parallel_for5d(const T0& D0, const T1& D1, const T2& D2, const T3& D3, const T4& D4, const F& func) {
532 #if IE_THREAD == IE_THREAD_TBB
533     auto work_amount = static_cast<size_t>(D0 * D1 * D2 * D3 * D4);
534     int nthr = parallel_get_max_threads();
535     if (static_cast<size_t>(nthr) > work_amount) nthr = static_cast<int>(work_amount);
536     if (nthr == 1) {
537         for_5d(0, 1, D0, D1, D2, D3, D4, func);
538     } else {
539         tbb::parallel_for(
540             0, nthr,
541             [&](int ithr) {
542                 for_5d(ithr, nthr, D0, D1, D2, D3, D4, func);
543             },
544             tbb::static_partitioner());
545     }
546 #elif IE_THREAD == IE_THREAD_TBB_AUTO
547     const int nthr = parallel_get_max_threads();
548     tbb::parallel_for(0, nthr, [&](int ithr) {
549         for_5d(ithr, nthr, D0, D1, D2, D3, D4, func);
550     });
551 #elif IE_THREAD == IE_THREAD_OMP
552 #pragma omp parallel
553     for_5d(parallel_get_thread_num(), parallel_get_num_threads(), D0, D1, D2, D3, D4, func);
554 #elif IE_THREAD == IE_THREAD_SEQ
555     for_5d(0, 1, D0, D1, D2, D3, D4, func);
556 #endif
557 }
558
559 }  // namespace InferenceEngine