7244f5fbca64897cda1aa7c8cb327138bf50e89f
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/rtp/gstrtpbuffer.h>
23 #include <gst/rtp/gstrtcpbuffer.h>
24 #include <gst/netbuffer/gstnetbuffer.h>
25
26 #include "rtpsession.h"
27
28 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
29 #define GST_CAT_DEFAULT rtp_session_debug
30
31 /* signals and args */
32 enum
33 {
34   SIGNAL_ON_NEW_SSRC,
35   SIGNAL_ON_SSRC_COLLISION,
36   SIGNAL_ON_SSRC_VALIDATED,
37   SIGNAL_ON_BYE_SSRC,
38   SIGNAL_ON_BYE_TIMEOUT,
39   SIGNAL_ON_TIMEOUT,
40   LAST_SIGNAL
41 };
42
43 #define RTP_DEFAULT_BANDWIDTH        64000.0
44 #define RTP_DEFAULT_RTCP_BANDWIDTH   1000
45
46 enum
47 {
48   PROP_0
49 };
50
51 /* update average packet size, we keep this scaled by 16 to keep enough
52  * precision. */
53 #define UPDATE_AVG(avg, val)            \
54   if ((avg) == 0)                       \
55    (avg) = (val) << 4;                  \
56   else                                  \
57    (avg) = ((val) + (15 * (avg))) >> 4;
58
59 /* GObject vmethods */
60 static void rtp_session_finalize (GObject * object);
61 static void rtp_session_set_property (GObject * object, guint prop_id,
62     const GValue * value, GParamSpec * pspec);
63 static void rtp_session_get_property (GObject * object, guint prop_id,
64     GValue * value, GParamSpec * pspec);
65
66 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
67
68 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
69
70 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
71     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
72
73 static void
74 rtp_session_class_init (RTPSessionClass * klass)
75 {
76   GObjectClass *gobject_class;
77
78   gobject_class = (GObjectClass *) klass;
79
80   gobject_class->finalize = rtp_session_finalize;
81   gobject_class->set_property = rtp_session_set_property;
82   gobject_class->get_property = rtp_session_get_property;
83
84   /**
85    * RTPSession::on-new-ssrc:
86    * @session: the object which received the signal
87    * @src: the new RTPSource
88    *
89    * Notify of a new SSRC that entered @session.
90    */
91   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
92       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
93       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
94       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
95       G_TYPE_OBJECT);
96   /**
97    * RTPSession::on-ssrc_collision:
98    * @session: the object which received the signal
99    * @src: the #RTPSource that caused a collision
100    *
101    * Notify when we have an SSRC collision
102    */
103   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
104       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
105       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
106       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
107       G_TYPE_OBJECT);
108   /**
109    * RTPSession::on-ssrc_validated:
110    * @session: the object which received the signal
111    * @src: the new validated RTPSource
112    *
113    * Notify of a new SSRC that became validated.
114    */
115   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
116       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
117       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
118       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
119       G_TYPE_OBJECT);
120   /**
121    * RTPSession::on-bye-ssrc:
122    * @session: the object which received the signal
123    * @src: the RTPSource that went away
124    *
125    * Notify of an SSRC that became inactive because of a BYE packet.
126    */
127   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
128       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
129       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
130       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
131       G_TYPE_OBJECT);
132   /**
133    * RTPSession::on-bye-timeout:
134    * @session: the object which received the signal
135    * @src: the RTPSource that timed out
136    *
137    * Notify of an SSRC that has timed out because of BYE
138    */
139   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
140       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
141       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
142       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
143       G_TYPE_OBJECT);
144   /**
145    * RTPSession::on-timeout:
146    * @session: the object which received the signal
147    * @src: the RTPSource that timed out
148    *
149    * Notify of an SSRC that has timed out
150    */
151   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
152       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
153       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
154       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
155       G_TYPE_OBJECT);
156
157   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
158 }
159
160 static void
161 rtp_session_init (RTPSession * sess)
162 {
163   gint i;
164
165   sess->lock = g_mutex_new ();
166   sess->key = g_random_int ();
167   sess->mask_idx = 0;
168   sess->mask = 0;
169
170   for (i = 0; i < 32; i++) {
171     sess->ssrcs[i] =
172         g_hash_table_new_full (NULL, NULL, NULL,
173         (GDestroyNotify) g_object_unref);
174   }
175   sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL);
176
177   rtp_stats_init_defaults (&sess->stats);
178
179   /* create an active SSRC for this session manager */
180   sess->source = rtp_session_create_source (sess);
181   sess->source->validated = TRUE;
182   sess->stats.active_sources++;
183
184   /* default UDP header length */
185   sess->header_len = 28;
186   sess->mtu = 1400;
187
188   /* some default SDES entries */
189   //sess->cname = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ());
190   sess->cname = g_strdup_printf ("foo@%s", g_get_host_name ());
191   sess->name = g_strdup (g_get_real_name ());
192   sess->tool = g_strdup ("GStreamer");
193
194   sess->first_rtcp = TRUE;
195
196   GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
197 }
198
199 static void
200 rtp_session_finalize (GObject * object)
201 {
202   RTPSession *sess;
203   gint i;
204
205   sess = RTP_SESSION_CAST (object);
206
207   g_mutex_free (sess->lock);
208   for (i = 0; i < 32; i++)
209     g_hash_table_destroy (sess->ssrcs[i]);
210
211   g_hash_table_destroy (sess->cnames);
212   g_object_unref (sess->source);
213
214   g_free (sess->cname);
215   g_free (sess->tool);
216   g_free (sess->bye_reason);
217
218   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
219 }
220
221 static void
222 rtp_session_set_property (GObject * object, guint prop_id,
223     const GValue * value, GParamSpec * pspec)
224 {
225   RTPSession *sess;
226
227   sess = RTP_SESSION (object);
228
229   switch (prop_id) {
230     default:
231       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
232       break;
233   }
234 }
235
236 static void
237 rtp_session_get_property (GObject * object, guint prop_id,
238     GValue * value, GParamSpec * pspec)
239 {
240   RTPSession *sess;
241
242   sess = RTP_SESSION (object);
243
244   switch (prop_id) {
245     default:
246       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
247       break;
248   }
249 }
250
251 static void
252 on_new_ssrc (RTPSession * sess, RTPSource * source)
253 {
254   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
255 }
256
257 static void
258 on_ssrc_collision (RTPSession * sess, RTPSource * source)
259 {
260   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
261       source);
262 }
263
264 static void
265 on_ssrc_validated (RTPSession * sess, RTPSource * source)
266 {
267   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
268       source);
269 }
270
271 static void
272 on_bye_ssrc (RTPSession * sess, RTPSource * source)
273 {
274   /* notify app that reconsideration should be performed */
275   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
276 }
277
278 static void
279 on_bye_timeout (RTPSession * sess, RTPSource * source)
280 {
281   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
282 }
283
284 static void
285 on_timeout (RTPSession * sess, RTPSource * source)
286 {
287   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
288 }
289
290 /**
291  * rtp_session_new:
292  *
293  * Create a new session object.
294  *
295  * Returns: a new #RTPSession. g_object_unref() after usage.
296  */
297 RTPSession *
298 rtp_session_new (void)
299 {
300   RTPSession *sess;
301
302   sess = g_object_new (RTP_TYPE_SESSION, NULL);
303
304   return sess;
305 }
306
307 /**
308  * rtp_session_set_callbacks:
309  * @sess: an #RTPSession
310  * @callbacks: callbacks to configure
311  * @user_data: user data passed in the callbacks
312  *
313  * Configure a set of callbacks to be notified of actions.
314  */
315 void
316 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
317     gpointer user_data)
318 {
319   g_return_if_fail (RTP_IS_SESSION (sess));
320
321   sess->callbacks.process_rtp = callbacks->process_rtp;
322   sess->callbacks.send_rtp = callbacks->send_rtp;
323   sess->callbacks.send_rtcp = callbacks->send_rtcp;
324   sess->callbacks.clock_rate = callbacks->clock_rate;
325   sess->callbacks.get_time = callbacks->get_time;
326   sess->callbacks.reconsider = callbacks->reconsider;
327   sess->user_data = user_data;
328 }
329
330 /**
331  * rtp_session_set_bandwidth:
332  * @sess: an #RTPSession
333  * @bandwidth: the bandwidth allocated
334  *
335  * Set the session bandwidth in bytes per second.
336  */
337 void
338 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
339 {
340   g_return_if_fail (RTP_IS_SESSION (sess));
341
342   sess->stats.bandwidth = bandwidth;
343 }
344
345 /**
346  * rtp_session_get_bandwidth:
347  * @sess: an #RTPSession
348  *
349  * Get the session bandwidth.
350  *
351  * Returns: the session bandwidth.
352  */
353 gdouble
354 rtp_session_get_bandwidth (RTPSession * sess)
355 {
356   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
357
358   return sess->stats.bandwidth;
359 }
360
361 /**
362  * rtp_session_set_rtcp_bandwidth:
363  * @sess: an #RTPSession
364  * @bandwidth: the RTCP bandwidth
365  *
366  * Set the bandwidth that should be used for RTCP
367  * messages. 
368  */
369 void
370 rtp_session_set_rtcp_bandwidth (RTPSession * sess, gdouble bandwidth)
371 {
372   g_return_if_fail (RTP_IS_SESSION (sess));
373
374   sess->stats.rtcp_bandwidth = bandwidth;
375 }
376
377 /**
378  * rtp_session_get_rtcp_bandwidth:
379  * @sess: an #RTPSession
380  *
381  * Get the session bandwidth used for RTCP.
382  *
383  * Returns: The bandwidth used for RTCP messages.
384  */
385 gdouble
386 rtp_session_get_rtcp_bandwidth (RTPSession * sess)
387 {
388   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
389
390   return sess->stats.rtcp_bandwidth;
391 }
392
393 /**
394  * rtp_session_set_cname:
395  * @sess: an #RTPSession
396  * @cname: a CNAME for the session
397  *
398  * Set the CNAME for the session. 
399  */
400 void
401 rtp_session_set_cname (RTPSession * sess, const gchar * cname)
402 {
403   g_return_if_fail (RTP_IS_SESSION (sess));
404
405   g_free (sess->cname);
406   sess->cname = g_strdup (cname);
407 }
408
409 /**
410  * rtp_session_get_cname:
411  * @sess: an #RTPSession
412  *
413  * Get the currently configured CNAME for the session.
414  *
415  * Returns: The CNAME. g_free after usage.
416  */
417 gchar *
418 rtp_session_get_cname (RTPSession * sess)
419 {
420   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
421
422   return g_strdup (sess->cname);
423 }
424
425 /**
426  * rtp_session_set_name:
427  * @sess: an #RTPSession
428  * @name: a NAME for the session
429  *
430  * Set the NAME for the session. 
431  */
432 void
433 rtp_session_set_name (RTPSession * sess, const gchar * name)
434 {
435   g_return_if_fail (RTP_IS_SESSION (sess));
436
437   g_free (sess->name);
438   sess->name = g_strdup (name);
439 }
440
441 /**
442  * rtp_session_get_name:
443  * @sess: an #RTPSession
444  *
445  * Get the currently configured NAME for the session.
446  *
447  * Returns: The NAME. g_free after usage.
448  */
449 gchar *
450 rtp_session_get_name (RTPSession * sess)
451 {
452   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
453
454   return g_strdup (sess->name);
455 }
456
457 /**
458  * rtp_session_set_email:
459  * @sess: an #RTPSession
460  * @email: an EMAIL for the session
461  *
462  * Set the EMAIL the session. 
463  */
464 void
465 rtp_session_set_email (RTPSession * sess, const gchar * email)
466 {
467   g_return_if_fail (RTP_IS_SESSION (sess));
468
469   g_free (sess->email);
470   sess->email = g_strdup (email);
471 }
472
473 /**
474  * rtp_session_get_email:
475  * @sess: an #RTPSession
476  *
477  * Get the currently configured EMAIL of the session.
478  *
479  * Returns: The EMAIL. g_free after usage.
480  */
481 gchar *
482 rtp_session_get_email (RTPSession * sess)
483 {
484   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
485
486   return g_strdup (sess->email);
487 }
488
489 /**
490  * rtp_session_set_phone:
491  * @sess: an #RTPSession
492  * @phone: a PHONE for the session
493  *
494  * Set the PHONE the session. 
495  */
496 void
497 rtp_session_set_phone (RTPSession * sess, const gchar * phone)
498 {
499   g_return_if_fail (RTP_IS_SESSION (sess));
500
501   g_free (sess->phone);
502   sess->phone = g_strdup (phone);
503 }
504
505 /**
506  * rtp_session_get_location:
507  * @sess: an #RTPSession
508  *
509  * Get the currently configured PHONE of the session.
510  *
511  * Returns: The PHONE. g_free after usage.
512  */
513 gchar *
514 rtp_session_get_phone (RTPSession * sess)
515 {
516   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
517
518   return g_strdup (sess->phone);
519 }
520
521 /**
522  * rtp_session_set_location:
523  * @sess: an #RTPSession
524  * @location: a LOCATION for the session
525  *
526  * Set the LOCATION the session. 
527  */
528 void
529 rtp_session_set_location (RTPSession * sess, const gchar * location)
530 {
531   g_return_if_fail (RTP_IS_SESSION (sess));
532
533   g_free (sess->location);
534   sess->location = g_strdup (location);
535 }
536
537 /**
538  * rtp_session_get_location:
539  * @sess: an #RTPSession
540  *
541  * Get the currently configured LOCATION of the session.
542  *
543  * Returns: The LOCATION. g_free after usage.
544  */
545 gchar *
546 rtp_session_get_location (RTPSession * sess)
547 {
548   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
549
550   return g_strdup (sess->location);
551 }
552
553 /**
554  * rtp_session_set_tool:
555  * @sess: an #RTPSession
556  * @tool: a TOOL for the session
557  *
558  * Set the TOOL the session. 
559  */
560 void
561 rtp_session_set_tool (RTPSession * sess, const gchar * tool)
562 {
563   g_return_if_fail (RTP_IS_SESSION (sess));
564
565   g_free (sess->tool);
566   sess->tool = g_strdup (tool);
567 }
568
569 /**
570  * rtp_session_get_tool:
571  * @sess: an #RTPSession
572  *
573  * Get the currently configured TOOL of the session.
574  *
575  * Returns: The TOOL. g_free after usage.
576  */
577 gchar *
578 rtp_session_get_tool (RTPSession * sess)
579 {
580   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
581
582   return g_strdup (sess->tool);
583 }
584
585 /**
586  * rtp_session_set_note:
587  * @sess: an #RTPSession
588  * @note: a NOTE for the session
589  *
590  * Set the NOTE the session. 
591  */
592 void
593 rtp_session_set_note (RTPSession * sess, const gchar * note)
594 {
595   g_return_if_fail (RTP_IS_SESSION (sess));
596
597   g_free (sess->note);
598   sess->note = g_strdup (note);
599 }
600
601 /**
602  * rtp_session_get_note:
603  * @sess: an #RTPSession
604  *
605  * Get the currently configured NOTE of the session.
606  *
607  * Returns: The NOTE. g_free after usage.
608  */
609 gchar *
610 rtp_session_get_note (RTPSession * sess)
611 {
612   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
613
614   return g_strdup (sess->note);
615 }
616
617 static GstFlowReturn
618 source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session)
619 {
620   GstFlowReturn result = GST_FLOW_OK;
621
622   if (source == session->source) {
623     GST_DEBUG ("source %08x pushed sender RTP packet", source->ssrc);
624
625
626     if (session->callbacks.send_rtp)
627       result =
628           session->callbacks.send_rtp (session, source, buffer,
629           session->user_data);
630     else
631       gst_buffer_unref (buffer);
632   } else {
633     GST_DEBUG ("source %08x pushed receiver RTP packet", source->ssrc);
634     if (session->callbacks.process_rtp)
635       result =
636           session->callbacks.process_rtp (session, source, buffer,
637           session->user_data);
638     else
639       gst_buffer_unref (buffer);
640   }
641   return result;
642 }
643
644 static gint
645 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
646 {
647   gint result;
648
649   if (session->callbacks.clock_rate)
650     result = session->callbacks.clock_rate (session, pt, session->user_data);
651   else
652     result = -1;
653
654   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
655
656   return result;
657 }
658
659 static RTPSourceCallbacks callbacks = {
660   (RTPSourcePushRTP) source_push_rtp,
661   (RTPSourceClockRate) source_clock_rate,
662 };
663
664 static gboolean
665 check_collision (RTPSession * sess, RTPSource * source,
666     RTPArrivalStats * arrival)
667 {
668   /* FIXME, do collision check */
669   return FALSE;
670 }
671
672 static RTPSource *
673 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
674     RTPArrivalStats * arrival, gboolean rtp)
675 {
676   RTPSource *source;
677
678   source =
679       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
680   if (source == NULL) {
681     /* make new Source in probation and insert */
682     source = rtp_source_new (ssrc);
683
684     if (rtp)
685       source->probation = RTP_DEFAULT_PROBATION;
686     else
687       source->probation = 0;
688
689     /* store from address, if any */
690     if (arrival->have_address) {
691       if (rtp)
692         rtp_source_set_rtp_from (source, &arrival->address);
693       else
694         rtp_source_set_rtcp_from (source, &arrival->address);
695     }
696
697     /* configure a callback on the source */
698     rtp_source_set_callbacks (source, &callbacks, sess);
699
700     g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
701         source);
702
703     /* we have one more source now */
704     sess->total_sources++;
705     *created = TRUE;
706   } else {
707     *created = FALSE;
708     /* check for collision, this updates the address when not previously set */
709     if (check_collision (sess, source, arrival))
710       on_ssrc_collision (sess, source);
711   }
712   /* update last activity */
713   source->last_activity = arrival->time;
714   if (rtp)
715     source->last_rtp_activity = arrival->time;
716
717   return source;
718 }
719
720 /**
721  * rtp_session_add_source:
722  * @sess: a #RTPSession
723  * @src: #RTPSource to add
724  *
725  * Add @src to @session.
726  *
727  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
728  * existed in the session.
729  */
730 gboolean
731 rtp_session_add_source (RTPSession * sess, RTPSource * src)
732 {
733   gboolean result = FALSE;
734   RTPSource *find;
735
736   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
737   g_return_val_if_fail (src != NULL, FALSE);
738
739   RTP_SESSION_LOCK (sess);
740   find =
741       g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
742       GINT_TO_POINTER (src->ssrc));
743   if (find == NULL) {
744     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
745         GINT_TO_POINTER (src->ssrc), src);
746     /* we have one more source now */
747     sess->total_sources++;
748     result = TRUE;
749   }
750   RTP_SESSION_UNLOCK (sess);
751
752   return result;
753 }
754
755 /**
756  * rtp_session_get_num_sources:
757  * @sess: an #RTPSession
758  *
759  * Get the number of sources in @sess.
760  *
761  * Returns: The number of sources in @sess.
762  */
763 guint
764 rtp_session_get_num_sources (RTPSession * sess)
765 {
766   guint result;
767
768   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
769
770   RTP_SESSION_LOCK (sess);
771   result = sess->total_sources;
772   RTP_SESSION_UNLOCK (sess);
773
774   return result;
775 }
776
777 /**
778  * rtp_session_get_num_active_sources:
779  * @sess: an #RTPSession
780  *
781  * Get the number of active sources in @sess. A source is considered active when
782  * it has been validated and has not yet received a BYE RTCP message.
783  *
784  * Returns: The number of active sources in @sess.
785  */
786 guint
787 rtp_session_get_num_active_sources (RTPSession * sess)
788 {
789   guint result;
790
791   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
792
793   RTP_SESSION_LOCK (sess);
794   result = sess->stats.active_sources;
795   RTP_SESSION_UNLOCK (sess);
796
797   return result;
798 }
799
800 /**
801  * rtp_session_get_source_by_ssrc:
802  * @sess: an #RTPSession
803  * @ssrc: an SSRC
804  *
805  * Find the source with @ssrc in @sess.
806  *
807  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
808  * g_object_unref() after usage.
809  */
810 RTPSource *
811 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
812 {
813   RTPSource *result;
814
815   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
816
817   RTP_SESSION_LOCK (sess);
818   result =
819       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
820   if (result)
821     g_object_ref (result);
822   RTP_SESSION_UNLOCK (sess);
823
824   return result;
825 }
826
827 /**
828  * rtp_session_get_source_by_cname:
829  * @sess: a #RTPSession
830  * @cname: an CNAME
831  *
832  * Find the source with @cname in @sess.
833  *
834  * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found.
835  * g_object_unref() after usage.
836  */
837 RTPSource *
838 rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
839 {
840   RTPSource *result;
841
842   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
843   g_return_val_if_fail (cname != NULL, NULL);
844
845   RTP_SESSION_LOCK (sess);
846   result = g_hash_table_lookup (sess->cnames, cname);
847   if (result)
848     g_object_ref (result);
849   RTP_SESSION_UNLOCK (sess);
850
851   return result;
852 }
853
854 /**
855  * rtp_session_create_source:
856  * @sess: an #RTPSession
857  *
858  * Create an #RTPSource for use in @sess. This function will create a source
859  * with an ssrc that is currently not used by any participants in the session.
860  *
861  * Returns: an #RTPSource.
862  */
863 RTPSource *
864 rtp_session_create_source (RTPSession * sess)
865 {
866   guint32 ssrc;
867   RTPSource *source;
868
869   RTP_SESSION_LOCK (sess);
870   while (TRUE) {
871     ssrc = g_random_int ();
872
873     /* see if it exists in the session, we're done if it doesn't */
874     if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
875             GINT_TO_POINTER (ssrc)) == NULL)
876       break;
877   }
878   source = rtp_source_new (ssrc);
879   g_object_ref (source);
880   g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
881       source);
882   /* we have one more source now */
883   sess->total_sources++;
884   RTP_SESSION_UNLOCK (sess);
885
886   return source;
887 }
888
889 /* update the RTPArrivalStats structure with the current time and other bits
890  * about the current buffer we are handling.
891  * This function is typically called when a validated packet is received.
892  * This function should be called with the SESSION_LOCK
893  */
894 static void
895 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
896     gboolean rtp, GstBuffer * buffer)
897 {
898   /* get time or arrival */
899   if (sess->callbacks.get_time)
900     arrival->time = sess->callbacks.get_time (sess, sess->user_data);
901   else
902     arrival->time = GST_CLOCK_TIME_NONE;
903
904   /* get packet size including header overhead */
905   arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
906
907   if (rtp) {
908     arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer);
909   } else {
910     arrival->payload_len = 0;
911   }
912
913   /* for netbuffer we can store the IP address to check for collisions */
914   arrival->have_address = GST_IS_NETBUFFER (buffer);
915   if (arrival->have_address) {
916     GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
917
918     memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
919   }
920 }
921
922 /**
923  * rtp_session_process_rtp:
924  * @sess: and #RTPSession
925  * @buffer: an RTP buffer
926  *
927  * Process an RTP buffer in the session manager. This function takes ownership
928  * of @buffer.
929  *
930  * Returns: a #GstFlowReturn.
931  */
932 GstFlowReturn
933 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer)
934 {
935   GstFlowReturn result;
936   guint32 ssrc;
937   RTPSource *source;
938   gboolean created;
939   gboolean prevsender, prevactive;
940   RTPArrivalStats arrival;
941
942   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
943   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
944
945   if (!gst_rtp_buffer_validate (buffer))
946     goto invalid_packet;
947
948   RTP_SESSION_LOCK (sess);
949   /* update arrival stats */
950   update_arrival_stats (sess, &arrival, TRUE, buffer);
951
952   /* ignore more RTP packets when we left the session */
953   if (sess->source->received_bye)
954     goto ignore;
955
956   /* get SSRC and look up in session database */
957   ssrc = gst_rtp_buffer_get_ssrc (buffer);
958   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
959
960   prevsender = RTP_SOURCE_IS_SENDER (source);
961   prevactive = RTP_SOURCE_IS_ACTIVE (source);
962
963   /* we need to ref so that we can process the CSRCs later */
964   gst_buffer_ref (buffer);
965
966   /* let source process the packet */
967   result = rtp_source_process_rtp (source, buffer, &arrival);
968
969   /* source became active */
970   if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
971     sess->stats.active_sources++;
972     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
973         sess->stats.active_sources);
974     on_ssrc_validated (sess, source);
975   }
976   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
977     sess->stats.sender_sources++;
978     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
979         sess->stats.sender_sources);
980   }
981
982   if (created)
983     on_new_ssrc (sess, source);
984
985   if (source->validated) {
986     guint8 i, count;
987     gboolean created;
988
989     /* for validated sources, we add the CSRCs as well */
990     count = gst_rtp_buffer_get_csrc_count (buffer);
991
992     for (i = 0; i < count; i++) {
993       guint32 csrc;
994       RTPSource *csrc_src;
995
996       csrc = gst_rtp_buffer_get_csrc (buffer, i);
997
998       /* get source */
999       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
1000
1001       if (created) {
1002         GST_DEBUG ("created new CSRC: %08x", csrc);
1003         rtp_source_set_as_csrc (csrc_src);
1004         if (RTP_SOURCE_IS_ACTIVE (csrc_src))
1005           sess->stats.active_sources++;
1006         on_new_ssrc (sess, source);
1007       }
1008     }
1009   }
1010   gst_buffer_unref (buffer);
1011
1012   RTP_SESSION_UNLOCK (sess);
1013
1014   return result;
1015
1016   /* ERRORS */
1017 invalid_packet:
1018   {
1019     gst_buffer_unref (buffer);
1020     GST_DEBUG ("invalid RTP packet received");
1021     return GST_FLOW_OK;
1022   }
1023 ignore:
1024   {
1025     gst_buffer_unref (buffer);
1026     RTP_SESSION_UNLOCK (sess);
1027     GST_DEBUG ("ignoring RTP packet because we are leaving");
1028     return GST_FLOW_OK;
1029   }
1030 }
1031
1032 /* A Sender report contains statistics about how the sender is doing. This
1033  * includes timing informataion about the relation between RTP and NTP
1034  * timestamps is it using and the number of packets/bytes it sent to us.
1035  *
1036  * In this report is also included a set of report blocks related to how this
1037  * sender is receiving data (in case we (or somebody else) is also sending stuff
1038  * to it). This info includes the packet loss, jitter and seqnum. It also
1039  * contains information to calculate the round trip time (LSR/DLSR).
1040  */
1041 static void
1042 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
1043     RTPArrivalStats * arrival)
1044 {
1045   guint32 senderssrc, rtptime, packet_count, octet_count;
1046   guint64 ntptime;
1047   guint count, i;
1048   RTPSource *source;
1049   gboolean created, prevsender;
1050
1051   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
1052       &packet_count, &octet_count);
1053
1054   GST_DEBUG ("got SR packet: SSRC %08x", senderssrc);
1055
1056   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1057
1058   prevsender = RTP_SOURCE_IS_SENDER (source);
1059
1060   /* first update the source */
1061   rtp_source_process_sr (source, ntptime, rtptime, packet_count, octet_count,
1062       arrival->time);
1063
1064   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1065     sess->stats.sender_sources++;
1066     GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc,
1067         sess->stats.sender_sources);
1068   }
1069
1070   if (created)
1071     on_new_ssrc (sess, source);
1072
1073   count = gst_rtcp_packet_get_rb_count (packet);
1074   for (i = 0; i < count; i++) {
1075     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1076     guint8 fractionlost;
1077     gint32 packetslost;
1078
1079     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1080         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1081
1082     if (ssrc == sess->source->ssrc) {
1083       /* only deal with report blocks for our session, we update the stats of
1084        * the sender of the RTCP message. We could also compare our stats against
1085        * the other sender to see if we are better or worse. */
1086       rtp_source_process_rb (source, fractionlost, packetslost,
1087           exthighestseq, jitter, lsr, dlsr);
1088     }
1089   }
1090 }
1091
1092 /* A receiver report contains statistics about how a receiver is doing. It
1093  * includes stuff like packet loss, jitter and the seqnum it received last. It
1094  * also contains info to calculate the round trip time.
1095  *
1096  * We are only interested in how the sender of this report is doing wrt to us.
1097  */
1098 static void
1099 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
1100     RTPArrivalStats * arrival)
1101 {
1102   guint32 senderssrc;
1103   guint count, i;
1104   RTPSource *source;
1105   gboolean created;
1106
1107   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
1108
1109   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
1110
1111   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1112
1113   if (created)
1114     on_new_ssrc (sess, source);
1115
1116   count = gst_rtcp_packet_get_rb_count (packet);
1117   for (i = 0; i < count; i++) {
1118     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1119     guint8 fractionlost;
1120     gint32 packetslost;
1121
1122     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1123         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1124
1125     if (ssrc == sess->source->ssrc) {
1126       rtp_source_process_rb (source, fractionlost, packetslost,
1127           exthighestseq, jitter, lsr, dlsr);
1128     }
1129   }
1130 }
1131
1132 /* FIXME, we're just printing this for now... */
1133 static void
1134 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
1135     RTPArrivalStats * arrival)
1136 {
1137   guint items, i, j;
1138   gboolean more_items, more_entries;
1139
1140   items = gst_rtcp_packet_sdes_get_item_count (packet);
1141   GST_DEBUG ("got SDES packet with %d items", items);
1142
1143   more_items = gst_rtcp_packet_sdes_first_item (packet);
1144   i = 0;
1145   while (more_items) {
1146     guint32 ssrc;
1147
1148     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
1149
1150     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
1151
1152     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
1153     j = 0;
1154     while (more_entries) {
1155       GstRTCPSDESType type;
1156       guint8 len;
1157       guint8 *data;
1158
1159       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
1160
1161       GST_DEBUG ("entry %d, type %d, len %d, data %s", j, type, len, data);
1162
1163       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
1164       j++;
1165     }
1166     more_items = gst_rtcp_packet_sdes_next_item (packet);
1167     i++;
1168   }
1169 }
1170
1171 /* BYE is sent when a client leaves the session
1172  */
1173 static void
1174 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
1175     RTPArrivalStats * arrival)
1176 {
1177   guint count, i;
1178   gchar *reason;
1179
1180   reason = gst_rtcp_packet_bye_get_reason (packet);
1181   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
1182
1183   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
1184   for (i = 0; i < count; i++) {
1185     guint32 ssrc;
1186     RTPSource *source;
1187     gboolean created, prevactive, prevsender;
1188     guint pmembers, members;
1189
1190     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
1191     GST_DEBUG ("SSRC: %08x", ssrc);
1192
1193     /* find src and mark bye, no probation when dealing with RTCP */
1194     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1195
1196     /* store time for when we need to time out this source */
1197     source->bye_time = arrival->time;
1198
1199     prevactive = RTP_SOURCE_IS_ACTIVE (source);
1200     prevsender = RTP_SOURCE_IS_SENDER (source);
1201
1202     /* let the source handle the rest */
1203     rtp_source_process_bye (source, reason);
1204
1205     pmembers = sess->stats.active_sources;
1206
1207     if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
1208       sess->stats.active_sources--;
1209       GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1210           sess->stats.active_sources);
1211     }
1212     if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
1213       sess->stats.sender_sources--;
1214       GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1215           sess->stats.sender_sources);
1216     }
1217     members = sess->stats.active_sources;
1218
1219     if (!sess->source->received_bye && members < pmembers) {
1220       /* some members went away since the previous timeout estimate. 
1221        * Perform reverse reconsideration but only when we are not scheduling a
1222        * BYE ourselves. */
1223       if (arrival->time < sess->next_rtcp_check_time) {
1224         GstClockTime time_remaining;
1225
1226         time_remaining = sess->next_rtcp_check_time - arrival->time;
1227         sess->next_rtcp_check_time =
1228             gst_util_uint64_scale (time_remaining, members, pmembers);
1229
1230         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
1231             GST_TIME_ARGS (sess->next_rtcp_check_time));
1232
1233         sess->next_rtcp_check_time += arrival->time;
1234
1235         /* notify app of reconsideration */
1236         if (sess->callbacks.reconsider)
1237           sess->callbacks.reconsider (sess, sess->user_data);
1238       }
1239     }
1240
1241     if (created)
1242       on_new_ssrc (sess, source);
1243
1244     on_bye_ssrc (sess, source);
1245   }
1246   g_free (reason);
1247 }
1248
1249 static void
1250 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
1251     RTPArrivalStats * arrival)
1252 {
1253   GST_DEBUG ("received APP");
1254 }
1255
1256 /**
1257  * rtp_session_process_rtcp:
1258  * @sess: and #RTPSession
1259  * @buffer: an RTCP buffer
1260  *
1261  * Process an RTCP buffer in the session manager.
1262  *
1263  * Returns: a #GstFlowReturn.
1264  */
1265 GstFlowReturn
1266 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
1267 {
1268   GstRTCPPacket packet;
1269   gboolean more, is_bye = FALSE;
1270   RTPArrivalStats arrival;
1271
1272   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1273   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1274
1275   if (!gst_rtcp_buffer_validate (buffer))
1276     goto invalid_packet;
1277
1278   GST_DEBUG ("received RTCP packet");
1279
1280   RTP_SESSION_LOCK (sess);
1281   /* update arrival stats */
1282   update_arrival_stats (sess, &arrival, FALSE, buffer);
1283
1284   if (sess->sent_bye)
1285     goto ignore;
1286
1287   /* start processing the compound packet */
1288   more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
1289   while (more) {
1290     GstRTCPType type;
1291
1292     type = gst_rtcp_packet_get_type (&packet);
1293
1294     /* when we are leaving the session, we should ignore all non-BYE messages */
1295     if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) {
1296       GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
1297       goto next;
1298     }
1299
1300     switch (type) {
1301       case GST_RTCP_TYPE_SR:
1302         rtp_session_process_sr (sess, &packet, &arrival);
1303         break;
1304       case GST_RTCP_TYPE_RR:
1305         rtp_session_process_rr (sess, &packet, &arrival);
1306         break;
1307       case GST_RTCP_TYPE_SDES:
1308         rtp_session_process_sdes (sess, &packet, &arrival);
1309         break;
1310       case GST_RTCP_TYPE_BYE:
1311         is_bye = TRUE;
1312         rtp_session_process_bye (sess, &packet, &arrival);
1313         break;
1314       case GST_RTCP_TYPE_APP:
1315         rtp_session_process_app (sess, &packet, &arrival);
1316         break;
1317       default:
1318         GST_WARNING ("got unknown RTCP packet");
1319         break;
1320     }
1321   next:
1322     more = gst_rtcp_packet_move_to_next (&packet);
1323   }
1324
1325   /* if we are scheduling a BYE, we only want to count bye packets, else we
1326    * count everything */
1327   if (sess->source->received_bye) {
1328     if (is_bye) {
1329       sess->stats.bye_members++;
1330       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1331     }
1332   } else {
1333     /* keep track of average packet size */
1334     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1335   }
1336   RTP_SESSION_UNLOCK (sess);
1337
1338   gst_buffer_unref (buffer);
1339
1340   return GST_FLOW_OK;
1341
1342   /* ERRORS */
1343 invalid_packet:
1344   {
1345     GST_DEBUG ("invalid RTCP packet received");
1346     return GST_FLOW_OK;
1347   }
1348 ignore:
1349   {
1350     gst_buffer_unref (buffer);
1351     RTP_SESSION_UNLOCK (sess);
1352     GST_DEBUG ("ignoring RTP packet because we left");
1353     return GST_FLOW_OK;
1354   }
1355 }
1356
1357 /**
1358  * rtp_session_send_rtp:
1359  * @sess: an #RTPSession
1360  * @buffer: an RTP buffer
1361  *
1362  * Send the RTP buffer in the session manager.
1363  *
1364  * Returns: a #GstFlowReturn.
1365  */
1366 GstFlowReturn
1367 rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer)
1368 {
1369   GstFlowReturn result;
1370   RTPSource *source;
1371   gboolean prevsender;
1372
1373   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1374   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1375
1376   RTP_SESSION_LOCK (sess);
1377   source = sess->source;
1378
1379   prevsender = RTP_SOURCE_IS_SENDER (source);
1380
1381   /* we use our own source to send */
1382   result = rtp_source_send_rtp (sess->source, buffer);
1383
1384   if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
1385     sess->stats.sender_sources++;
1386   RTP_SESSION_UNLOCK (sess);
1387
1388   return result;
1389 }
1390
1391 static GstClockTime
1392 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
1393     gboolean first)
1394 {
1395   GstClockTime result;
1396
1397   if (sess->source->received_bye) {
1398     result = rtp_stats_calculate_rtcp_interval (&sess->stats,
1399         RTP_SOURCE_IS_SENDER (sess->source), first);
1400   } else {
1401     result = rtp_stats_calculate_bye_interval (&sess->stats);
1402   }
1403
1404   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT,
1405       GST_TIME_ARGS (result));
1406
1407   if (!deterministic)
1408     result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
1409
1410   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
1411
1412   return result;
1413 }
1414
1415 /**
1416  * rtp_session_send_bye:
1417  * @sess: an #RTPSession
1418  * @reason: a reason or NULL
1419  *
1420  * Stop the current @sess and schedule a BYE message for the other members.
1421  *
1422  * Returns: a #GstFlowReturn.
1423  */
1424 GstFlowReturn
1425 rtp_session_send_bye (RTPSession * sess, const gchar * reason)
1426 {
1427   GstFlowReturn result = GST_FLOW_OK;
1428   RTPSource *source;
1429   GstClockTime current, interval;
1430
1431   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1432
1433   RTP_SESSION_LOCK (sess);
1434   source = sess->source;
1435
1436   /* ignore more BYEs */
1437   if (source->received_bye)
1438     goto done;
1439
1440   /* we have BYE now */
1441   source->received_bye = TRUE;
1442   /* at least one member wants to send a BYE */
1443   sess->bye_reason = g_strdup (reason);
1444   sess->stats.avg_rtcp_packet_size = 100;
1445   sess->stats.bye_members = 1;
1446   sess->first_rtcp = TRUE;
1447   sess->sent_bye = FALSE;
1448
1449   /* get current time */
1450   if (sess->callbacks.get_time)
1451     current = sess->callbacks.get_time (sess, sess->user_data);
1452   else
1453     current = 0;
1454
1455   /* reschedule transmission */
1456   sess->last_rtcp_send_time = current;
1457   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
1458   sess->next_rtcp_check_time = current + interval;
1459
1460   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
1461       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
1462
1463   /* notify app of reconsideration */
1464   if (sess->callbacks.reconsider)
1465     sess->callbacks.reconsider (sess, sess->user_data);
1466 done:
1467   RTP_SESSION_UNLOCK (sess);
1468
1469   return result;
1470 }
1471
1472 /**
1473  * rtp_session_next_timeout:
1474  * @sess: an #RTPSession
1475  * @time: the current time
1476  *
1477  * Get the next time we should perform session maintenance tasks.
1478  *
1479  * Returns: a time when rtp_session_on_timeout() should be called with the
1480  * current time.
1481  */
1482 GstClockTime
1483 rtp_session_next_timeout (RTPSession * sess, GstClockTime time)
1484 {
1485   GstClockTime result;
1486
1487   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1488
1489   RTP_SESSION_LOCK (sess);
1490
1491   result = sess->next_rtcp_check_time;
1492
1493   if (sess->source->received_bye) {
1494     if (sess->sent_bye)
1495       result = GST_CLOCK_TIME_NONE;
1496     else if (sess->stats.active_sources >= 50)
1497       /* reconsider BYE if members >= 50 */
1498       result = time + calculate_rtcp_interval (sess, FALSE, TRUE);;
1499   } else {
1500     if (sess->first_rtcp)
1501       /* we are called for the first time */
1502       result = time + calculate_rtcp_interval (sess, FALSE, TRUE);
1503     else if (sess->next_rtcp_check_time < time)
1504       /* get a new timeout when we need to */
1505       result = time + calculate_rtcp_interval (sess, FALSE, FALSE);
1506   }
1507   sess->next_rtcp_check_time = result;
1508
1509   GST_DEBUG ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
1510   RTP_SESSION_UNLOCK (sess);
1511
1512   return result;
1513 }
1514
1515 typedef struct
1516 {
1517   RTPSession *sess;
1518   GstBuffer *rtcp;
1519   GstClockTime time;
1520   GstClockTime interval;
1521   GstRTCPPacket packet;
1522   gboolean is_bye;
1523   gboolean has_sdes;
1524 } ReportData;
1525
1526 static void
1527 session_start_rtcp (RTPSession * sess, ReportData * data)
1528 {
1529   GstRTCPPacket *packet = &data->packet;
1530   RTPSource *own = sess->source;
1531
1532   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
1533
1534   if (RTP_SOURCE_IS_SENDER (own)) {
1535     /* we are a sender, create SR */
1536     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
1537     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
1538
1539     /* fill in sender report info, FIXME NTP and RTP timestamps missing */
1540     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
1541         0, 0, own->stats.packets_sent, own->stats.octets_sent);
1542   } else {
1543     /* we are only receiver, create RR */
1544     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
1545     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
1546     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
1547   }
1548 }
1549
1550 /* construct a Sender or Receiver Report */
1551 static void
1552 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
1553 {
1554   RTPSession *sess = data->sess;
1555   GstRTCPPacket *packet = &data->packet;
1556
1557   /* create a new buffer if needed */
1558   if (data->rtcp == NULL) {
1559     session_start_rtcp (sess, data);
1560   }
1561   if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
1562     /* only report about other sender sources */
1563     if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
1564       RTPSourceStats *stats;
1565       guint64 extended_max, expected;
1566       guint64 expected_interval, received_interval, ntptime;
1567       gint64 lost, lost_interval;
1568       guint32 fraction, LSR, DLSR;
1569       GstClockTime time;
1570
1571       stats = &source->stats;
1572
1573       extended_max = stats->cycles + stats->max_seq;
1574       expected = extended_max - stats->base_seq + 1;
1575
1576       GST_DEBUG ("ext_max %d, expected %d, received %d, base_seq %d",
1577           extended_max, expected, stats->packets_received, stats->base_seq);
1578
1579       lost = expected - stats->packets_received;
1580       lost = CLAMP (lost, -0x800000, 0x7fffff);
1581
1582       expected_interval = expected - stats->prev_expected;
1583       stats->prev_expected = expected;
1584       received_interval = stats->packets_received - stats->prev_received;
1585       stats->prev_received = stats->packets_received;
1586
1587       lost_interval = expected_interval - received_interval;
1588
1589       if (expected_interval == 0 || lost_interval <= 0)
1590         fraction = 0;
1591       else
1592         fraction = (lost_interval << 8) / expected_interval;
1593
1594       GST_DEBUG ("add RR for SSRC %08x", source->ssrc);
1595       /* we scaled the jitter up for additional precision */
1596       GST_DEBUG ("fraction %d, lost %d, extseq %u, jitter %d", fraction, lost,
1597           extended_max, stats->jitter >> 4);
1598
1599       if (rtp_source_get_last_sr (source, &ntptime, NULL, NULL, NULL, &time)) {
1600         /* LSR is middle bits of the last ntptime */
1601         LSR = (ntptime >> 16) & 0xffffffff;
1602         /* DLSR, delay since last SR is expressed in 1/65536 second units */
1603         DLSR = gst_util_uint64_scale_int (data->time - time, 65536, GST_SECOND);
1604       } else {
1605         /* No valid SR received, LSR/DLSR are set to 0 then */
1606         LSR = 0;
1607         DLSR = 0;
1608       }
1609       GST_DEBUG ("LSR %08x, DLSR %08x", LSR, DLSR);
1610
1611       /* packet is not yet filled, add report block for this source. */
1612       gst_rtcp_packet_add_rb (packet, source->ssrc, fraction, lost,
1613           extended_max, stats->jitter >> 4, LSR, DLSR);
1614     }
1615   }
1616 }
1617
1618 /* perform cleanup of sources that timed out */
1619 static gboolean
1620 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
1621 {
1622   gboolean remove = FALSE;
1623   gboolean byetimeout = FALSE;
1624   gboolean is_sender, is_active;
1625   RTPSession *sess = data->sess;
1626   GstClockTime interval;
1627
1628   is_sender = RTP_SOURCE_IS_SENDER (source);
1629   is_active = RTP_SOURCE_IS_ACTIVE (source);
1630
1631   /* check for our own source, we don't want to delete our own source. */
1632   if (!(source == sess->source)) {
1633     if (source->received_bye) {
1634       /* if we received a BYE from the source, remove the source after some
1635        * time. */
1636       if (data->time > source->bye_time &&
1637           data->time - source->bye_time > sess->stats.bye_timeout) {
1638         GST_DEBUG ("removing BYE source %08x", source->ssrc);
1639         remove = TRUE;
1640         byetimeout = TRUE;
1641       }
1642     }
1643     /* sources that were inactive for more than 5 times the deterministic reporting
1644      * interval get timed out. the min timeout is 5 seconds. */
1645     if (data->time > source->last_activity) {
1646       interval = MAX (data->interval * 5, 5 * GST_SECOND);
1647       if (data->time - source->last_activity > interval) {
1648         GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
1649             source->ssrc, GST_TIME_ARGS (source->last_activity));
1650         remove = TRUE;
1651       }
1652     }
1653   }
1654
1655   /* senders that did not send for a long time become a receiver, this also
1656    * holds for our own source. */
1657   if (is_sender) {
1658     if (data->time > source->last_rtp_activity) {
1659       interval = MAX (data->interval * 2, 5 * GST_SECOND);
1660
1661       if (data->time - source->last_rtp_activity > interval) {
1662         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
1663             GST_TIME_FORMAT, source->ssrc,
1664             GST_TIME_ARGS (source->last_rtp_activity));
1665         source->is_sender = FALSE;
1666         sess->stats.sender_sources--;
1667       }
1668     }
1669   }
1670
1671   if (remove) {
1672     sess->total_sources--;
1673     if (is_sender)
1674       sess->stats.sender_sources--;
1675     if (is_active)
1676       sess->stats.active_sources--;
1677
1678     if (byetimeout)
1679       on_bye_timeout (sess, source);
1680     else
1681       on_timeout (sess, source);
1682
1683   }
1684   return remove;
1685 }
1686
1687 static void
1688 session_sdes (RTPSession * sess, ReportData * data)
1689 {
1690   GstRTCPPacket *packet = &data->packet;
1691
1692   /* add SDES packet */
1693   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
1694
1695   gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
1696   gst_rtcp_packet_sdes_add_entry (packet, GST_RTCP_SDES_CNAME,
1697       strlen (sess->cname), (guint8 *) sess->cname);
1698
1699   /* other SDES items must only be added at regular intervals and only when the
1700    * user requests to since it might be a privacy problem */
1701 #if 0
1702   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_NAME,
1703       strlen (sess->name), (guint8 *) sess->name);
1704   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_TOOL,
1705       strlen (sess->tool), (guint8 *) sess->tool);
1706 #endif
1707
1708   data->has_sdes = TRUE;
1709 }
1710
1711 /* schedule a BYE packet */
1712 static void
1713 session_bye (RTPSession * sess, ReportData * data)
1714 {
1715   GstRTCPPacket *packet = &data->packet;
1716
1717   /* open packet */
1718   session_start_rtcp (sess, data);
1719
1720   /* add SDES */
1721   session_sdes (sess, data);
1722
1723   /* add a BYE packet */
1724   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet);
1725   gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc);
1726   if (sess->bye_reason)
1727     gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason);
1728
1729   /* we have a BYE packet now */
1730   data->is_bye = TRUE;
1731 }
1732
1733 static gboolean
1734 is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data)
1735 {
1736   GstClockTime new_send_time;
1737   gboolean result;
1738
1739   /* no need to check yet */
1740   if (sess->next_rtcp_check_time > time) {
1741     GST_DEBUG ("no check time yet");
1742     return FALSE;
1743   }
1744
1745   /* perform forward reconsideration */
1746   new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
1747
1748   GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT,
1749       GST_TIME_ARGS (new_send_time));
1750
1751   new_send_time += sess->last_rtcp_send_time;
1752
1753   /* check if reconsideration */
1754   if (time < new_send_time) {
1755     GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
1756         GST_TIME_ARGS (new_send_time));
1757     result = FALSE;
1758     /* store new check time */
1759     sess->next_rtcp_check_time = new_send_time;
1760   } else {
1761     result = TRUE;
1762     new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
1763
1764     GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
1765         GST_TIME_ARGS (new_send_time));
1766     sess->next_rtcp_check_time = time + new_send_time;
1767   }
1768   return result;
1769 }
1770
1771 /**
1772  * rtp_session_on_timeout:
1773  * @sess: an #RTPSession
1774  *
1775  * Perform maintenance actions after the timeout obtained with
1776  * rtp_session_next_timeout() expired.
1777  *
1778  * This function will perform timeouts of receivers and senders, send a BYE
1779  * packet or generate RTCP packets with current session stats.
1780  *
1781  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
1782  * times, for each packet that should be processed.
1783  *
1784  * Returns: a #GstFlowReturn.
1785  */
1786 GstFlowReturn
1787 rtp_session_on_timeout (RTPSession * sess, GstClockTime time)
1788 {
1789   GstFlowReturn result = GST_FLOW_OK;
1790   ReportData data;
1791
1792   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1793
1794   data.sess = sess;
1795   data.rtcp = NULL;
1796   data.time = time;
1797   data.is_bye = FALSE;
1798   data.has_sdes = FALSE;
1799
1800   GST_DEBUG ("reporting at %" GST_TIME_FORMAT, GST_TIME_ARGS (time));
1801
1802   RTP_SESSION_LOCK (sess);
1803   /* get a new interval, we need this for various cleanups etc */
1804   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
1805
1806   /* first perform cleanups */
1807   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
1808       (GHRFunc) session_cleanup, &data);
1809
1810   /* see if we need to generate SR or RR packets */
1811   if (is_rtcp_time (sess, time, &data)) {
1812     if (sess->source->received_bye) {
1813       /* generate BYE instead */
1814       session_bye (sess, &data);
1815       sess->sent_bye = TRUE;
1816     } else {
1817       /* loop over all known sources and do something */
1818       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1819           (GHFunc) session_report_blocks, &data);
1820     }
1821   }
1822
1823   if (data.rtcp) {
1824     guint size;
1825
1826     /* we keep track of the last report time in order to timeout inactive
1827      * receivers or senders */
1828     sess->last_rtcp_send_time = data.time;
1829     sess->first_rtcp = FALSE;
1830
1831     /* add SDES for this source when not already added */
1832     if (!data.has_sdes)
1833       session_sdes (sess, &data);
1834
1835     /* update average RTCP size before sending */
1836     size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
1837     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size);
1838   }
1839   RTP_SESSION_UNLOCK (sess);
1840
1841   /* push out the RTCP packet */
1842   if (data.rtcp) {
1843     /* close the RTCP packet */
1844     gst_rtcp_buffer_end (data.rtcp);
1845
1846     if (sess->callbacks.send_rtcp)
1847       result = sess->callbacks.send_rtcp (sess, sess->source, data.rtcp,
1848           sess->user_data);
1849     else
1850       gst_buffer_unref (data.rtcp);
1851   }
1852
1853   return result;
1854 }