gst/rtpmanager/: Make it possible to use different user_data for each of the callbacks.
[platform/upstream/gstreamer.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/rtp/gstrtpbuffer.h>
23 #include <gst/rtp/gstrtcpbuffer.h>
24 #include <gst/netbuffer/gstnetbuffer.h>
25
26
27 #include "rtpsession.h"
28
29 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
30 #define GST_CAT_DEFAULT rtp_session_debug
31
32 /* signals and args */
33 enum
34 {
35   SIGNAL_ON_NEW_SSRC,
36   SIGNAL_ON_SSRC_COLLISION,
37   SIGNAL_ON_SSRC_VALIDATED,
38   SIGNAL_ON_SSRC_ACTIVE,
39   SIGNAL_ON_SSRC_SDES,
40   SIGNAL_ON_BYE_SSRC,
41   SIGNAL_ON_BYE_TIMEOUT,
42   SIGNAL_ON_TIMEOUT,
43   LAST_SIGNAL
44 };
45
46 #define DEFAULT_INTERNAL_SOURCE      NULL
47 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
48 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_BANDWIDTH
49 #define DEFAULT_SDES_CNAME           NULL
50 #define DEFAULT_SDES_NAME            NULL
51 #define DEFAULT_SDES_EMAIL           NULL
52 #define DEFAULT_SDES_PHONE           NULL
53 #define DEFAULT_SDES_LOCATION        NULL
54 #define DEFAULT_SDES_TOOL            NULL
55 #define DEFAULT_SDES_NOTE            NULL
56 #define DEFAULT_NUM_SOURCES          0
57 #define DEFAULT_NUM_ACTIVE_SOURCES   0
58
59 enum
60 {
61   PROP_0,
62   PROP_INTERNAL_SOURCE,
63   PROP_BANDWIDTH,
64   PROP_RTCP_FRACTION,
65   PROP_SDES_CNAME,
66   PROP_SDES_NAME,
67   PROP_SDES_EMAIL,
68   PROP_SDES_PHONE,
69   PROP_SDES_LOCATION,
70   PROP_SDES_TOOL,
71   PROP_SDES_NOTE,
72   PROP_NUM_SOURCES,
73   PROP_NUM_ACTIVE_SOURCES,
74   PROP_LAST
75 };
76
77 /* update average packet size, we keep this scaled by 16 to keep enough
78  * precision. */
79 #define UPDATE_AVG(avg, val)            \
80   if ((avg) == 0)                       \
81    (avg) = (val) << 4;                  \
82   else                                  \
83    (avg) = ((val) + (15 * (avg))) >> 4;
84
85 /* GObject vmethods */
86 static void rtp_session_finalize (GObject * object);
87 static void rtp_session_set_property (GObject * object, guint prop_id,
88     const GValue * value, GParamSpec * pspec);
89 static void rtp_session_get_property (GObject * object, guint prop_id,
90     GValue * value, GParamSpec * pspec);
91
92 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
93
94 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
95
96 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
97     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
98
99 static void
100 rtp_session_class_init (RTPSessionClass * klass)
101 {
102   GObjectClass *gobject_class;
103
104   gobject_class = (GObjectClass *) klass;
105
106   gobject_class->finalize = rtp_session_finalize;
107   gobject_class->set_property = rtp_session_set_property;
108   gobject_class->get_property = rtp_session_get_property;
109
110   /**
111    * RTPSession::on-new-ssrc:
112    * @session: the object which received the signal
113    * @src: the new RTPSource
114    *
115    * Notify of a new SSRC that entered @session.
116    */
117   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
118       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
119       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
120       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
121       RTP_TYPE_SOURCE);
122   /**
123    * RTPSession::on-ssrc-collision:
124    * @session: the object which received the signal
125    * @src: the #RTPSource that caused a collision
126    *
127    * Notify when we have an SSRC collision
128    */
129   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
130       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
131       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
132       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
133       RTP_TYPE_SOURCE);
134   /**
135    * RTPSession::on-ssrc-validated:
136    * @session: the object which received the signal
137    * @src: the new validated RTPSource
138    *
139    * Notify of a new SSRC that became validated.
140    */
141   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
142       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
143       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
144       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
145       RTP_TYPE_SOURCE);
146   /**
147    * RTPSession::on-ssrc-active:
148    * @session: the object which received the signal
149    * @src: the active RTPSource
150    *
151    * Notify of a SSRC that is active, i.e., sending RTCP.
152    */
153   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
154       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
155       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
156       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
157       RTP_TYPE_SOURCE);
158   /**
159    * RTPSession::on-ssrc-sdes:
160    * @session: the object which received the signal
161    * @src: the RTPSource
162    *
163    * Notify that a new SDES was received for SSRC.
164    */
165   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
166       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
167       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
168       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
169       RTP_TYPE_SOURCE);
170   /**
171    * RTPSession::on-bye-ssrc:
172    * @session: the object which received the signal
173    * @src: the RTPSource that went away
174    *
175    * Notify of an SSRC that became inactive because of a BYE packet.
176    */
177   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
178       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
179       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
180       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
181       RTP_TYPE_SOURCE);
182   /**
183    * RTPSession::on-bye-timeout:
184    * @session: the object which received the signal
185    * @src: the RTPSource that timed out
186    *
187    * Notify of an SSRC that has timed out because of BYE
188    */
189   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
190       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
191       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
192       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
193       RTP_TYPE_SOURCE);
194   /**
195    * RTPSession::on-timeout:
196    * @session: the object which received the signal
197    * @src: the RTPSource that timed out
198    *
199    * Notify of an SSRC that has timed out
200    */
201   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
202       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
203       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
204       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
205       RTP_TYPE_SOURCE);
206
207   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
208       g_param_spec_object ("internal-source", "Internal Source",
209           "The internal source element of the session",
210           RTP_TYPE_SOURCE, G_PARAM_READABLE));
211
212   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
213       g_param_spec_double ("bandwidth", "Bandwidth",
214           "The bandwidth of the session",
215           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH, G_PARAM_READWRITE));
216
217   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
218       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
219           "The fraction of the bandwidth used for RTCP",
220           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION, G_PARAM_READWRITE));
221
222   g_object_class_install_property (gobject_class, PROP_SDES_CNAME,
223       g_param_spec_string ("sdes-cname", "SDES CNAME",
224           "The CNAME to put in SDES messages of this session",
225           DEFAULT_SDES_CNAME, G_PARAM_READWRITE));
226
227   g_object_class_install_property (gobject_class, PROP_SDES_NAME,
228       g_param_spec_string ("sdes-name", "SDES NAME",
229           "The NAME to put in SDES messages of this session",
230           DEFAULT_SDES_NAME, G_PARAM_READWRITE));
231
232   g_object_class_install_property (gobject_class, PROP_SDES_EMAIL,
233       g_param_spec_string ("sdes-email", "SDES EMAIL",
234           "The EMAIL to put in SDES messages of this session",
235           DEFAULT_SDES_EMAIL, G_PARAM_READWRITE));
236
237   g_object_class_install_property (gobject_class, PROP_SDES_PHONE,
238       g_param_spec_string ("sdes-phone", "SDES PHONE",
239           "The PHONE to put in SDES messages of this session",
240           DEFAULT_SDES_PHONE, G_PARAM_READWRITE));
241
242   g_object_class_install_property (gobject_class, PROP_SDES_LOCATION,
243       g_param_spec_string ("sdes-location", "SDES LOCATION",
244           "The LOCATION to put in SDES messages of this session",
245           DEFAULT_SDES_LOCATION, G_PARAM_READWRITE));
246
247   g_object_class_install_property (gobject_class, PROP_SDES_TOOL,
248       g_param_spec_string ("sdes-tool", "SDES TOOL",
249           "The TOOL to put in SDES messages of this session",
250           DEFAULT_SDES_TOOL, G_PARAM_READWRITE));
251
252   g_object_class_install_property (gobject_class, PROP_SDES_NOTE,
253       g_param_spec_string ("sdes-note", "SDES NOTE",
254           "The NOTE to put in SDES messages of this session",
255           DEFAULT_SDES_NOTE, G_PARAM_READWRITE));
256
257   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
258       g_param_spec_uint ("num-sources", "Num Sources",
259           "The number of sources in the session", 0, G_MAXUINT,
260           DEFAULT_NUM_SOURCES, G_PARAM_READABLE));
261
262   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
263       g_param_spec_uint ("num-active-sources", "Num Active Sources",
264           "The number of active sources in the session", 0, G_MAXUINT,
265           DEFAULT_NUM_ACTIVE_SOURCES, G_PARAM_READABLE));
266
267   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
268 }
269
270 static void
271 rtp_session_init (RTPSession * sess)
272 {
273   gint i;
274   gchar *str;
275
276   sess->lock = g_mutex_new ();
277   sess->key = g_random_int ();
278   sess->mask_idx = 0;
279   sess->mask = 0;
280
281   for (i = 0; i < 32; i++) {
282     sess->ssrcs[i] =
283         g_hash_table_new_full (NULL, NULL, NULL,
284         (GDestroyNotify) g_object_unref);
285   }
286   sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL);
287
288   rtp_stats_init_defaults (&sess->stats);
289
290   /* create an active SSRC for this session manager */
291   sess->source = rtp_session_create_source (sess);
292   sess->source->validated = TRUE;
293   sess->stats.active_sources++;
294
295   /* default UDP header length */
296   sess->header_len = 28;
297   sess->mtu = 1400;
298
299   /* some default SDES entries */
300   str = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ());
301   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_CNAME, str);
302   g_free (str);
303
304   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_NAME,
305       g_get_real_name ());
306   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_TOOL, "GStreamer");
307
308   sess->first_rtcp = TRUE;
309
310   GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
311 }
312
313 static void
314 rtp_session_finalize (GObject * object)
315 {
316   RTPSession *sess;
317   gint i;
318
319   sess = RTP_SESSION_CAST (object);
320
321   g_mutex_free (sess->lock);
322   for (i = 0; i < 32; i++)
323     g_hash_table_destroy (sess->ssrcs[i]);
324
325   g_free (sess->bye_reason);
326
327   g_hash_table_destroy (sess->cnames);
328   g_object_unref (sess->source);
329
330   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
331 }
332
333 static void
334 rtp_session_set_property (GObject * object, guint prop_id,
335     const GValue * value, GParamSpec * pspec)
336 {
337   RTPSession *sess;
338
339   sess = RTP_SESSION (object);
340
341   switch (prop_id) {
342     case PROP_BANDWIDTH:
343       rtp_session_set_bandwidth (sess, g_value_get_double (value));
344       break;
345     case PROP_RTCP_FRACTION:
346       rtp_session_set_rtcp_fraction (sess, g_value_get_double (value));
347       break;
348     case PROP_SDES_CNAME:
349       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_CNAME,
350           g_value_get_string (value));
351       break;
352     case PROP_SDES_NAME:
353       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_NAME,
354           g_value_get_string (value));
355       break;
356     case PROP_SDES_EMAIL:
357       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_EMAIL,
358           g_value_get_string (value));
359       break;
360     case PROP_SDES_PHONE:
361       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_PHONE,
362           g_value_get_string (value));
363       break;
364     case PROP_SDES_LOCATION:
365       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_LOC,
366           g_value_get_string (value));
367       break;
368     case PROP_SDES_TOOL:
369       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_TOOL,
370           g_value_get_string (value));
371       break;
372     case PROP_SDES_NOTE:
373       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_NOTE,
374           g_value_get_string (value));
375       break;
376     default:
377       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
378       break;
379   }
380 }
381
382 static void
383 rtp_session_get_property (GObject * object, guint prop_id,
384     GValue * value, GParamSpec * pspec)
385 {
386   RTPSession *sess;
387
388   sess = RTP_SESSION (object);
389
390   switch (prop_id) {
391     case PROP_INTERNAL_SOURCE:
392       g_value_take_object (value, rtp_session_get_internal_source (sess));
393       break;
394     case PROP_BANDWIDTH:
395       g_value_set_double (value, rtp_session_get_bandwidth (sess));
396       break;
397     case PROP_RTCP_FRACTION:
398       g_value_set_double (value, rtp_session_get_rtcp_fraction (sess));
399       break;
400     case PROP_SDES_CNAME:
401       g_value_take_string (value, rtp_session_get_sdes_string (sess,
402               GST_RTCP_SDES_CNAME));
403       break;
404     case PROP_SDES_NAME:
405       g_value_take_string (value, rtp_session_get_sdes_string (sess,
406               GST_RTCP_SDES_NAME));
407       break;
408     case PROP_SDES_EMAIL:
409       g_value_take_string (value, rtp_session_get_sdes_string (sess,
410               GST_RTCP_SDES_EMAIL));
411       break;
412     case PROP_SDES_PHONE:
413       g_value_take_string (value, rtp_session_get_sdes_string (sess,
414               GST_RTCP_SDES_PHONE));
415       break;
416     case PROP_SDES_LOCATION:
417       g_value_take_string (value, rtp_session_get_sdes_string (sess,
418               GST_RTCP_SDES_LOC));
419       break;
420     case PROP_SDES_TOOL:
421       g_value_take_string (value, rtp_session_get_sdes_string (sess,
422               GST_RTCP_SDES_TOOL));
423       break;
424     case PROP_SDES_NOTE:
425       g_value_take_string (value, rtp_session_get_sdes_string (sess,
426               GST_RTCP_SDES_NOTE));
427       break;
428     case PROP_NUM_SOURCES:
429       g_value_set_uint (value, rtp_session_get_num_sources (sess));
430       break;
431     case PROP_NUM_ACTIVE_SOURCES:
432       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
433       break;
434     default:
435       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
436       break;
437   }
438 }
439
440 static void
441 on_new_ssrc (RTPSession * sess, RTPSource * source)
442 {
443   RTP_SESSION_UNLOCK (sess);
444   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
445   RTP_SESSION_LOCK (sess);
446 }
447
448 static void
449 on_ssrc_collision (RTPSession * sess, RTPSource * source)
450 {
451   RTP_SESSION_UNLOCK (sess);
452   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
453       source);
454   RTP_SESSION_LOCK (sess);
455 }
456
457 static void
458 on_ssrc_validated (RTPSession * sess, RTPSource * source)
459 {
460   RTP_SESSION_UNLOCK (sess);
461   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
462       source);
463   RTP_SESSION_LOCK (sess);
464 }
465
466 static void
467 on_ssrc_active (RTPSession * sess, RTPSource * source)
468 {
469   RTP_SESSION_UNLOCK (sess);
470   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
471   RTP_SESSION_LOCK (sess);
472 }
473
474 static void
475 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
476 {
477   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
478   RTP_SESSION_UNLOCK (sess);
479   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
480   RTP_SESSION_LOCK (sess);
481 }
482
483 static void
484 on_bye_ssrc (RTPSession * sess, RTPSource * source)
485 {
486   RTP_SESSION_UNLOCK (sess);
487   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
488   RTP_SESSION_LOCK (sess);
489 }
490
491 static void
492 on_bye_timeout (RTPSession * sess, RTPSource * source)
493 {
494   RTP_SESSION_UNLOCK (sess);
495   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
496   RTP_SESSION_LOCK (sess);
497 }
498
499 static void
500 on_timeout (RTPSession * sess, RTPSource * source)
501 {
502   RTP_SESSION_UNLOCK (sess);
503   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
504   RTP_SESSION_LOCK (sess);
505 }
506
507 /**
508  * rtp_session_new:
509  *
510  * Create a new session object.
511  *
512  * Returns: a new #RTPSession. g_object_unref() after usage.
513  */
514 RTPSession *
515 rtp_session_new (void)
516 {
517   RTPSession *sess;
518
519   sess = g_object_new (RTP_TYPE_SESSION, NULL);
520
521   return sess;
522 }
523
524 /**
525  * rtp_session_set_callbacks:
526  * @sess: an #RTPSession
527  * @callbacks: callbacks to configure
528  * @user_data: user data passed in the callbacks
529  *
530  * Configure a set of callbacks to be notified of actions.
531  */
532 void
533 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
534     gpointer user_data)
535 {
536   g_return_if_fail (RTP_IS_SESSION (sess));
537
538   if (callbacks->process_rtp) {
539     sess->callbacks.process_rtp = callbacks->process_rtp;
540     sess->process_rtp_user_data = user_data;
541   }
542   if (callbacks->send_rtp) {
543     sess->callbacks.send_rtp = callbacks->send_rtp;
544     sess->send_rtp_user_data = user_data;
545   }
546   if (callbacks->send_rtcp) {
547     sess->callbacks.send_rtcp = callbacks->send_rtcp;
548     sess->send_rtcp_user_data = user_data;
549   }
550   if (callbacks->sync_rtcp) {
551     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
552     sess->sync_rtcp_user_data = user_data;
553   }
554   if (callbacks->clock_rate) {
555     sess->callbacks.clock_rate = callbacks->clock_rate;
556     sess->clock_rate_user_data = user_data;
557   }
558   if (callbacks->reconsider) {
559     sess->callbacks.reconsider = callbacks->reconsider;
560     sess->reconsider_user_data = user_data;
561   }
562 }
563
564 /**
565  * rtp_session_set_process_rtp_callback:
566  * @sess: an #RTPSession
567  * @callback: callback to set
568  * @user_data: user data passed in the callback
569  *
570  * Configure only the process_rtp callback to be notified of the process_rtp action.
571  */
572 void
573 rtp_session_set_process_rtp_callback (RTPSession * sess,
574     RTPSessionProcessRTP callback, gpointer user_data)
575 {
576   g_return_if_fail (RTP_IS_SESSION (sess));
577
578   sess->callbacks.process_rtp = callback;
579   sess->process_rtp_user_data = user_data;
580 }
581
582 /**
583  * rtp_session_set_send_rtp_callback:
584  * @sess: an #RTPSession
585  * @callback: callback to set
586  * @user_data: user data passed in the callback
587  *
588  * Configure only the send_rtp callback to be notified of the send_rtp action.
589  */
590 void
591 rtp_session_set_send_rtp_callback (RTPSession * sess,
592     RTPSessionSendRTP callback, gpointer user_data)
593 {
594   g_return_if_fail (RTP_IS_SESSION (sess));
595
596   sess->callbacks.send_rtp = callback;
597   sess->send_rtp_user_data = user_data;
598 }
599
600 /**
601  * rtp_session_set_send_rtcp_callback:
602  * @sess: an #RTPSession
603  * @callback: callback to set
604  * @user_data: user data passed in the callback
605  *
606  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
607  */
608 void
609 rtp_session_set_send_rtcp_callback (RTPSession * sess,
610     RTPSessionSendRTCP callback, gpointer user_data)
611 {
612   g_return_if_fail (RTP_IS_SESSION (sess));
613
614   sess->callbacks.send_rtcp = callback;
615   sess->send_rtcp_user_data = user_data;
616 }
617
618 /**
619  * rtp_session_set_sync_rtcp_callback:
620  * @sess: an #RTPSession
621  * @callback: callback to set
622  * @user_data: user data passed in the callback
623  *
624  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
625  */
626 void
627 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
628     RTPSessionSyncRTCP callback, gpointer user_data)
629 {
630   g_return_if_fail (RTP_IS_SESSION (sess));
631
632   sess->callbacks.sync_rtcp = callback;
633   sess->sync_rtcp_user_data = user_data;
634 }
635
636 /**
637  * rtp_session_set_clock_rate_callback:
638  * @sess: an #RTPSession
639  * @callback: callback to set
640  * @user_data: user data passed in the callback
641  *
642  * Configure only the clock_rate callback to be notified of the clock_rate action.
643  */
644 void
645 rtp_session_set_clock_rate_callback (RTPSession * sess,
646     RTPSessionClockRate callback, gpointer user_data)
647 {
648   g_return_if_fail (RTP_IS_SESSION (sess));
649
650   sess->callbacks.clock_rate = callback;
651   sess->clock_rate_user_data = user_data;
652 }
653
654 /**
655  * rtp_session_set_reconsider_callback:
656  * @sess: an #RTPSession
657  * @callback: callback to set
658  * @user_data: user data passed in the callback
659  *
660  * Configure only the reconsider callback to be notified of the reconsider action.
661  */
662 void
663 rtp_session_set_reconsider_callback (RTPSession * sess,
664     RTPSessionReconsider callback, gpointer user_data)
665 {
666   g_return_if_fail (RTP_IS_SESSION (sess));
667
668   sess->callbacks.reconsider = callback;
669   sess->reconsider_user_data = user_data;
670 }
671
672 /**
673  * rtp_session_set_bandwidth:
674  * @sess: an #RTPSession
675  * @bandwidth: the bandwidth allocated
676  *
677  * Set the session bandwidth in bytes per second.
678  */
679 void
680 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
681 {
682   g_return_if_fail (RTP_IS_SESSION (sess));
683
684   RTP_SESSION_LOCK (sess);
685   sess->stats.bandwidth = bandwidth;
686   RTP_SESSION_UNLOCK (sess);
687 }
688
689 /**
690  * rtp_session_get_bandwidth:
691  * @sess: an #RTPSession
692  *
693  * Get the session bandwidth.
694  *
695  * Returns: the session bandwidth.
696  */
697 gdouble
698 rtp_session_get_bandwidth (RTPSession * sess)
699 {
700   gdouble result;
701
702   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
703
704   RTP_SESSION_LOCK (sess);
705   result = sess->stats.bandwidth;
706   RTP_SESSION_UNLOCK (sess);
707
708   return result;
709 }
710
711 /**
712  * rtp_session_set_rtcp_fraction:
713  * @sess: an #RTPSession
714  * @bandwidth: the RTCP bandwidth
715  *
716  * Set the bandwidth that should be used for RTCP
717  * messages.
718  */
719 void
720 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
721 {
722   g_return_if_fail (RTP_IS_SESSION (sess));
723
724   RTP_SESSION_LOCK (sess);
725   sess->stats.rtcp_bandwidth = bandwidth;
726   RTP_SESSION_UNLOCK (sess);
727 }
728
729 /**
730  * rtp_session_get_rtcp_fraction:
731  * @sess: an #RTPSession
732  *
733  * Get the session bandwidth used for RTCP.
734  *
735  * Returns: The bandwidth used for RTCP messages.
736  */
737 gdouble
738 rtp_session_get_rtcp_fraction (RTPSession * sess)
739 {
740   gdouble result;
741
742   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
743
744   RTP_SESSION_LOCK (sess);
745   result = sess->stats.rtcp_bandwidth;
746   RTP_SESSION_UNLOCK (sess);
747
748   return result;
749 }
750
751 /**
752  * rtp_session_set_sdes_string:
753  * @sess: an #RTPSession
754  * @type: the type of the SDES item
755  * @item: a null-terminated string to set. 
756  *
757  * Store an SDES item of @type in @sess. 
758  *
759  * Returns: %FALSE if the data was unchanged @type is invalid.
760  */
761 gboolean
762 rtp_session_set_sdes_string (RTPSession * sess, GstRTCPSDESType type,
763     const gchar * item)
764 {
765   gboolean result;
766
767   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
768
769   RTP_SESSION_LOCK (sess);
770   result = rtp_source_set_sdes_string (sess->source, type, item);
771   RTP_SESSION_UNLOCK (sess);
772
773   return result;
774 }
775
776 /**
777  * rtp_session_get_sdes_string:
778  * @sess: an #RTPSession
779  * @type: the type of the SDES item
780  *
781  * Get the SDES item of @type from @sess. 
782  *
783  * Returns: a null-terminated copy of the SDES item or NULL when @type was not
784  * valid. g_free() after usage.
785  */
786 gchar *
787 rtp_session_get_sdes_string (RTPSession * sess, GstRTCPSDESType type)
788 {
789   gchar *result;
790
791   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
792
793   RTP_SESSION_LOCK (sess);
794   result = rtp_source_get_sdes_string (sess->source, type);
795   RTP_SESSION_UNLOCK (sess);
796
797   return result;
798 }
799
800 static GstFlowReturn
801 source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session)
802 {
803   GstFlowReturn result = GST_FLOW_OK;
804
805   if (source == session->source) {
806     GST_DEBUG ("source %08x pushed sender RTP packet", source->ssrc);
807
808     RTP_SESSION_UNLOCK (session);
809
810     if (session->callbacks.send_rtp)
811       result =
812           session->callbacks.send_rtp (session, source, buffer,
813           session->send_rtp_user_data);
814     else
815       gst_buffer_unref (buffer);
816
817   } else {
818     GST_DEBUG ("source %08x pushed receiver RTP packet", source->ssrc);
819     RTP_SESSION_UNLOCK (session);
820
821     if (session->callbacks.process_rtp)
822       result =
823           session->callbacks.process_rtp (session, source, buffer,
824           session->process_rtp_user_data);
825     else
826       gst_buffer_unref (buffer);
827   }
828   RTP_SESSION_LOCK (session);
829
830   return result;
831 }
832
833 static gint
834 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
835 {
836   gint result;
837
838   if (session->callbacks.clock_rate)
839     result =
840         session->callbacks.clock_rate (session, pt,
841         session->clock_rate_user_data);
842   else
843     result = -1;
844
845   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
846
847   return result;
848 }
849
850 static RTPSourceCallbacks callbacks = {
851   (RTPSourcePushRTP) source_push_rtp,
852   (RTPSourceClockRate) source_clock_rate,
853 };
854
855 static gboolean
856 check_collision (RTPSession * sess, RTPSource * source,
857     RTPArrivalStats * arrival)
858 {
859   /* FIXME, do collision check */
860   return FALSE;
861 }
862
863 /* must be called with the session lock */
864 static RTPSource *
865 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
866     RTPArrivalStats * arrival, gboolean rtp)
867 {
868   RTPSource *source;
869
870   source =
871       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
872   if (source == NULL) {
873     /* make new Source in probation and insert */
874     source = rtp_source_new (ssrc);
875
876     /* for RTP packets we need to set the source in probation. Receiving RTCP
877      * packets of an SSRC, on the other hand, is a strong indication that we
878      * are dealing with a valid source. */
879     if (rtp)
880       source->probation = RTP_DEFAULT_PROBATION;
881     else
882       source->probation = 0;
883
884     /* store from address, if any */
885     if (arrival->have_address) {
886       if (rtp)
887         rtp_source_set_rtp_from (source, &arrival->address);
888       else
889         rtp_source_set_rtcp_from (source, &arrival->address);
890     }
891
892     /* configure a callback on the source */
893     rtp_source_set_callbacks (source, &callbacks, sess);
894
895     g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
896         source);
897
898     /* we have one more source now */
899     sess->total_sources++;
900     *created = TRUE;
901   } else {
902     *created = FALSE;
903     /* check for collision, this updates the address when not previously set */
904     if (check_collision (sess, source, arrival))
905       on_ssrc_collision (sess, source);
906   }
907   /* update last activity */
908   source->last_activity = arrival->time;
909   if (rtp)
910     source->last_rtp_activity = arrival->time;
911
912   return source;
913 }
914
915 /**
916  * rtp_session_get_internal_source:
917  * @sess: a #RTPSession
918  *
919  * Get the internal #RTPSource of @session.
920  *
921  * Returns: The internal #RTPSource. g_object_unref() after usage.
922  */
923 RTPSource *
924 rtp_session_get_internal_source (RTPSession * sess)
925 {
926   RTPSource *result;
927
928   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
929
930   result = g_object_ref (sess->source);
931
932   return result;
933 }
934
935 /**
936  * rtp_session_add_source:
937  * @sess: a #RTPSession
938  * @src: #RTPSource to add
939  *
940  * Add @src to @session.
941  *
942  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
943  * existed in the session.
944  */
945 gboolean
946 rtp_session_add_source (RTPSession * sess, RTPSource * src)
947 {
948   gboolean result = FALSE;
949   RTPSource *find;
950
951   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
952   g_return_val_if_fail (src != NULL, FALSE);
953
954   RTP_SESSION_LOCK (sess);
955   find =
956       g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
957       GINT_TO_POINTER (src->ssrc));
958   if (find == NULL) {
959     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
960         GINT_TO_POINTER (src->ssrc), src);
961     /* we have one more source now */
962     sess->total_sources++;
963     result = TRUE;
964   }
965   RTP_SESSION_UNLOCK (sess);
966
967   return result;
968 }
969
970 /**
971  * rtp_session_get_num_sources:
972  * @sess: an #RTPSession
973  *
974  * Get the number of sources in @sess.
975  *
976  * Returns: The number of sources in @sess.
977  */
978 guint
979 rtp_session_get_num_sources (RTPSession * sess)
980 {
981   guint result;
982
983   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
984
985   RTP_SESSION_LOCK (sess);
986   result = sess->total_sources;
987   RTP_SESSION_UNLOCK (sess);
988
989   return result;
990 }
991
992 /**
993  * rtp_session_get_num_active_sources:
994  * @sess: an #RTPSession
995  *
996  * Get the number of active sources in @sess. A source is considered active when
997  * it has been validated and has not yet received a BYE RTCP message.
998  *
999  * Returns: The number of active sources in @sess.
1000  */
1001 guint
1002 rtp_session_get_num_active_sources (RTPSession * sess)
1003 {
1004   guint result;
1005
1006   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1007
1008   RTP_SESSION_LOCK (sess);
1009   result = sess->stats.active_sources;
1010   RTP_SESSION_UNLOCK (sess);
1011
1012   return result;
1013 }
1014
1015 /**
1016  * rtp_session_get_source_by_ssrc:
1017  * @sess: an #RTPSession
1018  * @ssrc: an SSRC
1019  *
1020  * Find the source with @ssrc in @sess.
1021  *
1022  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
1023  * g_object_unref() after usage.
1024  */
1025 RTPSource *
1026 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
1027 {
1028   RTPSource *result;
1029
1030   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1031
1032   RTP_SESSION_LOCK (sess);
1033   result =
1034       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1035   if (result)
1036     g_object_ref (result);
1037   RTP_SESSION_UNLOCK (sess);
1038
1039   return result;
1040 }
1041
1042 /**
1043  * rtp_session_get_source_by_cname:
1044  * @sess: a #RTPSession
1045  * @cname: an CNAME
1046  *
1047  * Find the source with @cname in @sess.
1048  *
1049  * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found.
1050  * g_object_unref() after usage.
1051  */
1052 RTPSource *
1053 rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
1054 {
1055   RTPSource *result;
1056
1057   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1058   g_return_val_if_fail (cname != NULL, NULL);
1059
1060   RTP_SESSION_LOCK (sess);
1061   result = g_hash_table_lookup (sess->cnames, cname);
1062   if (result)
1063     g_object_ref (result);
1064   RTP_SESSION_UNLOCK (sess);
1065
1066   return result;
1067 }
1068
1069 /**
1070  * rtp_session_create_source:
1071  * @sess: an #RTPSession
1072  *
1073  * Create an #RTPSource for use in @sess. This function will create a source
1074  * with an ssrc that is currently not used by any participants in the session.
1075  *
1076  * Returns: an #RTPSource.
1077  */
1078 RTPSource *
1079 rtp_session_create_source (RTPSession * sess)
1080 {
1081   guint32 ssrc;
1082   RTPSource *source;
1083
1084   RTP_SESSION_LOCK (sess);
1085   while (TRUE) {
1086     ssrc = g_random_int ();
1087
1088     /* see if it exists in the session, we're done if it doesn't */
1089     if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1090             GINT_TO_POINTER (ssrc)) == NULL)
1091       break;
1092   }
1093   source = rtp_source_new (ssrc);
1094   g_object_ref (source);
1095   rtp_source_set_callbacks (source, &callbacks, sess);
1096   g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1097       source);
1098   /* we have one more source now */
1099   sess->total_sources++;
1100   RTP_SESSION_UNLOCK (sess);
1101
1102   return source;
1103 }
1104
1105 /* update the RTPArrivalStats structure with the current time and other bits
1106  * about the current buffer we are handling.
1107  * This function is typically called when a validated packet is received.
1108  * This function should be called with the SESSION_LOCK
1109  */
1110 static void
1111 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
1112     gboolean rtp, GstBuffer * buffer, guint64 ntpnstime)
1113 {
1114   GTimeVal current;
1115
1116   /* get time of arrival */
1117   g_get_current_time (&current);
1118   arrival->time = GST_TIMEVAL_TO_TIME (current);
1119   arrival->ntpnstime = ntpnstime;
1120
1121   /* get packet size including header overhead */
1122   arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
1123
1124   if (rtp) {
1125     arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer);
1126   } else {
1127     arrival->payload_len = 0;
1128   }
1129
1130   /* for netbuffer we can store the IP address to check for collisions */
1131   arrival->have_address = GST_IS_NETBUFFER (buffer);
1132   if (arrival->have_address) {
1133     GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
1134
1135     memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
1136   }
1137 }
1138
1139 /**
1140  * rtp_session_process_rtp:
1141  * @sess: and #RTPSession
1142  * @buffer: an RTP buffer
1143  * @ntpnstime: the NTP arrival time in nanoseconds
1144  *
1145  * Process an RTP buffer in the session manager. This function takes ownership
1146  * of @buffer.
1147  *
1148  * Returns: a #GstFlowReturn.
1149  */
1150 GstFlowReturn
1151 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
1152     guint64 ntpnstime)
1153 {
1154   GstFlowReturn result;
1155   guint32 ssrc;
1156   RTPSource *source;
1157   gboolean created;
1158   gboolean prevsender, prevactive;
1159   RTPArrivalStats arrival;
1160
1161   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1162   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1163
1164   if (!gst_rtp_buffer_validate (buffer))
1165     goto invalid_packet;
1166
1167   RTP_SESSION_LOCK (sess);
1168   /* update arrival stats */
1169   update_arrival_stats (sess, &arrival, TRUE, buffer, ntpnstime);
1170
1171   /* ignore more RTP packets when we left the session */
1172   if (sess->source->received_bye)
1173     goto ignore;
1174
1175   /* get SSRC and look up in session database */
1176   ssrc = gst_rtp_buffer_get_ssrc (buffer);
1177   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
1178
1179   prevsender = RTP_SOURCE_IS_SENDER (source);
1180   prevactive = RTP_SOURCE_IS_ACTIVE (source);
1181
1182   /* we need to ref so that we can process the CSRCs later */
1183   gst_buffer_ref (buffer);
1184
1185   /* let source process the packet */
1186   result = rtp_source_process_rtp (source, buffer, &arrival);
1187
1188   /* source became active */
1189   if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
1190     sess->stats.active_sources++;
1191     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
1192         sess->stats.active_sources);
1193     on_ssrc_validated (sess, source);
1194   }
1195   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1196     sess->stats.sender_sources++;
1197     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
1198         sess->stats.sender_sources);
1199   }
1200
1201   if (created)
1202     on_new_ssrc (sess, source);
1203
1204   if (source->validated) {
1205     guint8 i, count;
1206     gboolean created;
1207
1208     /* for validated sources, we add the CSRCs as well */
1209     count = gst_rtp_buffer_get_csrc_count (buffer);
1210
1211     for (i = 0; i < count; i++) {
1212       guint32 csrc;
1213       RTPSource *csrc_src;
1214
1215       csrc = gst_rtp_buffer_get_csrc (buffer, i);
1216
1217       /* get source */
1218       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
1219
1220       if (created) {
1221         GST_DEBUG ("created new CSRC: %08x", csrc);
1222         rtp_source_set_as_csrc (csrc_src);
1223         if (RTP_SOURCE_IS_ACTIVE (csrc_src))
1224           sess->stats.active_sources++;
1225         on_new_ssrc (sess, source);
1226       }
1227     }
1228   }
1229   gst_buffer_unref (buffer);
1230
1231   RTP_SESSION_UNLOCK (sess);
1232
1233   return result;
1234
1235   /* ERRORS */
1236 invalid_packet:
1237   {
1238     gst_buffer_unref (buffer);
1239     GST_DEBUG ("invalid RTP packet received");
1240     return GST_FLOW_OK;
1241   }
1242 ignore:
1243   {
1244     gst_buffer_unref (buffer);
1245     RTP_SESSION_UNLOCK (sess);
1246     GST_DEBUG ("ignoring RTP packet because we are leaving");
1247     return GST_FLOW_OK;
1248   }
1249 }
1250
1251 static void
1252 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
1253     GstRTCPPacket * packet, RTPArrivalStats * arrival)
1254 {
1255   guint count, i;
1256
1257   count = gst_rtcp_packet_get_rb_count (packet);
1258   for (i = 0; i < count; i++) {
1259     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1260     guint8 fractionlost;
1261     gint32 packetslost;
1262
1263     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1264         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1265
1266     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
1267
1268     if (ssrc == sess->source->ssrc) {
1269       /* only deal with report blocks for our session, we update the stats of
1270        * the sender of the RTCP message. We could also compare our stats against
1271        * the other sender to see if we are better or worse. */
1272       rtp_source_process_rb (source, arrival->time, fractionlost, packetslost,
1273           exthighestseq, jitter, lsr, dlsr);
1274
1275       on_ssrc_active (sess, source);
1276     }
1277   }
1278 }
1279
1280 /* A Sender report contains statistics about how the sender is doing. This
1281  * includes timing informataion such as the relation between RTP and NTP
1282  * timestamps and the number of packets/bytes it sent to us.
1283  *
1284  * In this report is also included a set of report blocks related to how this
1285  * sender is receiving data (in case we (or somebody else) is also sending stuff
1286  * to it). This info includes the packet loss, jitter and seqnum. It also
1287  * contains information to calculate the round trip time (LSR/DLSR).
1288  */
1289 static void
1290 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
1291     RTPArrivalStats * arrival)
1292 {
1293   guint32 senderssrc, rtptime, packet_count, octet_count;
1294   guint64 ntptime;
1295   RTPSource *source;
1296   gboolean created, prevsender;
1297
1298   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
1299       &packet_count, &octet_count);
1300
1301   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
1302       senderssrc, GST_TIME_ARGS (arrival->time));
1303
1304   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1305
1306   GST_BUFFER_OFFSET (packet->buffer) = source->clock_base;
1307
1308   prevsender = RTP_SOURCE_IS_SENDER (source);
1309
1310   /* first update the source */
1311   rtp_source_process_sr (source, arrival->time, ntptime, rtptime, packet_count,
1312       octet_count);
1313
1314   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1315     sess->stats.sender_sources++;
1316     GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc,
1317         sess->stats.sender_sources);
1318   }
1319
1320   if (created)
1321     on_new_ssrc (sess, source);
1322
1323   rtp_session_process_rb (sess, source, packet, arrival);
1324 }
1325
1326 /* A receiver report contains statistics about how a receiver is doing. It
1327  * includes stuff like packet loss, jitter and the seqnum it received last. It
1328  * also contains info to calculate the round trip time.
1329  *
1330  * We are only interested in how the sender of this report is doing wrt to us.
1331  */
1332 static void
1333 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
1334     RTPArrivalStats * arrival)
1335 {
1336   guint32 senderssrc;
1337   RTPSource *source;
1338   gboolean created;
1339
1340   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
1341
1342   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
1343
1344   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1345
1346   if (created)
1347     on_new_ssrc (sess, source);
1348
1349   rtp_session_process_rb (sess, source, packet, arrival);
1350 }
1351
1352 /* Get SDES items and store them in the SSRC */
1353 static void
1354 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
1355     RTPArrivalStats * arrival)
1356 {
1357   guint items, i, j;
1358   gboolean more_items, more_entries;
1359
1360   items = gst_rtcp_packet_sdes_get_item_count (packet);
1361   GST_DEBUG ("got SDES packet with %d items", items);
1362
1363   more_items = gst_rtcp_packet_sdes_first_item (packet);
1364   i = 0;
1365   while (more_items) {
1366     guint32 ssrc;
1367     gboolean changed, created;
1368     RTPSource *source;
1369
1370     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
1371
1372     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
1373
1374     /* find src, no probation when dealing with RTCP */
1375     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1376     changed = FALSE;
1377
1378     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
1379     j = 0;
1380     while (more_entries) {
1381       GstRTCPSDESType type;
1382       guint8 len;
1383       guint8 *data;
1384
1385       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
1386
1387       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
1388           data);
1389
1390       changed |= rtp_source_set_sdes (source, type, data, len);
1391
1392       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
1393       j++;
1394     }
1395
1396     if (created)
1397       on_new_ssrc (sess, source);
1398     if (changed)
1399       on_ssrc_sdes (sess, source);
1400
1401     more_items = gst_rtcp_packet_sdes_next_item (packet);
1402     i++;
1403   }
1404 }
1405
1406 /* BYE is sent when a client leaves the session
1407  */
1408 static void
1409 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
1410     RTPArrivalStats * arrival)
1411 {
1412   guint count, i;
1413   gchar *reason;
1414
1415   reason = gst_rtcp_packet_bye_get_reason (packet);
1416   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
1417
1418   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
1419   for (i = 0; i < count; i++) {
1420     guint32 ssrc;
1421     RTPSource *source;
1422     gboolean created, prevactive, prevsender;
1423     guint pmembers, members;
1424
1425     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
1426     GST_DEBUG ("SSRC: %08x", ssrc);
1427
1428     /* find src and mark bye, no probation when dealing with RTCP */
1429     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1430
1431     /* store time for when we need to time out this source */
1432     source->bye_time = arrival->time;
1433
1434     prevactive = RTP_SOURCE_IS_ACTIVE (source);
1435     prevsender = RTP_SOURCE_IS_SENDER (source);
1436
1437     /* let the source handle the rest */
1438     rtp_source_process_bye (source, reason);
1439
1440     pmembers = sess->stats.active_sources;
1441
1442     if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
1443       sess->stats.active_sources--;
1444       GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1445           sess->stats.active_sources);
1446     }
1447     if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
1448       sess->stats.sender_sources--;
1449       GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1450           sess->stats.sender_sources);
1451     }
1452     members = sess->stats.active_sources;
1453
1454     if (!sess->source->received_bye && members < pmembers) {
1455       /* some members went away since the previous timeout estimate.
1456        * Perform reverse reconsideration but only when we are not scheduling a
1457        * BYE ourselves. */
1458       if (arrival->time < sess->next_rtcp_check_time) {
1459         GstClockTime time_remaining;
1460
1461         time_remaining = sess->next_rtcp_check_time - arrival->time;
1462         sess->next_rtcp_check_time =
1463             gst_util_uint64_scale (time_remaining, members, pmembers);
1464
1465         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
1466             GST_TIME_ARGS (sess->next_rtcp_check_time));
1467
1468         sess->next_rtcp_check_time += arrival->time;
1469
1470         /* notify app of reconsideration */
1471         if (sess->callbacks.reconsider)
1472           sess->callbacks.reconsider (sess, sess->reconsider_user_data);
1473       }
1474     }
1475
1476     if (created)
1477       on_new_ssrc (sess, source);
1478
1479     on_bye_ssrc (sess, source);
1480   }
1481   g_free (reason);
1482 }
1483
1484 static void
1485 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
1486     RTPArrivalStats * arrival)
1487 {
1488   GST_DEBUG ("received APP");
1489 }
1490
1491 /**
1492  * rtp_session_process_rtcp:
1493  * @sess: and #RTPSession
1494  * @buffer: an RTCP buffer
1495  *
1496  * Process an RTCP buffer in the session manager. This function takes ownership
1497  * of @buffer.
1498  *
1499  * Returns: a #GstFlowReturn.
1500  */
1501 GstFlowReturn
1502 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
1503 {
1504   GstRTCPPacket packet;
1505   gboolean more, is_bye = FALSE, is_sr = FALSE;
1506   RTPArrivalStats arrival;
1507   GstFlowReturn result = GST_FLOW_OK;
1508
1509   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1510   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1511
1512   if (!gst_rtcp_buffer_validate (buffer))
1513     goto invalid_packet;
1514
1515   GST_DEBUG ("received RTCP packet");
1516
1517   RTP_SESSION_LOCK (sess);
1518   /* update arrival stats */
1519   update_arrival_stats (sess, &arrival, FALSE, buffer, -1);
1520
1521   if (sess->sent_bye)
1522     goto ignore;
1523
1524   /* start processing the compound packet */
1525   more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
1526   while (more) {
1527     GstRTCPType type;
1528
1529     type = gst_rtcp_packet_get_type (&packet);
1530
1531     /* when we are leaving the session, we should ignore all non-BYE messages */
1532     if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) {
1533       GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
1534       goto next;
1535     }
1536
1537     switch (type) {
1538       case GST_RTCP_TYPE_SR:
1539         rtp_session_process_sr (sess, &packet, &arrival);
1540         is_sr = TRUE;
1541         break;
1542       case GST_RTCP_TYPE_RR:
1543         rtp_session_process_rr (sess, &packet, &arrival);
1544         break;
1545       case GST_RTCP_TYPE_SDES:
1546         rtp_session_process_sdes (sess, &packet, &arrival);
1547         break;
1548       case GST_RTCP_TYPE_BYE:
1549         is_bye = TRUE;
1550         rtp_session_process_bye (sess, &packet, &arrival);
1551         break;
1552       case GST_RTCP_TYPE_APP:
1553         rtp_session_process_app (sess, &packet, &arrival);
1554         break;
1555       default:
1556         GST_WARNING ("got unknown RTCP packet");
1557         break;
1558     }
1559   next:
1560     more = gst_rtcp_packet_move_to_next (&packet);
1561   }
1562
1563   /* if we are scheduling a BYE, we only want to count bye packets, else we
1564    * count everything */
1565   if (sess->source->received_bye) {
1566     if (is_bye) {
1567       sess->stats.bye_members++;
1568       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1569     }
1570   } else {
1571     /* keep track of average packet size */
1572     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1573   }
1574   RTP_SESSION_UNLOCK (sess);
1575
1576   /* notify caller of sr packets in the callback */
1577   if (is_sr && sess->callbacks.sync_rtcp)
1578     result = sess->callbacks.sync_rtcp (sess, sess->source, buffer,
1579         sess->sync_rtcp_user_data);
1580   else
1581     gst_buffer_unref (buffer);
1582
1583   return result;
1584
1585   /* ERRORS */
1586 invalid_packet:
1587   {
1588     GST_DEBUG ("invalid RTCP packet received");
1589     gst_buffer_unref (buffer);
1590     return GST_FLOW_OK;
1591   }
1592 ignore:
1593   {
1594     gst_buffer_unref (buffer);
1595     RTP_SESSION_UNLOCK (sess);
1596     GST_DEBUG ("ignoring RTP packet because we left");
1597     return GST_FLOW_OK;
1598   }
1599 }
1600
1601 /**
1602  * rtp_session_send_rtp:
1603  * @sess: an #RTPSession
1604  * @buffer: an RTP buffer
1605  * @ntpnstime: the NTP time in nanoseconds of when this buffer was captured.
1606  *
1607  * Send the RTP buffer in the session manager. This function takes ownership of
1608  * @buffer.
1609  *
1610  * Returns: a #GstFlowReturn.
1611  */
1612 GstFlowReturn
1613 rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, guint64 ntpnstime)
1614 {
1615   GstFlowReturn result;
1616   RTPSource *source;
1617   gboolean prevsender;
1618   GTimeVal current;
1619
1620   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1621   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1622
1623   if (!gst_rtp_buffer_validate (buffer))
1624     goto invalid_packet;
1625
1626   GST_DEBUG ("received RTP packet for sending");
1627
1628   RTP_SESSION_LOCK (sess);
1629   source = sess->source;
1630
1631   /* update last activity */
1632   g_get_current_time (&current);
1633   source->last_rtp_activity = GST_TIMEVAL_TO_TIME (current);
1634
1635   prevsender = RTP_SOURCE_IS_SENDER (source);
1636
1637   /* we use our own source to send */
1638   result = rtp_source_send_rtp (source, buffer, ntpnstime);
1639
1640   if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
1641     sess->stats.sender_sources++;
1642   RTP_SESSION_UNLOCK (sess);
1643
1644   return result;
1645
1646   /* ERRORS */
1647 invalid_packet:
1648   {
1649     gst_buffer_unref (buffer);
1650     GST_DEBUG ("invalid RTP packet received");
1651     return GST_FLOW_OK;
1652   }
1653 }
1654
1655 static GstClockTime
1656 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
1657     gboolean first)
1658 {
1659   GstClockTime result;
1660
1661   if (sess->source->received_bye) {
1662     result = rtp_stats_calculate_bye_interval (&sess->stats);
1663   } else {
1664     result = rtp_stats_calculate_rtcp_interval (&sess->stats,
1665         RTP_SOURCE_IS_SENDER (sess->source), first);
1666   }
1667
1668   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
1669       GST_TIME_ARGS (result), first);
1670
1671   if (!deterministic)
1672     result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
1673
1674   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
1675
1676   return result;
1677 }
1678
1679 /**
1680  * rtp_session_send_bye:
1681  * @sess: an #RTPSession
1682  * @reason: a reason or NULL
1683  *
1684  * Stop the current @sess and schedule a BYE message for the other members.
1685  *
1686  * Returns: a #GstFlowReturn.
1687  */
1688 GstFlowReturn
1689 rtp_session_send_bye (RTPSession * sess, const gchar * reason)
1690 {
1691   GstFlowReturn result = GST_FLOW_OK;
1692   RTPSource *source;
1693   GstClockTime current, interval;
1694   GTimeVal curtv;
1695
1696   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1697
1698   RTP_SESSION_LOCK (sess);
1699   source = sess->source;
1700
1701   /* ignore more BYEs */
1702   if (source->received_bye)
1703     goto done;
1704
1705   /* we have BYE now */
1706   source->received_bye = TRUE;
1707   /* at least one member wants to send a BYE */
1708   g_free (sess->bye_reason);
1709   sess->bye_reason = g_strdup (reason);
1710   sess->stats.avg_rtcp_packet_size = 100;
1711   sess->stats.bye_members = 1;
1712   sess->first_rtcp = TRUE;
1713   sess->sent_bye = FALSE;
1714
1715   /* get current time */
1716   g_get_current_time (&curtv);
1717   current = GST_TIMEVAL_TO_TIME (curtv);
1718
1719   /* reschedule transmission */
1720   sess->last_rtcp_send_time = current;
1721   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
1722   sess->next_rtcp_check_time = current + interval;
1723
1724   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
1725       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
1726
1727   /* notify app of reconsideration */
1728   if (sess->callbacks.reconsider)
1729     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
1730 done:
1731   RTP_SESSION_UNLOCK (sess);
1732
1733   return result;
1734 }
1735
1736 /**
1737  * rtp_session_next_timeout:
1738  * @sess: an #RTPSession
1739  * @time: the current system time
1740  *
1741  * Get the next time we should perform session maintenance tasks.
1742  *
1743  * Returns: a time when rtp_session_on_timeout() should be called with the
1744  * current system time.
1745  */
1746 GstClockTime
1747 rtp_session_next_timeout (RTPSession * sess, GstClockTime time)
1748 {
1749   GstClockTime result;
1750
1751   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1752
1753   RTP_SESSION_LOCK (sess);
1754
1755   result = sess->next_rtcp_check_time;
1756
1757   GST_DEBUG ("current time: %" GST_TIME_FORMAT ", next :%" GST_TIME_FORMAT,
1758       GST_TIME_ARGS (time), GST_TIME_ARGS (result));
1759
1760   if (result < time) {
1761     GST_DEBUG ("take current time as base");
1762     /* our previous check time expired, start counting from the current time
1763      * again. */
1764     result = time;
1765   }
1766
1767   if (sess->source->received_bye) {
1768     if (sess->sent_bye) {
1769       GST_DEBUG ("we sent BYE already");
1770       result = GST_CLOCK_TIME_NONE;
1771     } else if (sess->stats.active_sources >= 50) {
1772       GST_DEBUG ("reconsider BYE, more than 50 sources");
1773       /* reconsider BYE if members >= 50 */
1774       result += calculate_rtcp_interval (sess, FALSE, TRUE);
1775     }
1776   } else {
1777     if (sess->first_rtcp) {
1778       GST_DEBUG ("first RTCP packet");
1779       /* we are called for the first time */
1780       result += calculate_rtcp_interval (sess, FALSE, TRUE);
1781     } else if (sess->next_rtcp_check_time < time) {
1782       GST_DEBUG ("old check time expired, getting new timeout");
1783       /* get a new timeout when we need to */
1784       result += calculate_rtcp_interval (sess, FALSE, FALSE);
1785     }
1786   }
1787   sess->next_rtcp_check_time = result;
1788
1789   GST_DEBUG ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
1790   RTP_SESSION_UNLOCK (sess);
1791
1792   return result;
1793 }
1794
1795 typedef struct
1796 {
1797   RTPSession *sess;
1798   GstBuffer *rtcp;
1799   GstClockTime time;
1800   guint64 ntpnstime;
1801   GstClockTime interval;
1802   GstRTCPPacket packet;
1803   gboolean is_bye;
1804   gboolean has_sdes;
1805 } ReportData;
1806
1807 static void
1808 session_start_rtcp (RTPSession * sess, ReportData * data)
1809 {
1810   GstRTCPPacket *packet = &data->packet;
1811   RTPSource *own = sess->source;
1812
1813   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
1814
1815   if (RTP_SOURCE_IS_SENDER (own)) {
1816     guint64 ntptime;
1817     guint32 rtptime;
1818     guint32 packet_count, octet_count;
1819
1820     /* we are a sender, create SR */
1821     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
1822     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
1823
1824     /* get latest stats */
1825     rtp_source_get_new_sr (own, data->ntpnstime, &ntptime, &rtptime,
1826         &packet_count, &octet_count);
1827     /* store stats */
1828     rtp_source_process_sr (own, data->ntpnstime, ntptime, rtptime, packet_count,
1829         octet_count);
1830
1831     /* fill in sender report info */
1832     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
1833         ntptime, rtptime, packet_count, octet_count);
1834   } else {
1835     /* we are only receiver, create RR */
1836     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
1837     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
1838     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
1839   }
1840 }
1841
1842 /* construct a Sender or Receiver Report */
1843 static void
1844 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
1845 {
1846   RTPSession *sess = data->sess;
1847   GstRTCPPacket *packet = &data->packet;
1848
1849   /* create a new buffer if needed */
1850   if (data->rtcp == NULL) {
1851     session_start_rtcp (sess, data);
1852   }
1853   if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
1854     /* only report about other sender sources */
1855     if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
1856       guint8 fractionlost;
1857       gint32 packetslost;
1858       guint32 exthighestseq, jitter;
1859       guint32 lsr, dlsr;
1860
1861       /* get new stats */
1862       rtp_source_get_new_rb (source, data->time, &fractionlost, &packetslost,
1863           &exthighestseq, &jitter, &lsr, &dlsr);
1864
1865       /* packet is not yet filled, add report block for this source. */
1866       gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
1867           exthighestseq, jitter, lsr, dlsr);
1868     }
1869   }
1870 }
1871
1872 /* perform cleanup of sources that timed out */
1873 static gboolean
1874 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
1875 {
1876   gboolean remove = FALSE;
1877   gboolean byetimeout = FALSE;
1878   gboolean is_sender, is_active;
1879   RTPSession *sess = data->sess;
1880   GstClockTime interval;
1881
1882   is_sender = RTP_SOURCE_IS_SENDER (source);
1883   is_active = RTP_SOURCE_IS_ACTIVE (source);
1884
1885   /* check for our own source, we don't want to delete our own source. */
1886   if (!(source == sess->source)) {
1887     if (source->received_bye) {
1888       /* if we received a BYE from the source, remove the source after some
1889        * time. */
1890       if (data->time > source->bye_time &&
1891           data->time - source->bye_time > sess->stats.bye_timeout) {
1892         GST_DEBUG ("removing BYE source %08x", source->ssrc);
1893         remove = TRUE;
1894         byetimeout = TRUE;
1895       }
1896     }
1897     /* sources that were inactive for more than 5 times the deterministic reporting
1898      * interval get timed out. the min timeout is 5 seconds. */
1899     if (data->time > source->last_activity) {
1900       interval = MAX (data->interval * 5, 5 * GST_SECOND);
1901       if (data->time - source->last_activity > interval) {
1902         GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
1903             source->ssrc, GST_TIME_ARGS (source->last_activity));
1904         remove = TRUE;
1905       }
1906     }
1907   }
1908
1909   /* senders that did not send for a long time become a receiver, this also
1910    * holds for our own source. */
1911   if (is_sender) {
1912     if (data->time > source->last_rtp_activity) {
1913       interval = MAX (data->interval * 2, 5 * GST_SECOND);
1914       if (data->time - source->last_rtp_activity > interval) {
1915         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
1916             GST_TIME_FORMAT, source->ssrc,
1917             GST_TIME_ARGS (source->last_rtp_activity));
1918         source->is_sender = FALSE;
1919         sess->stats.sender_sources--;
1920       }
1921     }
1922   }
1923
1924   if (remove) {
1925     sess->total_sources--;
1926     if (is_sender)
1927       sess->stats.sender_sources--;
1928     if (is_active)
1929       sess->stats.active_sources--;
1930
1931     if (byetimeout)
1932       on_bye_timeout (sess, source);
1933     else
1934       on_timeout (sess, source);
1935   }
1936   return remove;
1937 }
1938
1939 static void
1940 session_sdes (RTPSession * sess, ReportData * data)
1941 {
1942   GstRTCPPacket *packet = &data->packet;
1943   guint8 *sdes_data;
1944   guint sdes_len;
1945
1946   /* add SDES packet */
1947   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
1948
1949   gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
1950
1951   rtp_source_get_sdes (sess->source, GST_RTCP_SDES_CNAME, &sdes_data,
1952       &sdes_len);
1953   gst_rtcp_packet_sdes_add_entry (packet, GST_RTCP_SDES_CNAME, sdes_len,
1954       sdes_data);
1955
1956   /* other SDES items must only be added at regular intervals and only when the
1957    * user requests to since it might be a privacy problem */
1958 #if 0
1959   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_NAME,
1960       strlen (sess->name), (guint8 *) sess->name);
1961   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_TOOL,
1962       strlen (sess->tool), (guint8 *) sess->tool);
1963 #endif
1964
1965   data->has_sdes = TRUE;
1966 }
1967
1968 /* schedule a BYE packet */
1969 static void
1970 session_bye (RTPSession * sess, ReportData * data)
1971 {
1972   GstRTCPPacket *packet = &data->packet;
1973
1974   /* open packet */
1975   session_start_rtcp (sess, data);
1976
1977   /* add SDES */
1978   session_sdes (sess, data);
1979
1980   /* add a BYE packet */
1981   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet);
1982   gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc);
1983   if (sess->bye_reason)
1984     gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason);
1985
1986   /* we have a BYE packet now */
1987   data->is_bye = TRUE;
1988 }
1989
1990 static gboolean
1991 is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data)
1992 {
1993   GstClockTime new_send_time, elapsed;
1994   gboolean result;
1995
1996   /* no need to check yet */
1997   if (sess->next_rtcp_check_time > time) {
1998     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
1999         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
2000         GST_TIME_ARGS (time));
2001     return FALSE;
2002   }
2003
2004   /* get elapsed time since we last reported */
2005   elapsed = time - sess->last_rtcp_send_time;
2006
2007   /* perform forward reconsideration */
2008   new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
2009
2010   GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
2011       GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time), GST_TIME_ARGS (elapsed));
2012
2013   new_send_time += sess->last_rtcp_send_time;
2014
2015   /* check if reconsideration */
2016   if (time < new_send_time) {
2017     GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
2018         GST_TIME_ARGS (new_send_time));
2019     result = FALSE;
2020     /* store new check time */
2021     sess->next_rtcp_check_time = new_send_time;
2022   } else {
2023     result = TRUE;
2024     new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
2025
2026     GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
2027         GST_TIME_ARGS (new_send_time));
2028     sess->next_rtcp_check_time = time + new_send_time;
2029   }
2030   return result;
2031 }
2032
2033 /**
2034  * rtp_session_on_timeout:
2035  * @sess: an #RTPSession
2036  * @time: the current system time
2037  * @ntpnstime: the current NTP time in nanoseconds
2038  *
2039  * Perform maintenance actions after the timeout obtained with
2040  * rtp_session_next_timeout() expired.
2041  *
2042  * This function will perform timeouts of receivers and senders, send a BYE
2043  * packet or generate RTCP packets with current session stats.
2044  *
2045  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
2046  * times, for each packet that should be processed.
2047  *
2048  * Returns: a #GstFlowReturn.
2049  */
2050 GstFlowReturn
2051 rtp_session_on_timeout (RTPSession * sess, GstClockTime time, guint64 ntpnstime)
2052 {
2053   GstFlowReturn result = GST_FLOW_OK;
2054   ReportData data;
2055
2056   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2057
2058   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT,
2059       GST_TIME_ARGS (time), GST_TIME_ARGS (ntpnstime));
2060
2061   data.sess = sess;
2062   data.rtcp = NULL;
2063   data.time = time;
2064   data.ntpnstime = ntpnstime;
2065   data.is_bye = FALSE;
2066   data.has_sdes = FALSE;
2067
2068   RTP_SESSION_LOCK (sess);
2069   /* get a new interval, we need this for various cleanups etc */
2070   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
2071
2072   /* first perform cleanups */
2073   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
2074       (GHRFunc) session_cleanup, &data);
2075
2076   /* see if we need to generate SR or RR packets */
2077   if (is_rtcp_time (sess, time, &data)) {
2078     if (sess->source->received_bye) {
2079       /* generate BYE instead */
2080       session_bye (sess, &data);
2081       sess->sent_bye = TRUE;
2082     } else {
2083       /* loop over all known sources and do something */
2084       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2085           (GHFunc) session_report_blocks, &data);
2086     }
2087   }
2088
2089   if (data.rtcp) {
2090     guint size;
2091
2092     /* we keep track of the last report time in order to timeout inactive
2093      * receivers or senders */
2094     sess->last_rtcp_send_time = data.time;
2095     sess->first_rtcp = FALSE;
2096
2097     /* add SDES for this source when not already added */
2098     if (!data.has_sdes)
2099       session_sdes (sess, &data);
2100
2101     /* update average RTCP size before sending */
2102     size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
2103     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size);
2104   }
2105   RTP_SESSION_UNLOCK (sess);
2106
2107   /* push out the RTCP packet */
2108   if (data.rtcp) {
2109     /* close the RTCP packet */
2110     gst_rtcp_buffer_end (data.rtcp);
2111
2112     if (sess->callbacks.send_rtcp)
2113       result = sess->callbacks.send_rtcp (sess, sess->source, data.rtcp,
2114           sess->send_rtcp_user_data);
2115     else
2116       gst_buffer_unref (data.rtcp);
2117   }
2118
2119   return result;
2120 }