sctpdec: Use a flow combiner for the source pad flow returns and propagate errors...
authorSebastian Dröge <sebastian@centricular.com>
Thu, 30 Jan 2020 13:56:36 +0000 (15:56 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Thu, 30 Jan 2020 13:56:36 +0000 (15:56 +0200)
Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/issues/1180

ext/sctp/gstsctpdec.c
ext/sctp/gstsctpdec.h

index 48a0175..51a1638 100644 (file)
@@ -143,6 +143,7 @@ static void gst_sctp_dec_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
 static void gst_sctp_dec_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec);
+static void gst_sctp_dec_finalize (GObject * object);
 static GstStateChangeReturn gst_sctp_dec_change_state (GstElement * element,
     GstStateChange transition);
 static GstFlowReturn gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self,
@@ -160,7 +161,7 @@ static void stop_srcpad_task (GstPad * pad);
 static void stop_all_srcpad_tasks (GstSctpDec * self);
 static void sctpdec_cleanup (GstSctpDec * self);
 static GstPad *get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id);
-static void remove_pad (GstElement * element, GstPad * pad);
+static void remove_pad (GstSctpDec * self, GstPad * pad);
 static void on_reset_stream (GstSctpDec * self, guint stream_id);
 
 static void
@@ -182,6 +183,7 @@ gst_sctp_dec_class_init (GstSctpDecClass * klass)
 
   gobject_class->set_property = gst_sctp_dec_set_property;
   gobject_class->get_property = gst_sctp_dec_get_property;
+  gobject_class->finalize = gst_sctp_dec_finalize;
 
   element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_dec_change_state);
 
@@ -223,6 +225,8 @@ gst_sctp_dec_init (GstSctpDec * self)
   self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
   self->local_sctp_port = DEFAULT_LOCAL_SCTP_PORT;
 
+  self->flow_combiner = gst_flow_combiner_new ();
+
   self->sink_pad = gst_pad_new_from_static_template (&sink_template, "sink");
   gst_pad_set_chain_function (self->sink_pad,
       GST_DEBUG_FUNCPTR ((GstPadChainFunction) gst_sctp_dec_packet_chain));
@@ -270,6 +274,17 @@ gst_sctp_dec_get_property (GObject * object, guint prop_id, GValue * value,
   }
 }
 
