rtpsession: Make rtcp buffer metadata writable after processing it
[platform/upstream/gstreamer.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/rtp/gstrtpbuffer.h>
23 #include <gst/rtp/gstrtcpbuffer.h>
24 #include <gst/netbuffer/gstnetbuffer.h>
25
26 #include "gstrtpbin-marshal.h"
27 #include "rtpsession.h"
28
29 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
30 #define GST_CAT_DEFAULT rtp_session_debug
31
32 /* signals and args */
33 enum
34 {
35   SIGNAL_GET_SOURCE_BY_SSRC,
36   SIGNAL_ON_NEW_SSRC,
37   SIGNAL_ON_SSRC_COLLISION,
38   SIGNAL_ON_SSRC_VALIDATED,
39   SIGNAL_ON_SSRC_ACTIVE,
40   SIGNAL_ON_SSRC_SDES,
41   SIGNAL_ON_BYE_SSRC,
42   SIGNAL_ON_BYE_TIMEOUT,
43   SIGNAL_ON_TIMEOUT,
44   SIGNAL_ON_SENDER_TIMEOUT,
45   SIGNAL_ON_SENDING_RTCP,
46   SIGNAL_ON_FEEDBACK_RTCP,
47   LAST_SIGNAL
48 };
49
50 #define DEFAULT_INTERNAL_SOURCE      NULL
51 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
52 #define DEFAULT_RTCP_FRACTION        (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH)
53 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
54 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
55 #define DEFAULT_RTCP_MTU             1400
56 #define DEFAULT_SDES                 NULL
57 #define DEFAULT_NUM_SOURCES          0
58 #define DEFAULT_NUM_ACTIVE_SOURCES   0
59 #define DEFAULT_SOURCES              NULL
60 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
61
62 enum
63 {
64   PROP_0,
65   PROP_INTERNAL_SSRC,
66   PROP_INTERNAL_SOURCE,
67   PROP_BANDWIDTH,
68   PROP_RTCP_FRACTION,
69   PROP_RTCP_RR_BANDWIDTH,
70   PROP_RTCP_RS_BANDWIDTH,
71   PROP_RTCP_MTU,
72   PROP_SDES,
73   PROP_NUM_SOURCES,
74   PROP_NUM_ACTIVE_SOURCES,
75   PROP_SOURCES,
76   PROP_FAVOR_NEW,
77   PROP_RTCP_MIN_INTERVAL,
78   PROP_LAST
79 };
80
81 /* update average packet size */
82 #define INIT_AVG(avg, val) \
83    (avg) = (val);
84 #define UPDATE_AVG(avg, val)            \
85   if ((avg) == 0)                       \
86    (avg) = (val);                       \
87   else                                  \
88    (avg) = ((val) + (15 * (avg))) >> 4;
89
90
91 /* The number RTCP intervals after which to timeout entries in the
92  * collision table
93  */
94 #define RTCP_INTERVAL_COLLISION_TIMEOUT 10
95
96 /* GObject vmethods */
97 static void rtp_session_finalize (GObject * object);
98 static void rtp_session_set_property (GObject * object, guint prop_id,
99     const GValue * value, GParamSpec * pspec);
100 static void rtp_session_get_property (GObject * object, guint prop_id,
101     GValue * value, GParamSpec * pspec);
102
103 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
104
105 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
106
107 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
108     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
109 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
110     const gchar * reason, GstClockTime current_time);
111 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
112     gboolean deterministic, gboolean first);
113
114 static gboolean
115 accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
116     const GValue * handler_return, gpointer data)
117 {
118   if (g_value_get_boolean (handler_return))
119     g_value_set_boolean (return_accu, TRUE);
120
121   return TRUE;
122 }
123
124 static void
125 rtp_session_class_init (RTPSessionClass * klass)
126 {
127   GObjectClass *gobject_class;
128
129   gobject_class = (GObjectClass *) klass;
130
131   gobject_class->finalize = rtp_session_finalize;
132   gobject_class->set_property = rtp_session_set_property;
133   gobject_class->get_property = rtp_session_get_property;
134
135   /**
136    * RTPSession::get-source-by-ssrc:
137    * @session: the object which received the signal
138    * @ssrc: the SSRC of the RTPSource
139    *
140    * Request the #RTPSource object with SSRC @ssrc in @session.
141    */
142   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
143       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
144       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
145           get_source_by_ssrc), NULL, NULL, gst_rtp_bin_marshal_OBJECT__UINT,
146       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
147
148   /**
149    * RTPSession::on-new-ssrc:
150    * @session: the object which received the signal
151    * @src: the new RTPSource
152    *
153    * Notify of a new SSRC that entered @session.
154    */
155   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
156       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
157       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
158       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
159       RTP_TYPE_SOURCE);
160   /**
161    * RTPSession::on-ssrc-collision:
162    * @session: the object which received the signal
163    * @src: the #RTPSource that caused a collision
164    *
165    * Notify when we have an SSRC collision
166    */
167   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
168       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
169       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
170       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
171       RTP_TYPE_SOURCE);
172   /**
173    * RTPSession::on-ssrc-validated:
174    * @session: the object which received the signal
175    * @src: the new validated RTPSource
176    *
177    * Notify of a new SSRC that became validated.
178    */
179   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
180       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
181       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
182       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
183       RTP_TYPE_SOURCE);
184   /**
185    * RTPSession::on-ssrc-active:
186    * @session: the object which received the signal
187    * @src: the active RTPSource
188    *
189    * Notify of a SSRC that is active, i.e., sending RTCP.
190    */
191   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
192       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
193       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
194       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
195       RTP_TYPE_SOURCE);
196   /**
197    * RTPSession::on-ssrc-sdes:
198    * @session: the object which received the signal
199    * @src: the RTPSource
200    *
201    * Notify that a new SDES was received for SSRC.
202    */
203   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
204       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
205       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
206       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
207       RTP_TYPE_SOURCE);
208   /**
209    * RTPSession::on-bye-ssrc:
210    * @session: the object which received the signal
211    * @src: the RTPSource that went away
212    *
213    * Notify of an SSRC that became inactive because of a BYE packet.
214    */
215   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
216       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
217       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
218       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
219       RTP_TYPE_SOURCE);
220   /**
221    * RTPSession::on-bye-timeout:
222    * @session: the object which received the signal
223    * @src: the RTPSource that timed out
224    *
225    * Notify of an SSRC that has timed out because of BYE
226    */
227   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
228       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
229       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
230       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
231       RTP_TYPE_SOURCE);
232   /**
233    * RTPSession::on-timeout:
234    * @session: the object which received the signal
235    * @src: the RTPSource that timed out
236    *
237    * Notify of an SSRC that has timed out
238    */
239   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
240       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
241       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
242       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
243       RTP_TYPE_SOURCE);
244   /**
245    * RTPSession::on-sender-timeout:
246    * @session: the object which received the signal
247    * @src: the RTPSource that timed out
248    *
249    * Notify of an SSRC that was a sender but timed out and became a receiver.
250    */
251   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
252       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
253       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
254       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
255       RTP_TYPE_SOURCE);
256
257   /**
258    * RTPSession::on-sending-rtcp
259    * @session: the object which received the signal
260    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
261    * @early: %TRUE if the packet is early, %FALSE if it is regular
262    *
263    * This signal is emitted before sending an RTCP packet, it can be used
264    * to add extra RTCP Packets.
265    *
266    * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
267    * if suppressing it is acceptable
268    */
269   rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
270       g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
271       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
272       accumulate_trues, NULL, gst_rtp_bin_marshal_BOOLEAN__POINTER_BOOLEAN,
273       G_TYPE_BOOLEAN, 2, G_TYPE_POINTER, G_TYPE_BOOLEAN);
274
275   /**
276    * RTPSession::on-feedback-rtcp:
277    * @session: the object which received the signal
278    * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
279    *  %GST_RTCP_TYPE_RTPFB
280    * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
281    * @sender_ssrc: The SSRC of the sender
282    * @media_ssrc: The SSRC of the media this refers to
283    * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
284    * there was no FCI
285    *
286    * Notify that a RTCP feedback packet has been received
287    */
288
289   rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
290       g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
291       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
292       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT_UINT_UINT_POINTER,
293       G_TYPE_NONE, 4, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT,
294       G_TYPE_POINTER);
295
296   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
297       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
298           "The internal SSRC used for the session",
299           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
300
301   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
302       g_param_spec_object ("internal-source", "Internal Source",
303           "The internal source element of the session",
304           RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
305
306   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
307       g_param_spec_double ("bandwidth", "Bandwidth",
308           "The bandwidth of the session (0 for auto-discover)",
309           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
310           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
311
312   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
313       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
314           "The fraction of the bandwidth used for RTCP (or as a real fraction of the RTP bandwidth if < 1)",
315           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
316           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
317
318   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
319       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
320           "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
321           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
322           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
323
324   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
325       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
326           "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
327           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
328           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
329
330   g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
331       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
332           "The maximum size of the RTCP packets",
333           16, G_MAXINT16, DEFAULT_RTCP_MTU,
334           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
335
336   g_object_class_install_property (gobject_class, PROP_SDES,
337       g_param_spec_boxed ("sdes", "SDES",
338           "The SDES items of this session",
339           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
340
341   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
342       g_param_spec_uint ("num-sources", "Num Sources",
343           "The number of sources in the session", 0, G_MAXUINT,
344           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
345
346   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
347       g_param_spec_uint ("num-active-sources", "Num Active Sources",
348           "The number of active sources in the session", 0, G_MAXUINT,
349           DEFAULT_NUM_ACTIVE_SOURCES,
350           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
351   /**
352    * RTPSource::sources
353    *
354    * Get a GValue Array of all sources in the session.
355    *
356    * <example>
357    * <title>Getting the #RTPSources of a session
358    * <programlisting>
359    * {
360    *   GValueArray *arr;
361    *   GValue *val;
362    *   guint i;
363    *
364    *   g_object_get (sess, "sources", &arr, NULL);
365    *
366    *   for (i = 0; i < arr->n_values; i++) {
367    *     RTPSource *source;
368    *
369    *     val = g_value_array_get_nth (arr, i);
370    *     source = g_value_get_object (val);
371    *   }
372    *   g_value_array_free (arr);
373    * }
374    * </programlisting>
375    * </example>
376    */
377   g_object_class_install_property (gobject_class, PROP_SOURCES,
378       g_param_spec_boxed ("sources", "Sources",
379           "An array of all known sources in the session",
380           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
381
382   g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
383       g_param_spec_boolean ("favor-new", "Favor new sources",
384           "Resolve SSRC conflict in favor of new sources", FALSE,
385           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
386
387   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
388       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
389           "Minimum interval between Regular RTCP packet (in ns)",
390           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
391           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392
393   klass->get_source_by_ssrc =
394       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
395
396   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
397 }
398
399 static void
400 rtp_session_init (RTPSession * sess)
401 {
402   gint i;
403   gchar *str;
404
405   sess->lock = g_mutex_new ();
406   sess->key = g_random_int ();
407   sess->mask_idx = 0;
408   sess->mask = 0;
409
410   for (i = 0; i < 32; i++) {
411     sess->ssrcs[i] =
412         g_hash_table_new_full (NULL, NULL, NULL,
413         (GDestroyNotify) g_object_unref);
414   }
415   sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL);
416
417   rtp_stats_init_defaults (&sess->stats);
418
419   sess->recalc_bandwidth = TRUE;
420   sess->bandwidth = DEFAULT_BANDWIDTH;
421   sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
422   sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
423   sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
424
425   /* create an active SSRC for this session manager */
426   sess->source = rtp_session_create_source (sess);
427   sess->source->validated = TRUE;
428   sess->source->internal = TRUE;
429   sess->stats.active_sources++;
430   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
431
432   /* default UDP header length */
433   sess->header_len = 28;
434   sess->mtu = DEFAULT_RTCP_MTU;
435
436   /* some default SDES entries */
437   str = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ());
438   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_CNAME, str);
439   g_free (str);
440
441   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_NAME,
442       g_get_real_name ());
443   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_TOOL, "GStreamer");
444
445   sess->first_rtcp = TRUE;
446   sess->allow_early = TRUE;
447
448   GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
449 }
450
451 static void
452 rtp_session_finalize (GObject * object)
453 {
454   RTPSession *sess;
455   gint i;
456
457   sess = RTP_SESSION_CAST (object);
458
459   g_mutex_free (sess->lock);
460   for (i = 0; i < 32; i++)
461     g_hash_table_destroy (sess->ssrcs[i]);
462
463   g_free (sess->bye_reason);
464
465   g_hash_table_destroy (sess->cnames);
466   g_object_unref (sess->source);
467
468   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
469 }
470
471 static void
472 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
473 {
474   GValue value = { 0 };
475
476   g_value_init (&value, RTP_TYPE_SOURCE);
477   g_value_take_object (&value, source);
478   /* copies the value */
479   g_value_array_append (arr, &value);
480 }
481
482 static GValueArray *
483 rtp_session_create_sources (RTPSession * sess)
484 {
485   GValueArray *res;
486   guint size;
487
488   RTP_SESSION_LOCK (sess);
489   /* get number of elements in the table */
490   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
491   /* create the result value array */
492   res = g_value_array_new (size);
493
494   /* and copy all values into the array */
495   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
496   RTP_SESSION_UNLOCK (sess);
497
498   return res;
499 }
500
501 static void
502 rtp_session_set_property (GObject * object, guint prop_id,
503     const GValue * value, GParamSpec * pspec)
504 {
505   RTPSession *sess;
506
507   sess = RTP_SESSION (object);
508
509   switch (prop_id) {
510     case PROP_INTERNAL_SSRC:
511       rtp_session_set_internal_ssrc (sess, g_value_get_uint (value));
512       break;
513     case PROP_BANDWIDTH:
514       sess->bandwidth = g_value_get_double (value);
515       sess->recalc_bandwidth = TRUE;
516       break;
517     case PROP_RTCP_FRACTION:
518       sess->rtcp_bandwidth = g_value_get_double (value);
519       sess->recalc_bandwidth = TRUE;
520       break;
521     case PROP_RTCP_RR_BANDWIDTH:
522       sess->rtcp_rr_bandwidth = g_value_get_int (value);
523       sess->recalc_bandwidth = TRUE;
524       break;
525     case PROP_RTCP_RS_BANDWIDTH:
526       sess->rtcp_rs_bandwidth = g_value_get_int (value);
527       sess->recalc_bandwidth = TRUE;
528       break;
529     case PROP_RTCP_MTU:
530       sess->mtu = g_value_get_uint (value);
531       break;
532     case PROP_SDES:
533       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
534       break;
535     case PROP_FAVOR_NEW:
536       sess->favor_new = g_value_get_boolean (value);
537       break;
538     case PROP_RTCP_MIN_INTERVAL:
539       rtp_stats_set_min_interval (&sess->stats,
540           (gdouble) g_value_get_uint64 (value) / GST_SECOND);
541       break;
542     default:
543       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
544       break;
545   }
546 }
547
548 static void
549 rtp_session_get_property (GObject * object, guint prop_id,
550     GValue * value, GParamSpec * pspec)
551 {
552   RTPSession *sess;
553
554   sess = RTP_SESSION (object);
555
556   switch (prop_id) {
557     case PROP_INTERNAL_SSRC:
558       g_value_set_uint (value, rtp_session_get_internal_ssrc (sess));
559       break;
560     case PROP_INTERNAL_SOURCE:
561       g_value_take_object (value, rtp_session_get_internal_source (sess));
562       break;
563     case PROP_BANDWIDTH:
564       g_value_set_double (value, sess->bandwidth);
565       break;
566     case PROP_RTCP_FRACTION:
567       g_value_set_double (value, sess->rtcp_bandwidth);
568       break;
569     case PROP_RTCP_RR_BANDWIDTH:
570       g_value_set_int (value, sess->rtcp_rr_bandwidth);
571       break;
572     case PROP_RTCP_RS_BANDWIDTH:
573       g_value_set_int (value, sess->rtcp_rs_bandwidth);
574       break;
575     case PROP_RTCP_MTU:
576       g_value_set_uint (value, sess->mtu);
577       break;
578     case PROP_SDES:
579       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
580       break;
581     case PROP_NUM_SOURCES:
582       g_value_set_uint (value, rtp_session_get_num_sources (sess));
583       break;
584     case PROP_NUM_ACTIVE_SOURCES:
585       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
586       break;
587     case PROP_SOURCES:
588       g_value_take_boxed (value, rtp_session_create_sources (sess));
589       break;
590     case PROP_FAVOR_NEW:
591       g_value_set_boolean (value, sess->favor_new);
592       break;
593     case PROP_RTCP_MIN_INTERVAL:
594       g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
595       break;
596     default:
597       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
598       break;
599   }
600 }
601
602 static void
603 on_new_ssrc (RTPSession * sess, RTPSource * source)
604 {
605   g_object_ref (source);
606   RTP_SESSION_UNLOCK (sess);
607   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
608   RTP_SESSION_LOCK (sess);
609   g_object_unref (source);
610 }
611
612 static void
613 on_ssrc_collision (RTPSession * sess, RTPSource * source)
614 {
615   g_object_ref (source);
616   RTP_SESSION_UNLOCK (sess);
617   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
618       source);
619   RTP_SESSION_LOCK (sess);
620   g_object_unref (source);
621 }
622
623 static void
624 on_ssrc_validated (RTPSession * sess, RTPSource * source)
625 {
626   g_object_ref (source);
627   RTP_SESSION_UNLOCK (sess);
628   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
629       source);
630   RTP_SESSION_LOCK (sess);
631   g_object_unref (source);
632 }
633
634 static void
635 on_ssrc_active (RTPSession * sess, RTPSource * source)
636 {
637   g_object_ref (source);
638   RTP_SESSION_UNLOCK (sess);
639   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
640   RTP_SESSION_LOCK (sess);
641   g_object_unref (source);
642 }
643
644 static void
645 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
646 {
647   g_object_ref (source);
648   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
649   RTP_SESSION_UNLOCK (sess);
650   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
651   RTP_SESSION_LOCK (sess);
652   g_object_unref (source);
653 }
654
655 static void
656 on_bye_ssrc (RTPSession * sess, RTPSource * source)
657 {
658   g_object_ref (source);
659   RTP_SESSION_UNLOCK (sess);
660   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
661   RTP_SESSION_LOCK (sess);
662   g_object_unref (source);
663 }
664
665 static void
666 on_bye_timeout (RTPSession * sess, RTPSource * source)
667 {
668   g_object_ref (source);
669   RTP_SESSION_UNLOCK (sess);
670   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
671   RTP_SESSION_LOCK (sess);
672   g_object_unref (source);
673 }
674
675 static void
676 on_timeout (RTPSession * sess, RTPSource * source)
677 {
678   g_object_ref (source);
679   RTP_SESSION_UNLOCK (sess);
680   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
681   RTP_SESSION_LOCK (sess);
682   g_object_unref (source);
683 }
684
685 static void
686 on_sender_timeout (RTPSession * sess, RTPSource * source)
687 {
688   g_object_ref (source);
689   RTP_SESSION_UNLOCK (sess);
690   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
691       source);
692   RTP_SESSION_LOCK (sess);
693   g_object_unref (source);
694 }
695
696 /**
697  * rtp_session_new:
698  *
699  * Create a new session object.
700  *
701  * Returns: a new #RTPSession. g_object_unref() after usage.
702  */
703 RTPSession *
704 rtp_session_new (void)
705 {
706   RTPSession *sess;
707
708   sess = g_object_new (RTP_TYPE_SESSION, NULL);
709
710   return sess;
711 }
712
713 /**
714  * rtp_session_set_callbacks:
715  * @sess: an #RTPSession
716  * @callbacks: callbacks to configure
717  * @user_data: user data passed in the callbacks
718  *
719  * Configure a set of callbacks to be notified of actions.
720  */
721 void
722 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
723     gpointer user_data)
724 {
725   g_return_if_fail (RTP_IS_SESSION (sess));
726
727   if (callbacks->process_rtp) {
728     sess->callbacks.process_rtp = callbacks->process_rtp;
729     sess->process_rtp_user_data = user_data;
730   }
731   if (callbacks->send_rtp) {
732     sess->callbacks.send_rtp = callbacks->send_rtp;
733     sess->send_rtp_user_data = user_data;
734   }
735   if (callbacks->send_rtcp) {
736     sess->callbacks.send_rtcp = callbacks->send_rtcp;
737     sess->send_rtcp_user_data = user_data;
738   }
739   if (callbacks->sync_rtcp) {
740     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
741     sess->sync_rtcp_user_data = user_data;
742   }
743   if (callbacks->clock_rate) {
744     sess->callbacks.clock_rate = callbacks->clock_rate;
745     sess->clock_rate_user_data = user_data;
746   }
747   if (callbacks->reconsider) {
748     sess->callbacks.reconsider = callbacks->reconsider;
749     sess->reconsider_user_data = user_data;
750   }
751 }
752
753 /**
754  * rtp_session_set_process_rtp_callback:
755  * @sess: an #RTPSession
756  * @callback: callback to set
757  * @user_data: user data passed in the callback
758  *
759  * Configure only the process_rtp callback to be notified of the process_rtp action.
760  */
761 void
762 rtp_session_set_process_rtp_callback (RTPSession * sess,
763     RTPSessionProcessRTP callback, gpointer user_data)
764 {
765   g_return_if_fail (RTP_IS_SESSION (sess));
766
767   sess->callbacks.process_rtp = callback;
768   sess->process_rtp_user_data = user_data;
769 }
770
771 /**
772  * rtp_session_set_send_rtp_callback:
773  * @sess: an #RTPSession
774  * @callback: callback to set
775  * @user_data: user data passed in the callback
776  *
777  * Configure only the send_rtp callback to be notified of the send_rtp action.
778  */
779 void
780 rtp_session_set_send_rtp_callback (RTPSession * sess,
781     RTPSessionSendRTP callback, gpointer user_data)
782 {
783   g_return_if_fail (RTP_IS_SESSION (sess));
784
785   sess->callbacks.send_rtp = callback;
786   sess->send_rtp_user_data = user_data;
787 }
788
789 /**
790  * rtp_session_set_send_rtcp_callback:
791  * @sess: an #RTPSession
792  * @callback: callback to set
793  * @user_data: user data passed in the callback
794  *
795  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
796  */
797 void
798 rtp_session_set_send_rtcp_callback (RTPSession * sess,
799     RTPSessionSendRTCP callback, gpointer user_data)
800 {
801   g_return_if_fail (RTP_IS_SESSION (sess));
802
803   sess->callbacks.send_rtcp = callback;
804   sess->send_rtcp_user_data = user_data;
805 }
806
807 /**
808  * rtp_session_set_sync_rtcp_callback:
809  * @sess: an #RTPSession
810  * @callback: callback to set
811  * @user_data: user data passed in the callback
812  *
813  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
814  */
815 void
816 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
817     RTPSessionSyncRTCP callback, gpointer user_data)
818 {
819   g_return_if_fail (RTP_IS_SESSION (sess));
820
821   sess->callbacks.sync_rtcp = callback;
822   sess->sync_rtcp_user_data = user_data;
823 }
824
825 /**
826  * rtp_session_set_clock_rate_callback:
827  * @sess: an #RTPSession
828  * @callback: callback to set
829  * @user_data: user data passed in the callback
830  *
831  * Configure only the clock_rate callback to be notified of the clock_rate action.
832  */
833 void
834 rtp_session_set_clock_rate_callback (RTPSession * sess,
835     RTPSessionClockRate callback, gpointer user_data)
836 {
837   g_return_if_fail (RTP_IS_SESSION (sess));
838
839   sess->callbacks.clock_rate = callback;
840   sess->clock_rate_user_data = user_data;
841 }
842
843 /**
844  * rtp_session_set_reconsider_callback:
845  * @sess: an #RTPSession
846  * @callback: callback to set
847  * @user_data: user data passed in the callback
848  *
849  * Configure only the reconsider callback to be notified of the reconsider action.
850  */
851 void
852 rtp_session_set_reconsider_callback (RTPSession * sess,
853     RTPSessionReconsider callback, gpointer user_data)
854 {
855   g_return_if_fail (RTP_IS_SESSION (sess));
856
857   sess->callbacks.reconsider = callback;
858   sess->reconsider_user_data = user_data;
859 }
860
861 /**
862  * rtp_session_set_bandwidth:
863  * @sess: an #RTPSession
864  * @bandwidth: the bandwidth allocated
865  *
866  * Set the session bandwidth in bytes per second.
867  */
868 void
869 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
870 {
871   g_return_if_fail (RTP_IS_SESSION (sess));
872
873   RTP_SESSION_LOCK (sess);
874   sess->stats.bandwidth = bandwidth;
875   RTP_SESSION_UNLOCK (sess);
876 }
877
878 /**
879  * rtp_session_get_bandwidth:
880  * @sess: an #RTPSession
881  *
882  * Get the session bandwidth.
883  *
884  * Returns: the session bandwidth.
885  */
886 gdouble
887 rtp_session_get_bandwidth (RTPSession * sess)
888 {
889   gdouble result;
890
891   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
892
893   RTP_SESSION_LOCK (sess);
894   result = sess->stats.bandwidth;
895   RTP_SESSION_UNLOCK (sess);
896
897   return result;
898 }
899
900 /**
901  * rtp_session_set_rtcp_fraction:
902  * @sess: an #RTPSession
903  * @bandwidth: the RTCP bandwidth
904  *
905  * Set the bandwidth in bytes per second that should be used for RTCP
906  * messages.
907  */
908 void
909 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
910 {
911   g_return_if_fail (RTP_IS_SESSION (sess));
912
913   RTP_SESSION_LOCK (sess);
914   sess->stats.rtcp_bandwidth = bandwidth;
915   RTP_SESSION_UNLOCK (sess);
916 }
917
918 /**
919  * rtp_session_get_rtcp_fraction:
920  * @sess: an #RTPSession
921  *
922  * Get the session bandwidth used for RTCP.
923  *
924  * Returns: The bandwidth used for RTCP messages.
925  */
926 gdouble
927 rtp_session_get_rtcp_fraction (RTPSession * sess)
928 {
929   gdouble result;
930
931   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
932
933   RTP_SESSION_LOCK (sess);
934   result = sess->stats.rtcp_bandwidth;
935   RTP_SESSION_UNLOCK (sess);
936
937   return result;
938 }
939
940 /**
941  * rtp_session_set_sdes_string:
942  * @sess: an #RTPSession
943  * @type: the type of the SDES item
944  * @item: a null-terminated string to set.
945  *
946  * Store an SDES item of @type in @sess.
947  *
948  * Returns: %FALSE if the data was unchanged @type is invalid.
949  */
950 gboolean
951 rtp_session_set_sdes_string (RTPSession * sess, GstRTCPSDESType type,
952     const gchar * item)
953 {
954   gboolean result;
955
956   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
957
958   RTP_SESSION_LOCK (sess);
959   result = rtp_source_set_sdes_string (sess->source, type, item);
960   RTP_SESSION_UNLOCK (sess);
961
962   return result;
963 }
964
965 /**
966  * rtp_session_get_sdes_string:
967  * @sess: an #RTPSession
968  * @type: the type of the SDES item
969  *
970  * Get the SDES item of @type from @sess.
971  *
972  * Returns: a null-terminated copy of the SDES item or NULL when @type was not
973  * valid. g_free() after usage.
974  */
975 gchar *
976 rtp_session_get_sdes_string (RTPSession * sess, GstRTCPSDESType type)
977 {
978   gchar *result;
979
980   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
981
982   RTP_SESSION_LOCK (sess);
983   result = rtp_source_get_sdes_string (sess->source, type);
984   RTP_SESSION_UNLOCK (sess);
985
986   return result;
987 }
988
989 /**
990  * rtp_session_get_sdes_struct:
991  * @sess: an #RTSPSession
992  *
993  * Get the SDES data as a #GstStructure
994  *
995  * Returns: a GstStructure with SDES items for @sess. This function returns a
996  * copy of the SDES structure, use gst_structure_free() after usage.
997  */
998 GstStructure *
999 rtp_session_get_sdes_struct (RTPSession * sess)
1000 {
1001   const GstStructure *sdes;
1002   GstStructure *result = NULL;
1003
1004   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1005
1006   RTP_SESSION_LOCK (sess);
1007   sdes = rtp_source_get_sdes_struct (sess->source);
1008   if (sdes)
1009     result = gst_structure_copy (sdes);
1010   RTP_SESSION_UNLOCK (sess);
1011
1012   return result;
1013 }
1014
1015 /**
1016  * rtp_session_set_sdes_struct:
1017  * @sess: an #RTSPSession
1018  * @sdes: a #GstStructure
1019  *
1020  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
1021  */
1022 void
1023 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
1024 {
1025   g_return_if_fail (sdes);
1026   g_return_if_fail (RTP_IS_SESSION (sess));
1027
1028   RTP_SESSION_LOCK (sess);
1029   rtp_source_set_sdes_struct (sess->source, gst_structure_copy (sdes));
1030   RTP_SESSION_UNLOCK (sess);
1031 }
1032
1033 static GstFlowReturn
1034 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
1035 {
1036   GstFlowReturn result = GST_FLOW_OK;
1037
1038   if (source == session->source) {
1039     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
1040
1041     RTP_SESSION_UNLOCK (session);
1042
1043     if (session->callbacks.send_rtp)
1044       result =
1045           session->callbacks.send_rtp (session, source, data,
1046           session->send_rtp_user_data);
1047     else {
1048       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1049     }
1050   } else {
1051     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
1052     RTP_SESSION_UNLOCK (session);
1053
1054     if (session->callbacks.process_rtp)
1055       result =
1056           session->callbacks.process_rtp (session, source,
1057           GST_BUFFER_CAST (data), session->process_rtp_user_data);
1058     else
1059       gst_buffer_unref (GST_BUFFER_CAST (data));
1060   }
1061   RTP_SESSION_LOCK (session);
1062
1063   return result;
1064 }
1065
1066 static gint
1067 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
1068 {
1069   gint result;
1070
1071   RTP_SESSION_UNLOCK (session);
1072
1073   if (session->callbacks.clock_rate)
1074     result =
1075         session->callbacks.clock_rate (session, pt,
1076         session->clock_rate_user_data);
1077   else
1078     result = -1;
1079
1080   RTP_SESSION_LOCK (session);
1081
1082   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
1083
1084   return result;
1085 }
1086
1087 static RTPSourceCallbacks callbacks = {
1088   (RTPSourcePushRTP) source_push_rtp,
1089   (RTPSourceClockRate) source_clock_rate,
1090 };
1091
1092 static gboolean
1093 check_collision (RTPSession * sess, RTPSource * source,
1094     RTPArrivalStats * arrival, gboolean rtp)
1095 {
1096   /* If we have no arrival address, we can't do collision checking */
1097   if (!arrival->have_address)
1098     return FALSE;
1099
1100   if (sess->source != source) {
1101     GstNetAddress *from;
1102     gboolean have_from;
1103
1104     /* This is not our local source, but lets check if two remote
1105      * source collide
1106      */
1107
1108     if (rtp) {
1109       from = &source->rtp_from;
1110       have_from = source->have_rtp_from;
1111     } else {
1112       from = &source->rtcp_from;
1113       have_from = source->have_rtcp_from;
1114     }
1115
1116     if (have_from) {
1117       if (gst_netaddress_equal (from, &arrival->address)) {
1118         /* Address is the same */
1119         return FALSE;
1120       } else {
1121         GST_LOG ("we have a third-party collision or loop ssrc:%x",
1122             rtp_source_get_ssrc (source));
1123         if (sess->favor_new) {
1124           if (rtp_source_find_conflicting_address (source,
1125                   &arrival->address, arrival->current_time)) {
1126             gchar buf1[40];
1127             gst_netaddress_to_string (&arrival->address, buf1, 40);
1128             GST_LOG ("Known conflict on %x for %s, dropping packet",
1129                 rtp_source_get_ssrc (source), buf1);
1130             return TRUE;
1131           } else {
1132             gchar buf1[40], buf2[40];
1133
1134             /* Current address is not a known conflict, lets assume this is
1135              * a new source. Save old address in possible conflict list
1136              */
1137             rtp_source_add_conflicting_address (source, from,
1138                 arrival->current_time);
1139
1140             gst_netaddress_to_string (from, buf1, 40);
1141             gst_netaddress_to_string (&arrival->address, buf2, 40);
1142             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1143                 " saving old as known conflict",
1144                 rtp_source_get_ssrc (source), buf1, buf2);
1145
1146             if (rtp)
1147               rtp_source_set_rtp_from (source, &arrival->address);
1148             else
1149               rtp_source_set_rtcp_from (source, &arrival->address);
1150             return FALSE;
1151           }
1152         } else {
1153           /* Don't need to save old addresses, we ignore new sources */
1154           return TRUE;
1155         }
1156       }
1157     } else {
1158       /* We don't already have a from address for RTP, just set it */
1159       if (rtp)
1160         rtp_source_set_rtp_from (source, &arrival->address);
1161       else
1162         rtp_source_set_rtcp_from (source, &arrival->address);
1163       return FALSE;
1164     }
1165
1166     /* FIXME: Log 3rd party collision somehow
1167      * Maybe should be done in upper layer, only the SDES can tell us
1168      * if its a collision or a loop
1169      */
1170
1171     /* If the source has been inactive for some time, we assume that it has
1172      * simply changed its transport source address. Hence, there is no true
1173      * third-party collision - only a simulated one. */
1174     if (arrival->current_time > source->last_activity) {
1175       GstClockTime inactivity_period =
1176           arrival->current_time - source->last_activity;
1177       if (inactivity_period > 1 * GST_SECOND) {
1178         /* Use new network address */
1179         if (rtp) {
1180           g_assert (source->have_rtp_from);
1181           rtp_source_set_rtp_from (source, &arrival->address);
1182         } else {
1183           g_assert (source->have_rtcp_from);
1184           rtp_source_set_rtcp_from (source, &arrival->address);
1185         }
1186         return FALSE;
1187       }
1188     }
1189   } else {
1190     /* This is sending with our ssrc, is it an address we already know */
1191
1192     if (rtp_source_find_conflicting_address (source, &arrival->address,
1193             arrival->current_time)) {
1194       /* Its a known conflict, its probably a loop, not a collision
1195        * lets just drop the incoming packet
1196        */
1197       GST_DEBUG ("Our packets are being looped back to us, dropping");
1198     } else {
1199       /* Its a new collision, lets change our SSRC */
1200
1201       rtp_source_add_conflicting_address (source, &arrival->address,
1202           arrival->current_time);
1203
1204       GST_DEBUG ("Collision for SSRC %x", rtp_source_get_ssrc (source));
1205       on_ssrc_collision (sess, source);
1206
1207       rtp_session_schedule_bye_locked (sess, "SSRC Collision",
1208           arrival->current_time);
1209
1210       sess->change_ssrc = TRUE;
1211     }
1212   }
1213
1214   return TRUE;
1215 }
1216
1217
1218 /* must be called with the session lock, the returned source needs to be
1219  * unreffed after usage. */
1220 static RTPSource *
1221 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1222     RTPArrivalStats * arrival, gboolean rtp)
1223 {
1224   RTPSource *source;
1225
1226   source =
1227       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1228   if (source == NULL) {
1229     /* make new Source in probation and insert */
1230     source = rtp_source_new (ssrc);
1231
1232     /* for RTP packets we need to set the source in probation. Receiving RTCP
1233      * packets of an SSRC, on the other hand, is a strong indication that we
1234      * are dealing with a valid source. */
1235     if (rtp)
1236       source->probation = RTP_DEFAULT_PROBATION;
1237     else
1238       source->probation = 0;
1239
1240     /* store from address, if any */
1241     if (arrival->have_address) {
1242       if (rtp)
1243         rtp_source_set_rtp_from (source, &arrival->address);
1244       else
1245         rtp_source_set_rtcp_from (source, &arrival->address);
1246     }
1247
1248     /* configure a callback on the source */
1249     rtp_source_set_callbacks (source, &callbacks, sess);
1250
1251     g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1252         source);
1253
1254     /* we have one more source now */
1255     sess->total_sources++;
1256     *created = TRUE;
1257   } else {
1258     *created = FALSE;
1259     /* check for collision, this updates the address when not previously set */
1260     if (check_collision (sess, source, arrival, rtp)) {
1261       return NULL;
1262     }
1263   }
1264   /* update last activity */
1265   source->last_activity = arrival->current_time;
1266   if (rtp)
1267     source->last_rtp_activity = arrival->current_time;
1268   g_object_ref (source);
1269
1270   return source;
1271 }
1272
1273 /**
1274  * rtp_session_get_internal_source:
1275  * @sess: a #RTPSession
1276  *
1277  * Get the internal #RTPSource of @sess.
1278  *
1279  * Returns: The internal #RTPSource. g_object_unref() after usage.
1280  */
1281 RTPSource *
1282 rtp_session_get_internal_source (RTPSession * sess)
1283 {
1284   RTPSource *result;
1285
1286   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1287
1288   result = g_object_ref (sess->source);
1289
1290   return result;
1291 }
1292
1293 /**
1294  * rtp_session_set_internal_ssrc:
1295  * @sess: a #RTPSession
1296  * @ssrc: an SSRC
1297  *
1298  * Set the SSRC of @sess to @ssrc.
1299  */
1300 void
1301 rtp_session_set_internal_ssrc (RTPSession * sess, guint32 ssrc)
1302 {
1303   RTP_SESSION_LOCK (sess);
1304   if (ssrc != sess->source->ssrc) {
1305     g_hash_table_steal (sess->ssrcs[sess->mask_idx],
1306         GINT_TO_POINTER (sess->source->ssrc));
1307
1308     GST_DEBUG ("setting internal SSRC to %08x", ssrc);
1309     /* After this call, any receiver of the old SSRC either in RTP or RTCP
1310      * packets will timeout on the old SSRC, we could potentially schedule a
1311      * BYE RTCP for the old SSRC... */
1312     sess->source->ssrc = ssrc;
1313     rtp_source_reset (sess->source);
1314
1315     /* rehash with the new SSRC */
1316     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1317         GINT_TO_POINTER (sess->source->ssrc), sess->source);
1318   }
1319   RTP_SESSION_UNLOCK (sess);
1320
1321   g_object_notify (G_OBJECT (sess), "internal-ssrc");
1322 }
1323
1324 /**
1325  * rtp_session_get_internal_ssrc:
1326  * @sess: a #RTPSession
1327  *
1328  * Get the internal SSRC of @sess.
1329  *
1330  * Returns: The SSRC of the session.
1331  */
1332 guint32
1333 rtp_session_get_internal_ssrc (RTPSession * sess)
1334 {
1335   guint32 ssrc;
1336
1337   RTP_SESSION_LOCK (sess);
1338   ssrc = sess->source->ssrc;
1339   RTP_SESSION_UNLOCK (sess);
1340
1341   return ssrc;
1342 }
1343
1344 /**
1345  * rtp_session_add_source:
1346  * @sess: a #RTPSession
1347  * @src: #RTPSource to add
1348  *
1349  * Add @src to @session.
1350  *
1351  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1352  * existed in the session.
1353  */
1354 gboolean
1355 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1356 {
1357   gboolean result = FALSE;
1358   RTPSource *find;
1359
1360   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1361   g_return_val_if_fail (src != NULL, FALSE);
1362
1363   RTP_SESSION_LOCK (sess);
1364   find =
1365       g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1366       GINT_TO_POINTER (src->ssrc));
1367   if (find == NULL) {
1368     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1369         GINT_TO_POINTER (src->ssrc), src);
1370     /* we have one more source now */
1371     sess->total_sources++;
1372     result = TRUE;
1373   }
1374   RTP_SESSION_UNLOCK (sess);
1375
1376   return result;
1377 }
1378
1379 /**
1380  * rtp_session_get_num_sources:
1381  * @sess: an #RTPSession
1382  *
1383  * Get the number of sources in @sess.
1384  *
1385  * Returns: The number of sources in @sess.
1386  */
1387 guint
1388 rtp_session_get_num_sources (RTPSession * sess)
1389 {
1390   guint result;
1391
1392   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1393
1394   RTP_SESSION_LOCK (sess);
1395   result = sess->total_sources;
1396   RTP_SESSION_UNLOCK (sess);
1397
1398   return result;
1399 }
1400
1401 /**
1402  * rtp_session_get_num_active_sources:
1403  * @sess: an #RTPSession
1404  *
1405  * Get the number of active sources in @sess. A source is considered active when
1406  * it has been validated and has not yet received a BYE RTCP message.
1407  *
1408  * Returns: The number of active sources in @sess.
1409  */
1410 guint
1411 rtp_session_get_num_active_sources (RTPSession * sess)
1412 {
1413   guint result;
1414
1415   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1416
1417   RTP_SESSION_LOCK (sess);
1418   result = sess->stats.active_sources;
1419   RTP_SESSION_UNLOCK (sess);
1420
1421   return result;
1422 }
1423
1424 /**
1425  * rtp_session_get_source_by_ssrc:
1426  * @sess: an #RTPSession
1427  * @ssrc: an SSRC
1428  *
1429  * Find the source with @ssrc in @sess.
1430  *
1431  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
1432  * g_object_unref() after usage.
1433  */
1434 RTPSource *
1435 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
1436 {
1437   RTPSource *result;
1438
1439   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1440
1441   RTP_SESSION_LOCK (sess);
1442   result =
1443       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1444   if (result)
1445     g_object_ref (result);
1446   RTP_SESSION_UNLOCK (sess);
1447
1448   return result;
1449 }
1450
1451 /**
1452  * rtp_session_get_source_by_cname:
1453  * @sess: a #RTPSession
1454  * @cname: an CNAME
1455  *
1456  * Find the source with @cname in @sess.
1457  *
1458  * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found.
1459  * g_object_unref() after usage.
1460  */
1461 RTPSource *
1462 rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
1463 {
1464   RTPSource *result;
1465
1466   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1467   g_return_val_if_fail (cname != NULL, NULL);
1468
1469   RTP_SESSION_LOCK (sess);
1470   result = g_hash_table_lookup (sess->cnames, cname);
1471   if (result)
1472     g_object_ref (result);
1473   RTP_SESSION_UNLOCK (sess);
1474
1475   return result;
1476 }
1477
1478 /* should be called with the SESSION lock */
1479 static guint32
1480 rtp_session_create_new_ssrc (RTPSession * sess)
1481 {
1482   guint32 ssrc;
1483
1484   while (TRUE) {
1485     ssrc = g_random_int ();
1486
1487     /* see if it exists in the session, we're done if it doesn't */
1488     if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1489             GINT_TO_POINTER (ssrc)) == NULL)
1490       break;
1491   }
1492   return ssrc;
1493 }
1494
1495
1496 /**
1497  * rtp_session_create_source:
1498  * @sess: an #RTPSession
1499  *
1500  * Create an #RTPSource for use in @sess. This function will create a source
1501  * with an ssrc that is currently not used by any participants in the session.
1502  *
1503  * Returns: an #RTPSource.
1504  */
1505 RTPSource *
1506 rtp_session_create_source (RTPSession * sess)
1507 {
1508   guint32 ssrc;
1509   RTPSource *source;
1510
1511   RTP_SESSION_LOCK (sess);
1512   ssrc = rtp_session_create_new_ssrc (sess);
1513   source = rtp_source_new (ssrc);
1514   rtp_source_set_callbacks (source, &callbacks, sess);
1515   /* we need an additional ref for the source in the hashtable */
1516   g_object_ref (source);
1517   g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1518       source);
1519   /* we have one more source now */
1520   sess->total_sources++;
1521   RTP_SESSION_UNLOCK (sess);
1522
1523   return source;
1524 }
1525
1526 /* update the RTPArrivalStats structure with the current time and other bits
1527  * about the current buffer we are handling.
1528  * This function is typically called when a validated packet is received.
1529  * This function should be called with the SESSION_LOCK
1530  */
1531 static void
1532 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
1533     gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
1534     GstClockTime running_time)
1535 {
1536   /* get time of arrival */
1537   arrival->current_time = current_time;
1538   arrival->running_time = running_time;
1539
1540   /* get packet size including header overhead */
1541   arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
1542
1543   if (rtp) {
1544     arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer);
1545   } else {
1546     arrival->payload_len = 0;
1547   }
1548
1549   /* for netbuffer we can store the IP address to check for collisions */
1550   arrival->have_address = GST_IS_NETBUFFER (buffer);
1551   if (arrival->have_address) {
1552     GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
1553
1554     memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
1555   }
1556 }
1557
1558 /**
1559  * rtp_session_process_rtp:
1560  * @sess: and #RTPSession
1561  * @buffer: an RTP buffer
1562  * @current_time: the current system time
1563  * @running_time: the running_time of @buffer
1564  *
1565  * Process an RTP buffer in the session manager. This function takes ownership
1566  * of @buffer.
1567  *
1568  * Returns: a #GstFlowReturn.
1569  */
1570 GstFlowReturn
1571 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
1572     GstClockTime current_time, GstClockTime running_time)
1573 {
1574   GstFlowReturn result;
1575   guint32 ssrc;
1576   RTPSource *source;
1577   gboolean created;
1578   gboolean prevsender, prevactive;
1579   RTPArrivalStats arrival;
1580   guint32 csrcs[16];
1581   guint8 i, count;
1582   guint64 oldrate;
1583
1584   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1585   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1586
1587   if (!gst_rtp_buffer_validate (buffer))
1588     goto invalid_packet;
1589
1590   RTP_SESSION_LOCK (sess);
1591   /* update arrival stats */
1592   update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
1593       running_time);
1594
1595   /* ignore more RTP packets when we left the session */
1596   if (sess->source->received_bye)
1597     goto ignore;
1598
1599   /* get SSRC and look up in session database */
1600   ssrc = gst_rtp_buffer_get_ssrc (buffer);
1601   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
1602   if (!source)
1603     goto collision;
1604
1605   prevsender = RTP_SOURCE_IS_SENDER (source);
1606   prevactive = RTP_SOURCE_IS_ACTIVE (source);
1607   oldrate = source->bitrate;
1608
1609   /* copy available csrc for later */
1610   count = gst_rtp_buffer_get_csrc_count (buffer);
1611   /* make sure to not overflow our array. An RTP buffer can maximally contain
1612    * 16 CSRCs */
1613   count = MIN (count, 16);
1614
1615   for (i = 0; i < count; i++)
1616     csrcs[i] = gst_rtp_buffer_get_csrc (buffer, i);
1617
1618   /* let source process the packet */
1619   result = rtp_source_process_rtp (source, buffer, &arrival);
1620
1621   /* source became active */
1622   if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
1623     sess->stats.active_sources++;
1624     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
1625         sess->stats.active_sources);
1626     on_ssrc_validated (sess, source);
1627   }
1628   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1629     sess->stats.sender_sources++;
1630     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
1631         sess->stats.sender_sources);
1632   }
1633   if (oldrate != source->bitrate)
1634     sess->recalc_bandwidth = TRUE;
1635
1636   if (created)
1637     on_new_ssrc (sess, source);
1638
1639   if (source->validated) {
1640     gboolean created;
1641
1642     /* for validated sources, we add the CSRCs as well */
1643     for (i = 0; i < count; i++) {
1644       guint32 csrc;
1645       RTPSource *csrc_src;
1646
1647       csrc = csrcs[i];
1648
1649       /* get source */
1650       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
1651       if (!csrc_src)
1652         continue;
1653
1654       if (created) {
1655         GST_DEBUG ("created new CSRC: %08x", csrc);
1656         rtp_source_set_as_csrc (csrc_src);
1657         if (RTP_SOURCE_IS_ACTIVE (csrc_src))
1658           sess->stats.active_sources++;
1659         on_new_ssrc (sess, csrc_src);
1660       }
1661       g_object_unref (csrc_src);
1662     }
1663   }
1664   g_object_unref (source);
1665
1666   RTP_SESSION_UNLOCK (sess);
1667
1668   return result;
1669
1670   /* ERRORS */
1671 invalid_packet:
1672   {
1673     gst_buffer_unref (buffer);
1674     GST_DEBUG ("invalid RTP packet received");
1675     return GST_FLOW_OK;
1676   }
1677 ignore:
1678   {
1679     gst_buffer_unref (buffer);
1680     RTP_SESSION_UNLOCK (sess);
1681     GST_DEBUG ("ignoring RTP packet because we are leaving");
1682     return GST_FLOW_OK;
1683   }
1684 collision:
1685   {
1686     gst_buffer_unref (buffer);
1687     RTP_SESSION_UNLOCK (sess);
1688     GST_DEBUG ("ignoring packet because its collisioning");
1689     return GST_FLOW_OK;
1690   }
1691 }
1692
1693 static void
1694 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
1695     GstRTCPPacket * packet, RTPArrivalStats * arrival)
1696 {
1697   guint count, i;
1698
1699   count = gst_rtcp_packet_get_rb_count (packet);
1700   for (i = 0; i < count; i++) {
1701     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1702     guint8 fractionlost;
1703     gint32 packetslost;
1704
1705     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1706         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1707
1708     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
1709
1710     if (ssrc == sess->source->ssrc) {
1711       /* only deal with report blocks for our session, we update the stats of
1712        * the sender of the RTCP message. We could also compare our stats against
1713        * the other sender to see if we are better or worse. */
1714       rtp_source_process_rb (source, arrival->current_time, fractionlost,
1715           packetslost, exthighestseq, jitter, lsr, dlsr);
1716     }
1717   }
1718   on_ssrc_active (sess, source);
1719 }
1720
1721 /* A Sender report contains statistics about how the sender is doing. This
1722  * includes timing informataion such as the relation between RTP and NTP
1723  * timestamps and the number of packets/bytes it sent to us.
1724  *
1725  * In this report is also included a set of report blocks related to how this
1726  * sender is receiving data (in case we (or somebody else) is also sending stuff
1727  * to it). This info includes the packet loss, jitter and seqnum. It also
1728  * contains information to calculate the round trip time (LSR/DLSR).
1729  */
1730 static void
1731 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
1732     RTPArrivalStats * arrival, gboolean * do_sync)
1733 {
1734   guint32 senderssrc, rtptime, packet_count, octet_count;
1735   guint64 ntptime;
1736   RTPSource *source;
1737   gboolean created, prevsender;
1738
1739   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
1740       &packet_count, &octet_count);
1741
1742   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
1743       senderssrc, GST_TIME_ARGS (arrival->current_time));
1744
1745   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1746   if (!source)
1747     return;
1748
1749   /* don't try to do lip-sync for sources that sent a BYE */
1750   if (rtp_source_received_bye (source))
1751     *do_sync = FALSE;
1752   else
1753     *do_sync = TRUE;
1754
1755   prevsender = RTP_SOURCE_IS_SENDER (source);
1756
1757   /* first update the source */
1758   rtp_source_process_sr (source, arrival->current_time, ntptime, rtptime,
1759       packet_count, octet_count);
1760
1761   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1762     sess->stats.sender_sources++;
1763     GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc,
1764         sess->stats.sender_sources);
1765   }
1766
1767   if (created)
1768     on_new_ssrc (sess, source);
1769
1770   rtp_session_process_rb (sess, source, packet, arrival);
1771   g_object_unref (source);
1772 }
1773
1774 /* A receiver report contains statistics about how a receiver is doing. It
1775  * includes stuff like packet loss, jitter and the seqnum it received last. It
1776  * also contains info to calculate the round trip time.
1777  *
1778  * We are only interested in how the sender of this report is doing wrt to us.
1779  */
1780 static void
1781 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
1782     RTPArrivalStats * arrival)
1783 {
1784   guint32 senderssrc;
1785   RTPSource *source;
1786   gboolean created;
1787
1788   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
1789
1790   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
1791
1792   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1793   if (!source)
1794     return;
1795
1796   if (created)
1797     on_new_ssrc (sess, source);
1798
1799   rtp_session_process_rb (sess, source, packet, arrival);
1800   g_object_unref (source);
1801 }
1802
1803 /* Get SDES items and store them in the SSRC */
1804 static void
1805 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
1806     RTPArrivalStats * arrival)
1807 {
1808   guint items, i, j;
1809   gboolean more_items, more_entries;
1810
1811   items = gst_rtcp_packet_sdes_get_item_count (packet);
1812   GST_DEBUG ("got SDES packet with %d items", items);
1813
1814   more_items = gst_rtcp_packet_sdes_first_item (packet);
1815   i = 0;
1816   while (more_items) {
1817     guint32 ssrc;
1818     gboolean changed, created, validated;
1819     RTPSource *source;
1820     GstStructure *sdes;
1821
1822     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
1823
1824     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
1825
1826     changed = FALSE;
1827
1828     /* find src, no probation when dealing with RTCP */
1829     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1830     if (!source)
1831       return;
1832
1833     sdes = gst_structure_new ("application/x-rtp-source-sdes", NULL);
1834
1835     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
1836     j = 0;
1837     while (more_entries) {
1838       GstRTCPSDESType type;
1839       guint8 len;
1840       guint8 *data;
1841       gchar *name;
1842       gchar *value;
1843
1844       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
1845
1846       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
1847           data);
1848
1849       if (type == GST_RTCP_SDES_PRIV) {
1850         name = g_strndup ((const gchar *) &data[1], data[0]);
1851         len -= data[0] + 1;
1852         data += data[0] + 1;
1853       } else {
1854         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
1855       }
1856
1857       value = g_strndup ((const gchar *) data, len);
1858
1859       gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
1860
1861       g_free (name);
1862       g_free (value);
1863
1864       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
1865       j++;
1866     }
1867
1868     /* takes ownership of sdes */
1869     changed = rtp_source_set_sdes_struct (source, sdes);
1870
1871     validated = !RTP_SOURCE_IS_ACTIVE (source);
1872     source->validated = TRUE;
1873
1874     if (created)
1875       on_new_ssrc (sess, source);
1876     if (validated)
1877       on_ssrc_validated (sess, source);
1878     if (changed)
1879       on_ssrc_sdes (sess, source);
1880
1881     g_object_unref (source);
1882
1883     more_items = gst_rtcp_packet_sdes_next_item (packet);
1884     i++;
1885   }
1886 }
1887
1888 /* BYE is sent when a client leaves the session
1889  */
1890 static void
1891 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
1892     RTPArrivalStats * arrival)
1893 {
1894   guint count, i;
1895   gchar *reason;
1896   gboolean reconsider = FALSE;
1897
1898   reason = gst_rtcp_packet_bye_get_reason (packet);
1899   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
1900
1901   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
1902   for (i = 0; i < count; i++) {
1903     guint32 ssrc;
1904     RTPSource *source;
1905     gboolean created, prevactive, prevsender;
1906     guint pmembers, members;
1907
1908     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
1909     GST_DEBUG ("SSRC: %08x", ssrc);
1910
1911     /* find src and mark bye, no probation when dealing with RTCP */
1912     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1913     if (!source)
1914       return;
1915
1916     /* store time for when we need to time out this source */
1917     source->bye_time = arrival->current_time;
1918
1919     prevactive = RTP_SOURCE_IS_ACTIVE (source);
1920     prevsender = RTP_SOURCE_IS_SENDER (source);
1921
1922     /* let the source handle the rest */
1923     rtp_source_process_bye (source, reason);
1924
1925     pmembers = sess->stats.active_sources;
1926
1927     if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
1928       sess->stats.active_sources--;
1929       GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1930           sess->stats.active_sources);
1931     }
1932     if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
1933       sess->stats.sender_sources--;
1934       GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1935           sess->stats.sender_sources);
1936     }
1937     members = sess->stats.active_sources;
1938
1939     if (!sess->source->received_bye && members < pmembers) {
1940       /* some members went away since the previous timeout estimate.
1941        * Perform reverse reconsideration but only when we are not scheduling a
1942        * BYE ourselves. */
1943       if (arrival->current_time < sess->next_rtcp_check_time) {
1944         GstClockTime time_remaining;
1945
1946         time_remaining = sess->next_rtcp_check_time - arrival->current_time;
1947         sess->next_rtcp_check_time =
1948             gst_util_uint64_scale (time_remaining, members, pmembers);
1949
1950         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
1951             GST_TIME_ARGS (sess->next_rtcp_check_time));
1952
1953         sess->next_rtcp_check_time += arrival->current_time;
1954
1955         /* mark pending reconsider. We only want to signal the reconsideration
1956          * once after we handled all the source in the bye packet */
1957         reconsider = TRUE;
1958       }
1959     }
1960
1961     if (created)
1962       on_new_ssrc (sess, source);
1963
1964     on_bye_ssrc (sess, source);
1965
1966     g_object_unref (source);
1967   }
1968   if (reconsider) {
1969     RTP_SESSION_UNLOCK (sess);
1970     /* notify app of reconsideration */
1971     if (sess->callbacks.reconsider)
1972       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
1973     RTP_SESSION_LOCK (sess);
1974   }
1975   g_free (reason);
1976 }
1977
1978 static void
1979 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
1980     RTPArrivalStats * arrival)
1981 {
1982   GST_DEBUG ("received APP");
1983 }
1984
1985
1986 static void
1987 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
1988     RTPArrivalStats * arrival)
1989 {
1990   GstRTCPType type = gst_rtcp_packet_get_type (packet);
1991   GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
1992   guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
1993   guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
1994   guint length = 4 * (gst_rtcp_packet_get_length (packet) - 2);
1995
1996   GST_DEBUG ("received feedback %d:%d from %08X about %08X"
1997       " with FCI of length %d", type, fbtype, sender_ssrc, media_ssrc, length);
1998
1999   if (g_signal_has_handler_pending (sess,
2000           rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
2001     GstBuffer *fci = NULL;
2002
2003     if (length) {
2004       fci = gst_buffer_create_sub (packet->buffer, packet->offset + 72, length);
2005       GST_BUFFER_TIMESTAMP (fci) = arrival->running_time;
2006     }
2007
2008     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
2009         type, fbtype, sender_ssrc, media_ssrc, fci);
2010
2011     if (fci)
2012       gst_buffer_unref (fci);
2013   }
2014 }
2015
2016 /**
2017  * rtp_session_process_rtcp:
2018  * @sess: and #RTPSession
2019  * @buffer: an RTCP buffer
2020  * @current_time: the current system time
2021  *
2022  * Process an RTCP buffer in the session manager. This function takes ownership
2023  * of @buffer.
2024  *
2025  * Returns: a #GstFlowReturn.
2026  */
2027 GstFlowReturn
2028 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
2029     GstClockTime current_time)
2030 {
2031   GstRTCPPacket packet;
2032   gboolean more, is_bye = FALSE, do_sync = FALSE;
2033   RTPArrivalStats arrival;
2034   GstFlowReturn result = GST_FLOW_OK;
2035
2036   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2037   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2038
2039   if (!gst_rtcp_buffer_validate (buffer))
2040     goto invalid_packet;
2041
2042   GST_DEBUG ("received RTCP packet");
2043
2044   RTP_SESSION_LOCK (sess);
2045   /* update arrival stats */
2046   update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1);
2047
2048   if (sess->sent_bye)
2049     goto ignore;
2050
2051   /* start processing the compound packet */
2052   more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
2053   while (more) {
2054     GstRTCPType type;
2055
2056     type = gst_rtcp_packet_get_type (&packet);
2057
2058     /* when we are leaving the session, we should ignore all non-BYE messages */
2059     if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) {
2060       GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
2061       goto next;
2062     }
2063
2064     switch (type) {
2065       case GST_RTCP_TYPE_SR:
2066         rtp_session_process_sr (sess, &packet, &arrival, &do_sync);
2067         break;
2068       case GST_RTCP_TYPE_RR:
2069         rtp_session_process_rr (sess, &packet, &arrival);
2070         break;
2071       case GST_RTCP_TYPE_SDES:
2072         rtp_session_process_sdes (sess, &packet, &arrival);
2073         break;
2074       case GST_RTCP_TYPE_BYE:
2075         is_bye = TRUE;
2076         /* don't try to attempt lip-sync anymore for streams with a BYE */
2077         do_sync = FALSE;
2078         rtp_session_process_bye (sess, &packet, &arrival);
2079         break;
2080       case GST_RTCP_TYPE_APP:
2081         rtp_session_process_app (sess, &packet, &arrival);
2082         break;
2083       case GST_RTCP_TYPE_RTPFB:
2084       case GST_RTCP_TYPE_PSFB:
2085         rtp_session_process_feedback (sess, &packet, &arrival);
2086         break;
2087       default:
2088         GST_WARNING ("got unknown RTCP packet");
2089         break;
2090     }
2091   next:
2092     more = gst_rtcp_packet_move_to_next (&packet);
2093   }
2094
2095   /* if we are scheduling a BYE, we only want to count bye packets, else we
2096    * count everything */
2097   if (sess->source->received_bye) {
2098     if (is_bye) {
2099       sess->stats.bye_members++;
2100       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
2101     }
2102   } else {
2103     /* keep track of average packet size */
2104     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
2105   }
2106   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
2107       sess->stats.avg_rtcp_packet_size, arrival.bytes);
2108   RTP_SESSION_UNLOCK (sess);
2109
2110   /* notify caller of sr packets in the callback */
2111   if (do_sync && sess->callbacks.sync_rtcp) {
2112     /* make writable, we might want to change the buffer */
2113     buffer = gst_buffer_make_metadata_writable (buffer);
2114
2115     result = sess->callbacks.sync_rtcp (sess, sess->source, buffer,
2116         sess->sync_rtcp_user_data);
2117   } else
2118     gst_buffer_unref (buffer);
2119
2120   return result;
2121
2122   /* ERRORS */
2123 invalid_packet:
2124   {
2125     GST_DEBUG ("invalid RTCP packet received");
2126     gst_buffer_unref (buffer);
2127     return GST_FLOW_OK;
2128   }
2129 ignore:
2130   {
2131     gst_buffer_unref (buffer);
2132     RTP_SESSION_UNLOCK (sess);
2133     GST_DEBUG ("ignoring RTP packet because we left");
2134     return GST_FLOW_OK;
2135   }
2136 }
2137
2138 /**
2139  * rtp_session_send_rtp:
2140  * @sess: an #RTPSession
2141  * @data: pointer to either an RTP buffer or a list of RTP buffers
2142  * @is_list: TRUE when @data is a buffer list
2143  * @current_time: the current system time
2144  * @running_time: the running time of @data
2145  *
2146  * Send the RTP buffer in the session manager. This function takes ownership of
2147  * @buffer.
2148  *
2149  * Returns: a #GstFlowReturn.
2150  */
2151 GstFlowReturn
2152 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
2153     GstClockTime current_time, GstClockTime running_time)
2154 {
2155   GstFlowReturn result;
2156   RTPSource *source;
2157   gboolean prevsender;
2158   gboolean valid_packet;
2159   guint64 oldrate;
2160
2161   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2162   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
2163
2164   if (is_list) {
2165     valid_packet = gst_rtp_buffer_list_validate (GST_BUFFER_LIST_CAST (data));
2166   } else {
2167     valid_packet = gst_rtp_buffer_validate (GST_BUFFER_CAST (data));
2168   }
2169
2170   if (!valid_packet)
2171     goto invalid_packet;
2172
2173   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
2174
2175   RTP_SESSION_LOCK (sess);
2176   source = sess->source;
2177
2178   /* update last activity */
2179   source->last_rtp_activity = current_time;
2180
2181   prevsender = RTP_SOURCE_IS_SENDER (source);
2182   oldrate = source->bitrate;
2183
2184   /* we use our own source to send */
2185   result = rtp_source_send_rtp (source, data, is_list, running_time);
2186
2187   if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
2188     sess->stats.sender_sources++;
2189   if (oldrate != source->bitrate)
2190     sess->recalc_bandwidth = TRUE;
2191   RTP_SESSION_UNLOCK (sess);
2192
2193   return result;
2194
2195   /* ERRORS */
2196 invalid_packet:
2197   {
2198     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
2199     GST_DEBUG ("invalid RTP packet received");
2200     return GST_FLOW_OK;
2201   }
2202 }
2203
2204 static void
2205 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
2206 {
2207   *bandwidth += source->bitrate;
2208 }
2209
2210 static GstClockTime
2211 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
2212     gboolean first)
2213 {
2214   GstClockTime result;
2215
2216   /* recalculate bandwidth when it changed */
2217   if (sess->recalc_bandwidth) {
2218     gdouble bandwidth;
2219
2220     if (sess->bandwidth > 0)
2221       bandwidth = sess->bandwidth;
2222     else {
2223       /* If it is <= 0, then try to estimate the actual bandwidth */
2224       bandwidth = sess->source->bitrate;
2225
2226       g_hash_table_foreach (sess->cnames, (GHFunc) add_bitrates, &bandwidth);
2227       bandwidth /= 8.0;
2228     }
2229     if (bandwidth == 0)
2230       bandwidth = RTP_STATS_BANDWIDTH;
2231
2232     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
2233         sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
2234
2235     sess->recalc_bandwidth = FALSE;
2236   }
2237
2238   if (sess->source->received_bye) {
2239     result = rtp_stats_calculate_bye_interval (&sess->stats);
2240   } else {
2241     result = rtp_stats_calculate_rtcp_interval (&sess->stats,
2242         RTP_SOURCE_IS_SENDER (sess->source), first);
2243   }
2244
2245   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
2246       GST_TIME_ARGS (result), first);
2247
2248   if (!deterministic && result != GST_CLOCK_TIME_NONE)
2249     result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
2250
2251   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
2252
2253   return result;
2254 }
2255
2256 /* Stop the current @sess and schedule a BYE message for the other members.
2257  * One must have the session lock to call this function
2258  */
2259 static GstFlowReturn
2260 rtp_session_schedule_bye_locked (RTPSession * sess, const gchar * reason,
2261     GstClockTime current_time)
2262 {
2263   GstFlowReturn result = GST_FLOW_OK;
2264   RTPSource *source;
2265   GstClockTime interval;
2266
2267   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2268
2269   source = sess->source;
2270
2271   /* ignore more BYEs */
2272   if (source->received_bye)
2273     goto done;
2274
2275   /* we have BYE now */
2276   source->received_bye = TRUE;
2277   /* at least one member wants to send a BYE */
2278   g_free (sess->bye_reason);
2279   sess->bye_reason = g_strdup (reason);
2280   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
2281   sess->stats.bye_members = 1;
2282   sess->first_rtcp = TRUE;
2283   sess->sent_bye = FALSE;
2284   sess->allow_early = TRUE;
2285
2286   /* reschedule transmission */
2287   sess->last_rtcp_send_time = current_time;
2288   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2289   sess->next_rtcp_check_time = current_time + interval;
2290
2291   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
2292       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
2293
2294   RTP_SESSION_UNLOCK (sess);
2295   /* notify app of reconsideration */
2296   if (sess->callbacks.reconsider)
2297     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2298   RTP_SESSION_LOCK (sess);
2299 done:
2300
2301   return result;
2302 }
2303
2304 /**
2305  * rtp_session_schedule_bye:
2306  * @sess: an #RTPSession
2307  * @reason: a reason or NULL
2308  * @current_time: the current system time
2309  *
2310  * Stop the current @sess and schedule a BYE message for the other members.
2311  *
2312  * Returns: a #GstFlowReturn.
2313  */
2314 GstFlowReturn
2315 rtp_session_schedule_bye (RTPSession * sess, const gchar * reason,
2316     GstClockTime current_time)
2317 {
2318   GstFlowReturn result = GST_FLOW_OK;
2319
2320   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2321
2322   RTP_SESSION_LOCK (sess);
2323   result = rtp_session_schedule_bye_locked (sess, reason, current_time);
2324   RTP_SESSION_UNLOCK (sess);
2325
2326   return result;
2327 }
2328
2329 /**
2330  * rtp_session_next_timeout:
2331  * @sess: an #RTPSession
2332  * @current_time: the current system time
2333  *
2334  * Get the next time we should perform session maintenance tasks.
2335  *
2336  * Returns: a time when rtp_session_on_timeout() should be called with the
2337  * current system time.
2338  */
2339 GstClockTime
2340 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
2341 {
2342   GstClockTime result, interval = 0;
2343
2344   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
2345
2346   RTP_SESSION_LOCK (sess);
2347
2348   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
2349     result = sess->next_early_rtcp_time;
2350     goto early_exit;
2351   }
2352
2353   result = sess->next_rtcp_check_time;
2354
2355   GST_DEBUG ("current time: %" GST_TIME_FORMAT ", next :%" GST_TIME_FORMAT,
2356       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2357
2358   if (result < current_time) {
2359     GST_DEBUG ("take current time as base");
2360     /* our previous check time expired, start counting from the current time
2361      * again. */
2362     result = current_time;
2363   }
2364
2365   if (sess->source->received_bye) {
2366     if (sess->sent_bye) {
2367       GST_DEBUG ("we sent BYE already");
2368       interval = GST_CLOCK_TIME_NONE;
2369     } else if (sess->stats.active_sources >= 50) {
2370       GST_DEBUG ("reconsider BYE, more than 50 sources");
2371       /* reconsider BYE if members >= 50 */
2372       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2373     }
2374   } else {
2375     if (sess->first_rtcp) {
2376       GST_DEBUG ("first RTCP packet");
2377       /* we are called for the first time */
2378       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2379     } else if (sess->next_rtcp_check_time < current_time) {
2380       GST_DEBUG ("old check time expired, getting new timeout");
2381       /* get a new timeout when we need to */
2382       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
2383     }
2384   }
2385
2386   if (interval != GST_CLOCK_TIME_NONE)
2387     result += interval;
2388   else
2389     result = GST_CLOCK_TIME_NONE;
2390
2391   sess->next_rtcp_check_time = result;
2392
2393 early_exit:
2394
2395   GST_DEBUG ("current time: %" GST_TIME_FORMAT
2396       ", next time: %" GST_TIME_FORMAT,
2397       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2398   RTP_SESSION_UNLOCK (sess);
2399
2400   return result;
2401 }
2402
2403 typedef struct
2404 {
2405   RTPSession *sess;
2406   GstBuffer *rtcp;
2407   GstClockTime current_time;
2408   guint64 ntpnstime;
2409   GstClockTime running_time;
2410   GstClockTime interval;
2411   GstRTCPPacket packet;
2412   gboolean is_bye;
2413   gboolean has_sdes;
2414   gboolean is_early;
2415   gboolean may_suppress;
2416 } ReportData;
2417
2418 static void
2419 session_start_rtcp (RTPSession * sess, ReportData * data)
2420 {
2421   GstRTCPPacket *packet = &data->packet;
2422   RTPSource *own = sess->source;
2423
2424   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
2425
2426   if (RTP_SOURCE_IS_SENDER (own)) {
2427     guint64 ntptime;
2428     guint32 rtptime;
2429     guint32 packet_count, octet_count;
2430
2431     /* we are a sender, create SR */
2432     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
2433     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
2434
2435     /* get latest stats */
2436     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
2437         &ntptime, &rtptime, &packet_count, &octet_count);
2438     /* store stats */
2439     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
2440         packet_count, octet_count);
2441
2442     /* fill in sender report info */
2443     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
2444         ntptime, rtptime, packet_count, octet_count);
2445   } else {
2446     /* we are only receiver, create RR */
2447     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
2448     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
2449     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
2450   }
2451 }
2452
2453 /* construct a Sender or Receiver Report */
2454 static void
2455 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
2456 {
2457   RTPSession *sess = data->sess;
2458   GstRTCPPacket *packet = &data->packet;
2459
2460   /* create a new buffer if needed */
2461   if (data->rtcp == NULL) {
2462     session_start_rtcp (sess, data);
2463   } else if (data->is_early) {
2464     /* Put a single RR or SR in minimal compound packets */
2465     return;
2466   }
2467   if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
2468     /* only report about other sender sources */
2469     if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
2470       guint8 fractionlost;
2471       gint32 packetslost;
2472       guint32 exthighestseq, jitter;
2473       guint32 lsr, dlsr;
2474
2475       /* get new stats */
2476       rtp_source_get_new_rb (source, data->current_time, &fractionlost,
2477           &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2478
2479       /* store last generated RR packet */
2480       source->last_rr.is_valid = TRUE;
2481       source->last_rr.fractionlost = fractionlost;
2482       source->last_rr.packetslost = packetslost;
2483       source->last_rr.exthighestseq = exthighestseq;
2484       source->last_rr.jitter = jitter;
2485       source->last_rr.lsr = lsr;
2486       source->last_rr.dlsr = dlsr;
2487
2488       /* packet is not yet filled, add report block for this source. */
2489       gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
2490           exthighestseq, jitter, lsr, dlsr);
2491     }
2492   }
2493 }
2494
2495 /* perform cleanup of sources that timed out */
2496 static void
2497 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
2498 {
2499   gboolean remove = FALSE;
2500   gboolean byetimeout = FALSE;
2501   gboolean sendertimeout = FALSE;
2502   gboolean is_sender, is_active;
2503   RTPSession *sess = data->sess;
2504   GstClockTime interval;
2505
2506   is_sender = RTP_SOURCE_IS_SENDER (source);
2507   is_active = RTP_SOURCE_IS_ACTIVE (source);
2508
2509   /* check for our own source, we don't want to delete our own source. */
2510   if (!(source == sess->source)) {
2511     if (source->received_bye) {
2512       /* if we received a BYE from the source, remove the source after some
2513        * time. */
2514       if (data->current_time > source->bye_time &&
2515           data->current_time - source->bye_time > sess->stats.bye_timeout) {
2516         GST_DEBUG ("removing BYE source %08x", source->ssrc);
2517         remove = TRUE;
2518         byetimeout = TRUE;
2519       }
2520     }
2521     /* sources that were inactive for more than 5 times the deterministic reporting
2522      * interval get timed out. the min timeout is 5 seconds. */
2523     if (data->current_time > source->last_activity) {
2524       interval = MAX (data->interval * 5, 5 * GST_SECOND);
2525       if (data->current_time - source->last_activity > interval) {
2526         GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
2527             source->ssrc, GST_TIME_ARGS (source->last_activity));
2528         remove = TRUE;
2529       }
2530     }
2531   }
2532
2533   /* senders that did not send for a long time become a receiver, this also
2534    * holds for our own source. */
2535   if (is_sender) {
2536     if (data->current_time > source->last_rtp_activity) {
2537       interval = MAX (data->interval * 2, 5 * GST_SECOND);
2538       if (data->current_time - source->last_rtp_activity > interval) {
2539         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
2540             GST_TIME_FORMAT, source->ssrc,
2541             GST_TIME_ARGS (source->last_rtp_activity));
2542         source->is_sender = FALSE;
2543         sess->stats.sender_sources--;
2544         sendertimeout = TRUE;
2545       }
2546     }
2547   }
2548
2549   if (remove) {
2550     sess->total_sources--;
2551     if (is_sender)
2552       sess->stats.sender_sources--;
2553     if (is_active)
2554       sess->stats.active_sources--;
2555
2556     if (byetimeout)
2557       on_bye_timeout (sess, source);
2558     else
2559       on_timeout (sess, source);
2560   } else {
2561     if (sendertimeout)
2562       on_sender_timeout (sess, source);
2563   }
2564
2565   source->closing = remove;
2566 }
2567
2568 static void
2569 session_sdes (RTPSession * sess, ReportData * data)
2570 {
2571   GstRTCPPacket *packet = &data->packet;
2572   const GstStructure *sdes;
2573   gint i, n_fields;
2574
2575   /* add SDES packet */
2576   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
2577
2578   gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
2579
2580   sdes = rtp_source_get_sdes_struct (sess->source);
2581
2582   /* add all fields in the structure, the order is not important. */
2583   n_fields = gst_structure_n_fields (sdes);
2584   for (i = 0; i < n_fields; ++i) {
2585     const gchar *field;
2586     const gchar *value;
2587     GstRTCPSDESType type;
2588
2589     field = gst_structure_nth_field_name (sdes, i);
2590     if (field == NULL)
2591       continue;
2592     value = gst_structure_get_string (sdes, field);
2593     if (value == NULL)
2594       continue;
2595     type = gst_rtcp_sdes_name_to_type (field);
2596
2597     /* Early packets are minimal and only include the CNAME */
2598     if (data->is_early && type != GST_RTCP_SDES_CNAME)
2599       continue;
2600
2601     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
2602       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
2603           (const guint8 *) value);
2604     } else if (type == GST_RTCP_SDES_PRIV) {
2605       gsize prefix_len;
2606       gsize value_len;
2607       gsize data_len;
2608       guint8 data[256];
2609
2610       /* don't accept entries that are too big */
2611       prefix_len = strlen (field);
2612       if (prefix_len > 255)
2613         continue;
2614       value_len = strlen (value);
2615       if (value_len > 255)
2616         continue;
2617       data_len = 1 + prefix_len + value_len;
2618       if (data_len > 255)
2619         continue;
2620
2621       data[0] = prefix_len;
2622       memcpy (&data[1], field, prefix_len);
2623       memcpy (&data[1 + prefix_len], value, value_len);
2624
2625       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
2626     }
2627   }
2628
2629   data->has_sdes = TRUE;
2630 }
2631
2632 /* schedule a BYE packet */
2633 static void
2634 session_bye (RTPSession * sess, ReportData * data)
2635 {
2636   GstRTCPPacket *packet = &data->packet;
2637
2638   /* open packet */
2639   session_start_rtcp (sess, data);
2640
2641   /* add SDES */
2642   session_sdes (sess, data);
2643
2644   /* add a BYE packet */
2645   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet);
2646   gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc);
2647   if (sess->bye_reason)
2648     gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason);
2649
2650   /* we have a BYE packet now */
2651   data->is_bye = TRUE;
2652 }
2653
2654 static gboolean
2655 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
2656 {
2657   GstClockTime new_send_time, elapsed;
2658
2659   if (data->is_early && sess->next_early_rtcp_time < current_time)
2660     goto early;
2661
2662   /* no need to check yet */
2663   if (sess->next_rtcp_check_time > current_time) {
2664     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
2665         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
2666         GST_TIME_ARGS (current_time));
2667     return FALSE;
2668   }
2669
2670   /* get elapsed time since we last reported */
2671   elapsed = current_time - sess->last_rtcp_send_time;
2672
2673   /* perform forward reconsideration */
2674   new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
2675
2676   GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
2677       GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time), GST_TIME_ARGS (elapsed));
2678
2679   new_send_time += sess->last_rtcp_send_time;
2680
2681   /* check if reconsideration */
2682   if (current_time < new_send_time) {
2683     GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
2684         GST_TIME_ARGS (new_send_time));
2685     /* store new check time */
2686     sess->next_rtcp_check_time = new_send_time;
2687     return FALSE;
2688   }
2689
2690 early:
2691
2692   new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
2693
2694   GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
2695       GST_TIME_ARGS (new_send_time));
2696   sess->next_rtcp_check_time = current_time + new_send_time;
2697
2698   /* Apply the rules from RFC 4585 section 3.5.3 */
2699   if (sess->stats.min_interval != 0 && !sess->first_rtcp) {
2700     GstClockTimeDiff T_rr_current_interval = g_random_double_range (0.5, 1.5) *
2701         sess->stats.min_interval;
2702
2703     /* This will caused the RTCP to be suppressed if no FB packets are added */
2704     if (sess->last_rtcp_send_time + T_rr_current_interval >
2705         sess->next_rtcp_check_time) {
2706       GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT
2707           " last: %" GST_TIME_FORMAT
2708           " + T_rr_current_interval: %" GST_TIME_FORMAT
2709           " >  sess->next_rtcp_check_time: %" GST_TIME_FORMAT,
2710           GST_TIME_ARGS (sess->stats.min_interval),
2711           GST_TIME_ARGS (sess->last_rtcp_send_time),
2712           GST_TIME_ARGS (T_rr_current_interval),
2713           GST_TIME_ARGS (sess->next_rtcp_check_time));
2714       data->may_suppress = TRUE;
2715     }
2716   }
2717
2718   return TRUE;
2719 }
2720
2721 static void
2722 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
2723 {
2724   g_hash_table_insert (hash_table, key, g_object_ref (source));
2725 }
2726
2727 static gboolean
2728 remove_closing_sources (const gchar * key, RTPSource * source, gpointer * data)
2729 {
2730   return source->closing;
2731 }
2732
2733 /**
2734  * rtp_session_on_timeout:
2735  * @sess: an #RTPSession
2736  * @current_time: the current system time
2737  * @ntpnstime: the current NTP time in nanoseconds
2738  * @running_time: the current running_time of the pipeline
2739  *
2740  * Perform maintenance actions after the timeout obtained with
2741  * rtp_session_next_timeout() expired.
2742  *
2743  * This function will perform timeouts of receivers and senders, send a BYE
2744  * packet or generate RTCP packets with current session stats.
2745  *
2746  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
2747  * times, for each packet that should be processed.
2748  *
2749  * Returns: a #GstFlowReturn.
2750  */
2751 GstFlowReturn
2752 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
2753     guint64 ntpnstime, GstClockTime running_time)
2754 {
2755   GstFlowReturn result = GST_FLOW_OK;
2756   ReportData data;
2757   RTPSource *own;
2758   GHashTable *table_copy;
2759   gboolean notify = FALSE;
2760
2761   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2762
2763   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT,
2764       GST_TIME_ARGS (current_time), GST_TIME_ARGS (ntpnstime));
2765
2766   data.sess = sess;
2767   data.rtcp = NULL;
2768   data.current_time = current_time;
2769   data.ntpnstime = ntpnstime;
2770   data.is_bye = FALSE;
2771   data.has_sdes = FALSE;
2772   data.may_suppress = FALSE;
2773   data.running_time = running_time;
2774
2775   own = sess->source;
2776
2777   RTP_SESSION_LOCK (sess);
2778   /* get a new interval, we need this for various cleanups etc */
2779   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
2780
2781   /* Make a local copy of the hashtable. We need to do this because the
2782    * cleanup stage below releases the session lock. */
2783   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
2784       (GDestroyNotify) g_object_unref);
2785   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2786       (GHFunc) clone_ssrcs_hashtable, table_copy);
2787
2788   /* Clean up the session, mark the source for removing, this might release the
2789    * session lock. */
2790   g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
2791   g_hash_table_destroy (table_copy);
2792
2793   /* Now remove the marked sources */
2794   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
2795       (GHRFunc) remove_closing_sources, NULL);
2796
2797   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
2798     data.is_early = TRUE;
2799   else
2800     data.is_early = FALSE;
2801
2802   /* see if we need to generate SR or RR packets */
2803   if (is_rtcp_time (sess, current_time, &data)) {
2804     if (own->received_bye) {
2805       /* generate BYE instead */
2806       GST_DEBUG ("generating BYE message");
2807       session_bye (sess, &data);
2808       sess->sent_bye = TRUE;
2809     } else {
2810       /* loop over all known sources and do something */
2811       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2812           (GHFunc) session_report_blocks, &data);
2813     }
2814   }
2815
2816   if (data.rtcp) {
2817     /* we keep track of the last report time in order to timeout inactive
2818      * receivers or senders */
2819     if (!data.is_early && !data.may_suppress)
2820       sess->last_rtcp_send_time = data.current_time;
2821     sess->first_rtcp = FALSE;
2822     sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
2823
2824     /* add SDES for this source when not already added */
2825     if (!data.has_sdes)
2826       session_sdes (sess, &data);
2827   }
2828
2829   /* check for outdated collisions */
2830   GST_DEBUG ("Timing out collisions");
2831   rtp_source_timeout (sess->source, current_time,
2832       data.interval * RTCP_INTERVAL_COLLISION_TIMEOUT);
2833
2834   if (sess->change_ssrc) {
2835     GST_DEBUG ("need to change our SSRC (%08x)", own->ssrc);
2836     g_hash_table_steal (sess->ssrcs[sess->mask_idx],
2837         GINT_TO_POINTER (own->ssrc));
2838
2839     own->ssrc = rtp_session_create_new_ssrc (sess);
2840     rtp_source_reset (own);
2841
2842     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
2843         GINT_TO_POINTER (own->ssrc), own);
2844
2845     g_free (sess->bye_reason);
2846     sess->bye_reason = NULL;
2847     sess->sent_bye = FALSE;
2848     sess->change_ssrc = FALSE;
2849     notify = TRUE;
2850     GST_DEBUG ("changed our SSRC to %08x", own->ssrc);
2851   }
2852
2853   sess->allow_early = TRUE;
2854
2855   RTP_SESSION_UNLOCK (sess);
2856
2857   if (notify)
2858     g_object_notify (G_OBJECT (sess), "internal-ssrc");
2859
2860   /* push out the RTCP packet */
2861   if (data.rtcp) {
2862     gboolean do_not_suppress;
2863
2864     /* Give the user a change to add its own packet */
2865     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
2866         data.rtcp, data.is_early, &do_not_suppress);
2867
2868     if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) {
2869       guint packet_size;
2870
2871       /* close the RTCP packet */
2872       gst_rtcp_buffer_end (data.rtcp);
2873
2874       packet_size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
2875
2876       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
2877       GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
2878           sess->stats.avg_rtcp_packet_size, packet_size);
2879       result =
2880           sess->callbacks.send_rtcp (sess, own, data.rtcp, sess->sent_bye,
2881           sess->send_rtcp_user_data);
2882     } else {
2883       GST_DEBUG ("freeing packet callback: %p"
2884           " do_not_suppress: %d may_suppress: %d",
2885           sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress);
2886       gst_buffer_unref (data.rtcp);
2887     }
2888   }
2889
2890   return result;
2891 }
2892
2893 void
2894 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
2895     GstClockTimeDiff max_delay)
2896 {
2897   GstClockTime T_dither_max;
2898
2899   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
2900
2901   RTP_SESSION_LOCK (sess);
2902
2903   /* Check if already requested */
2904   /*  RFC 4585 section 3.5.2 step 2 */
2905   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
2906     goto dont_send;
2907
2908   /* Ignore the request a scheduled packet will be in time anyway */
2909   if (current_time + max_delay > sess->next_rtcp_check_time)
2910     goto dont_send;
2911
2912   /*  RFC 4585 section 3.5.2 step 2b */
2913   /* If the total sources is <=2, then there is only us and one peer */
2914   if (sess->total_sources <= 2) {
2915     T_dither_max = 0;
2916   } else {
2917     /* Divide by 2 because l = 0.5 */
2918     T_dither_max = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
2919     T_dither_max /= 2;
2920   }
2921
2922   /*  RFC 4585 section 3.5.2 step 3 */
2923   if (current_time + T_dither_max > sess->next_rtcp_check_time)
2924     goto dont_send;
2925
2926   /*  RFC 4585 section 3.5.2 step 4 */
2927   if (sess->allow_early == FALSE)
2928     goto dont_send;
2929
2930   if (T_dither_max) {
2931     /* Schedule an early transmission later */
2932     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
2933         current_time;
2934   } else {
2935     /* If no dithering, schedule it for NOW */
2936     sess->next_early_rtcp_time = current_time;
2937   }
2938
2939   RTP_SESSION_UNLOCK (sess);
2940
2941   /* notify app of need to send packet early
2942    * and therefore of timeout change */
2943   if (sess->callbacks.reconsider)
2944     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2945
2946   return;
2947
2948 dont_send:
2949
2950   RTP_SESSION_UNLOCK (sess);
2951
2952 }