sctpenc: Use g_signal_emit() instead of g_signal_emit_by_name()
[platform/upstream/gstreamer.git] / ext / sctp / gstsctpenc.c
1 /*
2  * Copyright (c) 2015, Collabora Ltd.
3  *
4  * Redistribution and use in source and binary forms, with or without modification,
5  * are permitted provided that the following conditions are met:
6  *
7  * 1. Redistributions of source code must retain the above copyright notice, this
8  * list of conditions and the following disclaimer.
9  *
10  * 2. Redistributions in binary form must reproduce the above copyright notice, this
11  * list of conditions and the following disclaimer in the documentation and/or other
12  * materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
17  * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
18  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
19  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
21  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
23  * OF SUCH DAMAGE.
24  */
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29 #include "gstsctpenc.h"
30
31 #include <gst/sctp/sctpsendmeta.h>
32 #include <stdio.h>
33
34 GST_DEBUG_CATEGORY_STATIC (gst_sctp_enc_debug_category);
35 #define GST_CAT_DEFAULT gst_sctp_enc_debug_category
36
37 #define gst_sctp_enc_parent_class parent_class
38 G_DEFINE_TYPE (GstSctpEnc, gst_sctp_enc, GST_TYPE_ELEMENT);
39
40 static GstStaticPadTemplate sink_template =
41 GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK,
42     GST_PAD_REQUEST, GST_STATIC_CAPS_ANY);
43
44 static GstStaticPadTemplate src_template =
45 GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC,
46     GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-sctp"));
47
48 enum
49 {
50   SIGNAL_SCTP_ASSOCIATION_ESTABLISHED,
51   SIGNAL_GET_STREAM_BYTES_SENT,
52   NUM_SIGNALS
53 };
54
55 static guint signals[NUM_SIGNALS];
56
57 enum
58 {
59   PROP_0,
60
61   PROP_GST_SCTP_ASSOCIATION_ID,
62   PROP_REMOTE_SCTP_PORT,
63   PROP_USE_SOCK_STREAM,
64
65   NUM_PROPERTIES
66 };
67
68 static GParamSpec *properties[NUM_PROPERTIES];
69
70 #define DEFAULT_GST_SCTP_ASSOCIATION_ID 1
71 #define DEFAULT_REMOTE_SCTP_PORT 0
72 #define DEFAULT_GST_SCTP_ORDERED TRUE
73 #define DEFAULT_SCTP_PPID 1
74 #define DEFAULT_USE_SOCK_STREAM FALSE
75
76 #define BUFFER_FULL_SLEEP_TIME 100000
77
78 GType gst_sctp_enc_pad_get_type (void);
79
80 #define GST_TYPE_SCTP_ENC_PAD (gst_sctp_enc_pad_get_type())
81 #define GST_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPad))
82 #define GST_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPadClass))
83 #define GST_IS_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_ENC_PAD))
84 #define GST_IS_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_ENC_PAD))
85
86 typedef struct _GstSctpEncPad GstSctpEncPad;
87 typedef GstPadClass GstSctpEncPadClass;
88
89 struct _GstSctpEncPad
90 {
91   GstPad parent;
92
93   guint16 stream_id;
94   gboolean ordered;
95   guint32 ppid;
96   GstSctpAssociationPartialReliability reliability;
97   guint32 reliability_param;
98
99   guint64 bytes_sent;
100
101   GMutex lock;
102   GCond cond;
103   gboolean flushing;
104 };
105
106 G_DEFINE_TYPE (GstSctpEncPad, gst_sctp_enc_pad, GST_TYPE_PAD);
107
108 static void
109 gst_sctp_enc_pad_finalize (GObject * object)
110 {
111   GstSctpEncPad *self = GST_SCTP_ENC_PAD (object);
112
113   g_cond_clear (&self->cond);
114   g_mutex_clear (&self->lock);
115
116   G_OBJECT_CLASS (gst_sctp_enc_pad_parent_class)->finalize (object);
117 }
118
119 static void
120 gst_sctp_enc_pad_class_init (GstSctpEncPadClass * klass)
121 {
122   GObjectClass *gobject_class = (GObjectClass *) klass;
123
124   gobject_class->finalize = gst_sctp_enc_pad_finalize;
125 }
126
127 static void
128 gst_sctp_enc_pad_init (GstSctpEncPad * self)
129 {
130   g_mutex_init (&self->lock);
131   g_cond_init (&self->cond);
132   self->flushing = FALSE;
133 }
134
135 static void gst_sctp_enc_finalize (GObject * object);
136 static void gst_sctp_enc_set_property (GObject * object, guint prop_id,
137     const GValue * value, GParamSpec * pspec);
138 static void gst_sctp_enc_get_property (GObject * object, guint prop_id,
139     GValue * value, GParamSpec * pspec);
140 static GstStateChangeReturn gst_sctp_enc_change_state (GstElement * element,
141     GstStateChange transition);
142 static GstPad *gst_sctp_enc_request_new_pad (GstElement * element,
143     GstPadTemplate * template, const gchar * name, const GstCaps * caps);
144 static void gst_sctp_enc_release_pad (GstElement * element, GstPad * pad);
145 static void gst_sctp_enc_srcpad_loop (GstPad * pad);
146 static GstFlowReturn gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent,
147     GstBuffer * buffer);
148 static gboolean gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent,
149     GstEvent * event);
150 static gboolean gst_sctp_enc_src_event (GstPad * pad, GstObject * parent,
151     GstEvent * event);
152 static void on_sctp_association_state_changed (GstSctpAssociation *
153     sctp_association, GParamSpec * pspec, GstSctpEnc * self);
154
155 static gboolean configure_association (GstSctpEnc * self);
156 static void on_sctp_packet_out (GstSctpAssociation * sctp_association,
157     const guint8 * buf, gsize length, gpointer user_data);
158 static void stop_srcpad_task (GstPad * pad, GstSctpEnc * self);
159 static void sctpenc_cleanup (GstSctpEnc * self);
160 static void get_config_from_caps (const GstCaps * caps, gboolean * ordered,
161     GstSctpAssociationPartialReliability * reliability,
162     guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available);
163 static guint64 on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id);
164
165 static void
166 gst_sctp_enc_class_init (GstSctpEncClass * klass)
167 {
168   GObjectClass *gobject_class;
169   GstElementClass *element_class;
170
171   gobject_class = (GObjectClass *) klass;
172   element_class = (GstElementClass *) klass;
173
174   GST_DEBUG_CATEGORY_INIT (gst_sctp_enc_debug_category,
175       "sctpenc", 0, "debug category for sctpenc element");
176
177   gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
178       gst_static_pad_template_get (&src_template));
179   gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
180       gst_static_pad_template_get (&sink_template));
181
182   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_sctp_enc_finalize);
183   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_set_property);
184   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_get_property);
185
186   element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_enc_change_state);
187   element_class->request_new_pad =
188       GST_DEBUG_FUNCPTR (gst_sctp_enc_request_new_pad);
189   element_class->release_pad = GST_DEBUG_FUNCPTR (gst_sctp_enc_release_pad);
190
191   properties[PROP_GST_SCTP_ASSOCIATION_ID] =
192       g_param_spec_uint ("sctp-association-id",
193       "SCTP Association ID",
194       "Every encoder/decoder pair should have the same, unique, sctp-association-id. "
195       "This value must be set before any pads are requested.",
196       0, G_MAXUINT, DEFAULT_GST_SCTP_ASSOCIATION_ID,
197       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
198
199   properties[PROP_REMOTE_SCTP_PORT] =
200       g_param_spec_uint ("remote-sctp-port",
201       "Remote SCTP port",
202       "Sctp remote sctp port for the sctp association. The local port is configured via the "
203       "GstSctpDec element.",
204       0, G_MAXUSHORT, DEFAULT_REMOTE_SCTP_PORT,
205       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
206
207   properties[PROP_USE_SOCK_STREAM] =
208       g_param_spec_boolean ("use-sock-stream",
209       "Use sock-stream",
210       "When set to TRUE, a sequenced, reliable, connection-based connection is used."
211       "When TRUE the partial reliability parameters of the channel are ignored.",
212       DEFAULT_USE_SOCK_STREAM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
213
214   g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
215
216   signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED] =
217       g_signal_new ("sctp-association-established",
218       G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST,
219       G_STRUCT_OFFSET (GstSctpEncClass, on_sctp_association_is_established),
220       NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_BOOLEAN);
221
222   signals[SIGNAL_GET_STREAM_BYTES_SENT] = g_signal_new ("bytes-sent",
223       G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
224       G_STRUCT_OFFSET (GstSctpEncClass, on_get_stream_bytes_sent), NULL, NULL,
225       NULL, G_TYPE_UINT64, 1, G_TYPE_UINT);
226
227   klass->on_get_stream_bytes_sent =
228       GST_DEBUG_FUNCPTR (on_get_stream_bytes_sent);
229
230   gst_element_class_set_static_metadata (element_class,
231       "SCTP Encoder",
232       "Encoder/Network/SCTP",
233       "Encodes packets with SCTP",
234       "George Kiagiadakis <george.kiagiadakis@collabora.com>");
235 }
236
237 static gboolean
238 data_queue_check_full_cb (GstDataQueue * queue, guint visible, guint bytes,
239     guint64 time, gpointer user_data)
240 {
241   /* TODO: When are we considered full? */
242   return FALSE;
243 }
244
245 static void
246 data_queue_empty_cb (GstDataQueue * queue, gpointer user_data)
247 {
248 }
249
250 static void
251 data_queue_full_cb (GstDataQueue * queue, gpointer user_data)
252 {
253 }
254
255 static void
256 gst_sctp_enc_init (GstSctpEnc * self)
257 {
258   self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
259   self->remote_sctp_port = DEFAULT_REMOTE_SCTP_PORT;
260
261   self->sctp_association = NULL;
262   self->outbound_sctp_packet_queue =
263       gst_data_queue_new (data_queue_check_full_cb, data_queue_full_cb,
264       data_queue_empty_cb, NULL);
265
266   self->src_pad = gst_pad_new_from_static_template (&src_template, "src");
267   gst_pad_set_event_function (self->src_pad,
268       GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_enc_src_event));
269   gst_element_add_pad (GST_ELEMENT (self), self->src_pad);
270
271   g_queue_init (&self->pending_pads);
272   self->src_ret = GST_FLOW_FLUSHING;
273 }
274
275 static void
276 gst_sctp_enc_finalize (GObject * object)
277 {
278   GstSctpEnc *self = GST_SCTP_ENC (object);
279
280   g_queue_clear (&self->pending_pads);
281   gst_object_unref (self->outbound_sctp_packet_queue);
282
283   G_OBJECT_CLASS (parent_class)->finalize (object);
284 }
285
286 static void
287 gst_sctp_enc_set_property (GObject * object, guint prop_id,
288     const GValue * value, GParamSpec * pspec)
289 {
290   GstSctpEnc *self = GST_SCTP_ENC (object);
291
292   switch (prop_id) {
293     case PROP_GST_SCTP_ASSOCIATION_ID:
294       self->sctp_association_id = g_value_get_uint (value);
295       break;
296     case PROP_REMOTE_SCTP_PORT:
297       self->remote_sctp_port = g_value_get_uint (value);
298       break;
299     case PROP_USE_SOCK_STREAM:
300       self->use_sock_stream = g_value_get_boolean (value);
301       break;
302     default:
303       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
304       break;
305   }
306 }
307
308 static void
309 gst_sctp_enc_get_property (GObject * object, guint prop_id, GValue * value,
310     GParamSpec * pspec)
311 {
312   GstSctpEnc *self = GST_SCTP_ENC (object);
313
314   switch (prop_id) {
315     case PROP_GST_SCTP_ASSOCIATION_ID:
316       g_value_set_uint (value, self->sctp_association_id);
317       break;
318     case PROP_REMOTE_SCTP_PORT:
319       g_value_set_uint (value, self->remote_sctp_port);
320       break;
321     case PROP_USE_SOCK_STREAM:
322       g_value_set_boolean (value, self->use_sock_stream);
323       break;
324     default:
325       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
326       break;
327   }
328 }
329
330 static GstStateChangeReturn
331 gst_sctp_enc_change_state (GstElement * element, GstStateChange transition)
332 {
333   GstSctpEnc *self = GST_SCTP_ENC (element);
334   GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE;
335   gboolean res = TRUE;
336
337   switch (transition) {
338     case GST_STATE_CHANGE_NULL_TO_READY:
339       break;
340     case GST_STATE_CHANGE_READY_TO_PAUSED:
341       self->need_segment = self->need_stream_start_caps = TRUE;
342       self->src_ret = GST_FLOW_OK;
343       gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
344       res = configure_association (self);
345       break;
346     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
347       break;
348     case GST_STATE_CHANGE_PAUSED_TO_READY:
349       sctpenc_cleanup (self);
350       self->src_ret = GST_FLOW_FLUSHING;
351       break;
352     case GST_STATE_CHANGE_READY_TO_NULL:
353       break;
354     default:
355       break;
356   }
357
358   if (res)
359     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
360
361   switch (transition) {
362     case GST_STATE_CHANGE_NULL_TO_READY:
363       break;
364     case GST_STATE_CHANGE_READY_TO_PAUSED:
365       gst_pad_start_task (self->src_pad,
366           (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
367       break;
368     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
369       break;
370     case GST_STATE_CHANGE_PAUSED_TO_READY:
371       break;
372     case GST_STATE_CHANGE_READY_TO_NULL:
373       break;
374     default:
375       break;
376   }
377
378   return ret;
379 }
380
381 static GstPad *
382 gst_sctp_enc_request_new_pad (GstElement * element, GstPadTemplate * template,
383     const gchar * new_pad_name, const GstCaps * caps)
384 {
385   GstSctpEnc *self = GST_SCTP_ENC (element);
386   GstPad *new_pad = NULL;
387   GstSctpEncPad *sctpenc_pad;
388   guint32 stream_id;
389   gint state;
390   guint32 new_ppid;
391   gboolean is_new_ppid;
392
393   g_object_get (self->sctp_association, "state", &state, NULL);
394
395   if (state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
396     g_warning
397         ("The SCTP association must be established before a new stream can be created");
398     goto invalid_state;
399   }
400
401   if (!template)
402     goto invalid_parameter;
403
404   if (!new_pad_name || (sscanf (new_pad_name, "sink_%u", &stream_id) != 1)
405       || stream_id > 65534)     /* 65535 is not a valid stream id */
406     goto invalid_parameter;
407
408   new_pad = gst_element_get_static_pad (element, new_pad_name);
409   if (new_pad) {
410     gst_object_unref (new_pad);
411     new_pad = NULL;
412     goto invalid_parameter;
413   }
414
415   new_pad =
416       g_object_new (GST_TYPE_SCTP_ENC_PAD, "name", new_pad_name, "direction",
417       template->direction, "template", template, NULL);
418   gst_pad_set_chain_function (new_pad,
419       GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_chain));
420   gst_pad_set_event_function (new_pad,
421       GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_event));
422
423   sctpenc_pad = GST_SCTP_ENC_PAD (new_pad);
424   sctpenc_pad->stream_id = stream_id;
425   sctpenc_pad->ppid = DEFAULT_SCTP_PPID;
426
427   if (caps) {
428     get_config_from_caps (caps, &sctpenc_pad->ordered,
429         &sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
430         &is_new_ppid);
431
432     if (is_new_ppid)
433       sctpenc_pad->ppid = new_ppid;
434   }
435
436   sctpenc_pad->flushing = FALSE;
437
438   if (!gst_pad_set_active (new_pad, TRUE))
439     goto error_cleanup;
440
441   if (!gst_element_add_pad (element, new_pad))
442     goto error_cleanup;
443
444 invalid_state:
445 invalid_parameter:
446   return new_pad;
447 error_cleanup:
448   gst_object_unref (new_pad);
449   return NULL;
450 }
451
452 static void
453 gst_sctp_enc_release_pad (GstElement * element, GstPad * pad)
454 {
455   GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
456   GstSctpEnc *self;
457   guint stream_id = 0;
458
459   self = GST_SCTP_ENC (element);
460
461   g_mutex_lock (&sctpenc_pad->lock);
462   sctpenc_pad->flushing = TRUE;
463   g_cond_signal (&sctpenc_pad->cond);
464   g_mutex_unlock (&sctpenc_pad->lock);
465
466   stream_id = sctpenc_pad->stream_id;
467   gst_pad_set_active (pad, FALSE);
468
469   if (self->sctp_association)
470     gst_sctp_association_reset_stream (self->sctp_association, stream_id);
471
472   gst_element_remove_pad (element, pad);
473 }
474
475 static void
476 gst_sctp_enc_srcpad_loop (GstPad * pad)
477 {
478   GstSctpEnc *self = GST_SCTP_ENC (GST_PAD_PARENT (pad));
479   GstFlowReturn flow_ret;
480   GstDataQueueItem *item;
481
482   if (self->need_stream_start_caps) {
483     gchar s_id[32];
484     GstCaps *caps;
485
486     g_snprintf (s_id, sizeof (s_id), "sctpenc-%08x", g_random_int ());
487     gst_pad_push_event (self->src_pad, gst_event_new_stream_start (s_id));
488
489     caps = gst_caps_new_empty_simple ("application/x-sctp");
490     gst_pad_set_caps (self->src_pad, caps);
491     gst_caps_unref (caps);
492
493     self->need_stream_start_caps = FALSE;
494   }
495
496   if (self->need_segment) {
497     GstSegment segment;
498
499     gst_segment_init (&segment, GST_FORMAT_BYTES);
500     gst_pad_push_event (self->src_pad, gst_event_new_segment (&segment));
501
502     self->need_segment = FALSE;
503   }
504
505   if (gst_data_queue_pop (self->outbound_sctp_packet_queue, &item)) {
506     GstBuffer *buffer = GST_BUFFER (item->object);
507
508     flow_ret = gst_pad_push (self->src_pad, buffer);
509     item->object = NULL;
510
511     GST_OBJECT_LOCK (self);
512     self->src_ret = flow_ret;
513     GST_OBJECT_UNLOCK (self);
514
515     if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
516             || flow_ret == GST_FLOW_NOT_LINKED)) {
517       GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
518           gst_flow_get_name (flow_ret));
519     } else if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
520       GST_ERROR_OBJECT (pad, "Push failed on packet source pad. Error: %s",
521           gst_flow_get_name (flow_ret));
522     }
523
524     if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
525       GST_DEBUG_OBJECT (pad, "Pausing task because of an error");
526       gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
527       gst_data_queue_flush (self->outbound_sctp_packet_queue);
528       gst_pad_pause_task (pad);
529     }
530
531     item->destroy (item);
532   } else {
533     GST_OBJECT_LOCK (self);
534     self->src_ret = GST_FLOW_FLUSHING;
535     GST_OBJECT_UNLOCK (self);
536
537     GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
538     gst_pad_pause_task (pad);
539   }
540 }
541
542 static GstFlowReturn
543 gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
544 {
545   GstSctpEnc *self = GST_SCTP_ENC (parent);
546   GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
547   GstMapInfo map;
548   guint32 ppid;
549   gboolean ordered;
550   GstSctpAssociationPartialReliability pr;
551   guint32 pr_param;
552   gpointer state = NULL;
553   GstMeta *meta;
554   const GstMetaInfo *meta_info = GST_SCTP_SEND_META_INFO;
555   GstFlowReturn flow_ret = GST_FLOW_ERROR;
556
557   GST_OBJECT_LOCK (self);
558   if (self->src_ret != GST_FLOW_OK) {
559     GST_ERROR_OBJECT (pad, "Pushing on source pad failed before: %s",
560         gst_flow_get_name (self->src_ret));
561     flow_ret = self->src_ret;
562     GST_OBJECT_UNLOCK (self);
563     gst_buffer_unref (buffer);
564     return flow_ret;
565   }
566   GST_OBJECT_UNLOCK (self);
567
568   ppid = sctpenc_pad->ppid;
569   ordered = sctpenc_pad->ordered;
570   pr = sctpenc_pad->reliability;
571   pr_param = sctpenc_pad->reliability_param;
572
573   while ((meta = gst_buffer_iterate_meta (buffer, &state))) {
574     if (meta->info->api == meta_info->api) {
575       GstSctpSendMeta *sctp_send_meta = (GstSctpSendMeta *) meta;
576
577       ppid = sctp_send_meta->ppid;
578       ordered = sctp_send_meta->ordered;
579       pr_param = sctp_send_meta->pr_param;
580       switch (sctp_send_meta->pr) {
581         case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE:
582           pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
583           break;
584         case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX:
585           pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
586           break;
587         case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_BUF:
588           pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
589           break;
590         case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL:
591           pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
592           break;
593       }
594       break;
595     }
596   }
597
598   if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) {
599     g_warning ("Could not map GstBuffer");
600     goto error;
601   }
602
603   g_mutex_lock (&sctpenc_pad->lock);
604   while (!sctpenc_pad->flushing) {
605     gboolean data_sent = FALSE;
606
607     g_mutex_unlock (&sctpenc_pad->lock);
608
609     data_sent =
610         gst_sctp_association_send_data (self->sctp_association, map.data,
611         map.size, sctpenc_pad->stream_id, ppid, ordered, pr, pr_param);
612
613     g_mutex_lock (&sctpenc_pad->lock);
614     if (data_sent) {
615       sctpenc_pad->bytes_sent += map.size;
616       break;
617     } else if (!sctpenc_pad->flushing) {
618       gint64 end_time = g_get_monotonic_time () + BUFFER_FULL_SLEEP_TIME;
619
620       /* The buffer was probably full. Retry in a while */
621       GST_OBJECT_LOCK (self);
622       g_queue_push_tail (&self->pending_pads, sctpenc_pad);
623       GST_OBJECT_UNLOCK (self);
624
625       g_cond_wait_until (&sctpenc_pad->cond, &sctpenc_pad->lock, end_time);
626
627       GST_OBJECT_LOCK (self);
628       g_queue_remove (&self->pending_pads, sctpenc_pad);
629       GST_OBJECT_UNLOCK (self);
630     }
631   }
632   flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK;
633   g_mutex_unlock (&sctpenc_pad->lock);
634
635   gst_buffer_unmap (buffer, &map);
636 error:
637   gst_buffer_unref (buffer);
638   return flow_ret;
639 }
640
641 static gboolean
642 gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
643 {
644   GstSctpEnc *self = GST_SCTP_ENC (parent);
645   GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
646   gboolean ret, is_new_ppid;
647   guint32 new_ppid;
648
649   switch (GST_EVENT_TYPE (event)) {
650     case GST_EVENT_CAPS:{
651       GstCaps *caps;
652
653       gst_event_parse_caps (event, &caps);
654       get_config_from_caps (caps, &sctpenc_pad->ordered,
655           &sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
656           &is_new_ppid);
657       if (is_new_ppid)
658         sctpenc_pad->ppid = new_ppid;
659       gst_event_unref (event);
660       ret = TRUE;
661       break;
662     }
663     case GST_EVENT_STREAM_START:
664     case GST_EVENT_SEGMENT:
665       /* Drop these, we create our own */
666       ret = TRUE;
667       gst_event_unref (event);
668       break;
669     case GST_EVENT_EOS:
670       /* Drop this, we're never EOS until shut down */
671       ret = TRUE;
672       gst_event_unref (event);
673       break;
674     case GST_EVENT_FLUSH_START:
675       g_mutex_lock (&sctpenc_pad->lock);
676       sctpenc_pad->flushing = TRUE;
677       g_cond_signal (&sctpenc_pad->cond);
678       g_mutex_unlock (&sctpenc_pad->lock);
679
680       ret = gst_pad_event_default (pad, parent, event);
681       break;
682     case GST_EVENT_FLUSH_STOP:
683       sctpenc_pad->flushing = FALSE;
684       GST_OBJECT_LOCK (self);
685       self->src_ret = GST_FLOW_OK;
686       GST_OBJECT_UNLOCK (self);
687       ret = gst_pad_event_default (pad, parent, event);
688       break;
689     default:
690       ret = gst_pad_event_default (pad, parent, event);
691       break;
692   }
693   return ret;
694 }
695
696 static void
697 flush_sinkpad (const GValue * item, gpointer user_data)
698 {
699   GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
700   gboolean flush = GPOINTER_TO_INT (user_data);
701
702   if (flush) {
703     g_mutex_lock (&sctpenc_pad->lock);
704     sctpenc_pad->flushing = TRUE;
705     g_cond_signal (&sctpenc_pad->cond);
706     g_mutex_unlock (&sctpenc_pad->lock);
707   } else {
708     sctpenc_pad->flushing = FALSE;
709   }
710 }
711
712 static gboolean
713 gst_sctp_enc_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
714 {
715   GstSctpEnc *self = GST_SCTP_ENC (parent);
716   gboolean ret;
717
718   switch (GST_EVENT_TYPE (event)) {
719     case GST_EVENT_FLUSH_START:{
720       GstIterator *it;
721
722       gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
723       gst_data_queue_flush (self->outbound_sctp_packet_queue);
724
725       it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
726       while (gst_iterator_foreach (it, flush_sinkpad,
727               GINT_TO_POINTER (TRUE)) == GST_ITERATOR_RESYNC)
728         gst_iterator_resync (it);
729       gst_iterator_free (it);
730
731       ret = gst_pad_event_default (pad, parent, event);
732       break;
733     }
734     case GST_EVENT_RECONFIGURE:
735     case GST_EVENT_FLUSH_STOP:{
736       GstIterator *it;
737
738       it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
739       while (gst_iterator_foreach (it, flush_sinkpad,
740               GINT_TO_POINTER (FALSE)) == GST_ITERATOR_RESYNC)
741         gst_iterator_resync (it);
742       gst_iterator_free (it);
743
744       gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
745       self->need_segment = TRUE;
746       GST_OBJECT_LOCK (self);
747       self->src_ret = GST_FLOW_OK;
748       GST_OBJECT_UNLOCK (self);
749       gst_pad_start_task (self->src_pad,
750           (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
751
752       ret = gst_pad_event_default (pad, parent, event);
753       break;
754     }
755     default:
756       ret = gst_pad_event_default (pad, parent, event);
757       break;
758   }
759   return ret;
760 }
761
762 static gboolean
763 configure_association (GstSctpEnc * self)
764 {
765   gint state;
766
767   self->sctp_association = gst_sctp_association_get (self->sctp_association_id);
768
769   g_object_get (self->sctp_association, "state", &state, NULL);
770
771   if (state != GST_SCTP_ASSOCIATION_STATE_NEW) {
772     GST_WARNING_OBJECT (self,
773         "Could not configure SCTP association. Association already in use!");
774     g_object_unref (self->sctp_association);
775     self->sctp_association = NULL;
776     goto error;
777   }
778
779   self->signal_handler_state_changed =
780       g_signal_connect_object (self->sctp_association, "notify::state",
781       G_CALLBACK (on_sctp_association_state_changed), self, 0);
782
783   g_object_bind_property (self, "remote-sctp-port", self->sctp_association,
784       "remote-port", G_BINDING_SYNC_CREATE);
785
786   g_object_bind_property (self, "use-sock-stream", self->sctp_association,
787       "use-sock-stream", G_BINDING_SYNC_CREATE);
788
789   gst_sctp_association_set_on_packet_out (self->sctp_association,
790       on_sctp_packet_out, self);
791
792   return TRUE;
793 error:
794   return FALSE;
795 }
796
797 static void
798 on_sctp_association_state_changed (GstSctpAssociation * sctp_association,
799     GParamSpec * pspec, GstSctpEnc * self)
800 {
801   gint state;
802
803   g_object_get (sctp_association, "state", &state, NULL);
804   switch (state) {
805     case GST_SCTP_ASSOCIATION_STATE_NEW:
806       break;
807     case GST_SCTP_ASSOCIATION_STATE_READY:
808       gst_sctp_association_start (sctp_association);
809       break;
810     case GST_SCTP_ASSOCIATION_STATE_CONNECTING:
811       break;
812     case GST_SCTP_ASSOCIATION_STATE_CONNECTED:
813       g_signal_emit (self, signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED], 0,
814           TRUE);
815       break;
816     case GST_SCTP_ASSOCIATION_STATE_DISCONNECTING:
817       g_signal_emit (self, signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED], 0,
818           FALSE);
819       break;
820     case GST_SCTP_ASSOCIATION_STATE_DISCONNECTED:
821       break;
822     case GST_SCTP_ASSOCIATION_STATE_ERROR:
823       break;
824   }
825 }
826
827 static void
828 data_queue_item_free (GstDataQueueItem * item)
829 {
830   if (item->object)
831     gst_mini_object_unref (item->object);
832   g_free (item);
833 }
834
835 static void
836 on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf,
837     gsize length, gpointer user_data)
838 {
839   GstSctpEnc *self = user_data;
840   GstBuffer *gstbuf;
841   GstDataQueueItem *item;
842   GList *pending_pads, *l;
843   GstSctpEncPad *sctpenc_pad;
844
845   gstbuf = gst_buffer_new_wrapped (g_memdup (buf, length), length);
846
847   item = g_new0 (GstDataQueueItem, 1);
848   item->object = GST_MINI_OBJECT (gstbuf);
849   item->size = length;
850   item->visible = TRUE;
851   item->destroy = (GDestroyNotify) data_queue_item_free;
852
853   if (!gst_data_queue_push (self->outbound_sctp_packet_queue, item)) {
854     item->destroy (item);
855     GST_DEBUG_OBJECT (self, "Failed to push item because we're flushing");
856   }
857
858   /* Wake up pads in the order they waited, oldest pad first */
859   GST_OBJECT_LOCK (self);
860   pending_pads = NULL;
861   while ((sctpenc_pad = g_queue_pop_tail (&self->pending_pads))) {
862     pending_pads = g_list_prepend (pending_pads, sctpenc_pad);
863   }
864   GST_OBJECT_UNLOCK (self);
865
866   for (l = pending_pads; l; l = l->next) {
867     sctpenc_pad = l->data;
868     g_mutex_lock (&sctpenc_pad->lock);
869     g_cond_signal (&sctpenc_pad->cond);
870     g_mutex_unlock (&sctpenc_pad->lock);
871   }
872   g_list_free (pending_pads);
873 }
874
875 static void
876 stop_srcpad_task (GstPad * pad, GstSctpEnc * self)
877 {
878   gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
879   gst_data_queue_flush (self->outbound_sctp_packet_queue);
880   gst_pad_stop_task (pad);
881 }
882
883 static void
884 remove_sinkpad (const GValue * item, gpointer user_data)
885 {
886   GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
887   GstSctpEnc *self = user_data;
888
889   gst_sctp_enc_release_pad (GST_ELEMENT (self), GST_PAD (sctpenc_pad));
890 }
891
892 static void
893 sctpenc_cleanup (GstSctpEnc * self)
894 {
895   GstIterator *it;
896
897   /* FIXME: make this threadsafe */
898   /* gst_sctp_association_set_on_packet_out (self->sctp_association, NULL, NULL); */
899
900   g_signal_handler_disconnect (self->sctp_association,
901       self->signal_handler_state_changed);
902   stop_srcpad_task (self->src_pad, self);
903   gst_sctp_association_force_close (self->sctp_association);
904   g_object_unref (self->sctp_association);
905   self->sctp_association = NULL;
906
907   it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
908   while (gst_iterator_foreach (it, remove_sinkpad, self) == GST_ITERATOR_RESYNC)
909     gst_iterator_resync (it);
910   gst_iterator_free (it);
911   g_queue_clear (&self->pending_pads);
912 }
913
914 static void
915 get_config_from_caps (const GstCaps * caps, gboolean * ordered,
916     GstSctpAssociationPartialReliability * reliability,
917     guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available)
918 {
919   GstStructure *s;
920   guint i, n;
921
922   *ordered = TRUE;
923   *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
924   *reliability_param = 0;
925   *ppid_available = FALSE;
926
927   n = gst_caps_get_size (caps);
928   for (i = 0; i < n; i++) {
929     s = gst_caps_get_structure (caps, i);
930     if (gst_structure_has_field (s, "ordered")) {
931       const GValue *v = gst_structure_get_value (s, "ordered");
932       *ordered = g_value_get_boolean (v);
933     }
934     if (gst_structure_has_field (s, "partially-reliability")) {
935       const GValue *v = gst_structure_get_value (s, "partially-reliability");
936       const gchar *reliability_string = g_value_get_string (v);
937
938       if (!g_strcmp0 (reliability_string, "none"))
939         *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
940       else if (!g_strcmp0 (reliability_string, "ttl"))
941         *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
942       else if (!g_strcmp0 (reliability_string, "buf"))
943         *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
944       else if (!g_strcmp0 (reliability_string, "rtx"))
945         *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
946     }
947     if (gst_structure_has_field (s, "reliability-parameter")) {
948       const GValue *v = gst_structure_get_value (s, "reliability-parameter");
949       *reliability_param = g_value_get_uint (v);
950     }
951     if (gst_structure_has_field (s, "ppid")) {
952       const GValue *v = gst_structure_get_value (s, "ppid");
953       *ppid = g_value_get_uint (v);
954       *ppid_available = TRUE;
955     }
956   }
957 }
958
959 static guint64
960 on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id)
961 {
962   gchar *pad_name;
963   GstPad *pad;
964   GstSctpEncPad *sctpenc_pad;
965   guint64 bytes_sent;
966
967   pad_name = g_strdup_printf ("sink_%u", stream_id);
968   pad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
969   g_free (pad_name);
970
971   if (!pad) {
972     GST_DEBUG_OBJECT (self,
973         "Buffered amount requested on a stream that does not exist!");
974     return 0;
975   }
976
977   sctpenc_pad = GST_SCTP_ENC_PAD (pad);
978
979   g_mutex_lock (&sctpenc_pad->lock);
980   bytes_sent = sctpenc_pad->bytes_sent;
981   g_mutex_unlock (&sctpenc_pad->lock);
982
983   gst_object_unref (sctpenc_pad);
984
985   return bytes_sent;
986 }