4e8a732d40ba1e2b17c56cfb85942f6b4ddfda39
[platform/upstream/gstreamer.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/rtp/gstrtpbuffer.h>
23 #include <gst/rtp/gstrtcpbuffer.h>
24 #include <gst/netbuffer/gstnetbuffer.h>
25
26 #include "gstrtpbin-marshal.h"
27 #include "rtpsession.h"
28
29 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
30 #define GST_CAT_DEFAULT rtp_session_debug
31
32 /* signals and args */
33 enum
34 {
35   SIGNAL_GET_SOURCE_BY_SSRC,
36   SIGNAL_ON_NEW_SSRC,
37   SIGNAL_ON_SSRC_COLLISION,
38   SIGNAL_ON_SSRC_VALIDATED,
39   SIGNAL_ON_SSRC_ACTIVE,
40   SIGNAL_ON_SSRC_SDES,
41   SIGNAL_ON_BYE_SSRC,
42   SIGNAL_ON_BYE_TIMEOUT,
43   SIGNAL_ON_TIMEOUT,
44   SIGNAL_ON_SENDER_TIMEOUT,
45   LAST_SIGNAL
46 };
47
48 #define DEFAULT_INTERNAL_SOURCE      NULL
49 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
50 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_BANDWIDTH
51 #define DEFAULT_SDES_CNAME           NULL
52 #define DEFAULT_SDES_NAME            NULL
53 #define DEFAULT_SDES_EMAIL           NULL
54 #define DEFAULT_SDES_PHONE           NULL
55 #define DEFAULT_SDES_LOCATION        NULL
56 #define DEFAULT_SDES_TOOL            NULL
57 #define DEFAULT_SDES_NOTE            NULL
58 #define DEFAULT_NUM_SOURCES          0
59 #define DEFAULT_NUM_ACTIVE_SOURCES   0
60 #define DEFAULT_SOURCES              NULL
61
62 enum
63 {
64   PROP_0,
65   PROP_INTERNAL_SOURCE,
66   PROP_BANDWIDTH,
67   PROP_RTCP_FRACTION,
68   PROP_SDES_CNAME,
69   PROP_SDES_NAME,
70   PROP_SDES_EMAIL,
71   PROP_SDES_PHONE,
72   PROP_SDES_LOCATION,
73   PROP_SDES_TOOL,
74   PROP_SDES_NOTE,
75   PROP_NUM_SOURCES,
76   PROP_NUM_ACTIVE_SOURCES,
77   PROP_SOURCES,
78   PROP_LAST
79 };
80
81 /* update average packet size, we keep this scaled by 16 to keep enough
82  * precision. */
83 #define UPDATE_AVG(avg, val)            \
84   if ((avg) == 0)                       \
85    (avg) = (val) << 4;                  \
86   else                                  \
87    (avg) = ((val) + (15 * (avg))) >> 4;
88
89 /* The number RTCP intervals after which to timeout entries in the
90  * collision table
91  */
92 #define RTCP_INTERVAL_COLLISION_TIMEOUT 10
93
94 /* GObject vmethods */
95 static void rtp_session_finalize (GObject * object);
96 static void rtp_session_set_property (GObject * object, guint prop_id,
97     const GValue * value, GParamSpec * pspec);
98 static void rtp_session_get_property (GObject * object, guint prop_id,
99     GValue * value, GParamSpec * pspec);
100
101 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
102
103 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
104
105 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
106     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
107 static GstFlowReturn rtp_session_send_bye_locked (RTPSession * sess,
108     const gchar * reason, GstClockTime current_time);
109 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
110     gboolean deterministic, gboolean first);
111
112 static void
113 rtp_session_class_init (RTPSessionClass * klass)
114 {
115   GObjectClass *gobject_class;
116
117   gobject_class = (GObjectClass *) klass;
118
119   gobject_class->finalize = rtp_session_finalize;
120   gobject_class->set_property = rtp_session_set_property;
121   gobject_class->get_property = rtp_session_get_property;
122
123   /**
124    * RTPSession::get-source-by-ssrc:
125    * @session: the object which received the signal
126    * @ssrc: the SSRC of the RTPSource
127    *
128    * Request the #RTPSource object with SSRC @ssrc in @session.
129    */
130   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
131       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
132       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
133           get_source_by_ssrc), NULL, NULL, gst_rtp_bin_marshal_OBJECT__UINT,
134       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
135
136   /**
137    * RTPSession::on-new-ssrc:
138    * @session: the object which received the signal
139    * @src: the new RTPSource
140    *
141    * Notify of a new SSRC that entered @session.
142    */
143   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
144       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
145       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
146       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
147       RTP_TYPE_SOURCE);
148   /**
149    * RTPSession::on-ssrc-collision:
150    * @session: the object which received the signal
151    * @src: the #RTPSource that caused a collision
152    *
153    * Notify when we have an SSRC collision
154    */
155   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
156       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
157       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
158       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
159       RTP_TYPE_SOURCE);
160   /**
161    * RTPSession::on-ssrc-validated:
162    * @session: the object which received the signal
163    * @src: the new validated RTPSource
164    *
165    * Notify of a new SSRC that became validated.
166    */
167   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
168       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
169       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
170       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
171       RTP_TYPE_SOURCE);
172   /**
173    * RTPSession::on-ssrc-active:
174    * @session: the object which received the signal
175    * @src: the active RTPSource
176    *
177    * Notify of a SSRC that is active, i.e., sending RTCP.
178    */
179   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
180       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
181       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
182       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
183       RTP_TYPE_SOURCE);
184   /**
185    * RTPSession::on-ssrc-sdes:
186    * @session: the object which received the signal
187    * @src: the RTPSource
188    *
189    * Notify that a new SDES was received for SSRC.
190    */
191   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
192       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
193       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
194       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
195       RTP_TYPE_SOURCE);
196   /**
197    * RTPSession::on-bye-ssrc:
198    * @session: the object which received the signal
199    * @src: the RTPSource that went away
200    *
201    * Notify of an SSRC that became inactive because of a BYE packet.
202    */
203   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
204       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
205       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
206       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
207       RTP_TYPE_SOURCE);
208   /**
209    * RTPSession::on-bye-timeout:
210    * @session: the object which received the signal
211    * @src: the RTPSource that timed out
212    *
213    * Notify of an SSRC that has timed out because of BYE
214    */
215   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
216       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
217       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
218       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
219       RTP_TYPE_SOURCE);
220   /**
221    * RTPSession::on-timeout:
222    * @session: the object which received the signal
223    * @src: the RTPSource that timed out
224    *
225    * Notify of an SSRC that has timed out
226    */
227   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
228       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
229       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
230       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
231       RTP_TYPE_SOURCE);
232   /**
233    * RTPSession::on-sender-timeout:
234    * @session: the object which received the signal
235    * @src: the RTPSource that timed out
236    *
237    * Notify of an SSRC that was a sender but timed out and became a receiver.
238    */
239   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
240       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
241       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
242       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
243       RTP_TYPE_SOURCE);
244
245   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
246       g_param_spec_object ("internal-source", "Internal Source",
247           "The internal source element of the session",
248           RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
249
250   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
251       g_param_spec_double ("bandwidth", "Bandwidth",
252           "The bandwidth of the session",
253           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
254           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
255
256   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
257       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
258           "The fraction of the bandwidth used for RTCP",
259           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
260           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
261
262   g_object_class_install_property (gobject_class, PROP_SDES_CNAME,
263       g_param_spec_string ("sdes-cname", "SDES CNAME",
264           "The CNAME to put in SDES messages of this session",
265           DEFAULT_SDES_CNAME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266
267   g_object_class_install_property (gobject_class, PROP_SDES_NAME,
268       g_param_spec_string ("sdes-name", "SDES NAME",
269           "The NAME to put in SDES messages of this session",
270           DEFAULT_SDES_NAME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
271
272   g_object_class_install_property (gobject_class, PROP_SDES_EMAIL,
273       g_param_spec_string ("sdes-email", "SDES EMAIL",
274           "The EMAIL to put in SDES messages of this session",
275           DEFAULT_SDES_EMAIL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
276
277   g_object_class_install_property (gobject_class, PROP_SDES_PHONE,
278       g_param_spec_string ("sdes-phone", "SDES PHONE",
279           "The PHONE to put in SDES messages of this session",
280           DEFAULT_SDES_PHONE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
281
282   g_object_class_install_property (gobject_class, PROP_SDES_LOCATION,
283       g_param_spec_string ("sdes-location", "SDES LOCATION",
284           "The LOCATION to put in SDES messages of this session",
285           DEFAULT_SDES_LOCATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
286
287   g_object_class_install_property (gobject_class, PROP_SDES_TOOL,
288       g_param_spec_string ("sdes-tool", "SDES TOOL",
289           "The TOOL to put in SDES messages of this session",
290           DEFAULT_SDES_TOOL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
291
292   g_object_class_install_property (gobject_class, PROP_SDES_NOTE,
293       g_param_spec_string ("sdes-note", "SDES NOTE",
294           "The NOTE to put in SDES messages of this session",
295           DEFAULT_SDES_NOTE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
296
297   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
298       g_param_spec_uint ("num-sources", "Num Sources",
299           "The number of sources in the session", 0, G_MAXUINT,
300           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
301
302   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
303       g_param_spec_uint ("num-active-sources", "Num Active Sources",
304           "The number of active sources in the session", 0, G_MAXUINT,
305           DEFAULT_NUM_ACTIVE_SOURCES,
306           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
307   /**
308    * RTPSource::sources
309    *
310    * Get a GValue Array of all sources in the session.
311    *
312    * <example>
313    * <title>Getting the #RTPSources of a session
314    * <programlisting>
315    * {
316    *   GValueArray *arr;
317    *   GValue *val;
318    *   guint i;
319    *
320    *   g_object_get (sess, "sources", &arr, NULL);
321    *
322    *   for (i = 0; i < arr->n_values; i++) {
323    *     RTPSource *source;
324    *
325    *     val = g_value_array_get_nth (arr, i);
326    *     source = g_value_get_object (val);
327    *   }
328    *   g_value_array_free (arr);
329    * }
330    * </programlisting>
331    * </example>
332    */
333   g_object_class_install_property (gobject_class, PROP_SOURCES,
334       g_param_spec_boxed ("sources", "Sources",
335           "An array of all known sources in the session",
336           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
337
338   klass->get_source_by_ssrc =
339       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
340
341   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
342 }
343
344 static void
345 rtp_session_init (RTPSession * sess)
346 {
347   gint i;
348   gchar *str;
349
350   sess->lock = g_mutex_new ();
351   sess->key = g_random_int ();
352   sess->mask_idx = 0;
353   sess->mask = 0;
354
355   for (i = 0; i < 32; i++) {
356     sess->ssrcs[i] =
357         g_hash_table_new_full (NULL, NULL, NULL,
358         (GDestroyNotify) g_object_unref);
359   }
360   sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL);
361
362   rtp_stats_init_defaults (&sess->stats);
363
364   /* create an active SSRC for this session manager */
365   sess->source = rtp_session_create_source (sess);
366   sess->source->validated = TRUE;
367   sess->source->internal = TRUE;
368   sess->stats.active_sources++;
369
370   /* default UDP header length */
371   sess->header_len = 28;
372   sess->mtu = 1400;
373
374   /* some default SDES entries */
375   str = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ());
376   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_CNAME, str);
377   g_free (str);
378
379   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_NAME,
380       g_get_real_name ());
381   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_TOOL, "GStreamer");
382
383   sess->first_rtcp = TRUE;
384
385   GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
386 }
387
388 static void
389 rtp_session_finalize (GObject * object)
390 {
391   RTPSession *sess;
392   gint i;
393
394   sess = RTP_SESSION_CAST (object);
395
396   g_mutex_free (sess->lock);
397   for (i = 0; i < 32; i++)
398     g_hash_table_destroy (sess->ssrcs[i]);
399
400   g_free (sess->bye_reason);
401
402   g_hash_table_destroy (sess->cnames);
403   g_object_unref (sess->source);
404
405   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
406 }
407
408 static void
409 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
410 {
411   GValue value = { 0 };
412
413   g_value_init (&value, RTP_TYPE_SOURCE);
414   g_value_take_object (&value, source);
415   g_value_array_append (arr, &value);
416 }
417
418 static GValueArray *
419 rtp_session_create_sources (RTPSession * sess)
420 {
421   GValueArray *res;
422   guint size;
423
424   RTP_SESSION_LOCK (sess);
425   /* get number of elements in the table */
426   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
427   /* create the result value array */
428   res = g_value_array_new (size);
429
430   /* and copy all values into the array */
431   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
432   RTP_SESSION_UNLOCK (sess);
433
434   return res;
435 }
436
437 static void
438 rtp_session_set_property (GObject * object, guint prop_id,
439     const GValue * value, GParamSpec * pspec)
440 {
441   RTPSession *sess;
442
443   sess = RTP_SESSION (object);
444
445   switch (prop_id) {
446     case PROP_BANDWIDTH:
447       rtp_session_set_bandwidth (sess, g_value_get_double (value));
448       break;
449     case PROP_RTCP_FRACTION:
450       rtp_session_set_rtcp_fraction (sess, g_value_get_double (value));
451       break;
452     case PROP_SDES_CNAME:
453       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_CNAME,
454           g_value_get_string (value));
455       break;
456     case PROP_SDES_NAME:
457       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_NAME,
458           g_value_get_string (value));
459       break;
460     case PROP_SDES_EMAIL:
461       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_EMAIL,
462           g_value_get_string (value));
463       break;
464     case PROP_SDES_PHONE:
465       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_PHONE,
466           g_value_get_string (value));
467       break;
468     case PROP_SDES_LOCATION:
469       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_LOC,
470           g_value_get_string (value));
471       break;
472     case PROP_SDES_TOOL:
473       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_TOOL,
474           g_value_get_string (value));
475       break;
476     case PROP_SDES_NOTE:
477       rtp_session_set_sdes_string (sess, GST_RTCP_SDES_NOTE,
478           g_value_get_string (value));
479       break;
480     default:
481       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
482       break;
483   }
484 }
485
486 static void
487 rtp_session_get_property (GObject * object, guint prop_id,
488     GValue * value, GParamSpec * pspec)
489 {
490   RTPSession *sess;
491
492   sess = RTP_SESSION (object);
493
494   switch (prop_id) {
495     case PROP_INTERNAL_SOURCE:
496       g_value_take_object (value, rtp_session_get_internal_source (sess));
497       break;
498     case PROP_BANDWIDTH:
499       g_value_set_double (value, rtp_session_get_bandwidth (sess));
500       break;
501     case PROP_RTCP_FRACTION:
502       g_value_set_double (value, rtp_session_get_rtcp_fraction (sess));
503       break;
504     case PROP_SDES_CNAME:
505       g_value_take_string (value, rtp_session_get_sdes_string (sess,
506               GST_RTCP_SDES_CNAME));
507       break;
508     case PROP_SDES_NAME:
509       g_value_take_string (value, rtp_session_get_sdes_string (sess,
510               GST_RTCP_SDES_NAME));
511       break;
512     case PROP_SDES_EMAIL:
513       g_value_take_string (value, rtp_session_get_sdes_string (sess,
514               GST_RTCP_SDES_EMAIL));
515       break;
516     case PROP_SDES_PHONE:
517       g_value_take_string (value, rtp_session_get_sdes_string (sess,
518               GST_RTCP_SDES_PHONE));
519       break;
520     case PROP_SDES_LOCATION:
521       g_value_take_string (value, rtp_session_get_sdes_string (sess,
522               GST_RTCP_SDES_LOC));
523       break;
524     case PROP_SDES_TOOL:
525       g_value_take_string (value, rtp_session_get_sdes_string (sess,
526               GST_RTCP_SDES_TOOL));
527       break;
528     case PROP_SDES_NOTE:
529       g_value_take_string (value, rtp_session_get_sdes_string (sess,
530               GST_RTCP_SDES_NOTE));
531       break;
532     case PROP_NUM_SOURCES:
533       g_value_set_uint (value, rtp_session_get_num_sources (sess));
534       break;
535     case PROP_NUM_ACTIVE_SOURCES:
536       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
537       break;
538     case PROP_SOURCES:
539       g_value_take_boxed (value, rtp_session_create_sources (sess));
540       break;
541     default:
542       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
543       break;
544   }
545 }
546
547 static void
548 on_new_ssrc (RTPSession * sess, RTPSource * source)
549 {
550   g_object_ref (source);
551   RTP_SESSION_UNLOCK (sess);
552   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
553   RTP_SESSION_LOCK (sess);
554   g_object_unref (source);
555 }
556
557 static void
558 on_ssrc_collision (RTPSession * sess, RTPSource * source)
559 {
560   g_object_ref (source);
561   RTP_SESSION_UNLOCK (sess);
562   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
563       source);
564   RTP_SESSION_LOCK (sess);
565   g_object_unref (source);
566 }
567
568 static void
569 on_ssrc_validated (RTPSession * sess, RTPSource * source)
570 {
571   g_object_ref (source);
572   RTP_SESSION_UNLOCK (sess);
573   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
574       source);
575   RTP_SESSION_LOCK (sess);
576   g_object_unref (source);
577 }
578
579 static void
580 on_ssrc_active (RTPSession * sess, RTPSource * source)
581 {
582   g_object_ref (source);
583   RTP_SESSION_UNLOCK (sess);
584   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
585   RTP_SESSION_LOCK (sess);
586   g_object_unref (source);
587 }
588
589 static void
590 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
591 {
592   g_object_ref (source);
593   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
594   RTP_SESSION_UNLOCK (sess);
595   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
596   RTP_SESSION_LOCK (sess);
597   g_object_unref (source);
598 }
599
600 static void
601 on_bye_ssrc (RTPSession * sess, RTPSource * source)
602 {
603   g_object_ref (source);
604   RTP_SESSION_UNLOCK (sess);
605   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
606   RTP_SESSION_LOCK (sess);
607   g_object_unref (source);
608 }
609
610 static void
611 on_bye_timeout (RTPSession * sess, RTPSource * source)
612 {
613   g_object_ref (source);
614   RTP_SESSION_UNLOCK (sess);
615   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
616   RTP_SESSION_LOCK (sess);
617   g_object_unref (source);
618 }
619
620 static void
621 on_timeout (RTPSession * sess, RTPSource * source)
622 {
623   g_object_ref (source);
624   RTP_SESSION_UNLOCK (sess);
625   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
626   RTP_SESSION_LOCK (sess);
627   g_object_unref (source);
628 }
629
630 static void
631 on_sender_timeout (RTPSession * sess, RTPSource * source)
632 {
633   g_object_ref (source);
634   RTP_SESSION_UNLOCK (sess);
635   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
636       source);
637   RTP_SESSION_LOCK (sess);
638   g_object_unref (source);
639 }
640
641 /**
642  * rtp_session_new:
643  *
644  * Create a new session object.
645  *
646  * Returns: a new #RTPSession. g_object_unref() after usage.
647  */
648 RTPSession *
649 rtp_session_new (void)
650 {
651   RTPSession *sess;
652
653   sess = g_object_new (RTP_TYPE_SESSION, NULL);
654
655   return sess;
656 }
657
658 /**
659  * rtp_session_set_callbacks:
660  * @sess: an #RTPSession
661  * @callbacks: callbacks to configure
662  * @user_data: user data passed in the callbacks
663  *
664  * Configure a set of callbacks to be notified of actions.
665  */
666 void
667 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
668     gpointer user_data)
669 {
670   g_return_if_fail (RTP_IS_SESSION (sess));
671
672   if (callbacks->process_rtp) {
673     sess->callbacks.process_rtp = callbacks->process_rtp;
674     sess->process_rtp_user_data = user_data;
675   }
676   if (callbacks->send_rtp) {
677     sess->callbacks.send_rtp = callbacks->send_rtp;
678     sess->send_rtp_user_data = user_data;
679   }
680   if (callbacks->send_rtcp) {
681     sess->callbacks.send_rtcp = callbacks->send_rtcp;
682     sess->send_rtcp_user_data = user_data;
683   }
684   if (callbacks->sync_rtcp) {
685     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
686     sess->sync_rtcp_user_data = user_data;
687   }
688   if (callbacks->clock_rate) {
689     sess->callbacks.clock_rate = callbacks->clock_rate;
690     sess->clock_rate_user_data = user_data;
691   }
692   if (callbacks->reconsider) {
693     sess->callbacks.reconsider = callbacks->reconsider;
694     sess->reconsider_user_data = user_data;
695   }
696 }
697
698 /**
699  * rtp_session_set_process_rtp_callback:
700  * @sess: an #RTPSession
701  * @callback: callback to set
702  * @user_data: user data passed in the callback
703  *
704  * Configure only the process_rtp callback to be notified of the process_rtp action.
705  */
706 void
707 rtp_session_set_process_rtp_callback (RTPSession * sess,
708     RTPSessionProcessRTP callback, gpointer user_data)
709 {
710   g_return_if_fail (RTP_IS_SESSION (sess));
711
712   sess->callbacks.process_rtp = callback;
713   sess->process_rtp_user_data = user_data;
714 }
715
716 /**
717  * rtp_session_set_send_rtp_callback:
718  * @sess: an #RTPSession
719  * @callback: callback to set
720  * @user_data: user data passed in the callback
721  *
722  * Configure only the send_rtp callback to be notified of the send_rtp action.
723  */
724 void
725 rtp_session_set_send_rtp_callback (RTPSession * sess,
726     RTPSessionSendRTP callback, gpointer user_data)
727 {
728   g_return_if_fail (RTP_IS_SESSION (sess));
729
730   sess->callbacks.send_rtp = callback;
731   sess->send_rtp_user_data = user_data;
732 }
733
734 /**
735  * rtp_session_set_send_rtcp_callback:
736  * @sess: an #RTPSession
737  * @callback: callback to set
738  * @user_data: user data passed in the callback
739  *
740  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
741  */
742 void
743 rtp_session_set_send_rtcp_callback (RTPSession * sess,
744     RTPSessionSendRTCP callback, gpointer user_data)
745 {
746   g_return_if_fail (RTP_IS_SESSION (sess));
747
748   sess->callbacks.send_rtcp = callback;
749   sess->send_rtcp_user_data = user_data;
750 }
751
752 /**
753  * rtp_session_set_sync_rtcp_callback:
754  * @sess: an #RTPSession
755  * @callback: callback to set
756  * @user_data: user data passed in the callback
757  *
758  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
759  */
760 void
761 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
762     RTPSessionSyncRTCP callback, gpointer user_data)
763 {
764   g_return_if_fail (RTP_IS_SESSION (sess));
765
766   sess->callbacks.sync_rtcp = callback;
767   sess->sync_rtcp_user_data = user_data;
768 }
769
770 /**
771  * rtp_session_set_clock_rate_callback:
772  * @sess: an #RTPSession
773  * @callback: callback to set
774  * @user_data: user data passed in the callback
775  *
776  * Configure only the clock_rate callback to be notified of the clock_rate action.
777  */
778 void
779 rtp_session_set_clock_rate_callback (RTPSession * sess,
780     RTPSessionClockRate callback, gpointer user_data)
781 {
782   g_return_if_fail (RTP_IS_SESSION (sess));
783
784   sess->callbacks.clock_rate = callback;
785   sess->clock_rate_user_data = user_data;
786 }
787
788 /**
789  * rtp_session_set_reconsider_callback:
790  * @sess: an #RTPSession
791  * @callback: callback to set
792  * @user_data: user data passed in the callback
793  *
794  * Configure only the reconsider callback to be notified of the reconsider action.
795  */
796 void
797 rtp_session_set_reconsider_callback (RTPSession * sess,
798     RTPSessionReconsider callback, gpointer user_data)
799 {
800   g_return_if_fail (RTP_IS_SESSION (sess));
801
802   sess->callbacks.reconsider = callback;
803   sess->reconsider_user_data = user_data;
804 }
805
806 /**
807  * rtp_session_set_bandwidth:
808  * @sess: an #RTPSession
809  * @bandwidth: the bandwidth allocated
810  *
811  * Set the session bandwidth in bytes per second.
812  */
813 void
814 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
815 {
816   g_return_if_fail (RTP_IS_SESSION (sess));
817
818   RTP_SESSION_LOCK (sess);
819   sess->stats.bandwidth = bandwidth;
820   RTP_SESSION_UNLOCK (sess);
821 }
822
823 /**
824  * rtp_session_get_bandwidth:
825  * @sess: an #RTPSession
826  *
827  * Get the session bandwidth.
828  *
829  * Returns: the session bandwidth.
830  */
831 gdouble
832 rtp_session_get_bandwidth (RTPSession * sess)
833 {
834   gdouble result;
835
836   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
837
838   RTP_SESSION_LOCK (sess);
839   result = sess->stats.bandwidth;
840   RTP_SESSION_UNLOCK (sess);
841
842   return result;
843 }
844
845 /**
846  * rtp_session_set_rtcp_fraction:
847  * @sess: an #RTPSession
848  * @bandwidth: the RTCP bandwidth
849  *
850  * Set the bandwidth that should be used for RTCP
851  * messages.
852  */
853 void
854 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
855 {
856   g_return_if_fail (RTP_IS_SESSION (sess));
857
858   RTP_SESSION_LOCK (sess);
859   sess->stats.rtcp_bandwidth = bandwidth;
860   RTP_SESSION_UNLOCK (sess);
861 }
862
863 /**
864  * rtp_session_get_rtcp_fraction:
865  * @sess: an #RTPSession
866  *
867  * Get the session bandwidth used for RTCP.
868  *
869  * Returns: The bandwidth used for RTCP messages.
870  */
871 gdouble
872 rtp_session_get_rtcp_fraction (RTPSession * sess)
873 {
874   gdouble result;
875
876   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
877
878   RTP_SESSION_LOCK (sess);
879   result = sess->stats.rtcp_bandwidth;
880   RTP_SESSION_UNLOCK (sess);
881
882   return result;
883 }
884
885 /**
886  * rtp_session_set_sdes_string:
887  * @sess: an #RTPSession
888  * @type: the type of the SDES item
889  * @item: a null-terminated string to set. 
890  *
891  * Store an SDES item of @type in @sess. 
892  *
893  * Returns: %FALSE if the data was unchanged @type is invalid.
894  */
895 gboolean
896 rtp_session_set_sdes_string (RTPSession * sess, GstRTCPSDESType type,
897     const gchar * item)
898 {
899   gboolean result;
900
901   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
902
903   RTP_SESSION_LOCK (sess);
904   result = rtp_source_set_sdes_string (sess->source, type, item);
905   RTP_SESSION_UNLOCK (sess);
906
907   return result;
908 }
909
910 /**
911  * rtp_session_get_sdes_string:
912  * @sess: an #RTPSession
913  * @type: the type of the SDES item
914  *
915  * Get the SDES item of @type from @sess. 
916  *
917  * Returns: a null-terminated copy of the SDES item or NULL when @type was not
918  * valid. g_free() after usage.
919  */
920 gchar *
921 rtp_session_get_sdes_string (RTPSession * sess, GstRTCPSDESType type)
922 {
923   gchar *result;
924
925   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
926
927   RTP_SESSION_LOCK (sess);
928   result = rtp_source_get_sdes_string (sess->source, type);
929   RTP_SESSION_UNLOCK (sess);
930
931   return result;
932 }
933
934 static GstFlowReturn
935 source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session)
936 {
937   GstFlowReturn result = GST_FLOW_OK;
938
939   if (source == session->source) {
940     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
941
942     RTP_SESSION_UNLOCK (session);
943
944     if (session->callbacks.send_rtp)
945       result =
946           session->callbacks.send_rtp (session, source, buffer,
947           session->send_rtp_user_data);
948     else
949       gst_buffer_unref (buffer);
950
951   } else {
952     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
953     RTP_SESSION_UNLOCK (session);
954
955     if (session->callbacks.process_rtp)
956       result =
957           session->callbacks.process_rtp (session, source, buffer,
958           session->process_rtp_user_data);
959     else
960       gst_buffer_unref (buffer);
961   }
962   RTP_SESSION_LOCK (session);
963
964   return result;
965 }
966
967 static gint
968 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
969 {
970   gint result;
971
972   RTP_SESSION_UNLOCK (session);
973
974   if (session->callbacks.clock_rate)
975     result =
976         session->callbacks.clock_rate (session, pt,
977         session->clock_rate_user_data);
978   else
979     result = -1;
980
981   RTP_SESSION_LOCK (session);
982
983   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
984
985   return result;
986 }
987
988 static RTPSourceCallbacks callbacks = {
989   (RTPSourcePushRTP) source_push_rtp,
990   (RTPSourceClockRate) source_clock_rate,
991 };
992
993 /**
994  * find_add_conflicting_addresses:
995  * @sess: The session to check in
996  * @arrival: The arrival stats for the buffer
997  *
998  * Checks if an address which has a conflict is already known,
999  *  otherwise remembers it to prevent loops.
1000  *
1001  * Returns: TRUE if it was a known conflict, FALSE otherwise
1002  */
1003
1004 static gboolean
1005 find_add_conflicting_addresses (RTPSession * sess, RTPArrivalStats * arrival)
1006 {
1007   GList *item;
1008   RTPConflictingAddress *new_conflict;
1009
1010   for (item = g_list_first (sess->conflicting_addresses);
1011       item; item = g_list_next (item)) {
1012     RTPConflictingAddress *known_conflict = item->data;
1013
1014     if (gst_netaddress_equal (&arrival->address, &known_conflict->address)) {
1015       known_conflict->time = arrival->time;
1016       return TRUE;
1017     }
1018   }
1019
1020   new_conflict = g_new0 (RTPConflictingAddress, 1);
1021
1022   memcpy (&new_conflict->address, &arrival->address, sizeof (GstNetAddress));
1023   new_conflict->time = arrival->time;
1024
1025   sess->conflicting_addresses = g_list_prepend (sess->conflicting_addresses,
1026       new_conflict);
1027
1028   return FALSE;
1029 }
1030
1031 static gboolean
1032 check_collision (RTPSession * sess, RTPSource * source,
1033     RTPArrivalStats * arrival, gboolean rtp)
1034 {
1035   /* If we have no arrival address, we can't do collision checking */
1036   if (!arrival->have_address)
1037     return FALSE;
1038
1039   if (sess->source != source) {
1040     /* This is not our local source, but lets check if two remote
1041      * source collide
1042      */
1043     if (rtp) {
1044       if (source->have_rtp_from) {
1045         if (gst_netaddress_equal (&source->rtp_from, &arrival->address))
1046           /* Address is the same */
1047           return FALSE;
1048       } else {
1049         /* We don't already have a from address for RTP, just set it */
1050         rtp_source_set_rtp_from (source, &arrival->address);
1051         return FALSE;
1052       }
1053     } else {
1054       if (source->have_rtcp_from) {
1055         if (gst_netaddress_equal (&source->rtcp_from, &arrival->address))
1056           /* Address is the same */
1057           return FALSE;
1058       } else {
1059         /* We don't already have a from address for RTCP, just set it */
1060         rtp_source_set_rtcp_from (source, &arrival->address);
1061         return FALSE;
1062       }
1063     }
1064     /* We received RTP or RTCP from this source before but the network address
1065      * changed. In this case, we have third-party collision or loop */
1066     GST_DEBUG ("we have a third-party collision or loop");
1067
1068     /* FIXME: Log 3rd party collision somehow
1069      * Maybe should be done in upper layer, only the SDES can tell us
1070      * if its a collision or a loop
1071      */
1072   } else {
1073     /* This is sending with our ssrc, is it an address we already know */
1074
1075     if (find_add_conflicting_addresses (sess, arrival)) {
1076       /* Its a known conflict, its probably a loop, not a collision
1077        * lets just drop the incoming packet
1078        */
1079       GST_DEBUG ("Our packets are being looped back to us, dropping");
1080     } else {
1081       /* Its a new collision, lets change our SSRC */
1082
1083       GST_DEBUG ("Collision for SSRC %x", rtp_source_get_ssrc (source));
1084       on_ssrc_collision (sess, source);
1085
1086       rtp_session_send_bye_locked (sess, "SSRC Collision", arrival->time);
1087
1088       sess->change_ssrc = TRUE;
1089     }
1090   }
1091
1092   return TRUE;
1093 }
1094
1095
1096 /* must be called with the session lock */
1097 static RTPSource *
1098 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1099     RTPArrivalStats * arrival, gboolean rtp)
1100 {
1101   RTPSource *source;
1102
1103   source =
1104       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1105   if (source == NULL) {
1106     /* make new Source in probation and insert */
1107     source = rtp_source_new (ssrc);
1108
1109     /* for RTP packets we need to set the source in probation. Receiving RTCP
1110      * packets of an SSRC, on the other hand, is a strong indication that we
1111      * are dealing with a valid source. */
1112     if (rtp)
1113       source->probation = RTP_DEFAULT_PROBATION;
1114     else
1115       source->probation = 0;
1116
1117     /* store from address, if any */
1118     if (arrival->have_address) {
1119       if (rtp)
1120         rtp_source_set_rtp_from (source, &arrival->address);
1121       else
1122         rtp_source_set_rtcp_from (source, &arrival->address);
1123     }
1124
1125     /* configure a callback on the source */
1126     rtp_source_set_callbacks (source, &callbacks, sess);
1127
1128     g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1129         source);
1130
1131     /* we have one more source now */
1132     sess->total_sources++;
1133     *created = TRUE;
1134   } else {
1135     *created = FALSE;
1136     /* check for collision, this updates the address when not previously set */
1137     if (check_collision (sess, source, arrival, rtp)) {
1138       return NULL;
1139     }
1140   }
1141   /* update last activity */
1142   source->last_activity = arrival->time;
1143   if (rtp)
1144     source->last_rtp_activity = arrival->time;
1145
1146   return source;
1147 }
1148
1149 /**
1150  * rtp_session_get_internal_source:
1151  * @sess: a #RTPSession
1152  *
1153  * Get the internal #RTPSource of @sess.
1154  *
1155  * Returns: The internal #RTPSource. g_object_unref() after usage.
1156  */
1157 RTPSource *
1158 rtp_session_get_internal_source (RTPSession * sess)
1159 {
1160   RTPSource *result;
1161
1162   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1163
1164   result = g_object_ref (sess->source);
1165
1166   return result;
1167 }
1168
1169 /**
1170  * rtp_session_set_internal_ssrc:
1171  * @sess: a #RTPSession
1172  * @ssrc: an SSRC
1173  *
1174  * Set the SSRC of @sess to @ssrc.
1175  */
1176 void
1177 rtp_session_set_internal_ssrc (RTPSession * sess, guint32 ssrc)
1178 {
1179   RTP_SESSION_LOCK (sess);
1180   g_hash_table_steal (sess->ssrcs[sess->mask_idx],
1181       GINT_TO_POINTER (sess->source->ssrc));
1182
1183   sess->source->ssrc = ssrc;
1184   rtp_source_reset (sess->source);
1185
1186   g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1187       GINT_TO_POINTER (sess->source->ssrc), sess->source);
1188   RTP_SESSION_UNLOCK (sess);
1189 }
1190
1191 /**
1192  * rtp_session_get_internal_ssrc:
1193  * @sess: a #RTPSession
1194  *
1195  * Get the internal SSRC of @sess.
1196  *
1197  * Returns: The SSRC of the session. 
1198  */
1199 guint32
1200 rtp_session_get_internal_ssrc (RTPSession * sess)
1201 {
1202   guint32 ssrc;
1203
1204   RTP_SESSION_LOCK (sess);
1205   ssrc = sess->source->ssrc;
1206   RTP_SESSION_UNLOCK (sess);
1207
1208   return ssrc;
1209 }
1210
1211 /**
1212  * rtp_session_add_source:
1213  * @sess: a #RTPSession
1214  * @src: #RTPSource to add
1215  *
1216  * Add @src to @session.
1217  *
1218  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1219  * existed in the session.
1220  */
1221 gboolean
1222 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1223 {
1224   gboolean result = FALSE;
1225   RTPSource *find;
1226
1227   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1228   g_return_val_if_fail (src != NULL, FALSE);
1229
1230   RTP_SESSION_LOCK (sess);
1231   find =
1232       g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1233       GINT_TO_POINTER (src->ssrc));
1234   if (find == NULL) {
1235     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1236         GINT_TO_POINTER (src->ssrc), src);
1237     /* we have one more source now */
1238     sess->total_sources++;
1239     result = TRUE;
1240   }
1241   RTP_SESSION_UNLOCK (sess);
1242
1243   return result;
1244 }
1245
1246 /**
1247  * rtp_session_get_num_sources:
1248  * @sess: an #RTPSession
1249  *
1250  * Get the number of sources in @sess.
1251  *
1252  * Returns: The number of sources in @sess.
1253  */
1254 guint
1255 rtp_session_get_num_sources (RTPSession * sess)
1256 {
1257   guint result;
1258
1259   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1260
1261   RTP_SESSION_LOCK (sess);
1262   result = sess->total_sources;
1263   RTP_SESSION_UNLOCK (sess);
1264
1265   return result;
1266 }
1267
1268 /**
1269  * rtp_session_get_num_active_sources:
1270  * @sess: an #RTPSession
1271  *
1272  * Get the number of active sources in @sess. A source is considered active when
1273  * it has been validated and has not yet received a BYE RTCP message.
1274  *
1275  * Returns: The number of active sources in @sess.
1276  */
1277 guint
1278 rtp_session_get_num_active_sources (RTPSession * sess)
1279 {
1280   guint result;
1281
1282   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1283
1284   RTP_SESSION_LOCK (sess);
1285   result = sess->stats.active_sources;
1286   RTP_SESSION_UNLOCK (sess);
1287
1288   return result;
1289 }
1290
1291 /**
1292  * rtp_session_get_source_by_ssrc:
1293  * @sess: an #RTPSession
1294  * @ssrc: an SSRC
1295  *
1296  * Find the source with @ssrc in @sess.
1297  *
1298  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
1299  * g_object_unref() after usage.
1300  */
1301 RTPSource *
1302 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
1303 {
1304   RTPSource *result;
1305
1306   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1307
1308   RTP_SESSION_LOCK (sess);
1309   result =
1310       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1311   if (result)
1312     g_object_ref (result);
1313   RTP_SESSION_UNLOCK (sess);
1314
1315   return result;
1316 }
1317
1318 /**
1319  * rtp_session_get_source_by_cname:
1320  * @sess: a #RTPSession
1321  * @cname: an CNAME
1322  *
1323  * Find the source with @cname in @sess.
1324  *
1325  * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found.
1326  * g_object_unref() after usage.
1327  */
1328 RTPSource *
1329 rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
1330 {
1331   RTPSource *result;
1332
1333   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1334   g_return_val_if_fail (cname != NULL, NULL);
1335
1336   RTP_SESSION_LOCK (sess);
1337   result = g_hash_table_lookup (sess->cnames, cname);
1338   if (result)
1339     g_object_ref (result);
1340   RTP_SESSION_UNLOCK (sess);
1341
1342   return result;
1343 }
1344
1345 static guint32
1346 rtp_session_create_new_ssrc (RTPSession * sess)
1347 {
1348   guint32 ssrc;
1349
1350   while (TRUE) {
1351     ssrc = g_random_int ();
1352
1353     /* see if it exists in the session, we're done if it doesn't */
1354     if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1355             GINT_TO_POINTER (ssrc)) == NULL)
1356       break;
1357   }
1358
1359   return ssrc;
1360 }
1361
1362
1363 /**
1364  * rtp_session_create_source:
1365  * @sess: an #RTPSession
1366  *
1367  * Create an #RTPSource for use in @sess. This function will create a source
1368  * with an ssrc that is currently not used by any participants in the session.
1369  *
1370  * Returns: an #RTPSource.
1371  */
1372 RTPSource *
1373 rtp_session_create_source (RTPSession * sess)
1374 {
1375   guint32 ssrc;
1376   RTPSource *source;
1377
1378   RTP_SESSION_LOCK (sess);
1379   ssrc = rtp_session_create_new_ssrc (sess);
1380   source = rtp_source_new (ssrc);
1381   g_object_ref (source);
1382   rtp_source_set_callbacks (source, &callbacks, sess);
1383   g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1384       source);
1385   /* we have one more source now */
1386   sess->total_sources++;
1387   RTP_SESSION_UNLOCK (sess);
1388
1389   return source;
1390 }
1391
1392 /* update the RTPArrivalStats structure with the current time and other bits
1393  * about the current buffer we are handling.
1394  * This function is typically called when a validated packet is received.
1395  * This function should be called with the SESSION_LOCK
1396  */
1397 static void
1398 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
1399     gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
1400     GstClockTime running_time, guint64 ntpnstime)
1401 {
1402   /* get time of arrival */
1403   arrival->time = current_time;
1404   arrival->running_time = running_time;
1405   arrival->ntpnstime = ntpnstime;
1406
1407   /* get packet size including header overhead */
1408   arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
1409
1410   if (rtp) {
1411     arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer);
1412   } else {
1413     arrival->payload_len = 0;
1414   }
1415
1416   /* for netbuffer we can store the IP address to check for collisions */
1417   arrival->have_address = GST_IS_NETBUFFER (buffer);
1418   if (arrival->have_address) {
1419     GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
1420
1421     memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
1422   }
1423 }
1424
1425 /**
1426  * rtp_session_process_rtp:
1427  * @sess: and #RTPSession
1428  * @buffer: an RTP buffer
1429  * @current_time: the current system time
1430  * @ntpnstime: the NTP arrival time in nanoseconds
1431  *
1432  * Process an RTP buffer in the session manager. This function takes ownership
1433  * of @buffer.
1434  *
1435  * Returns: a #GstFlowReturn.
1436  */
1437 GstFlowReturn
1438 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
1439     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
1440 {
1441   GstFlowReturn result;
1442   guint32 ssrc;
1443   RTPSource *source;
1444   gboolean created;
1445   gboolean prevsender, prevactive;
1446   RTPArrivalStats arrival;
1447
1448   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1449   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1450
1451   if (!gst_rtp_buffer_validate (buffer))
1452     goto invalid_packet;
1453
1454   RTP_SESSION_LOCK (sess);
1455   /* update arrival stats */
1456   update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
1457       running_time, ntpnstime);
1458
1459   /* ignore more RTP packets when we left the session */
1460   if (sess->source->received_bye)
1461     goto ignore;
1462
1463   /* get SSRC and look up in session database */
1464   ssrc = gst_rtp_buffer_get_ssrc (buffer);
1465   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
1466
1467   if (!source)
1468     goto collision;
1469
1470   prevsender = RTP_SOURCE_IS_SENDER (source);
1471   prevactive = RTP_SOURCE_IS_ACTIVE (source);
1472
1473   /* we need to ref so that we can process the CSRCs later */
1474   gst_buffer_ref (buffer);
1475
1476   /* let source process the packet */
1477   result = rtp_source_process_rtp (source, buffer, &arrival);
1478
1479   /* source became active */
1480   if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
1481     sess->stats.active_sources++;
1482     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
1483         sess->stats.active_sources);
1484     on_ssrc_validated (sess, source);
1485   }
1486   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1487     sess->stats.sender_sources++;
1488     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
1489         sess->stats.sender_sources);
1490   }
1491
1492   if (created)
1493     on_new_ssrc (sess, source);
1494
1495   if (source->validated) {
1496     guint8 i, count;
1497     gboolean created;
1498
1499     /* for validated sources, we add the CSRCs as well */
1500     count = gst_rtp_buffer_get_csrc_count (buffer);
1501
1502     for (i = 0; i < count; i++) {
1503       guint32 csrc;
1504       RTPSource *csrc_src;
1505
1506       csrc = gst_rtp_buffer_get_csrc (buffer, i);
1507
1508       /* get source */
1509       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
1510
1511       if (created) {
1512         GST_DEBUG ("created new CSRC: %08x", csrc);
1513         rtp_source_set_as_csrc (csrc_src);
1514         if (RTP_SOURCE_IS_ACTIVE (csrc_src))
1515           sess->stats.active_sources++;
1516         on_new_ssrc (sess, source);
1517       }
1518     }
1519   }
1520   gst_buffer_unref (buffer);
1521
1522   RTP_SESSION_UNLOCK (sess);
1523
1524   return result;
1525
1526   /* ERRORS */
1527 invalid_packet:
1528   {
1529     gst_buffer_unref (buffer);
1530     GST_DEBUG ("invalid RTP packet received");
1531     return GST_FLOW_OK;
1532   }
1533 ignore:
1534   {
1535     gst_buffer_unref (buffer);
1536     RTP_SESSION_UNLOCK (sess);
1537     GST_DEBUG ("ignoring RTP packet because we are leaving");
1538     return GST_FLOW_OK;
1539   }
1540 collision:
1541   {
1542     gst_buffer_unref (buffer);
1543     RTP_SESSION_UNLOCK (sess);
1544     GST_DEBUG ("ignoring packet because its collisioning");
1545     return GST_FLOW_OK;
1546   }
1547 }
1548
1549 static void
1550 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
1551     GstRTCPPacket * packet, RTPArrivalStats * arrival)
1552 {
1553   guint count, i;
1554
1555   count = gst_rtcp_packet_get_rb_count (packet);
1556   for (i = 0; i < count; i++) {
1557     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1558     guint8 fractionlost;
1559     gint32 packetslost;
1560
1561     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1562         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1563
1564     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
1565
1566     if (ssrc == sess->source->ssrc) {
1567       /* only deal with report blocks for our session, we update the stats of
1568        * the sender of the RTCP message. We could also compare our stats against
1569        * the other sender to see if we are better or worse. */
1570       rtp_source_process_rb (source, arrival->time, fractionlost, packetslost,
1571           exthighestseq, jitter, lsr, dlsr);
1572
1573       on_ssrc_active (sess, source);
1574     }
1575   }
1576 }
1577
1578 /* A Sender report contains statistics about how the sender is doing. This
1579  * includes timing informataion such as the relation between RTP and NTP
1580  * timestamps and the number of packets/bytes it sent to us.
1581  *
1582  * In this report is also included a set of report blocks related to how this
1583  * sender is receiving data (in case we (or somebody else) is also sending stuff
1584  * to it). This info includes the packet loss, jitter and seqnum. It also
1585  * contains information to calculate the round trip time (LSR/DLSR).
1586  */
1587 static void
1588 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
1589     RTPArrivalStats * arrival)
1590 {
1591   guint32 senderssrc, rtptime, packet_count, octet_count;
1592   guint64 ntptime;
1593   RTPSource *source;
1594   gboolean created, prevsender;
1595
1596   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
1597       &packet_count, &octet_count);
1598
1599   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
1600       senderssrc, GST_TIME_ARGS (arrival->time));
1601
1602   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1603
1604   if (!source)
1605     return;
1606
1607   prevsender = RTP_SOURCE_IS_SENDER (source);
1608
1609   /* first update the source */
1610   rtp_source_process_sr (source, arrival->time, ntptime, rtptime, packet_count,
1611       octet_count);
1612
1613   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1614     sess->stats.sender_sources++;
1615     GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc,
1616         sess->stats.sender_sources);
1617   }
1618
1619   if (created)
1620     on_new_ssrc (sess, source);
1621
1622   rtp_session_process_rb (sess, source, packet, arrival);
1623 }
1624
1625 /* A receiver report contains statistics about how a receiver is doing. It
1626  * includes stuff like packet loss, jitter and the seqnum it received last. It
1627  * also contains info to calculate the round trip time.
1628  *
1629  * We are only interested in how the sender of this report is doing wrt to us.
1630  */
1631 static void
1632 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
1633     RTPArrivalStats * arrival)
1634 {
1635   guint32 senderssrc;
1636   RTPSource *source;
1637   gboolean created;
1638
1639   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
1640
1641   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
1642
1643   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1644
1645   if (!source)
1646     return;
1647
1648   if (created)
1649     on_new_ssrc (sess, source);
1650
1651   rtp_session_process_rb (sess, source, packet, arrival);
1652 }
1653
1654 /* Get SDES items and store them in the SSRC */
1655 static void
1656 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
1657     RTPArrivalStats * arrival)
1658 {
1659   guint items, i, j;
1660   gboolean more_items, more_entries;
1661
1662   items = gst_rtcp_packet_sdes_get_item_count (packet);
1663   GST_DEBUG ("got SDES packet with %d items", items);
1664
1665   more_items = gst_rtcp_packet_sdes_first_item (packet);
1666   i = 0;
1667   while (more_items) {
1668     guint32 ssrc;
1669     gboolean changed, created;
1670     RTPSource *source;
1671
1672     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
1673
1674     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
1675
1676     /* find src, no probation when dealing with RTCP */
1677     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1678     changed = FALSE;
1679
1680     if (!source)
1681       return;
1682
1683     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
1684     j = 0;
1685     while (more_entries) {
1686       GstRTCPSDESType type;
1687       guint8 len;
1688       guint8 *data;
1689
1690       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
1691
1692       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
1693           data);
1694
1695       changed |= rtp_source_set_sdes (source, type, data, len);
1696
1697       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
1698       j++;
1699     }
1700
1701     source->validated = TRUE;
1702
1703     if (created)
1704       on_new_ssrc (sess, source);
1705     if (changed)
1706       on_ssrc_sdes (sess, source);
1707
1708     more_items = gst_rtcp_packet_sdes_next_item (packet);
1709     i++;
1710   }
1711 }
1712
1713 /* BYE is sent when a client leaves the session
1714  */
1715 static void
1716 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
1717     RTPArrivalStats * arrival)
1718 {
1719   guint count, i;
1720   gchar *reason;
1721
1722   reason = gst_rtcp_packet_bye_get_reason (packet);
1723   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
1724
1725   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
1726   for (i = 0; i < count; i++) {
1727     guint32 ssrc;
1728     RTPSource *source;
1729     gboolean created, prevactive, prevsender;
1730     guint pmembers, members;
1731
1732     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
1733     GST_DEBUG ("SSRC: %08x", ssrc);
1734
1735     /* find src and mark bye, no probation when dealing with RTCP */
1736     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1737
1738     if (!source)
1739       return;
1740
1741     /* store time for when we need to time out this source */
1742     source->bye_time = arrival->time;
1743
1744     prevactive = RTP_SOURCE_IS_ACTIVE (source);
1745     prevsender = RTP_SOURCE_IS_SENDER (source);
1746
1747     /* let the source handle the rest */
1748     rtp_source_process_bye (source, reason);
1749
1750     pmembers = sess->stats.active_sources;
1751
1752     if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
1753       sess->stats.active_sources--;
1754       GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1755           sess->stats.active_sources);
1756     }
1757     if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
1758       sess->stats.sender_sources--;
1759       GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1760           sess->stats.sender_sources);
1761     }
1762     members = sess->stats.active_sources;
1763
1764     if (!sess->source->received_bye && members < pmembers) {
1765       /* some members went away since the previous timeout estimate.
1766        * Perform reverse reconsideration but only when we are not scheduling a
1767        * BYE ourselves. */
1768       if (arrival->time < sess->next_rtcp_check_time) {
1769         GstClockTime time_remaining;
1770
1771         time_remaining = sess->next_rtcp_check_time - arrival->time;
1772         sess->next_rtcp_check_time =
1773             gst_util_uint64_scale (time_remaining, members, pmembers);
1774
1775         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
1776             GST_TIME_ARGS (sess->next_rtcp_check_time));
1777
1778         sess->next_rtcp_check_time += arrival->time;
1779
1780         RTP_SESSION_UNLOCK (sess);
1781         /* notify app of reconsideration */
1782         if (sess->callbacks.reconsider)
1783           sess->callbacks.reconsider (sess, sess->reconsider_user_data);
1784         RTP_SESSION_LOCK (sess);
1785       }
1786     }
1787
1788     if (created)
1789       on_new_ssrc (sess, source);
1790
1791     on_bye_ssrc (sess, source);
1792   }
1793   g_free (reason);
1794 }
1795
1796 static void
1797 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
1798     RTPArrivalStats * arrival)
1799 {
1800   GST_DEBUG ("received APP");
1801 }
1802
1803 /**
1804  * rtp_session_process_rtcp:
1805  * @sess: and #RTPSession
1806  * @buffer: an RTCP buffer
1807  * @current_time: the current system time
1808  *
1809  * Process an RTCP buffer in the session manager. This function takes ownership
1810  * of @buffer.
1811  *
1812  * Returns: a #GstFlowReturn.
1813  */
1814 GstFlowReturn
1815 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
1816     GstClockTime current_time)
1817 {
1818   GstRTCPPacket packet;
1819   gboolean more, is_bye = FALSE, is_sr = FALSE;
1820   RTPArrivalStats arrival;
1821   GstFlowReturn result = GST_FLOW_OK;
1822
1823   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1824   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1825
1826   if (!gst_rtcp_buffer_validate (buffer))
1827     goto invalid_packet;
1828
1829   GST_DEBUG ("received RTCP packet");
1830
1831   RTP_SESSION_LOCK (sess);
1832   /* update arrival stats */
1833   update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1, -1);
1834
1835   if (sess->sent_bye)
1836     goto ignore;
1837
1838   /* make writable, we might want to change the buffer */
1839   buffer = gst_buffer_make_metadata_writable (buffer);
1840
1841   /* start processing the compound packet */
1842   more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
1843   while (more) {
1844     GstRTCPType type;
1845
1846     type = gst_rtcp_packet_get_type (&packet);
1847
1848     /* when we are leaving the session, we should ignore all non-BYE messages */
1849     if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) {
1850       GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
1851       goto next;
1852     }
1853
1854     switch (type) {
1855       case GST_RTCP_TYPE_SR:
1856         rtp_session_process_sr (sess, &packet, &arrival);
1857         is_sr = TRUE;
1858         break;
1859       case GST_RTCP_TYPE_RR:
1860         rtp_session_process_rr (sess, &packet, &arrival);
1861         break;
1862       case GST_RTCP_TYPE_SDES:
1863         rtp_session_process_sdes (sess, &packet, &arrival);
1864         break;
1865       case GST_RTCP_TYPE_BYE:
1866         is_bye = TRUE;
1867         rtp_session_process_bye (sess, &packet, &arrival);
1868         break;
1869       case GST_RTCP_TYPE_APP:
1870         rtp_session_process_app (sess, &packet, &arrival);
1871         break;
1872       default:
1873         GST_WARNING ("got unknown RTCP packet");
1874         break;
1875     }
1876   next:
1877     more = gst_rtcp_packet_move_to_next (&packet);
1878   }
1879
1880   /* if we are scheduling a BYE, we only want to count bye packets, else we
1881    * count everything */
1882   if (sess->source->received_bye) {
1883     if (is_bye) {
1884       sess->stats.bye_members++;
1885       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1886     }
1887   } else {
1888     /* keep track of average packet size */
1889     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1890   }
1891   RTP_SESSION_UNLOCK (sess);
1892
1893   /* notify caller of sr packets in the callback */
1894   if (is_sr && sess->callbacks.sync_rtcp)
1895     result = sess->callbacks.sync_rtcp (sess, sess->source, buffer,
1896         sess->sync_rtcp_user_data);
1897   else
1898     gst_buffer_unref (buffer);
1899
1900   return result;
1901
1902   /* ERRORS */
1903 invalid_packet:
1904   {
1905     GST_DEBUG ("invalid RTCP packet received");
1906     gst_buffer_unref (buffer);
1907     return GST_FLOW_OK;
1908   }
1909 ignore:
1910   {
1911     gst_buffer_unref (buffer);
1912     RTP_SESSION_UNLOCK (sess);
1913     GST_DEBUG ("ignoring RTP packet because we left");
1914     return GST_FLOW_OK;
1915   }
1916 }
1917
1918 /**
1919  * rtp_session_send_rtp:
1920  * @sess: an #RTPSession
1921  * @buffer: an RTP buffer
1922  * @current_time: the current system time
1923  * @ntpnstime: the NTP time in nanoseconds of when this buffer was captured.
1924  * This is the buffer timestamp converted to NTP time.
1925  *
1926  * Send the RTP buffer in the session manager. This function takes ownership of
1927  * @buffer.
1928  *
1929  * Returns: a #GstFlowReturn.
1930  */
1931 GstFlowReturn
1932 rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer,
1933     GstClockTime current_time, guint64 ntpnstime)
1934 {
1935   GstFlowReturn result;
1936   RTPSource *source;
1937   gboolean prevsender;
1938
1939   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1940   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1941
1942   if (!gst_rtp_buffer_validate (buffer))
1943     goto invalid_packet;
1944
1945   GST_LOG ("received RTP packet for sending");
1946
1947   RTP_SESSION_LOCK (sess);
1948   source = sess->source;
1949
1950   /* update last activity */
1951   source->last_rtp_activity = current_time;
1952
1953   prevsender = RTP_SOURCE_IS_SENDER (source);
1954
1955   /* we use our own source to send */
1956   result = rtp_source_send_rtp (source, buffer, ntpnstime);
1957
1958   if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
1959     sess->stats.sender_sources++;
1960   RTP_SESSION_UNLOCK (sess);
1961
1962   return result;
1963
1964   /* ERRORS */
1965 invalid_packet:
1966   {
1967     gst_buffer_unref (buffer);
1968     GST_DEBUG ("invalid RTP packet received");
1969     return GST_FLOW_OK;
1970   }
1971 }
1972
1973 static GstClockTime
1974 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
1975     gboolean first)
1976 {
1977   GstClockTime result;
1978
1979   if (sess->source->received_bye) {
1980     result = rtp_stats_calculate_bye_interval (&sess->stats);
1981   } else {
1982     result = rtp_stats_calculate_rtcp_interval (&sess->stats,
1983         RTP_SOURCE_IS_SENDER (sess->source), first);
1984   }
1985
1986   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
1987       GST_TIME_ARGS (result), first);
1988
1989   if (!deterministic)
1990     result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
1991
1992   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
1993
1994   return result;
1995 }
1996
1997 /**
1998  * rtp_session_send_bye_locked:
1999  * @sess: an #RTPSession
2000  * @reason: a reason or NULL
2001  *
2002  * Stop the current @sess and schedule a BYE message for the other members.
2003  *
2004  * One must have the session lock to call this function
2005  *
2006  * Returns: a #GstFlowReturn.
2007  */
2008 static GstFlowReturn
2009 rtp_session_send_bye_locked (RTPSession * sess, const gchar * reason,
2010     GstClockTime current_time)
2011 {
2012   GstFlowReturn result = GST_FLOW_OK;
2013   RTPSource *source;
2014   GstClockTime interval;
2015
2016   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2017
2018   source = sess->source;
2019
2020   /* ignore more BYEs */
2021   if (source->received_bye)
2022     goto done;
2023
2024   /* we have BYE now */
2025   source->received_bye = TRUE;
2026   /* at least one member wants to send a BYE */
2027   g_free (sess->bye_reason);
2028   sess->bye_reason = g_strdup (reason);
2029   sess->stats.avg_rtcp_packet_size = 100;
2030   sess->stats.bye_members = 1;
2031   sess->first_rtcp = TRUE;
2032   sess->sent_bye = FALSE;
2033
2034   /* reschedule transmission */
2035   sess->last_rtcp_send_time = current_time;
2036   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2037   sess->next_rtcp_check_time = current_time + interval;
2038
2039   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
2040       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
2041
2042   RTP_SESSION_UNLOCK (sess);
2043   /* notify app of reconsideration */
2044   if (sess->callbacks.reconsider)
2045     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2046   RTP_SESSION_LOCK (sess);
2047 done:
2048
2049   return result;
2050 }
2051
2052 /**
2053  * rtp_session_send_bye:
2054  * @sess: an #RTPSession
2055  * @reason: a reason or NULL
2056  * @current_time: the current system time
2057  *
2058  * Stop the current @sess and schedule a BYE message for the other members.
2059  *
2060  * One must have the session lock to call this function
2061  *
2062  * Returns: a #GstFlowReturn.
2063  */
2064 GstFlowReturn
2065 rtp_session_send_bye (RTPSession * sess, const gchar * reason,
2066     GstClockTime current_time)
2067 {
2068   GstFlowReturn result = GST_FLOW_OK;
2069
2070   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2071
2072   RTP_SESSION_LOCK (sess);
2073   result = rtp_session_send_bye_locked (sess, reason, current_time);
2074   RTP_SESSION_UNLOCK (sess);
2075
2076   return result;
2077 }
2078
2079 /**
2080  * rtp_session_next_timeout:
2081  * @sess: an #RTPSession
2082  * @current_time: the current system time
2083  *
2084  * Get the next time we should perform session maintenance tasks.
2085  *
2086  * Returns: a time when rtp_session_on_timeout() should be called with the
2087  * current system time.
2088  */
2089 GstClockTime
2090 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
2091 {
2092   GstClockTime result;
2093
2094   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2095
2096   RTP_SESSION_LOCK (sess);
2097
2098   result = sess->next_rtcp_check_time;
2099
2100   GST_DEBUG ("current time: %" GST_TIME_FORMAT ", next :%" GST_TIME_FORMAT,
2101       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2102
2103   if (result < current_time) {
2104     GST_DEBUG ("take current time as base");
2105     /* our previous check time expired, start counting from the current time
2106      * again. */
2107     result = current_time;
2108   }
2109
2110   if (sess->source->received_bye) {
2111     if (sess->sent_bye) {
2112       GST_DEBUG ("we sent BYE already");
2113       result = GST_CLOCK_TIME_NONE;
2114     } else if (sess->stats.active_sources >= 50) {
2115       GST_DEBUG ("reconsider BYE, more than 50 sources");
2116       /* reconsider BYE if members >= 50 */
2117       result += calculate_rtcp_interval (sess, FALSE, TRUE);
2118     }
2119   } else {
2120     if (sess->first_rtcp) {
2121       GST_DEBUG ("first RTCP packet");
2122       /* we are called for the first time */
2123       result += calculate_rtcp_interval (sess, FALSE, TRUE);
2124     } else if (sess->next_rtcp_check_time < current_time) {
2125       GST_DEBUG ("old check time expired, getting new timeout");
2126       /* get a new timeout when we need to */
2127       result += calculate_rtcp_interval (sess, FALSE, FALSE);
2128     }
2129   }
2130   sess->next_rtcp_check_time = result;
2131
2132   GST_DEBUG ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
2133   RTP_SESSION_UNLOCK (sess);
2134
2135   return result;
2136 }
2137
2138 typedef struct
2139 {
2140   RTPSession *sess;
2141   GstBuffer *rtcp;
2142   GstClockTime current_time;
2143   guint64 ntpnstime;
2144   GstClockTime interval;
2145   GstRTCPPacket packet;
2146   gboolean is_bye;
2147   gboolean has_sdes;
2148 } ReportData;
2149
2150 static void
2151 session_start_rtcp (RTPSession * sess, ReportData * data)
2152 {
2153   GstRTCPPacket *packet = &data->packet;
2154   RTPSource *own = sess->source;
2155
2156   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
2157
2158   if (RTP_SOURCE_IS_SENDER (own)) {
2159     guint64 ntptime;
2160     guint32 rtptime;
2161     guint32 packet_count, octet_count;
2162
2163     /* we are a sender, create SR */
2164     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
2165     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
2166
2167     /* get latest stats */
2168     rtp_source_get_new_sr (own, data->ntpnstime, &ntptime, &rtptime,
2169         &packet_count, &octet_count);
2170     /* store stats */
2171     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
2172         packet_count, octet_count);
2173
2174     /* fill in sender report info */
2175     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
2176         ntptime, rtptime, packet_count, octet_count);
2177   } else {
2178     /* we are only receiver, create RR */
2179     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
2180     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
2181     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
2182   }
2183 }
2184
2185 /* construct a Sender or Receiver Report */
2186 static void
2187 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
2188 {
2189   RTPSession *sess = data->sess;
2190   GstRTCPPacket *packet = &data->packet;
2191
2192   /* create a new buffer if needed */
2193   if (data->rtcp == NULL) {
2194     session_start_rtcp (sess, data);
2195   }
2196   if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
2197     /* only report about other sender sources */
2198     if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
2199       guint8 fractionlost;
2200       gint32 packetslost;
2201       guint32 exthighestseq, jitter;
2202       guint32 lsr, dlsr;
2203
2204       /* get new stats */
2205       rtp_source_get_new_rb (source, data->current_time, &fractionlost,
2206           &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2207
2208       /* packet is not yet filled, add report block for this source. */
2209       gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
2210           exthighestseq, jitter, lsr, dlsr);
2211     }
2212   }
2213 }
2214
2215 /* perform cleanup of sources that timed out */
2216 static gboolean
2217 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
2218 {
2219   gboolean remove = FALSE;
2220   gboolean byetimeout = FALSE;
2221   gboolean sendertimeout = FALSE;
2222   gboolean is_sender, is_active;
2223   RTPSession *sess = data->sess;
2224   GstClockTime interval;
2225
2226   is_sender = RTP_SOURCE_IS_SENDER (source);
2227   is_active = RTP_SOURCE_IS_ACTIVE (source);
2228
2229   /* check for our own source, we don't want to delete our own source. */
2230   if (!(source == sess->source)) {
2231     if (source->received_bye) {
2232       /* if we received a BYE from the source, remove the source after some
2233        * time. */
2234       if (data->current_time > source->bye_time &&
2235           data->current_time - source->bye_time > sess->stats.bye_timeout) {
2236         GST_DEBUG ("removing BYE source %08x", source->ssrc);
2237         remove = TRUE;
2238         byetimeout = TRUE;
2239       }
2240     }
2241     /* sources that were inactive for more than 5 times the deterministic reporting
2242      * interval get timed out. the min timeout is 5 seconds. */
2243     if (data->current_time > source->last_activity) {
2244       interval = MAX (data->interval * 5, 5 * GST_SECOND);
2245       if (data->current_time - source->last_activity > interval) {
2246         GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
2247             source->ssrc, GST_TIME_ARGS (source->last_activity));
2248         remove = TRUE;
2249       }
2250     }
2251   }
2252
2253   /* senders that did not send for a long time become a receiver, this also
2254    * holds for our own source. */
2255   if (is_sender) {
2256     if (data->current_time > source->last_rtp_activity) {
2257       interval = MAX (data->interval * 2, 5 * GST_SECOND);
2258       if (data->current_time - source->last_rtp_activity > interval) {
2259         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
2260             GST_TIME_FORMAT, source->ssrc,
2261             GST_TIME_ARGS (source->last_rtp_activity));
2262         source->is_sender = FALSE;
2263         sess->stats.sender_sources--;
2264         sendertimeout = TRUE;
2265       }
2266     }
2267   }
2268
2269   if (remove) {
2270     sess->total_sources--;
2271     if (is_sender)
2272       sess->stats.sender_sources--;
2273     if (is_active)
2274       sess->stats.active_sources--;
2275
2276     if (byetimeout)
2277       on_bye_timeout (sess, source);
2278     else
2279       on_timeout (sess, source);
2280   } else {
2281     if (sendertimeout)
2282       on_sender_timeout (sess, source);
2283   }
2284   return remove;
2285 }
2286
2287 static void
2288 session_sdes (RTPSession * sess, ReportData * data)
2289 {
2290   GstRTCPPacket *packet = &data->packet;
2291   guint8 *sdes_data;
2292   guint sdes_len;
2293
2294   /* add SDES packet */
2295   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
2296
2297   gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
2298
2299   rtp_source_get_sdes (sess->source, GST_RTCP_SDES_CNAME, &sdes_data,
2300       &sdes_len);
2301   gst_rtcp_packet_sdes_add_entry (packet, GST_RTCP_SDES_CNAME, sdes_len,
2302       sdes_data);
2303
2304   /* other SDES items must only be added at regular intervals and only when the
2305    * user requests to since it might be a privacy problem */
2306 #if 0
2307   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_NAME,
2308       strlen (sess->name), (guint8 *) sess->name);
2309   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_TOOL,
2310       strlen (sess->tool), (guint8 *) sess->tool);
2311 #endif
2312
2313   data->has_sdes = TRUE;
2314 }
2315
2316 /* schedule a BYE packet */
2317 static void
2318 session_bye (RTPSession * sess, ReportData * data)
2319 {
2320   GstRTCPPacket *packet = &data->packet;
2321
2322   /* open packet */
2323   session_start_rtcp (sess, data);
2324
2325   /* add SDES */
2326   session_sdes (sess, data);
2327
2328   /* add a BYE packet */
2329   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet);
2330   gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc);
2331   if (sess->bye_reason)
2332     gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason);
2333
2334   /* we have a BYE packet now */
2335   data->is_bye = TRUE;
2336 }
2337
2338 static gboolean
2339 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
2340 {
2341   GstClockTime new_send_time, elapsed;
2342   gboolean result;
2343
2344   /* no need to check yet */
2345   if (sess->next_rtcp_check_time > current_time) {
2346     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
2347         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
2348         GST_TIME_ARGS (current_time));
2349     return FALSE;
2350   }
2351
2352   /* get elapsed time since we last reported */
2353   elapsed = current_time - sess->last_rtcp_send_time;
2354
2355   /* perform forward reconsideration */
2356   new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
2357
2358   GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
2359       GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time), GST_TIME_ARGS (elapsed));
2360
2361   new_send_time += sess->last_rtcp_send_time;
2362
2363   /* check if reconsideration */
2364   if (current_time < new_send_time) {
2365     GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
2366         GST_TIME_ARGS (new_send_time));
2367     result = FALSE;
2368     /* store new check time */
2369     sess->next_rtcp_check_time = new_send_time;
2370   } else {
2371     result = TRUE;
2372     new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
2373
2374     GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
2375         GST_TIME_ARGS (new_send_time));
2376     sess->next_rtcp_check_time = current_time + new_send_time;
2377   }
2378   return result;
2379 }
2380
2381 /**
2382  * rtp_session_on_timeout:
2383  * @sess: an #RTPSession
2384  * @current_time: the current system time
2385  * @ntpnstime: the current NTP time in nanoseconds
2386  *
2387  * Perform maintenance actions after the timeout obtained with
2388  * rtp_session_next_timeout() expired.
2389  *
2390  * This function will perform timeouts of receivers and senders, send a BYE
2391  * packet or generate RTCP packets with current session stats.
2392  *
2393  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
2394  * times, for each packet that should be processed.
2395  *
2396  * Returns: a #GstFlowReturn.
2397  */
2398 GstFlowReturn
2399 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
2400     guint64 ntpnstime)
2401 {
2402   GstFlowReturn result = GST_FLOW_OK;
2403   GList *item;
2404   ReportData data;
2405   RTPSource *own;
2406
2407   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2408
2409   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT,
2410       GST_TIME_ARGS (current_time), GST_TIME_ARGS (ntpnstime));
2411
2412   data.sess = sess;
2413   data.rtcp = NULL;
2414   data.current_time = current_time;
2415   data.ntpnstime = ntpnstime;
2416   data.is_bye = FALSE;
2417   data.has_sdes = FALSE;
2418
2419   own = sess->source;
2420
2421   RTP_SESSION_LOCK (sess);
2422   /* get a new interval, we need this for various cleanups etc */
2423   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
2424
2425   /* first perform cleanups */
2426   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
2427       (GHRFunc) session_cleanup, &data);
2428
2429   /* see if we need to generate SR or RR packets */
2430   if (is_rtcp_time (sess, current_time, &data)) {
2431     if (own->received_bye) {
2432       /* generate BYE instead */
2433       GST_DEBUG ("generating BYE message");
2434       session_bye (sess, &data);
2435       sess->sent_bye = TRUE;
2436     } else {
2437       /* loop over all known sources and do something */
2438       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2439           (GHFunc) session_report_blocks, &data);
2440     }
2441   }
2442
2443   if (data.rtcp) {
2444     guint size;
2445
2446     /* we keep track of the last report time in order to timeout inactive
2447      * receivers or senders */
2448     sess->last_rtcp_send_time = data.current_time;
2449     sess->first_rtcp = FALSE;
2450
2451     /* add SDES for this source when not already added */
2452     if (!data.has_sdes)
2453       session_sdes (sess, &data);
2454
2455     /* update average RTCP size before sending */
2456     size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
2457     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size);
2458   }
2459
2460   /* check for outdated collisions */
2461   GST_DEBUG ("checking collision list");
2462   item = g_list_first (sess->conflicting_addresses);
2463   while (item) {
2464     RTPConflictingAddress *known_conflict = item->data;
2465     GList *next_item = g_list_next (item);
2466
2467     if (known_conflict->time < current_time - (data.interval *
2468             RTCP_INTERVAL_COLLISION_TIMEOUT)) {
2469       sess->conflicting_addresses =
2470           g_list_delete_link (sess->conflicting_addresses, item);
2471       GST_DEBUG ("collision %p timed out", known_conflict);
2472       g_free (known_conflict);
2473     }
2474     item = next_item;
2475   }
2476
2477   if (sess->change_ssrc) {
2478     GST_DEBUG ("need to change our SSRC (%08x)", own->ssrc);
2479     g_hash_table_steal (sess->ssrcs[sess->mask_idx],
2480         GINT_TO_POINTER (own->ssrc));
2481
2482     own->ssrc = rtp_session_create_new_ssrc (sess);
2483     rtp_source_reset (own);
2484
2485     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
2486         GINT_TO_POINTER (own->ssrc), own);
2487
2488     g_free (sess->bye_reason);
2489     sess->bye_reason = NULL;
2490     sess->sent_bye = FALSE;
2491     sess->change_ssrc = FALSE;
2492     GST_DEBUG ("changed our SSRC to %08x", own->ssrc);
2493   }
2494   RTP_SESSION_UNLOCK (sess);
2495
2496   /* push out the RTCP packet */
2497   if (data.rtcp) {
2498     /* close the RTCP packet */
2499     gst_rtcp_buffer_end (data.rtcp);
2500
2501     GST_DEBUG ("sending packet");
2502     if (sess->callbacks.send_rtcp)
2503       result = sess->callbacks.send_rtcp (sess, own, data.rtcp,
2504           sess->sent_bye, sess->send_rtcp_user_data);
2505     else {
2506       GST_DEBUG ("freeing packet");
2507       gst_buffer_unref (data.rtcp);
2508     }
2509   }
2510
2511   return result;
2512 }