Cancelation support fo Async GAPI
authorAnton Potapov <anton.potapov@intel.com>
Fri, 31 May 2019 14:22:12 +0000 (17:22 +0300)
committerAnton Potapov <anton.potapov@intel.com>
Wed, 19 Jun 2019 10:41:39 +0000 (13:41 +0300)
 -

modules/gapi/cmake/standalone.cmake
modules/gapi/include/opencv2/gapi/gasync_context.hpp [new file with mode: 0644]
modules/gapi/include/opencv2/gapi/gcompiled_async.hpp
modules/gapi/include/opencv2/gapi/gcomputation_async.hpp
modules/gapi/include/opencv2/gapi/own/cvdefs.hpp
modules/gapi/src/executor/gasync.cpp
modules/gapi/test/gapi_async_test.cpp

index 9dd5540..e8dbcaa 100644 (file)
@@ -37,6 +37,7 @@ set_property(TARGET ${FLUID_TARGET} PROPERTY CXX_STANDARD 11)
 
 if(MSVC)
   target_compile_options(${FLUID_TARGET} PUBLIC "/wd4251")
+  target_compile_options(${FLUID_TARGET} PUBLIC "/wd4275")
   target_compile_definitions(${FLUID_TARGET} PRIVATE _CRT_SECURE_NO_DEPRECATE)
 endif()
 
diff --git a/modules/gapi/include/opencv2/gapi/gasync_context.hpp b/modules/gapi/include/opencv2/gapi/gasync_context.hpp
new file mode 100644 (file)
index 0000000..3e01577
--- /dev/null
@@ -0,0 +1,38 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2019 Intel Corporation
+
+#ifndef OPENCV_GAPI_GASYNC_CONTEXT_HPP
+#define OPENCV_GAPI_GASYNC_CONTEXT_HPP
+
+#if !defined(GAPI_STANDALONE)
+#  include <opencv2/core/cvdef.h>
+#else   // Without OpenCV
+#  include <opencv2/gapi/own/cvdefs.hpp>
+#endif // !defined(GAPI_STANDALONE)
+
+#include <opencv2/gapi/own/exports.hpp>
+
+namespace cv {
+namespace gapi{
+namespace wip {
+
+class GAPI_EXPORTS GAsyncContext{
+    std::atomic<bool> cancelation_requested = {false};
+public:
+    //returns true if it was a first request to cancel the context
+    bool cancel();
+    bool isCanceled() const;
+};
+
+class GAPI_EXPORTS GAsyncCanceled : public std::exception {
+public:
+    virtual const char* what() const noexcept CV_OVERRIDE;
+};
+} // namespace wip
+} // namespace gapi
+} // namespace cv
+
+#endif //OPENCV_GAPI_GASYNC_CONTEXT_HPP
index 924380d..559f653 100644 (file)
@@ -12,6 +12,7 @@
 #include <exception>        //for std::exception_ptr
 #include <functional>       //for std::function
 #include <opencv2/gapi/garg.hpp>
+#include <opencv2/gapi/own/exports.hpp>
 
 namespace cv {
     //fwd declaration
@@ -19,13 +20,17 @@ namespace cv {
 
 namespace gapi{
 namespace wip {
+    class GAsyncContext;
     //These functions asynchronously (i.e. probably on a separate thread of execution) call operator() member function of their first argument with copies of rest of arguments (except callback) passed in.
     //The difference between the function is the way to get the completion notification (via callback or a waiting on std::future object)
     //If exception is occurred during execution of apply it is transfered to the callback (via function parameter) or passed to future (and will be thrown on call to std::future::get)
     GAPI_EXPORTS void                async(GCompiled& gcmpld, std::function<void(std::exception_ptr)>&& callback, GRunArgs &&ins, GRunArgsP &&outs);
+    GAPI_EXPORTS void                async(GCompiled& gcmpld, std::function<void(std::exception_ptr)>&& callback, GRunArgs &&ins, GRunArgsP &&outs, GAsyncContext& ctx);
+
     GAPI_EXPORTS std::future<void>   async(GCompiled& gcmpld, GRunArgs &&ins, GRunArgsP &&outs);
-} // namespace gapi
+    GAPI_EXPORTS std::future<void>   async(GCompiled& gcmpld, GRunArgs &&ins, GRunArgsP &&outs, GAsyncContext& ctx);
 } // namespace wip
+} // namespace gapi
 } // namespace cv
 
 #endif // OPENCV_GAPI_GCOMPILED_ASYNC_HPP
