2 * Copyright (c) 2015, Collabora Ltd.
4 * Redistribution and use in source and binary forms, with or without modification,
5 * are permitted provided that the following conditions are met:
7 * 1. Redistributions of source code must retain the above copyright notice, this
8 * list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice, this
11 * list of conditions and the following disclaimer in the documentation and/or other
12 * materials provided with the distribution.
14 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
17 * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
18 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
19 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
21 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
29 #include "gstsctpdec.h"
31 #include <gst/sctp/sctpreceivemeta.h>
32 #include <gst/base/gstdataqueue.h>
37 GST_DEBUG_CATEGORY_STATIC (gst_sctp_dec_debug_category);
38 #define GST_CAT_DEFAULT gst_sctp_dec_debug_category
40 #define gst_sctp_dec_parent_class parent_class
41 G_DEFINE_TYPE (GstSctpDec, gst_sctp_dec, GST_TYPE_ELEMENT);
42 GST_ELEMENT_REGISTER_DEFINE (sctpdec, "sctpdec", GST_RANK_NONE,
45 static GstStaticPadTemplate sink_template =
46 GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK,
47 GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-sctp"));
49 static GstStaticPadTemplate src_template =
50 GST_STATIC_PAD_TEMPLATE ("src_%u", GST_PAD_SRC,
51 GST_PAD_SOMETIMES, GST_STATIC_CAPS_ANY);
59 static guint signals[NUM_SIGNALS];
65 PROP_GST_SCTP_ASSOCIATION_ID,
71 static GParamSpec *properties[NUM_PROPERTIES];
73 #define DEFAULT_GST_SCTP_ASSOCIATION_ID 1
74 #define DEFAULT_LOCAL_SCTP_PORT 0
75 #define MAX_SCTP_PORT 65535
76 #define MAX_GST_SCTP_ASSOCIATION_ID 65535
77 #define MAX_STREAM_ID 65535
79 GType gst_sctp_dec_pad_get_type (void);
81 #define GST_TYPE_SCTP_DEC_PAD (gst_sctp_dec_pad_get_type())
82 #define GST_SCTP_DEC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_DEC_PAD, GstSctpDecPad))
83 #define GST_SCTP_DEC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_DEC_PAD, GstSctpDecPadClass))
84 #define GST_IS_SCTP_DEC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_DEC_PAD))
85 #define GST_IS_SCTP_DEC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_DEC_PAD))
87 typedef struct _GstSctpDecPad GstSctpDecPad;
88 typedef GstPadClass GstSctpDecPadClass;
94 GstDataQueue *packet_queue;
97 G_DEFINE_TYPE (GstSctpDecPad, gst_sctp_dec_pad, GST_TYPE_PAD);
100 gst_sctp_dec_pad_finalize (GObject * object)
102 GstSctpDecPad *self = GST_SCTP_DEC_PAD (object);
104 gst_object_unref (self->packet_queue);
106 G_OBJECT_CLASS (gst_sctp_dec_pad_parent_class)->finalize (object);
110 data_queue_check_full_cb (GstDataQueue * queue, guint visible, guint bytes,
111 guint64 time, gpointer user_data)
113 /* FIXME: Are we full at some point and block? */
118 data_queue_empty_cb (GstDataQueue * queue, gpointer user_data)
123 data_queue_full_cb (GstDataQueue * queue, gpointer user_data)
128 gst_sctp_dec_pad_class_init (GstSctpDecPadClass * klass)
130 GObjectClass *gobject_class;
132 gobject_class = G_OBJECT_CLASS (klass);
134 gobject_class->finalize = gst_sctp_dec_pad_finalize;
138 gst_sctp_dec_pad_init (GstSctpDecPad * self)
140 self->packet_queue = gst_data_queue_new (data_queue_check_full_cb,
141 data_queue_full_cb, data_queue_empty_cb, NULL);
144 static void gst_sctp_dec_set_property (GObject * object, guint prop_id,
145 const GValue * value, GParamSpec * pspec);
146 static void gst_sctp_dec_get_property (GObject * object, guint prop_id,
147 GValue * value, GParamSpec * pspec);
148 static void gst_sctp_dec_finalize (GObject * object);
149 static GstStateChangeReturn gst_sctp_dec_change_state (GstElement * element,
150 GstStateChange transition);
151 static GstFlowReturn gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self,
153 static gboolean gst_sctp_dec_packet_event (GstPad * pad, GstSctpDec * self,
155 static void gst_sctp_data_srcpad_loop (GstPad * pad);
157 static gboolean configure_association (GstSctpDec * self);
158 static void on_gst_sctp_association_stream_reset (GstSctpAssociation *
159 gst_sctp_association, guint16 stream_id, GstSctpDec * self);
160 static void on_receive (GstSctpAssociation * gst_sctp_association,
161 guint8 * buf, gsize length, guint16 stream_id, guint ppid,
163 static void stop_srcpad_task (GstPad * pad);
164 static void stop_all_srcpad_tasks (GstSctpDec * self);
165 static void sctpdec_cleanup (GstSctpDec * self);
166 static GstPad *get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id);
167 static void remove_pad (GstSctpDec * self, GstPad * pad);
168 static void on_reset_stream (GstSctpDec * self, guint stream_id);
171 gst_sctp_dec_class_init (GstSctpDecClass * klass)
173 GObjectClass *gobject_class;
174 GstElementClass *element_class;
176 gobject_class = G_OBJECT_CLASS (klass);
177 element_class = GST_ELEMENT_CLASS (klass);
179 GST_DEBUG_CATEGORY_INIT (gst_sctp_dec_debug_category,
180 "sctpdec", 0, "debug category for sctpdec element");
182 gst_element_class_add_pad_template (element_class,
183 gst_static_pad_template_get (&src_template));
184 gst_element_class_add_pad_template (element_class,
185 gst_static_pad_template_get (&sink_template));
187 gobject_class->set_property = gst_sctp_dec_set_property;
188 gobject_class->get_property = gst_sctp_dec_get_property;
189 gobject_class->finalize = gst_sctp_dec_finalize;
191 element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_dec_change_state);
193 klass->on_reset_stream = on_reset_stream;
195 properties[PROP_GST_SCTP_ASSOCIATION_ID] =
196 g_param_spec_uint ("sctp-association-id",
197 "SCTP Association ID",
198 "Every encoder/decoder pair should have the same, unique, sctp-association-id. "
199 "This value must be set before any pads are requested.",
200 0, MAX_GST_SCTP_ASSOCIATION_ID, DEFAULT_GST_SCTP_ASSOCIATION_ID,
201 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
203 properties[PROP_LOCAL_SCTP_PORT] =
204 g_param_spec_uint ("local-sctp-port",
206 "Local sctp port for the sctp association. The remote port is configured via the "
207 "GstSctpEnc element.",
208 0, MAX_SCTP_PORT, DEFAULT_LOCAL_SCTP_PORT,
209 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
211 g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
213 signals[SIGNAL_RESET_STREAM] = g_signal_new ("reset-stream",
214 G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
215 G_STRUCT_OFFSET (GstSctpDecClass, on_reset_stream), NULL, NULL,
216 NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
218 gst_element_class_set_static_metadata (element_class,
220 "Decoder/Network/SCTP",
221 "Decodes packets with SCTP",
222 "George Kiagiadakis <george.kiagiadakis@collabora.com>");
226 gst_sctp_dec_init (GstSctpDec * self)
228 self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
229 self->local_sctp_port = DEFAULT_LOCAL_SCTP_PORT;
231 self->flow_combiner = gst_flow_combiner_new ();
233 self->sink_pad = gst_pad_new_from_static_template (&sink_template, "sink");
234 gst_pad_set_chain_function (self->sink_pad,
235 GST_DEBUG_FUNCPTR ((GstPadChainFunction) gst_sctp_dec_packet_chain));
236 gst_pad_set_event_function (self->sink_pad,
237 GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_dec_packet_event));
239 gst_element_add_pad (GST_ELEMENT (self), self->sink_pad);
243 gst_sctp_dec_set_property (GObject * object, guint prop_id,
244 const GValue * value, GParamSpec * pspec)
246 GstSctpDec *self = GST_SCTP_DEC (object);
249 case PROP_GST_SCTP_ASSOCIATION_ID:
250 self->sctp_association_id = g_value_get_uint (value);
252 case PROP_LOCAL_SCTP_PORT:
253 self->local_sctp_port = g_value_get_uint (value);
256 G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
262 gst_sctp_dec_get_property (GObject * object, guint prop_id, GValue * value,
265 GstSctpDec *self = GST_SCTP_DEC (object);
268 case PROP_GST_SCTP_ASSOCIATION_ID:
269 g_value_set_uint (value, self->sctp_association_id);
271 case PROP_LOCAL_SCTP_PORT:
272 g_value_set_uint (value, self->local_sctp_port);
275 G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
281 gst_sctp_dec_finalize (GObject * object)
283 GstSctpDec *self = GST_SCTP_DEC (object);
285 gst_flow_combiner_free (self->flow_combiner);
286 self->flow_combiner = NULL;
288 G_OBJECT_CLASS (parent_class)->finalize (object);
291 static GstStateChangeReturn
292 gst_sctp_dec_change_state (GstElement * element, GstStateChange transition)
294 GstSctpDec *self = GST_SCTP_DEC (element);
295 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
297 switch (transition) {
298 case GST_STATE_CHANGE_READY_TO_PAUSED:
299 gst_flow_combiner_reset (self->flow_combiner);
300 if (!configure_association (self))
301 ret = GST_STATE_CHANGE_FAILURE;
303 case GST_STATE_CHANGE_PAUSED_TO_READY:
304 stop_all_srcpad_tasks (self);
310 if (ret != GST_STATE_CHANGE_FAILURE)
311 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
313 switch (transition) {
314 case GST_STATE_CHANGE_PAUSED_TO_READY:
315 sctpdec_cleanup (self);
316 gst_flow_combiner_reset (self->flow_combiner);
326 gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self, GstBuffer * buf)
328 GstFlowReturn flow_ret;
331 GST_DEBUG_OBJECT (self, "Processing received buffer %" GST_PTR_FORMAT, buf);
333 if (!gst_buffer_map (buf, &map, GST_MAP_READ)) {
334 GST_ERROR_OBJECT (self, "Could not map GstBuffer");
335 gst_buffer_unref (buf);
336 return GST_FLOW_ERROR;
339 gst_sctp_association_incoming_packet (self->sctp_association,
340 (const guint8 *) map.data, (guint32) map.size);
341 gst_buffer_unmap (buf, &map);
342 gst_buffer_unref (buf);
344 GST_OBJECT_LOCK (self);
345 /* This gets the last combined flow return from all source pads */
346 flow_ret = gst_flow_combiner_update_flow (self->flow_combiner, GST_FLOW_OK);
347 GST_OBJECT_UNLOCK (self);
349 if (flow_ret != GST_FLOW_OK) {
350 GST_DEBUG_OBJECT (self, "Returning %s", gst_flow_get_name (flow_ret));
357 flush_srcpad (const GValue * item, gpointer user_data)
359 GstSctpDecPad *sctpdec_pad = g_value_get_object (item);
360 gboolean flush = GPOINTER_TO_INT (user_data);
363 gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
364 gst_data_queue_flush (sctpdec_pad->packet_queue);
366 gst_data_queue_set_flushing (sctpdec_pad->packet_queue, FALSE);
367 gst_pad_start_task (GST_PAD (sctpdec_pad),
368 (GstTaskFunction) gst_sctp_data_srcpad_loop, sctpdec_pad, NULL);
373 gst_sctp_dec_packet_event (GstPad * pad, GstSctpDec * self, GstEvent * event)
375 switch (GST_EVENT_TYPE (event)) {
376 case GST_EVENT_STREAM_START:
378 /* We create our own stream-start events and the caps event does not
380 gst_event_unref (event);
383 /* Drop this, we're never EOS until shut down */
384 gst_event_unref (event);
386 case GST_EVENT_FLUSH_START:{
389 it = gst_element_iterate_src_pads (GST_ELEMENT (self));
390 while (gst_iterator_foreach (it, flush_srcpad,
391 GINT_TO_POINTER (TRUE)) == GST_ITERATOR_RESYNC)
392 gst_iterator_resync (it);
393 gst_iterator_free (it);
395 return gst_pad_event_default (pad, GST_OBJECT (self), event);
397 case GST_EVENT_FLUSH_STOP:{
400 it = gst_element_iterate_src_pads (GST_ELEMENT (self));
401 while (gst_iterator_foreach (it, flush_srcpad,
402 GINT_TO_POINTER (FALSE)) == GST_ITERATOR_RESYNC)
403 gst_iterator_resync (it);
404 gst_iterator_free (it);
406 return gst_pad_event_default (pad, GST_OBJECT (self), event);
409 return gst_pad_event_default (pad, GST_OBJECT (self), event);
414 gst_sctp_data_srcpad_loop (GstPad * pad)
417 GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
418 GstDataQueueItem *item;
420 self = GST_SCTP_DEC (gst_pad_get_parent (pad));
422 if (gst_data_queue_pop (sctpdec_pad->packet_queue, &item)) {
424 GstFlowReturn flow_ret;
426 buffer = GST_BUFFER (item->object);
427 GST_DEBUG_OBJECT (pad, "Forwarding buffer %" GST_PTR_FORMAT, buffer);
429 flow_ret = gst_pad_push (pad, buffer);
432 GST_OBJECT_LOCK (self);
433 gst_flow_combiner_update_pad_flow (self->flow_combiner, pad, flow_ret);
434 GST_OBJECT_UNLOCK (self);
436 if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
437 || flow_ret == GST_FLOW_NOT_LINKED) || flow_ret == GST_FLOW_EOS) {
438 GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
439 gst_flow_get_name (flow_ret));
440 } else if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
441 GST_ERROR_OBJECT (pad, "Push failed on packet source pad. Error: %s",
442 gst_flow_get_name (flow_ret));
445 if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
446 GST_DEBUG_OBJECT (pad, "Pausing task because of an error");
447 gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
448 gst_data_queue_flush (sctpdec_pad->packet_queue);
449 gst_pad_pause_task (pad);
452 item->destroy (item);
454 GST_OBJECT_LOCK (self);
455 gst_flow_combiner_update_pad_flow (self->flow_combiner, pad,
457 GST_OBJECT_UNLOCK (self);
459 GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
460 gst_pad_pause_task (pad);
463 gst_object_unref (self);
467 configure_association (GstSctpDec * self)
471 self->sctp_association = gst_sctp_association_get (self->sctp_association_id);
473 g_object_get (self->sctp_association, "state", &state, NULL);
475 if (state != GST_SCTP_ASSOCIATION_STATE_NEW) {
476 GST_WARNING_OBJECT (self,
477 "Could not configure SCTP association. Association already in use!");
478 g_object_unref (self->sctp_association);
479 self->sctp_association = NULL;
483 self->signal_handler_stream_reset =
484 g_signal_connect_object (self->sctp_association, "stream-reset",
485 G_CALLBACK (on_gst_sctp_association_stream_reset), self, 0);
487 g_object_bind_property (self, "local-sctp-port", self->sctp_association,
488 "local-port", G_BINDING_SYNC_CREATE);
490 gst_sctp_association_set_on_packet_received (self->sctp_association,
491 on_receive, gst_object_ref (self), gst_object_unref);
499 gst_sctp_dec_src_event (GstPad * pad, GstSctpDec * self, GstEvent * event)
501 switch (GST_EVENT_TYPE (event)) {
502 case GST_EVENT_RECONFIGURE:
503 case GST_EVENT_FLUSH_STOP:{
504 GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
506 /* Unflush and start task again */
507 gst_data_queue_set_flushing (sctpdec_pad->packet_queue, FALSE);
508 gst_pad_start_task (pad, (GstTaskFunction) gst_sctp_data_srcpad_loop, pad,
511 return gst_pad_event_default (pad, GST_OBJECT (self), event);
513 case GST_EVENT_FLUSH_START:{
514 GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
516 gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
517 gst_data_queue_flush (sctpdec_pad->packet_queue);
519 return gst_pad_event_default (pad, GST_OBJECT (self), event);
522 return gst_pad_event_default (pad, GST_OBJECT (self), event);
527 copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
529 GstPad *new_pad = user_data;
531 if (GST_EVENT_TYPE (*event) != GST_EVENT_CAPS
532 && GST_EVENT_TYPE (*event) != GST_EVENT_STREAM_START)
533 gst_pad_store_sticky_event (new_pad, *event);
539 get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id)
541 GstPad *new_pad = NULL;
543 gchar *pad_name, *pad_stream_id;
544 GstPadTemplate *template;
546 pad_name = g_strdup_printf ("src_%hu", stream_id);
547 new_pad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
553 g_object_get (self->sctp_association, "state", &state, NULL);
555 if (state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
556 GST_ERROR_OBJECT (self,
557 "The SCTP association must be established before a new stream can be created");
561 GST_DEBUG_OBJECT (self, "Creating new pad for stream id %u", stream_id);
563 if (stream_id > MAX_STREAM_ID)
566 template = gst_static_pad_template_get (&src_template);
567 new_pad = g_object_new (GST_TYPE_SCTP_DEC_PAD, "name", pad_name,
568 "direction", template->direction, "template", template, NULL);
570 gst_clear_object (&template);
572 gst_pad_set_event_function (new_pad,
573 GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_dec_src_event));
575 if (!gst_pad_set_active (new_pad, TRUE))
579 gst_pad_create_stream_id_printf (new_pad, GST_ELEMENT (self), "%hu",
581 gst_pad_push_event (new_pad, gst_event_new_stream_start (pad_stream_id));
582 g_free (pad_stream_id);
583 gst_pad_sticky_events_foreach (self->sink_pad, copy_sticky_events, new_pad);
585 if (!gst_element_add_pad (GST_ELEMENT (self), new_pad))
588 GST_OBJECT_LOCK (self);
589 gst_flow_combiner_add_pad (self->flow_combiner, new_pad);
590 GST_OBJECT_UNLOCK (self);
592 gst_pad_start_task (new_pad, (GstTaskFunction) gst_sctp_data_srcpad_loop,
595 gst_object_ref (new_pad);
599 gst_pad_set_active (new_pad, FALSE);
601 gst_object_unref (new_pad);
606 remove_pad (GstSctpDec * self, GstPad * pad)
608 stop_srcpad_task (pad);
609 GST_PAD_STREAM_LOCK (pad);
610 gst_pad_set_active (pad, FALSE);
611 if (gst_object_has_as_parent (GST_OBJECT (pad), GST_OBJECT (self)))
612 gst_element_remove_pad (GST_ELEMENT (self), pad);
613 GST_PAD_STREAM_UNLOCK (pad);
614 GST_OBJECT_LOCK (self);
615 gst_flow_combiner_remove_pad (self->flow_combiner, pad);
616 GST_OBJECT_UNLOCK (self);
620 on_gst_sctp_association_stream_reset (GstSctpAssociation * gst_sctp_association,
621 guint16 stream_id, GstSctpDec * self)
626 GST_DEBUG_OBJECT (self, "Stream %u reset", stream_id);
628 pad_name = g_strdup_printf ("src_%hu", stream_id);
629 srcpad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
632 /* This can happen if a stream is created but the peer never sends any data.
633 * We still need to signal the reset by removing the relevant pad. To do
634 * that, we need to add the relevant pad first. */
635 srcpad = get_pad_for_stream_id (self, stream_id);
637 GST_WARNING_OBJECT (self, "Reset called on stream without a srcpad");
641 remove_pad (self, srcpad);
642 gst_object_unref (srcpad);
646 data_queue_item_free (GstDataQueueItem * item)
649 gst_mini_object_unref (item->object);
654 on_receive (GstSctpAssociation * sctp_association, guint8 * buf,
655 gsize length, guint16 stream_id, guint ppid, gpointer user_data)
657 GstSctpDec *self = user_data;
658 GstSctpDecPad *sctpdec_pad;
660 GstDataQueueItem *item;
663 src_pad = get_pad_for_stream_id (self, stream_id);
666 GST_DEBUG_OBJECT (src_pad,
667 "Received incoming packet of size %" G_GSIZE_FORMAT
668 " with stream id %u ppid %u", length, stream_id, ppid);
670 sctpdec_pad = GST_SCTP_DEC_PAD (src_pad);
672 gst_buffer_new_wrapped_full (0, buf, length, 0, length, buf,
673 (GDestroyNotify) usrsctp_freedumpbuffer);
674 gst_sctp_buffer_add_receive_meta (gstbuf, ppid);
676 item = g_new0 (GstDataQueueItem, 1);
677 item->object = GST_MINI_OBJECT (gstbuf);
679 item->visible = TRUE;
680 item->destroy = (GDestroyNotify) data_queue_item_free;
681 if (!gst_data_queue_push (sctpdec_pad->packet_queue, item)) {
682 item->destroy (item);
683 GST_DEBUG_OBJECT (src_pad, "Failed to push item because we're flushing");
686 gst_object_unref (src_pad);
690 stop_srcpad_task (GstPad * pad)
692 GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
694 gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
695 gst_data_queue_flush (sctpdec_pad->packet_queue);
696 gst_pad_stop_task (pad);
700 remove_pad_it (const GValue * item, gpointer user_data)
702 GstPad *pad = g_value_get_object (item);
703 GstSctpDec *self = user_data;
705 remove_pad (self, pad);
709 stop_all_srcpad_tasks (GstSctpDec * self)
713 it = gst_element_iterate_src_pads (GST_ELEMENT (self));
714 while (gst_iterator_foreach (it, remove_pad_it, self) == GST_ITERATOR_RESYNC)
715 gst_iterator_resync (it);
716 gst_iterator_free (it);
720 sctpdec_cleanup (GstSctpDec * self)
722 if (self->sctp_association) {
723 gst_sctp_association_set_on_packet_received (self->sctp_association, NULL,
725 g_signal_handler_disconnect (self->sctp_association,
726 self->signal_handler_stream_reset);
727 gst_sctp_association_force_close (self->sctp_association);
728 g_object_unref (self->sctp_association);
729 self->sctp_association = NULL;
734 on_reset_stream (GstSctpDec * self, guint stream_id)
736 if (self->sctp_association) {
737 gst_sctp_association_reset_stream (self->sctp_association, stream_id);
738 on_gst_sctp_association_stream_reset (self->sctp_association, stream_id,