2 * Copyright (c) 2015, Collabora Ltd.
4 * Redistribution and use in source and binary forms, with or without modification,
5 * are permitted provided that the following conditions are met:
7 * 1. Redistributions of source code must retain the above copyright notice, this
8 * list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice, this
11 * list of conditions and the following disclaimer in the documentation and/or other
12 * materials provided with the distribution.
14 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
17 * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
18 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
19 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
21 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
29 #include "gstsctpenc.h"
31 #include <gst/sctp/sctpsendmeta.h>
/* Debug category backing the GST_*_OBJECT log calls in this file. */
34 GST_DEBUG_CATEGORY_STATIC (gst_sctp_enc_debug_category);
35 #define GST_CAT_DEFAULT gst_sctp_enc_debug_category
/* Define the GstSctpEnc type as a GstElement subclass; parent_class is the
 * alias used below when chaining up. */
37 #define gst_sctp_enc_parent_class parent_class
38 G_DEFINE_TYPE (GstSctpEnc, gst_sctp_enc, GST_TYPE_ELEMENT);
/* Request sink pads, named "sink_%u", accepting any caps. */
40 static GstStaticPadTemplate sink_template =
41 GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK,
42 GST_PAD_REQUEST, GST_STATIC_CAPS_ANY);
/* Always-present "src" pad with application/x-sctp caps. */
44 static GstStaticPadTemplate src_template =
45 GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC,
46 GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-sctp"));
/* Signal identifiers, used as indices into signals[]. */
50 SIGNAL_SCTP_ASSOCIATION_ESTABLISHED,
51 SIGNAL_GET_STREAM_BYTES_SENT,
55 static guint signals[NUM_SIGNALS];
/* Property identifiers, used as indices into properties[]. */
61 PROP_GST_SCTP_ASSOCIATION_ID,
62 PROP_REMOTE_SCTP_PORT,
68 static GParamSpec *properties[NUM_PROPERTIES];
/* Default values for the element properties and per-pad settings. */
70 #define DEFAULT_GST_SCTP_ASSOCIATION_ID 1
71 #define DEFAULT_REMOTE_SCTP_PORT 0
72 #define DEFAULT_GST_SCTP_ORDERED TRUE
73 #define DEFAULT_SCTP_PPID 1
74 #define DEFAULT_USE_SOCK_STREAM FALSE
/* Added to g_get_monotonic_time() (microseconds) to bound the wait before a
 * failed send is retried in the sink chain function. */
76 #define BUFFER_FULL_SLEEP_TIME 100000
/* GstSctpEncPad: GstPad subclass instantiated for the element's request
 * sink pads. The usual GObject cast/check macros follow. */
78 GType gst_sctp_enc_pad_get_type (void);
80 #define GST_TYPE_SCTP_ENC_PAD (gst_sctp_enc_pad_get_type())
81 #define GST_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPad))
82 #define GST_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPadClass))
83 #define GST_IS_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_ENC_PAD))
84 #define GST_IS_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_ENC_PAD))
86 typedef struct _GstSctpEncPad GstSctpEncPad;
/* The pad class adds nothing of its own; it reuses GstPadClass. */
87 typedef GstPadClass GstSctpEncPadClass;
/* Partial-reliability policy and its parameter for data sent through this
 * pad; both are filled in from caps by get_config_from_caps(). */
96 GstSctpAssociationPartialReliability reliability;
97 guint32 reliability_param;
106 G_DEFINE_TYPE (GstSctpEncPad, gst_sctp_enc_pad, GST_TYPE_PAD);
/* GObject finalize for GstSctpEncPad: releases the per-pad condition
 * variable and mutex, then chains up to the parent class. */
109 gst_sctp_enc_pad_finalize (GObject * object)
111 GstSctpEncPad *self = GST_SCTP_ENC_PAD (object);
113 g_cond_clear (&self->cond);
114 g_mutex_clear (&self->lock);
116 G_OBJECT_CLASS (gst_sctp_enc_pad_parent_class)->finalize (object);
/* Class initializer for GstSctpEncPad: installs the finalize handler. */
120 gst_sctp_enc_pad_class_init (GstSctpEncPadClass * klass)
122 GObjectClass *gobject_class = (GObjectClass *) klass;
124 gobject_class->finalize = gst_sctp_enc_pad_finalize;
/* Instance initializer for GstSctpEncPad: sets up the pad's mutex and
 * condition variable and starts in the non-flushing state. */
128 gst_sctp_enc_pad_init (GstSctpEncPad * self)
130 g_mutex_init (&self->lock);
131 g_cond_init (&self->cond);
132 self->flushing = FALSE;
/* Forward declarations: GObject/GstElement virtual methods, pad functions,
 * association callbacks and internal helpers defined further down. */