index 1f76dd0..7444abe 100644 (file)
@@ -8,24 +8,30 @@
 #define OPENCV_GAPI_GCOMPUTATION_ASYNC_HPP
 
 
-#include <future>
+#include <future>                           //for std::future
 #include <exception>                        //for std::exception_ptr
 #include <functional>                       //for std::function
 #include <opencv2/gapi/garg.hpp>            //for GRunArgs, GRunArgsP
 #include <opencv2/gapi/gcommon.hpp>         //for GCompileArgs
+#include <opencv2/gapi/own/exports.hpp>
+
 
 namespace cv {
     //fwd declaration
     class GComputation;
 namespace gapi {
 namespace wip  {
+    class GAsyncContext;
     //These functions asynchronously (i.e. probably on a separate thread of execution) call apply member function of their first argument with copies of rest of arguments (except callback) passed in.
     //The difference between the function is the way to get the completion notification (via callback or a waiting on std::future object)
     //If exception is occurred during execution of apply it is transfered to the callback (via function parameter) or passed to future (and will be thrown on call to std::future::get)
     GAPI_EXPORTS void                async_apply(GComputation& gcomp, std::function<void(std::exception_ptr)>&& callback, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args = {});
+    GAPI_EXPORTS void                async_apply(GComputation& gcomp, std::function<void(std::exception_ptr)>&& callback, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args, GAsyncContext& ctx);
+
     GAPI_EXPORTS std::future<void>   async_apply(GComputation& gcomp, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args = {});
-} // nmaepspace gapi
+    GAPI_EXPORTS std::future<void>   async_apply(GComputation& gcomp, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args,  GAsyncContext& ctx);
 } // namespace wip
+} // namespace gapi
 } // namespace cv
 
 
index e110536..71c2aa8 100644 (file)
@@ -108,6 +108,10 @@ typedef unsigned short ushort;
 #define CV_ELEM_SIZE(type) \
     (CV_MAT_CN(type) << ((((sizeof(size_t)/4+1)*16384|0x3a50) >> CV_MAT_DEPTH(type)*2) & 3))
 
+#ifndef CV_OVERRIDE
+#  define CV_OVERRIDE override
+#endif
+
 // base.h:
 namespace cv
 {
index a66563b..b92dbdc 100644 (file)
@@ -4,10 +4,12 @@
 //
 // Copyright (C) 2019 Intel Corporation
 
+
 #include <opencv2/gapi/gcomputation_async.hpp>
 #include <opencv2/gapi/gcomputation.hpp>
 #include <opencv2/gapi/gcompiled_async.hpp>
 #include <opencv2/gapi/gcompiled.hpp>
+#include <opencv2/gapi/gasync_context.hpp>
 
 #include <condition_variable>
 
 namespace {
     //This is a tool to move initialize captures of a lambda in C++11
     template<typename T>
-    struct move_through_copy{
+    struct copy_through_move{
        T value;
-       move_through_copy(T&& g) : value(std::move(g)) {}
-       move_through_copy(move_through_copy&&) = default;
-       move_through_copy(move_through_copy const& lhs) : move_through_copy(std::move(const_cast<move_through_copy&>(lhs))) {}
+       copy_through_move(T&& g) : value(std::move(g)) {}
+       copy_through_move(copy_through_move&&) = default;
+       copy_through_move(copy_through_move const& lhs) : copy_through_move(std::move(const_cast<copy_through_move&>(lhs))) {}
     };
 }
 
@@ -80,6 +82,7 @@ public:
                 }};
             }
         }
