Merge branch 'tizen_gst_1.20.0' into tizen
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-bad / ext / sctp / gstsctpdec.c
1 /*
2  * Copyright (c) 2015, Collabora Ltd.
3  *
4  * Redistribution and use in source and binary forms, with or without modification,
5  * are permitted provided that the following conditions are met:
6  *
7  * 1. Redistributions of source code must retain the above copyright notice, this
8  * list of conditions and the following disclaimer.
9  *
10  * 2. Redistributions in binary form must reproduce the above copyright notice, this
11  * list of conditions and the following disclaimer in the documentation and/or other
12  * materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
17  * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
18  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
19  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
21  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
23  * OF SUCH DAMAGE.
24  */
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29 #include "gstsctpdec.h"
30
31 #include <gst/sctp/sctpreceivemeta.h>
32 #include <gst/base/gstdataqueue.h>
33
34 #include <stdio.h>
35 #include <stdlib.h>
36
37 GST_DEBUG_CATEGORY_STATIC (gst_sctp_dec_debug_category);
38 #define GST_CAT_DEFAULT gst_sctp_dec_debug_category
39
40 #define gst_sctp_dec_parent_class parent_class
41 G_DEFINE_TYPE (GstSctpDec, gst_sctp_dec, GST_TYPE_ELEMENT);
42 GST_ELEMENT_REGISTER_DEFINE (sctpdec, "sctpdec", GST_RANK_NONE,
43     GST_TYPE_SCTP_DEC);
44
45 static GstStaticPadTemplate sink_template =
46 GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK,
47     GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-sctp"));
48
49 static GstStaticPadTemplate src_template =
50 GST_STATIC_PAD_TEMPLATE ("src_%u", GST_PAD_SRC,
51     GST_PAD_SOMETIMES, GST_STATIC_CAPS_ANY);
52
53 enum
54 {
55   SIGNAL_RESET_STREAM,
56   NUM_SIGNALS
57 };
58
59 static guint signals[NUM_SIGNALS];
60
61 enum
62 {
63   PROP_0,
64
65   PROP_GST_SCTP_ASSOCIATION_ID,
66   PROP_LOCAL_SCTP_PORT,
67
68   NUM_PROPERTIES
69 };
70
71 static GParamSpec *properties[NUM_PROPERTIES];
72
73 #define DEFAULT_GST_SCTP_ASSOCIATION_ID 1
74 #define DEFAULT_LOCAL_SCTP_PORT 0
75 #define MAX_SCTP_PORT 65535
76 #define MAX_GST_SCTP_ASSOCIATION_ID 65535
77 #define MAX_STREAM_ID 65535
78
79 GType gst_sctp_dec_pad_get_type (void);
80
81 #define GST_TYPE_SCTP_DEC_PAD (gst_sctp_dec_pad_get_type())
82 #define GST_SCTP_DEC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_DEC_PAD, GstSctpDecPad))
83 #define GST_SCTP_DEC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_DEC_PAD, GstSctpDecPadClass))
84 #define GST_IS_SCTP_DEC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_DEC_PAD))
85 #define GST_IS_SCTP_DEC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_DEC_PAD))
86
87 typedef struct _GstSctpDecPad GstSctpDecPad;
88 typedef GstPadClass GstSctpDecPadClass;
89
90 struct _GstSctpDecPad
91 {
92   GstPad parent;
93
94   GstDataQueue *packet_queue;
95 };
96
97 G_DEFINE_TYPE (GstSctpDecPad, gst_sctp_dec_pad, GST_TYPE_PAD);
98
99 static void
100 gst_sctp_dec_pad_finalize (GObject * object)
101 {
102   GstSctpDecPad *self = GST_SCTP_DEC_PAD (object);
103
104   gst_object_unref (self->packet_queue);
105
106   G_OBJECT_CLASS (gst_sctp_dec_pad_parent_class)->finalize (object);
107 }
108
109 static gboolean
110 data_queue_check_full_cb (GstDataQueue * queue, guint visible, guint bytes,
111     guint64 time, gpointer user_data)
112 {
113   /* FIXME: Are we full at some point and block? */
114   return FALSE;
115 }
116
117 static void
118 data_queue_empty_cb (GstDataQueue * queue, gpointer user_data)
119 {
120 }
121
122 static void
123 data_queue_full_cb (GstDataQueue * queue, gpointer user_data)
124 {
125 }
126
127 static void
128 gst_sctp_dec_pad_class_init (GstSctpDecPadClass * klass)
129 {
130   GObjectClass *gobject_class;
131
132   gobject_class = G_OBJECT_CLASS (klass);
133
134   gobject_class->finalize = gst_sctp_dec_pad_finalize;
135 }
136
137 static void
138 gst_sctp_dec_pad_init (GstSctpDecPad * self)
139 {
140   self->packet_queue = gst_data_queue_new (data_queue_check_full_cb,
141       data_queue_full_cb, data_queue_empty_cb, NULL);
142 }
143
144 static void gst_sctp_dec_set_property (GObject * object, guint prop_id,
145     const GValue * value, GParamSpec * pspec);
146 static void gst_sctp_dec_get_property (GObject * object, guint prop_id,
147     GValue * value, GParamSpec * pspec);
148 static void gst_sctp_dec_finalize (GObject * object);
149 static GstStateChangeReturn gst_sctp_dec_change_state (GstElement * element,
150     GstStateChange transition);
151 static GstFlowReturn gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self,
152     GstBuffer * buf);
153 static gboolean gst_sctp_dec_packet_event (GstPad * pad, GstSctpDec * self,
154     GstEvent * event);
155 static void gst_sctp_data_srcpad_loop (GstPad * pad);
156
157 static gboolean configure_association (GstSctpDec * self);
158 static void on_gst_sctp_association_stream_reset (GstSctpAssociation *
159     gst_sctp_association, guint16 stream_id, GstSctpDec * self);
160 static void on_receive (GstSctpAssociation * gst_sctp_association,
161     guint8 * buf, gsize length, guint16 stream_id, guint ppid,
162     gpointer user_data);
163 static void stop_srcpad_task (GstPad * pad);
164 static void stop_all_srcpad_tasks (GstSctpDec * self);
165 static void sctpdec_cleanup (GstSctpDec * self);
166 static GstPad *get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id);
167 static void remove_pad (GstSctpDec * self, GstPad * pad);
168 static void on_reset_stream (GstSctpDec * self, guint stream_id);
169
170 static void
171 gst_sctp_dec_class_init (GstSctpDecClass * klass)
172 {
173   GObjectClass *gobject_class;
174   GstElementClass *element_class;
175
176   gobject_class = G_OBJECT_CLASS (klass);
177   element_class = GST_ELEMENT_CLASS (klass);
178
179   GST_DEBUG_CATEGORY_INIT (gst_sctp_dec_debug_category,
180       "sctpdec", 0, "debug category for sctpdec element");
181
182   gst_element_class_add_pad_template (element_class,
183       gst_static_pad_template_get (&src_template));
184   gst_element_class_add_pad_template (element_class,
185       gst_static_pad_template_get (&sink_template));
186
187   gobject_class->set_property = gst_sctp_dec_set_property;
188   gobject_class->get_property = gst_sctp_dec_get_property;
189   gobject_class->finalize = gst_sctp_dec_finalize;
190
191   element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_dec_change_state);
192
193   klass->on_reset_stream = on_reset_stream;
194
195   properties[PROP_GST_SCTP_ASSOCIATION_ID] =
196       g_param_spec_uint ("sctp-association-id",
197       "SCTP Association ID",
198       "Every encoder/decoder pair should have the same, unique, sctp-association-id. "
199       "This value must be set before any pads are requested.",
200       0, MAX_GST_SCTP_ASSOCIATION_ID, DEFAULT_GST_SCTP_ASSOCIATION_ID,
201       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
202
203   properties[PROP_LOCAL_SCTP_PORT] =
204       g_param_spec_uint ("local-sctp-port",
205       "Local SCTP port",
206       "Local sctp port for the sctp association. The remote port is configured via the "
207       "GstSctpEnc element.",
208       0, MAX_SCTP_PORT, DEFAULT_LOCAL_SCTP_PORT,
209       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
210
211   g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
212
213   signals[SIGNAL_RESET_STREAM] = g_signal_new ("reset-stream",
214       G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
215       G_STRUCT_OFFSET (GstSctpDecClass, on_reset_stream), NULL, NULL,
216       NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
217
218   gst_element_class_set_static_metadata (element_class,
219       "SCTP Decoder",
220       "Decoder/Network/SCTP",
221       "Decodes packets with SCTP",
222       "George Kiagiadakis <george.kiagiadakis@collabora.com>");
223 }
224
225 static void
226 gst_sctp_dec_init (GstSctpDec * self)
227 {
228   self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
229   self->local_sctp_port = DEFAULT_LOCAL_SCTP_PORT;
230
231   self->flow_combiner = gst_flow_combiner_new ();
232
233   self->sink_pad = gst_pad_new_from_static_template (&sink_template, "sink");
234   gst_pad_set_chain_function (self->sink_pad,
235       GST_DEBUG_FUNCPTR ((GstPadChainFunction) gst_sctp_dec_packet_chain));
236   gst_pad_set_event_function (self->sink_pad,
237       GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_dec_packet_event));
238
239   gst_element_add_pad (GST_ELEMENT (self), self->sink_pad);
240 }
241
242 static void
243 gst_sctp_dec_set_property (GObject * object, guint prop_id,
244     const GValue * value, GParamSpec * pspec)
245 {
246   GstSctpDec *self = GST_SCTP_DEC (object);
247
248   switch (prop_id) {
249     case PROP_GST_SCTP_ASSOCIATION_ID:
250       self->sctp_association_id = g_value_get_uint (value);
251       break;
252     case PROP_LOCAL_SCTP_PORT:
253       self->local_sctp_port = g_value_get_uint (value);
254       break;
255     default:
256       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
257       break;
258   }
259 }
260
261 static void
262 gst_sctp_dec_get_property (GObject * object, guint prop_id, GValue * value,
263     GParamSpec * pspec)
264 {
265   GstSctpDec *self = GST_SCTP_DEC (object);
266
267   switch (prop_id) {
268     case PROP_GST_SCTP_ASSOCIATION_ID:
269       g_value_set_uint (value, self->sctp_association_id);
270       break;
271     case PROP_LOCAL_SCTP_PORT:
272       g_value_set_uint (value, self->local_sctp_port);
273       break;
274     default:
275       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
276       break;
277   }
278 }
279
280 static void
281 gst_sctp_dec_finalize (GObject * object)
282 {
283   GstSctpDec *self = GST_SCTP_DEC (object);
284
285   gst_flow_combiner_free (self->flow_combiner);
286   self->flow_combiner = NULL;
287
288   G_OBJECT_CLASS (parent_class)->finalize (object);
289 }
290
291 static GstStateChangeReturn
292 gst_sctp_dec_change_state (GstElement * element, GstStateChange transition)
293 {
294   GstSctpDec *self = GST_SCTP_DEC (element);
295   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
296
297   switch (transition) {
298     case GST_STATE_CHANGE_READY_TO_PAUSED:
299       gst_flow_combiner_reset (self->flow_combiner);
300       if (!configure_association (self))
301         ret = GST_STATE_CHANGE_FAILURE;
302       break;
303     case GST_STATE_CHANGE_PAUSED_TO_READY:
304       stop_all_srcpad_tasks (self);
305       break;
306     default:
307       break;
308   }
309
310   if (ret != GST_STATE_CHANGE_FAILURE)
311     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
312
313   switch (transition) {
314     case GST_STATE_CHANGE_PAUSED_TO_READY:
315       sctpdec_cleanup (self);
316       gst_flow_combiner_reset (self->flow_combiner);
317       break;
318     default:
319       break;
320   }
321
322   return ret;
323 }
324
325 static GstFlowReturn
326 gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self, GstBuffer * buf)
327 {
328   GstFlowReturn flow_ret;
329   GstMapInfo map;
330
331   GST_DEBUG_OBJECT (self, "Processing received buffer %" GST_PTR_FORMAT, buf);
332
333   if (!gst_buffer_map (buf, &map, GST_MAP_READ)) {
334     GST_ERROR_OBJECT (self, "Could not map GstBuffer");
335     gst_buffer_unref (buf);
336     return GST_FLOW_ERROR;
337   }
338
339   gst_sctp_association_incoming_packet (self->sctp_association,
340       (const guint8 *) map.data, (guint32) map.size);
341   gst_buffer_unmap (buf, &map);
342   gst_buffer_unref (buf);
343
344   GST_OBJECT_LOCK (self);
345   /* This gets the last combined flow return from all source pads */
346   flow_ret = gst_flow_combiner_update_flow (self->flow_combiner, GST_FLOW_OK);
347   GST_OBJECT_UNLOCK (self);
348
349   if (flow_ret != GST_FLOW_OK) {
350     GST_DEBUG_OBJECT (self, "Returning %s", gst_flow_get_name (flow_ret));
351   }
352
353   return flow_ret;
354 }
355
356 static void
357 flush_srcpad (const GValue * item, gpointer user_data)
358 {
359   GstSctpDecPad *sctpdec_pad = g_value_get_object (item);
360   gboolean flush = GPOINTER_TO_INT (user_data);
361
362   if (flush) {
363     gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
364     gst_data_queue_flush (sctpdec_pad->packet_queue);
365   } else {
366     gst_data_queue_set_flushing (sctpdec_pad->packet_queue, FALSE);
367     gst_pad_start_task (GST_PAD (sctpdec_pad),
368         (GstTaskFunction) gst_sctp_data_srcpad_loop, sctpdec_pad, NULL);
369   }
370 }
371
372 static gboolean
373 gst_sctp_dec_packet_event (GstPad * pad, GstSctpDec * self, GstEvent * event)
374 {
375   switch (GST_EVENT_TYPE (event)) {
376     case GST_EVENT_STREAM_START:
377     case GST_EVENT_CAPS:
378       /* We create our own stream-start events and the caps event does not
379        * make sense */
380       gst_event_unref (event);
381       return TRUE;
382     case GST_EVENT_EOS:
383       /* Drop this, we're never EOS until shut down */
384       gst_event_unref (event);
385       return TRUE;
386     case GST_EVENT_FLUSH_START:{
387       GstIterator *it;
388
389       it = gst_element_iterate_src_pads (GST_ELEMENT (self));
390       while (gst_iterator_foreach (it, flush_srcpad,
391               GINT_TO_POINTER (TRUE)) == GST_ITERATOR_RESYNC)
392         gst_iterator_resync (it);
393       gst_iterator_free (it);
394
395       return gst_pad_event_default (pad, GST_OBJECT (self), event);
396     }
397     case GST_EVENT_FLUSH_STOP:{
398       GstIterator *it;
399
400       it = gst_element_iterate_src_pads (GST_ELEMENT (self));
401       while (gst_iterator_foreach (it, flush_srcpad,
402               GINT_TO_POINTER (FALSE)) == GST_ITERATOR_RESYNC)
403         gst_iterator_resync (it);
404       gst_iterator_free (it);
405
406       return gst_pad_event_default (pad, GST_OBJECT (self), event);
407     }
408     default:
409       return gst_pad_event_default (pad, GST_OBJECT (self), event);
410   }
411 }
412
413 static void
414 gst_sctp_data_srcpad_loop (GstPad * pad)
415 {
416   GstSctpDec *self;
417   GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
418   GstDataQueueItem *item;
419
420   self = GST_SCTP_DEC (gst_pad_get_parent (pad));
421
422   if (gst_data_queue_pop (sctpdec_pad->packet_queue, &item)) {
423     GstBuffer *buffer;
424     GstFlowReturn flow_ret;
425
426     buffer = GST_BUFFER (item->object);
427     GST_DEBUG_OBJECT (pad, "Forwarding buffer %" GST_PTR_FORMAT, buffer);
428
429     flow_ret = gst_pad_push (pad, buffer);
430     item->object = NULL;
431
432     GST_OBJECT_LOCK (self);
433     gst_flow_combiner_update_pad_flow (self->flow_combiner, pad, flow_ret);
434     GST_OBJECT_UNLOCK (self);
435
436     if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
437             || flow_ret == GST_FLOW_NOT_LINKED) || flow_ret == GST_FLOW_EOS) {
438       GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
439           gst_flow_get_name (flow_ret));
440     } else if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
441       GST_ERROR_OBJECT (pad, "Push failed on packet source pad. Error: %s",
442           gst_flow_get_name (flow_ret));
443     }
444
445     if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
446       GST_DEBUG_OBJECT (pad, "Pausing task because of an error");
447       gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
448       gst_data_queue_flush (sctpdec_pad->packet_queue);
449       gst_pad_pause_task (pad);
450     }
451
452     item->destroy (item);
453   } else {
454     GST_OBJECT_LOCK (self);
455     gst_flow_combiner_update_pad_flow (self->flow_combiner, pad,
456         GST_FLOW_FLUSHING);
457     GST_OBJECT_UNLOCK (self);
458
459     GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
460     gst_pad_pause_task (pad);
461   }
462
463   gst_object_unref (self);
464 }
465
466 static gboolean
467 configure_association (GstSctpDec * self)
468 {
469   gint state;
470
471   self->sctp_association = gst_sctp_association_get (self->sctp_association_id);
472
473   g_object_get (self->sctp_association, "state", &state, NULL);
474
475   if (state != GST_SCTP_ASSOCIATION_STATE_NEW) {
476     GST_WARNING_OBJECT (self,
477         "Could not configure SCTP association. Association already in use!");
478     g_object_unref (self->sctp_association);
479     self->sctp_association = NULL;
480     goto error;
481   }
482
483   self->signal_handler_stream_reset =
484       g_signal_connect_object (self->sctp_association, "stream-reset",
485       G_CALLBACK (on_gst_sctp_association_stream_reset), self, 0);
486
487   g_object_bind_property (self, "local-sctp-port", self->sctp_association,
488       "local-port", G_BINDING_SYNC_CREATE);
489
490   gst_sctp_association_set_on_packet_received (self->sctp_association,
491       on_receive, gst_object_ref (self), gst_object_unref);
492
493   return TRUE;
494 error:
495   return FALSE;
496 }
497
498 static gboolean
499 gst_sctp_dec_src_event (GstPad * pad, GstSctpDec * self, GstEvent * event)
500 {
501   switch (GST_EVENT_TYPE (event)) {
502     case GST_EVENT_RECONFIGURE:
503     case GST_EVENT_FLUSH_STOP:{
504       GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
505
506       /* Unflush and start task again */
507       gst_data_queue_set_flushing (sctpdec_pad->packet_queue, FALSE);
508       gst_pad_start_task (pad, (GstTaskFunction) gst_sctp_data_srcpad_loop, pad,
509           NULL);
510
511       return gst_pad_event_default (pad, GST_OBJECT (self), event);
512     }
513     case GST_EVENT_FLUSH_START:{
514       GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
515
516       gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
517       gst_data_queue_flush (sctpdec_pad->packet_queue);
518
519       return gst_pad_event_default (pad, GST_OBJECT (self), event);
520     }
521     default:
522       return gst_pad_event_default (pad, GST_OBJECT (self), event);
523   }
524 }
525
526 static gboolean
527 copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
528 {
529   GstPad *new_pad = user_data;
530
531   if (GST_EVENT_TYPE (*event) != GST_EVENT_CAPS
532       && GST_EVENT_TYPE (*event) != GST_EVENT_STREAM_START)
533     gst_pad_store_sticky_event (new_pad, *event);
534
535   return TRUE;
536 }
537
538 static GstPad *
539 get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id)
540 {
541   GstPad *new_pad = NULL;
542   gint state;
543   gchar *pad_name, *pad_stream_id;
544   GstPadTemplate *template;
545
546   pad_name = g_strdup_printf ("src_%hu", stream_id);
547   new_pad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
548   if (new_pad) {
549     g_free (pad_name);
550     return new_pad;
551   }
552
553   g_object_get (self->sctp_association, "state", &state, NULL);
554
555   if (state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
556     GST_ERROR_OBJECT (self,
557         "The SCTP association must be established before a new stream can be created");
558     return NULL;
559   }
560
561   GST_DEBUG_OBJECT (self, "Creating new pad for stream id %u", stream_id);
562
563   if (stream_id > MAX_STREAM_ID)
564     return NULL;
565
566   template = gst_static_pad_template_get (&src_template);
567   new_pad = g_object_new (GST_TYPE_SCTP_DEC_PAD, "name", pad_name,
568       "direction", template->direction, "template", template, NULL);
569   g_free (pad_name);
570   gst_clear_object (&template);
571
572   gst_pad_set_event_function (new_pad,
573       GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_dec_src_event));
574
575   if (!gst_pad_set_active (new_pad, TRUE))
576     goto error_cleanup;
577
578   pad_stream_id =
579       gst_pad_create_stream_id_printf (new_pad, GST_ELEMENT (self), "%hu",
580       stream_id);
581   gst_pad_push_event (new_pad, gst_event_new_stream_start (pad_stream_id));
582   g_free (pad_stream_id);
583   gst_pad_sticky_events_foreach (self->sink_pad, copy_sticky_events, new_pad);
584
585   if (!gst_element_add_pad (GST_ELEMENT (self), new_pad))
586     goto error_add;
587
588   GST_OBJECT_LOCK (self);
589   gst_flow_combiner_add_pad (self->flow_combiner, new_pad);
590   GST_OBJECT_UNLOCK (self);
591
592   gst_pad_start_task (new_pad, (GstTaskFunction) gst_sctp_data_srcpad_loop,
593       new_pad, NULL);
594
595   gst_object_ref (new_pad);
596
597   return new_pad;
598 error_add:
599   gst_pad_set_active (new_pad, FALSE);
600 error_cleanup:
601   gst_object_unref (new_pad);
602   return NULL;
603 }
604
605 static void
606 remove_pad (GstSctpDec * self, GstPad * pad)
607 {
608   stop_srcpad_task (pad);
609   GST_PAD_STREAM_LOCK (pad);
610   gst_pad_set_active (pad, FALSE);
611   if (gst_object_has_as_parent (GST_OBJECT (pad), GST_OBJECT (self)))
612     gst_element_remove_pad (GST_ELEMENT (self), pad);
613   GST_PAD_STREAM_UNLOCK (pad);
614   GST_OBJECT_LOCK (self);
615   gst_flow_combiner_remove_pad (self->flow_combiner, pad);
616   GST_OBJECT_UNLOCK (self);
617 }
618
619 static void
620 on_gst_sctp_association_stream_reset (GstSctpAssociation * gst_sctp_association,
621     guint16 stream_id, GstSctpDec * self)
622 {
623   gchar *pad_name;
624   GstPad *srcpad;
625
626   GST_DEBUG_OBJECT (self, "Stream %u reset", stream_id);
627
628   pad_name = g_strdup_printf ("src_%hu", stream_id);
629   srcpad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
630   g_free (pad_name);
631   if (!srcpad) {
632     GST_WARNING_OBJECT (self, "Reset called on stream without a srcpad");
633     return;
634   }
635   remove_pad (self, srcpad);
636   gst_object_unref (srcpad);
637 }
638
639 static void
640 data_queue_item_free (GstDataQueueItem * item)
641 {
642   if (item->object)
643     gst_mini_object_unref (item->object);
644   g_free (item);
645 }
646
647 static void
648 on_receive (GstSctpAssociation * sctp_association, guint8 * buf,
649     gsize length, guint16 stream_id, guint ppid, gpointer user_data)
650 {
651   GstSctpDec *self = user_data;
652   GstSctpDecPad *sctpdec_pad;
653   GstPad *src_pad;
654   GstDataQueueItem *item;
655   GstBuffer *gstbuf;
656
657   src_pad = get_pad_for_stream_id (self, stream_id);
658   g_assert (src_pad);
659
660   GST_DEBUG_OBJECT (src_pad,
661       "Received incoming packet of size %" G_GSIZE_FORMAT
662       " with stream id %u ppid %u", length, stream_id, ppid);
663
664   sctpdec_pad = GST_SCTP_DEC_PAD (src_pad);
665   gstbuf =
666       gst_buffer_new_wrapped_full (0, buf, length, 0, length, buf,
667       (GDestroyNotify) usrsctp_freedumpbuffer);
668   gst_sctp_buffer_add_receive_meta (gstbuf, ppid);
669
670   item = g_new0 (GstDataQueueItem, 1);
671   item->object = GST_MINI_OBJECT (gstbuf);
672   item->size = length;
673   item->visible = TRUE;
674   item->destroy = (GDestroyNotify) data_queue_item_free;
675   if (!gst_data_queue_push (sctpdec_pad->packet_queue, item)) {
676     item->destroy (item);
677     GST_DEBUG_OBJECT (src_pad, "Failed to push item because we're flushing");
678   }
679
680   gst_object_unref (src_pad);
681 }
682
683 static void
684 stop_srcpad_task (GstPad * pad)
685 {
686   GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
687
688   gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
689   gst_data_queue_flush (sctpdec_pad->packet_queue);
690   gst_pad_stop_task (pad);
691 }
692
693 static void
694 remove_pad_it (const GValue * item, gpointer user_data)
695 {
696   GstPad *pad = g_value_get_object (item);
697   GstSctpDec *self = user_data;
698
699   remove_pad (self, pad);
700 }
701
702 static void
703 stop_all_srcpad_tasks (GstSctpDec * self)
704 {
705   GstIterator *it;
706
707   it = gst_element_iterate_src_pads (GST_ELEMENT (self));
708   while (gst_iterator_foreach (it, remove_pad_it, self) == GST_ITERATOR_RESYNC)
709     gst_iterator_resync (it);
710   gst_iterator_free (it);
711 }
712
713 static void
714 sctpdec_cleanup (GstSctpDec * self)
715 {
716   if (self->sctp_association) {
717     gst_sctp_association_set_on_packet_received (self->sctp_association, NULL,
718         NULL, NULL);
719     g_signal_handler_disconnect (self->sctp_association,
720         self->signal_handler_stream_reset);
721     gst_sctp_association_force_close (self->sctp_association);
722     g_object_unref (self->sctp_association);
723     self->sctp_association = NULL;
724   }
725 }
726
727 static void
728 on_reset_stream (GstSctpDec * self, guint stream_id)
729 {
730   if (self->sctp_association) {
731     gst_sctp_association_reset_stream (self->sctp_association, stream_id);
732     on_gst_sctp_association_stream_reset (self->sctp_association, stream_id,
733         self);
734   }
735 }