From c850e12504df34d3cdf9a1f38303679cf0ef217a Mon Sep 17 00:00:00 2001 From: Hyoung Joo Ahn Date: Thu, 25 Jun 2020 12:13:58 +0900 Subject: [PATCH] [Mux] add the new sync_mode:REFRESH This new sync mode allows PUSHING with only one new buffer of sinkpads of `tensor_mux`. Previously, if `tensor_mux` want to push its buffers to srcpad, all of sink pads had to be filled with new buffers. However, this new mode, REFRESH, push buffers when `tensor_mux` receive a new buffer. For the other sink pads, except the one received the new buffer, will use again the previous one. It decides EOS when any of the sink pads receive it. Signed-off-by: Hyoung Joo Ahn --- gst/nnstreamer/tensor_common.h | 2 ++ gst/nnstreamer/tensor_common_pipeline.c | 39 +++++++++++++++++--- gst/nnstreamer/tensor_mux/gsttensormux.c | 61 ++++++++++++++++++++++++++++++-- 3 files changed, 94 insertions(+), 8 deletions(-) diff --git a/gst/nnstreamer/tensor_common.h b/gst/nnstreamer/tensor_common.h index f61727a..faf4874 100644 --- a/gst/nnstreamer/tensor_common.h +++ b/gst/nnstreamer/tensor_common.h @@ -56,12 +56,14 @@ G_BEGIN_DECLS /** * @brief time synchronization options + * @see https://github.com/nnstreamer/nnstreamer/wiki/Synchronization-Policies-at-Mux-and-Merge */ typedef enum { SYNC_NOSYNC = 0, SYNC_SLOWEST = 1, SYNC_BASEPAD = 2, + SYNC_REFRESH = 3, SYNC_END, } tensor_time_sync_mode; diff --git a/gst/nnstreamer/tensor_common_pipeline.c b/gst/nnstreamer/tensor_common_pipeline.c index a8c0e06..3e602e2 100644 --- a/gst/nnstreamer/tensor_common_pipeline.c +++ b/gst/nnstreamer/tensor_common_pipeline.c @@ -20,6 +20,7 @@ static const gchar *gst_tensor_time_sync_mode_string[] = { [SYNC_NOSYNC] = "nosync", [SYNC_SLOWEST] = "slowest", [SYNC_BASEPAD] = "basepad", + [SYNC_REFRESH] = "refresh", [SYNC_END] = NULL }; @@ -111,9 +112,16 @@ _gst_tensor_time_sync_is_eos (GstCollectPads * collect, total = g_slist_length (collect->data); - /** @todo update below with each sync mode */ - if (empty > 0 || empty == total) - is_eos = TRUE; + switch (sync->mode) { + case SYNC_REFRESH: + if (empty == total) + is_eos = TRUE; + break; + default: + if (empty > 0) + is_eos = TRUE; + break; + } return is_eos; } @@ -145,6 +153,7 @@ gst_tensor_time_sync_get_current_time (GstCollectPads * collect, case SYNC_NOSYNC: /* fall-through */ case SYNC_SLOWEST: + case SYNC_REFRESH: if (*current_time < GST_BUFFER_PTS (buf)) *current_time = GST_BUFFER_PTS (buf); break; @@ -259,6 +268,7 @@ gst_tensor_time_sync_buffer_from_collectpad (GstCollectPads * collect, while (walk) { gboolean configured = FALSE; + gboolean is_empty = FALSE; data = (GstCollectData *) walk->data; pad = (GstTensorCollectPadData *) data; @@ -298,9 +308,28 @@ gst_tensor_time_sync_buffer_from_collectpad (GstCollectPads * collect, if (FALSE == _gst_tensor_time_sync_buffer_update (&buf, collect, data, current_time, base_time, sync)) return FALSE; + is_empty = (buf == NULL); break; case SYNC_NOSYNC: buf = gst_collect_pads_pop (collect, data); + is_empty = (buf == NULL); + break; + case SYNC_REFRESH: + buf = gst_collect_pads_pop (collect, data); + if (buf != NULL) { + if (pad->buffer != NULL) { + gst_buffer_unref (pad->buffer); + } + pad->buffer = gst_buffer_ref (buf); + } else { + if (pad->buffer == NULL) { + *is_eos = FALSE; + ml_logd ("Not the all buffers are arrived yet."); + return FALSE; + } + is_empty = TRUE; + buf = gst_buffer_ref (pad->buffer); + } break; default: break; @@ -324,9 +353,9 @@ gst_tensor_time_sync_buffer_from_collectpad (GstCollectPads * collect, } gst_buffer_unref (buf); - } else { - empty_pad++; } + if (is_empty) + empty_pad++; } configs->info.num_tensors = counting; diff --git a/gst/nnstreamer/tensor_mux/gsttensormux.c b/gst/nnstreamer/tensor_mux/gsttensormux.c index 8ab1cf7..0741551 100644 --- a/gst/nnstreamer/tensor_mux/gsttensormux.c +++ b/gst/nnstreamer/tensor_mux/gsttensormux.c @@ -121,7 +121,10 @@ static GstStateChangeReturn gst_tensor_mux_change_state (GstElement * element, static gboolean gst_tensor_mux_sink_event (GstCollectPads * pads, GstCollectData * data, GstEvent * event, GstTensorMux * tensor_mux); static GstFlowReturn gst_tensor_mux_collected (GstCollectPads * pads, - GstTensorMux * tesnor_mux); + GstTensorMux * tensor_mux); +static GstFlowReturn gst_tensor_mux_do_clip (GstCollectPads * pads, + GstCollectData * data, GstBuffer * buffer, GstBuffer ** out, + GstTensorMux * tensor_mux); static void gst_tensor_mux_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); @@ -208,6 +211,9 @@ gst_tensor_mux_init (GstTensorMux * tensor_mux) gst_collect_pads_set_function (tensor_mux->collect, (GstCollectPadsFunction) GST_DEBUG_FUNCPTR (gst_tensor_mux_collected), tensor_mux); + gst_collect_pads_set_clip_function (tensor_mux->collect, + (GstCollectPadsClipFunction) GST_DEBUG_FUNCPTR (gst_tensor_mux_do_clip), + tensor_mux); tensor_mux->silent = TRUE; tensor_mux->sync.mode = SYNC_SLOWEST; @@ -264,10 +270,21 @@ gst_tensor_mux_request_new_pad (GstElement * element, GstPadTemplate * templ, if (newpad) { GstTensorCollectPadData *tensormuxpad; + gboolean locked, waiting; + + locked = waiting = TRUE; + + if (tensor_mux->sync.mode == SYNC_REFRESH) { + locked = waiting = FALSE; + } tensormuxpad = (GstTensorCollectPadData *) gst_collect_pads_add_pad (tensor_mux->collect, newpad, - sizeof (GstTensorCollectPadData), NULL, TRUE); + sizeof (GstTensorCollectPadData), NULL, locked); + + /* NOTE: if locked is TRUE, waiting flag is not effective */ + gst_collect_pads_set_waiting (tensor_mux->collect, + (GstCollectData *) tensormuxpad, waiting); tensormuxpad->pad = newpad; gst_pad_set_element_private (newpad, tensormuxpad); @@ -298,6 +315,23 @@ gst_tensor_mux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) } /** + * @brief set pads waiting property + */ +static void +gst_tensor_mux_set_waiting (GstTensorMux * tensor_mux, gboolean waiting) +{ + if (tensor_mux->sync.mode == SYNC_REFRESH) { + GstCollectPads *pads = tensor_mux->collect; + GSList *walk = pads->data; + + while (walk) { + gst_collect_pads_set_waiting (pads, walk->data, waiting); + walk = g_slist_next (walk); + } + } +} + +/** * @brief sink event vmethod */ static gboolean @@ -310,6 +344,9 @@ gst_tensor_mux_sink_event (GstCollectPads * pads, GstCollectData * data, case GST_EVENT_FLUSH_STOP: tensor_mux->need_segment = TRUE; break; + case GST_EVENT_EOS: + gst_tensor_mux_set_waiting (tensor_mux, FALSE); + break; default: break; } @@ -410,6 +447,7 @@ gst_tensor_mux_collected (GstCollectPads * pads, GstTensorMux * tensor_mux) GstFlowReturn ret = GST_FLOW_OK; GstBuffer *tensors_buf; gboolean isEOS = FALSE; + gboolean buf_collected = FALSE; GST_DEBUG_OBJECT (tensor_mux, " all pads are collected "); @@ -425,7 +463,12 @@ gst_tensor_mux_collected (GstCollectPads * pads, GstTensorMux * tensor_mux) return GST_FLOW_ERROR; } - if (!gst_tensor_mux_collect_buffer (tensor_mux, tensors_buf, &isEOS)) { + buf_collected = + gst_tensor_mux_collect_buffer (tensor_mux, tensors_buf, &isEOS); + + gst_tensor_mux_set_waiting (tensor_mux, TRUE); + + if (!buf_collected) { if (isEOS) { gst_pad_push_event (tensor_mux->srcpad, gst_event_new_eos ()); ret = GST_FLOW_EOS; @@ -455,6 +498,18 @@ gst_tensor_mux_collected (GstCollectPads * pads, GstTensorMux * tensor_mux) } /** + * @brief Gst Clip Pads Function which is called right after a buffer is received for each pad. + */ +static GstFlowReturn +gst_tensor_mux_do_clip (GstCollectPads * pads, GstCollectData * data, + GstBuffer * buffer, GstBuffer ** out, GstTensorMux * tensor_mux) +{ + gst_tensor_mux_set_waiting (tensor_mux, FALSE); + *out = buffer; + return GST_FLOW_OK; +} + +/** * @brief Ready --> Pasuse State Change */ static void -- 2.7.4