const GValue * value, GParamSpec * pspec);
static void gst_sctp_dec_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
+static void gst_sctp_dec_finalize (GObject * object);
static GstStateChangeReturn gst_sctp_dec_change_state (GstElement * element,
GstStateChange transition);
static GstFlowReturn gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self,
static void stop_all_srcpad_tasks (GstSctpDec * self);
static void sctpdec_cleanup (GstSctpDec * self);
static GstPad *get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id);
-static void remove_pad (GstElement * element, GstPad * pad);
+static void remove_pad (GstSctpDec * self, GstPad * pad);
static void on_reset_stream (GstSctpDec * self, guint stream_id);
static void
gobject_class->set_property = gst_sctp_dec_set_property;
gobject_class->get_property = gst_sctp_dec_get_property;
+ gobject_class->finalize = gst_sctp_dec_finalize;
element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_dec_change_state);
self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
self->local_sctp_port = DEFAULT_LOCAL_SCTP_PORT;
+ self->flow_combiner = gst_flow_combiner_new ();
+
self->sink_pad = gst_pad_new_from_static_template (&sink_template, "sink");
gst_pad_set_chain_function (self->sink_pad,
GST_DEBUG_FUNCPTR ((GstPadChainFunction) gst_sctp_dec_packet_chain));
}
}
+static void
+gst_sctp_dec_finalize (GObject * object)
+{
+ GstSctpDec *self = GST_SCTP_DEC (object);
+
+ gst_flow_combiner_free (self->flow_combiner);
+ self->flow_combiner = NULL;
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
static GstStateChangeReturn
gst_sctp_dec_change_state (GstElement * element, GstStateChange transition)
{
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
+ gst_flow_combiner_reset (self->flow_combiner);
if (!configure_association (self))
ret = GST_STATE_CHANGE_FAILURE;
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
sctpdec_cleanup (self);
+ gst_flow_combiner_reset (self->flow_combiner);
break;
default:
break;
static GstFlowReturn
gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self, GstBuffer * buf)
{
+ GstFlowReturn flow_ret;
GstMapInfo map;
if (!gst_buffer_map (buf, &map, GST_MAP_READ)) {
gst_buffer_unmap (buf, &map);
gst_buffer_unref (buf);
- return GST_FLOW_OK;
+ GST_OBJECT_LOCK (self);
+ /* This gets the last combined flow return from all source pads */
+ flow_ret = gst_flow_combiner_update_flow (self->flow_combiner, GST_FLOW_OK);
+ GST_OBJECT_UNLOCK (self);
+
+ if (flow_ret != GST_FLOW_OK) {
+ GST_DEBUG_OBJECT (self, "Returning %s", gst_flow_get_name (flow_ret));
+ }
+
+ return flow_ret;
}
static void
static void
gst_sctp_data_srcpad_loop (GstPad * pad)
{
+ GstSctpDec *self;
GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
GstDataQueueItem *item;
+ self = GST_SCTP_DEC (gst_pad_get_parent (pad));
+
if (gst_data_queue_pop (sctpdec_pad->packet_queue, &item)) {
+ GstBuffer *buffer;
GstFlowReturn flow_ret;
- flow_ret = gst_pad_push (pad, GST_BUFFER (item->object));
+ buffer = GST_BUFFER (item->object);
+
+ flow_ret = gst_pad_push (pad, buffer);
item->object = NULL;
+
+ GST_OBJECT_LOCK (self);
+ gst_flow_combiner_update_pad_flow (self->flow_combiner, pad, flow_ret);
+ GST_OBJECT_UNLOCK (self);
+
if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
|| flow_ret == GST_FLOW_NOT_LINKED)) {
GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
item->destroy (item);
} else {
+ GST_OBJECT_LOCK (self);
+ gst_flow_combiner_update_pad_flow (self->flow_combiner, pad,
+ GST_FLOW_FLUSHING);
+ GST_OBJECT_UNLOCK (self);
+
GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
gst_pad_pause_task (pad);
}
+
+ gst_object_unref (self);
}
static gboolean
if (!gst_element_add_pad (GST_ELEMENT (self), new_pad))
goto error_cleanup;
+ GST_OBJECT_LOCK (self);
+ gst_flow_combiner_add_pad (self->flow_combiner, new_pad);
+ GST_OBJECT_UNLOCK (self);
+
gst_pad_start_task (new_pad, (GstTaskFunction) gst_sctp_data_srcpad_loop,
new_pad, NULL);
}
static void
-remove_pad (GstElement * element, GstPad * pad)
+remove_pad (GstSctpDec * self, GstPad * pad)
{
stop_srcpad_task (pad);
gst_pad_set_active (pad, FALSE);
- gst_element_remove_pad (element, pad);
+ gst_element_remove_pad (GST_ELEMENT (self), pad);
+ GST_OBJECT_LOCK (self);
+ gst_flow_combiner_remove_pad (self->flow_combiner, pad);
+ GST_OBJECT_UNLOCK (self);
}
static void
GST_WARNING_OBJECT (self, "Reset called on stream without a srcpad");
return;
}
- remove_pad (GST_ELEMENT (self), srcpad);
+ remove_pad (self, srcpad);
gst_object_unref (srcpad);
}
GstPad *pad = g_value_get_object (item);
GstSctpDec *self = user_data;
- remove_pad (GST_ELEMENT (self), pad);
+ remove_pad (self, pad);
}
static void