[TensorMux] Code rewriting with GstCollectPads
authorjijoong.moon <jijoong.moon@samsung.com>
Fri, 20 Jul 2018 01:27:57 +0000 (10:27 +0900)
committer함명주/동작제어Lab(SR)/Principal Engineer/삼성전자 <myungjoo.ham@samsung.com>
Fri, 20 Jul 2018 06:41:33 +0000 (15:41 +0900)
In order to dealing wiht input stream, it is better to use
GstCollectPads. Therefore most of the codes are rewritten and
restructured.

Signed-off-by: jijoong.moon <jijoong.moon@samsung.com>
common/tensor_meta.c
gst/tensor_mux/gsttensormux.c
gst/tensor_mux/gsttensormux.h

index 5f405b8..8540f81 100644 (file)
@@ -105,19 +105,7 @@ static void
 gst_meta_tensor_free (GstMeta * meta, GstBuffer * buffer)
 {
   GstMetaTensor *emeta = (GstMetaTensor *) meta;
-
-  GList *l = emeta->dimensions;
-  while (l != NULL) {
-    GList *next = l->next;
-    g_free (next->data);
-  }
   g_list_free (emeta->dimensions);
-
-  l = emeta->types;
-  while (l != NULL) {
-    GList *next = l->next;
-    g_free (next->data);
-  }
   g_list_free (emeta->types);
 
   emeta->num_tensors = 0;
@@ -156,6 +144,9 @@ gst_buffer_add_meta_tensor (GstBuffer * buffer)
   meta =
       (GstMetaTensor *) gst_buffer_add_meta (buffer, GST_META_TENSOR_INFO,
       NULL);
+  meta->dimensions = NULL;
+  meta->types = NULL;
+  meta->ordering = NULL;
 
   return meta;
 }
@@ -186,7 +177,7 @@ _get_tensor_order (GstMetaTensor * meta, gint nth)
           g_list_insert (meta->ordering, GINT_TO_POINTER (nth), order);
       return order;
     } else if (nth == GPOINTER_TO_INT (list->data))
-      GST_ERROR_OBJECT (meta, "Pad odering is not consistent\n");
+      GST_ERROR_OBJECT (meta, "Pad odering is not consistent : nth %d\n", nth);
     th++;
   }
   order = th;
index c159c30..5fd51b3 100644 (file)
  * filesrc location=b.png ! pngdec ! videoscale ! imagefreeze ! videoconvert ! video/x-raw,format=RGB,width=100,height=100,framerate=0/1  ! tensor_converter ! mux.sink_1
  * filesrc location=b.png ! pngdec ! videoscale ! imagefreeze ! videoconvert ! video/x-raw,format=RGB,width=100,height=100,framerate=0/1  ! tensor_converter ! mux.sink_2
  * ]|
