Merge pull request #19487 from TolyaTalamanov:at/support-nireq-option
authorAnatoliy Talamanov <anatoliy.talamanov@intel.com>
Fri, 26 Feb 2021 12:53:30 +0000 (15:53 +0300)
committerGitHub <noreply@github.com>
Fri, 26 Feb 2021 12:53:30 +0000 (12:53 +0000)
[G-API] Support multiple asynchronous requests

* Support nireq option

* Disable tests to check CI

* Fix bug with hanging

* WA to green CI

* Snapshot

* Simplify RequestPool

* Add default values to id

* Fix win warning

modules/gapi/include/opencv2/gapi/infer/ie.hpp
modules/gapi/src/backends/ie/giebackend.cpp
modules/gapi/src/backends/ie/giebackend.hpp
modules/gapi/src/executor/gstreamingexecutor.cpp
modules/gapi/test/infer/gapi_infer_ie_test.cpp

index 53e31fb..e1df80f 100644 (file)
@@ -67,6 +67,9 @@ namespace detail {
         Kind kind;
         bool is_generic;
         IEConfig config;
+
+        // NB: Number of asyncrhonious infer requests
+        size_t nireq;
     };
 } // namespace detail
 
@@ -91,7 +94,8 @@ public:
               , std::tuple_size<typename Net::OutArgs>::value // num_out
               , detail::ParamDesc::Kind::Load
               , false
-              , {}} {
+              , {}
+              , 1u} {
     };
 
     Params(const std::string &model,
@@ -101,7 +105,8 @@ public:
               , std::tuple_size<typename Net::OutArgs>::value // num_out
               , detail::ParamDesc::Kind::Import
               , false
-              , {}} {
+              , {}
+              , 1u} {
     };
 
     Params<Net>& cfgInputLayers(const typename PortCfg<Net>::In &ll) {
@@ -137,6 +142,12 @@ public:
         return *this;
     }
 
+    Params& cfgNumRequests(size_t nireq) {
+        GAPI_Assert(nireq > 0 && "Number of infer requests must be greater than zero!");
+        desc.nireq = nireq;
+        return *this;
+    }
+
     // BEGIN(G-API's network parametrization API)
     GBackend      backend()    const { return cv::gapi::ie::backend();  }
     std::string   tag()        const { return Net::tag(); }
@@ -154,13 +165,13 @@ public:
            const std::string &model,
            const std::string &weights,
            const std::string &device)
