2 * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-rtpsession
22 * @see_also: rtpjitterbuffer, rtpbin, rtpptdemux, rtpssrcdemux
24 * The RTP session manager models participants with unique SSRC in an RTP
25 * session. This session can be used to send and receive RTP and RTCP packets.
26 * Based on what REQUEST pads are requested from the session manager, specific
27 * functionality can be activated.
29 * The session manager currently implements RFC 3550 including:
32 * <para>RTP packet validation based on consecutive sequence numbers.</para>
35 * <para>Maintainance of the SSRC participant database.</para>
38 * <para>Keeping per participant statistics based on received RTCP packets.</para>
41 * <para>Scheduling of RR/SR RTCP packets.</para>
44 * <para>Support for multiple sender SSRC.</para>
48 * The rtpsession will not demux packets based on SSRC or payload type, nor will
49 * it correct for packet reordering and jitter. Use #GstRtpsSrcDemux,
50 * #GstRtpPtDemux and GstRtpJitterBuffer in addition to #GstRtpSession to
51 * perform these tasks. It is usually a good idea to use #GstRtpBin, which
52 * combines all these features in one element.
54 * To use #GstRtpSession as an RTP receiver, request a recv_rtp_sink pad, which will
55 * automatically create recv_rtp_src pad. Data received on the recv_rtp_sink pad
56 * will be processed in the session and after being validated forwarded on the
59 * To also use #GstRtpSession as an RTCP receiver, request a recv_rtcp_sink pad,
60 * which will automatically create a sync_src pad. Packets received on the RTCP
61 * pad will be used by the session manager to update the stats and database of
62 * the other participants. SR packets will be forwarded on the sync_src pad
63 * so that they can be used to perform inter-stream synchronisation when needed.
65 * If you want the session manager to generate and send RTCP packets, request
66 * the send_rtcp_src pad. Packet pushed on this pad contain SR/RR RTCP reports
67 * that should be sent to all participants in the session.
69 * To use #GstRtpSession as a sender, request a send_rtp_sink pad, which will
70 * automatically create a send_rtp_src pad. The session manager will
71 * forward the packets on the send_rtp_src pad after updating its internal state.
73 * The session manager needs the clock-rate of the payload types it is handling
74 * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
75 * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
79 * <title>Example pipelines</title>
81 * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink rtpsession .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink
82 * ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
83 * decoder and display. Note that the application/x-rtp caps on udpsrc should be
84 * configured based on some negotiation process such as RTSP for this pipeline
87 * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink rtpsession name=session \
88 * .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink \
89 * udpsrc port=5001 caps="application/x-rtcp" ! session.recv_rtcp_sink
90 * ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
91 * decoder and display. Receive RTCP packets from port 5001 and process them in
92 * the session manager.
93 * Note that the application/x-rtp caps on udpsrc should be
94 * configured based on some negotiation process such as RTSP for this pipeline
97 * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink rtpsession .send_rtp_src ! udpsink port=5000
98 * ]| Send theora RTP packets through the session manager and out on UDP port
101 * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink rtpsession name=session .send_rtp_src \
102 * ! udpsink port=5000 session.send_rtcp_src ! udpsink port=5001
103 * ]| Send theora RTP packets through the session manager and out on UDP port
104 * 5000. Send RTCP packets on port 5001. Note that this pipeline will not preroll
105 * correctly because the second udpsink will not preroll correctly (no RTCP
106 * packets are sent in the PAUSED state). Applications should manually set and
107 * keep (see gst_element_set_locked_state()) the RTCP udpsink to the PLAYING state.
115 #include <gst/rtp/gstrtpbuffer.h>
117 #include <gst/glib-compat-private.h>
119 #include "gstrtpsession.h"
120 #include "rtpsession.h"
122 GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug);
123 #define GST_CAT_DEFAULT gst_rtp_session_debug
126 gst_rtp_ntp_time_source_get_type (void)
128 static GType type = 0;
129 static const GEnumValue values[] = {
130 {GST_RTP_NTP_TIME_SOURCE_NTP, "NTP time based on realtime clock", "ntp"},
131 {GST_RTP_NTP_TIME_SOURCE_UNIX, "UNIX time based on realtime clock", "unix"},
132 {GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME,
133 "Running time based on pipeline clock",
135 {GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME, "Pipeline clock time", "clock-time"},
140 type = g_enum_register_static ("GstRtpNtpTimeSource", values);
146 static GstStaticPadTemplate rtpsession_recv_rtp_sink_template =
147 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink",
150 GST_STATIC_CAPS ("application/x-rtp")
153 static GstStaticPadTemplate rtpsession_recv_rtcp_sink_template =
154 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink",
157 GST_STATIC_CAPS ("application/x-rtcp")
160 static GstStaticPadTemplate rtpsession_send_rtp_sink_template =
161 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink",
164 GST_STATIC_CAPS ("application/x-rtp")
168 static GstStaticPadTemplate rtpsession_recv_rtp_src_template =
169 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src",
172 GST_STATIC_CAPS ("application/x-rtp")
175 static GstStaticPadTemplate rtpsession_sync_src_template =
176 GST_STATIC_PAD_TEMPLATE ("sync_src",
179 GST_STATIC_CAPS ("application/x-rtcp")
182 static GstStaticPadTemplate rtpsession_send_rtp_src_template =
183 GST_STATIC_PAD_TEMPLATE ("send_rtp_src",
186 GST_STATIC_CAPS ("application/x-rtp")
189 static GstStaticPadTemplate rtpsession_send_rtcp_src_template =
190 GST_STATIC_PAD_TEMPLATE ("send_rtcp_src",
193 GST_STATIC_CAPS ("application/x-rtcp")
196 /* signals and args */
199 SIGNAL_REQUEST_PT_MAP,
203 SIGNAL_ON_SSRC_COLLISION,
204 SIGNAL_ON_SSRC_VALIDATED,
205 SIGNAL_ON_SSRC_ACTIVE,
208 SIGNAL_ON_BYE_TIMEOUT,
210 SIGNAL_ON_SENDER_TIMEOUT,
214 #define DEFAULT_BANDWIDTH 0
215 #define DEFAULT_RTCP_FRACTION RTP_STATS_RTCP_FRACTION
216 #define DEFAULT_RTCP_RR_BANDWIDTH -1
217 #define DEFAULT_RTCP_RS_BANDWIDTH -1
218 #define DEFAULT_SDES NULL
219 #define DEFAULT_NUM_SOURCES 0
220 #define DEFAULT_NUM_ACTIVE_SOURCES 0
221 #define DEFAULT_USE_PIPELINE_CLOCK FALSE
222 #define DEFAULT_RTCP_MIN_INTERVAL (RTP_STATS_MIN_INTERVAL * GST_SECOND)
223 #define DEFAULT_PROBATION RTP_DEFAULT_PROBATION
224 #define DEFAULT_RTP_PROFILE GST_RTP_PROFILE_AVP
225 #define DEFAULT_NTP_TIME_SOURCE GST_RTP_NTP_TIME_SOURCE_NTP
226 #define DEFAULT_RTCP_SYNC_SEND_TIME TRUE
233 PROP_RTCP_RR_BANDWIDTH,
234 PROP_RTCP_RS_BANDWIDTH,
237 PROP_NUM_ACTIVE_SOURCES,
238 PROP_INTERNAL_SESSION,
239 PROP_USE_PIPELINE_CLOCK,
240 PROP_RTCP_MIN_INTERVAL,
244 PROP_NTP_TIME_SOURCE,
245 PROP_RTCP_SYNC_SEND_TIME
248 #define GST_RTP_SESSION_GET_PRIVATE(obj) \
249 (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_SESSION, GstRtpSessionPrivate))
251 #define GST_RTP_SESSION_LOCK(sess) g_mutex_lock (&(sess)->priv->lock)
252 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->priv->lock)
254 #define GST_RTP_SESSION_WAIT(sess) g_cond_wait (&(sess)->priv->cond, &(sess)->priv->lock)
255 #define GST_RTP_SESSION_SIGNAL(sess) g_cond_signal (&(sess)->priv->cond)
257 struct _GstRtpSessionPrivate
265 /* thread for sending out RTCP */
267 gboolean stop_thread;
269 gboolean thread_stopped;
275 GstClockTime send_latency;
277 gboolean use_pipeline_clock;
278 GstRtpNtpTimeSource ntp_time_source;
279 gboolean rtcp_sync_send_time;
284 /* callbacks to handle actions from the session manager */
285 static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess,
286 RTPSource * src, GstBuffer * buffer, gpointer user_data);
287 static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess,
288 RTPSource * src, gpointer data, gpointer user_data);
289 static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess,
290 RTPSource * src, GstBuffer * buffer, gboolean eos, gpointer user_data);
291 static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess,
292 GstBuffer * buffer, gpointer user_data);
293 static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
295 static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data);
296 static void gst_rtp_session_request_key_unit (RTPSession * sess,
297 gboolean all_headers, gpointer user_data);
298 static GstClockTime gst_rtp_session_request_time (RTPSession * session,
300 static void gst_rtp_session_notify_nack (RTPSession * sess,
301 guint16 seqnum, guint16 blp, guint32 ssrc, gpointer user_data);
302 static void gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data);
304 static RTPSessionCallbacks callbacks = {
305 gst_rtp_session_process_rtp,
306 gst_rtp_session_send_rtp,
307 gst_rtp_session_sync_rtcp,
308 gst_rtp_session_send_rtcp,
309 gst_rtp_session_clock_rate,
310 gst_rtp_session_reconsider,
311 gst_rtp_session_request_key_unit,
312 gst_rtp_session_request_time,
313 gst_rtp_session_notify_nack,
314 gst_rtp_session_reconfigure
317 /* GObject vmethods */
318 static void gst_rtp_session_finalize (GObject * object);
319 static void gst_rtp_session_set_property (GObject * object, guint prop_id,
320 const GValue * value, GParamSpec * pspec);
321 static void gst_rtp_session_get_property (GObject * object, guint prop_id,
322 GValue * value, GParamSpec * pspec);
324 /* GstElement vmethods */
325 static GstStateChangeReturn gst_rtp_session_change_state (GstElement * element,
326 GstStateChange transition);
327 static GstPad *gst_rtp_session_request_new_pad (GstElement * element,
328 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
329 static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad);
331 static gboolean gst_rtp_session_sink_setcaps (GstPad * pad,
332 GstRtpSession * rtpsession, GstCaps * caps);
333 static gboolean gst_rtp_session_setcaps_send_rtp (GstPad * pad,
334 GstRtpSession * rtpsession, GstCaps * caps);
336 static void gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession);
338 static GstStructure *gst_rtp_session_create_stats (GstRtpSession * rtpsession);
340 static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 };
343 on_new_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
345 g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0,
350 on_ssrc_collision (RTPSession * session, RTPSource * src, GstRtpSession * sess)
352 GstPad *send_rtp_sink;
354 g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
357 GST_RTP_SESSION_LOCK (sess);
358 if ((send_rtp_sink = sess->send_rtp_sink))
359 gst_object_ref (send_rtp_sink);
360 GST_RTP_SESSION_UNLOCK (sess);
363 GstStructure *structure;
365 RTPSource *internal_src;
366 guint32 suggested_ssrc;
368 structure = gst_structure_new ("GstRTPCollision", "ssrc", G_TYPE_UINT,
369 (guint) src->ssrc, NULL);
371 /* if there is no source using the suggested ssrc, most probably because
372 * this ssrc has just collided, suggest upstream to use it */
373 suggested_ssrc = rtp_session_suggest_ssrc (session, NULL);
374 internal_src = rtp_session_get_source_by_ssrc (session, suggested_ssrc);
376 gst_structure_set (structure, "suggested-ssrc", G_TYPE_UINT,
377 (guint) suggested_ssrc, NULL);
379 g_object_unref (internal_src);
381 event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, structure);
382 gst_pad_push_event (send_rtp_sink, event);
383 gst_object_unref (send_rtp_sink);
388 on_ssrc_validated (RTPSession * session, RTPSource * src, GstRtpSession * sess)
390 g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
395 on_ssrc_active (RTPSession * session, RTPSource * src, GstRtpSession * sess)
397 g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
402 on_ssrc_sdes (RTPSession * session, RTPSource * src, GstRtpSession * sess)
407 /* convert the new SDES info into a message */
408 RTP_SESSION_LOCK (session);
409 g_object_get (src, "sdes", &s, NULL);
410 RTP_SESSION_UNLOCK (session);
412 m = gst_message_new_custom (GST_MESSAGE_ELEMENT, GST_OBJECT (sess), s);
413 gst_element_post_message (GST_ELEMENT_CAST (sess), m);
415 g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0,
420 on_bye_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
422 g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0,
427 on_bye_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
429 g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
434 on_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
436 g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_TIMEOUT], 0,
441 on_sender_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
443 g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
447 #define gst_rtp_session_parent_class parent_class
448 G_DEFINE_TYPE (GstRtpSession, gst_rtp_session, GST_TYPE_ELEMENT);
451 gst_rtp_session_class_init (GstRtpSessionClass * klass)
453 GObjectClass *gobject_class;
454 GstElementClass *gstelement_class;
456 gobject_class = (GObjectClass *) klass;
457 gstelement_class = (GstElementClass *) klass;
459 g_type_class_add_private (klass, sizeof (GstRtpSessionPrivate));
461 gobject_class->finalize = gst_rtp_session_finalize;
462 gobject_class->set_property = gst_rtp_session_set_property;
463 gobject_class->get_property = gst_rtp_session_get_property;
466 * GstRtpSession::request-pt-map:
467 * @sess: the object which received the signal
470 * Request the payload type as #GstCaps for @pt.
472 gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] =
473 g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
474 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, request_pt_map),
475 NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_CAPS, 1, G_TYPE_UINT);
477 * GstRtpSession::clear-pt-map:
478 * @sess: the object which received the signal
480 * Clear the cached pt-maps requested with #GstRtpSession::request-pt-map.
482 gst_rtp_session_signals[SIGNAL_CLEAR_PT_MAP] =
483 g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
484 G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpSessionClass, clear_pt_map),
485 NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
488 * GstRtpSession::on-new-ssrc:
489 * @sess: the object which received the signal
492 * Notify of a new SSRC that entered @session.
494 gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
495 g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
496 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_new_ssrc),
497 NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
499 * GstRtpSession::on-ssrc_collision:
500 * @sess: the object which received the signal
503 * Notify when we have an SSRC collision
505 gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
506 g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
507 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
508 on_ssrc_collision), NULL, NULL, g_cclosure_marshal_VOID__UINT,
509 G_TYPE_NONE, 1, G_TYPE_UINT);
511 * GstRtpSession::on-ssrc_validated:
512 * @sess: the object which received the signal
515 * Notify of a new SSRC that became validated.
517 gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
518 g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
519 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
520 on_ssrc_validated), NULL, NULL, g_cclosure_marshal_VOID__UINT,
521 G_TYPE_NONE, 1, G_TYPE_UINT);
523 * GstRtpSession::on-ssrc_active:
524 * @sess: the object which received the signal
527 * Notify of a SSRC that is active, i.e., sending RTCP.
529 gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
530 g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
531 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
532 on_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__UINT,
533 G_TYPE_NONE, 1, G_TYPE_UINT);
535 * GstRtpSession::on-ssrc-sdes:
536 * @session: the object which received the signal
539 * Notify that a new SDES was received for SSRC.
541 gst_rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
542 g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
543 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_ssrc_sdes),
544 NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
547 * GstRtpSession::on-bye-ssrc:
548 * @sess: the object which received the signal
551 * Notify of an SSRC that became inactive because of a BYE packet.
553 gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
554 g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
555 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_ssrc),
556 NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
558 * GstRtpSession::on-bye-timeout:
559 * @sess: the object which received the signal
562 * Notify of an SSRC that has timed out because of BYE
564 gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
565 g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
566 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_timeout),
567 NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
569 * GstRtpSession::on-timeout:
570 * @sess: the object which received the signal
573 * Notify of an SSRC that has timed out
575 gst_rtp_session_signals[SIGNAL_ON_TIMEOUT] =
576 g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
577 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout),
578 NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
580 * GstRtpSession::on-sender-timeout:
581 * @sess: the object which received the signal
584 * Notify of a sender SSRC that has timed out and became a receiver
586 gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
587 g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
588 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
589 on_sender_timeout), NULL, NULL, g_cclosure_marshal_VOID__UINT,
590 G_TYPE_NONE, 1, G_TYPE_UINT);
592 g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
593 g_param_spec_double ("bandwidth", "Bandwidth",
594 "The bandwidth of the session in bytes per second (0 for auto-discover)",
595 0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
596 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
598 g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
599 g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
600 "The RTCP bandwidth of the session in bytes per second "
601 "(or as a real fraction of the RTP bandwidth if < 1.0)",
602 0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
603 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
605 g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
606 g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
607 "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
608 -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
609 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
611 g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
612 g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
613 "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
614 -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
615 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
617 g_object_class_install_property (gobject_class, PROP_SDES,
618 g_param_spec_boxed ("sdes", "SDES",
619 "The SDES items of this session",
620 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
622 g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
623 g_param_spec_uint ("num-sources", "Num Sources",
624 "The number of sources in the session", 0, G_MAXUINT,
625 DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
627 g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
628 g_param_spec_uint ("num-active-sources", "Num Active Sources",
629 "The number of active sources in the session", 0, G_MAXUINT,
630 DEFAULT_NUM_ACTIVE_SOURCES,
631 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
633 g_object_class_install_property (gobject_class, PROP_INTERNAL_SESSION,
634 g_param_spec_object ("internal-session", "Internal Session",
635 "The internal RTPSession object", RTP_TYPE_SESSION,
636 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
638 g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
639 g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
640 "Use the pipeline running-time to set the NTP time in the RTCP SR messages "
641 "(DEPRECATED: Use ntp-time-source property)",
642 DEFAULT_USE_PIPELINE_CLOCK,
643 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
645 g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
646 g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
647 "Minimum interval between Regular RTCP packet (in ns)",
648 0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
649 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
651 g_object_class_install_property (gobject_class, PROP_PROBATION,
652 g_param_spec_uint ("probation", "Number of probations",
653 "Consecutive packet sequence numbers to accept the source",
654 0, G_MAXUINT, DEFAULT_PROBATION,
655 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
658 * GstRtpSession::stats:
660 * Various session statistics. This property returns a GstStructure
661 * with name application/x-rtp-session-stats with the following fields:
663 * "rtx-count" G_TYPE_UINT The number of retransmission events
664 * received from downstream (in receiver mode)
665 * "rtx-drop-count" G_TYPE_UINT The number of retransmission events
666 * dropped (due to bandwidth constraints)
667 * "sent-nack-count" G_TYPE_UINT Number of NACKs sent
668 * "recv-nack-count" G_TYPE_UINT Number of NACKs received
672 g_object_class_install_property (gobject_class, PROP_STATS,
673 g_param_spec_boxed ("stats", "Statistics",
674 "Various statistics", GST_TYPE_STRUCTURE,
675 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
677 g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
678 g_param_spec_enum ("rtp-profile", "RTP Profile",
679 "RTP profile to use", GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE,
680 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
682 g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
683 g_param_spec_enum ("ntp-time-source", "NTP Time Source",
684 "NTP time source for RTCP packets",
685 gst_rtp_ntp_time_source_get_type (), DEFAULT_NTP_TIME_SOURCE,
686 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
688 g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_SEND_TIME,
689 g_param_spec_boolean ("rtcp-sync-send-time", "RTCP Sync Send Time",
690 "Use send time or capture time for RTCP sync "
691 "(TRUE = send time, FALSE = capture time)",
692 DEFAULT_RTCP_SYNC_SEND_TIME,
693 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
695 gstelement_class->change_state =
696 GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
697 gstelement_class->request_new_pad =
698 GST_DEBUG_FUNCPTR (gst_rtp_session_request_new_pad);
699 gstelement_class->release_pad =
700 GST_DEBUG_FUNCPTR (gst_rtp_session_release_pad);
702 klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_session_clear_pt_map);
705 gst_element_class_add_pad_template (gstelement_class,
706 gst_static_pad_template_get (&rtpsession_recv_rtp_sink_template));
707 gst_element_class_add_pad_template (gstelement_class,
708 gst_static_pad_template_get (&rtpsession_recv_rtcp_sink_template));
709 gst_element_class_add_pad_template (gstelement_class,
710 gst_static_pad_template_get (&rtpsession_send_rtp_sink_template));
713 gst_element_class_add_pad_template (gstelement_class,
714 gst_static_pad_template_get (&rtpsession_recv_rtp_src_template));
715 gst_element_class_add_pad_template (gstelement_class,
716 gst_static_pad_template_get (&rtpsession_sync_src_template));
717 gst_element_class_add_pad_template (gstelement_class,
718 gst_static_pad_template_get (&rtpsession_send_rtp_src_template));
719 gst_element_class_add_pad_template (gstelement_class,
720 gst_static_pad_template_get (&rtpsession_send_rtcp_src_template));
722 gst_element_class_set_static_metadata (gstelement_class, "RTP Session",
723 "Filter/Network/RTP",
724 "Implement an RTP session", "Wim Taymans <wim.taymans@gmail.com>");
726 GST_DEBUG_CATEGORY_INIT (gst_rtp_session_debug,
727 "rtpsession", 0, "RTP Session");
731 gst_rtp_session_init (GstRtpSession * rtpsession)
733 rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession);
734 g_mutex_init (&rtpsession->priv->lock);
735 g_cond_init (&rtpsession->priv->cond);
736 rtpsession->priv->sysclock = gst_system_clock_obtain ();
737 rtpsession->priv->session = rtp_session_new ();
738 rtpsession->priv->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
739 rtpsession->priv->rtcp_sync_send_time = DEFAULT_RTCP_SYNC_SEND_TIME;
741 /* configure callbacks */
742 rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
743 /* configure signals */
744 g_signal_connect (rtpsession->priv->session, "on-new-ssrc",
745 (GCallback) on_new_ssrc, rtpsession);
746 g_signal_connect (rtpsession->priv->session, "on-ssrc-collision",
747 (GCallback) on_ssrc_collision, rtpsession);
748 g_signal_connect (rtpsession->priv->session, "on-ssrc-validated",
749 (GCallback) on_ssrc_validated, rtpsession);
750 g_signal_connect (rtpsession->priv->session, "on-ssrc-active",
751 (GCallback) on_ssrc_active, rtpsession);
752 g_signal_connect (rtpsession->priv->session, "on-ssrc-sdes",
753 (GCallback) on_ssrc_sdes, rtpsession);
754 g_signal_connect (rtpsession->priv->session, "on-bye-ssrc",
755 (GCallback) on_bye_ssrc, rtpsession);
756 g_signal_connect (rtpsession->priv->session, "on-bye-timeout",
757 (GCallback) on_bye_timeout, rtpsession);
758 g_signal_connect (rtpsession->priv->session, "on-timeout",
759 (GCallback) on_timeout, rtpsession);
760 g_signal_connect (rtpsession->priv->session, "on-sender-timeout",
761 (GCallback) on_sender_timeout, rtpsession);
762 rtpsession->priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
763 (GDestroyNotify) gst_caps_unref);
765 gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
766 gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
768 rtpsession->priv->thread_stopped = TRUE;
770 rtpsession->priv->rtx_count = 0;
772 rtpsession->priv->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
776 gst_rtp_session_finalize (GObject * object)
778 GstRtpSession *rtpsession;
780 rtpsession = GST_RTP_SESSION (object);
782 g_hash_table_destroy (rtpsession->priv->ptmap);
783 g_mutex_clear (&rtpsession->priv->lock);
784 g_cond_clear (&rtpsession->priv->cond);
785 g_object_unref (rtpsession->priv->sysclock);
786 g_object_unref (rtpsession->priv->session);
788 G_OBJECT_CLASS (parent_class)->finalize (object);
792 gst_rtp_session_set_property (GObject * object, guint prop_id,
793 const GValue * value, GParamSpec * pspec)
795 GstRtpSession *rtpsession;
796 GstRtpSessionPrivate *priv;
798 rtpsession = GST_RTP_SESSION (object);
799 priv = rtpsession->priv;
803 g_object_set_property (G_OBJECT (priv->session), "bandwidth", value);
805 case PROP_RTCP_FRACTION:
806 g_object_set_property (G_OBJECT (priv->session), "rtcp-fraction", value);
808 case PROP_RTCP_RR_BANDWIDTH:
809 g_object_set_property (G_OBJECT (priv->session), "rtcp-rr-bandwidth",
812 case PROP_RTCP_RS_BANDWIDTH:
813 g_object_set_property (G_OBJECT (priv->session), "rtcp-rs-bandwidth",
817 rtp_session_set_sdes_struct (priv->session, g_value_get_boxed (value));
819 case PROP_USE_PIPELINE_CLOCK:
820 priv->use_pipeline_clock = g_value_get_boolean (value);
822 case PROP_RTCP_MIN_INTERVAL:
823 g_object_set_property (G_OBJECT (priv->session), "rtcp-min-interval",
827 g_object_set_property (G_OBJECT (priv->session), "probation", value);
829 case PROP_RTP_PROFILE:
830 g_object_set_property (G_OBJECT (priv->session), "rtp-profile", value);
832 case PROP_NTP_TIME_SOURCE:
833 priv->ntp_time_source = g_value_get_enum (value);
835 case PROP_RTCP_SYNC_SEND_TIME:
836 priv->rtcp_sync_send_time = g_value_get_boolean (value);
839 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
845 gst_rtp_session_get_property (GObject * object, guint prop_id,
846 GValue * value, GParamSpec * pspec)
848 GstRtpSession *rtpsession;
849 GstRtpSessionPrivate *priv;
851 rtpsession = GST_RTP_SESSION (object);
852 priv = rtpsession->priv;
856 g_object_get_property (G_OBJECT (priv->session), "bandwidth", value);
858 case PROP_RTCP_FRACTION:
859 g_object_get_property (G_OBJECT (priv->session), "rtcp-fraction", value);
861 case PROP_RTCP_RR_BANDWIDTH:
862 g_object_get_property (G_OBJECT (priv->session), "rtcp-rr-bandwidth",
865 case PROP_RTCP_RS_BANDWIDTH:
866 g_object_get_property (G_OBJECT (priv->session), "rtcp-rs-bandwidth",
870 g_value_take_boxed (value, rtp_session_get_sdes_struct (priv->session));
872 case PROP_NUM_SOURCES:
873 g_value_set_uint (value, rtp_session_get_num_sources (priv->session));
875 case PROP_NUM_ACTIVE_SOURCES:
876 g_value_set_uint (value,
877 rtp_session_get_num_active_sources (priv->session));
879 case PROP_INTERNAL_SESSION:
880 g_value_set_object (value, priv->session);
882 case PROP_USE_PIPELINE_CLOCK:
883 g_value_set_boolean (value, priv->use_pipeline_clock);
885 case PROP_RTCP_MIN_INTERVAL:
886 g_object_get_property (G_OBJECT (priv->session), "rtcp-min-interval",
890 g_object_get_property (G_OBJECT (priv->session), "probation", value);
893 g_value_take_boxed (value, gst_rtp_session_create_stats (rtpsession));
895 case PROP_RTP_PROFILE:
896 g_object_get_property (G_OBJECT (priv->session), "rtp-profile", value);
898 case PROP_NTP_TIME_SOURCE:
899 g_value_set_enum (value, priv->ntp_time_source);
901 case PROP_RTCP_SYNC_SEND_TIME:
902 g_value_set_boolean (value, priv->rtcp_sync_send_time);
905 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
910 static GstStructure *
911 gst_rtp_session_create_stats (GstRtpSession * rtpsession)
915 g_object_get (rtpsession->priv->session, "stats", &s, NULL);
916 gst_structure_set (s, "rtx-count", G_TYPE_UINT, rtpsession->priv->rtx_count,
923 get_current_times (GstRtpSession * rtpsession, GstClockTime * running_time,
928 GstClockTime base_time, rt, clock_time;
930 GST_OBJECT_LOCK (rtpsession);
931 if ((clock = GST_ELEMENT_CLOCK (rtpsession))) {
932 base_time = GST_ELEMENT_CAST (rtpsession)->base_time;
933 gst_object_ref (clock);
934 GST_OBJECT_UNLOCK (rtpsession);
936 /* get current clock time and convert to running time */
937 clock_time = gst_clock_get_time (clock);
938 rt = clock_time - base_time;
940 if (rtpsession->priv->use_pipeline_clock) {
942 /* add constant to convert from 1970 based time to 1900 based time */
943 ntpns += (2208988800LL * GST_SECOND);
945 switch (rtpsession->priv->ntp_time_source) {
946 case GST_RTP_NTP_TIME_SOURCE_NTP:
947 case GST_RTP_NTP_TIME_SOURCE_UNIX:{
950 /* get current NTP time */
951 g_get_current_time (¤t);
952 ntpns = GST_TIMEVAL_TO_TIME (current);
954 /* add constant to convert from 1970 based time to 1900 based time */
955 if (rtpsession->priv->ntp_time_source == GST_RTP_NTP_TIME_SOURCE_NTP)
956 ntpns += (2208988800LL * GST_SECOND);
959 case GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME:
962 case GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME:
967 g_assert_not_reached ();
972 gst_object_unref (clock);
974 GST_OBJECT_UNLOCK (rtpsession);
985 rtcp_thread (GstRtpSession * rtpsession)
988 GstClockTime current_time;
989 GstClockTime next_timeout;
991 GstClockTime running_time;
995 GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
997 GST_RTP_SESSION_LOCK (rtpsession);
999 while (rtpsession->priv->wait_send) {
1000 GST_LOG_OBJECT (rtpsession, "waiting for getting started");
1001 GST_RTP_SESSION_WAIT (rtpsession);
1002 GST_LOG_OBJECT (rtpsession, "signaled...");
1005 sysclock = rtpsession->priv->sysclock;
1006 current_time = gst_clock_get_time (sysclock);
1008 session = rtpsession->priv->session;
1010 GST_DEBUG_OBJECT (rtpsession, "starting at %" GST_TIME_FORMAT,
1011 GST_TIME_ARGS (current_time));
1012 session->start_time = current_time;
1014 while (!rtpsession->priv->stop_thread) {
1017 /* get initial estimate */
1018 next_timeout = rtp_session_next_timeout (session, current_time);
1020 GST_DEBUG_OBJECT (rtpsession, "next check time %" GST_TIME_FORMAT,
1021 GST_TIME_ARGS (next_timeout));
1023 /* leave if no more timeouts, the session ended */
1024 if (next_timeout == GST_CLOCK_TIME_NONE)
1027 id = rtpsession->priv->id =
1028 gst_clock_new_single_shot_id (sysclock, next_timeout);
1029 GST_RTP_SESSION_UNLOCK (rtpsession);
1031 res = gst_clock_id_wait (id, NULL);
1033 GST_RTP_SESSION_LOCK (rtpsession);
1034 gst_clock_id_unref (id);
1035 rtpsession->priv->id = NULL;
1037 if (rtpsession->priv->stop_thread)
1040 /* update current time */
1041 current_time = gst_clock_get_time (sysclock);
1043 /* get current NTP time */
1044 get_current_times (rtpsession, &running_time, &ntpnstime);
1046 /* we get unlocked because we need to perform reconsideration, don't perform
1047 * the timeout but get a new reporting estimate. */
1048 GST_DEBUG_OBJECT (rtpsession, "unlocked %d, current %" GST_TIME_FORMAT,
1049 res, GST_TIME_ARGS (current_time));
1051 /* perform actions, we ignore result. Release lock because it might push. */
1052 GST_RTP_SESSION_UNLOCK (rtpsession);
1053 rtp_session_on_timeout (session, current_time, ntpnstime, running_time);
1054 GST_RTP_SESSION_LOCK (rtpsession);
1056 /* mark the thread as stopped now */
1057 rtpsession->priv->thread_stopped = TRUE;
1058 GST_RTP_SESSION_UNLOCK (rtpsession);
1060 GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread");
1064 start_rtcp_thread (GstRtpSession * rtpsession)
1066 GError *error = NULL;
1069 GST_DEBUG_OBJECT (rtpsession, "starting RTCP thread");
1071 GST_RTP_SESSION_LOCK (rtpsession);
1072 rtpsession->priv->stop_thread = FALSE;
1073 if (rtpsession->priv->thread_stopped) {
1074 /* if the thread stopped, and we still have a handle to the thread, join it
1075 * now. We can safely join with the lock held, the thread will not take it
1077 if (rtpsession->priv->thread)
1078 g_thread_join (rtpsession->priv->thread);
1079 /* only create a new thread if the old one was stopped. Otherwise we can
1080 * just reuse the currently running one. */
1081 rtpsession->priv->thread = g_thread_try_new ("rtpsession-rtcp-thread",
1082 (GThreadFunc) rtcp_thread, rtpsession, &error);
1083 rtpsession->priv->thread_stopped = FALSE;
1085 GST_RTP_SESSION_UNLOCK (rtpsession);
1087 if (error != NULL) {
1089 GST_DEBUG_OBJECT (rtpsession, "failed to start thread, %s", error->message);
1090 g_error_free (error);
1098 stop_rtcp_thread (GstRtpSession * rtpsession)
1100 GST_DEBUG_OBJECT (rtpsession, "stopping RTCP thread");
1102 GST_RTP_SESSION_LOCK (rtpsession);
1103 rtpsession->priv->stop_thread = TRUE;
1104 rtpsession->priv->wait_send = FALSE;
1105 GST_RTP_SESSION_SIGNAL (rtpsession);
1106 if (rtpsession->priv->id)
1107 gst_clock_id_unschedule (rtpsession->priv->id);
1108 GST_RTP_SESSION_UNLOCK (rtpsession);
1112 join_rtcp_thread (GstRtpSession * rtpsession)
1114 GST_RTP_SESSION_LOCK (rtpsession);
1115 /* don't try to join when we have no thread */
1116 if (rtpsession->priv->thread != NULL) {
1117 GST_DEBUG_OBJECT (rtpsession, "joining RTCP thread");
1118 GST_RTP_SESSION_UNLOCK (rtpsession);
1120 g_thread_join (rtpsession->priv->thread);
1122 GST_RTP_SESSION_LOCK (rtpsession);
1123 /* after the join, take the lock and clear the thread structure. The caller
1124 * is supposed to not concurrently call start and join. */
1125 rtpsession->priv->thread = NULL;
1127 GST_RTP_SESSION_UNLOCK (rtpsession);
1130 static GstStateChangeReturn
1131 gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
1133 GstStateChangeReturn res;
1134 GstRtpSession *rtpsession;
1136 rtpsession = GST_RTP_SESSION (element);
1138 switch (transition) {
1139 case GST_STATE_CHANGE_NULL_TO_READY:
1141 case GST_STATE_CHANGE_READY_TO_PAUSED:
1142 GST_RTP_SESSION_LOCK (rtpsession);
1143 if (rtpsession->send_rtp_src)
1144 rtpsession->priv->wait_send = TRUE;
1145 GST_RTP_SESSION_UNLOCK (rtpsession);
1147 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1149 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1150 case GST_STATE_CHANGE_PAUSED_TO_READY:
1151 /* no need to join yet, we might want to continue later. Also, the
1152 * dataflow could block downstream so that a join could just block
1154 stop_rtcp_thread (rtpsession);
1160 res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1162 switch (transition) {
1163 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1164 if (!start_rtcp_thread (rtpsession))
1167 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1169 case GST_STATE_CHANGE_PAUSED_TO_READY:
1170 /* downstream is now releasing the dataflow and we can join. */
1171 join_rtcp_thread (rtpsession);
1173 case GST_STATE_CHANGE_READY_TO_NULL:
1183 return GST_STATE_CHANGE_FAILURE;
1188 return_true (gpointer key, gpointer value, gpointer user_data)
1194 gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession)
1196 g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL);
1199 /* called when the session manager has an RTP packet or a list of packets
1200 * ready for further processing */
1201 static GstFlowReturn
1202 gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
1203 GstBuffer * buffer, gpointer user_data)
1205 GstFlowReturn result;
1206 GstRtpSession *rtpsession;
1209 rtpsession = GST_RTP_SESSION (user_data);
1211 GST_RTP_SESSION_LOCK (rtpsession);
1212 if ((rtp_src = rtpsession->recv_rtp_src))
1213 gst_object_ref (rtp_src);
1214 GST_RTP_SESSION_UNLOCK (rtpsession);
1217 GST_LOG_OBJECT (rtpsession, "pushing received RTP packet");
1218 result = gst_pad_push (rtp_src, buffer);
1219 gst_object_unref (rtp_src);
1221 GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet");
1222 gst_buffer_unref (buffer);
1223 result = GST_FLOW_OK;
1228 /* called when the session manager has an RTP packet ready for further
1230 static GstFlowReturn
1231 gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src,
1232 gpointer data, gpointer user_data)
1234 GstFlowReturn result;
1235 GstRtpSession *rtpsession;
1238 rtpsession = GST_RTP_SESSION (user_data);
1240 GST_RTP_SESSION_LOCK (rtpsession);
1241 if ((rtp_src = rtpsession->send_rtp_src))
1242 gst_object_ref (rtp_src);
1243 if (rtpsession->priv->wait_send) {
1244 GST_LOG_OBJECT (rtpsession, "signal RTCP thread");
1245 rtpsession->priv->wait_send = FALSE;
1246 GST_RTP_SESSION_SIGNAL (rtpsession);
1248 GST_RTP_SESSION_UNLOCK (rtpsession);
1251 if (GST_IS_BUFFER (data)) {
1252 GST_LOG_OBJECT (rtpsession, "sending RTP packet");
1253 result = gst_pad_push (rtp_src, GST_BUFFER_CAST (data));
1255 GST_LOG_OBJECT (rtpsession, "sending RTP list");
1256 result = gst_pad_push_list (rtp_src, GST_BUFFER_LIST_CAST (data));
1258 gst_object_unref (rtp_src);
1260 gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1261 result = GST_FLOW_OK;
1267 do_rtcp_events (GstRtpSession * rtpsession, GstPad * srcpad)
1273 gboolean have_group_id;
1277 g_strdup_printf ("%08x%08x%08x%08x", g_random_int (), g_random_int (),
1278 g_random_int (), g_random_int ());
1280 GST_RTP_SESSION_LOCK (rtpsession);
1281 if (rtpsession->recv_rtp_sink) {
1283 gst_pad_get_sticky_event (rtpsession->recv_rtp_sink,
1284 GST_EVENT_STREAM_START, 0);
1286 if (gst_event_parse_group_id (event, &group_id))
1287 have_group_id = TRUE;
1289 have_group_id = FALSE;
1290 gst_event_unref (event);
1292 have_group_id = TRUE;
1293 group_id = gst_util_group_id_next ();
1296 have_group_id = TRUE;
1297 group_id = gst_util_group_id_next ();
1299 GST_RTP_SESSION_UNLOCK (rtpsession);
1301 event = gst_event_new_stream_start (stream_id);
1303 gst_event_set_group_id (event, group_id);
1304 gst_pad_push_event (srcpad, event);
1307 caps = gst_caps_new_empty_simple ("application/x-rtcp");
1308 gst_pad_set_caps (srcpad, caps);
1309 gst_caps_unref (caps);
1311 gst_segment_init (&seg, GST_FORMAT_TIME);
1312 event = gst_event_new_segment (&seg);
1313 gst_pad_push_event (srcpad, event);
1316 /* called when the session manager has an RTCP packet ready for further
1317 * sending. The eos flag is set when an EOS event should be sent downstream as
1319 static GstFlowReturn
1320 gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
1321 GstBuffer * buffer, gboolean eos, gpointer user_data)
1323 GstFlowReturn result;
1324 GstRtpSession *rtpsession;
1327 rtpsession = GST_RTP_SESSION (user_data);
1329 GST_RTP_SESSION_LOCK (rtpsession);
1330 if (rtpsession->priv->stop_thread)
1333 if ((rtcp_src = rtpsession->send_rtcp_src)) {
1334 gst_object_ref (rtcp_src);
1335 GST_RTP_SESSION_UNLOCK (rtpsession);
1337 /* set rtcp caps on output pad */
1338 if (!gst_pad_has_current_caps (rtcp_src))
1339 do_rtcp_events (rtpsession, rtcp_src);
1341 GST_LOG_OBJECT (rtpsession, "sending RTCP");
1342 result = gst_pad_push (rtcp_src, buffer);
1344 /* we have to send EOS after this packet */
1346 GST_LOG_OBJECT (rtpsession, "sending EOS");
1347 gst_pad_push_event (rtcp_src, gst_event_new_eos ());
1349 gst_object_unref (rtcp_src);
1351 GST_RTP_SESSION_UNLOCK (rtpsession);
1353 GST_DEBUG_OBJECT (rtpsession, "not sending RTCP, no output pad");
1354 gst_buffer_unref (buffer);
1355 result = GST_FLOW_OK;
1362 GST_DEBUG_OBJECT (rtpsession, "we are stopping");
1363 gst_buffer_unref (buffer);
1364 GST_RTP_SESSION_UNLOCK (rtpsession);
1369 /* called when the session manager has an SR RTCP packet ready for handling
1370 * inter stream synchronisation */
1371 static GstFlowReturn
1372 gst_rtp_session_sync_rtcp (RTPSession * sess,
1373 GstBuffer * buffer, gpointer user_data)
1375 GstFlowReturn result;
1376 GstRtpSession *rtpsession;
1379 rtpsession = GST_RTP_SESSION (user_data);
1381 GST_RTP_SESSION_LOCK (rtpsession);
1382 if (rtpsession->priv->stop_thread)
1385 if ((sync_src = rtpsession->sync_src)) {
1386 gst_object_ref (sync_src);
1387 GST_RTP_SESSION_UNLOCK (rtpsession);
1389 /* set rtcp caps on output pad, this happens
1390 * when we receive RTCP muxed with RTP according
1391 * to RFC5761. Otherwise we would have forwarded
1392 * the events from the recv_rtcp_sink pad already
1394 if (!gst_pad_has_current_caps (sync_src))
1395 do_rtcp_events (rtpsession, sync_src);
1397 GST_LOG_OBJECT (rtpsession, "sending Sync RTCP");
1398 result = gst_pad_push (sync_src, buffer);
1399 gst_object_unref (sync_src);
1401 GST_RTP_SESSION_UNLOCK (rtpsession);
1403 GST_DEBUG_OBJECT (rtpsession, "not sending Sync RTCP, no output pad");
1404 gst_buffer_unref (buffer);
1405 result = GST_FLOW_OK;
1412 GST_DEBUG_OBJECT (rtpsession, "we are stopping");
1413 gst_buffer_unref (buffer);
1414 GST_RTP_SESSION_UNLOCK (rtpsession);
1420 gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps)
1422 GstRtpSessionPrivate *priv;
1423 const GstStructure *s;
1426 priv = rtpsession->priv;
1428 GST_DEBUG_OBJECT (rtpsession, "parsing caps");
1430 s = gst_caps_get_structure (caps, 0);
1431 if (!gst_structure_get_int (s, "payload", &payload))
1434 if (g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)))
1437 g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload),
1438 gst_caps_ref (caps));
1442 gst_rtp_session_get_caps_for_pt (GstRtpSession * rtpsession, guint payload)
1444 GstCaps *caps = NULL;
1445 GValue args[2] = { {0}, {0} };
1448 GST_RTP_SESSION_LOCK (rtpsession);
1449 caps = g_hash_table_lookup (rtpsession->priv->ptmap,
1450 GINT_TO_POINTER (payload));
1452 gst_caps_ref (caps);
1456 /* not found in the cache, try to get it with a signal */
1457 g_value_init (&args[0], GST_TYPE_ELEMENT);
1458 g_value_set_object (&args[0], rtpsession);
1459 g_value_init (&args[1], G_TYPE_UINT);
1460 g_value_set_uint (&args[1], payload);
1462 g_value_init (&ret, GST_TYPE_CAPS);
1463 g_value_set_boxed (&ret, NULL);
1465 GST_RTP_SESSION_UNLOCK (rtpsession);
1467 g_signal_emitv (args, gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP], 0,
1470 GST_RTP_SESSION_LOCK (rtpsession);
1472 g_value_unset (&args[0]);
1473 g_value_unset (&args[1]);
1474 caps = (GstCaps *) g_value_dup_boxed (&ret);
1475 g_value_unset (&ret);
1479 gst_rtp_session_cache_caps (rtpsession, caps);
1482 GST_RTP_SESSION_UNLOCK (rtpsession);
1488 GST_DEBUG_OBJECT (rtpsession, "could not get caps");
1493 /* called when the session manager needs the clock rate */
1495 gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
1499 GstRtpSession *rtpsession;
1501 const GstStructure *s;
1503 rtpsession = GST_RTP_SESSION_CAST (user_data);
1505 caps = gst_rtp_session_get_caps_for_pt (rtpsession, payload);
1510 s = gst_caps_get_structure (caps, 0);
1511 if (!gst_structure_get_int (s, "clock-rate", &result))
1514 gst_caps_unref (caps);
1516 GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", result);
1525 gst_caps_unref (caps);
1526 GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!");
1531 /* called when the session manager asks us to reconsider the timeout */
1533 gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data)
1535 GstRtpSession *rtpsession;
1537 rtpsession = GST_RTP_SESSION_CAST (user_data);
1539 GST_RTP_SESSION_LOCK (rtpsession);
1540 GST_DEBUG_OBJECT (rtpsession, "unlock timer for reconsideration");
1541 if (rtpsession->priv->id)
1542 gst_clock_id_unschedule (rtpsession->priv->id);
1543 GST_RTP_SESSION_UNLOCK (rtpsession);
1547 gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstObject * parent,
1550 GstRtpSession *rtpsession;
1551 gboolean ret = FALSE;
1553 rtpsession = GST_RTP_SESSION (parent);
1555 GST_DEBUG_OBJECT (rtpsession, "received event %s",
1556 GST_EVENT_TYPE_NAME (event));
1558 switch (GST_EVENT_TYPE (event)) {
1559 case GST_EVENT_CAPS:
1564 gst_event_parse_caps (event, &caps);
1565 gst_rtp_session_sink_setcaps (pad, rtpsession, caps);
1566 ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1569 case GST_EVENT_FLUSH_STOP:
1570 gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
1571 ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1573 case GST_EVENT_SEGMENT:
1575 GstSegment *segment, in_segment;
1577 segment = &rtpsession->recv_rtp_seg;
1579 /* the newsegment event is needed to convert the RTP timestamp to
1580 * running_time, which is needed to generate a mapping from RTP to NTP
1581 * timestamps in SR reports */
1582 gst_event_copy_segment (event, &in_segment);
1583 GST_DEBUG_OBJECT (rtpsession, "received segment %" GST_SEGMENT_FORMAT,
1586 /* accept upstream */
1587 gst_segment_copy_into (&in_segment, segment);
1589 /* push event forward */
1590 ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1598 gst_pad_push_event (rtpsession->recv_rtp_src, gst_event_ref (event));
1600 GST_RTP_SESSION_LOCK (rtpsession);
1601 if ((rtcp_src = rtpsession->send_rtcp_src))
1602 gst_object_ref (rtcp_src);
1603 GST_RTP_SESSION_UNLOCK (rtpsession);
1606 ret = gst_pad_push_event (rtcp_src, event);
1607 gst_object_unref (rtcp_src);
1609 gst_event_unref (event);
1615 ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1624 gst_rtp_session_request_remote_key_unit (GstRtpSession * rtpsession,
1625 guint32 ssrc, guint payload, gboolean all_headers, gint count)
1629 caps = gst_rtp_session_get_caps_for_pt (rtpsession, payload);
1632 const GstStructure *s = gst_caps_get_structure (caps, 0);
1636 pli = gst_structure_has_field (s, "rtcp-fb-nack-pli");
1637 fir = gst_structure_has_field (s, "rtcp-fb-ccm-fir") && all_headers;
1639 /* Google Talk uses FIR for repair, so send it even if we just want a
1642 gst_structure_has_field (s, "rtcp-fb-x-gstreamer-fir-as-repair"))
1645 gst_caps_unref (caps);
1648 return rtp_session_request_key_unit (rtpsession->priv->session, ssrc,
1656 gst_rtp_session_event_recv_rtp_src (GstPad * pad, GstObject * parent,
1659 GstRtpSession *rtpsession;
1660 gboolean forward = TRUE;
1661 gboolean ret = TRUE;
1662 const GstStructure *s;
1666 rtpsession = GST_RTP_SESSION (parent);
1668 switch (GST_EVENT_TYPE (event)) {
1669 case GST_EVENT_CUSTOM_UPSTREAM:
1670 s = gst_event_get_structure (event);
1671 if (gst_structure_has_name (s, "GstForceKeyUnit") &&
1672 gst_structure_get_uint (s, "ssrc", &ssrc) &&
1673 gst_structure_get_uint (s, "payload", &pt)) {
1674 gboolean all_headers = FALSE;
1677 gst_structure_get_boolean (s, "all-headers", &all_headers);
1678 if (gst_structure_get_int (s, "count", &count) && count < 0)
1679 count += G_MAXINT; /* Make sure count is positive if present */
1680 if (gst_rtp_session_request_remote_key_unit (rtpsession, ssrc, pt,
1681 all_headers, count))
1683 } else if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
1684 GstClockTime running_time;
1685 guint seqnum, delay, deadline, max_delay, avg_rtt;
1687 GST_RTP_SESSION_LOCK (rtpsession);
1688 rtpsession->priv->rtx_count++;
1689 GST_RTP_SESSION_UNLOCK (rtpsession);
1691 if (!gst_structure_get_clock_time (s, "running-time", &running_time))
1693 if (!gst_structure_get_uint (s, "ssrc", &ssrc))
1695 if (!gst_structure_get_uint (s, "seqnum", &seqnum))
1697 if (!gst_structure_get_uint (s, "delay", &delay))
1699 if (!gst_structure_get_uint (s, "deadline", &deadline))
1701 if (!gst_structure_get_uint (s, "avg-rtt", &avg_rtt))
1704 /* remaining time to receive the packet */
1705 max_delay = deadline;
1706 if (max_delay > delay)
1709 if (max_delay > avg_rtt)
1710 max_delay -= avg_rtt;
1714 if (rtp_session_request_nack (rtpsession->priv->session, ssrc, seqnum,
1715 max_delay * GST_MSECOND))
1724 GstPad *recv_rtp_sink;
1726 GST_RTP_SESSION_LOCK (rtpsession);
1727 if ((recv_rtp_sink = rtpsession->recv_rtp_sink))
1728 gst_object_ref (recv_rtp_sink);
1729 GST_RTP_SESSION_UNLOCK (rtpsession);
1731 if (recv_rtp_sink) {
1732 ret = gst_pad_push_event (recv_rtp_sink, event);
1733 gst_object_unref (recv_rtp_sink);
1735 gst_event_unref (event);
1737 gst_event_unref (event);
1744 static GstIterator *
1745 gst_rtp_session_iterate_internal_links (GstPad * pad, GstObject * parent)
1747 GstRtpSession *rtpsession;
1748 GstPad *otherpad = NULL;
1749 GstIterator *it = NULL;
1751 rtpsession = GST_RTP_SESSION (parent);
1753 GST_RTP_SESSION_LOCK (rtpsession);
1754 if (pad == rtpsession->recv_rtp_src) {
1755 otherpad = gst_object_ref (rtpsession->recv_rtp_sink);
1756 } else if (pad == rtpsession->recv_rtp_sink) {
1757 otherpad = gst_object_ref (rtpsession->recv_rtp_src);
1758 } else if (pad == rtpsession->send_rtp_src) {
1759 otherpad = gst_object_ref (rtpsession->send_rtp_sink);
1760 } else if (pad == rtpsession->send_rtp_sink) {
1761 otherpad = gst_object_ref (rtpsession->send_rtp_src);
1763 GST_RTP_SESSION_UNLOCK (rtpsession);
1766 GValue val = { 0, };
1768 g_value_init (&val, GST_TYPE_PAD);
1769 g_value_set_object (&val, otherpad);
1770 it = gst_iterator_new_single (GST_TYPE_PAD, &val);
1771 g_value_unset (&val);
1772 gst_object_unref (otherpad);
1774 it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
1781 gst_rtp_session_sink_setcaps (GstPad * pad, GstRtpSession * rtpsession,
1784 GST_RTP_SESSION_LOCK (rtpsession);
1785 gst_rtp_session_cache_caps (rtpsession, caps);
1786 GST_RTP_SESSION_UNLOCK (rtpsession);
1791 /* receive a packet from a sender, send it to the RTP session manager and
1792 * forward the packet on the rtp_src pad
1794 static GstFlowReturn
1795 gst_rtp_session_chain_recv_rtp (GstPad * pad, GstObject * parent,
1798 GstRtpSession *rtpsession;
1799 GstRtpSessionPrivate *priv;
1801 GstClockTime current_time, running_time;
1802 GstClockTime timestamp;
1805 rtpsession = GST_RTP_SESSION (parent);
1806 priv = rtpsession->priv;
1808 GST_LOG_OBJECT (rtpsession, "received RTP packet");
1810 GST_RTP_SESSION_LOCK (rtpsession);
1811 if (rtpsession->priv->wait_send) {
1812 GST_LOG_OBJECT (rtpsession, "signal RTCP thread");
1813 rtpsession->priv->wait_send = FALSE;
1814 GST_RTP_SESSION_SIGNAL (rtpsession);
1816 GST_RTP_SESSION_UNLOCK (rtpsession);
1818 /* get NTP time when this packet was captured, this depends on the timestamp. */
1819 timestamp = GST_BUFFER_PTS (buffer);
1820 if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
1821 /* convert to running time using the segment values */
1823 gst_segment_to_running_time (&rtpsession->recv_rtp_seg, GST_FORMAT_TIME,
1825 ntpnstime = GST_CLOCK_TIME_NONE;
1827 get_current_times (rtpsession, &running_time, &ntpnstime);
1829 current_time = gst_clock_get_time (priv->sysclock);
1831 ret = rtp_session_process_rtp (priv->session, buffer, current_time,
1832 running_time, ntpnstime);
1833 if (ret != GST_FLOW_OK)
1843 GST_DEBUG_OBJECT (rtpsession, "process returned %s",
1844 gst_flow_get_name (ret));
1850 gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstObject * parent,
1853 GstRtpSession *rtpsession;
1854 gboolean ret = FALSE;
1856 rtpsession = GST_RTP_SESSION (parent);
1858 GST_DEBUG_OBJECT (rtpsession, "received event %s",
1859 GST_EVENT_TYPE_NAME (event));
1861 switch (GST_EVENT_TYPE (event)) {
1862 case GST_EVENT_SEGMENT:
1863 /* Make sure that the sync_src pad has caps before the segment event.
1864 * Otherwise we might get a segment event before caps from the receive
1865 * RTCP pad, and then later when receiving RTCP packets will set caps.
1866 * This will results in a sticky event misordering warning
1868 if (!gst_pad_has_current_caps (rtpsession->sync_src)) {
1869 GstCaps *caps = gst_caps_new_empty_simple ("application/x-rtcp");
1870 gst_pad_set_caps (rtpsession->sync_src, caps);
1871 gst_caps_unref (caps);
1875 ret = gst_pad_push_event (rtpsession->sync_src, event);
1882 /* Receive an RTCP packet from a sender, send it to the RTP session manager and
1883 * forward the SR packets to the sync_src pad.
1885 static GstFlowReturn
1886 gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstObject * parent,
1889 GstRtpSession *rtpsession;
1890 GstRtpSessionPrivate *priv;
1891 GstClockTime current_time;
1894 rtpsession = GST_RTP_SESSION (parent);
1895 priv = rtpsession->priv;
1897 GST_LOG_OBJECT (rtpsession, "received RTCP packet");
1899 GST_RTP_SESSION_LOCK (rtpsession);
1900 if (rtpsession->priv->wait_send) {
1901 GST_LOG_OBJECT (rtpsession, "signal RTCP thread");
1902 rtpsession->priv->wait_send = FALSE;
1903 GST_RTP_SESSION_SIGNAL (rtpsession);
1905 GST_RTP_SESSION_UNLOCK (rtpsession);
1907 current_time = gst_clock_get_time (priv->sysclock);
1908 get_current_times (rtpsession, NULL, &ntpnstime);
1910 rtp_session_process_rtcp (priv->session, buffer, current_time, ntpnstime);
1912 return GST_FLOW_OK; /* always return OK */
1916 gst_rtp_session_query_send_rtcp_src (GstPad * pad, GstObject * parent,
1919 GstRtpSession *rtpsession;
1920 gboolean ret = FALSE;
1922 rtpsession = GST_RTP_SESSION (parent);
1924 GST_DEBUG_OBJECT (rtpsession, "received QUERY %s",
1925 GST_QUERY_TYPE_NAME (query));
1927 switch (GST_QUERY_TYPE (query)) {
1928 case GST_QUERY_LATENCY:
1930 /* use the defaults for the latency query. */
1931 gst_query_set_latency (query, FALSE, 0, -1);
1934 /* other queries simply fail for now */
1942 gst_rtp_session_event_send_rtcp_src (GstPad * pad, GstObject * parent,
1945 GstRtpSession *rtpsession;
1946 gboolean ret = TRUE;
1948 rtpsession = GST_RTP_SESSION (parent);
1949 GST_DEBUG_OBJECT (rtpsession, "received EVENT %s",
1950 GST_EVENT_TYPE_NAME (event));
1952 switch (GST_EVENT_TYPE (event)) {
1953 case GST_EVENT_SEEK:
1954 case GST_EVENT_LATENCY:
1955 gst_event_unref (event);
1959 /* other events simply fail for now */
1960 gst_event_unref (event);
1970 gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstObject * parent,
1973 GstRtpSession *rtpsession;
1974 gboolean ret = FALSE;
1976 rtpsession = GST_RTP_SESSION (parent);
1978 GST_DEBUG_OBJECT (rtpsession, "received EVENT %s",
1979 GST_EVENT_TYPE_NAME (event));
1981 switch (GST_EVENT_TYPE (event)) {
1982 case GST_EVENT_CAPS:
1987 gst_event_parse_caps (event, &caps);
1988 gst_rtp_session_setcaps_send_rtp (pad, rtpsession, caps);
1989 ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
1992 case GST_EVENT_FLUSH_STOP:
1993 gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
1994 ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
1996 case GST_EVENT_SEGMENT:{
1997 GstSegment *segment, in_segment;
1999 segment = &rtpsession->send_rtp_seg;
2001 /* the newsegment event is needed to convert the RTP timestamp to
2002 * running_time, which is needed to generate a mapping from RTP to NTP
2003 * timestamps in SR reports */
2004 gst_event_copy_segment (event, &in_segment);
2005 GST_DEBUG_OBJECT (rtpsession, "received segment %" GST_SEGMENT_FORMAT,
2008 /* accept upstream */
2009 gst_segment_copy_into (&in_segment, segment);
2011 /* push event forward */
2012 ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
2015 case GST_EVENT_EOS:{
2016 GstClockTime current_time;
2018 /* push downstream FIXME, we are not supposed to leave the session just
2019 * because we stop sending. */
2020 ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
2021 current_time = gst_clock_get_time (rtpsession->priv->sysclock);
2023 GST_DEBUG_OBJECT (rtpsession, "scheduling BYE message");
2024 rtp_session_mark_all_bye (rtpsession->priv->session, "End Of Stream");
2025 rtp_session_schedule_bye (rtpsession->priv->session, current_time);
2029 GstPad *send_rtp_src;
2031 GST_RTP_SESSION_LOCK (rtpsession);
2032 if ((send_rtp_src = rtpsession->send_rtp_src))
2033 gst_object_ref (send_rtp_src);
2034 GST_RTP_SESSION_UNLOCK (rtpsession);
2037 ret = gst_pad_push_event (send_rtp_src, event);
2038 gst_object_unref (send_rtp_src);
2040 gst_event_unref (event);
2050 gst_rtp_session_event_send_rtp_src (GstPad * pad, GstObject * parent,
2053 GstRtpSession *rtpsession;
2054 gboolean ret = FALSE;
2056 rtpsession = GST_RTP_SESSION (parent);
2058 GST_DEBUG_OBJECT (rtpsession, "received EVENT %s",
2059 GST_EVENT_TYPE_NAME (event));
2061 switch (GST_EVENT_TYPE (event)) {
2062 case GST_EVENT_LATENCY:
2063 /* save the latency, we need this to know when an RTP packet will be
2064 * rendered by the sink */
2065 gst_event_parse_latency (event, &rtpsession->priv->send_latency);
2067 ret = gst_pad_event_default (pad, parent, event);
2070 ret = gst_pad_event_default (pad, parent, event);
2077 gst_rtp_session_getcaps_send_rtp (GstPad * pad, GstRtpSession * rtpsession,
2080 GstRtpSessionPrivate *priv;
2082 GstStructure *s1, *s2;
2086 priv = rtpsession->priv;
2088 ssrc = rtp_session_suggest_ssrc (priv->session, &is_random);
2090 /* we can basically accept anything but we prefer to receive packets with our
2091 * internal SSRC so that we don't have to patch it. Create a structure with
2092 * the SSRC and another one without.
2093 * Only do this if the session actually decided on an ssrc already,
2094 * otherwise we give upstream the opportunity to select an ssrc itself */
2096 s1 = gst_structure_new ("application/x-rtp", "ssrc", G_TYPE_UINT, ssrc,
2098 s2 = gst_structure_new_empty ("application/x-rtp");
2100 result = gst_caps_new_full (s1, s2, NULL);
2102 result = gst_caps_new_empty_simple ("application/x-rtp");
2106 GstCaps *caps = result;
2108 result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
2109 gst_caps_unref (caps);
2112 GST_DEBUG_OBJECT (rtpsession, "getting caps %" GST_PTR_FORMAT, result);
2118 gst_rtp_session_query_send_rtp (GstPad * pad, GstObject * parent,
2121 gboolean res = FALSE;
2122 GstRtpSession *rtpsession;
2124 rtpsession = GST_RTP_SESSION (parent);
2126 switch (GST_QUERY_TYPE (query)) {
2127 case GST_QUERY_CAPS:
2129 GstCaps *filter, *caps;
2131 gst_query_parse_caps (query, &filter);
2132 caps = gst_rtp_session_getcaps_send_rtp (pad, rtpsession, filter);
2133 gst_query_set_caps_result (query, caps);
2134 gst_caps_unref (caps);
2139 res = gst_pad_query_default (pad, parent, query);
2147 gst_rtp_session_setcaps_send_rtp (GstPad * pad, GstRtpSession * rtpsession,
2150 GstRtpSessionPrivate *priv;
2152 priv = rtpsession->priv;
2154 rtp_session_update_send_caps (priv->session, caps);
2159 /* Recieve an RTP packet or a list of packets to be send to the receivers,
2160 * send to RTP session manager and forward to send_rtp_src.
2162 static GstFlowReturn
2163 gst_rtp_session_chain_send_rtp_common (GstRtpSession * rtpsession,
2164 gpointer data, gboolean is_list)
2166 GstRtpSessionPrivate *priv;
2168 GstClockTime timestamp, running_time;
2169 GstClockTime current_time;
2171 priv = rtpsession->priv;
2173 GST_LOG_OBJECT (rtpsession, "received RTP %s", is_list ? "list" : "packet");
2175 /* get NTP time when this packet was captured, this depends on the timestamp. */
2177 GstBuffer *buffer = NULL;
2179 /* All groups in an list have the same timestamp.
2180 * So, just take it from the first group. */
2181 buffer = gst_buffer_list_get (GST_BUFFER_LIST_CAST (data), 0);
2183 timestamp = GST_BUFFER_PTS (buffer);
2187 timestamp = GST_BUFFER_PTS (GST_BUFFER_CAST (data));
2190 if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
2191 /* convert to running time using the segment start value. */
2193 gst_segment_to_running_time (&rtpsession->send_rtp_seg, GST_FORMAT_TIME,
2195 if (priv->rtcp_sync_send_time)
2196 running_time += priv->send_latency;
2202 current_time = gst_clock_get_time (priv->sysclock);
2203 ret = rtp_session_send_rtp (priv->session, data, is_list, current_time,
2205 if (ret != GST_FLOW_OK)
2215 GST_DEBUG_OBJECT (rtpsession, "process returned %s",
2216 gst_flow_get_name (ret));
2221 static GstFlowReturn
2222 gst_rtp_session_chain_send_rtp (GstPad * pad, GstObject * parent,
2225 GstRtpSession *rtpsession = GST_RTP_SESSION (parent);
2227 return gst_rtp_session_chain_send_rtp_common (rtpsession, buffer, FALSE);
2230 static GstFlowReturn
2231 gst_rtp_session_chain_send_rtp_list (GstPad * pad, GstObject * parent,
2232 GstBufferList * list)
2234 GstRtpSession *rtpsession = GST_RTP_SESSION (parent);
2236 return gst_rtp_session_chain_send_rtp_common (rtpsession, list, TRUE);
2239 /* Create sinkpad to receive RTP packets from senders. This will also create a
2240 * srcpad for the RTP packets.
2243 create_recv_rtp_sink (GstRtpSession * rtpsession)
2245 GST_DEBUG_OBJECT (rtpsession, "creating RTP sink pad");
2247 rtpsession->recv_rtp_sink =
2248 gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template,
2250 gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
2251 gst_rtp_session_chain_recv_rtp);
2252 gst_pad_set_event_function (rtpsession->recv_rtp_sink,
2253 gst_rtp_session_event_recv_rtp_sink);
2254 gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_sink,
2255 gst_rtp_session_iterate_internal_links);
2256 GST_PAD_SET_PROXY_ALLOCATION (rtpsession->recv_rtp_sink);
2257 gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE);
2258 gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2259 rtpsession->recv_rtp_sink);
2261 GST_DEBUG_OBJECT (rtpsession, "creating RTP src pad");
2262 rtpsession->recv_rtp_src =
2263 gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template,
2265 gst_pad_set_event_function (rtpsession->recv_rtp_src,
2266 gst_rtp_session_event_recv_rtp_src);
2267 gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_src,
2268 gst_rtp_session_iterate_internal_links);
2269 gst_pad_use_fixed_caps (rtpsession->recv_rtp_src);
2270 gst_pad_set_active (rtpsession->recv_rtp_src, TRUE);
2271 gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src);
2273 return rtpsession->recv_rtp_sink;
2276 /* Remove sinkpad to receive RTP packets from senders. This will also remove
2277 * the srcpad for the RTP packets.
2280 remove_recv_rtp_sink (GstRtpSession * rtpsession)
2282 GST_DEBUG_OBJECT (rtpsession, "removing RTP sink pad");
2284 /* deactivate from source to sink */
2285 gst_pad_set_active (rtpsession->recv_rtp_src, FALSE);
2286 gst_pad_set_active (rtpsession->recv_rtp_sink, FALSE);
2289 gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2290 rtpsession->recv_rtp_sink);
2291 rtpsession->recv_rtp_sink = NULL;
2293 GST_DEBUG_OBJECT (rtpsession, "removing RTP src pad");
2294 gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2295 rtpsession->recv_rtp_src);
2296 rtpsession->recv_rtp_src = NULL;
2299 /* Create a sinkpad to receive RTCP messages from senders, this will also create a
2300 * sync_src pad for the SR packets.
2303 create_recv_rtcp_sink (GstRtpSession * rtpsession)
2305 GST_DEBUG_OBJECT (rtpsession, "creating RTCP sink pad");
2307 rtpsession->recv_rtcp_sink =
2308 gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template,
2310 gst_pad_set_chain_function (rtpsession->recv_rtcp_sink,
2311 gst_rtp_session_chain_recv_rtcp);
2312 gst_pad_set_event_function (rtpsession->recv_rtcp_sink,
2313 gst_rtp_session_event_recv_rtcp_sink);
2314 gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtcp_sink,
2315 gst_rtp_session_iterate_internal_links);
2316 gst_pad_set_active (rtpsession->recv_rtcp_sink, TRUE);
2317 gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2318 rtpsession->recv_rtcp_sink);
2320 GST_DEBUG_OBJECT (rtpsession, "creating sync src pad");
2321 rtpsession->sync_src =
2322 gst_pad_new_from_static_template (&rtpsession_sync_src_template,
2324 gst_pad_set_iterate_internal_links_function (rtpsession->sync_src,
2325 gst_rtp_session_iterate_internal_links);
2326 gst_pad_use_fixed_caps (rtpsession->sync_src);
2327 gst_pad_set_active (rtpsession->sync_src, TRUE);
2328 gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
2330 return rtpsession->recv_rtcp_sink;
2334 remove_recv_rtcp_sink (GstRtpSession * rtpsession)
2336 GST_DEBUG_OBJECT (rtpsession, "removing RTCP sink pad");
2338 gst_pad_set_active (rtpsession->sync_src, FALSE);
2339 gst_pad_set_active (rtpsession->recv_rtcp_sink, FALSE);
2341 gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2342 rtpsession->recv_rtcp_sink);
2343 rtpsession->recv_rtcp_sink = NULL;
2345 GST_DEBUG_OBJECT (rtpsession, "removing sync src pad");
2346 gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
2347 rtpsession->sync_src = NULL;
2350 /* Create a sinkpad to receive RTP packets for receivers. This will also create a
2354 create_send_rtp_sink (GstRtpSession * rtpsession)
2356 GST_DEBUG_OBJECT (rtpsession, "creating pad");
2358 rtpsession->send_rtp_sink =
2359 gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template,
2361 gst_pad_set_chain_function (rtpsession->send_rtp_sink,
2362 gst_rtp_session_chain_send_rtp);
2363 gst_pad_set_chain_list_function (rtpsession->send_rtp_sink,
2364 gst_rtp_session_chain_send_rtp_list);
2365 gst_pad_set_query_function (rtpsession->send_rtp_sink,
2366 gst_rtp_session_query_send_rtp);
2367 gst_pad_set_event_function (rtpsession->send_rtp_sink,
2368 gst_rtp_session_event_send_rtp_sink);
2369 gst_pad_set_iterate_internal_links_function (rtpsession->send_rtp_sink,
2370 gst_rtp_session_iterate_internal_links);
2371 GST_PAD_SET_PROXY_CAPS (rtpsession->send_rtp_sink);
2372 GST_PAD_SET_PROXY_ALLOCATION (rtpsession->send_rtp_sink);
2373 gst_pad_set_active (rtpsession->send_rtp_sink, TRUE);
2374 gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2375 rtpsession->send_rtp_sink);
2377 rtpsession->send_rtp_src =
2378 gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
2380 gst_pad_set_iterate_internal_links_function (rtpsession->send_rtp_src,
2381 gst_rtp_session_iterate_internal_links);
2382 gst_pad_set_event_function (rtpsession->send_rtp_src,
2383 gst_rtp_session_event_send_rtp_src);
2384 GST_PAD_SET_PROXY_CAPS (rtpsession->send_rtp_src);
2385 gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
2386 gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
2388 return rtpsession->send_rtp_sink;
2392 remove_send_rtp_sink (GstRtpSession * rtpsession)
2394 GST_DEBUG_OBJECT (rtpsession, "removing pad");
2396 gst_pad_set_active (rtpsession->send_rtp_src, FALSE);
2397 gst_pad_set_active (rtpsession->send_rtp_sink, FALSE);
2399 gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2400 rtpsession->send_rtp_sink);
2401 rtpsession->send_rtp_sink = NULL;
2403 gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2404 rtpsession->send_rtp_src);
2405 rtpsession->send_rtp_src = NULL;
2408 /* Create a srcpad with the RTCP packets to send out.
2409 * This pad will be driven by the RTP session manager when it wants to send out
2413 create_send_rtcp_src (GstRtpSession * rtpsession)
2415 GST_DEBUG_OBJECT (rtpsession, "creating pad");
2417 rtpsession->send_rtcp_src =
2418 gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template,
2420 gst_pad_use_fixed_caps (rtpsession->send_rtcp_src);
2421 gst_pad_set_active (rtpsession->send_rtcp_src, TRUE);
2422 gst_pad_set_iterate_internal_links_function (rtpsession->send_rtcp_src,
2423 gst_rtp_session_iterate_internal_links);
2424 gst_pad_set_query_function (rtpsession->send_rtcp_src,
2425 gst_rtp_session_query_send_rtcp_src);
2426 gst_pad_set_event_function (rtpsession->send_rtcp_src,
2427 gst_rtp_session_event_send_rtcp_src);
2428 gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2429 rtpsession->send_rtcp_src);
2431 return rtpsession->send_rtcp_src;
2435 remove_send_rtcp_src (GstRtpSession * rtpsession)
2437 GST_DEBUG_OBJECT (rtpsession, "removing pad");
2439 gst_pad_set_active (rtpsession->send_rtcp_src, FALSE);
2441 gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2442 rtpsession->send_rtcp_src);
2443 rtpsession->send_rtcp_src = NULL;
2447 gst_rtp_session_request_new_pad (GstElement * element,
2448 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
2450 GstRtpSession *rtpsession;
2451 GstElementClass *klass;
2454 g_return_val_if_fail (templ != NULL, NULL);
2455 g_return_val_if_fail (GST_IS_RTP_SESSION (element), NULL);
2457 rtpsession = GST_RTP_SESSION (element);
2458 klass = GST_ELEMENT_GET_CLASS (element);
2460 GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
2462 GST_RTP_SESSION_LOCK (rtpsession);
2464 /* figure out the template */
2465 if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) {
2466 if (rtpsession->recv_rtp_sink != NULL)
2469 result = create_recv_rtp_sink (rtpsession);
2470 } else if (templ == gst_element_class_get_pad_template (klass,
2471 "recv_rtcp_sink")) {
2472 if (rtpsession->recv_rtcp_sink != NULL)
2475 result = create_recv_rtcp_sink (rtpsession);
2476 } else if (templ == gst_element_class_get_pad_template (klass,
2478 if (rtpsession->send_rtp_sink != NULL)
2481 result = create_send_rtp_sink (rtpsession);
2482 } else if (templ == gst_element_class_get_pad_template (klass,
2484 if (rtpsession->send_rtcp_src != NULL)
2487 result = create_send_rtcp_src (rtpsession);
2489 goto wrong_template;
2491 GST_RTP_SESSION_UNLOCK (rtpsession);
2498 GST_RTP_SESSION_UNLOCK (rtpsession);
2499 g_warning ("rtpsession: this is not our template");
2504 GST_RTP_SESSION_UNLOCK (rtpsession);
2505 g_warning ("rtpsession: pad already requested");
2511 gst_rtp_session_release_pad (GstElement * element, GstPad * pad)
2513 GstRtpSession *rtpsession;
2515 g_return_if_fail (GST_IS_RTP_SESSION (element));
2516 g_return_if_fail (GST_IS_PAD (pad));
2518 rtpsession = GST_RTP_SESSION (element);
2520 GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2522 GST_RTP_SESSION_LOCK (rtpsession);
2524 if (rtpsession->recv_rtp_sink == pad) {
2525 remove_recv_rtp_sink (rtpsession);
2526 } else if (rtpsession->recv_rtcp_sink == pad) {
2527 remove_recv_rtcp_sink (rtpsession);
2528 } else if (rtpsession->send_rtp_sink == pad) {
2529 remove_send_rtp_sink (rtpsession);
2530 } else if (rtpsession->send_rtcp_src == pad) {
2531 remove_send_rtcp_src (rtpsession);
2535 GST_RTP_SESSION_UNLOCK (rtpsession);
2542 GST_RTP_SESSION_UNLOCK (rtpsession);
2543 g_warning ("rtpsession: asked to release an unknown pad");
2549 gst_rtp_session_request_key_unit (RTPSession * sess,
2550 gboolean all_headers, gpointer user_data)
2552 GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2554 GstPad *send_rtp_sink;
2556 GST_RTP_SESSION_LOCK (rtpsession);
2557 if ((send_rtp_sink = rtpsession->send_rtp_sink))
2558 gst_object_ref (send_rtp_sink);
2559 GST_RTP_SESSION_UNLOCK (rtpsession);
2561 if (send_rtp_sink) {
2562 event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2563 gst_structure_new ("GstForceKeyUnit",
2564 "all-headers", G_TYPE_BOOLEAN, all_headers, NULL));
2565 gst_pad_push_event (send_rtp_sink, event);
2566 gst_object_unref (send_rtp_sink);
2571 gst_rtp_session_request_time (RTPSession * session, gpointer user_data)
2573 GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2575 return gst_clock_get_time (rtpsession->priv->sysclock);
2579 gst_rtp_session_notify_nack (RTPSession * sess, guint16 seqnum,
2580 guint16 blp, guint32 ssrc, gpointer user_data)
2582 GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2584 GstPad *send_rtp_sink;
2586 GST_RTP_SESSION_LOCK (rtpsession);
2587 if ((send_rtp_sink = rtpsession->send_rtp_sink))
2588 gst_object_ref (send_rtp_sink);
2589 GST_RTP_SESSION_UNLOCK (rtpsession);
2591 if (send_rtp_sink) {
2593 event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2594 gst_structure_new ("GstRTPRetransmissionRequest",
2595 "seqnum", G_TYPE_UINT, (guint) seqnum,
2596 "ssrc", G_TYPE_UINT, (guint) ssrc, NULL));
2597 gst_pad_push_event (send_rtp_sink, event);
2603 while ((blp & 1) == 0) {
2609 gst_object_unref (send_rtp_sink);
2614 gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data)
2616 GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2617 GstPad *send_rtp_sink;
2619 GST_RTP_SESSION_LOCK (rtpsession);
2620 if ((send_rtp_sink = rtpsession->send_rtp_sink))
2621 gst_object_ref (send_rtp_sink);
2622 GST_RTP_SESSION_UNLOCK (rtpsession);
2624 if (send_rtp_sink) {
2625 gst_pad_push_event (send_rtp_sink, gst_event_new_reconfigure ());
2626 gst_object_unref (send_rtp_sink);