gst/rtpmanager/: Post a message when the SDES infor changes for a source.
[platform/upstream/gstreamer.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/rtp/gstrtpbuffer.h>
23 #include <gst/rtp/gstrtcpbuffer.h>
24 #include <gst/netbuffer/gstnetbuffer.h>
25
26 #include "gstrtpbin-marshal.h"
27
28 #include "rtpsession.h"
29
30 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
31 #define GST_CAT_DEFAULT rtp_session_debug
32
33 /* signals and args */
34 enum
35 {
36   SIGNAL_ON_NEW_SSRC,
37   SIGNAL_ON_SSRC_COLLISION,
38   SIGNAL_ON_SSRC_VALIDATED,
39   SIGNAL_ON_SSRC_ACTIVE,
40   SIGNAL_ON_SSRC_SDES,
41   SIGNAL_ON_BYE_SSRC,
42   SIGNAL_ON_BYE_TIMEOUT,
43   SIGNAL_ON_TIMEOUT,
44   LAST_SIGNAL
45 };
46
47 #define DEFAULT_INTERNAL_SOURCE      NULL
48 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
49 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_BANDWIDTH
50 #define DEFAULT_SDES_CNAME           NULL
51 #define DEFAULT_SDES_NAME            NULL
52 #define DEFAULT_SDES_EMAIL           NULL
53 #define DEFAULT_SDES_PHONE           NULL
54 #define DEFAULT_SDES_LOCATION        NULL
55 #define DEFAULT_SDES_TOOL            NULL
56 #define DEFAULT_SDES_NOTE            NULL
57 #define DEFAULT_NUM_SOURCES          0
58 #define DEFAULT_NUM_ACTIVE_SOURCES   0
59
60 enum
61 {
62   PROP_0,
63   PROP_INTERNAL_SOURCE,
64   PROP_BANDWIDTH,
65   PROP_RTCP_FRACTION,
66   PROP_SDES_CNAME,
67   PROP_SDES_NAME,
68   PROP_SDES_EMAIL,
69   PROP_SDES_PHONE,
70   PROP_SDES_LOCATION,
71   PROP_SDES_TOOL,
72   PROP_SDES_NOTE,
73   PROP_NUM_SOURCES,
74   PROP_NUM_ACTIVE_SOURCES,
75   PROP_LAST
76 };
77
78 /* update average packet size, we keep this scaled by 16 to keep enough
79  * precision. */
80 #define UPDATE_AVG(avg, val)            \
81   if ((avg) == 0)                       \
82    (avg) = (val) << 4;                  \
83   else                                  \
84    (avg) = ((val) + (15 * (avg))) >> 4;
85
86 /* GObject vmethods */
87 static void rtp_session_finalize (GObject * object);
88 static void rtp_session_set_property (GObject * object, guint prop_id,
89     const GValue * value, GParamSpec * pspec);
90 static void rtp_session_get_property (GObject * object, guint prop_id,
91     GValue * value, GParamSpec * pspec);
92
93 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
94
95 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
96
97 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
98     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
99
100 static void
101 rtp_session_class_init (RTPSessionClass * klass)
102 {
103   GObjectClass *gobject_class;
104
105   gobject_class = (GObjectClass *) klass;
106
107   gobject_class->finalize = rtp_session_finalize;
108   gobject_class->set_property = rtp_session_set_property;
109   gobject_class->get_property = rtp_session_get_property;
110
111   /**
112    * RTPSession::on-new-ssrc:
113    * @session: the object which received the signal
114    * @src: the new RTPSource
115    *
116    * Notify of a new SSRC that entered @session.
117    */
118   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
119       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
120       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
121       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
122       RTP_TYPE_SOURCE);
123   /**
124    * RTPSession::on-ssrc-collision:
125    * @session: the object which received the signal
126    * @src: the #RTPSource that caused a collision
127    *
128    * Notify when we have an SSRC collision
129    */
130   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
131       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
132       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
133       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
134       RTP_TYPE_SOURCE);
135   /**
136    * RTPSession::on-ssrc-validated:
137    * @session: the object which received the signal
138    * @src: the new validated RTPSource
139    *
140    * Notify of a new SSRC that became validated.
141    */
142   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
143       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
144       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
145       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
146       RTP_TYPE_SOURCE);
147   /**
148    * RTPSession::on-ssrc-active:
149    * @session: the object which received the signal
150    * @src: the active RTPSource
151    *
152    * Notify of a SSRC that is active, i.e., sending RTCP.
153    */
154   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
155       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
156       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
157       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
158       RTP_TYPE_SOURCE);
159   /**
160    * RTPSession::on-ssrc-sdes:
161    * @session: the object which received the signal
162    * @src: the RTPSource
163    *
164    * Notify that a new SDES was received for SSRC.
165    */
166   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
167       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
168       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
169       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
170       RTP_TYPE_SOURCE);
171   /**
172    * RTPSession::on-bye-ssrc:
173    * @session: the object which received the signal
174    * @src: the RTPSource that went away
175    *
176    * Notify of an SSRC that became inactive because of a BYE packet.
177    */
178   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
179       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
180       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
181       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
182       RTP_TYPE_SOURCE);
183   /**
184    * RTPSession::on-bye-timeout:
185    * @session: the object which received the signal
186    * @src: the RTPSource that timed out
187    *
188    * Notify of an SSRC that has timed out because of BYE
189    */
190   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
191       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
192       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
193       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
194       RTP_TYPE_SOURCE);
195   /**
196    * RTPSession::on-timeout:
197    * @session: the object which received the signal
198    * @src: the RTPSource that timed out
199    *
200    * Notify of an SSRC that has timed out
201    */
202   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
203       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
204       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
205       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
206       RTP_TYPE_SOURCE);
207
208   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
209       g_param_spec_object ("internal-source", "Internal Source",
210           "The internal source element of the session",
211           RTP_TYPE_SOURCE, G_PARAM_READABLE));
212
213   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
214       g_param_spec_double ("bandwidth", "Bandwidth",
215           "The bandwidth of the session",
216           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH, G_PARAM_READWRITE));
217
218   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
219       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
220           "The fraction of the bandwidth used for RTCP",
221           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION, G_PARAM_READWRITE));
222
223   g_object_class_install_property (gobject_class, PROP_SDES_CNAME,
224       g_param_spec_string ("sdes-cname", "SDES CNAME",
225           "The CNAME to put in SDES messages of this session",
226           DEFAULT_SDES_CNAME, G_PARAM_READWRITE));
227
228   g_object_class_install_property (gobject_class, PROP_SDES_NAME,
229       g_param_spec_string ("sdes-name", "SDES NAME",
230           "The NAME to put in SDES messages of this session",
231           DEFAULT_SDES_NAME, G_PARAM_READWRITE));
232
233   g_object_class_install_property (gobject_class, PROP_SDES_EMAIL,
234       g_param_spec_string ("sdes-email", "SDES EMAIL",
235           "The EMAIL to put in SDES messages of this session",
236           DEFAULT_SDES_EMAIL, G_PARAM_READWRITE));
237
238   g_object_class_install_property (gobject_class, PROP_SDES_PHONE,
239       g_param_spec_string ("sdes-phone", "SDES PHONE",
240           "The PHONE to put in SDES messages of this session",
241           DEFAULT_SDES_PHONE, G_PARAM_READWRITE));
242
243   g_object_class_install_property (gobject_class, PROP_SDES_LOCATION,
244       g_param_spec_string ("sdes-location", "SDES LOCATION",
245           "The LOCATION to put in SDES messages of this session",
246           DEFAULT_SDES_LOCATION, G_PARAM_READWRITE));
247
248   g_object_class_install_property (gobject_class, PROP_SDES_TOOL,
249       g_param_spec_string ("sdes-tool", "SDES TOOL",
250           "The TOOL to put in SDES messages of this session",
251           DEFAULT_SDES_TOOL, G_PARAM_READWRITE));
252
253   g_object_class_install_property (gobject_class, PROP_SDES_NOTE,
254       g_param_spec_string ("sdes-note", "SDES NOTE",
255           "The NOTE to put in SDES messages of this session",
256           DEFAULT_SDES_NOTE, G_PARAM_READWRITE));
257
258   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
259       g_param_spec_uint ("num-sources", "Num Sources",
260           "The number of sources in the session", 0, G_MAXUINT,
261           DEFAULT_NUM_SOURCES, G_PARAM_READABLE));
262
263   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
264       g_param_spec_uint ("num-active-sources", "Num Active Sources",
265           "The number of active sources in the session", 0, G_MAXUINT,
266           DEFAULT_NUM_ACTIVE_SOURCES, G_PARAM_READABLE));
267
268   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
269 }
270
271 static void
272 rtp_session_init (RTPSession * sess)
273 {
274   gint i;
275   gchar *str;
276
277   sess->lock = g_mutex_new ();
278   sess->key = g_random_int ();
279   sess->mask_idx = 0;
280   sess->mask = 0;
281
282   for (i = 0; i < 32; i++) {
283     sess->ssrcs[i] =
284         g_hash_table_new_full (NULL, NULL, NULL,
285         (GDestroyNotify) g_object_unref);
286   }
287   sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL);
288
289   rtp_stats_init_defaults (&sess->stats);
290
291   /* create an active SSRC for this session manager */
292   sess->source = rtp_session_create_source (sess);
293   sess->source->validated = TRUE;
294   sess->stats.active_sources++;
295
296   /* default UDP header length */
297   sess->header_len = 28;
298   sess->mtu = 1400;
299
300   /* some default SDES entries */
301   str = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ());
302   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_CNAME, str);
303   g_free (str);
304
305   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_NAME,
306       g_get_real_name ());
307   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_TOOL, "GStreamer");
308
309   sess->first_rtcp = TRUE;
310
311   GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
312 }
313
314 static void
315 rtp_session_finalize (GObject * object)
316 {
317   RTPSession *sess;
318   gint i;
319
320   sess = RTP_SESSION_CAST (object);
321
322   g_mutex_free (sess->lock);
323   for (i = 0; i < 32; i++)
324     g_hash_table_destroy (sess->ssrcs[i]);
325
326   g_hash_table_destroy (sess->cnames);
327   g_object_unref (sess->source);
328
329   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
330 }
331
332 static void
333 rtp_session_set_property (GObject * object, guint prop_id,
334     const GValue * value, GParamSpec * pspec)
335 {
336   RTPSession *sess;
337
338   sess = RTP_SESSION (object);
339
340   switch (prop_id) {
341     case PROP_BANDWIDTH:
342       rtp_session_set_bandwidth (sess, g_value_get_double (value));
343       break;
344     case PROP_RTCP_FRACTION:
345       rtp_session_set_rtcp_fraction (sess, g_value_get_double (value));
346       break;
347     case PROP_SDES_CNAME:
348       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_CNAME,
349           g_value_get_string (value));
350       break;
351     case PROP_SDES_NAME:
352       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_NAME,
353           g_value_get_string (value));
354       break;
355     case PROP_SDES_EMAIL:
356       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_EMAIL,
357           g_value_get_string (value));
358       break;
359     case PROP_SDES_PHONE:
360       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_PHONE,
361           g_value_get_string (value));
362       break;
363     case PROP_SDES_LOCATION:
364       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_LOC,
365           g_value_get_string (value));
366       break;
367     case PROP_SDES_TOOL:
368       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_TOOL,
369           g_value_get_string (value));
370       break;
371     case PROP_SDES_NOTE:
372       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_NOTE,
373           g_value_get_string (value));
374       break;
375     default:
376       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
377       break;
378   }
379 }
380
381 static void
382 rtp_session_get_property (GObject * object, guint prop_id,
383     GValue * value, GParamSpec * pspec)
384 {
385   RTPSession *sess;
386
387   sess = RTP_SESSION (object);
388
389   switch (prop_id) {
390     case PROP_INTERNAL_SOURCE:
391       g_value_take_object (value, rtp_session_get_internal_source (sess));
392       break;
393     case PROP_BANDWIDTH:
394       g_value_set_double (value, rtp_session_get_bandwidth (sess));
395       break;
396     case PROP_RTCP_FRACTION:
397       g_value_set_double (value, rtp_session_get_rtcp_fraction (sess));
398       break;
399     case PROP_SDES_CNAME:
400       g_value_take_string (value, rtp_session_get_sdes_string (sess,
401               GST_RTCP_SDES_CNAME));
402       break;
403     case PROP_SDES_NAME:
404       g_value_take_string (value, rtp_session_get_sdes_string (sess,
405               GST_RTCP_SDES_NAME));
406       break;
407     case PROP_SDES_EMAIL:
408       g_value_take_string (value, rtp_session_get_sdes_string (sess,
409               GST_RTCP_SDES_EMAIL));
410       break;
411     case PROP_SDES_PHONE:
412       g_value_take_string (value, rtp_session_get_sdes_string (sess,
413               GST_RTCP_SDES_PHONE));
414       break;
415     case PROP_SDES_LOCATION:
416       g_value_take_string (value, rtp_session_get_sdes_string (sess,
417               GST_RTCP_SDES_LOC));
418       break;
419     case PROP_SDES_TOOL:
420       g_value_take_string (value, rtp_session_get_sdes_string (sess,
421               GST_RTCP_SDES_TOOL));
422       break;
423     case PROP_SDES_NOTE:
424       g_value_take_string (value, rtp_session_get_sdes_string (sess,
425               GST_RTCP_SDES_NOTE));
426       break;
427     case PROP_NUM_SOURCES:
428       g_value_set_uint (value, rtp_session_get_num_sources (sess));
429       break;
430     case PROP_NUM_ACTIVE_SOURCES:
431       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
432       break;
433     default:
434       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
435       break;
436   }
437 }
438
439 static void
440 on_new_ssrc (RTPSession * sess, RTPSource * source)
441 {
442   RTP_SESSION_UNLOCK (sess);
443   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
444   RTP_SESSION_LOCK (sess);
445 }
446
447 static void
448 on_ssrc_collision (RTPSession * sess, RTPSource * source)
449 {
450   RTP_SESSION_UNLOCK (sess);
451   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
452       source);
453   RTP_SESSION_LOCK (sess);
454 }
455
456 static void
457 on_ssrc_validated (RTPSession * sess, RTPSource * source)
458 {
459   RTP_SESSION_UNLOCK (sess);
460   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
461       source);
462   RTP_SESSION_LOCK (sess);
463 }
464
465 static void
466 on_ssrc_active (RTPSession * sess, RTPSource * source)
467 {
468   RTP_SESSION_UNLOCK (sess);
469   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
470   RTP_SESSION_LOCK (sess);
471 }
472
473 static void
474 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
475 {
476   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
477   RTP_SESSION_UNLOCK (sess);
478   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
479   RTP_SESSION_LOCK (sess);
480 }
481
482 static void
483 on_bye_ssrc (RTPSession * sess, RTPSource * source)
484 {
485   RTP_SESSION_UNLOCK (sess);
486   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
487   RTP_SESSION_LOCK (sess);
488 }
489
490 static void
491 on_bye_timeout (RTPSession * sess, RTPSource * source)
492 {
493   RTP_SESSION_UNLOCK (sess);
494   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
495   RTP_SESSION_LOCK (sess);
496 }
497
498 static void
499 on_timeout (RTPSession * sess, RTPSource * source)
500 {
501   RTP_SESSION_UNLOCK (sess);
502   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
503   RTP_SESSION_LOCK (sess);
504 }
505
506 /**
507  * rtp_session_new:
508  *
509  * Create a new session object.
510  *
511  * Returns: a new #RTPSession. g_object_unref() after usage.
512  */
513 RTPSession *
514 rtp_session_new (void)
515 {
516   RTPSession *sess;
517
518   sess = g_object_new (RTP_TYPE_SESSION, NULL);
519
520   return sess;
521 }
522
523 /**
524  * rtp_session_set_callbacks:
525  * @sess: an #RTPSession
526  * @callbacks: callbacks to configure
527  * @user_data: user data passed in the callbacks
528  *
529  * Configure a set of callbacks to be notified of actions.
530  */
531 void
532 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
533     gpointer user_data)
534 {
535   g_return_if_fail (RTP_IS_SESSION (sess));
536
537   sess->callbacks.process_rtp = callbacks->process_rtp;
538   sess->callbacks.send_rtp = callbacks->send_rtp;
539   sess->callbacks.send_rtcp = callbacks->send_rtcp;
540   sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
541   sess->callbacks.clock_rate = callbacks->clock_rate;
542   sess->callbacks.reconsider = callbacks->reconsider;
543   sess->user_data = user_data;
544 }
545
546 /**
547  * rtp_session_set_bandwidth:
548  * @sess: an #RTPSession
549  * @bandwidth: the bandwidth allocated
550  *
551  * Set the session bandwidth in bytes per second.
552  */
553 void
554 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
555 {
556   g_return_if_fail (RTP_IS_SESSION (sess));
557
558   RTP_SESSION_LOCK (sess);
559   sess->stats.bandwidth = bandwidth;
560   RTP_SESSION_UNLOCK (sess);
561 }
562
563 /**
564  * rtp_session_get_bandwidth:
565  * @sess: an #RTPSession
566  *
567  * Get the session bandwidth.
568  *
569  * Returns: the session bandwidth.
570  */
571 gdouble
572 rtp_session_get_bandwidth (RTPSession * sess)
573 {
574   gdouble result;
575
576   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
577
578   RTP_SESSION_LOCK (sess);
579   result = sess->stats.bandwidth;
580   RTP_SESSION_UNLOCK (sess);
581
582   return result;
583 }
584
585 /**
586  * rtp_session_set_rtcp_fraction:
587  * @sess: an #RTPSession
588  * @bandwidth: the RTCP bandwidth
589  *
590  * Set the bandwidth that should be used for RTCP
591  * messages.
592  */
593 void
594 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
595 {
596   g_return_if_fail (RTP_IS_SESSION (sess));
597
598   RTP_SESSION_LOCK (sess);
599   sess->stats.rtcp_bandwidth = bandwidth;
600   RTP_SESSION_UNLOCK (sess);
601 }
602
603 /**
604  * rtp_session_get_rtcp_fraction:
605  * @sess: an #RTPSession
606  *
607  * Get the session bandwidth used for RTCP.
608  *
609  * Returns: The bandwidth used for RTCP messages.
610  */
611 gdouble
612 rtp_session_get_rtcp_fraction (RTPSession * sess)
613 {
614   gdouble result;
615
616   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
617
618   RTP_SESSION_LOCK (sess);
619   result = sess->stats.rtcp_bandwidth;
620   RTP_SESSION_UNLOCK (sess);
621
622   return result;
623 }
624
625 /**
626  * rtp_session_set_sdes_string:
627  * @sess: an #RTPSession
628  * @type: the type of the SDES item
629  * @item: a null-terminated string to set. 
630  *
631  * Store an SDES item of @type in @sess. 
632  *
633  * Returns: %FALSE if the data was unchanged @type is invalid.
634  */
635 gboolean
636 rtp_session_set_sdes_string (RTPSession * sess, GstRTCPSDESType type,
637     const gchar * item)
638 {
639   gboolean result;
640
641   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
642
643   RTP_SESSION_LOCK (sess);
644   result = rtp_source_set_sdes_string (sess->source, type, item);
645   RTP_SESSION_UNLOCK (sess);
646
647   return result;
648 }
649
650 /**
651  * rtp_session_get_sdes_string:
652  * @sess: an #RTPSession
653  * @type: the type of the SDES item
654  *
655  * Get the SDES item of @type from @sess. 
656  *
657  * Returns: a null-terminated copy of the SDES item or NULL when @type was not
658  * valid. g_free() after usage.
659  */
660 gchar *
661 rtp_session_get_sdes_string (RTPSession * sess, GstRTCPSDESType type)
662 {
663   gchar *result;
664
665   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
666
667   RTP_SESSION_LOCK (sess);
668   result = rtp_source_get_sdes_string (sess->source, type);
669   RTP_SESSION_UNLOCK (sess);
670
671   return result;
672 }
673
674 static GstFlowReturn
675 source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session)
676 {
677   GstFlowReturn result = GST_FLOW_OK;
678
679   if (source == session->source) {
680     GST_DEBUG ("source %08x pushed sender RTP packet", source->ssrc);
681
682     RTP_SESSION_UNLOCK (session);
683
684     if (session->callbacks.send_rtp)
685       result =
686           session->callbacks.send_rtp (session, source, buffer,
687           session->user_data);
688     else
689       gst_buffer_unref (buffer);
690
691   } else {
692     GST_DEBUG ("source %08x pushed receiver RTP packet", source->ssrc);
693     RTP_SESSION_UNLOCK (session);
694
695     if (session->callbacks.process_rtp)
696       result =
697           session->callbacks.process_rtp (session, source, buffer,
698           session->user_data);
699     else
700       gst_buffer_unref (buffer);
701   }
702   RTP_SESSION_LOCK (session);
703
704   return result;
705 }
706
707 static gint
708 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
709 {
710   gint result;
711
712   if (session->callbacks.clock_rate)
713     result = session->callbacks.clock_rate (session, pt, session->user_data);
714   else
715     result = -1;
716
717   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
718
719   return result;
720 }
721
722 static RTPSourceCallbacks callbacks = {
723   (RTPSourcePushRTP) source_push_rtp,
724   (RTPSourceClockRate) source_clock_rate,
725 };
726
727 static gboolean
728 check_collision (RTPSession * sess, RTPSource * source,
729     RTPArrivalStats * arrival)
730 {
731   /* FIXME, do collision check */
732   return FALSE;
733 }
734
735 /* must be called with the session lock */
736 static RTPSource *
737 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
738     RTPArrivalStats * arrival, gboolean rtp)
739 {
740   RTPSource *source;
741
742   source =
743       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
744   if (source == NULL) {
745     /* make new Source in probation and insert */
746     source = rtp_source_new (ssrc);
747
748     /* for RTP packets we need to set the source in probation. Receiving RTCP
749      * packets of an SSRC, on the other hand, is a strong indication that we
750      * are dealing with a valid source. */
751     if (rtp)
752       source->probation = RTP_DEFAULT_PROBATION;
753     else
754       source->probation = 0;
755
756     /* store from address, if any */
757     if (arrival->have_address) {
758       if (rtp)
759         rtp_source_set_rtp_from (source, &arrival->address);
760       else
761         rtp_source_set_rtcp_from (source, &arrival->address);
762     }
763
764     /* configure a callback on the source */
765     rtp_source_set_callbacks (source, &callbacks, sess);
766
767     g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
768         source);
769
770     /* we have one more source now */
771     sess->total_sources++;
772     *created = TRUE;
773   } else {
774     *created = FALSE;
775     /* check for collision, this updates the address when not previously set */
776     if (check_collision (sess, source, arrival))
777       on_ssrc_collision (sess, source);
778   }
779   /* update last activity */
780   source->last_activity = arrival->time;
781   if (rtp)
782     source->last_rtp_activity = arrival->time;
783
784   return source;
785 }
786
787 /**
788  * rtp_session_get_internal_source:
789  * @sess: a #RTPSession
790  *
791  * Get the internal #RTPSource of @session.
792  *
793  * Returns: The internal #RTPSource. g_object_unref() after usage.
794  */
795 RTPSource *
796 rtp_session_get_internal_source (RTPSession * sess)
797 {
798   RTPSource *result;
799
800   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
801
802   result = g_object_ref (sess->source);
803
804   return result;
805 }
806
807 /**
808  * rtp_session_add_source:
809  * @sess: a #RTPSession
810  * @src: #RTPSource to add
811  *
812  * Add @src to @session.
813  *
814  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
815  * existed in the session.
816  */
817 gboolean
818 rtp_session_add_source (RTPSession * sess, RTPSource * src)
819 {
820   gboolean result = FALSE;
821   RTPSource *find;
822
823   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
824   g_return_val_if_fail (src != NULL, FALSE);
825
826   RTP_SESSION_LOCK (sess);
827   find =
828       g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
829       GINT_TO_POINTER (src->ssrc));
830   if (find == NULL) {
831     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
832         GINT_TO_POINTER (src->ssrc), src);
833     /* we have one more source now */
834     sess->total_sources++;
835     result = TRUE;
836   }
837   RTP_SESSION_UNLOCK (sess);
838
839   return result;
840 }
841
842 /**
843  * rtp_session_get_num_sources:
844  * @sess: an #RTPSession
845  *
846  * Get the number of sources in @sess.
847  *
848  * Returns: The number of sources in @sess.
849  */
850 guint
851 rtp_session_get_num_sources (RTPSession * sess)
852 {
853   guint result;
854
855   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
856
857   RTP_SESSION_LOCK (sess);
858   result = sess->total_sources;
859   RTP_SESSION_UNLOCK (sess);
860
861   return result;
862 }
863
864 /**
865  * rtp_session_get_num_active_sources:
866  * @sess: an #RTPSession
867  *
868  * Get the number of active sources in @sess. A source is considered active when
869  * it has been validated and has not yet received a BYE RTCP message.
870  *
871  * Returns: The number of active sources in @sess.
872  */
873 guint
874 rtp_session_get_num_active_sources (RTPSession * sess)
875 {
876   guint result;
877
878   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
879
880   RTP_SESSION_LOCK (sess);
881   result = sess->stats.active_sources;
882   RTP_SESSION_UNLOCK (sess);
883
884   return result;
885 }
886
887 /**
888  * rtp_session_get_source_by_ssrc:
889  * @sess: an #RTPSession
890  * @ssrc: an SSRC
891  *
892  * Find the source with @ssrc in @sess.
893  *
894  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
895  * g_object_unref() after usage.
896  */
897 RTPSource *
898 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
899 {
900   RTPSource *result;
901
902   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
903
904   RTP_SESSION_LOCK (sess);
905   result =
906       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
907   if (result)
908     g_object_ref (result);
909   RTP_SESSION_UNLOCK (sess);
910
911   return result;
912 }
913
914 /**
915  * rtp_session_get_source_by_cname:
916  * @sess: a #RTPSession
917  * @cname: an CNAME
918  *
919  * Find the source with @cname in @sess.
920  *
921  * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found.
922  * g_object_unref() after usage.
923  */
924 RTPSource *
925 rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
926 {
927   RTPSource *result;
928
929   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
930   g_return_val_if_fail (cname != NULL, NULL);
931
932   RTP_SESSION_LOCK (sess);
933   result = g_hash_table_lookup (sess->cnames, cname);
934   if (result)
935     g_object_ref (result);
936   RTP_SESSION_UNLOCK (sess);
937
938   return result;
939 }
940
941 /**
942  * rtp_session_create_source:
943  * @sess: an #RTPSession
944  *
945  * Create an #RTPSource for use in @sess. This function will create a source
946  * with an ssrc that is currently not used by any participants in the session.
947  *
948  * Returns: an #RTPSource.
949  */
950 RTPSource *
951 rtp_session_create_source (RTPSession * sess)
952 {
953   guint32 ssrc;
954   RTPSource *source;
955
956   RTP_SESSION_LOCK (sess);
957   while (TRUE) {
958     ssrc = g_random_int ();
959
960     /* see if it exists in the session, we're done if it doesn't */
961     if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
962             GINT_TO_POINTER (ssrc)) == NULL)
963       break;
964   }
965   source = rtp_source_new (ssrc);
966   g_object_ref (source);
967   rtp_source_set_callbacks (source, &callbacks, sess);
968   g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
969       source);
970   /* we have one more source now */
971   sess->total_sources++;
972   RTP_SESSION_UNLOCK (sess);
973
974   return source;
975 }
976
977 /* update the RTPArrivalStats structure with the current time and other bits
978  * about the current buffer we are handling.
979  * This function is typically called when a validated packet is received.
980  * This function should be called with the SESSION_LOCK
981  */
982 static void
983 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
984     gboolean rtp, GstBuffer * buffer, guint64 ntpnstime)
985 {
986   GTimeVal current;
987
988   /* get time of arrival */
989   g_get_current_time (&current);
990   arrival->time = GST_TIMEVAL_TO_TIME (current);
991   arrival->ntpnstime = ntpnstime;
992
993   /* get packet size including header overhead */
994   arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
995
996   if (rtp) {
997     arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer);
998   } else {
999     arrival->payload_len = 0;
1000   }
1001
1002   /* for netbuffer we can store the IP address to check for collisions */
1003   arrival->have_address = GST_IS_NETBUFFER (buffer);
1004   if (arrival->have_address) {
1005     GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
1006
1007     memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
1008   }
1009 }
1010
1011 /**
1012  * rtp_session_process_rtp:
1013  * @sess: and #RTPSession
1014  * @buffer: an RTP buffer
1015  * @ntpnstime: the NTP arrival time in nanoseconds
1016  *
1017  * Process an RTP buffer in the session manager. This function takes ownership
1018  * of @buffer.
1019  *
1020  * Returns: a #GstFlowReturn.
1021  */
1022 GstFlowReturn
1023 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
1024     guint64 ntpnstime)
1025 {
1026   GstFlowReturn result;
1027   guint32 ssrc;
1028   RTPSource *source;
1029   gboolean created;
1030   gboolean prevsender, prevactive;
1031   RTPArrivalStats arrival;
1032
1033   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1034   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1035
1036   if (!gst_rtp_buffer_validate (buffer))
1037     goto invalid_packet;
1038
1039   RTP_SESSION_LOCK (sess);
1040   /* update arrival stats */
1041   update_arrival_stats (sess, &arrival, TRUE, buffer, ntpnstime);
1042
1043   /* ignore more RTP packets when we left the session */
1044   if (sess->source->received_bye)
1045     goto ignore;
1046
1047   /* get SSRC and look up in session database */
1048   ssrc = gst_rtp_buffer_get_ssrc (buffer);
1049   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
1050
1051   prevsender = RTP_SOURCE_IS_SENDER (source);
1052   prevactive = RTP_SOURCE_IS_ACTIVE (source);
1053
1054   /* we need to ref so that we can process the CSRCs later */
1055   gst_buffer_ref (buffer);
1056
1057   /* let source process the packet */
1058   result = rtp_source_process_rtp (source, buffer, &arrival);
1059
1060   /* source became active */
1061   if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
1062     sess->stats.active_sources++;
1063     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
1064         sess->stats.active_sources);
1065     on_ssrc_validated (sess, source);
1066   }
1067   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1068     sess->stats.sender_sources++;
1069     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
1070         sess->stats.sender_sources);
1071   }
1072
1073   if (created)
1074     on_new_ssrc (sess, source);
1075
1076   if (source->validated) {
1077     guint8 i, count;
1078     gboolean created;
1079
1080     /* for validated sources, we add the CSRCs as well */
1081     count = gst_rtp_buffer_get_csrc_count (buffer);
1082
1083     for (i = 0; i < count; i++) {
1084       guint32 csrc;
1085       RTPSource *csrc_src;
1086
1087       csrc = gst_rtp_buffer_get_csrc (buffer, i);
1088
1089       /* get source */
1090       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
1091
1092       if (created) {
1093         GST_DEBUG ("created new CSRC: %08x", csrc);
1094         rtp_source_set_as_csrc (csrc_src);
1095         if (RTP_SOURCE_IS_ACTIVE (csrc_src))
1096           sess->stats.active_sources++;
1097         on_new_ssrc (sess, source);
1098       }
1099     }
1100   }
1101   gst_buffer_unref (buffer);
1102
1103   RTP_SESSION_UNLOCK (sess);
1104
1105   return result;
1106
1107   /* ERRORS */
1108 invalid_packet:
1109   {
1110     gst_buffer_unref (buffer);
1111     GST_DEBUG ("invalid RTP packet received");
1112     return GST_FLOW_OK;
1113   }
1114 ignore:
1115   {
1116     gst_buffer_unref (buffer);
1117     RTP_SESSION_UNLOCK (sess);
1118     GST_DEBUG ("ignoring RTP packet because we are leaving");
1119     return GST_FLOW_OK;
1120   }
1121 }
1122
1123 static void
1124 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
1125     GstRTCPPacket * packet, RTPArrivalStats * arrival)
1126 {
1127   guint count, i;
1128
1129   count = gst_rtcp_packet_get_rb_count (packet);
1130   for (i = 0; i < count; i++) {
1131     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1132     guint8 fractionlost;
1133     gint32 packetslost;
1134
1135     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1136         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1137
1138     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
1139
1140     if (ssrc == sess->source->ssrc) {
1141       /* only deal with report blocks for our session, we update the stats of
1142        * the sender of the RTCP message. We could also compare our stats against
1143        * the other sender to see if we are better or worse. */
1144       rtp_source_process_rb (source, arrival->time, fractionlost, packetslost,
1145           exthighestseq, jitter, lsr, dlsr);
1146
1147       on_ssrc_active (sess, source);
1148     }
1149   }
1150 }
1151
1152 /* A Sender report contains statistics about how the sender is doing. This
1153  * includes timing informataion such as the relation between RTP and NTP
1154  * timestamps and the number of packets/bytes it sent to us.
1155  *
1156  * In this report is also included a set of report blocks related to how this
1157  * sender is receiving data (in case we (or somebody else) is also sending stuff
1158  * to it). This info includes the packet loss, jitter and seqnum. It also
1159  * contains information to calculate the round trip time (LSR/DLSR).
1160  */
1161 static void
1162 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
1163     RTPArrivalStats * arrival)
1164 {
1165   guint32 senderssrc, rtptime, packet_count, octet_count;
1166   guint64 ntptime;
1167   RTPSource *source;
1168   gboolean created, prevsender;
1169
1170   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
1171       &packet_count, &octet_count);
1172
1173   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
1174       senderssrc, GST_TIME_ARGS (arrival->time));
1175
1176   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1177
1178   GST_BUFFER_OFFSET (packet->buffer) = source->clock_base;
1179
1180   prevsender = RTP_SOURCE_IS_SENDER (source);
1181
1182   /* first update the source */
1183   rtp_source_process_sr (source, arrival->time, ntptime, rtptime, packet_count,
1184       octet_count);
1185
1186   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1187     sess->stats.sender_sources++;
1188     GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc,
1189         sess->stats.sender_sources);
1190   }
1191
1192   if (created)
1193     on_new_ssrc (sess, source);
1194
1195   rtp_session_process_rb (sess, source, packet, arrival);
1196 }
1197
1198 /* A receiver report contains statistics about how a receiver is doing. It
1199  * includes stuff like packet loss, jitter and the seqnum it received last. It
1200  * also contains info to calculate the round trip time.
1201  *
1202  * We are only interested in how the sender of this report is doing wrt to us.
1203  */
1204 static void
1205 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
1206     RTPArrivalStats * arrival)
1207 {
1208   guint32 senderssrc;
1209   RTPSource *source;
1210   gboolean created;
1211
1212   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
1213
1214   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
1215
1216   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1217
1218   if (created)
1219     on_new_ssrc (sess, source);
1220
1221   rtp_session_process_rb (sess, source, packet, arrival);
1222 }
1223
1224 /* Get SDES items and store them in the SSRC */
1225 static void
1226 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
1227     RTPArrivalStats * arrival)
1228 {
1229   guint items, i, j;
1230   gboolean more_items, more_entries;
1231
1232   items = gst_rtcp_packet_sdes_get_item_count (packet);
1233   GST_DEBUG ("got SDES packet with %d items", items);
1234
1235   more_items = gst_rtcp_packet_sdes_first_item (packet);
1236   i = 0;
1237   while (more_items) {
1238     guint32 ssrc;
1239     gboolean changed, created;
1240     RTPSource *source;
1241
1242     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
1243
1244     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
1245
1246     /* find src, no probation when dealing with RTCP */
1247     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1248     changed = FALSE;
1249
1250     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
1251     j = 0;
1252     while (more_entries) {
1253       GstRTCPSDESType type;
1254       guint8 len;
1255       guint8 *data;
1256
1257       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
1258
1259       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
1260           data);
1261
1262       changed |= rtp_source_set_sdes (source, type, data, len);
1263
1264       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
1265       j++;
1266     }
1267
1268     if (created)
1269       on_new_ssrc (sess, source);
1270     if (changed)
1271       on_ssrc_sdes (sess, source);
1272
1273     more_items = gst_rtcp_packet_sdes_next_item (packet);
1274     i++;
1275   }
1276 }
1277
1278 /* BYE is sent when a client leaves the session
1279  */
1280 static void
1281 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
1282     RTPArrivalStats * arrival)
1283 {
1284   guint count, i;
1285   gchar *reason;
1286
1287   reason = gst_rtcp_packet_bye_get_reason (packet);
1288   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
1289
1290   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
1291   for (i = 0; i < count; i++) {
1292     guint32 ssrc;
1293     RTPSource *source;
1294     gboolean created, prevactive, prevsender;
1295     guint pmembers, members;
1296
1297     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
1298     GST_DEBUG ("SSRC: %08x", ssrc);
1299
1300     /* find src and mark bye, no probation when dealing with RTCP */
1301     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1302
1303     /* store time for when we need to time out this source */
1304     source->bye_time = arrival->time;
1305
1306     prevactive = RTP_SOURCE_IS_ACTIVE (source);
1307     prevsender = RTP_SOURCE_IS_SENDER (source);
1308
1309     /* let the source handle the rest */
1310     rtp_source_process_bye (source, reason);
1311
1312     pmembers = sess->stats.active_sources;
1313
1314     if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
1315       sess->stats.active_sources--;
1316       GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1317           sess->stats.active_sources);
1318     }
1319     if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
1320       sess->stats.sender_sources--;
1321       GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1322           sess->stats.sender_sources);
1323     }
1324     members = sess->stats.active_sources;
1325
1326     if (!sess->source->received_bye && members < pmembers) {
1327       /* some members went away since the previous timeout estimate.
1328        * Perform reverse reconsideration but only when we are not scheduling a
1329        * BYE ourselves. */
1330       if (arrival->time < sess->next_rtcp_check_time) {
1331         GstClockTime time_remaining;
1332
1333         time_remaining = sess->next_rtcp_check_time - arrival->time;
1334         sess->next_rtcp_check_time =
1335             gst_util_uint64_scale (time_remaining, members, pmembers);
1336
1337         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
1338             GST_TIME_ARGS (sess->next_rtcp_check_time));
1339
1340         sess->next_rtcp_check_time += arrival->time;
1341
1342         /* notify app of reconsideration */
1343         if (sess->callbacks.reconsider)
1344           sess->callbacks.reconsider (sess, sess->user_data);
1345       }
1346     }
1347
1348     if (created)
1349       on_new_ssrc (sess, source);
1350
1351     on_bye_ssrc (sess, source);
1352   }
1353   g_free (reason);
1354 }
1355
1356 static void
1357 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
1358     RTPArrivalStats * arrival)
1359 {
1360   GST_DEBUG ("received APP");
1361 }
1362
1363 /**
1364  * rtp_session_process_rtcp:
1365  * @sess: and #RTPSession
1366  * @buffer: an RTCP buffer
1367  *
1368  * Process an RTCP buffer in the session manager. This function takes ownership
1369  * of @buffer.
1370  *
1371  * Returns: a #GstFlowReturn.
1372  */
1373 GstFlowReturn
1374 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
1375 {
1376   GstRTCPPacket packet;
1377   gboolean more, is_bye = FALSE, is_sr = FALSE;
1378   RTPArrivalStats arrival;
1379   GstFlowReturn result = GST_FLOW_OK;
1380
1381   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1382   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1383
1384   if (!gst_rtcp_buffer_validate (buffer))
1385     goto invalid_packet;
1386
1387   GST_DEBUG ("received RTCP packet");
1388
1389   RTP_SESSION_LOCK (sess);
1390   /* update arrival stats */
1391   update_arrival_stats (sess, &arrival, FALSE, buffer, -1);
1392
1393   if (sess->sent_bye)
1394     goto ignore;
1395
1396   /* start processing the compound packet */
1397   more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
1398   while (more) {
1399     GstRTCPType type;
1400
1401     type = gst_rtcp_packet_get_type (&packet);
1402
1403     /* when we are leaving the session, we should ignore all non-BYE messages */
1404     if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) {
1405       GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
1406       goto next;
1407     }
1408
1409     switch (type) {
1410       case GST_RTCP_TYPE_SR:
1411         rtp_session_process_sr (sess, &packet, &arrival);
1412         is_sr = TRUE;
1413         break;
1414       case GST_RTCP_TYPE_RR:
1415         rtp_session_process_rr (sess, &packet, &arrival);
1416         break;
1417       case GST_RTCP_TYPE_SDES:
1418         rtp_session_process_sdes (sess, &packet, &arrival);
1419         break;
1420       case GST_RTCP_TYPE_BYE:
1421         is_bye = TRUE;
1422         rtp_session_process_bye (sess, &packet, &arrival);
1423         break;
1424       case GST_RTCP_TYPE_APP:
1425         rtp_session_process_app (sess, &packet, &arrival);
1426         break;
1427       default:
1428         GST_WARNING ("got unknown RTCP packet");
1429         break;
1430     }
1431   next:
1432     more = gst_rtcp_packet_move_to_next (&packet);
1433   }
1434
1435   /* if we are scheduling a BYE, we only want to count bye packets, else we
1436    * count everything */
1437   if (sess->source->received_bye) {
1438     if (is_bye) {
1439       sess->stats.bye_members++;
1440       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1441     }
1442   } else {
1443     /* keep track of average packet size */
1444     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1445   }
1446   RTP_SESSION_UNLOCK (sess);
1447
1448   /* notify caller of sr packets in the callback */
1449   if (is_sr && sess->callbacks.sync_rtcp)
1450     result = sess->callbacks.sync_rtcp (sess, sess->source, buffer,
1451         sess->user_data);
1452   else
1453     gst_buffer_unref (buffer);
1454
1455   return result;
1456
1457   /* ERRORS */
1458 invalid_packet:
1459   {
1460     GST_DEBUG ("invalid RTCP packet received");
1461     gst_buffer_unref (buffer);
1462     return GST_FLOW_OK;
1463   }
1464 ignore:
1465   {
1466     gst_buffer_unref (buffer);
1467     RTP_SESSION_UNLOCK (sess);
1468     GST_DEBUG ("ignoring RTP packet because we left");
1469     return GST_FLOW_OK;
1470   }
1471 }
1472
1473 /**
1474  * rtp_session_send_rtp:
1475  * @sess: an #RTPSession
1476  * @buffer: an RTP buffer
1477  * @ntpnstime: the NTP time in nanoseconds of when this buffer was captured.
1478  *
1479  * Send the RTP buffer in the session manager. This function takes ownership of
1480  * @buffer.
1481  *
1482  * Returns: a #GstFlowReturn.
1483  */
1484 GstFlowReturn
1485 rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, guint64 ntpnstime)
1486 {
1487   GstFlowReturn result;
1488   RTPSource *source;
1489   gboolean prevsender;
1490   GTimeVal current;
1491
1492   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1493   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1494
1495   if (!gst_rtp_buffer_validate (buffer))
1496     goto invalid_packet;
1497
1498   GST_DEBUG ("received RTP packet for sending");
1499
1500   RTP_SESSION_LOCK (sess);
1501   source = sess->source;
1502
1503   /* update last activity */
1504   g_get_current_time (&current);
1505   source->last_rtp_activity = GST_TIMEVAL_TO_TIME (current);
1506
1507   prevsender = RTP_SOURCE_IS_SENDER (source);
1508
1509   /* we use our own source to send */
1510   result = rtp_source_send_rtp (source, buffer, ntpnstime);
1511
1512   if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
1513     sess->stats.sender_sources++;
1514   RTP_SESSION_UNLOCK (sess);
1515
1516   return result;
1517
1518   /* ERRORS */
1519 invalid_packet:
1520   {
1521     gst_buffer_unref (buffer);
1522     GST_DEBUG ("invalid RTP packet received");
1523     return GST_FLOW_OK;
1524   }
1525 }
1526
1527 static GstClockTime
1528 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
1529     gboolean first)
1530 {
1531   GstClockTime result;
1532
1533   if (sess->source->received_bye) {
1534     result = rtp_stats_calculate_bye_interval (&sess->stats);
1535   } else {
1536     result = rtp_stats_calculate_rtcp_interval (&sess->stats,
1537         RTP_SOURCE_IS_SENDER (sess->source), first);
1538   }
1539
1540   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
1541       GST_TIME_ARGS (result), first);
1542
1543   if (!deterministic)
1544     result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
1545
1546   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
1547
1548   return result;
1549 }
1550
1551 /**
1552  * rtp_session_send_bye:
1553  * @sess: an #RTPSession
1554  * @reason: a reason or NULL
1555  *
1556  * Stop the current @sess and schedule a BYE message for the other members.
1557  *
1558  * Returns: a #GstFlowReturn.
1559  */
1560 GstFlowReturn
1561 rtp_session_send_bye (RTPSession * sess, const gchar * reason)
1562 {
1563   GstFlowReturn result = GST_FLOW_OK;
1564   RTPSource *source;
1565   GstClockTime current, interval;
1566   GTimeVal curtv;
1567
1568   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1569
1570   RTP_SESSION_LOCK (sess);
1571   source = sess->source;
1572
1573   /* ignore more BYEs */
1574   if (source->received_bye)
1575     goto done;
1576
1577   /* we have BYE now */
1578   source->received_bye = TRUE;
1579   /* at least one member wants to send a BYE */
1580   sess->bye_reason = g_strdup (reason);
1581   sess->stats.avg_rtcp_packet_size = 100;
1582   sess->stats.bye_members = 1;
1583   sess->first_rtcp = TRUE;
1584   sess->sent_bye = FALSE;
1585
1586   /* get current time */
1587   g_get_current_time (&curtv);
1588   current = GST_TIMEVAL_TO_TIME (curtv);
1589
1590   /* reschedule transmission */
1591   sess->last_rtcp_send_time = current;
1592   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
1593   sess->next_rtcp_check_time = current + interval;
1594
1595   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
1596       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
1597
1598   /* notify app of reconsideration */
1599   if (sess->callbacks.reconsider)
1600     sess->callbacks.reconsider (sess, sess->user_data);
1601 done:
1602   RTP_SESSION_UNLOCK (sess);
1603
1604   return result;
1605 }
1606
1607 /**
1608  * rtp_session_next_timeout:
1609  * @sess: an #RTPSession
1610  * @time: the current system time
1611  *
1612  * Get the next time we should perform session maintenance tasks.
1613  *
1614  * Returns: a time when rtp_session_on_timeout() should be called with the
1615  * current system time.
1616  */
1617 GstClockTime
1618 rtp_session_next_timeout (RTPSession * sess, GstClockTime time)
1619 {
1620   GstClockTime result;
1621
1622   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1623
1624   RTP_SESSION_LOCK (sess);
1625
1626   result = sess->next_rtcp_check_time;
1627
1628   GST_DEBUG ("current time: %" GST_TIME_FORMAT ", next :%" GST_TIME_FORMAT,
1629       GST_TIME_ARGS (time), GST_TIME_ARGS (result));
1630
1631   if (result < time) {
1632     GST_DEBUG ("take current time as base");
1633     /* our previous check time expired, start counting from the current time
1634      * again. */
1635     result = time;
1636   }
1637
1638   if (sess->source->received_bye) {
1639     if (sess->sent_bye) {
1640       GST_DEBUG ("we sent BYE already");
1641       result = GST_CLOCK_TIME_NONE;
1642     } else if (sess->stats.active_sources >= 50) {
1643       GST_DEBUG ("reconsider BYE, more than 50 sources");
1644       /* reconsider BYE if members >= 50 */
1645       result += calculate_rtcp_interval (sess, FALSE, TRUE);
1646     }
1647   } else {
1648     if (sess->first_rtcp) {
1649       GST_DEBUG ("first RTCP packet");
1650       /* we are called for the first time */
1651       result += calculate_rtcp_interval (sess, FALSE, TRUE);
1652     } else if (sess->next_rtcp_check_time < time) {
1653       GST_DEBUG ("old check time expired, getting new timeout");
1654       /* get a new timeout when we need to */
1655       result += calculate_rtcp_interval (sess, FALSE, FALSE);
1656     }
1657   }
1658   sess->next_rtcp_check_time = result;
1659
1660   GST_DEBUG ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
1661   RTP_SESSION_UNLOCK (sess);
1662
1663   return result;
1664 }
1665
1666 typedef struct
1667 {
1668   RTPSession *sess;
1669   GstBuffer *rtcp;
1670   GstClockTime time;
1671   guint64 ntpnstime;
1672   GstClockTime interval;
1673   GstRTCPPacket packet;
1674   gboolean is_bye;
1675   gboolean has_sdes;
1676 } ReportData;
1677
1678 static void
1679 session_start_rtcp (RTPSession * sess, ReportData * data)
1680 {
1681   GstRTCPPacket *packet = &data->packet;
1682   RTPSource *own = sess->source;
1683
1684   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
1685
1686   if (RTP_SOURCE_IS_SENDER (own)) {
1687     guint64 ntptime;
1688     guint32 rtptime;
1689     guint32 packet_count, octet_count;
1690
1691     /* we are a sender, create SR */
1692     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
1693     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
1694
1695     /* get latest stats */
1696     rtp_source_get_new_sr (own, data->ntpnstime, &ntptime, &rtptime,
1697         &packet_count, &octet_count);
1698     /* store stats */
1699     rtp_source_process_sr (own, data->ntpnstime, ntptime, rtptime, packet_count,
1700         octet_count);
1701
1702     /* fill in sender report info */
1703     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
1704         ntptime, rtptime, packet_count, octet_count);
1705   } else {
1706     /* we are only receiver, create RR */
1707     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
1708     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
1709     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
1710   }
1711 }
1712
1713 /* construct a Sender or Receiver Report */
1714 static void
1715 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
1716 {
1717   RTPSession *sess = data->sess;
1718   GstRTCPPacket *packet = &data->packet;
1719
1720   /* create a new buffer if needed */
1721   if (data->rtcp == NULL) {
1722     session_start_rtcp (sess, data);
1723   }
1724   if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
1725     /* only report about other sender sources */
1726     if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
1727       guint8 fractionlost;
1728       gint32 packetslost;
1729       guint32 exthighestseq, jitter;
1730       guint32 lsr, dlsr;
1731
1732       /* get new stats */
1733       rtp_source_get_new_rb (source, data->time, &fractionlost, &packetslost,
1734           &exthighestseq, &jitter, &lsr, &dlsr);
1735
1736       /* packet is not yet filled, add report block for this source. */
1737       gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
1738           exthighestseq, jitter, lsr, dlsr);
1739     }
1740   }
1741 }
1742
1743 /* perform cleanup of sources that timed out */
1744 static gboolean
1745 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
1746 {
1747   gboolean remove = FALSE;
1748   gboolean byetimeout = FALSE;
1749   gboolean is_sender, is_active;
1750   RTPSession *sess = data->sess;
1751   GstClockTime interval;
1752
1753   is_sender = RTP_SOURCE_IS_SENDER (source);
1754   is_active = RTP_SOURCE_IS_ACTIVE (source);
1755
1756   /* check for our own source, we don't want to delete our own source. */
1757   if (!(source == sess->source)) {
1758     if (source->received_bye) {
1759       /* if we received a BYE from the source, remove the source after some
1760        * time. */
1761       if (data->time > source->bye_time &&
1762           data->time - source->bye_time > sess->stats.bye_timeout) {
1763         GST_DEBUG ("removing BYE source %08x", source->ssrc);
1764         remove = TRUE;
1765         byetimeout = TRUE;
1766       }
1767     }
1768     /* sources that were inactive for more than 5 times the deterministic reporting
1769      * interval get timed out. the min timeout is 5 seconds. */
1770     if (data->time > source->last_activity) {
1771       interval = MAX (data->interval * 5, 5 * GST_SECOND);
1772       if (data->time - source->last_activity > interval) {
1773         GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
1774             source->ssrc, GST_TIME_ARGS (source->last_activity));
1775         remove = TRUE;
1776       }
1777     }
1778   }
1779
1780   /* senders that did not send for a long time become a receiver, this also
1781    * holds for our own source. */
1782   if (is_sender) {
1783     if (data->time > source->last_rtp_activity) {
1784       interval = MAX (data->interval * 2, 5 * GST_SECOND);
1785       if (data->time - source->last_rtp_activity > interval) {
1786         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
1787             GST_TIME_FORMAT, source->ssrc,
1788             GST_TIME_ARGS (source->last_rtp_activity));
1789         source->is_sender = FALSE;
1790         sess->stats.sender_sources--;
1791       }
1792     }
1793   }
1794
1795   if (remove) {
1796     sess->total_sources--;
1797     if (is_sender)
1798       sess->stats.sender_sources--;
1799     if (is_active)
1800       sess->stats.active_sources--;
1801
1802     if (byetimeout)
1803       on_bye_timeout (sess, source);
1804     else
1805       on_timeout (sess, source);
1806   }
1807   return remove;
1808 }
1809
1810 static void
1811 session_sdes (RTPSession * sess, ReportData * data)
1812 {
1813   GstRTCPPacket *packet = &data->packet;
1814   guint8 *sdes_data;
1815   guint sdes_len;
1816
1817   /* add SDES packet */
1818   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
1819
1820   gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
1821
1822   rtp_source_get_sdes (sess->source, GST_RTCP_SDES_CNAME, &sdes_data,
1823       &sdes_len);
1824   gst_rtcp_packet_sdes_add_entry (packet, GST_RTCP_SDES_CNAME, sdes_len,
1825       sdes_data);
1826
1827   /* other SDES items must only be added at regular intervals and only when the
1828    * user requests to since it might be a privacy problem */
1829 #if 0
1830   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_NAME,
1831       strlen (sess->name), (guint8 *) sess->name);
1832   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_TOOL,
1833       strlen (sess->tool), (guint8 *) sess->tool);
1834 #endif
1835
1836   data->has_sdes = TRUE;
1837 }
1838
1839 /* schedule a BYE packet */
1840 static void
1841 session_bye (RTPSession * sess, ReportData * data)
1842 {
1843   GstRTCPPacket *packet = &data->packet;
1844
1845   /* open packet */
1846   session_start_rtcp (sess, data);
1847
1848   /* add SDES */
1849   session_sdes (sess, data);
1850
1851   /* add a BYE packet */
1852   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet);
1853   gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc);
1854   if (sess->bye_reason)
1855     gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason);
1856
1857   /* we have a BYE packet now */
1858   data->is_bye = TRUE;
1859 }
1860
1861 static gboolean
1862 is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data)
1863 {
1864   GstClockTime new_send_time, elapsed;
1865   gboolean result;
1866
1867   /* no need to check yet */
1868   if (sess->next_rtcp_check_time > time) {
1869     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
1870         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
1871         GST_TIME_ARGS (time));
1872     return FALSE;
1873   }
1874
1875   /* get elapsed time since we last reported */
1876   elapsed = time - sess->last_rtcp_send_time;
1877
1878   /* perform forward reconsideration */
1879   new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
1880
1881   GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
1882       GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time), GST_TIME_ARGS (elapsed));
1883
1884   new_send_time += sess->last_rtcp_send_time;
1885
1886   /* check if reconsideration */
1887   if (time < new_send_time) {
1888     GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
1889         GST_TIME_ARGS (new_send_time));
1890     result = FALSE;
1891     /* store new check time */
1892     sess->next_rtcp_check_time = new_send_time;
1893   } else {
1894     result = TRUE;
1895     new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
1896
1897     GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
1898         GST_TIME_ARGS (new_send_time));
1899     sess->next_rtcp_check_time = time + new_send_time;
1900   }
1901   return result;
1902 }
1903
1904 /**
1905  * rtp_session_on_timeout:
1906  * @sess: an #RTPSession
1907  * @time: the current system time
1908  * @ntpnstime: the current NTP time in nanoseconds
1909  *
1910  * Perform maintenance actions after the timeout obtained with
1911  * rtp_session_next_timeout() expired.
1912  *
1913  * This function will perform timeouts of receivers and senders, send a BYE
1914  * packet or generate RTCP packets with current session stats.
1915  *
1916  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
1917  * times, for each packet that should be processed.
1918  *
1919  * Returns: a #GstFlowReturn.
1920  */
1921 GstFlowReturn
1922 rtp_session_on_timeout (RTPSession * sess, GstClockTime time, guint64 ntpnstime)
1923 {
1924   GstFlowReturn result = GST_FLOW_OK;
1925   ReportData data;
1926
1927   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1928
1929   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT,
1930       GST_TIME_ARGS (time), GST_TIME_ARGS (ntpnstime));
1931
1932   data.sess = sess;
1933   data.rtcp = NULL;
1934   data.time = time;
1935   data.ntpnstime = ntpnstime;
1936   data.is_bye = FALSE;
1937   data.has_sdes = FALSE;
1938
1939   RTP_SESSION_LOCK (sess);
1940   /* get a new interval, we need this for various cleanups etc */
1941   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
1942
1943   /* first perform cleanups */
1944   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
1945       (GHRFunc) session_cleanup, &data);
1946
1947   /* see if we need to generate SR or RR packets */
1948   if (is_rtcp_time (sess, time, &data)) {
1949     if (sess->source->received_bye) {
1950       /* generate BYE instead */
1951       session_bye (sess, &data);
1952       sess->sent_bye = TRUE;
1953     } else {
1954       /* loop over all known sources and do something */
1955       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1956           (GHFunc) session_report_blocks, &data);
1957     }
1958   }
1959
1960   if (data.rtcp) {
1961     guint size;
1962
1963     /* we keep track of the last report time in order to timeout inactive
1964      * receivers or senders */
1965     sess->last_rtcp_send_time = data.time;
1966     sess->first_rtcp = FALSE;
1967
1968     /* add SDES for this source when not already added */
1969     if (!data.has_sdes)
1970       session_sdes (sess, &data);
1971
1972     /* update average RTCP size before sending */
1973     size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
1974     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size);
1975   }
1976   RTP_SESSION_UNLOCK (sess);
1977
1978   /* push out the RTCP packet */
1979   if (data.rtcp) {
1980     /* close the RTCP packet */
1981     gst_rtcp_buffer_end (data.rtcp);
1982
1983     if (sess->callbacks.send_rtcp)
1984       result = sess->callbacks.send_rtcp (sess, sess->source, data.rtcp,
1985           sess->user_data);
1986     else
1987       gst_buffer_unref (data.rtcp);
1988   }
1989
1990   return result;
1991 }