-        : desc{ model, weights, device, {}, {}, {}, 0u, 0u, detail::ParamDesc::Kind::Load, true, {}}, m_tag(tag) {
+        : desc{ model, weights, device, {}, {}, {}, 0u, 0u, detail::ParamDesc::Kind::Load, true, {}, 1u}, m_tag(tag) {
     };
 
     Params(const std::string &tag,
            const std::string &model,
            const std::string &device)
-        : desc{ model, {}, device, {}, {}, {}, 0u, 0u, detail::ParamDesc::Kind::Import, true, {}}, m_tag(tag) {
+        : desc{ model, {}, device, {}, {}, {}, 0u, 0u, detail::ParamDesc::Kind::Import, true, {}, 1u}, m_tag(tag) {
     };
 
     Params& pluginConfig(IEConfig&& cfg) {
index 949c803..7999782 100644 (file)
 #include "ie_compound_blob.h"
 #endif
 
+#if defined(HAVE_TBB)
+#  include <tbb/concurrent_queue.h> // FIXME: drop it from here!
+template<typename T> using QueueClass = tbb::concurrent_bounded_queue<T>;
+#else
+#  include "executor/conc_queue.hpp"
+template<typename T> using QueueClass = cv::gapi::own::concurrent_bounded_queue<T>;
+#endif // TBB
+
 namespace IE = InferenceEngine;
 
 namespace {
@@ -254,17 +262,7 @@ struct IEUnit {
             non_const_this->this_network = cv::gimpl::ie::wrap::loadNetwork(non_const_this->this_plugin, net, params);
         }
 
-        auto this_request = non_const_this->this_network.CreateInferRequest();
-        // Bind const data to infer request
-        for (auto &&p : params.const_inputs) {
-            // FIXME: SetBlob is known to be inefficient,
-            // it is worth to make a customizable "initializer" and pass the
-            // cv::Mat-wrapped blob there to support IE's optimal "GetBlob idiom"
-            // Still, constant data is to set only once.
-            this_request.SetBlob(p.first, wrapIE(p.second.first, p.second.second));
-        }
-
-        return {this_plugin, this_network, this_request};
+        return {params, this_plugin, this_network};
     }
 };
 
@@ -273,7 +271,6 @@ class IECallContext
 public:
     IECallContext(const IEUnit                                      &  unit,
                   cv::gimpl::GIslandExecutable::IOutput             &  output,
-                  cv::gimpl::ie::SyncPrim                           &  sync,
                   const cv::GArgs                                   &  args,
                   const std::vector<cv::gimpl::RcDesc>              &  outs,
                   std::vector<cv::gimpl::GIslandExecutable::InObj>  && input_objs,
@@ -302,7 +299,6 @@ public:
 
     const IEUnit                          &uu;
     cv::gimpl::GIslandExecutable::IOutput &out;
-    cv::gimpl::ie::SyncPrim               &sync;
 
     // NB: Need to gurantee that MediaFrame::View don't die until request is over.
     using Views = std::vector<std::unique_ptr<cv::MediaFrame::View>>;
@@ -333,13 +329,11 @@ private:
 
 IECallContext::IECallContext(const IEUnit                                      &  unit,
                              cv::gimpl::GIslandExecutable::IOutput             &  output,
-                             cv::gimpl::ie::SyncPrim                           &  syncp,
                              const cv::GArgs                                   &  args,
                              const std::vector<cv::gimpl::RcDesc>              &  outs,
                              std::vector<cv::gimpl::GIslandExecutable::InObj>  && input_objs,
                              std::vector<cv::gimpl::GIslandExecutable::OutObj> && output_objs)
-: uu(unit), out(output), sync(syncp), m_input_objs(std::move(input_objs)),
-    m_output_objs(std::move(output_objs))
+: uu(unit), out(output), m_input_objs(std::move(input_objs)), m_output_objs(std::move(output_objs))
 {
     for (auto& it : m_input_objs)  cv::gimpl::magazine::bindInArg (m_res, it.first, it.second);
     for (auto& it : m_output_objs) cv::gimpl::magazine::bindOutArg(m_res, it.first, it.second);
@@ -355,12 +349,12 @@ IECallContext::IECallContext(const IEUnit                                      &
                 return arg.get<cv::gimpl::RcDesc>().shape;
             });
 
-     for (const auto out_it : ade::util::indexed(outs)) {
-         // FIXME: Can the same GArg type resolution mechanism be reused here?
-         const auto port  = ade::util::index(out_it);
-         const auto desc  = ade::util::value(out_it);
-         m_results[port] = cv::gimpl::magazine::getObjPtr(m_res, desc);
-     }
+    for (const auto out_it : ade::util::indexed(outs)) {
+        // FIXME: Can the same GArg type resolution mechanism be reused here?
+        const auto port  = ade::util::index(out_it);
+        const auto desc  = ade::util::value(out_it);
+        m_results[port] = cv::gimpl::magazine::getObjPtr(m_res, desc);
+    }
 }
 
 const cv::GArgs& IECallContext::inArgs() const {
@@ -429,7 +423,7 @@ cv::GArg IECallContext::packArg(const cv::GArg &arg) {
 
 struct IECallable {
     static const char *name() { return "IERequestCallable"; }
-    using Run = std::function<void(cv::gimpl::ie::IECompiled&, std::shared_ptr<IECallContext>)>;
+    using Run = std::function<void(std::shared_ptr<IECallContext>, cv::gimpl::ie::RequestPool&)>;
     Run run;
 };
 
@@ -480,6 +474,97 @@ inline IE::Blob::Ptr extractBlob(IECallContext& ctx, std::size_t i) {
 }
 } // anonymous namespace
 
+std::vector<InferenceEngine::InferRequest> cv::gimpl::ie::IECompiled::createInferRequests() {
+    std::vector<InferenceEngine::InferRequest> requests;
+    requests.reserve(params.nireq);
+
+    for (size_t i = 0; i < params.nireq; ++i) {
+        requests.push_back(this_network.CreateInferRequest());
+        auto& request = requests.back();
+        // Bind const data to infer request
+        for (auto &&p : params.const_inputs) {
+            // FIXME: SetBlob is known to be inefficient,
+            // it is worth to make a customizable "initializer" and pass the
+            // cv::Mat-wrapped blob there to support IE's optimal "GetBlob idiom"
+            // Still, constant data is to set only once.
+            request.SetBlob(p.first, wrapIE(p.second.first, p.second.second));
+        }
+    }
+
+    return requests;
+}
+
+class cv::gimpl::ie::RequestPool {
+public:
+    using RunF      = std::function<void(InferenceEngine::InferRequest&)>;
+    using CallbackF = std::function<void(InferenceEngine::InferRequest&)>;
+
+    // NB: The task is represented by:
+    // RunF      - function which is set blobs and run async inference.
+    // CallbackF - function which is obtain output blobs and post it to output.
+    struct Task {
+        RunF run;
+        CallbackF callback;
+    };
+
+    explicit RequestPool(std::vector<InferenceEngine::InferRequest>&& requests);
+
+    void execute(Task&& t, bool async = true);
+    void waitAndShutdown();
+
+private:
+    void callback(Task task, InferenceEngine::InferRequest& request, size_t id);
+
+    QueueClass<size_t>                         m_idle_ids;
+    std::vector<InferenceEngine::InferRequest> m_requests;
+};
+
+// RequestPool implementation //////////////////////////////////////////////
+cv::gimpl::ie::RequestPool::RequestPool(std::vector<InferenceEngine::InferRequest>&& requests)
+    : m_requests(std::move(requests)) {
+        for (size_t i = 0; i < m_requests.size(); ++i) {
+            m_idle_ids.push(i);
+        }
+    }
+
+void cv::gimpl::ie::RequestPool::execute(cv::gimpl::ie::RequestPool::Task&& t, bool async) {
+    size_t id = 0u;
+    m_idle_ids.pop(id);
+
+    auto& request = m_requests[id];
+
+    // FIXME: This WA should be removed after supporting async mode for InferList and Infer2.
+    // InferList and Infer2 work synchronously without calling callback,
+    // therefore don't release InferRequest idle id.
+    if (!async) {
+        // NB: Synchronous execution.
+        t.run(request);
+        // NB: Explicitly call callback to release id.
+        callback(t, request, id);
+        return;
+    }
+
+    request.SetCompletionCallback(
+            std::bind(&cv::gimpl::ie::RequestPool::callback, this, t, std::ref(request), id));
+    t.run(request);
+}
+
+void cv::gimpl::ie::RequestPool::callback(cv::gimpl::ie::RequestPool::Task task,
+                                          InferenceEngine::InferRequest& request,
+                                          size_t id) {
+    task.callback(request);
+    m_idle_ids.push(id);
+}
+
+// NB: Not thread-safe.
+void cv::gimpl::ie::RequestPool::waitAndShutdown() {
+    // NB: It will be blocked if at least one request is busy.
+    for (size_t i = 0; i < m_requests.size(); ++i) {
+        size_t id = 0u;
+        m_idle_ids.pop(id);
+    }
+}
+
 // GCPUExcecutable implementation //////////////////////////////////////////////
 cv::gimpl::ie::GIEExecutable::GIEExecutable(const ade::Graph &g,
                                             const std::vector<ade::NodeHandle> &nodes)
@@ -494,6 +579,7 @@ cv::gimpl::ie::GIEExecutable::GIEExecutable(const ade::Graph &g,
             if (this_nh == nullptr) {
                 this_nh = nh;
                 this_iec = iem.metadata(this_nh).get<IEUnit>().compile();
+                m_reqPool.reset(new RequestPool(this_iec.createInferRequests()));
             }
             else
                 util::throw_error(std::logic_error("Multi-node inference is not supported!"));
@@ -518,27 +604,26 @@ cv::gimpl::ie::GIEExecutable::GIEExecutable(const ade::Graph &g,
 void cv::gimpl::ie::GIEExecutable::run(cv::gimpl::GIslandExecutable::IInput  &in,
                                        cv::gimpl::GIslandExecutable::IOutput &out) {
     // General alghoritm:
-    //     1. Since only single async request is supported
-    //        wait until it is over and start collecting new data.
-    //     2. Collect island inputs/outputs.
-    //     3. Create kernel context. (Every kernel has his own context.)
-    //     4. Go to the next frame without waiting until the async request is over (1)
+    //     1. Collect island inputs/outputs.
+    //     2. Create kernel context. (Every kernel has his own context).
+    //     3. If the EndOfStream message is recieved, wait until all passed task are done.
+    //     4.
+    //        5.1 Run the kernel.
+    //        5.2 Kernel wait for all nececcary infer requests and start asynchronous execution.
+    //        5.3 After the kernel is finished continue processing next frame.
     //
-    //     5. If graph is compiled in non-streaming mode, wait until request is over.
-
-    // (1) To prevent data race on the IOutput object, need to wait
-    // for async request callback, which post outputs and only after that get new data.
-    m_sync.wait();
+    //     5. If graph is compiled in non-streaming mode, wait until all tasks are done.
 
     std::vector<InObj>  input_objs;
     std::vector<OutObj> output_objs;
 
     const auto &in_desc  = in.desc();
-    const auto &out_desc = out.desc();
     const auto  in_msg   = in.get();
 
     if (cv::util::holds_alternative<cv::gimpl::EndOfStream>(in_msg))
     {
+        // (3) Wait until all passed task are done.
+        m_reqPool->waitAndShutdown();
         out.post(cv::gimpl::EndOfStream{});
         return;
     }
@@ -546,39 +631,38 @@ void cv::gimpl::ie::GIEExecutable::run(cv::gimpl::GIslandExecutable::IInput  &in
     GAPI_Assert(cv::util::holds_alternative<cv::GRunArgs>(in_msg));
     const auto in_vector = cv::util::get<cv::GRunArgs>(in_msg);
 
-    // (2) Collect inputs/outputs
+    // (1) Collect island inputs/outputs
     input_objs.reserve(in_desc.size());
-    output_objs.reserve(out_desc.size());
     for (auto &&it: ade::util::zip(ade::util::toRange(in_desc),
-                                   ade::util::toRange(in_vector)))
+                ade::util::toRange(in_vector)))
     {
         input_objs.emplace_back(std::get<0>(it), std::get<1>(it));
     }
+
+    const auto &out_desc = out.desc();
+    output_objs.reserve(out_desc.size());
     for (auto &&it: ade::util::indexed(ade::util::toRange(out_desc)))
     {
         output_objs.emplace_back(ade::util::value(it),
-                              out.get(ade::util::checked_cast<int>(ade::util::index(it))));
+                out.get(ade::util::checked_cast<int>(ade::util::index(it))));
     }
 
     GConstGIEModel giem(m_g);
     const auto &uu = giem.metadata(this_nh).get<IEUnit>();
     const auto &op = m_gm.metadata(this_nh).get<Op>();
-    // (3) Create kernel context
-    auto context = std::make_shared<IECallContext>(uu, out, m_sync, op.args, op.outs,
+    // (2) Create kernel context
+    auto ctx = std::make_shared<IECallContext>(uu, out, op.args, op.outs,
             std::move(input_objs), std::move(output_objs));
 
-
-    // (5) Run the kernel and start handle next frame.
     const auto &kk = giem.metadata(this_nh).get<IECallable>();
-    // FIXME: Running just a single node now.
-    // Not sure if need to support many of them, though
-    // FIXME: Make this island-unmergeable?
-    kk.run(this_iec, context);
 
-    // (6) In not-streaming mode need to wait until the async request is over
+    // (4) Run the kernel.
+    kk.run(ctx, *m_reqPool);
+
+    // (5) In non-streaming mode need to wait until the all tasks are done
     // FIXME: Is there more graceful way to handle this case ?
     if (!m_gm.metadata().contains<Streaming>()) {
-        m_sync.wait();
+        m_reqPool->waitAndShutdown();
     }
 }
 
@@ -616,54 +700,16 @@ static void configureInputInfo(const IE::InputInfo::Ptr& ii, const cv::GMetaArg
 
 // NB: This is a callback used by async infer
 // to post outputs blobs (cv::GMat's).
-struct PostOutputs {
-    // NB: Should be const to pass into SetCompletionCallback
-    void operator()() const {
-        for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
-             auto& out_mat = ctx->outMatR(i);
-             IE::Blob::Ptr this_blob = iec.this_request.GetBlob(ctx->uu.params.output_names[i]);
-             copyFromIE(this_blob, out_mat);
-             ctx->out.post(ctx->output(i));
-        }
-        ctx->sync.release_and_notify();
-    }
-
-    IECompiled                            &iec ;
-    std::shared_ptr<IECallContext>         ctx ;
-};
-
-// NB: This is a callback used by async infer
-// to post output list of blobs (cv::GArray<cv::GMat>).
-struct PostOutputsList {
-    // NB: Should be const to pass into SetCompletionCallback
-    void operator()() const {
-        for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
-            std::vector<cv::Mat> &out_vec = ctx->outVecR<cv::Mat>(i);
-
-            IE::Blob::Ptr out_blob = iec.this_request.GetBlob(ctx->uu.params.output_names[i]);
-
-            cv::Mat out_mat(cached_dims[i], toCV(out_blob->getTensorDesc().getPrecision()));
-            // FIXME: Avoid data copy. Not sure if it is possible though
-            copyFromIE(out_blob, out_mat);
-            out_vec.push_back(std::move(out_mat));
-        }
-        // NB: Callbacks run synchronously yet, so the lock isn't necessary
-        auto&& out_vec_size = ctx->outVecR<cv::Mat>(0).size();
-        // NB: Now output vector is collected and can be posted to output
-        if (nrequests == out_vec_size) {
-            for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
-                 ctx->out.post(ctx->output(i));
-            }
-        }
-
-        ctx->sync.release_and_notify();
+static void PostOutputs(InferenceEngine::InferRequest   &request,
+                        std::shared_ptr<IECallContext>   ctx) {
+    for (auto i : ade::util::iota(ctx->uu.params.num_out))
+    {
+        auto& out_mat = ctx->outMatR(i);
+        IE::Blob::Ptr this_blob = request.GetBlob(ctx->uu.params.output_names[i]);
+        copyFromIE(this_blob, out_mat);
+        ctx->out.post(ctx->output(i));
     }
-
-    IECompiled                            &iec ;
-    std::shared_ptr<IECallContext>         ctx ;
-    std::vector< std::vector<int> >        cached_dims;
-    size_t                                 nrequests;
-};
+}
 
 struct Infer: public cv::detail::KernelTag {
     using API = cv::GInferBase;
@@ -715,23 +761,28 @@ struct Infer: public cv::detail::KernelTag {
         return result;
     }
 
-    static void run(IECompiled &iec, std::shared_ptr<IECallContext> ctx) {
-        // non-generic version for now:
-        // - assumes all inputs/outputs are always Mats
-        for (auto i : ade::util::iota(ctx->uu.params.num_in)) {
-            // TODO: Ideally we shouldn't do SetBlob() but GetBlob() instead,
-            // and redirect our data producers to this memory
-            // (A memory dialog comes to the picture again)
-            IE::Blob::Ptr this_blob = extractBlob(*ctx, i);
-            iec.this_request.SetBlob(ctx->uu.params.input_names[i], this_blob);
-        }
-
-        iec.this_request.SetCompletionCallback(PostOutputs{iec, ctx});
-
-        // NB: Since only single async request is supported, need to lock other
-        // attempts to get access while request is working.
-        ctx->sync.acquire();
-        iec.this_request.StartAsync();
+    static void run(std::shared_ptr<IECallContext>  ctx,
+                    cv::gimpl::ie::RequestPool     &reqPool) {
+        using namespace std::placeholders;
+        reqPool.execute(
+                cv::gimpl::ie::RequestPool::Task {
+                    [ctx](InferenceEngine::InferRequest &req) {
+                        // non-generic version for now:
+                        // - assumes all inputs/outputs are always Mats
+                        for (auto i : ade::util::iota(ctx->uu.params.num_in)) {
+                            // TODO: Ideally we shouldn't do SetBlob() but GetBlob() instead,
+                            // and redirect our data producers to this memory
+                            // (A memory dialog comes to the picture again)
+                            IE::Blob::Ptr this_blob = extractBlob(*ctx, i);
+                            req.SetBlob(ctx->uu.params.input_names[i], this_blob);
+                        }
+                        // FIXME: Should it be done by kernel ?
+                        // What about to do that in RequestPool ?
+                        req.StartAsync();
+                    },
+                    std::bind(PostOutputs, _1, ctx)
+                }
+        );
     }
 };
 
@@ -776,22 +827,27 @@ struct InferROI: public cv::detail::KernelTag {
         return result;
     }
 
-    static void run(IECompiled &iec, std::shared_ptr<IECallContext> ctx) {
-        // non-generic version for now, per the InferROI's definition
-        GAPI_Assert(ctx->uu.params.num_in == 1);
-        const auto& this_roi = ctx->inArg<cv::detail::OpaqueRef>(0).rref<cv::Rect>();
-
-        IE::Blob::Ptr this_blob = extractBlob(*ctx, 1u);
-
-        iec.this_request.SetBlob(*(ctx->uu.params.input_names.begin()),
-                IE::make_shared_blob(this_blob, toIE(this_roi)));
-
-        iec.this_request.SetCompletionCallback(PostOutputs{iec, ctx});
-
-        // NB: Since only single async request is supported, need to lock other
-        // attempts to get access while request is working.
-        ctx->sync.acquire();
-        iec.this_request.StartAsync();
+    static void run(std::shared_ptr<IECallContext>  ctx,
+                    cv::gimpl::ie::RequestPool     &reqPool) {
+        using namespace std::placeholders;
+        reqPool.execute(
+                cv::gimpl::ie::RequestPool::Task {
+                    [ctx](InferenceEngine::InferRequest &req) {
+                        GAPI_Assert(ctx->uu.params.num_in == 1);
+                        auto&& this_roi = ctx->inArg<cv::detail::OpaqueRef>(0).rref<cv::Rect>();
+
+                        IE::Blob::Ptr this_blob = extractBlob(*ctx, 1);
+
+                        req.SetBlob(*(ctx->uu.params.input_names.begin()),
+                                IE::make_shared_blob(this_blob, toIE(this_roi)));
+
+                        // FIXME: Should it be done by kernel ?
+                        // What about to do that in RequestPool ?
+                        req.StartAsync();
+                    },
+                    std::bind(PostOutputs, _1, ctx)
+                }
+        );
     }
 };
 
@@ -834,52 +890,63 @@ struct InferList: public cv::detail::KernelTag {
                              cv::GMetaArg{cv::empty_array_desc()});
     }
 
-    static void run(IECompiled &iec, std::shared_ptr<IECallContext> ctx) {
-        // non-generic version for now:
-        // - assumes zero input is always ROI list
-        // - assumes all inputs/outputs are always Mats
-        GAPI_Assert(ctx->uu.params.num_in == 1); // roi list is not counted in net's inputs
-
-        const auto& in_roi_vec = ctx->inArg<cv::detail::VectorRef>(0u).rref<cv::Rect>();
-
-        IE::Blob::Ptr this_blob = extractBlob(*ctx, 1u);
-
-        // FIXME: This could be done ONCE at graph compile stage!
-        std::vector< std::vector<int> > cached_dims(ctx->uu.params.num_out);
-        for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
-            const IE::DataPtr& ie_out = ctx->uu.outputs.at(ctx->uu.params.output_names[i]);
-            cached_dims[i] = toCV(ie_out->getTensorDesc().getDims());
-            ctx->outVecR<cv::Mat>(i).clear();
-            // FIXME: Isn't this should be done automatically
-            // by some resetInternalData(), etc? (Probably at the GExecutor level)
-        }
-
-        // NB: If list of roi is empty need to post output data anyway.
-        if (in_roi_vec.empty()) {
-            for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
-                 ctx->out.post(ctx->output(i));
-            }
-            return;
-        }
-
-        for (auto&& rc : in_roi_vec) {
-            // NB: Only single async request is supported now,
-            // so need to wait until previos iteration is over.
-            // However there is no need to wait async request from last iteration,
-            // this will be done by backend.
-            ctx->sync.wait();
-
-            IE::Blob::Ptr roi_blob = IE::make_shared_blob(this_blob, toIE(rc));
-            iec.this_request.SetBlob(ctx->uu.params.input_names[0u], roi_blob);
-
-            iec.this_request.SetCompletionCallback(
-                    PostOutputsList{iec, ctx, cached_dims, in_roi_vec.size()});
-
-            // NB: Since only single async request is supported, need to lock other
-            // attempts to get access while request is working.
-            ctx->sync.acquire();
-            iec.this_request.StartAsync();
-        }
+    static void run(std::shared_ptr<IECallContext>  ctx,
+                    cv::gimpl::ie::RequestPool     &reqPool) {
+
+        using namespace std::placeholders;
+        reqPool.execute(
+                cv::gimpl::ie::RequestPool::Task {
+                    [ctx](InferenceEngine::InferRequest &req) {
+                        // non-generic version for now:
+                        // - assumes zero input is always ROI list
+                        // - assumes all inputs/outputs are always Mats
+                        const auto& in_roi_vec = ctx->inArg<cv::detail::VectorRef>(0u).rref<cv::Rect>();
+                        // NB: In case there is no input data need to post output anyway
+                        if (in_roi_vec.empty()) {
+                            for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
+                                ctx->out.post(ctx->output(i));
+                            }
+                            return;
+                        }
+
+                        IE::Blob::Ptr this_blob = extractBlob(*ctx, 1);
+
+                        // FIXME: This could be done ONCE at graph compile stage!
+                        std::vector<std::vector<int>> cached_dims(ctx->uu.params.num_out);
+                        for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
+                            const IE::DataPtr& ie_out = ctx->uu.outputs.at(ctx->uu.params.output_names[i]);
+                            cached_dims[i] = toCV(ie_out->getTensorDesc().getDims());
+                            // FIXME: Isn't this should be done automatically
+                            // by some resetInternalData(), etc? (Probably at the GExecutor level)
+                            ctx->outVecR<cv::Mat>(i).clear();
+                        }
+
+                        for (auto&& rc : in_roi_vec) {
+                            IE::Blob::Ptr roi_blob = IE::make_shared_blob(this_blob, toIE(rc));
+                            req.SetBlob(ctx->uu.params.input_names[0u], roi_blob);
+
+                            req.Infer();
+
+                            for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
+                                std::vector<cv::Mat> &out_vec = ctx->outVecR<cv::Mat>(i);
+
+                                IE::Blob::Ptr out_blob = req.GetBlob(ctx->uu.params.output_names[i]);
+
+                                cv::Mat out_mat(cached_dims[i], toCV(out_blob->getTensorDesc().getPrecision()));
+                                // FIXME: Avoid data copy. Not sure if it is possible though
+                                copyFromIE(out_blob, out_mat);
+                                out_vec.push_back(std::move(out_mat));
+                            }
+                        }
+
+                        for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
+                            ctx->out.post(ctx->output(i));
+                        }
+                    },
+                    [](InferenceEngine::InferRequest &) { /* do nothing */ }
+                },
+            false /* not async */
+        );
     }
 };
 
