various (gst): add a missing G_PARAM_STATIC_STRINGS flags
[platform/upstream/gstreamer.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/rtp/gstrtpbuffer.h>
23 #include <gst/rtp/gstrtcpbuffer.h>
24 #include <gst/netbuffer/gstnetbuffer.h>
25
26 #include "gstrtpbin-marshal.h"
27 #include "rtpsession.h"
28
29 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
30 #define GST_CAT_DEFAULT rtp_session_debug
31
32 /* signals and args */
33 enum
34 {
35   SIGNAL_GET_SOURCE_BY_SSRC,
36   SIGNAL_ON_NEW_SSRC,
37   SIGNAL_ON_SSRC_COLLISION,
38   SIGNAL_ON_SSRC_VALIDATED,
39   SIGNAL_ON_SSRC_ACTIVE,
40   SIGNAL_ON_SSRC_SDES,
41   SIGNAL_ON_BYE_SSRC,
42   SIGNAL_ON_BYE_TIMEOUT,
43   SIGNAL_ON_TIMEOUT,
44   SIGNAL_ON_SENDER_TIMEOUT,
45   LAST_SIGNAL
46 };
47
48 #define DEFAULT_INTERNAL_SOURCE      NULL
49 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
50 #define DEFAULT_RTCP_FRACTION        (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH)
51 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
52 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
53 #define DEFAULT_RTCP_MTU             1400
54 #define DEFAULT_SDES                 NULL
55 #define DEFAULT_NUM_SOURCES          0
56 #define DEFAULT_NUM_ACTIVE_SOURCES   0
57 #define DEFAULT_SOURCES              NULL
58
59 enum
60 {
61   PROP_0,
62   PROP_INTERNAL_SSRC,
63   PROP_INTERNAL_SOURCE,
64   PROP_BANDWIDTH,
65   PROP_RTCP_FRACTION,
66   PROP_RTCP_RR_BANDWIDTH,
67   PROP_RTCP_RS_BANDWIDTH,
68   PROP_RTCP_MTU,
69   PROP_SDES,
70   PROP_NUM_SOURCES,
71   PROP_NUM_ACTIVE_SOURCES,
72   PROP_SOURCES,
73   PROP_FAVOR_NEW,
74   PROP_LAST
75 };
76
77 /* update average packet size, we keep this scaled by 16 to keep enough
78  * precision. */
79 #define UPDATE_AVG(avg, val)            \
80   if ((avg) == 0)                       \
81    (avg) = (val) << 4;                  \
82   else                                  \
83    (avg) = ((val) + (15 * (avg)));
84
85 /* The number RTCP intervals after which to timeout entries in the
86  * collision table
87  */
88 #define RTCP_INTERVAL_COLLISION_TIMEOUT 10
89
90 /* GObject vmethods */
91 static void rtp_session_finalize (GObject * object);
92 static void rtp_session_set_property (GObject * object, guint prop_id,
93     const GValue * value, GParamSpec * pspec);
94 static void rtp_session_get_property (GObject * object, guint prop_id,
95     GValue * value, GParamSpec * pspec);
96
97 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
98
99 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
100
101 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
102     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
103 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
104     const gchar * reason, GstClockTime current_time);
105 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
106     gboolean deterministic, gboolean first);
107
108 static void
109 rtp_session_class_init (RTPSessionClass * klass)
110 {
111   GObjectClass *gobject_class;
112
113   gobject_class = (GObjectClass *) klass;
114
115   gobject_class->finalize = rtp_session_finalize;
116   gobject_class->set_property = rtp_session_set_property;
117   gobject_class->get_property = rtp_session_get_property;
118
119   /**
120    * RTPSession::get-source-by-ssrc:
121    * @session: the object which received the signal
122    * @ssrc: the SSRC of the RTPSource
123    *
124    * Request the #RTPSource object with SSRC @ssrc in @session.
125    */
126   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
127       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
128       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
129           get_source_by_ssrc), NULL, NULL, gst_rtp_bin_marshal_OBJECT__UINT,
130       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
131
132   /**
133    * RTPSession::on-new-ssrc:
134    * @session: the object which received the signal
135    * @src: the new RTPSource
136    *
137    * Notify of a new SSRC that entered @session.
138    */
139   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
140       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
141       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
142       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
143       RTP_TYPE_SOURCE);
144   /**
145    * RTPSession::on-ssrc-collision:
146    * @session: the object which received the signal
147    * @src: the #RTPSource that caused a collision
148    *
149    * Notify when we have an SSRC collision
150    */
151   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
152       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
153       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
154       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
155       RTP_TYPE_SOURCE);
156   /**
157    * RTPSession::on-ssrc-validated:
158    * @session: the object which received the signal
159    * @src: the new validated RTPSource
160    *
161    * Notify of a new SSRC that became validated.
162    */
163   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
164       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
165       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
166       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
167       RTP_TYPE_SOURCE);
168   /**
169    * RTPSession::on-ssrc-active:
170    * @session: the object which received the signal
171    * @src: the active RTPSource
172    *
173    * Notify of a SSRC that is active, i.e., sending RTCP.
174    */
175   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
176       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
177       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
178       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
179       RTP_TYPE_SOURCE);
180   /**
181    * RTPSession::on-ssrc-sdes:
182    * @session: the object which received the signal
183    * @src: the RTPSource
184    *
185    * Notify that a new SDES was received for SSRC.
186    */
187   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
188       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
189       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
190       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
191       RTP_TYPE_SOURCE);
192   /**
193    * RTPSession::on-bye-ssrc:
194    * @session: the object which received the signal
195    * @src: the RTPSource that went away
196    *
197    * Notify of an SSRC that became inactive because of a BYE packet.
198    */
199   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
200       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
201       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
202       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
203       RTP_TYPE_SOURCE);
204   /**
205    * RTPSession::on-bye-timeout:
206    * @session: the object which received the signal
207    * @src: the RTPSource that timed out
208    *
209    * Notify of an SSRC that has timed out because of BYE
210    */
211   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
212       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
213       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
214       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
215       RTP_TYPE_SOURCE);
216   /**
217    * RTPSession::on-timeout:
218    * @session: the object which received the signal
219    * @src: the RTPSource that timed out
220    *
221    * Notify of an SSRC that has timed out
222    */
223   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
224       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
225       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
226       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
227       RTP_TYPE_SOURCE);
228   /**
229    * RTPSession::on-sender-timeout:
230    * @session: the object which received the signal
231    * @src: the RTPSource that timed out
232    *
233    * Notify of an SSRC that was a sender but timed out and became a receiver.
234    */
235   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
236       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
237       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
238       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
239       RTP_TYPE_SOURCE);
240
241   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
242       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
243           "The internal SSRC used for the session",
244           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
245
246   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
247       g_param_spec_object ("internal-source", "Internal Source",
248           "The internal source element of the session",
249           RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
250
251   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
252       g_param_spec_double ("bandwidth", "Bandwidth",
253           "The bandwidth of the session (0 for auto-discover)",
254           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
255           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
256
257   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
258       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
259           "The fraction of the bandwidth used for RTCP (or as a real fraction of the RTP bandwidth if < 1)",
260           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
261           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
262
263   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
264       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
265           "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
266           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
267           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
268
269   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
270       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
271           "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
272           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
273           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
274
275   g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
276       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
277           "The maximum size of the RTCP packets",
278           16, G_MAXINT16, DEFAULT_RTCP_MTU,
279           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
280
281   g_object_class_install_property (gobject_class, PROP_SDES,
282       g_param_spec_boxed ("sdes", "SDES",
283           "The SDES items of this session",
284           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
285
286   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
287       g_param_spec_uint ("num-sources", "Num Sources",
288           "The number of sources in the session", 0, G_MAXUINT,
289           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
290
291   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
292       g_param_spec_uint ("num-active-sources", "Num Active Sources",
293           "The number of active sources in the session", 0, G_MAXUINT,
294           DEFAULT_NUM_ACTIVE_SOURCES,
295           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
296   /**
297    * RTPSource::sources
298    *
299    * Get a GValue Array of all sources in the session.
300    *
301    * <example>
302    * <title>Getting the #RTPSources of a session
303    * <programlisting>
304    * {
305    *   GValueArray *arr;
306    *   GValue *val;
307    *   guint i;
308    *
309    *   g_object_get (sess, "sources", &arr, NULL);
310    *
311    *   for (i = 0; i < arr->n_values; i++) {
312    *     RTPSource *source;
313    *
314    *     val = g_value_array_get_nth (arr, i);
315    *     source = g_value_get_object (val);
316    *   }
317    *   g_value_array_free (arr);
318    * }
319    * </programlisting>
320    * </example>
321    */
322   g_object_class_install_property (gobject_class, PROP_SOURCES,
323       g_param_spec_boxed ("sources", "Sources",
324           "An array of all known sources in the session",
325           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
326
327   g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
328       g_param_spec_boolean ("favor-new", "Favor new sources",
329           "Resolve SSRC conflict in favor of new sources", FALSE,
330           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
331
332
333   klass->get_source_by_ssrc =
334       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
335
336   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
337 }
338
339 static void
340 rtp_session_init (RTPSession * sess)
341 {
342   gint i;
343   gchar *str;
344
345   sess->lock = g_mutex_new ();
346   sess->key = g_random_int ();
347   sess->mask_idx = 0;
348   sess->mask = 0;
349
350   for (i = 0; i < 32; i++) {
351     sess->ssrcs[i] =
352         g_hash_table_new_full (NULL, NULL, NULL,
353         (GDestroyNotify) g_object_unref);
354   }
355   sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL);
356
357   rtp_stats_init_defaults (&sess->stats);
358
359   sess->recalc_bandwidth = TRUE;
360   sess->bandwidth = DEFAULT_BANDWIDTH;
361   sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
362   sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
363   sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
364
365   /* create an active SSRC for this session manager */
366   sess->source = rtp_session_create_source (sess);
367   sess->source->validated = TRUE;
368   sess->source->internal = TRUE;
369   sess->stats.active_sources++;
370
371   /* default UDP header length */
372   sess->header_len = 28;
373   sess->mtu = DEFAULT_RTCP_MTU;
374
375   /* some default SDES entries */
376   str = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ());
377   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_CNAME, str);
378   g_free (str);
379
380   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_NAME,
381       g_get_real_name ());
382   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_TOOL, "GStreamer");
383
384   sess->first_rtcp = TRUE;
385
386   GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
387 }
388
389 static void
390 rtp_session_finalize (GObject * object)
391 {
392   RTPSession *sess;
393   gint i;
394
395   sess = RTP_SESSION_CAST (object);
396
397   g_mutex_free (sess->lock);
398   for (i = 0; i < 32; i++)
399     g_hash_table_destroy (sess->ssrcs[i]);
400
401   g_free (sess->bye_reason);
402
403   g_hash_table_destroy (sess->cnames);
404   g_object_unref (sess->source);
405
406   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
407 }
408
409 static void
410 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
411 {
412   GValue value = { 0 };
413
414   g_value_init (&value, RTP_TYPE_SOURCE);
415   g_value_take_object (&value, source);
416   /* copies the value */
417   g_value_array_append (arr, &value);
418 }
419
420 static GValueArray *
421 rtp_session_create_sources (RTPSession * sess)
422 {
423   GValueArray *res;
424   guint size;
425
426   RTP_SESSION_LOCK (sess);
427   /* get number of elements in the table */
428   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
429   /* create the result value array */
430   res = g_value_array_new (size);
431
432   /* and copy all values into the array */
433   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
434   RTP_SESSION_UNLOCK (sess);
435
436   return res;
437 }
438
439 static void
440 rtp_session_set_property (GObject * object, guint prop_id,
441     const GValue * value, GParamSpec * pspec)
442 {
443   RTPSession *sess;
444
445   sess = RTP_SESSION (object);
446
447   switch (prop_id) {
448     case PROP_INTERNAL_SSRC:
449       rtp_session_set_internal_ssrc (sess, g_value_get_uint (value));
450       break;
451     case PROP_BANDWIDTH:
452       sess->bandwidth = g_value_get_double (value);
453       sess->recalc_bandwidth = TRUE;
454       break;
455     case PROP_RTCP_FRACTION:
456       sess->rtcp_bandwidth = g_value_get_double (value);
457       sess->recalc_bandwidth = TRUE;
458       break;
459     case PROP_RTCP_RR_BANDWIDTH:
460       sess->rtcp_rr_bandwidth = g_value_get_int (value);
461       sess->recalc_bandwidth = TRUE;
462       break;
463     case PROP_RTCP_RS_BANDWIDTH:
464       sess->rtcp_rs_bandwidth = g_value_get_int (value);
465       sess->recalc_bandwidth = TRUE;
466       break;
467     case PROP_RTCP_MTU:
468       sess->mtu = g_value_get_uint (value);
469       break;
470     case PROP_SDES:
471       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
472       break;
473     case PROP_FAVOR_NEW:
474       sess->favor_new = g_value_get_boolean (value);
475       break;
476     default:
477       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
478       break;
479   }
480 }
481
482 static void
483 rtp_session_get_property (GObject * object, guint prop_id,
484     GValue * value, GParamSpec * pspec)
485 {
486   RTPSession *sess;
487
488   sess = RTP_SESSION (object);
489
490   switch (prop_id) {
491     case PROP_INTERNAL_SSRC:
492       g_value_set_uint (value, rtp_session_get_internal_ssrc (sess));
493       break;
494     case PROP_INTERNAL_SOURCE:
495       g_value_take_object (value, rtp_session_get_internal_source (sess));
496       break;
497     case PROP_BANDWIDTH:
498       g_value_set_double (value, sess->bandwidth);
499       break;
500     case PROP_RTCP_FRACTION:
501       g_value_set_double (value, sess->rtcp_bandwidth);
502       break;
503     case PROP_RTCP_RR_BANDWIDTH:
504       g_value_set_int (value, sess->rtcp_rr_bandwidth);
505       break;
506     case PROP_RTCP_RS_BANDWIDTH:
507       g_value_set_int (value, sess->rtcp_rs_bandwidth);
508       break;
509     case PROP_RTCP_MTU:
510       g_value_set_uint (value, sess->mtu);
511       break;
512     case PROP_SDES:
513       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
514       break;
515     case PROP_NUM_SOURCES:
516       g_value_set_uint (value, rtp_session_get_num_sources (sess));
517       break;
518     case PROP_NUM_ACTIVE_SOURCES:
519       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
520       break;
521     case PROP_SOURCES:
522       g_value_take_boxed (value, rtp_session_create_sources (sess));
523       break;
524     case PROP_FAVOR_NEW:
525       g_value_set_boolean (value, sess->favor_new);
526       break;
527     default:
528       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
529       break;
530   }
531 }
532
533 static void
534 on_new_ssrc (RTPSession * sess, RTPSource * source)
535 {
536   g_object_ref (source);
537   RTP_SESSION_UNLOCK (sess);
538   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
539   RTP_SESSION_LOCK (sess);
540   g_object_unref (source);
541 }
542
543 static void
544 on_ssrc_collision (RTPSession * sess, RTPSource * source)
545 {
546   g_object_ref (source);
547   RTP_SESSION_UNLOCK (sess);
548   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
549       source);
550   RTP_SESSION_LOCK (sess);
551   g_object_unref (source);
552 }
553
554 static void
555 on_ssrc_validated (RTPSession * sess, RTPSource * source)
556 {
557   g_object_ref (source);
558   RTP_SESSION_UNLOCK (sess);
559   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
560       source);
561   RTP_SESSION_LOCK (sess);
562   g_object_unref (source);
563 }
564
565 static void
566 on_ssrc_active (RTPSession * sess, RTPSource * source)
567 {
568   g_object_ref (source);
569   RTP_SESSION_UNLOCK (sess);
570   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
571   RTP_SESSION_LOCK (sess);
572   g_object_unref (source);
573 }
574
575 static void
576 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
577 {
578   g_object_ref (source);
579   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
580   RTP_SESSION_UNLOCK (sess);
581   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
582   RTP_SESSION_LOCK (sess);
583   g_object_unref (source);
584 }
585
586 static void
587 on_bye_ssrc (RTPSession * sess, RTPSource * source)
588 {
589   g_object_ref (source);
590   RTP_SESSION_UNLOCK (sess);
591   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
592   RTP_SESSION_LOCK (sess);
593   g_object_unref (source);
594 }
595
596 static void
597 on_bye_timeout (RTPSession * sess, RTPSource * source)
598 {
599   g_object_ref (source);
600   RTP_SESSION_UNLOCK (sess);
601   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
602   RTP_SESSION_LOCK (sess);
603   g_object_unref (source);
604 }
605
606 static void
607 on_timeout (RTPSession * sess, RTPSource * source)
608 {
609   g_object_ref (source);
610   RTP_SESSION_UNLOCK (sess);
611   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
612   RTP_SESSION_LOCK (sess);
613   g_object_unref (source);
614 }
615
616 static void
617 on_sender_timeout (RTPSession * sess, RTPSource * source)
618 {
619   g_object_ref (source);
620   RTP_SESSION_UNLOCK (sess);
621   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
622       source);
623   RTP_SESSION_LOCK (sess);
624   g_object_unref (source);
625 }
626
627 /**
628  * rtp_session_new:
629  *
630  * Create a new session object.
631  *
632  * Returns: a new #RTPSession. g_object_unref() after usage.
633  */
634 RTPSession *
635 rtp_session_new (void)
636 {
637   RTPSession *sess;
638
639   sess = g_object_new (RTP_TYPE_SESSION, NULL);
640
641   return sess;
642 }
643
644 /**
645  * rtp_session_set_callbacks:
646  * @sess: an #RTPSession
647  * @callbacks: callbacks to configure
648  * @user_data: user data passed in the callbacks
649  *
650  * Configure a set of callbacks to be notified of actions.
651  */
652 void
653 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
654     gpointer user_data)
655 {
656   g_return_if_fail (RTP_IS_SESSION (sess));
657
658   if (callbacks->process_rtp) {
659     sess->callbacks.process_rtp = callbacks->process_rtp;
660     sess->process_rtp_user_data = user_data;
661   }
662   if (callbacks->send_rtp) {
663     sess->callbacks.send_rtp = callbacks->send_rtp;
664     sess->send_rtp_user_data = user_data;
665   }
666   if (callbacks->send_rtcp) {
667     sess->callbacks.send_rtcp = callbacks->send_rtcp;
668     sess->send_rtcp_user_data = user_data;
669   }
670   if (callbacks->sync_rtcp) {
671     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
672     sess->sync_rtcp_user_data = user_data;
673   }
674   if (callbacks->clock_rate) {
675     sess->callbacks.clock_rate = callbacks->clock_rate;
676     sess->clock_rate_user_data = user_data;
677   }
678   if (callbacks->reconsider) {
679     sess->callbacks.reconsider = callbacks->reconsider;
680     sess->reconsider_user_data = user_data;
681   }
682 }
683
684 /**
685  * rtp_session_set_process_rtp_callback:
686  * @sess: an #RTPSession
687  * @callback: callback to set
688  * @user_data: user data passed in the callback
689  *
690  * Configure only the process_rtp callback to be notified of the process_rtp action.
691  */
692 void
693 rtp_session_set_process_rtp_callback (RTPSession * sess,
694     RTPSessionProcessRTP callback, gpointer user_data)
695 {
696   g_return_if_fail (RTP_IS_SESSION (sess));
697
698   sess->callbacks.process_rtp = callback;
699   sess->process_rtp_user_data = user_data;
700 }
701
702 /**
703  * rtp_session_set_send_rtp_callback:
704  * @sess: an #RTPSession
705  * @callback: callback to set
706  * @user_data: user data passed in the callback
707  *
708  * Configure only the send_rtp callback to be notified of the send_rtp action.
709  */
710 void
711 rtp_session_set_send_rtp_callback (RTPSession * sess,
712     RTPSessionSendRTP callback, gpointer user_data)
713 {
714   g_return_if_fail (RTP_IS_SESSION (sess));
715
716   sess->callbacks.send_rtp = callback;
717   sess->send_rtp_user_data = user_data;
718 }
719
720 /**
721  * rtp_session_set_send_rtcp_callback:
722  * @sess: an #RTPSession
723  * @callback: callback to set
724  * @user_data: user data passed in the callback
725  *
726  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
727  */
728 void
729 rtp_session_set_send_rtcp_callback (RTPSession * sess,
730     RTPSessionSendRTCP callback, gpointer user_data)
731 {
732   g_return_if_fail (RTP_IS_SESSION (sess));
733
734   sess->callbacks.send_rtcp = callback;
735   sess->send_rtcp_user_data = user_data;
736 }
737
738 /**
739  * rtp_session_set_sync_rtcp_callback:
740  * @sess: an #RTPSession
741  * @callback: callback to set
742  * @user_data: user data passed in the callback
743  *
744  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
745  */
746 void
747 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
748     RTPSessionSyncRTCP callback, gpointer user_data)
749 {
750   g_return_if_fail (RTP_IS_SESSION (sess));
751
752   sess->callbacks.sync_rtcp = callback;
753   sess->sync_rtcp_user_data = user_data;
754 }
755
756 /**
757  * rtp_session_set_clock_rate_callback:
758  * @sess: an #RTPSession
759  * @callback: callback to set
760  * @user_data: user data passed in the callback
761  *
762  * Configure only the clock_rate callback to be notified of the clock_rate action.
763  */
764 void
765 rtp_session_set_clock_rate_callback (RTPSession * sess,
766     RTPSessionClockRate callback, gpointer user_data)
767 {
768   g_return_if_fail (RTP_IS_SESSION (sess));
769
770   sess->callbacks.clock_rate = callback;
771   sess->clock_rate_user_data = user_data;
772 }
773
774 /**
775  * rtp_session_set_reconsider_callback:
776  * @sess: an #RTPSession
777  * @callback: callback to set
778  * @user_data: user data passed in the callback
779  *
780  * Configure only the reconsider callback to be notified of the reconsider action.
781  */
782 void
783 rtp_session_set_reconsider_callback (RTPSession * sess,
784     RTPSessionReconsider callback, gpointer user_data)
785 {
786   g_return_if_fail (RTP_IS_SESSION (sess));
787
788   sess->callbacks.reconsider = callback;
789   sess->reconsider_user_data = user_data;
790 }
791
792 /**
793  * rtp_session_set_bandwidth:
794  * @sess: an #RTPSession
795  * @bandwidth: the bandwidth allocated
796  *
797  * Set the session bandwidth in bytes per second.
798  */
799 void
800 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
801 {
802   g_return_if_fail (RTP_IS_SESSION (sess));
803
804   RTP_SESSION_LOCK (sess);
805   sess->stats.bandwidth = bandwidth;
806   RTP_SESSION_UNLOCK (sess);
807 }
808
809 /**
810  * rtp_session_get_bandwidth:
811  * @sess: an #RTPSession
812  *
813  * Get the session bandwidth.
814  *
815  * Returns: the session bandwidth.
816  */
817 gdouble
818 rtp_session_get_bandwidth (RTPSession * sess)
819 {
820   gdouble result;
821
822   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
823
824   RTP_SESSION_LOCK (sess);
825   result = sess->stats.bandwidth;
826   RTP_SESSION_UNLOCK (sess);
827
828   return result;
829 }
830
831 /**
832  * rtp_session_set_rtcp_fraction:
833  * @sess: an #RTPSession
834  * @bandwidth: the RTCP bandwidth
835  *
836  * Set the bandwidth in bytes per second that should be used for RTCP
837  * messages.
838  */
839 void
840 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
841 {
842   g_return_if_fail (RTP_IS_SESSION (sess));
843
844   RTP_SESSION_LOCK (sess);
845   sess->stats.rtcp_bandwidth = bandwidth;
846   RTP_SESSION_UNLOCK (sess);
847 }
848
849 /**
850  * rtp_session_get_rtcp_fraction:
851  * @sess: an #RTPSession
852  *
853  * Get the session bandwidth used for RTCP.
854  *
855  * Returns: The bandwidth used for RTCP messages.
856  */
857 gdouble
858 rtp_session_get_rtcp_fraction (RTPSession * sess)
859 {
860   gdouble result;
861
862   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
863
864   RTP_SESSION_LOCK (sess);
865   result = sess->stats.rtcp_bandwidth;
866   RTP_SESSION_UNLOCK (sess);
867
868   return result;
869 }
870
871 /**
872  * rtp_session_set_sdes_string:
873  * @sess: an #RTPSession
874  * @type: the type of the SDES item
875  * @item: a null-terminated string to set.
876  *
877  * Store an SDES item of @type in @sess.
878  *
879  * Returns: %FALSE if the data was unchanged @type is invalid.
880  */
881 gboolean
882 rtp_session_set_sdes_string (RTPSession * sess, GstRTCPSDESType type,
883     const gchar * item)
884 {
885   gboolean result;
886
887   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
888
889   RTP_SESSION_LOCK (sess);
890   result = rtp_source_set_sdes_string (sess->source, type, item);
891   RTP_SESSION_UNLOCK (sess);
892
893   return result;
894 }
895
896 /**
897  * rtp_session_get_sdes_string:
898  * @sess: an #RTPSession
899  * @type: the type of the SDES item
900  *
901  * Get the SDES item of @type from @sess.
902  *
903  * Returns: a null-terminated copy of the SDES item or NULL when @type was not
904  * valid. g_free() after usage.
905  */
906 gchar *
907 rtp_session_get_sdes_string (RTPSession * sess, GstRTCPSDESType type)
908 {
909   gchar *result;
910
911   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
912
913   RTP_SESSION_LOCK (sess);
914   result = rtp_source_get_sdes_string (sess->source, type);
915   RTP_SESSION_UNLOCK (sess);
916
917   return result;
918 }
919
920 /**
921  * rtp_session_get_sdes_struct:
922  * @sess: an #RTSPSession
923  *
924  * Get the SDES data as a #GstStructure
925  *
926  * Returns: a GstStructure with SDES items for @sess. This function returns a
927  * copy of the SDES structure, use gst_structure_free() after usage.
928  */
929 GstStructure *
930 rtp_session_get_sdes_struct (RTPSession * sess)
931 {
932   const GstStructure *sdes;
933   GstStructure *result = NULL;
934
935   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
936
937   RTP_SESSION_LOCK (sess);
938   sdes = rtp_source_get_sdes_struct (sess->source);
939   if (sdes)
940     result = gst_structure_copy (sdes);
941   RTP_SESSION_UNLOCK (sess);
942
943   return result;
944 }
945
946 /**
947  * rtp_session_set_sdes_struct:
948  * @sess: an #RTSPSession
949  * @sdes: a #GstStructure
950  *
951  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
952  */
953 void
954 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
955 {
956   g_return_if_fail (sdes);
957   g_return_if_fail (RTP_IS_SESSION (sess));
958
959   RTP_SESSION_LOCK (sess);
960   rtp_source_set_sdes_struct (sess->source, gst_structure_copy (sdes));
961   RTP_SESSION_UNLOCK (sess);
962 }
963
964 static GstFlowReturn
965 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
966 {
967   GstFlowReturn result = GST_FLOW_OK;
968
969   if (source == session->source) {
970     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
971
972     RTP_SESSION_UNLOCK (session);
973
974     if (session->callbacks.send_rtp)
975       result =
976           session->callbacks.send_rtp (session, source, data,
977           session->send_rtp_user_data);
978     else {
979       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
980     }
981   } else {
982     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
983     RTP_SESSION_UNLOCK (session);
984
985     if (session->callbacks.process_rtp)
986       result =
987           session->callbacks.process_rtp (session, source,
988           GST_BUFFER_CAST (data), session->process_rtp_user_data);
989     else
990       gst_buffer_unref (GST_BUFFER_CAST (data));
991   }
992   RTP_SESSION_LOCK (session);
993
994   return result;
995 }
996
997 static gint
998 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
999 {
1000   gint result;
1001
1002   RTP_SESSION_UNLOCK (session);
1003
1004   if (session->callbacks.clock_rate)
1005     result =
1006         session->callbacks.clock_rate (session, pt,
1007         session->clock_rate_user_data);
1008   else
1009     result = -1;
1010
1011   RTP_SESSION_LOCK (session);
1012
1013   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
1014
1015   return result;
1016 }
1017
1018 static RTPSourceCallbacks callbacks = {
1019   (RTPSourcePushRTP) source_push_rtp,
1020   (RTPSourceClockRate) source_clock_rate,
1021 };
1022
1023 static gboolean
1024 check_collision (RTPSession * sess, RTPSource * source,
1025     RTPArrivalStats * arrival, gboolean rtp)
1026 {
1027   /* If we have no arrival address, we can't do collision checking */
1028   if (!arrival->have_address)
1029     return FALSE;
1030
1031   if (sess->source != source) {
1032     GstNetAddress *from;
1033     gboolean have_from;
1034
1035     /* This is not our local source, but lets check if two remote
1036      * source collide
1037      */
1038
1039     if (rtp) {
1040       from = &source->rtp_from;
1041       have_from = source->have_rtp_from;
1042     } else {
1043       from = &source->rtcp_from;
1044       have_from = source->have_rtcp_from;
1045     }
1046
1047     if (have_from) {
1048       if (gst_netaddress_equal (from, &arrival->address)) {
1049         /* Address is the same */
1050         return FALSE;
1051       } else {
1052         GST_LOG ("we have a third-party collision or loop ssrc:%x",
1053             rtp_source_get_ssrc (source));
1054         if (sess->favor_new) {
1055           if (rtp_source_find_conflicting_address (source,
1056                   &arrival->address, arrival->current_time)) {
1057             gchar buf1[40];
1058             gst_netaddress_to_string (&arrival->address, buf1, 40);
1059             GST_LOG ("Known conflict on %x for %s, dropping packet",
1060                 rtp_source_get_ssrc (source), buf1);
1061             return TRUE;
1062           } else {
1063             gchar buf1[40], buf2[40];
1064
1065             /* Current address is not a known conflict, lets assume this is
1066              * a new source. Save old address in possible conflict list
1067              */
1068             rtp_source_add_conflicting_address (source, from,
1069                 arrival->current_time);
1070
1071             gst_netaddress_to_string (from, buf1, 40);
1072             gst_netaddress_to_string (&arrival->address, buf2, 40);
1073             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1074                 " saving old as known conflict",
1075                 rtp_source_get_ssrc (source), buf1, buf2);
1076
1077             if (rtp)
1078               rtp_source_set_rtp_from (source, &arrival->address);
1079             else
1080               rtp_source_set_rtcp_from (source, &arrival->address);
1081             return FALSE;
1082           }
1083         } else {
1084           /* Don't need to save old addresses, we ignore new sources */
1085           return TRUE;
1086         }
1087       }
1088     } else {
1089       /* We don't already have a from address for RTP, just set it */
1090       if (rtp)
1091         rtp_source_set_rtp_from (source, &arrival->address);
1092       else
1093         rtp_source_set_rtcp_from (source, &arrival->address);
1094       return FALSE;
1095     }
1096
1097     /* FIXME: Log 3rd party collision somehow
1098      * Maybe should be done in upper layer, only the SDES can tell us
1099      * if its a collision or a loop
1100      */
1101
1102     /* If the source has been inactive for some time, we assume that it has
1103      * simply changed its transport source address. Hence, there is no true
1104      * third-party collision - only a simulated one. */
1105     if (arrival->current_time > source->last_activity) {
1106       GstClockTime inactivity_period =
1107           arrival->current_time - source->last_activity;
1108       if (inactivity_period > 1 * GST_SECOND) {
1109         /* Use new network address */
1110         if (rtp) {
1111           g_assert (source->have_rtp_from);
1112           rtp_source_set_rtp_from (source, &arrival->address);
1113         } else {
1114           g_assert (source->have_rtcp_from);
1115           rtp_source_set_rtcp_from (source, &arrival->address);
1116         }
1117         return FALSE;
1118       }
1119     }
1120   } else {
1121     /* This is sending with our ssrc, is it an address we already know */
1122
1123     if (rtp_source_find_conflicting_address (source, &arrival->address,
1124             arrival->current_time)) {
1125       /* Its a known conflict, its probably a loop, not a collision
1126        * lets just drop the incoming packet
1127        */
1128       GST_DEBUG ("Our packets are being looped back to us, dropping");
1129     } else {
1130       /* Its a new collision, lets change our SSRC */
1131
1132       rtp_source_add_conflicting_address (source, &arrival->address,
1133           arrival->current_time);
1134
1135       GST_DEBUG ("Collision for SSRC %x", rtp_source_get_ssrc (source));
1136       on_ssrc_collision (sess, source);
1137
1138       rtp_session_schedule_bye_locked (sess, "SSRC Collision",
1139           arrival->current_time);
1140
1141       sess->change_ssrc = TRUE;
1142     }
1143   }
1144
1145   return TRUE;
1146 }
1147
1148
1149 /* must be called with the session lock, the returned source needs to be
1150  * unreffed after usage. */
1151 static RTPSource *
1152 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1153     RTPArrivalStats * arrival, gboolean rtp)
1154 {
1155   RTPSource *source;
1156
1157   source =
1158       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1159   if (source == NULL) {
1160     /* make new Source in probation and insert */
1161     source = rtp_source_new (ssrc);
1162
1163     /* for RTP packets we need to set the source in probation. Receiving RTCP
1164      * packets of an SSRC, on the other hand, is a strong indication that we
1165      * are dealing with a valid source. */
1166     if (rtp)
1167       source->probation = RTP_DEFAULT_PROBATION;
1168     else
1169       source->probation = 0;
1170
1171     /* store from address, if any */
1172     if (arrival->have_address) {
1173       if (rtp)
1174         rtp_source_set_rtp_from (source, &arrival->address);
1175       else
1176         rtp_source_set_rtcp_from (source, &arrival->address);
1177     }
1178
1179     /* configure a callback on the source */
1180     rtp_source_set_callbacks (source, &callbacks, sess);
1181
1182     g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1183         source);
1184
1185     /* we have one more source now */
1186     sess->total_sources++;
1187     *created = TRUE;
1188   } else {
1189     *created = FALSE;
1190     /* check for collision, this updates the address when not previously set */
1191     if (check_collision (sess, source, arrival, rtp)) {
1192       return NULL;
1193     }
1194   }
1195   /* update last activity */
1196   source->last_activity = arrival->current_time;
1197   if (rtp)
1198     source->last_rtp_activity = arrival->current_time;
1199   g_object_ref (source);
1200
1201   return source;
1202 }
1203
1204 /**
1205  * rtp_session_get_internal_source:
1206  * @sess: a #RTPSession
1207  *
1208  * Get the internal #RTPSource of @sess.
1209  *
1210  * Returns: The internal #RTPSource. g_object_unref() after usage.
1211  */
1212 RTPSource *
1213 rtp_session_get_internal_source (RTPSession * sess)
1214 {
1215   RTPSource *result;
1216
1217   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1218
1219   result = g_object_ref (sess->source);
1220
1221   return result;
1222 }
1223
1224 /**
1225  * rtp_session_set_internal_ssrc:
1226  * @sess: a #RTPSession
1227  * @ssrc: an SSRC
1228  *
1229  * Set the SSRC of @sess to @ssrc.
1230  */
1231 void
1232 rtp_session_set_internal_ssrc (RTPSession * sess, guint32 ssrc)
1233 {
1234   RTP_SESSION_LOCK (sess);
1235   if (ssrc != sess->source->ssrc) {
1236     g_hash_table_steal (sess->ssrcs[sess->mask_idx],
1237         GINT_TO_POINTER (sess->source->ssrc));
1238
1239     GST_DEBUG ("setting internal SSRC to %08x", ssrc);
1240     /* After this call, any receiver of the old SSRC either in RTP or RTCP
1241      * packets will timeout on the old SSRC, we could potentially schedule a
1242      * BYE RTCP for the old SSRC... */
1243     sess->source->ssrc = ssrc;
1244     rtp_source_reset (sess->source);
1245
1246     /* rehash with the new SSRC */
1247     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1248         GINT_TO_POINTER (sess->source->ssrc), sess->source);
1249   }
1250   RTP_SESSION_UNLOCK (sess);
1251
1252   g_object_notify (G_OBJECT (sess), "internal-ssrc");
1253 }
1254
1255 /**
1256  * rtp_session_get_internal_ssrc:
1257  * @sess: a #RTPSession
1258  *
1259  * Get the internal SSRC of @sess.
1260  *
1261  * Returns: The SSRC of the session.
1262  */
1263 guint32
1264 rtp_session_get_internal_ssrc (RTPSession * sess)
1265 {
1266   guint32 ssrc;
1267
1268   RTP_SESSION_LOCK (sess);
1269   ssrc = sess->source->ssrc;
1270   RTP_SESSION_UNLOCK (sess);
1271
1272   return ssrc;
1273 }
1274
1275 /**
1276  * rtp_session_add_source:
1277  * @sess: a #RTPSession
1278  * @src: #RTPSource to add
1279  *
1280  * Add @src to @session.
1281  *
1282  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1283  * existed in the session.
1284  */
1285 gboolean
1286 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1287 {
1288   gboolean result = FALSE;
1289   RTPSource *find;
1290
1291   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1292   g_return_val_if_fail (src != NULL, FALSE);
1293
1294   RTP_SESSION_LOCK (sess);
1295   find =
1296       g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1297       GINT_TO_POINTER (src->ssrc));
1298   if (find == NULL) {
1299     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1300         GINT_TO_POINTER (src->ssrc), src);
1301     /* we have one more source now */
1302     sess->total_sources++;
1303     result = TRUE;
1304   }
1305   RTP_SESSION_UNLOCK (sess);
1306
1307   return result;
1308 }
1309
1310 /**
1311  * rtp_session_get_num_sources:
1312  * @sess: an #RTPSession
1313  *
1314  * Get the number of sources in @sess.
1315  *
1316  * Returns: The number of sources in @sess.
1317  */
1318 guint
1319 rtp_session_get_num_sources (RTPSession * sess)
1320 {
1321   guint result;
1322
1323   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1324
1325   RTP_SESSION_LOCK (sess);
1326   result = sess->total_sources;
1327   RTP_SESSION_UNLOCK (sess);
1328
1329   return result;
1330 }
1331
1332 /**
1333  * rtp_session_get_num_active_sources:
1334  * @sess: an #RTPSession
1335  *
1336  * Get the number of active sources in @sess. A source is considered active when
1337  * it has been validated and has not yet received a BYE RTCP message.
1338  *
1339  * Returns: The number of active sources in @sess.
1340  */
1341 guint
1342 rtp_session_get_num_active_sources (RTPSession * sess)
1343 {
1344   guint result;
1345
1346   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1347
1348   RTP_SESSION_LOCK (sess);
1349   result = sess->stats.active_sources;
1350   RTP_SESSION_UNLOCK (sess);
1351
1352   return result;
1353 }
1354
1355 /**
1356  * rtp_session_get_source_by_ssrc:
1357  * @sess: an #RTPSession
1358  * @ssrc: an SSRC
1359  *
1360  * Find the source with @ssrc in @sess.
1361  *
1362  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
1363  * g_object_unref() after usage.
1364  */
1365 RTPSource *
1366 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
1367 {
1368   RTPSource *result;
1369
1370   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1371
1372   RTP_SESSION_LOCK (sess);
1373   result =
1374       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1375   if (result)
1376     g_object_ref (result);
1377   RTP_SESSION_UNLOCK (sess);
1378
1379   return result;
1380 }
1381
1382 /**
1383  * rtp_session_get_source_by_cname:
1384  * @sess: a #RTPSession
1385  * @cname: an CNAME
1386  *
1387  * Find the source with @cname in @sess.
1388  *
1389  * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found.
1390  * g_object_unref() after usage.
1391  */
1392 RTPSource *
1393 rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
1394 {
1395   RTPSource *result;
1396
1397   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1398   g_return_val_if_fail (cname != NULL, NULL);
1399
1400   RTP_SESSION_LOCK (sess);
1401   result = g_hash_table_lookup (sess->cnames, cname);
1402   if (result)
1403     g_object_ref (result);
1404   RTP_SESSION_UNLOCK (sess);
1405
1406   return result;
1407 }
1408
1409 /* should be called with the SESSION lock */
1410 static guint32
1411 rtp_session_create_new_ssrc (RTPSession * sess)
1412 {
1413   guint32 ssrc;
1414
1415   while (TRUE) {
1416     ssrc = g_random_int ();
1417
1418     /* see if it exists in the session, we're done if it doesn't */
1419     if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1420             GINT_TO_POINTER (ssrc)) == NULL)
1421       break;
1422   }
1423   return ssrc;
1424 }
1425
1426
1427 /**
1428  * rtp_session_create_source:
1429  * @sess: an #RTPSession
1430  *
1431  * Create an #RTPSource for use in @sess. This function will create a source
1432  * with an ssrc that is currently not used by any participants in the session.
1433  *
1434  * Returns: an #RTPSource.
1435  */
1436 RTPSource *
1437 rtp_session_create_source (RTPSession * sess)
1438 {
1439   guint32 ssrc;
1440   RTPSource *source;
1441
1442   RTP_SESSION_LOCK (sess);
1443   ssrc = rtp_session_create_new_ssrc (sess);
1444   source = rtp_source_new (ssrc);
1445   rtp_source_set_callbacks (source, &callbacks, sess);
1446   /* we need an additional ref for the source in the hashtable */
1447   g_object_ref (source);
1448   g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1449       source);
1450   /* we have one more source now */
1451   sess->total_sources++;
1452   RTP_SESSION_UNLOCK (sess);
1453
1454   return source;
1455 }
1456
1457 /* update the RTPArrivalStats structure with the current time and other bits
1458  * about the current buffer we are handling.
1459  * This function is typically called when a validated packet is received.
1460  * This function should be called with the SESSION_LOCK
1461  */
1462 static void
1463 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
1464     gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
1465     GstClockTime running_time)
1466 {
1467   /* get time of arrival */
1468   arrival->current_time = current_time;
1469   arrival->running_time = running_time;
1470
1471   /* get packet size including header overhead */
1472   arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
1473
1474   if (rtp) {
1475     arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer);
1476   } else {
1477     arrival->payload_len = 0;
1478   }
1479
1480   /* for netbuffer we can store the IP address to check for collisions */
1481   arrival->have_address = GST_IS_NETBUFFER (buffer);
1482   if (arrival->have_address) {
1483     GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
1484
1485     memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
1486   }
1487 }
1488
1489 /**
1490  * rtp_session_process_rtp:
1491  * @sess: and #RTPSession
1492  * @buffer: an RTP buffer
1493  * @current_time: the current system time
1494  * @running_time: the running_time of @buffer
1495  *
1496  * Process an RTP buffer in the session manager. This function takes ownership
1497  * of @buffer.
1498  *
1499  * Returns: a #GstFlowReturn.
1500  */
1501 GstFlowReturn
1502 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
1503     GstClockTime current_time, GstClockTime running_time)
1504 {
1505   GstFlowReturn result;
1506   guint32 ssrc;
1507   RTPSource *source;
1508   gboolean created;
1509   gboolean prevsender, prevactive;
1510   RTPArrivalStats arrival;
1511   guint32 csrcs[16];
1512   guint8 i, count;
1513   guint64 oldrate;
1514
1515   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1516   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1517
1518   if (!gst_rtp_buffer_validate (buffer))
1519     goto invalid_packet;
1520
1521   RTP_SESSION_LOCK (sess);
1522   /* update arrival stats */
1523   update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
1524       running_time);
1525
1526   /* ignore more RTP packets when we left the session */
1527   if (sess->source->received_bye)
1528     goto ignore;
1529
1530   /* get SSRC and look up in session database */
1531   ssrc = gst_rtp_buffer_get_ssrc (buffer);
1532   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
1533   if (!source)
1534     goto collision;
1535
1536   prevsender = RTP_SOURCE_IS_SENDER (source);
1537   prevactive = RTP_SOURCE_IS_ACTIVE (source);
1538   oldrate = source->bitrate;
1539
1540   /* copy available csrc for later */
1541   count = gst_rtp_buffer_get_csrc_count (buffer);
1542   /* make sure to not overflow our array. An RTP buffer can maximally contain
1543    * 16 CSRCs */
1544   count = MIN (count, 16);
1545
1546   for (i = 0; i < count; i++)
1547     csrcs[i] = gst_rtp_buffer_get_csrc (buffer, i);
1548
1549   /* let source process the packet */
1550   result = rtp_source_process_rtp (source, buffer, &arrival);
1551
1552   /* source became active */
1553   if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
1554     sess->stats.active_sources++;
1555     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
1556         sess->stats.active_sources);
1557     on_ssrc_validated (sess, source);
1558   }
1559   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1560     sess->stats.sender_sources++;
1561     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
1562         sess->stats.sender_sources);
1563   }
1564   if (oldrate != source->bitrate)
1565     sess->recalc_bandwidth = TRUE;
1566
1567   if (created)
1568     on_new_ssrc (sess, source);
1569
1570   if (source->validated) {
1571     gboolean created;
1572
1573     /* for validated sources, we add the CSRCs as well */
1574     for (i = 0; i < count; i++) {
1575       guint32 csrc;
1576       RTPSource *csrc_src;
1577
1578       csrc = csrcs[i];
1579
1580       /* get source */
1581       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
1582       if (!csrc_src)
1583         continue;
1584
1585       if (created) {
1586         GST_DEBUG ("created new CSRC: %08x", csrc);
1587         rtp_source_set_as_csrc (csrc_src);
1588         if (RTP_SOURCE_IS_ACTIVE (csrc_src))
1589           sess->stats.active_sources++;
1590         on_new_ssrc (sess, csrc_src);
1591       }
1592       g_object_unref (csrc_src);
1593     }
1594   }
1595   g_object_unref (source);
1596
1597   RTP_SESSION_UNLOCK (sess);
1598
1599   return result;
1600
1601   /* ERRORS */
1602 invalid_packet:
1603   {
1604     gst_buffer_unref (buffer);
1605     GST_DEBUG ("invalid RTP packet received");
1606     return GST_FLOW_OK;
1607   }
1608 ignore:
1609   {
1610     gst_buffer_unref (buffer);
1611     RTP_SESSION_UNLOCK (sess);
1612     GST_DEBUG ("ignoring RTP packet because we are leaving");
1613     return GST_FLOW_OK;
1614   }
1615 collision:
1616   {
1617     gst_buffer_unref (buffer);
1618     RTP_SESSION_UNLOCK (sess);
1619     GST_DEBUG ("ignoring packet because its collisioning");
1620     return GST_FLOW_OK;
1621   }
1622 }
1623
1624 static void
1625 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
1626     GstRTCPPacket * packet, RTPArrivalStats * arrival)
1627 {
1628   guint count, i;
1629
1630   count = gst_rtcp_packet_get_rb_count (packet);
1631   for (i = 0; i < count; i++) {
1632     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1633     guint8 fractionlost;
1634     gint32 packetslost;
1635
1636     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1637         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1638
1639     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
1640
1641     if (ssrc == sess->source->ssrc) {
1642       /* only deal with report blocks for our session, we update the stats of
1643        * the sender of the RTCP message. We could also compare our stats against
1644        * the other sender to see if we are better or worse. */
1645       rtp_source_process_rb (source, arrival->current_time, fractionlost,
1646           packetslost, exthighestseq, jitter, lsr, dlsr);
1647
1648       on_ssrc_active (sess, source);
1649     }
1650   }
1651 }
1652
1653 /* A Sender report contains statistics about how the sender is doing. This
1654  * includes timing informataion such as the relation between RTP and NTP
1655  * timestamps and the number of packets/bytes it sent to us.
1656  *
1657  * In this report is also included a set of report blocks related to how this
1658  * sender is receiving data (in case we (or somebody else) is also sending stuff
1659  * to it). This info includes the packet loss, jitter and seqnum. It also
1660  * contains information to calculate the round trip time (LSR/DLSR).
1661  */
1662 static void
1663 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
1664     RTPArrivalStats * arrival, gboolean * do_sync)
1665 {
1666   guint32 senderssrc, rtptime, packet_count, octet_count;
1667   guint64 ntptime;
1668   RTPSource *source;
1669   gboolean created, prevsender;
1670
1671   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
1672       &packet_count, &octet_count);
1673
1674   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
1675       senderssrc, GST_TIME_ARGS (arrival->current_time));
1676
1677   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1678   if (!source)
1679     return;
1680
1681   /* don't try to do lip-sync for sources that sent a BYE */
1682   if (rtp_source_received_bye (source))
1683     *do_sync = FALSE;
1684   else
1685     *do_sync = TRUE;
1686
1687   prevsender = RTP_SOURCE_IS_SENDER (source);
1688
1689   /* first update the source */
1690   rtp_source_process_sr (source, arrival->current_time, ntptime, rtptime,
1691       packet_count, octet_count);
1692
1693   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1694     sess->stats.sender_sources++;
1695     GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc,
1696         sess->stats.sender_sources);
1697   }
1698
1699   if (created)
1700     on_new_ssrc (sess, source);
1701
1702   rtp_session_process_rb (sess, source, packet, arrival);
1703   g_object_unref (source);
1704 }
1705
1706 /* A receiver report contains statistics about how a receiver is doing. It
1707  * includes stuff like packet loss, jitter and the seqnum it received last. It
1708  * also contains info to calculate the round trip time.
1709  *
1710  * We are only interested in how the sender of this report is doing wrt to us.
1711  */
1712 static void
1713 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
1714     RTPArrivalStats * arrival)
1715 {
1716   guint32 senderssrc;
1717   RTPSource *source;
1718   gboolean created;
1719
1720   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
1721
1722   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
1723
1724   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1725   if (!source)
1726     return;
1727
1728   if (created)
1729     on_new_ssrc (sess, source);
1730
1731   rtp_session_process_rb (sess, source, packet, arrival);
1732   g_object_unref (source);
1733 }
1734
1735 /* Get SDES items and store them in the SSRC */
1736 static void
1737 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
1738     RTPArrivalStats * arrival)
1739 {
1740   guint items, i, j;
1741   gboolean more_items, more_entries;
1742
1743   items = gst_rtcp_packet_sdes_get_item_count (packet);
1744   GST_DEBUG ("got SDES packet with %d items", items);
1745
1746   more_items = gst_rtcp_packet_sdes_first_item (packet);
1747   i = 0;
1748   while (more_items) {
1749     guint32 ssrc;
1750     gboolean changed, created;
1751     RTPSource *source;
1752     GstStructure *sdes;
1753
1754     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
1755
1756     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
1757
1758     changed = FALSE;
1759
1760     /* find src, no probation when dealing with RTCP */
1761     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1762     if (!source)
1763       return;
1764
1765     sdes = gst_structure_new ("application/x-rtp-source-sdes", NULL);
1766
1767     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
1768     j = 0;
1769     while (more_entries) {
1770       GstRTCPSDESType type;
1771       guint8 len;
1772       guint8 *data;
1773       gchar *name;
1774       gchar *value;
1775
1776       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
1777
1778       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
1779           data);
1780
1781       if (type == GST_RTCP_SDES_PRIV) {
1782         name = g_strndup ((const gchar *) &data[1], data[0]);
1783         len -= data[0] + 1;
1784         data += data[0] + 1;
1785       } else {
1786         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
1787       }
1788
1789       value = g_strndup ((const gchar *) data, len);
1790
1791       gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
1792
1793       g_free (name);
1794       g_free (value);
1795
1796       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
1797       j++;
1798     }
1799
1800     /* takes ownership of sdes */
1801     changed = rtp_source_set_sdes_struct (source, sdes);
1802
1803     source->validated = TRUE;
1804
1805     if (created)
1806       on_new_ssrc (sess, source);
1807     if (changed)
1808       on_ssrc_sdes (sess, source);
1809
1810     g_object_unref (source);
1811
1812     more_items = gst_rtcp_packet_sdes_next_item (packet);
1813     i++;
1814   }
1815 }
1816
1817 /* BYE is sent when a client leaves the session
1818  */
1819 static void
1820 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
1821     RTPArrivalStats * arrival)
1822 {
1823   guint count, i;
1824   gchar *reason;
1825   gboolean reconsider = FALSE;
1826
1827   reason = gst_rtcp_packet_bye_get_reason (packet);
1828   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
1829
1830   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
1831   for (i = 0; i < count; i++) {
1832     guint32 ssrc;
1833     RTPSource *source;
1834     gboolean created, prevactive, prevsender;
1835     guint pmembers, members;
1836
1837     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
1838     GST_DEBUG ("SSRC: %08x", ssrc);
1839
1840     /* find src and mark bye, no probation when dealing with RTCP */
1841     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1842     if (!source)
1843       return;
1844
1845     /* store time for when we need to time out this source */
1846     source->bye_time = arrival->current_time;
1847
1848     prevactive = RTP_SOURCE_IS_ACTIVE (source);
1849     prevsender = RTP_SOURCE_IS_SENDER (source);
1850
1851     /* let the source handle the rest */
1852     rtp_source_process_bye (source, reason);
1853
1854     pmembers = sess->stats.active_sources;
1855
1856     if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
1857       sess->stats.active_sources--;
1858       GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1859           sess->stats.active_sources);
1860     }
1861     if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
1862       sess->stats.sender_sources--;
1863       GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1864           sess->stats.sender_sources);
1865     }
1866     members = sess->stats.active_sources;
1867
1868     if (!sess->source->received_bye && members < pmembers) {
1869       /* some members went away since the previous timeout estimate.
1870        * Perform reverse reconsideration but only when we are not scheduling a
1871        * BYE ourselves. */
1872       if (arrival->current_time < sess->next_rtcp_check_time) {
1873         GstClockTime time_remaining;
1874
1875         time_remaining = sess->next_rtcp_check_time - arrival->current_time;
1876         sess->next_rtcp_check_time =
1877             gst_util_uint64_scale (time_remaining, members, pmembers);
1878
1879         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
1880             GST_TIME_ARGS (sess->next_rtcp_check_time));
1881
1882         sess->next_rtcp_check_time += arrival->current_time;
1883
1884         /* mark pending reconsider. We only want to signal the reconsideration
1885          * once after we handled all the source in the bye packet */
1886         reconsider = TRUE;
1887       }
1888     }
1889
1890     if (created)
1891       on_new_ssrc (sess, source);
1892
1893     on_bye_ssrc (sess, source);
1894
1895     g_object_unref (source);
1896   }
1897   if (reconsider) {
1898     RTP_SESSION_UNLOCK (sess);
1899     /* notify app of reconsideration */
1900     if (sess->callbacks.reconsider)
1901       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
1902     RTP_SESSION_LOCK (sess);
1903   }
1904   g_free (reason);
1905 }
1906
1907 static void
1908 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
1909     RTPArrivalStats * arrival)
1910 {
1911   GST_DEBUG ("received APP");
1912 }
1913
1914 /**
1915  * rtp_session_process_rtcp:
1916  * @sess: and #RTPSession
1917  * @buffer: an RTCP buffer
1918  * @current_time: the current system time
1919  *
1920  * Process an RTCP buffer in the session manager. This function takes ownership
1921  * of @buffer.
1922  *
1923  * Returns: a #GstFlowReturn.
1924  */
1925 GstFlowReturn
1926 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
1927     GstClockTime current_time)
1928 {
1929   GstRTCPPacket packet;
1930   gboolean more, is_bye = FALSE, do_sync = FALSE;
1931   RTPArrivalStats arrival;
1932   GstFlowReturn result = GST_FLOW_OK;
1933
1934   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1935   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1936
1937   if (!gst_rtcp_buffer_validate (buffer))
1938     goto invalid_packet;
1939
1940   GST_DEBUG ("received RTCP packet");
1941
1942   RTP_SESSION_LOCK (sess);
1943   /* update arrival stats */
1944   update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1);
1945
1946   if (sess->sent_bye)
1947     goto ignore;
1948
1949   /* make writable, we might want to change the buffer */
1950   buffer = gst_buffer_make_metadata_writable (buffer);
1951
1952   /* start processing the compound packet */
1953   more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
1954   while (more) {
1955     GstRTCPType type;
1956
1957     type = gst_rtcp_packet_get_type (&packet);
1958
1959     /* when we are leaving the session, we should ignore all non-BYE messages */
1960     if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) {
1961       GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
1962       goto next;
1963     }
1964
1965     switch (type) {
1966       case GST_RTCP_TYPE_SR:
1967         rtp_session_process_sr (sess, &packet, &arrival, &do_sync);
1968         break;
1969       case GST_RTCP_TYPE_RR:
1970         rtp_session_process_rr (sess, &packet, &arrival);
1971         break;
1972       case GST_RTCP_TYPE_SDES:
1973         rtp_session_process_sdes (sess, &packet, &arrival);
1974         break;
1975       case GST_RTCP_TYPE_BYE:
1976         is_bye = TRUE;
1977         /* don't try to attempt lip-sync anymore for streams with a BYE */
1978         do_sync = FALSE;
1979         rtp_session_process_bye (sess, &packet, &arrival);
1980         break;
1981       case GST_RTCP_TYPE_APP:
1982         rtp_session_process_app (sess, &packet, &arrival);
1983         break;
1984       default:
1985         GST_WARNING ("got unknown RTCP packet");
1986         break;
1987     }
1988   next:
1989     more = gst_rtcp_packet_move_to_next (&packet);
1990   }
1991
1992   /* if we are scheduling a BYE, we only want to count bye packets, else we
1993    * count everything */
1994   if (sess->source->received_bye) {
1995     if (is_bye) {
1996       sess->stats.bye_members++;
1997       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1998     }
1999   } else {
2000     /* keep track of average packet size */
2001     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
2002   }
2003   RTP_SESSION_UNLOCK (sess);
2004
2005   /* notify caller of sr packets in the callback */
2006   if (do_sync && sess->callbacks.sync_rtcp)
2007     result = sess->callbacks.sync_rtcp (sess, sess->source, buffer,
2008         sess->sync_rtcp_user_data);
2009   else
2010     gst_buffer_unref (buffer);
2011
2012   return result;
2013
2014   /* ERRORS */
2015 invalid_packet:
2016   {
2017     GST_DEBUG ("invalid RTCP packet received");
2018     gst_buffer_unref (buffer);
2019     return GST_FLOW_OK;
2020   }
2021 ignore:
2022   {
2023     gst_buffer_unref (buffer);
2024     RTP_SESSION_UNLOCK (sess);
2025     GST_DEBUG ("ignoring RTP packet because we left");
2026     return GST_FLOW_OK;
2027   }
2028 }
2029
2030 /**
2031  * rtp_session_send_rtp:
2032  * @sess: an #RTPSession
2033  * @data: pointer to either an RTP buffer or a list of RTP buffers
2034  * @is_list: TRUE when @data is a buffer list
2035  * @current_time: the current system time
2036  * @running_time: the running time of @data
2037  *
2038  * Send the RTP buffer in the session manager. This function takes ownership of
2039  * @buffer.
2040  *
2041  * Returns: a #GstFlowReturn.
2042  */
2043 GstFlowReturn
2044 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
2045     GstClockTime current_time, GstClockTime running_time)
2046 {
2047   GstFlowReturn result;
2048   RTPSource *source;
2049   gboolean prevsender;
2050   gboolean valid_packet;
2051   guint64 oldrate;
2052
2053   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2054   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
2055
2056   if (is_list) {
2057     valid_packet = gst_rtp_buffer_list_validate (GST_BUFFER_LIST_CAST (data));
2058   } else {
2059     valid_packet = gst_rtp_buffer_validate (GST_BUFFER_CAST (data));
2060   }
2061
2062   if (!valid_packet)
2063     goto invalid_packet;
2064
2065   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
2066
2067   RTP_SESSION_LOCK (sess);
2068   source = sess->source;
2069
2070   /* update last activity */
2071   source->last_rtp_activity = current_time;
2072
2073   prevsender = RTP_SOURCE_IS_SENDER (source);
2074   oldrate = source->bitrate;
2075
2076   /* we use our own source to send */
2077   result = rtp_source_send_rtp (source, data, is_list, running_time);
2078
2079   if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
2080     sess->stats.sender_sources++;
2081   if (oldrate != source->bitrate)
2082     sess->recalc_bandwidth = TRUE;
2083   RTP_SESSION_UNLOCK (sess);
2084
2085   return result;
2086
2087   /* ERRORS */
2088 invalid_packet:
2089   {
2090     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
2091     GST_DEBUG ("invalid RTP packet received");
2092     return GST_FLOW_OK;
2093   }
2094 }
2095
2096 static void
2097 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
2098 {
2099   *bandwidth += source->bitrate;
2100 }
2101
2102 static GstClockTime
2103 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
2104     gboolean first)
2105 {
2106   GstClockTime result;
2107
2108   /* recalculate bandwidth when it changed */
2109   if (sess->recalc_bandwidth) {
2110     gdouble bandwidth;
2111
2112     if (sess->bandwidth > 0)
2113       bandwidth = sess->bandwidth;
2114     else {
2115       /* If it is <= 0, then try to estimate the actual bandwidth */
2116       bandwidth = sess->source->bitrate;
2117
2118       g_hash_table_foreach (sess->cnames, (GHFunc) add_bitrates, &bandwidth);
2119       bandwidth /= 8.0;
2120     }
2121     if (bandwidth == 0)
2122       bandwidth = RTP_STATS_BANDWIDTH;
2123
2124     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
2125         sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
2126
2127     sess->recalc_bandwidth = FALSE;
2128   }
2129
2130   if (sess->source->received_bye) {
2131     result = rtp_stats_calculate_bye_interval (&sess->stats);
2132   } else {
2133     result = rtp_stats_calculate_rtcp_interval (&sess->stats,
2134         RTP_SOURCE_IS_SENDER (sess->source), first);
2135   }
2136
2137   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
2138       GST_TIME_ARGS (result), first);
2139
2140   if (!deterministic && result != GST_CLOCK_TIME_NONE)
2141     result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
2142
2143   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
2144
2145   return result;
2146 }
2147
2148 /* Stop the current @sess and schedule a BYE message for the other members.
2149  * One must have the session lock to call this function
2150  */
2151 static GstFlowReturn
2152 rtp_session_schedule_bye_locked (RTPSession * sess, const gchar * reason,
2153     GstClockTime current_time)
2154 {
2155   GstFlowReturn result = GST_FLOW_OK;
2156   RTPSource *source;
2157   GstClockTime interval;
2158
2159   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2160
2161   source = sess->source;
2162
2163   /* ignore more BYEs */
2164   if (source->received_bye)
2165     goto done;
2166
2167   /* we have BYE now */
2168   source->received_bye = TRUE;
2169   /* at least one member wants to send a BYE */
2170   g_free (sess->bye_reason);
2171   sess->bye_reason = g_strdup (reason);
2172   /* The avg packet size is kept scaled by 16 */
2173   sess->stats.avg_rtcp_packet_size = 100 * 16;
2174   sess->stats.bye_members = 1;
2175   sess->first_rtcp = TRUE;
2176   sess->sent_bye = FALSE;
2177
2178   /* reschedule transmission */
2179   sess->last_rtcp_send_time = current_time;
2180   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2181   sess->next_rtcp_check_time = current_time + interval;
2182
2183   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
2184       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
2185
2186   RTP_SESSION_UNLOCK (sess);
2187   /* notify app of reconsideration */
2188   if (sess->callbacks.reconsider)
2189     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2190   RTP_SESSION_LOCK (sess);
2191 done:
2192
2193   return result;
2194 }
2195
2196 /**
2197  * rtp_session_schedule_bye:
2198  * @sess: an #RTPSession
2199  * @reason: a reason or NULL
2200  * @current_time: the current system time
2201  *
2202  * Stop the current @sess and schedule a BYE message for the other members.
2203  *
2204  * Returns: a #GstFlowReturn.
2205  */
2206 GstFlowReturn
2207 rtp_session_schedule_bye (RTPSession * sess, const gchar * reason,
2208     GstClockTime current_time)
2209 {
2210   GstFlowReturn result = GST_FLOW_OK;
2211
2212   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2213
2214   RTP_SESSION_LOCK (sess);
2215   result = rtp_session_schedule_bye_locked (sess, reason, current_time);
2216   RTP_SESSION_UNLOCK (sess);
2217
2218   return result;
2219 }
2220
2221 /**
2222  * rtp_session_next_timeout:
2223  * @sess: an #RTPSession
2224  * @current_time: the current system time
2225  *
2226  * Get the next time we should perform session maintenance tasks.
2227  *
2228  * Returns: a time when rtp_session_on_timeout() should be called with the
2229  * current system time.
2230  */
2231 GstClockTime
2232 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
2233 {
2234   GstClockTime result, interval = 0;
2235
2236   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
2237
2238   RTP_SESSION_LOCK (sess);
2239
2240   result = sess->next_rtcp_check_time;
2241
2242   GST_DEBUG ("current time: %" GST_TIME_FORMAT ", next :%" GST_TIME_FORMAT,
2243       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2244
2245   if (result < current_time) {
2246     GST_DEBUG ("take current time as base");
2247     /* our previous check time expired, start counting from the current time
2248      * again. */
2249     result = current_time;
2250   }
2251
2252   if (sess->source->received_bye) {
2253     if (sess->sent_bye) {
2254       GST_DEBUG ("we sent BYE already");
2255       interval = GST_CLOCK_TIME_NONE;
2256     } else if (sess->stats.active_sources >= 50) {
2257       GST_DEBUG ("reconsider BYE, more than 50 sources");
2258       /* reconsider BYE if members >= 50 */
2259       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2260     }
2261   } else {
2262     if (sess->first_rtcp) {
2263       GST_DEBUG ("first RTCP packet");
2264       /* we are called for the first time */
2265       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2266     } else if (sess->next_rtcp_check_time < current_time) {
2267       GST_DEBUG ("old check time expired, getting new timeout");
2268       /* get a new timeout when we need to */
2269       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
2270     }
2271   }
2272
2273   if (interval != GST_CLOCK_TIME_NONE)
2274     result += interval;
2275   else
2276     result = GST_CLOCK_TIME_NONE;
2277
2278   sess->next_rtcp_check_time = result;
2279
2280   GST_DEBUG ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
2281   RTP_SESSION_UNLOCK (sess);
2282
2283   return result;
2284 }
2285
2286 typedef struct
2287 {
2288   RTPSession *sess;
2289   GstBuffer *rtcp;
2290   GstClockTime current_time;
2291   guint64 ntpnstime;
2292   GstClockTime running_time;
2293   GstClockTime interval;
2294   GstRTCPPacket packet;
2295   gboolean is_bye;
2296   gboolean has_sdes;
2297 } ReportData;
2298
2299 static void
2300 session_start_rtcp (RTPSession * sess, ReportData * data)
2301 {
2302   GstRTCPPacket *packet = &data->packet;
2303   RTPSource *own = sess->source;
2304
2305   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
2306
2307   if (RTP_SOURCE_IS_SENDER (own)) {
2308     guint64 ntptime;
2309     guint32 rtptime;
2310     guint32 packet_count, octet_count;
2311
2312     /* we are a sender, create SR */
2313     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
2314     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
2315
2316     /* get latest stats */
2317     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
2318         &ntptime, &rtptime, &packet_count, &octet_count);
2319     /* store stats */
2320     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
2321         packet_count, octet_count);
2322
2323     /* fill in sender report info */
2324     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
2325         ntptime, rtptime, packet_count, octet_count);
2326   } else {
2327     /* we are only receiver, create RR */
2328     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
2329     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
2330     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
2331   }
2332 }
2333
2334 /* construct a Sender or Receiver Report */
2335 static void
2336 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
2337 {
2338   RTPSession *sess = data->sess;
2339   GstRTCPPacket *packet = &data->packet;
2340
2341   /* create a new buffer if needed */
2342   if (data->rtcp == NULL) {
2343     session_start_rtcp (sess, data);
2344   }
2345   if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
2346     /* only report about other sender sources */
2347     if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
2348       guint8 fractionlost;
2349       gint32 packetslost;
2350       guint32 exthighestseq, jitter;
2351       guint32 lsr, dlsr;
2352
2353       /* get new stats */
2354       rtp_source_get_new_rb (source, data->current_time, &fractionlost,
2355           &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2356
2357       /* packet is not yet filled, add report block for this source. */
2358       gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
2359           exthighestseq, jitter, lsr, dlsr);
2360     }
2361   }
2362 }
2363
2364 /* perform cleanup of sources that timed out */
2365 static void
2366 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
2367 {
2368   gboolean remove = FALSE;
2369   gboolean byetimeout = FALSE;
2370   gboolean sendertimeout = FALSE;
2371   gboolean is_sender, is_active;
2372   RTPSession *sess = data->sess;
2373   GstClockTime interval;
2374
2375   is_sender = RTP_SOURCE_IS_SENDER (source);
2376   is_active = RTP_SOURCE_IS_ACTIVE (source);
2377
2378   /* check for our own source, we don't want to delete our own source. */
2379   if (!(source == sess->source)) {
2380     if (source->received_bye) {
2381       /* if we received a BYE from the source, remove the source after some
2382        * time. */
2383       if (data->current_time > source->bye_time &&
2384           data->current_time - source->bye_time > sess->stats.bye_timeout) {
2385         GST_DEBUG ("removing BYE source %08x", source->ssrc);
2386         remove = TRUE;
2387         byetimeout = TRUE;
2388       }
2389     }
2390     /* sources that were inactive for more than 5 times the deterministic reporting
2391      * interval get timed out. the min timeout is 5 seconds. */
2392     if (data->current_time > source->last_activity) {
2393       interval = MAX (data->interval * 5, 5 * GST_SECOND);
2394       if (data->current_time - source->last_activity > interval) {
2395         GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
2396             source->ssrc, GST_TIME_ARGS (source->last_activity));
2397         remove = TRUE;
2398       }
2399     }
2400   }
2401
2402   /* senders that did not send for a long time become a receiver, this also
2403    * holds for our own source. */
2404   if (is_sender) {
2405     if (data->current_time > source->last_rtp_activity) {
2406       interval = MAX (data->interval * 2, 5 * GST_SECOND);
2407       if (data->current_time - source->last_rtp_activity > interval) {
2408         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
2409             GST_TIME_FORMAT, source->ssrc,
2410             GST_TIME_ARGS (source->last_rtp_activity));
2411         source->is_sender = FALSE;
2412         sess->stats.sender_sources--;
2413         sendertimeout = TRUE;
2414       }
2415     }
2416   }
2417
2418   if (remove) {
2419     sess->total_sources--;
2420     if (is_sender)
2421       sess->stats.sender_sources--;
2422     if (is_active)
2423       sess->stats.active_sources--;
2424
2425     if (byetimeout)
2426       on_bye_timeout (sess, source);
2427     else
2428       on_timeout (sess, source);
2429   } else {
2430     if (sendertimeout)
2431       on_sender_timeout (sess, source);
2432   }
2433
2434   source->closing = remove;
2435 }
2436
2437 static void
2438 session_sdes (RTPSession * sess, ReportData * data)
2439 {
2440   GstRTCPPacket *packet = &data->packet;
2441   const GstStructure *sdes;
2442   gint i, n_fields;
2443
2444   /* add SDES packet */
2445   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
2446
2447   gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
2448
2449   sdes = rtp_source_get_sdes_struct (sess->source);
2450
2451   /* add all fields in the structure, the order is not important. */
2452   n_fields = gst_structure_n_fields (sdes);
2453   for (i = 0; i < n_fields; ++i) {
2454     const gchar *field;
2455     const gchar *value;
2456     GstRTCPSDESType type;
2457
2458     field = gst_structure_nth_field_name (sdes, i);
2459     if (field == NULL)
2460       continue;
2461     value = gst_structure_get_string (sdes, field);
2462     if (value == NULL)
2463       continue;
2464     type = gst_rtcp_sdes_name_to_type (field);
2465
2466     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
2467       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
2468           (const guint8 *) value);
2469     } else if (type == GST_RTCP_SDES_PRIV) {
2470       gsize prefix_len;
2471       gsize value_len;
2472       gsize data_len;
2473       guint8 data[256];
2474
2475       /* don't accept entries that are too big */
2476       prefix_len = strlen (field);
2477       if (prefix_len > 255)
2478         continue;
2479       value_len = strlen (value);
2480       if (value_len > 255)
2481         continue;
2482       data_len = 1 + prefix_len + value_len;
2483       if (data_len > 255)
2484         continue;
2485
2486       data[0] = prefix_len;
2487       memcpy (&data[1], field, prefix_len);
2488       memcpy (&data[1 + prefix_len], value, value_len);
2489
2490       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
2491     }
2492   }
2493
2494   data->has_sdes = TRUE;
2495 }
2496
2497 /* schedule a BYE packet */
2498 static void
2499 session_bye (RTPSession * sess, ReportData * data)
2500 {
2501   GstRTCPPacket *packet = &data->packet;
2502
2503   /* open packet */
2504   session_start_rtcp (sess, data);
2505
2506   /* add SDES */
2507   session_sdes (sess, data);
2508
2509   /* add a BYE packet */
2510   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet);
2511   gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc);
2512   if (sess->bye_reason)
2513     gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason);
2514
2515   /* we have a BYE packet now */
2516   data->is_bye = TRUE;
2517 }
2518
2519 static gboolean
2520 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
2521 {
2522   GstClockTime new_send_time, elapsed;
2523   gboolean result;
2524
2525   /* no need to check yet */
2526   if (sess->next_rtcp_check_time > current_time) {
2527     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
2528         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
2529         GST_TIME_ARGS (current_time));
2530     return FALSE;
2531   }
2532
2533   /* get elapsed time since we last reported */
2534   elapsed = current_time - sess->last_rtcp_send_time;
2535
2536   /* perform forward reconsideration */
2537   new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
2538
2539   GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
2540       GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time), GST_TIME_ARGS (elapsed));
2541
2542   new_send_time += sess->last_rtcp_send_time;
2543
2544   /* check if reconsideration */
2545   if (current_time < new_send_time) {
2546     GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
2547         GST_TIME_ARGS (new_send_time));
2548     result = FALSE;
2549     /* store new check time */
2550     sess->next_rtcp_check_time = new_send_time;
2551   } else {
2552     result = TRUE;
2553     new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
2554
2555     GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
2556         GST_TIME_ARGS (new_send_time));
2557     sess->next_rtcp_check_time = current_time + new_send_time;
2558   }
2559   return result;
2560 }
2561
2562 static void
2563 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
2564 {
2565   g_hash_table_insert (hash_table, key, g_object_ref (source));
2566 }
2567
2568 static gboolean
2569 remove_closing_sources (const gchar * key, RTPSource * source, gpointer * data)
2570 {
2571   return source->closing;
2572 }
2573
2574 /**
2575  * rtp_session_on_timeout:
2576  * @sess: an #RTPSession
2577  * @current_time: the current system time
2578  * @ntpnstime: the current NTP time in nanoseconds
2579  * @running_time: the current running_time of the pipeline
2580  *
2581  * Perform maintenance actions after the timeout obtained with
2582  * rtp_session_next_timeout() expired.
2583  *
2584  * This function will perform timeouts of receivers and senders, send a BYE
2585  * packet or generate RTCP packets with current session stats.
2586  *
2587  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
2588  * times, for each packet that should be processed.
2589  *
2590  * Returns: a #GstFlowReturn.
2591  */
2592 GstFlowReturn
2593 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
2594     guint64 ntpnstime, GstClockTime running_time)
2595 {
2596   GstFlowReturn result = GST_FLOW_OK;
2597   ReportData data;
2598   RTPSource *own;
2599   GHashTable *table_copy;
2600   gboolean notify = FALSE;
2601
2602   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2603
2604   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT,
2605       GST_TIME_ARGS (current_time), GST_TIME_ARGS (ntpnstime));
2606
2607   data.sess = sess;
2608   data.rtcp = NULL;
2609   data.current_time = current_time;
2610   data.ntpnstime = ntpnstime;
2611   data.is_bye = FALSE;
2612   data.has_sdes = FALSE;
2613   data.running_time = running_time;
2614
2615   own = sess->source;
2616
2617   RTP_SESSION_LOCK (sess);
2618   /* get a new interval, we need this for various cleanups etc */
2619   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
2620
2621   /* Make a local copy of the hashtable. We need to do this because the
2622    * cleanup stage below releases the session lock. */
2623   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
2624       (GDestroyNotify) g_object_unref);
2625   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2626       (GHFunc) clone_ssrcs_hashtable, table_copy);
2627
2628   /* Clean up the session, mark the source for removing, this might release the
2629    * session lock. */
2630   g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
2631   g_hash_table_destroy (table_copy);
2632
2633   /* Now remove the marked sources */
2634   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
2635       (GHRFunc) remove_closing_sources, NULL);
2636
2637   /* see if we need to generate SR or RR packets */
2638   if (is_rtcp_time (sess, current_time, &data)) {
2639     if (own->received_bye) {
2640       /* generate BYE instead */
2641       GST_DEBUG ("generating BYE message");
2642       session_bye (sess, &data);
2643       sess->sent_bye = TRUE;
2644     } else {
2645       /* loop over all known sources and do something */
2646       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2647           (GHFunc) session_report_blocks, &data);
2648     }
2649   }
2650
2651   if (data.rtcp) {
2652     /* we keep track of the last report time in order to timeout inactive
2653      * receivers or senders */
2654     sess->last_rtcp_send_time = data.current_time;
2655     sess->first_rtcp = FALSE;
2656
2657     /* add SDES for this source when not already added */
2658     if (!data.has_sdes)
2659       session_sdes (sess, &data);
2660   }
2661
2662   /* check for outdated collisions */
2663   GST_DEBUG ("Timing out collisions");
2664   rtp_source_timeout (sess->source, current_time,
2665       data.interval * RTCP_INTERVAL_COLLISION_TIMEOUT);
2666
2667   if (sess->change_ssrc) {
2668     GST_DEBUG ("need to change our SSRC (%08x)", own->ssrc);
2669     g_hash_table_steal (sess->ssrcs[sess->mask_idx],
2670         GINT_TO_POINTER (own->ssrc));
2671
2672     own->ssrc = rtp_session_create_new_ssrc (sess);
2673     rtp_source_reset (own);
2674
2675     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
2676         GINT_TO_POINTER (own->ssrc), own);
2677
2678     g_free (sess->bye_reason);
2679     sess->bye_reason = NULL;
2680     sess->sent_bye = FALSE;
2681     sess->change_ssrc = FALSE;
2682     notify = TRUE;
2683     GST_DEBUG ("changed our SSRC to %08x", own->ssrc);
2684   }
2685   RTP_SESSION_UNLOCK (sess);
2686
2687   if (notify)
2688     g_object_notify (G_OBJECT (sess), "internal-ssrc");
2689
2690   /* push out the RTCP packet */
2691   if (data.rtcp) {
2692     /* close the RTCP packet */
2693     gst_rtcp_buffer_end (data.rtcp);
2694
2695     GST_DEBUG ("sending packet");
2696     if (sess->callbacks.send_rtcp) {
2697       UPDATE_AVG (sess->stats.avg_rtcp_packet_size,
2698           GST_BUFFER_SIZE (data.rtcp));
2699       result = sess->callbacks.send_rtcp (sess, own, data.rtcp,
2700           sess->sent_bye, sess->send_rtcp_user_data);
2701     } else {
2702       GST_DEBUG ("freeing packet");
2703       gst_buffer_unref (data.rtcp);
2704     }
2705   }
2706
2707   return result;
2708 }