dnn: use AsyncArray
authorAlexander Alekhin <alexander.a.alekhin@gmail.com>
Wed, 1 May 2019 11:51:12 +0000 (11:51 +0000)
committerAlexander Alekhin <alexander.a.alekhin@gmail.com>
Sat, 18 May 2019 19:32:23 +0000 (19:32 +0000)
modules/dnn/include/opencv2/dnn/dnn.hpp
modules/dnn/misc/python/pyopencv_dnn.hpp
modules/dnn/misc/python/shadow_async_mat.hpp [deleted file]
modules/dnn/misc/python/test/test_dnn.py
modules/dnn/src/dnn.cpp
modules/dnn/src/op_inf_engine.cpp
modules/dnn/src/op_inf_engine.hpp
modules/dnn/test/test_misc.cpp
samples/dnn/object_detection.cpp
samples/dnn/object_detection.py

index a8ee534..2fe8d61 100644 (file)
@@ -44,9 +44,7 @@
 
 #include <vector>
 #include <opencv2/core.hpp>
-#ifdef CV_CXX11
-#include <future>
-#endif
+#include "opencv2/core/async.hpp"
 
 #if !defined CV_DOXYGEN && !defined CV_STATIC_ANALYSIS && !defined CV_DNN_DONT_ADD_EXPERIMENTAL_NS
 #define CV__DNN_EXPERIMENTAL_NS_BEGIN namespace experimental_dnn_34_v12 {
@@ -67,18 +65,6 @@ CV__DNN_EXPERIMENTAL_NS_BEGIN
 
     typedef std::vector<int> MatShape;
 
-#if defined(CV_CXX11) || defined(CV_DOXYGEN)
-    typedef std::future<Mat> AsyncMat;
-#else
-    // Just a workaround for bindings.
-    struct AsyncMat
-    {
-        Mat get() { return Mat(); }
-        void wait() const {}
-        size_t wait_for(size_t milliseconds) const { CV_UNUSED(milliseconds); return -1; }
-    };
-#endif
-
     /**
      * @brief Enum of computation backends supported by layers.
      * @see Net::setPreferableBackend
@@ -483,7 +469,7 @@ CV__DNN_EXPERIMENTAL_NS_BEGIN
          *  This is an asynchronous version of forward(const String&).
          *  dnn::DNN_BACKEND_INFERENCE_ENGINE backend is required.
          */
-        CV_WRAP AsyncMat forwardAsync(const String& outputName = String());
+        CV_WRAP AsyncArray forwardAsync(const String& outputName = String());
 
         /** @brief Runs forward pass to compute output of layer with name @p outputName.
          *  @param outputBlobs contains all output blobs for specified layer.
index 8e6d803..03728f6 100644 (file)
@@ -2,13 +2,6 @@
 typedef dnn::DictValue LayerId;
 typedef std::vector<dnn::MatShape> vector_MatShape;
 typedef std::vector<std::vector<dnn::MatShape> > vector_vector_MatShape;
-#ifdef CV_CXX11
-typedef std::chrono::milliseconds chrono_milliseconds;
-typedef std::future_status AsyncMatStatus;
-#else
-typedef size_t chrono_milliseconds;
-typedef size_t AsyncMatStatus;
-#endif
 
 template<>
 bool pyopencv_to(PyObject *o, dnn::DictValue &dv, const char *name)
@@ -46,46 +39,6 @@ bool pyopencv_to(PyObject *o, std::vector<Mat> &blobs, const char *name) //requi
   return pyopencvVecConverter<Mat>::to(o, blobs, ArgInfo(name, false));
 }
 
-#ifdef CV_CXX11
-
-template<>
-PyObject* pyopencv_from(const std::future<Mat>& f_)
-{
-    std::future<Mat>& f = const_cast<std::future<Mat>&>(f_);
-    Ptr<cv::dnn::AsyncMat> p(new std::future<Mat>(std::move(f)));
-    return pyopencv_from(p);
-}
-
-template<>
-PyObject* pyopencv_from(const std::future_status& status)
-{
-    return pyopencv_from((int)status);
-}
-
-template<>
-bool pyopencv_to(PyObject* src, std::chrono::milliseconds& dst, const char* name)
-{
-    size_t millis = 0;
-    if (pyopencv_to(src, millis, name))
-    {
-        dst = std::chrono::milliseconds(millis);
-        return true;
-    }
-    else
-        return false;
-}
-
-#else
-
-template<>
-PyObject* pyopencv_from(const cv::dnn::AsyncMat&)
-{
-    CV_Error(Error::StsNotImplemented, "C++11 is required.");
-    return 0;
-}
-
-#endif  // CV_CXX11
-
 template<typename T>
 PyObject* pyopencv_from(const dnn::DictValue &dv)
 {
diff --git a/modules/dnn/misc/python/shadow_async_mat.hpp b/modules/dnn/misc/python/shadow_async_mat.hpp
deleted file mode 100644 (file)
index 8807b86..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-#error This is a shadow header file, which is not intended for processing by any compiler. \
-       Only bindings parser should handle this file.
-
-namespace cv { namespace dnn {
-
-class CV_EXPORTS_W AsyncMat
-{
-public:
-    //! Wait for Mat object readiness and return it.
-    CV_WRAP Mat get();
-
-    //! Wait for Mat object readiness.
-    CV_WRAP void wait() const;
-
-    /** @brief Wait for Mat object readiness specific amount of time.
-     *  @param timeout Timeout in milliseconds
-     *  @returns [std::future_status](https://en.cppreference.com/w/cpp/thread/future_status)
-     */
-    CV_WRAP AsyncMatStatus wait_for(std::chrono::milliseconds timeout) const;
-};
-
-}}
index 58f6885..600106d 100644 (file)
@@ -69,8 +69,9 @@ def printParams(backend, target):
 
 class dnn_test(NewOpenCVTests):
 
