gst/rtpmanager/gstrtpsession.c: Remove debug.
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/rtp/gstrtpbuffer.h>
23 #include <gst/rtp/gstrtcpbuffer.h>
24 #include <gst/netbuffer/gstnetbuffer.h>
25
26 #include "rtpsession.h"
27
28 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
29 #define GST_CAT_DEFAULT rtp_session_debug
30
31 /* signals and args */
32 enum
33 {
34   SIGNAL_ON_NEW_SSRC,
35   SIGNAL_ON_SSRC_COLLISION,
36   SIGNAL_ON_SSRC_VALIDATED,
37   SIGNAL_ON_BYE_SSRC,
38   SIGNAL_ON_BYE_TIMEOUT,
39   SIGNAL_ON_TIMEOUT,
40   LAST_SIGNAL
41 };
42
43 #define RTP_DEFAULT_BANDWIDTH        64000.0
44 #define RTP_DEFAULT_RTCP_BANDWIDTH   1000
45
46 enum
47 {
48   PROP_0
49 };
50
51 /* update average packet size, we keep this scaled by 16 to keep enough
52  * precision. */
53 #define UPDATE_AVG(avg, val)            \
54   if ((avg) == 0)                       \
55    (avg) = (val) << 4;                  \
56   else                                  \
57    (avg) = ((val) + (15 * (avg))) >> 4;
58
59 /* GObject vmethods */
60 static void rtp_session_finalize (GObject * object);
61 static void rtp_session_set_property (GObject * object, guint prop_id,
62     const GValue * value, GParamSpec * pspec);
63 static void rtp_session_get_property (GObject * object, guint prop_id,
64     GValue * value, GParamSpec * pspec);
65
66 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
67
68 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
69
70 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
71     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
72
73 static void
74 rtp_session_class_init (RTPSessionClass * klass)
75 {
76   GObjectClass *gobject_class;
77
78   gobject_class = (GObjectClass *) klass;
79
80   gobject_class->finalize = rtp_session_finalize;
81   gobject_class->set_property = rtp_session_set_property;
82   gobject_class->get_property = rtp_session_get_property;
83
84   /**
85    * RTPSession::on-new-ssrc:
86    * @session: the object which received the signal
87    * @src: the new RTPSource
88    *
89    * Notify of a new SSRC that entered @session.
90    */
91   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
92       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
93       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
94       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
95       G_TYPE_OBJECT);
96   /**
97    * RTPSession::on-ssrc_collision:
98    * @session: the object which received the signal
99    * @src: the #RTPSource that caused a collision
100    *
101    * Notify when we have an SSRC collision
102    */
103   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
104       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
105       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
106       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
107       G_TYPE_OBJECT);
108   /**
109    * RTPSession::on-ssrc_validated:
110    * @session: the object which received the signal
111    * @src: the new validated RTPSource
112    *
113    * Notify of a new SSRC that became validated.
114    */
115   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
116       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
117       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
118       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
119       G_TYPE_OBJECT);
120   /**
121    * RTPSession::on-bye-ssrc:
122    * @session: the object which received the signal
123    * @src: the RTPSource that went away
124    *
125    * Notify of an SSRC that became inactive because of a BYE packet.
126    */
127   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
128       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
129       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
130       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
131       G_TYPE_OBJECT);
132   /**
133    * RTPSession::on-bye-timeout:
134    * @session: the object which received the signal
135    * @src: the RTPSource that timed out
136    *
137    * Notify of an SSRC that has timed out because of BYE
138    */
139   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
140       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
141       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
142       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
143       G_TYPE_OBJECT);
144   /**
145    * RTPSession::on-timeout:
146    * @session: the object which received the signal
147    * @src: the RTPSource that timed out
148    *
149    * Notify of an SSRC that has timed out
150    */
151   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
152       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
153       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
154       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
155       G_TYPE_OBJECT);
156
157   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
158 }
159
160 static void
161 rtp_session_init (RTPSession * sess)
162 {
163   gint i;
164
165   sess->lock = g_mutex_new ();
166   sess->key = g_random_int ();
167   sess->mask_idx = 0;
168   sess->mask = 0;
169
170   for (i = 0; i < 32; i++) {
171     sess->ssrcs[i] =
172         g_hash_table_new_full (NULL, NULL, NULL,
173         (GDestroyNotify) g_object_unref);
174   }
175   sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL);
176
177   rtp_stats_init_defaults (&sess->stats);
178
179   /* create an active SSRC for this session manager */
180   sess->source = rtp_session_create_source (sess);
181   sess->source->validated = TRUE;
182   sess->stats.active_sources++;
183
184   /* default UDP header length */
185   sess->header_len = 28;
186   sess->mtu = 1400;
187
188   /* some default SDES entries */
189   //sess->cname = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ());
190   sess->cname = g_strdup_printf ("foo@%s", g_get_host_name ());
191   sess->name = g_strdup (g_get_real_name ());
192   sess->tool = g_strdup ("GStreamer");
193
194   sess->first_rtcp = TRUE;
195
196   GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
197 }
198
199 static void
200 rtp_session_finalize (GObject * object)
201 {
202   RTPSession *sess;
203   gint i;
204
205   sess = RTP_SESSION_CAST (object);
206
207   g_mutex_free (sess->lock);
208   for (i = 0; i < 32; i++)
209     g_hash_table_destroy (sess->ssrcs[i]);
210
211   g_hash_table_destroy (sess->cnames);
212   g_object_unref (sess->source);
213
214   g_free (sess->cname);
215   g_free (sess->tool);
216   g_free (sess->bye_reason);
217
218   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
219 }
220
221 static void
222 rtp_session_set_property (GObject * object, guint prop_id,
223     const GValue * value, GParamSpec * pspec)
224 {
225   RTPSession *sess;
226
227   sess = RTP_SESSION (object);
228
229   switch (prop_id) {
230     default:
231       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
232       break;
233   }
234 }
235
236 static void
237 rtp_session_get_property (GObject * object, guint prop_id,
238     GValue * value, GParamSpec * pspec)
239 {
240   RTPSession *sess;
241
242   sess = RTP_SESSION (object);
243
244   switch (prop_id) {
245     default:
246       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
247       break;
248   }
249 }
250
251 static void
252 on_new_ssrc (RTPSession * sess, RTPSource * source)
253 {
254   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
255 }
256
257 static void
258 on_ssrc_collision (RTPSession * sess, RTPSource * source)
259 {
260   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
261       source);
262 }
263
264 static void
265 on_ssrc_validated (RTPSession * sess, RTPSource * source)
266 {
267   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
268       source);
269 }
270
271 static void
272 on_bye_ssrc (RTPSession * sess, RTPSource * source)
273 {
274   /* notify app that reconsideration should be performed */
275   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
276 }
277
278 static void
279 on_bye_timeout (RTPSession * sess, RTPSource * source)
280 {
281   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
282 }
283
284 static void
285 on_timeout (RTPSession * sess, RTPSource * source)
286 {
287   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
288 }
289
290 /**
291  * rtp_session_new:
292  *
293  * Create a new session object.
294  *
295  * Returns: a new #RTPSession. g_object_unref() after usage.
296  */
297 RTPSession *
298 rtp_session_new (void)
299 {
300   RTPSession *sess;
301
302   sess = g_object_new (RTP_TYPE_SESSION, NULL);
303
304   return sess;
305 }
306
307 /**
308  * rtp_session_set_callbacks:
309  * @sess: an #RTPSession
310  * @callbacks: callbacks to configure
311  * @user_data: user data passed in the callbacks
312  *
313  * Configure a set of callbacks to be notified of actions.
314  */
315 void
316 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
317     gpointer user_data)
318 {
319   g_return_if_fail (RTP_IS_SESSION (sess));
320
321   sess->callbacks.process_rtp = callbacks->process_rtp;
322   sess->callbacks.send_rtp = callbacks->send_rtp;
323   sess->callbacks.send_rtcp = callbacks->send_rtcp;
324   sess->callbacks.clock_rate = callbacks->clock_rate;
325   sess->callbacks.get_time = callbacks->get_time;
326   sess->callbacks.reconsider = callbacks->reconsider;
327   sess->user_data = user_data;
328 }
329
330 /**
331  * rtp_session_set_bandwidth:
332  * @sess: an #RTPSession
333  * @bandwidth: the bandwidth allocated
334  *
335  * Set the session bandwidth in bytes per second.
336  */
337 void
338 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
339 {
340   g_return_if_fail (RTP_IS_SESSION (sess));
341
342   sess->stats.bandwidth = bandwidth;
343 }
344
345 /**
346  * rtp_session_get_bandwidth:
347  * @sess: an #RTPSession
348  *
349  * Get the session bandwidth.
350  *
351  * Returns: the session bandwidth.
352  */
353 gdouble
354 rtp_session_get_bandwidth (RTPSession * sess)
355 {
356   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
357
358   return sess->stats.bandwidth;
359 }
360
361 /**
362  * rtp_session_set_rtcp_bandwidth:
363  * @sess: an #RTPSession
364  * @bandwidth: the RTCP bandwidth
365  *
366  * Set the bandwidth that should be used for RTCP
367  * messages. 
368  */
369 void
370 rtp_session_set_rtcp_bandwidth (RTPSession * sess, gdouble bandwidth)
371 {
372   g_return_if_fail (RTP_IS_SESSION (sess));
373
374   sess->stats.rtcp_bandwidth = bandwidth;
375 }
376
377 /**
378  * rtp_session_get_rtcp_bandwidth:
379  * @sess: an #RTPSession
380  *
381  * Get the session bandwidth used for RTCP.
382  *
383  * Returns: The bandwidth used for RTCP messages.
384  */
385 gdouble
386 rtp_session_get_rtcp_bandwidth (RTPSession * sess)
387 {
388   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
389
390   return sess->stats.rtcp_bandwidth;
391 }
392
393 /**
394  * rtp_session_set_cname:
395  * @sess: an #RTPSession
396  * @cname: a CNAME for the session
397  *
398  * Set the CNAME for the session. 
399  */
400 void
401 rtp_session_set_cname (RTPSession * sess, const gchar * cname)
402 {
403   g_return_if_fail (RTP_IS_SESSION (sess));
404
405   g_free (sess->cname);
406   sess->cname = g_strdup (cname);
407 }
408
409 /**
410  * rtp_session_get_cname:
411  * @sess: an #RTPSession
412  *
413  * Get the currently configured CNAME for the session.
414  *
415  * Returns: The CNAME. g_free after usage.
416  */
417 gchar *
418 rtp_session_get_cname (RTPSession * sess)
419 {
420   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
421
422   return g_strdup (sess->cname);
423 }
424
425 /**
426  * rtp_session_set_name:
427  * @sess: an #RTPSession
428  * @name: a NAME for the session
429  *
430  * Set the NAME for the session. 
431  */
432 void
433 rtp_session_set_name (RTPSession * sess, const gchar * name)
434 {
435   g_return_if_fail (RTP_IS_SESSION (sess));
436
437   g_free (sess->name);
438   sess->name = g_strdup (name);
439 }
440
441 /**
442  * rtp_session_get_name:
443  * @sess: an #RTPSession
444  *
445  * Get the currently configured NAME for the session.
446  *
447  * Returns: The NAME. g_free after usage.
448  */
449 gchar *
450 rtp_session_get_name (RTPSession * sess)
451 {
452   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
453
454   return g_strdup (sess->name);
455 }
456
457 /**
458  * rtp_session_set_email:
459  * @sess: an #RTPSession
460  * @email: an EMAIL for the session
461  *
462  * Set the EMAIL the session. 
463  */
464 void
465 rtp_session_set_email (RTPSession * sess, const gchar * email)
466 {
467   g_return_if_fail (RTP_IS_SESSION (sess));
468
469   g_free (sess->email);
470   sess->email = g_strdup (email);
471 }
472
473 /**
474  * rtp_session_get_email:
475  * @sess: an #RTPSession
476  *
477  * Get the currently configured EMAIL of the session.
478  *
479  * Returns: The EMAIL. g_free after usage.
480  */
481 gchar *
482 rtp_session_get_email (RTPSession * sess)
483 {
484   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
485
486   return g_strdup (sess->email);
487 }
488
489 /**
490  * rtp_session_set_phone:
491  * @sess: an #RTPSession
492  * @phone: a PHONE for the session
493  *
494  * Set the PHONE the session. 
495  */
496 void
497 rtp_session_set_phone (RTPSession * sess, const gchar * phone)
498 {
499   g_return_if_fail (RTP_IS_SESSION (sess));
500
501   g_free (sess->phone);
502   sess->phone = g_strdup (phone);
503 }
504
505 /**
506  * rtp_session_get_location:
507  * @sess: an #RTPSession
508  *
509  * Get the currently configured PHONE of the session.
510  *
511  * Returns: The PHONE. g_free after usage.
512  */
513 gchar *
514 rtp_session_get_phone (RTPSession * sess)
515 {
516   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
517
518   return g_strdup (sess->phone);
519 }
520
521 /**
522  * rtp_session_set_location:
523  * @sess: an #RTPSession
524  * @location: a LOCATION for the session
525  *
526  * Set the LOCATION the session. 
527  */
528 void
529 rtp_session_set_location (RTPSession * sess, const gchar * location)
530 {
531   g_return_if_fail (RTP_IS_SESSION (sess));
532
533   g_free (sess->location);
534   sess->location = g_strdup (location);
535 }
536
537 /**
538  * rtp_session_get_location:
539  * @sess: an #RTPSession
540  *
541  * Get the currently configured LOCATION of the session.
542  *
543  * Returns: The LOCATION. g_free after usage.
544  */
545 gchar *
546 rtp_session_get_location (RTPSession * sess)
547 {
548   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
549
550   return g_strdup (sess->location);
551 }
552
553 /**
554  * rtp_session_set_tool:
555  * @sess: an #RTPSession
556  * @tool: a TOOL for the session
557  *
558  * Set the TOOL the session. 
559  */
560 void
561 rtp_session_set_tool (RTPSession * sess, const gchar * tool)
562 {
563   g_return_if_fail (RTP_IS_SESSION (sess));
564
565   g_free (sess->tool);
566   sess->tool = g_strdup (tool);
567 }
568
569 /**
570  * rtp_session_get_tool:
571  * @sess: an #RTPSession
572  *
573  * Get the currently configured TOOL of the session.
574  *
575  * Returns: The TOOL. g_free after usage.
576  */
577 gchar *
578 rtp_session_get_tool (RTPSession * sess)
579 {
580   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
581
582   return g_strdup (sess->tool);
583 }
584
585 /**
586  * rtp_session_set_note:
587  * @sess: an #RTPSession
588  * @note: a NOTE for the session
589  *
590  * Set the NOTE the session. 
591  */
592 void
593 rtp_session_set_note (RTPSession * sess, const gchar * note)
594 {
595   g_return_if_fail (RTP_IS_SESSION (sess));
596
597   g_free (sess->note);
598   sess->note = g_strdup (note);
599 }
600
601 /**
602  * rtp_session_get_note:
603  * @sess: an #RTPSession
604  *
605  * Get the currently configured NOTE of the session.
606  *
607  * Returns: The NOTE. g_free after usage.
608  */
609 gchar *
610 rtp_session_get_note (RTPSession * sess)
611 {
612   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
613
614   return g_strdup (sess->note);
615 }
616
617 static GstFlowReturn
618 source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session)
619 {
620   GstFlowReturn result = GST_FLOW_OK;
621
622   if (source == session->source) {
623     GST_DEBUG ("source %08x pushed sender RTP packet", source->ssrc);
624
625
626     if (session->callbacks.send_rtp)
627       result =
628           session->callbacks.send_rtp (session, source, buffer,
629           session->user_data);
630     else
631       gst_buffer_unref (buffer);
632   } else {
633     GST_DEBUG ("source %08x pushed receiver RTP packet", source->ssrc);
634     if (session->callbacks.process_rtp)
635       result =
636           session->callbacks.process_rtp (session, source, buffer,
637           session->user_data);
638     else
639       gst_buffer_unref (buffer);
640   }
641   return result;
642 }
643
644 static gint
645 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
646 {
647   gint result;
648
649   if (session->callbacks.clock_rate)
650     result = session->callbacks.clock_rate (session, pt, session->user_data);
651   else
652     result = -1;
653
654   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
655
656   return result;
657 }
658
659 static RTPSourceCallbacks callbacks = {
660   (RTPSourcePushRTP) source_push_rtp,
661   (RTPSourceClockRate) source_clock_rate,
662 };
663
664 static gboolean
665 check_collision (RTPSession * sess, RTPSource * source,
666     RTPArrivalStats * arrival)
667 {
668   /* FIXME, do collision check */
669   return FALSE;
670 }
671
672 static RTPSource *
673 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
674     RTPArrivalStats * arrival, gboolean rtp)
675 {
676   RTPSource *source;
677
678   source =
679       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
680   if (source == NULL) {
681     /* make new Source in probation and insert */
682     source = rtp_source_new (ssrc);
683
684     if (rtp)
685       source->probation = RTP_DEFAULT_PROBATION;
686     else
687       source->probation = 0;
688
689     /* store from address, if any */
690     if (arrival->have_address) {
691       if (rtp)
692         rtp_source_set_rtp_from (source, &arrival->address);
693       else
694         rtp_source_set_rtcp_from (source, &arrival->address);
695     }
696
697     /* configure a callback on the source */
698     rtp_source_set_callbacks (source, &callbacks, sess);
699
700     g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
701         source);
702
703     /* we have one more source now */
704     sess->total_sources++;
705     *created = TRUE;
706   } else {
707     *created = FALSE;
708     /* check for collision, this updates the address when not previously set */
709     if (check_collision (sess, source, arrival))
710       on_ssrc_collision (sess, source);
711   }
712   /* update last activity */
713   source->last_activity = arrival->time;
714   if (rtp)
715     source->last_rtp_activity = arrival->time;
716
717   return source;
718 }
719
720 /**
721  * rtp_session_add_source:
722  * @sess: a #RTPSession
723  * @src: #RTPSource to add
724  *
725  * Add @src to @session.
726  *
727  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
728  * existed in the session.
729  */
730 gboolean
731 rtp_session_add_source (RTPSession * sess, RTPSource * src)
732 {
733   gboolean result = FALSE;
734   RTPSource *find;
735
736   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
737   g_return_val_if_fail (src != NULL, FALSE);
738
739   RTP_SESSION_LOCK (sess);
740   find =
741       g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
742       GINT_TO_POINTER (src->ssrc));
743   if (find == NULL) {
744     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
745         GINT_TO_POINTER (src->ssrc), src);
746     /* we have one more source now */
747     sess->total_sources++;
748     result = TRUE;
749   }
750   RTP_SESSION_UNLOCK (sess);
751
752   return result;
753 }
754
755 /**
756  * rtp_session_get_num_sources:
757  * @sess: an #RTPSession
758  *
759  * Get the number of sources in @sess.
760  *
761  * Returns: The number of sources in @sess.
762  */
763 guint
764 rtp_session_get_num_sources (RTPSession * sess)
765 {
766   guint result;
767
768   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
769
770   RTP_SESSION_LOCK (sess);
771   result = sess->total_sources;
772   RTP_SESSION_UNLOCK (sess);
773
774   return result;
775 }
776
777 /**
778  * rtp_session_get_num_active_sources:
779  * @sess: an #RTPSession
780  *
781  * Get the number of active sources in @sess. A source is considered active when
782  * it has been validated and has not yet received a BYE RTCP message.
783  *
784  * Returns: The number of active sources in @sess.
785  */
786 guint
787 rtp_session_get_num_active_sources (RTPSession * sess)
788 {
789   guint result;
790
791   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
792
793   RTP_SESSION_LOCK (sess);
794   result = sess->stats.active_sources;
795   RTP_SESSION_UNLOCK (sess);
796
797   return result;
798 }
799
800 /**
801  * rtp_session_get_source_by_ssrc:
802  * @sess: an #RTPSession
803  * @ssrc: an SSRC
804  *
805  * Find the source with @ssrc in @sess.
806  *
807  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
808  * g_object_unref() after usage.
809  */
810 RTPSource *
811 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
812 {
813   RTPSource *result;
814
815   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
816
817   RTP_SESSION_LOCK (sess);
818   result =
819       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
820   if (result)
821     g_object_ref (result);
822   RTP_SESSION_UNLOCK (sess);
823
824   return result;
825 }
826
827 /**
828  * rtp_session_get_source_by_cname:
829  * @sess: a #RTPSession
830  * @cname: an CNAME
831  *
832  * Find the source with @cname in @sess.
833  *
834  * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found.
835  * g_object_unref() after usage.
836  */
837 RTPSource *
838 rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
839 {
840   RTPSource *result;
841
842   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
843   g_return_val_if_fail (cname != NULL, NULL);
844
845   RTP_SESSION_LOCK (sess);
846   result = g_hash_table_lookup (sess->cnames, cname);
847   if (result)
848     g_object_ref (result);
849   RTP_SESSION_UNLOCK (sess);
850
851   return result;
852 }
853
854 /**
855  * rtp_session_create_source:
856  * @sess: an #RTPSession
857  *
858  * Create an #RTPSource for use in @sess. This function will create a source
859  * with an ssrc that is currently not used by any participants in the session.
860  *
861  * Returns: an #RTPSource.
862  */
863 RTPSource *
864 rtp_session_create_source (RTPSession * sess)
865 {
866   guint32 ssrc;
867   RTPSource *source;
868
869   RTP_SESSION_LOCK (sess);
870   while (TRUE) {
871     ssrc = g_random_int ();
872
873     /* see if it exists in the session, we're done if it doesn't */
874     if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
875             GINT_TO_POINTER (ssrc)) == NULL)
876       break;
877   }
878   source = rtp_source_new (ssrc);
879   g_object_ref (source);
880   g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
881       source);
882   /* we have one more source now */
883   sess->total_sources++;
884   RTP_SESSION_UNLOCK (sess);
885
886   return source;
887 }
888
889 /* update the RTPArrivalStats structure with the current time and other bits
890  * about the current buffer we are handling.
891  * This function is typically called when a validated packet is received.
892  * This function should be called with the SESSION_LOCK
893  */
894 static void
895 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
896     gboolean rtp, GstBuffer * buffer)
897 {
898   /* get time or arrival */
899   if (sess->callbacks.get_time)
900     arrival->time = sess->callbacks.get_time (sess, sess->user_data);
901   else
902     arrival->time = GST_CLOCK_TIME_NONE;
903
904   /* get packet size including header overhead */
905   arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
906
907   if (rtp) {
908     arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer);
909   } else {
910     arrival->payload_len = 0;
911   }
912
913   /* for netbuffer we can store the IP address to check for collisions */
914   arrival->have_address = GST_IS_NETBUFFER (buffer);
915   if (arrival->have_address) {
916     GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
917
918     memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
919   }
920 }
921
922 /**
923  * rtp_session_process_rtp:
924  * @sess: and #RTPSession
925  * @buffer: an RTP buffer
926  *
927  * Process an RTP buffer in the session manager. This function takes ownership
928  * of @buffer.
929  *
930  * Returns: a #GstFlowReturn.
931  */
932 GstFlowReturn
933 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer)
934 {
935   GstFlowReturn result;
936   guint32 ssrc;
937   RTPSource *source;
938   gboolean created;
939   gboolean prevsender, prevactive;
940   RTPArrivalStats arrival;
941
942   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
943   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
944
945   if (!gst_rtp_buffer_validate (buffer))
946     goto invalid_packet;
947
948   RTP_SESSION_LOCK (sess);
949   /* update arrival stats */
950   update_arrival_stats (sess, &arrival, TRUE, buffer);
951
952   /* ignore more RTP packets when we left the session */
953   if (sess->source->received_bye)
954     goto ignore;
955
956   /* get SSRC and look up in session database */
957   ssrc = gst_rtp_buffer_get_ssrc (buffer);
958   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
959
960   prevsender = RTP_SOURCE_IS_SENDER (source);
961   prevactive = RTP_SOURCE_IS_ACTIVE (source);
962
963   /* we need to ref so that we can process the CSRCs later */
964   gst_buffer_ref (buffer);
965
966   /* let source process the packet */
967   result = rtp_source_process_rtp (source, buffer, &arrival);
968
969   /* source became active */
970   if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
971     sess->stats.active_sources++;
972     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
973         sess->stats.active_sources);
974     on_ssrc_validated (sess, source);
975   }
976   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
977     sess->stats.sender_sources++;
978     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
979         sess->stats.sender_sources);
980   }
981
982   if (created)
983     on_new_ssrc (sess, source);
984
985   if (source->validated) {
986     guint8 i, count;
987     gboolean created;
988
989     /* for validated sources, we add the CSRCs as well */
990     count = gst_rtp_buffer_get_csrc_count (buffer);
991
992     for (i = 0; i < count; i++) {
993       guint32 csrc;
994       RTPSource *csrc_src;
995
996       csrc = gst_rtp_buffer_get_csrc (buffer, i);
997
998       /* get source */
999       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
1000
1001       if (created) {
1002         GST_DEBUG ("created new CSRC: %08x", csrc);
1003         rtp_source_set_as_csrc (csrc_src);
1004         if (RTP_SOURCE_IS_ACTIVE (csrc_src))
1005           sess->stats.active_sources++;
1006         on_new_ssrc (sess, source);
1007       }
1008     }
1009   }
1010   gst_buffer_unref (buffer);
1011
1012   RTP_SESSION_UNLOCK (sess);
1013
1014   return result;
1015
1016   /* ERRORS */
1017 invalid_packet:
1018   {
1019     gst_buffer_unref (buffer);
1020     GST_DEBUG ("invalid RTP packet received");
1021     return GST_FLOW_OK;
1022   }
1023 ignore:
1024   {
1025     gst_buffer_unref (buffer);
1026     RTP_SESSION_UNLOCK (sess);
1027     GST_DEBUG ("ignoring RTP packet because we are leaving");
1028     return GST_FLOW_OK;
1029   }
1030 }
1031
1032 /* A Sender report contains statistics about how the sender is doing. This
1033  * includes timing informataion about the relation between RTP and NTP
1034  * timestamps is it using and the number of packets/bytes it sent to us.
1035  *
1036  * In this report is also included a set of report blocks related to how this
1037  * sender is receiving data (in case we (or somebody else) is also sending stuff
1038  * to it). This info includes the packet loss, jitter and seqnum. It also
1039  * contains information to calculate the round trip time (LSR/DLSR).
1040  */
1041 static void
1042 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
1043     RTPArrivalStats * arrival)
1044 {
1045   guint32 senderssrc, rtptime, packet_count, octet_count;
1046   guint64 ntptime;
1047   guint count, i;
1048   RTPSource *source;
1049   gboolean created, prevsender;
1050
1051   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
1052       &packet_count, &octet_count);
1053
1054   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
1055       senderssrc, GST_TIME_ARGS (arrival->time));
1056
1057   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1058
1059   prevsender = RTP_SOURCE_IS_SENDER (source);
1060
1061   /* first update the source */
1062   rtp_source_process_sr (source, ntptime, rtptime, packet_count, octet_count,
1063       arrival->time);
1064
1065   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1066     sess->stats.sender_sources++;
1067     GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc,
1068         sess->stats.sender_sources);
1069   }
1070
1071   if (created)
1072     on_new_ssrc (sess, source);
1073
1074   count = gst_rtcp_packet_get_rb_count (packet);
1075   for (i = 0; i < count; i++) {
1076     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1077     guint8 fractionlost;
1078     gint32 packetslost;
1079
1080     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1081         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1082
1083     if (ssrc == sess->source->ssrc) {
1084       /* only deal with report blocks for our session, we update the stats of
1085        * the sender of the RTCP message. We could also compare our stats against
1086        * the other sender to see if we are better or worse. */
1087       rtp_source_process_rb (source, fractionlost, packetslost,
1088           exthighestseq, jitter, lsr, dlsr);
1089     }
1090   }
1091 }
1092
1093 /* A receiver report contains statistics about how a receiver is doing. It
1094  * includes stuff like packet loss, jitter and the seqnum it received last. It
1095  * also contains info to calculate the round trip time.
1096  *
1097  * We are only interested in how the sender of this report is doing wrt to us.
1098  */
1099 static void
1100 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
1101     RTPArrivalStats * arrival)
1102 {
1103   guint32 senderssrc;
1104   guint count, i;
1105   RTPSource *source;
1106   gboolean created;
1107
1108   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
1109
1110   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
1111
1112   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1113
1114   if (created)
1115     on_new_ssrc (sess, source);
1116
1117   count = gst_rtcp_packet_get_rb_count (packet);
1118   for (i = 0; i < count; i++) {
1119     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1120     guint8 fractionlost;
1121     gint32 packetslost;
1122
1123     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1124         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1125
1126     if (ssrc == sess->source->ssrc) {
1127       rtp_source_process_rb (source, fractionlost, packetslost,
1128           exthighestseq, jitter, lsr, dlsr);
1129     }
1130   }
1131 }
1132
1133 /* FIXME, we're just printing this for now... */
1134 static void
1135 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
1136     RTPArrivalStats * arrival)
1137 {
1138   guint items, i, j;
1139   gboolean more_items, more_entries;
1140
1141   items = gst_rtcp_packet_sdes_get_item_count (packet);
1142   GST_DEBUG ("got SDES packet with %d items", items);
1143
1144   more_items = gst_rtcp_packet_sdes_first_item (packet);
1145   i = 0;
1146   while (more_items) {
1147     guint32 ssrc;
1148
1149     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
1150
1151     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
1152
1153     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
1154     j = 0;
1155     while (more_entries) {
1156       GstRTCPSDESType type;
1157       guint8 len;
1158       guint8 *data;
1159
1160       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
1161
1162       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
1163           data);
1164
1165       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
1166       j++;
1167     }
1168     more_items = gst_rtcp_packet_sdes_next_item (packet);
1169     i++;
1170   }
1171 }
1172
1173 /* BYE is sent when a client leaves the session
1174  */
1175 static void
1176 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
1177     RTPArrivalStats * arrival)
1178 {
1179   guint count, i;
1180   gchar *reason;
1181
1182   reason = gst_rtcp_packet_bye_get_reason (packet);
1183   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
1184
1185   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
1186   for (i = 0; i < count; i++) {
1187     guint32 ssrc;
1188     RTPSource *source;
1189     gboolean created, prevactive, prevsender;
1190     guint pmembers, members;
1191
1192     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
1193     GST_DEBUG ("SSRC: %08x", ssrc);
1194
1195     /* find src and mark bye, no probation when dealing with RTCP */
1196     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1197
1198     /* store time for when we need to time out this source */
1199     source->bye_time = arrival->time;
1200
1201     prevactive = RTP_SOURCE_IS_ACTIVE (source);
1202     prevsender = RTP_SOURCE_IS_SENDER (source);
1203
1204     /* let the source handle the rest */
1205     rtp_source_process_bye (source, reason);
1206
1207     pmembers = sess->stats.active_sources;
1208
1209     if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
1210       sess->stats.active_sources--;
1211       GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1212           sess->stats.active_sources);
1213     }
1214     if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
1215       sess->stats.sender_sources--;
1216       GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1217           sess->stats.sender_sources);
1218     }
1219     members = sess->stats.active_sources;
1220
1221     if (!sess->source->received_bye && members < pmembers) {
1222       /* some members went away since the previous timeout estimate. 
1223        * Perform reverse reconsideration but only when we are not scheduling a
1224        * BYE ourselves. */
1225       if (arrival->time < sess->next_rtcp_check_time) {
1226         GstClockTime time_remaining;
1227
1228         time_remaining = sess->next_rtcp_check_time - arrival->time;
1229         sess->next_rtcp_check_time =
1230             gst_util_uint64_scale (time_remaining, members, pmembers);
1231
1232         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
1233             GST_TIME_ARGS (sess->next_rtcp_check_time));
1234
1235         sess->next_rtcp_check_time += arrival->time;
1236
1237         /* notify app of reconsideration */
1238         if (sess->callbacks.reconsider)
1239           sess->callbacks.reconsider (sess, sess->user_data);
1240       }
1241     }
1242
1243     if (created)
1244       on_new_ssrc (sess, source);
1245
1246     on_bye_ssrc (sess, source);
1247   }
1248   g_free (reason);
1249 }
1250
1251 static void
1252 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
1253     RTPArrivalStats * arrival)
1254 {
1255   GST_DEBUG ("received APP");
1256 }
1257
1258 /**
1259  * rtp_session_process_rtcp:
1260  * @sess: and #RTPSession
1261  * @buffer: an RTCP buffer
1262  *
1263  * Process an RTCP buffer in the session manager.
1264  *
1265  * Returns: a #GstFlowReturn.
1266  */
1267 GstFlowReturn
1268 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
1269 {
1270   GstRTCPPacket packet;
1271   gboolean more, is_bye = FALSE;
1272   RTPArrivalStats arrival;
1273
1274   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1275   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1276
1277   if (!gst_rtcp_buffer_validate (buffer))
1278     goto invalid_packet;
1279
1280   GST_DEBUG ("received RTCP packet");
1281
1282   RTP_SESSION_LOCK (sess);
1283   /* update arrival stats */
1284   update_arrival_stats (sess, &arrival, FALSE, buffer);
1285
1286   if (sess->sent_bye)
1287     goto ignore;
1288
1289   /* start processing the compound packet */
1290   more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
1291   while (more) {
1292     GstRTCPType type;
1293
1294     type = gst_rtcp_packet_get_type (&packet);
1295
1296     /* when we are leaving the session, we should ignore all non-BYE messages */
1297     if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) {
1298       GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
1299       goto next;
1300     }
1301
1302     switch (type) {
1303       case GST_RTCP_TYPE_SR:
1304         rtp_session_process_sr (sess, &packet, &arrival);
1305         break;
1306       case GST_RTCP_TYPE_RR:
1307         rtp_session_process_rr (sess, &packet, &arrival);
1308         break;
1309       case GST_RTCP_TYPE_SDES:
1310         rtp_session_process_sdes (sess, &packet, &arrival);
1311         break;
1312       case GST_RTCP_TYPE_BYE:
1313         is_bye = TRUE;
1314         rtp_session_process_bye (sess, &packet, &arrival);
1315         break;
1316       case GST_RTCP_TYPE_APP:
1317         rtp_session_process_app (sess, &packet, &arrival);
1318         break;
1319       default:
1320         GST_WARNING ("got unknown RTCP packet");
1321         break;
1322     }
1323   next:
1324     more = gst_rtcp_packet_move_to_next (&packet);
1325   }
1326
1327   /* if we are scheduling a BYE, we only want to count bye packets, else we
1328    * count everything */
1329   if (sess->source->received_bye) {
1330     if (is_bye) {
1331       sess->stats.bye_members++;
1332       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1333     }
1334   } else {
1335     /* keep track of average packet size */
1336     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1337   }
1338   RTP_SESSION_UNLOCK (sess);
1339
1340   gst_buffer_unref (buffer);
1341
1342   return GST_FLOW_OK;
1343
1344   /* ERRORS */
1345 invalid_packet:
1346   {
1347     GST_DEBUG ("invalid RTCP packet received");
1348     return GST_FLOW_OK;
1349   }
1350 ignore:
1351   {
1352     gst_buffer_unref (buffer);
1353     RTP_SESSION_UNLOCK (sess);
1354     GST_DEBUG ("ignoring RTP packet because we left");
1355     return GST_FLOW_OK;
1356   }
1357 }
1358
1359 /**
1360  * rtp_session_send_rtp:
1361  * @sess: an #RTPSession
1362  * @buffer: an RTP buffer
1363  *
1364  * Send the RTP buffer in the session manager.
1365  *
1366  * Returns: a #GstFlowReturn.
1367  */
1368 GstFlowReturn
1369 rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer)
1370 {
1371   GstFlowReturn result;
1372   RTPSource *source;
1373   gboolean prevsender;
1374
1375   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1376   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1377
1378   RTP_SESSION_LOCK (sess);
1379   source = sess->source;
1380
1381   prevsender = RTP_SOURCE_IS_SENDER (source);
1382
1383   /* we use our own source to send */
1384   result = rtp_source_send_rtp (sess->source, buffer);
1385
1386   if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
1387     sess->stats.sender_sources++;
1388   RTP_SESSION_UNLOCK (sess);
1389
1390   return result;
1391 }
1392
1393 static GstClockTime
1394 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
1395     gboolean first)
1396 {
1397   GstClockTime result;
1398
1399   if (sess->source->received_bye) {
1400     result = rtp_stats_calculate_bye_interval (&sess->stats);
1401   } else {
1402     result = rtp_stats_calculate_rtcp_interval (&sess->stats,
1403         RTP_SOURCE_IS_SENDER (sess->source), first);
1404   }
1405
1406   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
1407       GST_TIME_ARGS (result), first);
1408
1409   if (!deterministic)
1410     result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
1411
1412   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
1413
1414   return result;
1415 }
1416
1417 /**
1418  * rtp_session_send_bye:
1419  * @sess: an #RTPSession
1420  * @reason: a reason or NULL
1421  *
1422  * Stop the current @sess and schedule a BYE message for the other members.
1423  *
1424  * Returns: a #GstFlowReturn.
1425  */
1426 GstFlowReturn
1427 rtp_session_send_bye (RTPSession * sess, const gchar * reason)
1428 {
1429   GstFlowReturn result = GST_FLOW_OK;
1430   RTPSource *source;
1431   GstClockTime current, interval;
1432
1433   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1434
1435   RTP_SESSION_LOCK (sess);
1436   source = sess->source;
1437
1438   /* ignore more BYEs */
1439   if (source->received_bye)
1440     goto done;
1441
1442   /* we have BYE now */
1443   source->received_bye = TRUE;
1444   /* at least one member wants to send a BYE */
1445   sess->bye_reason = g_strdup (reason);
1446   sess->stats.avg_rtcp_packet_size = 100;
1447   sess->stats.bye_members = 1;
1448   sess->first_rtcp = TRUE;
1449   sess->sent_bye = FALSE;
1450
1451   /* get current time */
1452   if (sess->callbacks.get_time)
1453     current = sess->callbacks.get_time (sess, sess->user_data);
1454   else
1455     current = 0;
1456
1457   /* reschedule transmission */
1458   sess->last_rtcp_send_time = current;
1459   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
1460   sess->next_rtcp_check_time = current + interval;
1461
1462   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
1463       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
1464
1465   /* notify app of reconsideration */
1466   if (sess->callbacks.reconsider)
1467     sess->callbacks.reconsider (sess, sess->user_data);
1468 done:
1469   RTP_SESSION_UNLOCK (sess);
1470
1471   return result;
1472 }
1473
1474 /**
1475  * rtp_session_next_timeout:
1476  * @sess: an #RTPSession
1477  * @time: the current time
1478  *
1479  * Get the next time we should perform session maintenance tasks.
1480  *
1481  * Returns: a time when rtp_session_on_timeout() should be called with the
1482  * current time.
1483  */
1484 GstClockTime
1485 rtp_session_next_timeout (RTPSession * sess, GstClockTime time)
1486 {
1487   GstClockTime result;
1488
1489   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1490
1491   RTP_SESSION_LOCK (sess);
1492
1493   result = sess->next_rtcp_check_time;
1494
1495   if (sess->source->received_bye) {
1496     if (sess->sent_bye)
1497       result = GST_CLOCK_TIME_NONE;
1498     else if (sess->stats.active_sources >= 50)
1499       /* reconsider BYE if members >= 50 */
1500       result = time + calculate_rtcp_interval (sess, FALSE, TRUE);
1501   } else {
1502     if (sess->first_rtcp)
1503       /* we are called for the first time */
1504       result = time + calculate_rtcp_interval (sess, FALSE, TRUE);
1505     else if (sess->next_rtcp_check_time < time)
1506       /* get a new timeout when we need to */
1507       result = time + calculate_rtcp_interval (sess, FALSE, FALSE);
1508   }
1509   sess->next_rtcp_check_time = result;
1510
1511   GST_DEBUG ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
1512   RTP_SESSION_UNLOCK (sess);
1513
1514   return result;
1515 }
1516
1517 typedef struct
1518 {
1519   RTPSession *sess;
1520   GstBuffer *rtcp;
1521   GstClockTime time;
1522   GstClockTime interval;
1523   GstRTCPPacket packet;
1524   gboolean is_bye;
1525   gboolean has_sdes;
1526 } ReportData;
1527
1528 static void
1529 session_start_rtcp (RTPSession * sess, ReportData * data)
1530 {
1531   GstRTCPPacket *packet = &data->packet;
1532   RTPSource *own = sess->source;
1533
1534   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
1535
1536   if (RTP_SOURCE_IS_SENDER (own)) {
1537     /* we are a sender, create SR */
1538     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
1539     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
1540
1541     /* fill in sender report info, FIXME NTP and RTP timestamps missing */
1542     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
1543         0, 0, own->stats.packets_sent, own->stats.octets_sent);
1544   } else {
1545     /* we are only receiver, create RR */
1546     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
1547     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
1548     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
1549   }
1550 }
1551
1552 /* construct a Sender or Receiver Report */
1553 static void
1554 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
1555 {
1556   RTPSession *sess = data->sess;
1557   GstRTCPPacket *packet = &data->packet;
1558
1559   /* create a new buffer if needed */
1560   if (data->rtcp == NULL) {
1561     session_start_rtcp (sess, data);
1562   }
1563   if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
1564     /* only report about other sender sources */
1565     if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
1566       RTPSourceStats *stats;
1567       guint64 extended_max, expected;
1568       guint64 expected_interval, received_interval, ntptime;
1569       gint64 lost, lost_interval;
1570       guint32 fraction, LSR, DLSR;
1571       GstClockTime time;
1572
1573       stats = &source->stats;
1574
1575       extended_max = stats->cycles + stats->max_seq;
1576       expected = extended_max - stats->base_seq + 1;
1577
1578       GST_DEBUG ("ext_max %d, expected %d, received %d, base_seq %d",
1579           extended_max, expected, stats->packets_received, stats->base_seq);
1580
1581       lost = expected - stats->packets_received;
1582       lost = CLAMP (lost, -0x800000, 0x7fffff);
1583
1584       expected_interval = expected - stats->prev_expected;
1585       stats->prev_expected = expected;
1586       received_interval = stats->packets_received - stats->prev_received;
1587       stats->prev_received = stats->packets_received;
1588
1589       lost_interval = expected_interval - received_interval;
1590
1591       if (expected_interval == 0 || lost_interval <= 0)
1592         fraction = 0;
1593       else
1594         fraction = (lost_interval << 8) / expected_interval;
1595
1596       GST_DEBUG ("add RR for SSRC %08x", source->ssrc);
1597       /* we scaled the jitter up for additional precision */
1598       GST_DEBUG ("fraction %d, lost %d, extseq %u, jitter %d", fraction, lost,
1599           extended_max, stats->jitter >> 4);
1600
1601       if (rtp_source_get_last_sr (source, &ntptime, NULL, NULL, NULL, &time)) {
1602         GstClockTime diff;
1603
1604         /* LSR is middle bits of the last ntptime */
1605         LSR = (ntptime >> 16) & 0xffffffff;
1606         diff = data->time - time;
1607         GST_DEBUG ("last SR time diff %" GST_TIME_FORMAT, GST_TIME_ARGS (diff));
1608         /* DLSR, delay since last SR is expressed in 1/65536 second units */
1609         DLSR = gst_util_uint64_scale_int (diff, 65536, GST_SECOND);
1610       } else {
1611         /* No valid SR received, LSR/DLSR are set to 0 then */
1612         LSR = 0;
1613         DLSR = 0;
1614       }
1615       GST_DEBUG ("LSR %08x, DLSR %08x", LSR, DLSR);
1616
1617       /* packet is not yet filled, add report block for this source. */
1618       gst_rtcp_packet_add_rb (packet, source->ssrc, fraction, lost,
1619           extended_max, stats->jitter >> 4, LSR, DLSR);
1620     }
1621   }
1622 }
1623
1624 /* perform cleanup of sources that timed out */
1625 static gboolean
1626 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
1627 {
1628   gboolean remove = FALSE;
1629   gboolean byetimeout = FALSE;
1630   gboolean is_sender, is_active;
1631   RTPSession *sess = data->sess;
1632   GstClockTime interval;
1633
1634   is_sender = RTP_SOURCE_IS_SENDER (source);
1635   is_active = RTP_SOURCE_IS_ACTIVE (source);
1636
1637   /* check for our own source, we don't want to delete our own source. */
1638   if (!(source == sess->source)) {
1639     if (source->received_bye) {
1640       /* if we received a BYE from the source, remove the source after some
1641        * time. */
1642       if (data->time > source->bye_time &&
1643           data->time - source->bye_time > sess->stats.bye_timeout) {
1644         GST_DEBUG ("removing BYE source %08x", source->ssrc);
1645         remove = TRUE;
1646         byetimeout = TRUE;
1647       }
1648     }
1649     /* sources that were inactive for more than 5 times the deterministic reporting
1650      * interval get timed out. the min timeout is 5 seconds. */
1651     if (data->time > source->last_activity) {
1652       interval = MAX (data->interval * 5, 5 * GST_SECOND);
1653       if (data->time - source->last_activity > interval) {
1654         GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
1655             source->ssrc, GST_TIME_ARGS (source->last_activity));
1656         remove = TRUE;
1657       }
1658     }
1659   }
1660
1661   /* senders that did not send for a long time become a receiver, this also
1662    * holds for our own source. */
1663   if (is_sender) {
1664     if (data->time > source->last_rtp_activity) {
1665       interval = MAX (data->interval * 2, 5 * GST_SECOND);
1666
1667       if (data->time - source->last_rtp_activity > interval) {
1668         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
1669             GST_TIME_FORMAT, source->ssrc,
1670             GST_TIME_ARGS (source->last_rtp_activity));
1671         source->is_sender = FALSE;
1672         sess->stats.sender_sources--;
1673       }
1674     }
1675   }
1676
1677   if (remove) {
1678     sess->total_sources--;
1679     if (is_sender)
1680       sess->stats.sender_sources--;
1681     if (is_active)
1682       sess->stats.active_sources--;
1683
1684     if (byetimeout)
1685       on_bye_timeout (sess, source);
1686     else
1687       on_timeout (sess, source);
1688
1689   }
1690   return remove;
1691 }
1692
1693 static void
1694 session_sdes (RTPSession * sess, ReportData * data)
1695 {
1696   GstRTCPPacket *packet = &data->packet;
1697
1698   /* add SDES packet */
1699   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
1700
1701   gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
1702   gst_rtcp_packet_sdes_add_entry (packet, GST_RTCP_SDES_CNAME,
1703       strlen (sess->cname), (guint8 *) sess->cname);
1704
1705   /* other SDES items must only be added at regular intervals and only when the
1706    * user requests to since it might be a privacy problem */
1707 #if 0
1708   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_NAME,
1709       strlen (sess->name), (guint8 *) sess->name);
1710   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_TOOL,
1711       strlen (sess->tool), (guint8 *) sess->tool);
1712 #endif
1713
1714   data->has_sdes = TRUE;
1715 }
1716
1717 /* schedule a BYE packet */
1718 static void
1719 session_bye (RTPSession * sess, ReportData * data)
1720 {
1721   GstRTCPPacket *packet = &data->packet;
1722
1723   /* open packet */
1724   session_start_rtcp (sess, data);
1725
1726   /* add SDES */
1727   session_sdes (sess, data);
1728
1729   /* add a BYE packet */
1730   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet);
1731   gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc);
1732   if (sess->bye_reason)
1733     gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason);
1734
1735   /* we have a BYE packet now */
1736   data->is_bye = TRUE;
1737 }
1738
1739 static gboolean
1740 is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data)
1741 {
1742   GstClockTime new_send_time;
1743   gboolean result;
1744
1745   /* no need to check yet */
1746   if (sess->next_rtcp_check_time > time) {
1747     GST_DEBUG ("no check time yet");
1748     return FALSE;
1749   }
1750
1751   /* perform forward reconsideration */
1752   new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
1753
1754   GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT,
1755       GST_TIME_ARGS (new_send_time));
1756
1757   new_send_time += sess->last_rtcp_send_time;
1758
1759   /* check if reconsideration */
1760   if (time < new_send_time) {
1761     GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
1762         GST_TIME_ARGS (new_send_time));
1763     result = FALSE;
1764     /* store new check time */
1765     sess->next_rtcp_check_time = new_send_time;
1766   } else {
1767     result = TRUE;
1768     new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
1769
1770     GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
1771         GST_TIME_ARGS (new_send_time));
1772     sess->next_rtcp_check_time = time + new_send_time;
1773   }
1774   return result;
1775 }
1776
1777 /**
1778  * rtp_session_on_timeout:
1779  * @sess: an #RTPSession
1780  *
1781  * Perform maintenance actions after the timeout obtained with
1782  * rtp_session_next_timeout() expired.
1783  *
1784  * This function will perform timeouts of receivers and senders, send a BYE
1785  * packet or generate RTCP packets with current session stats.
1786  *
1787  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
1788  * times, for each packet that should be processed.
1789  *
1790  * Returns: a #GstFlowReturn.
1791  */
1792 GstFlowReturn
1793 rtp_session_on_timeout (RTPSession * sess, GstClockTime time)
1794 {
1795   GstFlowReturn result = GST_FLOW_OK;
1796   ReportData data;
1797
1798   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1799
1800   data.sess = sess;
1801   data.rtcp = NULL;
1802   data.time = time;
1803   data.is_bye = FALSE;
1804   data.has_sdes = FALSE;
1805
1806   GST_DEBUG ("reporting at %" GST_TIME_FORMAT, GST_TIME_ARGS (time));
1807
1808   RTP_SESSION_LOCK (sess);
1809   /* get a new interval, we need this for various cleanups etc */
1810   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
1811
1812   /* first perform cleanups */
1813   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
1814       (GHRFunc) session_cleanup, &data);
1815
1816   /* see if we need to generate SR or RR packets */
1817   if (is_rtcp_time (sess, time, &data)) {
1818     if (sess->source->received_bye) {
1819       /* generate BYE instead */
1820       session_bye (sess, &data);
1821       sess->sent_bye = TRUE;
1822     } else {
1823       /* loop over all known sources and do something */
1824       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1825           (GHFunc) session_report_blocks, &data);
1826     }
1827   }
1828
1829   if (data.rtcp) {
1830     guint size;
1831
1832     /* we keep track of the last report time in order to timeout inactive
1833      * receivers or senders */
1834     sess->last_rtcp_send_time = data.time;
1835     sess->first_rtcp = FALSE;
1836
1837     /* add SDES for this source when not already added */
1838     if (!data.has_sdes)
1839       session_sdes (sess, &data);
1840
1841     /* update average RTCP size before sending */
1842     size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
1843     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size);
1844   }
1845   RTP_SESSION_UNLOCK (sess);
1846
1847   /* push out the RTCP packet */
1848   if (data.rtcp) {
1849     /* close the RTCP packet */
1850     gst_rtcp_buffer_end (data.rtcp);
1851
1852     if (sess->callbacks.send_rtcp)
1853       result = sess->callbacks.send_rtcp (sess, sess->source, data.rtcp,
1854           sess->user_data);
1855     else
1856       gst_buffer_unref (data.rtcp);
1857   }
1858
1859   return result;
1860 }