Asynchronous C++ sample
authorDmitry Kurtaev <dmitry.kurtaev+github@gmail.com>
Tue, 14 May 2019 14:43:48 +0000 (17:43 +0300)
committerDmitry Kurtaev <dmitry.kurtaev+github@gmail.com>
Tue, 14 May 2019 16:09:07 +0000 (19:09 +0300)
samples/dnn/object_detection.cpp

index c30e217..6f6a1ca 100644 (file)
@@ -5,6 +5,11 @@
 #include <opencv2/imgproc.hpp>
 #include <opencv2/highgui.hpp>
 
+#ifdef CV_CXX11
+#include <thread>
+#include <queue>
+#endif
+
 #include "common.hpp"
 
 std::string keys =
@@ -26,8 +31,9 @@ std::string keys =
                          "0: CPU target (by default), "
                          "1: OpenCL, "
                          "2: OpenCL fp16 (half-float precision), "
-                         "3: VPU }";
-
+                         "3: VPU }"
+    "{ async       | 0 | Number of asynchronous forwards at the same time. "
+                        "Choose 0 for synchronous mode }";
 
 using namespace cv;
 using namespace dnn;
@@ -35,13 +41,66 @@ using namespace dnn;
 float confThreshold, nmsThreshold;
 std::vector<std::string> classes;
 
+inline void preprocess(const Mat& frame, Net& net, Size inpSize, float scale,
+                       const Scalar& mean, bool swapRB);
+
 void postprocess(Mat& frame, const std::vector<Mat>& out, Net& net);
 
 void drawPred(int classId, float conf, int left, int top, int right, int bottom, Mat& frame);
 
 void callback(int pos, void* userdata);
 
-std::vector<String> getOutputsNames(const Net& net);
+#ifdef CV_CXX11
+template <typename T>
+class QueueFPS : public std::queue<T>
+{
+public:
+    QueueFPS() : counter(0) {}
+
+    void push(const T& entry)
+    {
+        std::lock_guard<std::mutex> lock(mutex);
+
+        std::queue<T>::push(entry);
+        counter += 1;
+        if (counter == 1)
+        {
+            // Start counting from a second frame (warmup).
+            tm.reset();
+            tm.start();
+        }
+    }
+
+    T get()
+    {
+        std::lock_guard<std::mutex> lock(mutex);
+        T entry = this->front();
+        this->pop();
+        return entry;
+    }
+
+    float getFPS()
+    {
+        tm.stop();
+        double fps = counter / tm.getTimeSec();
+        tm.start();
+        return static_cast<float>(fps);
+    }
+
+    void clear()
+    {
+        std::lock_guard<std::mutex> lock(mutex);
+        while (!this->empty())
+            this->pop();
+    }
+
+    unsigned int counter;
+
+private:
+    TickMeter tm;
+    std::mutex mutex;
+};
+#endif  // CV_CXX11
 
 int main(int argc, char** argv)
 {
@@ -67,6 +126,7 @@ int main(int argc, char** argv)
     bool swapRB = parser.get<bool>("rgb");
     int inpWidth = parser.get<int>("width");
     int inpHeight = parser.get<int>("height");
+    size_t async = parser.get<int>("async");
     CV_Assert(parser.has("model"));
     std::string modelPath = findFile(parser.get<String>("model"));
     std::string configPath = findFile(parser.get<String>("config"));
@@ -104,6 +164,108 @@ int main(int argc, char** argv)
     else
         cap.open(parser.get<int>("device"));
 
+#ifdef CV_CXX11
+    bool process = true;
+
+    // Frames capturing thread
+    QueueFPS<Mat> framesQueue;
+    std::thread framesThread([&](){
+        Mat frame;
+        while (process)
+        {
+            cap >> frame;
+            if (!frame.empty())
+                framesQueue.push(frame.clone());
+            else
+                break;
+        }
+    });
+
+    // Frames processing thread
+    QueueFPS<Mat> processedFramesQueue;
+    QueueFPS<std::vector<Mat> > predictionsQueue;
+    std::thread processingThread([&](){
+        std::queue<std::future<Mat> > futureOutputs;
+        Mat blob;
+        while (process)
+        {
+            // Get a next frame
+            Mat frame;
+            {
+                if (!framesQueue.empty())
+                {
+                    frame = framesQueue.get();
+                    if (async)
+                    {
+                        if (futureOutputs.size() == async)
+                            frame = Mat();
+                    }
+                    else
+                        framesQueue.clear();  // Skip the rest of frames
+                }
+            }
+
+            // Process the frame
+            if (!frame.empty())
+            {
+                preprocess(frame, net, Size(inpWidth, inpHeight), scale, mean, swapRB);
+                processedFramesQueue.push(frame);
+
+                if (async)
+                {
+                    futureOutputs.push(net.forwardAsync());
+                }
+                else
+                {
+                    std::vector<Mat> outs;
+                    net.forward(outs, outNames);
+                    predictionsQueue.push(outs);
+                }
+            }
+
+            while (!futureOutputs.empty() &&
+                   futureOutputs.front().wait_for(std::chrono::seconds(0)) == std::future_status::ready)
+            {
+                Mat out = futureOutputs.front().get();
+                predictionsQueue.push({out});
+                futureOutputs.pop();
+            }
+        }
+    });
+
+    // Postprocessing and rendering loop
+    while (waitKey(1) < 0)
+    {
+        if (predictionsQueue.empty())
+            continue;
+
+        std::vector<Mat> outs = predictionsQueue.get();
+        Mat frame = processedFramesQueue.get();
+
+        postprocess(frame, outs, net);
+
+        if (predictionsQueue.counter > 1)
+        {
+            std::string label = format("Camera: %.2f FPS", framesQueue.getFPS());
+            putText(frame, label, Point(0, 15), FONT_HERSHEY_SIMPLEX, 0.5, Scalar(0, 255, 0));
+
+            label = format("Network: %.2f FPS", predictionsQueue.getFPS());
+            putText(frame, label, Point(0, 30), FONT_HERSHEY_SIMPLEX, 0.5, Scalar(0, 255, 0));
+
+            label = format("Skipped frames: %d", framesQueue.counter - predictionsQueue.counter);
+            putText(frame, label, Point(0, 45), FONT_HERSHEY_SIMPLEX, 0.5, Scalar(0, 255, 0));
+        }
+        imshow(kWinName, frame);
+    }
+
+    process = false;
+    framesThread.join();
+    processingThread.join();
+
+#else  // CV_CXX11
+    if (async)
+        CV_Error(Error::StsNotImplemented, "Asynchronous forward is supported only with Inference Engine backend.");
+
     // Process frames.
     Mat frame, blob;
     while (waitKey(1) < 0)
@@ -115,19 +277,8 @@ int main(int argc, char** argv)
             break;
         }
 
