gst/rtpmanager/: Fix some leaks.
[platform/upstream/gstreamer.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/rtp/gstrtpbuffer.h>
23 #include <gst/rtp/gstrtcpbuffer.h>
24 #include <gst/netbuffer/gstnetbuffer.h>
25
26 #include "gstrtpbin-marshal.h"
27
28 #include "rtpsession.h"
29
30 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
31 #define GST_CAT_DEFAULT rtp_session_debug
32
33 /* signals and args */
34 enum
35 {
36   SIGNAL_ON_NEW_SSRC,
37   SIGNAL_ON_SSRC_COLLISION,
38   SIGNAL_ON_SSRC_VALIDATED,
39   SIGNAL_ON_SSRC_ACTIVE,
40   SIGNAL_ON_SSRC_SDES,
41   SIGNAL_ON_BYE_SSRC,
42   SIGNAL_ON_BYE_TIMEOUT,
43   SIGNAL_ON_TIMEOUT,
44   LAST_SIGNAL
45 };
46
47 #define DEFAULT_INTERNAL_SOURCE      NULL
48 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
49 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_BANDWIDTH
50 #define DEFAULT_SDES_CNAME           NULL
51 #define DEFAULT_SDES_NAME            NULL
52 #define DEFAULT_SDES_EMAIL           NULL
53 #define DEFAULT_SDES_PHONE           NULL
54 #define DEFAULT_SDES_LOCATION        NULL
55 #define DEFAULT_SDES_TOOL            NULL
56 #define DEFAULT_SDES_NOTE            NULL
57 #define DEFAULT_NUM_SOURCES          0
58 #define DEFAULT_NUM_ACTIVE_SOURCES   0
59
60 enum
61 {
62   PROP_0,
63   PROP_INTERNAL_SOURCE,
64   PROP_BANDWIDTH,
65   PROP_RTCP_FRACTION,
66   PROP_SDES_CNAME,
67   PROP_SDES_NAME,
68   PROP_SDES_EMAIL,
69   PROP_SDES_PHONE,
70   PROP_SDES_LOCATION,
71   PROP_SDES_TOOL,
72   PROP_SDES_NOTE,
73   PROP_NUM_SOURCES,
74   PROP_NUM_ACTIVE_SOURCES,
75   PROP_LAST
76 };
77
78 /* update average packet size, we keep this scaled by 16 to keep enough
79  * precision. */
80 #define UPDATE_AVG(avg, val)            \
81   if ((avg) == 0)                       \
82    (avg) = (val) << 4;                  \
83   else                                  \
84    (avg) = ((val) + (15 * (avg))) >> 4;
85
86 /* GObject vmethods */
87 static void rtp_session_finalize (GObject * object);
88 static void rtp_session_set_property (GObject * object, guint prop_id,
89     const GValue * value, GParamSpec * pspec);
90 static void rtp_session_get_property (GObject * object, guint prop_id,
91     GValue * value, GParamSpec * pspec);
92
93 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
94
95 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
96
97 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
98     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
99
100 static void
101 rtp_session_class_init (RTPSessionClass * klass)
102 {
103   GObjectClass *gobject_class;
104
105   gobject_class = (GObjectClass *) klass;
106
107   gobject_class->finalize = rtp_session_finalize;
108   gobject_class->set_property = rtp_session_set_property;
109   gobject_class->get_property = rtp_session_get_property;
110
111   /**
112    * RTPSession::on-new-ssrc:
113    * @session: the object which received the signal
114    * @src: the new RTPSource
115    *
116    * Notify of a new SSRC that entered @session.
117    */
118   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
119       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
120       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
121       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
122       RTP_TYPE_SOURCE);
123   /**
124    * RTPSession::on-ssrc-collision:
125    * @session: the object which received the signal
126    * @src: the #RTPSource that caused a collision
127    *
128    * Notify when we have an SSRC collision
129    */
130   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
131       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
132       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
133       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
134       RTP_TYPE_SOURCE);
135   /**
136    * RTPSession::on-ssrc-validated:
137    * @session: the object which received the signal
138    * @src: the new validated RTPSource
139    *
140    * Notify of a new SSRC that became validated.
141    */
142   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
143       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
144       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
145       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
146       RTP_TYPE_SOURCE);
147   /**
148    * RTPSession::on-ssrc-active:
149    * @session: the object which received the signal
150    * @src: the active RTPSource
151    *
152    * Notify of a SSRC that is active, i.e., sending RTCP.
153    */
154   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
155       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
156       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
157       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
158       RTP_TYPE_SOURCE);
159   /**
160    * RTPSession::on-ssrc-sdes:
161    * @session: the object which received the signal
162    * @src: the RTPSource
163    *
164    * Notify that a new SDES was received for SSRC.
165    */
166   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
167       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
168       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
169       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
170       RTP_TYPE_SOURCE);
171   /**
172    * RTPSession::on-bye-ssrc:
173    * @session: the object which received the signal
174    * @src: the RTPSource that went away
175    *
176    * Notify of an SSRC that became inactive because of a BYE packet.
177    */
178   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
179       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
180       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
181       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
182       RTP_TYPE_SOURCE);
183   /**
184    * RTPSession::on-bye-timeout:
185    * @session: the object which received the signal
186    * @src: the RTPSource that timed out
187    *
188    * Notify of an SSRC that has timed out because of BYE
189    */
190   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
191       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
192       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
193       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
194       RTP_TYPE_SOURCE);
195   /**
196    * RTPSession::on-timeout:
197    * @session: the object which received the signal
198    * @src: the RTPSource that timed out
199    *
200    * Notify of an SSRC that has timed out
201    */
202   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
203       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
204       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
205       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
206       RTP_TYPE_SOURCE);
207
208   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
209       g_param_spec_object ("internal-source", "Internal Source",
210           "The internal source element of the session",
211           RTP_TYPE_SOURCE, G_PARAM_READABLE));
212
213   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
214       g_param_spec_double ("bandwidth", "Bandwidth",
215           "The bandwidth of the session",
216           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH, G_PARAM_READWRITE));
217
218   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
219       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
220           "The fraction of the bandwidth used for RTCP",
221           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION, G_PARAM_READWRITE));
222
223   g_object_class_install_property (gobject_class, PROP_SDES_CNAME,
224       g_param_spec_string ("sdes-cname", "SDES CNAME",
225           "The CNAME to put in SDES messages of this session",
226           DEFAULT_SDES_CNAME, G_PARAM_READWRITE));
227
228   g_object_class_install_property (gobject_class, PROP_SDES_NAME,
229       g_param_spec_string ("sdes-name", "SDES NAME",
230           "The NAME to put in SDES messages of this session",
231           DEFAULT_SDES_NAME, G_PARAM_READWRITE));
232
233   g_object_class_install_property (gobject_class, PROP_SDES_EMAIL,
234       g_param_spec_string ("sdes-email", "SDES EMAIL",
235           "The EMAIL to put in SDES messages of this session",
236           DEFAULT_SDES_EMAIL, G_PARAM_READWRITE));
237
238   g_object_class_install_property (gobject_class, PROP_SDES_PHONE,
239       g_param_spec_string ("sdes-phone", "SDES PHONE",
240           "The PHONE to put in SDES messages of this session",
241           DEFAULT_SDES_PHONE, G_PARAM_READWRITE));
242
243   g_object_class_install_property (gobject_class, PROP_SDES_LOCATION,
244       g_param_spec_string ("sdes-location", "SDES LOCATION",
245           "The LOCATION to put in SDES messages of this session",
246           DEFAULT_SDES_LOCATION, G_PARAM_READWRITE));
247
248   g_object_class_install_property (gobject_class, PROP_SDES_TOOL,
249       g_param_spec_string ("sdes-tool", "SDES TOOL",
250           "The TOOL to put in SDES messages of this session",
251           DEFAULT_SDES_TOOL, G_PARAM_READWRITE));
252
253   g_object_class_install_property (gobject_class, PROP_SDES_NOTE,
254       g_param_spec_string ("sdes-note", "SDES NOTE",
255           "The NOTE to put in SDES messages of this session",
256           DEFAULT_SDES_NOTE, G_PARAM_READWRITE));
257
258   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
259       g_param_spec_uint ("num-sources", "Num Sources",
260           "The number of sources in the session", 0, G_MAXUINT,
261           DEFAULT_NUM_SOURCES, G_PARAM_READABLE));
262
263   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
264       g_param_spec_uint ("num-active-sources", "Num Active Sources",
265           "The number of active sources in the session", 0, G_MAXUINT,
266           DEFAULT_NUM_ACTIVE_SOURCES, G_PARAM_READABLE));
267
268   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
269 }
270
271 static void
272 rtp_session_init (RTPSession * sess)
273 {
274   gint i;
275   gchar *str;
276
277   sess->lock = g_mutex_new ();
278   sess->key = g_random_int ();
279   sess->mask_idx = 0;
280   sess->mask = 0;
281
282   for (i = 0; i < 32; i++) {
283     sess->ssrcs[i] =
284         g_hash_table_new_full (NULL, NULL, NULL,
285         (GDestroyNotify) g_object_unref);
286   }
287   sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL);
288
289   rtp_stats_init_defaults (&sess->stats);
290
291   /* create an active SSRC for this session manager */
292   sess->source = rtp_session_create_source (sess);
293   sess->source->validated = TRUE;
294   sess->stats.active_sources++;
295
296   /* default UDP header length */
297   sess->header_len = 28;
298   sess->mtu = 1400;
299
300   /* some default SDES entries */
301   str = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ());
302   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_CNAME, str);
303   g_free (str);
304
305   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_NAME,
306       g_get_real_name ());
307   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_TOOL, "GStreamer");
308
309   sess->first_rtcp = TRUE;
310
311   GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
312 }
313
314 static void
315 rtp_session_finalize (GObject * object)
316 {
317   RTPSession *sess;
318   gint i;
319
320   sess = RTP_SESSION_CAST (object);
321
322   g_mutex_free (sess->lock);
323   for (i = 0; i < 32; i++)
324     g_hash_table_destroy (sess->ssrcs[i]);
325
326   g_free (sess->bye_reason);
327
328   g_hash_table_destroy (sess->cnames);
329   g_object_unref (sess->source);
330
331   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
332 }
333
334 static void
335 rtp_session_set_property (GObject * object, guint prop_id,
336     const GValue * value, GParamSpec * pspec)
337 {
338   RTPSession *sess;
339
340   sess = RTP_SESSION (object);
341
342   switch (prop_id) {
343     case PROP_BANDWIDTH:
344       rtp_session_set_bandwidth (sess, g_value_get_double (value));
345       break;
346     case PROP_RTCP_FRACTION:
347       rtp_session_set_rtcp_fraction (sess, g_value_get_double (value));
348       break;
349     case PROP_SDES_CNAME:
350       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_CNAME,
351           g_value_get_string (value));
352       break;
353     case PROP_SDES_NAME:
354       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_NAME,
355           g_value_get_string (value));
356       break;
357     case PROP_SDES_EMAIL:
358       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_EMAIL,
359           g_value_get_string (value));
360       break;
361     case PROP_SDES_PHONE:
362       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_PHONE,
363           g_value_get_string (value));
364       break;
365     case PROP_SDES_LOCATION:
366       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_LOC,
367           g_value_get_string (value));
368       break;
369     case PROP_SDES_TOOL:
370       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_TOOL,
371           g_value_get_string (value));
372       break;
373     case PROP_SDES_NOTE:
374       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_NOTE,
375           g_value_get_string (value));
376       break;
377     default:
378       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
379       break;
380   }
381 }
382
383 static void
384 rtp_session_get_property (GObject * object, guint prop_id,
385     GValue * value, GParamSpec * pspec)
386 {
387   RTPSession *sess;
388
389   sess = RTP_SESSION (object);
390
391   switch (prop_id) {
392     case PROP_INTERNAL_SOURCE:
393       g_value_take_object (value, rtp_session_get_internal_source (sess));
394       break;
395     case PROP_BANDWIDTH:
396       g_value_set_double (value, rtp_session_get_bandwidth (sess));
397       break;
398     case PROP_RTCP_FRACTION:
399       g_value_set_double (value, rtp_session_get_rtcp_fraction (sess));
400       break;
401     case PROP_SDES_CNAME:
402       g_value_take_string (value, rtp_session_get_sdes_string (sess,
403               GST_RTCP_SDES_CNAME));
404       break;
405     case PROP_SDES_NAME:
406       g_value_take_string (value, rtp_session_get_sdes_string (sess,
407               GST_RTCP_SDES_NAME));
408       break;
409     case PROP_SDES_EMAIL:
410       g_value_take_string (value, rtp_session_get_sdes_string (sess,
411               GST_RTCP_SDES_EMAIL));
412       break;
413     case PROP_SDES_PHONE:
414       g_value_take_string (value, rtp_session_get_sdes_string (sess,
415               GST_RTCP_SDES_PHONE));
416       break;
417     case PROP_SDES_LOCATION:
418       g_value_take_string (value, rtp_session_get_sdes_string (sess,
419               GST_RTCP_SDES_LOC));
420       break;
421     case PROP_SDES_TOOL:
422       g_value_take_string (value, rtp_session_get_sdes_string (sess,
423               GST_RTCP_SDES_TOOL));
424       break;
425     case PROP_SDES_NOTE:
426       g_value_take_string (value, rtp_session_get_sdes_string (sess,
427               GST_RTCP_SDES_NOTE));
428       break;
429     case PROP_NUM_SOURCES:
430       g_value_set_uint (value, rtp_session_get_num_sources (sess));
431       break;
432     case PROP_NUM_ACTIVE_SOURCES:
433       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
434       break;
435     default:
436       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
437       break;
438   }
439 }
440
441 static void
442 on_new_ssrc (RTPSession * sess, RTPSource * source)
443 {
444   RTP_SESSION_UNLOCK (sess);
445   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
446   RTP_SESSION_LOCK (sess);
447 }
448
449 static void
450 on_ssrc_collision (RTPSession * sess, RTPSource * source)
451 {
452   RTP_SESSION_UNLOCK (sess);
453   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
454       source);
455   RTP_SESSION_LOCK (sess);
456 }
457
458 static void
459 on_ssrc_validated (RTPSession * sess, RTPSource * source)
460 {
461   RTP_SESSION_UNLOCK (sess);
462   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
463       source);
464   RTP_SESSION_LOCK (sess);
465 }
466
467 static void
468 on_ssrc_active (RTPSession * sess, RTPSource * source)
469 {
470   RTP_SESSION_UNLOCK (sess);
471   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
472   RTP_SESSION_LOCK (sess);
473 }
474
475 static void
476 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
477 {
478   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
479   RTP_SESSION_UNLOCK (sess);
480   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
481   RTP_SESSION_LOCK (sess);
482 }
483
484 static void
485 on_bye_ssrc (RTPSession * sess, RTPSource * source)
486 {
487   RTP_SESSION_UNLOCK (sess);
488   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
489   RTP_SESSION_LOCK (sess);
490 }
491
492 static void
493 on_bye_timeout (RTPSession * sess, RTPSource * source)
494 {
495   RTP_SESSION_UNLOCK (sess);
496   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
497   RTP_SESSION_LOCK (sess);
498 }
499
500 static void
501 on_timeout (RTPSession * sess, RTPSource * source)
502 {
503   RTP_SESSION_UNLOCK (sess);
504   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
505   RTP_SESSION_LOCK (sess);
506 }
507
508 /**
509  * rtp_session_new:
510  *
511  * Create a new session object.
512  *
513  * Returns: a new #RTPSession. g_object_unref() after usage.
514  */
515 RTPSession *
516 rtp_session_new (void)
517 {
518   RTPSession *sess;
519
520   sess = g_object_new (RTP_TYPE_SESSION, NULL);
521
522   return sess;
523 }
524
525 /**
526  * rtp_session_set_callbacks:
527  * @sess: an #RTPSession
528  * @callbacks: callbacks to configure
529  * @user_data: user data passed in the callbacks
530  *
531  * Configure a set of callbacks to be notified of actions.
532  */
533 void
534 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
535     gpointer user_data)
536 {
537   g_return_if_fail (RTP_IS_SESSION (sess));
538
539   sess->callbacks.process_rtp = callbacks->process_rtp;
540   sess->callbacks.send_rtp = callbacks->send_rtp;
541   sess->callbacks.send_rtcp = callbacks->send_rtcp;
542   sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
543   sess->callbacks.clock_rate = callbacks->clock_rate;
544   sess->callbacks.reconsider = callbacks->reconsider;
545   sess->user_data = user_data;
546 }
547
548 /**
549  * rtp_session_set_bandwidth:
550  * @sess: an #RTPSession
551  * @bandwidth: the bandwidth allocated
552  *
553  * Set the session bandwidth in bytes per second.
554  */
555 void
556 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
557 {
558   g_return_if_fail (RTP_IS_SESSION (sess));
559
560   RTP_SESSION_LOCK (sess);
561   sess->stats.bandwidth = bandwidth;
562   RTP_SESSION_UNLOCK (sess);
563 }
564
565 /**
566  * rtp_session_get_bandwidth:
567  * @sess: an #RTPSession
568  *
569  * Get the session bandwidth.
570  *
571  * Returns: the session bandwidth.
572  */
573 gdouble
574 rtp_session_get_bandwidth (RTPSession * sess)
575 {
576   gdouble result;
577
578   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
579
580   RTP_SESSION_LOCK (sess);
581   result = sess->stats.bandwidth;
582   RTP_SESSION_UNLOCK (sess);
583
584   return result;
585 }
586
587 /**
588  * rtp_session_set_rtcp_fraction:
589  * @sess: an #RTPSession
590  * @bandwidth: the RTCP bandwidth
591  *
592  * Set the bandwidth that should be used for RTCP
593  * messages.
594  */
595 void
596 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
597 {
598   g_return_if_fail (RTP_IS_SESSION (sess));
599
600   RTP_SESSION_LOCK (sess);
601   sess->stats.rtcp_bandwidth = bandwidth;
602   RTP_SESSION_UNLOCK (sess);
603 }
604
605 /**
606  * rtp_session_get_rtcp_fraction:
607  * @sess: an #RTPSession
608  *
609  * Get the session bandwidth used for RTCP.
610  *
611  * Returns: The bandwidth used for RTCP messages.
612  */
613 gdouble
614 rtp_session_get_rtcp_fraction (RTPSession * sess)
615 {
616   gdouble result;
617
618   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
619
620   RTP_SESSION_LOCK (sess);
621   result = sess->stats.rtcp_bandwidth;
622   RTP_SESSION_UNLOCK (sess);
623
624   return result;
625 }
626
627 /**
628  * rtp_session_set_sdes_string:
629  * @sess: an #RTPSession
630  * @type: the type of the SDES item
631  * @item: a null-terminated string to set. 
632  *
633  * Store an SDES item of @type in @sess. 
634  *
635  * Returns: %FALSE if the data was unchanged @type is invalid.
636  */
637 gboolean
638 rtp_session_set_sdes_string (RTPSession * sess, GstRTCPSDESType type,
639     const gchar * item)
640 {
641   gboolean result;
642
643   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
644
645   RTP_SESSION_LOCK (sess);
646   result = rtp_source_set_sdes_string (sess->source, type, item);
647   RTP_SESSION_UNLOCK (sess);
648
649   return result;
650 }
651
652 /**
653  * rtp_session_get_sdes_string:
654  * @sess: an #RTPSession
655  * @type: the type of the SDES item
656  *
657  * Get the SDES item of @type from @sess. 
658  *
659  * Returns: a null-terminated copy of the SDES item or NULL when @type was not
660  * valid. g_free() after usage.
661  */
662 gchar *
663 rtp_session_get_sdes_string (RTPSession * sess, GstRTCPSDESType type)
664 {
665   gchar *result;
666
667   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
668
669   RTP_SESSION_LOCK (sess);
670   result = rtp_source_get_sdes_string (sess->source, type);
671   RTP_SESSION_UNLOCK (sess);
672
673   return result;
674 }
675
676 static GstFlowReturn
677 source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session)
678 {
679   GstFlowReturn result = GST_FLOW_OK;
680
681   if (source == session->source) {
682     GST_DEBUG ("source %08x pushed sender RTP packet", source->ssrc);
683
684     RTP_SESSION_UNLOCK (session);
685
686     if (session->callbacks.send_rtp)
687       result =
688           session->callbacks.send_rtp (session, source, buffer,
689           session->user_data);
690     else
691       gst_buffer_unref (buffer);
692
693   } else {
694     GST_DEBUG ("source %08x pushed receiver RTP packet", source->ssrc);
695     RTP_SESSION_UNLOCK (session);
696
697     if (session->callbacks.process_rtp)
698       result =
699           session->callbacks.process_rtp (session, source, buffer,
700           session->user_data);
701     else
702       gst_buffer_unref (buffer);
703   }
704   RTP_SESSION_LOCK (session);
705
706   return result;
707 }
708
709 static gint
710 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
711 {
712   gint result;
713
714   if (session->callbacks.clock_rate)
715     result = session->callbacks.clock_rate (session, pt, session->user_data);
716   else
717     result = -1;
718
719   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
720
721   return result;
722 }
723
724 static RTPSourceCallbacks callbacks = {
725   (RTPSourcePushRTP) source_push_rtp,
726   (RTPSourceClockRate) source_clock_rate,
727 };
728
729 static gboolean
730 check_collision (RTPSession * sess, RTPSource * source,
731     RTPArrivalStats * arrival)
732 {
733   /* FIXME, do collision check */
734   return FALSE;
735 }
736
737 /* must be called with the session lock */
738 static RTPSource *
739 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
740     RTPArrivalStats * arrival, gboolean rtp)
741 {
742   RTPSource *source;
743
744   source =
745       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
746   if (source == NULL) {
747     /* make new Source in probation and insert */
748     source = rtp_source_new (ssrc);
749
750     /* for RTP packets we need to set the source in probation. Receiving RTCP
751      * packets of an SSRC, on the other hand, is a strong indication that we
752      * are dealing with a valid source. */
753     if (rtp)
754       source->probation = RTP_DEFAULT_PROBATION;
755     else
756       source->probation = 0;
757
758     /* store from address, if any */
759     if (arrival->have_address) {
760       if (rtp)
761         rtp_source_set_rtp_from (source, &arrival->address);
762       else
763         rtp_source_set_rtcp_from (source, &arrival->address);
764     }
765
766     /* configure a callback on the source */
767     rtp_source_set_callbacks (source, &callbacks, sess);
768
769     g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
770         source);
771
772     /* we have one more source now */
773     sess->total_sources++;
774     *created = TRUE;
775   } else {
776     *created = FALSE;
777     /* check for collision, this updates the address when not previously set */
778     if (check_collision (sess, source, arrival))
779       on_ssrc_collision (sess, source);
780   }
781   /* update last activity */
782   source->last_activity = arrival->time;
783   if (rtp)
784     source->last_rtp_activity = arrival->time;
785
786   return source;
787 }
788
789 /**
790  * rtp_session_get_internal_source:
791  * @sess: a #RTPSession
792  *
793  * Get the internal #RTPSource of @session.
794  *
795  * Returns: The internal #RTPSource. g_object_unref() after usage.
796  */
797 RTPSource *
798 rtp_session_get_internal_source (RTPSession * sess)
799 {
800   RTPSource *result;
801
802   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
803
804   result = g_object_ref (sess->source);
805
806   return result;
807 }
808
809 /**
810  * rtp_session_add_source:
811  * @sess: a #RTPSession
812  * @src: #RTPSource to add
813  *
814  * Add @src to @session.
815  *
816  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
817  * existed in the session.
818  */
819 gboolean
820 rtp_session_add_source (RTPSession * sess, RTPSource * src)
821 {
822   gboolean result = FALSE;
823   RTPSource *find;
824
825   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
826   g_return_val_if_fail (src != NULL, FALSE);
827
828   RTP_SESSION_LOCK (sess);
829   find =
830       g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
831       GINT_TO_POINTER (src->ssrc));
832   if (find == NULL) {
833     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
834         GINT_TO_POINTER (src->ssrc), src);
835     /* we have one more source now */
836     sess->total_sources++;
837     result = TRUE;
838   }
839   RTP_SESSION_UNLOCK (sess);
840
841   return result;
842 }
843
844 /**
845  * rtp_session_get_num_sources:
846  * @sess: an #RTPSession
847  *
848  * Get the number of sources in @sess.
849  *
850  * Returns: The number of sources in @sess.
851  */
852 guint
853 rtp_session_get_num_sources (RTPSession * sess)
854 {
855   guint result;
856
857   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
858
859   RTP_SESSION_LOCK (sess);
860   result = sess->total_sources;
861   RTP_SESSION_UNLOCK (sess);
862
863   return result;
864 }
865
866 /**
867  * rtp_session_get_num_active_sources:
868  * @sess: an #RTPSession
869  *
870  * Get the number of active sources in @sess. A source is considered active when
871  * it has been validated and has not yet received a BYE RTCP message.
872  *
873  * Returns: The number of active sources in @sess.
874  */
875 guint
876 rtp_session_get_num_active_sources (RTPSession * sess)
877 {
878   guint result;
879
880   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
881
882   RTP_SESSION_LOCK (sess);
883   result = sess->stats.active_sources;
884   RTP_SESSION_UNLOCK (sess);
885
886   return result;
887 }
888
889 /**
890  * rtp_session_get_source_by_ssrc:
891  * @sess: an #RTPSession
892  * @ssrc: an SSRC
893  *
894  * Find the source with @ssrc in @sess.
895  *
896  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
897  * g_object_unref() after usage.
898  */
899 RTPSource *
900 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
901 {
902   RTPSource *result;
903
904   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
905
906   RTP_SESSION_LOCK (sess);
907   result =
908       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
909   if (result)
910     g_object_ref (result);
911   RTP_SESSION_UNLOCK (sess);
912
913   return result;
914 }
915
916 /**
917  * rtp_session_get_source_by_cname:
918  * @sess: a #RTPSession
919  * @cname: an CNAME
920  *
921  * Find the source with @cname in @sess.
922  *
923  * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found.
924  * g_object_unref() after usage.
925  */
926 RTPSource *
927 rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
928 {
929   RTPSource *result;
930
931   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
932   g_return_val_if_fail (cname != NULL, NULL);
933
934   RTP_SESSION_LOCK (sess);
935   result = g_hash_table_lookup (sess->cnames, cname);
936   if (result)
937     g_object_ref (result);
938   RTP_SESSION_UNLOCK (sess);
939
940   return result;
941 }
942
943 /**
944  * rtp_session_create_source:
945  * @sess: an #RTPSession
946  *
947  * Create an #RTPSource for use in @sess. This function will create a source
948  * with an ssrc that is currently not used by any participants in the session.
949  *
950  * Returns: an #RTPSource.
951  */
952 RTPSource *
953 rtp_session_create_source (RTPSession * sess)
954 {
955   guint32 ssrc;
956   RTPSource *source;
957
958   RTP_SESSION_LOCK (sess);
959   while (TRUE) {
960     ssrc = g_random_int ();
961
962     /* see if it exists in the session, we're done if it doesn't */
963     if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
964             GINT_TO_POINTER (ssrc)) == NULL)
965       break;
966   }
967   source = rtp_source_new (ssrc);
968   g_object_ref (source);
969   rtp_source_set_callbacks (source, &callbacks, sess);
970   g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
971       source);
972   /* we have one more source now */
973   sess->total_sources++;
974   RTP_SESSION_UNLOCK (sess);
975
976   return source;
977 }
978
979 /* update the RTPArrivalStats structure with the current time and other bits
980  * about the current buffer we are handling.
981  * This function is typically called when a validated packet is received.
982  * This function should be called with the SESSION_LOCK
983  */
984 static void
985 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
986     gboolean rtp, GstBuffer * buffer, guint64 ntpnstime)
987 {
988   GTimeVal current;
989
990   /* get time of arrival */
991   g_get_current_time (&current);
992   arrival->time = GST_TIMEVAL_TO_TIME (current);
993   arrival->ntpnstime = ntpnstime;
994
995   /* get packet size including header overhead */
996   arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
997
998   if (rtp) {
999     arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer);
1000   } else {
1001     arrival->payload_len = 0;
1002   }
1003
1004   /* for netbuffer we can store the IP address to check for collisions */
1005   arrival->have_address = GST_IS_NETBUFFER (buffer);
1006   if (arrival->have_address) {
1007     GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
1008
1009     memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
1010   }
1011 }
1012
1013 /**
1014  * rtp_session_process_rtp:
1015  * @sess: and #RTPSession
1016  * @buffer: an RTP buffer
1017  * @ntpnstime: the NTP arrival time in nanoseconds
1018  *
1019  * Process an RTP buffer in the session manager. This function takes ownership
1020  * of @buffer.
1021  *
1022  * Returns: a #GstFlowReturn.
1023  */
1024 GstFlowReturn
1025 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
1026     guint64 ntpnstime)
1027 {
1028   GstFlowReturn result;
1029   guint32 ssrc;
1030   RTPSource *source;
1031   gboolean created;
1032   gboolean prevsender, prevactive;
1033   RTPArrivalStats arrival;
1034
1035   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1036   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1037
1038   if (!gst_rtp_buffer_validate (buffer))
1039     goto invalid_packet;
1040
1041   RTP_SESSION_LOCK (sess);
1042   /* update arrival stats */
1043   update_arrival_stats (sess, &arrival, TRUE, buffer, ntpnstime);
1044
1045   /* ignore more RTP packets when we left the session */
1046   if (sess->source->received_bye)
1047     goto ignore;
1048
1049   /* get SSRC and look up in session database */
1050   ssrc = gst_rtp_buffer_get_ssrc (buffer);
1051   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
1052
1053   prevsender = RTP_SOURCE_IS_SENDER (source);
1054   prevactive = RTP_SOURCE_IS_ACTIVE (source);
1055
1056   /* we need to ref so that we can process the CSRCs later */
1057   gst_buffer_ref (buffer);
1058
1059   /* let source process the packet */
1060   result = rtp_source_process_rtp (source, buffer, &arrival);
1061
1062   /* source became active */
1063   if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
1064     sess->stats.active_sources++;
1065     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
1066         sess->stats.active_sources);
1067     on_ssrc_validated (sess, source);
1068   }
1069   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1070     sess->stats.sender_sources++;
1071     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
1072         sess->stats.sender_sources);
1073   }
1074
1075   if (created)
1076     on_new_ssrc (sess, source);
1077
1078   if (source->validated) {
1079     guint8 i, count;
1080     gboolean created;
1081
1082     /* for validated sources, we add the CSRCs as well */
1083     count = gst_rtp_buffer_get_csrc_count (buffer);
1084
1085     for (i = 0; i < count; i++) {
1086       guint32 csrc;
1087       RTPSource *csrc_src;
1088
1089       csrc = gst_rtp_buffer_get_csrc (buffer, i);
1090
1091       /* get source */
1092       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
1093
1094       if (created) {
1095         GST_DEBUG ("created new CSRC: %08x", csrc);
1096         rtp_source_set_as_csrc (csrc_src);
1097         if (RTP_SOURCE_IS_ACTIVE (csrc_src))
1098           sess->stats.active_sources++;
1099         on_new_ssrc (sess, source);
1100       }
1101     }
1102   }
1103   gst_buffer_unref (buffer);
1104
1105   RTP_SESSION_UNLOCK (sess);
1106
1107   return result;
1108
1109   /* ERRORS */
1110 invalid_packet:
1111   {
1112     gst_buffer_unref (buffer);
1113     GST_DEBUG ("invalid RTP packet received");
1114     return GST_FLOW_OK;
1115   }
1116 ignore:
1117   {
1118     gst_buffer_unref (buffer);
1119     RTP_SESSION_UNLOCK (sess);
1120     GST_DEBUG ("ignoring RTP packet because we are leaving");
1121     return GST_FLOW_OK;
1122   }
1123 }
1124
1125 static void
1126 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
1127     GstRTCPPacket * packet, RTPArrivalStats * arrival)
1128 {
1129   guint count, i;
1130
1131   count = gst_rtcp_packet_get_rb_count (packet);
1132   for (i = 0; i < count; i++) {
1133     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1134     guint8 fractionlost;
1135     gint32 packetslost;
1136
1137     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1138         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1139
1140     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
1141
1142     if (ssrc == sess->source->ssrc) {
1143       /* only deal with report blocks for our session, we update the stats of
1144        * the sender of the RTCP message. We could also compare our stats against
1145        * the other sender to see if we are better or worse. */
1146       rtp_source_process_rb (source, arrival->time, fractionlost, packetslost,
1147           exthighestseq, jitter, lsr, dlsr);
1148
1149       on_ssrc_active (sess, source);
1150     }
1151   }
1152 }
1153
1154 /* A Sender report contains statistics about how the sender is doing. This
1155  * includes timing informataion such as the relation between RTP and NTP
1156  * timestamps and the number of packets/bytes it sent to us.
1157  *
1158  * In this report is also included a set of report blocks related to how this
1159  * sender is receiving data (in case we (or somebody else) is also sending stuff
1160  * to it). This info includes the packet loss, jitter and seqnum. It also
1161  * contains information to calculate the round trip time (LSR/DLSR).
1162  */
1163 static void
1164 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
1165     RTPArrivalStats * arrival)
1166 {
1167   guint32 senderssrc, rtptime, packet_count, octet_count;
1168   guint64 ntptime;
1169   RTPSource *source;
1170   gboolean created, prevsender;
1171
1172   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
1173       &packet_count, &octet_count);
1174
1175   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
1176       senderssrc, GST_TIME_ARGS (arrival->time));
1177
1178   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1179
1180   GST_BUFFER_OFFSET (packet->buffer) = source->clock_base;
1181
1182   prevsender = RTP_SOURCE_IS_SENDER (source);
1183
1184   /* first update the source */
1185   rtp_source_process_sr (source, arrival->time, ntptime, rtptime, packet_count,
1186       octet_count);
1187
1188   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1189     sess->stats.sender_sources++;
1190     GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc,
1191         sess->stats.sender_sources);
1192   }
1193
1194   if (created)
1195     on_new_ssrc (sess, source);
1196
1197   rtp_session_process_rb (sess, source, packet, arrival);
1198 }
1199
1200 /* A receiver report contains statistics about how a receiver is doing. It
1201  * includes stuff like packet loss, jitter and the seqnum it received last. It
1202  * also contains info to calculate the round trip time.
1203  *
1204  * We are only interested in how the sender of this report is doing wrt to us.
1205  */
1206 static void
1207 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
1208     RTPArrivalStats * arrival)
1209 {
1210   guint32 senderssrc;
1211   RTPSource *source;
1212   gboolean created;
1213
1214   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
1215
1216   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
1217
1218   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1219
1220   if (created)
1221     on_new_ssrc (sess, source);
1222
1223   rtp_session_process_rb (sess, source, packet, arrival);
1224 }
1225
1226 /* Get SDES items and store them in the SSRC */
1227 static void
1228 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
1229     RTPArrivalStats * arrival)
1230 {
1231   guint items, i, j;
1232   gboolean more_items, more_entries;
1233
1234   items = gst_rtcp_packet_sdes_get_item_count (packet);
1235   GST_DEBUG ("got SDES packet with %d items", items);
1236
1237   more_items = gst_rtcp_packet_sdes_first_item (packet);
1238   i = 0;
1239   while (more_items) {
1240     guint32 ssrc;
1241     gboolean changed, created;
1242     RTPSource *source;
1243
1244     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
1245
1246     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
1247
1248     /* find src, no probation when dealing with RTCP */
1249     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1250     changed = FALSE;
1251
1252     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
1253     j = 0;
1254     while (more_entries) {
1255       GstRTCPSDESType type;
1256       guint8 len;
1257       guint8 *data;
1258
1259       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
1260
1261       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
1262           data);
1263
1264       changed |= rtp_source_set_sdes (source, type, data, len);
1265
1266       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
1267       j++;
1268     }
1269
1270     if (created)
1271       on_new_ssrc (sess, source);
1272     if (changed)
1273       on_ssrc_sdes (sess, source);
1274
1275     more_items = gst_rtcp_packet_sdes_next_item (packet);
1276     i++;
1277   }
1278 }
1279
1280 /* BYE is sent when a client leaves the session
1281  */
1282 static void
1283 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
1284     RTPArrivalStats * arrival)
1285 {
1286   guint count, i;
1287   gchar *reason;
1288
1289   reason = gst_rtcp_packet_bye_get_reason (packet);
1290   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
1291
1292   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
1293   for (i = 0; i < count; i++) {
1294     guint32 ssrc;
1295     RTPSource *source;
1296     gboolean created, prevactive, prevsender;
1297     guint pmembers, members;
1298
1299     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
1300     GST_DEBUG ("SSRC: %08x", ssrc);
1301
1302     /* find src and mark bye, no probation when dealing with RTCP */
1303     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1304
1305     /* store time for when we need to time out this source */
1306     source->bye_time = arrival->time;
1307
1308     prevactive = RTP_SOURCE_IS_ACTIVE (source);
1309     prevsender = RTP_SOURCE_IS_SENDER (source);
1310
1311     /* let the source handle the rest */
1312     rtp_source_process_bye (source, reason);
1313
1314     pmembers = sess->stats.active_sources;
1315
1316     if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
1317       sess->stats.active_sources--;
1318       GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1319           sess->stats.active_sources);
1320     }
1321     if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
1322       sess->stats.sender_sources--;
1323       GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1324           sess->stats.sender_sources);
1325     }
1326     members = sess->stats.active_sources;
1327
1328     if (!sess->source->received_bye && members < pmembers) {
1329       /* some members went away since the previous timeout estimate.
1330        * Perform reverse reconsideration but only when we are not scheduling a
1331        * BYE ourselves. */
1332       if (arrival->time < sess->next_rtcp_check_time) {
1333         GstClockTime time_remaining;
1334
1335         time_remaining = sess->next_rtcp_check_time - arrival->time;
1336         sess->next_rtcp_check_time =
1337             gst_util_uint64_scale (time_remaining, members, pmembers);
1338
1339         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
1340             GST_TIME_ARGS (sess->next_rtcp_check_time));
1341
1342         sess->next_rtcp_check_time += arrival->time;
1343
1344         /* notify app of reconsideration */
1345         if (sess->callbacks.reconsider)
1346           sess->callbacks.reconsider (sess, sess->user_data);
1347       }
1348     }
1349
1350     if (created)
1351       on_new_ssrc (sess, source);
1352
1353     on_bye_ssrc (sess, source);
1354   }
1355   g_free (reason);
1356 }
1357
1358 static void
1359 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
1360     RTPArrivalStats * arrival)
1361 {
1362   GST_DEBUG ("received APP");
1363 }
1364
1365 /**
1366  * rtp_session_process_rtcp:
1367  * @sess: and #RTPSession
1368  * @buffer: an RTCP buffer
1369  *
1370  * Process an RTCP buffer in the session manager. This function takes ownership
1371  * of @buffer.
1372  *
1373  * Returns: a #GstFlowReturn.
1374  */
1375 GstFlowReturn
1376 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
1377 {
1378   GstRTCPPacket packet;
1379   gboolean more, is_bye = FALSE, is_sr = FALSE;
1380   RTPArrivalStats arrival;
1381   GstFlowReturn result = GST_FLOW_OK;
1382
1383   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1384   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1385
1386   if (!gst_rtcp_buffer_validate (buffer))
1387     goto invalid_packet;
1388
1389   GST_DEBUG ("received RTCP packet");
1390
1391   RTP_SESSION_LOCK (sess);
1392   /* update arrival stats */
1393   update_arrival_stats (sess, &arrival, FALSE, buffer, -1);
1394
1395   if (sess->sent_bye)
1396     goto ignore;
1397
1398   /* start processing the compound packet */
1399   more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
1400   while (more) {
1401     GstRTCPType type;
1402
1403     type = gst_rtcp_packet_get_type (&packet);
1404
1405     /* when we are leaving the session, we should ignore all non-BYE messages */
1406     if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) {
1407       GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
1408       goto next;
1409     }
1410
1411     switch (type) {
1412       case GST_RTCP_TYPE_SR:
1413         rtp_session_process_sr (sess, &packet, &arrival);
1414         is_sr = TRUE;
1415         break;
1416       case GST_RTCP_TYPE_RR:
1417         rtp_session_process_rr (sess, &packet, &arrival);
1418         break;
1419       case GST_RTCP_TYPE_SDES:
1420         rtp_session_process_sdes (sess, &packet, &arrival);
1421         break;
1422       case GST_RTCP_TYPE_BYE:
1423         is_bye = TRUE;
1424         rtp_session_process_bye (sess, &packet, &arrival);
1425         break;
1426       case GST_RTCP_TYPE_APP:
1427         rtp_session_process_app (sess, &packet, &arrival);
1428         break;
1429       default:
1430         GST_WARNING ("got unknown RTCP packet");
1431         break;
1432     }
1433   next:
1434     more = gst_rtcp_packet_move_to_next (&packet);
1435   }
1436
1437   /* if we are scheduling a BYE, we only want to count bye packets, else we
1438    * count everything */
1439   if (sess->source->received_bye) {
1440     if (is_bye) {
1441       sess->stats.bye_members++;
1442       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1443     }
1444   } else {
1445     /* keep track of average packet size */
1446     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1447   }
1448   RTP_SESSION_UNLOCK (sess);
1449
1450   /* notify caller of sr packets in the callback */
1451   if (is_sr && sess->callbacks.sync_rtcp)
1452     result = sess->callbacks.sync_rtcp (sess, sess->source, buffer,
1453         sess->user_data);
1454   else
1455     gst_buffer_unref (buffer);
1456
1457   return result;
1458
1459   /* ERRORS */
1460 invalid_packet:
1461   {
1462     GST_DEBUG ("invalid RTCP packet received");
1463     gst_buffer_unref (buffer);
1464     return GST_FLOW_OK;
1465   }
1466 ignore:
1467   {
1468     gst_buffer_unref (buffer);
1469     RTP_SESSION_UNLOCK (sess);
1470     GST_DEBUG ("ignoring RTP packet because we left");
1471     return GST_FLOW_OK;
1472   }
1473 }
1474
1475 /**
1476  * rtp_session_send_rtp:
1477  * @sess: an #RTPSession
1478  * @buffer: an RTP buffer
1479  * @ntpnstime: the NTP time in nanoseconds of when this buffer was captured.
1480  *
1481  * Send the RTP buffer in the session manager. This function takes ownership of
1482  * @buffer.
1483  *
1484  * Returns: a #GstFlowReturn.
1485  */
1486 GstFlowReturn
1487 rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, guint64 ntpnstime)
1488 {
1489   GstFlowReturn result;
1490   RTPSource *source;
1491   gboolean prevsender;
1492   GTimeVal current;
1493
1494   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1495   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1496
1497   if (!gst_rtp_buffer_validate (buffer))
1498     goto invalid_packet;
1499
1500   GST_DEBUG ("received RTP packet for sending");
1501
1502   RTP_SESSION_LOCK (sess);
1503   source = sess->source;
1504
1505   /* update last activity */
1506   g_get_current_time (&current);
1507   source->last_rtp_activity = GST_TIMEVAL_TO_TIME (current);
1508
1509   prevsender = RTP_SOURCE_IS_SENDER (source);
1510
1511   /* we use our own source to send */
1512   result = rtp_source_send_rtp (source, buffer, ntpnstime);
1513
1514   if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
1515     sess->stats.sender_sources++;
1516   RTP_SESSION_UNLOCK (sess);
1517
1518   return result;
1519
1520   /* ERRORS */
1521 invalid_packet:
1522   {
1523     gst_buffer_unref (buffer);
1524     GST_DEBUG ("invalid RTP packet received");
1525     return GST_FLOW_OK;
1526   }
1527 }
1528
1529 static GstClockTime
1530 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
1531     gboolean first)
1532 {
1533   GstClockTime result;
1534
1535   if (sess->source->received_bye) {
1536     result = rtp_stats_calculate_bye_interval (&sess->stats);
1537   } else {
1538     result = rtp_stats_calculate_rtcp_interval (&sess->stats,
1539         RTP_SOURCE_IS_SENDER (sess->source), first);
1540   }
1541
1542   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
1543       GST_TIME_ARGS (result), first);
1544
1545   if (!deterministic)
1546     result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
1547
1548   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
1549
1550   return result;
1551 }
1552
1553 /**
1554  * rtp_session_send_bye:
1555  * @sess: an #RTPSession
1556  * @reason: a reason or NULL
1557  *
1558  * Stop the current @sess and schedule a BYE message for the other members.
1559  *
1560  * Returns: a #GstFlowReturn.
1561  */
1562 GstFlowReturn
1563 rtp_session_send_bye (RTPSession * sess, const gchar * reason)
1564 {
1565   GstFlowReturn result = GST_FLOW_OK;
1566   RTPSource *source;
1567   GstClockTime current, interval;
1568   GTimeVal curtv;
1569
1570   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1571
1572   RTP_SESSION_LOCK (sess);
1573   source = sess->source;
1574
1575   /* ignore more BYEs */
1576   if (source->received_bye)
1577     goto done;
1578
1579   /* we have BYE now */
1580   source->received_bye = TRUE;
1581   /* at least one member wants to send a BYE */
1582   g_free (sess->bye_reason);
1583   sess->bye_reason = g_strdup (reason);
1584   sess->stats.avg_rtcp_packet_size = 100;
1585   sess->stats.bye_members = 1;
1586   sess->first_rtcp = TRUE;
1587   sess->sent_bye = FALSE;
1588
1589   /* get current time */
1590   g_get_current_time (&curtv);
1591   current = GST_TIMEVAL_TO_TIME (curtv);
1592
1593   /* reschedule transmission */
1594   sess->last_rtcp_send_time = current;
1595   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
1596   sess->next_rtcp_check_time = current + interval;
1597
1598   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
1599       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
1600
1601   /* notify app of reconsideration */
1602   if (sess->callbacks.reconsider)
1603     sess->callbacks.reconsider (sess, sess->user_data);
1604 done:
1605   RTP_SESSION_UNLOCK (sess);
1606
1607   return result;
1608 }
1609
1610 /**
1611  * rtp_session_next_timeout:
1612  * @sess: an #RTPSession
1613  * @time: the current system time
1614  *
1615  * Get the next time we should perform session maintenance tasks.
1616  *
1617  * Returns: a time when rtp_session_on_timeout() should be called with the
1618  * current system time.
1619  */
1620 GstClockTime
1621 rtp_session_next_timeout (RTPSession * sess, GstClockTime time)
1622 {
1623   GstClockTime result;
1624
1625   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1626
1627   RTP_SESSION_LOCK (sess);
1628
1629   result = sess->next_rtcp_check_time;
1630
1631   GST_DEBUG ("current time: %" GST_TIME_FORMAT ", next :%" GST_TIME_FORMAT,
1632       GST_TIME_ARGS (time), GST_TIME_ARGS (result));
1633
1634   if (result < time) {
1635     GST_DEBUG ("take current time as base");
1636     /* our previous check time expired, start counting from the current time
1637      * again. */
1638     result = time;
1639   }
1640
1641   if (sess->source->received_bye) {
1642     if (sess->sent_bye) {
1643       GST_DEBUG ("we sent BYE already");
1644       result = GST_CLOCK_TIME_NONE;
1645     } else if (sess->stats.active_sources >= 50) {
1646       GST_DEBUG ("reconsider BYE, more than 50 sources");
1647       /* reconsider BYE if members >= 50 */
1648       result += calculate_rtcp_interval (sess, FALSE, TRUE);
1649     }
1650   } else {
1651     if (sess->first_rtcp) {
1652       GST_DEBUG ("first RTCP packet");
1653       /* we are called for the first time */
1654       result += calculate_rtcp_interval (sess, FALSE, TRUE);
1655     } else if (sess->next_rtcp_check_time < time) {
1656       GST_DEBUG ("old check time expired, getting new timeout");
1657       /* get a new timeout when we need to */
1658       result += calculate_rtcp_interval (sess, FALSE, FALSE);
1659     }
1660   }
1661   sess->next_rtcp_check_time = result;
1662
1663   GST_DEBUG ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
1664   RTP_SESSION_UNLOCK (sess);
1665
1666   return result;
1667 }
1668
1669 typedef struct
1670 {
1671   RTPSession *sess;
1672   GstBuffer *rtcp;
1673   GstClockTime time;
1674   guint64 ntpnstime;
1675   GstClockTime interval;
1676   GstRTCPPacket packet;
1677   gboolean is_bye;
1678   gboolean has_sdes;
1679 } ReportData;
1680
1681 static void
1682 session_start_rtcp (RTPSession * sess, ReportData * data)
1683 {
1684   GstRTCPPacket *packet = &data->packet;
1685   RTPSource *own = sess->source;
1686
1687   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
1688
1689   if (RTP_SOURCE_IS_SENDER (own)) {
1690     guint64 ntptime;
1691     guint32 rtptime;
1692     guint32 packet_count, octet_count;
1693
1694     /* we are a sender, create SR */
1695     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
1696     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
1697
1698     /* get latest stats */
1699     rtp_source_get_new_sr (own, data->ntpnstime, &ntptime, &rtptime,
1700         &packet_count, &octet_count);
1701     /* store stats */
1702     rtp_source_process_sr (own, data->ntpnstime, ntptime, rtptime, packet_count,
1703         octet_count);
1704
1705     /* fill in sender report info */
1706     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
1707         ntptime, rtptime, packet_count, octet_count);
1708   } else {
1709     /* we are only receiver, create RR */
1710     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
1711     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
1712     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
1713   }
1714 }
1715
1716 /* construct a Sender or Receiver Report */
1717 static void
1718 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
1719 {
1720   RTPSession *sess = data->sess;
1721   GstRTCPPacket *packet = &data->packet;
1722
1723   /* create a new buffer if needed */
1724   if (data->rtcp == NULL) {
1725     session_start_rtcp (sess, data);
1726   }
1727   if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
1728     /* only report about other sender sources */
1729     if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
1730       guint8 fractionlost;
1731       gint32 packetslost;
1732       guint32 exthighestseq, jitter;
1733       guint32 lsr, dlsr;
1734
1735       /* get new stats */
1736       rtp_source_get_new_rb (source, data->time, &fractionlost, &packetslost,
1737           &exthighestseq, &jitter, &lsr, &dlsr);
1738
1739       /* packet is not yet filled, add report block for this source. */
1740       gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
1741           exthighestseq, jitter, lsr, dlsr);
1742     }
1743   }
1744 }
1745
1746 /* perform cleanup of sources that timed out */
1747 static gboolean
1748 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
1749 {
1750   gboolean remove = FALSE;
1751   gboolean byetimeout = FALSE;
1752   gboolean is_sender, is_active;
1753   RTPSession *sess = data->sess;
1754   GstClockTime interval;
1755
1756   is_sender = RTP_SOURCE_IS_SENDER (source);
1757   is_active = RTP_SOURCE_IS_ACTIVE (source);
1758
1759   /* check for our own source, we don't want to delete our own source. */
1760   if (!(source == sess->source)) {
1761     if (source->received_bye) {
1762       /* if we received a BYE from the source, remove the source after some
1763        * time. */
1764       if (data->time > source->bye_time &&
1765           data->time - source->bye_time > sess->stats.bye_timeout) {
1766         GST_DEBUG ("removing BYE source %08x", source->ssrc);
1767         remove = TRUE;
1768         byetimeout = TRUE;
1769       }
1770     }
1771     /* sources that were inactive for more than 5 times the deterministic reporting
1772      * interval get timed out. the min timeout is 5 seconds. */
1773     if (data->time > source->last_activity) {
1774       interval = MAX (data->interval * 5, 5 * GST_SECOND);
1775       if (data->time - source->last_activity > interval) {
1776         GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
1777             source->ssrc, GST_TIME_ARGS (source->last_activity));
1778         remove = TRUE;
1779       }
1780     }
1781   }
1782
1783   /* senders that did not send for a long time become a receiver, this also
1784    * holds for our own source. */
1785   if (is_sender) {
1786     if (data->time > source->last_rtp_activity) {
1787       interval = MAX (data->interval * 2, 5 * GST_SECOND);
1788       if (data->time - source->last_rtp_activity > interval) {
1789         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
1790             GST_TIME_FORMAT, source->ssrc,
1791             GST_TIME_ARGS (source->last_rtp_activity));
1792         source->is_sender = FALSE;
1793         sess->stats.sender_sources--;
1794       }
1795     }
1796   }
1797
1798   if (remove) {
1799     sess->total_sources--;
1800     if (is_sender)
1801       sess->stats.sender_sources--;
1802     if (is_active)
1803       sess->stats.active_sources--;
1804
1805     if (byetimeout)
1806       on_bye_timeout (sess, source);
1807     else
1808       on_timeout (sess, source);
1809   }
1810   return remove;
1811 }
1812
1813 static void
1814 session_sdes (RTPSession * sess, ReportData * data)
1815 {
1816   GstRTCPPacket *packet = &data->packet;
1817   guint8 *sdes_data;
1818   guint sdes_len;
1819
1820   /* add SDES packet */
1821   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
1822
1823   gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
1824
1825   rtp_source_get_sdes (sess->source, GST_RTCP_SDES_CNAME, &sdes_data,
1826       &sdes_len);
1827   gst_rtcp_packet_sdes_add_entry (packet, GST_RTCP_SDES_CNAME, sdes_len,
1828       sdes_data);
1829
1830   /* other SDES items must only be added at regular intervals and only when the
1831    * user requests to since it might be a privacy problem */
1832 #if 0
1833   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_NAME,
1834       strlen (sess->name), (guint8 *) sess->name);
1835   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_TOOL,
1836       strlen (sess->tool), (guint8 *) sess->tool);
1837 #endif
1838
1839   data->has_sdes = TRUE;
1840 }
1841
1842 /* schedule a BYE packet */
1843 static void
1844 session_bye (RTPSession * sess, ReportData * data)
1845 {
1846   GstRTCPPacket *packet = &data->packet;
1847
1848   /* open packet */
1849   session_start_rtcp (sess, data);
1850
1851   /* add SDES */
1852   session_sdes (sess, data);
1853
1854   /* add a BYE packet */
1855   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet);
1856   gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc);
1857   if (sess->bye_reason)
1858     gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason);
1859
1860   /* we have a BYE packet now */
1861   data->is_bye = TRUE;
1862 }
1863
1864 static gboolean
1865 is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data)
1866 {
1867   GstClockTime new_send_time, elapsed;
1868   gboolean result;
1869
1870   /* no need to check yet */
1871   if (sess->next_rtcp_check_time > time) {
1872     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
1873         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
1874         GST_TIME_ARGS (time));
1875     return FALSE;
1876   }
1877
1878   /* get elapsed time since we last reported */
1879   elapsed = time - sess->last_rtcp_send_time;
1880
1881   /* perform forward reconsideration */
1882   new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
1883
1884   GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
1885       GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time), GST_TIME_ARGS (elapsed));
1886
1887   new_send_time += sess->last_rtcp_send_time;
1888
1889   /* check if reconsideration */
1890   if (time < new_send_time) {
1891     GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
1892         GST_TIME_ARGS (new_send_time));
1893     result = FALSE;
1894     /* store new check time */
1895     sess->next_rtcp_check_time = new_send_time;
1896   } else {
1897     result = TRUE;
1898     new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
1899
1900     GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
1901         GST_TIME_ARGS (new_send_time));
1902     sess->next_rtcp_check_time = time + new_send_time;
1903   }
1904   return result;
1905 }
1906
1907 /**
1908  * rtp_session_on_timeout:
1909  * @sess: an #RTPSession
1910  * @time: the current system time
1911  * @ntpnstime: the current NTP time in nanoseconds
1912  *
1913  * Perform maintenance actions after the timeout obtained with
1914  * rtp_session_next_timeout() expired.
1915  *
1916  * This function will perform timeouts of receivers and senders, send a BYE
1917  * packet or generate RTCP packets with current session stats.
1918  *
1919  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
1920  * times, for each packet that should be processed.
1921  *
1922  * Returns: a #GstFlowReturn.
1923  */
1924 GstFlowReturn
1925 rtp_session_on_timeout (RTPSession * sess, GstClockTime time, guint64 ntpnstime)
1926 {
1927   GstFlowReturn result = GST_FLOW_OK;
1928   ReportData data;
1929
1930   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1931
1932   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT,
1933       GST_TIME_ARGS (time), GST_TIME_ARGS (ntpnstime));
1934
1935   data.sess = sess;
1936   data.rtcp = NULL;
1937   data.time = time;
1938   data.ntpnstime = ntpnstime;
1939   data.is_bye = FALSE;
1940   data.has_sdes = FALSE;
1941
1942   RTP_SESSION_LOCK (sess);
1943   /* get a new interval, we need this for various cleanups etc */
1944   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
1945
1946   /* first perform cleanups */
1947   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
1948       (GHRFunc) session_cleanup, &data);
1949
1950   /* see if we need to generate SR or RR packets */
1951   if (is_rtcp_time (sess, time, &data)) {
1952     if (sess->source->received_bye) {
1953       /* generate BYE instead */
1954       session_bye (sess, &data);
1955       sess->sent_bye = TRUE;
1956     } else {
1957       /* loop over all known sources and do something */
1958       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1959           (GHFunc) session_report_blocks, &data);
1960     }
1961   }
1962
1963   if (data.rtcp) {
1964     guint size;
1965
1966     /* we keep track of the last report time in order to timeout inactive
1967      * receivers or senders */
1968     sess->last_rtcp_send_time = data.time;
1969     sess->first_rtcp = FALSE;
1970
1971     /* add SDES for this source when not already added */
1972     if (!data.has_sdes)
1973       session_sdes (sess, &data);
1974
1975     /* update average RTCP size before sending */
1976     size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
1977     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size);
1978   }
1979   RTP_SESSION_UNLOCK (sess);
1980
1981   /* push out the RTCP packet */
1982   if (data.rtcp) {
1983     /* close the RTCP packet */
1984     gst_rtcp_buffer_end (data.rtcp);
1985
1986     if (sess->callbacks.send_rtcp)
1987       result = sess->callbacks.send_rtcp (sess, sess->source, data.rtcp,
1988           sess->user_data);
1989     else
1990       gst_buffer_unref (data.rtcp);
1991   }
1992
1993   return result;
1994 }