Merge pull request #17851 from anton-potapov:sole_tbb_executor
author Anton Potapov <anton.potapov@intel.com>
Mon, 30 Nov 2020 13:15:13 +0000 (16:15 +0300)
committer GitHub <noreply@github.com>
Mon, 30 Nov 2020 13:15:13 +0000 (13:15 +0000)
* TBB executor for GAPI

 - the sole executor
 - unit tests for it
 - no usage in GAPI at the moment

* TBB executor for GAPI

 - introduced a new overload of execute() to explicitly accept a tbb::task_arena
   argument
 - added more basic tests
 - moved arena creation code into tests

* TBB executor for GAPI

 - fixed compile errors & warnings

* TBB executor for GAPI

 - split the all-in-one execute() function into logically independent parts

* TBB executor for GAPI

 - used util::variant in the tile_node

* TBB executor for GAPI

 - moved copy_through_move to separate header
 - rearranged detail stuff into proper namespaces
 - moved all implementation into detail namespace

* TBB executor for GAPI

 - fixed build error with TBB 4.4.
 - fixed build warnings

* TBB executor for GAPI

 - aligned string widths
 - fixed spaces in expressions
 - fixed English grammar
 - minor improvements

* TBB executor for GAPI

 - added more comments
 - minor improvements

* TBB executor for GAPI

 - changed the ITT_ prefix of the macros to GAPI_ITT

* TBB executor for GAPI

 - no more "unused" warning for GAPI_DbgAssert
 - changed the local assert macro to map onto GAPI_DbgAssert

* TBB executor for GAPI

 - file renamings
 - changed the local assert macro to map onto GAPI_DbgAssert

* TBB executor for GAPI

 - test file renamed
 - added more comments

* TBB executor for GAPI

 - minor cleanups and cosmetic changes

* TBB executor for GAPI

 - minor cleanups and cosmetic changes

* TBB executor for GAPI

 - changed spacing and curly-brace alignment

* TBB executor for GAPI

 - minor cleanups

* TBB executor for GAPI

 - minor cleanups

modules/gapi/CMakeLists.txt
modules/gapi/include/opencv2/gapi/own/assert.hpp
modules/gapi/include/opencv2/gapi/util/copy_through_move.hpp [new file with mode: 0644]
modules/gapi/src/executor/gapi_itt.hpp [new file with mode: 0644]
modules/gapi/src/executor/gtbbexecutor.cpp [new file with mode: 0644]
modules/gapi/src/executor/gtbbexecutor.hpp [new file with mode: 0644]
modules/gapi/test/executor/gtbbexecutor_internal_tests.cpp [new file with mode: 0644]

index 0067cfa..c1f58ee 100644 (file)
@@ -107,6 +107,7 @@ set(gapi_srcs
 
     # Executor
     src/executor/gexecutor.cpp
+    src/executor/gtbbexecutor.cpp
     src/executor/gstreamingexecutor.cpp
     src/executor/gasync.cpp
 
@@ -196,6 +197,10 @@ if(TARGET opencv_test_gapi)
   target_link_libraries(opencv_test_gapi PRIVATE ade)
 endif()
 
+if(HAVE_TBB AND TARGET opencv_test_gapi)
+  ocv_target_link_libraries(opencv_test_gapi PRIVATE tbb)
+endif()
+
 if(HAVE_FREETYPE)
   ocv_target_compile_definitions(${the_module} PRIVATE -DHAVE_FREETYPE)
   if(TARGET opencv_test_gapi)
index d0e0f1c..d50543f 100644 (file)
@@ -2,16 +2,28 @@
 // It is subject to the license terms in the LICENSE file found in the top-level directory
 // of this distribution and at http://opencv.org/license.html.
 //
-// Copyright (C) 2018 Intel Corporation
+// Copyright (C) 2018-2020 Intel Corporation
 
 
 #ifndef OPENCV_GAPI_OWN_ASSERT_HPP
 #define OPENCV_GAPI_OWN_ASSERT_HPP
 
+#include <opencv2/gapi/util/compiler_hints.hpp>
+
+#define GAPI_DbgAssertNoOp(expr) {                  \
+    constexpr bool _assert_tmp = false && (expr);   \
+    cv::util::suppress_unused_warning(_assert_tmp); \
+}
+
 #if !defined(GAPI_STANDALONE)
 #include <opencv2/core/base.hpp>
 #define GAPI_Assert CV_Assert
-#define GAPI_DbgAssert CV_DbgAssert
+
+#if defined _DEBUG || defined CV_STATIC_ANALYSIS
+#  define GAPI_DbgAssert CV_DbgAssert
+#else
+#  define GAPI_DbgAssert(expr) GAPI_DbgAssertNoOp(expr)
+#endif
 
 #else
 #include <stdexcept>
@@ -33,7 +45,7 @@ namespace detail
 
 
 #ifdef NDEBUG
-#  define GAPI_DbgAssert(expr)
+#  define GAPI_DbgAssert(expr) GAPI_DbgAssertNoOp(expr)
 #else
 #  define GAPI_DbgAssert(expr) GAPI_Assert(expr)
 #endif
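
With the hunk above, a release-build GAPI_DbgAssert routes through GAPI_DbgAssertNoOp: the asserted expression stays referenced (so no "unused variable" warnings), yet it is never evaluated at run time. A minimal sketch of the intended effect, assuming the header is on the include path; expensive_check() is a hypothetical helper used only for illustration:

    #include <opencv2/gapi/own/assert.hpp>

    static bool expensive_check() { return true; }   // hypothetical helper, illustration only

    void sample() {
        bool ok = expensive_check();
        // Debug/static-analysis build: expands to a real assertion and evaluates the check.
        // Release build: expands to GAPI_DbgAssertNoOp(ok); 'ok' remains referenced through the
        // 'false && (expr)' constexpr initializer, so no unused-variable warning, but nothing runs.
        GAPI_DbgAssert(ok);
    }
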
diff --git a/modules/gapi/include/opencv2/gapi/util/copy_through_move.hpp b/modules/gapi/include/opencv2/gapi/util/copy_through_move.hpp
new file mode 100644 (file)
index 0000000..1a1121e
--- /dev/null
@@ -0,0 +1,34 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2020 Intel Corporation
+
+#ifndef OPENCV_GAPI_UTIL_COPY_THROUGH_MOVE_HPP
+#define OPENCV_GAPI_UTIL_COPY_THROUGH_MOVE_HPP
+
+#include <opencv2/gapi/util/type_traits.hpp> //decay_t
+
+namespace cv
+{
+namespace util
+{
+    // This is a tool to move-initialize captures of a lambda in C++11
+    template<typename T>
+    struct copy_through_move_t{
+       T value;
+       const T& get() const {return value;}
+       T&       get()       {return value;}
+       copy_through_move_t(T&& g) : value(std::move(g)) {}
+       copy_through_move_t(copy_through_move_t&&) = default;
+       copy_through_move_t(copy_through_move_t const& lhs) : copy_through_move_t(std::move(const_cast<copy_through_move_t&>(lhs))) {}
+    };
+
+    template<typename T>
+    copy_through_move_t<util::decay_t<T>> copy_through_move(T&& t){
+        return std::forward<T>(t);
+    }
+} // namespace util
+} // namespace cv
+
+#endif /* OPENCV_GAPI_UTIL_COPY_THROUGH_MOVE_HPP */
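
copy_through_move_t works around the lack of init-captures in C++11 lambdas: a move-only value is wrapped, and the wrapper's copy constructor actually moves, so the lambda remains copyable as far as std::function and similar consumers are concerned. A small usage sketch under that assumption (names are illustrative only); gtbbexecutor.cpp below uses the same trick to carry its sleep/wait locks into an async callback:

    #include <opencv2/gapi/util/copy_through_move.hpp>
    #include <iostream>
    #include <memory>
    #include <utility>

    void sample() {
        std::unique_ptr<int> p(new int(42));
        // No [p = std::move(p)] init-capture in C++11 -- wrap the move-only value instead.
        auto wrapped = cv::util::copy_through_move(std::move(p));
        auto task = [wrapped]() {
            std::cout << *wrapped.get() << std::endl;   // prints 42
        };
        task();   // note: each "copy" of the wrapper steals the value from its source
    }
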
diff --git a/modules/gapi/src/executor/gapi_itt.hpp b/modules/gapi/src/executor/gapi_itt.hpp
new file mode 100644 (file)
index 0000000..2ab3237
--- /dev/null
@@ -0,0 +1,59 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2020 Intel Corporation
+
+#ifndef OPENCV_GAPI_GAPI_ITT_HPP
+#define OPENCV_GAPI_GAPI_ITT_HPP
+
+//for ITT_NAMED_TRACE_GUARD
+#include <type_traits>
+#include <memory>
+
+// FIXME: It seems that this macro is not propagated here by the OpenCV cmake (as this is not core module).
+// (Consider using OpenCV's trace.hpp )
+#ifdef OPENCV_WITH_ITT
+#include <ittnotify.h>
+#endif
+
+#include <opencv2/gapi/util/compiler_hints.hpp>
+namespace cv {
+namespace util {
+    template< class T >
+    using remove_reference_t = typename std::remove_reference<T>::type;
+
+    // Home-brewed ScopeGuard:
+    // d will be called automatically with p as its argument when the guard goes out of scope.
+    // Call release() on the returned unique_ptr to revoke the guard action.
+    template<typename T, typename D>
+    auto make_ptr_guard(T* p, D&& d) -> std::unique_ptr<T, util::remove_reference_t<D>> {
+        return {p, std::forward<D>(d)};
+    }
+}  // namespace util
+
+// FIXME: make it more reusable (and move to other place and other namespace)
+namespace gimpl { namespace parallel {
+    #ifdef OPENCV_WITH_ITT
+    extern const __itt_domain* gapi_itt_domain;
+
+    namespace {
+        auto make_itt_guard = [](__itt_string_handle* h) {
+           __itt_task_begin(gapi_itt_domain, __itt_null, __itt_null, (h));
+           return util::make_ptr_guard(reinterpret_cast<int*>(1), [](int* ) { __itt_task_end(gapi_itt_domain); });
+        };
+    }  // namespace
+
+    #define GAPI_ITT_NAMED_TRACE_GUARD(name, h)  auto name =  cv::gimpl::parallel::make_itt_guard(h); cv::util::suppress_unused_warning(name)
+    #else
+    struct dumb_guard {void reset(){}};
+    #define GAPI_ITT_NAMED_TRACE_GUARD(name, h)  cv::gimpl::parallel::dumb_guard name; cv::util::suppress_unused_warning(name)
+    #endif
+
+    #define GAPI_ITT_AUTO_TRACE_GUARD_IMPL_(LINE, h)        GAPI_ITT_NAMED_TRACE_GUARD(itt_trace_guard_##LINE, h)
+    #define GAPI_ITT_AUTO_TRACE_GUARD_IMPL(LINE, h)         GAPI_ITT_AUTO_TRACE_GUARD_IMPL_(LINE, h)
+    #define GAPI_ITT_AUTO_TRACE_GUARD(h)                    GAPI_ITT_AUTO_TRACE_GUARD_IMPL(__LINE__, h)
+}} //gimpl::parallel
+}  //namespace cv
+
+#endif /* OPENCV_GAPI_GAPI_ITT_HPP */
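
When OPENCV_WITH_ITT is defined, GAPI_ITT_AUTO_TRACE_GUARD opens an __itt_task for the rest of the enclosing scope and closes it automatically; otherwise it degrades to a no-op guard. A hedged usage sketch (the handle name is illustrative, not taken from the patch):

    #include "gapi_itt.hpp"

    #ifdef OPENCV_WITH_ITT
    // hypothetical string handle naming the profiled region
    static __itt_string_handle* ittSampleRegion = __itt_string_handle_create("sample region");
    #else
    static void* ittSampleRegion = nullptr;   // unused: the no-ITT macro ignores its argument
    #endif

    void traced_work() {
        GAPI_ITT_AUTO_TRACE_GUARD(ittSampleRegion);   // __itt_task_begin here ...
        // ... the work being profiled ...
    }                                                 // ... __itt_task_end when the guard is destroyed
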
diff --git a/modules/gapi/src/executor/gtbbexecutor.cpp b/modules/gapi/src/executor/gtbbexecutor.cpp
new file mode 100644 (file)
index 0000000..03c6757
--- /dev/null
@@ -0,0 +1,445 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2020 Intel Corporation
+
+#include "gtbbexecutor.hpp"
+
+#if defined(HAVE_TBB)
+#include "gapi_itt.hpp"
+
+#include <opencv2/gapi/own/assert.hpp>
+#include <opencv2/gapi/util/copy_through_move.hpp>
+#include "logger.hpp" // GAPI_LOG
+
+#include <tbb/task.h>
+#include <memory> // unique_ptr
+
+#include <atomic>
+#include <condition_variable>
+
+#include <chrono>
+
+#define ASSERT(expr)          GAPI_DbgAssert(expr)
+
+#define LOG_INFO(tag, ...)    GAPI_LOG_INFO(tag, __VA_ARGS__)
+#define LOG_WARNING(tag, ...) GAPI_LOG_WARNING(tag, __VA_ARGS__)
+#define LOG_DEBUG(tag, ...)   GAPI_LOG_DEBUG(tag, __VA_ARGS__)
+
+
+#ifdef OPENCV_WITH_ITT
+const __itt_domain* cv::gimpl::parallel::gapi_itt_domain = __itt_domain_create("GAPI Context");
+#endif
+
+namespace cv { namespace gimpl { namespace parallel {
+
+namespace detail {
+// some helper stuff to deal with tbb::task-related entities
+namespace tasking {
+
+enum class use_tbb_scheduler_bypass {
+   NO,
+   YES
+};
+
+inline void assert_graph_is_running(tbb::task* root) {
+   // tbb::task::wait_for_all blocks the calling thread until the task ref_count drops to 1.
+   // So if the root task ref_count is greater than 1, the graph still has work to do and,
+   // accordingly, wait_for_all() has not yet returned
+   ASSERT(root->ref_count() > 1);
+}
+
+// made template to break circular dependencies
+template<typename body_t>
+struct functor_task : tbb::task {
+   body_t body;
+
+   template<typename arg_t>
+   functor_task(arg_t&& a) : body(std::forward<arg_t>(a)) {}
+
+   tbb::task * execute() override {
+      assert_graph_is_running(parent());
+
+      auto reuse_current_task = body();
+      // if needed, tell TBB to execute the current task once again
+      return (use_tbb_scheduler_bypass::YES ==  reuse_current_task) ? (recycle_as_continuation(), this) : nullptr;
+   }
+   ~functor_task() {
+      assert_graph_is_running(parent());
+   }
+};
+
+template<typename body_t>
+auto allocate_task(tbb::task* root, body_t const& body) -> functor_task<body_t>* {
+    return new(tbb::task::allocate_additional_child_of(*root)) functor_task<body_t>{body};
+}
+
+template<typename body_t>
+void spawn_no_assert(tbb::task* root, body_t const& body) {
+   tbb::task::spawn(* allocate_task(root, body));
+}
+
+#ifdef OPENCV_WITH_ITT
+namespace {
+    static __itt_string_handle* ittTbbAddReadyBlocksToQueue   = __itt_string_handle_create("add ready blocks to queue");
+    static __itt_string_handle* ittTbbSpawnReadyBlocks        = __itt_string_handle_create("spawn ready blocks");
+    static __itt_string_handle* ittTbbEnqueueSpawnReadyBlocks = __itt_string_handle_create("enqueueing a spawn of ready blocks");
+    static __itt_string_handle* ittTbbUnlockMasterThread      = __itt_string_handle_create("Unlocking master thread");
+}
+#endif // OPENCV_WITH_ITT
+
+
+template<typename body_t>
+void batch_spawn(size_t count, tbb::task* root, body_t const& body, bool do_assert_graph_is_running = true) {
+   GAPI_ITT_AUTO_TRACE_GUARD(ittTbbSpawnReadyBlocks);
+   if (do_assert_graph_is_running) {
+       assert_graph_is_running(root);
+   }
+
+   for (size_t i=0; i<count; i++) {
+       spawn_no_assert(root, body);
+   }
+}
+
+
+struct destroy_tbb_task {
+    void operator()(tbb::task* t) const { if (t) tbb::task::destroy(*t);};
+};
+
+using root_t = std::unique_ptr<tbb::task, destroy_tbb_task>;
+
+root_t inline create_root(tbb::task_group_context& ctx) {
+    root_t  root{new (tbb::task::allocate_root(ctx)) tbb::empty_task};
+    root->set_ref_count(1); // required by wait_for_all, as it waits until counter drops to 1
+    return root;
+}
+
+std::size_t inline tg_context_traits() {
+    // Specify tbb::task_group_context::concurrent_wait in the traits to ask TBB scheduler not to change
+    // ref_count of the task we wait on (root) when wait is complete.
+    return tbb::task_group_context::default_traits | tbb::task_group_context::concurrent_wait;
+}
+
+} // namespace tasking
+
+namespace async {
+struct async_tasks_t {
+    std::atomic<size_t>         count {0};
+    std::condition_variable     cv;
+    std::mutex                  mtx;
+};
+
+enum class wake_tbb_master {
+   NO,
+   YES
+};
+
+void inline wake_master(async_tasks_t& async_tasks, wake_tbb_master wake_master) {
+    // TODO: seems that this can be relaxed
+    auto active_async_tasks = --async_tasks.count;
+
+    if ((active_async_tasks == 0) || (wake_master == wake_tbb_master::YES)) {
+        // This was the last async task, or we were asked to wake the TBB master up (e.g. there are new TBB tasks to execute)
+        GAPI_ITT_AUTO_TRACE_GUARD(ittTbbUnlockMasterThread);
+        // While decrement of async_tasks_t::count is atomic, it might occur after the waiting
+        // thread has read its value but _before_ it actually starts waiting on the condition variable.
+        // So, acquiring the lock is needed to guarantee that the current condition check (if any) in the waiting thread
+        // (possibly running in parallel with the async_tasks_t::count decrement above) completes _before_ the signal is issued.
+        // Therefore, when notify_one is called, the waiting thread is either sleeping on the condition variable or
+        // running a new check which is guaranteed to pick up the new value and return from wait().
+
+        // There is no need to _hold_ the lock while signaling, only to acquire it.
+        std::unique_lock<std::mutex> {async_tasks.mtx};   // Acquire and release the lock.
+        async_tasks.cv.notify_one();
+    }
+}
+
+struct master_thread_sleep_lock_t
+{
+    struct sleep_unlock {
+       void operator()(async_tasks_t* t) const {
+          ASSERT(t);
+          wake_master(*t, wake_tbb_master::NO);
+       }
+    };
+
+    std::unique_ptr<async_tasks_t, sleep_unlock>  guard;
+
+    master_thread_sleep_lock_t() = default;
+    master_thread_sleep_lock_t(async_tasks_t*  async_tasks_ptr) : guard(async_tasks_ptr) {
+        // TODO: seems that this can be relaxed
+        ++(guard->count);
+    }
+
+    void unlock(wake_tbb_master wake) {
+        if (auto* p = guard.release()) {
+            wake_master(*p, wake);
+        }
+    }
+};
+
+master_thread_sleep_lock_t inline lock_sleep_master(async_tasks_t& async_tasks) {
+    return {&async_tasks};
+}
+
+enum class is_tbb_work_present {
+   NO,
+   YES
+};
+
+// RAII object to block the TBB master thread (the one that does wait_for_all())
+// N.B.: wait_for_all() returns control when the root ref_count drops to 1.
+struct root_wait_lock_t {
+    struct root_decrement_ref_count{
+        void operator()(tbb::task* t) const {
+            ASSERT(t);
+            auto result = t->decrement_ref_count();
+            ASSERT(result >= 1);
+        }
+    };
+
+    std::unique_ptr<tbb::task, root_decrement_ref_count> guard;
+
+    root_wait_lock_t() = default;
+    root_wait_lock_t(tasking::root_t& root, is_tbb_work_present& previous_state) : guard{root.get()} {
+        // Block the master thread while the *this object is alive.
+        auto new_root_ref_count = root->add_ref_count(1);
+        previous_state = (new_root_ref_count == 2) ? is_tbb_work_present::NO : is_tbb_work_present::YES;
+    }
+
+};
+
+root_wait_lock_t inline lock_wait_master(tasking::root_t& root, is_tbb_work_present& previous_state) {
+    return root_wait_lock_t{root, previous_state};
+}
+
+} // namespace async
+
+inline tile_node*  pop(prio_items_queue_t& q) {
+    tile_node* node = nullptr;
+    bool popped = q.try_pop(node);
+    ASSERT(popped && "queue should be non empty as we push items to it before we spawn");
+    return node;
+}
+
+namespace graph {
+    // Returns : number of items actually pushed into the q
+    std::size_t inline push_ready_dependants(prio_items_queue_t& q, tile_node* node) {
+        GAPI_ITT_AUTO_TRACE_GUARD(ittTbbAddReadyBlocksToQueue);
+        std::size_t ready_items = 0;
+        // enable dependent tasks
+        for (auto* dependant : node->dependants) {
+            // fetch_and_sub returns previous value
+            if (1 == dependant->dependency_count.fetch_sub(1)) {
+                // tile node is ready for execution, add it to the queue
+                q.push(dependant);
+                ++ready_items;
+            }
+        }
+        return ready_items;
+    }
+
+    struct exec_ctx {
+        tbb::task_arena&                arena;
+        prio_items_queue_t&             q;
+        tbb::task_group_context         tg_ctx;
+        tasking::root_t                 root;
+        detail::async::async_tasks_t    async_tasks;
+        std::atomic<size_t>             executed {0};
+
+        exec_ctx(tbb::task_arena& arena_, prio_items_queue_t& q_)
+            : arena(arena_), q(q_),
+              // As the traits is last argument, explicitly specify (default) value for first argument
+              tg_ctx{tbb::task_group_context::bound, tasking::tg_context_traits()},
+              root(tasking::create_root(tg_ctx))
+        {}
+    };
+
+    // At the moment there are no suitable tools to manage TBB priorities on a task-by-task basis.
+    // Instead, a priority queue is used to respect tile_node priorities.
+    // Also, a TBB task is not bound to any particular tile_node until it is actually executed.
+
+    // Strictly speaking there are two graphs here:
+    // - The G-API one, described by the connected tile_node instances.
+    //   This graph is:
+    //    - known beforehand and does not change during the execution (i.e. static)
+    //    - contains both TBB and non-TBB parts
+    //    - prioritized (i.e. all nodes have an assigned execution priority)
+    //
+    // - The TBB task tree, which is:
+    //    - flat (has only two levels: root and leaves)
+    //    - dynamic, i.e. new leaves are added on demand when new TBB tasks are spawned
+    //    - describes only the TBB/CPU part of the whole graph
+    //    - non-prioritized (i.e. all tasks are created equal)
+
+    // The class below represents the TBB task payload.
+    //
+    // Each instance basically does three things:
+    // 1. Gets the tile_node item from the top of the queue
+    // 2. Executes its body
+    // 3. Pushes dependent tile_nodes to the queue once they are ready
+    //
+    struct task_body {
+        exec_ctx& ctx;
+
+        std::size_t push_ready_dependants(tile_node* node) const {
+            return graph::push_ready_dependants(ctx.q, node);
+        }
+
+        void spawn_clones(std::size_t items) const {
+            tasking::batch_spawn(items, ctx.root.get(), *this);
+        }
+
+        task_body(exec_ctx& ctx_) : ctx(ctx_) {}
+        tasking::use_tbb_scheduler_bypass operator()() const {
+            ASSERT(!ctx.q.empty() && "Spawned task with no job to do ? ");
+
+            tile_node* node = detail::pop(ctx.q);
+
+            auto result = tasking::use_tbb_scheduler_bypass::NO;
+            // execute the task
+
+            if (auto p = util::get_if<tile_node::sync_task_body>(&(node->task_body))) {
+                // synchronous task
+                p->body();
+
+                std::size_t ready_items = push_ready_dependants(node);
+
+                if (ready_items > 0) {
+                    // spawn one fewer task and tell TBB to reuse (recycle) the current task
+                    spawn_clones(ready_items - 1);
+                    result = tasking::use_tbb_scheduler_bypass::YES;
+                }
+            }
+            else {
+                LOG_DEBUG(NULL, "Async task");
+                using namespace detail::async;
+                using util::copy_through_move;
+
+                auto block_master = copy_through_move(lock_sleep_master(ctx.async_tasks));
+
+                auto self_copy = *this;
+                auto callback = [node, block_master, self_copy] () mutable /*due to block_master.get().unlock()*/ {
+                    LOG_DEBUG(NULL, "Async task callback is called");
+                    // Implicitly unlock master right in the end of callback
+                    auto master_sleep_lock = std::move(block_master);
+                    std::size_t ready_items = self_copy.push_ready_dependants(node);
+                    if (ready_items > 0) {
+                        auto master_was_active = is_tbb_work_present::NO;
+                        {
+                            GAPI_ITT_AUTO_TRACE_GUARD(ittTbbEnqueueSpawnReadyBlocks);
+                            // Force master thread (one that does wait_for_all()) to (actively) wait for enqueued tasks
+                            // and unlock it right after all dependent tasks are spawned.
+
+                            auto root_wait_lock = copy_through_move(lock_wait_master(self_copy.ctx.root, master_was_active));
+
+                            // TODO: add test to cover proper holding of root_wait_lock
+                            // As the calling thread is most likely not a TBB one, instead of spawning TBB tasks directly we
+                            // enqueue a task which will spawn them.
+                            // For the master thread not to leave wait_for_all() prematurely,
+                            // hold the root_wait_lock until the needed tasks are actually spawned.
+                            self_copy.ctx.arena.enqueue([ready_items, self_copy, root_wait_lock]() {
+                                self_copy.spawn_clones(ready_items);
+                                // TODO: why do we need this? Either write a descriptive comment or remove it
+                                volatile auto unused = root_wait_lock.get().guard.get();
+                                util::suppress_unused_warning(unused);
+                            });
+                        }
+                        // Wake the master thread (if any) to pick up the enqueued tasks iff:
+                        // 1. there is new TBB work to do, and
+                        // 2. the master thread was sleeping on the condition variable waiting for async tasks to complete
+                        //    (there was no active work before, i.e. root->ref_count() was == 1)
+                        auto wake_master = (master_was_active == is_tbb_work_present::NO) ?
+                                wake_tbb_master::YES : wake_tbb_master::NO;
+                        master_sleep_lock.get().unlock(wake_master);
+                    }
+                };
+
+                auto& body = util::get<tile_node::async_task_body>(node->task_body).body;
+                body(std::move(callback), node->total_order_index);
+            }
+
+            ctx.executed++;
+            // reset dependency_count to its initial state to simplify re-execution of the same graph
+            node->dependency_count = node->dependencies;
+
+            return result;
+        }
+    };
+}
+} // namespace detail
+}}}  // namespace cv::gimpl::parallel
+
+void cv::gimpl::parallel::execute(prio_items_queue_t& q) {
+    // get the reference to current task_arena (i.e. one we are running in)
+#if TBB_INTERFACE_VERSION > 9002
+    using attach_t = tbb::task_arena::attach;
+#else
+    using attach_t = tbb::internal::attach;
+#endif
+
+    tbb::task_arena arena{attach_t{}};
+    execute(q, arena);
+}
+
+void cv::gimpl::parallel::execute(prio_items_queue_t& q, tbb::task_arena& arena) {
+    using namespace detail;
+    graph::exec_ctx ctx{arena, q};
+
+    arena.execute(
+        [&]() {
+            // The passed-in queue is assumed to contain starting tasks, i.e. ones with no (or already resolved) dependencies
+            auto num_start_tasks = q.size();
+
+            // TODO: use recursive spawning and task soft affinity for faster task distribution
+            // As the graph is just starting and no task has been spawned yet,
+            // assert_graph_is_running(root) will not hold, so spawn without the assert
+            tasking::batch_spawn(num_start_tasks, ctx.root.get(), graph::task_body{ctx}, /* assert_graph_is_running*/false);
+
+            using namespace std::chrono;
+            high_resolution_clock timer;
+
+            auto tbb_work_done   = [&ctx]() { return 1 == ctx.root->ref_count(); };
+            auto async_work_done = [&ctx]() { return 0 == ctx.async_tasks.count; };
+            do {
+               // First participate in execution of TBB graph till there are no more ready tasks.
+               ctx.root->wait_for_all();
+
+               if (!async_work_done()) { // Wait on the condition variable iff there is active async work
+                   auto start = timer.now();
+                   std::unique_lock<std::mutex> lk(ctx.async_tasks.mtx);
+                   // Wait (probably by sleeping) until all async tasks are completed or new TBB tasks are created.
+                   // FIXME: Use TBB resumable tasks here to avoid blocking TBB thread
+                   ctx.async_tasks.cv.wait(lk, [&]{return async_work_done() || !tbb_work_done() ;});
+
+                   LOG_INFO(NULL, "Slept for " << duration_cast<milliseconds>(timer.now() - start).count() << " ms \n");
+               }
+            }
+            while(!tbb_work_done() || !async_work_done());
+
+            ASSERT(tbb_work_done() && async_work_done() && "Graph is still running?");
+        }
+    );
+
+    LOG_INFO(NULL, "Done. Executed " << ctx.executed << " tasks");
+}
+
+std::ostream& cv::gimpl::parallel::operator<<(std::ostream& o, tile_node const& n) {
+    o << "("
+            << " at:"    << &n << ","
+            << "indx: "  << n.total_order_index << ","
+            << "deps #:" << n.dependency_count.value << ", "
+            << "prods:"  << n.dependants.size();
+
+    o << "[";
+    for (auto* d: n.dependants) {
+        o << d << ",";
+    }
+    o << "]";
+
+    o << ")";
+    return o;
+}
+
+#endif // HAVE_TBB
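
The sleep/wake handshake between execute()'s wait loop and wake_master() above relies on one detail: the notifier acquires (and immediately releases) async_tasks.mtx before notify_one(), so the waiter can never check its predicate, miss the counter update, and then sleep through the notification. A self-contained sketch of the same acquire-then-notify pattern (names are illustrative, not taken from the patch):

    #include <atomic>
    #include <condition_variable>
    #include <mutex>
    #include <thread>

    static std::atomic<size_t>     pending {1};   // stands in for async_tasks_t::count
    static std::mutex              mtx;
    static std::condition_variable cond;

    static void async_worker() {
        --pending;                                   // state change happens first
        { std::unique_lock<std::mutex> lk(mtx); }    // acquire & release: orders against the waiter's check
        cond.notify_one();                           // the waiter is now either asleep or about to re-check
    }

    static void master_wait() {
        std::unique_lock<std::mutex> lk(mtx);
        cond.wait(lk, []{ return pending == 0; });   // sees 0 immediately or wakes on notify_one()
    }

    int main() {
        std::thread t(async_worker);
        master_wait();
        t.join();
        return 0;
    }
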
diff --git a/modules/gapi/src/executor/gtbbexecutor.hpp b/modules/gapi/src/executor/gtbbexecutor.hpp
new file mode 100644 (file)
index 0000000..8a62266
--- /dev/null
@@ -0,0 +1,103 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2020 Intel Corporation
+
+#ifndef OPENCV_GAPI_TBB_EXECUTOR_HPP
+#define OPENCV_GAPI_TBB_EXECUTOR_HPP
+
+#if !defined(GAPI_STANDALONE)
+#include <opencv2/cvconfig.h>
+#endif
+
+#if defined(HAVE_TBB)
+
+#include <atomic>
+#include <vector>
+#include <functional>
+#include <iosfwd>
+
+#include <tbb/concurrent_priority_queue.h>
+#include <tbb/task_arena.h>
+
+#include <opencv2/gapi/util/variant.hpp>
+
+namespace cv { namespace gimpl { namespace parallel {
+
+// simple wrapper to allow copies of std::atomic
+template<typename  count_t>
+struct atomic_copyable_wrapper {
+    std::atomic<count_t> value;
+
+    atomic_copyable_wrapper(count_t val) : value(val) {}
+    atomic_copyable_wrapper(atomic_copyable_wrapper const& lhs) : value (lhs.value.load(std::memory_order_relaxed)) {}
+
+    atomic_copyable_wrapper& operator=(count_t val) {
+        value.store(val, std::memory_order_relaxed);
+        return *this;
+    }
+
+    count_t fetch_sub(count_t val) {
+        return value.fetch_sub(val);
+    }
+
+    count_t fetch_add(count_t val) {
+        return value.fetch_add(val);
+    }
+};
+
+struct async_tag {};
+constexpr async_tag async;
+
+// Class describing a piece of work in the node in the tasks graph.
+// Most of the fields are set only once during graph compilation and never change.
+// (However, at the moment they cannot be made const due to the two-phase initialization
+// of the tile_node objects)
+// FIXME: refactor the code to make them const?
+struct tile_node {
+    // place in totally ordered queue of tasks to execute. Inverse to priority, i.e.
+    // lower index means higher priority
+    size_t                                          total_order_index = 0;
+
+    // FIXME: use templates here instead of std::function
+    struct sync_task_body {
+        std::function<void()> body;
+    };
+    struct async_task_body {
+        std::function<void(std::function<void()>&& callback, size_t total_order_index)> body;
+    };
+
+    util::variant<sync_task_body, async_task_body>  task_body;
+
+    // number of dependencies according to a dependency graph (i.e. number of "input" edges).
+    size_t                                          dependencies     = 0;
+
+    // number of unsatisfied dependencies. When it drops to zero the task is ready for execution.
+    // Initially equal to "dependencies"
+    atomic_copyable_wrapper<size_t>                 dependency_count = 0;
+
+    std::vector<tile_node*>                         dependants;
+
+    tile_node(decltype(sync_task_body::body)&& f) : task_body(sync_task_body{std::move(f)}) {};
+    tile_node(async_tag, decltype(async_task_body::body)&& f) : task_body(async_task_body{std::move(f)}) {};
+};
+
+std::ostream& operator<<(std::ostream& o, tile_node const& n);
+
+struct tile_node_indirect_priority_comparator {
+    bool operator()(tile_node const * lhs, tile_node const * rhs) const {
+        return lhs->total_order_index > rhs->total_order_index;
+    }
+};
+
+using prio_items_queue_t = tbb::concurrent_priority_queue<tile_node*, tile_node_indirect_priority_comparator>;
+
+void execute(prio_items_queue_t& q);
+void execute(prio_items_queue_t& q, tbb::task_arena& arena);
+
+}}} // namespace cv::gimpl::parallel
+
+#endif // HAVE_TBB
+
+#endif // OPENCV_GAPI_TBB_EXECUTOR_HPP
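
A minimal hedged sketch of how the executor declared above is meant to be driven (the internal tests below exercise the same pattern): build tile_nodes, wire dependencies in both directions, push only the initially ready nodes into the priority queue, and call execute():

    #include "gtbbexecutor.hpp"
    #include <iostream>

    void run_two_node_graph() {
        using namespace cv::gimpl::parallel;

        tile_node producer([]{ std::cout << "produce\n"; });
        tile_node consumer([]{ std::cout << "consume\n"; });

        // one edge: consumer depends on producer
        producer.dependants.push_back(&consumer);
        consumer.dependencies     = 1;
        consumer.dependency_count = 1;

        // lower total_order_index means higher priority
        producer.total_order_index = 0;
        consumer.total_order_index = 1;

        prio_items_queue_t q;
        q.push(&producer);   // only nodes with no unsatisfied dependencies are pushed up front
        execute(q);          // attaches to the calling thread's tbb::task_arena
    }
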
diff --git a/modules/gapi/test/executor/gtbbexecutor_internal_tests.cpp b/modules/gapi/test/executor/gtbbexecutor_internal_tests.cpp
new file mode 100644 (file)
index 0000000..d793683
--- /dev/null
@@ -0,0 +1,172 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2020 Intel Corporation
+
+// Deliberately include the .cpp file instead of the header as we use a non-exported function (execute)
+#include <executor/gtbbexecutor.cpp>
+
+#if defined(HAVE_TBB)
+
+#include "../test_precomp.hpp"
+#include <tbb/task_arena.h>
+#include <thread>
+
+namespace {
+    tbb::task_arena create_task_arena(int max_concurrency  = tbb::task_arena::automatic /* set to 1 for single thread */) {
+        unsigned int reserved_for_master_threads = 1;
+        if (max_concurrency == 1) {
+            // Leave no room for TBB worker threads by reserving all slots for masters.
+            // The TBB runtime guarantees that no worker threads will join the arena
+            // if max_concurrency is equal to reserved_for_master_threads,
+            // except for the 1:1 case + the use of enqueued tasks for a safety guarantee.
+            // So deliberately make it 2:2 to force TBB not to create an extra thread.
+            //
+            // N.B. one slot will be left empty as only one master thread (the one that
+            // calls root->wait_for_all()) will join the arena.
+
+            // FIXME: strictly speaking the master can take any free slot, not the first one.
+            // However, at the moment the master seems to pick slot 0 all the time.
+            max_concurrency = 2;
+            reserved_for_master_threads = 2;
+        }
+        return tbb::task_arena{max_concurrency, reserved_for_master_threads};
+    }
+}
+
+namespace opencv_test {
+
+TEST(TBBExecutor, Basic) {
+    using namespace cv::gimpl::parallel;
+    bool executed = false;
+    prio_items_queue_t q;
+    tile_node n([&]() {
+        executed = true;
+    });
+    q.push(&n);
+    execute(q);
+    EXPECT_EQ(true, executed);
+}
+
+TEST(TBBExecutor, SerialExecution) {
+    using namespace cv::gimpl::parallel;
+    const int n = 10;
+    prio_items_queue_t q;
+    std::vector<tile_node> nodes; nodes.reserve(n+1);
+    std::vector<std::thread::id> thread_id(n);
+    for (int i=0; i <n; i++) {
+        nodes.push_back(tile_node([&, i]() {
+                thread_id[i] = std::this_thread::get_id();
+                std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+        }));
+        q.push(&nodes.back());
+    }
+
+    auto serial_arena = create_task_arena(1);
+    execute(q, serial_arena);
+    auto print_thread_ids = [&] {
+        std::stringstream str;
+        for (auto& i : thread_id) { str << i <<" ";}
+        return str.str();
+    };
+    EXPECT_NE(thread_id[0], std::thread::id{}) << print_thread_ids();
+    EXPECT_EQ(thread_id.size(), static_cast<size_t>(std::count(thread_id.begin(), thread_id.end(), thread_id[0])))
+        << print_thread_ids();
+}
+
+TEST(TBBExecutor, AsyncBasic) {
+    using namespace cv::gimpl::parallel;
+
+    std::atomic<bool> callback_ready {false};
+    std::function<void()> callback;
+
+    std::atomic<bool> callback_called   {false};
+    std::atomic<bool> master_is_waiting {true};
+    std::atomic<bool> master_was_blocked_until_callback_called {false};
+
+    auto async_thread = std::thread([&] {
+            bool slept = false;
+            while (!callback_ready) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(1));
+                slept = true;
+            }
+            if (!slept) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(1));
+            }
+            callback();
+            callback_called = true;
+            master_was_blocked_until_callback_called = (master_is_waiting == true);
+    });
+
+    auto async_task_body = [&](std::function<void()>&& cb, size_t /*total_order_index*/) {
+        callback = std::move(cb);
+        callback_ready = true;
+    };
+    tile_node n(async, std::move(async_task_body));
+
+    prio_items_queue_t q;
+    q.push(&n);
+    execute(q);
+    master_is_waiting = false;
+
+    async_thread.join();
+
+    EXPECT_EQ(true, callback_called);
+    EXPECT_EQ(true, master_was_blocked_until_callback_called);
+}
+
+TEST(TBBExecutor, Dependencies) {
+    using namespace cv::gimpl::parallel;
+    const int n = 10;
+    bool serial = true;
+    std::atomic<int> counter {0};
+    prio_items_queue_t q;
+    std::vector<tile_node> nodes; nodes.reserve(n+1);
+    const int invalid_order = -10;
+    std::vector<int> tiles_exec_order(n, invalid_order);
+
+    auto add_dependency_to = [](tile_node& node, tile_node& dependency) {
+        dependency.dependants.push_back(&node);
+        node.dependencies++;
+        node.dependency_count.fetch_add(1);
+    };
+    for (int i=0; i <n; i++) {
+        nodes.push_back(tile_node([&, i]() {
+                tiles_exec_order[i] = counter++;
+                if (!serial) {
+                    // sleep gives a better chance for other threads to take part in the execution
+                    std::this_thread::sleep_for(std::chrono::milliseconds(10));
+                }
+        }));
+        if (i >0) {
+            auto last_node = nodes.end() - 1;
+            add_dependency_to(*last_node, *(last_node -1));
+        }
+    }
+
+    q.push(&nodes.front());
+
+    auto arena = serial ? create_task_arena(1) : create_task_arena();
+    execute(q, arena);
+    auto print_execution_order = [&] {
+        std::stringstream str;
+        for (auto& i : tiles_exec_order) { str << i <<" ";}
+        return str.str();
+    };
+    ASSERT_EQ(0, std::count(tiles_exec_order.begin(), tiles_exec_order.end(), invalid_order))
+        << "Not all " << n << " task executed ?\n"
+        <<" execution order : " << print_execution_order();
+
+    for (size_t i=0; i <nodes.size(); i++) {
+        auto node_exec_order = tiles_exec_order[i];
+        for (auto* dependee : nodes[i].dependants) {
+            auto index = std::distance(&nodes.front(), dependee);
+            auto dependee_execution_order = tiles_exec_order[index];
+            ASSERT_LT(node_exec_order, dependee_execution_order) << "node number " << index << " is executed earlier than its dependency " << i;
+        }
+    }
+}
+} // namespace opencv_test
+#endif //HAVE_TBB