-    def __init__(self, *args, **kwargs):
-        super(dnn_test, self).__init__(*args, **kwargs)
+    def setUp(self):
+        super(dnn_test, self).setUp()
+
         self.dnnBackendsAndTargets = [
             [cv.dnn.DNN_BACKEND_OPENCV, cv.dnn.DNN_TARGET_CPU],
         ]
@@ -168,7 +169,7 @@ class dnn_test(NewOpenCVTests):
             normAssertDetections(self, ref, out, 0.5, scoresDiff, iouDiff)
 
     def test_async(self):
-        timeout = 5000  # in milliseconds
+        timeout = 500*10**6  # in nanoseconds (500ms)
         testdata_required = bool(os.environ.get('OPENCV_DNN_TEST_REQUIRE_TESTDATA', False))
         proto = self.find_dnn_file('dnn/layers/layer_convolution.prototxt', required=testdata_required)
         model = self.find_dnn_file('dnn/layers/layer_convolution.caffemodel', required=testdata_required)
@@ -209,11 +210,9 @@ class dnn_test(NewOpenCVTests):
                 outs.insert(0, netAsync.forwardAsync())
 
             for i in reversed(range(numInputs)):
-                ret = outs[i].wait_for(timeout)
-                if ret == 1:
-                    self.fail("Timeout")
-                self.assertEqual(ret, 0)  # is ready
-                normAssert(self, refs[i], outs[i].get(), 'Index: %d' % i, 1e-10)
+                ret, result = outs[i].get(timeoutNs=float(timeout))
+                self.assertTrue(ret)
+                normAssert(self, refs[i], result, 'Index: %d' % i, 1e-10)
 
 
 if __name__ == '__main__':
index e7a363b..edf0ba9 100644 (file)
@@ -2557,7 +2557,7 @@ struct Net::Impl
     }
 
 #ifdef CV_CXX11
-    std::future<Mat> getBlobAsync(const LayerPin& pin)
+    AsyncArray getBlobAsync(const LayerPin& pin)
     {
         CV_TRACE_FUNCTION();
 #ifdef HAVE_INF_ENGINE
@@ -2586,7 +2586,7 @@ struct Net::Impl
 #endif
     }
 
-    std::future<Mat> getBlobAsync(String outputName)
+    AsyncArray getBlobAsync(String outputName)
     {
         return getBlobAsync(getPinByAlias(outputName));
     }
@@ -2714,7 +2714,7 @@ Mat Net::forward(const String& outputName)
     return impl->getBlob(layerName);
 }
 
