rtpsession: fix compilation
[platform/upstream/gstreamer.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/rtp/gstrtpbuffer.h>
23 #include <gst/rtp/gstrtcpbuffer.h>
24 #include <gst/netbuffer/gstnetbuffer.h>
25
26 #include "gstrtpbin-marshal.h"
27 #include "rtpsession.h"
28
29 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
30 #define GST_CAT_DEFAULT rtp_session_debug
31
32 /* signals and args */
33 enum
34 {
35   SIGNAL_GET_SOURCE_BY_SSRC,
36   SIGNAL_ON_NEW_SSRC,
37   SIGNAL_ON_SSRC_COLLISION,
38   SIGNAL_ON_SSRC_VALIDATED,
39   SIGNAL_ON_SSRC_ACTIVE,
40   SIGNAL_ON_SSRC_SDES,
41   SIGNAL_ON_BYE_SSRC,
42   SIGNAL_ON_BYE_TIMEOUT,
43   SIGNAL_ON_TIMEOUT,
44   SIGNAL_ON_SENDER_TIMEOUT,
45   SIGNAL_ON_SENDING_RTCP,
46   LAST_SIGNAL
47 };
48
49 #define DEFAULT_INTERNAL_SOURCE      NULL
50 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
51 #define DEFAULT_RTCP_FRACTION        (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH)
52 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
53 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
54 #define DEFAULT_RTCP_MTU             1400
55 #define DEFAULT_SDES                 NULL
56 #define DEFAULT_NUM_SOURCES          0
57 #define DEFAULT_NUM_ACTIVE_SOURCES   0
58 #define DEFAULT_SOURCES              NULL
59 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
60
61 enum
62 {
63   PROP_0,
64   PROP_INTERNAL_SSRC,
65   PROP_INTERNAL_SOURCE,
66   PROP_BANDWIDTH,
67   PROP_RTCP_FRACTION,
68   PROP_RTCP_RR_BANDWIDTH,
69   PROP_RTCP_RS_BANDWIDTH,
70   PROP_RTCP_MTU,
71   PROP_SDES,
72   PROP_NUM_SOURCES,
73   PROP_NUM_ACTIVE_SOURCES,
74   PROP_SOURCES,
75   PROP_FAVOR_NEW,
76   PROP_RTCP_MIN_INTERVAL,
77   PROP_LAST
78 };
79
80 /* update average packet size */
81 #define INIT_AVG(avg, val) \
82    (avg) = (val);
83 #define UPDATE_AVG(avg, val)            \
84   if ((avg) == 0)                       \
85    (avg) = (val);                       \
86   else                                  \
87    (avg) = ((val) + (15 * (avg))) >> 4;
88
89
90 /* The number RTCP intervals after which to timeout entries in the
91  * collision table
92  */
93 #define RTCP_INTERVAL_COLLISION_TIMEOUT 10
94
95 /* GObject vmethods */
96 static void rtp_session_finalize (GObject * object);
97 static void rtp_session_set_property (GObject * object, guint prop_id,
98     const GValue * value, GParamSpec * pspec);
99 static void rtp_session_get_property (GObject * object, guint prop_id,
100     GValue * value, GParamSpec * pspec);
101
102 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
103
104 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
105
106 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
107     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
108 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
109     const gchar * reason, GstClockTime current_time);
110 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
111     gboolean deterministic, gboolean first);
112
113 static gboolean
114 accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
115     const GValue * handler_return, gpointer data)
116 {
117   if (g_value_get_boolean (handler_return))
118     g_value_set_boolean (return_accu, TRUE);
119
120   return TRUE;
121 }
122
123 static void
124 rtp_session_class_init (RTPSessionClass * klass)
125 {
126   GObjectClass *gobject_class;
127
128   gobject_class = (GObjectClass *) klass;
129
130   gobject_class->finalize = rtp_session_finalize;
131   gobject_class->set_property = rtp_session_set_property;
132   gobject_class->get_property = rtp_session_get_property;
133
134   /**
135    * RTPSession::get-source-by-ssrc:
136    * @session: the object which received the signal
137    * @ssrc: the SSRC of the RTPSource
138    *
139    * Request the #RTPSource object with SSRC @ssrc in @session.
140    */
141   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
142       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
143       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
144           get_source_by_ssrc), NULL, NULL, gst_rtp_bin_marshal_OBJECT__UINT,
145       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
146
147   /**
148    * RTPSession::on-new-ssrc:
149    * @session: the object which received the signal
150    * @src: the new RTPSource
151    *
152    * Notify of a new SSRC that entered @session.
153    */
154   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
155       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
156       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
157       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
158       RTP_TYPE_SOURCE);
159   /**
160    * RTPSession::on-ssrc-collision:
161    * @session: the object which received the signal
162    * @src: the #RTPSource that caused a collision
163    *
164    * Notify when we have an SSRC collision
165    */
166   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
167       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
168       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
169       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
170       RTP_TYPE_SOURCE);
171   /**
172    * RTPSession::on-ssrc-validated:
173    * @session: the object which received the signal
174    * @src: the new validated RTPSource
175    *
176    * Notify of a new SSRC that became validated.
177    */
178   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
179       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
180       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
181       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
182       RTP_TYPE_SOURCE);
183   /**
184    * RTPSession::on-ssrc-active:
185    * @session: the object which received the signal
186    * @src: the active RTPSource
187    *
188    * Notify of a SSRC that is active, i.e., sending RTCP.
189    */
190   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
191       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
192       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
193       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
194       RTP_TYPE_SOURCE);
195   /**
196    * RTPSession::on-ssrc-sdes:
197    * @session: the object which received the signal
198    * @src: the RTPSource
199    *
200    * Notify that a new SDES was received for SSRC.
201    */
202   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
203       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
204       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
205       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
206       RTP_TYPE_SOURCE);
207   /**
208    * RTPSession::on-bye-ssrc:
209    * @session: the object which received the signal
210    * @src: the RTPSource that went away
211    *
212    * Notify of an SSRC that became inactive because of a BYE packet.
213    */
214   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
215       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
216       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
217       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
218       RTP_TYPE_SOURCE);
219   /**
220    * RTPSession::on-bye-timeout:
221    * @session: the object which received the signal
222    * @src: the RTPSource that timed out
223    *
224    * Notify of an SSRC that has timed out because of BYE
225    */
226   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
227       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
228       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
229       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
230       RTP_TYPE_SOURCE);
231   /**
232    * RTPSession::on-timeout:
233    * @session: the object which received the signal
234    * @src: the RTPSource that timed out
235    *
236    * Notify of an SSRC that has timed out
237    */
238   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
239       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
240       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
241       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
242       RTP_TYPE_SOURCE);
243   /**
244    * RTPSession::on-sender-timeout:
245    * @session: the object which received the signal
246    * @src: the RTPSource that timed out
247    *
248    * Notify of an SSRC that was a sender but timed out and became a receiver.
249    */
250   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
251       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
252       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
253       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
254       RTP_TYPE_SOURCE);
255
256   /**
257    * RTPSession::on-sending-rtcp
258    * @session: the object which received the signal
259    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
260    * @early: %TRUE if the packet is early, %FALSE if it is regular
261    *
262    * This signal is emitted before sending an RTCP packet, it can be used
263    * to add extra RTCP Packets.
264    *
265    * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
266    * if suppressing it is acceptable
267    */
268   rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
269       g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
270       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
271       accumulate_trues, NULL, gst_rtp_bin_marshal_BOOLEAN__POINTER_BOOLEAN,
272       G_TYPE_BOOLEAN, 2, G_TYPE_POINTER, G_TYPE_BOOLEAN);
273
274   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
275       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
276           "The internal SSRC used for the session",
277           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
278
279   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
280       g_param_spec_object ("internal-source", "Internal Source",
281           "The internal source element of the session",
282           RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
283
284   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
285       g_param_spec_double ("bandwidth", "Bandwidth",
286           "The bandwidth of the session (0 for auto-discover)",
287           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
288           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
289
290   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
291       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
292           "The fraction of the bandwidth used for RTCP (or as a real fraction of the RTP bandwidth if < 1)",
293           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
294           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
295
296   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
297       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
298           "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
299           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
300           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
301
302   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
303       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
304           "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
305           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
306           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
307
308   g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
309       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
310           "The maximum size of the RTCP packets",
311           16, G_MAXINT16, DEFAULT_RTCP_MTU,
312           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
313
314   g_object_class_install_property (gobject_class, PROP_SDES,
315       g_param_spec_boxed ("sdes", "SDES",
316           "The SDES items of this session",
317           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
318
319   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
320       g_param_spec_uint ("num-sources", "Num Sources",
321           "The number of sources in the session", 0, G_MAXUINT,
322           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
323
324   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
325       g_param_spec_uint ("num-active-sources", "Num Active Sources",
326           "The number of active sources in the session", 0, G_MAXUINT,
327           DEFAULT_NUM_ACTIVE_SOURCES,
328           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
329   /**
330    * RTPSource::sources
331    *
332    * Get a GValue Array of all sources in the session.
333    *
334    * <example>
335    * <title>Getting the #RTPSources of a session
336    * <programlisting>
337    * {
338    *   GValueArray *arr;
339    *   GValue *val;
340    *   guint i;
341    *
342    *   g_object_get (sess, "sources", &arr, NULL);
343    *
344    *   for (i = 0; i < arr->n_values; i++) {
345    *     RTPSource *source;
346    *
347    *     val = g_value_array_get_nth (arr, i);
348    *     source = g_value_get_object (val);
349    *   }
350    *   g_value_array_free (arr);
351    * }
352    * </programlisting>
353    * </example>
354    */
355   g_object_class_install_property (gobject_class, PROP_SOURCES,
356       g_param_spec_boxed ("sources", "Sources",
357           "An array of all known sources in the session",
358           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
359
360   g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
361       g_param_spec_boolean ("favor-new", "Favor new sources",
362           "Resolve SSRC conflict in favor of new sources", FALSE,
363           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
364
365   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
366       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
367           "Minimum interval between Regular RTCP packet (in ns)",
368           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
369           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
370
371   klass->get_source_by_ssrc =
372       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
373
374   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
375 }
376
377 static void
378 rtp_session_init (RTPSession * sess)
379 {
380   gint i;
381   gchar *str;
382
383   sess->lock = g_mutex_new ();
384   sess->key = g_random_int ();
385   sess->mask_idx = 0;
386   sess->mask = 0;
387
388   for (i = 0; i < 32; i++) {
389     sess->ssrcs[i] =
390         g_hash_table_new_full (NULL, NULL, NULL,
391         (GDestroyNotify) g_object_unref);
392   }
393   sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL);
394
395   rtp_stats_init_defaults (&sess->stats);
396
397   sess->recalc_bandwidth = TRUE;
398   sess->bandwidth = DEFAULT_BANDWIDTH;
399   sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
400   sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
401   sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
402
403   /* create an active SSRC for this session manager */
404   sess->source = rtp_session_create_source (sess);
405   sess->source->validated = TRUE;
406   sess->source->internal = TRUE;
407   sess->stats.active_sources++;
408   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
409
410   /* default UDP header length */
411   sess->header_len = 28;
412   sess->mtu = DEFAULT_RTCP_MTU;
413
414   /* some default SDES entries */
415   str = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ());
416   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_CNAME, str);
417   g_free (str);
418
419   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_NAME,
420       g_get_real_name ());
421   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_TOOL, "GStreamer");
422
423   sess->first_rtcp = TRUE;
424   sess->allow_early = TRUE;
425
426   GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
427 }
428
429 static void
430 rtp_session_finalize (GObject * object)
431 {
432   RTPSession *sess;
433   gint i;
434
435   sess = RTP_SESSION_CAST (object);
436
437   g_mutex_free (sess->lock);
438   for (i = 0; i < 32; i++)
439     g_hash_table_destroy (sess->ssrcs[i]);
440
441   g_free (sess->bye_reason);
442
443   g_hash_table_destroy (sess->cnames);
444   g_object_unref (sess->source);
445
446   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
447 }
448
449 static void
450 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
451 {
452   GValue value = { 0 };
453
454   g_value_init (&value, RTP_TYPE_SOURCE);
455   g_value_take_object (&value, source);
456   /* copies the value */
457   g_value_array_append (arr, &value);
458 }
459
460 static GValueArray *
461 rtp_session_create_sources (RTPSession * sess)
462 {
463   GValueArray *res;
464   guint size;
465
466   RTP_SESSION_LOCK (sess);
467   /* get number of elements in the table */
468   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
469   /* create the result value array */
470   res = g_value_array_new (size);
471
472   /* and copy all values into the array */
473   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
474   RTP_SESSION_UNLOCK (sess);
475
476   return res;
477 }
478
479 static void
480 rtp_session_set_property (GObject * object, guint prop_id,
481     const GValue * value, GParamSpec * pspec)
482 {
483   RTPSession *sess;
484
485   sess = RTP_SESSION (object);
486
487   switch (prop_id) {
488     case PROP_INTERNAL_SSRC:
489       rtp_session_set_internal_ssrc (sess, g_value_get_uint (value));
490       break;
491     case PROP_BANDWIDTH:
492       sess->bandwidth = g_value_get_double (value);
493       sess->recalc_bandwidth = TRUE;
494       break;
495     case PROP_RTCP_FRACTION:
496       sess->rtcp_bandwidth = g_value_get_double (value);
497       sess->recalc_bandwidth = TRUE;
498       break;
499     case PROP_RTCP_RR_BANDWIDTH:
500       sess->rtcp_rr_bandwidth = g_value_get_int (value);
501       sess->recalc_bandwidth = TRUE;
502       break;
503     case PROP_RTCP_RS_BANDWIDTH:
504       sess->rtcp_rs_bandwidth = g_value_get_int (value);
505       sess->recalc_bandwidth = TRUE;
506       break;
507     case PROP_RTCP_MTU:
508       sess->mtu = g_value_get_uint (value);
509       break;
510     case PROP_SDES:
511       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
512       break;
513     case PROP_FAVOR_NEW:
514       sess->favor_new = g_value_get_boolean (value);
515       break;
516     case PROP_RTCP_MIN_INTERVAL:
517       rtp_stats_set_min_interval (&sess->stats,
518           (gdouble) g_value_get_uint64 (value) / GST_SECOND);
519       break;
520     default:
521       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
522       break;
523   }
524 }
525
526 static void
527 rtp_session_get_property (GObject * object, guint prop_id,
528     GValue * value, GParamSpec * pspec)
529 {
530   RTPSession *sess;
531
532   sess = RTP_SESSION (object);
533
534   switch (prop_id) {
535     case PROP_INTERNAL_SSRC:
536       g_value_set_uint (value, rtp_session_get_internal_ssrc (sess));
537       break;
538     case PROP_INTERNAL_SOURCE:
539       g_value_take_object (value, rtp_session_get_internal_source (sess));
540       break;
541     case PROP_BANDWIDTH:
542       g_value_set_double (value, sess->bandwidth);
543       break;
544     case PROP_RTCP_FRACTION:
545       g_value_set_double (value, sess->rtcp_bandwidth);
546       break;
547     case PROP_RTCP_RR_BANDWIDTH:
548       g_value_set_int (value, sess->rtcp_rr_bandwidth);
549       break;
550     case PROP_RTCP_RS_BANDWIDTH:
551       g_value_set_int (value, sess->rtcp_rs_bandwidth);
552       break;
553     case PROP_RTCP_MTU:
554       g_value_set_uint (value, sess->mtu);
555       break;
556     case PROP_SDES:
557       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
558       break;
559     case PROP_NUM_SOURCES:
560       g_value_set_uint (value, rtp_session_get_num_sources (sess));
561       break;
562     case PROP_NUM_ACTIVE_SOURCES:
563       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
564       break;
565     case PROP_SOURCES:
566       g_value_take_boxed (value, rtp_session_create_sources (sess));
567       break;
568     case PROP_FAVOR_NEW:
569       g_value_set_boolean (value, sess->favor_new);
570       break;
571     case PROP_RTCP_MIN_INTERVAL:
572       g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
573       break;
574     default:
575       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
576       break;
577   }
578 }
579
580 static void
581 on_new_ssrc (RTPSession * sess, RTPSource * source)
582 {
583   g_object_ref (source);
584   RTP_SESSION_UNLOCK (sess);
585   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
586   RTP_SESSION_LOCK (sess);
587   g_object_unref (source);
588 }
589
590 static void
591 on_ssrc_collision (RTPSession * sess, RTPSource * source)
592 {
593   g_object_ref (source);
594   RTP_SESSION_UNLOCK (sess);
595   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
596       source);
597   RTP_SESSION_LOCK (sess);
598   g_object_unref (source);
599 }
600
601 static void
602 on_ssrc_validated (RTPSession * sess, RTPSource * source)
603 {
604   g_object_ref (source);
605   RTP_SESSION_UNLOCK (sess);
606   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
607       source);
608   RTP_SESSION_LOCK (sess);
609   g_object_unref (source);
610 }
611
612 static void
613 on_ssrc_active (RTPSession * sess, RTPSource * source)
614 {
615   g_object_ref (source);
616   RTP_SESSION_UNLOCK (sess);
617   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
618   RTP_SESSION_LOCK (sess);
619   g_object_unref (source);
620 }
621
622 static void
623 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
624 {
625   g_object_ref (source);
626   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
627   RTP_SESSION_UNLOCK (sess);
628   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
629   RTP_SESSION_LOCK (sess);
630   g_object_unref (source);
631 }
632
633 static void
634 on_bye_ssrc (RTPSession * sess, RTPSource * source)
635 {
636   g_object_ref (source);
637   RTP_SESSION_UNLOCK (sess);
638   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
639   RTP_SESSION_LOCK (sess);
640   g_object_unref (source);
641 }
642
643 static void
644 on_bye_timeout (RTPSession * sess, RTPSource * source)
645 {
646   g_object_ref (source);
647   RTP_SESSION_UNLOCK (sess);
648   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
649   RTP_SESSION_LOCK (sess);
650   g_object_unref (source);
651 }
652
653 static void
654 on_timeout (RTPSession * sess, RTPSource * source)
655 {
656   g_object_ref (source);
657   RTP_SESSION_UNLOCK (sess);
658   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
659   RTP_SESSION_LOCK (sess);
660   g_object_unref (source);
661 }
662
663 static void
664 on_sender_timeout (RTPSession * sess, RTPSource * source)
665 {
666   g_object_ref (source);
667   RTP_SESSION_UNLOCK (sess);
668   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
669       source);
670   RTP_SESSION_LOCK (sess);
671   g_object_unref (source);
672 }
673
674 /**
675  * rtp_session_new:
676  *
677  * Create a new session object.
678  *
679  * Returns: a new #RTPSession. g_object_unref() after usage.
680  */
681 RTPSession *
682 rtp_session_new (void)
683 {
684   RTPSession *sess;
685
686   sess = g_object_new (RTP_TYPE_SESSION, NULL);
687
688   return sess;
689 }
690
691 /**
692  * rtp_session_set_callbacks:
693  * @sess: an #RTPSession
694  * @callbacks: callbacks to configure
695  * @user_data: user data passed in the callbacks
696  *
697  * Configure a set of callbacks to be notified of actions.
698  */
699 void
700 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
701     gpointer user_data)
702 {
703   g_return_if_fail (RTP_IS_SESSION (sess));
704
705   if (callbacks->process_rtp) {
706     sess->callbacks.process_rtp = callbacks->process_rtp;
707     sess->process_rtp_user_data = user_data;
708   }
709   if (callbacks->send_rtp) {
710     sess->callbacks.send_rtp = callbacks->send_rtp;
711     sess->send_rtp_user_data = user_data;
712   }
713   if (callbacks->send_rtcp) {
714     sess->callbacks.send_rtcp = callbacks->send_rtcp;
715     sess->send_rtcp_user_data = user_data;
716   }
717   if (callbacks->sync_rtcp) {
718     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
719     sess->sync_rtcp_user_data = user_data;
720   }
721   if (callbacks->clock_rate) {
722     sess->callbacks.clock_rate = callbacks->clock_rate;
723     sess->clock_rate_user_data = user_data;
724   }
725   if (callbacks->reconsider) {
726     sess->callbacks.reconsider = callbacks->reconsider;
727     sess->reconsider_user_data = user_data;
728   }
729 }
730
731 /**
732  * rtp_session_set_process_rtp_callback:
733  * @sess: an #RTPSession
734  * @callback: callback to set
735  * @user_data: user data passed in the callback
736  *
737  * Configure only the process_rtp callback to be notified of the process_rtp action.
738  */
739 void
740 rtp_session_set_process_rtp_callback (RTPSession * sess,
741     RTPSessionProcessRTP callback, gpointer user_data)
742 {
743   g_return_if_fail (RTP_IS_SESSION (sess));
744
745   sess->callbacks.process_rtp = callback;
746   sess->process_rtp_user_data = user_data;
747 }
748
749 /**
750  * rtp_session_set_send_rtp_callback:
751  * @sess: an #RTPSession
752  * @callback: callback to set
753  * @user_data: user data passed in the callback
754  *
755  * Configure only the send_rtp callback to be notified of the send_rtp action.
756  */
757 void
758 rtp_session_set_send_rtp_callback (RTPSession * sess,
759     RTPSessionSendRTP callback, gpointer user_data)
760 {
761   g_return_if_fail (RTP_IS_SESSION (sess));
762
763   sess->callbacks.send_rtp = callback;
764   sess->send_rtp_user_data = user_data;
765 }
766
767 /**
768  * rtp_session_set_send_rtcp_callback:
769  * @sess: an #RTPSession
770  * @callback: callback to set
771  * @user_data: user data passed in the callback
772  *
773  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
774  */
775 void
776 rtp_session_set_send_rtcp_callback (RTPSession * sess,
777     RTPSessionSendRTCP callback, gpointer user_data)
778 {
779   g_return_if_fail (RTP_IS_SESSION (sess));
780
781   sess->callbacks.send_rtcp = callback;
782   sess->send_rtcp_user_data = user_data;
783 }
784
785 /**
786  * rtp_session_set_sync_rtcp_callback:
787  * @sess: an #RTPSession
788  * @callback: callback to set
789  * @user_data: user data passed in the callback
790  *
791  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
792  */
793 void
794 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
795     RTPSessionSyncRTCP callback, gpointer user_data)
796 {
797   g_return_if_fail (RTP_IS_SESSION (sess));
798
799   sess->callbacks.sync_rtcp = callback;
800   sess->sync_rtcp_user_data = user_data;
801 }
802
803 /**
804  * rtp_session_set_clock_rate_callback:
805  * @sess: an #RTPSession
806  * @callback: callback to set
807  * @user_data: user data passed in the callback
808  *
809  * Configure only the clock_rate callback to be notified of the clock_rate action.
810  */
811 void
812 rtp_session_set_clock_rate_callback (RTPSession * sess,
813     RTPSessionClockRate callback, gpointer user_data)
814 {
815   g_return_if_fail (RTP_IS_SESSION (sess));
816
817   sess->callbacks.clock_rate = callback;
818   sess->clock_rate_user_data = user_data;
819 }
820
821 /**
822  * rtp_session_set_reconsider_callback:
823  * @sess: an #RTPSession
824  * @callback: callback to set
825  * @user_data: user data passed in the callback
826  *
827  * Configure only the reconsider callback to be notified of the reconsider action.
828  */
829 void
830 rtp_session_set_reconsider_callback (RTPSession * sess,
831     RTPSessionReconsider callback, gpointer user_data)
832 {
833   g_return_if_fail (RTP_IS_SESSION (sess));
834
835   sess->callbacks.reconsider = callback;
836   sess->reconsider_user_data = user_data;
837 }
838
839 /**
840  * rtp_session_set_bandwidth:
841  * @sess: an #RTPSession
842  * @bandwidth: the bandwidth allocated
843  *
844  * Set the session bandwidth in bytes per second.
845  */
846 void
847 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
848 {
849   g_return_if_fail (RTP_IS_SESSION (sess));
850
851   RTP_SESSION_LOCK (sess);
852   sess->stats.bandwidth = bandwidth;
853   RTP_SESSION_UNLOCK (sess);
854 }
855
856 /**
857  * rtp_session_get_bandwidth:
858  * @sess: an #RTPSession
859  *
860  * Get the session bandwidth.
861  *
862  * Returns: the session bandwidth.
863  */
864 gdouble
865 rtp_session_get_bandwidth (RTPSession * sess)
866 {
867   gdouble result;
868
869   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
870
871   RTP_SESSION_LOCK (sess);
872   result = sess->stats.bandwidth;
873   RTP_SESSION_UNLOCK (sess);
874
875   return result;
876 }
877
878 /**
879  * rtp_session_set_rtcp_fraction:
880  * @sess: an #RTPSession
881  * @bandwidth: the RTCP bandwidth
882  *
883  * Set the bandwidth in bytes per second that should be used for RTCP
884  * messages.
885  */
886 void
887 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
888 {
889   g_return_if_fail (RTP_IS_SESSION (sess));
890
891   RTP_SESSION_LOCK (sess);
892   sess->stats.rtcp_bandwidth = bandwidth;
893   RTP_SESSION_UNLOCK (sess);
894 }
895
896 /**
897  * rtp_session_get_rtcp_fraction:
898  * @sess: an #RTPSession
899  *
900  * Get the session bandwidth used for RTCP.
901  *
902  * Returns: The bandwidth used for RTCP messages.
903  */
904 gdouble
905 rtp_session_get_rtcp_fraction (RTPSession * sess)
906 {
907   gdouble result;
908
909   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
910
911   RTP_SESSION_LOCK (sess);
912   result = sess->stats.rtcp_bandwidth;
913   RTP_SESSION_UNLOCK (sess);
914
915   return result;
916 }
917
918 /**
919  * rtp_session_set_sdes_string:
920  * @sess: an #RTPSession
921  * @type: the type of the SDES item
922  * @item: a null-terminated string to set.
923  *
924  * Store an SDES item of @type in @sess.
925  *
926  * Returns: %FALSE if the data was unchanged @type is invalid.
927  */
928 gboolean
929 rtp_session_set_sdes_string (RTPSession * sess, GstRTCPSDESType type,
930     const gchar * item)
931 {
932   gboolean result;
933
934   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
935
936   RTP_SESSION_LOCK (sess);
937   result = rtp_source_set_sdes_string (sess->source, type, item);
938   RTP_SESSION_UNLOCK (sess);
939
940   return result;
941 }
942
943 /**
944  * rtp_session_get_sdes_string:
945  * @sess: an #RTPSession
946  * @type: the type of the SDES item
947  *
948  * Get the SDES item of @type from @sess.
949  *
950  * Returns: a null-terminated copy of the SDES item or NULL when @type was not
951  * valid. g_free() after usage.
952  */
953 gchar *
954 rtp_session_get_sdes_string (RTPSession * sess, GstRTCPSDESType type)
955 {
956   gchar *result;
957
958   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
959
960   RTP_SESSION_LOCK (sess);
961   result = rtp_source_get_sdes_string (sess->source, type);
962   RTP_SESSION_UNLOCK (sess);
963
964   return result;
965 }
966
967 /**
968  * rtp_session_get_sdes_struct:
969  * @sess: an #RTSPSession
970  *
971  * Get the SDES data as a #GstStructure
972  *
973  * Returns: a GstStructure with SDES items for @sess. This function returns a
974  * copy of the SDES structure, use gst_structure_free() after usage.
975  */
976 GstStructure *
977 rtp_session_get_sdes_struct (RTPSession * sess)
978 {
979   const GstStructure *sdes;
980   GstStructure *result = NULL;
981
982   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
983
984   RTP_SESSION_LOCK (sess);
985   sdes = rtp_source_get_sdes_struct (sess->source);
986   if (sdes)
987     result = gst_structure_copy (sdes);
988   RTP_SESSION_UNLOCK (sess);
989
990   return result;
991 }
992
993 /**
994  * rtp_session_set_sdes_struct:
995  * @sess: an #RTSPSession
996  * @sdes: a #GstStructure
997  *
998  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
999  */
1000 void
1001 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
1002 {
1003   g_return_if_fail (sdes);
1004   g_return_if_fail (RTP_IS_SESSION (sess));
1005
1006   RTP_SESSION_LOCK (sess);
1007   rtp_source_set_sdes_struct (sess->source, gst_structure_copy (sdes));
1008   RTP_SESSION_UNLOCK (sess);
1009 }
1010
1011 static GstFlowReturn
1012 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
1013 {
1014   GstFlowReturn result = GST_FLOW_OK;
1015
1016   if (source == session->source) {
1017     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
1018
1019     RTP_SESSION_UNLOCK (session);
1020
1021     if (session->callbacks.send_rtp)
1022       result =
1023           session->callbacks.send_rtp (session, source, data,
1024           session->send_rtp_user_data);
1025     else {
1026       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1027     }
1028   } else {
1029     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
1030     RTP_SESSION_UNLOCK (session);
1031
1032     if (session->callbacks.process_rtp)
1033       result =
1034           session->callbacks.process_rtp (session, source,
1035           GST_BUFFER_CAST (data), session->process_rtp_user_data);
1036     else
1037       gst_buffer_unref (GST_BUFFER_CAST (data));
1038   }
1039   RTP_SESSION_LOCK (session);
1040
1041   return result;
1042 }
1043
1044 static gint
1045 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
1046 {
1047   gint result;
1048
1049   RTP_SESSION_UNLOCK (session);
1050
1051   if (session->callbacks.clock_rate)
1052     result =
1053         session->callbacks.clock_rate (session, pt,
1054         session->clock_rate_user_data);
1055   else
1056     result = -1;
1057
1058   RTP_SESSION_LOCK (session);
1059
1060   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
1061
1062   return result;
1063 }
1064
1065 static RTPSourceCallbacks callbacks = {
1066   (RTPSourcePushRTP) source_push_rtp,
1067   (RTPSourceClockRate) source_clock_rate,
1068 };
1069
1070 static gboolean
1071 check_collision (RTPSession * sess, RTPSource * source,
1072     RTPArrivalStats * arrival, gboolean rtp)
1073 {
1074   /* If we have no arrival address, we can't do collision checking */
1075   if (!arrival->have_address)
1076     return FALSE;
1077
1078   if (sess->source != source) {
1079     GstNetAddress *from;
1080     gboolean have_from;
1081
1082     /* This is not our local source, but lets check if two remote
1083      * source collide
1084      */
1085
1086     if (rtp) {
1087       from = &source->rtp_from;
1088       have_from = source->have_rtp_from;
1089     } else {
1090       from = &source->rtcp_from;
1091       have_from = source->have_rtcp_from;
1092     }
1093
1094     if (have_from) {
1095       if (gst_netaddress_equal (from, &arrival->address)) {
1096         /* Address is the same */
1097         return FALSE;
1098       } else {
1099         GST_LOG ("we have a third-party collision or loop ssrc:%x",
1100             rtp_source_get_ssrc (source));
1101         if (sess->favor_new) {
1102           if (rtp_source_find_conflicting_address (source,
1103                   &arrival->address, arrival->current_time)) {
1104             gchar buf1[40];
1105             gst_netaddress_to_string (&arrival->address, buf1, 40);
1106             GST_LOG ("Known conflict on %x for %s, dropping packet",
1107                 rtp_source_get_ssrc (source), buf1);
1108             return TRUE;
1109           } else {
1110             gchar buf1[40], buf2[40];
1111
1112             /* Current address is not a known conflict, lets assume this is
1113              * a new source. Save old address in possible conflict list
1114              */
1115             rtp_source_add_conflicting_address (source, from,
1116                 arrival->current_time);
1117
1118             gst_netaddress_to_string (from, buf1, 40);
1119             gst_netaddress_to_string (&arrival->address, buf2, 40);
1120             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1121                 " saving old as known conflict",
1122                 rtp_source_get_ssrc (source), buf1, buf2);
1123
1124             if (rtp)
1125               rtp_source_set_rtp_from (source, &arrival->address);
1126             else
1127               rtp_source_set_rtcp_from (source, &arrival->address);
1128             return FALSE;
1129           }
1130         } else {
1131           /* Don't need to save old addresses, we ignore new sources */
1132           return TRUE;
1133         }
1134       }
1135     } else {
1136       /* We don't already have a from address for RTP, just set it */
1137       if (rtp)
1138         rtp_source_set_rtp_from (source, &arrival->address);
1139       else
1140         rtp_source_set_rtcp_from (source, &arrival->address);
1141       return FALSE;
1142     }
1143
1144     /* FIXME: Log 3rd party collision somehow
1145      * Maybe should be done in upper layer, only the SDES can tell us
1146      * if its a collision or a loop
1147      */
1148
1149     /* If the source has been inactive for some time, we assume that it has
1150      * simply changed its transport source address. Hence, there is no true
1151      * third-party collision - only a simulated one. */
1152     if (arrival->current_time > source->last_activity) {
1153       GstClockTime inactivity_period =
1154           arrival->current_time - source->last_activity;
1155       if (inactivity_period > 1 * GST_SECOND) {
1156         /* Use new network address */
1157         if (rtp) {
1158           g_assert (source->have_rtp_from);
1159           rtp_source_set_rtp_from (source, &arrival->address);
1160         } else {
1161           g_assert (source->have_rtcp_from);
1162           rtp_source_set_rtcp_from (source, &arrival->address);
1163         }
1164         return FALSE;
1165       }
1166     }
1167   } else {
1168     /* This is sending with our ssrc, is it an address we already know */
1169
1170     if (rtp_source_find_conflicting_address (source, &arrival->address,
1171             arrival->current_time)) {
1172       /* Its a known conflict, its probably a loop, not a collision
1173        * lets just drop the incoming packet
1174        */
1175       GST_DEBUG ("Our packets are being looped back to us, dropping");
1176     } else {
1177       /* Its a new collision, lets change our SSRC */
1178
1179       rtp_source_add_conflicting_address (source, &arrival->address,
1180           arrival->current_time);
1181
1182       GST_DEBUG ("Collision for SSRC %x", rtp_source_get_ssrc (source));
1183       on_ssrc_collision (sess, source);
1184
1185       rtp_session_schedule_bye_locked (sess, "SSRC Collision",
1186           arrival->current_time);
1187
1188       sess->change_ssrc = TRUE;
1189     }
1190   }
1191
1192   return TRUE;
1193 }
1194
1195
1196 /* must be called with the session lock, the returned source needs to be
1197  * unreffed after usage. */
1198 static RTPSource *
1199 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1200     RTPArrivalStats * arrival, gboolean rtp)
1201 {
1202   RTPSource *source;
1203
1204   source =
1205       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1206   if (source == NULL) {
1207     /* make new Source in probation and insert */
1208     source = rtp_source_new (ssrc);
1209
1210     /* for RTP packets we need to set the source in probation. Receiving RTCP
1211      * packets of an SSRC, on the other hand, is a strong indication that we
1212      * are dealing with a valid source. */
1213     if (rtp)
1214       source->probation = RTP_DEFAULT_PROBATION;
1215     else
1216       source->probation = 0;
1217
1218     /* store from address, if any */
1219     if (arrival->have_address) {
1220       if (rtp)
1221         rtp_source_set_rtp_from (source, &arrival->address);
1222       else
1223         rtp_source_set_rtcp_from (source, &arrival->address);
1224     }
1225
1226     /* configure a callback on the source */
1227     rtp_source_set_callbacks (source, &callbacks, sess);
1228
1229     g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1230         source);
1231
1232     /* we have one more source now */
1233     sess->total_sources++;
1234     *created = TRUE;
1235   } else {
1236     *created = FALSE;
1237     /* check for collision, this updates the address when not previously set */
1238     if (check_collision (sess, source, arrival, rtp)) {
1239       return NULL;
1240     }
1241   }
1242   /* update last activity */
1243   source->last_activity = arrival->current_time;
1244   if (rtp)
1245     source->last_rtp_activity = arrival->current_time;
1246   g_object_ref (source);
1247
1248   return source;
1249 }
1250
1251 /**
1252  * rtp_session_get_internal_source:
1253  * @sess: a #RTPSession
1254  *
1255  * Get the internal #RTPSource of @sess.
1256  *
1257  * Returns: The internal #RTPSource. g_object_unref() after usage.
1258  */
1259 RTPSource *
1260 rtp_session_get_internal_source (RTPSession * sess)
1261 {
1262   RTPSource *result;
1263
1264   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1265
1266   result = g_object_ref (sess->source);
1267
1268   return result;
1269 }
1270
1271 /**
1272  * rtp_session_set_internal_ssrc:
1273  * @sess: a #RTPSession
1274  * @ssrc: an SSRC
1275  *
1276  * Set the SSRC of @sess to @ssrc.
1277  */
1278 void
1279 rtp_session_set_internal_ssrc (RTPSession * sess, guint32 ssrc)
1280 {
1281   RTP_SESSION_LOCK (sess);
1282   if (ssrc != sess->source->ssrc) {
1283     g_hash_table_steal (sess->ssrcs[sess->mask_idx],
1284         GINT_TO_POINTER (sess->source->ssrc));
1285
1286     GST_DEBUG ("setting internal SSRC to %08x", ssrc);
1287     /* After this call, any receiver of the old SSRC either in RTP or RTCP
1288      * packets will timeout on the old SSRC, we could potentially schedule a
1289      * BYE RTCP for the old SSRC... */
1290     sess->source->ssrc = ssrc;
1291     rtp_source_reset (sess->source);
1292
1293     /* rehash with the new SSRC */
1294     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1295         GINT_TO_POINTER (sess->source->ssrc), sess->source);
1296   }
1297   RTP_SESSION_UNLOCK (sess);
1298
1299   g_object_notify (G_OBJECT (sess), "internal-ssrc");
1300 }
1301
1302 /**
1303  * rtp_session_get_internal_ssrc:
1304  * @sess: a #RTPSession
1305  *
1306  * Get the internal SSRC of @sess.
1307  *
1308  * Returns: The SSRC of the session.
1309  */
1310 guint32
1311 rtp_session_get_internal_ssrc (RTPSession * sess)
1312 {
1313   guint32 ssrc;
1314
1315   RTP_SESSION_LOCK (sess);
1316   ssrc = sess->source->ssrc;
1317   RTP_SESSION_UNLOCK (sess);
1318
1319   return ssrc;
1320 }
1321
1322 /**
1323  * rtp_session_add_source:
1324  * @sess: a #RTPSession
1325  * @src: #RTPSource to add
1326  *
1327  * Add @src to @session.
1328  *
1329  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1330  * existed in the session.
1331  */
1332 gboolean
1333 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1334 {
1335   gboolean result = FALSE;
1336   RTPSource *find;
1337
1338   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1339   g_return_val_if_fail (src != NULL, FALSE);
1340
1341   RTP_SESSION_LOCK (sess);
1342   find =
1343       g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1344       GINT_TO_POINTER (src->ssrc));
1345   if (find == NULL) {
1346     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1347         GINT_TO_POINTER (src->ssrc), src);
1348     /* we have one more source now */
1349     sess->total_sources++;
1350     result = TRUE;
1351   }
1352   RTP_SESSION_UNLOCK (sess);
1353
1354   return result;
1355 }
1356
1357 /**
1358  * rtp_session_get_num_sources:
1359  * @sess: an #RTPSession
1360  *
1361  * Get the number of sources in @sess.
1362  *
1363  * Returns: The number of sources in @sess.
1364  */
1365 guint
1366 rtp_session_get_num_sources (RTPSession * sess)
1367 {
1368   guint result;
1369
1370   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1371
1372   RTP_SESSION_LOCK (sess);
1373   result = sess->total_sources;
1374   RTP_SESSION_UNLOCK (sess);
1375
1376   return result;
1377 }
1378
1379 /**
1380  * rtp_session_get_num_active_sources:
1381  * @sess: an #RTPSession
1382  *
1383  * Get the number of active sources in @sess. A source is considered active when
1384  * it has been validated and has not yet received a BYE RTCP message.
1385  *
1386  * Returns: The number of active sources in @sess.
1387  */
1388 guint
1389 rtp_session_get_num_active_sources (RTPSession * sess)
1390 {
1391   guint result;
1392
1393   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1394
1395   RTP_SESSION_LOCK (sess);
1396   result = sess->stats.active_sources;
1397   RTP_SESSION_UNLOCK (sess);
1398
1399   return result;
1400 }
1401
1402 /**
1403  * rtp_session_get_source_by_ssrc:
1404  * @sess: an #RTPSession
1405  * @ssrc: an SSRC
1406  *
1407  * Find the source with @ssrc in @sess.
1408  *
1409  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
1410  * g_object_unref() after usage.
1411  */
1412 RTPSource *
1413 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
1414 {
1415   RTPSource *result;
1416
1417   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1418
1419   RTP_SESSION_LOCK (sess);
1420   result =
1421       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1422   if (result)
1423     g_object_ref (result);
1424   RTP_SESSION_UNLOCK (sess);
1425
1426   return result;
1427 }
1428
1429 /**
1430  * rtp_session_get_source_by_cname:
1431  * @sess: a #RTPSession
1432  * @cname: an CNAME
1433  *
1434  * Find the source with @cname in @sess.
1435  *
1436  * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found.
1437  * g_object_unref() after usage.
1438  */
1439 RTPSource *
1440 rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
1441 {
1442   RTPSource *result;
1443
1444   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1445   g_return_val_if_fail (cname != NULL, NULL);
1446
1447   RTP_SESSION_LOCK (sess);
1448   result = g_hash_table_lookup (sess->cnames, cname);
1449   if (result)
1450     g_object_ref (result);
1451   RTP_SESSION_UNLOCK (sess);
1452
1453   return result;
1454 }
1455
1456 /* should be called with the SESSION lock */
1457 static guint32
1458 rtp_session_create_new_ssrc (RTPSession * sess)
1459 {
1460   guint32 ssrc;
1461
1462   while (TRUE) {
1463     ssrc = g_random_int ();
1464
1465     /* see if it exists in the session, we're done if it doesn't */
1466     if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1467             GINT_TO_POINTER (ssrc)) == NULL)
1468       break;
1469   }
1470   return ssrc;
1471 }
1472
1473
1474 /**
1475  * rtp_session_create_source:
1476  * @sess: an #RTPSession
1477  *
1478  * Create an #RTPSource for use in @sess. This function will create a source
1479  * with an ssrc that is currently not used by any participants in the session.
1480  *
1481  * Returns: an #RTPSource.
1482  */
1483 RTPSource *
1484 rtp_session_create_source (RTPSession * sess)
1485 {
1486   guint32 ssrc;
1487   RTPSource *source;
1488
1489   RTP_SESSION_LOCK (sess);
1490   ssrc = rtp_session_create_new_ssrc (sess);
1491   source = rtp_source_new (ssrc);
1492   rtp_source_set_callbacks (source, &callbacks, sess);
1493   /* we need an additional ref for the source in the hashtable */
1494   g_object_ref (source);
1495   g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1496       source);
1497   /* we have one more source now */
1498   sess->total_sources++;
1499   RTP_SESSION_UNLOCK (sess);
1500
1501   return source;
1502 }
1503
1504 /* update the RTPArrivalStats structure with the current time and other bits
1505  * about the current buffer we are handling.
1506  * This function is typically called when a validated packet is received.
1507  * This function should be called with the SESSION_LOCK
1508  */
1509 static void
1510 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
1511     gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
1512     GstClockTime running_time)
1513 {
1514   /* get time of arrival */
1515   arrival->current_time = current_time;
1516   arrival->running_time = running_time;
1517
1518   /* get packet size including header overhead */
1519   arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
1520
1521   if (rtp) {
1522     arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer);
1523   } else {
1524     arrival->payload_len = 0;
1525   }
1526
1527   /* for netbuffer we can store the IP address to check for collisions */
1528   arrival->have_address = GST_IS_NETBUFFER (buffer);
1529   if (arrival->have_address) {
1530     GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
1531
1532     memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
1533   }
1534 }
1535
1536 /**
1537  * rtp_session_process_rtp:
1538  * @sess: and #RTPSession
1539  * @buffer: an RTP buffer
1540  * @current_time: the current system time
1541  * @running_time: the running_time of @buffer
1542  *
1543  * Process an RTP buffer in the session manager. This function takes ownership
1544  * of @buffer.
1545  *
1546  * Returns: a #GstFlowReturn.
1547  */
1548 GstFlowReturn
1549 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
1550     GstClockTime current_time, GstClockTime running_time)
1551 {
1552   GstFlowReturn result;
1553   guint32 ssrc;
1554   RTPSource *source;
1555   gboolean created;
1556   gboolean prevsender, prevactive;
1557   RTPArrivalStats arrival;
1558   guint32 csrcs[16];
1559   guint8 i, count;
1560   guint64 oldrate;
1561
1562   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1563   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1564
1565   if (!gst_rtp_buffer_validate (buffer))
1566     goto invalid_packet;
1567
1568   RTP_SESSION_LOCK (sess);
1569   /* update arrival stats */
1570   update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
1571       running_time);
1572
1573   /* ignore more RTP packets when we left the session */
1574   if (sess->source->received_bye)
1575     goto ignore;
1576
1577   /* get SSRC and look up in session database */
1578   ssrc = gst_rtp_buffer_get_ssrc (buffer);
1579   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
1580   if (!source)
1581     goto collision;
1582
1583   prevsender = RTP_SOURCE_IS_SENDER (source);
1584   prevactive = RTP_SOURCE_IS_ACTIVE (source);
1585   oldrate = source->bitrate;
1586
1587   /* copy available csrc for later */
1588   count = gst_rtp_buffer_get_csrc_count (buffer);
1589   /* make sure to not overflow our array. An RTP buffer can maximally contain
1590    * 16 CSRCs */
1591   count = MIN (count, 16);
1592
1593   for (i = 0; i < count; i++)
1594     csrcs[i] = gst_rtp_buffer_get_csrc (buffer, i);
1595
1596   /* let source process the packet */
1597   result = rtp_source_process_rtp (source, buffer, &arrival);
1598
1599   /* source became active */
1600   if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
1601     sess->stats.active_sources++;
1602     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
1603         sess->stats.active_sources);
1604     on_ssrc_validated (sess, source);
1605   }
1606   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1607     sess->stats.sender_sources++;
1608     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
1609         sess->stats.sender_sources);
1610   }
1611   if (oldrate != source->bitrate)
1612     sess->recalc_bandwidth = TRUE;
1613
1614   if (created)
1615     on_new_ssrc (sess, source);
1616
1617   if (source->validated) {
1618     gboolean created;
1619
1620     /* for validated sources, we add the CSRCs as well */
1621     for (i = 0; i < count; i++) {
1622       guint32 csrc;
1623       RTPSource *csrc_src;
1624
1625       csrc = csrcs[i];
1626
1627       /* get source */
1628       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
1629       if (!csrc_src)
1630         continue;
1631
1632       if (created) {
1633         GST_DEBUG ("created new CSRC: %08x", csrc);
1634         rtp_source_set_as_csrc (csrc_src);
1635         if (RTP_SOURCE_IS_ACTIVE (csrc_src))
1636           sess->stats.active_sources++;
1637         on_new_ssrc (sess, csrc_src);
1638       }
1639       g_object_unref (csrc_src);
1640     }
1641   }
1642   g_object_unref (source);
1643
1644   RTP_SESSION_UNLOCK (sess);
1645
1646   return result;
1647
1648   /* ERRORS */
1649 invalid_packet:
1650   {
1651     gst_buffer_unref (buffer);
1652     GST_DEBUG ("invalid RTP packet received");
1653     return GST_FLOW_OK;
1654   }
1655 ignore:
1656   {
1657     gst_buffer_unref (buffer);
1658     RTP_SESSION_UNLOCK (sess);
1659     GST_DEBUG ("ignoring RTP packet because we are leaving");
1660     return GST_FLOW_OK;
1661   }
1662 collision:
1663   {
1664     gst_buffer_unref (buffer);
1665     RTP_SESSION_UNLOCK (sess);
1666     GST_DEBUG ("ignoring packet because its collisioning");
1667     return GST_FLOW_OK;
1668   }
1669 }
1670
1671 static void
1672 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
1673     GstRTCPPacket * packet, RTPArrivalStats * arrival)
1674 {
1675   guint count, i;
1676
1677   count = gst_rtcp_packet_get_rb_count (packet);
1678   for (i = 0; i < count; i++) {
1679     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1680     guint8 fractionlost;
1681     gint32 packetslost;
1682
1683     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1684         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1685
1686     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
1687
1688     if (ssrc == sess->source->ssrc) {
1689       /* only deal with report blocks for our session, we update the stats of
1690        * the sender of the RTCP message. We could also compare our stats against
1691        * the other sender to see if we are better or worse. */
1692       rtp_source_process_rb (source, arrival->current_time, fractionlost,
1693           packetslost, exthighestseq, jitter, lsr, dlsr);
1694     }
1695   }
1696   on_ssrc_active (sess, source);
1697 }
1698
1699 /* A Sender report contains statistics about how the sender is doing. This
1700  * includes timing informataion such as the relation between RTP and NTP
1701  * timestamps and the number of packets/bytes it sent to us.
1702  *
1703  * In this report is also included a set of report blocks related to how this
1704  * sender is receiving data (in case we (or somebody else) is also sending stuff
1705  * to it). This info includes the packet loss, jitter and seqnum. It also
1706  * contains information to calculate the round trip time (LSR/DLSR).
1707  */
1708 static void
1709 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
1710     RTPArrivalStats * arrival, gboolean * do_sync)
1711 {
1712   guint32 senderssrc, rtptime, packet_count, octet_count;
1713   guint64 ntptime;
1714   RTPSource *source;
1715   gboolean created, prevsender;
1716
1717   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
1718       &packet_count, &octet_count);
1719
1720   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
1721       senderssrc, GST_TIME_ARGS (arrival->current_time));
1722
1723   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1724   if (!source)
1725     return;
1726
1727   /* don't try to do lip-sync for sources that sent a BYE */
1728   if (rtp_source_received_bye (source))
1729     *do_sync = FALSE;
1730   else
1731     *do_sync = TRUE;
1732
1733   prevsender = RTP_SOURCE_IS_SENDER (source);
1734
1735   /* first update the source */
1736   rtp_source_process_sr (source, arrival->current_time, ntptime, rtptime,
1737       packet_count, octet_count);
1738
1739   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1740     sess->stats.sender_sources++;
1741     GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc,
1742         sess->stats.sender_sources);
1743   }
1744
1745   if (created)
1746     on_new_ssrc (sess, source);
1747
1748   rtp_session_process_rb (sess, source, packet, arrival);
1749   g_object_unref (source);
1750 }
1751
1752 /* A receiver report contains statistics about how a receiver is doing. It
1753  * includes stuff like packet loss, jitter and the seqnum it received last. It
1754  * also contains info to calculate the round trip time.
1755  *
1756  * We are only interested in how the sender of this report is doing wrt to us.
1757  */
1758 static void
1759 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
1760     RTPArrivalStats * arrival)
1761 {
1762   guint32 senderssrc;
1763   RTPSource *source;
1764   gboolean created;
1765
1766   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
1767
1768   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
1769
1770   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1771   if (!source)
1772     return;
1773
1774   if (created)
1775     on_new_ssrc (sess, source);
1776
1777   rtp_session_process_rb (sess, source, packet, arrival);
1778   g_object_unref (source);
1779 }
1780
1781 /* Get SDES items and store them in the SSRC */
1782 static void
1783 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
1784     RTPArrivalStats * arrival)
1785 {
1786   guint items, i, j;
1787   gboolean more_items, more_entries;
1788
1789   items = gst_rtcp_packet_sdes_get_item_count (packet);
1790   GST_DEBUG ("got SDES packet with %d items", items);
1791
1792   more_items = gst_rtcp_packet_sdes_first_item (packet);
1793   i = 0;
1794   while (more_items) {
1795     guint32 ssrc;
1796     gboolean changed, created, validated;
1797     RTPSource *source;
1798     GstStructure *sdes;
1799
1800     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
1801
1802     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
1803
1804     changed = FALSE;
1805
1806     /* find src, no probation when dealing with RTCP */
1807     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1808     if (!source)
1809       return;
1810
1811     sdes = gst_structure_new ("application/x-rtp-source-sdes", NULL);
1812
1813     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
1814     j = 0;
1815     while (more_entries) {
1816       GstRTCPSDESType type;
1817       guint8 len;
1818       guint8 *data;
1819       gchar *name;
1820       gchar *value;
1821
1822       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
1823
1824       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
1825           data);
1826
1827       if (type == GST_RTCP_SDES_PRIV) {
1828         name = g_strndup ((const gchar *) &data[1], data[0]);
1829         len -= data[0] + 1;
1830         data += data[0] + 1;
1831       } else {
1832         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
1833       }
1834
1835       value = g_strndup ((const gchar *) data, len);
1836
1837       gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
1838
1839       g_free (name);
1840       g_free (value);
1841
1842       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
1843       j++;
1844     }
1845
1846     /* takes ownership of sdes */
1847     changed = rtp_source_set_sdes_struct (source, sdes);
1848
1849     validated = !RTP_SOURCE_IS_ACTIVE (source);
1850     source->validated = TRUE;
1851
1852     if (created)
1853       on_new_ssrc (sess, source);
1854     if (validated)
1855       on_ssrc_validated (sess, source);
1856     if (changed)
1857       on_ssrc_sdes (sess, source);
1858
1859     g_object_unref (source);
1860
1861     more_items = gst_rtcp_packet_sdes_next_item (packet);
1862     i++;
1863   }
1864 }
1865
1866 /* BYE is sent when a client leaves the session
1867  */
1868 static void
1869 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
1870     RTPArrivalStats * arrival)
1871 {
1872   guint count, i;
1873   gchar *reason;
1874   gboolean reconsider = FALSE;
1875
1876   reason = gst_rtcp_packet_bye_get_reason (packet);
1877   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
1878
1879   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
1880   for (i = 0; i < count; i++) {
1881     guint32 ssrc;
1882     RTPSource *source;
1883     gboolean created, prevactive, prevsender;
1884     guint pmembers, members;
1885
1886     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
1887     GST_DEBUG ("SSRC: %08x", ssrc);
1888
1889     /* find src and mark bye, no probation when dealing with RTCP */
1890     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1891     if (!source)
1892       return;
1893
1894     /* store time for when we need to time out this source */
1895     source->bye_time = arrival->current_time;
1896
1897     prevactive = RTP_SOURCE_IS_ACTIVE (source);
1898     prevsender = RTP_SOURCE_IS_SENDER (source);
1899
1900     /* let the source handle the rest */
1901     rtp_source_process_bye (source, reason);
1902
1903     pmembers = sess->stats.active_sources;
1904
1905     if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
1906       sess->stats.active_sources--;
1907       GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1908           sess->stats.active_sources);
1909     }
1910     if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
1911       sess->stats.sender_sources--;
1912       GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1913           sess->stats.sender_sources);
1914     }
1915     members = sess->stats.active_sources;
1916
1917     if (!sess->source->received_bye && members < pmembers) {
1918       /* some members went away since the previous timeout estimate.
1919        * Perform reverse reconsideration but only when we are not scheduling a
1920        * BYE ourselves. */
1921       if (arrival->current_time < sess->next_rtcp_check_time) {
1922         GstClockTime time_remaining;
1923
1924         time_remaining = sess->next_rtcp_check_time - arrival->current_time;
1925         sess->next_rtcp_check_time =
1926             gst_util_uint64_scale (time_remaining, members, pmembers);
1927
1928         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
1929             GST_TIME_ARGS (sess->next_rtcp_check_time));
1930
1931         sess->next_rtcp_check_time += arrival->current_time;
1932
1933         /* mark pending reconsider. We only want to signal the reconsideration
1934          * once after we handled all the source in the bye packet */
1935         reconsider = TRUE;
1936       }
1937     }
1938
1939     if (created)
1940       on_new_ssrc (sess, source);
1941
1942     on_bye_ssrc (sess, source);
1943
1944     g_object_unref (source);
1945   }
1946   if (reconsider) {
1947     RTP_SESSION_UNLOCK (sess);
1948     /* notify app of reconsideration */
1949     if (sess->callbacks.reconsider)
1950       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
1951     RTP_SESSION_LOCK (sess);
1952   }
1953   g_free (reason);
1954 }
1955
1956 static void
1957 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
1958     RTPArrivalStats * arrival)
1959 {
1960   GST_DEBUG ("received APP");
1961 }
1962
1963 /**
1964  * rtp_session_process_rtcp:
1965  * @sess: and #RTPSession
1966  * @buffer: an RTCP buffer
1967  * @current_time: the current system time
1968  *
1969  * Process an RTCP buffer in the session manager. This function takes ownership
1970  * of @buffer.
1971  *
1972  * Returns: a #GstFlowReturn.
1973  */
1974 GstFlowReturn
1975 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
1976     GstClockTime current_time)
1977 {
1978   GstRTCPPacket packet;
1979   gboolean more, is_bye = FALSE, do_sync = FALSE;
1980   RTPArrivalStats arrival;
1981   GstFlowReturn result = GST_FLOW_OK;
1982
1983   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1984   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1985
1986   if (!gst_rtcp_buffer_validate (buffer))
1987     goto invalid_packet;
1988
1989   GST_DEBUG ("received RTCP packet");
1990
1991   RTP_SESSION_LOCK (sess);
1992   /* update arrival stats */
1993   update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1);
1994
1995   if (sess->sent_bye)
1996     goto ignore;
1997
1998   /* make writable, we might want to change the buffer */
1999   buffer = gst_buffer_make_metadata_writable (buffer);
2000
2001   /* start processing the compound packet */
2002   more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
2003   while (more) {
2004     GstRTCPType type;
2005
2006     type = gst_rtcp_packet_get_type (&packet);
2007
2008     /* when we are leaving the session, we should ignore all non-BYE messages */
2009     if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) {
2010       GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
2011       goto next;
2012     }
2013
2014     switch (type) {
2015       case GST_RTCP_TYPE_SR:
2016         rtp_session_process_sr (sess, &packet, &arrival, &do_sync);
2017         break;
2018       case GST_RTCP_TYPE_RR:
2019         rtp_session_process_rr (sess, &packet, &arrival);
2020         break;
2021       case GST_RTCP_TYPE_SDES:
2022         rtp_session_process_sdes (sess, &packet, &arrival);
2023         break;
2024       case GST_RTCP_TYPE_BYE:
2025         is_bye = TRUE;
2026         /* don't try to attempt lip-sync anymore for streams with a BYE */
2027         do_sync = FALSE;
2028         rtp_session_process_bye (sess, &packet, &arrival);
2029         break;
2030       case GST_RTCP_TYPE_APP:
2031         rtp_session_process_app (sess, &packet, &arrival);
2032         break;
2033       default:
2034         GST_WARNING ("got unknown RTCP packet");
2035         break;
2036     }
2037   next:
2038     more = gst_rtcp_packet_move_to_next (&packet);
2039   }
2040
2041   /* if we are scheduling a BYE, we only want to count bye packets, else we
2042    * count everything */
2043   if (sess->source->received_bye) {
2044     if (is_bye) {
2045       sess->stats.bye_members++;
2046       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
2047     }
2048   } else {
2049     /* keep track of average packet size */
2050     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
2051   }
2052   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
2053       sess->stats.avg_rtcp_packet_size, arrival.bytes);
2054   RTP_SESSION_UNLOCK (sess);
2055
2056   /* notify caller of sr packets in the callback */
2057   if (do_sync && sess->callbacks.sync_rtcp)
2058     result = sess->callbacks.sync_rtcp (sess, sess->source, buffer,
2059         sess->sync_rtcp_user_data);
2060   else
2061     gst_buffer_unref (buffer);
2062
2063   return result;
2064
2065   /* ERRORS */
2066 invalid_packet:
2067   {
2068     GST_DEBUG ("invalid RTCP packet received");
2069     gst_buffer_unref (buffer);
2070     return GST_FLOW_OK;
2071   }
2072 ignore:
2073   {
2074     gst_buffer_unref (buffer);
2075     RTP_SESSION_UNLOCK (sess);
2076     GST_DEBUG ("ignoring RTP packet because we left");
2077     return GST_FLOW_OK;
2078   }
2079 }
2080
2081 /**
2082  * rtp_session_send_rtp:
2083  * @sess: an #RTPSession
2084  * @data: pointer to either an RTP buffer or a list of RTP buffers
2085  * @is_list: TRUE when @data is a buffer list
2086  * @current_time: the current system time
2087  * @running_time: the running time of @data
2088  *
2089  * Send the RTP buffer in the session manager. This function takes ownership of
2090  * @buffer.
2091  *
2092  * Returns: a #GstFlowReturn.
2093  */
2094 GstFlowReturn
2095 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
2096     GstClockTime current_time, GstClockTime running_time)
2097 {
2098   GstFlowReturn result;
2099   RTPSource *source;
2100   gboolean prevsender;
2101   gboolean valid_packet;
2102   guint64 oldrate;
2103
2104   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2105   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
2106
2107   if (is_list) {
2108     valid_packet = gst_rtp_buffer_list_validate (GST_BUFFER_LIST_CAST (data));
2109   } else {
2110     valid_packet = gst_rtp_buffer_validate (GST_BUFFER_CAST (data));
2111   }
2112
2113   if (!valid_packet)
2114     goto invalid_packet;
2115
2116   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
2117
2118   RTP_SESSION_LOCK (sess);
2119   source = sess->source;
2120
2121   /* update last activity */
2122   source->last_rtp_activity = current_time;
2123
2124   prevsender = RTP_SOURCE_IS_SENDER (source);
2125   oldrate = source->bitrate;
2126
2127   /* we use our own source to send */
2128   result = rtp_source_send_rtp (source, data, is_list, running_time);
2129
2130   if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
2131     sess->stats.sender_sources++;
2132   if (oldrate != source->bitrate)
2133     sess->recalc_bandwidth = TRUE;
2134   RTP_SESSION_UNLOCK (sess);
2135
2136   return result;
2137
2138   /* ERRORS */
2139 invalid_packet:
2140   {
2141     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
2142     GST_DEBUG ("invalid RTP packet received");
2143     return GST_FLOW_OK;
2144   }
2145 }
2146
2147 static void
2148 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
2149 {
2150   *bandwidth += source->bitrate;
2151 }
2152
2153 static GstClockTime
2154 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
2155     gboolean first)
2156 {
2157   GstClockTime result;
2158
2159   /* recalculate bandwidth when it changed */
2160   if (sess->recalc_bandwidth) {
2161     gdouble bandwidth;
2162
2163     if (sess->bandwidth > 0)
2164       bandwidth = sess->bandwidth;
2165     else {
2166       /* If it is <= 0, then try to estimate the actual bandwidth */
2167       bandwidth = sess->source->bitrate;
2168
2169       g_hash_table_foreach (sess->cnames, (GHFunc) add_bitrates, &bandwidth);
2170       bandwidth /= 8.0;
2171     }
2172     if (bandwidth == 0)
2173       bandwidth = RTP_STATS_BANDWIDTH;
2174
2175     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
2176         sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
2177
2178     sess->recalc_bandwidth = FALSE;
2179   }
2180
2181   if (sess->source->received_bye) {
2182     result = rtp_stats_calculate_bye_interval (&sess->stats);
2183   } else {
2184     result = rtp_stats_calculate_rtcp_interval (&sess->stats,
2185         RTP_SOURCE_IS_SENDER (sess->source), first);
2186   }
2187
2188   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
2189       GST_TIME_ARGS (result), first);
2190
2191   if (!deterministic && result != GST_CLOCK_TIME_NONE)
2192     result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
2193
2194   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
2195
2196   return result;
2197 }
2198
2199 /* Stop the current @sess and schedule a BYE message for the other members.
2200  * One must have the session lock to call this function
2201  */
2202 static GstFlowReturn
2203 rtp_session_schedule_bye_locked (RTPSession * sess, const gchar * reason,
2204     GstClockTime current_time)
2205 {
2206   GstFlowReturn result = GST_FLOW_OK;
2207   RTPSource *source;
2208   GstClockTime interval;
2209
2210   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2211
2212   source = sess->source;
2213
2214   /* ignore more BYEs */
2215   if (source->received_bye)
2216     goto done;
2217
2218   /* we have BYE now */
2219   source->received_bye = TRUE;
2220   /* at least one member wants to send a BYE */
2221   g_free (sess->bye_reason);
2222   sess->bye_reason = g_strdup (reason);
2223   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
2224   sess->stats.bye_members = 1;
2225   sess->first_rtcp = TRUE;
2226   sess->sent_bye = FALSE;
2227   sess->allow_early = TRUE;
2228
2229   /* reschedule transmission */
2230   sess->last_rtcp_send_time = current_time;
2231   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2232   sess->next_rtcp_check_time = current_time + interval;
2233
2234   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
2235       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
2236
2237   RTP_SESSION_UNLOCK (sess);
2238   /* notify app of reconsideration */
2239   if (sess->callbacks.reconsider)
2240     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2241   RTP_SESSION_LOCK (sess);
2242 done:
2243
2244   return result;
2245 }
2246
2247 /**
2248  * rtp_session_schedule_bye:
2249  * @sess: an #RTPSession
2250  * @reason: a reason or NULL
2251  * @current_time: the current system time
2252  *
2253  * Stop the current @sess and schedule a BYE message for the other members.
2254  *
2255  * Returns: a #GstFlowReturn.
2256  */
2257 GstFlowReturn
2258 rtp_session_schedule_bye (RTPSession * sess, const gchar * reason,
2259     GstClockTime current_time)
2260 {
2261   GstFlowReturn result = GST_FLOW_OK;
2262
2263   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2264
2265   RTP_SESSION_LOCK (sess);
2266   result = rtp_session_schedule_bye_locked (sess, reason, current_time);
2267   RTP_SESSION_UNLOCK (sess);
2268
2269   return result;
2270 }
2271
2272 /**
2273  * rtp_session_next_timeout:
2274  * @sess: an #RTPSession
2275  * @current_time: the current system time
2276  *
2277  * Get the next time we should perform session maintenance tasks.
2278  *
2279  * Returns: a time when rtp_session_on_timeout() should be called with the
2280  * current system time.
2281  */
2282 GstClockTime
2283 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
2284 {
2285   GstClockTime result, interval = 0;
2286
2287   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
2288
2289   RTP_SESSION_LOCK (sess);
2290
2291   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
2292     result = sess->next_early_rtcp_time;
2293     goto early_exit;
2294   }
2295
2296   result = sess->next_rtcp_check_time;
2297
2298   GST_DEBUG ("current time: %" GST_TIME_FORMAT ", next :%" GST_TIME_FORMAT,
2299       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2300
2301   if (result < current_time) {
2302     GST_DEBUG ("take current time as base");
2303     /* our previous check time expired, start counting from the current time
2304      * again. */
2305     result = current_time;
2306   }
2307
2308   if (sess->source->received_bye) {
2309     if (sess->sent_bye) {
2310       GST_DEBUG ("we sent BYE already");
2311       interval = GST_CLOCK_TIME_NONE;
2312     } else if (sess->stats.active_sources >= 50) {
2313       GST_DEBUG ("reconsider BYE, more than 50 sources");
2314       /* reconsider BYE if members >= 50 */
2315       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2316     }
2317   } else {
2318     if (sess->first_rtcp) {
2319       GST_DEBUG ("first RTCP packet");
2320       /* we are called for the first time */
2321       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2322     } else if (sess->next_rtcp_check_time < current_time) {
2323       GST_DEBUG ("old check time expired, getting new timeout");
2324       /* get a new timeout when we need to */
2325       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
2326     }
2327   }
2328
2329   if (interval != GST_CLOCK_TIME_NONE)
2330     result += interval;
2331   else
2332     result = GST_CLOCK_TIME_NONE;
2333
2334   sess->next_rtcp_check_time = result;
2335
2336 early_exit:
2337
2338   GST_DEBUG ("current time: %" GST_TIME_FORMAT
2339       ", next time: %" GST_TIME_FORMAT,
2340       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2341   RTP_SESSION_UNLOCK (sess);
2342
2343   return result;
2344 }
2345
2346 typedef struct
2347 {
2348   RTPSession *sess;
2349   GstBuffer *rtcp;
2350   GstClockTime current_time;
2351   guint64 ntpnstime;
2352   GstClockTime running_time;
2353   GstClockTime interval;
2354   GstRTCPPacket packet;
2355   gboolean is_bye;
2356   gboolean has_sdes;
2357   gboolean is_early;
2358   gboolean may_suppress;
2359 } ReportData;
2360
2361 static void
2362 session_start_rtcp (RTPSession * sess, ReportData * data)
2363 {
2364   GstRTCPPacket *packet = &data->packet;
2365   RTPSource *own = sess->source;
2366
2367   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
2368
2369   if (RTP_SOURCE_IS_SENDER (own)) {
2370     guint64 ntptime;
2371     guint32 rtptime;
2372     guint32 packet_count, octet_count;
2373
2374     /* we are a sender, create SR */
2375     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
2376     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
2377
2378     /* get latest stats */
2379     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
2380         &ntptime, &rtptime, &packet_count, &octet_count);
2381     /* store stats */
2382     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
2383         packet_count, octet_count);
2384
2385     /* fill in sender report info */
2386     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
2387         ntptime, rtptime, packet_count, octet_count);
2388   } else {
2389     /* we are only receiver, create RR */
2390     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
2391     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
2392     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
2393   }
2394 }
2395
2396 /* construct a Sender or Receiver Report */
2397 static void
2398 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
2399 {
2400   RTPSession *sess = data->sess;
2401   GstRTCPPacket *packet = &data->packet;
2402
2403   /* create a new buffer if needed */
2404   if (data->rtcp == NULL) {
2405     session_start_rtcp (sess, data);
2406   } else if (data->is_early) {
2407     /* Put a single RR or SR in minimal compound packets */
2408     return;
2409   }
2410   if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
2411     /* only report about other sender sources */
2412     if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
2413       guint8 fractionlost;
2414       gint32 packetslost;
2415       guint32 exthighestseq, jitter;
2416       guint32 lsr, dlsr;
2417
2418       /* get new stats */
2419       rtp_source_get_new_rb (source, data->current_time, &fractionlost,
2420           &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2421
2422       /* store last generated RR packet */
2423       source->last_rr.is_valid = TRUE;
2424       source->last_rr.fractionlost = fractionlost;
2425       source->last_rr.packetslost = packetslost;
2426       source->last_rr.exthighestseq = exthighestseq;
2427       source->last_rr.jitter = jitter;
2428       source->last_rr.lsr = lsr;
2429       source->last_rr.dlsr = dlsr;
2430
2431       /* packet is not yet filled, add report block for this source. */
2432       gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
2433           exthighestseq, jitter, lsr, dlsr);
2434     }
2435   }
2436 }
2437
2438 /* perform cleanup of sources that timed out */
2439 static void
2440 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
2441 {
2442   gboolean remove = FALSE;
2443   gboolean byetimeout = FALSE;
2444   gboolean sendertimeout = FALSE;
2445   gboolean is_sender, is_active;
2446   RTPSession *sess = data->sess;
2447   GstClockTime interval;
2448
2449   is_sender = RTP_SOURCE_IS_SENDER (source);
2450   is_active = RTP_SOURCE_IS_ACTIVE (source);
2451
2452   /* check for our own source, we don't want to delete our own source. */
2453   if (!(source == sess->source)) {
2454     if (source->received_bye) {
2455       /* if we received a BYE from the source, remove the source after some
2456        * time. */
2457       if (data->current_time > source->bye_time &&
2458           data->current_time - source->bye_time > sess->stats.bye_timeout) {
2459         GST_DEBUG ("removing BYE source %08x", source->ssrc);
2460         remove = TRUE;
2461         byetimeout = TRUE;
2462       }
2463     }
2464     /* sources that were inactive for more than 5 times the deterministic reporting
2465      * interval get timed out. the min timeout is 5 seconds. */
2466     if (data->current_time > source->last_activity) {
2467       interval = MAX (data->interval * 5, 5 * GST_SECOND);
2468       if (data->current_time - source->last_activity > interval) {
2469         GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
2470             source->ssrc, GST_TIME_ARGS (source->last_activity));
2471         remove = TRUE;
2472       }
2473     }
2474   }
2475
2476   /* senders that did not send for a long time become a receiver, this also
2477    * holds for our own source. */
2478   if (is_sender) {
2479     if (data->current_time > source->last_rtp_activity) {
2480       interval = MAX (data->interval * 2, 5 * GST_SECOND);
2481       if (data->current_time - source->last_rtp_activity > interval) {
2482         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
2483             GST_TIME_FORMAT, source->ssrc,
2484             GST_TIME_ARGS (source->last_rtp_activity));
2485         source->is_sender = FALSE;
2486         sess->stats.sender_sources--;
2487         sendertimeout = TRUE;
2488       }
2489     }
2490   }
2491
2492   if (remove) {
2493     sess->total_sources--;
2494     if (is_sender)
2495       sess->stats.sender_sources--;
2496     if (is_active)
2497       sess->stats.active_sources--;
2498
2499     if (byetimeout)
2500       on_bye_timeout (sess, source);
2501     else
2502       on_timeout (sess, source);
2503   } else {
2504     if (sendertimeout)
2505       on_sender_timeout (sess, source);
2506   }
2507
2508   source->closing = remove;
2509 }
2510
2511 static void
2512 session_sdes (RTPSession * sess, ReportData * data)
2513 {
2514   GstRTCPPacket *packet = &data->packet;
2515   const GstStructure *sdes;
2516   gint i, n_fields;
2517
2518   /* add SDES packet */
2519   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
2520
2521   gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
2522
2523   sdes = rtp_source_get_sdes_struct (sess->source);
2524
2525   /* add all fields in the structure, the order is not important. */
2526   n_fields = gst_structure_n_fields (sdes);
2527   for (i = 0; i < n_fields; ++i) {
2528     const gchar *field;
2529     const gchar *value;
2530     GstRTCPSDESType type;
2531
2532     field = gst_structure_nth_field_name (sdes, i);
2533     if (field == NULL)
2534       continue;
2535     value = gst_structure_get_string (sdes, field);
2536     if (value == NULL)
2537       continue;
2538     type = gst_rtcp_sdes_name_to_type (field);
2539
2540     /* Early packets are minimal and only include the CNAME */
2541     if (data->is_early && type != GST_RTCP_SDES_CNAME)
2542       continue;
2543
2544     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
2545       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
2546           (const guint8 *) value);
2547     } else if (type == GST_RTCP_SDES_PRIV) {
2548       gsize prefix_len;
2549       gsize value_len;
2550       gsize data_len;
2551       guint8 data[256];
2552
2553       /* don't accept entries that are too big */
2554       prefix_len = strlen (field);
2555       if (prefix_len > 255)
2556         continue;
2557       value_len = strlen (value);
2558       if (value_len > 255)
2559         continue;
2560       data_len = 1 + prefix_len + value_len;
2561       if (data_len > 255)
2562         continue;
2563
2564       data[0] = prefix_len;
2565       memcpy (&data[1], field, prefix_len);
2566       memcpy (&data[1 + prefix_len], value, value_len);
2567
2568       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
2569     }
2570   }
2571
2572   data->has_sdes = TRUE;
2573 }
2574
2575 /* schedule a BYE packet */
2576 static void
2577 session_bye (RTPSession * sess, ReportData * data)
2578 {
2579   GstRTCPPacket *packet = &data->packet;
2580
2581   /* open packet */
2582   session_start_rtcp (sess, data);
2583
2584   /* add SDES */
2585   session_sdes (sess, data);
2586
2587   /* add a BYE packet */
2588   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet);
2589   gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc);
2590   if (sess->bye_reason)
2591     gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason);
2592
2593   /* we have a BYE packet now */
2594   data->is_bye = TRUE;
2595 }
2596
2597 static gboolean
2598 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
2599 {
2600   GstClockTime new_send_time, elapsed;
2601
2602   if (data->is_early && sess->next_early_rtcp_time < current_time)
2603     goto early;
2604
2605   /* no need to check yet */
2606   if (sess->next_rtcp_check_time > current_time) {
2607     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
2608         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
2609         GST_TIME_ARGS (current_time));
2610     return FALSE;
2611   }
2612
2613   /* get elapsed time since we last reported */
2614   elapsed = current_time - sess->last_rtcp_send_time;
2615
2616   /* perform forward reconsideration */
2617   new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
2618
2619   GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
2620       GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time), GST_TIME_ARGS (elapsed));
2621
2622   new_send_time += sess->last_rtcp_send_time;
2623
2624   /* check if reconsideration */
2625   if (current_time < new_send_time) {
2626     GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
2627         GST_TIME_ARGS (new_send_time));
2628     /* store new check time */
2629     sess->next_rtcp_check_time = new_send_time;
2630     return FALSE;
2631   }
2632
2633 early:
2634
2635   new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
2636
2637   GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
2638       GST_TIME_ARGS (new_send_time));
2639   sess->next_rtcp_check_time = current_time + new_send_time;
2640
2641   /* Apply the rules from RFC 4585 section 3.5.3 */
2642   if (sess->stats.min_interval != 0 && !sess->first_rtcp) {
2643     GstClockTimeDiff T_rr_current_interval = g_random_double_range (0.5, 1.5) *
2644         sess->stats.min_interval;
2645
2646     /* This will caused the RTCP to be suppressed if no FB packets are added */
2647     if (sess->last_rtcp_send_time + T_rr_current_interval >
2648         sess->next_rtcp_check_time) {
2649       GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT
2650           " last: %" GST_TIME_FORMAT
2651           " + T_rr_current_interval: %" GST_TIME_FORMAT
2652           " >  sess->next_rtcp_check_time: %" GST_TIME_FORMAT,
2653           GST_TIME_ARGS (sess->stats.min_interval),
2654           GST_TIME_ARGS (sess->last_rtcp_send_time),
2655           GST_TIME_ARGS (T_rr_current_interval),
2656           GST_TIME_ARGS (sess->next_rtcp_check_time));
2657       data->may_suppress = TRUE;
2658     }
2659   }
2660
2661   return TRUE;
2662 }
2663
2664 static void
2665 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
2666 {
2667   g_hash_table_insert (hash_table, key, g_object_ref (source));
2668 }
2669
2670 static gboolean
2671 remove_closing_sources (const gchar * key, RTPSource * source, gpointer * data)
2672 {
2673   return source->closing;
2674 }
2675
2676 /**
2677  * rtp_session_on_timeout:
2678  * @sess: an #RTPSession
2679  * @current_time: the current system time
2680  * @ntpnstime: the current NTP time in nanoseconds
2681  * @running_time: the current running_time of the pipeline
2682  *
2683  * Perform maintenance actions after the timeout obtained with
2684  * rtp_session_next_timeout() expired.
2685  *
2686  * This function will perform timeouts of receivers and senders, send a BYE
2687  * packet or generate RTCP packets with current session stats.
2688  *
2689  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
2690  * times, for each packet that should be processed.
2691  *
2692  * Returns: a #GstFlowReturn.
2693  */
2694 GstFlowReturn
2695 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
2696     guint64 ntpnstime, GstClockTime running_time)
2697 {
2698   GstFlowReturn result = GST_FLOW_OK;
2699   ReportData data;
2700   RTPSource *own;
2701   GHashTable *table_copy;
2702   gboolean notify = FALSE;
2703
2704   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2705
2706   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT,
2707       GST_TIME_ARGS (current_time), GST_TIME_ARGS (ntpnstime));
2708
2709   data.sess = sess;
2710   data.rtcp = NULL;
2711   data.current_time = current_time;
2712   data.ntpnstime = ntpnstime;
2713   data.is_bye = FALSE;
2714   data.has_sdes = FALSE;
2715   data.may_suppress = FALSE;
2716   data.running_time = running_time;
2717
2718   own = sess->source;
2719
2720   RTP_SESSION_LOCK (sess);
2721   /* get a new interval, we need this for various cleanups etc */
2722   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
2723
2724   /* Make a local copy of the hashtable. We need to do this because the
2725    * cleanup stage below releases the session lock. */
2726   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
2727       (GDestroyNotify) g_object_unref);
2728   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2729       (GHFunc) clone_ssrcs_hashtable, table_copy);
2730
2731   /* Clean up the session, mark the source for removing, this might release the
2732    * session lock. */
2733   g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
2734   g_hash_table_destroy (table_copy);
2735
2736   /* Now remove the marked sources */
2737   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
2738       (GHRFunc) remove_closing_sources, NULL);
2739
2740   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
2741     data.is_early = TRUE;
2742   else
2743     data.is_early = FALSE;
2744
2745   /* see if we need to generate SR or RR packets */
2746   if (is_rtcp_time (sess, current_time, &data)) {
2747     if (own->received_bye) {
2748       /* generate BYE instead */
2749       GST_DEBUG ("generating BYE message");
2750       session_bye (sess, &data);
2751       sess->sent_bye = TRUE;
2752     } else {
2753       /* loop over all known sources and do something */
2754       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2755           (GHFunc) session_report_blocks, &data);
2756     }
2757   }
2758
2759   if (data.rtcp) {
2760     /* we keep track of the last report time in order to timeout inactive
2761      * receivers or senders */
2762     if (!data.is_early && !data.may_suppress)
2763       sess->last_rtcp_send_time = data.current_time;
2764     sess->first_rtcp = FALSE;
2765     sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
2766
2767     /* add SDES for this source when not already added */
2768     if (!data.has_sdes)
2769       session_sdes (sess, &data);
2770   }
2771
2772   /* check for outdated collisions */
2773   GST_DEBUG ("Timing out collisions");
2774   rtp_source_timeout (sess->source, current_time,
2775       data.interval * RTCP_INTERVAL_COLLISION_TIMEOUT);
2776
2777   if (sess->change_ssrc) {
2778     GST_DEBUG ("need to change our SSRC (%08x)", own->ssrc);
2779     g_hash_table_steal (sess->ssrcs[sess->mask_idx],
2780         GINT_TO_POINTER (own->ssrc));
2781
2782     own->ssrc = rtp_session_create_new_ssrc (sess);
2783     rtp_source_reset (own);
2784
2785     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
2786         GINT_TO_POINTER (own->ssrc), own);
2787
2788     g_free (sess->bye_reason);
2789     sess->bye_reason = NULL;
2790     sess->sent_bye = FALSE;
2791     sess->change_ssrc = FALSE;
2792     notify = TRUE;
2793     GST_DEBUG ("changed our SSRC to %08x", own->ssrc);
2794   }
2795
2796   sess->allow_early = TRUE;
2797
2798   RTP_SESSION_UNLOCK (sess);
2799
2800   if (notify)
2801     g_object_notify (G_OBJECT (sess), "internal-ssrc");
2802
2803   /* push out the RTCP packet */
2804   if (data.rtcp) {
2805     gboolean do_not_suppress;
2806
2807     /* Give the user a change to add its own packet */
2808     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
2809         data.rtcp, data.is_early, &do_not_suppress);
2810
2811     if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) {
2812       guint packet_size;
2813
2814       /* close the RTCP packet */
2815       gst_rtcp_buffer_end (data.rtcp);
2816
2817       packet_size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
2818
2819       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
2820       GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
2821           sess->stats.avg_rtcp_packet_size, packet_size);
2822       result =
2823           sess->callbacks.send_rtcp (sess, own, data.rtcp, sess->sent_bye,
2824           sess->send_rtcp_user_data);
2825     } else {
2826       GST_DEBUG ("freeing packet callback: %p"
2827           " do_not_suppress: %d may_suppress: %d",
2828           sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress);
2829       gst_buffer_unref (data.rtcp);
2830     }
2831   }
2832
2833   return result;
2834 }
2835
2836 void
2837 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
2838     GstClockTimeDiff max_delay)
2839 {
2840   GstClockTime T_dither_max;
2841
2842   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
2843
2844   RTP_SESSION_LOCK (sess);
2845
2846   /* Check if already requested */
2847   /*  RFC 4585 section 3.5.2 step 2 */
2848   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
2849     goto dont_send;
2850
2851   /* Ignore the request a scheduled packet will be in time anyway */
2852   if (current_time + max_delay > sess->next_rtcp_check_time)
2853     goto dont_send;
2854
2855   /*  RFC 4585 section 3.5.2 step 2b */
2856   /* If the total sources is <=2, then there is only us and one peer */
2857   if (sess->total_sources <= 2) {
2858     T_dither_max = 0;
2859   } else {
2860     /* Divide by 2 because l = 0.5 */
2861     T_dither_max = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
2862     T_dither_max /= 2;
2863   }
2864
2865   /*  RFC 4585 section 3.5.2 step 3 */
2866   if (current_time + T_dither_max > sess->next_rtcp_check_time)
2867     goto dont_send;
2868
2869   /*  RFC 4585 section 3.5.2 step 4 */
2870   if (sess->allow_early == FALSE)
2871     goto dont_send;
2872
2873   if (T_dither_max) {
2874     /* Schedule an early transmission later */
2875     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
2876         current_time;
2877   } else {
2878     /* If no dithering, schedule it for NOW */
2879     sess->next_early_rtcp_time = current_time;
2880   }
2881
2882   RTP_SESSION_UNLOCK (sess);
2883
2884   /* notify app of need to send packet early
2885    * and therefore of timeout change */
2886   if (sess->callbacks.reconsider)
2887     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2888
2889   return;
2890
2891 dont_send:
2892
2893   RTP_SESSION_UNLOCK (sess);
2894
2895 }