+ *
+ * |[
+ * gst-launch -v -m tensormux name=mux ! filesink location=mux.log
+ * multifilesrc location="testsequence_%1d.png" index=0 caps="image/png, framerate=(fraction)30/1" ! pngdec ! tensor_converter ! mux.sink_0
+ * multifilesrc location="testsequence_%1d.png" index=0 caps="image/png, framerate=(fraction)30/1" ! pngdec ! tensor_converter ! mux.sink_1
+ * multifilesrc location="testsequence_%1d.png" index=0 caps="image/png, framerate=(fraction)30/1" ! pngdec ! tensor_converter ! mux.sink_2
+ *
  * </refsect2>
  *
  */
@@ -65,17 +72,9 @@ GST_DEBUG_CATEGORY_STATIC (gst_tensor_mux_debug);
 enum
 {
   PROP_0,
-  PROP_TIMESTAMP_OFFSET,
   PROP_SILENT
 };
 
-GMutex buf_mutex;
-GCond buf_cond;
-guint32 buf_count = 0;
-gint num_sink = 0;
-
-#define DEFAULT_TIMESTAMP_OFFSET -1
-
 /**
  * @brief the capabilities of the inputs and outputs.
  * describe the real formats here.
@@ -92,26 +91,25 @@ static GstStaticPadTemplate sink_templ = GST_STATIC_PAD_TEMPLATE ("sink_%u",
     GST_STATIC_CAPS (GST_TENSOR_CAP_DEFAULT)
     );
 
-#define gst_tensor_mux_parent_class parent_class
+static void gst_tensor_mux_finalize (GObject * object);
 
+static gboolean gst_tensor_mux_handle_src_event (GstPad * pad,
+    GstObject * parent, GstEvent * event);
 static GstPad *gst_tensor_mux_request_new_pad (GstElement * element,
     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
-static void gst_tensor_mux_release_pad (GstElement * element, GstPad * pad);
-static GstFlowReturn
-gst_tensor_mux_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer);
-static gboolean
-gst_tensor_mux_setcaps (GstPad * pad, GstTensorMux * tensor_mux,
-    GstCaps * caps);
-static gboolean gst_tensor_mux_sink_event (GstPad * pad, GstObject * parent,
-    GstEvent * event);
 static GstStateChangeReturn gst_tensor_mux_change_state (GstElement * element,
     GstStateChange transition);
+static gboolean gst_tensor_mux_sink_event (GstCollectPads * pads,
+    GstCollectData * data, GstEvent * event, GstTensorMux * tensor_mux);
+static GstFlowReturn gst_tensor_mux_collected (GstCollectPads * pads,
+    GstTensorMux * tesnor_mux);
+
 static void gst_tensor_mux_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
 static void gst_tensor_mux_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec);
-static void gst_tensor_mux_dispose (GObject * object);
 
+#define gst_tensor_mux_parent_class parent_class
 G_DEFINE_TYPE (GstTensorMux, gst_tensor_mux, GST_TYPE_ELEMENT);
 
 /**
@@ -126,32 +124,26 @@ gst_tensor_mux_class_init (GstTensorMuxClass * klass)
   gobject_class = (GObjectClass *) klass;
   gstelement_class = (GstElementClass *) klass;
 
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&sink_templ));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&src_templ));
+  parent_class = g_type_class_peek_parent (klass);
 
+  gobject_class->finalize = gst_tensor_mux_finalize;
   gobject_class->get_property = gst_tensor_mux_get_property;
   gobject_class->set_property = gst_tensor_mux_set_property;
-  gobject_class->dispose = gst_tensor_mux_dispose;
-
-  g_object_class_install_property (gobject_class,
-      PROP_TIMESTAMP_OFFSET, g_param_spec_int ("timestamp-offset",
-          "Timestamp Offset",
-          "Offset to add to all outgoing timestamps (-1 = random)", -1,
-          G_MAXINT, DEFAULT_TIMESTAMP_OFFSET,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   g_object_class_install_property (gobject_class, PROP_SILENT,
       g_param_spec_boolean ("silent", "Silent", "Produce verbose output ?",
           FALSE, G_PARAM_READWRITE));
 
   gstelement_class->request_new_pad =
       GST_DEBUG_FUNCPTR (gst_tensor_mux_request_new_pad);
-  gstelement_class->release_pad =
-      GST_DEBUG_FUNCPTR (gst_tensor_mux_release_pad);
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_tensor_mux_change_state);
 
+  gst_element_class_add_pad_template (gstelement_class,
+      gst_static_pad_template_get (&sink_templ));
+  gst_element_class_add_pad_template (gstelement_class,
+      gst_static_pad_template_get (&src_templ));
+
   gst_element_class_set_details_simple (gstelement_class,
       "tensormux",
       "Mux multiple tensor stream",
@@ -161,31 +153,6 @@ gst_tensor_mux_class_init (GstTensorMuxClass * klass)
 }
 
 /**
- * @brief dispose tensor mux / tensor mux pad object
- * @param object GstTensorMux*
- */
-static void
-gst_tensor_mux_dispose (GObject * object)
-{
-  GstTensorMux *tensor_mux = GST_TENSOR_MUX (object);
-  GList *item;
-
-  g_clear_object (&tensor_mux->last_pad);
-
-restart:
-  for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
-    GstPad *pad = GST_PAD (item->data);
-    if (GST_PAD_IS_SINK (pad)) {
-      gst_element_release_request_pad (GST_ELEMENT (object), pad);
-      goto restart;
-    }
-  }
-
-  G_OBJECT_CLASS (gst_tensor_mux_parent_class)->dispose (object);
-}
-
-
-/**
  * @brief initialize the new element
  * instantiate pads and add them to element
  * set pad calback functions
@@ -195,47 +162,42 @@ static void
 gst_tensor_mux_init (GstTensorMux * tensor_mux)
 {
   GstElementClass *klass = GST_ELEMENT_GET_CLASS (tensor_mux);
+
   tensor_mux->srcpad =
       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
           "src"), "src");
-  gst_pad_use_fixed_caps (tensor_mux->srcpad);
+  gst_pad_set_event_function (tensor_mux->srcpad,
+      gst_tensor_mux_handle_src_event);
+
   gst_element_add_pad (GST_ELEMENT (tensor_mux), tensor_mux->srcpad);
 
+  tensor_mux->collect = gst_collect_pads_new ();
+  gst_collect_pads_set_event_function (tensor_mux->collect,
+      (GstCollectPadsEventFunction)
+      GST_DEBUG_FUNCPTR (gst_tensor_mux_sink_event), tensor_mux);
+  gst_collect_pads_set_function (tensor_mux->collect,
+      (GstCollectPadsFunction) GST_DEBUG_FUNCPTR (gst_tensor_mux_collected),
+      tensor_mux);
+
   tensor_mux->num_tensors = 0;
-  tensor_mux->byte_count = 0;
   tensor_mux->silent = FALSE;
   tensor_mux->rank = 0;
-  tensor_mux->dimensions = g_string_new (NULL);
-  tensor_mux->types = g_string_new (NULL);
-  tensor_mux->outbuffer = gst_buffer_new ();
-  gst_make_tensors (tensor_mux->outbuffer);
-  g_cond_init (&buf_cond);
-  g_mutex_init (&buf_mutex);
 }
 
 /**
- * @brief Setup Sink Pad for tensor mux
- * Initialize pad private data & set functions for pad
- * @param tensor_mux GstTensorMux Pointer
- * @param sinkpad Sink pad
+ * @brief finalize vmethod
  */
 static void
