2 * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 02111-1307, USA.
22 #include <gst/rtp/gstrtpbuffer.h>
23 #include <gst/rtp/gstrtcpbuffer.h>
24 #include <gst/netbuffer/gstnetbuffer.h>
26 #include <gst/glib-compat-private.h>
28 #include "gstrtpbin-marshal.h"
29 #include "rtpsession.h"
/* Debug category for this file; GST_CAT_DEFAULT makes the GST_DEBUG calls
 * in this file log to it. */
31 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
32 #define GST_CAT_DEFAULT rtp_session_debug
34 /* signals and args */
/* Signal identifiers; each one is an index into rtp_session_signals[]. */
37 SIGNAL_GET_SOURCE_BY_SSRC,
39 SIGNAL_ON_SSRC_COLLISION,
40 SIGNAL_ON_SSRC_VALIDATED,
41 SIGNAL_ON_SSRC_ACTIVE,
44 SIGNAL_ON_BYE_TIMEOUT,
46 SIGNAL_ON_SENDER_TIMEOUT,
47 SIGNAL_ON_SENDING_RTCP,
48 SIGNAL_ON_FEEDBACK_RTCP,
/* Default values; several are used as property defaults in
 * rtp_session_class_init() and as initial field values in
 * rtp_session_init(). */
53 #define DEFAULT_INTERNAL_SOURCE NULL
54 #define DEFAULT_BANDWIDTH RTP_STATS_BANDWIDTH
55 #define DEFAULT_RTCP_FRACTION (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH)
56 #define DEFAULT_RTCP_RR_BANDWIDTH -1
57 #define DEFAULT_RTCP_RS_BANDWIDTH -1
58 #define DEFAULT_RTCP_MTU 1400
59 #define DEFAULT_SDES NULL
60 #define DEFAULT_NUM_SOURCES 0
61 #define DEFAULT_NUM_ACTIVE_SOURCES 0
62 #define DEFAULT_SOURCES NULL
63 #define DEFAULT_RTCP_MIN_INTERVAL (RTP_STATS_MIN_INTERVAL * GST_SECOND)
64 #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
65 #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
/* Property identifiers handed to g_object_class_install_property() and
 * dispatched on in rtp_session_set_property()/rtp_session_get_property(). */
74 PROP_RTCP_RR_BANDWIDTH,
75 PROP_RTCP_RS_BANDWIDTH,
79 PROP_NUM_ACTIVE_SOURCES,
82 PROP_RTCP_MIN_INTERVAL,
83 PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
84 PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
88 /* update average packet size */
89 #define INIT_AVG(avg, val) \
91 #define UPDATE_AVG(avg, val) \
95 (avg) = ((val) + (15 * (avg))) >> 4;
98 /* The number of RTCP intervals after which to timeout entries in the
101 #define RTCP_INTERVAL_COLLISION_TIMEOUT 10
103 /* GObject vmethods */
104 static void rtp_session_finalize (GObject * object);
105 static void rtp_session_set_property (GObject * object, guint prop_id,
106 const GValue * value, GParamSpec * pspec);
107 static void rtp_session_get_property (GObject * object, guint prop_id,
108 GValue * value, GParamSpec * pspec);
/* Default class handlers for the "on-sending-rtcp" and "send-rtcp" signals
 * (installed at the end of rtp_session_class_init()). */