@@ -966,66 +1033,83 @@ struct InferList2: public cv::detail::KernelTag {
                              cv::GMetaArg{cv::empty_array_desc()});
     }
 
-    static void run(IECompiled &iec, std::shared_ptr<IECallContext> ctx) {
-        GAPI_Assert(ctx->inArgs().size() > 1u
-                && "This operation must have at least two arguments");
-
-        IE::Blob::Ptr blob_0 = extractBlob(*ctx, 0u);
-
-        // Take the next argument, which must be vector (of any kind).
-        // Use it only to obtain the ROI list size (sizes of all other
-        // vectors must be equal to this one)
-        const auto list_size = ctx->inArg<cv::detail::VectorRef>(1u).size();
-
-        // FIXME: This could be done ONCE at graph compile stage!
-        std::vector< std::vector<int> > cached_dims(ctx->uu.params.num_out);
-        for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
-            const IE::DataPtr& ie_out = ctx->uu.outputs.at(ctx->uu.params.output_names[i]);
-            cached_dims[i] = toCV(ie_out->getTensorDesc().getDims());
-            ctx->outVecR<cv::Mat>(i).clear();
-            // FIXME: Isn't this should be done automatically
-            // by some resetInternalData(), etc? (Probably at the GExecutor level)
-        }
-
-        // NB: If list of roi is empty need to post output data anyway.
-        if (list_size == 0u) {
-            for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
-                 ctx->out.post(ctx->output(i));
-            }
-            return;
-        }
-
-        for (const auto &list_idx : ade::util::iota(list_size)) {
-            // NB: Only single async request is supported now,
-            // so need to wait until previos iteration is over.
-            // However there is no need to wait async request from last iteration,
-            // this will be done by backend.
-            ctx->sync.wait();
-            for (auto in_idx : ade::util::iota(ctx->uu.params.num_in)) {
-                const auto &this_vec = ctx->inArg<cv::detail::VectorRef>(in_idx+1u);
-                GAPI_Assert(this_vec.size() == list_size);
-                IE::Blob::Ptr this_blob;
-                if (this_vec.getKind() == cv::detail::OpaqueKind::CV_RECT) {
-                    const auto &vec = this_vec.rref<cv::Rect>();
-                    this_blob = IE::make_shared_blob(blob_0, toIE(vec[list_idx]));
-                } else if (this_vec.getKind() == cv::detail::OpaqueKind::CV_MAT) {
-                    const auto &vec = this_vec.rref<cv::Mat>();
-                    const auto &mat = vec[list_idx];
-                    this_blob = wrapIE(mat, cv::gapi::ie::TraitAs::TENSOR);
-                } else {
-                    GAPI_Assert(false && "Only Rect and Mat types are supported for infer list 2!");
-                }
-                iec.this_request.SetBlob(ctx->uu.params.input_names[in_idx], this_blob);
-            }
-
-            iec.this_request.SetCompletionCallback(
-                    PostOutputsList{iec, ctx, cached_dims, list_size});
-
-            // NB: Since only single async request is supported, need to lock other
-            // attempts to get access while request is working.
-            ctx->sync.acquire();
-            iec.this_request.StartAsync();
-        }
+    static void run(std::shared_ptr<IECallContext> ctx,
+                    cv::gimpl::ie::RequestPool    &reqPool) {
+            reqPool.execute(
+                    cv::gimpl::ie::RequestPool::Task {
+                        [ctx](InferenceEngine::InferRequest &req) {
+                            GAPI_Assert(ctx->inArgs().size() > 1u
+                                    && "This operation must have at least two arguments");
+
+                            IE::Blob::Ptr blob_0 = extractBlob(*ctx, 0);
+
+                            // Take the next argument, which must be vector (of any kind).
+                            // Use it only to obtain the ROI list size (sizes of all other
+                            // vectors must be equal to this one)
+                            const auto list_size = ctx->inArg<cv::detail::VectorRef>(1u).size();
+                            if (list_size == 0u) {
+                                for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
+                                    ctx->out.post(ctx->output(i));
+                                }
+                                return;
+                            }
+
+                            for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
+                                ctx->outVecR<cv::Mat>(i).resize(list_size);
+                            }
+
+                            // FIXME: This could be done ONCE at graph compile stage!
+                            std::vector< std::vector<int> > cached_dims(ctx->uu.params.num_out);
+                            for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
+                                const IE::DataPtr& ie_out = ctx->uu.outputs.at(ctx->uu.params.output_names[i]);
+                                cached_dims[i] = toCV(ie_out->getTensorDesc().getDims());
+                                // FIXME: Isn't this should be done automatically
+                                // by some resetInternalData(), etc? (Probably at the GExecutor level)
+                                ctx->outVecR<cv::Mat>(i).clear();
+                            }
+
+                            for (const auto &list_idx : ade::util::iota(list_size)) {
+                                for (auto in_idx : ade::util::iota(ctx->uu.params.num_in)) {
+                                    const auto &this_vec = ctx->inArg<cv::detail::VectorRef>(in_idx+1u);
+                                    GAPI_Assert(this_vec.size() == list_size);
+                                    IE::Blob::Ptr this_blob;
+                                    if (this_vec.getKind() == cv::detail::OpaqueKind::CV_RECT) {
+                                        const auto &vec = this_vec.rref<cv::Rect>();
+                                        this_blob = IE::make_shared_blob(blob_0, toIE(vec[list_idx]));
+                                    } else if (this_vec.getKind() == cv::detail::OpaqueKind::CV_MAT) {
+                                        const auto &vec = this_vec.rref<cv::Mat>();
+                                        const auto &mat = vec[list_idx];
+                                        this_blob = wrapIE(mat, cv::gapi::ie::TraitAs::TENSOR);
+                                    } else {
+                                        GAPI_Assert(false &&
+                                                "Only Rect and Mat types are supported for infer list 2!");
+                                    }
+
+                                    req.SetBlob(ctx->uu.params.input_names[in_idx], this_blob);
+                                }
+
+                                req.Infer();
+
+                                for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
+                                    std::vector<cv::Mat> &out_vec = ctx->outVecR<cv::Mat>(i);
+
+                                    IE::Blob::Ptr out_blob = req.GetBlob(ctx->uu.params.output_names[i]);
+
+                                    cv::Mat out_mat(cached_dims[i], toCV(out_blob->getTensorDesc().getPrecision()));
+                                    // FIXME: Avoid data copy. Not sure if it is possible though
+                                    copyFromIE(out_blob, out_mat);
+                                    out_vec.push_back(std::move(out_mat));
+                                }
+                            }
+
+                            for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
+                                ctx->out.post(ctx->output(i));
+                            }
+                        },
+                        [](InferenceEngine::InferRequest &) { /* do nothing */ }
+                    },
+                false /* not async */
+            );
     }
 };
 