-gst_tensor_mux_setup_sinkpad (GstTensorMux * tensor_mux, GstPad * sinkpad)
+gst_tensor_mux_finalize (GObject * object)
 {
-  GstTensorMuxPadPrivate *padpriv = g_slice_new0 (GstTensorMuxPadPrivate);
-  padpriv->done = FALSE;
-  gst_pad_set_chain_function (sinkpad,
-      GST_DEBUG_FUNCPTR (gst_tensor_mux_chain));
-  gst_pad_set_event_function (sinkpad,
-      GST_DEBUG_FUNCPTR (gst_tensor_mux_sink_event));
-
-  gst_pad_set_element_private (sinkpad, padpriv);
+  GstTensorMux *tensor_mux;
+  tensor_mux = GST_TENSOR_MUX (object);
 
-  gst_pad_set_active (sinkpad, TRUE);
-  gst_element_add_pad (GST_ELEMENT (tensor_mux), sinkpad);
+  if (tensor_mux->collect)
+    gst_object_unref (tensor_mux->collect);
+  G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
-
 /**
  * @brief making new request pad (gst element vmethod)
  */
@@ -245,24 +207,26 @@ gst_tensor_mux_request_new_pad (GstElement * element, GstPadTemplate * templ,
 {
   GstPad *newpad;
   GstTensorMux *tensor_mux;
+  gchar *name;
 
   g_return_val_if_fail (templ != NULL, NULL);
   g_return_val_if_fail (GST_IS_TENSOR_MUX (element), NULL);
 
   tensor_mux = GST_TENSOR_MUX (element);
 
-  if (templ->direction != GST_PAD_SINK) {
-    GST_WARNING_OBJECT (tensor_mux, "request pad that is not a SINK pad");
-    return NULL;
-  }
-
-  newpad = gst_pad_new_from_template (templ, req_name);
+  name = g_strdup_printf ("sink_%u", tensor_mux->num_tensors);
+  newpad = gst_pad_new_from_template (templ, name);
+  g_free (name);
 
   if (newpad) {
-    GST_OBJECT_LOCK (tensor_mux);
+    GstTensorMuxPadData *tensormuxpad;
+    tensormuxpad =
+        (GstTensorMuxPadData *) gst_collect_pads_add_pad (tensor_mux->collect,
+        newpad, sizeof (GstTensorMuxPadData), NULL, TRUE);
+    tensormuxpad->pad = newpad;
+    gst_pad_set_element_private (newpad, tensormuxpad);
     tensor_mux->num_tensors++;
-    GST_OBJECT_UNLOCK (tensor_mux);
-    gst_tensor_mux_setup_sinkpad (tensor_mux, newpad);
+    gst_element_add_pad (element, newpad);
   } else {
     GST_WARNING_OBJECT (tensor_mux, "failed to create request pad");
   }
@@ -270,335 +234,266 @@ gst_tensor_mux_request_new_pad (GstElement * element, GstPadTemplate * templ,
 }
 
 /**
- * @brief release request pad (gst element vmethod)
+ * @brief src event vmethod
  */
-static void
-gst_tensor_mux_release_pad (GstElement * element, GstPad * pad)
+static gboolean
+gst_tensor_mux_handle_src_event (GstPad * pad, GstObject * parent,
+    GstEvent * event)
 {
-  GstTensorMuxPadPrivate *padpriv;
-  GstTensorMux *tensor_mux = GST_TENSOR_MUX_CAST (element);
-
-  GST_DEBUG_OBJECT (tensor_mux, "releaseing pad %s:%s",
-      GST_DEBUG_PAD_NAME (pad));
-
-  gst_pad_set_active (pad, FALSE);
-
-  GST_OBJECT_LOCK (element);
-  padpriv = gst_pad_get_element_private (pad);
-  gst_pad_set_element_private (pad, NULL);
-  GST_OBJECT_UNLOCK (element);
-  gst_element_remove_pad (GST_ELEMENT_CAST (tensor_mux), pad);
-  if (padpriv) {
-    g_slice_free (GstTensorMuxPadPrivate, padpriv);
+  GstEventType type;
+  type = event ? GST_EVENT_TYPE (event) : GST_EVENT_UNKNOWN;
+  switch (type) {
+    case GST_EVENT_SEEK:
+      return FALSE;
+    default:
+      break;
   }
+
+  return gst_pad_event_default (pad, parent, event);
 }
 
 /**
- * @brief resend events for sticky events
+ * @brief sink event vmethod
  */
 static gboolean
-resend_events (GstPad * pad, GstEvent ** event, gpointer user_data)
+gst_tensor_mux_sink_event (GstCollectPads * pads, GstCollectData * data,
+    GstEvent * event, GstTensorMux * tensor_mux)
 {
-  GstTensorMux *tensor_mux = user_data;
-
-  if (GST_EVENT_TYPE (*event) == GST_EVENT_CAPS) {
-    GstCaps *caps;
-
-    gst_event_parse_caps (*event, &caps);
-    gst_tensor_mux_setcaps (pad, tensor_mux, caps);
-  } else if (GST_EVENT_TYPE (*event) == GST_EVENT_SEGMENT) {
-    GstSegment new_segment;
-    gst_segment_init (&new_segment, GST_FORMAT_TIME);
-    gst_pad_push_event (tensor_mux->srcpad,
-        gst_event_new_segment (&new_segment));
-  } else {
-    gst_pad_push_event (tensor_mux->srcpad, gst_event_ref (*event));
+  gboolean ret;
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_FLUSH_STOP:
+    {
+      tensor_mux->need_segment = TRUE;
+      break;
+    }
+    default:
+      break;
   }
 
-  return TRUE;
+  ret = gst_collect_pads_event_default (pads, data, event, FALSE);
+  return ret;
 }
 
 /**
- * @brief push tensor in tensors
- * @param pad Sink pad
- * @param tensor_mux Tensor Mux
- * @param buffer input buffer
+ * @brief Compare dts & pts time and find earliest
+ * @param tensor_mux tensor muxer
+ * @param old previous mux pad data
+ * @param new current mux pad data
+ * @return if > 0, new is earlier than old
  */
-static gboolean
-gst_push_tensor (GstPad * pad, GstTensorMux * tensor_mux, GstBuffer * buffer,
-    gint nth)
+static gint
+gst_tensor_mux_compare_pads (GstTensorMux * tensor_mux,
+    GstTensorMuxPadData * old, GstTensorMuxPadData * new)
 {
-  tensor_dim dim;
-  GstMemory *mem;
-  gboolean ret = FALSE;
-  tensor_type tensor_type;
-  GstTensor_Filter_CheckStatus status;
-
-  GstCaps *caps = gst_pad_get_current_caps (pad);
+  guint64 oldtime, newtime;
+  if (old == NULL || old->buffer == NULL)
+    return 1;
+  if (new == NULL || new->buffer == NULL)
+    return -1;
+  if (GST_CLOCK_TIME_IS_VALID (old->dts_timestamp) &&
+      GST_CLOCK_TIME_IS_VALID (new->dts_timestamp)) {
+    oldtime = old->dts_timestamp;
+    newtime = new->dts_timestamp;
+  } else {
+    oldtime = old->pts_timestamp;
+    newtime = new->pts_timestamp;
+  }
 
-  status = get_tensor_from_padcap (caps, dim, &tensor_type, NULL, NULL);
-  g_assert ((status & _TFC_ALL) == _TFC_ALL);
+  if (oldtime == GST_CLOCK_TIME_NONE)
+    return -1;
+  if (newtime == GST_CLOCK_TIME_NONE)
+    return 1;
 
-  mem = gst_buffer_get_memory (buffer, 0);
-  gst_memory_ref (mem);
-  if (gst_append_tensor (tensor_mux->outbuffer, mem, dim, tensor_type, nth))
-    ret = TRUE;
+  if (newtime < oldtime)
+    return 1;
+  else if (newtime > oldtime)
+    return -1;
 
-  return ret;
+  return 0;
 }
 
 /**
- * @brief chain function (gst element vmethod)
+ * @brief Looping to generete outbut buffer for srcpad
+ * @param tensor_mux tensor muxer
+ * @param tensors_buf output buffer for srcpad
+ * @param dimensions collected dimensions as string
+ * @param types collected types as string
+ * @param pts_time earliest pts time (present timestamp)
+ * @param dts_time earliest dts time (decoding timestamp)
+ * @return isEOS boolean EOS ( End of Stream )
  */
-static GstFlowReturn
-gst_tensor_mux_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+gboolean
+gst_tensor_mux_collect_buffer (GstTensorMux * tensor_mux,
+    GstBuffer * tensors_buf, GString * dimensions, GString * types,
+    GstClockTime * pts_time, GstClockTime * dts_time)
 {
-  GstTensorMux *tensor_mux;
-  GstFlowReturn ret;
-  GstTensorMuxPadPrivate *padpriv;
-  gboolean changed = FALSE;
-  tensor_mux = GST_TENSOR_MUX (parent);
-
-  GST_DEBUG_OBJECT (pad, "recevied %" GST_PTR_FORMAT, buffer);
-
-  if (gst_pad_check_reconfigure (tensor_mux->srcpad)) {
-    GstCaps *current_caps = gst_pad_get_current_caps (pad);
-    if (!gst_tensor_mux_setcaps (pad, tensor_mux, current_caps)) {
-      gst_pad_mark_reconfigure (tensor_mux->srcpad);
-      if (GST_PAD_IS_FLUSHING (tensor_mux->srcpad))
-        ret = GST_FLOW_FLUSHING;
-      else
-        ret = GST_FLOW_NOT_NEGOTIATED;
-      goto out;
-    }
-    gst_caps_unref (current_caps);
-  }
-  GST_OBJECT_LOCK (tensor_mux);
-  padpriv = gst_pad_get_element_private (pad);
+  GSList *walk = NULL;
+  GstTensorMuxPadData *bestpad = NULL;
 
-  if (!padpriv) {
-    GST_OBJECT_UNLOCK (tensor_mux);
-    gst_buffer_unref (buffer);
-    return GST_FLOW_NOT_LINKED;
-  }
-
-  buffer = gst_buffer_make_writable (buffer);
-
-  if (pad != tensor_mux->last_pad) {
-    changed = TRUE;
-    g_clear_object (&tensor_mux->last_pad);
-    tensor_mux->last_pad = g_object_ref (pad);
-
-    if (!gst_push_tensor (pad, tensor_mux, buffer, padpriv->nth))
-      GST_ERROR_OBJECT (tensor_mux, "Cannot append GstMemory\n");
-
-    g_mutex_lock (&buf_mutex);
-    buf_count++;
-    g_cond_signal (&buf_cond);
-    g_mutex_unlock (&buf_mutex);
-  }
-
-  if (GST_BUFFER_DURATION_IS_VALID (buffer) && GST_BUFFER_PTS_IS_VALID (buffer))
-    tensor_mux->last_stop =
-        GST_BUFFER_PTS (buffer) + GST_BUFFER_DURATION (buffer);
-  else
-    tensor_mux->last_stop = GST_CLOCK_TIME_NONE;
-
-  gst_buffer_unref (buffer);
-  GST_OBJECT_UNLOCK (tensor_mux);
+  tensor_dim dim;
+  tensor_type type;
+  GstMemory *mem;
+  GstTensor_Filter_CheckStatus status;
+  gboolean isEOS = FALSE;
+  gint i;
+
+  gint old_numerator = 2147483647, old_denominator = 2147483647;
+  gint new_numerator, new_denominator;
+  gint counting = 0;
+  tensor_mux->rank = NNS_TENSOR_RANK_LIMIT;
+
+  walk = tensor_mux->collect->data;
+
+  while (walk) {
+    GstCollectData *data = (GstCollectData *) walk->data;
+    GstTensorMuxPadData *pad = (GstTensorMuxPadData *) data;
+
+    GstCaps *caps = gst_pad_get_current_caps (pad->pad);
+    status =
+        get_tensor_from_padcap (caps, dim, &type, &new_numerator,
+        &new_denominator);
+    g_assert ((status & _TFC_ALL) == _TFC_ALL);
+
+    if (new_denominator < old_denominator)
+      old_denominator = new_denominator;
+    if (new_numerator < old_numerator)
+      old_numerator = new_numerator;
+
+    if (dimensions->len != 0)
+      dimensions = g_string_append (dimensions, ",");
+    for (i = 0; i < NNS_TENSOR_RANK_LIMIT; i++) {
+      dimensions = g_string_append (dimensions, g_strdup_printf ("%d", dim[i]));
+      if (i < NNS_TENSOR_RANK_LIMIT - 1)
+        dimensions = g_string_append (dimensions, ":");
+    }
+    if (types->len != 0)
+      types = g_string_append (types, ",");
+    types = g_string_append (types, tensor_element_typename[type]);
+    gst_caps_unref (caps);
+    walk = g_slist_next (walk);
+    GstBuffer *buf = NULL;
+    buf = gst_collect_pads_pop (tensor_mux->collect, data);
+    if (buf && GST_BUFFER_PTS_IS_VALID (buf)) {
+      pad->pts_timestamp =
+          gst_segment_to_running_time (&data->segment, GST_FORMAT_TIME,
+          GST_BUFFER_PTS (buf));
+    } else {
+      pad->pts_timestamp = GST_CLOCK_TIME_NONE;
+    }
+    if (buf && GST_BUFFER_DTS_IS_VALID (buf)) {
+      pad->dts_timestamp =
+          gst_segment_to_running_time (&data->segment, GST_FORMAT_TIME,
+          GST_BUFFER_DTS (buf));
+    } else {
+      pad->dts_timestamp = GST_CLOCK_TIME_NONE;
+    }
 
-  g_mutex_lock (&buf_mutex);
-  while (buf_count != tensor_mux->num_tensors) {
-    g_cond_wait (&buf_cond, &buf_mutex);
-    g_cond_signal (&buf_cond);
+    pad->buffer = buf;
+    if (GST_IS_BUFFER (buf)) {
+      gst_buffer_unref (pad->buffer);
+      mem = gst_buffer_get_memory (buf, 0);
+      gst_memory_ref (mem);
+      if (!gst_append_tensor (tensors_buf, mem, dim, type, counting))
+        return FALSE;
+      if (pad->buffer != NULL)
+        if (gst_tensor_mux_compare_pads (tensor_mux, bestpad, pad) > 0) {
+          bestpad = pad;
+          *pts_time = bestpad->pts_timestamp;
+          *dts_time = bestpad->dts_timestamp;
+        }
+    } else {
+      isEOS = TRUE;
+    }
+    counting++;
   }
-  g_mutex_unlock (&buf_mutex);
 
-  if (changed)
-    gst_pad_sticky_events_foreach (pad, resend_events, tensor_mux);
+  tensor_mux->framerate_denominator = old_denominator;
+  tensor_mux->framerate_numerator = old_numerator;
 
-  if (pad == tensor_mux->last_pad) {
-    gst_buffer_ref (tensor_mux->outbuffer);
-    ret = gst_pad_push (tensor_mux->srcpad, tensor_mux->outbuffer);
-  } else {
-    ret = GST_FLOW_OK;
-  }
-
-out:
-  return ret;
+  return isEOS;
 }
 
-
 /**
- * @brief set caps (gst element vmethod)
+ * @brief Gst Collect Pads Function which is called once collect pads done.
+ * @param pads GstCollectPads
+ * @param tensor_mux Muxer
+ * @return GstFlowReturn
  */
-static gboolean
-gst_tensor_mux_setcaps (GstPad * pad, GstTensorMux * tensor_mux, GstCaps * caps)
+static GstFlowReturn
+gst_tensor_mux_collected (GstCollectPads * pads, GstTensorMux * tensor_mux)
 {
-  GstStructure *structure;
-  gboolean ret = FALSE;
-  GstTensorMuxPadPrivate *padpriv;
-  GstCaps *peercaps, *src_caps;
-  gint dim;
-  const gchar *t;
-
-  padpriv = gst_pad_get_element_private (pad);
-  if (padpriv->done)
-    return TRUE;
-  if (!gst_caps_is_fixed (caps))
-    return FALSE;
-
-  peercaps = gst_pad_peer_query_caps (tensor_mux->srcpad, NULL);
-  if (peercaps) {
-    GstCaps *tcaps, *othercaps;
-    tcaps = gst_pad_get_pad_template_caps (pad);
-    othercaps =
-        gst_caps_intersect_full (peercaps, tcaps, GST_CAPS_INTERSECT_FIRST);
-
-    if (gst_caps_get_size (othercaps) > 0) {
-      structure = gst_caps_get_structure (othercaps, 0);
-      GST_OBJECT_LOCK (tensor_mux);
-      GST_OBJECT_UNLOCK (tensor_mux);
-    }
-
-    gst_caps_unref (othercaps);
-    gst_caps_unref (peercaps);
-    gst_caps_unref (tcaps);
-  }
-
-
-  structure = gst_caps_get_structure (caps, 0);
-  if (!structure)
-    return FALSE;
-
-  GST_OBJECT_LOCK (tensor_mux);
-  gst_structure_get_int (structure, "rank", &tensor_mux->rank);
-  gst_structure_get_int (structure, "dim1", &dim);
-  if (tensor_mux->dimensions->len != 0)
-    tensor_mux->dimensions = g_string_append (tensor_mux->dimensions, ",");
-  tensor_mux->dimensions =
-      g_string_append (tensor_mux->dimensions, g_strdup_printf ("%d", dim));
-  tensor_mux->dimensions = g_string_append (tensor_mux->dimensions, ":");
-  gst_structure_get_int (structure, "dim2", &dim);
-  tensor_mux->dimensions =
-      g_string_append (tensor_mux->dimensions, g_strdup_printf ("%d", dim));
-  tensor_mux->dimensions = g_string_append (tensor_mux->dimensions, ":");
-  gst_structure_get_int (structure, "dim3", &dim);
-  tensor_mux->dimensions =
-      g_string_append (tensor_mux->dimensions, g_strdup_printf ("%d", dim));
-  tensor_mux->dimensions = g_string_append (tensor_mux->dimensions, ":");
-  gst_structure_get_int (structure, "dim4", &dim);
-  tensor_mux->dimensions =
-      g_string_append (tensor_mux->dimensions, g_strdup_printf ("%d", dim));
-
-  if (tensor_mux->types->len != 0)
-    tensor_mux->types = g_string_append (tensor_mux->types, ",");
-  t = gst_structure_get_string (structure, "type");
-  tensor_mux->types = g_string_append (tensor_mux->types, t);
-
-
-  gst_structure_get_fraction (structure, "framerate",
-      &tensor_mux->framerate_numerator, &tensor_mux->framerate_denominator);
-
-  padpriv = gst_pad_get_element_private (pad);
-  if (padpriv
-      && gst_structure_get_uint (structure, "timestamps-offset",
-          &padpriv->timestamp_offset)) {
-    padpriv->have_timestamp_offset = TRUE;
-  }
-
-  padpriv->done = TRUE;
-  padpriv->nth = num_sink;
-  num_sink++;
-  src_caps = gst_caps_new_simple ("other/tensors",
-      "rank", G_TYPE_INT, tensor_mux->rank,
-      "num_tensors", G_TYPE_INT, tensor_mux->num_tensors,
-      "types", G_TYPE_STRING, tensor_mux->types->str,
-      "framerate", GST_TYPE_FRACTION, tensor_mux->framerate_numerator,
-      tensor_mux->framerate_denominator, "dimensions", G_TYPE_STRING,
-      tensor_mux->dimensions->str, NULL);
-
-  GST_OBJECT_UNLOCK (tensor_mux);
-
-  if (tensor_mux->send_stream_start) {
+  GstFlowReturn ret = GST_FLOW_OK;
+  GstBuffer *tensors_buf;
+  GString *dimensions = g_string_new (NULL);
+  GString *types = g_string_new (NULL);
+  GstClockTime pts_time, dts_time;
+  GstClockTime time = 0;
+  gboolean isEOS = FALSE;
+  GST_DEBUG_OBJECT (tensor_mux, " all pads are collected ");
+  if (tensor_mux->need_stream_start) {
     gchar s_id[32];
-
-    g_snprintf (s_id, sizeof (s_id), "interleave-%08x", g_random_int ());
+    g_snprintf (s_id, sizeof (s_id), " tensormux - %08x ", g_random_int ());
     gst_pad_push_event (tensor_mux->srcpad, gst_event_new_stream_start (s_id));
-
-    tensor_mux->send_stream_start = FALSE;
+    tensor_mux->need_stream_start = FALSE;
   }
 
-  GST_DEBUG_OBJECT (tensor_mux,
-      "setting caps %" GST_PTR_FORMAT " on src pad..", src_caps);
-
-  ret = gst_pad_set_caps (tensor_mux->srcpad, src_caps);
-  gst_caps_unref (src_caps);
-
-  return ret;
-}
+  tensors_buf = gst_buffer_new ();
+  gst_make_tensors (tensors_buf);
+  isEOS =
+      gst_tensor_mux_collect_buffer (tensor_mux, tensors_buf, dimensions,
+      types, &pts_time, &dts_time);
+  if (isEOS) {
+    if (tensors_buf)
+      gst_buffer_unref (tensors_buf);
+    gst_pad_push_event (tensor_mux->srcpad, gst_event_new_eos ());
+    ret = GST_FLOW_EOS;
+    goto beach;
+  }
 
-/**
- * @brief event function (gst element vmethod)
- */
-static gboolean
-gst_tensor_mux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
-{
-  GstTensorMux *mux = GST_TENSOR_MUX (parent);
-  gboolean is_pad;
-  gboolean ret = TRUE;
+  if (!tensor_mux->negotiated) {
+    GstCaps *newcaps;
+    newcaps =
+        gst_caps_new_simple ("other/tensors", "rank", G_TYPE_INT,
+        tensor_mux->rank, "num_tensors", G_TYPE_INT,
+        tensor_mux->num_tensors, "types", G_TYPE_STRING,
+        types->str, "framerate", GST_TYPE_FRACTION,
+        tensor_mux->framerate_numerator, tensor_mux->framerate_denominator,
+        "dimensions", G_TYPE_STRING, dimensions->str, NULL);
+    if (!gst_pad_set_caps (tensor_mux->srcpad, newcaps)) {
+      gst_caps_unref (newcaps);
+      goto nego_error;
+    }
 
-  GST_OBJECT_LOCK (mux);
-  is_pad = (pad == mux->last_pad);
-  GST_OBJECT_UNLOCK (mux);
+    gst_caps_unref (newcaps);
+    tensor_mux->negotiated = TRUE;
+  }
 
-  switch (GST_EVENT_TYPE (event)) {
-    case GST_EVENT_CAPS:
-    {
-      GstCaps *caps;
-      gst_event_parse_caps (event, &caps);
-      GST_LOG_OBJECT (pad, "Received caps-event with caps: %"
-          GST_PTR_FORMAT, caps);
-      ret = gst_tensor_mux_setcaps (pad, mux, caps);
-      gst_event_unref (event);
-      return ret;
-    }
-      break;
-    case GST_EVENT_FLUSH_STOP:
-    {
-      GST_OBJECT_LOCK (mux);
-      mux->last_stop = GST_CLOCK_TIME_NONE;
-      GST_OBJECT_UNLOCK (mux);
-      break;
-    }
-    case GST_EVENT_SEGMENT:
-    {
-      GstTensorMuxPadPrivate *padpriv;
-      GST_OBJECT_LOCK (mux);
-      padpriv = gst_pad_get_element_private (pad);
-
-      if (padpriv) {
-        gst_event_copy_segment (event, &padpriv->segment);
-      }
-      GST_OBJECT_UNLOCK (mux);
-
-      if (is_pad) {
-        GstSegment new_segment;
-        gst_segment_init (&new_segment, GST_FORMAT_TIME);
-        gst_event_unref (event);
-        event = gst_event_new_segment (&new_segment);
-      }
-      break;
+  if (tensor_mux->need_segment) {
+    GstSegment segment;
+    if (dts_time != GST_CLOCK_TIME_NONE) {
+      time = dts_time;
+    } else if (pts_time != GST_CLOCK_TIME_NONE) {
+      time = pts_time;
+    } else {
+      time = 0;
     }
-    default:
-      break;
+
+    gst_segment_init (&segment, GST_FORMAT_TIME);
+    segment.start = time;
+    gst_pad_push_event (tensor_mux->srcpad, gst_event_new_segment (&segment));
+    tensor_mux->need_segment = FALSE;
   }
 
-  if (is_pad) {
-    return gst_pad_push_event (mux->srcpad, event);
-  } else {
-    gst_event_unref (event);
-    return ret;
+  ret = gst_pad_push (tensor_mux->srcpad, tensors_buf);
+  if (ret != GST_FLOW_OK)
+    goto beach;
+beach:
+  return ret;
+nego_error:
+  {
+    GST_WARNING_OBJECT (tensor_mux, "failed to set caps");
+    GST_ELEMENT_ERROR (tensor_mux, CORE, NEGOTIATION, (NULL), (NULL));
+    return GST_FLOW_NOT_NEGOTIATED;
   }
 }
 
@@ -608,10 +503,10 @@ gst_tensor_mux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 static void
 gst_tensor_mux_ready_to_paused (GstTensorMux * tensor_mux)
 {
-  GST_OBJECT_LOCK (tensor_mux);
-  g_clear_object (&tensor_mux->last_pad);
-  tensor_mux->send_stream_start = TRUE;
-  GST_OBJECT_UNLOCK (tensor_mux);
+  tensor_mux->need_stream_start = TRUE;
+  tensor_mux->need_segment = TRUE;
+  tensor_mux->negotiated = FALSE;
+  gst_collect_pads_start (tensor_mux->collect);
 }
 
 /**
@@ -622,23 +517,23 @@ gst_tensor_mux_change_state (GstElement * element, GstStateChange transition)
 {
   GstTensorMux *tensor_mux;
   GstStateChangeReturn ret;
-
   tensor_mux = GST_TENSOR_MUX (element);
-
   switch (transition) {
     case GST_STATE_CHANGE_READY_TO_PAUSED:
       gst_tensor_mux_ready_to_paused (tensor_mux);
       break;
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
+      gst_collect_pads_stop (tensor_mux->collect);
+      break;
     default:
       break;
   }
 
-  ret = GST_ELEMENT_CLASS (gst_tensor_mux_parent_class)->change_state (element,
-      transition);
-
+  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+  if (ret == GST_STATE_CHANGE_FAILURE)
+    return ret;
   switch (transition) {
     case GST_STATE_CHANGE_PAUSED_TO_READY:
-      g_clear_object (&tensor_mux->last_pad);
       break;
     default:
       break;
@@ -655,7 +550,6 @@ gst_tensor_mux_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec)
 {
   GstTensorMux *filter = GST_TENSOR_MUX (object);
-
   switch (prop_id) {
     case PROP_SILENT:
       filter->silent = g_value_get_boolean (value);
@@ -674,7 +568,6 @@ gst_tensor_mux_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec)
 {
   GstTensorMux *filter = GST_TENSOR_MUX (object);
-
   switch (prop_id) {
     case PROP_SILENT:
       g_value_set_boolean (value, filter->silent);
@@ -701,7 +594,7 @@ gst_tensor_mux_get_property (GObject * object, guint prop_id,
  * initialize the plug-in itself
  * register the element factories and other features
  */
-static gboolean
+gboolean
 gst_tensor_mux_plugin_init (GstPlugin * tensormux)
 {
   /** debug category for fltering log messages
@@ -709,7 +602,6 @@ gst_tensor_mux_plugin_init (GstPlugin * tensormux)
    */
   GST_DEBUG_CATEGORY_INIT (gst_tensor_mux_debug, "tensormux", 0,
       "Tensor Muxer");
-
   return gst_element_register (tensormux, "tensormux",
       GST_RANK_NONE, GST_TYPE_TENSOR_MUX);
 }
index 5356259..3f1c9de 100644 (file)
 #define __GST_TENSOR_MUX_H__
 
 #include <gst/gst.h>
+#include <gst/base/gstcollectpads.h>
 #include <tensor_common.h>
 #include <tensor_meta.h>
 
 G_BEGIN_DECLS
-
 #define GST_TYPE_TENSOR_MUX (gst_tensor_mux_get_type ())
 #define GST_TENSOR_MUX(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_TENSOR_MUX, GstTensorMux))
 #define GST_TENSOR_MUX_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_TENSOR_MUX, GstTensorMuxClass))
 #define GST_TENSOR_MUX_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_TENSOR_MUX, GstTensorMuxClass))
 #define GST_IS_TENSOR_MUX(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TENSOR_MUX))
-#define GST_IS_TENSOR_MUX_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TENSOR_MUX))
+#define GST_IS_TENSOR_MUX_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TENSOR_MUX))
 #define GST_TENSOR_MUX_CAST(obj)((GstTensorMux*)(obj))
-
 typedef struct _GstTensorMux GstTensorMux;
 typedef struct _GstTensorMuxClass GstTensorMuxClass;
 
-/**
- * @brief Tensor Muxer pad private data structure
- */
 typedef struct
 {
-  gboolean have_timestamp_offset;
-  guint timestamp_offset;
-
-  GstSegment segment;
-
-  gboolean done;
-  gboolean priority;
-  gint nth;
-} GstTensorMuxPadPrivate;
+  GstCollectData collect;
+  GstBuffer *buffer;
+  GstClockTime pts_timestamp;
+  GstClockTime dts_timestamp;
+  GstPad *pad;
 
+  gboolean have_timestamp_offset;
+} GstTensorMuxPadData;
 
 /**
  * @brief Tensor Muxer data structure
@@ -71,28 +65,27 @@ struct _GstTensorMux
 {
   GstElement element;
 
-  guint64 byte_count;
   gboolean silent;
   GstPad *srcpad;
-  GstPad *last_pad;
-  GstBuffer *outbuffer;
 
-  GString *dimensions;
+  GstCollectPads *collect;
+  gboolean negotiated;
+  gboolean need_segment;
+  gboolean need_stream_start;
+
   guint32 num_tensors;
   gint rank;
-  GString *types;
   gint framerate_numerator;
   gint framerate_denominator;
-  GstClockTime last_stop;
   gboolean send_stream_start;
 };
 
 /**
  * @brief GstTensroMuxClass inherits GstElementClass
  */
-struct _GstTensorMuxClass {
+struct _GstTensorMuxClass
+{
   GstElementClass parent_class;
-  /** gboolean (*src_event) (GstTensorMux *tensor_mux, GstEvent *event); */
 };
 
 /**
@@ -101,5 +94,4 @@ struct _GstTensorMuxClass {
 GType gst_tensor_mux_get_type (void);
 
 G_END_DECLS
-
 #endif  /** __GST_TENSOR_MUX_H__ **/