135 static void gst_sctp_enc_finalize (GObject * object);
136 static void gst_sctp_enc_set_property (GObject * object, guint prop_id,
137 const GValue * value, GParamSpec * pspec);
138 static void gst_sctp_enc_get_property (GObject * object, guint prop_id,
139 GValue * value, GParamSpec * pspec);
140 static GstStateChangeReturn gst_sctp_enc_change_state (GstElement * element,
141 GstStateChange transition);
142 static GstPad *gst_sctp_enc_request_new_pad (GstElement * element,
143 GstPadTemplate * template, const gchar * name, const GstCaps * caps);
144 static void gst_sctp_enc_release_pad (GstElement * element, GstPad * pad);
145 static void gst_sctp_enc_srcpad_loop (GstPad * pad);
146 static GstFlowReturn gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent,
148 static gboolean gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent,
150 static gboolean gst_sctp_enc_src_event (GstPad * pad, GstObject * parent,
152 static void on_sctp_association_state_changed (GstSctpAssociation *
153 sctp_association, GParamSpec * pspec, GstSctpEnc * self);
155 static gboolean configure_association (GstSctpEnc * self);
156 static void on_sctp_packet_out (GstSctpAssociation * sctp_association,
157 const guint8 * buf, gsize length, gpointer user_data);
158 static void stop_srcpad_task (GstPad * pad, GstSctpEnc * self);
159 static void sctpenc_cleanup (GstSctpEnc * self);
160 static void get_config_from_caps (const GstCaps * caps, gboolean * ordered,
161 GstSctpAssociationPartialReliability * reliability,
162 guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available);
163 static guint64 on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id);
/* Class initializer for GstSctpEnc: sets up the debug category, pad
 * templates, virtual methods, properties, signals and element metadata. */
166 gst_sctp_enc_class_init (GstSctpEncClass * klass)
168 GObjectClass *gobject_class;
169 GstElementClass *element_class;
171 gobject_class = (GObjectClass *) klass;
172 element_class = (GstElementClass *) klass;
174 GST_DEBUG_CATEGORY_INIT (gst_sctp_enc_debug_category,
175 "sctpenc", 0, "debug category for sctpenc element");
/* Register the static "src" and request "sink_%u" pad templates. */
177 gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
178 gst_static_pad_template_get (&src_template));
179 gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
180 gst_static_pad_template_get (&sink_template));
/* GObject virtual methods. */
182 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_sctp_enc_finalize);
183 gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_set_property);
184 gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_get_property);
/* GstElement virtual methods: state changes and request-pad handling. */
186 element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_enc_change_state);
187 element_class->request_new_pad =
188 GST_DEBUG_FUNCPTR (gst_sctp_enc_request_new_pad);
189 element_class->release_pad = GST_DEBUG_FUNCPTR (gst_sctp_enc_release_pad);
/* "sctp-association-id": identifies the association passed to
 * gst_sctp_association_get() in configure_association(). */