-        // Create a 4D blob from a frame.
-        Size inpSize(inpWidth > 0 ? inpWidth : frame.cols,
-                     inpHeight > 0 ? inpHeight : frame.rows);
-        blobFromImage(frame, blob, scale, inpSize, mean, swapRB, false);
+        preprocess(frame, net, Size(inpWidth, inpHeight), scale, mean, swapRB);
 
-        // Run a model.
-        net.setInput(blob);
-        if (net.getLayer(0)->outputNameToIndex("im_info") != -1)  // Faster-RCNN or R-FCN
-        {
-            resize(frame, frame, inpSize);
-            Mat imInfo = (Mat_<float>(1, 3) << inpSize.height, inpSize.width, 1.6f);
-            net.setInput(imInfo, "im_info");
-        }
         std::vector<Mat> outs;
         net.forward(outs, outNames);
 
@@ -142,9 +293,29 @@ int main(int argc, char** argv)
 
         imshow(kWinName, frame);
     }
+#endif  // CV_CXX11
     return 0;
 }
 
+inline void preprocess(const Mat& frame, Net& net, Size inpSize, float scale,
+                       const Scalar& mean, bool swapRB)
+{
+    static Mat blob;
+    // Create a 4D blob from a frame.
+    if (inpSize.width <= 0) inpSize.width = frame.cols;
+    if (inpSize.height <= 0) inpSize.height = frame.rows;
+    blobFromImage(frame, blob, 1.0, inpSize, Scalar(), swapRB, false, CV_8U);
+
+    // Run a model.
+    net.setInput(blob, "", scale, mean);
+    if (net.getLayer(0)->outputNameToIndex("im_info") != -1)  // Faster-RCNN or R-FCN
+    {
+        resize(frame, frame, inpSize);
+        Mat imInfo = (Mat_<float>(1, 3) << inpSize.height, inpSize.width, 1.6f);
+        net.setInput(imInfo, "im_info");
+    }
+}
+
 void postprocess(Mat& frame, const std::vector<Mat>& outs, Net& net)
 {
     static std::vector<int> outLayers = net.getUnconnectedOutLayers();