2 * Copyright (C) <2005,2006> Wim Taymans <wim@fluendo.com>
3 * <2013> Wim Taymans <wim.taymans@gmail.com>
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
21 * Unless otherwise indicated, Source Code is licensed under MIT license.
22 * See further explanation attached in License Statement (distributed in the file
25 * Permission is hereby granted, free of charge, to any person obtaining a copy of
26 * this software and associated documentation files (the "Software"), to deal in
27 * the Software without restriction, including without limitation the rights to
28 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
29 * of the Software, and to permit persons to whom the Software is furnished to do
30 * so, subject to the following conditions:
32 * The above copyright notice and this permission notice shall be included in all
33 * copies or substantial portions of the Software.
35 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
36 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
37 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
38 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
39 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
40 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
43 /* Element-Checklist-Version: 5 */
46 * SECTION:element-rdtmanager
48 * @see_also: GstRtspSrc
50 * A simple RDT session manager used internally by rtspsrc.
53 /* #define HAVE_RTCP */
55 #include "gstrdtbuffer.h"
56 #include "rdtmanager.h"
57 #include "rdtjitterbuffer.h"
59 #include <gst/glib-compat-private.h>
/* Debug category used by the GST_*_OBJECT logging calls in this file. */
63 GST_DEBUG_CATEGORY_STATIC (rdtmanager_debug);
64 #define GST_CAT_DEFAULT (rdtmanager_debug)
66 /* GstRDTManager signals and args */
/* Signal identifiers; they index gst_rdt_manager_signals[].
 * NOTE(review): only some of the enumerators are visible in this listing, and
 * no g_signal_new() call for the SIGNAL_ON_SSRC_* ids is visible in
 * gst_rdt_manager_class_init - confirm whether they are still needed. */
69 SIGNAL_REQUEST_PT_MAP,
73 SIGNAL_ON_SSRC_COLLISION,
74 SIGNAL_ON_SSRC_VALIDATED,
75 SIGNAL_ON_SSRC_ACTIVE,
78 SIGNAL_ON_BYE_TIMEOUT,
/* Default value of the "latency" property, in milliseconds. */
84 #define DEFAULT_LATENCY_MS 200
/* Static pad templates of the element. Only the template name and the caps
 * of each template are visible in this listing. */
/* "recv_rtp_sink_%u": application/x-rdt; gst_rdt_manager_request_new_pad
 * hands requests for this template to create_recv_rtp. */
92 static GstStaticPadTemplate gst_rdt_manager_recv_rtp_sink_template =
93 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
96 GST_STATIC_CAPS ("application/x-rdt")
/* "recv_rtcp_sink_%u": application/x-rtcp; gst_rdt_manager_request_new_pad
 * hands requests for this template to create_recv_rtcp. */
99 static GstStaticPadTemplate gst_rdt_manager_recv_rtcp_sink_template =
100 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
103 GST_STATIC_CAPS ("application/x-rtcp")
/* "recv_rtp_src_%u_%u_%u": application/x-rdt; activate_session looks this
 * template up by name to create the session's recv_rtp_src pad. */
106 static GstStaticPadTemplate gst_rdt_manager_recv_rtp_src_template =
107 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
110 GST_STATIC_CAPS ("application/x-rdt")
/* "rtcp_src_%u": application/x-rtcp; gst_rdt_manager_request_new_pad hands
 * requests for this template to create_rtcp. */
113 static GstStaticPadTemplate gst_rdt_manager_rtcp_src_template =
114 GST_STATIC_PAD_TEMPLATE ("rtcp_src_%u",
117 GST_STATIC_CAPS ("application/x-rtcp")
/* Forward declarations of the file-local functions defined further down. */
/* GObject vmethods installed in gst_rdt_manager_class_init. */
120 static void gst_rdt_manager_finalize (GObject * object);
121 static void gst_rdt_manager_set_property (GObject * object,
122 guint prop_id, const GValue * value, GParamSpec * pspec);
123 static void gst_rdt_manager_get_property (GObject * object,
124 guint prop_id, GValue * value, GParamSpec * pspec);
/* Query and activate-mode functions that activate_session installs on the
 * recv_rtp_src pad. */
126 static gboolean gst_rdt_manager_query_src (GstPad * pad, GstObject * parent,
128 static gboolean gst_rdt_manager_src_activate_mode (GstPad * pad,
129 GstObject * parent, GstPadMode mode, gboolean active);
/* GstElement vmethods installed in gst_rdt_manager_class_init. */
131 static GstClock *gst_rdt_manager_provide_clock (GstElement * element);
132 static GstStateChangeReturn gst_rdt_manager_change_state (GstElement * element,
133 GstStateChange transition);
134 static GstPad *gst_rdt_manager_request_new_pad (GstElement * element,
135 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
136 static void gst_rdt_manager_release_pad (GstElement * element, GstPad * pad);
/* Caps parsing plus the event/chain functions of the sink pads and the task
 * function of the source pad. */
138 static gboolean gst_rdt_manager_parse_caps (GstRDTManager * rdtmanager,
139 GstRDTManagerSession * session, GstCaps * caps);
140 static gboolean gst_rdt_manager_event_rdt (GstPad * pad, GstObject * parent,
143 static GstFlowReturn gst_rdt_manager_chain_rdt (GstPad * pad,
144 GstObject * parent, GstBuffer * buffer);
145 static GstFlowReturn gst_rdt_manager_chain_rtcp (GstPad * pad,
146 GstObject * parent, GstBuffer * buffer);
147 static void gst_rdt_manager_loop (GstPad * pad);
/* Signal ids as returned by g_signal_new(), indexed by the SIGNAL_* values. */
149 static guint gst_rdt_manager_signals[LAST_SIGNAL] = { 0 };
/* Helpers around the per-session jitterbuffer mutex (jbuf_lock) and
 * condition variable (jbuf_cond):
 *   JBUF_LOCK / JBUF_UNLOCK  take / release sess->jbuf_lock
 *   JBUF_WAIT                g_cond_wait on sess->jbuf_cond with jbuf_lock
 *   JBUF_SIGNAL              g_cond_signal on sess->jbuf_cond */
151 #define JBUF_LOCK(sess) (g_mutex_lock (&(sess)->jbuf_lock))
/* Checked variant: tests sess->srcresult against GST_FLOW_OK. The remaining
 * lines of the macro are not visible here; presumably it locks first and
 * jumps to @label when the check fails - confirm against the full source. */
153 #define JBUF_LOCK_CHECK(sess,label) G_STMT_START { \
155 if (sess->srcresult != GST_FLOW_OK) \
159 #define JBUF_UNLOCK(sess) (g_mutex_unlock (&(sess)->jbuf_lock))
160 #define JBUF_WAIT(sess) (g_cond_wait (&(sess)->jbuf_cond, &(sess)->jbuf_lock))
/* Checked variant of JBUF_WAIT with the same srcresult test as
 * JBUF_LOCK_CHECK (remaining macro lines not visible here). */
162 #define JBUF_WAIT_CHECK(sess,label) G_STMT_START { \
164 if (sess->srcresult != GST_FLOW_OK) \
168 #define JBUF_SIGNAL(sess) (g_cond_signal (&(sess)->jbuf_cond))
/* Per-session state of the manager. One instance is allocated by
 * create_session and attached to the session's pads as element-private data.
 * NOTE(review): the description below speaks of RTP/RTCP, while the data pads
 * of this element carry "application/x-rdt"; the wording looks inherited.
 * Several members used elsewhere in this file are not visible in this
 * listing. */
170 /* Manages the receiving end of the packets.
172 * There is one such structure for each RTP session (audio/video/...).
173 * We get the RTP/RTCP packets and stuff them into the session manager.
175 struct _GstRDTManagerSession
183 /* we only support one ssrc and one pt */
192 /* the last seqnum we pushed out */
193 guint32 last_popped_seqnum;
194 /* the next expected seqnum */
196 /* last output time */
197 GstClockTime last_out_time;
199 /* the pads of the session */
200 GstPad *recv_rtp_sink;
201 GstPad *recv_rtp_src;
202 GstPad *recv_rtcp_sink;
/* flow state of the source pad task: reset to GST_FLOW_OK when the pad is
 * activated, set to GST_FLOW_FLUSHING when it is deactivated, and tested by
 * JBUF_LOCK_CHECK / JBUF_WAIT_CHECK */
205 GstFlowReturn srcresult;
212 /* jitterbuffer, lock and cond */
213 RDTJitterBuffer *jbuf;
217 /* some accounting */
/* incremented when a packet is dropped as a duplicate */
219 guint64 num_duplicates;
/* Walks the rdtmanager->sessions list looking for the session identified by
 * @id. The comparison and return statements are not visible in this listing;
 * callers treat the result as "may be absent". */
222 /* find a session with the given id */
223 static GstRDTManagerSession *
224 find_session_by_id (GstRDTManager * rdtmanager, gint id)
228 for (walk = rdtmanager->sessions; walk; walk = g_slist_next (walk)) {
229 GstRDTManagerSession *sess = (GstRDTManagerSession *) walk->data;
/* Allocates a zero-initialised session, links it back to @rdtmanager, gives
 * it a new jitterbuffer plus an initialised mutex and condition variable, and
 * prepends it to rdtmanager->sessions.
 * NOTE(review): the line storing @id in the session is not visible in this
 * listing - confirm it exists, since session->id is read elsewhere. */
237 /* create a session with the given id */
238 static GstRDTManagerSession *
239 create_session (GstRDTManager * rdtmanager, gint id)
241 GstRDTManagerSession *sess;
243 sess = g_new0 (GstRDTManagerSession, 1);
245 sess->dec = rdtmanager;
246 sess->jbuf = rdt_jitter_buffer_new ();
247 g_mutex_init (&sess->jbuf_lock);
248 g_cond_init (&sess->jbuf_cond);
249 rdtmanager->sessions = g_slist_prepend (rdtmanager->sessions, sess);
/* Callback for gst_pad_sticky_events_foreach(): pushes a new reference of
 * each sticky event on the pad passed as @user_data. */
255 forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
257 GstPad *srcpad = GST_PAD_CAST (user_data);
/* the foreach keeps its own reference, so push an extra one */
259 gst_pad_push_event (srcpad, gst_event_ref (*event));
/* Creates and exposes the recv_rtp_src pad of @session for the given @ssrc
 * and @pt.
 *
 * Steps visible here: emit "request-pt-map" to obtain caps, parse them into
 * the session, create the pad from the "recv_rtp_src_%u_%u_%u" template,
 * install the query / activate-mode functions, activate it, replay the sticky
 * events of the sink pad, set the caps and add the pad to the element.
 *
 * NOTE(review): caps comes from g_value_dup_boxed() on a value initialised to
 * NULL, so it may be NULL when no handler supplies caps; confirm that the
 * parse_caps / gst_pad_set_caps / gst_caps_unref calls below are guarded in
 * the full source (guard lines are not visible in this listing). */
265 activate_session (GstRDTManager * rdtmanager, GstRDTManagerSession * session,
266 guint32 ssrc, guint8 pt)
268 GstPadTemplate *templ;
269 GstElementClass *klass;
273 GValue args[3] = { {0}
278 GST_DEBUG_OBJECT (rdtmanager, "creating stream");
280 session->ssrc = ssrc;
/* signal arguments: args[0] = emitting element, args[1] = session id,
 * args[2] = payload type */
284 g_value_init (&args[0], GST_TYPE_ELEMENT);
285 g_value_set_object (&args[0], rdtmanager);
286 g_value_init (&args[1], G_TYPE_UINT);
287 g_value_set_uint (&args[1], session->id);
288 g_value_init (&args[2], G_TYPE_UINT);
289 g_value_set_uint (&args[2], pt);
/* return value holder for the caps, preset to NULL */
291 g_value_init (&ret, GST_TYPE_CAPS);
292 g_value_set_boxed (&ret, NULL);
294 g_signal_emitv (args, gst_rdt_manager_signals[SIGNAL_REQUEST_PT_MAP], 0,
297 g_value_unset (&args[0]);
298 g_value_unset (&args[1]);
299 g_value_unset (&args[2]);
/* take our own copy of the returned caps before releasing the GValue */
300 caps = (GstCaps *) g_value_dup_boxed (&ret);
301 g_value_unset (&ret);
304 gst_rdt_manager_parse_caps (rdtmanager, session, caps);
306 name = g_strdup_printf ("recv_rtp_src_%u_%u_%u", session->id, ssrc, pt);
307 klass = GST_ELEMENT_GET_CLASS (rdtmanager);
308 templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
309 session->recv_rtp_src = gst_pad_new_from_template (templ, name);
/* the pad functions retrieve the session through the element-private data */
312 gst_pad_set_element_private (session->recv_rtp_src, session);
313 gst_pad_set_query_function (session->recv_rtp_src, gst_rdt_manager_query_src);
314 gst_pad_set_activatemode_function (session->recv_rtp_src,
315 gst_rdt_manager_src_activate_mode);
317 gst_pad_set_active (session->recv_rtp_src, TRUE);
/* replay the sticky events already stored on the sink pad on the new pad */
319 gst_pad_sticky_events_foreach (session->recv_rtp_sink, forward_sticky_events,
320 session->recv_rtp_src);
321 gst_pad_set_caps (session->recv_rtp_src, caps);
322 gst_caps_unref (caps);
324 gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_src);
/* Releases the resources created by create_session: drops the jitterbuffer
 * reference and clears the condition variable and mutex. Used as the GFunc
 * in gst_rdt_manager_finalize. Release of the session structure itself is
 * not visible in this listing. */
330 free_session (GstRDTManagerSession * session)
332 g_object_unref (session->jbuf);
333 g_cond_clear (&session->jbuf_cond);
334 g_mutex_clear (&session->jbuf_lock);
/* GObject type boilerplate: GstRDTManager derives from GstElement and is
 * registered as the "rdtmanager" element with rank GST_RANK_NONE.
 * parent_class is aliased to the name generated by G_DEFINE_TYPE. */
338 #define gst_rdt_manager_parent_class parent_class
339 G_DEFINE_TYPE (GstRDTManager, gst_rdt_manager, GST_TYPE_ELEMENT);
340 GST_ELEMENT_REGISTER_DEFINE (rdtmanager, "rdtmanager",
341 GST_RANK_NONE, GST_TYPE_RDT_MANAGER);
/* Closure marshaller for handlers of the form
 *   gpointer handler (gpointer data1, guint arg_1, guint arg_2, gpointer data2)
 * The pointer returned by the handler is stored in @return_value with
 * g_value_take_boxed(). Requires exactly three parameter values (instance
 * plus the two uints) and a non-NULL @return_value. */
343 /* BOXED:UINT,UINT */
344 #define g_marshal_value_peek_uint(v) g_value_get_uint (v)
347 gst_rdt_manager_marshal_BOXED__UINT_UINT (GClosure * closure,
348 GValue * return_value,
349 guint n_param_values,
350 const GValue * param_values,
351 gpointer invocation_hint, gpointer marshal_data)
353 typedef gpointer (*GMarshalFunc_BOXED__UINT_UINT) (gpointer data1,
354 guint arg_1, guint arg_2, gpointer data2);
355 register GMarshalFunc_BOXED__UINT_UINT callback;
356 register GCClosure *cc = (GCClosure *) closure;
357 register gpointer data1, data2;
360 g_return_if_fail (return_value != NULL);
361 g_return_if_fail (n_param_values == 3);
/* swapped closures receive the user data first and the instance last */
363 if (G_CCLOSURE_SWAP_DATA (closure)) {
364 data1 = closure->data;
365 data2 = g_value_peek_pointer (param_values + 0);
367 data1 = g_value_peek_pointer (param_values + 0);
368 data2 = closure->data;
/* an explicit marshal_data overrides the callback stored in the closure */
371 (GMarshalFunc_BOXED__UINT_UINT) (marshal_data ? marshal_data :
374 v_return = callback (data1,
375 g_marshal_value_peek_uint (param_values + 1),
376 g_marshal_value_peek_uint (param_values + 2), data2);
/* the GValue takes ownership of the returned boxed pointer */
378 g_value_take_boxed (return_value, v_return);
/* Closure marshaller for handlers of the form
 *   void handler (gpointer data1, guint arg_1, guint arg_2, gpointer data2)
 * Requires exactly three parameter values (instance plus the two uints);
 * @return_value is not used by the visible code. */
382 gst_rdt_manager_marshal_VOID__UINT_UINT (GClosure * closure,
383 GValue * return_value,
384 guint n_param_values,
385 const GValue * param_values,
386 gpointer invocation_hint, gpointer marshal_data)
388 typedef void (*GMarshalFunc_VOID__UINT_UINT) (gpointer data1,
389 guint arg_1, guint arg_2, gpointer data2);
390 register GMarshalFunc_VOID__UINT_UINT callback;
391 register GCClosure *cc = (GCClosure *) closure;
392 register gpointer data1, data2;
394 g_return_if_fail (n_param_values == 3);
/* swapped closures receive the user data first and the instance last */
396 if (G_CCLOSURE_SWAP_DATA (closure)) {
397 data1 = closure->data;
398 data2 = g_value_peek_pointer (param_values + 0);
400 data1 = g_value_peek_pointer (param_values + 0);
401 data2 = closure->data;
/* an explicit marshal_data overrides the callback stored in the closure */
404 (GMarshalFunc_VOID__UINT_UINT) (marshal_data ? marshal_data :
408 g_marshal_value_peek_uint (param_values + 1),
409 g_marshal_value_peek_uint (param_values + 2), data2);
/* Class initialiser: installs the GObject vmethods and the "latency"
 * property, registers the element signals, installs the GstElement vmethods,
 * adds the four static pad templates, sets the element metadata and
 * initialises the debug category.
 *
 * NOTE(review): the signal documentation fragments below name the instance
 * parameter @rtpbin and the metadata / debug description strings say "RTP";
 * both look inherited from an RTP element, while this class is
 * GstRDTManager. The strings are runtime text and are left untouched here. */
413 gst_rdt_manager_class_init (GstRDTManagerClass * g_class)
415 GObjectClass *gobject_class;
416 GstElementClass *gstelement_class;
417 GstRDTManagerClass *klass;
/* three views of the same class structure */
419 klass = (GstRDTManagerClass *) g_class;
420 gobject_class = (GObjectClass *) klass;
421 gstelement_class = (GstElementClass *) klass;
423 gobject_class->finalize = gst_rdt_manager_finalize;
424 gobject_class->set_property = gst_rdt_manager_set_property;
425 gobject_class->get_property = gst_rdt_manager_get_property;
/* "latency": unsigned, read/write, full guint range, default
 * DEFAULT_LATENCY_MS */
427 g_object_class_install_property (gobject_class, PROP_LATENCY,
428 g_param_spec_uint ("latency", "Buffer latency in ms",
429 "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
430 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
433 * GstRDTManager::request-pt-map:
434 * @rdtmanager: the object which received the signal
435 * @session: the session
438 * Request the payload type as #GstCaps for @pt in @session.
/* returns GstCaps, takes two uints; emitted from activate_session */
440 gst_rdt_manager_signals[SIGNAL_REQUEST_PT_MAP] =
441 g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
442 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, request_pt_map),
443 NULL, NULL, gst_rdt_manager_marshal_BOXED__UINT_UINT, GST_TYPE_CAPS, 2,
444 G_TYPE_UINT, G_TYPE_UINT);
447 * GstRDTManager::clear-pt-map:
448 * @rtpbin: the object which received the signal
450 * Clear all previously cached pt-mapping obtained with
451 * GstRDTManager::request-pt-map.
/* no return value and a parameter count of 0 */
453 gst_rdt_manager_signals[SIGNAL_CLEAR_PT_MAP] =
454 g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
455 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, clear_pt_map),
456 NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
459 * GstRDTManager::on-bye-ssrc:
460 * @rtpbin: the object which received the signal
461 * @session: the session
464 * Notify of an SSRC that became inactive because of a BYE packet.
/* the remaining signals all return nothing and take two uints */
466 gst_rdt_manager_signals[SIGNAL_ON_BYE_SSRC] =
467 g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
468 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, on_bye_ssrc),
469 NULL, NULL, gst_rdt_manager_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
470 G_TYPE_UINT, G_TYPE_UINT);
472 * GstRDTManager::on-bye-timeout:
473 * @rtpbin: the object which received the signal
474 * @session: the session
477 * Notify of an SSRC that has timed out because of BYE
479 gst_rdt_manager_signals[SIGNAL_ON_BYE_TIMEOUT] =
480 g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
481 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, on_bye_timeout),
482 NULL, NULL, gst_rdt_manager_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
483 G_TYPE_UINT, G_TYPE_UINT);
485 * GstRDTManager::on-timeout:
486 * @rtpbin: the object which received the signal
487 * @session: the session
490 * Notify of an SSRC that has timed out
492 gst_rdt_manager_signals[SIGNAL_ON_TIMEOUT] =
493 g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
494 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, on_timeout),
495 NULL, NULL, gst_rdt_manager_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
496 G_TYPE_UINT, G_TYPE_UINT);
499 * GstRDTManager::on-npt-stop:
500 * @rtpbin: the object which received the signal
501 * @session: the session
504 * Notify that SSRC sender has sent data up to the configured NPT stop time.
506 gst_rdt_manager_signals[SIGNAL_ON_NPT_STOP] =
507 g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
508 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, on_npt_stop),
509 NULL, NULL, gst_rdt_manager_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
510 G_TYPE_UINT, G_TYPE_UINT);
/* GstElement vmethods */
513 gstelement_class->provide_clock =
514 GST_DEBUG_FUNCPTR (gst_rdt_manager_provide_clock);
515 gstelement_class->change_state =
516 GST_DEBUG_FUNCPTR (gst_rdt_manager_change_state);
517 gstelement_class->request_new_pad =
518 GST_DEBUG_FUNCPTR (gst_rdt_manager_request_new_pad);
519 gstelement_class->release_pad =
520 GST_DEBUG_FUNCPTR (gst_rdt_manager_release_pad);
/* pad templates: the two sink templates first, then the two src templates */
523 gst_element_class_add_static_pad_template (gstelement_class,
524 &gst_rdt_manager_recv_rtp_sink_template);
525 gst_element_class_add_static_pad_template (gstelement_class,
526 &gst_rdt_manager_recv_rtcp_sink_template);
528 gst_element_class_add_static_pad_template (gstelement_class,
529 &gst_rdt_manager_recv_rtp_src_template);
530 gst_element_class_add_static_pad_template (gstelement_class,
531 &gst_rdt_manager_rtcp_src_template);
533 gst_element_class_set_static_metadata (gstelement_class, "RTP Decoder",
534 "Codec/Parser/Network",
535 "Accepts raw RTP and RTCP packets and sends them forward",
536 "Wim Taymans <wim.taymans@gmail.com>");
538 GST_DEBUG_CATEGORY_INIT (rdtmanager_debug, "rdtmanager", 0, "RTP decoder");
/* Instance initialiser: obtains the system clock as the clock this element
 * provides, sets the latency to DEFAULT_LATENCY_MS and flags the element as
 * a clock provider. */
542 gst_rdt_manager_init (GstRDTManager * rdtmanager)
544 rdtmanager->provided_clock = gst_system_clock_obtain ();
545 rdtmanager->latency = DEFAULT_LATENCY_MS;
546 GST_OBJECT_FLAG_SET (rdtmanager, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
/* GObject finalize: runs free_session on every session, frees the session
 * list, drops the reference on the provided clock and chains up to the
 * parent class. */
550 gst_rdt_manager_finalize (GObject * object)
552 GstRDTManager *rdtmanager;
554 rdtmanager = GST_RDT_MANAGER (object);
556 g_slist_foreach (rdtmanager->sessions, (GFunc) free_session, NULL);
557 g_slist_free (rdtmanager->sessions);
/* g_clear_object also resets the pointer to NULL */
558 g_clear_object (&rdtmanager->provided_clock);
560 G_OBJECT_CLASS (parent_class)->finalize (object);
/* Query function of the recv_rtp_src pad. Latency queries are answered
 * directly from the "latency" property (milliseconds converted with
 * GST_MSECOND); a call to gst_pad_query_default is also visible, presumably
 * for all other query types (the case labels are not visible here). */
564 gst_rdt_manager_query_src (GstPad * pad, GstObject * parent, GstQuery * query)
566 GstRDTManager *rdtmanager;
569 rdtmanager = GST_RDT_MANAGER (parent);
571 switch (GST_QUERY_TYPE (query)) {
572 case GST_QUERY_LATENCY:
574 GstClockTime latency;
576 latency = rdtmanager->latency * GST_MSECOND;
578 /* we pretend to be live; min latency is the configured latency, max is -1 */
579 gst_query_set_latency (query, TRUE, latency, -1);
581 GST_DEBUG_OBJECT (rdtmanager, "reporting %" GST_TIME_FORMAT " of latency",
582 GST_TIME_ARGS (latency));
587 res = gst_pad_query_default (pad, parent, query);
/* Activate-mode function of the recv_rtp_src pad (push mode).
 *
 * The activation lines reset the session streaming state and start the pad
 * task running gst_rdt_manager_loop. The deactivation lines mark the session
 * as flushing, wake up a waiting task, unschedule a pending clock id and stop
 * the task. The session is taken from the pad's element-private data. The
 * branch on @active and the matching JBUF_LOCK calls are not visible in this
 * listing. */
594 gst_rdt_manager_src_activate_mode (GstPad * pad, GstObject * parent,
595 GstPadMode mode, gboolean active)
598 GstRDTManager *rdtmanager;
599 GstRDTManagerSession *session;
601 session = gst_pad_get_element_private (pad);
602 rdtmanager = session->dec;
605 case GST_PAD_MODE_PUSH:
607 /* allow data processing */
609 GST_DEBUG_OBJECT (rdtmanager, "Enabling pop on queue");
610 /* Mark as non flushing */
611 session->srcresult = GST_FLOW_OK;
612 gst_segment_init (&session->segment, GST_FORMAT_TIME);
/* -1 is stored as the "not set yet" value for the counters below */
613 session->last_popped_seqnum = -1;
614 session->last_out_time = -1;
615 session->next_seqnum = -1;
616 session->eos = FALSE;
617 JBUF_UNLOCK (session);
619 /* start pushing out buffers */
620 GST_DEBUG_OBJECT (rdtmanager, "Starting task on srcpad");
622 gst_pad_start_task (pad, (GstTaskFunction) gst_rdt_manager_loop,
625 /* make sure all data processing stops ASAP */
627 /* mark ourselves as flushing */
628 session->srcresult = GST_FLOW_FLUSHING;
629 GST_DEBUG_OBJECT (rdtmanager, "Disabling pop on queue");
630 /* this unblocks any waiting pops on the src pad task */
631 JBUF_SIGNAL (session);
632 /* unlock clock, we just unschedule, the entry will be released by
633 * the locking streaming thread. */
634 if (session->clock_id)
635 gst_clock_id_unschedule (session->clock_id);
636 JBUF_UNLOCK (session);
638 /* NOTE this will hardlock if the state change is called from the src pad
639 * task thread because we will _join() the thread. */
640 GST_DEBUG_OBJECT (rdtmanager, "Stopping task on srcpad");
641 result = gst_pad_stop_task (pad);
/* Queues one RDT data @packet of @session in the session jitterbuffer.
 *
 * The packet is converted to a buffer and inserted, under the jbuf lock,
 * together with @timestamp and the session clock-rate. When the streaming
 * loop is waiting for data it is signalled. The flushing exit path reports
 * session->srcresult and drops the buffer; the duplicate exit path logs a
 * warning, counts the duplicate and drops the buffer. The exit labels and
 * return statements are not visible in this listing. */
652 gst_rdt_manager_handle_data_packet (GstRDTManagerSession * session,
653 GstClockTime timestamp, GstRDTPacket * packet)
655 GstRDTManager *rdtmanager;
661 rdtmanager = session->dec;
666 GST_DEBUG_OBJECT (rdtmanager,
667 "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
668 GST_TIME_ARGS (timestamp));
670 buffer = gst_rdt_packet_to_buffer (packet);
/* takes the jbuf lock; uses the out_flushing label when srcresult is not OK */
672 JBUF_LOCK_CHECK (session, out_flushing);
674 /* insert the packet into the queue now, FIXME, use seqnum */
675 if (!rdt_jitter_buffer_insert (session->jbuf, buffer, timestamp,
676 session->clock_rate, &tail))
679 /* signal addition of new buffer when the _loop is waiting. */
680 if (session->waiting)
681 JBUF_SIGNAL (session);
684 JBUF_UNLOCK (session);
/* flushing path: report the stored flow state and drop our buffer */
691 res = session->srcresult;
692 GST_DEBUG_OBJECT (rdtmanager, "flushing %s", gst_flow_get_name (res));
693 gst_buffer_unref (buffer);
/* duplicate path: count the duplicate and drop our buffer */
698 GST_WARNING_OBJECT (rdtmanager, "Duplicate packet #%d detected, dropping",
700 session->num_duplicates++;
701 gst_buffer_unref (buffer);
/* Reads the stream parameters of @session from the first structure of @caps:
 *   clock-rate   int, defaults to 1000 when absent; a value <= 0 is rejected
 *                (see the "Invalid clock-rate" message at the end)
 *   clock-base   uint, stored as -1 when absent
 *   seqnum-base  uint, stored in next_seqnum, -1 when absent
 * @caps is dereferenced unconditionally and must not be NULL. */
707 gst_rdt_manager_parse_caps (GstRDTManager * rdtmanager,
708 GstRDTManagerSession * session, GstCaps * caps)
710 GstStructure *caps_struct;
713 /* first parse the caps */
714 caps_struct = gst_caps_get_structure (caps, 0);
716 GST_DEBUG_OBJECT (rdtmanager, "got caps");
718 /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
719 * measure the amount of data in the buffer */
720 if (!gst_structure_get_int (caps_struct, "clock-rate", &session->clock_rate))
721 session->clock_rate = 1000;
723 if (session->clock_rate <= 0)
726 GST_DEBUG_OBJECT (rdtmanager, "got clock-rate %d", session->clock_rate);
728 /* gah, clock-base is uint. If we don't have a base, we will use the first
729 * buffer timestamp as the base time. This will screw up sync but it's better
731 if (gst_structure_get_uint (caps_struct, "clock-base", &val))
732 session->clock_base = val;
734 session->clock_base = -1;
736 GST_DEBUG_OBJECT (rdtmanager, "got clock-base %" G_GINT64_FORMAT,
737 session->clock_base);
739 /* first expected seqnum */
740 if (gst_structure_get_uint (caps_struct, "seqnum-base", &val))
741 session->next_seqnum = val;
743 session->next_seqnum = -1;
745 GST_DEBUG_OBJECT (rdtmanager, "got seqnum-base %d", session->next_seqnum);
/* error path for a clock-rate that is not positive */
752 GST_DEBUG_OBJECT (rdtmanager, "Invalid clock-rate %d", session->clock_rate);
/* Event function of the recv_rtp_sink pad. One branch parses the caps of the
 * event into the session (taken from the pad's element-private data) and
 * consumes the event; another forwards the event to gst_pad_event_default.
 * The case labels are not visible in this listing. */
758 gst_rdt_manager_event_rdt (GstPad * pad, GstObject * parent, GstEvent * event)
760 GstRDTManager *rdtmanager;
761 GstRDTManagerSession *session;
764 rdtmanager = GST_RDT_MANAGER (parent);
766 session = gst_pad_get_element_private (pad);
768 switch (GST_EVENT_TYPE (event)) {
/* caps stays owned by the event; it is only borrowed for parsing */
773 gst_event_parse_caps (event, &caps);
774 res = gst_rdt_manager_parse_caps (rdtmanager, session, caps);
775 gst_event_unref (event);
779 res = gst_pad_event_default (pad, parent, event);
/* Chain function of the recv_rtp_sink pad.
 *
 * On the first buffer of a session the source pad is created through
 * activate_session. A DISCONT flag on the input is remembered in the
 * session. The buffer timestamp is converted to running time and every RDT
 * packet contained in the buffer is visited: data packets go to
 * gst_rdt_manager_handle_data_packet, other packet types are ignored.
 * The input buffer is unreffed at the end. */
786 gst_rdt_manager_chain_rdt (GstPad * pad, GstObject * parent, GstBuffer * buffer)
789 GstRDTManager *rdtmanager;
790 GstRDTManagerSession *session;
791 GstClockTime timestamp;
797 rdtmanager = GST_RDT_MANAGER (parent);
799 GST_DEBUG_OBJECT (rdtmanager, "got RDT packet");
804 GST_DEBUG_OBJECT (rdtmanager, "SSRC %08x, PT %d", ssrc, pt);
807 session = gst_pad_get_element_private (pad);
809 /* see if we have the pad */
810 if (!session->active) {
811 activate_session (rdtmanager, session, ssrc, pt);
812 session->active = TRUE;
815 if (GST_BUFFER_IS_DISCONT (buffer)) {
816 GST_DEBUG_OBJECT (rdtmanager, "received discont");
817 session->discont = TRUE;
822 /* take the timestamp of the buffer. This is the time when the packet was
823 * received and is used to calculate jitter and clock skew. We will adjust
824 * this timestamp with the smoothed value after processing it in the
826 timestamp = GST_BUFFER_TIMESTAMP (buffer);
827 /* bring to running time */
828 timestamp = gst_segment_to_running_time (&session->segment, GST_FORMAT_TIME,
/* iterate over all RDT packets contained in the buffer */
831 more = gst_rdt_buffer_get_first_packet (buffer, &packet);
835 type = gst_rdt_packet_get_type (&packet);
836 GST_DEBUG_OBJECT (rdtmanager, "Have packet of type %04x", type);
838 if (GST_RDT_IS_DATA_TYPE (type)) {
839 GST_DEBUG_OBJECT (rdtmanager, "We have a data packet");
840 res = gst_rdt_manager_handle_data_packet (session, timestamp, &packet);
844 GST_DEBUG_OBJECT (rdtmanager, "Ignoring packet");
/* a non-OK flow result is tested after each packet */
848 if (res != GST_FLOW_OK)
851 more = gst_rdt_packet_move_to_next (&packet);
854 gst_buffer_unref (buffer);
/* Task function of the recv_rtp_src pad.
 *
 * Under the jbuf lock it waits until the jitterbuffer holds a packet (or
 * the session is flushing / EOS), pops one buffer, transfers a pending
 * DISCONT to it, releases the lock and pushes the buffer downstream.
 * Three exit paths follow the main path: flushing, EOS and "push returned
 * an error"; each pauses the task and releases the lock. The labels of
 * those paths are not visible in this listing.
 *
 * NOTE(review): in the EOS path the EOS event is pushed before JBUF_UNLOCK,
 * i.e. with the jbuf lock held - confirm this cannot block against a thread
 * that needs the same lock. */
859 /* push packets from the queue to the downstream demuxer */
861 gst_rdt_manager_loop (GstPad * pad)
863 GstRDTManager *rdtmanager;
864 GstRDTManagerSession *session;
866 GstFlowReturn result;
868 rdtmanager = GST_RDT_MANAGER (GST_PAD_PARENT (pad));
870 session = gst_pad_get_element_private (pad);
872 JBUF_LOCK_CHECK (session, flushing);
873 GST_DEBUG_OBJECT (rdtmanager, "Peeking item");
875 /* always wait if we are blocked */
876 if (!session->blocked) {
877 /* if we have a packet, we can exit the loop and grab it */
878 if (rdt_jitter_buffer_num_packets (session->jbuf) > 0)
880 /* no packets but we are EOS, do eos logic */
884 /* underrun, wait for packets or flushing now */
/* "waiting" tells gst_rdt_manager_handle_data_packet to signal the cond */
885 session->waiting = TRUE;
886 JBUF_WAIT_CHECK (session, flushing);
887 session->waiting = FALSE;
890 buffer = rdt_jitter_buffer_pop (session->jbuf);
892 GST_DEBUG_OBJECT (rdtmanager, "Got item %p", buffer);
/* hand a pending discontinuity over to the outgoing buffer exactly once */
894 if (session->discont) {
895 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
896 session->discont = FALSE;
/* release the lock before pushing downstream */
899 JBUF_UNLOCK (session);
901 result = gst_pad_push (session->recv_rtp_src, buffer);
902 if (result != GST_FLOW_OK)
/* flushing path */
910 GST_DEBUG_OBJECT (rdtmanager, "we are flushing");
911 gst_pad_pause_task (session->recv_rtp_src);
912 JBUF_UNLOCK (session);
/* EOS path */
917 /* store result, we are flushing now */
918 GST_DEBUG_OBJECT (rdtmanager, "We are EOS, pushing EOS downstream");
919 session->srcresult = GST_FLOW_EOS;
920 gst_pad_pause_task (session->recv_rtp_src);
921 gst_pad_push_event (session->recv_rtp_src, gst_event_new_eos ());
922 JBUF_UNLOCK (session);
/* push-error path: remember the flow return so that the checked lock
 * macros see a non-OK srcresult from now on */
927 GST_DEBUG_OBJECT (rdtmanager, "pausing task, reason %s",
928 gst_flow_get_name (result));
932 session->srcresult = result;
933 /* we don't post errors or anything because upstream will do that for us
934 * when we pass the return value upstream. */
935 gst_pad_pause_task (session->recv_rtp_src);
936 JBUF_UNLOCK (session);
/* Chain function of the recv_rtcp_sink pad.
 *
 * Validates the RTCP buffer and walks over the contained packets, logging
 * the content of SR, RR, SDES, BYE and APP packets; unknown packet types
 * and invalid buffers produce a warning. The buffer is unreffed after the
 * walk. No session state is touched by the visible code.
 *
 * NOTE(review): HAVE_RTCP is commented out near the top of the file, so
 * large parts of this body are presumably compiled out; the conditional
 * compilation lines are not visible in this listing.
 *
 * Fix: both "got RB packet %d: ..." debug statements had eight conversions
 * in the format string but only seven arguments, because the report block
 * index was missing. The loop counter i is now passed for the leading %d. */
942 gst_rdt_manager_chain_rtcp (GstPad * pad, GstObject * parent,
949 GstRTCPPacket packet;
953 src = GST_RDT_MANAGER (parent);
955 GST_DEBUG_OBJECT (src, "got rtcp packet");
958 valid = gst_rtcp_buffer_validate (buffer);
962 /* position on first packet */
963 more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
965 switch (gst_rtcp_packet_get_type (&packet)) {
966 case GST_RTCP_TYPE_SR:
968 guint32 ssrc, rtptime, packet_count, octet_count;
972 gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, &rtptime,
973 &packet_count, &octet_count);
975 GST_DEBUG_OBJECT (src,
976 "got SR packet: SSRC %08x, NTP %" G_GUINT64_FORMAT
977 ", RTP %u, PC %u, OC %u", ssrc, ntptime, rtptime, packet_count,
/* log every report block attached to the sender report */
980 count = gst_rtcp_packet_get_rb_count (&packet);
981 for (i = 0; i < count; i++) {
982 guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
986 gst_rtcp_packet_get_rb (&packet, i, &ssrc, &fractionlost,
987 &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
/* i supplies the report block number for the leading %d */
989 GST_DEBUG_OBJECT (src, "got RB packet %d: SSRC %08x, FL %u"
990 ", PL %u, HS %u, JITTER %u, LSR %u, DLSR %u", i, ssrc, fractionlost,
991 packetslost, exthighestseq, jitter, lsr, dlsr);
995 case GST_RTCP_TYPE_RR:
1000 ssrc = gst_rtcp_packet_rr_get_ssrc (&packet);
1002 GST_DEBUG_OBJECT (src, "got RR packet: SSRC %08x", ssrc);
/* log every report block attached to the receiver report */
1004 count = gst_rtcp_packet_get_rb_count (&packet);
1005 for (i = 0; i < count; i++) {
1006 guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1007 guint8 fractionlost;
1010 gst_rtcp_packet_get_rb (&packet, i, &ssrc, &fractionlost,
1011 &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
/* i supplies the report block number for the leading %d */
1013 GST_DEBUG_OBJECT (src, "got RB packet %d: SSRC %08x, FL %u"
1014 ", PL %u, HS %u, JITTER %u, LSR %u, DLSR %u", i, ssrc, fractionlost,
1015 packetslost, exthighestseq, jitter, lsr, dlsr);
1019 case GST_RTCP_TYPE_SDES:
1022 gboolean more_chunks, more_items;
1024 chunks = gst_rtcp_packet_sdes_get_chunk_count (&packet);
1025 GST_DEBUG_OBJECT (src, "got SDES packet with %d chunks", chunks);
/* outer loop: SDES chunks, inner loop: items of the current chunk */
1027 more_chunks = gst_rtcp_packet_sdes_first_chunk (&packet);
1029 while (more_chunks) {
1032 ssrc = gst_rtcp_packet_sdes_get_ssrc (&packet);
1034 GST_DEBUG_OBJECT (src, "chunk %d, SSRC %08x", i, ssrc);
1036 more_items = gst_rtcp_packet_sdes_first_item (&packet);
1038 while (more_items) {
1039 GstRTCPSDESType type;
1043 gst_rtcp_packet_sdes_get_item (&packet, &type, &len, &data);
1045 GST_DEBUG_OBJECT (src, "item %d, type %d, len %d, data %s", j,
1048 more_items = gst_rtcp_packet_sdes_next_item (&packet);
1051 more_chunks = gst_rtcp_packet_sdes_next_chunk (&packet);
1056 case GST_RTCP_TYPE_BYE:
/* the reason may be NULL, hence GST_STR_NULL */
1061 reason = gst_rtcp_packet_bye_get_reason (&packet);
1062 GST_DEBUG_OBJECT (src, "got BYE packet (reason: %s)",
1063 GST_STR_NULL (reason));
1066 count = gst_rtcp_packet_bye_get_ssrc_count (&packet);
1067 for (i = 0; i < count; i++) {
1071 ssrc = gst_rtcp_packet_bye_get_nth_ssrc (&packet, i);
1073 GST_DEBUG_OBJECT (src, "SSRC: %08x", ssrc);
1077 case GST_RTCP_TYPE_APP:
1078 GST_DEBUG_OBJECT (src, "got APP packet");
1081 GST_WARNING_OBJECT (src, "got unknown RTCP packet");
1084 more = gst_rtcp_packet_move_to_next (&packet);
1086 gst_buffer_unref (buffer);
/* error path for buffers rejected by gst_rtcp_buffer_validate */
1091 GST_WARNING_OBJECT (src, "got invalid RTCP packet");
/* GObject set_property: stores the "latency" value (unsigned, milliseconds)
 * in the instance; unknown property ids are reported with
 * G_OBJECT_WARN_INVALID_PROPERTY_ID. The switch / case lines are not
 * visible in this listing. */
1100 gst_rdt_manager_set_property (GObject * object, guint prop_id,
1101 const GValue * value, GParamSpec * pspec)
1105 src = GST_RDT_MANAGER (object);
1109 src->latency = g_value_get_uint (value);
1112 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject get_property: returns the stored latency (unsigned, milliseconds)
 * in @value; unknown property ids are reported with
 * G_OBJECT_WARN_INVALID_PROPERTY_ID. The switch / case lines are not
 * visible in this listing. */
1118 gst_rdt_manager_get_property (GObject * object, guint prop_id, GValue * value,
1123 src = GST_RDT_MANAGER (object);
1127 g_value_set_uint (value, src->latency);
1130 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GstElement provide_clock: returns a new reference to the clock obtained
 * in gst_rdt_manager_init. */
1136 gst_rdt_manager_provide_clock (GstElement * element)
1138 GstRDTManager *rdtmanager;
1140 rdtmanager = GST_RDT_MANAGER (element);
/* the caller owns the returned reference */
1142 return GST_CLOCK_CAST (gst_object_ref (rdtmanager->provided_clock));
/* GstElement change_state: chains up to the parent class and then, for the
 * READY->PAUSED and PLAYING->PAUSED transitions, replaces the result with
 * GST_STATE_CHANGE_NO_PREROLL. The body of the first switch is not visible
 * in this listing. */
1145 static GstStateChangeReturn
1146 gst_rdt_manager_change_state (GstElement * element, GstStateChange transition)
1148 GstStateChangeReturn ret;
1150 switch (transition) {
1155 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1157 switch (transition) {
1158 case GST_STATE_CHANGE_READY_TO_PAUSED:
1159 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1160 /* we're NO_PREROLL when going to PAUSED */
1161 ret = GST_STATE_CHANGE_NO_PREROLL;
/* Handles a request for a "recv_rtp_sink_%u" pad.
 *
 * The session number is parsed from @name; the session is looked up and,
 * per the debug message below, created when it does not exist yet. A new
 * pad is made from @templ, gets the session as element-private data plus
 * the RDT event and chain functions, is activated and added to the element.
 * Warnings are issued for an invalid name and for a session whose
 * recv_rtp pad was already requested. The error labels are not visible in
 * this listing. */
1170 /* Create a pad for receiving RTP for the session in @name
1173 create_recv_rtp (GstRDTManager * rdtmanager, GstPadTemplate * templ,
1177 GstRDTManagerSession *session;
1179 /* first get the session number */
1180 if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1)
1183 GST_DEBUG_OBJECT (rdtmanager, "finding session %d", sessid);
1185 /* get or create session */
1186 session = find_session_by_id (rdtmanager, sessid);
1188 GST_DEBUG_OBJECT (rdtmanager, "creating session %d", sessid);
1189 /* create session now */
1190 session = create_session (rdtmanager, sessid);
1191 if (session == NULL)
1194 /* check if pad was requested */
1195 if (session->recv_rtp_sink != NULL)
1198 GST_DEBUG_OBJECT (rdtmanager, "getting RTP sink pad");
1200 session->recv_rtp_sink = gst_pad_new_from_template (templ, name);
/* the event and chain functions fetch the session from the pad */
1201 gst_pad_set_element_private (session->recv_rtp_sink, session);
1202 gst_pad_set_event_function (session->recv_rtp_sink,
1203 gst_rdt_manager_event_rdt);
1204 gst_pad_set_chain_function (session->recv_rtp_sink,
1205 gst_rdt_manager_chain_rdt);
1206 gst_pad_set_active (session->recv_rtp_sink, TRUE);
1207 gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_sink);
1209 return session->recv_rtp_sink;
/* error paths */
1214 g_warning ("rdtmanager: invalid name given");
1219 /* create_session already warned */
1224 g_warning ("rdtmanager: recv_rtp pad already requested for session %d",
/* Handles a request for a "recv_rtcp_sink_%u" pad.
 *
 * The session number is parsed from @name and the session is looked up; per
 * the comment and warning below it has to exist already. A new pad is made
 * from @templ, gets the session as element-private data plus the RTCP chain
 * function, is activated and added to the element. Warnings are issued for
 * an invalid name, an unknown session and a session whose recv_rtcp pad was
 * already requested. The error labels are not visible in this listing.
 *
 * Fix: the element-private pointer was set on session->recv_rtp_sink
 * instead of the pad created here, leaving the new RTCP pad without its
 * session pointer. */
1230 /* Create a pad for receiving RTCP for the session in @name
1233 create_recv_rtcp (GstRDTManager * rdtmanager, GstPadTemplate * templ,
1237 GstRDTManagerSession *session;
1239 /* first get the session number */
1240 if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1)
1243 GST_DEBUG_OBJECT (rdtmanager, "finding session %d", sessid);
1245 /* get the session, it must exist or we error */
1246 session = find_session_by_id (rdtmanager, sessid);
1250 /* check if pad was requested */
1251 if (session->recv_rtcp_sink != NULL)
1254 GST_DEBUG_OBJECT (rdtmanager, "getting RTCP sink pad");
1256 session->recv_rtcp_sink = gst_pad_new_from_template (templ, name);
/* attach the session to the RTCP pad itself, not to the RDT sink pad */
1257 gst_pad_set_element_private (session->recv_rtcp_sink, session);
1258 gst_pad_set_chain_function (session->recv_rtcp_sink,
1259 gst_rdt_manager_chain_rtcp);
1260 gst_pad_set_active (session->recv_rtcp_sink, TRUE);
1261 gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtcp_sink);
1263 return session->recv_rtcp_sink;
/* error paths */
1268 g_warning ("rdtmanager: invalid name given");
1273 g_warning ("rdtmanager: no session with id %d", sessid);
1278 g_warning ("rdtmanager: recv_rtcp pad already requested for session %d",
/* Handles a request for an "rtcp_src_%u" pad.
 *
 * The session number is parsed from @name and the session is looked up; no
 * session is created here. A new pad is made from @templ, activated and
 * added to the element. Warnings are issued for an invalid name, a session
 * that does not exist and a session whose rtcp_src pad was already
 * requested. The error labels are not visible in this listing. */
1284 /* Create a pad for sending RTCP for the session in @name
1287 create_rtcp (GstRDTManager * rdtmanager, GstPadTemplate * templ,
1291 GstRDTManagerSession *session;
1293 /* first get the session number */
1294 if (name == NULL || sscanf (name, "rtcp_src_%u", &sessid) != 1)
1297 /* look the session up; presumably it must already exist (see warning below) */
1298 session = find_session_by_id (rdtmanager, sessid);
1302 /* check if pad was requested */
1303 if (session->rtcp_src != NULL)
1306 session->rtcp_src = gst_pad_new_from_template (templ, name);
1307 gst_pad_set_active (session->rtcp_src, TRUE);
1308 gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->rtcp_src);
1310 return session->rtcp_src;
/* error paths */
1315 g_warning ("rdtmanager: invalid name given");
1320 g_warning ("rdtmanager: session with id %d does not exist", sessid);
1325 g_warning ("rdtmanager: rtcp_src pad already requested for session %d",
/* GstElement request_new_pad: dispatches on @templ to the matching pad
 * constructor:
 *   "recv_rtp_sink_%u"  -> create_recv_rtp
 *   "recv_rtcp_sink_%u" -> create_recv_rtcp
 *   "rtcp_src_%u"       -> create_rtcp
 * Any other template ends in the wrong_template path, which warns.
 * @caps is not used by the visible code. Requires a non-NULL @templ and a
 * GstRDTManager @element. */
1334 gst_rdt_manager_request_new_pad (GstElement * element,
1335 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1337 GstRDTManager *rdtmanager;
1338 GstElementClass *klass;
1341 g_return_val_if_fail (templ != NULL, NULL);
1342 g_return_val_if_fail (GST_IS_RDT_MANAGER (element), NULL);
1344 rdtmanager = GST_RDT_MANAGER (element);
1345 klass = GST_ELEMENT_GET_CLASS (element);
1347 /* figure out the template */
/* templates are compared by pointer against the ones installed on the class */
1348 if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
1349 result = create_recv_rtp (rdtmanager, templ, name);
1350 } else if (templ == gst_element_class_get_pad_template (klass,
1351 "recv_rtcp_sink_%u")) {
1352 result = create_recv_rtcp (rdtmanager, templ, name);
1353 } else if (templ == gst_element_class_get_pad_template (klass, "rtcp_src_%u")) {
1354 result = create_rtcp (rdtmanager, templ, name);
1356 goto wrong_template;
/* error path */
1363 g_warning ("rdtmanager: this is not our template");
1369 gst_rdt_manager_release_pad (GstElement * element, GstPad * pad)