return img
+ def convertNV12p2BGR(in_nv12):
+ shape = in_nv12.shape
+ y_height = shape[0] // 3 * 2
+ uv_shape = (shape[0] // 3, shape[1])
+ new_uv_shape = (uv_shape[0], uv_shape[1] // 2, 2)
+ return cv.cvtColorTwoPlane(in_nv12[:y_height, :],
+ in_nv12[ y_height:, :].reshape(new_uv_shape),
+ cv.COLOR_YUV2BGR_NV12)
+
+
class test_gapi_streaming(NewOpenCVTests):
def test_image_input(self):
def test_gapi_streaming_meta(self):
- ksize = 3
path = self.find_file('cv/video/768x576.avi', [os.environ['OPENCV_TEST_DATA_PATH']])
# G-API
cv.gapi.compile_args(cv.gapi.streaming.queue_capacity(1)))
+ def get_gst_source(self, gstpipeline):
+ # NB: Skip test in case gstreamer isn't available.
+ try:
+ return cv.gapi.wip.make_gst_src(gstpipeline)
+ except cv.error as e:
+ if str(e).find('Built without GStreamer support!') == -1:
+ raise e
+ else:
+ raise unittest.SkipTest(str(e))
+
+
+ def test_gst_source(self):
+ if not cv.videoio_registry.hasBackend(cv.CAP_GSTREAMER):
+ raise unittest.SkipTest("Backend is not available/disabled: GSTREAMER")
+
+ gstpipeline = """videotestsrc is-live=true pattern=colors num-buffers=10 !
+ videorate ! videoscale ! video/x-raw,width=1920,height=1080,
+ framerate=30/1 ! appsink"""
+
+ g_in = cv.GMat()
+ g_out = cv.gapi.copy(g_in)
+ c = cv.GComputation(cv.GIn(g_in), cv.GOut(g_out))
+
+ ccomp = c.compileStreaming()
+
+ source = self.get_gst_source(gstpipeline)
+
+ ccomp.setSource(cv.gin(source))
+ ccomp.start()
+
+ has_frame, output = ccomp.pull()
+ while has_frame:
+ self.assertTrue(output.size != 0)
+ has_frame, output = ccomp.pull()
+
+
+ def open_VideoCapture_gstreamer(self, gstpipeline):
+ try:
+ cap = cv.VideoCapture(gstpipeline, cv.CAP_GSTREAMER)
+ except Exception as e:
+ raise unittest.SkipTest("Backend GSTREAMER can't open the video; " +
+ "cause: " + str(e))
+ if not cap.isOpened():
+ raise unittest.SkipTest("Backend GSTREAMER can't open the video")
+ return cap
+
+
+ def test_gst_source_accuracy(self):
+ if not cv.videoio_registry.hasBackend(cv.CAP_GSTREAMER):
+ raise unittest.SkipTest("Backend is not available/disabled: GSTREAMER")
+
+ path = self.find_file('highgui/video/big_buck_bunny.avi',
+ [os.environ['OPENCV_TEST_DATA_PATH']])
+ gstpipeline = """filesrc location=""" + path + """ ! decodebin ! videoconvert !
+ videoscale ! video/x-raw,format=NV12 ! appsink"""
+
+ # G-API pipeline
+ g_in = cv.GMat()
+ g_out = cv.gapi.copy(g_in)
+ c = cv.GComputation(cv.GIn(g_in), cv.GOut(g_out))
+
+ ccomp = c.compileStreaming()
+
+ # G-API Gst-source
+ source = self.get_gst_source(gstpipeline)
+ ccomp.setSource(cv.gin(source))
+ ccomp.start()
+
+ # OpenCV Gst-source
+ cap = self.open_VideoCapture_gstreamer(gstpipeline)
+
+ # Assert
+ max_num_frames = 10
+ for _ in range(max_num_frames):
+ has_expected, expected = cap.read()
+ has_actual, actual = ccomp.pull()
+
+ self.assertEqual(has_expected, has_actual)
+
+ if not has_expected:
+ break
+
+ self.assertEqual(0.0, cv.norm(convertNV12p2BGR(expected), actual, cv.NORM_INF))
+
+
+ def get_gst_pipeline(self, gstpipeline):
+ # NB: Skip test in case gstreamer isn't available.
+ try:
+ return cv.gapi.wip.GStreamerPipeline(gstpipeline)
+ except cv.error as e:
+ if str(e).find('Built without GStreamer support!') == -1:
+ raise e
+ else:
+ raise unittest.SkipTest(str(e))
+ except SystemError as e:
+ raise unittest.SkipTest(str(e) + ", casued by " + str(e.__cause__))
+
+
+ def test_gst_multiple_sources(self):
+ if not cv.videoio_registry.hasBackend(cv.CAP_GSTREAMER):
+ raise unittest.SkipTest("Backend is not available/disabled: GSTREAMER")
+
+ gstpipeline = """videotestsrc is-live=true pattern=colors num-buffers=10 !
+ videorate ! videoscale !
+ video/x-raw,width=1920,height=1080,framerate=30/1 !
+ appsink name=sink1
+ videotestsrc is-live=true pattern=colors num-buffers=10 !
+ videorate ! videoscale !
+ video/x-raw,width=1920,height=1080,framerate=30/1 !
+ appsink name=sink2"""
+
+ g_in1 = cv.GMat()
+ g_in2 = cv.GMat()
+ g_out = cv.gapi.add(g_in1, g_in2)
+ c = cv.GComputation(cv.GIn(g_in1, g_in2), cv.GOut(g_out))
+
+ ccomp = c.compileStreaming()
+
+ pp = self.get_gst_pipeline(gstpipeline)
+ src1 = cv.gapi.wip.get_streaming_source(pp, "sink1")
+ src2 = cv.gapi.wip.get_streaming_source(pp, "sink2")
+
+ ccomp.setSource(cv.gin(src1, src2))
+ ccomp.start()
+
+ has_frame, out = ccomp.pull()
+ while has_frame:
+ self.assertTrue(out.size != 0)
+ has_frame, out = ccomp.pull()
+
+
+ def test_gst_multiple_sources_accuracy(self):
+ if not cv.videoio_registry.hasBackend(cv.CAP_GSTREAMER):
+ raise unittest.SkipTest("Backend is not available/disabled: GSTREAMER")
+
+ path = self.find_file('highgui/video/big_buck_bunny.avi',
+ [os.environ['OPENCV_TEST_DATA_PATH']])
+ gstpipeline1 = """filesrc location=""" + path + """ ! decodebin ! videoconvert !
+ videoscale ! video/x-raw,format=NV12 ! appsink"""
+ gstpipeline2 = """filesrc location=""" + path + """ ! decodebin !
+ videoflip method=clockwise ! videoconvert ! videoscale !
+ video/x-raw,format=NV12 ! appsink"""
+ gstpipeline_gapi = gstpipeline1 + ' name=sink1 ' + gstpipeline2 + ' name=sink2'
+
+ # G-API pipeline
+ g_in1 = cv.GMat()
+ g_in2 = cv.GMat()
+ g_out1 = cv.gapi.copy(g_in1)
+ g_out2 = cv.gapi.copy(g_in2)
+ c = cv.GComputation(cv.GIn(g_in1, g_in2), cv.GOut(g_out1, g_out2))
+
+ ccomp = c.compileStreaming()
+
+ # G-API Gst-source
+ pp = self.get_gst_pipeline(gstpipeline_gapi)
+
+ src1 = cv.gapi.wip.get_streaming_source(pp, "sink1")
+ src2 = cv.gapi.wip.get_streaming_source(pp, "sink2")
+ ccomp.setSource(cv.gin(src1, src2))
+ ccomp.start()
+
+ # OpenCV Gst-source
+ cap1 = self.open_VideoCapture_gstreamer(gstpipeline1)
+ cap2 = self.open_VideoCapture_gstreamer(gstpipeline2)
+
+ # Assert
+ max_num_frames = 10
+ for _ in range(max_num_frames):
+ has_expected1, expected1 = cap1.read()
+ has_expected2, expected2 = cap2.read()
+ has_actual, (actual1, actual2) = ccomp.pull()
+
+ self.assertEqual(has_expected1, has_expected2)
+ has_expected = has_expected1 and has_expected2
+ self.assertEqual(has_expected, has_actual)
+
+ if not has_expected:
+ break
+
+ self.assertEqual(0.0, cv.norm(convertNV12p2BGR(expected1), actual1, cv.NORM_INF))
+ self.assertEqual(0.0, cv.norm(convertNV12p2BGR(expected2), actual2, cv.NORM_INF))
+
+
except unittest.SkipTest as e: