gst/rtpmanager/async_jitter_queue.c: Fix the case where the buffer underruns and...
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21
22 #include <gst/rtp/gstrtpbuffer.h>
23 #include <gst/rtp/gstrtcpbuffer.h>
24 #include <gst/netbuffer/gstnetbuffer.h>
25
26 #include "rtpsession.h"
27
28 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
29 #define GST_CAT_DEFAULT rtp_session_debug
30
31 /* signals and args */
32 enum
33 {
34   SIGNAL_ON_NEW_SSRC,
35   SIGNAL_ON_SSRC_COLLISION,
36   SIGNAL_ON_SSRC_VALIDATED,
37   SIGNAL_ON_BYE_SSRC,
38   SIGNAL_ON_BYE_TIMEOUT,
39   SIGNAL_ON_TIMEOUT,
40   LAST_SIGNAL
41 };
42
43 #define RTP_DEFAULT_BANDWIDTH        64000.0
44 #define RTP_DEFAULT_RTCP_BANDWIDTH   1000
45
46 enum
47 {
48   PROP_0
49 };
50
51 /* update average packet size, we keep this scaled by 16 to keep enough
52  * precision. */
53 #define UPDATE_AVG(avg, val)            \
54   if ((avg) == 0)                       \
55    (avg) = (val) << 4;                  \
56   else                                  \
57    (avg) = ((val) + (15 * (avg))) >> 4;
58
59 /* GObject vmethods */
60 static void rtp_session_finalize (GObject * object);
61 static void rtp_session_set_property (GObject * object, guint prop_id,
62     const GValue * value, GParamSpec * pspec);
63 static void rtp_session_get_property (GObject * object, guint prop_id,
64     GValue * value, GParamSpec * pspec);
65
66 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
67
68 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
69
70 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
71     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
72
73 static void
74 rtp_session_class_init (RTPSessionClass * klass)
75 {
76   GObjectClass *gobject_class;
77
78   gobject_class = (GObjectClass *) klass;
79
80   gobject_class->finalize = rtp_session_finalize;
81   gobject_class->set_property = rtp_session_set_property;
82   gobject_class->get_property = rtp_session_get_property;
83
84   /**
85    * RTPSession::on-new-ssrc:
86    * @session: the object which received the signal
87    * @src: the new RTPSource
88    *
89    * Notify of a new SSRC that entered @session.
90    */
91   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
92       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
93       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
94       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
95       G_TYPE_OBJECT);
96   /**
97    * RTPSession::on-ssrc_collision:
98    * @session: the object which received the signal
99    * @src: the #RTPSource that caused a collision
100    *
101    * Notify when we have an SSRC collision
102    */
103   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
104       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
105       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
106       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
107       G_TYPE_OBJECT);
108   /**
109    * RTPSession::on-ssrc_validated:
110    * @session: the object which received the signal
111    * @src: the new validated RTPSource
112    *
113    * Notify of a new SSRC that became validated.
114    */
115   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
116       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
117       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
118       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
119       G_TYPE_OBJECT);
120   /**
121    * RTPSession::on-bye-ssrc:
122    * @session: the object which received the signal
123    * @src: the RTPSource that went away
124    *
125    * Notify of an SSRC that became inactive because of a BYE packet.
126    */
127   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
128       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
129       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
130       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
131       G_TYPE_OBJECT);
132   /**
133    * RTPSession::on-bye-timeout:
134    * @session: the object which received the signal
135    * @src: the RTPSource that timed out
136    *
137    * Notify of an SSRC that has timed out because of BYE
138    */
139   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
140       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
141       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
142       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
143       G_TYPE_OBJECT);
144   /**
145    * RTPSession::on-timeout:
146    * @session: the object which received the signal
147    * @src: the RTPSource that timed out
148    *
149    * Notify of an SSRC that has timed out
150    */
151   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
152       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
153       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
154       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
155       G_TYPE_OBJECT);
156
157   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
158 }
159
160 static void
161 rtp_session_init (RTPSession * sess)
162 {
163   gint i;
164
165   sess->lock = g_mutex_new ();
166   sess->key = g_random_int ();
167   sess->mask_idx = 0;
168   sess->mask = 0;
169
170   for (i = 0; i < 32; i++) {
171     sess->ssrcs[i] =
172         g_hash_table_new_full (NULL, NULL, NULL,
173         (GDestroyNotify) g_object_unref);
174   }
175   sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL);
176
177   rtp_stats_init_defaults (&sess->stats);
178
179   /* create an active SSRC for this session manager */
180   sess->source = rtp_session_create_source (sess);
181   sess->source->validated = TRUE;
182   sess->stats.active_sources++;
183
184   /* default UDP header length */
185   sess->header_len = 28;
186   sess->mtu = 1400;
187
188   /* some default SDES entries */
189   //sess->cname = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ());
190   sess->cname = g_strdup_printf ("foo@%s", g_get_host_name ());
191   sess->name = g_strdup (g_get_real_name ());
192   sess->tool = g_strdup ("GStreamer");
193
194   sess->first_rtcp = TRUE;
195
196   GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
197 }
198
199 static void
200 rtp_session_finalize (GObject * object)
201 {
202   RTPSession *sess;
203   gint i;
204
205   sess = RTP_SESSION_CAST (object);
206
207   g_mutex_free (sess->lock);
208   for (i = 0; i < 32; i++)
209     g_hash_table_destroy (sess->ssrcs[i]);
210
211   g_hash_table_destroy (sess->cnames);
212   g_object_unref (sess->source);
213
214   g_free (sess->cname);
215   g_free (sess->tool);
216   g_free (sess->bye_reason);
217
218   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
219 }
220
221 static void
222 rtp_session_set_property (GObject * object, guint prop_id,
223     const GValue * value, GParamSpec * pspec)
224 {
225   RTPSession *sess;
226
227   sess = RTP_SESSION (object);
228
229   switch (prop_id) {
230     default:
231       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
232       break;
233   }
234 }
235
236 static void
237 rtp_session_get_property (GObject * object, guint prop_id,
238     GValue * value, GParamSpec * pspec)
239 {
240   RTPSession *sess;
241
242   sess = RTP_SESSION (object);
243
244   switch (prop_id) {
245     default:
246       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
247       break;
248   }
249 }
250
251 static void
252 on_new_ssrc (RTPSession * sess, RTPSource * source)
253 {
254   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
255 }
256
257 static void
258 on_ssrc_collision (RTPSession * sess, RTPSource * source)
259 {
260   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
261       source);
262 }
263
264 static void
265 on_ssrc_validated (RTPSession * sess, RTPSource * source)
266 {
267   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
268       source);
269 }
270
271 static void
272 on_bye_ssrc (RTPSession * sess, RTPSource * source)
273 {
274   /* notify app that reconsideration should be performed */
275   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
276 }
277
278 static void
279 on_bye_timeout (RTPSession * sess, RTPSource * source)
280 {
281   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
282 }
283
284 static void
285 on_timeout (RTPSession * sess, RTPSource * source)
286 {
287   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
288 }
289
290 /**
291  * rtp_session_new:
292  *
293  * Create a new session object.
294  *
295  * Returns: a new #RTPSession. g_object_unref() after usage.
296  */
297 RTPSession *
298 rtp_session_new (void)
299 {
300   RTPSession *sess;
301
302   sess = g_object_new (RTP_TYPE_SESSION, NULL);
303
304   return sess;
305 }
306
307 /**
308  * rtp_session_set_callbacks:
309  * @sess: an #RTPSession
310  * @callbacks: callbacks to configure
311  * @user_data: user data passed in the callbacks
312  *
313  * Configure a set of callbacks to be notified of actions.
314  */
315 void
316 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
317     gpointer user_data)
318 {
319   g_return_if_fail (RTP_IS_SESSION (sess));
320
321   sess->callbacks.process_rtp = callbacks->process_rtp;
322   sess->callbacks.send_rtp = callbacks->send_rtp;
323   sess->callbacks.send_rtcp = callbacks->send_rtcp;
324   sess->callbacks.clock_rate = callbacks->clock_rate;
325   sess->callbacks.get_time = callbacks->get_time;
326   sess->callbacks.reconsider = callbacks->reconsider;
327   sess->user_data = user_data;
328 }
329
330 /**
331  * rtp_session_set_bandwidth:
332  * @sess: an #RTPSession
333  * @bandwidth: the bandwidth allocated
334  *
335  * Set the session bandwidth in bytes per second.
336  */
337 void
338 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
339 {
340   g_return_if_fail (RTP_IS_SESSION (sess));
341
342   sess->stats.bandwidth = bandwidth;
343 }
344
345 /**
346  * rtp_session_get_bandwidth:
347  * @sess: an #RTPSession
348  *
349  * Get the session bandwidth.
350  *
351  * Returns: the session bandwidth.
352  */
353 gdouble
354 rtp_session_get_bandwidth (RTPSession * sess)
355 {
356   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
357
358   return sess->stats.bandwidth;
359 }
360
361 /**
362  * rtp_session_set_rtcp_bandwidth:
363  * @sess: an #RTPSession
364  * @bandwidth: the RTCP bandwidth
365  *
366  * Set the bandwidth that should be used for RTCP
367  * messages. 
368  */
369 void
370 rtp_session_set_rtcp_bandwidth (RTPSession * sess, gdouble bandwidth)
371 {
372   g_return_if_fail (RTP_IS_SESSION (sess));
373
374   sess->stats.rtcp_bandwidth = bandwidth;
375 }
376
377 /**
378  * rtp_session_get_rtcp_bandwidth:
379  * @sess: an #RTPSession
380  *
381  * Get the session bandwidth used for RTCP.
382  *
383  * Returns: The bandwidth used for RTCP messages.
384  */
385 gdouble
386 rtp_session_get_rtcp_bandwidth (RTPSession * sess)
387 {
388   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
389
390   return sess->stats.rtcp_bandwidth;
391 }
392
393 /**
394  * rtp_session_set_cname:
395  * @sess: an #RTPSession
396  * @cname: a CNAME for the session
397  *
398  * Set the CNAME for the session. 
399  */
400 void
401 rtp_session_set_cname (RTPSession * sess, const gchar * cname)
402 {
403   g_return_if_fail (RTP_IS_SESSION (sess));
404
405   g_free (sess->cname);
406   sess->cname = g_strdup (cname);
407 }
408
409 /**
410  * rtp_session_get_cname:
411  * @sess: an #RTPSession
412  *
413  * Get the currently configured CNAME for the session.
414  *
415  * Returns: The CNAME. g_free after usage.
416  */
417 gchar *
418 rtp_session_get_cname (RTPSession * sess)
419 {
420   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
421
422   return g_strdup (sess->cname);
423 }
424
425 /**
426  * rtp_session_set_name:
427  * @sess: an #RTPSession
428  * @name: a NAME for the session
429  *
430  * Set the NAME for the session. 
431  */
432 void
433 rtp_session_set_name (RTPSession * sess, const gchar * name)
434 {
435   g_return_if_fail (RTP_IS_SESSION (sess));
436
437   g_free (sess->name);
438   sess->name = g_strdup (name);
439 }
440
441 /**
442  * rtp_session_get_name:
443  * @sess: an #RTPSession
444  *
445  * Get the currently configured NAME for the session.
446  *
447  * Returns: The NAME. g_free after usage.
448  */
449 gchar *
450 rtp_session_get_name (RTPSession * sess)
451 {
452   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
453
454   return g_strdup (sess->name);
455 }
456
457 /**
458  * rtp_session_set_email:
459  * @sess: an #RTPSession
460  * @email: an EMAIL for the session
461  *
462  * Set the EMAIL the session. 
463  */
464 void
465 rtp_session_set_email (RTPSession * sess, const gchar * email)
466 {
467   g_return_if_fail (RTP_IS_SESSION (sess));
468
469   g_free (sess->email);
470   sess->email = g_strdup (email);
471 }
472
473 /**
474  * rtp_session_get_email:
475  * @sess: an #RTPSession
476  *
477  * Get the currently configured EMAIL of the session.
478  *
479  * Returns: The EMAIL. g_free after usage.
480  */
481 gchar *
482 rtp_session_get_email (RTPSession * sess)
483 {
484   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
485
486   return g_strdup (sess->email);
487 }
488
489 /**
490  * rtp_session_set_phone:
491  * @sess: an #RTPSession
492  * @phone: a PHONE for the session
493  *
494  * Set the PHONE the session. 
495  */
496 void
497 rtp_session_set_phone (RTPSession * sess, const gchar * phone)
498 {
499   g_return_if_fail (RTP_IS_SESSION (sess));
500
501   g_free (sess->phone);
502   sess->phone = g_strdup (phone);
503 }
504
505 /**
506  * rtp_session_get_location:
507  * @sess: an #RTPSession
508  *
509  * Get the currently configured PHONE of the session.
510  *
511  * Returns: The PHONE. g_free after usage.
512  */
513 gchar *
514 rtp_session_get_phone (RTPSession * sess)
515 {
516   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
517
518   return g_strdup (sess->phone);
519 }
520
521 /**
522  * rtp_session_set_location:
523  * @sess: an #RTPSession
524  * @location: a LOCATION for the session
525  *
526  * Set the LOCATION the session. 
527  */
528 void
529 rtp_session_set_location (RTPSession * sess, const gchar * location)
530 {
531   g_return_if_fail (RTP_IS_SESSION (sess));
532
533   g_free (sess->location);
534   sess->location = g_strdup (location);
535 }
536
537 /**
538  * rtp_session_get_location:
539  * @sess: an #RTPSession
540  *
541  * Get the currently configured LOCATION of the session.
542  *
543  * Returns: The LOCATION. g_free after usage.
544  */
545 gchar *
546 rtp_session_get_location (RTPSession * sess)
547 {
548   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
549
550   return g_strdup (sess->location);
551 }
552
553 /**
554  * rtp_session_set_tool:
555  * @sess: an #RTPSession
556  * @tool: a TOOL for the session
557  *
558  * Set the TOOL the session. 
559  */
560 void
561 rtp_session_set_tool (RTPSession * sess, const gchar * tool)
562 {
563   g_return_if_fail (RTP_IS_SESSION (sess));
564
565   g_free (sess->tool);
566   sess->tool = g_strdup (tool);
567 }
568
569 /**
570  * rtp_session_get_tool:
571  * @sess: an #RTPSession
572  *
573  * Get the currently configured TOOL of the session.
574  *
575  * Returns: The TOOL. g_free after usage.
576  */
577 gchar *
578 rtp_session_get_tool (RTPSession * sess)
579 {
580   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
581
582   return g_strdup (sess->tool);
583 }
584
585 /**
586  * rtp_session_set_note:
587  * @sess: an #RTPSession
588  * @note: a NOTE for the session
589  *
590  * Set the NOTE the session. 
591  */
592 void
593 rtp_session_set_note (RTPSession * sess, const gchar * note)
594 {
595   g_return_if_fail (RTP_IS_SESSION (sess));
596
597   g_free (sess->note);
598   sess->note = g_strdup (note);
599 }
600
601 /**
602  * rtp_session_get_note:
603  * @sess: an #RTPSession
604  *
605  * Get the currently configured NOTE of the session.
606  *
607  * Returns: The NOTE. g_free after usage.
608  */
609 gchar *
610 rtp_session_get_note (RTPSession * sess)
611 {
612   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
613
614   return g_strdup (sess->note);
615 }
616
617 static GstFlowReturn
618 source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session)
619 {
620   GstFlowReturn result = GST_FLOW_OK;
621
622   if (source == session->source) {
623     GST_DEBUG ("source %08x pushed sender RTP packet", source->ssrc);
624
625     RTP_SESSION_UNLOCK (session);
626
627     if (session->callbacks.send_rtp)
628       result =
629           session->callbacks.send_rtp (session, source, buffer,
630           session->user_data);
631     else
632       gst_buffer_unref (buffer);
633
634   } else {
635     GST_DEBUG ("source %08x pushed receiver RTP packet", source->ssrc);
636     RTP_SESSION_UNLOCK (session);
637
638     if (session->callbacks.process_rtp)
639       result =
640           session->callbacks.process_rtp (session, source, buffer,
641           session->user_data);
642     else
643       gst_buffer_unref (buffer);
644   }
645   RTP_SESSION_LOCK (session);
646
647   return result;
648 }
649
650 static gint
651 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
652 {
653   gint result;
654
655   if (session->callbacks.clock_rate)
656     result = session->callbacks.clock_rate (session, pt, session->user_data);
657   else
658     result = -1;
659
660   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
661
662   return result;
663 }
664
665 static RTPSourceCallbacks callbacks = {
666   (RTPSourcePushRTP) source_push_rtp,
667   (RTPSourceClockRate) source_clock_rate,
668 };
669
670 static gboolean
671 check_collision (RTPSession * sess, RTPSource * source,
672     RTPArrivalStats * arrival)
673 {
674   /* FIXME, do collision check */
675   return FALSE;
676 }
677
678 static RTPSource *
679 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
680     RTPArrivalStats * arrival, gboolean rtp)
681 {
682   RTPSource *source;
683
684   source =
685       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
686   if (source == NULL) {
687     /* make new Source in probation and insert */
688     source = rtp_source_new (ssrc);
689
690     if (rtp)
691       source->probation = RTP_DEFAULT_PROBATION;
692     else
693       source->probation = 0;
694
695     /* store from address, if any */
696     if (arrival->have_address) {
697       if (rtp)
698         rtp_source_set_rtp_from (source, &arrival->address);
699       else
700         rtp_source_set_rtcp_from (source, &arrival->address);
701     }
702
703     /* configure a callback on the source */
704     rtp_source_set_callbacks (source, &callbacks, sess);
705
706     g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
707         source);
708
709     /* we have one more source now */
710     sess->total_sources++;
711     *created = TRUE;
712   } else {
713     *created = FALSE;
714     /* check for collision, this updates the address when not previously set */
715     if (check_collision (sess, source, arrival))
716       on_ssrc_collision (sess, source);
717   }
718   /* update last activity */
719   source->last_activity = arrival->time;
720   if (rtp)
721     source->last_rtp_activity = arrival->time;
722
723   return source;
724 }
725
726 /**
727  * rtp_session_add_source:
728  * @sess: a #RTPSession
729  * @src: #RTPSource to add
730  *
731  * Add @src to @session.
732  *
733  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
734  * existed in the session.
735  */
736 gboolean
737 rtp_session_add_source (RTPSession * sess, RTPSource * src)
738 {
739   gboolean result = FALSE;
740   RTPSource *find;
741
742   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
743   g_return_val_if_fail (src != NULL, FALSE);
744
745   RTP_SESSION_LOCK (sess);
746   find =
747       g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
748       GINT_TO_POINTER (src->ssrc));
749   if (find == NULL) {
750     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
751         GINT_TO_POINTER (src->ssrc), src);
752     /* we have one more source now */
753     sess->total_sources++;
754     result = TRUE;
755   }
756   RTP_SESSION_UNLOCK (sess);
757
758   return result;
759 }
760
761 /**
762  * rtp_session_get_num_sources:
763  * @sess: an #RTPSession
764  *
765  * Get the number of sources in @sess.
766  *
767  * Returns: The number of sources in @sess.
768  */
769 guint
770 rtp_session_get_num_sources (RTPSession * sess)
771 {
772   guint result;
773
774   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
775
776   RTP_SESSION_LOCK (sess);
777   result = sess->total_sources;
778   RTP_SESSION_UNLOCK (sess);
779
780   return result;
781 }
782
783 /**
784  * rtp_session_get_num_active_sources:
785  * @sess: an #RTPSession
786  *
787  * Get the number of active sources in @sess. A source is considered active when
788  * it has been validated and has not yet received a BYE RTCP message.
789  *
790  * Returns: The number of active sources in @sess.
791  */
792 guint
793 rtp_session_get_num_active_sources (RTPSession * sess)
794 {
795   guint result;
796
797   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
798
799   RTP_SESSION_LOCK (sess);
800   result = sess->stats.active_sources;
801   RTP_SESSION_UNLOCK (sess);
802
803   return result;
804 }
805
806 /**
807  * rtp_session_get_source_by_ssrc:
808  * @sess: an #RTPSession
809  * @ssrc: an SSRC
810  *
811  * Find the source with @ssrc in @sess.
812  *
813  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
814  * g_object_unref() after usage.
815  */
816 RTPSource *
817 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
818 {
819   RTPSource *result;
820
821   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
822
823   RTP_SESSION_LOCK (sess);
824   result =
825       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
826   if (result)
827     g_object_ref (result);
828   RTP_SESSION_UNLOCK (sess);
829
830   return result;
831 }
832
833 /**
834  * rtp_session_get_source_by_cname:
835  * @sess: a #RTPSession
836  * @cname: an CNAME
837  *
838  * Find the source with @cname in @sess.
839  *
840  * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found.
841  * g_object_unref() after usage.
842  */
843 RTPSource *
844 rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
845 {
846   RTPSource *result;
847
848   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
849   g_return_val_if_fail (cname != NULL, NULL);
850
851   RTP_SESSION_LOCK (sess);
852   result = g_hash_table_lookup (sess->cnames, cname);
853   if (result)
854     g_object_ref (result);
855   RTP_SESSION_UNLOCK (sess);
856
857   return result;
858 }
859
860 /**
861  * rtp_session_create_source:
862  * @sess: an #RTPSession
863  *
864  * Create an #RTPSource for use in @sess. This function will create a source
865  * with an ssrc that is currently not used by any participants in the session.
866  *
867  * Returns: an #RTPSource.
868  */
869 RTPSource *
870 rtp_session_create_source (RTPSession * sess)
871 {
872   guint32 ssrc;
873   RTPSource *source;
874
875   RTP_SESSION_LOCK (sess);
876   while (TRUE) {
877     ssrc = g_random_int ();
878
879     /* see if it exists in the session, we're done if it doesn't */
880     if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
881             GINT_TO_POINTER (ssrc)) == NULL)
882       break;
883   }
884   source = rtp_source_new (ssrc);
885   g_object_ref (source);
886   rtp_source_set_callbacks (source, &callbacks, sess);
887   g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
888       source);
889   /* we have one more source now */
890   sess->total_sources++;
891   RTP_SESSION_UNLOCK (sess);
892
893   return source;
894 }
895
896 /* update the RTPArrivalStats structure with the current time and other bits
897  * about the current buffer we are handling.
898  * This function is typically called when a validated packet is received.
899  * This function should be called with the SESSION_LOCK
900  */
901 static void
902 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
903     gboolean rtp, GstBuffer * buffer)
904 {
905   /* get time or arrival */
906   if (sess->callbacks.get_time)
907     arrival->time = sess->callbacks.get_time (sess, sess->user_data);
908   else
909     arrival->time = GST_CLOCK_TIME_NONE;
910
911   /* get packet size including header overhead */
912   arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
913
914   if (rtp) {
915     arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer);
916   } else {
917     arrival->payload_len = 0;
918   }
919
920   /* for netbuffer we can store the IP address to check for collisions */
921   arrival->have_address = GST_IS_NETBUFFER (buffer);
922   if (arrival->have_address) {
923     GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
924
925     memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
926   }
927 }
928
929 /**
930  * rtp_session_process_rtp:
931  * @sess: and #RTPSession
932  * @buffer: an RTP buffer
933  *
934  * Process an RTP buffer in the session manager. This function takes ownership
935  * of @buffer.
936  *
937  * Returns: a #GstFlowReturn.
938  */
939 GstFlowReturn
940 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer)
941 {
942   GstFlowReturn result;
943   guint32 ssrc;
944   RTPSource *source;
945   gboolean created;
946   gboolean prevsender, prevactive;
947   RTPArrivalStats arrival;
948
949   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
950   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
951
952   if (!gst_rtp_buffer_validate (buffer))
953     goto invalid_packet;
954
955   RTP_SESSION_LOCK (sess);
956   /* update arrival stats */
957   update_arrival_stats (sess, &arrival, TRUE, buffer);
958
959   /* ignore more RTP packets when we left the session */
960   if (sess->source->received_bye)
961     goto ignore;
962
963   /* get SSRC and look up in session database */
964   ssrc = gst_rtp_buffer_get_ssrc (buffer);
965   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
966
967   prevsender = RTP_SOURCE_IS_SENDER (source);
968   prevactive = RTP_SOURCE_IS_ACTIVE (source);
969
970   /* we need to ref so that we can process the CSRCs later */
971   gst_buffer_ref (buffer);
972
973   /* let source process the packet */
974   result = rtp_source_process_rtp (source, buffer, &arrival);
975
976   /* source became active */
977   if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
978     sess->stats.active_sources++;
979     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
980         sess->stats.active_sources);
981     on_ssrc_validated (sess, source);
982   }
983   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
984     sess->stats.sender_sources++;
985     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
986         sess->stats.sender_sources);
987   }
988
989   if (created)
990     on_new_ssrc (sess, source);
991
992   if (source->validated) {
993     guint8 i, count;
994     gboolean created;
995
996     /* for validated sources, we add the CSRCs as well */
997     count = gst_rtp_buffer_get_csrc_count (buffer);
998
999     for (i = 0; i < count; i++) {
1000       guint32 csrc;
1001       RTPSource *csrc_src;
1002
1003       csrc = gst_rtp_buffer_get_csrc (buffer, i);
1004
1005       /* get source */
1006       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
1007
1008       if (created) {
1009         GST_DEBUG ("created new CSRC: %08x", csrc);
1010         rtp_source_set_as_csrc (csrc_src);
1011         if (RTP_SOURCE_IS_ACTIVE (csrc_src))
1012           sess->stats.active_sources++;
1013         on_new_ssrc (sess, source);
1014       }
1015     }
1016   }
1017   gst_buffer_unref (buffer);
1018
1019   RTP_SESSION_UNLOCK (sess);
1020
1021   return result;
1022
1023   /* ERRORS */
1024 invalid_packet:
1025   {
1026     gst_buffer_unref (buffer);
1027     GST_DEBUG ("invalid RTP packet received");
1028     return GST_FLOW_OK;
1029   }
1030 ignore:
1031   {
1032     gst_buffer_unref (buffer);
1033     RTP_SESSION_UNLOCK (sess);
1034     GST_DEBUG ("ignoring RTP packet because we are leaving");
1035     return GST_FLOW_OK;
1036   }
1037 }
1038
1039 /* A Sender report contains statistics about how the sender is doing. This
1040  * includes timing informataion about the relation between RTP and NTP
1041  * timestamps is it using and the number of packets/bytes it sent to us.
1042  *
1043  * In this report is also included a set of report blocks related to how this
1044  * sender is receiving data (in case we (or somebody else) is also sending stuff
1045  * to it). This info includes the packet loss, jitter and seqnum. It also
1046  * contains information to calculate the round trip time (LSR/DLSR).
1047  */
1048 static void
1049 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
1050     RTPArrivalStats * arrival)
1051 {
1052   guint32 senderssrc, rtptime, packet_count, octet_count;
1053   guint64 ntptime;
1054   guint count, i;
1055   RTPSource *source;
1056   gboolean created, prevsender;
1057
1058   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
1059       &packet_count, &octet_count);
1060
1061   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
1062       senderssrc, GST_TIME_ARGS (arrival->time));
1063
1064   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1065
1066   prevsender = RTP_SOURCE_IS_SENDER (source);
1067
1068   /* first update the source */
1069   rtp_source_process_sr (source, ntptime, rtptime, packet_count, octet_count,
1070       arrival->time);
1071
1072   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1073     sess->stats.sender_sources++;
1074     GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc,
1075         sess->stats.sender_sources);
1076   }
1077
1078   if (created)
1079     on_new_ssrc (sess, source);
1080
1081   count = gst_rtcp_packet_get_rb_count (packet);
1082   for (i = 0; i < count; i++) {
1083     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1084     guint8 fractionlost;
1085     gint32 packetslost;
1086
1087     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1088         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1089
1090     GST_DEBUG ("RB %d: %08x, %u", i, ssrc, jitter);
1091
1092     if (ssrc == sess->source->ssrc) {
1093       /* only deal with report blocks for our session, we update the stats of
1094        * the sender of the RTCP message. We could also compare our stats against
1095        * the other sender to see if we are better or worse. */
1096       rtp_source_process_rb (source, fractionlost, packetslost,
1097           exthighestseq, jitter, lsr, dlsr);
1098     }
1099   }
1100 }
1101
1102 /* A receiver report contains statistics about how a receiver is doing. It
1103  * includes stuff like packet loss, jitter and the seqnum it received last. It
1104  * also contains info to calculate the round trip time.
1105  *
1106  * We are only interested in how the sender of this report is doing wrt to us.
1107  */
1108 static void
1109 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
1110     RTPArrivalStats * arrival)
1111 {
1112   guint32 senderssrc;
1113   guint count, i;
1114   RTPSource *source;
1115   gboolean created;
1116
1117   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
1118
1119   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
1120
1121   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1122
1123   if (created)
1124     on_new_ssrc (sess, source);
1125
1126   count = gst_rtcp_packet_get_rb_count (packet);
1127   for (i = 0; i < count; i++) {
1128     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1129     guint8 fractionlost;
1130     gint32 packetslost;
1131
1132     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1133         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1134
1135     if (ssrc == sess->source->ssrc) {
1136       rtp_source_process_rb (source, fractionlost, packetslost,
1137           exthighestseq, jitter, lsr, dlsr);
1138     }
1139   }
1140 }
1141
1142 /* FIXME, we're just printing this for now... */
1143 static void
1144 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
1145     RTPArrivalStats * arrival)
1146 {
1147   guint items, i, j;
1148   gboolean more_items, more_entries;
1149
1150   items = gst_rtcp_packet_sdes_get_item_count (packet);
1151   GST_DEBUG ("got SDES packet with %d items", items);
1152
1153   more_items = gst_rtcp_packet_sdes_first_item (packet);
1154   i = 0;
1155   while (more_items) {
1156     guint32 ssrc;
1157
1158     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
1159
1160     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
1161
1162     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
1163     j = 0;
1164     while (more_entries) {
1165       GstRTCPSDESType type;
1166       guint8 len;
1167       guint8 *data;
1168
1169       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
1170
1171       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
1172           data);
1173
1174       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
1175       j++;
1176     }
1177     more_items = gst_rtcp_packet_sdes_next_item (packet);
1178     i++;
1179   }
1180 }
1181
1182 /* BYE is sent when a client leaves the session
1183  */
1184 static void
1185 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
1186     RTPArrivalStats * arrival)
1187 {
1188   guint count, i;
1189   gchar *reason;
1190
1191   reason = gst_rtcp_packet_bye_get_reason (packet);
1192   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
1193
1194   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
1195   for (i = 0; i < count; i++) {
1196     guint32 ssrc;
1197     RTPSource *source;
1198     gboolean created, prevactive, prevsender;
1199     guint pmembers, members;
1200
1201     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
1202     GST_DEBUG ("SSRC: %08x", ssrc);
1203
1204     /* find src and mark bye, no probation when dealing with RTCP */
1205     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1206
1207     /* store time for when we need to time out this source */
1208     source->bye_time = arrival->time;
1209
1210     prevactive = RTP_SOURCE_IS_ACTIVE (source);
1211     prevsender = RTP_SOURCE_IS_SENDER (source);
1212
1213     /* let the source handle the rest */
1214     rtp_source_process_bye (source, reason);
1215
1216     pmembers = sess->stats.active_sources;
1217
1218     if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
1219       sess->stats.active_sources--;
1220       GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1221           sess->stats.active_sources);
1222     }
1223     if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
1224       sess->stats.sender_sources--;
1225       GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1226           sess->stats.sender_sources);
1227     }
1228     members = sess->stats.active_sources;
1229
1230     if (!sess->source->received_bye && members < pmembers) {
1231       /* some members went away since the previous timeout estimate. 
1232        * Perform reverse reconsideration but only when we are not scheduling a
1233        * BYE ourselves. */
1234       if (arrival->time < sess->next_rtcp_check_time) {
1235         GstClockTime time_remaining;
1236
1237         time_remaining = sess->next_rtcp_check_time - arrival->time;
1238         sess->next_rtcp_check_time =
1239             gst_util_uint64_scale (time_remaining, members, pmembers);
1240
1241         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
1242             GST_TIME_ARGS (sess->next_rtcp_check_time));
1243
1244         sess->next_rtcp_check_time += arrival->time;
1245
1246         /* notify app of reconsideration */
1247         if (sess->callbacks.reconsider)
1248           sess->callbacks.reconsider (sess, sess->user_data);
1249       }
1250     }
1251
1252     if (created)
1253       on_new_ssrc (sess, source);
1254
1255     on_bye_ssrc (sess, source);
1256   }
1257   g_free (reason);
1258 }
1259
1260 static void
1261 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
1262     RTPArrivalStats * arrival)
1263 {
1264   GST_DEBUG ("received APP");
1265 }
1266
1267 /**
1268  * rtp_session_process_rtcp:
1269  * @sess: and #RTPSession
1270  * @buffer: an RTCP buffer
1271  *
1272  * Process an RTCP buffer in the session manager.
1273  *
1274  * Returns: a #GstFlowReturn.
1275  */
1276 GstFlowReturn
1277 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
1278 {
1279   GstRTCPPacket packet;
1280   gboolean more, is_bye = FALSE;
1281   RTPArrivalStats arrival;
1282
1283   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1284   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1285
1286   if (!gst_rtcp_buffer_validate (buffer))
1287     goto invalid_packet;
1288
1289   GST_DEBUG ("received RTCP packet");
1290
1291   RTP_SESSION_LOCK (sess);
1292   /* update arrival stats */
1293   update_arrival_stats (sess, &arrival, FALSE, buffer);
1294
1295   if (sess->sent_bye)
1296     goto ignore;
1297
1298   /* start processing the compound packet */
1299   more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
1300   while (more) {
1301     GstRTCPType type;
1302
1303     type = gst_rtcp_packet_get_type (&packet);
1304
1305     /* when we are leaving the session, we should ignore all non-BYE messages */
1306     if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) {
1307       GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
1308       goto next;
1309     }
1310
1311     switch (type) {
1312       case GST_RTCP_TYPE_SR:
1313         rtp_session_process_sr (sess, &packet, &arrival);
1314         break;
1315       case GST_RTCP_TYPE_RR:
1316         rtp_session_process_rr (sess, &packet, &arrival);
1317         break;
1318       case GST_RTCP_TYPE_SDES:
1319         rtp_session_process_sdes (sess, &packet, &arrival);
1320         break;
1321       case GST_RTCP_TYPE_BYE:
1322         is_bye = TRUE;
1323         rtp_session_process_bye (sess, &packet, &arrival);
1324         break;
1325       case GST_RTCP_TYPE_APP:
1326         rtp_session_process_app (sess, &packet, &arrival);
1327         break;
1328       default:
1329         GST_WARNING ("got unknown RTCP packet");
1330         break;
1331     }
1332   next:
1333     more = gst_rtcp_packet_move_to_next (&packet);
1334   }
1335
1336   /* if we are scheduling a BYE, we only want to count bye packets, else we
1337    * count everything */
1338   if (sess->source->received_bye) {
1339     if (is_bye) {
1340       sess->stats.bye_members++;
1341       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1342     }
1343   } else {
1344     /* keep track of average packet size */
1345     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
1346   }
1347   RTP_SESSION_UNLOCK (sess);
1348
1349   gst_buffer_unref (buffer);
1350
1351   return GST_FLOW_OK;
1352
1353   /* ERRORS */
1354 invalid_packet:
1355   {
1356     GST_DEBUG ("invalid RTCP packet received");
1357     return GST_FLOW_OK;
1358   }
1359 ignore:
1360   {
1361     gst_buffer_unref (buffer);
1362     RTP_SESSION_UNLOCK (sess);
1363     GST_DEBUG ("ignoring RTP packet because we left");
1364     return GST_FLOW_OK;
1365   }
1366 }
1367
1368 /**
1369  * rtp_session_send_rtp:
1370  * @sess: an #RTPSession
1371  * @buffer: an RTP buffer
1372  *
1373  * Send the RTP buffer in the session manager. This function takes ownership of
1374  * @buffer.
1375  *
1376  * Returns: a #GstFlowReturn.
1377  */
1378 GstFlowReturn
1379 rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer)
1380 {
1381   GstFlowReturn result;
1382   RTPSource *source;
1383   gboolean prevsender;
1384
1385   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1386   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1387
1388   if (!gst_rtp_buffer_validate (buffer))
1389     goto invalid_packet;
1390
1391   GST_DEBUG ("received RTP packet for sending");
1392
1393   RTP_SESSION_LOCK (sess);
1394   source = sess->source;
1395
1396   /* update last activity */
1397   if (sess->callbacks.get_time)
1398     source->last_rtp_activity =
1399         sess->callbacks.get_time (sess, sess->user_data);
1400
1401   prevsender = RTP_SOURCE_IS_SENDER (source);
1402
1403   /* we use our own source to send */
1404   result = rtp_source_send_rtp (sess->source, buffer);
1405
1406   if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
1407     sess->stats.sender_sources++;
1408   RTP_SESSION_UNLOCK (sess);
1409
1410   return result;
1411
1412   /* ERRORS */
1413 invalid_packet:
1414   {
1415     gst_buffer_unref (buffer);
1416     GST_DEBUG ("invalid RTP packet received");
1417     return GST_FLOW_OK;
1418   }
1419 }
1420
1421 static GstClockTime
1422 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
1423     gboolean first)
1424 {
1425   GstClockTime result;
1426
1427   if (sess->source->received_bye) {
1428     result = rtp_stats_calculate_bye_interval (&sess->stats);
1429   } else {
1430     result = rtp_stats_calculate_rtcp_interval (&sess->stats,
1431         RTP_SOURCE_IS_SENDER (sess->source), first);
1432   }
1433
1434   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
1435       GST_TIME_ARGS (result), first);
1436
1437   if (!deterministic)
1438     result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
1439
1440   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
1441
1442   return result;
1443 }
1444
1445 /**
1446  * rtp_session_send_bye:
1447  * @sess: an #RTPSession
1448  * @reason: a reason or NULL
1449  *
1450  * Stop the current @sess and schedule a BYE message for the other members.
1451  *
1452  * Returns: a #GstFlowReturn.
1453  */
1454 GstFlowReturn
1455 rtp_session_send_bye (RTPSession * sess, const gchar * reason)
1456 {
1457   GstFlowReturn result = GST_FLOW_OK;
1458   RTPSource *source;
1459   GstClockTime current, interval;
1460
1461   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1462
1463   RTP_SESSION_LOCK (sess);
1464   source = sess->source;
1465
1466   /* ignore more BYEs */
1467   if (source->received_bye)
1468     goto done;
1469
1470   /* we have BYE now */
1471   source->received_bye = TRUE;
1472   /* at least one member wants to send a BYE */
1473   sess->bye_reason = g_strdup (reason);
1474   sess->stats.avg_rtcp_packet_size = 100;
1475   sess->stats.bye_members = 1;
1476   sess->first_rtcp = TRUE;
1477   sess->sent_bye = FALSE;
1478
1479   /* get current time */
1480   if (sess->callbacks.get_time)
1481     current = sess->callbacks.get_time (sess, sess->user_data);
1482   else
1483     current = 0;
1484
1485   /* reschedule transmission */
1486   sess->last_rtcp_send_time = current;
1487   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
1488   sess->next_rtcp_check_time = current + interval;
1489
1490   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
1491       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
1492
1493   /* notify app of reconsideration */
1494   if (sess->callbacks.reconsider)
1495     sess->callbacks.reconsider (sess, sess->user_data);
1496 done:
1497   RTP_SESSION_UNLOCK (sess);
1498
1499   return result;
1500 }
1501
1502 /**
1503  * rtp_session_next_timeout:
1504  * @sess: an #RTPSession
1505  * @time: the current time
1506  *
1507  * Get the next time we should perform session maintenance tasks.
1508  *
1509  * Returns: a time when rtp_session_on_timeout() should be called with the
1510  * current time.
1511  */
1512 GstClockTime
1513 rtp_session_next_timeout (RTPSession * sess, GstClockTime time)
1514 {
1515   GstClockTime result;
1516
1517   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1518
1519   RTP_SESSION_LOCK (sess);
1520
1521   result = sess->next_rtcp_check_time;
1522
1523   if (sess->source->received_bye) {
1524     if (sess->sent_bye)
1525       result = GST_CLOCK_TIME_NONE;
1526     else if (sess->stats.active_sources >= 50)
1527       /* reconsider BYE if members >= 50 */
1528       result = time + calculate_rtcp_interval (sess, FALSE, TRUE);
1529   } else {
1530     if (sess->first_rtcp)
1531       /* we are called for the first time */
1532       result = time + calculate_rtcp_interval (sess, FALSE, TRUE);
1533     else if (sess->next_rtcp_check_time < time)
1534       /* get a new timeout when we need to */
1535       result = time + calculate_rtcp_interval (sess, FALSE, FALSE);
1536   }
1537   sess->next_rtcp_check_time = result;
1538
1539   GST_DEBUG ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
1540   RTP_SESSION_UNLOCK (sess);
1541
1542   return result;
1543 }
1544
1545 typedef struct
1546 {
1547   RTPSession *sess;
1548   GstBuffer *rtcp;
1549   GstClockTime time;
1550   GstClockTime interval;
1551   GstRTCPPacket packet;
1552   gboolean is_bye;
1553   gboolean has_sdes;
1554 } ReportData;
1555
1556 static void
1557 session_start_rtcp (RTPSession * sess, ReportData * data)
1558 {
1559   GstRTCPPacket *packet = &data->packet;
1560   RTPSource *own = sess->source;
1561
1562   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
1563
1564   if (RTP_SOURCE_IS_SENDER (own)) {
1565     guint64 ntptime;
1566     guint32 rtptime;
1567
1568     /* we are a sender, create SR */
1569     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
1570     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
1571
1572     /* convert clock time to NTP time */
1573     ntptime = gst_util_uint64_scale (data->time, (1LL << 32), GST_SECOND);
1574     ntptime += (2208988800LL << 32);
1575
1576     rtptime = 0;
1577
1578     /* fill in sender report info, FIXME RTP timestamps missing */
1579     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
1580         ntptime, rtptime, own->stats.packets_sent, own->stats.octets_sent);
1581   } else {
1582     /* we are only receiver, create RR */
1583     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
1584     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
1585     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
1586   }
1587 }
1588
1589 /* construct a Sender or Receiver Report */
1590 static void
1591 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
1592 {
1593   RTPSession *sess = data->sess;
1594   GstRTCPPacket *packet = &data->packet;
1595
1596   /* create a new buffer if needed */
1597   if (data->rtcp == NULL) {
1598     session_start_rtcp (sess, data);
1599   }
1600   if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
1601     /* only report about other sender sources */
1602     if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
1603       RTPSourceStats *stats;
1604       guint64 extended_max, expected;
1605       guint64 expected_interval, received_interval, ntptime;
1606       gint64 lost, lost_interval;
1607       guint32 fraction, LSR, DLSR;
1608       GstClockTime time;
1609
1610       stats = &source->stats;
1611
1612       extended_max = stats->cycles + stats->max_seq;
1613       expected = extended_max - stats->base_seq + 1;
1614
1615       GST_DEBUG ("ext_max %d, expected %d, received %d, base_seq %d",
1616           extended_max, expected, stats->packets_received, stats->base_seq);
1617
1618       lost = expected - stats->packets_received;
1619       lost = CLAMP (lost, -0x800000, 0x7fffff);
1620
1621       expected_interval = expected - stats->prev_expected;
1622       stats->prev_expected = expected;
1623       received_interval = stats->packets_received - stats->prev_received;
1624       stats->prev_received = stats->packets_received;
1625
1626       lost_interval = expected_interval - received_interval;
1627
1628       if (expected_interval == 0 || lost_interval <= 0)
1629         fraction = 0;
1630       else
1631         fraction = (lost_interval << 8) / expected_interval;
1632
1633       GST_DEBUG ("add RR for SSRC %08x", source->ssrc);
1634       /* we scaled the jitter up for additional precision */
1635       GST_DEBUG ("fraction %d, lost %d, extseq %u, jitter %d", fraction, lost,
1636           extended_max, stats->jitter >> 4);
1637
1638       if (rtp_source_get_last_sr (source, &ntptime, NULL, NULL, NULL, &time)) {
1639         GstClockTime diff;
1640
1641         /* LSR is middle bits of the last ntptime */
1642         LSR = (ntptime >> 16) & 0xffffffff;
1643         diff = data->time - time;
1644         GST_DEBUG ("last SR time diff %" GST_TIME_FORMAT, GST_TIME_ARGS (diff));
1645         /* DLSR, delay since last SR is expressed in 1/65536 second units */
1646         DLSR = gst_util_uint64_scale_int (diff, 65536, GST_SECOND);
1647       } else {
1648         /* No valid SR received, LSR/DLSR are set to 0 then */
1649         LSR = 0;
1650         DLSR = 0;
1651       }
1652       GST_DEBUG ("LSR %08x, DLSR %08x", LSR, DLSR);
1653
1654       /* packet is not yet filled, add report block for this source. */
1655       gst_rtcp_packet_add_rb (packet, source->ssrc, fraction, lost,
1656           extended_max, stats->jitter >> 4, LSR, DLSR);
1657     }
1658   }
1659 }
1660
1661 /* perform cleanup of sources that timed out */
1662 static gboolean
1663 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
1664 {
1665   gboolean remove = FALSE;
1666   gboolean byetimeout = FALSE;
1667   gboolean is_sender, is_active;
1668   RTPSession *sess = data->sess;
1669   GstClockTime interval;
1670
1671   is_sender = RTP_SOURCE_IS_SENDER (source);
1672   is_active = RTP_SOURCE_IS_ACTIVE (source);
1673
1674   /* check for our own source, we don't want to delete our own source. */
1675   if (!(source == sess->source)) {
1676     if (source->received_bye) {
1677       /* if we received a BYE from the source, remove the source after some
1678        * time. */
1679       if (data->time > source->bye_time &&
1680           data->time - source->bye_time > sess->stats.bye_timeout) {
1681         GST_DEBUG ("removing BYE source %08x", source->ssrc);
1682         remove = TRUE;
1683         byetimeout = TRUE;
1684       }
1685     }
1686     /* sources that were inactive for more than 5 times the deterministic reporting
1687      * interval get timed out. the min timeout is 5 seconds. */
1688     if (data->time > source->last_activity) {
1689       interval = MAX (data->interval * 5, 5 * GST_SECOND);
1690       if (data->time - source->last_activity > interval) {
1691         GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
1692             source->ssrc, GST_TIME_ARGS (source->last_activity));
1693         remove = TRUE;
1694       }
1695     }
1696   }
1697
1698   /* senders that did not send for a long time become a receiver, this also
1699    * holds for our own source. */
1700   if (is_sender) {
1701     if (data->time > source->last_rtp_activity) {
1702       interval = MAX (data->interval * 2, 5 * GST_SECOND);
1703
1704       if (data->time - source->last_rtp_activity > interval) {
1705         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
1706             GST_TIME_FORMAT, source->ssrc,
1707             GST_TIME_ARGS (source->last_rtp_activity));
1708         source->is_sender = FALSE;
1709         sess->stats.sender_sources--;
1710       }
1711     }
1712   }
1713
1714   if (remove) {
1715     sess->total_sources--;
1716     if (is_sender)
1717       sess->stats.sender_sources--;
1718     if (is_active)
1719       sess->stats.active_sources--;
1720
1721     if (byetimeout)
1722       on_bye_timeout (sess, source);
1723     else
1724       on_timeout (sess, source);
1725
1726   }
1727   return remove;
1728 }
1729
1730 static void
1731 session_sdes (RTPSession * sess, ReportData * data)
1732 {
1733   GstRTCPPacket *packet = &data->packet;
1734
1735   /* add SDES packet */
1736   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
1737
1738   gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
1739   gst_rtcp_packet_sdes_add_entry (packet, GST_RTCP_SDES_CNAME,
1740       strlen (sess->cname), (guint8 *) sess->cname);
1741
1742   /* other SDES items must only be added at regular intervals and only when the
1743    * user requests to since it might be a privacy problem */
1744 #if 0
1745   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_NAME,
1746       strlen (sess->name), (guint8 *) sess->name);
1747   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_TOOL,
1748       strlen (sess->tool), (guint8 *) sess->tool);
1749 #endif
1750
1751   data->has_sdes = TRUE;
1752 }
1753
1754 /* schedule a BYE packet */
1755 static void
1756 session_bye (RTPSession * sess, ReportData * data)
1757 {
1758   GstRTCPPacket *packet = &data->packet;
1759
1760   /* open packet */
1761   session_start_rtcp (sess, data);
1762
1763   /* add SDES */
1764   session_sdes (sess, data);
1765
1766   /* add a BYE packet */
1767   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet);
1768   gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc);
1769   if (sess->bye_reason)
1770     gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason);
1771
1772   /* we have a BYE packet now */
1773   data->is_bye = TRUE;
1774 }
1775
1776 static gboolean
1777 is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data)
1778 {
1779   GstClockTime new_send_time;
1780   gboolean result;
1781
1782   /* no need to check yet */
1783   if (sess->next_rtcp_check_time > time) {
1784     GST_DEBUG ("no check time yet");
1785     return FALSE;
1786   }
1787
1788   /* perform forward reconsideration */
1789   new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
1790
1791   GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT,
1792       GST_TIME_ARGS (new_send_time));
1793
1794   new_send_time += sess->last_rtcp_send_time;
1795
1796   /* check if reconsideration */
1797   if (time < new_send_time) {
1798     GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
1799         GST_TIME_ARGS (new_send_time));
1800     result = FALSE;
1801     /* store new check time */
1802     sess->next_rtcp_check_time = new_send_time;
1803   } else {
1804     result = TRUE;
1805     new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
1806
1807     GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
1808         GST_TIME_ARGS (new_send_time));
1809     sess->next_rtcp_check_time = time + new_send_time;
1810   }
1811   return result;
1812 }
1813
1814 /**
1815  * rtp_session_on_timeout:
1816  * @sess: an #RTPSession
1817  *
1818  * Perform maintenance actions after the timeout obtained with
1819  * rtp_session_next_timeout() expired.
1820  *
1821  * This function will perform timeouts of receivers and senders, send a BYE
1822  * packet or generate RTCP packets with current session stats.
1823  *
1824  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
1825  * times, for each packet that should be processed.
1826  *
1827  * Returns: a #GstFlowReturn.
1828  */
1829 GstFlowReturn
1830 rtp_session_on_timeout (RTPSession * sess, GstClockTime time)
1831 {
1832   GstFlowReturn result = GST_FLOW_OK;
1833   ReportData data;
1834
1835   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1836
1837   data.sess = sess;
1838   data.rtcp = NULL;
1839   data.time = time;
1840   data.is_bye = FALSE;
1841   data.has_sdes = FALSE;
1842
1843   GST_DEBUG ("reporting at %" GST_TIME_FORMAT, GST_TIME_ARGS (time));
1844
1845   RTP_SESSION_LOCK (sess);
1846   /* get a new interval, we need this for various cleanups etc */
1847   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
1848
1849   /* first perform cleanups */
1850   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
1851       (GHRFunc) session_cleanup, &data);
1852
1853   /* see if we need to generate SR or RR packets */
1854   if (is_rtcp_time (sess, time, &data)) {
1855     if (sess->source->received_bye) {
1856       /* generate BYE instead */
1857       session_bye (sess, &data);
1858       sess->sent_bye = TRUE;
1859     } else {
1860       /* loop over all known sources and do something */
1861       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1862           (GHFunc) session_report_blocks, &data);
1863     }
1864   }
1865
1866   if (data.rtcp) {
1867     guint size;
1868
1869     /* we keep track of the last report time in order to timeout inactive
1870      * receivers or senders */
1871     sess->last_rtcp_send_time = data.time;
1872     sess->first_rtcp = FALSE;
1873
1874     /* add SDES for this source when not already added */
1875     if (!data.has_sdes)
1876       session_sdes (sess, &data);
1877
1878     /* update average RTCP size before sending */
1879     size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
1880     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size);
1881   }
1882   RTP_SESSION_UNLOCK (sess);
1883
1884   /* push out the RTCP packet */
1885   if (data.rtcp) {
1886     /* close the RTCP packet */
1887     gst_rtcp_buffer_end (data.rtcp);
1888
1889     if (sess->callbacks.send_rtcp)
1890       result = sess->callbacks.send_rtcp (sess, sess->source, data.rtcp,
1891           sess->user_data);
1892     else
1893       gst_buffer_unref (data.rtcp);
1894   }
1895
1896   return result;
1897 }