191 properties[PROP_GST_SCTP_ASSOCIATION_ID] =
192 g_param_spec_uint ("sctp-association-id",
193 "SCTP Association ID",
194 "Every encoder/decoder pair should have the same, unique, sctp-association-id. "
195 "This value must be set before any pads are requested.",
196 0, G_MAXUINT, DEFAULT_GST_SCTP_ASSOCIATION_ID,
197 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
/* "remote-sctp-port": bound to the association's "remote-port" property in
 * configure_association(); range is limited to 0..G_MAXUSHORT. */
199 properties[PROP_REMOTE_SCTP_PORT] =
200 g_param_spec_uint ("remote-sctp-port",
202 "Sctp remote sctp port for the sctp association. The local port is configured via the "
203 "GstSctpDec element.",
204 0, G_MAXUSHORT, DEFAULT_REMOTE_SCTP_PORT,
205 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
/* "use-sock-stream": bound to the association's property of the same name.
 * NOTE(review): the two description literals below are concatenated with no
 * space after "is used." so the text reads "is used.When TRUE" - confirm
 * and add the missing space in the literal. */
207 properties[PROP_USE_SOCK_STREAM] =
208 g_param_spec_boolean ("use-sock-stream",
210 "When set to TRUE, a sequenced, reliable, connection-based connection is used."
211 "When TRUE the partial reliability parameters of the channel are ignored.",
212 DEFAULT_USE_SOCK_STREAM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
214 g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
/* "sctp-association-established": carries one boolean argument and is
 * emitted from on_sctp_association_state_changed(). */
216 signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED] =
217 g_signal_new ("sctp-association-established",
218 G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST,
219 G_STRUCT_OFFSET (GstSctpEncClass, on_sctp_association_is_established),
220 NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_BOOLEAN);
/* "bytes-sent": action signal taking a stream id (guint) and returning a
 * guint64; its class handler is on_get_stream_bytes_sent(). */
222 signals[SIGNAL_GET_STREAM_BYTES_SENT] = g_signal_new ("bytes-sent",
223 G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
224 G_STRUCT_OFFSET (GstSctpEncClass, on_get_stream_bytes_sent), NULL, NULL,
225 NULL, G_TYPE_UINT64, 1, G_TYPE_UINT);
227 klass->on_get_stream_bytes_sent =
228 GST_DEBUG_FUNCPTR (on_get_stream_bytes_sent);
/* Static element metadata (classification, description, author). */
230 gst_element_class_set_static_metadata (element_class,
232 "Encoder/Network/SCTP",
233 "Encodes packets with SCTP",
234 "George Kiagiadakis <george.kiagiadakis@collabora.com>");
/* Fullness-check callback handed to gst_data_queue_new() in
 * gst_sctp_enc_init() for the outbound packet queue. */
238 data_queue_check_full_cb (GstDataQueue * queue, guint visible, guint bytes,
239 guint64 time, gpointer user_data)
241 /* TODO: When are we considered full? */
/* "Queue empty" callback handed to gst_data_queue_new() in
 * gst_sctp_enc_init(). */
246 data_queue_empty_cb (GstDataQueue * queue, gpointer user_data)
/* "Queue full" callback handed to gst_data_queue_new() in
 * gst_sctp_enc_init(). */
251 data_queue_full_cb (GstDataQueue * queue, gpointer user_data)
/* Instance initializer for GstSctpEnc: applies property defaults, creates
 * the outbound packet queue and the always-present src pad. */
256 gst_sctp_enc_init (GstSctpEnc * self)
258 self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
259 self->remote_sctp_port = DEFAULT_REMOTE_SCTP_PORT;
/* The association itself is only obtained in configure_association(). */
261 self->sctp_association = NULL;
262 self->outbound_sctp_packet_queue =
263 gst_data_queue_new (data_queue_check_full_cb, data_queue_full_cb,
264 data_queue_empty_cb, NULL);
266 self->src_pad = gst_pad_new_from_static_template (&src_template, "src");
267 gst_pad_set_event_function (self->src_pad,
268 GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_enc_src_event));
269 gst_element_add_pad (GST_ELEMENT (self), self->src_pad);
/* Sink pads waiting to retry a send are tracked in pending_pads. */
271 g_queue_init (&self->pending_pads);
/* Until READY->PAUSED the chain function rejects buffers as flushing. */
272 self->src_ret = GST_FLOW_FLUSHING;
/* GObject finalize for GstSctpEnc: empties the pending-pad queue, drops the
 * outbound packet queue and chains up to the parent class. */
276 gst_sctp_enc_finalize (GObject * object)
278 GstSctpEnc *self = GST_SCTP_ENC (object);
280 g_queue_clear (&self->pending_pads);
281 gst_object_unref (self->outbound_sctp_packet_queue);
283 G_OBJECT_CLASS (parent_class)->finalize (object);
/* GObject set_property: stores the new value of sctp-association-id,
 * remote-sctp-port or use-sock-stream on the instance; any other id is
 * reported with G_OBJECT_WARN_INVALID_PROPERTY_ID. */
287 gst_sctp_enc_set_property (GObject * object, guint prop_id,
288 const GValue * value, GParamSpec * pspec)
290 GstSctpEnc *self = GST_SCTP_ENC (object);
293 case PROP_GST_SCTP_ASSOCIATION_ID:
294 self->sctp_association_id = g_value_get_uint (value);
296 case PROP_REMOTE_SCTP_PORT:
297 self->remote_sctp_port = g_value_get_uint (value);
299 case PROP_USE_SOCK_STREAM:
300 self->use_sock_stream = g_value_get_boolean (value);
303 G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
/* GObject get_property: copies the instance's sctp-association-id,
 * remote-sctp-port or use-sock-stream into value; any other id is reported
 * with G_OBJECT_WARN_INVALID_PROPERTY_ID. */
309 gst_sctp_enc_get_property (GObject * object, guint prop_id, GValue * value,
312 GstSctpEnc *self = GST_SCTP_ENC (object);
315 case PROP_GST_SCTP_ASSOCIATION_ID:
316 g_value_set_uint (value, self->sctp_association_id);
318 case PROP_REMOTE_SCTP_PORT:
319 g_value_set_uint (value, self->remote_sctp_port);
321 case PROP_USE_SOCK_STREAM:
322 g_value_set_boolean (value, self->use_sock_stream);
325 G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
/* GstElement change_state: prepares the association before chaining up on
 * READY->PAUSED, tears it down on PAUSED->READY, and starts the src pad
 * task after the parent handler has run for READY->PAUSED. */
330 static GstStateChangeReturn
331 gst_sctp_enc_change_state (GstElement * element, GstStateChange transition)
333 GstSctpEnc *self = GST_SCTP_ENC (element);
334 GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE;
/* Work done before chaining up to the parent class. */
337 switch (transition) {
338 case GST_STATE_CHANGE_NULL_TO_READY:
340 case GST_STATE_CHANGE_READY_TO_PAUSED:
/* Ask the src task to (re)send stream-start/caps and segment, accept
 * buffers again, unblock the packet queue and set up the association. */
341 self->need_segment = self->need_stream_start_caps = TRUE;
342 self->src_ret = GST_FLOW_OK;
343 gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
344 res = configure_association (self);
346 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
348 case GST_STATE_CHANGE_PAUSED_TO_READY:
/* Tear down the association and make the chain function return
 * FLUSHING from now on. */
349 sctpenc_cleanup (self);
350 self->src_ret = GST_FLOW_FLUSHING;
352 case GST_STATE_CHANGE_READY_TO_NULL:
359 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
/* Work done after the parent class handled the transition. */
361 switch (transition) {
362 case GST_STATE_CHANGE_NULL_TO_READY:
364 case GST_STATE_CHANGE_READY_TO_PAUSED:
/* Start the task that drains the outbound packet queue to the src pad. */
365 gst_pad_start_task (self->src_pad,
366 (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
368 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
370 case GST_STATE_CHANGE_PAUSED_TO_READY:
372 case GST_STATE_CHANGE_READY_TO_NULL:
/* GstElement request_new_pad: creates a GstSctpEncPad sink pad for the
 * stream id encoded in the requested pad name ("sink_<id>"), configures it
 * from the optional caps, activates it and adds it to the element. */
382 gst_sctp_enc_request_new_pad (GstElement * element, GstPadTemplate * template,
383 const gchar * new_pad_name, const GstCaps * caps)
385 GstSctpEnc *self = GST_SCTP_ENC (element);
386 GstPad *new_pad = NULL;
387 GstSctpEncPad *sctpenc_pad;
391 gboolean is_new_ppid;
/* NOTE(review): self->sctp_association is only assigned in
 * configure_association() (READY->PAUSED) and is NULL otherwise; no NULL
 * check is visible before this g_object_get() - confirm a pad request
 * before PAUSED cannot reach this line with a NULL association. */
393 g_object_get (self->sctp_association, "state", &state, NULL);
/* New streams are only accepted once the association is connected. */
395 if (state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
397 ("The SCTP association must be established before a new stream can be created");
402 goto invalid_parameter;
/* The pad name must parse as "sink_<stream id>" with an id of at most 65534. */
404 if (!new_pad_name || (sscanf (new_pad_name, "sink_%u", &stream_id) != 1)
405 || stream_id > 65534) /* 65535 is not a valid stream id */
406 goto invalid_parameter;
/* Look for an existing pad of the same name; presumably the request is
 * rejected only when one is found - confirm against the full source. */
408 new_pad = gst_element_get_static_pad (element, new_pad_name);
410 gst_object_unref (new_pad);
412 goto invalid_parameter;
416 g_object_new (GST_TYPE_SCTP_ENC_PAD, "name", new_pad_name, "direction",
417 template->direction, "template", template, NULL);
418 gst_pad_set_chain_function (new_pad,
419 GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_chain));
420 gst_pad_set_event_function (new_pad,
421 GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_event));
/* Per-pad defaults; the ppid may be replaced from caps below. */
423 sctpenc_pad = GST_SCTP_ENC_PAD (new_pad);
424 sctpenc_pad->stream_id = stream_id;
425 sctpenc_pad->ppid = DEFAULT_SCTP_PPID;
428 get_config_from_caps (caps, &sctpenc_pad->ordered,
429 &sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
433 sctpenc_pad->ppid = new_ppid;
436 sctpenc_pad->flushing = FALSE;
438 if (!gst_pad_set_active (new_pad, TRUE))
441 if (!gst_element_add_pad (element, new_pad))
448 gst_object_unref (new_pad);
/* GstElement release_pad: marks the pad as flushing so a chain call blocked
 * on its cond wakes up, deactivates it, resets the SCTP stream if an
 * association exists, and removes the pad from the element. */
453 gst_sctp_enc_release_pad (GstElement * element, GstPad * pad)
455 GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
459 self = GST_SCTP_ENC (element);
/* Wake up gst_sctp_enc_sink_chain() if it is waiting to retry a send. */
461 g_mutex_lock (&sctpenc_pad->lock);
462 sctpenc_pad->flushing = TRUE;
463 g_cond_signal (&sctpenc_pad->cond);
464 g_mutex_unlock (&sctpenc_pad->lock);
466 stream_id = sctpenc_pad->stream_id;
467 gst_pad_set_active (pad, FALSE);
/* The association may already be gone (see sctpenc_cleanup()). */
469 if (self->sctp_association)
470 gst_sctp_association_reset_stream (self->sctp_association, stream_id);
472 gst_element_remove_pad (element, pad);
/* Task function of the src pad: sends pending stream-start/caps and segment
 * events, then pops one item from the outbound packet queue and pushes its
 * buffer downstream, recording the flow return in self->src_ret. */
476 gst_sctp_enc_srcpad_loop (GstPad * pad)
478 GstSctpEnc *self = GST_SCTP_ENC (GST_PAD_PARENT (pad));
479 GstFlowReturn flow_ret;
480 GstDataQueueItem *item;
/* First iteration after READY->PAUSED: stream-start with a random stream
 * id, followed by the fixed application/x-sctp caps. */
482 if (self->need_stream_start_caps) {
486 g_snprintf (s_id, sizeof (s_id), "sctpenc-%08x", g_random_int ());
487 gst_pad_push_event (self->src_pad, gst_event_new_stream_start (s_id));
489 caps = gst_caps_new_empty_simple ("application/x-sctp");
490 gst_pad_set_caps (self->src_pad, caps);
491 gst_caps_unref (caps);
493 self->need_stream_start_caps = FALSE;
/* A BYTES segment is (re)sent after start-up and after a flush. */
496 if (self->need_segment) {
499 gst_segment_init (&segment, GST_FORMAT_BYTES);
500 gst_pad_push_event (self->src_pad, gst_event_new_segment (&segment));
502 self->need_segment = FALSE;
505 if (gst_data_queue_pop (self->outbound_sctp_packet_queue, &item)) {
506 GstBuffer *buffer = GST_BUFFER (item->object);
508 flow_ret = gst_pad_push (self->src_pad, buffer);
/* Publish the result so gst_sctp_enc_sink_chain() can fail early. */
511 GST_OBJECT_LOCK (self);
512 self->src_ret = flow_ret;
513 GST_OBJECT_UNLOCK (self);
/* FLUSHING and NOT_LINKED are logged at debug level, other failures as
 * errors. */
515 if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
516 || flow_ret == GST_FLOW_NOT_LINKED)) {
517 GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
518 gst_flow_get_name (flow_ret));
519 } else if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
520 GST_ERROR_OBJECT (pad, "Push failed on packet source pad. Error: %s",
521 gst_flow_get_name (flow_ret));
/* On any failure: discard queued packets and pause this task. */
524 if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
525 GST_DEBUG_OBJECT (pad, "Pausing task because of an error");
526 gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
527 gst_data_queue_flush (self->outbound_sctp_packet_queue);
528 gst_pad_pause_task (pad);
531 item->destroy (item);
/* Presumably the branch taken when the pop above fails (queue flushing):
 * record FLUSHING and pause the task - confirm against the full source. */
533 GST_OBJECT_LOCK (self);
534 self->src_ret = GST_FLOW_FLUSHING;
535 GST_OBJECT_UNLOCK (self);
537 GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
538 gst_pad_pause_task (pad);
/* Chain function of the sink pads: hands the buffer's bytes to the SCTP
 * association on this pad's stream, retrying until the pad starts flushing.
 * Send parameters come from the pad and may be overridden by a
 * GstSctpSendMeta attached to the buffer. */
543 gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
545 GstSctpEnc *self = GST_SCTP_ENC (parent);
546 GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
550 GstSctpAssociationPartialReliability pr;
552 gpointer state = NULL;
554 const GstMetaInfo *meta_info = GST_SCTP_SEND_META_INFO;
555 GstFlowReturn flow_ret = GST_FLOW_ERROR;
/* If the last result recorded for the src pad is not OK, drop the buffer
 * and report that result. */
557 GST_OBJECT_LOCK (self);
558 if (self->src_ret != GST_FLOW_OK) {
559 GST_ERROR_OBJECT (pad, "Pushing on source pad failed before: %s",
560 gst_flow_get_name (self->src_ret));
561 flow_ret = self->src_ret;
562 GST_OBJECT_UNLOCK (self);
563 gst_buffer_unref (buffer);
566 GST_OBJECT_UNLOCK (self);
/* Start from the pad's configuration. */
568 ppid = sctpenc_pad->ppid;
569 ordered = sctpenc_pad->ordered;
570 pr = sctpenc_pad->reliability;
571 pr_param = sctpenc_pad->reliability_param;
/* A GstSctpSendMeta on the buffer overrides the pad configuration; its
 * reliability enum is mapped to the association's enum. */
573 while ((meta = gst_buffer_iterate_meta (buffer, &state))) {
574 if (meta->info->api == meta_info->api) {
575 GstSctpSendMeta *sctp_send_meta = (GstSctpSendMeta *) meta;
577 ppid = sctp_send_meta->ppid;
578 ordered = sctp_send_meta->ordered;
579 pr_param = sctp_send_meta->pr_param;
580 switch (sctp_send_meta->pr) {
581 case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE:
582 pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
584 case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX:
585 pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
587 case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_BUF:
588 pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
590 case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL:
591 pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
598 if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) {
599 g_warning ("Could not map GstBuffer");
/* Send loop: the pad lock is released around the call into the
 * association and re-taken to update pad state. */
603 g_mutex_lock (&sctpenc_pad->lock);
604 while (!sctpenc_pad->flushing) {
605 gboolean data_sent = FALSE;
607 g_mutex_unlock (&sctpenc_pad->lock);
610 gst_sctp_association_send_data (self->sctp_association, map.data,
611 map.size, sctpenc_pad->stream_id, ppid, ordered, pr, pr_param);
613 g_mutex_lock (&sctpenc_pad->lock);
/* Accounted per pad; read back by on_get_stream_bytes_sent(). */
615 sctpenc_pad->bytes_sent += map.size;
617 } else if (!sctpenc_pad->flushing) {
618 gint64 end_time = g_get_monotonic_time () + BUFFER_FULL_SLEEP_TIME;
620 /* The buffer was probably full. Retry in a while */
/* Register as pending so on_sctp_packet_out() signals our cond, then
 * wait until signalled or until end_time. */
621 GST_OBJECT_LOCK (self);
622 g_queue_push_tail (&self->pending_pads, sctpenc_pad);
623 GST_OBJECT_UNLOCK (self);
625 g_cond_wait_until (&sctpenc_pad->cond, &sctpenc_pad->lock, end_time);
/* Harmless if on_sctp_packet_out() already popped this pad. */
627 GST_OBJECT_LOCK (self);
628 g_queue_remove (&self->pending_pads, sctpenc_pad);
629 GST_OBJECT_UNLOCK (self);
632 flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK;
633 g_mutex_unlock (&sctpenc_pad->lock);
635 gst_buffer_unmap (buffer, &map);
637 gst_buffer_unref (buffer);
/* Event function of the sink pads: applies caps to the pad configuration,
 * drops events the element generates or ignores itself, and tracks the
 * pad's flushing state around flush-start/flush-stop. */
642 gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
644 GstSctpEnc *self = GST_SCTP_ENC (parent);
645 GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
646 gboolean ret, is_new_ppid;
649 switch (GST_EVENT_TYPE (event)) {
650 case GST_EVENT_CAPS:{
/* Caps carry the per-stream send configuration; the event itself is
 * consumed here. */
653 gst_event_parse_caps (event, &caps);
654 get_config_from_caps (caps, &sctpenc_pad->ordered,
655 &sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
658 sctpenc_pad->ppid = new_ppid;
659 gst_event_unref (event);
663 case GST_EVENT_STREAM_START:
664 case GST_EVENT_SEGMENT:
665 /* Drop these, we create our own */
667 gst_event_unref (event);
670 /* Drop this, we're never EOS until shut down */
672 gst_event_unref (event);
674 case GST_EVENT_FLUSH_START:
/* Set flushing under the pad lock and wake a chain call that may be
 * waiting on the pad's cond. */
675 g_mutex_lock (&sctpenc_pad->lock);
676 sctpenc_pad->flushing = TRUE;
677 g_cond_signal (&sctpenc_pad->cond);
678 g_mutex_unlock (&sctpenc_pad->lock);
680 ret = gst_pad_event_default (pad, parent, event);
682 case GST_EVENT_FLUSH_STOP:
/* NOTE(review): flushing is cleared here without taking the pad lock,
 * unlike the FLUSH_START case above - confirm this is intentional. */
683 sctpenc_pad->flushing = FALSE;
684 GST_OBJECT_LOCK (self);
685 self->src_ret = GST_FLOW_OK;
686 GST_OBJECT_UNLOCK (self);
687 ret = gst_pad_event_default (pad, parent, event);
690 ret = gst_pad_event_default (pad, parent, event);
/* gst_iterator_foreach() callback used by gst_sctp_enc_src_event() on every
 * sink pad; user_data is a boolean packed with GINT_TO_POINTER(). */
697 flush_sinkpad (const GValue * item, gpointer user_data)
699 GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
700 gboolean flush = GPOINTER_TO_INT (user_data);
/* Mark the pad flushing and wake a chain call waiting on its cond. */
703 g_mutex_lock (&sctpenc_pad->lock);
704 sctpenc_pad->flushing = TRUE;
705 g_cond_signal (&sctpenc_pad->cond);
706 g_mutex_unlock (&sctpenc_pad->lock);
/* Clear the flushing flag (written without taking the pad lock). */
708 sctpenc_pad->flushing = FALSE;
/* Event function of the src pad: on flush-start it flushes the outbound
 * packet queue and all sink pads; on reconfigure/flush-stop it clears the
 * flushing state and restarts the src pad task. Every event is then passed
 * to gst_pad_event_default(). */
713 gst_sctp_enc_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
715 GstSctpEnc *self = GST_SCTP_ENC (parent);
718 switch (GST_EVENT_TYPE (event)) {
719 case GST_EVENT_FLUSH_START:{
/* Discard queued packets and make further pushes/pops fail. */
722 gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
723 gst_data_queue_flush (self->outbound_sctp_packet_queue);
/* Visit every sink pad with flush == TRUE, restarting on RESYNC. */
725 it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
726 while (gst_iterator_foreach (it, flush_sinkpad,
727 GINT_TO_POINTER (TRUE)) == GST_ITERATOR_RESYNC)
728 gst_iterator_resync (it);
729 gst_iterator_free (it);
731 ret = gst_pad_event_default (pad, parent, event);
734 case GST_EVENT_RECONFIGURE:
735 case GST_EVENT_FLUSH_STOP:{
/* Visit every sink pad with flush == FALSE, restarting on RESYNC. */
738 it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
739 while (gst_iterator_foreach (it, flush_sinkpad,
740 GINT_TO_POINTER (FALSE)) == GST_ITERATOR_RESYNC)
741 gst_iterator_resync (it);
742 gst_iterator_free (it);
/* Accept packets again, request a new segment, reset the recorded flow
 * return and restart the task draining the queue. */
744 gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
745 self->need_segment = TRUE;
746 GST_OBJECT_LOCK (self);
747 self->src_ret = GST_FLOW_OK;
748 GST_OBJECT_UNLOCK (self);
749 gst_pad_start_task (self->src_pad,
750 (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
752 ret = gst_pad_event_default (pad, parent, event);
756 ret = gst_pad_event_default (pad, parent, event);
/* Obtains the association for self->sctp_association_id and wires it to
 * this element: state-change notification, property bindings and the
 * packet-out callback. An association that is not in the NEW state is
 * released again and self->sctp_association is reset to NULL. */
763 configure_association (GstSctpEnc * self)
767 self->sctp_association = gst_sctp_association_get (self->sctp_association_id);
769 g_object_get (self->sctp_association, "state", &state, NULL);
771 if (state != GST_SCTP_ASSOCIATION_STATE_NEW) {
772 GST_WARNING_OBJECT (self,
773 "Could not configure SCTP association. Association already in use!");
774 g_object_unref (self->sctp_association);
775 self->sctp_association = NULL;
/* The handler id is kept so sctpenc_cleanup() can disconnect it. */
779 self->signal_handler_state_changed =
780 g_signal_connect_object (self->sctp_association, "notify::state",
781 G_CALLBACK (on_sctp_association_state_changed), self, 0);
/* Mirror the element's properties onto the association, including their
 * current values (G_BINDING_SYNC_CREATE). */
783 g_object_bind_property (self, "remote-sctp-port", self->sctp_association,
784 "remote-port", G_BINDING_SYNC_CREATE);
786 g_object_bind_property (self, "use-sock-stream", self->sctp_association,
787 "use-sock-stream", G_BINDING_SYNC_CREATE);
/* Packets produced by the association arrive in on_sctp_packet_out(). */
789 gst_sctp_association_set_on_packet_out (self->sctp_association,
790 on_sctp_packet_out, self);
/* "notify::state" handler connected in configure_association(): starts the
 * association once it is READY and emits sctp-association-established on
 * the CONNECTED and DISCONNECTING states. */
798 on_sctp_association_state_changed (GstSctpAssociation * sctp_association,
799 GParamSpec * pspec, GstSctpEnc * self)
803 g_object_get (sctp_association, "state", &state, NULL);
805 case GST_SCTP_ASSOCIATION_STATE_NEW:
807 case GST_SCTP_ASSOCIATION_STATE_READY:
808 gst_sctp_association_start (sctp_association);
810 case GST_SCTP_ASSOCIATION_STATE_CONNECTING:
812 case GST_SCTP_ASSOCIATION_STATE_CONNECTED:
813 g_signal_emit (self, signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED], 0,
816 case GST_SCTP_ASSOCIATION_STATE_DISCONNECTING:
817 g_signal_emit (self, signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED], 0,
820 case GST_SCTP_ASSOCIATION_STATE_DISCONNECTED:
822 case GST_SCTP_ASSOCIATION_STATE_ERROR:
/* Destroy notify installed on queue items by on_sctp_packet_out(); drops
 * the reference held on the item's mini object. */
828 data_queue_item_free (GstDataQueueItem * item)
831 gst_mini_object_unref (item->object);
/* Packet-out callback registered in configure_association(): copies the
 * packet into a new GstBuffer, queues it for the src pad task, then wakes
 * every sink pad that is waiting to retry a send. user_data is the
 * GstSctpEnc instance. */
836 on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf,
837 gsize length, gpointer user_data)
839 GstSctpEnc *self = user_data;
841 GstDataQueueItem *item;
842 GList *pending_pads, *l;
843 GstSctpEncPad *sctpenc_pad;
/* NOTE(review): g_memdup() takes a guint byte count while length is a
 * gsize, so a very large length would be truncated; GLib 2.68 added
 * g_memdup2() for this - confirm the minimum GLib version before
 * switching. */
845 gstbuf = gst_buffer_new_wrapped (g_memdup (buf, length), length);
847 item = g_new0 (GstDataQueueItem, 1);
848 item->object = GST_MINI_OBJECT (gstbuf);
850 item->visible = TRUE;
851 item->destroy = (GDestroyNotify) data_queue_item_free;
/* A rejected push (queue flushing) leaves ownership with us. */
853 if (!gst_data_queue_push (self->outbound_sctp_packet_queue, item)) {
854 item->destroy (item);
855 GST_DEBUG_OBJECT (self, "Failed to push item because we're flushing");
858 /* Wake up pads in the order they waited, oldest pad first */
/* Popping from the tail and prepending keeps the original queue order
 * in the temporary list; signalling happens outside the object lock. */
859 GST_OBJECT_LOCK (self);
861 while ((sctpenc_pad = g_queue_pop_tail (&self->pending_pads))) {
862 pending_pads = g_list_prepend (pending_pads, sctpenc_pad);
864 GST_OBJECT_UNLOCK (self);
866 for (l = pending_pads; l; l = l->next) {
867 sctpenc_pad = l->data;
868 g_mutex_lock (&sctpenc_pad->lock);
869 g_cond_signal (&sctpenc_pad->cond);
870 g_mutex_unlock (&sctpenc_pad->lock);
872 g_list_free (pending_pads);
/* Puts the outbound packet queue into flushing mode, discards its contents
 * and stops the task running on pad. */
876 stop_srcpad_task (GstPad * pad, GstSctpEnc * self)
878 gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
879 gst_data_queue_flush (self->outbound_sctp_packet_queue);
880 gst_pad_stop_task (pad);
/* gst_iterator_foreach() callback used by sctpenc_cleanup(): releases the
 * visited sink pad; user_data is the GstSctpEnc instance. */
884 remove_sinkpad (const GValue * item, gpointer user_data)
886 GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
887 GstSctpEnc *self = user_data;
889 gst_sctp_enc_release_pad (GST_ELEMENT (self), GST_PAD (sctpenc_pad));
/* Tear-down used on PAUSED->READY: disconnects from the association, stops
 * the src pad task, closes and releases the association, releases every
 * sink pad and empties the pending-pad queue. */
893 sctpenc_cleanup (GstSctpEnc * self)
897 /* FIXME: make this threadsafe */
898 /* gst_sctp_association_set_on_packet_out (self->sctp_association, NULL, NULL); */
900 g_signal_handler_disconnect (self->sctp_association,
901 self->signal_handler_state_changed);
902 stop_srcpad_task (self->src_pad, self);
903 gst_sctp_association_force_close (self->sctp_association);
904 g_object_unref (self->sctp_association);
/* With the association pointer cleared, gst_sctp_enc_release_pad() below
 * skips the stream reset. */
905 self->sctp_association = NULL;
/* Release all sink pads, restarting the iteration on RESYNC. */
907 it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
908 while (gst_iterator_foreach (it, remove_sinkpad, self) == GST_ITERATOR_RESYNC)
909 gst_iterator_resync (it);
910 gst_iterator_free (it);
911 g_queue_clear (&self->pending_pads);
/* Extracts per-stream send settings from caps.
 *
 * reliability, reliability_param and ppid_available are reset first; then
 * each caps structure is checked for the optional fields "ordered"
 * (boolean), "partially-reliability" (string: none/ttl/buf/rtx),
 * "reliability-parameter" (uint) and "ppid" (uint). ppid is only written,
 * and ppid_available set to TRUE, when a "ppid" field is present. */
915 get_config_from_caps (const GstCaps * caps, gboolean * ordered,
916 GstSctpAssociationPartialReliability * reliability,
917 guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available)
923 *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
924 *reliability_param = 0;
925 *ppid_available = FALSE;
927 n = gst_caps_get_size (caps);
928 for (i = 0; i < n; i++) {
929 s = gst_caps_get_structure (caps, i);
930 if (gst_structure_has_field (s, "ordered")) {
931 const GValue *v = gst_structure_get_value (s, "ordered");
932 *ordered = g_value_get_boolean (v);
934 if (gst_structure_has_field (s, "partially-reliability")) {
935 const GValue *v = gst_structure_get_value (s, "partially-reliability");
936 const gchar *reliability_string = g_value_get_string (v);
/* An unrecognised string leaves *reliability at its current value. */
938 if (!g_strcmp0 (reliability_string, "none"))
939 *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
940 else if (!g_strcmp0 (reliability_string, "ttl"))
941 *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
942 else if (!g_strcmp0 (reliability_string, "buf"))
943 *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
944 else if (!g_strcmp0 (reliability_string, "rtx"))
945 *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
947 if (gst_structure_has_field (s, "reliability-parameter")) {
948 const GValue *v = gst_structure_get_value (s, "reliability-parameter");
949 *reliability_param = g_value_get_uint (v);
951 if (gst_structure_has_field (s, "ppid")) {
952 const GValue *v = gst_structure_get_value (s, "ppid");
953 *ppid = g_value_get_uint (v);
954 *ppid_available = TRUE;
/* Class handler of the "bytes-sent" action signal: looks up the sink pad
 * named "sink_<stream_id>" and reads its bytes_sent counter under the pad
 * lock. */
960 on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id)
964 GstSctpEncPad *sctpenc_pad;
967 pad_name = g_strdup_printf ("sink_%u", stream_id);
968 pad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
972 GST_DEBUG_OBJECT (self,
973 "Buffered amount requested on a stream that does not exist!");
977 sctpenc_pad = GST_SCTP_ENC_PAD (pad);
979 g_mutex_lock (&sctpenc_pad->lock);
980 bytes_sent = sctpenc_pad->bytes_sent;
981 g_mutex_unlock (&sctpenc_pad->lock);
/* Drop the reference returned by gst_element_get_static_pad(). */
983 gst_object_unref (sctpenc_pad);