110 static gboolean rtp_session_on_sending_rtcp (RTPSession * sess,
111 GstBuffer * buffer, gboolean early);
112 static void rtp_session_send_rtcp (RTPSession * sess,
113 GstClockTimeDiff max_delay);
116 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
118 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
/* Forward declarations of file-local helpers. */
120 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
121 gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
122 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
123 const gchar * reason, GstClockTime current_time);
124 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
125 gboolean deterministic, gboolean first);
128 accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
129 const GValue * handler_return, gpointer data)
131 if (g_value_get_boolean (handler_return))
132 g_value_set_boolean (return_accu, TRUE);
138 gst_rtp_bin_marshal_BOOLEAN__MINIOBJECT_BOOLEAN (GClosure * closure,
139 GValue * return_value G_GNUC_UNUSED, guint n_param_values,
140 const GValue * param_values, gpointer invocation_hint G_GNUC_UNUSED,
141 gpointer marshal_data)
143 typedef gboolean (*GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN) (gpointer data1,
144 gpointer arg_1, gboolean arg_2, gpointer data2);
145 register GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN callback;
146 register GCClosure *cc = (GCClosure *) closure;
147 register gpointer data1, data2;
150 g_return_if_fail (return_value != NULL);
151 g_return_if_fail (n_param_values == 3);
153 if (G_CCLOSURE_SWAP_DATA (closure)) {
154 data1 = closure->data;
155 data2 = g_value_peek_pointer (param_values + 0);
157 data1 = g_value_peek_pointer (param_values + 0);
158 data2 = closure->data;
161 (GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN) (marshal_data ? marshal_data :
164 v_return = callback (data1,
165 gst_value_get_mini_object (param_values + 1),
166 g_value_get_boolean (param_values + 2), data2);
168 g_value_set_boolean (return_value, v_return);
172 gst_rtp_bin_marshal_VOID__UINT_UINT_UINT_UINT_MINIOBJECT (GClosure * closure,
173 GValue * return_value G_GNUC_UNUSED, guint n_param_values,
174 const GValue * param_values, gpointer invocation_hint G_GNUC_UNUSED,
175 gpointer marshal_data)
177 typedef void (*GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT) (gpointer
178 data1, guint arg_1, guint arg_2, guint arg_3, guint arg_4, gpointer arg_5,
180 register GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT callback;
181 register GCClosure *cc = (GCClosure *) closure;
182 register gpointer data1, data2;
184 g_return_if_fail (n_param_values == 6);
186 if (G_CCLOSURE_SWAP_DATA (closure)) {
187 data1 = closure->data;
188 data2 = g_value_peek_pointer (param_values + 0);
190 data1 = g_value_peek_pointer (param_values + 0);
191 data2 = closure->data;
194 (GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT) (marshal_data ?
195 marshal_data : cc->callback);
198 g_value_get_uint (param_values + 1),
199 g_value_get_uint (param_values + 2),
200 g_value_get_uint (param_values + 3),
201 g_value_get_uint (param_values + 4),
202 gst_value_get_mini_object (param_values + 5), data2);
/* Class initializer for RTPSession: hooks up the GObject vmethods, registers
 * the session's signals and properties, installs the default class handlers
 * and initializes the debug category. */
207 rtp_session_class_init (RTPSessionClass * klass)
209 GObjectClass *gobject_class;
211 gobject_class = (GObjectClass *) klass;
/* GObject virtual methods */
213 gobject_class->finalize = rtp_session_finalize;
214 gobject_class->set_property = rtp_session_set_property;
215 gobject_class->get_property = rtp_session_get_property;
218 * RTPSession::get-source-by-ssrc:
219 * @session: the object which received the signal
220 * @ssrc: the SSRC of the RTPSource
222 * Request the #RTPSource object with SSRC @ssrc in @session.
224 rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
225 g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
226 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
227 get_source_by_ssrc), NULL, NULL, gst_rtp_bin_marshal_OBJECT__UINT,
228 RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
231 * RTPSession::on-new-ssrc:
232 * @session: the object which received the signal
233 * @src: the new RTPSource
235 * Notify of a new SSRC that entered @session.
237 rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
238 g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
239 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
240 NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
243 * RTPSession::on-ssrc-collision:
244 * @session: the object which received the signal
245 * @src: the #RTPSource that caused a collision
247 * Notify when we have an SSRC collision
249 rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
250 g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
251 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
252 NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
255 * RTPSession::on-ssrc-validated:
256 * @session: the object which received the signal
257 * @src: the new validated RTPSource
259 * Notify of a new SSRC that became validated.
261 rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
262 g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
263 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
264 NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
267 * RTPSession::on-ssrc-active:
268 * @session: the object which received the signal
269 * @src: the active RTPSource
271 * Notify of a SSRC that is active, i.e., sending RTCP.
273 rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
274 g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
275 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
276 NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
279 * RTPSession::on-ssrc-sdes:
280 * @session: the object which received the signal
281 * @src: the RTPSource
283 * Notify that a new SDES was received for SSRC.
285 rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
286 g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
287 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
288 NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
291 * RTPSession::on-bye-ssrc:
292 * @session: the object which received the signal
293 * @src: the RTPSource that went away
295 * Notify of an SSRC that became inactive because of a BYE packet.
297 rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
298 g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
299 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
300 NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
303 * RTPSession::on-bye-timeout:
304 * @session: the object which received the signal
305 * @src: the RTPSource that timed out
307 * Notify of an SSRC that has timed out because of BYE
309 rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
310 g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
311 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
312 NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
315 * RTPSession::on-timeout:
316 * @session: the object which received the signal
317 * @src: the RTPSource that timed out
319 * Notify of an SSRC that has timed out
321 rtp_session_signals[SIGNAL_ON_TIMEOUT] =
322 g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
323 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
324 NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
327 * RTPSession::on-sender-timeout:
328 * @session: the object which received the signal
329 * @src: the RTPSource that timed out
331 * Notify of an SSRC that was a sender but timed out and became a receiver.
333 rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
334 g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
335 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
336 NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
340 * RTPSession::on-sending-rtcp:
341 * @session: the object which received the signal
342 * @buffer: the #GstBuffer containing the RTCP packet about to be sent
343 * @early: %TRUE if the packet is early, %FALSE if it is regular
345 * This signal is emitted before sending an RTCP packet, it can be used
346 * to add extra RTCP Packets.
348 * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
349 * if suppressing it is acceptable
351 rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
352 g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
353 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
354 accumulate_trues, NULL, gst_rtp_bin_marshal_BOOLEAN__MINIOBJECT_BOOLEAN,
355 G_TYPE_BOOLEAN, 2, GST_TYPE_BUFFER, G_TYPE_BOOLEAN);
358 * RTPSession::on-feedback-rtcp:
359 * @session: the object which received the signal
360 * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
361 * %GST_RTCP_TYPE_PSFB
362 * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
363 * @sender_ssrc: The SSRC of the sender
364 * @media_ssrc: The SSRC of the media this refers to
365 * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
368 * Notify that a RTCP feedback packet has been received
371 rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
372 g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
373 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
374 NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT_UINT_UINT_MINIOBJECT,
375 G_TYPE_NONE, 5, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT,
379 * RTPSession::send-rtcp:
380 * @session: the object which received the signal
381 * @max_delay: The maximum delay after which the feedback will not be useful
384 * Requests that the #RTPSession initiate a new RTCP packet as soon as
385 * possible within the requested delay.
388 rtp_session_signals[SIGNAL_SEND_RTCP] =
389 g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
390 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
391 G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
392 gst_rtp_bin_marshal_VOID__UINT64, G_TYPE_NONE, 1, G_TYPE_UINT64);
/* object properties */
394 g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
395 g_param_spec_uint ("internal-ssrc", "Internal SSRC",
396 "The internal SSRC used for the session",
397 0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399 g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
400 g_param_spec_object ("internal-source", "Internal Source",
401 "The internal source element of the session",
402 RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
404 g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
405 g_param_spec_double ("bandwidth", "Bandwidth",
406 "The bandwidth of the session (0 for auto-discover)",
407 0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
408 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
410 g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
411 g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
412 "The fraction of the bandwidth used for RTCP (or as a real fraction of the RTP bandwidth if < 1)",
413 0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
414 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
416 g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
417 g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
418 "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
419 -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
420 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422 g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
423 g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
424 "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
425 -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
426 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428 g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
429 g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
430 "The maximum size of the RTCP packets",
431 16, G_MAXINT16, DEFAULT_RTCP_MTU,
432 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
434 g_object_class_install_property (gobject_class, PROP_SDES,
435 g_param_spec_boxed ("sdes", "SDES",
436 "The SDES items of this session",
437 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
439 g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
440 g_param_spec_uint ("num-sources", "Num Sources",
441 "The number of sources in the session", 0, G_MAXUINT,
442 DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
444 g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
445 g_param_spec_uint ("num-active-sources", "Num Active Sources",
446 "The number of active sources in the session", 0, G_MAXUINT,
447 DEFAULT_NUM_ACTIVE_SOURCES,
448 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
452 * Get a GValue Array of all sources in the session.
455 * <title>Getting the #RTPSources of a session
462 * g_object_get (sess, "sources", &arr, NULL);
464 * for (i = 0; i < arr->n_values; i++) {
467 * val = g_value_array_get_nth (arr, i);
468 * source = g_value_get_object (val);
470 * g_value_array_free (arr);
475 g_object_class_install_property (gobject_class, PROP_SOURCES,
476 g_param_spec_boxed ("sources", "Sources",
477 "An array of all known sources in the session",
478 G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
480 g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
481 g_param_spec_boolean ("favor-new", "Favor new sources",
482 "Resolve SSRC conflict in favor of new sources", FALSE,
483 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
485 g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
486 g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
487 "Minimum interval between Regular RTCP packet (in ns)",
488 0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
489 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
491 g_object_class_install_property (gobject_class,
492 PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
493 g_param_spec_uint64 ("rtcp-feedback-retention-window",
494 "RTCP Feedback retention window",
495 "Duration during which RTCP Feedback packets are retained (in ns)",
496 0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
497 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
499 g_object_class_install_property (gobject_class,
500 PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
501 g_param_spec_uint ("rtcp-immediate-feedback-threshold",
502 "RTCP Immediate Feedback threshold",
503 "The maximum number of members of a RTP session for which immediate"
505 0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
506 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* default class handlers for get-source-by-ssrc, on-sending-rtcp and
 * send-rtcp */
508 klass->get_source_by_ssrc =
509 GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
510 klass->on_sending_rtcp = GST_DEBUG_FUNCPTR (rtp_session_on_sending_rtcp);
511 klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
513 GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
/* Instance initializer: allocates the session lock and lookup tables, loads
 * the default statistics and bandwidth settings, creates the session's own
 * (internal) source with default SDES items, and sets the initial RTCP
 * scheduling state. */
517 rtp_session_init (RTPSession * sess)
522 sess->lock = g_mutex_new ();
523 sess->key = g_random_int ();
/* 32 iterations creating hash tables whose values are unreffed on removal;
 * rtp_session_finalize() destroys the matching 32 entries of sess->ssrcs */
527 for (i = 0; i < 32; i++) {
529 g_hash_table_new_full (NULL, NULL, NULL,
530 (GDestroyNotify) g_object_unref);
/* the cnames table frees its keys with g_free() and does not free values */
532 sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL);
534 rtp_stats_init_defaults (&sess->stats);
536 sess->recalc_bandwidth = TRUE;
537 sess->bandwidth = DEFAULT_BANDWIDTH;
538 sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
539 sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
540 sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
542 /* create an active SSRC for this session manager */
543 sess->source = rtp_session_create_source (sess);
544 sess->source->validated = TRUE;
545 sess->source->internal = TRUE;
546 sess->stats.active_sources++;
547 INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
548 sess->source->stats.prev_rtcptime = 0;
549 sess->source->stats.last_rtcptime = 1;
/* the property is in nanoseconds; the stats helper is given seconds */
551 rtp_stats_set_min_interval (&sess->stats,
552 (gdouble) DEFAULT_RTCP_MIN_INTERVAL / GST_SECOND);
554 /* default UDP header length */
555 sess->header_len = 28;
556 sess->mtu = DEFAULT_RTCP_MTU;
558 /* some default SDES entries */
560 /* we do not want to leak details like the username or hostname here */
561 str = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
562 rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_CNAME, str);
/* NOTE(review): this listing omits lines around the NAME item below, so it
 * cannot be told from here whether that item is compiled in - confirm */
566 /* we do not want to leak the user's real name here */
567 str = g_strdup_printf ("Anon%u", g_random_int ());
568 rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_NAME, str);
572 rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_TOOL, "GStreamer");
574 sess->first_rtcp = TRUE;
575 sess->allow_early = TRUE;
576 sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
577 sess->rtcp_immediate_feedback_threshold =
578 DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
580 sess->last_keyframe_request = GST_CLOCK_TIME_NONE;
582 GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
586 rtp_session_finalize (GObject * object)
591 sess = RTP_SESSION_CAST (object);
593 g_mutex_free (sess->lock);
594 for (i = 0; i < 32; i++)
595 g_hash_table_destroy (sess->ssrcs[i]);
597 g_free (sess->bye_reason);
599 g_hash_table_destroy (sess->cnames);
600 g_object_unref (sess->source);
602 G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
606 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
608 GValue value = { 0 };
610 g_value_init (&value, RTP_TYPE_SOURCE);
611 g_value_take_object (&value, source);
612 /* copies the value */
613 g_value_array_append (arr, &value);
617 rtp_session_create_sources (RTPSession * sess)
622 RTP_SESSION_LOCK (sess);
623 /* get number of elements in the table */
624 size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
625 /* create the result value array */
626 res = g_value_array_new (size);
628 /* and copy all values into the array */
629 g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
630 RTP_SESSION_UNLOCK (sess);
636 rtp_session_set_property (GObject * object, guint prop_id,
637 const GValue * value, GParamSpec * pspec)
641 sess = RTP_SESSION (object);
644 case PROP_INTERNAL_SSRC:
645 rtp_session_set_internal_ssrc (sess, g_value_get_uint (value));
648 sess->bandwidth = g_value_get_double (value);
649 sess->recalc_bandwidth = TRUE;
651 case PROP_RTCP_FRACTION:
652 sess->rtcp_bandwidth = g_value_get_double (value);
653 sess->recalc_bandwidth = TRUE;
655 case PROP_RTCP_RR_BANDWIDTH:
656 sess->rtcp_rr_bandwidth = g_value_get_int (value);
657 sess->recalc_bandwidth = TRUE;
659 case PROP_RTCP_RS_BANDWIDTH:
660 sess->rtcp_rs_bandwidth = g_value_get_int (value);
661 sess->recalc_bandwidth = TRUE;
664 sess->mtu = g_value_get_uint (value);
667 rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
670 sess->favor_new = g_value_get_boolean (value);
672 case PROP_RTCP_MIN_INTERVAL:
673 rtp_stats_set_min_interval (&sess->stats,
674 (gdouble) g_value_get_uint64 (value) / GST_SECOND);
675 /* trigger reconsideration */
676 RTP_SESSION_LOCK (sess);
677 sess->next_rtcp_check_time = 0;
678 RTP_SESSION_UNLOCK (sess);
679 if (sess->callbacks.reconsider)
680 sess->callbacks.reconsider (sess, sess->reconsider_user_data);
682 case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
683 sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
686 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
692 rtp_session_get_property (GObject * object, guint prop_id,
693 GValue * value, GParamSpec * pspec)
697 sess = RTP_SESSION (object);
700 case PROP_INTERNAL_SSRC:
701 g_value_set_uint (value, rtp_session_get_internal_ssrc (sess));
703 case PROP_INTERNAL_SOURCE:
704 g_value_take_object (value, rtp_session_get_internal_source (sess));
707 g_value_set_double (value, sess->bandwidth);
709 case PROP_RTCP_FRACTION:
710 g_value_set_double (value, sess->rtcp_bandwidth);
712 case PROP_RTCP_RR_BANDWIDTH:
713 g_value_set_int (value, sess->rtcp_rr_bandwidth);
715 case PROP_RTCP_RS_BANDWIDTH:
716 g_value_set_int (value, sess->rtcp_rs_bandwidth);
719 g_value_set_uint (value, sess->mtu);
722 g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
724 case PROP_NUM_SOURCES:
725 g_value_set_uint (value, rtp_session_get_num_sources (sess));
727 case PROP_NUM_ACTIVE_SOURCES:
728 g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
731 g_value_take_boxed (value, rtp_session_create_sources (sess));
734 g_value_set_boolean (value, sess->favor_new);
736 case PROP_RTCP_MIN_INTERVAL:
737 g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
739 case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
740 g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
743 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
749 on_new_ssrc (RTPSession * sess, RTPSource * source)
751 g_object_ref (source);
752 RTP_SESSION_UNLOCK (sess);
753 g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
754 RTP_SESSION_LOCK (sess);
755 g_object_unref (source);
759 on_ssrc_collision (RTPSession * sess, RTPSource * source)
761 g_object_ref (source);
762 RTP_SESSION_UNLOCK (sess);
763 g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
765 RTP_SESSION_LOCK (sess);
766 g_object_unref (source);
770 on_ssrc_validated (RTPSession * sess, RTPSource * source)
772 g_object_ref (source);
773 RTP_SESSION_UNLOCK (sess);
774 g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
776 RTP_SESSION_LOCK (sess);
777 g_object_unref (source);
781 on_ssrc_active (RTPSession * sess, RTPSource * source)
783 g_object_ref (source);
784 RTP_SESSION_UNLOCK (sess);
785 g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
786 RTP_SESSION_LOCK (sess);
787 g_object_unref (source);
791 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
793 g_object_ref (source);
794 GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
795 RTP_SESSION_UNLOCK (sess);
796 g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
797 RTP_SESSION_LOCK (sess);
798 g_object_unref (source);
802 on_bye_ssrc (RTPSession * sess, RTPSource * source)
804 g_object_ref (source);
805 RTP_SESSION_UNLOCK (sess);
806 g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
807 RTP_SESSION_LOCK (sess);
808 g_object_unref (source);
812 on_bye_timeout (RTPSession * sess, RTPSource * source)
814 g_object_ref (source);
815 RTP_SESSION_UNLOCK (sess);
816 g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
817 RTP_SESSION_LOCK (sess);
818 g_object_unref (source);
822 on_timeout (RTPSession * sess, RTPSource * source)
824 g_object_ref (source);
825 RTP_SESSION_UNLOCK (sess);
826 g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
827 RTP_SESSION_LOCK (sess);
828 g_object_unref (source);
832 on_sender_timeout (RTPSession * sess, RTPSource * source)
834 g_object_ref (source);
835 RTP_SESSION_UNLOCK (sess);
836 g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
838 RTP_SESSION_LOCK (sess);
839 g_object_unref (source);
845 * Create a new session object.
847 * Returns: a new #RTPSession. g_object_unref() after usage.
850 rtp_session_new (void)
854 sess = g_object_new (RTP_TYPE_SESSION, NULL);
860 * rtp_session_set_callbacks:
861 * @sess: an #RTPSession
862 * @callbacks: callbacks to configure
863 * @user_data: user data passed in the callbacks
865 * Configure a set of callbacks to be notified of actions.
868 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
871 g_return_if_fail (RTP_IS_SESSION (sess));
873 if (callbacks->process_rtp) {
874 sess->callbacks.process_rtp = callbacks->process_rtp;
875 sess->process_rtp_user_data = user_data;
877 if (callbacks->send_rtp) {
878 sess->callbacks.send_rtp = callbacks->send_rtp;
879 sess->send_rtp_user_data = user_data;
881 if (callbacks->send_rtcp) {
882 sess->callbacks.send_rtcp = callbacks->send_rtcp;
883 sess->send_rtcp_user_data = user_data;
885 if (callbacks->sync_rtcp) {
886 sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
887 sess->sync_rtcp_user_data = user_data;
889 if (callbacks->clock_rate) {
890 sess->callbacks.clock_rate = callbacks->clock_rate;
891 sess->clock_rate_user_data = user_data;
893 if (callbacks->reconsider) {
894 sess->callbacks.reconsider = callbacks->reconsider;
895 sess->reconsider_user_data = user_data;
897 if (callbacks->request_key_unit) {
898 sess->callbacks.request_key_unit = callbacks->request_key_unit;
899 sess->request_key_unit_user_data = user_data;
901 if (callbacks->request_time) {
902 sess->callbacks.request_time = callbacks->request_time;
903 sess->request_time_user_data = user_data;
908 * rtp_session_set_process_rtp_callback:
909 * @sess: an #RTPSession
910 * @callback: callback to set
911 * @user_data: user data passed in the callback
913 * Configure only the process_rtp callback to be notified of the process_rtp action.
916 rtp_session_set_process_rtp_callback (RTPSession * sess,
917 RTPSessionProcessRTP callback, gpointer user_data)
919 g_return_if_fail (RTP_IS_SESSION (sess));
921 sess->callbacks.process_rtp = callback;
922 sess->process_rtp_user_data = user_data;
926 * rtp_session_set_send_rtp_callback:
927 * @sess: an #RTPSession
928 * @callback: callback to set
929 * @user_data: user data passed in the callback
931 * Configure only the send_rtp callback to be notified of the send_rtp action.
934 rtp_session_set_send_rtp_callback (RTPSession * sess,
935 RTPSessionSendRTP callback, gpointer user_data)
937 g_return_if_fail (RTP_IS_SESSION (sess));
939 sess->callbacks.send_rtp = callback;
940 sess->send_rtp_user_data = user_data;
944 * rtp_session_set_send_rtcp_callback:
945 * @sess: an #RTPSession
946 * @callback: callback to set
947 * @user_data: user data passed in the callback
949 * Configure only the send_rtcp callback to be notified of the send_rtcp action.
952 rtp_session_set_send_rtcp_callback (RTPSession * sess,
953 RTPSessionSendRTCP callback, gpointer user_data)
955 g_return_if_fail (RTP_IS_SESSION (sess));
957 sess->callbacks.send_rtcp = callback;
958 sess->send_rtcp_user_data = user_data;
962 * rtp_session_set_sync_rtcp_callback:
963 * @sess: an #RTPSession
964 * @callback: callback to set
965 * @user_data: user data passed in the callback
967 * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
970 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
971 RTPSessionSyncRTCP callback, gpointer user_data)
973 g_return_if_fail (RTP_IS_SESSION (sess));
975 sess->callbacks.sync_rtcp = callback;
976 sess->sync_rtcp_user_data = user_data;
980 * rtp_session_set_clock_rate_callback:
981 * @sess: an #RTPSession
982 * @callback: callback to set
983 * @user_data: user data passed in the callback
985 * Configure only the clock_rate callback to be notified of the clock_rate action.
988 rtp_session_set_clock_rate_callback (RTPSession * sess,
989 RTPSessionClockRate callback, gpointer user_data)
991 g_return_if_fail (RTP_IS_SESSION (sess));
993 sess->callbacks.clock_rate = callback;
994 sess->clock_rate_user_data = user_data;
998 * rtp_session_set_reconsider_callback:
999 * @sess: an #RTPSession
1000 * @callback: callback to set
1001 * @user_data: user data passed in the callback
1003 * Configure only the reconsider callback to be notified of the reconsider action.
1006 rtp_session_set_reconsider_callback (RTPSession * sess,
1007 RTPSessionReconsider callback, gpointer user_data)
1009 g_return_if_fail (RTP_IS_SESSION (sess));
1011 sess->callbacks.reconsider = callback;
1012 sess->reconsider_user_data = user_data;
1016 * rtp_session_set_request_time_callback:
1017 * @sess: an #RTPSession
1018 * @callback: callback to set
1019 * @user_data: user data passed in the callback
1021 * Configure only the request_time callback
1024 rtp_session_set_request_time_callback (RTPSession * sess,
1025 RTPSessionRequestTime callback, gpointer user_data)
1027 g_return_if_fail (RTP_IS_SESSION (sess));
1029 sess->callbacks.request_time = callback;
1030 sess->request_time_user_data = user_data;
1034 * rtp_session_set_bandwidth:
1035 * @sess: an #RTPSession
1036 * @bandwidth: the bandwidth allocated
1038 * Set the session bandwidth in bytes per second.
1041 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
1043 g_return_if_fail (RTP_IS_SESSION (sess));
1045 RTP_SESSION_LOCK (sess);
1046 sess->stats.bandwidth = bandwidth;
1047 RTP_SESSION_UNLOCK (sess);
1051 * rtp_session_get_bandwidth:
1052 * @sess: an #RTPSession
1054 * Get the session bandwidth.
1056 * Returns: the session bandwidth.
1059 rtp_session_get_bandwidth (RTPSession * sess)
1063 g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1065 RTP_SESSION_LOCK (sess);
1066 result = sess->stats.bandwidth;
1067 RTP_SESSION_UNLOCK (sess);
1073 * rtp_session_set_rtcp_fraction:
1074 * @sess: an #RTPSession
1075 * @bandwidth: the RTCP bandwidth
1077 * Set the bandwidth in bytes per second that should be used for RTCP
1081 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
1083 g_return_if_fail (RTP_IS_SESSION (sess));
1085 RTP_SESSION_LOCK (sess);
1086 sess->stats.rtcp_bandwidth = bandwidth;
1087 RTP_SESSION_UNLOCK (sess);
1091 * rtp_session_get_rtcp_fraction:
1092 * @sess: an #RTPSession
1094 * Get the session bandwidth used for RTCP.
1096 * Returns: The bandwidth used for RTCP messages.
1099 rtp_session_get_rtcp_fraction (RTPSession * sess)
1103 g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
1105 RTP_SESSION_LOCK (sess);
1106 result = sess->stats.rtcp_bandwidth;
1107 RTP_SESSION_UNLOCK (sess);
1113 * rtp_session_set_sdes_string:
1114 * @sess: an #RTPSession
1115 * @type: the type of the SDES item
1116 * @item: a null-terminated string to set.
1118 * Store an SDES item of @type in @sess.
1120 * Returns: %FALSE if the data was unchanged @type is invalid.
1123 rtp_session_set_sdes_string (RTPSession * sess, GstRTCPSDESType type,
1128 g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1130 RTP_SESSION_LOCK (sess);
1131 result = rtp_source_set_sdes_string (sess->source, type, item);
1132 RTP_SESSION_UNLOCK (sess);
1138 * rtp_session_get_sdes_string:
1139 * @sess: an #RTPSession
1140 * @type: the type of the SDES item
1142 * Get the SDES item of @type from @sess.
1144 * Returns: a null-terminated copy of the SDES item or NULL when @type was not
1145 * valid. g_free() after usage.
1148 rtp_session_get_sdes_string (RTPSession * sess, GstRTCPSDESType type)
1152 g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1154 RTP_SESSION_LOCK (sess);
1155 result = rtp_source_get_sdes_string (sess->source, type);
1156 RTP_SESSION_UNLOCK (sess);
1162 * rtp_session_get_sdes_struct:
1163 * @sess: an #RTSPSession
1165 * Get the SDES data as a #GstStructure
1167 * Returns: a GstStructure with SDES items for @sess. This function returns a
1168 * copy of the SDES structure, use gst_structure_free() after usage.
1171 rtp_session_get_sdes_struct (RTPSession * sess)
1173 const GstStructure *sdes;
1174 GstStructure *result = NULL;
1176 g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1178 RTP_SESSION_LOCK (sess);
1179 sdes = rtp_source_get_sdes_struct (sess->source);
1181 result = gst_structure_copy (sdes);
1182 RTP_SESSION_UNLOCK (sess);
1188 * rtp_session_set_sdes_struct:
1189 * @sess: an #RTSPSession
1190 * @sdes: a #GstStructure
1192 * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
1195 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
1197 g_return_if_fail (sdes);
1198 g_return_if_fail (RTP_IS_SESSION (sess));
1200 RTP_SESSION_LOCK (sess);
1201 rtp_source_set_sdes_struct (sess->source, gst_structure_copy (sdes));
1202 RTP_SESSION_UNLOCK (sess);
1205 static GstFlowReturn
1206 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
1208 GstFlowReturn result = GST_FLOW_OK;
1210 if (source == session->source) {
1211 GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
1213 RTP_SESSION_UNLOCK (session);
1215 if (session->callbacks.send_rtp)
1217 session->callbacks.send_rtp (session, source, data,
1218 session->send_rtp_user_data);
1220 gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1223 GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
1224 RTP_SESSION_UNLOCK (session);
1226 if (session->callbacks.process_rtp)
1228 session->callbacks.process_rtp (session, source,
1229 GST_BUFFER_CAST (data), session->process_rtp_user_data);
1231 gst_buffer_unref (GST_BUFFER_CAST (data));
1233 RTP_SESSION_LOCK (session);
1239 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
1243 RTP_SESSION_UNLOCK (session);
1245 if (session->callbacks.clock_rate)
1247 session->callbacks.clock_rate (session, pt,
1248 session->clock_rate_user_data);
1252 RTP_SESSION_LOCK (session);
1254 GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
1259 static RTPSourceCallbacks callbacks = {
1260 (RTPSourcePushRTP) source_push_rtp,
1261 (RTPSourceClockRate) source_clock_rate,
1265 check_collision (RTPSession * sess, RTPSource * source,
1266 RTPArrivalStats * arrival, gboolean rtp)
1268 /* If we have no arrival address, we can't do collision checking */
1269 if (!arrival->have_address)
1272 if (sess->source != source) {
1273 GstNetAddress *from;
1276 /* This is not our local source, but lets check if two remote
1281 from = &source->rtp_from;
1282 have_from = source->have_rtp_from;
1284 from = &source->rtcp_from;
1285 have_from = source->have_rtcp_from;
1289 if (gst_netaddress_equal (from, &arrival->address)) {
1290 /* Address is the same */
1293 GST_LOG ("we have a third-party collision or loop ssrc:%x",
1294 rtp_source_get_ssrc (source));
1295 if (sess->favor_new) {
1296 if (rtp_source_find_conflicting_address (source,
1297 &arrival->address, arrival->current_time)) {
1299 gst_netaddress_to_string (&arrival->address, buf1, 40);
1300 GST_LOG ("Known conflict on %x for %s, dropping packet",
1301 rtp_source_get_ssrc (source), buf1);
1304 gchar buf1[40], buf2[40];
1306 /* Current address is not a known conflict, lets assume this is
1307 * a new source. Save old address in possible conflict list
1309 rtp_source_add_conflicting_address (source, from,
1310 arrival->current_time);
1312 gst_netaddress_to_string (from, buf1, 40);
1313 gst_netaddress_to_string (&arrival->address, buf2, 40);
1314 GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1315 " saving old as known conflict",
1316 rtp_source_get_ssrc (source), buf1, buf2);
1319 rtp_source_set_rtp_from (source, &arrival->address);
1321 rtp_source_set_rtcp_from (source, &arrival->address);
1325 /* Don't need to save old addresses, we ignore new sources */
1330 /* We don't already have a from address for RTP, just set it */
1332 rtp_source_set_rtp_from (source, &arrival->address);
1334 rtp_source_set_rtcp_from (source, &arrival->address);
1338 /* FIXME: Log 3rd party collision somehow
1339 * Maybe should be done in upper layer, only the SDES can tell us
1340 * if its a collision or a loop
1343 /* If the source has been inactive for some time, we assume that it has
1344 * simply changed its transport source address. Hence, there is no true
1345 * third-party collision - only a simulated one. */
1346 if (arrival->current_time > source->last_activity) {
1347 GstClockTime inactivity_period =
1348 arrival->current_time - source->last_activity;
1349 if (inactivity_period > 1 * GST_SECOND) {
1350 /* Use new network address */
1352 g_assert (source->have_rtp_from);
1353 rtp_source_set_rtp_from (source, &arrival->address);
1355 g_assert (source->have_rtcp_from);
1356 rtp_source_set_rtcp_from (source, &arrival->address);
1362 /* This is sending with our ssrc, is it an address we already know */
1364 if (rtp_source_find_conflicting_address (source, &arrival->address,
1365 arrival->current_time)) {
1366 /* Its a known conflict, its probably a loop, not a collision
1367 * lets just drop the incoming packet
1369 GST_DEBUG ("Our packets are being looped back to us, dropping");
1371 /* Its a new collision, lets change our SSRC */
1373 rtp_source_add_conflicting_address (source, &arrival->address,
1374 arrival->current_time);
1376 GST_DEBUG ("Collision for SSRC %x", rtp_source_get_ssrc (source));
1377 on_ssrc_collision (sess, source);
1379 rtp_session_schedule_bye_locked (sess, "SSRC Collision",
1380 arrival->current_time);
1382 sess->change_ssrc = TRUE;
1390 /* must be called with the session lock, the returned source needs to be
1391 * unreffed after usage. */
1393 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1394 RTPArrivalStats * arrival, gboolean rtp)
1399 g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1400 if (source == NULL) {
1401 /* make new Source in probation and insert */
1402 source = rtp_source_new (ssrc);
1404 /* for RTP packets we need to set the source in probation. Receiving RTCP
1405 * packets of an SSRC, on the other hand, is a strong indication that we
1406 * are dealing with a valid source. */
1408 source->probation = RTP_DEFAULT_PROBATION;
1410 source->probation = 0;
1412 /* store from address, if any */
1413 if (arrival->have_address) {
1415 rtp_source_set_rtp_from (source, &arrival->address);
1417 rtp_source_set_rtcp_from (source, &arrival->address);
1420 /* configure a callback on the source */
1421 rtp_source_set_callbacks (source, &callbacks, sess);
1423 g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1426 /* we have one more source now */
1427 sess->total_sources++;
1431 /* check for collision, this updates the address when not previously set */
1432 if (check_collision (sess, source, arrival, rtp)) {
1436 /* update last activity */
1437 source->last_activity = arrival->current_time;
1439 source->last_rtp_activity = arrival->current_time;
1440 g_object_ref (source);
1446 * rtp_session_get_internal_source:
1447 * @sess: a #RTPSession
1449 * Get the internal #RTPSource of @sess.
1451 * Returns: The internal #RTPSource. g_object_unref() after usage.
1454 rtp_session_get_internal_source (RTPSession * sess)
1458 g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1460 result = g_object_ref (sess->source);
1466 * rtp_session_set_internal_ssrc:
1467 * @sess: a #RTPSession
1470 * Set the SSRC of @sess to @ssrc.
1473 rtp_session_set_internal_ssrc (RTPSession * sess, guint32 ssrc)
1475 RTP_SESSION_LOCK (sess);
1476 if (ssrc != sess->source->ssrc) {
1477 g_hash_table_steal (sess->ssrcs[sess->mask_idx],
1478 GINT_TO_POINTER (sess->source->ssrc));
1480 GST_DEBUG ("setting internal SSRC to %08x", ssrc);
1481 /* After this call, any receiver of the old SSRC either in RTP or RTCP
1482 * packets will timeout on the old SSRC, we could potentially schedule a
1483 * BYE RTCP for the old SSRC... */
1484 sess->source->ssrc = ssrc;
1485 rtp_source_reset (sess->source);
1487 /* rehash with the new SSRC */
1488 g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1489 GINT_TO_POINTER (sess->source->ssrc), sess->source);
1491 RTP_SESSION_UNLOCK (sess);
1493 g_object_notify (G_OBJECT (sess), "internal-ssrc");
1497 * rtp_session_get_internal_ssrc:
1498 * @sess: a #RTPSession
1500 * Get the internal SSRC of @sess.
1502 * Returns: The SSRC of the session.
1505 rtp_session_get_internal_ssrc (RTPSession * sess)
1509 RTP_SESSION_LOCK (sess);
1510 ssrc = sess->source->ssrc;
1511 RTP_SESSION_UNLOCK (sess);
1517 * rtp_session_add_source:
1518 * @sess: a #RTPSession
1519 * @src: #RTPSource to add
1521 * Add @src to @session.
1523 * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1524 * existed in the session.
1527 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1529 gboolean result = FALSE;
1532 g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1533 g_return_val_if_fail (src != NULL, FALSE);
1535 RTP_SESSION_LOCK (sess);
1537 g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1538 GINT_TO_POINTER (src->ssrc));
1540 g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1541 GINT_TO_POINTER (src->ssrc), src);
1542 /* we have one more source now */
1543 sess->total_sources++;
1546 RTP_SESSION_UNLOCK (sess);
1552 * rtp_session_get_num_sources:
1553 * @sess: an #RTPSession
1555 * Get the number of sources in @sess.
1557 * Returns: The number of sources in @sess.
1560 rtp_session_get_num_sources (RTPSession * sess)
1564 g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1566 RTP_SESSION_LOCK (sess);
1567 result = sess->total_sources;
1568 RTP_SESSION_UNLOCK (sess);
1574 * rtp_session_get_num_active_sources:
1575 * @sess: an #RTPSession
1577 * Get the number of active sources in @sess. A source is considered active when
1578 * it has been validated and has not yet received a BYE RTCP message.
1580 * Returns: The number of active sources in @sess.
1583 rtp_session_get_num_active_sources (RTPSession * sess)
1587 g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1589 RTP_SESSION_LOCK (sess);
1590 result = sess->stats.active_sources;
1591 RTP_SESSION_UNLOCK (sess);
1597 * rtp_session_get_source_by_ssrc:
1598 * @sess: an #RTPSession
1601 * Find the source with @ssrc in @sess.
1603 * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
1604 * g_object_unref() after usage.
1607 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
1611 g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1613 RTP_SESSION_LOCK (sess);
1615 g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1617 g_object_ref (result);
1618 RTP_SESSION_UNLOCK (sess);
1624 * rtp_session_get_source_by_cname:
1625 * @sess: a #RTPSession
1628 * Find the source with @cname in @sess.
1630 * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found.
1631 * g_object_unref() after usage.
1634 rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
1638 g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1639 g_return_val_if_fail (cname != NULL, NULL);
1641 RTP_SESSION_LOCK (sess);
1642 result = g_hash_table_lookup (sess->cnames, cname);
1644 g_object_ref (result);
1645 RTP_SESSION_UNLOCK (sess);
1650 /* should be called with the SESSION lock */
1652 rtp_session_create_new_ssrc (RTPSession * sess)
1657 ssrc = g_random_int ();
1659 /* see if it exists in the session, we're done if it doesn't */
1660 if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1661 GINT_TO_POINTER (ssrc)) == NULL)
1669 * rtp_session_create_source:
1670 * @sess: an #RTPSession
1672 * Create an #RTPSource for use in @sess. This function will create a source
1673 * with an ssrc that is currently not used by any participants in the session.
1675 * Returns: an #RTPSource.
1678 rtp_session_create_source (RTPSession * sess)
1683 RTP_SESSION_LOCK (sess);
1684 ssrc = rtp_session_create_new_ssrc (sess);
1685 source = rtp_source_new (ssrc);
1686 rtp_source_set_callbacks (source, &callbacks, sess);
1687 /* we need an additional ref for the source in the hashtable */
1688 g_object_ref (source);
1689 g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1691 /* we have one more source now */
1692 sess->total_sources++;
1693 RTP_SESSION_UNLOCK (sess);
1698 /* update the RTPArrivalStats structure with the current time and other bits
1699 * about the current buffer we are handling.
1700 * This function is typically called when a validated packet is received.
1701 * This function should be called with the SESSION_LOCK
1704 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
1705 gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
1706 GstClockTime running_time, guint64 ntpnstime)
1708 /* get time of arrival */
1709 arrival->current_time = current_time;
1710 arrival->running_time = running_time;
1711 arrival->ntpnstime = ntpnstime;
1713 /* get packet size including header overhead */
1714 arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
1717 arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer);
1719 arrival->payload_len = 0;
1722 /* for netbuffer we can store the IP address to check for collisions */
1723 arrival->have_address = GST_IS_NETBUFFER (buffer);
1724 if (arrival->have_address) {
1725 GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
1727 memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
1732 * rtp_session_process_rtp:
1733 * @sess: and #RTPSession
1734 * @buffer: an RTP buffer
1735 * @current_time: the current system time
1736 * @running_time: the running_time of @buffer
1738 * Process an RTP buffer in the session manager. This function takes ownership
1741 * Returns: a #GstFlowReturn.
1744 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
1745 GstClockTime current_time, GstClockTime running_time)
1747 GstFlowReturn result;
1751 gboolean prevsender, prevactive;
1752 RTPArrivalStats arrival;
1757 g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1758 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1760 if (!gst_rtp_buffer_validate (buffer))
1761 goto invalid_packet;
1763 RTP_SESSION_LOCK (sess);
1764 /* update arrival stats */
1765 update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
1768 /* ignore more RTP packets when we left the session */
1769 if (sess->source->received_bye)
1772 /* get SSRC and look up in session database */
1773 ssrc = gst_rtp_buffer_get_ssrc (buffer);
1774 source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
1778 prevsender = RTP_SOURCE_IS_SENDER (source);
1779 prevactive = RTP_SOURCE_IS_ACTIVE (source);
1780 oldrate = source->bitrate;
1782 /* copy available csrc for later */
1783 count = gst_rtp_buffer_get_csrc_count (buffer);
1784 /* make sure to not overflow our array. An RTP buffer can maximally contain
1786 count = MIN (count, 16);
1788 for (i = 0; i < count; i++)
1789 csrcs[i] = gst_rtp_buffer_get_csrc (buffer, i);
1791 /* let source process the packet */
1792 result = rtp_source_process_rtp (source, buffer, &arrival);
1794 /* source became active */
1795 if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
1796 sess->stats.active_sources++;
1797 GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
1798 sess->stats.active_sources);
1799 on_ssrc_validated (sess, source);
1801 if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1802 sess->stats.sender_sources++;
1803 GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
1804 sess->stats.sender_sources);
1806 if (oldrate != source->bitrate)
1807 sess->recalc_bandwidth = TRUE;
1810 on_new_ssrc (sess, source);
1812 if (source->validated) {
1815 /* for validated sources, we add the CSRCs as well */
1816 for (i = 0; i < count; i++) {
1818 RTPSource *csrc_src;
1823 csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
1828 GST_DEBUG ("created new CSRC: %08x", csrc);
1829 rtp_source_set_as_csrc (csrc_src);
1830 if (RTP_SOURCE_IS_ACTIVE (csrc_src))
1831 sess->stats.active_sources++;
1832 on_new_ssrc (sess, csrc_src);
1834 g_object_unref (csrc_src);
1837 g_object_unref (source);
1839 RTP_SESSION_UNLOCK (sess);
1846 gst_buffer_unref (buffer);
1847 GST_DEBUG ("invalid RTP packet received");
1852 gst_buffer_unref (buffer);
1853 RTP_SESSION_UNLOCK (sess);
1854 GST_DEBUG ("ignoring RTP packet because we are leaving");
1859 gst_buffer_unref (buffer);
1860 RTP_SESSION_UNLOCK (sess);
1861 GST_DEBUG ("ignoring packet because its collisioning");
1867 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
1868 GstRTCPPacket * packet, RTPArrivalStats * arrival)
1872 count = gst_rtcp_packet_get_rb_count (packet);
1873 for (i = 0; i < count; i++) {
1874 guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1875 guint8 fractionlost;
1878 gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1879 &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1881 GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
1883 if (ssrc == sess->source->ssrc) {
1884 /* only deal with report blocks for our session, we update the stats of
1885 * the sender of the RTCP message. We could also compare our stats against
1886 * the other sender to see if we are better or worse. */
1887 rtp_source_process_rb (source, arrival->ntpnstime, fractionlost,
1888 packetslost, exthighestseq, jitter, lsr, dlsr);
1891 on_ssrc_active (sess, source);
1894 /* A Sender report contains statistics about how the sender is doing. This
1895 * includes timing informataion such as the relation between RTP and NTP
1896 * timestamps and the number of packets/bytes it sent to us.
1898 * In this report is also included a set of report blocks related to how this
1899 * sender is receiving data (in case we (or somebody else) is also sending stuff
1900 * to it). This info includes the packet loss, jitter and seqnum. It also
1901 * contains information to calculate the round trip time (LSR/DLSR).
1904 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
1905 RTPArrivalStats * arrival, gboolean * do_sync)
1907 guint32 senderssrc, rtptime, packet_count, octet_count;
1910 gboolean created, prevsender;
1912 gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
1913 &packet_count, &octet_count);
1915 GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
1916 senderssrc, GST_TIME_ARGS (arrival->current_time));
1918 source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1922 /* don't try to do lip-sync for sources that sent a BYE */
1923 if (rtp_source_received_bye (source))
1928 prevsender = RTP_SOURCE_IS_SENDER (source);
1930 /* first update the source */
1931 rtp_source_process_sr (source, arrival->current_time, ntptime, rtptime,
1932 packet_count, octet_count);
1934 if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1935 sess->stats.sender_sources++;
1936 GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc,
1937 sess->stats.sender_sources);
1941 on_new_ssrc (sess, source);
1943 rtp_session_process_rb (sess, source, packet, arrival);
1944 g_object_unref (source);
1947 /* A receiver report contains statistics about how a receiver is doing. It
1948 * includes stuff like packet loss, jitter and the seqnum it received last. It
1949 * also contains info to calculate the round trip time.
1951 * We are only interested in how the sender of this report is doing wrt to us.
1954 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
1955 RTPArrivalStats * arrival)
1961 senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
1963 GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
1965 source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1970 on_new_ssrc (sess, source);
1972 rtp_session_process_rb (sess, source, packet, arrival);
1973 g_object_unref (source);
1976 /* Get SDES items and store them in the SSRC */
1978 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
1979 RTPArrivalStats * arrival)
1982 gboolean more_items, more_entries;
1984 items = gst_rtcp_packet_sdes_get_item_count (packet);
1985 GST_DEBUG ("got SDES packet with %d items", items);
1987 more_items = gst_rtcp_packet_sdes_first_item (packet);
1989 while (more_items) {
1991 gboolean changed, created, validated;
1995 ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
1997 GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
2001 /* find src, no probation when dealing with RTCP */
2002 source = obtain_source (sess, ssrc, &created, arrival, FALSE);
2006 sdes = gst_structure_new ("application/x-rtp-source-sdes", NULL);
2008 more_entries = gst_rtcp_packet_sdes_first_entry (packet);
2010 while (more_entries) {
2011 GstRTCPSDESType type;
2017 gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
2019 GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
2022 if (type == GST_RTCP_SDES_PRIV) {
2023 name = g_strndup ((const gchar *) &data[1], data[0]);
2025 data += data[0] + 1;
2027 name = g_strdup (gst_rtcp_sdes_type_to_name (type));
2030 value = g_strndup ((const gchar *) data, len);
2032 gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
2037 more_entries = gst_rtcp_packet_sdes_next_entry (packet);
2041 /* takes ownership of sdes */
2042 changed = rtp_source_set_sdes_struct (source, sdes);
2044 validated = !RTP_SOURCE_IS_ACTIVE (source);
2045 source->validated = TRUE;
2047 /* source became active */
2049 sess->stats.active_sources++;
2050 GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
2051 sess->stats.active_sources);
2052 on_ssrc_validated (sess, source);
2056 on_new_ssrc (sess, source);
2058 on_ssrc_sdes (sess, source);
2060 g_object_unref (source);
2062 more_items = gst_rtcp_packet_sdes_next_item (packet);
2067 /* BYE is sent when a client leaves the session
2070 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
2071 RTPArrivalStats * arrival)
2075 gboolean reconsider = FALSE;
2077 reason = gst_rtcp_packet_bye_get_reason (packet);
2078 GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
2080 count = gst_rtcp_packet_bye_get_ssrc_count (packet);
2081 for (i = 0; i < count; i++) {
2084 gboolean created, prevactive, prevsender;
2085 guint pmembers, members;
2087 ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
2088 GST_DEBUG ("SSRC: %08x", ssrc);
2090 if (ssrc == sess->source->ssrc)
2093 /* find src and mark bye, no probation when dealing with RTCP */
2094 source = obtain_source (sess, ssrc, &created, arrival, FALSE);
2098 /* store time for when we need to time out this source */
2099 source->bye_time = arrival->current_time;
2101 prevactive = RTP_SOURCE_IS_ACTIVE (source);
2102 prevsender = RTP_SOURCE_IS_SENDER (source);
2104 /* let the source handle the rest */
2105 rtp_source_process_bye (source, reason);
2107 pmembers = sess->stats.active_sources;
2109 if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
2110 sess->stats.active_sources--;
2111 GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
2112 sess->stats.active_sources);
2114 if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
2115 sess->stats.sender_sources--;
2116 GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
2117 sess->stats.sender_sources);
2119 members = sess->stats.active_sources;
2121 if (!sess->source->received_bye && members < pmembers) {
2122 /* some members went away since the previous timeout estimate.
2123 * Perform reverse reconsideration but only when we are not scheduling a
2125 if (arrival->current_time < sess->next_rtcp_check_time) {
2126 GstClockTime time_remaining;
2128 time_remaining = sess->next_rtcp_check_time - arrival->current_time;
2129 sess->next_rtcp_check_time =
2130 gst_util_uint64_scale (time_remaining, members, pmembers);
2132 GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
2133 GST_TIME_ARGS (sess->next_rtcp_check_time));
2135 sess->next_rtcp_check_time += arrival->current_time;
2137 /* mark pending reconsider. We only want to signal the reconsideration
2138 * once after we handled all the source in the bye packet */
2144 on_new_ssrc (sess, source);
2146 on_bye_ssrc (sess, source);
2148 g_object_unref (source);
2151 RTP_SESSION_UNLOCK (sess);
2152 /* notify app of reconsideration */
2153 if (sess->callbacks.reconsider)
2154 sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2155 RTP_SESSION_LOCK (sess);
2161 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
2162 RTPArrivalStats * arrival)
2164 GST_DEBUG ("received APP");
2168 rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
2169 gboolean fir, GstClockTime current_time)
2171 guint32 round_trip = 0;
2173 rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, &round_trip);
2175 if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
2176 GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
2179 if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE &&
2180 current_time - sess->last_keyframe_request < 2 * round_trip_in_ns) {
2181 GST_DEBUG ("Ignoring %s request because one was send without one "
2182 "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
2183 fir ? "FIR" : "PLI",
2184 GST_TIME_ARGS (current_time - sess->last_keyframe_request),
2185 GST_TIME_ARGS (round_trip_in_ns));;
2190 sess->last_keyframe_request = current_time;
2192 GST_LOG ("received %s request from %X %p(%p)", fir ? "FIR" : "PLI",
2193 rtp_source_get_ssrc (src), sess->callbacks.process_rtp,
2194 sess->callbacks.request_key_unit);
2196 RTP_SESSION_UNLOCK (sess);
2197 sess->callbacks.request_key_unit (sess, fir,
2198 sess->request_key_unit_user_data);
2199 RTP_SESSION_LOCK (sess);
2205 rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
2206 guint32 media_ssrc, GstClockTime current_time)
2210 if (!sess->callbacks.request_key_unit)
2213 src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
2214 GINT_TO_POINTER (sender_ssrc));
2218 rtp_session_request_local_key_unit (sess, src, FALSE, current_time);
2222 rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
2223 guint8 * fci_data, guint fci_length, GstClockTime current_time)
2228 gboolean our_request = FALSE;
2230 if (!sess->callbacks.request_key_unit)
2236 src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
2237 GINT_TO_POINTER (sender_ssrc));
2239 /* Hack because Google fails to set the sender_ssrc correctly */
2240 if (!src && sender_ssrc == 1) {
2241 GHashTableIter iter;
2243 if (sess->stats.sender_sources >
2244 RTP_SOURCE_IS_SENDER (sess->source) ? 2 : 1)
2247 g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
2249 while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
2250 if (src != sess->source && rtp_source_is_sender (src))
2259 for (position = 0; position < fci_length; position += 8) {
2260 guint8 *data = fci_data + position;
2262 ssrc = GST_READ_UINT32_BE (data);
2264 if (ssrc == rtp_source_get_ssrc (sess->source)) {
2272 rtp_session_request_local_key_unit (sess, src, TRUE, current_time);
2276 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
2277 RTPArrivalStats * arrival, GstClockTime current_time)
2279 GstRTCPType type = gst_rtcp_packet_get_type (packet);
2280 GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
2281 guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
2282 guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
2283 guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet);
2284 guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
2286 GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
2287 "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
2289 if (g_signal_has_handler_pending (sess,
2290 rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
2291 GstBuffer *fci_buffer = NULL;
2293 if (fci_length > 0) {
2294 fci_buffer = gst_buffer_create_sub (packet->buffer,
2295 fci_data - GST_BUFFER_DATA (packet->buffer), fci_length);
2296 GST_BUFFER_TIMESTAMP (fci_buffer) = arrival->running_time;
2299 RTP_SESSION_UNLOCK (sess);
2300 g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
2301 type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
2302 RTP_SESSION_LOCK (sess);
2305 gst_buffer_unref (fci_buffer);
2308 if (sess->rtcp_feedback_retention_window) {
2309 RTPSource *src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
2310 GINT_TO_POINTER (media_ssrc));
2313 rtp_source_retain_rtcp_packet (src, packet, arrival->running_time);
2316 if (rtp_source_get_ssrc (sess->source) == media_ssrc ||
2317 /* PSFB FIR puts the media ssrc inside the FCI */
2318 (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
2320 case GST_RTCP_TYPE_PSFB:
2322 case GST_RTCP_PSFB_TYPE_PLI:
2323 rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
2326 case GST_RTCP_PSFB_TYPE_FIR:
2327 rtp_session_process_fir (sess, sender_ssrc, fci_data, fci_length,
2334 case GST_RTCP_TYPE_RTPFB:
2342 * rtp_session_process_rtcp:
2343 * @sess: and #RTPSession
2344 * @buffer: an RTCP buffer
2345 * @current_time: the current system time
2346 * @ntpnstime: the current NTP time in nanoseconds
2348 * Process an RTCP buffer in the session manager. This function takes ownership
2351 * Returns: a #GstFlowReturn.
/* Entry point for a received compound RTCP buffer.
 *
 * The buffer is validated first; an invalid buffer is logged and unreffed.
 * With the session lock held, the arrival stats are updated and each packet
 * of the compound buffer is dispatched on its type to the matching
 * rtp_session_process_*() handler. While our own source has received_bye
 * set, non-BYE packets are reported as ignored. arrival.bytes is folded into
 * stats.avg_rtcp_packet_size. After unlocking, the buffer is passed to the
 * sync_rtcp callback when SR processing set do_sync and a callback is
 * installed; the other visible exits unref it. */
2354 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
2355 GstClockTime current_time, guint64 ntpnstime)
2357 GstRTCPPacket packet;
2358 gboolean more, is_bye = FALSE, do_sync = FALSE;
2359 RTPArrivalStats arrival;
2360 GstFlowReturn result = GST_FLOW_OK;
2362 g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2363 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2365 if (!gst_rtcp_buffer_validate (buffer))
2366 goto invalid_packet;
2368 GST_DEBUG ("received RTCP packet");
2370 RTP_SESSION_LOCK (sess);
2371 /* update arrival stats */
2372 update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1,
2378 /* start processing the compound packet */
2379 more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
2383 type = gst_rtcp_packet_get_type (&packet);
2385 /* when we are leaving the session, we should ignore all non-BYE messages */
2386 if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) {
2387 GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
2392 case GST_RTCP_TYPE_SR:
2393 rtp_session_process_sr (sess, &packet, &arrival, &do_sync);
2395 case GST_RTCP_TYPE_RR:
2396 rtp_session_process_rr (sess, &packet, &arrival);
2398 case GST_RTCP_TYPE_SDES:
2399 rtp_session_process_sdes (sess, &packet, &arrival);
2401 case GST_RTCP_TYPE_BYE:
2403 /* don't try to attempt lip-sync anymore for streams with a BYE */
2405 rtp_session_process_bye (sess, &packet, &arrival);
2407 case GST_RTCP_TYPE_APP:
2408 rtp_session_process_app (sess, &packet, &arrival);
2410 case GST_RTCP_TYPE_RTPFB:
2411 case GST_RTCP_TYPE_PSFB:
2412 rtp_session_process_feedback (sess, &packet, &arrival, current_time);
2415 GST_WARNING ("got unknown RTCP packet");
2419 more = gst_rtcp_packet_move_to_next (&packet);
2422 /* if we are scheduling a BYE, we only want to count bye packets, else we
2423 * count everything */
2424 if (sess->source->received_bye) {
2426 sess->stats.bye_members++;
2427 UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
2430 /* keep track of average packet size */
2431 UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
2433 GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
2434 sess->stats.avg_rtcp_packet_size, arrival.bytes);
2435 RTP_SESSION_UNLOCK (sess);
2437 /* notify caller of sr packets in the callback */
2438 if (do_sync && sess->callbacks.sync_rtcp) {
2439 /* make writable, we might want to change the buffer */
2440 buffer = gst_buffer_make_metadata_writable (buffer);
2442 result = sess->callbacks.sync_rtcp (sess, sess->source, buffer,
2443 sess->sync_rtcp_user_data);
2445 gst_buffer_unref (buffer);
2452 GST_DEBUG ("invalid RTCP packet received");
2453 gst_buffer_unref (buffer);
/* exit for packets that are dropped: release the buffer and the lock */
2458 gst_buffer_unref (buffer);
2459 RTP_SESSION_UNLOCK (sess);
2460 GST_DEBUG ("ignoring RTCP packet because we left");
2466 * rtp_session_send_rtp:
2467 * @sess: an #RTPSession
2468 * @data: pointer to either an RTP buffer or a list of RTP buffers
2469 * @is_list: TRUE when @data is a buffer list
2470 * @current_time: the current system time
2471 * @running_time: the running time of @data
2473 * Send the RTP buffer in the session manager. This function takes ownership of
 * @data.
2476 * Returns: a #GstFlowReturn.
/* Hands an outgoing RTP buffer or buffer list to our own source.
 *
 * @data is validated as a buffer list or as a single buffer depending on
 * @is_list; the invalid path unrefs @data and logs. With the session lock
 * held, the last RTP activity time of our source is set to @current_time and
 * the data is passed to rtp_source_send_rtp(). If our source became a sender
 * through this call the sender count is incremented, and a changed bitrate
 * flags the session bandwidth for recalculation. */
2479 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
2480 GstClockTime current_time, GstClockTime running_time)
2482 GstFlowReturn result;
2484 gboolean prevsender;
2485 gboolean valid_packet;
2488 g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2489 g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
2492 valid_packet = gst_rtp_buffer_list_validate (GST_BUFFER_LIST_CAST (data));
2494 valid_packet = gst_rtp_buffer_validate (GST_BUFFER_CAST (data));
2498 goto invalid_packet;
2500 GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
2502 RTP_SESSION_LOCK (sess);
2503 source = sess->source;
2505 /* update last activity */
2506 source->last_rtp_activity = current_time;
/* remember sender state and bitrate so changes made by the send call below
 * can be detected */
2508 prevsender = RTP_SOURCE_IS_SENDER (source);
2509 oldrate = source->bitrate;
2511 /* we use our own source to send */
2512 result = rtp_source_send_rtp (source, data, is_list, running_time);
2514 if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
2515 sess->stats.sender_sources++;
2516 if (oldrate != source->bitrate)
2517 sess->recalc_bandwidth = TRUE;
2518 RTP_SESSION_UNLOCK (sess);
2525 gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
2526 GST_DEBUG ("invalid RTP packet received");
/* Hash table foreach callback: adds the bitrate of @source to the running
 * total pointed to by @bandwidth. @key is unused. */
2532 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
2534 *bandwidth += source->bitrate;
/* Calculates the interval until the next RTCP transmission.
 *
 * When the session flagged recalc_bandwidth, the bandwidth given to the
 * stats is refreshed first: the configured sess->bandwidth when positive,
 * otherwise an estimate built from our own bitrate plus the bitrates summed
 * by add_bitrates() over sess->cnames, replaced by RTP_STATS_BANDWIDTH when
 * the value is below 8000.
 * The interval comes from the BYE interval calculation once our own source
 * has received_bye set, else from the regular RTCP interval calculation.
 * Unless @deterministic is set, jitter is added to a valid result. */
2538 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
2541 GstClockTime result;
2543 /* recalculate bandwidth when it changed */
2544 if (sess->recalc_bandwidth) {
2547 if (sess->bandwidth > 0)
2548 bandwidth = sess->bandwidth;
2550 /* If it is <= 0, then try to estimate the actual bandwidth */
2551 bandwidth = sess->source->bitrate;
2553 g_hash_table_foreach (sess->cnames, (GHFunc) add_bitrates, &bandwidth);
2556 if (bandwidth < 8000)
2557 bandwidth = RTP_STATS_BANDWIDTH;
2559 rtp_stats_set_bandwidths (&sess->stats, bandwidth,
2560 sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
2562 sess->recalc_bandwidth = FALSE;
2565 if (sess->source->received_bye) {
2566 result = rtp_stats_calculate_bye_interval (&sess->stats);
2568 result = rtp_stats_calculate_rtcp_interval (&sess->stats,
2569 RTP_SOURCE_IS_SENDER (sess->source), first);
2572 GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
2573 GST_TIME_ARGS (result), first);
2575 if (!deterministic && result != GST_CLOCK_TIME_NONE)
2576 result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
2578 GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
2583 /* Stop the current @sess and schedule a BYE message for the other members.
2584 * One must have the session lock to call this function
 *
 * Marks our own source as having a BYE pending, stores a copy of @reason,
 * restarts the average RTCP packet size and the BYE member count, and sets
 * the next RTCP check time one freshly calculated interval after
 * @current_time. The session lock is released while the reconsider callback
 * runs and is taken again afterwards, so session state may have changed by
 * the time this function returns to its caller.
2586 static GstFlowReturn
2587 rtp_session_schedule_bye_locked (RTPSession * sess, const gchar * reason,
2588 GstClockTime current_time)
2590 GstFlowReturn result = GST_FLOW_OK;
2592 GstClockTime interval;
2594 g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2596 source = sess->source;
2598 /* ignore more BYEs */
2599 if (source->received_bye)
2602 /* we have BYE now */
2603 source->received_bye = TRUE;
2604 /* at least one member wants to send a BYE */
2605 g_free (sess->bye_reason);
2606 sess->bye_reason = g_strdup (reason);
2607 INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
2608 sess->stats.bye_members = 1;
2609 sess->first_rtcp = TRUE;
2610 sess->sent_bye = FALSE;
2611 sess->allow_early = TRUE;
2613 /* reschedule transmission */
2614 sess->last_rtcp_send_time = current_time;
2615 interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2616 sess->next_rtcp_check_time = current_time + interval;
2618 GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
2619 GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
/* the callback is invoked without the session lock held */
2621 RTP_SESSION_UNLOCK (sess);
2622 /* notify app of reconsideration */
2623 if (sess->callbacks.reconsider)
2624 sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2625 RTP_SESSION_LOCK (sess);
2632 * rtp_session_schedule_bye:
2633 * @sess: an #RTPSession
2634 * @reason: a reason or NULL
2635 * @current_time: the current system time
2637 * Stop the current @sess and schedule a BYE message for the other members.
2639 * Returns: a #GstFlowReturn.
/* Public variant of rtp_session_schedule_bye_locked(): takes the session
 * lock, delegates to the locked function and releases the lock again. */
2642 rtp_session_schedule_bye (RTPSession * sess, const gchar * reason,
2643 GstClockTime current_time)
2645 GstFlowReturn result = GST_FLOW_OK;
2647 g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2649 RTP_SESSION_LOCK (sess);
2650 result = rtp_session_schedule_bye_locked (sess, reason, current_time);
2651 RTP_SESSION_UNLOCK (sess);
2657 * rtp_session_next_timeout:
2658 * @sess: an #RTPSession
2659 * @current_time: the current system time
2661 * Get the next time we should perform session maintenance tasks.
2663 * Returns: a time when rtp_session_on_timeout() should be called with the
2664 * current system time.
/* Computes, under the session lock, the next time maintenance is due.
 *
 * A valid next_early_rtcp_time is picked up first. Otherwise the result
 * starts from next_rtcp_check_time, or from @current_time when that time has
 * already passed. An interval is then selected: GST_CLOCK_TIME_NONE once our
 * BYE was sent, a newly calculated interval while a BYE is pending with 50
 * or more active sources, for the first RTCP packet, or when the stored
 * check time expired. The final value is written back to
 * next_rtcp_check_time. */
2667 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
2669 GstClockTime result, interval = 0;
2671 g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
2673 RTP_SESSION_LOCK (sess);
2675 if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
2676 result = sess->next_early_rtcp_time;
2680 result = sess->next_rtcp_check_time;
2682 GST_DEBUG ("current time: %" GST_TIME_FORMAT ", next :%" GST_TIME_FORMAT,
2683 GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2685 if (result < current_time) {
2686 GST_DEBUG ("take current time as base");
2687 /* our previous check time expired, start counting from the current time
2689 result = current_time;
2692 if (sess->source->received_bye) {
2693 if (sess->sent_bye) {
2694 GST_DEBUG ("we sent BYE already");
2695 interval = GST_CLOCK_TIME_NONE;
2696 } else if (sess->stats.active_sources >= 50) {
2697 GST_DEBUG ("reconsider BYE, more than 50 sources");
2698 /* reconsider BYE if members >= 50 */
2699 interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2702 if (sess->first_rtcp) {
2703 GST_DEBUG ("first RTCP packet");
2704 /* we are called for the first time */
2705 interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2706 } else if (sess->next_rtcp_check_time < current_time) {
2707 GST_DEBUG ("old check time expired, getting new timeout");
2708 /* get a new timeout when we need to */
2709 interval = calculate_rtcp_interval (sess, FALSE, FALSE);
2713 if (interval != GST_CLOCK_TIME_NONE)
2716 result = GST_CLOCK_TIME_NONE;
2718 sess->next_rtcp_check_time = result;
2722 GST_DEBUG ("current time: %" GST_TIME_FORMAT
2723 ", next time: %" GST_TIME_FORMAT,
2724 GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2725 RTP_SESSION_UNLOCK (sess);
/* Members of ReportData, the scratch state that rtp_session_on_timeout()
 * fills in and passes to the session_*() helpers. The helpers also access
 * sess, rtcp, ntpnstime, is_early, is_bye and has_sdes through it. */
/* system time given to rtp_session_on_timeout() */
2734 GstClockTime current_time;
/* pipeline running time given to rtp_session_on_timeout() */
2736 GstClockTime running_time;
/* deterministic RTCP interval calculated at the start of the timeout */
2737 GstClockTime interval;
/* RTCP packet currently being filled inside the rtcp buffer */
2738 GstRTCPPacket packet;
/* set by is_rtcp_time() when the regular packet may be suppressed */
2742 gboolean may_suppress;
/* Allocates the RTCP buffer for this report (sized by sess->mtu) and adds
 * its first packet for our own source: an SR filled with freshly obtained
 * sender info when we are a sender, otherwise an RR carrying our SSRC. The
 * SR values are also fed back into our own source via
 * rtp_source_process_sr(). */
2746 session_start_rtcp (RTPSession * sess, ReportData * data)
2748 GstRTCPPacket *packet = &data->packet;
2749 RTPSource *own = sess->source;
2751 data->rtcp = gst_rtcp_buffer_new (sess->mtu);
2753 if (RTP_SOURCE_IS_SENDER (own)) {
2756 guint32 packet_count, octet_count;
2758 /* we are a sender, create SR */
2759 GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
2760 gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
2762 /* get latest stats */
2763 rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
2764 &ntptime, &rtptime, &packet_count, &octet_count);
2766 rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
2767 packet_count, octet_count);
2769 /* fill in sender report info */
2770 gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
2771 ntptime, rtptime, packet_count, octet_count);
2773 /* we are only receiver, create RR */
2774 GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
2775 gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
2776 gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
2780 /* construct a Sender or Receiver Report
 *
 * Called once per source of the SSRC table with the ReportData of the
 * current timeout. The RTCP buffer is started on the first call. While the
 * current packet holds fewer than GST_RTCP_MAX_RB_COUNT report blocks, a
 * block is added for every source that is a sender and is not our own
 * source; the reported values are also cached in source->last_rr. */
2782 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
2784 RTPSession *sess = data->sess;
2785 GstRTCPPacket *packet = &data->packet;
2787 /* create a new buffer if needed */
2788 if (data->rtcp == NULL) {
2789 session_start_rtcp (sess, data);
2790 } else if (data->is_early) {
2791 /* Put a single RR or SR in minimal compound packets */
2794 if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
2795 /* only report about other sender sources */
2796 if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
2797 guint8 fractionlost;
2799 guint32 exthighestseq, jitter;
2803 rtp_source_get_new_rb (source, data->current_time, &fractionlost,
2804 &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2806 /* store last generated RR packet */
2807 source->last_rr.is_valid = TRUE;
2808 source->last_rr.fractionlost = fractionlost;
2809 source->last_rr.packetslost = packetslost;
2810 source->last_rr.exthighestseq = exthighestseq;
2811 source->last_rr.jitter = jitter;
2812 source->last_rr.lsr = lsr;
2813 source->last_rr.dlsr = dlsr;
2815 /* packet is not yet filled, add report block for this source. */
2816 gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
2817 exthighestseq, jitter, lsr, dlsr);
2822 /* perform cleanup of sources that timed out
 *
 * Called once per source with the ReportData of the current timeout. A base
 * interval is derived from data->interval and, for remote sources when our
 * interval is below 5 seconds, from the spacing of the last two received
 * RTCP reports. Remote sources are checked for BYE timeout and inactivity
 * timeout; senders (our own source included) that have not sent RTP for too
 * long are turned back into receivers. Session counters are adjusted, the
 * on_bye_timeout / on_timeout / on_sender_timeout notifications are issued,
 * and source->closing records whether the source is to be removed. */
2824 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
2826 gboolean remove = FALSE;
2827 gboolean byetimeout = FALSE;
2828 gboolean sendertimeout = FALSE;
2829 gboolean is_sender, is_active;
2830 RTPSession *sess = data->sess;
2831 GstClockTime interval, binterval;
2834 is_sender = RTP_SOURCE_IS_SENDER (source);
2835 is_active = RTP_SOURCE_IS_ACTIVE (source);
2837 /* our own rtcp interval may have been forced low by secondary configuration,
2838 * while sender side may still operate with higher interval,
2839 * so do not just take our interval to decide on timing out sender,
2840 * but take (if data->interval <= 5 * GST_SECOND):
2841 * interval = CLAMP (sender_interval, data->interval, 5 * GST_SECOND)
2842 * where sender_interval is difference between last 2 received RTCP reports
2844 if (data->interval >= 5 * GST_SECOND || (source == sess->source)) {
2845 binterval = data->interval;
2847 GST_LOG ("prev_rtcp %" GST_TIME_FORMAT ", last_rtcp %" GST_TIME_FORMAT,
2848 GST_TIME_ARGS (source->stats.prev_rtcptime),
2849 GST_TIME_ARGS (source->stats.last_rtcptime));
2850 /* if not received enough yet, fallback to larger default */
2851 if (source->stats.last_rtcptime > source->stats.prev_rtcptime)
2852 binterval = source->stats.last_rtcptime - source->stats.prev_rtcptime;
2854 binterval = 5 * GST_SECOND;
2855 binterval = CLAMP (binterval, data->interval, 5 * GST_SECOND);
2857 GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
2858 GST_TIME_ARGS (binterval));
2860 /* check for our own source, we don't want to delete our own source. */
2861 if (!(source == sess->source)) {
2862 if (source->received_bye) {
2863 /* if we received a BYE from the source, remove the source after some
2865 if (data->current_time > source->bye_time &&
2866 data->current_time - source->bye_time > sess->stats.bye_timeout) {
2867 GST_DEBUG ("removing BYE source %08x", source->ssrc);
2872 /* sources that were inactive for more than 5 times the deterministic reporting
2873 * interval get timed out. the min timeout is 5 seconds. */
2874 /* mind old time that might pre-date last time going to PLAYING */
2875 btime = MAX (source->last_activity, sess->start_time);
2876 if (data->current_time > btime) {
2877 interval = MAX (binterval * 5, 5 * GST_SECOND);
2878 if (data->current_time - btime > interval) {
2879 GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
2880 source->ssrc, GST_TIME_ARGS (btime));
2886 /* senders that did not send for a long time become a receiver, this also
2887 * holds for our own source. */
2889 /* mind old time that might pre-date last time going to PLAYING */
2890 btime = MAX (source->last_rtp_activity, sess->start_time);
2891 if (data->current_time > btime) {
2892 interval = MAX (binterval * 2, 5 * GST_SECOND);
2893 if (data->current_time - btime > interval) {
2894 GST_DEBUG ("sender source %08x timed out and became receiver, last %"
2895 GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
2896 source->is_sender = FALSE;
2897 sess->stats.sender_sources--;
2898 sendertimeout = TRUE;
/* NOTE(review): sender_sources is decremented above and again in the removal
 * bookkeeping below; is_sender was sampled before the sender timeout, so
 * confirm a source that is both timed out as sender and removed in the same
 * pass is not counted down twice. */
2904 sess->total_sources--;
2906 sess->stats.sender_sources--;
2908 sess->stats.active_sources--;
2911 on_bye_timeout (sess, source);
2913 on_timeout (sess, source);
2916 on_sender_timeout (sess, source);
2919 source->closing = remove;
/* Appends an SDES packet for our own source to data->rtcp.
 *
 * One SDES item is opened for our SSRC and every string field of the
 * source's SDES structure is mapped to an SDES type by name. Early packets
 * only carry the CNAME. Standard types are added with the string value as
 * payload; PRIV entries are encoded as a one-byte prefix length, the field
 * name and the value, with name and value each limited to 255 bytes.
 * data->has_sdes is set when done. */
2923 session_sdes (RTPSession * sess, ReportData * data)
2925 GstRTCPPacket *packet = &data->packet;
2926 const GstStructure *sdes;
2929 /* add SDES packet */
2930 gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
2932 gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
2934 sdes = rtp_source_get_sdes_struct (sess->source);
2936 /* add all fields in the structure, the order is not important. */
2937 n_fields = gst_structure_n_fields (sdes);
2938 for (i = 0; i < n_fields; ++i) {
2941 GstRTCPSDESType type;
2943 field = gst_structure_nth_field_name (sdes, i);
2946 value = gst_structure_get_string (sdes, field);
2949 type = gst_rtcp_sdes_name_to_type (field);
2951 /* Early packets are minimal and only include the CNAME */
2952 if (data->is_early && type != GST_RTCP_SDES_CNAME)
2955 if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
2956 gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
2957 (const guint8 *) value);
2958 } else if (type == GST_RTCP_SDES_PRIV) {
2964 /* don't accept entries that are too big */
2965 prefix_len = strlen (field);
2966 if (prefix_len > 255)
2968 value_len = strlen (value);
2969 if (value_len > 255)
2971 data_len = 1 + prefix_len + value_len;
/* NOTE(review): `data` is indexed as a byte buffer here but dereferenced as
 * the ReportData parameter at the end of the function; presumably a
 * block-local buffer named `data` shadows the parameter. Confirm its size
 * covers data_len. */
2975 data[0] = prefix_len;
2976 memcpy (&data[1], field, prefix_len);
2977 memcpy (&data[1 + prefix_len], value, value_len);
2979 gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
2983 data->has_sdes = TRUE;
2986 /* schedule a BYE packet
 *
 * Builds the compound packet that announces we are leaving: the SR/RR from
 * session_start_rtcp(), the SDES from session_sdes(), then a BYE packet
 * carrying our SSRC and, when one is stored, sess->bye_reason.
 * data->is_bye is set so the caller knows a BYE was generated. */
2988 session_bye (RTPSession * sess, ReportData * data)
2990 GstRTCPPacket *packet = &data->packet;
2993 session_start_rtcp (sess, data);
2996 session_sdes (sess, data);
2998 /* add a BYE packet */
2999 gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet);
3000 gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc);
3001 if (sess->bye_reason)
3002 gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason);
3004 /* we have a BYE packet now */
3005 data->is_bye = TRUE;
/* Decides whether an RTCP packet should be generated at @current_time.
 *
 * An early packet whose scheduled time has passed is handled first. For
 * regular packets nothing happens before next_rtcp_check_time. After that,
 * forward reconsideration is applied: a jittered data->interval is added to
 * the last send time and, if that moment is still in the future, it becomes
 * the new check time. Otherwise a new interval is calculated and the next
 * check time is moved that far past @current_time. Finally, when a minimum
 * interval is configured and this is not the first RTCP,
 * data->may_suppress is set if a randomized minimum interval after the last
 * send time lies beyond the new check time. */
3009 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
3011 GstClockTime new_send_time, elapsed;
3013 if (data->is_early && sess->next_early_rtcp_time < current_time)
3016 /* no need to check yet */
3017 if (sess->next_rtcp_check_time > current_time) {
3018 GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
3019 GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
3020 GST_TIME_ARGS (current_time));
3024 /* get elapsed time since we last reported */
3025 elapsed = current_time - sess->last_rtcp_send_time;
3027 /* perform forward reconsideration */
3028 new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
3030 GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
3031 GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time), GST_TIME_ARGS (elapsed));
3033 new_send_time += sess->last_rtcp_send_time;
3035 /* check if reconsideration */
3036 if (current_time < new_send_time) {
3037 GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
3038 GST_TIME_ARGS (new_send_time));
3039 /* store new check time */
3040 sess->next_rtcp_check_time = new_send_time;
3046 new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
3048 GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
3049 GST_TIME_ARGS (new_send_time));
3050 sess->next_rtcp_check_time = current_time + new_send_time;
3052 /* Apply the rules from RFC 4585 section 3.5.3 */
3053 if (sess->stats.min_interval != 0 && !sess->first_rtcp) {
3054 GstClockTimeDiff T_rr_current_interval = g_random_double_range (0.5, 1.5) *
3055 sess->stats.min_interval;
3057 /* This will cause the RTCP to be suppressed if no FB packets are added */
3058 if (sess->last_rtcp_send_time + T_rr_current_interval >
3059 sess->next_rtcp_check_time) {
3060 GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT
3061 " last: %" GST_TIME_FORMAT
3062 " + T_rr_current_interval: %" GST_TIME_FORMAT
3063 " > sess->next_rtcp_check_time: %" GST_TIME_FORMAT,
3064 GST_TIME_ARGS (sess->stats.min_interval),
3065 GST_TIME_ARGS (sess->last_rtcp_send_time),
3066 GST_TIME_ARGS (T_rr_current_interval),
3067 GST_TIME_ARGS (sess->next_rtcp_check_time));
3068 data->may_suppress = TRUE;
/* Hash table foreach callback: inserts @source under the same @key into
 * @hash_table, taking an extra reference on the source for the copy. */
3076 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
3078 g_hash_table_insert (hash_table, key, g_object_ref (source));
/* Predicate for g_hash_table_foreach_remove(): selects the sources whose
 * closing flag was set by session_cleanup(). @key and @data are unused. */
3082 remove_closing_sources (const gchar * key, RTPSource * source, gpointer * data)
3084 return source->closing;
3088 * rtp_session_on_timeout:
3089 * @sess: an #RTPSession
3090 * @current_time: the current system time
3091 * @ntpnstime: the current NTP time in nanoseconds
3092 * @running_time: the current running_time of the pipeline
3094 * Perform maintenance actions after the timeout obtained with
3095 * rtp_session_next_timeout() expired.
3097 * This function will perform timeouts of receivers and senders, send a BYE
3098 * packet or generate RTCP packets with current session stats.
3100 * This function can call the #RTPSessionSendRTCP callback, possibly multiple
3101 * times, for each packet that should be processed.
3103 * Returns: a #GstFlowReturn.
/* Periodic maintenance entry point.
 *
 * With the session lock held: calculates the deterministic RTCP interval,
 * runs session_cleanup() over a referenced copy of the SSRC table, removes
 * the sources marked as closing, and, when is_rtcp_time() agrees, builds
 * either a BYE compound packet or report blocks for all known sources.
 * Collisions are timed out on our own source and a pending SSRC change is
 * carried out by re-inserting our source under a newly created SSRC.
 * After unlocking: emits the on-sending-rtcp signal so handlers can append
 * packets, then passes the buffer to the send_rtcp callback unless the
 * packet may be suppressed and no handler objected; a buffer that is not
 * sent is unreffed. */
3106 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
3107 guint64 ntpnstime, GstClockTime running_time)
3109 GstFlowReturn result = GST_FLOW_OK;
3112 GHashTable *table_copy;
3113 gboolean notify = FALSE;
3115 g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3117 GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT,
3118 GST_TIME_ARGS (current_time), GST_TIME_ARGS (ntpnstime));
3122 data.current_time = current_time;
3123 data.ntpnstime = ntpnstime;
3124 data.is_bye = FALSE;
3125 data.has_sdes = FALSE;
3126 data.may_suppress = FALSE;
3127 data.running_time = running_time;
3131 RTP_SESSION_LOCK (sess);
3132 /* get a new interval, we need this for various cleanups etc */
3133 data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
3135 /* Make a local copy of the hashtable. We need to do this because the
3136 * cleanup stage below releases the session lock. */
3137 table_copy = g_hash_table_new_full (NULL, NULL, NULL,
3138 (GDestroyNotify) g_object_unref);
3139 g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3140 (GHFunc) clone_ssrcs_hashtable, table_copy);
3142 /* Clean up the session, mark the source for removing, this might release the
3144 g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
3145 g_hash_table_destroy (table_copy);
3147 /* Now remove the marked sources */
3148 g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
3149 (GHRFunc) remove_closing_sources, NULL);
3151 if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
3152 data.is_early = TRUE;
3154 data.is_early = FALSE;
3156 /* see if we need to generate SR or RR packets */
3157 if (is_rtcp_time (sess, current_time, &data)) {
3158 if (own->received_bye) {
3159 /* generate BYE instead */
3160 GST_DEBUG ("generating BYE message");
3161 session_bye (sess, &data);
3162 sess->sent_bye = TRUE;
3164 /* loop over all known sources and do something */
3165 g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3166 (GHFunc) session_report_blocks, &data);
3171 /* we keep track of the last report time in order to timeout inactive
3172 * receivers or senders */
3173 if (!data.is_early && !data.may_suppress)
3174 sess->last_rtcp_send_time = data.current_time;
3175 sess->first_rtcp = FALSE;
3176 sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
3178 /* add SDES for this source when not already added */
3180 session_sdes (sess, &data);
3183 /* check for outdated collisions */
3184 GST_DEBUG ("Timing out collisions");
/* NOTE(review): running_time is an unsigned GstClockTime; confirm the
 * subtraction below cannot wrap when running_time is smaller than the
 * feedback retention window. */
3185 rtp_source_timeout (sess->source, current_time,
3186 /* "a relatively long time" -- RFC 3550 section 8.2 */
3187 RTP_STATS_MIN_INTERVAL * GST_SECOND * 10,
3188 running_time - sess->rtcp_feedback_retention_window);
3190 if (sess->change_ssrc) {
3191 GST_DEBUG ("need to change our SSRC (%08x)", own->ssrc);
3192 g_hash_table_steal (sess->ssrcs[sess->mask_idx],
3193 GINT_TO_POINTER (own->ssrc));
3195 own->ssrc = rtp_session_create_new_ssrc (sess);
3196 rtp_source_reset (own);
3198 g_hash_table_insert (sess->ssrcs[sess->mask_idx],
3199 GINT_TO_POINTER (own->ssrc), own);
3201 g_free (sess->bye_reason);
3202 sess->bye_reason = NULL;
3203 sess->sent_bye = FALSE;
3204 sess->change_ssrc = FALSE;
3206 GST_DEBUG ("changed our SSRC to %08x", own->ssrc);
3209 sess->allow_early = TRUE;
3211 RTP_SESSION_UNLOCK (sess);
3214 g_object_notify (G_OBJECT (sess), "internal-ssrc");
3216 /* push out the RTCP packet */
3218 gboolean do_not_suppress;
3220 /* Give the user a chance to add its own packet */
3221 g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
3222 data.rtcp, data.is_early, &do_not_suppress);
3224 if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) {
3227 /* close the RTCP packet */
3228 gst_rtcp_buffer_end (data.rtcp);
3230 packet_size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
3232 UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
3233 GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
3234 sess->stats.avg_rtcp_packet_size, packet_size);
3236 sess->callbacks.send_rtcp (sess, own, data.rtcp, sess->sent_bye,
3237 sess->send_rtcp_user_data);
3239 GST_DEBUG ("freeing packet callback: %p"
3240 " do_not_suppress: %d may_suppress: %d",
3241 sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress);
3242 gst_buffer_unref (data.rtcp);
/* Requests that an RTCP packet be sent within @max_delay of @current_time.
 *
 * Under the session lock the request is checked against an already pending
 * early packet, against the regular schedule (a regular packet that is due
 * in time makes the request unnecessary), and against the allow_early flag
 * for groups larger than the immediate feedback threshold. When accepted,
 * next_early_rtcp_time is set either to a randomly dithered time or to
 * @current_time, and the reconsider callback is invoked after the lock has
 * been released so the application can pick up the new timeout. */
3250 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
3251 GstClockTimeDiff max_delay)
3253 GstClockTime T_dither_max;
3255 /* Implements the algorithm described in RFC 4585 section 3.5.2 */
3257 RTP_SESSION_LOCK (sess);
3259 /* Check if already requested */
3260 /* RFC 4585 section 3.5.2 step 2 */
3261 if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
3264 /* Ignore the request: a scheduled packet will be in time anyway */
3265 if (current_time + max_delay > sess->next_rtcp_check_time)
3268 /* RFC 4585 section 3.5.2 step 2b */
3269 /* If the total sources is <=2, then there is only us and one peer */
3270 if (sess->total_sources <= 2) {
3273 /* Divide by 2 because l = 0.5 */
3274 T_dither_max = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
3278 /* RFC 4585 section 3.5.2 step 3 */
3279 if (current_time + T_dither_max > sess->next_rtcp_check_time)
3282 /* RFC 4585 section 3.5.2 step 4
3283 * Don't send if allow_early is FALSE, but not if we are in
3284 * immediate mode, meaning we are part of a group of at most the
3285 * application-specific threshold.
3287 if (sess->total_sources > sess->rtcp_immediate_feedback_threshold &&
3288 sess->allow_early == FALSE)
3292 /* Schedule an early transmission later */
3293 sess->next_early_rtcp_time = g_random_double () * T_dither_max +
3296 /* If no dithering, schedule it for NOW */
3297 sess->next_early_rtcp_time = current_time;
3300 RTP_SESSION_UNLOCK (sess);
3302 /* notify app of need to send packet early
3303 * and therefore of timeout change */
3304 if (sess->callbacks.reconsider)
3305 sess->callbacks.reconsider (sess, sess->reconsider_user_data);
3311 RTP_SESSION_UNLOCK (sess);
/* Flags the source with @ssrc as needing a key unit request and asks for an
 * early RTCP packet (maximum delay 200 ms) to carry it.
 *
 * In the FIR branch a pending PLI is cleared, send_fir is set and the FIR
 * sequence number is advanced when @count is -1 or differs from the last
 * recorded count. Otherwise a PLI is flagged unless a FIR is already
 * pending.
 *
 * NOTE(review): the SSRC table lookup and the flag updates happen without
 * RTP_SESSION_LOCK in the visible lines, while other functions in this file
 * take the lock around table access -- confirm the caller holds whatever
 * protection is required. */
3315 rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc, GstClockTime now,
3316 gboolean fir, gint count)
3318 RTPSource *src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
3319 GUINT_TO_POINTER (ssrc));
3325 src->send_pli = FALSE;
3326 src->send_fir = TRUE;
3328 if (count == -1 || count != src->last_fir_count)
3329 src->current_send_fir_seqnum++;
3330 src->last_fir_count = count;
3331 } else if (!src->send_fir) {
3332 src->send_pli = TRUE;
3335 rtp_session_request_early_rtcp (sess, now, 200 * GST_MSECOND);
/* Comparison callback handed to rtp_source_has_retained(): treats @a as an
 * RTCP buffer and tests whether the packet read from it is a PSFB packet of
 * type PLI. The second argument is ignored. */
3341 has_pli_compare_func (gconstpointer a, gconstpointer ignored)
3343 GstRTCPPacket packet;
3345 packet.buffer = (GstBuffer *) a;
3348 if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
3349 gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
/* Handler run before an RTCP buffer is sent: appends the pending key unit
 * requests to @buffer while holding the session lock.
 *
 * First pass over the SSRC table: every source with send_fir set gets an
 * entry in one shared PSFB FIR packet (media SSRC 0 in the header, the
 * requested SSRC and the FIR sequence number inside the FCI). The packet is
 * created for the first such source; for each further source the FCI is
 * grown by two 32-bit words and the entry is written into the newly added
 * words at the end of the FCI.
 * Second pass: every source with send_pli set, for which
 * rtp_source_has_retained() finds no retained PLI, gets its own PSFB PLI
 * packet addressed to that source. */
3356 rtp_session_on_sending_rtcp (RTPSession * sess, GstBuffer * buffer,
3359 gboolean ret = FALSE;
3360 GHashTableIter iter;
3361 gpointer key, value;
3362 gboolean started_fir = FALSE;
3363 GstRTCPPacket fir_rtcppacket;
3365 RTP_SESSION_LOCK (sess);
3367 g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
3368 while (g_hash_table_iter_next (&iter, &key, &value)) {
3369 guint media_ssrc = GPOINTER_TO_UINT (key);
3370 RTPSource *media_src = value;
3373 if (media_src->send_fir) {
3375 if (!gst_rtcp_buffer_add_packet (buffer, GST_RTCP_TYPE_PSFB,
3378 gst_rtcp_packet_fb_set_type (&fir_rtcppacket, GST_RTCP_PSFB_TYPE_FIR);
3379 gst_rtcp_packet_fb_set_sender_ssrc (&fir_rtcppacket,
3380 rtp_source_get_ssrc (sess->source));
3381 gst_rtcp_packet_fb_set_media_ssrc (&fir_rtcppacket, 0);
3383 if (!gst_rtcp_packet_fb_set_fci_length (&fir_rtcppacket, 2)) {
3384 gst_rtcp_packet_remove (&fir_rtcppacket);
/* a FIR packet already exists: make room for one more entry, which takes
 * two 32-bit words on top of the current FCI length */
3390 if (!gst_rtcp_packet_fb_set_fci_length (&fir_rtcppacket,
3391 gst_rtcp_packet_fb_get_fci_length (&fir_rtcppacket) + 2))
/* the entry being written occupies the last two words of the FCI, so step
 * forward from the FCI start past all earlier entries */
3395 fci_data = gst_rtcp_packet_fb_get_fci (&fir_rtcppacket) +
3396 ((gst_rtcp_packet_fb_get_fci_length (&fir_rtcppacket) - 2) * 4);
3398 GST_WRITE_UINT32_BE (fci_data, media_ssrc);
3400 fci_data[0] = media_src->current_send_fir_seqnum;
3401 fci_data[1] = fci_data[2] = fci_data[3] = 0;
3402 media_src->send_fir = FALSE;
3406 g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
3407 while (g_hash_table_iter_next (&iter, &key, &value)) {
3408 guint media_ssrc = GPOINTER_TO_UINT (key);
3409 RTPSource *media_src = value;
3410 GstRTCPPacket pli_rtcppacket;
3412 if (media_src->send_pli && !rtp_source_has_retained (media_src,
3413 has_pli_compare_func, NULL)) {
3414 if (gst_rtcp_buffer_add_packet (buffer, GST_RTCP_TYPE_PSFB,
3416 gst_rtcp_packet_fb_set_type (&pli_rtcppacket, GST_RTCP_PSFB_TYPE_PLI);
3417 gst_rtcp_packet_fb_set_sender_ssrc (&pli_rtcppacket,
3418 rtp_source_get_ssrc (sess->source));
3419 gst_rtcp_packet_fb_set_media_ssrc (&pli_rtcppacket, media_ssrc);
3422 /* Break because the packet is full, will put next request in a
3428 media_src->send_pli = FALSE;
3431 RTP_SESSION_UNLOCK (sess);
/* Asks for an RTCP packet to be sent within @max_delay: obtains the current
 * time through the request_time callback and forwards the request to
 * rtp_session_request_early_rtcp(). The send_rtcp callback is tested first.
 *
 * NOTE(review): callbacks.request_time is called without a NULL check in
 * the visible lines; confirm it is always installed together with
 * send_rtcp. */
3437 rtp_session_send_rtcp (RTPSession * sess, GstClockTimeDiff max_delay)
3441 if (!sess->callbacks.send_rtcp)
3444 now = sess->callbacks.request_time (sess, sess->request_time_user_data);
3446 rtp_session_request_early_rtcp (sess, now, max_delay);