Merge pull request #21660 from TolyaTalamanov:at/handle-exception-in-streamingexecutor
authorAnatoliy Talamanov <anatoliy.talamanov@intel.com>
Fri, 25 Mar 2022 08:19:53 +0000 (11:19 +0300)
committerGitHub <noreply@github.com>
Fri, 25 Mar 2022 08:19:53 +0000 (08:19 +0000)
[G-API] Handle exceptions in streaming executor

* Handle exceptions in streaming executor

* Rethrow exception in non-streaming executor

* Clean up

* Put more tests

* Handle exceptions in IE backend

* Handle exception in IE callbacks

* Handle exception in GExecutor

* Handle all exceptions in IE backend

* Not only (std::exception& e)

* Fix comments to review

* Handle input exception in generic way

* Fix comment

* Clean up

* Apply review comments

* Put more comments
* Fix alignment
* Move test outside of HAVE_NGRAPH

* Fix compilation

modules/gapi/include/opencv2/gapi/cpu/gcpukernel.hpp
modules/gapi/src/backends/ie/giebackend.cpp
modules/gapi/src/backends/streaming/gstreamingbackend.cpp
modules/gapi/src/compiler/gislandmodel.cpp
modules/gapi/src/compiler/gislandmodel.hpp
modules/gapi/src/executor/gexecutor.cpp
modules/gapi/src/executor/gstreamingexecutor.cpp
modules/gapi/src/executor/gstreamingexecutor.hpp
modules/gapi/test/infer/gapi_infer_ie_test.cpp
modules/gapi/test/streaming/gapi_streaming_tests.cpp

index 48909a8..ff3ee45 100644 (file)
@@ -2,12 +2,17 @@
 // It is subject to the license terms in the LICENSE file found in the top-level directory
 // of this distribution and at http://opencv.org/license.html.
 //
-// Copyright (C) 2018-2020 Intel Corporation
+// Copyright (C) 2018-2022 Intel Corporation
 
 
 #ifndef OPENCV_GAPI_GCPUKERNEL_HPP
 #define OPENCV_GAPI_GCPUKERNEL_HPP
 
+#ifdef _MSC_VER
+#pragma warning(disable: 4702)  // "Unreachable code"
+// on postprocess(...) call inside OCVCallHelper
+#endif
+
 #include <functional>
 #include <unordered_map>
 #include <utility>
index 711827d..52c60c1 100644 (file)
@@ -389,10 +389,13 @@ public:
     const IEUnit                          &uu;
     cv::gimpl::GIslandExecutable::IOutput &out;
 
-    // NB: Need to gurantee that MediaFrame::View don't die until request is over.
+    // NB: Need to gurantee that MediaFrame::View doesn't die until request is over.
     using Views = std::vector<std::unique_ptr<cv::MediaFrame::View>>;
     Views views;
 
+    // To store exception appeared in callback.
+    std::exception_ptr eptr;
+
 private:
     cv::detail::VectorRef& outVecRef(std::size_t idx);
 
@@ -656,7 +659,7 @@ std::vector<InferenceEngine::InferRequest> cv::gimpl::ie::IECompiled::createInfe
 class cv::gimpl::ie::RequestPool {
 public:
     using RunF      = std::function<void(InferenceEngine::InferRequest&)>;
-    using CallbackF = std::function<void(InferenceEngine::InferRequest&)>;
+    using CallbackF = std::function<void(InferenceEngine::InferRequest&, InferenceEngine::StatusCode)>;
 
     // NB: The task is represented by:
     // RunF      - function which is set blobs and run async inference.
@@ -675,7 +678,7 @@ private:
     void callback(Task task,
                   size_t id,
                   IE::InferRequest request,
-                  IE::StatusCode code);
+                  IE::StatusCode code) noexcept;
     void setup();
 
     QueueClass<size_t>                         m_idle_ids;
@@ -706,32 +709,28 @@ void cv::gimpl::ie::RequestPool::execute(cv::gimpl::ie::RequestPool::Task&& t) {
             static_cast<callback_t>(
                 std::bind(&cv::gimpl::ie::RequestPool::callback, this,
                           t, id, _1, _2)));
-    t.run(request);
+    // NB: InferRequest is already marked as busy
+    // in case of exception need to return it back to the idle.
+    try {
+        t.run(request);
+    } catch (...) {
+        request.SetCompletionCallback([](){});
+        m_idle_ids.push(id);
+        throw;
+    }
 }
 
 void cv::gimpl::ie::RequestPool::callback(cv::gimpl::ie::RequestPool::Task task,
                                           size_t id,
                                           IE::InferRequest request,
-                                          IE::StatusCode code) {
-    // FIXME: Any exception which is arrised here must not leave this callback,
-    // because it won't be handled.
-    try {
-        if (code != IE::StatusCode::OK) {
-            throw std::logic_error("IE::InferRequest finished with not OK status");
-        }
-        task.callback(request);
-        // NB: IE::InferRequest keeps the callback until the new one is set.
-        // Since user's callback might keep resources that should be released,
-        // need to destroy its after execution.
-        // Let's set the empty one to cause the destruction of a callback.
-        request.SetCompletionCallback([](){});
-        m_idle_ids.push(id);
-    } catch (const std::exception& e) {
-        GAPI_LOG_FATAL(NULL, "Callback failed with error: " << e.what());
-        //FIXME: Exception CAN't be rethrown here, since this callback works
-        // in separate IE thread and such scenarios aren't handled properly in
-        // G-API so far.
-    }
+                                          IE::StatusCode code) noexcept {
+    // NB: Inference is over.
+    // 1. Run callback
+    // 2. Destroy callback to free resources.
+    // 3. Mark InferRequest as idle.
+    task.callback(request, code);
+    request.SetCompletionCallback([](){});
+    m_idle_ids.push(id);
 }
 
 // NB: Not thread-safe.
