rtpsession: handle NONE RTCP intervals
[platform/upstream/gstreamer.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/rtp/gstrtpbuffer.h>
23 #include <gst/rtp/gstrtcpbuffer.h>
24 #include <gst/netbuffer/gstnetbuffer.h>
25
26 #include "gstrtpbin-marshal.h"
27 #include "rtpsession.h"
28
29 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
30 #define GST_CAT_DEFAULT rtp_session_debug
31
32 /* signals and args */
33 enum
34 {
35   SIGNAL_GET_SOURCE_BY_SSRC,
36   SIGNAL_ON_NEW_SSRC,
37   SIGNAL_ON_SSRC_COLLISION,
38   SIGNAL_ON_SSRC_VALIDATED,
39   SIGNAL_ON_SSRC_ACTIVE,
40   SIGNAL_ON_SSRC_SDES,
41   SIGNAL_ON_BYE_SSRC,
42   SIGNAL_ON_BYE_TIMEOUT,
43   SIGNAL_ON_TIMEOUT,
44   SIGNAL_ON_SENDER_TIMEOUT,
45   LAST_SIGNAL
46 };
47
48 #define DEFAULT_INTERNAL_SOURCE      NULL
49 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
50 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_BANDWIDTH
51 #define DEFAULT_RTCP_MTU             1400
52 #define DEFAULT_SDES                 NULL
53 #define DEFAULT_NUM_SOURCES          0
54 #define DEFAULT_NUM_ACTIVE_SOURCES   0
55 #define DEFAULT_SOURCES              NULL
56
57 enum
58 {
59   PROP_0,
60   PROP_INTERNAL_SSRC,
61   PROP_INTERNAL_SOURCE,
62   PROP_BANDWIDTH,
63   PROP_RTCP_FRACTION,
64   PROP_RTCP_MTU,
65   PROP_SDES,
66   PROP_NUM_SOURCES,
67   PROP_NUM_ACTIVE_SOURCES,
68   PROP_SOURCES,
69   PROP_FAVOR_NEW,
70   PROP_LAST
71 };
72
73 /* update average packet size, we keep this scaled by 16 to keep enough
74  * precision. */
75 #define UPDATE_AVG(avg, val)            \
76   if ((avg) == 0)                       \
77    (avg) = (val) << 4;                  \
78   else                                  \
79    (avg) = ((val) + (15 * (avg))) >> 4;
80
81 /* The number RTCP intervals after which to timeout entries in the
82  * collision table
83  */
84 #define RTCP_INTERVAL_COLLISION_TIMEOUT 10
85
86 /* GObject vmethods */
87 static void rtp_session_finalize (GObject * object);
88 static void rtp_session_set_property (GObject * object, guint prop_id,
89     const GValue * value, GParamSpec * pspec);
90 static void rtp_session_get_property (GObject * object, guint prop_id,
91     GValue * value, GParamSpec * pspec);
92
93 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
94
95 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
96
97 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
98     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
99 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
100     const gchar * reason, GstClockTime current_time);
101 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
102     gboolean deterministic, gboolean first);
103
104 static void
105 rtp_session_class_init (RTPSessionClass * klass)
106 {
107   GObjectClass *gobject_class;
108
109   gobject_class = (GObjectClass *) klass;
110
111   gobject_class->finalize = rtp_session_finalize;
112   gobject_class->set_property = rtp_session_set_property;
113   gobject_class->get_property = rtp_session_get_property;
114
115   /**
116    * RTPSession::get-source-by-ssrc:
117    * @session: the object which received the signal
118    * @ssrc: the SSRC of the RTPSource
119    *
120    * Request the #RTPSource object with SSRC @ssrc in @session.
121    */
122   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
123       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
124       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
125           get_source_by_ssrc), NULL, NULL, gst_rtp_bin_marshal_OBJECT__UINT,
126       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
127
128   /**
129    * RTPSession::on-new-ssrc:
130    * @session: the object which received the signal
131    * @src: the new RTPSource
132    *
133    * Notify of a new SSRC that entered @session.
134    */
135   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
136       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
137       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
138       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
139       RTP_TYPE_SOURCE);
140   /**
141    * RTPSession::on-ssrc-collision:
142    * @session: the object which received the signal
143    * @src: the #RTPSource that caused a collision
144    *
145    * Notify when we have an SSRC collision
146    */
147   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
148       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
149       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
150       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
151       RTP_TYPE_SOURCE);
152   /**
153    * RTPSession::on-ssrc-validated:
154    * @session: the object which received the signal
155    * @src: the new validated RTPSource
156    *
157    * Notify of a new SSRC that became validated.
158    */
159   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
160       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
161       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
162       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
163       RTP_TYPE_SOURCE);
164   /**
165    * RTPSession::on-ssrc-active:
166    * @session: the object which received the signal
167    * @src: the active RTPSource
168    *
169    * Notify of a SSRC that is active, i.e., sending RTCP.
170    */
171   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
172       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
173       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
174       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
175       RTP_TYPE_SOURCE);
176   /**
177    * RTPSession::on-ssrc-sdes:
178    * @session: the object which received the signal
179    * @src: the RTPSource
180    *
181    * Notify that a new SDES was received for SSRC.
182    */
183   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
184       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
185       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
186       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
187       RTP_TYPE_SOURCE);
188   /**
189    * RTPSession::on-bye-ssrc:
190    * @session: the object which received the signal
191    * @src: the RTPSource that went away
192    *
193    * Notify of an SSRC that became inactive because of a BYE packet.
194    */
195   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
196       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
197       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
198       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
199       RTP_TYPE_SOURCE);
200   /**
201    * RTPSession::on-bye-timeout:
202    * @session: the object which received the signal
203    * @src: the RTPSource that timed out
204    *
205    * Notify of an SSRC that has timed out because of BYE
206    */
207   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
208       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
209       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
210       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
211       RTP_TYPE_SOURCE);
212   /**
213    * RTPSession::on-timeout:
214    * @session: the object which received the signal
215    * @src: the RTPSource that timed out
216    *
217    * Notify of an SSRC that has timed out
218    */
219   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
220       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
221       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
222       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
223       RTP_TYPE_SOURCE);
224   /**
225    * RTPSession::on-sender-timeout:
226    * @session: the object which received the signal
227    * @src: the RTPSource that timed out
228    *
229    * Notify of an SSRC that was a sender but timed out and became a receiver.
230    */
231   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
232       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
233       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
234       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
235       RTP_TYPE_SOURCE);
236
237   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
238       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
239           "The internal SSRC used for the session",
240           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
241
242   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
243       g_param_spec_object ("internal-source", "Internal Source",
244           "The internal source element of the session",
245           RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
246
247   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
248       g_param_spec_double ("bandwidth", "Bandwidth",
249           "The bandwidth of the session",
250           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
251           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
252
253   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
254       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
255           "The fraction of the bandwidth used for RTCP",
256           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
257           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
258
259   g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
260       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
261           "The maximum size of the RTCP packets",
262           16, G_MAXINT16, DEFAULT_RTCP_MTU,
263           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
264
265   g_object_class_install_property (gobject_class, PROP_SDES,
266       g_param_spec_boxed ("sdes", "SDES",
267           "The SDES items of this session",
268           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
269
270   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
271       g_param_spec_uint ("num-sources", "Num Sources",
272           "The number of sources in the session", 0, G_MAXUINT,
273           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
274
275   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
276       g_param_spec_uint ("num-active-sources", "Num Active Sources",
277           "The number of active sources in the session", 0, G_MAXUINT,
278           DEFAULT_NUM_ACTIVE_SOURCES,
279           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
280   /**
281    * RTPSource::sources
282    *
283    * Get a GValue Array of all sources in the session.
284    *
285    * <example>
286    * <title>Getting the #RTPSources of a session
287    * <programlisting>
288    * {
289    *   GValueArray *arr;
290    *   GValue *val;
291    *   guint i;
292    *
293    *   g_object_get (sess, "sources", &arr, NULL);
294    *
295    *   for (i = 0; i < arr->n_values; i++) {
296    *     RTPSource *source;
297    *
298    *     val = g_value_array_get_nth (arr, i);
299    *     source = g_value_get_object (val);
300    *   }
301    *   g_value_array_free (arr);
302    * }
303    * </programlisting>
304    * </example>
305    */
306   g_object_class_install_property (gobject_class, PROP_SOURCES,
307       g_param_spec_boxed ("sources", "Sources",
308           "An array of all known sources in the session",
309           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
310
311   g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
312       g_param_spec_boolean ("favor-new", "Favor new sources",
313           "Resolve SSRC conflict in favor of new sources", FALSE,
314           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
315
316
317   klass->get_source_by_ssrc =
318       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
319
320   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
321 }
322
323 static void
324 rtp_session_init (RTPSession * sess)
325 {
326   gint i;
327   gchar *str;
328
329   sess->lock = g_mutex_new ();
330   sess->key = g_random_int ();
331   sess->mask_idx = 0;
332   sess->mask = 0;
333
334   for (i = 0; i < 32; i++) {
335     sess->ssrcs[i] =
336         g_hash_table_new_full (NULL, NULL, NULL,
337         (GDestroyNotify) g_object_unref);
338   }
339   sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL);
340
341   rtp_stats_init_defaults (&sess->stats);
342
343   /* create an active SSRC for this session manager */
344   sess->source = rtp_session_create_source (sess);
345   sess->source->validated = TRUE;
346   sess->source->internal = TRUE;
347   sess->stats.active_sources++;
348
349   /* default UDP header length */
350   sess->header_len = 28;
351   sess->mtu = DEFAULT_RTCP_MTU;
352
353   /* some default SDES entries */
354   str = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ());
355   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_CNAME, str);
356   g_free (str);
357
358   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_NAME,
359       g_get_real_name ());
360   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_TOOL, "GStreamer");
361
362   sess->first_rtcp = TRUE;
363
364   GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
365 }
366
367 static void
368 rtp_session_finalize (GObject * object)
369 {
370   RTPSession *sess;
371   gint i;
372
373   sess = RTP_SESSION_CAST (object);
374
375   g_mutex_free (sess->lock);
376   for (i = 0; i < 32; i++)
377     g_hash_table_destroy (sess->ssrcs[i]);
378
379   g_free (sess->bye_reason);
380
381   g_hash_table_destroy (sess->cnames);
382   g_object_unref (sess->source);
383
384   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
385 }
386
387 static void
388 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
389 {
390   GValue value = { 0 };
391
392   g_value_init (&value, RTP_TYPE_SOURCE);
393   g_value_take_object (&value, source);
394   /* copies the value */
395   g_value_array_append (arr, &value);
396 }
397
398 static GValueArray *
399 rtp_session_create_sources (RTPSession * sess)
400 {
401   GValueArray *res;
402   guint size;
403
404   RTP_SESSION_LOCK (sess);
405   /* get number of elements in the table */
406   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
407   /* create the result value array */
408   res = g_value_array_new (size);
409
410   /* and copy all values into the array */
411   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
412   RTP_SESSION_UNLOCK (sess);
413
414   return res;
415 }
416
417 static void
418 rtp_session_set_property (GObject * object, guint prop_id,
419     const GValue * value, GParamSpec * pspec)
420 {
421   RTPSession *sess;
422
423   sess = RTP_SESSION (object);
424
425   switch (prop_id) {
426     case PROP_INTERNAL_SSRC:
427       rtp_session_set_internal_ssrc (sess, g_value_get_uint (value));
428       break;
429     case PROP_BANDWIDTH:
430       rtp_session_set_bandwidth (sess, g_value_get_double (value));
431       break;
432     case PROP_RTCP_FRACTION:
433       rtp_session_set_rtcp_fraction (sess, g_value_get_double (value));
434       break;
435     case PROP_RTCP_MTU:
436       sess->mtu = g_value_get_uint (value);
437       break;
438     case PROP_SDES:
439       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
440       break;
441     case PROP_FAVOR_NEW:
442       sess->favor_new = g_value_get_boolean (value);
443       break;
444     default:
445       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
446       break;
447   }
448 }
449
450 static void
451 rtp_session_get_property (GObject * object, guint prop_id,
452     GValue * value, GParamSpec * pspec)
453 {
454   RTPSession *sess;
455
456   sess = RTP_SESSION (object);
457
458   switch (prop_id) {
459     case PROP_INTERNAL_SSRC:
460       g_value_set_uint (value, rtp_session_get_internal_ssrc (sess));
461       break;
462     case PROP_INTERNAL_SOURCE:
463       g_value_take_object (value, rtp_session_get_internal_source (sess));
464       break;
465     case PROP_BANDWIDTH:
466       g_value_set_double (value, rtp_session_get_bandwidth (sess));
467       break;
468     case PROP_RTCP_FRACTION:
469       g_value_set_double (value, rtp_session_get_rtcp_fraction (sess));
470       break;
471     case PROP_RTCP_MTU:
472       g_value_set_uint (value, sess->mtu);
473       break;
474     case PROP_SDES:
475       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
476       break;
477     case PROP_NUM_SOURCES:
478       g_value_set_uint (value, rtp_session_get_num_sources (sess));
479       break;
480     case PROP_NUM_ACTIVE_SOURCES:
481       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
482       break;
483     case PROP_SOURCES:
484       g_value_take_boxed (value, rtp_session_create_sources (sess));
485       break;
486     case PROP_FAVOR_NEW:
487       g_value_set_boolean (value, sess->favor_new);
488       break;
489     default:
490       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
491       break;
492   }
493 }
494
495 static void
496 on_new_ssrc (RTPSession * sess, RTPSource * source)
497 {
498   g_object_ref (source);
499   RTP_SESSION_UNLOCK (sess);
500   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
501   RTP_SESSION_LOCK (sess);
502   g_object_unref (source);
503 }
504
505 static void
506 on_ssrc_collision (RTPSession * sess, RTPSource * source)
507 {
508   g_object_ref (source);
509   RTP_SESSION_UNLOCK (sess);
510   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
511       source);
512   RTP_SESSION_LOCK (sess);
513   g_object_unref (source);
514 }
515
516 static void
517 on_ssrc_validated (RTPSession * sess, RTPSource * source)
518 {
519   g_object_ref (source);
520   RTP_SESSION_UNLOCK (sess);
521   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
522       source);
523   RTP_SESSION_LOCK (sess);
524   g_object_unref (source);
525 }
526
527 static void
528 on_ssrc_active (RTPSession * sess, RTPSource * source)
529 {
530   g_object_ref (source);
531   RTP_SESSION_UNLOCK (sess);
532   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
533   RTP_SESSION_LOCK (sess);
534   g_object_unref (source);
535 }
536
537 static void
538 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
539 {
540   g_object_ref (source);
541   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
542   RTP_SESSION_UNLOCK (sess);
543   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
544   RTP_SESSION_LOCK (sess);
545   g_object_unref (source);
546 }
547
548 static void
549 on_bye_ssrc (RTPSession * sess, RTPSource * source)
550 {
551   g_object_ref (source);
552   RTP_SESSION_UNLOCK (sess);
553   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
554   RTP_SESSION_LOCK (sess);
555   g_object_unref (source);
556 }
557
558 static void
559 on_bye_timeout (RTPSession * sess, RTPSource * source)
560 {
561   g_object_ref (source);
562   RTP_SESSION_UNLOCK (sess);
563   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
564   RTP_SESSION_LOCK (sess);
565   g_object_unref (source);
566 }
567
568 static void
569 on_timeout (RTPSession * sess, RTPSource * source)
570 {
571   g_object_ref (source);
572   RTP_SESSION_UNLOCK (sess);
573   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
574   RTP_SESSION_LOCK (sess);
575   g_object_unref (source);
576 }
577
578 static void
579 on_sender_timeout (RTPSession * sess, RTPSource * source)
580 {
581   g_object_ref (source);
582   RTP_SESSION_UNLOCK (sess);
583   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
584       source);
585   RTP_SESSION_LOCK (sess);
586   g_object_unref (source);
587 }
588
589 /**
590  * rtp_session_new:
591  *
592  * Create a new session object.
593  *
594  * Returns: a new #RTPSession. g_object_unref() after usage.
595  */
596 RTPSession *
597 rtp_session_new (void)
598 {
599   RTPSession *sess;
600
601   sess = g_object_new (RTP_TYPE_SESSION, NULL);
602
603   return sess;
604 }
605
606 /**
607  * rtp_session_set_callbacks:
608  * @sess: an #RTPSession
609  * @callbacks: callbacks to configure
610  * @user_data: user data passed in the callbacks
611  *
612  * Configure a set of callbacks to be notified of actions.
613  */
614 void
615 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
616     gpointer user_data)
617 {
618   g_return_if_fail (RTP_IS_SESSION (sess));
619
620   if (callbacks->process_rtp) {
621     sess->callbacks.process_rtp = callbacks->process_rtp;
622     sess->process_rtp_user_data = user_data;
623   }
624   if (callbacks->send_rtp) {
625     sess->callbacks.send_rtp = callbacks->send_rtp;
626     sess->send_rtp_user_data = user_data;
627   }
628   if (callbacks->send_rtcp) {
629     sess->callbacks.send_rtcp = callbacks->send_rtcp;
630     sess->send_rtcp_user_data = user_data;
631   }
632   if (callbacks->sync_rtcp) {
633     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
634     sess->sync_rtcp_user_data = user_data;
635   }
636   if (callbacks->clock_rate) {
637     sess->callbacks.clock_rate = callbacks->clock_rate;
638     sess->clock_rate_user_data = user_data;
639   }
640   if (callbacks->reconsider) {
641     sess->callbacks.reconsider = callbacks->reconsider;
642     sess->reconsider_user_data = user_data;
643   }
644 }
645
646 /**
647  * rtp_session_set_process_rtp_callback:
648  * @sess: an #RTPSession
649  * @callback: callback to set
650  * @user_data: user data passed in the callback
651  *
652  * Configure only the process_rtp callback to be notified of the process_rtp action.
653  */
654 void
655 rtp_session_set_process_rtp_callback (RTPSession * sess,
656     RTPSessionProcessRTP callback, gpointer user_data)
657 {
658   g_return_if_fail (RTP_IS_SESSION (sess));
659
660   sess->callbacks.process_rtp = callback;
661   sess->process_rtp_user_data = user_data;
662 }
663
664 /**
665  * rtp_session_set_send_rtp_callback:
666  * @sess: an #RTPSession
667  * @callback: callback to set
668  * @user_data: user data passed in the callback
669  *
670  * Configure only the send_rtp callback to be notified of the send_rtp action.
671  */
672 void
673 rtp_session_set_send_rtp_callback (RTPSession * sess,
674     RTPSessionSendRTP callback, gpointer user_data)
675 {
676   g_return_if_fail (RTP_IS_SESSION (sess));
677
678   sess->callbacks.send_rtp = callback;
679   sess->send_rtp_user_data = user_data;
680 }
681
682 /**
683  * rtp_session_set_send_rtcp_callback:
684  * @sess: an #RTPSession
685  * @callback: callback to set
686  * @user_data: user data passed in the callback
687  *
688  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
689  */
690 void
691 rtp_session_set_send_rtcp_callback (RTPSession * sess,
692     RTPSessionSendRTCP callback, gpointer user_data)
693 {
694   g_return_if_fail (RTP_IS_SESSION (sess));
695
696   sess->callbacks.send_rtcp = callback;
697   sess->send_rtcp_user_data = user_data;
698 }
699
700 /**
701  * rtp_session_set_sync_rtcp_callback:
702  * @sess: an #RTPSession
703  * @callback: callback to set
704  * @user_data: user data passed in the callback
705  *
706  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
707  */
708 void
709 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
710     RTPSessionSyncRTCP callback, gpointer user_data)
711 {
712   g_return_if_fail (RTP_IS_SESSION (sess));
713
714   sess->callbacks.sync_rtcp = callback;
715   sess->sync_rtcp_user_data = user_data;
716 }
717
718 /**
719  * rtp_session_set_clock_rate_callback:
720  * @sess: an #RTPSession
721  * @callback: callback to set
722  * @user_data: user data passed in the callback
723  *
724  * Configure only the clock_rate callback to be notified of the clock_rate action.
725  */
726 void
727 rtp_session_set_clock_rate_callback (RTPSession * sess,
728     RTPSessionClockRate callback, gpointer user_data)
729 {
730   g_return_if_fail (RTP_IS_SESSION (sess));
731
732   sess->callbacks.clock_rate = callback;
733   sess->clock_rate_user_data = user_data;
734 }
735
736 /**
737  * rtp_session_set_reconsider_callback:
738  * @sess: an #RTPSession
739  * @callback: callback to set
740  * @user_data: user data passed in the callback
741  *
742  * Configure only the reconsider callback to be notified of the reconsider action.
743  */
744 void
745 rtp_session_set_reconsider_callback (RTPSession * sess,
746     RTPSessionReconsider callback, gpointer user_data)
747 {
748   g_return_if_fail (RTP_IS_SESSION (sess));
749
750   sess->callbacks.reconsider = callback;
751   sess->reconsider_user_data = user_data;
752 }
753
754 /**
755  * rtp_session_set_bandwidth:
756  * @sess: an #RTPSession
757  * @bandwidth: the bandwidth allocated
758  *
759  * Set the session bandwidth in bytes per second.
760  */
761 void
762 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
763 {
764   g_return_if_fail (RTP_IS_SESSION (sess));
765
766   RTP_SESSION_LOCK (sess);
767   sess->stats.bandwidth = bandwidth;
768   RTP_SESSION_UNLOCK (sess);
769 }
770
771 /**
772  * rtp_session_get_bandwidth:
773  * @sess: an #RTPSession
774  *
775  * Get the session bandwidth.
776  *
777  * Returns: the session bandwidth.
778  */
779 gdouble
780 rtp_session_get_bandwidth (RTPSession * sess)
781 {
782   gdouble result;
783
784   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
785
786   RTP_SESSION_LOCK (sess);
787   result = sess->stats.bandwidth;
788   RTP_SESSION_UNLOCK (sess);
789
790   return result;
791 }
792
793 /**
794  * rtp_session_set_rtcp_fraction:
795  * @sess: an #RTPSession
796  * @bandwidth: the RTCP bandwidth
797  *
798  * Set the bandwidth that should be used for RTCP
799  * messages.
800  */
801 void
802 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
803 {
804   g_return_if_fail (RTP_IS_SESSION (sess));
805
806   RTP_SESSION_LOCK (sess);
807   sess->stats.rtcp_bandwidth = bandwidth;
808   RTP_SESSION_UNLOCK (sess);
809 }
810
811 /**
812  * rtp_session_get_rtcp_fraction:
813  * @sess: an #RTPSession
814  *
815  * Get the session bandwidth used for RTCP.
816  *
817  * Returns: The bandwidth used for RTCP messages.
818  */
819 gdouble
820 rtp_session_get_rtcp_fraction (RTPSession * sess)
821 {
822   gdouble result;
823
824   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
825
826   RTP_SESSION_LOCK (sess);
827   result = sess->stats.rtcp_bandwidth;
828   RTP_SESSION_UNLOCK (sess);
829
830   return result;
831 }
832
833 /**
834  * rtp_session_set_sdes_string:
835  * @sess: an #RTPSession
836  * @type: the type of the SDES item
837  * @item: a null-terminated string to set.
838  *
839  * Store an SDES item of @type in @sess.
840  *
841  * Returns: %FALSE if the data was unchanged @type is invalid.
842  */
843 gboolean
844 rtp_session_set_sdes_string (RTPSession * sess, GstRTCPSDESType type,
845     const gchar * item)
846 {
847   gboolean result;
848
849   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
850
851   RTP_SESSION_LOCK (sess);
852   result = rtp_source_set_sdes_string (sess->source, type, item);
853   RTP_SESSION_UNLOCK (sess);
854
855   return result;
856 }
857
858 /**
859  * rtp_session_get_sdes_string:
860  * @sess: an #RTPSession
861  * @type: the type of the SDES item
862  *
863  * Get the SDES item of @type from @sess.
864  *
865  * Returns: a null-terminated copy of the SDES item or NULL when @type was not
866  * valid. g_free() after usage.
867  */
868 gchar *
869 rtp_session_get_sdes_string (RTPSession * sess, GstRTCPSDESType type)
870 {
871   gchar *result;
872
873   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
874
875   RTP_SESSION_LOCK (sess);
876   result = rtp_source_get_sdes_string (sess->source, type);
877   RTP_SESSION_UNLOCK (sess);
878
879   return result;
880 }
881
882 /**
883  * rtp_session_get_sdes_struct:
884  * @sess: an #RTSPSession
885  *
886  * Get the SDES data as a #GstStructure
887  *
888  * Returns: a GstStructure with SDES items for @sess. This function returns a
889  * copy of the SDES structure, use gst_structure_free() after usage.
890  */
891 GstStructure *
892 rtp_session_get_sdes_struct (RTPSession * sess)
893 {
894   const GstStructure *sdes;
895   GstStructure *result = NULL;
896
897   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
898
899   RTP_SESSION_LOCK (sess);
900   sdes = rtp_source_get_sdes_struct (sess->source);
901   if (sdes)
902     result = gst_structure_copy (sdes);
903   RTP_SESSION_UNLOCK (sess);
904
905   return result;
906 }
907
908 /**
909  * rtp_session_set_sdes_struct:
910  * @sess: an #RTSPSession
911  * @sdes: a #GstStructure
912  *
913  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
914  */
915 void
916 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
917 {
918   g_return_if_fail (sdes);
919   g_return_if_fail (RTP_IS_SESSION (sess));
920
921   RTP_SESSION_LOCK (sess);
922   rtp_source_set_sdes_struct (sess->source, gst_structure_copy (sdes));
923   RTP_SESSION_UNLOCK (sess);
924 }
925
926 static GstFlowReturn
927 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
928 {
929   GstFlowReturn result = GST_FLOW_OK;
930
931   if (source == session->source) {
932     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
933
934     RTP_SESSION_UNLOCK (session);
935
936     if (session->callbacks.send_rtp)
937       result =
938           session->callbacks.send_rtp (session, source, data,
939           session->send_rtp_user_data);
940     else {
941       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
942     }
943   } else {
944     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
945     RTP_SESSION_UNLOCK (session);
946
947     if (session->callbacks.process_rtp)
948       result =
949           session->callbacks.process_rtp (session, source,
950           GST_BUFFER_CAST (data), session->process_rtp_user_data);
951     else
952       gst_buffer_unref (GST_BUFFER_CAST (data));
953   }
954   RTP_SESSION_LOCK (session);
955
956   return result;
957 }
958
959 static gint
960 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
961 {
962   gint result;
963
964   RTP_SESSION_UNLOCK (session);
965
966   if (session->callbacks.clock_rate)
967     result =
968         session->callbacks.clock_rate (session, pt,
969         session->clock_rate_user_data);
970   else
971     result = -1;
972
973   RTP_SESSION_LOCK (session);
974
975   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
976
977   return result;
978 }
979
980 static RTPSourceCallbacks callbacks = {
981   (RTPSourcePushRTP) source_push_rtp,
982   (RTPSourceClockRate) source_clock_rate,
983 };
984
985 static gboolean
986 check_collision (RTPSession * sess, RTPSource * source,
987     RTPArrivalStats * arrival, gboolean rtp)
988 {
989   /* If we have no arrival address, we can't do collision checking */
990   if (!arrival->have_address)
991     return FALSE;
992
993   if (sess->source != source) {
994     GstNetAddress *from;
995     gboolean have_from;
996
997     /* This is not our local source, but lets check if two remote
998      * source collide
999      */
1000
1001     if (rtp) {
1002       from = &source->rtp_from;
1003       have_from = source->have_rtp_from;
1004     } else {
1005       from = &source->rtcp_from;
1006       have_from = source->have_rtcp_from;
1007     }
1008
1009     if (have_from) {
1010       if (gst_netaddress_equal (from, &arrival->address)) {
1011         /* Address is the same */
1012         return FALSE;
1013       } else {
1014         GST_LOG ("we have a third-party collision or loop ssrc:%x",
1015             rtp_source_get_ssrc (source));
1016         if (sess->favor_new) {
1017           if (rtp_source_find_conflicting_address (source,
1018                   &arrival->address, arrival->current_time)) {
1019             gchar buf1[40];
1020             gst_netaddress_to_string (&arrival->address, buf1, 40);
1021             GST_LOG ("Known conflict on %x for %s, dropping packet",
1022                 rtp_source_get_ssrc (source), buf1);
1023             return TRUE;
1024           } else {
1025             gchar buf1[40], buf2[40];
1026
1027             /* Current address is not a known conflict, lets assume this is
1028              * a new source. Save old address in possible conflict list
1029              */
1030             rtp_source_add_conflicting_address (source, from,
1031                 arrival->current_time);
1032
1033             gst_netaddress_to_string (from, buf1, 40);
1034             gst_netaddress_to_string (&arrival->address, buf2, 40);
1035             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1036                 " saving old as known conflict",
1037                 rtp_source_get_ssrc (source), buf1, buf2);
1038
1039             if (rtp)
1040               rtp_source_set_rtp_from (source, &arrival->address);
1041             else
1042               rtp_source_set_rtcp_from (source, &arrival->address);
1043             return FALSE;
1044           }
1045         } else {
1046           /* Don't need to save old addresses, we ignore new sources */
1047           return TRUE;
1048         }
1049       }
1050     } else {
1051       /* We don't already have a from address for RTP, just set it */
1052       if (rtp)
1053         rtp_source_set_rtp_from (source, &arrival->address);
1054       else
1055         rtp_source_set_rtcp_from (source, &arrival->address);
1056       return FALSE;
1057     }
1058
1059     /* FIXME: Log 3rd party collision somehow
1060      * Maybe should be done in upper layer, only the SDES can tell us
1061      * if its a collision or a loop
1062      */
1063   } else {
1064     /* This is sending with our ssrc, is it an address we already know */
1065
1066     if (rtp_source_find_conflicting_address (source, &arrival->address,
1067             arrival->current_time)) {
1068       /* Its a known conflict, its probably a loop, not a collision
1069        * lets just drop the incoming packet
1070        */
1071       GST_DEBUG ("Our packets are being looped back to us, dropping");
1072     } else {
1073       /* Its a new collision, lets change our SSRC */
1074
1075       rtp_source_add_conflicting_address (source, &arrival->address,
1076           arrival->current_time);
1077
1078       GST_DEBUG ("Collision for SSRC %x", rtp_source_get_ssrc (source));
1079       on_ssrc_collision (sess, source);
1080
1081       rtp_session_schedule_bye_locked (sess, "SSRC Collision",
1082           arrival->current_time);
1083
1084       sess->change_ssrc = TRUE;
1085     }
1086   }
1087
1088   return TRUE;
1089 }
1090
1091
1092 /* must be called with the session lock, the returned source needs to be
1093  * unreffed after usage. */
1094 static RTPSource *
1095 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1096     RTPArrivalStats * arrival, gboolean rtp)
1097 {
1098   RTPSource *source;
1099
1100   source =
1101       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1102   if (source == NULL) {
1103     /* make new Source in probation and insert */
1104     source = rtp_source_new (ssrc);
1105
1106     /* for RTP packets we need to set the source in probation. Receiving RTCP
1107      * packets of an SSRC, on the other hand, is a strong indication that we
1108      * are dealing with a valid source. */
1109     if (rtp)
1110       source->probation = RTP_DEFAULT_PROBATION;
1111     else
1112       source->probation = 0;
1113
1114     /* store from address, if any */
1115     if (arrival->have_address) {
1116       if (rtp)
1117         rtp_source_set_rtp_from (source, &arrival->address);
1118       else
1119         rtp_source_set_rtcp_from (source, &arrival->address);
1120     }
1121
1122     /* configure a callback on the source */
1123     rtp_source_set_callbacks (source, &callbacks, sess);
1124
1125     g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1126         source);
1127
1128     /* we have one more source now */
1129     sess->total_sources++;
1130     *created = TRUE;
1131   } else {
1132     *created = FALSE;
1133     /* check for collision, this updates the address when not previously set */
1134     if (check_collision (sess, source, arrival, rtp)) {
1135       return NULL;
1136     }
1137   }
1138   /* update last activity */
1139   source->last_activity = arrival->current_time;
1140   if (rtp)
1141     source->last_rtp_activity = arrival->current_time;
1142   g_object_ref (source);
1143
1144   return source;
1145 }
1146
1147 /**
1148  * rtp_session_get_internal_source:
1149  * @sess: a #RTPSession
1150  *
1151  * Get the internal #RTPSource of @sess.
1152  *
1153  * Returns: The internal #RTPSource. g_object_unref() after usage.
1154  */
1155 RTPSource *
1156 rtp_session_get_internal_source (RTPSession * sess)
1157 {
1158   RTPSource *result;
1159
1160   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1161
1162   result = g_object_ref (sess->source);
1163
1164   return result;
1165 }
1166
1167 /**
1168  * rtp_session_set_internal_ssrc:
1169  * @sess: a #RTPSession
1170  * @ssrc: an SSRC
1171  *
1172  * Set the SSRC of @sess to @ssrc.
1173  */
1174 void
1175 rtp_session_set_internal_ssrc (RTPSession * sess, guint32 ssrc)
1176 {
1177   RTP_SESSION_LOCK (sess);
1178   if (ssrc != sess->source->ssrc) {
1179     g_hash_table_steal (sess->ssrcs[sess->mask_idx],
1180         GINT_TO_POINTER (sess->source->ssrc));
1181
1182     GST_DEBUG ("setting internal SSRC to %08x", ssrc);
1183     /* After this call, any receiver of the old SSRC either in RTP or RTCP
1184      * packets will timeout on the old SSRC, we could potentially schedule a
1185      * BYE RTCP for the old SSRC... */
1186     sess->source->ssrc = ssrc;
1187     rtp_source_reset (sess->source);
1188
1189     /* rehash with the new SSRC */
1190     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1191         GINT_TO_POINTER (sess->source->ssrc), sess->source);
1192   }
1193   RTP_SESSION_UNLOCK (sess);
1194
1195   g_object_notify (G_OBJECT (sess), "internal-ssrc");
1196 }
1197
1198 /**
1199  * rtp_session_get_internal_ssrc:
1200  * @sess: a #RTPSession
1201  *
1202  * Get the internal SSRC of @sess.
1203  *
1204  * Returns: The SSRC of the session.
1205  */
1206 guint32
1207 rtp_session_get_internal_ssrc (RTPSession * sess)
1208 {
1209   guint32 ssrc;
1210
1211   RTP_SESSION_LOCK (sess);
1212   ssrc = sess->source->ssrc;
1213   RTP_SESSION_UNLOCK (sess);
1214
1215   return ssrc;
1216 }
1217
1218 /**
1219  * rtp_session_add_source:
1220  * @sess: a #RTPSession
1221  * @src: #RTPSource to add
1222  *
1223  * Add @src to @session.
1224  *
1225  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1226  * existed in the session.
1227  */
1228 gboolean
1229 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1230 {
1231   gboolean result = FALSE;
1232   RTPSource *find;
1233
1234   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1235   g_return_val_if_fail (src != NULL, FALSE);
1236
1237   RTP_SESSION_LOCK (sess);
1238   find =
1239       g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1240       GINT_TO_POINTER (src->ssrc));
1241   if (find == NULL) {
1242     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1243         GINT_TO_POINTER (src->ssrc), src);
1244     /* we have one more source now */
1245     sess->total_sources++;
1246     result = TRUE;
1247   }
1248   RTP_SESSION_UNLOCK (sess);
1249
1250   return result;
1251 }
1252
1253 /**
1254  * rtp_session_get_num_sources:
1255  * @sess: an #RTPSession
1256  *
1257  * Get the number of sources in @sess.
1258  *
1259  * Returns: The number of sources in @sess.
1260  */
1261 guint
1262 rtp_session_get_num_sources (RTPSession * sess)
1263 {
1264   guint result;
1265
1266   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1267
1268   RTP_SESSION_LOCK (sess);
1269   result = sess->total_sources;
1270   RTP_SESSION_UNLOCK (sess);
1271
1272   return result;
1273 }
1274
1275 /**
1276  * rtp_session_get_num_active_sources:
1277  * @sess: an #RTPSession
1278  *
1279  * Get the number of active sources in @sess. A source is considered active when
1280  * it has been validated and has not yet received a BYE RTCP message.
1281  *
1282  * Returns: The number of active sources in @sess.
1283  */
1284 guint
1285 rtp_session_get_num_active_sources (RTPSession * sess)
1286 {
1287   guint result;
1288
1289   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1290
1291   RTP_SESSION_LOCK (sess);
1292   result = sess->stats.active_sources;
1293   RTP_SESSION_UNLOCK (sess);
1294
1295   return result;
1296 }
1297
1298 /**
1299  * rtp_session_get_source_by_ssrc:
1300  * @sess: an #RTPSession
1301  * @ssrc: an SSRC
1302  *
1303  * Find the source with @ssrc in @sess.
1304  *
1305  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
1306  * g_object_unref() after usage.
1307  */
1308 RTPSource *
1309 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
1310 {
1311   RTPSource *result;
1312
1313   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1314
1315   RTP_SESSION_LOCK (sess);
1316   result =
1317       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1318   if (result)
1319     g_object_ref (result);
1320   RTP_SESSION_UNLOCK (sess);
1321
1322   return result;
1323 }
1324
1325 /**
1326  * rtp_session_get_source_by_cname:
1327  * @sess: a #RTPSession
1328  * @cname: an CNAME
1329  *
1330  * Find the source with @cname in @sess.
1331  *
1332  * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found.
1333  * g_object_unref() after usage.
1334  */
1335 RTPSource *
1336 rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
1337 {
1338   RTPSource *result;
1339
1340   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1341   g_return_val_if_fail (cname != NULL, NULL);
1342
1343   RTP_SESSION_LOCK (sess);
1344   result = g_hash_table_lookup (sess->cnames, cname);
1345   if (result)
1346     g_object_ref (result);
1347   RTP_SESSION_UNLOCK (sess);
1348
1349   return result;
1350 }
1351
1352 static guint32
1353 rtp_session_create_new_ssrc (RTPSession * sess)
1354 {
1355   guint32 ssrc;
1356
1357   while (TRUE) {
1358     ssrc = g_random_int ();
1359
1360     /* see if it exists in the session, we're done if it doesn't */
1361     if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1362             GINT_TO_POINTER (ssrc)) == NULL)
1363       break;
1364   }
1365   return ssrc;
1366 }
1367
1368
1369 /**
1370  * rtp_session_create_source:
1371  * @sess: an #RTPSession
1372  *
1373  * Create an #RTPSource for use in @sess. This function will create a source
1374  * with an ssrc that is currently not used by any participants in the session.
1375  *
1376  * Returns: an #RTPSource.
1377  */
1378 RTPSource *
1379 rtp_session_create_source (RTPSession * sess)
1380 {
1381   guint32 ssrc;
1382   RTPSource *source;
1383
1384   RTP_SESSION_LOCK (sess);
1385   ssrc = rtp_session_create_new_ssrc (sess);
1386   source = rtp_source_new (ssrc);
1387   rtp_source_set_callbacks (source, &callbacks, sess);
1388   /* we need an additional ref for the source in the hashtable */
1389   g_object_ref (source);
1390   g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1391       source);
1392   /* we have one more source now */
1393   sess->total_sources++;
1394   RTP_SESSION_UNLOCK (sess);
1395
1396   return source;
1397 }
1398
1399 /* update the RTPArrivalStats structure with the current time and other bits
1400  * about the current buffer we are handling.
1401  * This function is typically called when a validated packet is received.
1402  * This function should be called with the SESSION_LOCK
1403  */
1404 static void
1405 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
1406     gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
1407     GstClockTime running_time)
1408 {
1409   /* get time of arrival */
1410   arrival->current_time = current_time;
1411   arrival->running_time = running_time;
1412
1413   /* get packet size including header overhead */
1414   arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
1415
1416   if (rtp) {
1417     arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer);
1418   } else {
1419     arrival->payload_len = 0;
1420   }
1421
1422   /* for netbuffer we can store the IP address to check for collisions */
1423   arrival->have_address = GST_IS_NETBUFFER (buffer);
1424   if (arrival->have_address) {
1425     GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
1426
1427     memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
1428   }
1429 }
1430
1431 /**
1432  * rtp_session_process_rtp:
1433  * @sess: and #RTPSession
1434  * @buffer: an RTP buffer
1435  * @current_time: the current system time
1436  *
1437  * Process an RTP buffer in the session manager. This function takes ownership
1438  * of @buffer.
1439  *
1440  * Returns: a #GstFlowReturn.
1441  */
1442 GstFlowReturn
1443 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
1444     GstClockTime current_time, GstClockTime running_time)
1445 {
1446   GstFlowReturn result;
1447   guint32 ssrc;
1448   RTPSource *source;
1449   gboolean created;
1450   gboolean prevsender, prevactive;
1451   RTPArrivalStats arrival;
1452   guint32 csrcs[16];
1453   guint8 i, count;
1454
1455   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1456   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1457
1458   if (!gst_rtp_buffer_validate (buffer))
1459     goto invalid_packet;
1460
1461   RTP_SESSION_LOCK (sess);
1462   /* update arrival stats */
1463   update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
1464       running_time);
1465
1466   /* ignore more RTP packets when we left the session */
1467   if (sess->source->received_bye)
1468     goto ignore;
1469
1470   /* get SSRC and look up in session database */
1471   ssrc = gst_rtp_buffer_get_ssrc (buffer);
1472   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
1473   if (!source)
1474     goto collision;
1475
1476   prevsender = RTP_SOURCE_IS_SENDER (source);
1477   prevactive = RTP_SOURCE_IS_ACTIVE (source);
1478
1479   /* copy available csrc for later */
1480   count = gst_rtp_buffer_get_csrc_count (buffer);
1481   /* make sure to not overflow our array. An RTP buffer can maximally contain
1482    * 16 CSRCs */
1483   count = MIN (count, 16);
1484
1485   for (i = 0; i < count; i++)
1486     csrcs[i] = gst_rtp_buffer_get_csrc (buffer, i);
1487
1488   /* let source process the packet */
1489   result = rtp_source_process_rtp (source, buffer, &arrival);
1490
1491   /* source became active */
1492   if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
1493     sess->stats.active_sources++;
1494     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
1495         sess->stats.active_sources);
1496     on_ssrc_validated (sess, source);
1497   }
1498   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1499     sess->stats.sender_sources++;
1500     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
1501         sess->stats.sender_sources);
1502   }
1503
1504   if (created)
1505     on_new_ssrc (sess, source);
1506
1507   if (source->validated) {
1508     gboolean created;
1509
1510     /* for validated sources, we add the CSRCs as well */
1511     for (i = 0; i < count; i++) {
1512       guint32 csrc;
1513       RTPSource *csrc_src;
1514
1515       csrc = csrcs[i];
1516
1517       /* get source */
1518       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
1519       if (!csrc_src)
1520         continue;
1521
1522       if (created) {
1523         GST_DEBUG ("created new CSRC: %08x", csrc);
1524         rtp_source_set_as_csrc (csrc_src);
1525         if (RTP_SOURCE_IS_ACTIVE (csrc_src))
1526           sess->stats.active_sources++;
1527         on_new_ssrc (sess, csrc_src);
1528       }
1529       g_object_unref (csrc_src);
1530     }
1531   }
1532   g_object_unref (source);
1533
1534   RTP_SESSION_UNLOCK (sess);
1535
1536   return result;
1537
1538   /* ERRORS */
1539 invalid_packet:
1540   {
1541     gst_buffer_unref (buffer);
1542     GST_DEBUG ("invalid RTP packet received");
1543     return GST_FLOW_OK;
1544   }
1545 ignore:
1546   {
1547     gst_buffer_unref (buffer);
1548     RTP_SESSION_UNLOCK (sess);
1549     GST_DEBUG ("ignoring RTP packet because we are leaving");
1550     return GST_FLOW_OK;
1551   }
1552 collision:
1553   {
1554     gst_buffer_unref (buffer);
1555     RTP_SESSION_UNLOCK (sess);
1556     GST_DEBUG ("ignoring packet because its collisioning");
1557     return GST_FLOW_OK;
1558   }
1559 }
1560
1561 static void
1562 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
1563     GstRTCPPacket * packet, RTPArrivalStats * arrival)
1564 {
1565   guint count, i;
1566
1567   count = gst_rtcp_packet_get_rb_count (packet);
1568   for (i = 0; i < count; i++) {
1569     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1570     guint8 fractionlost;
1571     gint32 packetslost;
1572
1573     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1574         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1575
1576     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
1577
1578     if (ssrc == sess->source->ssrc) {
1579       /* only deal with report blocks for our session, we update the stats of
1580        * the sender of the RTCP message. We could also compare our stats against
1581        * the other sender to see if we are better or worse. */
1582       rtp_source_process_rb (source, arrival->current_time, fractionlost,
1583           packetslost, exthighestseq, jitter, lsr, dlsr);
1584
1585       on_ssrc_active (sess, source);
1586     }
1587   }
1588 }
1589
1590 /* A Sender report contains statistics about how the sender is doing. This
1591  * includes timing informataion such as the relation between RTP and NTP
1592  * timestamps and the number of packets/bytes it sent to us.
1593  *
1594  * In this report is also included a set of report blocks related to how this
1595  * sender is receiving data (in case we (or somebody else) is also sending stuff
1596  * to it). This info includes the packet loss, jitter and seqnum. It also
1597  * contains information to calculate the round trip time (LSR/DLSR).
1598  */
1599 static void
1600 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
1601     RTPArrivalStats * arrival, gboolean * do_sync)
1602 {
1603   guint32 senderssrc, rtptime, packet_count, octet_count;
1604   guint64 ntptime;
1605   RTPSource *source;
1606   gboolean created, prevsender;
1607
1608   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
1609       &packet_count, &octet_count);
1610
1611   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
1612       senderssrc, GST_TIME_ARGS (arrival->current_time));
1613
1614   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1615   if (!source)
1616     return;
1617
1618   /* don't try to do lip-sync for sources that sent a BYE */
1619   if (rtp_source_received_bye (source))
1620     *do_sync = FALSE;
1621   else
1622     *do_sync = TRUE;
1623
1624   prevsender = RTP_SOURCE_IS_SENDER (source);
1625
1626   /* first update the source */
1627   rtp_source_process_sr (source, arrival->current_time, ntptime, rtptime,
1628       packet_count, octet_count);
1629
1630   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1631     sess->stats.sender_sources++;
1632     GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc,
1633         sess->stats.sender_sources);
1634   }
1635
1636   if (created)
1637     on_new_ssrc (sess, source);
1638
1639   rtp_session_process_rb (sess, source, packet, arrival);
1640   g_object_unref (source);
1641 }
1642
1643 /* A receiver report contains statistics about how a receiver is doing. It
1644  * includes stuff like packet loss, jitter and the seqnum it received last. It
1645  * also contains info to calculate the round trip time.
1646  *
1647  * We are only interested in how the sender of this report is doing wrt to us.
1648  */
1649 static void
1650 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
1651     RTPArrivalStats * arrival)
1652 {
1653   guint32 senderssrc;
1654   RTPSource *source;
1655   gboolean created;
1656
1657   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
1658
1659   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
1660
1661   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1662   if (!source)
1663     return;
1664
1665   if (created)
1666     on_new_ssrc (sess, source);
1667
1668   rtp_session_process_rb (sess, source, packet, arrival);
1669   g_object_unref (source);
1670 }
1671
1672 /* Get SDES items and store them in the SSRC */
1673 static void
1674 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
1675     RTPArrivalStats * arrival)
1676 {
1677   guint items, i, j;
1678   gboolean more_items, more_entries;
1679
1680   items = gst_rtcp_packet_sdes_get_item_count (packet);
1681   GST_DEBUG ("got SDES packet with %d items", items);
1682
1683   more_items = gst_rtcp_packet_sdes_first_item (packet);
1684   i = 0;
1685   while (more_items) {
1686     guint32 ssrc;
1687     gboolean changed, created;
1688     RTPSource *source;
1689     GstStructure *sdes;
1690
1691     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
1692
1693     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
1694
1695     changed = FALSE;
1696
1697     /* find src, no probation when dealing with RTCP */
1698     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1699     if (!source)
1700       return;
1701
1702     sdes = gst_structure_new ("application/x-rtp-source-sdes", NULL);
1703
1704     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
1705     j = 0;
1706     while (more_entries) {
1707       GstRTCPSDESType type;
1708       guint8 len;
1709       guint8 *data;
1710       gchar *name;
1711       gchar *value;
1712
1713       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
1714
1715       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
1716           data);
1717
1718       if (type == GST_RTCP_SDES_PRIV) {
1719         name = g_strndup ((const gchar *) &data[1], data[0]);
1720         len -= data[0] + 1;
1721         data += data[0] + 1;
1722       } else {
1723         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
1724       }
1725
1726       value = g_strndup ((const gchar *) data, len);
1727
1728       gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
1729
1730       g_free (name);
1731       g_free (value);
1732
1733       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
1734       j++;
1735     }
1736
1737     /* takes ownership of sdes */
1738     changed = rtp_source_set_sdes_struct (source, sdes);
1739
1740     source->validated = TRUE;
1741
1742     if (created)
1743       on_new_ssrc (sess, source);
1744     if (changed)
1745       on_ssrc_sdes (sess, source);
1746
1747     g_object_unref (source);
1748
1749     more_items = gst_rtcp_packet_sdes_next_item (packet);
1750     i++;
1751   }
1752 }
1753
1754 /* BYE is sent when a client leaves the session
1755  */
1756 static void
1757 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
1758     RTPArrivalStats * arrival)
1759 {
1760   guint count, i;
1761   gchar *reason;
1762   gboolean reconsider = FALSE;
1763
1764   reason = gst_rtcp_packet_bye_get_reason (packet);
1765   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
1766
1767   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
1768   for (i = 0; i < count; i++) {
1769     guint32 ssrc;
1770     RTPSource *source;
1771     gboolean created, prevactive, prevsender;
1772     guint pmembers, members;
1773
1774     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
1775     GST_DEBUG ("SSRC: %08x", ssrc);
1776
1777     /* find src and mark bye, no probation when dealing with RTCP */
1778     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1779     if (!source)
1780       return;
1781
1782     /* store time for when we need to time out this source */
1783     source->bye_time = arrival->current_time;
1784
1785     prevactive = RTP_SOURCE_IS_ACTIVE (source);
1786     prevsender = RTP_SOURCE_IS_SENDER (source);
1787
1788     /* let the source handle the rest */
1789     rtp_source_process_bye (source, reason);
1790
1791     pmembers = sess->stats.active_sources;
1792
1793     if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
1794       sess->stats.active_sources--;
1795       GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1796           sess->stats.active_sources);
1797     }
1798     if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
1799       sess->stats.sender_sources--;
1800       GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1801           sess->stats.sender_sources);
1802     }
1803     members = sess->stats.active_sources;
1804
1805     if (!sess->source->received_bye && members < pmembers) {
1806       /* some members went away since the previous timeout estimate.
1807        * Perform reverse reconsideration but only when we are not scheduling a
1808        * BYE ourselves. */
1809       if (arrival->current_time < sess->next_rtcp_check_time) {
1810         GstClockTime time_remaining;
1811
1812         time_remaining = sess->next_rtcp_check_time - arrival->current_time;
1813         sess->next_rtcp_check_time =
1814             gst_util_uint64_scale (time_remaining, members, pmembers);
1815
1816         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
1817             GST_TIME_ARGS (sess->next_rtcp_check_time));
1818
1819         sess->next_rtcp_check_time += arrival->current_time;
1820
1821         /* mark pending reconsider. We only want to signal the reconsideration
1822          * once after we handled all the source in the bye packet */
1823         reconsider = TRUE;
1824       }
1825     }
1826
1827     if (created)
1828       on_new_ssrc (sess, source);
1829
1830     on_bye_ssrc (sess, source);
1831
1832     g_object_unref (source);
1833   }
1834   if (reconsider) {
1835     RTP_SESSION_UNLOCK (sess);
1836     /* notify app of reconsideration */
1837     if (sess->callbacks.reconsider)
1838       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
1839     RTP_SESSION_LOCK (sess);
1840   }
1841   g_free (reason);
1842 }
1843
1844 static void
1845 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
1846     RTPArrivalStats * arrival)
1847 {
1848   GST_DEBUG ("received APP");
1849 }
1850
1851 /**
1852  * rtp_session_process_rtcp:
1853  * @sess: and #RTPSession
1854  * @buffer: an RTCP buffer
1855  * @current_time: the current system time
1856  *
1857  * Process an RTCP buffer in the session manager. This function takes ownership
1858  * of @buffer.
1859  *
1860  * Returns: a #GstFlowReturn.
1861  */
1862 GstFlowReturn
1863 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
1864     GstClockTime current_time)
1865 {
1866   GstRTCPPacket packet;
1867   gboolean more, is_bye = FALSE, do_sync = FALSE;
1868   RTPArrivalStats arrival;
1869   GstFlowReturn result = GST_FLOW_OK;
1870
1871   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1872   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1873
1874   if (!gst_rtcp_buffer_validate (buffer))
1875     goto invalid_packet;
1876
1877   GST_DEBUG ("received RTCP packet");
1878
1879   RTP_SESSION_LOCK (sess);
1880   /* update arrival stats */
1881   update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1);
1882
1883   if (sess->sent_bye)
1884     goto ignore;
1885
1886   /* make writable, we might want to change the buffer */
1887   buffer = gst_buffer_make_metadata_writable (buffer);
1888
1889   /* start processing the compound packet */
1890   more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
1891   while (more) {
1892     GstRTCPType type;
1893
1894     type = gst_rtcp_packet_get_type (&packet);
1895
1896     /* when we are leaving the session, we should ignore all non-BYE messages */
1897     if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) {
1898       GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
1899       goto next;
1900     }
1901
1902     switch (type) {
1903       case GST_RTCP_TYPE_SR:
1904         rtp_session_process_sr (sess, &packet, &arrival, &do_sync);
1905         break;
1906       case GST_RTCP_TYPE_RR:
1907         rtp_session_process_rr (sess, &packet, &arrival);
1908         break;
1909       case GST_RTCP_TYPE_SDES:
1910         rtp_session_process_sdes (sess, &packet, &arrival);
1911         break;
1912       case GST_RTCP_TYPE_BYE:
1913         is_bye = TRUE;
1914         /* don't try to attempt lip-sync anymore for streams with a BYE */
1915         do_sync = FALSE;
1916         rtp_session_process_bye (sess, &packet, &arrival);
1917         break;
1918       case GST_RTCP_TYPE_APP:
1919         rtp_session_process_app (sess, &packet, &arrival);
1920         break;
1921       default:
1922         GST_WARNING ("got unknown RTCP packet");
1923         break;
1924     }
1925   next:
1926     more = gst_rtcp_packet_move_to_next (&packet);
1927   }
1928
1929   /* if we are scheduling a BYE, we only want to count bye packets, else we
1930    * count everything */
1931   if (sess->source->received_bye) {
1932     if (is_bye) {
1933       sess->stats.bye_members++;
1934       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1935     }
1936   } else {
1937     /* keep track of average packet size */
1938     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1939   }
1940   RTP_SESSION_UNLOCK (sess);
1941
1942   /* notify caller of sr packets in the callback */
1943   if (do_sync && sess->callbacks.sync_rtcp)
1944     result = sess->callbacks.sync_rtcp (sess, sess->source, buffer,
1945         sess->sync_rtcp_user_data);
1946   else
1947     gst_buffer_unref (buffer);
1948
1949   return result;
1950
1951   /* ERRORS */
1952 invalid_packet:
1953   {
1954     GST_DEBUG ("invalid RTCP packet received");
1955     gst_buffer_unref (buffer);
1956     return GST_FLOW_OK;
1957   }
1958 ignore:
1959   {
1960     gst_buffer_unref (buffer);
1961     RTP_SESSION_UNLOCK (sess);
1962     GST_DEBUG ("ignoring RTP packet because we left");
1963     return GST_FLOW_OK;
1964   }
1965 }
1966
1967 /**
1968  * rtp_session_send_rtp:
1969  * @sess: an #RTPSession
1970  * @data: pointer to either an RTP buffer or a list of RTP buffers
1971  * @is_list: TRUE when @data is a buffer list
1972  * @current_time: the current system time
1973  * @running_time: the running time of @data
1974  *
1975  * Send the RTP buffer in the session manager. This function takes ownership of
1976  * @buffer.
1977  *
1978  * Returns: a #GstFlowReturn.
1979  */
1980 GstFlowReturn
1981 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
1982     GstClockTime current_time, GstClockTime running_time)
1983 {
1984   GstFlowReturn result;
1985   RTPSource *source;
1986   gboolean prevsender;
1987   gboolean valid_packet;
1988
1989   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1990   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
1991
1992   if (is_list) {
1993     valid_packet = gst_rtp_buffer_list_validate (GST_BUFFER_LIST_CAST (data));
1994   } else {
1995     valid_packet = gst_rtp_buffer_validate (GST_BUFFER_CAST (data));
1996   }
1997
1998   if (!valid_packet)
1999     goto invalid_packet;
2000
2001   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
2002
2003   RTP_SESSION_LOCK (sess);
2004   source = sess->source;
2005
2006   /* update last activity */
2007   source->last_rtp_activity = current_time;
2008
2009   prevsender = RTP_SOURCE_IS_SENDER (source);
2010
2011   /* we use our own source to send */
2012   result = rtp_source_send_rtp (source, data, is_list, running_time);
2013
2014   if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
2015     sess->stats.sender_sources++;
2016   RTP_SESSION_UNLOCK (sess);
2017
2018   return result;
2019
2020   /* ERRORS */
2021 invalid_packet:
2022   {
2023     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
2024     GST_DEBUG ("invalid RTP packet received");
2025     return GST_FLOW_OK;
2026   }
2027 }
2028
2029 static GstClockTime
2030 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
2031     gboolean first)
2032 {
2033   GstClockTime result;
2034
2035   if (sess->source->received_bye) {
2036     result = rtp_stats_calculate_bye_interval (&sess->stats);
2037   } else {
2038     result = rtp_stats_calculate_rtcp_interval (&sess->stats,
2039         RTP_SOURCE_IS_SENDER (sess->source), first);
2040   }
2041
2042   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
2043       GST_TIME_ARGS (result), first);
2044
2045   if (!deterministic)
2046     result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
2047
2048   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
2049
2050   return result;
2051 }
2052
2053 /* Stop the current @sess and schedule a BYE message for the other members.
2054  * One must have the session lock to call this function
2055  */
2056 static GstFlowReturn
2057 rtp_session_schedule_bye_locked (RTPSession * sess, const gchar * reason,
2058     GstClockTime current_time)
2059 {
2060   GstFlowReturn result = GST_FLOW_OK;
2061   RTPSource *source;
2062   GstClockTime interval;
2063
2064   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2065
2066   source = sess->source;
2067
2068   /* ignore more BYEs */
2069   if (source->received_bye)
2070     goto done;
2071
2072   /* we have BYE now */
2073   source->received_bye = TRUE;
2074   /* at least one member wants to send a BYE */
2075   g_free (sess->bye_reason);
2076   sess->bye_reason = g_strdup (reason);
2077   sess->stats.avg_rtcp_packet_size = 100;
2078   sess->stats.bye_members = 1;
2079   sess->first_rtcp = TRUE;
2080   sess->sent_bye = FALSE;
2081
2082   /* reschedule transmission */
2083   sess->last_rtcp_send_time = current_time;
2084   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2085   sess->next_rtcp_check_time = current_time + interval;
2086
2087   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
2088       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
2089
2090   RTP_SESSION_UNLOCK (sess);
2091   /* notify app of reconsideration */
2092   if (sess->callbacks.reconsider)
2093     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2094   RTP_SESSION_LOCK (sess);
2095 done:
2096
2097   return result;
2098 }
2099
2100 /**
2101  * rtp_session_schedule_bye:
2102  * @sess: an #RTPSession
2103  * @reason: a reason or NULL
2104  * @current_time: the current system time
2105  *
2106  * Stop the current @sess and schedule a BYE message for the other members.
2107  *
2108  * Returns: a #GstFlowReturn.
2109  */
2110 GstFlowReturn
2111 rtp_session_schedule_bye (RTPSession * sess, const gchar * reason,
2112     GstClockTime current_time)
2113 {
2114   GstFlowReturn result = GST_FLOW_OK;
2115
2116   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2117
2118   RTP_SESSION_LOCK (sess);
2119   result = rtp_session_schedule_bye_locked (sess, reason, current_time);
2120   RTP_SESSION_UNLOCK (sess);
2121
2122   return result;
2123 }
2124
2125 /**
2126  * rtp_session_next_timeout:
2127  * @sess: an #RTPSession
2128  * @current_time: the current system time
2129  *
2130  * Get the next time we should perform session maintenance tasks.
2131  *
2132  * Returns: a time when rtp_session_on_timeout() should be called with the
2133  * current system time.
2134  */
2135 GstClockTime
2136 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
2137 {
2138   GstClockTime result, interval = 0;
2139
2140   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2141
2142   RTP_SESSION_LOCK (sess);
2143
2144   result = sess->next_rtcp_check_time;
2145
2146   GST_DEBUG ("current time: %" GST_TIME_FORMAT ", next :%" GST_TIME_FORMAT,
2147       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2148
2149   if (result < current_time) {
2150     GST_DEBUG ("take current time as base");
2151     /* our previous check time expired, start counting from the current time
2152      * again. */
2153     result = current_time;
2154   }
2155
2156   if (sess->source->received_bye) {
2157     if (sess->sent_bye) {
2158       GST_DEBUG ("we sent BYE already");
2159       interval = GST_CLOCK_TIME_NONE;
2160     } else if (sess->stats.active_sources >= 50) {
2161       GST_DEBUG ("reconsider BYE, more than 50 sources");
2162       /* reconsider BYE if members >= 50 */
2163       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2164     }
2165   } else {
2166     if (sess->first_rtcp) {
2167       GST_DEBUG ("first RTCP packet");
2168       /* we are called for the first time */
2169       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2170     } else if (sess->next_rtcp_check_time < current_time) {
2171       GST_DEBUG ("old check time expired, getting new timeout");
2172       /* get a new timeout when we need to */
2173       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
2174     }
2175   }
2176
2177   if (interval != GST_CLOCK_TIME_NONE)
2178     result += interval;
2179   else
2180     result = GST_CLOCK_TIME_NONE;
2181
2182   sess->next_rtcp_check_time = result;
2183
2184   GST_DEBUG ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
2185   RTP_SESSION_UNLOCK (sess);
2186
2187   return result;
2188 }
2189
2190 typedef struct
2191 {
2192   RTPSession *sess;
2193   GstBuffer *rtcp;
2194   GstClockTime current_time;
2195   guint64 ntpnstime;
2196   GstClockTime running_time;
2197   GstClockTime interval;
2198   GstRTCPPacket packet;
2199   gboolean is_bye;
2200   gboolean has_sdes;
2201 } ReportData;
2202
2203 static void
2204 session_start_rtcp (RTPSession * sess, ReportData * data)
2205 {
2206   GstRTCPPacket *packet = &data->packet;
2207   RTPSource *own = sess->source;
2208
2209   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
2210
2211   if (RTP_SOURCE_IS_SENDER (own)) {
2212     guint64 ntptime;
2213     guint32 rtptime;
2214     guint32 packet_count, octet_count;
2215
2216     /* we are a sender, create SR */
2217     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
2218     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
2219
2220     /* get latest stats */
2221     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
2222         &ntptime, &rtptime, &packet_count, &octet_count);
2223     /* store stats */
2224     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
2225         packet_count, octet_count);
2226
2227     /* fill in sender report info */
2228     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
2229         ntptime, rtptime, packet_count, octet_count);
2230   } else {
2231     /* we are only receiver, create RR */
2232     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
2233     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
2234     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
2235   }
2236 }
2237
2238 /* construct a Sender or Receiver Report */
2239 static void
2240 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
2241 {
2242   RTPSession *sess = data->sess;
2243   GstRTCPPacket *packet = &data->packet;
2244
2245   /* create a new buffer if needed */
2246   if (data->rtcp == NULL) {
2247     session_start_rtcp (sess, data);
2248   }
2249   if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
2250     /* only report about other sender sources */
2251     if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
2252       guint8 fractionlost;
2253       gint32 packetslost;
2254       guint32 exthighestseq, jitter;
2255       guint32 lsr, dlsr;
2256
2257       /* get new stats */
2258       rtp_source_get_new_rb (source, data->current_time, &fractionlost,
2259           &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2260
2261       /* packet is not yet filled, add report block for this source. */
2262       gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
2263           exthighestseq, jitter, lsr, dlsr);
2264     }
2265   }
2266 }
2267
2268 /* perform cleanup of sources that timed out */
2269 static gboolean
2270 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
2271 {
2272   gboolean remove = FALSE;
2273   gboolean byetimeout = FALSE;
2274   gboolean sendertimeout = FALSE;
2275   gboolean is_sender, is_active;
2276   RTPSession *sess = data->sess;
2277   GstClockTime interval;
2278
2279   is_sender = RTP_SOURCE_IS_SENDER (source);
2280   is_active = RTP_SOURCE_IS_ACTIVE (source);
2281
2282   /* check for our own source, we don't want to delete our own source. */
2283   if (!(source == sess->source)) {
2284     if (source->received_bye) {
2285       /* if we received a BYE from the source, remove the source after some
2286        * time. */
2287       if (data->current_time > source->bye_time &&
2288           data->current_time - source->bye_time > sess->stats.bye_timeout) {
2289         GST_DEBUG ("removing BYE source %08x", source->ssrc);
2290         remove = TRUE;
2291         byetimeout = TRUE;
2292       }
2293     }
2294     /* sources that were inactive for more than 5 times the deterministic reporting
2295      * interval get timed out. the min timeout is 5 seconds. */
2296     if (data->current_time > source->last_activity) {
2297       interval = MAX (data->interval * 5, 5 * GST_SECOND);
2298       if (data->current_time - source->last_activity > interval) {
2299         GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
2300             source->ssrc, GST_TIME_ARGS (source->last_activity));
2301         remove = TRUE;
2302       }
2303     }
2304   }
2305
2306   /* senders that did not send for a long time become a receiver, this also
2307    * holds for our own source. */
2308   if (is_sender) {
2309     if (data->current_time > source->last_rtp_activity) {
2310       interval = MAX (data->interval * 2, 5 * GST_SECOND);
2311       if (data->current_time - source->last_rtp_activity > interval) {
2312         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
2313             GST_TIME_FORMAT, source->ssrc,
2314             GST_TIME_ARGS (source->last_rtp_activity));
2315         source->is_sender = FALSE;
2316         sess->stats.sender_sources--;
2317         sendertimeout = TRUE;
2318       }
2319     }
2320   }
2321
2322   if (remove) {
2323     sess->total_sources--;
2324     if (is_sender)
2325       sess->stats.sender_sources--;
2326     if (is_active)
2327       sess->stats.active_sources--;
2328
2329     if (byetimeout)
2330       on_bye_timeout (sess, source);
2331     else
2332       on_timeout (sess, source);
2333   } else {
2334     if (sendertimeout)
2335       on_sender_timeout (sess, source);
2336   }
2337   return remove;
2338 }
2339
2340 static void
2341 session_sdes (RTPSession * sess, ReportData * data)
2342 {
2343   GstRTCPPacket *packet = &data->packet;
2344   const GstStructure *sdes;
2345   gint i, n_fields;
2346
2347   /* add SDES packet */
2348   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
2349
2350   gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
2351
2352   sdes = rtp_source_get_sdes_struct (sess->source);
2353
2354   /* add all fields in the structure, the order is not important. */
2355   n_fields = gst_structure_n_fields (sdes);
2356   for (i = 0; i < n_fields; ++i) {
2357     const gchar *field;
2358     const gchar *value;
2359     GstRTCPSDESType type;
2360
2361     field = gst_structure_nth_field_name (sdes, i);
2362     if (field == NULL)
2363       continue;
2364     value = gst_structure_get_string (sdes, field);
2365     if (value == NULL)
2366       continue;
2367     type = gst_rtcp_sdes_name_to_type (field);
2368
2369     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
2370       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
2371           (const guint8 *) value);
2372     } else if (type == GST_RTCP_SDES_PRIV) {
2373       gsize prefix_len;
2374       gsize value_len;
2375       gsize data_len;
2376       guint8 data[256];
2377
2378       /* don't accept entries that are too big */
2379       prefix_len = strlen (field);
2380       if (prefix_len > 255)
2381         continue;
2382       value_len = strlen (value);
2383       if (value_len > 255)
2384         continue;
2385       data_len = 1 + prefix_len + value_len;
2386       if (data_len > 255)
2387         continue;
2388
2389       data[0] = prefix_len;
2390       memcpy (&data[1], field, prefix_len);
2391       memcpy (&data[1 + prefix_len], value, value_len);
2392
2393       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
2394     }
2395   }
2396
2397   data->has_sdes = TRUE;
2398 }
2399
2400 /* schedule a BYE packet */
2401 static void
2402 session_bye (RTPSession * sess, ReportData * data)
2403 {
2404   GstRTCPPacket *packet = &data->packet;
2405
2406   /* open packet */
2407   session_start_rtcp (sess, data);
2408
2409   /* add SDES */
2410   session_sdes (sess, data);
2411
2412   /* add a BYE packet */
2413   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet);
2414   gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc);
2415   if (sess->bye_reason)
2416     gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason);
2417
2418   /* we have a BYE packet now */
2419   data->is_bye = TRUE;
2420 }
2421
2422 static gboolean
2423 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
2424 {
2425   GstClockTime new_send_time, elapsed;
2426   gboolean result;
2427
2428   /* no need to check yet */
2429   if (sess->next_rtcp_check_time > current_time) {
2430     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
2431         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
2432         GST_TIME_ARGS (current_time));
2433     return FALSE;
2434   }
2435
2436   /* get elapsed time since we last reported */
2437   elapsed = current_time - sess->last_rtcp_send_time;
2438
2439   /* perform forward reconsideration */
2440   new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
2441
2442   GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
2443       GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time), GST_TIME_ARGS (elapsed));
2444
2445   new_send_time += sess->last_rtcp_send_time;
2446
2447   /* check if reconsideration */
2448   if (current_time < new_send_time) {
2449     GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
2450         GST_TIME_ARGS (new_send_time));
2451     result = FALSE;
2452     /* store new check time */
2453     sess->next_rtcp_check_time = new_send_time;
2454   } else {
2455     result = TRUE;
2456     new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
2457
2458     GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
2459         GST_TIME_ARGS (new_send_time));
2460     sess->next_rtcp_check_time = current_time + new_send_time;
2461   }
2462   return result;
2463 }
2464
2465 /**
2466  * rtp_session_on_timeout:
2467  * @sess: an #RTPSession
2468  * @current_time: the current system time
2469  * @ntpnstime: the current NTP time in nanoseconds
2470  * @running_time: the current running_time of the pipeline
2471  *
2472  * Perform maintenance actions after the timeout obtained with
2473  * rtp_session_next_timeout() expired.
2474  *
2475  * This function will perform timeouts of receivers and senders, send a BYE
2476  * packet or generate RTCP packets with current session stats.
2477  *
2478  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
2479  * times, for each packet that should be processed.
2480  *
2481  * Returns: a #GstFlowReturn.
2482  */
2483 GstFlowReturn
2484 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
2485     guint64 ntpnstime, GstClockTime running_time)
2486 {
2487   GstFlowReturn result = GST_FLOW_OK;
2488   ReportData data;
2489   RTPSource *own;
2490   gboolean notify = FALSE;
2491
2492   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2493
2494   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT,
2495       GST_TIME_ARGS (current_time), GST_TIME_ARGS (ntpnstime));
2496
2497   data.sess = sess;
2498   data.rtcp = NULL;
2499   data.current_time = current_time;
2500   data.ntpnstime = ntpnstime;
2501   data.is_bye = FALSE;
2502   data.has_sdes = FALSE;
2503   data.running_time = running_time;
2504
2505   own = sess->source;
2506
2507   RTP_SESSION_LOCK (sess);
2508   /* get a new interval, we need this for various cleanups etc */
2509   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
2510
2511   /* first perform cleanups */
2512   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
2513       (GHRFunc) session_cleanup, &data);
2514
2515   /* see if we need to generate SR or RR packets */
2516   if (is_rtcp_time (sess, current_time, &data)) {
2517     if (own->received_bye) {
2518       /* generate BYE instead */
2519       GST_DEBUG ("generating BYE message");
2520       session_bye (sess, &data);
2521       sess->sent_bye = TRUE;
2522     } else {
2523       /* loop over all known sources and do something */
2524       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2525           (GHFunc) session_report_blocks, &data);
2526     }
2527   }
2528
2529   if (data.rtcp) {
2530     guint size;
2531
2532     /* we keep track of the last report time in order to timeout inactive
2533      * receivers or senders */
2534     sess->last_rtcp_send_time = data.current_time;
2535     sess->first_rtcp = FALSE;
2536
2537     /* add SDES for this source when not already added */
2538     if (!data.has_sdes)
2539       session_sdes (sess, &data);
2540
2541     /* update average RTCP size before sending */
2542     size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
2543     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size);
2544   }
2545
2546   /* check for outdated collisions */
2547   GST_DEBUG ("Timing out collisions");
2548   rtp_source_timeout (sess->source, current_time,
2549       data.interval * RTCP_INTERVAL_COLLISION_TIMEOUT);
2550
2551   if (sess->change_ssrc) {
2552     GST_DEBUG ("need to change our SSRC (%08x)", own->ssrc);
2553     g_hash_table_steal (sess->ssrcs[sess->mask_idx],
2554         GINT_TO_POINTER (own->ssrc));
2555
2556     own->ssrc = rtp_session_create_new_ssrc (sess);
2557     rtp_source_reset (own);
2558
2559     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
2560         GINT_TO_POINTER (own->ssrc), own);
2561
2562     g_free (sess->bye_reason);
2563     sess->bye_reason = NULL;
2564     sess->sent_bye = FALSE;
2565     sess->change_ssrc = FALSE;
2566     notify = TRUE;
2567     GST_DEBUG ("changed our SSRC to %08x", own->ssrc);
2568   }
2569   RTP_SESSION_UNLOCK (sess);
2570
2571   if (notify)
2572     g_object_notify (G_OBJECT (sess), "internal-ssrc");
2573
2574   /* push out the RTCP packet */
2575   if (data.rtcp) {
2576     /* close the RTCP packet */
2577     gst_rtcp_buffer_end (data.rtcp);
2578
2579     GST_DEBUG ("sending packet");
2580     if (sess->callbacks.send_rtcp)
2581       result = sess->callbacks.send_rtcp (sess, own, data.rtcp,
2582           sess->sent_bye, sess->send_rtcp_user_data);
2583     else {
2584       GST_DEBUG ("freeing packet");
2585       gst_buffer_unref (data.rtcp);
2586     }
2587   }
2588
2589   return result;
2590 }