index 87d3a99..fbfeecc 100644 (file)
 #include "backends/common/gbackend.hpp"
 #include "compiler/gislandmodel.hpp"
 
+#include "backends/ie/giebackend/giewrapper.hpp" // wrap::Plugin
+
 namespace cv {
 namespace gimpl {
 namespace ie {
 
 struct IECompiled {
-#if INF_ENGINE_RELEASE < 2019020000  // < 2019.R2
-    InferenceEngine::InferencePlugin   this_plugin;
-#else
-    InferenceEngine::Core              this_core;
-#endif
-    InferenceEngine::ExecutableNetwork this_network;
-    InferenceEngine::InferRequest      this_request;
-};
-
-// FIXME: Structure which collect all necessary sync primitives
-// will be deleted when the async request pool appears
-class SyncPrim {
-public:
-    void wait() {
-        std::unique_lock<std::mutex> l(m_mutex);
-        m_cv.wait(l, [this]{ return !m_is_busy; });
-    }
+    std::vector<InferenceEngine::InferRequest> createInferRequests();
 
-    void release_and_notify() {
-        {
-            std::lock_guard<std::mutex> lock(m_mutex);
-            m_is_busy = false;
-        }
-        m_cv.notify_one();
-    }
-
-    void acquire() {
-        std::lock_guard<std::mutex> lock(m_mutex);
-        m_is_busy = true;
-    }
-
-private:
-    // To wait until the async request isn't over
-    std::condition_variable m_cv;
-    // To avoid spurious cond var wake up
-    bool m_is_busy = false;
-    // To sleep until condition variable wakes up
-    std::mutex m_mutex;
+    cv::gapi::ie::detail::ParamDesc     params;
+    cv::gimpl::ie::wrap::Plugin         this_plugin;
+    InferenceEngine::ExecutableNetwork  this_network;
 };
 
+class RequestPool;
+
 class GIEExecutable final: public GIslandExecutable
 {
     const ade::Graph &m_g;
@@ -82,8 +53,8 @@ class GIEExecutable final: public GIslandExecutable
     // List of all resources in graph (both internal and external)
     std::vector<ade::NodeHandle> m_dataNodes;
 
-    // Sync primitive
-    SyncPrim m_sync;
+    // To manage multiple async requests
+    std::unique_ptr<RequestPool> m_reqPool;
 
 public:
     GIEExecutable(const ade::Graph                   &graph,
index c1de2e6..3715f44 100644 (file)
@@ -7,7 +7,6 @@
 #include "precomp.hpp"
 
 #include <memory> // make_shared
-#include <iostream>
 
 #include <ade/util/zip_range.hpp>
 
@@ -583,10 +582,16 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
     std::vector< std::vector<Q*> > &m_out_queues;
     std::shared_ptr<cv::gimpl::GIslandExecutable> m_island;
 
+    // NB: StreamingOutput have to be thread-safe.
+    // Now synchronization approach is quite poor and inefficient.
+    mutable std::mutex m_mutex;
+
     // Allocate a new data object for output under idx
     // Prepare this object for posting
     virtual cv::GRunArgP get(int idx) override
     {
+        std::lock_guard<std::mutex> lock{m_mutex};
+
         using MatType = cv::Mat;
         using SclType = cv::Scalar;
 
@@ -663,6 +668,8 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
     }
     virtual void post(cv::GRunArgP&& argp) override
     {
+        std::lock_guard<std::mutex> lock{m_mutex};
+
         // Mark the output ready for posting. If it is the first in the line,
         // actually post it and all its successors which are ready for posting too.
         auto it = m_postIdx.find(cv::gimpl::proto::ptr(argp));
@@ -700,6 +707,7 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
     }
     virtual void post(cv::gimpl::EndOfStream&&) override
     {
+        std::lock_guard<std::mutex> lock{m_mutex};
         // If the posting list is empty, just broadcast the stop message.
         // If it is not, enqueue the Stop message in the postings list.
         for (auto &&it : ade::util::indexed(m_postings))
@@ -725,6 +733,7 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
     }
     void meta(const cv::GRunArgP &out, const cv::GRunArg::Meta &m) override
     {
+        std::lock_guard<std::mutex> lock{m_mutex};
         const auto it = m_postIdx.find(cv::gimpl::proto::ptr(out));
         GAPI_Assert(it != m_postIdx.end());
 
@@ -747,6 +756,7 @@ public:
 
     bool done() const
     {
+        std::lock_guard<std::mutex> lock{m_mutex};
         // The streaming actor work is considered DONE for this stream
         // when it posted/resent all STOP messages to all its outputs.
         return m_stops_sent == desc().size();
index 93571e5..a996430 100644 (file)
@@ -953,6 +953,16 @@ TEST_F(ROIListNV12, Infer2MediaInputNV12)
     validate();
 }
 
+TEST(Infer, SetInvalidNumberOfRequests)
+{
+    using AGInfo = std::tuple<cv::GMat, cv::GMat>;
+    G_API_NET(AgeGender, <AGInfo(cv::GMat)>, "test-age-gender");
+
+    cv::gapi::ie::Params<AgeGender> pp{"model", "weights", "device"};
+
+    EXPECT_ANY_THROW(pp.cfgNumRequests(0u));
+}
+
 TEST(Infer, TestStreamingInfer)
 {
     initTestDataPath();
@@ -980,7 +990,8 @@ TEST(Infer, TestStreamingInfer)
 
     auto pp = cv::gapi::ie::Params<AgeGender> {
         params.model_path, params.weights_path, params.device_id
-    }.cfgOutputLayers({ "age_conv3", "prob" });
+    }.cfgOutputLayers({ "age_conv3", "prob" })
+     .cfgNumRequests(4u);
 
 
     std::size_t num_frames = 0u;
@@ -1049,7 +1060,8 @@ TEST(InferROI, TestStreamingInfer)
 
     auto pp = cv::gapi::ie::Params<AgeGender> {
         params.model_path, params.weights_path, params.device_id
-    }.cfgOutputLayers({ "age_conv3", "prob" });
+    }.cfgOutputLayers({ "age_conv3", "prob" })
+     .cfgNumRequests(4u);
 
 
     std::size_t num_frames = 0u;
@@ -1131,7 +1143,8 @@ TEST(InferList, TestStreamingInfer)
 
     auto pp = cv::gapi::ie::Params<AgeGender> {
         params.model_path, params.weights_path, params.device_id
-    }.cfgOutputLayers({ "age_conv3", "prob" });
+    }.cfgOutputLayers({ "age_conv3", "prob" })
+     .cfgNumRequests(4u);
 
 
     std::size_t num_frames = 0u;
@@ -1222,8 +1235,8 @@ TEST(Infer2, TestStreamingInfer)
 
     auto pp = cv::gapi::ie::Params<AgeGender> {
         params.model_path, params.weights_path, params.device_id
-    }.cfgOutputLayers({ "age_conv3", "prob" });
-
+    }.cfgOutputLayers({ "age_conv3", "prob" })
+     .cfgNumRequests(4u);
 
     std::size_t num_frames = 0u;
     std::size_t max_frames = 10u;
@@ -1311,8 +1324,8 @@ TEST(InferEmptyList, TestStreamingInfer)
 
     auto pp = cv::gapi::ie::Params<AgeGender> {
         params.model_path, params.weights_path, params.device_id
-    }.cfgOutputLayers({ "age_conv3", "prob" });
-
+    }.cfgOutputLayers({ "age_conv3", "prob" })
+     .cfgNumRequests(4u);
 
     std::size_t num_frames = 0u;
     std::size_t max_frames = 1u;
@@ -1366,8 +1379,8 @@ TEST(Infer2EmptyList, TestStreamingInfer)
 
     auto pp = cv::gapi::ie::Params<AgeGender> {
         params.model_path, params.weights_path, params.device_id
-    }.cfgOutputLayers({ "age_conv3", "prob" });
-
+    }.cfgOutputLayers({ "age_conv3", "prob" })
+     .cfgNumRequests(4u);
 
     std::size_t num_frames = 0u;
     std::size_t max_frames = 1u;