@@ -786,18 +785,19 @@ void cv::gimpl::ie::GIEExecutable::run(cv::gimpl::GIslandExecutable::IInput  &in
     //     1. Collect island inputs/outputs.
     //     2. Create kernel context. (Every kernel has his own context).
     //     3. If the EndOfStream message is recieved, wait until all passed task are done.
-    //     4.
+    //     4. If the Exception message is revieved, propagate it further.
+    //     5.
     //        5.1 Run the kernel.
     //        5.2 Kernel wait for all nececcary infer requests and start asynchronous execution.
     //        5.3 After the kernel is finished continue processing next frame.
     //
-    //     5. If graph is compiled in non-streaming mode, wait until all tasks are done.
+    //     6. If graph is compiled in non-streaming mode, wait until all tasks are done.
 
     std::vector<InObj>  input_objs;
     std::vector<OutObj> output_objs;
 
-    const auto &in_desc  = in.desc();
-    const auto  in_msg   = in.get();
+    const auto &in_desc = in.desc();
+          auto  in_msg  = in.get();
 
     if (cv::util::holds_alternative<cv::gimpl::EndOfStream>(in_msg))
     {
@@ -835,10 +835,20 @@ void cv::gimpl::ie::GIEExecutable::run(cv::gimpl::GIslandExecutable::IInput  &in
 
     const auto &kk = giem.metadata(this_nh).get<IECallable>();
 
-    // (4) Run the kernel.
-    kk.run(ctx, *m_reqPool);
+    // (5) Run the kernel.
+    try {
+        kk.run(ctx, *m_reqPool);
+    } catch (...) {
+        auto eptr = std::current_exception();
+        for (auto i : ade::util::iota(ctx->uu.params.num_out))
+        {
+            auto output = ctx->output(i);
+            ctx->out.post(std::move(output), eptr);
+        }
+        return;
+    }
 
-    // (5) In non-streaming mode need to wait until the all tasks are done
+    // (6) In non-streaming mode need to wait until the all tasks are done
     // FIXME: Is there more graceful way to handle this case ?
     if (!m_gm.metadata().contains<Streaming>()) {
         m_reqPool->waitAll();
@@ -944,19 +954,26 @@ static IE::PreProcessInfo configurePreProcInfo(const IE::InputInfo::CPtr& ii,
 
 // NB: This is a callback used by async infer
 // to post outputs blobs (cv::GMat's).
-static void PostOutputs(InferenceEngine::InferRequest   &request,
-                        std::shared_ptr<IECallContext>   ctx) {
+static void PostOutputs(InferenceEngine::InferRequest &request,
+                        InferenceEngine::StatusCode    code,
+                        std::shared_ptr<IECallContext> ctx) {
     GAPI_ITT_STATIC_LOCAL_HANDLE(ie_cb_post_outputs_hndl, "IE_async_callback_PostOutputs");
     GAPI_ITT_AUTO_TRACE_GUARD(ie_cb_post_outputs_hndl);
 
-    for (auto i : ade::util::iota(ctx->uu.params.num_out))
-    {
+    if (code != IE::StatusCode::OK) {
+        std::stringstream ss;
+        ss << "InferRequest for model: " << ctx->uu.params.model_path
+           << " finished with InferenceEngine::StatusCode: " << static_cast<int>(code);
+        ctx->eptr = std::make_exception_ptr(std::logic_error(ss.str()));
+    }
+
+    for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
         auto& out_mat = ctx->outMatR(i);
         IE::Blob::Ptr this_blob = request.GetBlob(ctx->uu.params.output_names[i]);
         copyFromIE(this_blob, out_mat);
         auto output = ctx->output(i);
         ctx->out.meta(output, ctx->input(0).meta);
-        ctx->out.post(std::move(output));
+        ctx->out.post(std::move(output), ctx->eptr);
     }
 }
 
@@ -966,7 +983,9 @@ public:
                     std::shared_ptr<IECallContext> ctx,
                     std::vector<std::vector<int>>&& cached_dims);
 
-    void operator()(InferenceEngine::InferRequest &request, size_t pos) const;
+    void operator()(InferenceEngine::InferRequest &request,
+                    InferenceEngine::StatusCode    code,
+                    size_t                         pos) const;
 
 private:
     struct Priv {
@@ -987,20 +1006,30 @@ PostOutputsList::PostOutputsList(size_t size,
     m_priv->cached_dims = std::move(cached_dims);
 }
 
-void PostOutputsList::operator()(InferenceEngine::InferRequest &req, size_t pos) const {
+void PostOutputsList::operator()(InferenceEngine::InferRequest &req,
+                                 InferenceEngine::StatusCode    code,
+                                 size_t                         pos) const {
     auto&& ctx         = m_priv->ctx;
     auto&& cached_dims = m_priv->cached_dims;
     auto&& finished    = m_priv->finished;
     auto&& size        = m_priv->size;
-    for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
-        std::vector<cv::Mat> &out_vec = ctx->outVecR<cv::Mat>(i);
 
-        IE::Blob::Ptr out_blob = req.GetBlob(ctx->uu.params.output_names[i]);
-        GAPI_Assert(out_blob);
+    if (code != IE::StatusCode::OK) {
+        ctx->eptr = std::make_exception_ptr(
+               std::logic_error("IE::InferRequest finished with not OK status"));
+    }
+
+    if (!ctx->eptr) {
+        for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
+            std::vector<cv::Mat> &out_vec = ctx->outVecR<cv::Mat>(i);
 
-        // FIXME: Avoid data copy. Not sure if it is possible though
-        out_vec[pos].create(cached_dims[i], toCV(out_blob->getTensorDesc().getPrecision()));
-        copyFromIE(out_blob, out_vec[pos]);
+            IE::Blob::Ptr out_blob = req.GetBlob(ctx->uu.params.output_names[i]);
+            GAPI_Assert(out_blob);
+
+            // FIXME: Avoid data copy. Not sure if it is possible though
+            out_vec[pos].create(cached_dims[i], toCV(out_blob->getTensorDesc().getPrecision()));
+            copyFromIE(out_blob, out_vec[pos]);
+        }
     }
     ++finished;
 
@@ -1008,7 +1037,7 @@ void PostOutputsList::operator()(InferenceEngine::InferRequest &req, size_t pos)
         for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
             auto output = ctx->output(i);
             ctx->out.meta(output, ctx->input(0).meta);
-            ctx->out.post(std::move(output));
+            ctx->out.post(std::move(output), ctx->eptr);
         }
     }
 }
@@ -1123,7 +1152,7 @@ struct Infer: public cv::detail::KernelTag {
                         // What about to do that in RequestPool ?
                         req.StartAsync();
                     },
-                    std::bind(PostOutputs, _1, ctx)
+                    std::bind(PostOutputs, _1, _2, ctx)
                 }
         );
     }
@@ -1218,7 +1247,7 @@ struct InferROI: public cv::detail::KernelTag {
                         // What about to do that in RequestPool ?
                         req.StartAsync();
                     },
-                    std::bind(PostOutputs, _1, ctx)
+                    std::bind(PostOutputs, _1, _2, ctx)
                 }
         );
     }