+static void
+gst_sctp_dec_finalize (GObject * object)
+{
+  GstSctpDec *self = GST_SCTP_DEC (object);
+
+  gst_flow_combiner_free (self->flow_combiner);
+  self->flow_combiner = NULL;
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
 static GstStateChangeReturn
 gst_sctp_dec_change_state (GstElement * element, GstStateChange transition)
 {
@@ -278,11 +293,13 @@ gst_sctp_dec_change_state (GstElement * element, GstStateChange transition)
 
   switch (transition) {
     case GST_STATE_CHANGE_READY_TO_PAUSED:
+      gst_flow_combiner_reset (self->flow_combiner);
       if (!configure_association (self))
         ret = GST_STATE_CHANGE_FAILURE;
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
       sctpdec_cleanup (self);
+      gst_flow_combiner_reset (self->flow_combiner);
       break;
     default:
       break;
@@ -297,6 +314,7 @@ gst_sctp_dec_change_state (GstElement * element, GstStateChange transition)
 static GstFlowReturn
 gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self, GstBuffer * buf)
 {
+  GstFlowReturn flow_ret;
   GstMapInfo map;
 
   if (!gst_buffer_map (buf, &map, GST_MAP_READ)) {
@@ -310,7 +328,16 @@ gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self, GstBuffer * buf)
   gst_buffer_unmap (buf, &map);
   gst_buffer_unref (buf);
 
-  return GST_FLOW_OK;
+  GST_OBJECT_LOCK (self);
+  /* This gets the last combined flow return from all source pads */
+  flow_ret = gst_flow_combiner_update_flow (self->flow_combiner, GST_FLOW_OK);
+  GST_OBJECT_UNLOCK (self);
+
+  if (flow_ret != GST_FLOW_OK) {
+    GST_DEBUG_OBJECT (self, "Returning %s", gst_flow_get_name (flow_ret));
+  }
+
+  return flow_ret;
 }
 
 static void
@@ -373,14 +400,25 @@ gst_sctp_dec_packet_event (GstPad * pad, GstSctpDec * self, GstEvent * event)
 static void
 gst_sctp_data_srcpad_loop (GstPad * pad)
 {
+  GstSctpDec *self;
   GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
   GstDataQueueItem *item;
 
+  self = GST_SCTP_DEC (gst_pad_get_parent (pad));
+
   if (gst_data_queue_pop (sctpdec_pad->packet_queue, &item)) {
+    GstBuffer *buffer;
     GstFlowReturn flow_ret;
 
-    flow_ret = gst_pad_push (pad, GST_BUFFER (item->object));
+    buffer = GST_BUFFER (item->object);
+
+    flow_ret = gst_pad_push (pad, buffer);
     item->object = NULL;
+
+    GST_OBJECT_LOCK (self);
+    gst_flow_combiner_update_pad_flow (self->flow_combiner, pad, flow_ret);
+    GST_OBJECT_UNLOCK (self);
+
     if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
             || flow_ret == GST_FLOW_NOT_LINKED)) {
       GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
@@ -399,9 +437,16 @@ gst_sctp_data_srcpad_loop (GstPad * pad)
 
     item->destroy (item);
   } else {
+    GST_OBJECT_LOCK (self);
+    gst_flow_combiner_update_pad_flow (self->flow_combiner, pad,
+        GST_FLOW_FLUSHING);
+    GST_OBJECT_UNLOCK (self);
+
     GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
     gst_pad_pause_task (pad);
   }
+
+  gst_object_unref (self);
 }
 
 static gboolean
@@ -523,6 +568,10 @@ get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id)
   if (!gst_element_add_pad (GST_ELEMENT (self), new_pad))
     goto error_cleanup;
 
+  GST_OBJECT_LOCK (self);
+  gst_flow_combiner_add_pad (self->flow_combiner, new_pad);
+  GST_OBJECT_UNLOCK (self);
+
   gst_pad_start_task (new_pad, (GstTaskFunction) gst_sctp_data_srcpad_loop,
       new_pad, NULL);
 
@@ -536,11 +585,14 @@ error_cleanup:
 }
 
 static void
-remove_pad (GstElement * element, GstPad * pad)
+remove_pad (GstSctpDec * self, GstPad * pad)
 {
   stop_srcpad_task (pad);
   gst_pad_set_active (pad, FALSE);
-  gst_element_remove_pad (element, pad);
+  gst_element_remove_pad (GST_ELEMENT (self), pad);
+  GST_OBJECT_LOCK (self);
+  gst_flow_combiner_remove_pad (self->flow_combiner, pad);
+  GST_OBJECT_UNLOCK (self);
 }
 
 static void
@@ -557,7 +609,7 @@ on_gst_sctp_association_stream_reset (GstSctpAssociation * gst_sctp_association,
     GST_WARNING_OBJECT (self, "Reset called on stream without a srcpad");
     return;
   }
-  remove_pad (GST_ELEMENT (self), srcpad);
+  remove_pad (self, srcpad);
   gst_object_unref (srcpad);
 }
 
@@ -615,7 +667,7 @@ remove_pad_it (const GValue * item, gpointer user_data)
   GstPad *pad = g_value_get_object (item);
   GstSctpDec *self = user_data;
 
-  remove_pad (GST_ELEMENT (self), pad);
+  remove_pad (self, pad);
 }
 
 static void
index 845fac4..6a5591f 100644 (file)
@@ -27,6 +27,7 @@
 #define __GST_SCTP_DEC_H__
 
 #include <gst/gst.h>
+#include <gst/base/base.h>
 
 #include "sctpassociation.h"
 
@@ -44,6 +45,8 @@ struct _GstSctpDec
 {
   GstElement element;
 
+  GstFlowCombiner *flow_combiner;
+
   GstPad *sink_pad;
   guint sctp_association_id;
   guint local_sctp_port;