gst/rtpmanager/gstrtpsession.c: Pass the running time to the session when processing...
[platform/upstream/gstreamer.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/rtp/gstrtpbuffer.h>
23 #include <gst/rtp/gstrtcpbuffer.h>
24 #include <gst/netbuffer/gstnetbuffer.h>
25
26 #include "gstrtpbin-marshal.h"
27 #include "rtpsession.h"
28
29 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
30 #define GST_CAT_DEFAULT rtp_session_debug
31
32 /* signals and args */
33 enum
34 {
35   SIGNAL_GET_SOURCE_BY_SSRC,
36   SIGNAL_ON_NEW_SSRC,
37   SIGNAL_ON_SSRC_COLLISION,
38   SIGNAL_ON_SSRC_VALIDATED,
39   SIGNAL_ON_SSRC_ACTIVE,
40   SIGNAL_ON_SSRC_SDES,
41   SIGNAL_ON_BYE_SSRC,
42   SIGNAL_ON_BYE_TIMEOUT,
43   SIGNAL_ON_TIMEOUT,
44   SIGNAL_ON_SENDER_TIMEOUT,
45   LAST_SIGNAL
46 };
47
48 #define DEFAULT_INTERNAL_SOURCE      NULL
49 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
50 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_BANDWIDTH
51 #define DEFAULT_SDES_CNAME           NULL
52 #define DEFAULT_SDES_NAME            NULL
53 #define DEFAULT_SDES_EMAIL           NULL
54 #define DEFAULT_SDES_PHONE           NULL
55 #define DEFAULT_SDES_LOCATION        NULL
56 #define DEFAULT_SDES_TOOL            NULL
57 #define DEFAULT_SDES_NOTE            NULL
58 #define DEFAULT_NUM_SOURCES          0
59 #define DEFAULT_NUM_ACTIVE_SOURCES   0
60
61 enum
62 {
63   PROP_0,
64   PROP_INTERNAL_SOURCE,
65   PROP_BANDWIDTH,
66   PROP_RTCP_FRACTION,
67   PROP_SDES_CNAME,
68   PROP_SDES_NAME,
69   PROP_SDES_EMAIL,
70   PROP_SDES_PHONE,
71   PROP_SDES_LOCATION,
72   PROP_SDES_TOOL,
73   PROP_SDES_NOTE,
74   PROP_NUM_SOURCES,
75   PROP_NUM_ACTIVE_SOURCES,
76   PROP_LAST
77 };
78
79 /* update average packet size, we keep this scaled by 16 to keep enough
80  * precision. */
81 #define UPDATE_AVG(avg, val)            \
82   if ((avg) == 0)                       \
83    (avg) = (val) << 4;                  \
84   else                                  \
85    (avg) = ((val) + (15 * (avg))) >> 4;
86
87 /* The number RTCP intervals after which to timeout entries in the
88  * collision table
89  */
90 #define RTCP_INTERVAL_COLLISION_TIMEOUT 10
91
92 /* GObject vmethods */
93 static void rtp_session_finalize (GObject * object);
94 static void rtp_session_set_property (GObject * object, guint prop_id,
95     const GValue * value, GParamSpec * pspec);
96 static void rtp_session_get_property (GObject * object, guint prop_id,
97     GValue * value, GParamSpec * pspec);
98
99 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
100
101 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
102
103 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
104     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
105 static GstFlowReturn rtp_session_send_bye_locked (RTPSession * sess,
106     const gchar * reason, GstClockTime current_time);
107 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
108     gboolean deterministic, gboolean first);
109
110 static void
111 rtp_session_class_init (RTPSessionClass * klass)
112 {
113   GObjectClass *gobject_class;
114
115   gobject_class = (GObjectClass *) klass;
116
117   gobject_class->finalize = rtp_session_finalize;
118   gobject_class->set_property = rtp_session_set_property;
119   gobject_class->get_property = rtp_session_get_property;
120
121   /**
122    * RTPSession::get-source-by-ssrc:
123    * @session: the object which received the signal
124    * @ssrc: the SSRC of the RTPSource
125    *
126    * Request the #RTPSource object with SSRC @ssrc in @session.
127    */
128   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
129       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
130       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
131           get_source_by_ssrc), NULL, NULL, gst_rtp_bin_marshal_OBJECT__UINT,
132       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
133
134   /**
135    * RTPSession::on-new-ssrc:
136    * @session: the object which received the signal
137    * @src: the new RTPSource
138    *
139    * Notify of a new SSRC that entered @session.
140    */
141   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
142       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
143       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
144       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
145       RTP_TYPE_SOURCE);
146   /**
147    * RTPSession::on-ssrc-collision:
148    * @session: the object which received the signal
149    * @src: the #RTPSource that caused a collision
150    *
151    * Notify when we have an SSRC collision
152    */
153   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
154       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
155       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
156       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
157       RTP_TYPE_SOURCE);
158   /**
159    * RTPSession::on-ssrc-validated:
160    * @session: the object which received the signal
161    * @src: the new validated RTPSource
162    *
163    * Notify of a new SSRC that became validated.
164    */
165   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
166       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
167       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
168       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
169       RTP_TYPE_SOURCE);
170   /**
171    * RTPSession::on-ssrc-active:
172    * @session: the object which received the signal
173    * @src: the active RTPSource
174    *
175    * Notify of a SSRC that is active, i.e., sending RTCP.
176    */
177   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
178       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
179       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
180       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
181       RTP_TYPE_SOURCE);
182   /**
183    * RTPSession::on-ssrc-sdes:
184    * @session: the object which received the signal
185    * @src: the RTPSource
186    *
187    * Notify that a new SDES was received for SSRC.
188    */
189   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
190       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
191       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
192       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
193       RTP_TYPE_SOURCE);
194   /**
195    * RTPSession::on-bye-ssrc:
196    * @session: the object which received the signal
197    * @src: the RTPSource that went away
198    *
199    * Notify of an SSRC that became inactive because of a BYE packet.
200    */
201   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
202       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
203       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
204       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
205       RTP_TYPE_SOURCE);
206   /**
207    * RTPSession::on-bye-timeout:
208    * @session: the object which received the signal
209    * @src: the RTPSource that timed out
210    *
211    * Notify of an SSRC that has timed out because of BYE
212    */
213   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
214       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
215       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
216       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
217       RTP_TYPE_SOURCE);
218   /**
219    * RTPSession::on-timeout:
220    * @session: the object which received the signal
221    * @src: the RTPSource that timed out
222    *
223    * Notify of an SSRC that has timed out
224    */
225   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
226       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
227       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
228       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
229       RTP_TYPE_SOURCE);
230   /**
231    * RTPSession::on-sender-timeout:
232    * @session: the object which received the signal
233    * @src: the RTPSource that timed out
234    *
235    * Notify of an SSRC that was a sender but timed out and became a receiver.
236    */
237   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
238       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
239       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
240       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
241       RTP_TYPE_SOURCE);
242
243   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
244       g_param_spec_object ("internal-source", "Internal Source",
245           "The internal source element of the session",
246           RTP_TYPE_SOURCE, G_PARAM_READABLE));
247
248   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
249       g_param_spec_double ("bandwidth", "Bandwidth",
250           "The bandwidth of the session",
251           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH, G_PARAM_READWRITE));
252
253   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
254       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
255           "The fraction of the bandwidth used for RTCP",
256           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION, G_PARAM_READWRITE));
257
258   g_object_class_install_property (gobject_class, PROP_SDES_CNAME,
259       g_param_spec_string ("sdes-cname", "SDES CNAME",
260           "The CNAME to put in SDES messages of this session",
261           DEFAULT_SDES_CNAME, G_PARAM_READWRITE));
262
263   g_object_class_install_property (gobject_class, PROP_SDES_NAME,
264       g_param_spec_string ("sdes-name", "SDES NAME",
265           "The NAME to put in SDES messages of this session",
266           DEFAULT_SDES_NAME, G_PARAM_READWRITE));
267
268   g_object_class_install_property (gobject_class, PROP_SDES_EMAIL,
269       g_param_spec_string ("sdes-email", "SDES EMAIL",
270           "The EMAIL to put in SDES messages of this session",
271           DEFAULT_SDES_EMAIL, G_PARAM_READWRITE));
272
273   g_object_class_install_property (gobject_class, PROP_SDES_PHONE,
274       g_param_spec_string ("sdes-phone", "SDES PHONE",
275           "The PHONE to put in SDES messages of this session",
276           DEFAULT_SDES_PHONE, G_PARAM_READWRITE));
277
278   g_object_class_install_property (gobject_class, PROP_SDES_LOCATION,
279       g_param_spec_string ("sdes-location", "SDES LOCATION",
280           "The LOCATION to put in SDES messages of this session",
281           DEFAULT_SDES_LOCATION, G_PARAM_READWRITE));
282
283   g_object_class_install_property (gobject_class, PROP_SDES_TOOL,
284       g_param_spec_string ("sdes-tool", "SDES TOOL",
285           "The TOOL to put in SDES messages of this session",
286           DEFAULT_SDES_TOOL, G_PARAM_READWRITE));
287
288   g_object_class_install_property (gobject_class, PROP_SDES_NOTE,
289       g_param_spec_string ("sdes-note", "SDES NOTE",
290           "The NOTE to put in SDES messages of this session",
291           DEFAULT_SDES_NOTE, G_PARAM_READWRITE));
292
293   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
294       g_param_spec_uint ("num-sources", "Num Sources",
295           "The number of sources in the session", 0, G_MAXUINT,
296           DEFAULT_NUM_SOURCES, G_PARAM_READABLE));
297
298   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
299       g_param_spec_uint ("num-active-sources", "Num Active Sources",
300           "The number of active sources in the session", 0, G_MAXUINT,
301           DEFAULT_NUM_ACTIVE_SOURCES, G_PARAM_READABLE));
302
303   klass->get_source_by_ssrc =
304       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
305
306   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
307 }
308
309 static void
310 rtp_session_init (RTPSession * sess)
311 {
312   gint i;
313   gchar *str;
314
315   sess->lock = g_mutex_new ();
316   sess->key = g_random_int ();
317   sess->mask_idx = 0;
318   sess->mask = 0;
319
320   for (i = 0; i < 32; i++) {
321     sess->ssrcs[i] =
322         g_hash_table_new_full (NULL, NULL, NULL,
323         (GDestroyNotify) g_object_unref);
324   }
325   sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL);
326
327   rtp_stats_init_defaults (&sess->stats);
328
329   /* create an active SSRC for this session manager */
330   sess->source = rtp_session_create_source (sess);
331   sess->source->validated = TRUE;
332   sess->source->internal = TRUE;
333   sess->stats.active_sources++;
334
335   /* default UDP header length */
336   sess->header_len = 28;
337   sess->mtu = 1400;
338
339   /* some default SDES entries */
340   str = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ());
341   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_CNAME, str);
342   g_free (str);
343
344   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_NAME,
345       g_get_real_name ());
346   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_TOOL, "GStreamer");
347
348   sess->first_rtcp = TRUE;
349
350   GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
351 }
352
353 static void
354 rtp_session_finalize (GObject * object)
355 {
356   RTPSession *sess;
357   gint i;
358
359   sess = RTP_SESSION_CAST (object);
360
361   g_mutex_free (sess->lock);
362   for (i = 0; i < 32; i++)
363     g_hash_table_destroy (sess->ssrcs[i]);
364
365   g_free (sess->bye_reason);
366
367   g_hash_table_destroy (sess->cnames);
368   g_object_unref (sess->source);
369
370   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
371 }
372
373 static void
374 rtp_session_set_property (GObject * object, guint prop_id,
375     const GValue * value, GParamSpec * pspec)
376 {
377   RTPSession *sess;
378
379   sess = RTP_SESSION (object);
380
381   switch (prop_id) {
382     case PROP_BANDWIDTH:
383       rtp_session_set_bandwidth (sess, g_value_get_double (value));
384       break;
385     case PROP_RTCP_FRACTION:
386       rtp_session_set_rtcp_fraction (sess, g_value_get_double (value));
387       break;
388     case PROP_SDES_CNAME:
389       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_CNAME,
390           g_value_get_string (value));
391       break;
392     case PROP_SDES_NAME:
393       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_NAME,
394           g_value_get_string (value));
395       break;
396     case PROP_SDES_EMAIL:
397       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_EMAIL,
398           g_value_get_string (value));
399       break;
400     case PROP_SDES_PHONE:
401       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_PHONE,
402           g_value_get_string (value));
403       break;
404     case PROP_SDES_LOCATION:
405       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_LOC,
406           g_value_get_string (value));
407       break;
408     case PROP_SDES_TOOL:
409       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_TOOL,
410           g_value_get_string (value));
411       break;
412     case PROP_SDES_NOTE:
413       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_NOTE,
414           g_value_get_string (value));
415       break;
416     default:
417       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
418       break;
419   }
420 }
421
422 static void
423 rtp_session_get_property (GObject * object, guint prop_id,
424     GValue * value, GParamSpec * pspec)
425 {
426   RTPSession *sess;
427
428   sess = RTP_SESSION (object);
429
430   switch (prop_id) {
431     case PROP_INTERNAL_SOURCE:
432       g_value_take_object (value, rtp_session_get_internal_source (sess));
433       break;
434     case PROP_BANDWIDTH:
435       g_value_set_double (value, rtp_session_get_bandwidth (sess));
436       break;
437     case PROP_RTCP_FRACTION:
438       g_value_set_double (value, rtp_session_get_rtcp_fraction (sess));
439       break;
440     case PROP_SDES_CNAME:
441       g_value_take_string (value, rtp_session_get_sdes_string (sess,
442               GST_RTCP_SDES_CNAME));
443       break;
444     case PROP_SDES_NAME:
445       g_value_take_string (value, rtp_session_get_sdes_string (sess,
446               GST_RTCP_SDES_NAME));
447       break;
448     case PROP_SDES_EMAIL:
449       g_value_take_string (value, rtp_session_get_sdes_string (sess,
450               GST_RTCP_SDES_EMAIL));
451       break;
452     case PROP_SDES_PHONE:
453       g_value_take_string (value, rtp_session_get_sdes_string (sess,
454               GST_RTCP_SDES_PHONE));
455       break;
456     case PROP_SDES_LOCATION:
457       g_value_take_string (value, rtp_session_get_sdes_string (sess,
458               GST_RTCP_SDES_LOC));
459       break;
460     case PROP_SDES_TOOL:
461       g_value_take_string (value, rtp_session_get_sdes_string (sess,
462               GST_RTCP_SDES_TOOL));
463       break;
464     case PROP_SDES_NOTE:
465       g_value_take_string (value, rtp_session_get_sdes_string (sess,
466               GST_RTCP_SDES_NOTE));
467       break;
468     case PROP_NUM_SOURCES:
469       g_value_set_uint (value, rtp_session_get_num_sources (sess));
470       break;
471     case PROP_NUM_ACTIVE_SOURCES:
472       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
473       break;
474     default:
475       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
476       break;
477   }
478 }
479
480 static void
481 on_new_ssrc (RTPSession * sess, RTPSource * source)
482 {
483   g_object_ref (source);
484   RTP_SESSION_UNLOCK (sess);
485   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
486   RTP_SESSION_LOCK (sess);
487   g_object_unref (source);
488 }
489
490 static void
491 on_ssrc_collision (RTPSession * sess, RTPSource * source)
492 {
493   g_object_ref (source);
494   RTP_SESSION_UNLOCK (sess);
495   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
496       source);
497   RTP_SESSION_LOCK (sess);
498   g_object_unref (source);
499 }
500
501 static void
502 on_ssrc_validated (RTPSession * sess, RTPSource * source)
503 {
504   g_object_ref (source);
505   RTP_SESSION_UNLOCK (sess);
506   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
507       source);
508   RTP_SESSION_LOCK (sess);
509   g_object_unref (source);
510 }
511
512 static void
513 on_ssrc_active (RTPSession * sess, RTPSource * source)
514 {
515   g_object_ref (source);
516   RTP_SESSION_UNLOCK (sess);
517   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
518   RTP_SESSION_LOCK (sess);
519   g_object_unref (source);
520 }
521
522 static void
523 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
524 {
525   g_object_ref (source);
526   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
527   RTP_SESSION_UNLOCK (sess);
528   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
529   RTP_SESSION_LOCK (sess);
530   g_object_unref (source);
531 }
532
533 static void
534 on_bye_ssrc (RTPSession * sess, RTPSource * source)
535 {
536   g_object_ref (source);
537   RTP_SESSION_UNLOCK (sess);
538   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
539   RTP_SESSION_LOCK (sess);
540   g_object_unref (source);
541 }
542
543 static void
544 on_bye_timeout (RTPSession * sess, RTPSource * source)
545 {
546   g_object_ref (source);
547   RTP_SESSION_UNLOCK (sess);
548   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
549   RTP_SESSION_LOCK (sess);
550   g_object_unref (source);
551 }
552
553 static void
554 on_timeout (RTPSession * sess, RTPSource * source)
555 {
556   g_object_ref (source);
557   RTP_SESSION_UNLOCK (sess);
558   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
559   RTP_SESSION_LOCK (sess);
560   g_object_unref (source);
561 }
562
563 static void
564 on_sender_timeout (RTPSession * sess, RTPSource * source)
565 {
566   g_object_ref (source);
567   RTP_SESSION_UNLOCK (sess);
568   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
569       source);
570   RTP_SESSION_LOCK (sess);
571   g_object_unref (source);
572 }
573
574 /**
575  * rtp_session_new:
576  *
577  * Create a new session object.
578  *
579  * Returns: a new #RTPSession. g_object_unref() after usage.
580  */
581 RTPSession *
582 rtp_session_new (void)
583 {
584   RTPSession *sess;
585
586   sess = g_object_new (RTP_TYPE_SESSION, NULL);
587
588   return sess;
589 }
590
591 /**
592  * rtp_session_set_callbacks:
593  * @sess: an #RTPSession
594  * @callbacks: callbacks to configure
595  * @user_data: user data passed in the callbacks
596  *
597  * Configure a set of callbacks to be notified of actions.
598  */
599 void
600 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
601     gpointer user_data)
602 {
603   g_return_if_fail (RTP_IS_SESSION (sess));
604
605   if (callbacks->process_rtp) {
606     sess->callbacks.process_rtp = callbacks->process_rtp;
607     sess->process_rtp_user_data = user_data;
608   }
609   if (callbacks->send_rtp) {
610     sess->callbacks.send_rtp = callbacks->send_rtp;
611     sess->send_rtp_user_data = user_data;
612   }
613   if (callbacks->send_rtcp) {
614     sess->callbacks.send_rtcp = callbacks->send_rtcp;
615     sess->send_rtcp_user_data = user_data;
616   }
617   if (callbacks->sync_rtcp) {
618     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
619     sess->sync_rtcp_user_data = user_data;
620   }
621   if (callbacks->clock_rate) {
622     sess->callbacks.clock_rate = callbacks->clock_rate;
623     sess->clock_rate_user_data = user_data;
624   }
625   if (callbacks->reconsider) {
626     sess->callbacks.reconsider = callbacks->reconsider;
627     sess->reconsider_user_data = user_data;
628   }
629 }
630
631 /**
632  * rtp_session_set_process_rtp_callback:
633  * @sess: an #RTPSession
634  * @callback: callback to set
635  * @user_data: user data passed in the callback
636  *
637  * Configure only the process_rtp callback to be notified of the process_rtp action.
638  */
639 void
640 rtp_session_set_process_rtp_callback (RTPSession * sess,
641     RTPSessionProcessRTP callback, gpointer user_data)
642 {
643   g_return_if_fail (RTP_IS_SESSION (sess));
644
645   sess->callbacks.process_rtp = callback;
646   sess->process_rtp_user_data = user_data;
647 }
648
649 /**
650  * rtp_session_set_send_rtp_callback:
651  * @sess: an #RTPSession
652  * @callback: callback to set
653  * @user_data: user data passed in the callback
654  *
655  * Configure only the send_rtp callback to be notified of the send_rtp action.
656  */
657 void
658 rtp_session_set_send_rtp_callback (RTPSession * sess,
659     RTPSessionSendRTP callback, gpointer user_data)
660 {
661   g_return_if_fail (RTP_IS_SESSION (sess));
662
663   sess->callbacks.send_rtp = callback;
664   sess->send_rtp_user_data = user_data;
665 }
666
667 /**
668  * rtp_session_set_send_rtcp_callback:
669  * @sess: an #RTPSession
670  * @callback: callback to set
671  * @user_data: user data passed in the callback
672  *
673  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
674  */
675 void
676 rtp_session_set_send_rtcp_callback (RTPSession * sess,
677     RTPSessionSendRTCP callback, gpointer user_data)
678 {
679   g_return_if_fail (RTP_IS_SESSION (sess));
680
681   sess->callbacks.send_rtcp = callback;
682   sess->send_rtcp_user_data = user_data;
683 }
684
685 /**
686  * rtp_session_set_sync_rtcp_callback:
687  * @sess: an #RTPSession
688  * @callback: callback to set
689  * @user_data: user data passed in the callback
690  *
691  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
692  */
693 void
694 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
695     RTPSessionSyncRTCP callback, gpointer user_data)
696 {
697   g_return_if_fail (RTP_IS_SESSION (sess));
698
699   sess->callbacks.sync_rtcp = callback;
700   sess->sync_rtcp_user_data = user_data;
701 }
702
703 /**
704  * rtp_session_set_clock_rate_callback:
705  * @sess: an #RTPSession
706  * @callback: callback to set
707  * @user_data: user data passed in the callback
708  *
709  * Configure only the clock_rate callback to be notified of the clock_rate action.
710  */
711 void
712 rtp_session_set_clock_rate_callback (RTPSession * sess,
713     RTPSessionClockRate callback, gpointer user_data)
714 {
715   g_return_if_fail (RTP_IS_SESSION (sess));
716
717   sess->callbacks.clock_rate = callback;
718   sess->clock_rate_user_data = user_data;
719 }
720
721 /**
722  * rtp_session_set_reconsider_callback:
723  * @sess: an #RTPSession
724  * @callback: callback to set
725  * @user_data: user data passed in the callback
726  *
727  * Configure only the reconsider callback to be notified of the reconsider action.
728  */
729 void
730 rtp_session_set_reconsider_callback (RTPSession * sess,
731     RTPSessionReconsider callback, gpointer user_data)
732 {
733   g_return_if_fail (RTP_IS_SESSION (sess));
734
735   sess->callbacks.reconsider = callback;
736   sess->reconsider_user_data = user_data;
737 }
738
739 /**
740  * rtp_session_set_bandwidth:
741  * @sess: an #RTPSession
742  * @bandwidth: the bandwidth allocated
743  *
744  * Set the session bandwidth in bytes per second.
745  */
746 void
747 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
748 {
749   g_return_if_fail (RTP_IS_SESSION (sess));
750
751   RTP_SESSION_LOCK (sess);
752   sess->stats.bandwidth = bandwidth;
753   RTP_SESSION_UNLOCK (sess);
754 }
755
756 /**
757  * rtp_session_get_bandwidth:
758  * @sess: an #RTPSession
759  *
760  * Get the session bandwidth.
761  *
762  * Returns: the session bandwidth.
763  */
764 gdouble
765 rtp_session_get_bandwidth (RTPSession * sess)
766 {
767   gdouble result;
768
769   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
770
771   RTP_SESSION_LOCK (sess);
772   result = sess->stats.bandwidth;
773   RTP_SESSION_UNLOCK (sess);
774
775   return result;
776 }
777
778 /**
779  * rtp_session_set_rtcp_fraction:
780  * @sess: an #RTPSession
781  * @bandwidth: the RTCP bandwidth
782  *
783  * Set the bandwidth that should be used for RTCP
784  * messages.
785  */
786 void
787 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
788 {
789   g_return_if_fail (RTP_IS_SESSION (sess));
790
791   RTP_SESSION_LOCK (sess);
792   sess->stats.rtcp_bandwidth = bandwidth;
793   RTP_SESSION_UNLOCK (sess);
794 }
795
796 /**
797  * rtp_session_get_rtcp_fraction:
798  * @sess: an #RTPSession
799  *
800  * Get the session bandwidth used for RTCP.
801  *
802  * Returns: The bandwidth used for RTCP messages.
803  */
804 gdouble
805 rtp_session_get_rtcp_fraction (RTPSession * sess)
806 {
807   gdouble result;
808
809   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
810
811   RTP_SESSION_LOCK (sess);
812   result = sess->stats.rtcp_bandwidth;
813   RTP_SESSION_UNLOCK (sess);
814
815   return result;
816 }
817
818 /**
819  * rtp_session_set_sdes_string:
820  * @sess: an #RTPSession
821  * @type: the type of the SDES item
822  * @item: a null-terminated string to set. 
823  *
824  * Store an SDES item of @type in @sess. 
825  *
826  * Returns: %FALSE if the data was unchanged @type is invalid.
827  */
828 gboolean
829 rtp_session_set_sdes_string (RTPSession * sess, GstRTCPSDESType type,
830     const gchar * item)
831 {
832   gboolean result;
833
834   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
835
836   RTP_SESSION_LOCK (sess);
837   result = rtp_source_set_sdes_string (sess->source, type, item);
838   RTP_SESSION_UNLOCK (sess);
839
840   return result;
841 }
842
843 /**
844  * rtp_session_get_sdes_string:
845  * @sess: an #RTPSession
846  * @type: the type of the SDES item
847  *
848  * Get the SDES item of @type from @sess. 
849  *
850  * Returns: a null-terminated copy of the SDES item or NULL when @type was not
851  * valid. g_free() after usage.
852  */
853 gchar *
854 rtp_session_get_sdes_string (RTPSession * sess, GstRTCPSDESType type)
855 {
856   gchar *result;
857
858   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
859
860   RTP_SESSION_LOCK (sess);
861   result = rtp_source_get_sdes_string (sess->source, type);
862   RTP_SESSION_UNLOCK (sess);
863
864   return result;
865 }
866
867 static GstFlowReturn
868 source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session)
869 {
870   GstFlowReturn result = GST_FLOW_OK;
871
872   if (source == session->source) {
873     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
874
875     RTP_SESSION_UNLOCK (session);
876
877     if (session->callbacks.send_rtp)
878       result =
879           session->callbacks.send_rtp (session, source, buffer,
880           session->send_rtp_user_data);
881     else
882       gst_buffer_unref (buffer);
883
884   } else {
885     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
886     RTP_SESSION_UNLOCK (session);
887
888     if (session->callbacks.process_rtp)
889       result =
890           session->callbacks.process_rtp (session, source, buffer,
891           session->process_rtp_user_data);
892     else
893       gst_buffer_unref (buffer);
894   }
895   RTP_SESSION_LOCK (session);
896
897   return result;
898 }
899
900 static gint
901 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
902 {
903   gint result;
904
905   RTP_SESSION_UNLOCK (session);
906
907   if (session->callbacks.clock_rate)
908     result =
909         session->callbacks.clock_rate (session, pt,
910         session->clock_rate_user_data);
911   else
912     result = -1;
913
914   RTP_SESSION_LOCK (session);
915
916   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
917
918   return result;
919 }
920
921 static RTPSourceCallbacks callbacks = {
922   (RTPSourcePushRTP) source_push_rtp,
923   (RTPSourceClockRate) source_clock_rate,
924 };
925
926 /**
927  * find_add_conflicting_addresses:
928  * @sess: The session to check in
929  * @arrival: The arrival stats for the buffer
930  *
931  * Checks if an address which has a conflict is already known,
932  *  otherwise remembers it to prevent loops.
933  *
934  * Returns: TRUE if it was a known conflict, FALSE otherwise
935  */
936
937 static gboolean
938 find_add_conflicting_addresses (RTPSession * sess, RTPArrivalStats * arrival)
939 {
940   GList *item;
941   RTPConflictingAddress *new_conflict;
942
943   for (item = g_list_first (sess->conflicting_addresses);
944       item; item = g_list_next (item)) {
945     RTPConflictingAddress *known_conflict = item->data;
946
947     if (gst_netaddress_equal (&arrival->address, &known_conflict->address)) {
948       known_conflict->time = arrival->time;
949       return TRUE;
950     }
951   }
952
953   new_conflict = g_new0 (RTPConflictingAddress, 1);
954
955   memcpy (&new_conflict->address, &arrival->address, sizeof (GstNetAddress));
956   new_conflict->time = arrival->time;
957
958   sess->conflicting_addresses = g_list_prepend (sess->conflicting_addresses,
959       new_conflict);
960
961   return FALSE;
962 }
963
964 static gboolean
965 check_collision (RTPSession * sess, RTPSource * source,
966     RTPArrivalStats * arrival, gboolean rtp)
967 {
968   /* If we have no arrival address, we can't do collision checking */
969   if (!arrival->have_address)
970     return FALSE;
971
972   if (sess->source != source) {
973     /* This is not our local source, but lets check if two remote
974      * source collide
975      */
976     if (rtp) {
977       if (source->have_rtp_from) {
978         if (gst_netaddress_equal (&source->rtp_from, &arrival->address))
979           /* Address is the same */
980           return FALSE;
981       } else {
982         /* We don't already have a from address for RTP, just set it */
983         rtp_source_set_rtp_from (source, &arrival->address);
984         return FALSE;
985       }
986     } else {
987       if (source->have_rtcp_from) {
988         if (gst_netaddress_equal (&source->rtcp_from, &arrival->address))
989           /* Address is the same */
990           return FALSE;
991       } else {
992         /* We don't already have a from address for RTCP, just set it */
993         rtp_source_set_rtcp_from (source, &arrival->address);
994         return FALSE;
995       }
996     }
997     /* We received RTP or RTCP from this source before but the network address
998      * changed. In this case, we have third-party collision or loop */
999     GST_DEBUG ("we have a third-party collision or loop");
1000
1001     /* FIXME: Log 3rd party collision somehow
1002      * Maybe should be done in upper layer, only the SDES can tell us
1003      * if its a collision or a loop
1004      */
1005   } else {
1006     /* This is sending with our ssrc, is it an address we already know */
1007
1008     if (find_add_conflicting_addresses (sess, arrival)) {
1009       /* Its a known conflict, its probably a loop, not a collision
1010        * lets just drop the incoming packet
1011        */
1012       GST_DEBUG ("Our packets are being looped back to us, dropping");
1013     } else {
1014       /* Its a new collision, lets change our SSRC */
1015
1016       GST_DEBUG ("Collision for SSRC %x", rtp_source_get_ssrc (source));
1017       on_ssrc_collision (sess, source);
1018
1019       rtp_session_send_bye_locked (sess, "SSRC Collision", arrival->time);
1020
1021       sess->change_ssrc = TRUE;
1022     }
1023   }
1024
1025   return TRUE;
1026 }
1027
1028
1029 /* must be called with the session lock */
1030 static RTPSource *
1031 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1032     RTPArrivalStats * arrival, gboolean rtp)
1033 {
1034   RTPSource *source;
1035
1036   source =
1037       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1038   if (source == NULL) {
1039     /* make new Source in probation and insert */
1040     source = rtp_source_new (ssrc);
1041
1042     /* for RTP packets we need to set the source in probation. Receiving RTCP
1043      * packets of an SSRC, on the other hand, is a strong indication that we
1044      * are dealing with a valid source. */
1045     if (rtp)
1046       source->probation = RTP_DEFAULT_PROBATION;
1047     else
1048       source->probation = 0;
1049
1050     /* store from address, if any */
1051     if (arrival->have_address) {
1052       if (rtp)
1053         rtp_source_set_rtp_from (source, &arrival->address);
1054       else
1055         rtp_source_set_rtcp_from (source, &arrival->address);
1056     }
1057
1058     /* configure a callback on the source */
1059     rtp_source_set_callbacks (source, &callbacks, sess);
1060
1061     g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1062         source);
1063
1064     /* we have one more source now */
1065     sess->total_sources++;
1066     *created = TRUE;
1067   } else {
1068     *created = FALSE;
1069     /* check for collision, this updates the address when not previously set */
1070     if (check_collision (sess, source, arrival, rtp)) {
1071       return NULL;
1072     }
1073   }
1074   /* update last activity */
1075   source->last_activity = arrival->time;
1076   if (rtp)
1077     source->last_rtp_activity = arrival->time;
1078
1079   return source;
1080 }
1081
1082 /**
1083  * rtp_session_get_internal_source:
1084  * @sess: a #RTPSession
1085  *
1086  * Get the internal #RTPSource of @sess.
1087  *
1088  * Returns: The internal #RTPSource. g_object_unref() after usage.
1089  */
1090 RTPSource *
1091 rtp_session_get_internal_source (RTPSession * sess)
1092 {
1093   RTPSource *result;
1094
1095   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1096
1097   result = g_object_ref (sess->source);
1098
1099   return result;
1100 }
1101
1102 /**
1103  * rtp_session_set_internal_ssrc:
1104  * @sess: a #RTPSession
1105  * @ssrc: an SSRC
1106  *
1107  * Set the SSRC of @sess to @ssrc.
1108  */
1109 void
1110 rtp_session_set_internal_ssrc (RTPSession * sess, guint32 ssrc)
1111 {
1112   RTP_SESSION_LOCK (sess);
1113   g_hash_table_steal (sess->ssrcs[sess->mask_idx],
1114       GINT_TO_POINTER (sess->source->ssrc));
1115
1116   sess->source->ssrc = ssrc;
1117   rtp_source_reset (sess->source);
1118
1119   g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1120       GINT_TO_POINTER (sess->source->ssrc), sess->source);
1121   RTP_SESSION_UNLOCK (sess);
1122 }
1123
1124 /**
1125  * rtp_session_get_internal_ssrc:
1126  * @sess: a #RTPSession
1127  *
1128  * Get the internal SSRC of @sess.
1129  *
1130  * Returns: The SSRC of the session. 
1131  */
1132 guint32
1133 rtp_session_get_internal_ssrc (RTPSession * sess)
1134 {
1135   guint32 ssrc;
1136
1137   RTP_SESSION_LOCK (sess);
1138   ssrc = sess->source->ssrc;
1139   RTP_SESSION_UNLOCK (sess);
1140
1141   return ssrc;
1142 }
1143
1144 /**
1145  * rtp_session_add_source:
1146  * @sess: a #RTPSession
1147  * @src: #RTPSource to add
1148  *
1149  * Add @src to @session.
1150  *
1151  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1152  * existed in the session.
1153  */
1154 gboolean
1155 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1156 {
1157   gboolean result = FALSE;
1158   RTPSource *find;
1159
1160   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1161   g_return_val_if_fail (src != NULL, FALSE);
1162
1163   RTP_SESSION_LOCK (sess);
1164   find =
1165       g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1166       GINT_TO_POINTER (src->ssrc));
1167   if (find == NULL) {
1168     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1169         GINT_TO_POINTER (src->ssrc), src);
1170     /* we have one more source now */
1171     sess->total_sources++;
1172     result = TRUE;
1173   }
1174   RTP_SESSION_UNLOCK (sess);
1175
1176   return result;
1177 }
1178
1179 /**
1180  * rtp_session_get_num_sources:
1181  * @sess: an #RTPSession
1182  *
1183  * Get the number of sources in @sess.
1184  *
1185  * Returns: The number of sources in @sess.
1186  */
1187 guint
1188 rtp_session_get_num_sources (RTPSession * sess)
1189 {
1190   guint result;
1191
1192   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1193
1194   RTP_SESSION_LOCK (sess);
1195   result = sess->total_sources;
1196   RTP_SESSION_UNLOCK (sess);
1197
1198   return result;
1199 }
1200
1201 /**
1202  * rtp_session_get_num_active_sources:
1203  * @sess: an #RTPSession
1204  *
1205  * Get the number of active sources in @sess. A source is considered active when
1206  * it has been validated and has not yet received a BYE RTCP message.
1207  *
1208  * Returns: The number of active sources in @sess.
1209  */
1210 guint
1211 rtp_session_get_num_active_sources (RTPSession * sess)
1212 {
1213   guint result;
1214
1215   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1216
1217   RTP_SESSION_LOCK (sess);
1218   result = sess->stats.active_sources;
1219   RTP_SESSION_UNLOCK (sess);
1220
1221   return result;
1222 }
1223
1224 /**
1225  * rtp_session_get_source_by_ssrc:
1226  * @sess: an #RTPSession
1227  * @ssrc: an SSRC
1228  *
1229  * Find the source with @ssrc in @sess.
1230  *
1231  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
1232  * g_object_unref() after usage.
1233  */
1234 RTPSource *
1235 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
1236 {
1237   RTPSource *result;
1238
1239   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1240
1241   RTP_SESSION_LOCK (sess);
1242   result =
1243       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1244   if (result)
1245     g_object_ref (result);
1246   RTP_SESSION_UNLOCK (sess);
1247
1248   return result;
1249 }
1250
1251 /**
1252  * rtp_session_get_source_by_cname:
1253  * @sess: a #RTPSession
1254  * @cname: an CNAME
1255  *
1256  * Find the source with @cname in @sess.
1257  *
1258  * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found.
1259  * g_object_unref() after usage.
1260  */
1261 RTPSource *
1262 rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
1263 {
1264   RTPSource *result;
1265
1266   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1267   g_return_val_if_fail (cname != NULL, NULL);
1268
1269   RTP_SESSION_LOCK (sess);
1270   result = g_hash_table_lookup (sess->cnames, cname);
1271   if (result)
1272     g_object_ref (result);
1273   RTP_SESSION_UNLOCK (sess);
1274
1275   return result;
1276 }
1277
1278 static guint32
1279 rtp_session_create_new_ssrc (RTPSession * sess)
1280 {
1281   guint32 ssrc;
1282
1283   while (TRUE) {
1284     ssrc = g_random_int ();
1285
1286     /* see if it exists in the session, we're done if it doesn't */
1287     if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1288             GINT_TO_POINTER (ssrc)) == NULL)
1289       break;
1290   }
1291
1292   return ssrc;
1293 }
1294
1295
1296 /**
1297  * rtp_session_create_source:
1298  * @sess: an #RTPSession
1299  *
1300  * Create an #RTPSource for use in @sess. This function will create a source
1301  * with an ssrc that is currently not used by any participants in the session.
1302  *
1303  * Returns: an #RTPSource.
1304  */
1305 RTPSource *
1306 rtp_session_create_source (RTPSession * sess)
1307 {
1308   guint32 ssrc;
1309   RTPSource *source;
1310
1311   RTP_SESSION_LOCK (sess);
1312   ssrc = rtp_session_create_new_ssrc (sess);
1313   source = rtp_source_new (ssrc);
1314   g_object_ref (source);
1315   rtp_source_set_callbacks (source, &callbacks, sess);
1316   g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1317       source);
1318   /* we have one more source now */
1319   sess->total_sources++;
1320   RTP_SESSION_UNLOCK (sess);
1321
1322   return source;
1323 }
1324
1325 /* update the RTPArrivalStats structure with the current time and other bits
1326  * about the current buffer we are handling.
1327  * This function is typically called when a validated packet is received.
1328  * This function should be called with the SESSION_LOCK
1329  */
1330 static void
1331 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
1332     gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
1333     GstClockTime running_time, guint64 ntpnstime)
1334 {
1335   /* get time of arrival */
1336   arrival->time = current_time;
1337   arrival->running_time = running_time;
1338   arrival->ntpnstime = ntpnstime;
1339
1340   /* get packet size including header overhead */
1341   arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
1342
1343   if (rtp) {
1344     arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer);
1345   } else {
1346     arrival->payload_len = 0;
1347   }
1348
1349   /* for netbuffer we can store the IP address to check for collisions */
1350   arrival->have_address = GST_IS_NETBUFFER (buffer);
1351   if (arrival->have_address) {
1352     GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
1353
1354     memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
1355   }
1356 }
1357
1358 /**
1359  * rtp_session_process_rtp:
1360  * @sess: and #RTPSession
1361  * @buffer: an RTP buffer
1362  * @current_time: the current system time
1363  * @ntpnstime: the NTP arrival time in nanoseconds
1364  *
1365  * Process an RTP buffer in the session manager. This function takes ownership
1366  * of @buffer.
1367  *
1368  * Returns: a #GstFlowReturn.
1369  */
1370 GstFlowReturn
1371 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
1372     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
1373 {
1374   GstFlowReturn result;
1375   guint32 ssrc;
1376   RTPSource *source;
1377   gboolean created;
1378   gboolean prevsender, prevactive;
1379   RTPArrivalStats arrival;
1380
1381   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1382   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1383
1384   if (!gst_rtp_buffer_validate (buffer))
1385     goto invalid_packet;
1386
1387   RTP_SESSION_LOCK (sess);
1388   /* update arrival stats */
1389   update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
1390       running_time, ntpnstime);
1391
1392   /* ignore more RTP packets when we left the session */
1393   if (sess->source->received_bye)
1394     goto ignore;
1395
1396   /* get SSRC and look up in session database */
1397   ssrc = gst_rtp_buffer_get_ssrc (buffer);
1398   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
1399
1400   if (!source)
1401     goto collision;
1402
1403   prevsender = RTP_SOURCE_IS_SENDER (source);
1404   prevactive = RTP_SOURCE_IS_ACTIVE (source);
1405
1406   /* we need to ref so that we can process the CSRCs later */
1407   gst_buffer_ref (buffer);
1408
1409   /* let source process the packet */
1410   result = rtp_source_process_rtp (source, buffer, &arrival);
1411
1412   /* source became active */
1413   if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
1414     sess->stats.active_sources++;
1415     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
1416         sess->stats.active_sources);
1417     on_ssrc_validated (sess, source);
1418   }
1419   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1420     sess->stats.sender_sources++;
1421     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
1422         sess->stats.sender_sources);
1423   }
1424
1425   if (created)
1426     on_new_ssrc (sess, source);
1427
1428   if (source->validated) {
1429     guint8 i, count;
1430     gboolean created;
1431
1432     /* for validated sources, we add the CSRCs as well */
1433     count = gst_rtp_buffer_get_csrc_count (buffer);
1434
1435     for (i = 0; i < count; i++) {
1436       guint32 csrc;
1437       RTPSource *csrc_src;
1438
1439       csrc = gst_rtp_buffer_get_csrc (buffer, i);
1440
1441       /* get source */
1442       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
1443
1444       if (created) {
1445         GST_DEBUG ("created new CSRC: %08x", csrc);
1446         rtp_source_set_as_csrc (csrc_src);
1447         if (RTP_SOURCE_IS_ACTIVE (csrc_src))
1448           sess->stats.active_sources++;
1449         on_new_ssrc (sess, source);
1450       }
1451     }
1452   }
1453   gst_buffer_unref (buffer);
1454
1455   RTP_SESSION_UNLOCK (sess);
1456
1457   return result;
1458
1459   /* ERRORS */
1460 invalid_packet:
1461   {
1462     gst_buffer_unref (buffer);
1463     GST_DEBUG ("invalid RTP packet received");
1464     return GST_FLOW_OK;
1465   }
1466 ignore:
1467   {
1468     gst_buffer_unref (buffer);
1469     RTP_SESSION_UNLOCK (sess);
1470     GST_DEBUG ("ignoring RTP packet because we are leaving");
1471     return GST_FLOW_OK;
1472   }
1473 collision:
1474   {
1475     gst_buffer_unref (buffer);
1476     RTP_SESSION_UNLOCK (sess);
1477     GST_DEBUG ("ignoring packet because its collisioning");
1478     return GST_FLOW_OK;
1479   }
1480 }
1481
1482 static void
1483 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
1484     GstRTCPPacket * packet, RTPArrivalStats * arrival)
1485 {
1486   guint count, i;
1487
1488   count = gst_rtcp_packet_get_rb_count (packet);
1489   for (i = 0; i < count; i++) {
1490     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1491     guint8 fractionlost;
1492     gint32 packetslost;
1493
1494     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1495         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1496
1497     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
1498
1499     if (ssrc == sess->source->ssrc) {
1500       /* only deal with report blocks for our session, we update the stats of
1501        * the sender of the RTCP message. We could also compare our stats against
1502        * the other sender to see if we are better or worse. */
1503       rtp_source_process_rb (source, arrival->time, fractionlost, packetslost,
1504           exthighestseq, jitter, lsr, dlsr);
1505
1506       on_ssrc_active (sess, source);
1507     }
1508   }
1509 }
1510
1511 /* A Sender report contains statistics about how the sender is doing. This
1512  * includes timing informataion such as the relation between RTP and NTP
1513  * timestamps and the number of packets/bytes it sent to us.
1514  *
1515  * In this report is also included a set of report blocks related to how this
1516  * sender is receiving data (in case we (or somebody else) is also sending stuff
1517  * to it). This info includes the packet loss, jitter and seqnum. It also
1518  * contains information to calculate the round trip time (LSR/DLSR).
1519  */
1520 static void
1521 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
1522     RTPArrivalStats * arrival)
1523 {
1524   guint32 senderssrc, rtptime, packet_count, octet_count;
1525   guint64 ntptime;
1526   RTPSource *source;
1527   gboolean created, prevsender;
1528
1529   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
1530       &packet_count, &octet_count);
1531
1532   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
1533       senderssrc, GST_TIME_ARGS (arrival->time));
1534
1535   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1536
1537   if (!source)
1538     return;
1539
1540   prevsender = RTP_SOURCE_IS_SENDER (source);
1541
1542   /* first update the source */
1543   rtp_source_process_sr (source, arrival->time, ntptime, rtptime, packet_count,
1544       octet_count);
1545
1546   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1547     sess->stats.sender_sources++;
1548     GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc,
1549         sess->stats.sender_sources);
1550   }
1551
1552   if (created)
1553     on_new_ssrc (sess, source);
1554
1555   rtp_session_process_rb (sess, source, packet, arrival);
1556 }
1557
1558 /* A receiver report contains statistics about how a receiver is doing. It
1559  * includes stuff like packet loss, jitter and the seqnum it received last. It
1560  * also contains info to calculate the round trip time.
1561  *
1562  * We are only interested in how the sender of this report is doing wrt to us.
1563  */
1564 static void
1565 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
1566     RTPArrivalStats * arrival)
1567 {
1568   guint32 senderssrc;
1569   RTPSource *source;
1570   gboolean created;
1571
1572   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
1573
1574   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
1575
1576   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1577
1578   if (!source)
1579     return;
1580
1581   if (created)
1582     on_new_ssrc (sess, source);
1583
1584   rtp_session_process_rb (sess, source, packet, arrival);
1585 }
1586
1587 /* Get SDES items and store them in the SSRC */
1588 static void
1589 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
1590     RTPArrivalStats * arrival)
1591 {
1592   guint items, i, j;
1593   gboolean more_items, more_entries;
1594
1595   items = gst_rtcp_packet_sdes_get_item_count (packet);
1596   GST_DEBUG ("got SDES packet with %d items", items);
1597
1598   more_items = gst_rtcp_packet_sdes_first_item (packet);
1599   i = 0;
1600   while (more_items) {
1601     guint32 ssrc;
1602     gboolean changed, created;
1603     RTPSource *source;
1604
1605     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
1606
1607     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
1608
1609     /* find src, no probation when dealing with RTCP */
1610     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1611     changed = FALSE;
1612
1613     if (!source)
1614       return;
1615
1616     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
1617     j = 0;
1618     while (more_entries) {
1619       GstRTCPSDESType type;
1620       guint8 len;
1621       guint8 *data;
1622
1623       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
1624
1625       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
1626           data);
1627
1628       changed |= rtp_source_set_sdes (source, type, data, len);
1629
1630       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
1631       j++;
1632     }
1633
1634     source->validated = TRUE;
1635
1636     if (created)
1637       on_new_ssrc (sess, source);
1638     if (changed)
1639       on_ssrc_sdes (sess, source);
1640
1641     more_items = gst_rtcp_packet_sdes_next_item (packet);
1642     i++;
1643   }
1644 }
1645
1646 /* BYE is sent when a client leaves the session
1647  */
1648 static void
1649 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
1650     RTPArrivalStats * arrival)
1651 {
1652   guint count, i;
1653   gchar *reason;
1654
1655   reason = gst_rtcp_packet_bye_get_reason (packet);
1656   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
1657
1658   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
1659   for (i = 0; i < count; i++) {
1660     guint32 ssrc;
1661     RTPSource *source;
1662     gboolean created, prevactive, prevsender;
1663     guint pmembers, members;
1664
1665     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
1666     GST_DEBUG ("SSRC: %08x", ssrc);
1667
1668     /* find src and mark bye, no probation when dealing with RTCP */
1669     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1670
1671     if (!source)
1672       return;
1673
1674     /* store time for when we need to time out this source */
1675     source->bye_time = arrival->time;
1676
1677     prevactive = RTP_SOURCE_IS_ACTIVE (source);
1678     prevsender = RTP_SOURCE_IS_SENDER (source);
1679
1680     /* let the source handle the rest */
1681     rtp_source_process_bye (source, reason);
1682
1683     pmembers = sess->stats.active_sources;
1684
1685     if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
1686       sess->stats.active_sources--;
1687       GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1688           sess->stats.active_sources);
1689     }
1690     if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
1691       sess->stats.sender_sources--;
1692       GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1693           sess->stats.sender_sources);
1694     }
1695     members = sess->stats.active_sources;
1696
1697     if (!sess->source->received_bye && members < pmembers) {
1698       /* some members went away since the previous timeout estimate.
1699        * Perform reverse reconsideration but only when we are not scheduling a
1700        * BYE ourselves. */
1701       if (arrival->time < sess->next_rtcp_check_time) {
1702         GstClockTime time_remaining;
1703
1704         time_remaining = sess->next_rtcp_check_time - arrival->time;
1705         sess->next_rtcp_check_time =
1706             gst_util_uint64_scale (time_remaining, members, pmembers);
1707
1708         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
1709             GST_TIME_ARGS (sess->next_rtcp_check_time));
1710
1711         sess->next_rtcp_check_time += arrival->time;
1712
1713         RTP_SESSION_UNLOCK (sess);
1714         /* notify app of reconsideration */
1715         if (sess->callbacks.reconsider)
1716           sess->callbacks.reconsider (sess, sess->reconsider_user_data);
1717         RTP_SESSION_LOCK (sess);
1718       }
1719     }
1720
1721     if (created)
1722       on_new_ssrc (sess, source);
1723
1724     on_bye_ssrc (sess, source);
1725   }
1726   g_free (reason);
1727 }
1728
1729 static void
1730 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
1731     RTPArrivalStats * arrival)
1732 {
1733   GST_DEBUG ("received APP");
1734 }
1735
1736 /**
1737  * rtp_session_process_rtcp:
1738  * @sess: and #RTPSession
1739  * @buffer: an RTCP buffer
1740  * @current_time: the current system time
1741  *
1742  * Process an RTCP buffer in the session manager. This function takes ownership
1743  * of @buffer.
1744  *
1745  * Returns: a #GstFlowReturn.
1746  */
1747 GstFlowReturn
1748 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
1749     GstClockTime current_time)
1750 {
1751   GstRTCPPacket packet;
1752   gboolean more, is_bye = FALSE, is_sr = FALSE;
1753   RTPArrivalStats arrival;
1754   GstFlowReturn result = GST_FLOW_OK;
1755
1756   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1757   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1758
1759   if (!gst_rtcp_buffer_validate (buffer))
1760     goto invalid_packet;
1761
1762   GST_DEBUG ("received RTCP packet");
1763
1764   RTP_SESSION_LOCK (sess);
1765   /* update arrival stats */
1766   update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1, -1);
1767
1768   if (sess->sent_bye)
1769     goto ignore;
1770
1771   /* make writable, we might want to change the buffer */
1772   buffer = gst_buffer_make_metadata_writable (buffer);
1773
1774   /* start processing the compound packet */
1775   more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
1776   while (more) {
1777     GstRTCPType type;
1778
1779     type = gst_rtcp_packet_get_type (&packet);
1780
1781     /* when we are leaving the session, we should ignore all non-BYE messages */
1782     if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) {
1783       GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
1784       goto next;
1785     }
1786
1787     switch (type) {
1788       case GST_RTCP_TYPE_SR:
1789         rtp_session_process_sr (sess, &packet, &arrival);
1790         is_sr = TRUE;
1791         break;
1792       case GST_RTCP_TYPE_RR:
1793         rtp_session_process_rr (sess, &packet, &arrival);
1794         break;
1795       case GST_RTCP_TYPE_SDES:
1796         rtp_session_process_sdes (sess, &packet, &arrival);
1797         break;
1798       case GST_RTCP_TYPE_BYE:
1799         is_bye = TRUE;
1800         rtp_session_process_bye (sess, &packet, &arrival);
1801         break;
1802       case GST_RTCP_TYPE_APP:
1803         rtp_session_process_app (sess, &packet, &arrival);
1804         break;
1805       default:
1806         GST_WARNING ("got unknown RTCP packet");
1807         break;
1808     }
1809   next:
1810     more = gst_rtcp_packet_move_to_next (&packet);
1811   }
1812
1813   /* if we are scheduling a BYE, we only want to count bye packets, else we
1814    * count everything */
1815   if (sess->source->received_bye) {
1816     if (is_bye) {
1817       sess->stats.bye_members++;
1818       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1819     }
1820   } else {
1821     /* keep track of average packet size */
1822     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1823   }
1824   RTP_SESSION_UNLOCK (sess);
1825
1826   /* notify caller of sr packets in the callback */
1827   if (is_sr && sess->callbacks.sync_rtcp)
1828     result = sess->callbacks.sync_rtcp (sess, sess->source, buffer,
1829         sess->sync_rtcp_user_data);
1830   else
1831     gst_buffer_unref (buffer);
1832
1833   return result;
1834
1835   /* ERRORS */
1836 invalid_packet:
1837   {
1838     GST_DEBUG ("invalid RTCP packet received");
1839     gst_buffer_unref (buffer);
1840     return GST_FLOW_OK;
1841   }
1842 ignore:
1843   {
1844     gst_buffer_unref (buffer);
1845     RTP_SESSION_UNLOCK (sess);
1846     GST_DEBUG ("ignoring RTP packet because we left");
1847     return GST_FLOW_OK;
1848   }
1849 }
1850
1851 /**
1852  * rtp_session_send_rtp:
1853  * @sess: an #RTPSession
1854  * @buffer: an RTP buffer
1855  * @current_time: the current system time
1856  * @ntpnstime: the NTP time in nanoseconds of when this buffer was captured.
1857  * This is the buffer timestamp converted to NTP time.
1858  *
1859  * Send the RTP buffer in the session manager. This function takes ownership of
1860  * @buffer.
1861  *
1862  * Returns: a #GstFlowReturn.
1863  */
1864 GstFlowReturn
1865 rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer,
1866     GstClockTime current_time, guint64 ntpnstime)
1867 {
1868   GstFlowReturn result;
1869   RTPSource *source;
1870   gboolean prevsender;
1871
1872   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1873   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1874
1875   if (!gst_rtp_buffer_validate (buffer))
1876     goto invalid_packet;
1877
1878   GST_LOG ("received RTP packet for sending");
1879
1880   RTP_SESSION_LOCK (sess);
1881   source = sess->source;
1882
1883   /* update last activity */
1884   source->last_rtp_activity = current_time;
1885
1886   prevsender = RTP_SOURCE_IS_SENDER (source);
1887
1888   /* we use our own source to send */
1889   result = rtp_source_send_rtp (source, buffer, ntpnstime);
1890
1891   if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
1892     sess->stats.sender_sources++;
1893   RTP_SESSION_UNLOCK (sess);
1894
1895   return result;
1896
1897   /* ERRORS */
1898 invalid_packet:
1899   {
1900     gst_buffer_unref (buffer);
1901     GST_DEBUG ("invalid RTP packet received");
1902     return GST_FLOW_OK;
1903   }
1904 }
1905
1906 static GstClockTime
1907 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
1908     gboolean first)
1909 {
1910   GstClockTime result;
1911
1912   if (sess->source->received_bye) {
1913     result = rtp_stats_calculate_bye_interval (&sess->stats);
1914   } else {
1915     result = rtp_stats_calculate_rtcp_interval (&sess->stats,
1916         RTP_SOURCE_IS_SENDER (sess->source), first);
1917   }
1918
1919   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
1920       GST_TIME_ARGS (result), first);
1921
1922   if (!deterministic)
1923     result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
1924
1925   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
1926
1927   return result;
1928 }
1929
1930 /**
1931  * rtp_session_send_bye_locked:
1932  * @sess: an #RTPSession
1933  * @reason: a reason or NULL
1934  *
1935  * Stop the current @sess and schedule a BYE message for the other members.
1936  *
1937  * One must have the session lock to call this function
1938  *
1939  * Returns: a #GstFlowReturn.
1940  */
1941 static GstFlowReturn
1942 rtp_session_send_bye_locked (RTPSession * sess, const gchar * reason,
1943     GstClockTime current_time)
1944 {
1945   GstFlowReturn result = GST_FLOW_OK;
1946   RTPSource *source;
1947   GstClockTime interval;
1948
1949   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1950
1951   source = sess->source;
1952
1953   /* ignore more BYEs */
1954   if (source->received_bye)
1955     goto done;
1956
1957   /* we have BYE now */
1958   source->received_bye = TRUE;
1959   /* at least one member wants to send a BYE */
1960   g_free (sess->bye_reason);
1961   sess->bye_reason = g_strdup (reason);
1962   sess->stats.avg_rtcp_packet_size = 100;
1963   sess->stats.bye_members = 1;
1964   sess->first_rtcp = TRUE;
1965   sess->sent_bye = FALSE;
1966
1967   /* reschedule transmission */
1968   sess->last_rtcp_send_time = current_time;
1969   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
1970   sess->next_rtcp_check_time = current_time + interval;
1971
1972   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
1973       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
1974
1975   RTP_SESSION_UNLOCK (sess);
1976   /* notify app of reconsideration */
1977   if (sess->callbacks.reconsider)
1978     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
1979   RTP_SESSION_LOCK (sess);
1980 done:
1981
1982   return result;
1983 }
1984
1985 /**
1986  * rtp_session_send_bye:
1987  * @sess: an #RTPSession
1988  * @reason: a reason or NULL
1989  * @current_time: the current system time
1990  *
1991  * Stop the current @sess and schedule a BYE message for the other members.
1992  *
1993  * One must have the session lock to call this function
1994  *
1995  * Returns: a #GstFlowReturn.
1996  */
1997 GstFlowReturn
1998 rtp_session_send_bye (RTPSession * sess, const gchar * reason,
1999     GstClockTime current_time)
2000 {
2001   GstFlowReturn result = GST_FLOW_OK;
2002
2003   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2004
2005   RTP_SESSION_LOCK (sess);
2006   result = rtp_session_send_bye_locked (sess, reason, current_time);
2007   RTP_SESSION_UNLOCK (sess);
2008
2009   return result;
2010 }
2011
2012 /**
2013  * rtp_session_next_timeout:
2014  * @sess: an #RTPSession
2015  * @current_time: the current system time
2016  *
2017  * Get the next time we should perform session maintenance tasks.
2018  *
2019  * Returns: a time when rtp_session_on_timeout() should be called with the
2020  * current system time.
2021  */
2022 GstClockTime
2023 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
2024 {
2025   GstClockTime result;
2026
2027   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2028
2029   RTP_SESSION_LOCK (sess);
2030
2031   result = sess->next_rtcp_check_time;
2032
2033   GST_DEBUG ("current time: %" GST_TIME_FORMAT ", next :%" GST_TIME_FORMAT,
2034       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2035
2036   if (result < current_time) {
2037     GST_DEBUG ("take current time as base");
2038     /* our previous check time expired, start counting from the current time
2039      * again. */
2040     result = current_time;
2041   }
2042
2043   if (sess->source->received_bye) {
2044     if (sess->sent_bye) {
2045       GST_DEBUG ("we sent BYE already");
2046       result = GST_CLOCK_TIME_NONE;
2047     } else if (sess->stats.active_sources >= 50) {
2048       GST_DEBUG ("reconsider BYE, more than 50 sources");
2049       /* reconsider BYE if members >= 50 */
2050       result += calculate_rtcp_interval (sess, FALSE, TRUE);
2051     }
2052   } else {
2053     if (sess->first_rtcp) {
2054       GST_DEBUG ("first RTCP packet");
2055       /* we are called for the first time */
2056       result += calculate_rtcp_interval (sess, FALSE, TRUE);
2057     } else if (sess->next_rtcp_check_time < current_time) {
2058       GST_DEBUG ("old check time expired, getting new timeout");
2059       /* get a new timeout when we need to */
2060       result += calculate_rtcp_interval (sess, FALSE, FALSE);
2061     }
2062   }
2063   sess->next_rtcp_check_time = result;
2064
2065   GST_DEBUG ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
2066   RTP_SESSION_UNLOCK (sess);
2067
2068   return result;
2069 }
2070
2071 typedef struct
2072 {
2073   RTPSession *sess;
2074   GstBuffer *rtcp;
2075   GstClockTime current_time;
2076   guint64 ntpnstime;
2077   GstClockTime interval;
2078   GstRTCPPacket packet;
2079   gboolean is_bye;
2080   gboolean has_sdes;
2081 } ReportData;
2082
2083 static void
2084 session_start_rtcp (RTPSession * sess, ReportData * data)
2085 {
2086   GstRTCPPacket *packet = &data->packet;
2087   RTPSource *own = sess->source;
2088
2089   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
2090
2091   if (RTP_SOURCE_IS_SENDER (own)) {
2092     guint64 ntptime;
2093     guint32 rtptime;
2094     guint32 packet_count, octet_count;
2095
2096     /* we are a sender, create SR */
2097     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
2098     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
2099
2100     /* get latest stats */
2101     rtp_source_get_new_sr (own, data->ntpnstime, &ntptime, &rtptime,
2102         &packet_count, &octet_count);
2103     /* store stats */
2104     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
2105         packet_count, octet_count);
2106
2107     /* fill in sender report info */
2108     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
2109         ntptime, rtptime, packet_count, octet_count);
2110   } else {
2111     /* we are only receiver, create RR */
2112     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
2113     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
2114     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
2115   }
2116 }
2117
2118 /* construct a Sender or Receiver Report */
2119 static void
2120 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
2121 {
2122   RTPSession *sess = data->sess;
2123   GstRTCPPacket *packet = &data->packet;
2124
2125   /* create a new buffer if needed */
2126   if (data->rtcp == NULL) {
2127     session_start_rtcp (sess, data);
2128   }
2129   if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
2130     /* only report about other sender sources */
2131     if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
2132       guint8 fractionlost;
2133       gint32 packetslost;
2134       guint32 exthighestseq, jitter;
2135       guint32 lsr, dlsr;
2136
2137       /* get new stats */
2138       rtp_source_get_new_rb (source, data->current_time, &fractionlost,
2139           &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2140
2141       /* packet is not yet filled, add report block for this source. */
2142       gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
2143           exthighestseq, jitter, lsr, dlsr);
2144     }
2145   }
2146 }
2147
2148 /* perform cleanup of sources that timed out */
2149 static gboolean
2150 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
2151 {
2152   gboolean remove = FALSE;
2153   gboolean byetimeout = FALSE;
2154   gboolean sendertimeout = FALSE;
2155   gboolean is_sender, is_active;
2156   RTPSession *sess = data->sess;
2157   GstClockTime interval;
2158
2159   is_sender = RTP_SOURCE_IS_SENDER (source);
2160   is_active = RTP_SOURCE_IS_ACTIVE (source);
2161
2162   /* check for our own source, we don't want to delete our own source. */
2163   if (!(source == sess->source)) {
2164     if (source->received_bye) {
2165       /* if we received a BYE from the source, remove the source after some
2166        * time. */
2167       if (data->current_time > source->bye_time &&
2168           data->current_time - source->bye_time > sess->stats.bye_timeout) {
2169         GST_DEBUG ("removing BYE source %08x", source->ssrc);
2170         remove = TRUE;
2171         byetimeout = TRUE;
2172       }
2173     }
2174     /* sources that were inactive for more than 5 times the deterministic reporting
2175      * interval get timed out. the min timeout is 5 seconds. */
2176     if (data->current_time > source->last_activity) {
2177       interval = MAX (data->interval * 5, 5 * GST_SECOND);
2178       if (data->current_time - source->last_activity > interval) {
2179         GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
2180             source->ssrc, GST_TIME_ARGS (source->last_activity));
2181         remove = TRUE;
2182       }
2183     }
2184   }
2185
2186   /* senders that did not send for a long time become a receiver, this also
2187    * holds for our own source. */
2188   if (is_sender) {
2189     if (data->current_time > source->last_rtp_activity) {
2190       interval = MAX (data->interval * 2, 5 * GST_SECOND);
2191       if (data->current_time - source->last_rtp_activity > interval) {
2192         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
2193             GST_TIME_FORMAT, source->ssrc,
2194             GST_TIME_ARGS (source->last_rtp_activity));
2195         source->is_sender = FALSE;
2196         sess->stats.sender_sources--;
2197         sendertimeout = TRUE;
2198       }
2199     }
2200   }
2201
2202   if (remove) {
2203     sess->total_sources--;
2204     if (is_sender)
2205       sess->stats.sender_sources--;
2206     if (is_active)
2207       sess->stats.active_sources--;
2208
2209     if (byetimeout)
2210       on_bye_timeout (sess, source);
2211     else
2212       on_timeout (sess, source);
2213   } else {
2214     if (sendertimeout)
2215       on_sender_timeout (sess, source);
2216   }
2217   return remove;
2218 }
2219
2220 static void
2221 session_sdes (RTPSession * sess, ReportData * data)
2222 {
2223   GstRTCPPacket *packet = &data->packet;
2224   guint8 *sdes_data;
2225   guint sdes_len;
2226
2227   /* add SDES packet */
2228   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
2229
2230   gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
2231
2232   rtp_source_get_sdes (sess->source, GST_RTCP_SDES_CNAME, &sdes_data,
2233       &sdes_len);
2234   gst_rtcp_packet_sdes_add_entry (packet, GST_RTCP_SDES_CNAME, sdes_len,
2235       sdes_data);
2236
2237   /* other SDES items must only be added at regular intervals and only when the
2238    * user requests to since it might be a privacy problem */
2239 #if 0
2240   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_NAME,
2241       strlen (sess->name), (guint8 *) sess->name);
2242   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_TOOL,
2243       strlen (sess->tool), (guint8 *) sess->tool);
2244 #endif
2245
2246   data->has_sdes = TRUE;
2247 }
2248
2249 /* schedule a BYE packet */
2250 static void
2251 session_bye (RTPSession * sess, ReportData * data)
2252 {
2253   GstRTCPPacket *packet = &data->packet;
2254
2255   /* open packet */
2256   session_start_rtcp (sess, data);
2257
2258   /* add SDES */
2259   session_sdes (sess, data);
2260
2261   /* add a BYE packet */
2262   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet);
2263   gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc);
2264   if (sess->bye_reason)
2265     gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason);
2266
2267   /* we have a BYE packet now */
2268   data->is_bye = TRUE;
2269 }
2270
2271 static gboolean
2272 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
2273 {
2274   GstClockTime new_send_time, elapsed;
2275   gboolean result;
2276
2277   /* no need to check yet */
2278   if (sess->next_rtcp_check_time > current_time) {
2279     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
2280         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
2281         GST_TIME_ARGS (current_time));
2282     return FALSE;
2283   }
2284
2285   /* get elapsed time since we last reported */
2286   elapsed = current_time - sess->last_rtcp_send_time;
2287
2288   /* perform forward reconsideration */
2289   new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
2290
2291   GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
2292       GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time), GST_TIME_ARGS (elapsed));
2293
2294   new_send_time += sess->last_rtcp_send_time;
2295
2296   /* check if reconsideration */
2297   if (current_time < new_send_time) {
2298     GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
2299         GST_TIME_ARGS (new_send_time));
2300     result = FALSE;
2301     /* store new check time */
2302     sess->next_rtcp_check_time = new_send_time;
2303   } else {
2304     result = TRUE;
2305     new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
2306
2307     GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
2308         GST_TIME_ARGS (new_send_time));
2309     sess->next_rtcp_check_time = current_time + new_send_time;
2310   }
2311   return result;
2312 }
2313
2314 /**
2315  * rtp_session_on_timeout:
2316  * @sess: an #RTPSession
2317  * @current_time: the current system time
2318  * @ntpnstime: the current NTP time in nanoseconds
2319  *
2320  * Perform maintenance actions after the timeout obtained with
2321  * rtp_session_next_timeout() expired.
2322  *
2323  * This function will perform timeouts of receivers and senders, send a BYE
2324  * packet or generate RTCP packets with current session stats.
2325  *
2326  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
2327  * times, for each packet that should be processed.
2328  *
2329  * Returns: a #GstFlowReturn.
2330  */
2331 GstFlowReturn
2332 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
2333     guint64 ntpnstime)
2334 {
2335   GstFlowReturn result = GST_FLOW_OK;
2336   GList *item;
2337   ReportData data;
2338   RTPSource *own;
2339
2340   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2341
2342   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT,
2343       GST_TIME_ARGS (current_time), GST_TIME_ARGS (ntpnstime));
2344
2345   data.sess = sess;
2346   data.rtcp = NULL;
2347   data.current_time = current_time;
2348   data.ntpnstime = ntpnstime;
2349   data.is_bye = FALSE;
2350   data.has_sdes = FALSE;
2351
2352   own = sess->source;
2353
2354   RTP_SESSION_LOCK (sess);
2355   /* get a new interval, we need this for various cleanups etc */
2356   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
2357
2358   /* first perform cleanups */
2359   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
2360       (GHRFunc) session_cleanup, &data);
2361
2362   /* see if we need to generate SR or RR packets */
2363   if (is_rtcp_time (sess, current_time, &data)) {
2364     if (own->received_bye) {
2365       /* generate BYE instead */
2366       GST_DEBUG ("generating BYE message");
2367       session_bye (sess, &data);
2368       sess->sent_bye = TRUE;
2369     } else {
2370       /* loop over all known sources and do something */
2371       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2372           (GHFunc) session_report_blocks, &data);
2373     }
2374   }
2375
2376   if (data.rtcp) {
2377     guint size;
2378
2379     /* we keep track of the last report time in order to timeout inactive
2380      * receivers or senders */
2381     sess->last_rtcp_send_time = data.current_time;
2382     sess->first_rtcp = FALSE;
2383
2384     /* add SDES for this source when not already added */
2385     if (!data.has_sdes)
2386       session_sdes (sess, &data);
2387
2388     /* update average RTCP size before sending */
2389     size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
2390     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size);
2391   }
2392
2393   /* check for outdated collisions */
2394   GST_DEBUG ("checking collision list");
2395   item = g_list_first (sess->conflicting_addresses);
2396   while (item) {
2397     RTPConflictingAddress *known_conflict = item->data;
2398     GList *next_item = g_list_next (item);
2399
2400     if (known_conflict->time < current_time - (data.interval *
2401             RTCP_INTERVAL_COLLISION_TIMEOUT)) {
2402       sess->conflicting_addresses =
2403           g_list_delete_link (sess->conflicting_addresses, item);
2404       GST_DEBUG ("collision %p timed out", known_conflict);
2405       g_free (known_conflict);
2406     }
2407     item = next_item;
2408   }
2409
2410   if (sess->change_ssrc) {
2411     GST_DEBUG ("need to change our SSRC (%08x)", own->ssrc);
2412     g_hash_table_steal (sess->ssrcs[sess->mask_idx],
2413         GINT_TO_POINTER (own->ssrc));
2414
2415     own->ssrc = rtp_session_create_new_ssrc (sess);
2416     rtp_source_reset (own);
2417
2418     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
2419         GINT_TO_POINTER (own->ssrc), own);
2420
2421     g_free (sess->bye_reason);
2422     sess->bye_reason = NULL;
2423     sess->sent_bye = FALSE;
2424     sess->change_ssrc = FALSE;
2425     GST_DEBUG ("changed our SSRC to %08x", own->ssrc);
2426   }
2427   RTP_SESSION_UNLOCK (sess);
2428
2429   /* push out the RTCP packet */
2430   if (data.rtcp) {
2431     /* close the RTCP packet */
2432     gst_rtcp_buffer_end (data.rtcp);
2433
2434     GST_DEBUG ("sending packet");
2435     if (sess->callbacks.send_rtcp)
2436       result = sess->callbacks.send_rtcp (sess, own, data.rtcp,
2437           sess->sent_bye, sess->send_rtcp_user_data);
2438     else {
2439       GST_DEBUG ("freeing packet");
2440       gst_buffer_unref (data.rtcp);
2441     }
2442   }
2443
2444   return result;
2445 }