@@ -1294,7 +1323,6 @@ struct InferList: public cv::detail::KernelTag {
 
     static void run(std::shared_ptr<IECallContext>  ctx,
                     cv::gimpl::ie::RequestPool     &reqPool) {
-
         const auto& in_roi_vec = ctx->inArg<cv::detail::VectorRef>(0u).rref<cv::Rect>();
         // NB: In case there is no input data need to post output anyway
         if (in_roi_vec.empty()) {
@@ -1335,7 +1363,7 @@ struct InferList: public cv::detail::KernelTag {
                         setROIBlob(req, ctx->uu.params.input_names[0u], this_blob, rc, *ctx);
                         req.StartAsync();
                     },
-                    std::bind(callback, std::placeholders::_1, pos)
+                    std::bind(callback, std::placeholders::_1, std::placeholders::_2, pos)
                 }
             );
         }
@@ -1506,7 +1534,7 @@ struct InferList2: public cv::detail::KernelTag {
                         }
                         req.StartAsync();
                     },
-                    std::bind(callback, std::placeholders::_1, list_idx)
+                    std::bind(callback, std::placeholders::_1, std::placeholders::_2, list_idx)
                 } // task
             );
         } // for
index 4bd2a10..69b5f6c 100644 (file)
@@ -172,6 +172,7 @@ void Copy::Actor::run(cv::gimpl::GIslandExecutable::IInput  &in,
         return;
     }
 
+    GAPI_DbgAssert(cv::util::holds_alternative<cv::GRunArgs>(in_msg));
     const cv::GRunArgs &in_args = cv::util::get<cv::GRunArgs>(in_msg);
     GAPI_Assert(in_args.size() == 1u);
 
@@ -212,6 +213,7 @@ public:
             return;
         }
 
+        GAPI_Assert(cv::util::holds_alternative<cv::GRunArgs>(in_msg));
         const cv::GRunArgs &in_args = cv::util::get<cv::GRunArgs>(in_msg);
         GAPI_Assert(in_args.size() == 1u);
         auto frame = cv::util::get<cv::MediaFrame>(in_args[0]);
index 1a8e093..920fd70 100644 (file)
@@ -412,7 +412,17 @@ void GIslandExecutable::run(GIslandExecutable::IInput &in, GIslandExecutable::IO
         out_objs.emplace_back(ade::util::value(it),
                               out.get(ade::util::checked_cast<int>(ade::util::index(it))));
     }
-    run(std::move(in_objs), std::move(out_objs));
+
+    try {
+        run(std::move(in_objs), std::move(out_objs));
+    } catch (...) {
+        auto eptr = std::current_exception();
+        for (auto &&it: out_objs)
+        {
+            out.post(std::move(it.second), eptr);
+        }
+        return;
+    }
 
     // Propagate in-graph meta down to the graph
     // Note: this is not a complete implementation! Mainly this is a stub
index 063504a..565b3c4 100644 (file)
@@ -161,7 +161,12 @@ public:
     const std::vector<cv::gimpl::RcDesc> &desc() const   { return d; }
 };
 struct EndOfStream {};