-AsyncMat Net::forwardAsync(const String& outputName)
+AsyncArray Net::forwardAsync(const String& outputName)
 {
     CV_TRACE_FUNCTION();
 #ifdef CV_CXX11
index 6a9a334..0883f53 100644 (file)
@@ -849,7 +849,7 @@ void InfEngineBackendNet::InfEngineReqWrapper::makePromises(const std::vector<Pt
     outsNames.resize(outs.size());
     for (int i = 0; i < outs.size(); ++i)
     {
-        outs[i]->futureMat = outProms[i].get_future();
+        outs[i]->futureMat = outProms[i].getArrayResult();
         outsNames[i] = outs[i]->dataPtr->name;
     }
 }
@@ -906,20 +906,38 @@ void InfEngineBackendNet::forward(const std::vector<Ptr<BackendWrapper> >& outBl
             {
                 InfEngineReqWrapper* wrapper;
                 request->GetUserData((void**)&wrapper, 0);
-                CV_Assert(wrapper);
+                CV_Assert(wrapper && "Internal error");
 
-                for (int i = 0; i < wrapper->outProms.size(); ++i)
+                size_t processedOutputs = 0;
+                try
                 {
-                    const std::string& name = wrapper->outsNames[i];
-                    Mat m = infEngineBlobToMat(wrapper->req.GetBlob(name));
+                    for (; processedOutputs < wrapper->outProms.size(); ++processedOutputs)
+                    {
+                        const std::string& name = wrapper->outsNames[processedOutputs];
+                        Mat m = infEngineBlobToMat(wrapper->req.GetBlob(name));
 
-                    if (status == InferenceEngine::StatusCode::OK)
-                        wrapper->outProms[i].set_value(m.clone());
-                    else
+                        try
+                        {
+                            CV_Assert(status == InferenceEngine::StatusCode::OK);
+                            wrapper->outProms[processedOutputs].setValue(m.clone());
+                        }
+                        catch (...)
+                        {
+                            try {
+                                wrapper->outProms[processedOutputs].setException(std::current_exception());
+                            } catch(...) {
+                                CV_LOG_ERROR(NULL, "DNN: Exception occured during async inference exception propagation");
+                            }
+                        }
+                    }
+                }
+                catch (...)
+                {
+                    std::exception_ptr e = std::current_exception();
+                    for (; processedOutputs < wrapper->outProms.size(); ++processedOutputs)
                     {
                         try {
-                            std::runtime_error e("Async request failed");
-                            wrapper->outProms[i].set_exception(std::make_exception_ptr(e));
+                            wrapper->outProms[processedOutputs].setException(e);
                         } catch(...) {
                             CV_LOG_ERROR(NULL, "DNN: Exception occured during async inference exception propagation");
                         }
index 009e121..44ffd5e 100644 (file)
@@ -12,6 +12,9 @@
 #include "opencv2/core/cvstd.hpp"
 #include "opencv2/dnn.hpp"
 
+#include "opencv2/core/async.hpp"
+#include "opencv2/core/detail/async_promise.hpp"
+
 #include "opencv2/dnn/utils/inference_engine.hpp"
 
 #ifdef HAVE_INF_ENGINE
@@ -208,7 +211,7 @@ private:
         void makePromises(const std::vector<Ptr<BackendWrapper> >& outs);
 
         InferenceEngine::InferRequest req;
-        std::vector<std::promise<Mat> > outProms;
+        std::vector<cv::AsyncPromise> outProms;
         std::vector<std::string> outsNames;
         bool isReady;
     };
@@ -264,7 +267,7 @@ public:
 
     InferenceEngine::DataPtr dataPtr;
     InferenceEngine::Blob::Ptr blob;
-    std::future<Mat> futureMat;
+    AsyncArray futureMat;
 };
 
 InferenceEngine::Blob::Ptr wrapToInfEngineBlob(const Mat& m, InferenceEngine::Layout layout = InferenceEngine::Layout::ANY);
index 59e6f91..c83dbc7 100644 (file)
@@ -341,12 +341,13 @@ TEST(Net, forwardAndRetrieve)
 }
 
 #ifdef HAVE_INF_ENGINE
+static const std::chrono::milliseconds async_timeout(500);
+
 // This test runs network in synchronous mode for different inputs and then
 // runs the same model asynchronously for the same inputs.
 typedef testing::TestWithParam<tuple<int, Target> > Async;
 TEST_P(Async, set_and_forward_single)
 {
-    static const int kTimeout = 5000;  // in milliseconds.
     const int dtype = get<0>(GetParam());
     const int target = get<1>(GetParam());
 
@@ -383,16 +384,16 @@ TEST_P(Async, set_and_forward_single)
     {
         netAsync.setInput(inputs[i]);
 
-        std::future<Mat> out = netAsync.forwardAsync();
-        if (out.wait_for(std::chrono::milliseconds(kTimeout)) == std::future_status::timeout)
-            CV_Error(Error::StsAssert, "Timeout");
-        normAssert(refs[i], out.get(), format("Index: %d", i).c_str(), 0, 0);
+        AsyncArray out = netAsync.forwardAsync();
+        ASSERT_TRUE(out.valid());
+        Mat result;
+        EXPECT_TRUE(out.get(result, async_timeout));
+        normAssert(refs[i], result, format("Index: %d", i).c_str(), 0, 0);
     }
 }
 
 TEST_P(Async, set_and_forward_all)
 {
-    static const int kTimeout = 5000;  // in milliseconds.
     const int dtype = get<0>(GetParam());
     const int target = get<1>(GetParam());
 
@@ -426,7 +427,7 @@ TEST_P(Async, set_and_forward_all)
     }
 
     // Run asynchronously. To make test more robust, process inputs in the reversed order.
-    std::vector<std::future<Mat> > outs(numInputs);
+    std::vector<AsyncArray> outs(numInputs);
     for (int i = numInputs - 1; i >= 0; --i)
     {
         netAsync.setInput(inputs[i]);
@@ -435,9 +436,10 @@ TEST_P(Async, set_and_forward_all)
 
     for (int i = numInputs - 1; i >= 0; --i)
     {
-        if (outs[i].wait_for(std::chrono::milliseconds(kTimeout)) == std::future_status::timeout)
-            CV_Error(Error::StsAssert, "Timeout");
-        normAssert(refs[i], outs[i].get(), format("Index: %d", i).c_str(), 0, 0);
+        ASSERT_TRUE(outs[i].valid());
+        Mat result;
+        EXPECT_TRUE(outs[i].get(result, async_timeout));
+        normAssert(refs[i], result, format("Index: %d", i).c_str(), 0, 0);
     }
 }
 
index 6f6a1ca..91ccd6c 100644 (file)
@@ -6,6 +6,7 @@
 #include <opencv2/highgui.hpp>
 
 #ifdef CV_CXX11
+#include <mutex>
 #include <thread>
 #include <queue>
 #endif
@@ -185,7 +186,7 @@ int main(int argc, char** argv)
     QueueFPS<Mat> processedFramesQueue;
     QueueFPS<std::vector<Mat> > predictionsQueue;
     std::thread processingThread([&](){
-        std::queue<std::future<Mat> > futureOutputs;
+        std::queue<AsyncArray> futureOutputs;
         Mat blob;
         while (process)
         {
@@ -224,11 +225,13 @@ int main(int argc, char** argv)
             }
 
             while (!futureOutputs.empty() &&
-                   futureOutputs.front().wait_for(std::chrono::seconds(0)) == std::future_status::ready)
+                   futureOutputs.front().wait_for(std::chrono::seconds(0)))
             {
-                Mat out = futureOutputs.front().get();
-                predictionsQueue.push({out});
+                AsyncArray async_out = futureOutputs.front();
                 futureOutputs.pop();
+                Mat out;
+                async_out.get(out);
+                predictionsQueue.push({out});
             }
         }
     });
index f32f76b..30b677c 100644 (file)
@@ -4,7 +4,7 @@ import numpy as np
 import sys
 import time
 from threading import Thread
-if sys.version_info[0] == '2':
+if sys.version_info[0] == 2:
     import Queue as queue
 else:
     import queue
@@ -262,7 +262,7 @@ def processingThreadBody():
                 outs = net.forward(outNames)
                 predictionsQueue.put(np.copy(outs))
 
-        while futureOutputs and futureOutputs[0].wait_for(0) == 0:
+        while futureOutputs and futureOutputs[0].wait_for(0):
             out = futureOutputs[0].get()
             predictionsQueue.put(np.copy([out]))