+
         std::unique_lock<std::mutex> lck{mtx};
         bool first_task = q.empty();
         q.push(std::move(t));
@@ -108,8 +111,12 @@ async_service the_ctx;
 }
 
 namespace {
-template<typename f_t>
-std::exception_ptr call_and_catch(f_t&& f){
+template<typename f_t, typename context_t>
+std::exception_ptr call_and_catch(f_t&& f, context_t&& ctx){
+    if (std::forward<context_t>(ctx).isCanceled()){
+        return std::make_exception_ptr(GAsyncCanceled{});
+    }
+
     std::exception_ptr eptr;
     try {
         std::forward<f_t>(f)();
@@ -120,15 +127,21 @@ std::exception_ptr call_and_catch(f_t&& f){
     return eptr;
 }
 
-template<typename f_t, typename callback_t>
-void call_with_callback(f_t&& f, callback_t&& cb){
-    auto eptr = call_and_catch(std::forward<f_t>(f));
+struct DummyContext {
+    bool isCanceled() const {
+        return false;
+    }
+};
+
+template<typename f_t, typename callback_t, typename context_t>
+void call_with_callback(f_t&& f, callback_t&& cb, context_t&& ctx){
+    auto eptr =  call_and_catch(std::forward<f_t>(f), std::forward<context_t>(ctx));
     std::forward<callback_t>(cb)(eptr);
 }
 
-template<typename f_t>
-void call_with_futute(f_t&& f, std::promise<void>& p){
-    auto eptr = call_and_catch(std::forward<f_t>(f));
+template<typename f_t, typename context_t>
+void call_with_future(f_t&& f, std::promise<void>& p, context_t&& ctx){
+    auto eptr =  call_and_catch(std::forward<f_t>(f), std::forward<context_t>(ctx));
     if (eptr){
         p.set_exception(eptr);
     }
@@ -138,56 +151,126 @@ void call_with_futute(f_t&& f, std::promise<void>& p){
 }
 }//namespace
 
+bool GAsyncContext::cancel(){
+    bool expected = false;
+    bool updated  = cancelation_requested.compare_exchange_strong(expected, true);
+    return updated;
+}
+
+bool GAsyncContext::isCanceled() const {
+    return cancelation_requested.load();
+}
+
+const char* GAsyncCanceled::what() const noexcept {
+    return "GAPI asynchronous operation was canceled";
+}
+
 //For now these async functions are simply wrapping serial version of apply/operator() into a functor.
 //These functors are then serialized into single queue, which is processed by a devoted background thread.
 void async_apply(GComputation& gcomp, std::function<void(std::exception_ptr)>&& callback, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args){
-    //TODO: use move_through_copy for all args except gcomp
+    //TODO: use copy_through_move for all args except gcomp
+    //TODO: avoid code duplication between versions of "async" functions
     auto l = [=]() mutable {
         auto apply_l = [&](){
             gcomp.apply(std::move(ins), std::move(outs), std::move(args));
         };
 
-        call_with_callback(apply_l,std::move(callback));
+        call_with_callback(apply_l,std::move(callback), DummyContext{});
     };
     impl::the_ctx.add_task(l);
 }
 
 std::future<void> async_apply(GComputation& gcomp, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args){
-    move_through_copy<std::promise<void>> prms{{}};
+    copy_through_move<std::promise<void>> prms{{}};
     auto f = prms.value.get_future();
     auto l = [=]() mutable {
         auto apply_l = [&](){
             gcomp.apply(std::move(ins), std::move(outs), std::move(args));
         };
 
-        call_with_futute(apply_l, prms.value);
+        call_with_future(apply_l, prms.value, DummyContext{});
     };
 
     impl::the_ctx.add_task(l);
     return f;
 }
 
+void async_apply(GComputation& gcomp, std::function<void(std::exception_ptr)>&& callback, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args, GAsyncContext& ctx){
+    //TODO: use copy_through_move for all args except gcomp
+    auto l = [=, &ctx]() mutable {
+        auto apply_l = [&](){
+            gcomp.apply(std::move(ins), std::move(outs), std::move(args));
+        };
+
+        call_with_callback(apply_l,std::move(callback), ctx);
+    };
+    impl::the_ctx.add_task(l);
+}
+
+std::future<void> async_apply(GComputation& gcomp, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args, GAsyncContext& ctx){
+    copy_through_move<std::promise<void>> prms{{}};
+    auto f = prms.value.get_future();
+    auto l = [=, &ctx]() mutable {
+        auto apply_l = [&](){
+            gcomp.apply(std::move(ins), std::move(outs), std::move(args));
+        };
+
+        call_with_future(apply_l, prms.value, ctx);
+    };
+
+    impl::the_ctx.add_task(l);
+    return f;
+
+}
+
 void async(GCompiled& gcmpld, std::function<void(std::exception_ptr)>&& callback, GRunArgs &&ins, GRunArgsP &&outs){
     auto l = [=]() mutable {
         auto apply_l = [&](){
             gcmpld(std::move(ins), std::move(outs));
         };
 
-        call_with_callback(apply_l,std::move(callback));
+        call_with_callback(apply_l,std::move(callback), DummyContext{});
+    };
+
+    impl::the_ctx.add_task(l);
+}
+
+void async(GCompiled& gcmpld, std::function<void(std::exception_ptr)>&& callback, GRunArgs &&ins, GRunArgsP &&outs, GAsyncContext& ctx){
+    auto l = [=, &ctx]() mutable {
+        auto apply_l = [&](){
+            gcmpld(std::move(ins), std::move(outs));
+        };
+
+        call_with_callback(apply_l,std::move(callback), ctx);
     };
 
     impl::the_ctx.add_task(l);
 }
 
 std::future<void> async(GCompiled& gcmpld, GRunArgs &&ins, GRunArgsP &&outs){
-    move_through_copy<std::promise<void>> prms{{}};
+    copy_through_move<std::promise<void>> prms{{}};
     auto f = prms.value.get_future();
     auto l = [=]() mutable {
         auto apply_l = [&](){
             gcmpld(std::move(ins), std::move(outs));
         };
 
-        call_with_futute(apply_l, prms.value);
+        call_with_future(apply_l, prms.value, DummyContext{});
+    };
+
+    impl::the_ctx.add_task(l);
+    return f;
+
+}
+std::future<void> async(GCompiled& gcmpld, GRunArgs &&ins, GRunArgsP &&outs, GAsyncContext& ctx){
+    copy_through_move<std::promise<void>> prms{{}};
+    auto f = prms.value.get_future();
+    auto l = [=, &ctx]() mutable {
+        auto apply_l = [&](){
+            gcmpld(std::move(ins), std::move(outs));
+        };
+
+        call_with_future(apply_l, prms.value, ctx);
     };
 
     impl::the_ctx.add_task(l);
index 4ad4c59..d6fc593 100644 (file)
@@ -8,6 +8,8 @@
 #include "test_precomp.hpp"
 #include <opencv2/gapi/gcomputation_async.hpp>
 #include <opencv2/gapi/gcompiled_async.hpp>
+#include <opencv2/gapi/gasync_context.hpp>
+
 
 #include <condition_variable>
 #include <stdexcept>
@@ -78,6 +80,32 @@ namespace {
             }
         }
     };
+
+
+    //TODO: unify with callback helper code
+    struct cancel_struct {
+        std::atomic<int> num_tasks_to_spawn;
+
+        cv::gapi::wip::GAsyncContext ctx;
+
+        cancel_struct(int tasks_to_spawn) : num_tasks_to_spawn(tasks_to_spawn) {}
+    };
+
+    G_TYPED_KERNEL(GCancelationAdHoc, <GMat(GMat, cancel_struct*)>, "org.opencv.test.cancel_ad_hoc")
+    {
+        static GMatDesc outMeta(GMatDesc in, cancel_struct* ) { return in;  }
+
+    };
+
+    GAPI_OCV_KERNEL(GCancelationAdHocImpl, GCancelationAdHoc)
+    {
+        static void run(const cv::Mat& , cancel_struct* cancel_struct_p, cv::Mat&)        {
+            auto& cancel_struct_ = * cancel_struct_p;
+            auto num_tasks_to_spawn =  -- cancel_struct_.num_tasks_to_spawn;
+            cancel_struct_.ctx.cancel();
+            EXPECT_GT(num_tasks_to_spawn, 0)<<"Incorrect Test setup - to small number of tasks to feed the queue \n";
+        }
+    };
 }
 
 struct ExceptionOnExecution {
@@ -117,6 +145,41 @@ struct ExceptionOnExecution {
 
 };
 
+struct SelfCanceling {
+    cv::GComputation self_cancel;
+    SelfCanceling(cancel_struct* cancel_struct_p) : self_cancel([cancel_struct_p]{
+        cv::GMat in;
+        cv::GMat out = GCancelationAdHoc::on(in, cancel_struct_p);
+        return GComputation{in, out};
+    })
+    {}
+
+    const cv::Size sz{2, 2};
+    cv::Mat in_mat{sz, CV_8U, cv::Scalar(1)};
+    cv::Mat out_mat;
+
+    cv::GCompiled compile(){
+        return self_cancel.compile(descr_of(in_mat), compile_args());
+    }
+
+    cv::GComputation& computation(){
+        return self_cancel;
+    }
+
+    cv::GRunArgs in_args(){
+        return cv::gin(in_mat);
+    }
+
+    cv::GRunArgsP out_args(){
+        return cv::gout(out_mat);
+    }
+
+    cv::GCompileArgs compile_args(){
+        auto pkg = cv::gapi::kernels<GCancelationAdHocImpl>();
+        return cv::compile_args(pkg);
+    }
+};
+
 template<typename crtp_final_t>
 struct crtp_cast {
     template<typename crtp_base_t>
@@ -150,6 +213,11 @@ struct CallBack: crtp_cast<crtp_final_t> {
         this->crtp_cast_(this)->async(callback(), std::forward<Args>(args)...);
     }
 
+    template<typename... Args >
+    void start_async(cv::gapi::wip::GAsyncContext& ctx, Args&&... args){
+        this->crtp_cast_(this)->async(ctx, callback(), std::forward<Args>(args)...);
+    }
+
     void wait_for_result()
     {
         std::unique_lock<std::mutex> lck{mtx};
@@ -186,6 +254,14 @@ struct AsyncCompiled  : crtp_cast<crtp_final_t>{
         auto gcmpld = this->crtp_cast_(this)->compile();
         return cv::gapi::wip::async(gcmpld, std::forward<Args>(args)...);
     }
+
+    template<typename... Args>
+    auto async(cv::gapi::wip::GAsyncContext& ctx, Args&&... args) ->
+        decltype(cv::gapi::wip::async(std::declval<cv::GCompiled&>(), std::forward<Args>(args)..., std::declval<cv::gapi::wip::GAsyncContext&>()))
+    {
+        auto gcmpld = this->crtp_cast_(this)->compile();
+        return cv::gapi::wip::async(gcmpld, std::forward<Args>(args)..., ctx);
+    }
 };
 
 //Test Mixin, hiding details of calling apply (async_apply) on GAPI Computation object
@@ -193,9 +269,23 @@ template<typename crtp_final_t>
 struct AsyncApply : crtp_cast<crtp_final_t> {
 
     template<typename... Args>
-    auto async(Args&&... args) ->decltype(cv::gapi::wip::async_apply(std::declval<cv::GComputation&>(), std::forward<Args>(args)...)) {
-        return cv::gapi::wip::async_apply(this->crtp_cast_(this)->computation(), std::forward<Args>(args)..., this->crtp_cast_(this)->compile_args());
+    auto async(Args&&... args) ->
+         decltype(cv::gapi::wip::async_apply(std::declval<cv::GComputation&>(), std::forward<Args>(args)..., std::declval<cv::GCompileArgs>()))
+    {
+        return cv::gapi::wip::async_apply(
+                this->crtp_cast_(this)->computation(), std::forward<Args>(args)..., this->crtp_cast_(this)->compile_args()
+        );
     }
+
+    template<typename... Args>
+    auto async(cv::gapi::wip::GAsyncContext& ctx, Args&&... args) ->
+         decltype(cv::gapi::wip::async_apply(std::declval<cv::GComputation&>(), std::forward<Args>(args)... , std::declval<cv::GCompileArgs>(), std::declval<cv::gapi::wip::GAsyncContext&>()))
+    {
+        return cv::gapi::wip::async_apply(
+                this->crtp_cast_(this)->computation(), std::forward<Args>(args)..., this->crtp_cast_(this)->compile_args(), ctx
+        );
+    }
+
 };
 
 
@@ -240,7 +330,7 @@ TYPED_TEST_P(stress, test){
     const std::size_t number_of_threads  = 4;
 
     auto thread_body = [&](){
-        std::vector<TypeParam> requests{request_per_thread};
+        std::vector<TypeParam> requests(request_per_thread);
         for (auto&& r : requests){
             r.start_async(r.in_args(), r.out_args());
         }
@@ -262,13 +352,50 @@ TYPED_TEST_P(stress, test){
 }
 REGISTER_TYPED_TEST_CASE_P(stress, test);
 
+template<typename case_t>
+struct cancel : ::testing::Test{};
+TYPED_TEST_CASE_P(cancel);
+
+TYPED_TEST_P(cancel, basic){
+    constexpr int num_tasks = 100;
+    cancel_struct cancel_struct_ {num_tasks};
+    std::vector<TypeParam> requests; requests.reserve(num_tasks);
+
+    for (auto i = num_tasks; i>0; i--){
+        requests.emplace_back(&cancel_struct_);
+    }
+    for (auto&& r : requests){
+        //first request will cancel other on it's execution
+        r.start_async(cancel_struct_.ctx, r.in_args(), r.out_args());
+    }
+
+    unsigned int canceled = 0 ;
+    for (auto&& r : requests){
+        try {
+            r.wait_for_result();
+        }catch (cv::gapi::wip::GAsyncCanceled&){
+            ++canceled;
+        }
+    }
+    ASSERT_GT(canceled, 0u);
+}
+
+REGISTER_TYPED_TEST_CASE_P(cancel, basic);
+
 //little helpers to match up all combinations of setups
 template<typename compute_fixture_t,template <typename> class callback_or_future_t, template <typename> class compiled_or_apply_t>
 struct Case
         : compute_fixture_t,
           callback_or_future_t<Case<compute_fixture_t,callback_or_future_t,compiled_or_apply_t>>,
           compiled_or_apply_t <Case<compute_fixture_t,callback_or_future_t,compiled_or_apply_t>>
-{};
+{
+    template<typename... Args>
+    Case(Args&&... args) : compute_fixture_t(std::forward<Args>(args)...) { }
+    Case(Case const &  ) = default;
+    Case(Case &&  ) = default;
+
+    Case() = default;
+};
 
 template<typename computation_t>
 using cases = ::testing::Types<
@@ -282,6 +409,8 @@ INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIExceptionHandling_, exception,  cases<Exce
 
 INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIStress,             stress,     cases<SumOfSum2x2>);
 
+INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPICancelation,        cancel,     cases<SelfCanceling>);
+
 TEST(AsyncAPI, Sample){
     cv::GComputation self_mul([]{
         cv::GMat in;
@@ -296,4 +425,5 @@ TEST(AsyncAPI, Sample){
     auto f = cv::gapi::wip::async_apply(self_mul,cv::gin(in_mat), cv::gout(out));
     f.wait();
 }
+
 } // namespace opencv_test