-using StreamMsg = cv::util::variant<EndOfStream, cv::GRunArgs>;
+
+struct Exception {
+    std::exception_ptr eptr;
+};
+
+using StreamMsg = cv::util::variant<EndOfStream, cv::GRunArgs, Exception>;
 struct GIslandExecutable::IInput: public GIslandExecutable::IODesc {
     virtual ~IInput() = default;
     virtual StreamMsg get() = 0;     // Get a new input vector (blocking)
@@ -169,9 +174,11 @@ struct GIslandExecutable::IInput: public GIslandExecutable::IODesc {
 };
 struct GIslandExecutable::IOutput: public GIslandExecutable::IODesc {
     virtual ~IOutput() = default;
-    virtual GRunArgP get(int idx) = 0;  // Allocate (wrap) a new data object for output idx
-    virtual void post(GRunArgP&&) = 0;  // Release the object back to the framework (mark available)
-    virtual void post(EndOfStream&&) = 0; // Post end-of-stream marker back to the framework
+    virtual GRunArgP get(int idx) = 0;                                 // Allocate (wrap) a new data object for output idx
+    virtual void post(GRunArgP&&, const std::exception_ptr& = {}) = 0; // Release the object back to the framework (mark available)
+    virtual void post(EndOfStream&&) = 0;                              // Post end-of-stream marker back to the framework
+    virtual void post(Exception&&) = 0;
+
 
     // Assign accumulated metadata to the given output object.
     // This method can only be called after get() and before post().
index 6c15d1d..b7b0b5c 100644 (file)
@@ -270,6 +270,7 @@ class cv::gimpl::GExecutor::Output final: public cv::gimpl::GIslandExecutable::I
 {
     cv::gimpl::Mag &mag;
     std::unordered_map<const void*, int> out_idx;
+    std::exception_ptr eptr;
 
     GRunArgP get(int idx) override
     {
@@ -278,8 +279,18 @@ class cv::gimpl::GExecutor::Output final: public cv::gimpl::GIslandExecutable::I
         out_idx[cv::gimpl::proto::ptr(r)] = idx;
         return r;
     }
-    void post(GRunArgP&&) override { } // Do nothing here
+    void post(GRunArgP&&, const std::exception_ptr& e) override
+    {
+        if (e)
+        {
+            eptr = e;
+        }
+    }
     void post(EndOfStream&&) override {} // Do nothing here too
+    void post(Exception&& ex) override
+    {
+        eptr = std::move(ex.eptr);
+    }
     void meta(const GRunArgP &out, const GRunArg::Meta &m) override
     {
         const auto idx = out_idx.at(cv::gimpl::proto::ptr(out));
@@ -291,6 +302,14 @@ public:
     {
         set(rcs);
     }
+
+    void verify()
+    {
+        if (eptr)
+        {
+            std::rethrow_exception(eptr);
+        }
+    }
 };
 
 void cv::gimpl::GExecutor::run(cv::gimpl::GRuntimeArgs &&args)
@@ -389,6 +408,8 @@ void cv::gimpl::GExecutor::run(cv::gimpl::GRuntimeArgs &&args)
         Input i{m_res, op.in_objects};
         Output o{m_res, op.out_objects};
         op.isl_exec->run(i, o);
+        // NB: Check if execution finished without exception.
+        o.verify();
     }
 
     // (7)
index a3a2746..34424cb 100644 (file)
@@ -31,6 +31,8 @@
 #include <opencv2/gapi/streaming/meta.hpp>
 #include <opencv2/gapi/streaming/sync.hpp>
 
+#include <opencv2/gapi/util/variant.hpp>
+
 namespace
 {
 using namespace cv::gimpl::stream;
@@ -310,14 +312,13 @@ class QueueReader
                       const std::size_t  this_id);
 
 public:
-    bool getInputVector  (std::vector<Q*>   &in_queues,
-                          cv::GRunArgs      &in_constants,
-                          cv::GRunArgs      &isl_inputs);
-
-    bool getResultsVector(std::vector<Q*>         &in_queues,
-                          const std::vector<int>  &in_mapping,
-                          const std::size_t        out_size,
-                          cv::GRunArgs            &out_results);
+    cv::gimpl::StreamMsg getInputVector  (std::vector<Q*>   &in_queues,
+                                          cv::GRunArgs      &in_constants);
+
+    using V = cv::util::variant<cv::GRunArgs, Stop, cv::gimpl::Exception>;
+    V getResultsVector(std::vector<Q*>         &in_queues,
+                       const std::vector<int>  &in_mapping,
+                       const std::size_t        out_size);
 };
 
 void rewindToStop(std::vector<Q*> &in_queues,
@@ -369,9 +370,8 @@ void QueueReader::rewindToStop(std::vector<Q*>   &in_queues,
     ::rewindToStop(in_queues, this_id);
 }
 
-bool QueueReader::getInputVector(std::vector<Q*> &in_queues,
-                                 cv::GRunArgs    &in_constants,
-                                 cv::GRunArgs    &isl_inputs)
+cv::gimpl::StreamMsg QueueReader::getInputVector(std::vector<Q*> &in_queues,
+                                                 cv::GRunArgs    &in_constants)
 {
     // NB: Need to release resources from the previous step, to fetch new ones.
     // On some systems it might be impossible to allocate new memory
@@ -381,72 +381,98 @@ bool QueueReader::getInputVector(std::vector<Q*> &in_queues,
     // lifetime, keep the whole cmd vector (of size == # of inputs)
     // in memory.
     m_cmd.resize(in_queues.size());
-    isl_inputs.resize(in_queues.size());
+    cv::GRunArgs isl_inputs(in_queues.size());
 
+    cv::optional<cv::gimpl::Exception> exception;
     for (auto &&it : ade::util::indexed(in_queues))
     {
-        auto id = ade::util::index(it);
-        auto &q = ade::util::value(it);
-
-        if (q == nullptr)
-        {
-            GAPI_Assert(!in_constants.empty());
-            // NULL queue means a graph-constant value (like a
-            // value-initialized scalar)
-            // It can also hold a constant value received with
-            // Stop::Kind::CNST message (see above).
-            isl_inputs[id] = in_constants[id];
-            continue;
-        }
-
-        q->pop(m_cmd[id]);
-        if (!cv::util::holds_alternative<Stop>(m_cmd[id]))
-        {
-            isl_inputs[id] = cv::util::get<cv::GRunArg>(m_cmd[id]);
-        }
-        else // A Stop sign
-        {
-            const auto &stop = cv::util::get<Stop>(m_cmd[id]);
-            if (stop.kind == Stop::Kind::CNST)
-            {
-                // We've got a Stop signal from a const source,
-                // propagated as a result of real stream reaching its
-                // end.  Sometimes these signals come earlier than
-                // real EOS Stops so are deprioritized -- just
-                // remember the Const value here and continue
-                // processing other queues. Set queue pointer to
-                // nullptr and update the const_val vector
-                // appropriately
-                m_finishing = true;
-                in_queues[id] = nullptr;
-                in_constants.resize(in_queues.size());
-                in_constants[id] = std::move(stop.cdata);
-
-                // NEXT time (on a next call to getInputVector()), the
-                // "q==nullptr" check above will be triggered, but now
-                // we need to make it manually:
-                isl_inputs[id] = in_constants[id];
-            }
-            else
-            {
-                GAPI_Assert(stop.kind == Stop::Kind::HARD);
-                rewindToStop(in_queues, id);
-                // After queues are read to the proper indicator,
-                // indicate end-of-stream
-                return false;
-            } // if(Cnst)
-        } // if(Stop)
+       auto id = ade::util::index(it);
+       auto &q = ade::util::value(it);
+
+       if (q == nullptr)
+       {
+           GAPI_Assert(!in_constants.empty());
+           // NULL queue means a graph-constant value (like a
+           // value-initialized scalar)
+           // It can also hold a constant value received with
+           // Stop::Kind::CNST message (see above).
+           isl_inputs[id] = in_constants[id];
+           continue;
+       }
+
+       q->pop(m_cmd[id]);
+       switch (m_cmd[id].index())
+       {
+           case Cmd::index_of<cv::GRunArg>():
+               isl_inputs[id] = cv::util::get<cv::GRunArg>(m_cmd[id]);
+               break;
+           case Cmd::index_of<Stop>():
+           {
+               const auto &stop = cv::util::get<Stop>(m_cmd[id]);
+               if (stop.kind == Stop::Kind::CNST)
+               {
+                   // We've got a Stop signal from a const source,
+                   // propagated as a result of real stream reaching its
+                   // end.  Sometimes these signals come earlier than
+                   // real EOS Stops so are deprioritized -- just
+                   // remember the Const value here and continue
+                   // processing other queues. Set queue pointer to
+                   // nullptr and update the const_val vector
+                   // appropriately
+                   m_finishing = true;
+                   in_queues[id] = nullptr;
+                   in_constants.resize(in_queues.size());
+                   in_constants[id] = std::move(stop.cdata);
+
+                   // NEXT time (on a next call to getInputVector()), the
+                   // "q==nullptr" check above will be triggered, but now
+                   // we need to make it manually:
+                   isl_inputs[id] = in_constants[id];
+               }
+               else
+               {
+                   GAPI_Assert(stop.kind == Stop::Kind::HARD);
+                   rewindToStop(in_queues, id);
+                   // After queues are read to the proper indicator,
+                   // indicate end-of-stream
+                   return cv::gimpl::StreamMsg{cv::gimpl::EndOfStream{}};
+              } // if(Cnst)
+              break;
+          }
+          case Cmd::index_of<cv::gimpl::Exception>():
+          {
+              exception =
+                  cv::util::make_optional(cv::util::get<cv::gimpl::Exception>(m_cmd[id]));
+              break;
+          }
+          default:
+              GAPI_Assert(false && "Unsupported cmd type in getInputVector()");
+       }
     } // for(in_queues)
 
+    if (exception.has_value()) {
+        return cv::gimpl::StreamMsg{exception.value()};
+    }
+
     if (m_finishing)
     {
         // If the process is about to end (a soft Stop was received
         // already) and an island has no other inputs than constant
         // inputs, its queues may all become nullptrs. Indicate it as
         // "no data".
-        return !ade::util::all_of(in_queues, [](Q *ptr){return ptr == nullptr;});
+        if (ade::util::all_of(in_queues, [](Q *ptr){return ptr == nullptr;})) {
+            return cv::gimpl::StreamMsg{cv::gimpl::EndOfStream{}};
+        }
+    }
+    // A regular case - there is data to process
+    for (auto& arg : isl_inputs) {
+        if (arg.index() == cv::GRunArg::index_of<cv::Mat>()) {
+            arg = cv::GRunArg{ cv::make_rmat<cv::gimpl::RMatOnMat>(cv::util::get<cv::Mat>(arg))
+                             , arg.meta
+                             };
+        }
     }
-    return true; // A regular case - there is data to process.
+    return cv::gimpl::StreamMsg{std::move(isl_inputs)};
 }
 
 // This is a special method to obtain a result vector
@@ -474,33 +500,47 @@ bool QueueReader::getInputVector(std::vector<Q*> &in_queues,
 // (_may be_ partially filled) to the same final output queue.
 // The receiver part at the GStreamingExecutor level won't change
 // because of that.
-bool QueueReader::getResultsVector(std::vector<Q*>   &in_queues,
-                                   const std::vector<int>  &in_mapping,
-                                   const std::size_t  out_size,
-                                   cv::GRunArgs      &out_results)
+
+QueueReader::V QueueReader::getResultsVector(std::vector<Q*>        &in_queues,
+                                             const std::vector<int> &in_mapping,
+                                             const std::size_t      out_size)
 {
+    cv::GRunArgs out_results(out_size);
     m_cmd.resize(out_size);
+    cv::optional<cv::gimpl::Exception> exception;
     for (auto &&it : ade::util::indexed(in_queues))
     {
         auto ii = ade::util::index(it);
         auto oi = in_mapping[ii];
         auto &q = ade::util::value(it);
         q->pop(m_cmd[oi]);
-        if (!cv::util::holds_alternative<Stop>(m_cmd[oi]))
-        {
-            out_results[oi] = std::move(cv::util::get<cv::GRunArg>(m_cmd[oi]));
-        }
-        else // A Stop sign
-        {
-            // In theory, the CNST should never reach here.
-            // Collector thread never handles the inputs directly
-            // (collector's input queues are always produced by
-            // islands in the graph).
-            rewindToStop(in_queues, ii);
-            return false;
-        } // if(Stop)
+
+        switch (m_cmd[oi].index()) {
+            case Cmd::index_of<cv::GRunArg>():
+                out_results[oi] = std::move(cv::util::get<cv::GRunArg>(m_cmd[oi]));
+                break;
+            case Cmd::index_of<Stop>():
+                // In theory, the CNST should never reach here.
+                // Collector thread never handles the inputs directly
+                // (collector's input queues are always produced by
+                // islands in the graph).
+                rewindToStop(in_queues, ii);
+                return QueueReader::V(Stop{});
+            case Cmd::index_of<cv::gimpl::Exception>():
+                exception =
+                    cv::util::make_optional(cv::util::get<cv::gimpl::Exception>(m_cmd[oi]));
+                break;
+            default:
+                cv::util::throw_error(
+                        std::logic_error("Unexpected cmd kind in getResultsVector"));
+        } // switch
     } // for(in_queues)
-    return true;
+
+    if (exception.has_value()) {
+        return QueueReader::V(exception.value());
+    }
+
+    return QueueReader::V(out_results);
 }
 
 
@@ -521,7 +561,9 @@ void emitterActorThread(std::shared_ptr<cv::gimpl::GIslandEmitter> emitter,
                 || cv::util::holds_alternative<Stop>(cmd));
     if (cv::util::holds_alternative<Stop>(cmd))
     {
-        for (auto &&oq : out_queues) oq->push(cmd);
+        for (auto &&oq : out_queues) {
+            oq->push(cmd);
+        }
         return;
     }
 
@@ -547,10 +589,21 @@ void emitterActorThread(std::shared_ptr<cv::gimpl::GIslandEmitter> emitter,
         // Try to obtain next data chunk from the source
         cv::GRunArg data;
 
-        const bool result = [&](){
-            GAPI_ITT_AUTO_TRACE_GUARD(emitter_pull_hndl);
-            return emitter->pull(data);
-        }();
+        bool result = false;
+        try {
+            result = [&](){
+                GAPI_ITT_AUTO_TRACE_GUARD(emitter_pull_hndl);
+                return emitter->pull(data);
+            }();
+        } catch (...) {
+           auto eptr = std::current_exception();
+           for (auto &&oq : out_queues)
+           {
+               oq->push(Cmd{cv::gimpl::Exception{eptr}});
+           }
+           // NB: Go to the next iteration.
+           continue;
+       }
 
         if (result)
         {
@@ -673,28 +726,8 @@ class StreamingInput final: public cv::gimpl::GIslandExecutable::IInput
     std::vector<Q*> &in_queues; // FIXME: This can be part of QueueReader
     cv::GRunArgs &in_constants; // FIXME: This can be part of QueueReader
 
-    virtual cv::gimpl::StreamMsg get() override
-    {
-        GAPI_ITT_STATIC_LOCAL_HANDLE(inputs_get_hndl, "StreamingInput::get");
-        GAPI_ITT_AUTO_TRACE_GUARD(inputs_get_hndl);
-
-        cv::GRunArgs isl_input_args;
+    cv::optional<cv::gimpl::StreamMsg> last_read_msg;
 
-        if (!qr.getInputVector(in_queues, in_constants, isl_input_args))
-        {
-            // Stop case
-            return cv::gimpl::StreamMsg{cv::gimpl::EndOfStream{}};
-        }
-        // Wrap all input cv::Mats with RMats
-        for (auto& arg : isl_input_args) {
-            if (arg.index() == cv::GRunArg::index_of<cv::Mat>()) {
-                arg = cv::GRunArg{ cv::make_rmat<cv::gimpl::RMatOnMat>(cv::util::get<cv::Mat>(arg))
-                                 , arg.meta
-                                 };
-            }
-        }
-        return cv::gimpl::StreamMsg{std::move(isl_input_args)};
-    }
     virtual cv::gimpl::StreamMsg try_get() override
     {
         // FIXME: This is not very usable at the moment!
@@ -709,17 +742,43 @@ class StreamingInput final: public cv::gimpl::GIslandExecutable::IInput
     {
         set(in_descs);
     }
+
+    const cv::gimpl::StreamMsg& read()
+    {
+        GAPI_ITT_STATIC_LOCAL_HANDLE(inputs_get_hndl, "StreamingInput::read");
+        GAPI_ITT_AUTO_TRACE_GUARD(inputs_get_hndl);
+
+        last_read_msg =
+            cv::optional<cv::gimpl::StreamMsg>(
+                    qr.getInputVector(in_queues, in_constants));
+        return last_read_msg.value();
+    }
+
+    virtual cv::gimpl::StreamMsg get() override
+    {
+        GAPI_ITT_STATIC_LOCAL_HANDLE(inputs_get_hndl, "StreamingInput::get");
+        GAPI_ITT_AUTO_TRACE_GUARD(inputs_get_hndl);
+
+        if (!last_read_msg.has_value()) {
+            (void)read();
+        }
+        auto msg = std::move(last_read_msg.value());
+        last_read_msg = cv::optional<cv::gimpl::StreamMsg>();
+        return msg;
+    }
 };
 
 class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
 {
     // These objects form an internal state of the StreamingOutput
     struct Posting
-    {
-        using V = cv::util::variant<cv::GRunArg, cv::gimpl::EndOfStream>;
-        V data;
-        bool ready = false;
-    };
+   {
+       using V = cv::util::variant<cv::GRunArg,
+                                   cv::gimpl::EndOfStream,
+                                   cv::gimpl::Exception>;
+       V data;
+       bool ready = false;
+   };
     using PostingList = std::list<Posting>;
     std::vector<PostingList> m_postings;
     std::unordered_map< const void*
@@ -820,7 +879,7 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
         return ret_val;
     }
 
-    virtual void post(cv::GRunArgP&& argp) override
+    virtual void post(cv::GRunArgP&& argp, const std::exception_ptr& exptr) override
     {
         GAPI_ITT_STATIC_LOCAL_HANDLE(outputs_post_hndl, "StreamingOutput::post");
         GAPI_ITT_AUTO_TRACE_GUARD(outputs_post_hndl);
@@ -834,6 +893,9 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
         const int out_idx = it->second.first;
         const auto out_iter = it->second.second;
         out_iter->ready = true;
+        if (exptr) {
+            out_iter->data = cv::gimpl::Exception{exptr};
+        }
         m_postIdx.erase(it); // Drop the link from the cache anyway
         if (out_iter != m_postings[out_idx].begin())
         {
@@ -845,16 +907,22 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
         while (post_iter != m_postings[out_idx].end() && post_iter->ready == true)
         {
             Cmd cmd;
-            if (cv::util::holds_alternative<cv::GRunArg>(post_iter->data))
+            switch (post_iter->data.index())
             {
-                cmd = Cmd{cv::util::get<cv::GRunArg>(post_iter->data)};
-            }
-            else
-            {
-                GAPI_Assert(cv::util::holds_alternative<cv::gimpl::EndOfStream>(post_iter->data));
-                cmd = Cmd{Stop{}};
-                m_stops_sent++;
+                case Posting::V::index_of<cv::GRunArg>():
+                    cmd = Cmd{cv::util::get<cv::GRunArg>(post_iter->data)};
+                    break;
+                case Posting::V::index_of<cv::gimpl::Exception>():
+                    cmd = Cmd{cv::util::get<cv::gimpl::Exception>(post_iter->data)};
+                    break;
+                case Posting::V::index_of<cv::gimpl::EndOfStream>():
+                    cmd = Cmd{Stop{}};
+                    m_stops_sent++;
+                    break;
+                default:
+                    GAPI_Assert(false && "Unreachable code");
             }
+
             for (auto &&q : m_out_queues[out_idx])
             {
                 q->push(cmd);
@@ -889,6 +957,7 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
             }
         }
     }
+
     void meta(const cv::GRunArgP &out, const cv::GRunArg::Meta &m) override
     {
         std::lock_guard<std::mutex> lock{m_mutex};
@@ -919,6 +988,32 @@ public:
         // when it posted/resent all STOP messages to all its outputs.
         return m_stops_sent == desc().size();
     }
+
+    virtual void post(cv::gimpl::Exception&& error) override
+    {
+        std::lock_guard<std::mutex> lock{m_mutex};
+        // If the posting list is empty, just broadcast the stop message.
+        // If it is not, enqueue the Stop message in the postings list.
+        for (auto &&it : ade::util::indexed(m_postings))
+        {
+            const auto  idx = ade::util::index(it);
+                  auto &lst = ade::util::value(it);
+            if (lst.empty())
+            {
+                for (auto &&q : m_out_queues[idx])
+                {
+                    q->push(Cmd(std::move(error)));
+                }
+            }
+            else
+            {
+                Posting p;
+                p.data = Posting::V{std::move(error)};
+                p.ready = true;
+                lst.push_back(std::move(p)); // FIXME: For some reason {}-ctor didn't work here
+            }
+        }
+    }
 };
 
 // This thread is a plain dumb processing actor. What it do is just:
@@ -947,7 +1042,17 @@ void islandActorThread(std::vector<cv::gimpl::RcDesc> in_rcs,
     while (!output.done())
     {
         GAPI_ITT_AUTO_TRACE_GUARD(island_hndl);
-        island_exec->run(input, output);
+        // NB: In case the input message is an cv::gimpl::Exception
+        // handle it in a general way.
+        if (cv::util::holds_alternative<cv::gimpl::Exception>(input.read()))
+        {
+            auto in_msg = input.get();
+            output.post(std::move(cv::util::get<cv::gimpl::Exception>(in_msg)));
+        }
+        else
+        {
+            island_exec->run(input, output);
+        }
     }
 }
 
@@ -984,26 +1089,33 @@ void collectorThread(std::vector<Q*>   in_queues,
     while (true)
     {
         GAPI_ITT_AUTO_TRACE_GUARD(collector_hndl);
-        cv::GRunArgs this_result(out_size);
 
-        const bool ok = [&](){
+        const auto result = [&](){
             GAPI_ITT_AUTO_TRACE_GUARD(collector_get_results_hndl);
-            return qr.getResultsVector(in_queues, in_mapping, out_size, this_result);
+            return qr.getResultsVector(in_queues, in_mapping, out_size);
         }();
 
-        if (!ok)
+        switch (result.index())
         {
-            if (handle_stop)
+            case QueueReader::V::index_of<cv::GRunArgs>():
             {
-                out_queue.push(Cmd{Stop{}});
+                GAPI_ITT_AUTO_TRACE_GUARD(collector_push_hndl);
+                auto this_result = cv::util::get<cv::GRunArgs>(result);
+                out_queue.push(Cmd{Result{std::move(this_result), flags}});
+                break;
             }
-            // Terminate the thread anyway
-            return;
-        }
-
-        {
-            GAPI_ITT_AUTO_TRACE_GUARD(collector_push_hndl);
-            out_queue.push(Cmd{Result{std::move(this_result), flags}});
+            case QueueReader::V::index_of<Stop>():
+                if (handle_stop)
+                {
+                    out_queue.push(Cmd{Stop{}});
+                }
+                // Terminate the thread anyway
+                return;
+            case QueueReader::V::index_of<cv::gimpl::Exception>():
+                out_queue.push(Cmd{cv::util::get<cv::gimpl::Exception>(result)});
+                break;
+            default:
+                GAPI_Assert(false && "Unreachable code");
         }
     }
 }
@@ -1707,16 +1819,24 @@ bool cv::gimpl::GStreamingExecutor::pull(cv::GRunArgsP &&outs)
 
     Cmd cmd;
     m_out_queue.pop(cmd);
-    if (cv::util::holds_alternative<Stop>(cmd))
-    {
-        wait_shutdown();
-        return false;
+    switch (cmd.index()) {
+        case Cmd::index_of<Stop>():
+            wait_shutdown();
+            return false;
+        case Cmd::index_of<Result>(): {
+            GAPI_Assert(cv::util::holds_alternative<Result>(cmd));
+            cv::GRunArgs &this_result = cv::util::get<Result>(cmd).args;
+            sync_data(this_result, outs);
+            return true;
+        }
+        case Cmd::index_of<Exception>(): {
+            std::rethrow_exception(cv::util::get<Exception>(cmd).eptr);
+            return true;
+        }
+        default:
+            GAPI_Assert(false && "Unsupported cmd type in pull");
     }
-
-    GAPI_Assert(cv::util::holds_alternative<Result>(cmd));
-    cv::GRunArgs &this_result = cv::util::get<Result>(cmd).args;
-    sync_data(this_result, outs);
-    return true;
+    GAPI_Assert(false && "Unreachable code");
 }
 
 bool cv::gimpl::GStreamingExecutor::pull(cv::GOptRunArgsP &&outs)
@@ -1734,15 +1854,20 @@ bool cv::gimpl::GStreamingExecutor::pull(cv::GOptRunArgsP &&outs)
 
     Cmd cmd;
     m_out_queue.pop(cmd);
-    if (cv::util::holds_alternative<Stop>(cmd))
-    {
-        wait_shutdown();
-        return false;
+    switch (cmd.index()) {
+        case Cmd::index_of<Stop>():
+            wait_shutdown();
+            return false;
+        case Cmd::index_of<Result>(): {
+            sync_data(cv::util::get<Result>(cmd), outs);
+            return true;
+        }
+        case Cmd::index_of<Exception>(): {
+            std::rethrow_exception(cv::util::get<Exception>(cmd).eptr);
+            return true;
+        }
     }
-
-    GAPI_Assert(cv::util::holds_alternative<Result>(cmd));
-    sync_data(cv::util::get<Result>(cmd), outs);
-    return true;
+    GAPI_Assert(false && "Unreachable code");
 }
 
 std::tuple<bool, cv::util::variant<cv::GRunArgs, cv::GOptRunArgs>> cv::gimpl::GStreamingExecutor::pull()
index b4aadcb..da27f6a 100644 (file)
@@ -50,11 +50,12 @@ struct Result {
 
 using Cmd = cv::util::variant
     < cv::util::monostate
-    , Start        // Tells emitters to start working. Not broadcasted to workers.
-    , Stop         // Tells emitters to stop working. Broadcasted to workers.
-    , cv::GRunArg  // Workers data payload to process.
-    , Result       // Pipeline's data for gout()
-    >;
+    , Start                // Tells emitters to start working. Not broadcasted to workers.
+    , Stop                 // Tells emitters to stop working. Broadcasted to workers.
+    , cv::GRunArg          // Workers data payload to process.
+    , Result               // Pipeline's data for gout()
+    , cv::gimpl::Exception // Exception which is thrown while execution.
+   >;
 
 // Interface over a queue. The underlying queue implementation may be
 // different. This class is mainly introduced to bring some
index 8dc23a3..3741438 100644 (file)
@@ -2915,6 +2915,47 @@ TEST(Infer, ModelWith2DInputs)
 
 #endif // HAVE_NGRAPH
 
+TEST(TestAgeGender, ThrowBlobAndInputPrecisionMismatchStreaming)
+{
+    const std::string device = "MYRIAD";
+    skipIfDeviceNotAvailable(device);
+
+    initDLDTDataPath();
+
+    cv::gapi::ie::detail::ParamDesc params;
+    // NB: Precision for inputs is U8.
+    params.model_path = compileAgeGenderBlob(device);
+    params.device_id = device;
+
+    // Configure & run G-API
+    using AGInfo = std::tuple<cv::GMat, cv::GMat>;
+    G_API_NET(AgeGender, <AGInfo(cv::GMat)>, "test-age-gender");
+
+    auto pp = cv::gapi::ie::Params<AgeGender> {
+        params.model_path, params.device_id
+    }.cfgOutputLayers({ "age_conv3", "prob" });
+
+    cv::GMat in, age, gender;
+    std::tie(age, gender) = cv::gapi::infer<AgeGender>(in);
+    auto pipeline = cv::GComputation(cv::GIn(in), cv::GOut(age, gender))
+        .compileStreaming(cv::compile_args(cv::gapi::networks(pp)));
+
+    cv::Mat in_mat(320, 240, CV_32FC3);
+    cv::randu(in_mat, 0, 1);
+    cv::Mat gapi_age, gapi_gender;
+
+    pipeline.setSource(cv::gin(in_mat));
+    pipeline.start();
+
+    // NB: Blob precision is U8, but user pass FP32 data, so exception will be thrown.
+    // Now exception comes directly from IE, but since G-API has information
+    // about data precision at the compile stage, consider the possibility of
+    // throwing exception from there.
+    for (int i = 0; i < 10; ++i) {
+        EXPECT_ANY_THROW(pipeline.pull(cv::gout(gapi_age, gapi_gender)));
+    }
+}
+
 } // namespace opencv_test
 
 #endif //  HAVE_INF_ENGINE
index 4d33d4b..ffa1d45 100644 (file)
@@ -304,6 +304,66 @@ void checkPullOverload(const cv::Mat& ref,
     EXPECT_EQ(0., cv::norm(ref, out_mat, cv::NORM_INF));
 }
 
+class InvalidSource : public cv::gapi::wip::IStreamSource {
+public:
+    InvalidSource(const size_t throw_every_nth_frame,
+                  const size_t num_frames)
+        : m_throw_every_nth_frame(throw_every_nth_frame),
+          m_curr_frame_id(0u),
+          m_num_frames(num_frames),
+          m_mat(1, 1, CV_8U) {
+    }
+
+    static std::string exception_msg()
+    {
+        return "InvalidSource sucessfuly failed!";
+    }
+
+    bool pull(cv::gapi::wip::Data& d) {
+        ++m_curr_frame_id;
+        if (m_curr_frame_id > m_num_frames) {
+            return false;
+        }
+
+        if (m_curr_frame_id % m_throw_every_nth_frame == 0) {
+            throw std::logic_error(InvalidSource::exception_msg());
+            return true;
+        } else {
+            d = cv::Mat(m_mat);
+        }
+
+        return true;
+    }
+
+    cv::GMetaArg descr_of() const override {
+        return cv::GMetaArg{cv::descr_of(m_mat)};
+    }
+
+private:
+    size_t m_throw_every_nth_frame;
+    size_t m_curr_frame_id;
+    size_t m_num_frames;
+    cv::Mat m_mat;
+};
+
+G_TYPED_KERNEL(GThrowExceptionOp, <GMat(GMat)>, "org.opencv.test.throw_error_op")
+{
+     static GMatDesc outMeta(GMatDesc in) { return in; }
+};
+
+GAPI_OCV_KERNEL(GThrowExceptionKernel, GThrowExceptionOp)
+{
+    static std::string exception_msg()
+    {
+        return "GThrowExceptionKernel sucessfuly failed";
+    }
+
+    static void run(const cv::Mat&, cv::Mat&)
+    {
+        throw std::logic_error(GThrowExceptionKernel::exception_msg());
+    }
+};
+
 } // anonymous namespace
 
 TEST_P(GAPI_Streaming, SmokeTest_ConstInput_GMat)
@@ -2512,5 +2572,109 @@ TEST(GAPI_Streaming, TestDesyncMediaFrameGray) {
     }
 }
 
+TEST(GAPI_Streaming_Exception, SingleKernelThrow) {
+    cv::GMat in;
+    auto pipeline = cv::GComputation(in, GThrowExceptionOp::on(in))
+        .compileStreaming(cv::compile_args(cv::gapi::kernels<GThrowExceptionKernel>()));
+
+    cv::Mat in_mat(cv::Size(300, 300), CV_8UC3);
+    cv::randu(in_mat, cv::Scalar::all(0), cv::Scalar::all(255));
+    pipeline.setSource(cv::gin(in_mat));
+    pipeline.start();
+
+    EXPECT_THROW(
+            try {
+                cv::Mat out_mat;
+                pipeline.pull(cv::gout(out_mat));
+            } catch (const std::logic_error& e) {
+                EXPECT_EQ(GThrowExceptionKernel::exception_msg(), e.what());
+                throw;
+            }, std::logic_error);
+}
+
+TEST(GAPI_Streaming_Exception, StreamingBackendExceptionAsInput) {
+    cv::GMat in;
+    auto pipeline = cv::GComputation(in,
+            cv::gapi::copy(GThrowExceptionOp::on(in)))
+        .compileStreaming(cv::compile_args(cv::gapi::kernels<GThrowExceptionKernel>()));
+
+    cv::Mat in_mat(cv::Size(300, 300), CV_8UC3);
+    cv::randu(in_mat, cv::Scalar::all(0), cv::Scalar::all(255));
+    pipeline.setSource(cv::gin(in_mat));
+    pipeline.start();
+
+    EXPECT_THROW(
+            try {
+                cv::Mat out_mat;
+                pipeline.pull(cv::gout(out_mat));
+            } catch (const std::logic_error& e) {
+                EXPECT_EQ(GThrowExceptionKernel::exception_msg(), e.what());
+                throw;
+            }, std::logic_error);
+}
+
+TEST(GAPI_Streaming_Exception, RegularBacckendsExceptionAsInput) {
+    cv::GMat in;
+    auto pipeline = cv::GComputation(in,
+            cv::gapi::add(GThrowExceptionOp::on(in), GThrowExceptionOp::on(in)))
+        .compileStreaming(cv::compile_args(cv::gapi::kernels<GThrowExceptionKernel>()));
+
+    cv::Mat in_mat(cv::Size(300, 300), CV_8UC3);
+    cv::randu(in_mat, cv::Scalar::all(0), cv::Scalar::all(255));
+    pipeline.setSource(cv::gin(in_mat));
+    pipeline.start();
+
+    EXPECT_THROW(
+            try {
+                cv::Mat out_mat;
+                pipeline.pull(cv::gout(out_mat));
+            } catch (const std::logic_error& e) {
+                EXPECT_EQ(GThrowExceptionKernel::exception_msg(), e.what());
+                throw;
+            }, std::logic_error);
+}
+
+TEST(GAPI_Streaming_Exception, SourceThrow) {
+    cv::GMat in;
+    auto pipeline = cv::GComputation(in, cv::gapi::copy(in)).compileStreaming();
+
+    pipeline.setSource(std::make_shared<InvalidSource>(1u, 1u));
+    pipeline.start();
+
+    EXPECT_THROW(
+            try {
+                cv::Mat out_mat;
+                pipeline.pull(cv::gout(out_mat));
+            } catch (const std::logic_error& e) {
+                EXPECT_EQ(InvalidSource::exception_msg(), e.what());
+                throw;
+            }, std::logic_error);
+}
+
+TEST(GAPI_Streaming_Exception, SourceThrowEverySecondFrame) {
+    constexpr size_t throw_every_nth_frame = 2u;
+    constexpr size_t num_frames = 10u;
+    size_t curr_frame = 0;
+    bool has_frame = true;
+    cv::Mat out_mat;
+
+    cv::GMat in;
+    auto pipeline = cv::GComputation(in, cv::gapi::copy(in)).compileStreaming();
+
+    pipeline.setSource(std::make_shared<InvalidSource>(throw_every_nth_frame, num_frames));
+    pipeline.start();
+    while (has_frame) {
+        ++curr_frame;
+        try {
+            has_frame = pipeline.pull(cv::gout(out_mat));
+        } catch (const std::exception& e) {
+            EXPECT_TRUE(curr_frame % throw_every_nth_frame == 0);
+            EXPECT_EQ(InvalidSource::exception_msg(), e.what());
+        }
+    }
+
+    // NB: Pull was called num_frames + 1(stop).
+    EXPECT_EQ(num_frames, curr_frame - 1);
+}
 
 } // namespace opencv_test