48bb7b86bea9e3e24138e846fb5e947193479e77
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
21  * with newer GLib versions (>= 2.31.0) */
22 #define GLIB_DISABLE_DEPRECATION_WARNINGS
23
24 #include <string.h>
25
26 #include <gst/rtp/gstrtpbuffer.h>
27 #include <gst/rtp/gstrtcpbuffer.h>
28
29 #include <gst/glib-compat-private.h>
30
31 #include "rtpsession.h"
32
33 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
34 #define GST_CAT_DEFAULT rtp_session_debug
35
36 /* signals and args */
37 enum
38 {
39   SIGNAL_GET_SOURCE_BY_SSRC,
40   SIGNAL_ON_NEW_SSRC,
41   SIGNAL_ON_SSRC_COLLISION,
42   SIGNAL_ON_SSRC_VALIDATED,
43   SIGNAL_ON_SSRC_ACTIVE,
44   SIGNAL_ON_SSRC_SDES,
45   SIGNAL_ON_BYE_SSRC,
46   SIGNAL_ON_BYE_TIMEOUT,
47   SIGNAL_ON_TIMEOUT,
48   SIGNAL_ON_SENDER_TIMEOUT,
49   SIGNAL_ON_SENDING_RTCP,
50   SIGNAL_ON_FEEDBACK_RTCP,
51   SIGNAL_SEND_RTCP,
52   LAST_SIGNAL
53 };
54
55 #define DEFAULT_INTERNAL_SOURCE      NULL
56 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
57 #define DEFAULT_RTCP_FRACTION        (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH)
58 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
59 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
60 #define DEFAULT_RTCP_MTU             1400
61 #define DEFAULT_SDES                 NULL
62 #define DEFAULT_NUM_SOURCES          0
63 #define DEFAULT_NUM_ACTIVE_SOURCES   0
64 #define DEFAULT_SOURCES              NULL
65 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
66 #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
67 #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
68 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
69
70 enum
71 {
72   PROP_0,
73   PROP_INTERNAL_SSRC,
74   PROP_INTERNAL_SOURCE,
75   PROP_BANDWIDTH,
76   PROP_RTCP_FRACTION,
77   PROP_RTCP_RR_BANDWIDTH,
78   PROP_RTCP_RS_BANDWIDTH,
79   PROP_RTCP_MTU,
80   PROP_SDES,
81   PROP_NUM_SOURCES,
82   PROP_NUM_ACTIVE_SOURCES,
83   PROP_SOURCES,
84   PROP_FAVOR_NEW,
85   PROP_RTCP_MIN_INTERVAL,
86   PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
87   PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
88   PROP_PROBATION,
89   PROP_LAST
90 };
91
92 /* update average packet size */
93 #define INIT_AVG(avg, val) \
94    (avg) = (val);
95 #define UPDATE_AVG(avg, val)            \
96   if ((avg) == 0)                       \
97    (avg) = (val);                       \
98   else                                  \
99    (avg) = ((val) + (15 * (avg))) >> 4;
100
101
102 /* The number RTCP intervals after which to timeout entries in the
103  * collision table
104  */
105 #define RTCP_INTERVAL_COLLISION_TIMEOUT 10
106
107 /* GObject vmethods */
108 static void rtp_session_finalize (GObject * object);
109 static void rtp_session_set_property (GObject * object, guint prop_id,
110     const GValue * value, GParamSpec * pspec);
111 static void rtp_session_get_property (GObject * object, guint prop_id,
112     GValue * value, GParamSpec * pspec);
113
114 static void rtp_session_send_rtcp (RTPSession * sess,
115     GstClockTimeDiff max_delay);
116
117
118 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
119
120 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
121
122 static guint32 rtp_session_create_new_ssrc (RTPSession * sess);
123 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
124     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
125 static RTPSource *obtain_internal_source (RTPSession * sess,
126     guint32 ssrc, gboolean * created);
127 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
128     GstClockTime current_time);
129 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
130     gboolean deterministic, gboolean first);
131
132 static gboolean
133 accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
134     const GValue * handler_return, gpointer data)
135 {
136   if (g_value_get_boolean (handler_return))
137     g_value_set_boolean (return_accu, TRUE);
138
139   return TRUE;
140 }
141
142 static void
143 rtp_session_class_init (RTPSessionClass * klass)
144 {
145   GObjectClass *gobject_class;
146
147   gobject_class = (GObjectClass *) klass;
148
149   gobject_class->finalize = rtp_session_finalize;
150   gobject_class->set_property = rtp_session_set_property;
151   gobject_class->get_property = rtp_session_get_property;
152
153   /**
154    * RTPSession::get-source-by-ssrc:
155    * @session: the object which received the signal
156    * @ssrc: the SSRC of the RTPSource
157    *
158    * Request the #RTPSource object with SSRC @ssrc in @session.
159    */
160   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
161       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
162       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
163           get_source_by_ssrc), NULL, NULL, g_cclosure_marshal_generic,
164       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
165
166   /**
167    * RTPSession::on-new-ssrc:
168    * @session: the object which received the signal
169    * @src: the new RTPSource
170    *
171    * Notify of a new SSRC that entered @session.
172    */
173   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
174       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
175       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
176       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
177       RTP_TYPE_SOURCE);
178   /**
179    * RTPSession::on-ssrc-collision:
180    * @session: the object which received the signal
181    * @src: the #RTPSource that caused a collision
182    *
183    * Notify when we have an SSRC collision
184    */
185   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
186       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
187       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
188       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
189       RTP_TYPE_SOURCE);
190   /**
191    * RTPSession::on-ssrc-validated:
192    * @session: the object which received the signal
193    * @src: the new validated RTPSource
194    *
195    * Notify of a new SSRC that became validated.
196    */
197   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
198       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
199       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
200       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
201       RTP_TYPE_SOURCE);
202   /**
203    * RTPSession::on-ssrc-active:
204    * @session: the object which received the signal
205    * @src: the active RTPSource
206    *
207    * Notify of a SSRC that is active, i.e., sending RTCP.
208    */
209   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
210       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
211       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
212       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
213       RTP_TYPE_SOURCE);
214   /**
215    * RTPSession::on-ssrc-sdes:
216    * @session: the object which received the signal
217    * @src: the RTPSource
218    *
219    * Notify that a new SDES was received for SSRC.
220    */
221   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
222       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
223       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
224       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
225       RTP_TYPE_SOURCE);
226   /**
227    * RTPSession::on-bye-ssrc:
228    * @session: the object which received the signal
229    * @src: the RTPSource that went away
230    *
231    * Notify of an SSRC that became inactive because of a BYE packet.
232    */
233   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
234       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
235       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
236       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
237       RTP_TYPE_SOURCE);
238   /**
239    * RTPSession::on-bye-timeout:
240    * @session: the object which received the signal
241    * @src: the RTPSource that timed out
242    *
243    * Notify of an SSRC that has timed out because of BYE
244    */
245   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
246       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
247       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
248       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
249       RTP_TYPE_SOURCE);
250   /**
251    * RTPSession::on-timeout:
252    * @session: the object which received the signal
253    * @src: the RTPSource that timed out
254    *
255    * Notify of an SSRC that has timed out
256    */
257   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
258       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
259       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
260       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
261       RTP_TYPE_SOURCE);
262   /**
263    * RTPSession::on-sender-timeout:
264    * @session: the object which received the signal
265    * @src: the RTPSource that timed out
266    *
267    * Notify of an SSRC that was a sender but timed out and became a receiver.
268    */
269   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
270       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
271       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
272       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
273       RTP_TYPE_SOURCE);
274
275   /**
276    * RTPSession::on-sending-rtcp
277    * @session: the object which received the signal
278    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
279    * @early: %TRUE if the packet is early, %FALSE if it is regular
280    *
281    * This signal is emitted before sending an RTCP packet, it can be used
282    * to add extra RTCP Packets.
283    *
284    * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
285    * if suppressing it is acceptable
286    */
287   rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
288       g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
289       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
290       accumulate_trues, NULL, g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 2,
291       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE, G_TYPE_BOOLEAN);
292
293   /**
294    * RTPSession::on-feedback-rtcp:
295    * @session: the object which received the signal
296    * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
297    *  %GST_RTCP_TYPE_RTPFB
298    * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
299    * @sender_ssrc: The SSRC of the sender
300    * @media_ssrc: The SSRC of the media this refers to
301    * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
302    * there was no FCI
303    *
304    * Notify that a RTCP feedback packet has been received
305    */
306   rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
307       g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
308       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
309       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 5, G_TYPE_UINT,
310       G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, GST_TYPE_BUFFER);
311
312   /**
313    * RTPSession::send-rtcp:
314    * @session: the object which received the signal
315    * @max_delay: The maximum delay after which the feedback will not be useful
316    *  anymore
317    *
318    * Requests that the #RTPSession initiate a new RTCP packet as soon as
319    * possible within the requested delay.
320    */
321   rtp_session_signals[SIGNAL_SEND_RTCP] =
322       g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
323       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
324       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
325       g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT64);
326
327   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
328       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
329           "The internal SSRC used for the session (deprecated)",
330           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
331
332   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
333       g_param_spec_object ("internal-source", "Internal Source",
334           "The internal source element of the session (deprecated)",
335           RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
336
337   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
338       g_param_spec_double ("bandwidth", "Bandwidth",
339           "The bandwidth of the session (0 for auto-discover)",
340           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
341           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
342
343   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
344       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
345           "The fraction of the bandwidth used for RTCP (or as a real fraction of the RTP bandwidth if < 1)",
346           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
347           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
348
349   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
350       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
351           "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
352           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
353           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
354
355   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
356       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
357           "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
358           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
359           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
360
361   g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
362       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
363           "The maximum size of the RTCP packets",
364           16, G_MAXINT16, DEFAULT_RTCP_MTU,
365           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
366
367   g_object_class_install_property (gobject_class, PROP_SDES,
368       g_param_spec_boxed ("sdes", "SDES",
369           "The SDES items of this session",
370           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
371
372   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
373       g_param_spec_uint ("num-sources", "Num Sources",
374           "The number of sources in the session", 0, G_MAXUINT,
375           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
376
377   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
378       g_param_spec_uint ("num-active-sources", "Num Active Sources",
379           "The number of active sources in the session", 0, G_MAXUINT,
380           DEFAULT_NUM_ACTIVE_SOURCES,
381           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
382   /**
383    * RTPSource::sources
384    *
385    * Get a GValue Array of all sources in the session.
386    *
387    * <example>
388    * <title>Getting the #RTPSources of a session
389    * <programlisting>
390    * {
391    *   GValueArray *arr;
392    *   GValue *val;
393    *   guint i;
394    *
395    *   g_object_get (sess, "sources", &arr, NULL);
396    *
397    *   for (i = 0; i < arr->n_values; i++) {
398    *     RTPSource *source;
399    *
400    *     val = g_value_array_get_nth (arr, i);
401    *     source = g_value_get_object (val);
402    *   }
403    *   g_value_array_free (arr);
404    * }
405    * </programlisting>
406    * </example>
407    */
408   g_object_class_install_property (gobject_class, PROP_SOURCES,
409       g_param_spec_boxed ("sources", "Sources",
410           "An array of all known sources in the session",
411           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
412
413   g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
414       g_param_spec_boolean ("favor-new", "Favor new sources",
415           "Resolve SSRC conflict in favor of new sources", FALSE,
416           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
417
418   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
419       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
420           "Minimum interval between Regular RTCP packet (in ns)",
421           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
422           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
423
424   g_object_class_install_property (gobject_class,
425       PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
426       g_param_spec_uint64 ("rtcp-feedback-retention-window",
427           "RTCP Feedback retention window",
428           "Duration during which RTCP Feedback packets are retained (in ns)",
429           0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
430           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
431
432   g_object_class_install_property (gobject_class,
433       PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
434       g_param_spec_uint ("rtcp-immediate-feedback-threshold",
435           "RTCP Immediate Feedback threshold",
436           "The maximum number of members of a RTP session for which immediate"
437           " feedback is used",
438           0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
439           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
440
441   g_object_class_install_property (gobject_class, PROP_PROBATION,
442       g_param_spec_uint ("probation", "Number of probations",
443           "Consecutive packet sequence numbers to accept the source",
444           0, G_MAXUINT, DEFAULT_PROBATION,
445           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446
447   klass->get_source_by_ssrc =
448       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
449   klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
450
451   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
452 }
453
454 static void
455 rtp_session_init (RTPSession * sess)
456 {
457   gint i;
458   gchar *str;
459
460   g_mutex_init (&sess->lock);
461   sess->key = g_random_int ();
462   sess->mask_idx = 0;
463   sess->mask = 0;
464
465   for (i = 0; i < 32; i++) {
466     sess->ssrcs[i] =
467         g_hash_table_new_full (NULL, NULL, NULL,
468         (GDestroyNotify) g_object_unref);
469   }
470
471   rtp_stats_init_defaults (&sess->stats);
472   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
473   rtp_stats_set_min_interval (&sess->stats,
474       (gdouble) DEFAULT_RTCP_MIN_INTERVAL / GST_SECOND);
475
476   sess->recalc_bandwidth = TRUE;
477   sess->bandwidth = DEFAULT_BANDWIDTH;
478   sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
479   sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
480   sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
481
482   /* default UDP header length */
483   sess->header_len = 28;
484   sess->mtu = DEFAULT_RTCP_MTU;
485
486   sess->probation = DEFAULT_PROBATION;
487
488   /* some default SDES entries */
489   sess->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
490
491   /* we do not want to leak details like the username or hostname here */
492   str = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
493   gst_structure_set (sess->sdes, "cname", G_TYPE_STRING, str, NULL);
494   g_free (str);
495
496 #if 0
497   /* we do not want to leak the user's real name here */
498   str = g_strdup_printf ("Anon%u", g_random_int ());
499   gst_structure_set (sdes, "name", G_TYPE_STRING, str, NULL);
500   g_free (str);
501 #endif
502
503   gst_structure_set (sess->sdes, "tool", G_TYPE_STRING, "GStreamer", NULL);
504
505   /* this is the SSRC we suggest */
506   sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
507
508   sess->first_rtcp = TRUE;
509   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
510
511   sess->allow_early = TRUE;
512   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
513   sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
514   sess->rtcp_immediate_feedback_threshold =
515       DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
516
517   sess->last_keyframe_request = GST_CLOCK_TIME_NONE;
518 }
519
520 static void
521 rtp_session_finalize (GObject * object)
522 {
523   RTPSession *sess;
524   gint i;
525
526   sess = RTP_SESSION_CAST (object);
527
528   gst_structure_free (sess->sdes);
529
530   for (i = 0; i < 32; i++)
531     g_hash_table_destroy (sess->ssrcs[i]);
532
533   g_mutex_clear (&sess->lock);
534
535   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
536 }
537
538 static void
539 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
540 {
541   GValue value = { 0 };
542
543   g_value_init (&value, RTP_TYPE_SOURCE);
544   g_value_take_object (&value, source);
545   /* copies the value */
546   g_value_array_append (arr, &value);
547 }
548
549 static GValueArray *
550 rtp_session_create_sources (RTPSession * sess)
551 {
552   GValueArray *res;
553   guint size;
554
555   RTP_SESSION_LOCK (sess);
556   /* get number of elements in the table */
557   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
558   /* create the result value array */
559   res = g_value_array_new (size);
560
561   /* and copy all values into the array */
562   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
563   RTP_SESSION_UNLOCK (sess);
564
565   return res;
566 }
567
568 static void
569 rtp_session_set_property (GObject * object, guint prop_id,
570     const GValue * value, GParamSpec * pspec)
571 {
572   RTPSession *sess;
573
574   sess = RTP_SESSION (object);
575
576   switch (prop_id) {
577     case PROP_INTERNAL_SSRC:
578       break;
579     case PROP_BANDWIDTH:
580       RTP_SESSION_LOCK (sess);
581       sess->bandwidth = g_value_get_double (value);
582       sess->recalc_bandwidth = TRUE;
583       RTP_SESSION_UNLOCK (sess);
584       break;
585     case PROP_RTCP_FRACTION:
586       RTP_SESSION_LOCK (sess);
587       sess->rtcp_bandwidth = g_value_get_double (value);
588       sess->recalc_bandwidth = TRUE;
589       RTP_SESSION_UNLOCK (sess);
590       break;
591     case PROP_RTCP_RR_BANDWIDTH:
592       RTP_SESSION_LOCK (sess);
593       sess->rtcp_rr_bandwidth = g_value_get_int (value);
594       sess->recalc_bandwidth = TRUE;
595       RTP_SESSION_UNLOCK (sess);
596       break;
597     case PROP_RTCP_RS_BANDWIDTH:
598       RTP_SESSION_LOCK (sess);
599       sess->rtcp_rs_bandwidth = g_value_get_int (value);
600       sess->recalc_bandwidth = TRUE;
601       RTP_SESSION_UNLOCK (sess);
602       break;
603     case PROP_RTCP_MTU:
604       sess->mtu = g_value_get_uint (value);
605       break;
606     case PROP_SDES:
607       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
608       break;
609     case PROP_FAVOR_NEW:
610       sess->favor_new = g_value_get_boolean (value);
611       break;
612     case PROP_RTCP_MIN_INTERVAL:
613       rtp_stats_set_min_interval (&sess->stats,
614           (gdouble) g_value_get_uint64 (value) / GST_SECOND);
615       /* trigger reconsideration */
616       RTP_SESSION_LOCK (sess);
617       sess->next_rtcp_check_time = 0;
618       RTP_SESSION_UNLOCK (sess);
619       if (sess->callbacks.reconsider)
620         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
621       break;
622     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
623       sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
624       break;
625     case PROP_PROBATION:
626       sess->probation = g_value_get_uint (value);
627       break;
628     default:
629       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
630       break;
631   }
632 }
633
634 static void
635 rtp_session_get_property (GObject * object, guint prop_id,
636     GValue * value, GParamSpec * pspec)
637 {
638   RTPSession *sess;
639
640   sess = RTP_SESSION (object);
641
642   switch (prop_id) {
643     case PROP_INTERNAL_SSRC:
644       g_value_set_uint (value, rtp_session_suggest_ssrc (sess));
645       break;
646     case PROP_INTERNAL_SOURCE:
647       /* FIXME, return a random source */
648       g_value_set_object (value, NULL);
649       break;
650     case PROP_BANDWIDTH:
651       g_value_set_double (value, sess->bandwidth);
652       break;
653     case PROP_RTCP_FRACTION:
654       g_value_set_double (value, sess->rtcp_bandwidth);
655       break;
656     case PROP_RTCP_RR_BANDWIDTH:
657       g_value_set_int (value, sess->rtcp_rr_bandwidth);
658       break;
659     case PROP_RTCP_RS_BANDWIDTH:
660       g_value_set_int (value, sess->rtcp_rs_bandwidth);
661       break;
662     case PROP_RTCP_MTU:
663       g_value_set_uint (value, sess->mtu);
664       break;
665     case PROP_SDES:
666       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
667       break;
668     case PROP_NUM_SOURCES:
669       g_value_set_uint (value, rtp_session_get_num_sources (sess));
670       break;
671     case PROP_NUM_ACTIVE_SOURCES:
672       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
673       break;
674     case PROP_SOURCES:
675       g_value_take_boxed (value, rtp_session_create_sources (sess));
676       break;
677     case PROP_FAVOR_NEW:
678       g_value_set_boolean (value, sess->favor_new);
679       break;
680     case PROP_RTCP_MIN_INTERVAL:
681       g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
682       break;
683     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
684       g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
685       break;
686     case PROP_PROBATION:
687       g_value_set_uint (value, sess->probation);
688       break;
689     default:
690       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
691       break;
692   }
693 }
694
695 static void
696 on_new_ssrc (RTPSession * sess, RTPSource * source)
697 {
698   g_object_ref (source);
699   RTP_SESSION_UNLOCK (sess);
700   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
701   RTP_SESSION_LOCK (sess);
702   g_object_unref (source);
703 }
704
705 static void
706 on_ssrc_collision (RTPSession * sess, RTPSource * source)
707 {
708   g_object_ref (source);
709   RTP_SESSION_UNLOCK (sess);
710   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
711       source);
712   RTP_SESSION_LOCK (sess);
713   g_object_unref (source);
714 }
715
716 static void
717 on_ssrc_validated (RTPSession * sess, RTPSource * source)
718 {
719   g_object_ref (source);
720   RTP_SESSION_UNLOCK (sess);
721   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
722       source);
723   RTP_SESSION_LOCK (sess);
724   g_object_unref (source);
725 }
726
727 static void
728 on_ssrc_active (RTPSession * sess, RTPSource * source)
729 {
730   g_object_ref (source);
731   RTP_SESSION_UNLOCK (sess);
732   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
733   RTP_SESSION_LOCK (sess);
734   g_object_unref (source);
735 }
736
737 static void
738 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
739 {
740   g_object_ref (source);
741   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
742   RTP_SESSION_UNLOCK (sess);
743   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
744   RTP_SESSION_LOCK (sess);
745   g_object_unref (source);
746 }
747
748 static void
749 on_bye_ssrc (RTPSession * sess, RTPSource * source)
750 {
751   g_object_ref (source);
752   RTP_SESSION_UNLOCK (sess);
753   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
754   RTP_SESSION_LOCK (sess);
755   g_object_unref (source);
756 }
757
758 static void
759 on_bye_timeout (RTPSession * sess, RTPSource * source)
760 {
761   g_object_ref (source);
762   RTP_SESSION_UNLOCK (sess);
763   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
764   RTP_SESSION_LOCK (sess);
765   g_object_unref (source);
766 }
767
768 static void
769 on_timeout (RTPSession * sess, RTPSource * source)
770 {
771   g_object_ref (source);
772   RTP_SESSION_UNLOCK (sess);
773   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
774   RTP_SESSION_LOCK (sess);
775   g_object_unref (source);
776 }
777
778 static void
779 on_sender_timeout (RTPSession * sess, RTPSource * source)
780 {
781   g_object_ref (source);
782   RTP_SESSION_UNLOCK (sess);
783   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
784       source);
785   RTP_SESSION_LOCK (sess);
786   g_object_unref (source);
787 }
788
789 /**
790  * rtp_session_new:
791  *
792  * Create a new session object.
793  *
794  * Returns: a new #RTPSession. g_object_unref() after usage.
795  */
796 RTPSession *
797 rtp_session_new (void)
798 {
799   RTPSession *sess;
800
801   sess = g_object_new (RTP_TYPE_SESSION, NULL);
802
803   return sess;
804 }
805
806 /**
807  * rtp_session_set_callbacks:
808  * @sess: an #RTPSession
809  * @callbacks: callbacks to configure
810  * @user_data: user data passed in the callbacks
811  *
812  * Configure a set of callbacks to be notified of actions.
813  */
814 void
815 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
816     gpointer user_data)
817 {
818   g_return_if_fail (RTP_IS_SESSION (sess));
819
820   if (callbacks->process_rtp) {
821     sess->callbacks.process_rtp = callbacks->process_rtp;
822     sess->process_rtp_user_data = user_data;
823   }
824   if (callbacks->send_rtp) {
825     sess->callbacks.send_rtp = callbacks->send_rtp;
826     sess->send_rtp_user_data = user_data;
827   }
828   if (callbacks->send_rtcp) {
829     sess->callbacks.send_rtcp = callbacks->send_rtcp;
830     sess->send_rtcp_user_data = user_data;
831   }
832   if (callbacks->sync_rtcp) {
833     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
834     sess->sync_rtcp_user_data = user_data;
835   }
836   if (callbacks->clock_rate) {
837     sess->callbacks.clock_rate = callbacks->clock_rate;
838     sess->clock_rate_user_data = user_data;
839   }
840   if (callbacks->reconsider) {
841     sess->callbacks.reconsider = callbacks->reconsider;
842     sess->reconsider_user_data = user_data;
843   }
844   if (callbacks->request_key_unit) {
845     sess->callbacks.request_key_unit = callbacks->request_key_unit;
846     sess->request_key_unit_user_data = user_data;
847   }
848   if (callbacks->request_time) {
849     sess->callbacks.request_time = callbacks->request_time;
850     sess->request_time_user_data = user_data;
851   }
852 }
853
854 /**
855  * rtp_session_set_process_rtp_callback:
856  * @sess: an #RTPSession
857  * @callback: callback to set
858  * @user_data: user data passed in the callback
859  *
860  * Configure only the process_rtp callback to be notified of the process_rtp action.
861  */
862 void
863 rtp_session_set_process_rtp_callback (RTPSession * sess,
864     RTPSessionProcessRTP callback, gpointer user_data)
865 {
866   g_return_if_fail (RTP_IS_SESSION (sess));
867
868   sess->callbacks.process_rtp = callback;
869   sess->process_rtp_user_data = user_data;
870 }
871
872 /**
873  * rtp_session_set_send_rtp_callback:
874  * @sess: an #RTPSession
875  * @callback: callback to set
876  * @user_data: user data passed in the callback
877  *
878  * Configure only the send_rtp callback to be notified of the send_rtp action.
879  */
880 void
881 rtp_session_set_send_rtp_callback (RTPSession * sess,
882     RTPSessionSendRTP callback, gpointer user_data)
883 {
884   g_return_if_fail (RTP_IS_SESSION (sess));
885
886   sess->callbacks.send_rtp = callback;
887   sess->send_rtp_user_data = user_data;
888 }
889
890 /**
891  * rtp_session_set_send_rtcp_callback:
892  * @sess: an #RTPSession
893  * @callback: callback to set
894  * @user_data: user data passed in the callback
895  *
896  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
897  */
898 void
899 rtp_session_set_send_rtcp_callback (RTPSession * sess,
900     RTPSessionSendRTCP callback, gpointer user_data)
901 {
902   g_return_if_fail (RTP_IS_SESSION (sess));
903
904   sess->callbacks.send_rtcp = callback;
905   sess->send_rtcp_user_data = user_data;
906 }
907
908 /**
909  * rtp_session_set_sync_rtcp_callback:
910  * @sess: an #RTPSession
911  * @callback: callback to set
912  * @user_data: user data passed in the callback
913  *
914  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
915  */
916 void
917 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
918     RTPSessionSyncRTCP callback, gpointer user_data)
919 {
920   g_return_if_fail (RTP_IS_SESSION (sess));
921
922   sess->callbacks.sync_rtcp = callback;
923   sess->sync_rtcp_user_data = user_data;
924 }
925
926 /**
927  * rtp_session_set_clock_rate_callback:
928  * @sess: an #RTPSession
929  * @callback: callback to set
930  * @user_data: user data passed in the callback
931  *
932  * Configure only the clock_rate callback to be notified of the clock_rate action.
933  */
934 void
935 rtp_session_set_clock_rate_callback (RTPSession * sess,
936     RTPSessionClockRate callback, gpointer user_data)
937 {
938   g_return_if_fail (RTP_IS_SESSION (sess));
939
940   sess->callbacks.clock_rate = callback;
941   sess->clock_rate_user_data = user_data;
942 }
943
944 /**
945  * rtp_session_set_reconsider_callback:
946  * @sess: an #RTPSession
947  * @callback: callback to set
948  * @user_data: user data passed in the callback
949  *
950  * Configure only the reconsider callback to be notified of the reconsider action.
951  */
952 void
953 rtp_session_set_reconsider_callback (RTPSession * sess,
954     RTPSessionReconsider callback, gpointer user_data)
955 {
956   g_return_if_fail (RTP_IS_SESSION (sess));
957
958   sess->callbacks.reconsider = callback;
959   sess->reconsider_user_data = user_data;
960 }
961
962 /**
963  * rtp_session_set_request_time_callback:
964  * @sess: an #RTPSession
965  * @callback: callback to set
966  * @user_data: user data passed in the callback
967  *
968  * Configure only the request_time callback
969  */
970 void
971 rtp_session_set_request_time_callback (RTPSession * sess,
972     RTPSessionRequestTime callback, gpointer user_data)
973 {
974   g_return_if_fail (RTP_IS_SESSION (sess));
975
976   sess->callbacks.request_time = callback;
977   sess->request_time_user_data = user_data;
978 }
979
980 /**
981  * rtp_session_set_bandwidth:
982  * @sess: an #RTPSession
983  * @bandwidth: the bandwidth allocated
984  *
985  * Set the session bandwidth in bytes per second.
986  */
987 void
988 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
989 {
990   g_return_if_fail (RTP_IS_SESSION (sess));
991
992   RTP_SESSION_LOCK (sess);
993   sess->stats.bandwidth = bandwidth;
994   RTP_SESSION_UNLOCK (sess);
995 }
996
997 /**
998  * rtp_session_get_bandwidth:
999  * @sess: an #RTPSession
1000  *
1001  * Get the session bandwidth.
1002  *
1003  * Returns: the session bandwidth.
1004  */
1005 gdouble
1006 rtp_session_get_bandwidth (RTPSession * sess)
1007 {
1008   gdouble result;
1009
1010   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1011
1012   RTP_SESSION_LOCK (sess);
1013   result = sess->stats.bandwidth;
1014   RTP_SESSION_UNLOCK (sess);
1015
1016   return result;
1017 }
1018
1019 /**
1020  * rtp_session_set_rtcp_fraction:
1021  * @sess: an #RTPSession
1022  * @bandwidth: the RTCP bandwidth
1023  *
1024  * Set the bandwidth in bytes per second that should be used for RTCP
1025  * messages.
1026  */
1027 void
1028 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
1029 {
1030   g_return_if_fail (RTP_IS_SESSION (sess));
1031
1032   RTP_SESSION_LOCK (sess);
1033   sess->stats.rtcp_bandwidth = bandwidth;
1034   RTP_SESSION_UNLOCK (sess);
1035 }
1036
1037 /**
1038  * rtp_session_get_rtcp_fraction:
1039  * @sess: an #RTPSession
1040  *
1041  * Get the session bandwidth used for RTCP.
1042  *
1043  * Returns: The bandwidth used for RTCP messages.
1044  */
1045 gdouble
1046 rtp_session_get_rtcp_fraction (RTPSession * sess)
1047 {
1048   gdouble result;
1049
1050   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
1051
1052   RTP_SESSION_LOCK (sess);
1053   result = sess->stats.rtcp_bandwidth;
1054   RTP_SESSION_UNLOCK (sess);
1055
1056   return result;
1057 }
1058
1059 /**
1060  * rtp_session_get_sdes_struct:
1061  * @sess: an #RTSPSession
1062  *
1063  * Get the SDES data as a #GstStructure
1064  *
1065  * Returns: a GstStructure with SDES items for @sess. This function returns a
1066  * copy of the SDES structure, use gst_structure_free() after usage.
1067  */
1068 GstStructure *
1069 rtp_session_get_sdes_struct (RTPSession * sess)
1070 {
1071   GstStructure *result = NULL;
1072
1073   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1074
1075   RTP_SESSION_LOCK (sess);
1076   if (sess->sdes)
1077     result = gst_structure_copy (sess->sdes);
1078   RTP_SESSION_UNLOCK (sess);
1079
1080   return result;
1081 }
1082
1083 /**
1084  * rtp_session_set_sdes_struct:
1085  * @sess: an #RTSPSession
1086  * @sdes: a #GstStructure
1087  *
1088  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
1089  */
1090 void
1091 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
1092 {
1093   g_return_if_fail (sdes);
1094   g_return_if_fail (RTP_IS_SESSION (sess));
1095
1096   RTP_SESSION_LOCK (sess);
1097   if (sess->sdes)
1098     gst_structure_free (sess->sdes);
1099   sess->sdes = gst_structure_copy (sdes);
1100   RTP_SESSION_UNLOCK (sess);
1101 }
1102
1103 static GstFlowReturn
1104 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
1105 {
1106   GstFlowReturn result = GST_FLOW_OK;
1107
1108   if (source->internal) {
1109     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
1110
1111     RTP_SESSION_UNLOCK (session);
1112
1113     if (session->callbacks.send_rtp)
1114       result =
1115           session->callbacks.send_rtp (session, source, data,
1116           session->send_rtp_user_data);
1117     else {
1118       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1119     }
1120   } else {
1121     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
1122     RTP_SESSION_UNLOCK (session);
1123
1124     if (session->callbacks.process_rtp)
1125       result =
1126           session->callbacks.process_rtp (session, source,
1127           GST_BUFFER_CAST (data), session->process_rtp_user_data);
1128     else
1129       gst_buffer_unref (GST_BUFFER_CAST (data));
1130   }
1131   RTP_SESSION_LOCK (session);
1132
1133   return result;
1134 }
1135
1136 static gint
1137 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
1138 {
1139   gint result;
1140
1141   RTP_SESSION_UNLOCK (session);
1142
1143   if (session->callbacks.clock_rate)
1144     result =
1145         session->callbacks.clock_rate (session, pt,
1146         session->clock_rate_user_data);
1147   else
1148     result = -1;
1149
1150   RTP_SESSION_LOCK (session);
1151
1152   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
1153
1154   return result;
1155 }
1156
1157 static RTPSourceCallbacks callbacks = {
1158   (RTPSourcePushRTP) source_push_rtp,
1159   (RTPSourceClockRate) source_clock_rate,
1160 };
1161
1162 static gboolean
1163 check_collision (RTPSession * sess, RTPSource * source,
1164     RTPArrivalStats * arrival, gboolean rtp)
1165 {
1166   guint32 ssrc;
1167
1168   /* If we have no arrival address, we can't do collision checking */
1169   if (!arrival->address)
1170     return FALSE;
1171
1172   ssrc = rtp_source_get_ssrc (source);
1173
1174   if (!source->internal) {
1175     GSocketAddress *from;
1176
1177     /* This is not our local source, but lets check if two remote
1178      * source collide */
1179     if (rtp) {
1180       from = source->rtp_from;
1181     } else {
1182       from = source->rtcp_from;
1183     }
1184
1185     if (from) {
1186       if (__g_socket_address_equal (from, arrival->address)) {
1187         /* Address is the same */
1188         return FALSE;
1189       } else {
1190         GST_LOG ("we have a third-party collision or loop ssrc:%x", ssrc);
1191         if (sess->favor_new) {
1192           if (rtp_source_find_conflicting_address (source,
1193                   arrival->address, arrival->current_time)) {
1194             gchar *buf1;
1195
1196             buf1 = __g_socket_address_to_string (arrival->address);
1197             GST_LOG ("Known conflict on %x for %s, dropping packet", ssrc,
1198                 buf1);
1199             g_free (buf1);
1200
1201             return TRUE;
1202           } else {
1203             gchar *buf1, *buf2;
1204
1205             /* Current address is not a known conflict, lets assume this is
1206              * a new source. Save old address in possible conflict list
1207              */
1208             rtp_source_add_conflicting_address (source, from,
1209                 arrival->current_time);
1210
1211             buf1 = __g_socket_address_to_string (from);
1212             buf2 = __g_socket_address_to_string (arrival->address);
1213
1214             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1215                 " saving old as known conflict", ssrc, buf1, buf2);
1216
1217             if (rtp)
1218               rtp_source_set_rtp_from (source, arrival->address);
1219             else
1220               rtp_source_set_rtcp_from (source, arrival->address);
1221
1222             g_free (buf1);
1223             g_free (buf2);
1224
1225             return FALSE;
1226           }
1227         } else {
1228           /* Don't need to save old addresses, we ignore new sources */
1229           return TRUE;
1230         }
1231       }
1232     } else {
1233       /* We don't already have a from address for RTP, just set it */
1234       if (rtp)
1235         rtp_source_set_rtp_from (source, arrival->address);
1236       else
1237         rtp_source_set_rtcp_from (source, arrival->address);
1238       return FALSE;
1239     }
1240
1241     /* FIXME: Log 3rd party collision somehow
1242      * Maybe should be done in upper layer, only the SDES can tell us
1243      * if its a collision or a loop
1244      */
1245   } else {
1246     /* This is sending with our ssrc, is it an address we already know */
1247     if (rtp_source_find_conflicting_address (source, arrival->address,
1248             arrival->current_time)) {
1249       /* Its a known conflict, its probably a loop, not a collision
1250        * lets just drop the incoming packet
1251        */
1252       GST_DEBUG ("Our packets are being looped back to us, dropping");
1253     } else {
1254       /* Its a new collision, lets change our SSRC */
1255       rtp_source_add_conflicting_address (source, arrival->address,
1256           arrival->current_time);
1257
1258       GST_DEBUG ("Collision for SSRC %x", ssrc);
1259       /* mark the source BYE */
1260       rtp_source_mark_bye (source, "SSRC Collision");
1261       /* if we were suggesting this SSRC, change to something else */
1262       if (sess->suggested_ssrc == ssrc)
1263         sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
1264
1265       on_ssrc_collision (sess, source);
1266
1267       rtp_session_schedule_bye_locked (sess, arrival->current_time);
1268     }
1269   }
1270
1271   return TRUE;
1272 }
1273
1274 static RTPSource *
1275 find_source (RTPSession * sess, guint32 ssrc)
1276 {
1277   return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1278       GINT_TO_POINTER (ssrc));
1279 }
1280
1281 static void
1282 add_source (RTPSession * sess, RTPSource * src)
1283 {
1284   g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1285       GINT_TO_POINTER (src->ssrc), src);
1286   /* report the new source ASAP */
1287   src->generation = sess->generation;
1288   /* we have one more source now */
1289   sess->total_sources++;
1290   if (RTP_SOURCE_IS_ACTIVE (src))
1291     sess->stats.active_sources++;
1292   if (src->internal) {
1293     sess->stats.internal_sources++;
1294     if (sess->suggested_ssrc != src->ssrc)
1295       sess->suggested_ssrc = src->ssrc;
1296   }
1297 }
1298
1299 /* must be called with the session lock, the returned source needs to be
1300  * unreffed after usage. */
1301 static RTPSource *
1302 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1303     RTPArrivalStats * arrival, gboolean rtp)
1304 {
1305   RTPSource *source;
1306
1307   source = find_source (sess, ssrc);
1308   if (source == NULL) {
1309     /* make new Source in probation and insert */
1310     source = rtp_source_new (ssrc);
1311
1312     GST_DEBUG ("creating new source %08x %p", ssrc, source);
1313
1314     /* for RTP packets we need to set the source in probation. Receiving RTCP
1315      * packets of an SSRC, on the other hand, is a strong indication that we
1316      * are dealing with a valid source. */
1317     if (rtp)
1318       g_object_set (source, "probation", sess->probation, NULL);
1319     else
1320       g_object_set (source, "probation", 0, NULL);
1321
1322     /* store from address, if any */
1323     if (arrival->address) {
1324       if (rtp)
1325         rtp_source_set_rtp_from (source, arrival->address);
1326       else
1327         rtp_source_set_rtcp_from (source, arrival->address);
1328     }
1329
1330     /* configure a callback on the source */
1331     rtp_source_set_callbacks (source, &callbacks, sess);
1332
1333     add_source (sess, source);
1334     *created = TRUE;
1335   } else {
1336     *created = FALSE;
1337     /* check for collision, this updates the address when not previously set */
1338     if (check_collision (sess, source, arrival, rtp)) {
1339       return NULL;
1340     }
1341     /* Receiving RTCP packets of an SSRC is a strong indication that we
1342      * are dealing with a valid source. */
1343     if (!rtp)
1344       g_object_set (source, "probation", 0, NULL);
1345   }
1346   /* update last activity */
1347   source->last_activity = arrival->current_time;
1348   if (rtp)
1349     source->last_rtp_activity = arrival->current_time;
1350   g_object_ref (source);
1351
1352   return source;
1353 }
1354
1355 /* must be called with the session lock, the returned source needs to be
1356  * unreffed after usage. */
1357 static RTPSource *
1358 obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created)
1359 {
1360   RTPSource *source;
1361
1362   source = find_source (sess, ssrc);
1363   if (source == NULL) {
1364     /* make new internal Source and insert */
1365     source = rtp_source_new (ssrc);
1366
1367     GST_DEBUG ("creating new internal source %08x %p", ssrc, source);
1368
1369     source->validated = TRUE;
1370     source->internal = TRUE;
1371     rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes));
1372     rtp_source_set_callbacks (source, &callbacks, sess);
1373
1374     add_source (sess, source);
1375     *created = TRUE;
1376   } else {
1377     *created = FALSE;
1378   }
1379   g_object_ref (source);
1380
1381   return source;
1382 }
1383
1384 /**
1385  * rtp_session_suggest_ssrc:
1386  * @sess: a #RTPSession
1387  *
1388  * Suggest an unused SSRC in @sess.
1389  *
1390  * Returns: a free unused SSRC
1391  */
1392 guint32
1393 rtp_session_suggest_ssrc (RTPSession * sess)
1394 {
1395   guint32 result;
1396
1397   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1398
1399   RTP_SESSION_LOCK (sess);
1400   result = sess->suggested_ssrc;
1401   RTP_SESSION_UNLOCK (sess);
1402
1403   return result;
1404 }
1405
1406 /**
1407  * rtp_session_add_source:
1408  * @sess: a #RTPSession
1409  * @src: #RTPSource to add
1410  *
1411  * Add @src to @session.
1412  *
1413  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1414  * existed in the session.
1415  */
1416 gboolean
1417 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1418 {
1419   gboolean result = FALSE;
1420   RTPSource *find;
1421
1422   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1423   g_return_val_if_fail (src != NULL, FALSE);
1424
1425   RTP_SESSION_LOCK (sess);
1426   find = find_source (sess, src->ssrc);
1427   if (find == NULL) {
1428     add_source (sess, src);
1429     result = TRUE;
1430   }
1431   RTP_SESSION_UNLOCK (sess);
1432
1433   return result;
1434 }
1435
1436 /**
1437  * rtp_session_get_num_sources:
1438  * @sess: an #RTPSession
1439  *
1440  * Get the number of sources in @sess.
1441  *
1442  * Returns: The number of sources in @sess.
1443  */
1444 guint
1445 rtp_session_get_num_sources (RTPSession * sess)
1446 {
1447   guint result;
1448
1449   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1450
1451   RTP_SESSION_LOCK (sess);
1452   result = sess->total_sources;
1453   RTP_SESSION_UNLOCK (sess);
1454
1455   return result;
1456 }
1457
1458 /**
1459  * rtp_session_get_num_active_sources:
1460  * @sess: an #RTPSession
1461  *
1462  * Get the number of active sources in @sess. A source is considered active when
1463  * it has been validated and has not yet received a BYE RTCP message.
1464  *
1465  * Returns: The number of active sources in @sess.
1466  */
1467 guint
1468 rtp_session_get_num_active_sources (RTPSession * sess)
1469 {
1470   guint result;
1471
1472   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1473
1474   RTP_SESSION_LOCK (sess);
1475   result = sess->stats.active_sources;
1476   RTP_SESSION_UNLOCK (sess);
1477
1478   return result;
1479 }
1480
1481 /**
1482  * rtp_session_get_source_by_ssrc:
1483  * @sess: an #RTPSession
1484  * @ssrc: an SSRC
1485  *
1486  * Find the source with @ssrc in @sess.
1487  *
1488  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
1489  * g_object_unref() after usage.
1490  */
1491 RTPSource *
1492 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
1493 {
1494   RTPSource *result;
1495
1496   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1497
1498   RTP_SESSION_LOCK (sess);
1499   result = find_source (sess, ssrc);
1500   if (result)
1501     g_object_ref (result);
1502   RTP_SESSION_UNLOCK (sess);
1503
1504   return result;
1505 }
1506
1507 /* should be called with the SESSION lock */
1508 static guint32
1509 rtp_session_create_new_ssrc (RTPSession * sess)
1510 {
1511   guint32 ssrc;
1512
1513   while (TRUE) {
1514     ssrc = g_random_int ();
1515
1516     /* see if it exists in the session, we're done if it doesn't */
1517     if (find_source (sess, ssrc) == NULL)
1518       break;
1519   }
1520   return ssrc;
1521 }
1522
1523
1524 /**
1525  * rtp_session_create_source:
1526  * @sess: an #RTPSession
1527  *
1528  * Create an #RTPSource for use in @sess. This function will create a source
1529  * with an ssrc that is currently not used by any participants in the session.
1530  *
1531  * Returns: an #RTPSource.
1532  */
1533 RTPSource *
1534 rtp_session_create_source (RTPSession * sess)
1535 {
1536   guint32 ssrc;
1537   RTPSource *source;
1538
1539   RTP_SESSION_LOCK (sess);
1540   ssrc = rtp_session_create_new_ssrc (sess);
1541   source = rtp_source_new (ssrc);
1542   rtp_source_set_callbacks (source, &callbacks, sess);
1543   /* we need an additional ref for the source in the hashtable */
1544   g_object_ref (source);
1545   add_source (sess, source);
1546   RTP_SESSION_UNLOCK (sess);
1547
1548   return source;
1549 }
1550
1551 /* update the RTPArrivalStats structure with the current time and other bits
1552  * about the current buffer we are handling.
1553  * This function is typically called when a validated packet is received.
1554  * This function should be called with the SESSION_LOCK
1555  */
1556 static void
1557 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
1558     gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
1559     GstClockTime running_time, guint64 ntpnstime)
1560 {
1561   GstNetAddressMeta *meta;
1562   GstRTPBuffer rtpb = { NULL };
1563
1564   /* get time of arrival */
1565   arrival->current_time = current_time;
1566   arrival->running_time = running_time;
1567   arrival->ntpnstime = ntpnstime;
1568
1569   /* get packet size including header overhead */
1570   arrival->bytes = gst_buffer_get_size (buffer) + sess->header_len;
1571
1572   if (rtp) {
1573     gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpb);
1574     arrival->payload_len = gst_rtp_buffer_get_payload_len (&rtpb);
1575     gst_rtp_buffer_unmap (&rtpb);
1576   } else {
1577     arrival->payload_len = 0;
1578   }
1579
1580   /* for netbuffer we can store the IP address to check for collisions */
1581   meta = gst_buffer_get_net_address_meta (buffer);
1582   if (arrival->address)
1583     g_object_unref (arrival->address);
1584   if (meta) {
1585     arrival->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
1586   } else {
1587     arrival->address = NULL;
1588   }
1589 }
1590
1591 static void
1592 clean_arrival_stats (RTPArrivalStats * arrival)
1593 {
1594   if (arrival->address)
1595     g_object_unref (arrival->address);
1596 }
1597
1598 static gboolean
1599 source_update_active (RTPSession * sess, RTPSource * source,
1600     gboolean prevactive)
1601 {
1602   gboolean active = RTP_SOURCE_IS_ACTIVE (source);
1603   guint32 ssrc = source->ssrc;
1604
1605   if (prevactive == active)
1606     return FALSE;
1607
1608   if (active) {
1609     sess->stats.active_sources++;
1610     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
1611         sess->stats.active_sources);
1612   } else {
1613     sess->stats.active_sources--;
1614     GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1615         sess->stats.active_sources);
1616   }
1617   return TRUE;
1618 }
1619
1620 static gboolean
1621 source_update_sender (RTPSession * sess, RTPSource * source,
1622     gboolean prevsender)
1623 {
1624   gboolean sender = RTP_SOURCE_IS_SENDER (source);
1625   guint32 ssrc = source->ssrc;
1626
1627   if (prevsender == sender)
1628     return FALSE;
1629
1630   if (sender) {
1631     sess->stats.sender_sources++;
1632     if (source->internal)
1633       sess->stats.internal_sender_sources++;
1634     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
1635         sess->stats.sender_sources);
1636   } else {
1637     sess->stats.sender_sources--;
1638     if (source->internal)
1639       sess->stats.internal_sender_sources--;
1640     GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1641         sess->stats.sender_sources);
1642   }
1643   return TRUE;
1644 }
1645
1646 /**
1647  * rtp_session_process_rtp:
1648  * @sess: and #RTPSession
1649  * @buffer: an RTP buffer
1650  * @current_time: the current system time
1651  * @running_time: the running_time of @buffer
1652  *
1653  * Process an RTP buffer in the session manager. This function takes ownership
1654  * of @buffer.
1655  *
1656  * Returns: a #GstFlowReturn.
1657  */
1658 GstFlowReturn
1659 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
1660     GstClockTime current_time, GstClockTime running_time)
1661 {
1662   GstFlowReturn result;
1663   guint32 ssrc;
1664   RTPSource *source;
1665   gboolean created;
1666   gboolean prevsender, prevactive;
1667   RTPArrivalStats arrival = { NULL, };
1668   guint32 csrcs[16];
1669   guint8 i, count;
1670   guint64 oldrate;
1671   GstRTPBuffer rtp = { NULL };
1672
1673   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1674   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1675
1676   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
1677     goto invalid_packet;
1678
1679   /* get SSRC to look up in session database */
1680   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
1681   /* copy available csrc for later */
1682   count = gst_rtp_buffer_get_csrc_count (&rtp);
1683   /* make sure to not overflow our array. An RTP buffer can maximally contain
1684    * 16 CSRCs */
1685   count = MIN (count, 16);
1686
1687   for (i = 0; i < count; i++)
1688     csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
1689
1690   gst_rtp_buffer_unmap (&rtp);
1691
1692   RTP_SESSION_LOCK (sess);
1693
1694   /* update arrival stats */
1695   update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
1696       running_time, -1);
1697
1698   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
1699   if (!source)
1700     goto collision;
1701
1702   prevsender = RTP_SOURCE_IS_SENDER (source);
1703   prevactive = RTP_SOURCE_IS_ACTIVE (source);
1704   oldrate = source->bitrate;
1705
1706   /* let source process the packet */
1707   result = rtp_source_process_rtp (source, buffer, &arrival);
1708
1709   /* source became active */
1710   if (source_update_active (sess, source, prevactive))
1711     on_ssrc_validated (sess, source);
1712
1713   source_update_sender (sess, source, prevsender);
1714
1715   if (oldrate != source->bitrate)
1716     sess->recalc_bandwidth = TRUE;
1717
1718   if (created)
1719     on_new_ssrc (sess, source);
1720
1721   if (source->validated) {
1722     gboolean created;
1723
1724     /* for validated sources, we add the CSRCs as well */
1725     for (i = 0; i < count; i++) {
1726       guint32 csrc;
1727       RTPSource *csrc_src;
1728
1729       csrc = csrcs[i];
1730
1731       /* get source */
1732       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
1733       if (!csrc_src)
1734         continue;
1735
1736       if (created) {
1737         GST_DEBUG ("created new CSRC: %08x", csrc);
1738         rtp_source_set_as_csrc (csrc_src);
1739         source_update_active (sess, csrc_src, FALSE);
1740         on_new_ssrc (sess, csrc_src);
1741       }
1742       g_object_unref (csrc_src);
1743     }
1744   }
1745   g_object_unref (source);
1746
1747   RTP_SESSION_UNLOCK (sess);
1748
1749   clean_arrival_stats (&arrival);
1750
1751   return result;
1752
1753   /* ERRORS */
1754 invalid_packet:
1755   {
1756     gst_buffer_unref (buffer);
1757     GST_DEBUG ("invalid RTP packet received");
1758     return GST_FLOW_OK;
1759   }
1760 collision:
1761   {
1762     RTP_SESSION_UNLOCK (sess);
1763     gst_buffer_unref (buffer);
1764     clean_arrival_stats (&arrival);
1765     GST_DEBUG ("ignoring packet because its collisioning");
1766     return GST_FLOW_OK;
1767   }
1768 }
1769
1770 static void
1771 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
1772     GstRTCPPacket * packet, RTPArrivalStats * arrival)
1773 {
1774   guint count, i;
1775
1776   count = gst_rtcp_packet_get_rb_count (packet);
1777   for (i = 0; i < count; i++) {
1778     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1779     guint8 fractionlost;
1780     gint32 packetslost;
1781     RTPSource *src;
1782
1783     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1784         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1785
1786     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
1787
1788     /* find our own source */
1789     src = find_source (sess, ssrc);
1790     if (src == NULL)
1791       continue;
1792
1793     if (src->internal && RTP_SOURCE_IS_ACTIVE (src)) {
1794       /* only deal with report blocks for our session, we update the stats of
1795        * the sender of the RTCP message. We could also compare our stats against
1796        * the other sender to see if we are better or worse. */
1797       /* FIXME, need to keep track who the RB block is from */
1798       rtp_source_process_rb (source, arrival->ntpnstime, fractionlost,
1799           packetslost, exthighestseq, jitter, lsr, dlsr);
1800     }
1801   }
1802   on_ssrc_active (sess, source);
1803 }
1804
1805 /* A Sender report contains statistics about how the sender is doing. This
1806  * includes timing informataion such as the relation between RTP and NTP
1807  * timestamps and the number of packets/bytes it sent to us.
1808  *
1809  * In this report is also included a set of report blocks related to how this
1810  * sender is receiving data (in case we (or somebody else) is also sending stuff
1811  * to it). This info includes the packet loss, jitter and seqnum. It also
1812  * contains information to calculate the round trip time (LSR/DLSR).
1813  */
1814 static void
1815 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
1816     RTPArrivalStats * arrival, gboolean * do_sync)
1817 {
1818   guint32 senderssrc, rtptime, packet_count, octet_count;
1819   guint64 ntptime;
1820   RTPSource *source;
1821   gboolean created, prevsender;
1822
1823   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
1824       &packet_count, &octet_count);
1825
1826   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
1827       senderssrc, GST_TIME_ARGS (arrival->current_time));
1828
1829   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1830   if (!source)
1831     return;
1832
1833   /* don't try to do lip-sync for sources that sent a BYE */
1834   if (RTP_SOURCE_IS_MARKED_BYE (source))
1835     *do_sync = FALSE;
1836   else
1837     *do_sync = TRUE;
1838
1839   prevsender = RTP_SOURCE_IS_SENDER (source);
1840
1841   /* first update the source */
1842   rtp_source_process_sr (source, arrival->current_time, ntptime, rtptime,
1843       packet_count, octet_count);
1844
1845   source_update_sender (sess, source, prevsender);
1846
1847   if (created)
1848     on_new_ssrc (sess, source);
1849
1850   rtp_session_process_rb (sess, source, packet, arrival);
1851   g_object_unref (source);
1852 }
1853
1854 /* A receiver report contains statistics about how a receiver is doing. It
1855  * includes stuff like packet loss, jitter and the seqnum it received last. It
1856  * also contains info to calculate the round trip time.
1857  *
1858  * We are only interested in how the sender of this report is doing wrt to us.
1859  */
1860 static void
1861 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
1862     RTPArrivalStats * arrival)
1863 {
1864   guint32 senderssrc;
1865   RTPSource *source;
1866   gboolean created;
1867
1868   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
1869
1870   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
1871
1872   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1873   if (!source)
1874     return;
1875
1876   if (created)
1877     on_new_ssrc (sess, source);
1878
1879   rtp_session_process_rb (sess, source, packet, arrival);
1880   g_object_unref (source);
1881 }
1882
1883 /* Get SDES items and store them in the SSRC */
1884 static void
1885 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
1886     RTPArrivalStats * arrival)
1887 {
1888   guint items, i, j;
1889   gboolean more_items, more_entries;
1890
1891   items = gst_rtcp_packet_sdes_get_item_count (packet);
1892   GST_DEBUG ("got SDES packet with %d items", items);
1893
1894   more_items = gst_rtcp_packet_sdes_first_item (packet);
1895   i = 0;
1896   while (more_items) {
1897     guint32 ssrc;
1898     gboolean changed, created, prevactive;
1899     RTPSource *source;
1900     GstStructure *sdes;
1901
1902     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
1903
1904     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
1905
1906     changed = FALSE;
1907
1908     /* find src, no probation when dealing with RTCP */
1909     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1910     if (!source)
1911       return;
1912
1913     sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
1914
1915     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
1916     j = 0;
1917     while (more_entries) {
1918       GstRTCPSDESType type;
1919       guint8 len;
1920       guint8 *data;
1921       gchar *name;
1922       gchar *value;
1923
1924       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
1925
1926       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
1927           data);
1928
1929       if (type == GST_RTCP_SDES_PRIV) {
1930         name = g_strndup ((const gchar *) &data[1], data[0]);
1931         len -= data[0] + 1;
1932         data += data[0] + 1;
1933       } else {
1934         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
1935       }
1936
1937       value = g_strndup ((const gchar *) data, len);
1938
1939       gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
1940
1941       g_free (name);
1942       g_free (value);
1943
1944       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
1945       j++;
1946     }
1947
1948     /* takes ownership of sdes */
1949     changed = rtp_source_set_sdes_struct (source, sdes);
1950
1951     prevactive = RTP_SOURCE_IS_ACTIVE (source);
1952     source->validated = TRUE;
1953
1954     if (created)
1955       on_new_ssrc (sess, source);
1956
1957     /* source became active */
1958     if (source_update_active (sess, source, prevactive))
1959       on_ssrc_validated (sess, source);
1960
1961     if (changed)
1962       on_ssrc_sdes (sess, source);
1963
1964     g_object_unref (source);
1965
1966     more_items = gst_rtcp_packet_sdes_next_item (packet);
1967     i++;
1968   }
1969 }
1970
1971 /* BYE is sent when a client leaves the session
1972  */
1973 static void
1974 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
1975     RTPArrivalStats * arrival)
1976 {
1977   guint count, i;
1978   gchar *reason;
1979   gboolean reconsider = FALSE;
1980
1981   reason = gst_rtcp_packet_bye_get_reason (packet);
1982   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
1983
1984   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
1985   for (i = 0; i < count; i++) {
1986     guint32 ssrc;
1987     RTPSource *source;
1988     gboolean created, prevactive, prevsender;
1989     guint pmembers, members;
1990
1991     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
1992     GST_DEBUG ("SSRC: %08x", ssrc);
1993
1994     /* find src and mark bye, no probation when dealing with RTCP */
1995     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1996     if (!source)
1997       return;
1998
1999     if (source->internal) {
2000       /* our own source, something weird with this packet */
2001       g_object_unref (source);
2002       continue;
2003     }
2004
2005     /* store time for when we need to time out this source */
2006     source->bye_time = arrival->current_time;
2007
2008     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2009     prevsender = RTP_SOURCE_IS_SENDER (source);
2010
2011     /* mark the source BYE */
2012     rtp_source_mark_bye (source, reason);
2013
2014     pmembers = sess->stats.active_sources;
2015
2016     source_update_active (sess, source, prevactive);
2017     source_update_sender (sess, source, prevsender);
2018
2019     members = sess->stats.active_sources;
2020
2021     if (!sess->scheduled_bye && members < pmembers) {
2022       /* some members went away since the previous timeout estimate.
2023        * Perform reverse reconsideration but only when we are not scheduling a
2024        * BYE ourselves. */
2025       if (sess->next_rtcp_check_time != GST_CLOCK_TIME_NONE &&
2026           arrival->current_time < sess->next_rtcp_check_time) {
2027         GstClockTime time_remaining;
2028
2029         time_remaining = sess->next_rtcp_check_time - arrival->current_time;
2030         sess->next_rtcp_check_time =
2031             gst_util_uint64_scale (time_remaining, members, pmembers);
2032
2033         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
2034             GST_TIME_ARGS (sess->next_rtcp_check_time));
2035
2036         sess->next_rtcp_check_time += arrival->current_time;
2037
2038         /* mark pending reconsider. We only want to signal the reconsideration
2039          * once after we handled all the source in the bye packet */
2040         reconsider = TRUE;
2041       }
2042     }
2043
2044     if (created)
2045       on_new_ssrc (sess, source);
2046
2047     on_bye_ssrc (sess, source);
2048
2049     g_object_unref (source);
2050   }
2051   if (reconsider) {
2052     RTP_SESSION_UNLOCK (sess);
2053     /* notify app of reconsideration */
2054     if (sess->callbacks.reconsider)
2055       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2056     RTP_SESSION_LOCK (sess);
2057   }
2058   g_free (reason);
2059 }
2060
2061 static void
2062 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
2063     RTPArrivalStats * arrival)
2064 {
2065   GST_DEBUG ("received APP");
2066 }
2067
2068 static gboolean
2069 rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
2070     gboolean fir, GstClockTime current_time)
2071 {
2072   guint32 round_trip = 0;
2073
2074   rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, &round_trip);
2075
2076   if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
2077     GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
2078         GST_SECOND, 65536);
2079
2080     if (current_time - sess->last_keyframe_request < 2 * round_trip_in_ns) {
2081       GST_DEBUG ("Ignoring %s request because one was send without one "
2082           "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
2083           fir ? "FIR" : "PLI",
2084           GST_TIME_ARGS (current_time - sess->last_keyframe_request),
2085           GST_TIME_ARGS (round_trip_in_ns));;
2086       return FALSE;
2087     }
2088   }
2089
2090   sess->last_keyframe_request = current_time;
2091
2092   GST_LOG ("received %s request from %X %p(%p)", fir ? "FIR" : "PLI",
2093       rtp_source_get_ssrc (src), sess->callbacks.process_rtp,
2094       sess->callbacks.request_key_unit);
2095
2096   RTP_SESSION_UNLOCK (sess);
2097   sess->callbacks.request_key_unit (sess, fir,
2098       sess->request_key_unit_user_data);
2099   RTP_SESSION_LOCK (sess);
2100
2101   return TRUE;
2102 }
2103
2104 static void
2105 rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
2106     guint32 media_ssrc, GstClockTime current_time)
2107 {
2108   RTPSource *src;
2109
2110   if (!sess->callbacks.request_key_unit)
2111     return;
2112
2113   src = find_source (sess, sender_ssrc);
2114   if (!src)
2115     return;
2116
2117   rtp_session_request_local_key_unit (sess, src, FALSE, current_time);
2118 }
2119
2120 static void
2121 rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
2122     guint8 * fci_data, guint fci_length, GstClockTime current_time)
2123 {
2124   RTPSource *src;
2125   guint32 ssrc;
2126   guint position = 0;
2127   gboolean our_request = FALSE;
2128
2129   if (!sess->callbacks.request_key_unit)
2130     return;
2131
2132   if (fci_length < 8)
2133     return;
2134
2135   src = find_source (sess, sender_ssrc);
2136
2137   /* Hack because Google fails to set the sender_ssrc correctly */
2138   if (!src && sender_ssrc == 1) {
2139     GHashTableIter iter;
2140
2141     /* we can't find the source if there are multiple */
2142     if (sess->stats.sender_sources > sess->stats.internal_sender_sources + 1)
2143       return;
2144
2145     g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
2146     while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
2147       if (!src->internal && rtp_source_is_sender (src))
2148         break;
2149       src = NULL;
2150     }
2151   }
2152   if (!src)
2153     return;
2154
2155   for (position = 0; position < fci_length; position += 8) {
2156     guint8 *data = fci_data + position;
2157     RTPSource *own;
2158
2159     ssrc = GST_READ_UINT32_BE (data);
2160
2161     own = find_source (sess, ssrc);
2162     if (own->internal) {
2163       our_request = TRUE;
2164       break;
2165     }
2166   }
2167   if (!our_request)
2168     return;
2169
2170   rtp_session_request_local_key_unit (sess, src, TRUE, current_time);
2171 }
2172
2173 static void
2174 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
2175     RTPArrivalStats * arrival, GstClockTime current_time)
2176 {
2177   GstRTCPType type = gst_rtcp_packet_get_type (packet);
2178   GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
2179   guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
2180   guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
2181   guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet);
2182   guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
2183   RTPSource *src;
2184
2185   GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
2186       "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
2187
2188   if (g_signal_has_handler_pending (sess,
2189           rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
2190     GstBuffer *fci_buffer = NULL;
2191
2192     if (fci_length > 0) {
2193       fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
2194           GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data,
2195           fci_length);
2196       GST_BUFFER_TIMESTAMP (fci_buffer) = arrival->running_time;
2197     }
2198
2199     RTP_SESSION_UNLOCK (sess);
2200     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
2201         type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
2202     RTP_SESSION_LOCK (sess);
2203
2204     if (fci_buffer)
2205       gst_buffer_unref (fci_buffer);
2206   }
2207
2208   src = find_source (sess, media_ssrc);
2209   if (!src)
2210     return;
2211
2212   if (sess->rtcp_feedback_retention_window) {
2213     rtp_source_retain_rtcp_packet (src, packet, arrival->running_time);
2214   }
2215
2216   if (src->internal ||
2217       /* PSFB FIR puts the media ssrc inside the FCI */
2218       (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
2219     switch (type) {
2220       case GST_RTCP_TYPE_PSFB:
2221         switch (fbtype) {
2222           case GST_RTCP_PSFB_TYPE_PLI:
2223             rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
2224                 current_time);
2225             break;
2226           case GST_RTCP_PSFB_TYPE_FIR:
2227             rtp_session_process_fir (sess, sender_ssrc, fci_data, fci_length,
2228                 current_time);
2229             break;
2230           default:
2231             break;
2232         }
2233         break;
2234       case GST_RTCP_TYPE_RTPFB:
2235       default:
2236         break;
2237     }
2238   }
2239 }
2240
2241 /**
2242  * rtp_session_process_rtcp:
2243  * @sess: and #RTPSession
2244  * @buffer: an RTCP buffer
2245  * @current_time: the current system time
2246  * @ntpnstime: the current NTP time in nanoseconds
2247  *
2248  * Process an RTCP buffer in the session manager. This function takes ownership
2249  * of @buffer.
2250  *
2251  * Returns: a #GstFlowReturn.
2252  */
2253 GstFlowReturn
2254 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
2255     GstClockTime current_time, guint64 ntpnstime)
2256 {
2257   GstRTCPPacket packet;
2258   gboolean more, is_bye = FALSE, do_sync = FALSE;
2259   RTPArrivalStats arrival = { NULL, };
2260   GstFlowReturn result = GST_FLOW_OK;
2261   GstRTCPBuffer rtcp = { NULL, };
2262
2263   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2264   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2265
2266   if (!gst_rtcp_buffer_validate (buffer))
2267     goto invalid_packet;
2268
2269   GST_DEBUG ("received RTCP packet");
2270
2271   RTP_SESSION_LOCK (sess);
2272   /* update arrival stats */
2273   update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1,
2274       ntpnstime);
2275
2276   /* start processing the compound packet */
2277   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2278   more = gst_rtcp_buffer_get_first_packet (&rtcp, &packet);
2279   while (more) {
2280     GstRTCPType type;
2281
2282     type = gst_rtcp_packet_get_type (&packet);
2283
2284     /* when we are leaving the session, we should ignore all non-BYE messages */
2285     if (sess->scheduled_bye && type != GST_RTCP_TYPE_BYE) {
2286       GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
2287       goto next;
2288     }
2289
2290     switch (type) {
2291       case GST_RTCP_TYPE_SR:
2292         rtp_session_process_sr (sess, &packet, &arrival, &do_sync);
2293         break;
2294       case GST_RTCP_TYPE_RR:
2295         rtp_session_process_rr (sess, &packet, &arrival);
2296         break;
2297       case GST_RTCP_TYPE_SDES:
2298         rtp_session_process_sdes (sess, &packet, &arrival);
2299         break;
2300       case GST_RTCP_TYPE_BYE:
2301         is_bye = TRUE;
2302         /* don't try to attempt lip-sync anymore for streams with a BYE */
2303         do_sync = FALSE;
2304         rtp_session_process_bye (sess, &packet, &arrival);
2305         break;
2306       case GST_RTCP_TYPE_APP:
2307         rtp_session_process_app (sess, &packet, &arrival);
2308         break;
2309       case GST_RTCP_TYPE_RTPFB:
2310       case GST_RTCP_TYPE_PSFB:
2311         rtp_session_process_feedback (sess, &packet, &arrival, current_time);
2312         break;
2313       default:
2314         GST_WARNING ("got unknown RTCP packet");
2315         break;
2316     }
2317   next:
2318     more = gst_rtcp_packet_move_to_next (&packet);
2319   }
2320
2321   gst_rtcp_buffer_unmap (&rtcp);
2322
2323   /* if we are scheduling a BYE, we only want to count bye packets, else we
2324    * count everything */
2325   if (sess->scheduled_bye) {
2326     if (is_bye) {
2327       sess->stats.bye_members++;
2328       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
2329     }
2330   } else {
2331     /* keep track of average packet size */
2332     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
2333   }
2334   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
2335       sess->stats.avg_rtcp_packet_size, arrival.bytes);
2336   RTP_SESSION_UNLOCK (sess);
2337
2338   clean_arrival_stats (&arrival);
2339
2340   /* notify caller of sr packets in the callback */
2341   if (do_sync && sess->callbacks.sync_rtcp) {
2342     result = sess->callbacks.sync_rtcp (sess, buffer,
2343         sess->sync_rtcp_user_data);
2344   } else
2345     gst_buffer_unref (buffer);
2346
2347   return result;
2348
2349   /* ERRORS */
2350 invalid_packet:
2351   {
2352     GST_DEBUG ("invalid RTCP packet received");
2353     gst_buffer_unref (buffer);
2354     return GST_FLOW_OK;
2355   }
2356 }
2357
2358 /**
2359  * rtp_session_update_send_caps:
2360  * @sess: an #RTPSession
2361  * @caps: a #GstCaps
2362  *
2363  * Update the caps of the sender in the rtp session.
2364  */
2365 void
2366 rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
2367 {
2368   GstStructure *s;
2369   guint ssrc;
2370
2371   g_return_if_fail (RTP_IS_SESSION (sess));
2372   g_return_if_fail (GST_IS_CAPS (caps));
2373
2374   GST_LOG ("received caps %" GST_PTR_FORMAT, caps);
2375
2376   s = gst_caps_get_structure (caps, 0);
2377
2378   if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
2379     RTPSource *source;
2380     gboolean created;
2381
2382     RTP_SESSION_LOCK (sess);
2383     source = obtain_internal_source (sess, ssrc, &created);
2384     if (source) {
2385       rtp_source_update_caps (source, caps);
2386       g_object_unref (source);
2387     }
2388     RTP_SESSION_UNLOCK (sess);
2389   }
2390 }
2391
2392 /**
2393  * rtp_session_send_rtp:
2394  * @sess: an #RTPSession
2395  * @data: pointer to either an RTP buffer or a list of RTP buffers
2396  * @is_list: TRUE when @data is a buffer list
2397  * @current_time: the current system time
2398  * @running_time: the running time of @data
2399  *
2400  * Send the RTP buffer in the session manager. This function takes ownership of
2401  * @buffer.
2402  *
2403  * Returns: a #GstFlowReturn.
2404  */
2405 GstFlowReturn
2406 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
2407     GstClockTime current_time, GstClockTime running_time)
2408 {
2409   GstFlowReturn result;
2410   RTPSource *source;
2411   gboolean prevsender;
2412   guint64 oldrate;
2413   GstBuffer *buffer;
2414   GstRTPBuffer rtp = { NULL };
2415   guint32 ssrc;
2416   gboolean created;
2417
2418   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2419   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
2420
2421   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
2422
2423   if (is_list) {
2424     GstBufferList *list = GST_BUFFER_LIST_CAST (data);
2425
2426     buffer = gst_buffer_list_get (list, 0);
2427     if (!buffer)
2428       goto no_buffer;
2429   } else {
2430     buffer = GST_BUFFER_CAST (data);
2431   }
2432
2433   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
2434     goto invalid_packet;
2435
2436   /* get SSRC and look up in session database */
2437   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
2438
2439   gst_rtp_buffer_unmap (&rtp);
2440
2441   RTP_SESSION_LOCK (sess);
2442   source = obtain_internal_source (sess, ssrc, &created);
2443
2444   /* update last activity */
2445   source->last_rtp_activity = current_time;
2446
2447   prevsender = RTP_SOURCE_IS_SENDER (source);
2448   oldrate = source->bitrate;
2449
2450   /* we use our own source to send */
2451   result = rtp_source_send_rtp (source, data, is_list, running_time);
2452
2453   source_update_sender (sess, source, prevsender);
2454
2455   if (oldrate != source->bitrate)
2456     sess->recalc_bandwidth = TRUE;
2457   RTP_SESSION_UNLOCK (sess);
2458
2459   g_object_unref (source);
2460
2461   return result;
2462
2463 invalid_packet:
2464   {
2465     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
2466     GST_DEBUG ("invalid RTP packet received");
2467     return GST_FLOW_OK;
2468   }
2469 no_buffer:
2470   {
2471     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
2472     GST_DEBUG ("no buffer in list");
2473     return GST_FLOW_OK;
2474   }
2475 }
2476
2477 static void
2478 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
2479 {
2480   *bandwidth += source->bitrate;
2481 }
2482
2483 /* must be called with session lock */
2484 static GstClockTime
2485 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
2486     gboolean first)
2487 {
2488   GstClockTime result;
2489
2490   /* recalculate bandwidth when it changed */
2491   if (sess->recalc_bandwidth) {
2492     gdouble bandwidth;
2493
2494     if (sess->bandwidth > 0)
2495       bandwidth = sess->bandwidth;
2496     else {
2497       /* If it is <= 0, then try to estimate the actual bandwidth */
2498       bandwidth = 0;
2499
2500       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2501           (GHFunc) add_bitrates, &bandwidth);
2502       bandwidth /= 8.0;
2503     }
2504     if (bandwidth < 8000)
2505       bandwidth = RTP_STATS_BANDWIDTH;
2506
2507     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
2508         sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
2509
2510     sess->recalc_bandwidth = FALSE;
2511   }
2512
2513   if (sess->scheduled_bye) {
2514     result = rtp_stats_calculate_bye_interval (&sess->stats);
2515   } else {
2516     result = rtp_stats_calculate_rtcp_interval (&sess->stats,
2517         sess->stats.internal_sender_sources > 0, first);
2518   }
2519
2520   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
2521       GST_TIME_ARGS (result), first);
2522
2523   if (!deterministic && result != GST_CLOCK_TIME_NONE)
2524     result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
2525
2526   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
2527
2528   return result;
2529 }
2530
2531 static void
2532 source_mark_bye (const gchar * key, RTPSource * source, const gchar * reason)
2533 {
2534   if (source->internal)
2535     rtp_source_mark_bye (source, reason);
2536 }
2537
2538 /**
2539  * rtp_session_mark_all_bye:
2540  * @sess: an #RTPSession
2541  * @reason: a reason
2542  *
2543  * Mark all internal sources of the session as BYE with @reason.
2544  */
2545 void
2546 rtp_session_mark_all_bye (RTPSession * sess, const gchar * reason)
2547 {
2548   g_return_if_fail (RTP_IS_SESSION (sess));
2549
2550   RTP_SESSION_LOCK (sess);
2551   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2552       (GHFunc) source_mark_bye, (gpointer) reason);
2553   RTP_SESSION_UNLOCK (sess);
2554 }
2555
2556 /* Stop the current @sess and schedule a BYE message for the other members.
2557  * One must have the session lock to call this function
2558  */
2559 static GstFlowReturn
2560 rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time)
2561 {
2562   GstFlowReturn result = GST_FLOW_OK;
2563   GstClockTime interval;
2564
2565   /* nothing to do it we already scheduled bye */
2566   if (sess->scheduled_bye)
2567     goto done;
2568
2569   /* we schedule BYE now */
2570   sess->scheduled_bye = TRUE;
2571   /* at least one member wants to send a BYE */
2572   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
2573   sess->stats.bye_members = 1;
2574   sess->first_rtcp = TRUE;
2575   sess->allow_early = TRUE;
2576
2577   /* reschedule transmission */
2578   sess->last_rtcp_send_time = current_time;
2579   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2580
2581   if (interval != GST_CLOCK_TIME_NONE)
2582     sess->next_rtcp_check_time = current_time + interval;
2583   else
2584     sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
2585
2586   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
2587       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
2588
2589   RTP_SESSION_UNLOCK (sess);
2590   /* notify app of reconsideration */
2591   if (sess->callbacks.reconsider)
2592     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2593   RTP_SESSION_LOCK (sess);
2594 done:
2595
2596   return result;
2597 }
2598
2599 /**
2600  * rtp_session_schedule_bye:
2601  * @sess: an #RTPSession
2602  * @current_time: the current system time
2603  *
2604  * Schedule a BYE message for all sources marked as BYE in @sess.
2605  *
2606  * Returns: a #GstFlowReturn.
2607  */
2608 GstFlowReturn
2609 rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
2610 {
2611   GstFlowReturn result = GST_FLOW_OK;
2612
2613   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2614
2615   RTP_SESSION_LOCK (sess);
2616   result = rtp_session_schedule_bye_locked (sess, current_time);
2617   RTP_SESSION_UNLOCK (sess);
2618
2619   return result;
2620 }
2621
2622 /**
2623  * rtp_session_next_timeout:
2624  * @sess: an #RTPSession
2625  * @current_time: the current system time
2626  *
2627  * Get the next time we should perform session maintenance tasks.
2628  *
2629  * Returns: a time when rtp_session_on_timeout() should be called with the
2630  * current system time.
2631  */
2632 GstClockTime
2633 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
2634 {
2635   GstClockTime result, interval = 0;
2636
2637   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
2638
2639   RTP_SESSION_LOCK (sess);
2640
2641   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
2642     result = sess->next_early_rtcp_time;
2643     goto early_exit;
2644   }
2645
2646   result = sess->next_rtcp_check_time;
2647
2648   GST_DEBUG ("current time: %" GST_TIME_FORMAT
2649       ", next time: %" GST_TIME_FORMAT,
2650       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2651
2652   if (result == GST_CLOCK_TIME_NONE || result < current_time) {
2653     GST_DEBUG ("take current time as base");
2654     /* our previous check time expired, start counting from the current time
2655      * again. */
2656     result = current_time;
2657   }
2658
2659   if (sess->scheduled_bye) {
2660     if (sess->stats.active_sources >= 50) {
2661       GST_DEBUG ("reconsider BYE, more than 50 sources");
2662       /* reconsider BYE if members >= 50 */
2663       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2664     }
2665   } else {
2666     if (sess->first_rtcp) {
2667       GST_DEBUG ("first RTCP packet");
2668       /* we are called for the first time */
2669       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2670     } else if (sess->next_rtcp_check_time < current_time) {
2671       GST_DEBUG ("old check time expired, getting new timeout");
2672       /* get a new timeout when we need to */
2673       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
2674     }
2675   }
2676
2677   if (interval != GST_CLOCK_TIME_NONE)
2678     result += interval;
2679   else
2680     result = GST_CLOCK_TIME_NONE;
2681
2682   sess->next_rtcp_check_time = result;
2683
2684 early_exit:
2685
2686   GST_DEBUG ("current time: %" GST_TIME_FORMAT
2687       ", next time: %" GST_TIME_FORMAT,
2688       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2689   RTP_SESSION_UNLOCK (sess);
2690
2691   return result;
2692 }
2693
2694 typedef struct
2695 {
2696   RTPSource *source;
2697   gboolean is_bye;
2698   GstBuffer *buffer;
2699 } ReportOutput;
2700
2701 typedef struct
2702 {
2703   GstRTCPBuffer rtcpbuf;
2704   RTPSession *sess;
2705   RTPSource *source;
2706   guint num_to_report;
2707   gboolean have_fir;
2708   gboolean have_pli;
2709   GstBuffer *rtcp;
2710   GstClockTime current_time;
2711   guint64 ntpnstime;
2712   GstClockTime running_time;
2713   GstClockTime interval;
2714   GstRTCPPacket packet;
2715   gboolean has_sdes;
2716   gboolean is_early;
2717   gboolean may_suppress;
2718   GQueue output;
2719 } ReportData;
2720
2721 static void
2722 session_start_rtcp (RTPSession * sess, ReportData * data)
2723 {
2724   GstRTCPPacket *packet = &data->packet;
2725   RTPSource *own = data->source;
2726   GstRTCPBuffer *rtcp = &data->rtcpbuf;
2727
2728   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
2729   data->has_sdes = FALSE;
2730
2731   gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
2732
2733   if (RTP_SOURCE_IS_SENDER (own)) {
2734     guint64 ntptime;
2735     guint32 rtptime;
2736     guint32 packet_count, octet_count;
2737
2738     /* we are a sender, create SR */
2739     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
2740     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SR, packet);
2741
2742     /* get latest stats */
2743     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
2744         &ntptime, &rtptime, &packet_count, &octet_count);
2745     /* store stats */
2746     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
2747         packet_count, octet_count);
2748
2749     /* fill in sender report info */
2750     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
2751         ntptime, rtptime, packet_count, octet_count);
2752   } else {
2753     /* we are only receiver, create RR */
2754     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
2755     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RR, packet);
2756     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
2757   }
2758 }
2759
2760 /* construct a Sender or Receiver Report */
2761 static void
2762 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
2763 {
2764   RTPSession *sess = data->sess;
2765   GstRTCPPacket *packet = &data->packet;
2766   guint8 fractionlost;
2767   gint32 packetslost;
2768   guint32 exthighestseq, jitter;
2769   guint32 lsr, dlsr;
2770
2771   /* don't report for sources in future generations */
2772   if (((gint16) (source->generation - sess->generation)) > 0) {
2773     GST_DEBUG ("source %08x generation %u > %u", source->ssrc,
2774         source->generation, sess->generation);
2775     return;
2776   }
2777
2778   /* only report about other sender */
2779   if (source == data->source)
2780     goto reported;
2781
2782   if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
2783     GST_DEBUG ("max RB count reached");
2784     return;
2785   }
2786
2787   if (!RTP_SOURCE_IS_SENDER (source)) {
2788     GST_DEBUG ("source %08x not sender", source->ssrc);
2789     goto reported;
2790   }
2791
2792   GST_DEBUG ("create RB for SSRC %08x", source->ssrc);
2793
2794   /* get new stats */
2795   rtp_source_get_new_rb (source, data->current_time, &fractionlost,
2796       &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2797
2798   /* store last generated RR packet */
2799   source->last_rr.is_valid = TRUE;
2800   source->last_rr.fractionlost = fractionlost;
2801   source->last_rr.packetslost = packetslost;
2802   source->last_rr.exthighestseq = exthighestseq;
2803   source->last_rr.jitter = jitter;
2804   source->last_rr.lsr = lsr;
2805   source->last_rr.dlsr = dlsr;
2806
2807   /* packet is not yet filled, add report block for this source. */
2808   gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
2809       exthighestseq, jitter, lsr, dlsr);
2810
2811 reported:
2812   /* source is reported, move to next generation */
2813   source->generation = sess->generation + 1;
2814
2815   /* if we reported all sources in this generation, move to next */
2816   if (--data->num_to_report == 0) {
2817     sess->generation++;
2818     GST_DEBUG ("all reported, generation now %u", sess->generation);
2819   }
2820 }
2821
2822 /* construct FIR */
2823 static void
2824 session_add_fir (const gchar * key, RTPSource * source, ReportData * data)
2825 {
2826   GstRTCPPacket *packet = &data->packet;
2827   guint16 len;
2828   guint8 *fci_data;
2829
2830   if (!source->send_fir)
2831     return;
2832
2833   len = gst_rtcp_packet_fb_get_fci_length (packet);
2834   if (!gst_rtcp_packet_fb_set_fci_length (packet, len + 2))
2835     /* exit because the packet is full, will put next request in a
2836      * further packet */
2837     return;
2838
2839   fci_data = gst_rtcp_packet_fb_get_fci (packet) + (len * 4);
2840
2841   GST_WRITE_UINT32_BE (fci_data, source->ssrc);
2842   fci_data += 4;
2843   fci_data[0] = source->current_send_fir_seqnum;
2844   fci_data[1] = fci_data[2] = fci_data[3] = 0;
2845
2846   source->send_fir = FALSE;
2847 }
2848
2849 static void
2850 session_fir (RTPSession * sess, ReportData * data)
2851 {
2852   GstRTCPBuffer *rtcp = &data->rtcpbuf;
2853   GstRTCPPacket *packet = &data->packet;
2854
2855   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
2856     return;
2857
2858   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_FIR);
2859   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
2860   gst_rtcp_packet_fb_set_media_ssrc (packet, 0);
2861
2862   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2863       (GHFunc) session_add_fir, data);
2864
2865   if (gst_rtcp_packet_fb_get_fci_length (packet) == 0)
2866     gst_rtcp_packet_remove (packet);
2867   else
2868     data->may_suppress = FALSE;
2869 }
2870
2871 static gboolean
2872 has_pli_compare_func (gconstpointer a, gconstpointer ignored)
2873 {
2874   GstRTCPPacket packet;
2875   GstRTCPBuffer rtcp = { NULL, };
2876   gboolean ret = FALSE;
2877
2878   gst_rtcp_buffer_map ((GstBuffer *) a, GST_MAP_READ, &rtcp);
2879
2880   if (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
2881     if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
2882         gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
2883       ret = TRUE;
2884   }
2885
2886   gst_rtcp_buffer_unmap (&rtcp);
2887
2888   return ret;
2889 }
2890
2891 /* construct PLI */
2892 static void
2893 session_pli (const gchar * key, RTPSource * source, ReportData * data)
2894 {
2895   GstRTCPBuffer *rtcp = &data->rtcpbuf;
2896   GstRTCPPacket *packet = &data->packet;
2897
2898   if (!source->send_pli)
2899     return;
2900
2901   if (rtp_source_has_retained (source, has_pli_compare_func, NULL))
2902     return;
2903
2904   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
2905     /* exit because the packet is full, will put next request in a
2906      * further packet */
2907     return;
2908
2909   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_PLI);
2910   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
2911   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
2912
2913   source->send_pli = FALSE;
2914   data->may_suppress = FALSE;
2915 }
2916
2917 /* perform cleanup of sources that timed out */
2918 static void
2919 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
2920 {
2921   gboolean remove = FALSE;
2922   gboolean byetimeout = FALSE;
2923   gboolean sendertimeout = FALSE;
2924   gboolean is_sender, is_active;
2925   RTPSession *sess = data->sess;
2926   GstClockTime interval, binterval;
2927   GstClockTime btime;
2928
2929   GST_DEBUG ("look at %08x, generation %u", source->ssrc, source->generation);
2930
2931   /* check for outdated collisions */
2932   if (source->internal) {
2933     GST_DEBUG ("Timing out collisions for %x", source->ssrc);
2934     rtp_source_timeout (source, data->current_time,
2935         /* "a relatively long time" -- RFC 3550 section 8.2 */
2936         RTP_STATS_MIN_INTERVAL * GST_SECOND * 10,
2937         data->running_time - sess->rtcp_feedback_retention_window);
2938   }
2939
2940   /* nothing else to do when without RTCP */
2941   if (data->interval == GST_CLOCK_TIME_NONE)
2942     return;
2943
2944   is_sender = RTP_SOURCE_IS_SENDER (source);
2945   is_active = RTP_SOURCE_IS_ACTIVE (source);
2946
2947   /* our own rtcp interval may have been forced low by secondary configuration,
2948    * while sender side may still operate with higher interval,
2949    * so do not just take our interval to decide on timing out sender,
2950    * but take (if data->interval <= 5 * GST_SECOND):
2951    *   interval = CLAMP (sender_interval, data->interval, 5 * GST_SECOND)
2952    * where sender_interval is difference between last 2 received RTCP reports
2953    */
2954   if (data->interval >= 5 * GST_SECOND || source->internal) {
2955     binterval = data->interval;
2956   } else {
2957     GST_LOG ("prev_rtcp %" GST_TIME_FORMAT ", last_rtcp %" GST_TIME_FORMAT,
2958         GST_TIME_ARGS (source->stats.prev_rtcptime),
2959         GST_TIME_ARGS (source->stats.last_rtcptime));
2960     /* if not received enough yet, fallback to larger default */
2961     if (source->stats.last_rtcptime > source->stats.prev_rtcptime)
2962       binterval = source->stats.last_rtcptime - source->stats.prev_rtcptime;
2963     else
2964       binterval = 5 * GST_SECOND;
2965     binterval = CLAMP (binterval, data->interval, 5 * GST_SECOND);
2966   }
2967   GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
2968       GST_TIME_ARGS (binterval));
2969
2970   if (!source->internal) {
2971     if (source->marked_bye) {
2972       /* if we received a BYE from the source, remove the source after some
2973        * time. */
2974       if (data->current_time > source->bye_time &&
2975           data->current_time - source->bye_time > sess->stats.bye_timeout) {
2976         GST_DEBUG ("removing BYE source %08x", source->ssrc);
2977         remove = TRUE;
2978         byetimeout = TRUE;
2979       }
2980     }
2981     /* sources that were inactive for more than 5 times the deterministic reporting
2982      * interval get timed out. the min timeout is 5 seconds. */
2983     /* mind old time that might pre-date last time going to PLAYING */
2984     btime = MAX (source->last_activity, sess->start_time);
2985     if (data->current_time > btime) {
2986       interval = MAX (binterval * 5, 5 * GST_SECOND);
2987       if (data->current_time - btime > interval) {
2988         GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
2989             source->ssrc, GST_TIME_ARGS (btime));
2990         remove = TRUE;
2991       }
2992     }
2993   }
2994
2995   /* senders that did not send for a long time become a receiver, this also
2996    * holds for our own sources. */
2997   if (is_sender) {
2998     /* mind old time that might pre-date last time going to PLAYING */
2999     btime = MAX (source->last_rtp_activity, sess->start_time);
3000     if (data->current_time > btime) {
3001       interval = MAX (binterval * 2, 5 * GST_SECOND);
3002       if (data->current_time - btime > interval) {
3003         if (source->internal && source->sent_bye) {
3004           /* an internal source is BYE and stopped sending RTP, remove */
3005           GST_DEBUG ("internal BYE source %08x timed out, last %"
3006               GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
3007           remove = TRUE;
3008         } else {
3009           GST_DEBUG ("sender source %08x timed out and became receiver, last %"
3010               GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
3011           sendertimeout = TRUE;
3012         }
3013       }
3014     }
3015   }
3016
3017   if (remove) {
3018     sess->total_sources--;
3019     if (is_sender) {
3020       sess->stats.sender_sources--;
3021       if (source->internal)
3022         sess->stats.internal_sender_sources--;
3023     }
3024     if (is_active)
3025       sess->stats.active_sources--;
3026
3027     if (source->internal)
3028       sess->stats.internal_sources--;
3029
3030     if (byetimeout)
3031       on_bye_timeout (sess, source);
3032     else
3033       on_timeout (sess, source);
3034   } else {
3035     if (sendertimeout) {
3036       source->is_sender = FALSE;
3037       sess->stats.sender_sources--;
3038       if (source->internal)
3039         sess->stats.internal_sender_sources--;
3040
3041       on_sender_timeout (sess, source);
3042     }
3043     /* count how many source to report in this generation */
3044     if (((gint16) (source->generation - sess->generation)) <= 0)
3045       data->num_to_report++;
3046   }
3047   source->closing = remove;
3048 }
3049
3050 static void
3051 session_sdes (RTPSession * sess, ReportData * data)
3052 {
3053   GstRTCPPacket *packet = &data->packet;
3054   const GstStructure *sdes;
3055   gint i, n_fields;
3056   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3057
3058   /* add SDES packet */
3059   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SDES, packet);
3060
3061   gst_rtcp_packet_sdes_add_item (packet, data->source->ssrc);
3062
3063   sdes = rtp_source_get_sdes_struct (data->source);
3064
3065   /* add all fields in the structure, the order is not important. */
3066   n_fields = gst_structure_n_fields (sdes);
3067   for (i = 0; i < n_fields; ++i) {
3068     const gchar *field;
3069     const gchar *value;
3070     GstRTCPSDESType type;
3071
3072     field = gst_structure_nth_field_name (sdes, i);
3073     if (field == NULL)
3074       continue;
3075     value = gst_structure_get_string (sdes, field);
3076     if (value == NULL)
3077       continue;
3078     type = gst_rtcp_sdes_name_to_type (field);
3079
3080     /* Early packets are minimal and only include the CNAME */
3081     if (data->is_early && type != GST_RTCP_SDES_CNAME)
3082       continue;
3083
3084     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
3085       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
3086           (const guint8 *) value);
3087     } else if (type == GST_RTCP_SDES_PRIV) {
3088       gsize prefix_len;
3089       gsize value_len;
3090       gsize data_len;
3091       guint8 data[256];
3092
3093       /* don't accept entries that are too big */
3094       prefix_len = strlen (field);
3095       if (prefix_len > 255)
3096         continue;
3097       value_len = strlen (value);
3098       if (value_len > 255)
3099         continue;
3100       data_len = 1 + prefix_len + value_len;
3101       if (data_len > 255)
3102         continue;
3103
3104       data[0] = prefix_len;
3105       memcpy (&data[1], field, prefix_len);
3106       memcpy (&data[1 + prefix_len], value, value_len);
3107
3108       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
3109     }
3110   }
3111
3112   data->has_sdes = TRUE;
3113 }
3114
3115 /* schedule a BYE packet */
3116 static void
3117 make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data)
3118 {
3119   GstRTCPPacket *packet = &data->packet;
3120   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3121
3122   /* add SDES */
3123   session_sdes (sess, data);
3124   /* add a BYE packet */
3125   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_BYE, packet);
3126   gst_rtcp_packet_bye_add_ssrc (packet, source->ssrc);
3127   if (source->bye_reason)
3128     gst_rtcp_packet_bye_set_reason (packet, source->bye_reason);
3129
3130   /* we have a BYE packet now */
3131   source->sent_bye = TRUE;
3132 }
3133
3134 static gboolean
3135 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
3136 {
3137   GstClockTime new_send_time, elapsed;
3138
3139   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
3140     data->is_early = TRUE;
3141   else
3142     data->is_early = FALSE;
3143
3144   if (data->is_early && sess->next_early_rtcp_time < current_time)
3145     goto early;
3146
3147   /* no need to check yet */
3148   if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
3149       sess->next_rtcp_check_time > current_time) {
3150     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
3151         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
3152         GST_TIME_ARGS (current_time));
3153     return FALSE;
3154   }
3155
3156   /* get elapsed time since we last reported */
3157   elapsed = current_time - sess->last_rtcp_send_time;
3158
3159   new_send_time = data->interval;
3160   /* perform forward reconsideration */
3161   if (new_send_time != GST_CLOCK_TIME_NONE) {
3162     new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, new_send_time);
3163
3164     GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
3165         GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time),
3166         GST_TIME_ARGS (elapsed));
3167
3168     new_send_time += sess->last_rtcp_send_time;
3169   }
3170
3171   /* check if reconsideration */
3172   if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
3173     GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
3174         GST_TIME_ARGS (new_send_time));
3175     /* store new check time */
3176     sess->next_rtcp_check_time = new_send_time;
3177     return FALSE;
3178   }
3179
3180 early:
3181
3182   new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
3183
3184   GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
3185       GST_TIME_ARGS (new_send_time));
3186
3187   sess->next_rtcp_check_time = new_send_time;
3188   if (new_send_time != GST_CLOCK_TIME_NONE) {
3189     sess->next_rtcp_check_time += current_time;
3190
3191     /* Apply the rules from RFC 4585 section 3.5.3 */
3192     if (sess->stats.min_interval != 0 && !sess->first_rtcp) {
3193       GstClockTimeDiff T_rr_current_interval =
3194           g_random_double_range (0.5, 1.5) * sess->stats.min_interval;
3195
3196       /* This will caused the RTCP to be suppressed if no FB packets are added */
3197       if (sess->last_rtcp_send_time + T_rr_current_interval >
3198           sess->next_rtcp_check_time) {
3199         GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT
3200             " last: %" GST_TIME_FORMAT
3201             " + T_rr_current_interval: %" GST_TIME_FORMAT
3202             " >  sess->next_rtcp_check_time: %" GST_TIME_FORMAT,
3203             GST_TIME_ARGS (sess->stats.min_interval),
3204             GST_TIME_ARGS (sess->last_rtcp_send_time),
3205             GST_TIME_ARGS (T_rr_current_interval),
3206             GST_TIME_ARGS (sess->next_rtcp_check_time));
3207         data->may_suppress = TRUE;
3208       }
3209     }
3210   }
3211
3212   return TRUE;
3213 }
3214
3215 static void
3216 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
3217 {
3218   g_hash_table_insert (hash_table, key, g_object_ref (source));
3219 }
3220
3221 static gboolean
3222 remove_closing_sources (const gchar * key, RTPSource * source,
3223     ReportData * data)
3224 {
3225   if (source->closing)
3226     return TRUE;
3227
3228   if (source->send_fir)
3229     data->have_fir = TRUE;
3230   if (source->send_pli)
3231     data->have_pli = TRUE;
3232
3233   return FALSE;
3234 }
3235
3236 static void
3237 generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
3238 {
3239   RTPSession *sess = data->sess;
3240   gboolean is_bye = FALSE;
3241   ReportOutput *output;
3242
3243   /* only generate RTCP for active internal sources */
3244   if (!source->internal || source->sent_bye)
3245     return;
3246
3247   data->source = source;
3248
3249   /* open packet */
3250   session_start_rtcp (sess, data);
3251
3252   if (source->marked_bye) {
3253     /* send BYE */
3254     make_source_bye (sess, source, data);
3255     is_bye = TRUE;
3256   } else if (!data->is_early) {
3257     /* loop over all known sources and add report blocks. If we are early, we
3258      * just make a minimal RTCP packet and skip this step */
3259     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3260         (GHFunc) session_report_blocks, data);
3261   }
3262   if (!data->has_sdes)
3263     session_sdes (sess, data);
3264
3265   if (data->have_fir)
3266     session_fir (sess, data);
3267
3268   if (data->have_pli)
3269     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3270         (GHFunc) session_pli, data);
3271
3272   gst_rtcp_buffer_unmap (&data->rtcpbuf);
3273
3274   output = g_slice_new (ReportOutput);
3275   output->source = g_object_ref (source);
3276   output->is_bye = is_bye;
3277   output->buffer = data->rtcp;
3278   /* queue the RTCP packet to push later */
3279   g_queue_push_tail (&data->output, output);
3280 }
3281
3282 /**
3283  * rtp_session_on_timeout:
3284  * @sess: an #RTPSession
3285  * @current_time: the current system time
3286  * @ntpnstime: the current NTP time in nanoseconds
3287  * @running_time: the current running_time of the pipeline
3288  *
3289  * Perform maintenance actions after the timeout obtained with
3290  * rtp_session_next_timeout() expired.
3291  *
3292  * This function will perform timeouts of receivers and senders, send a BYE
3293  * packet or generate RTCP packets with current session stats.
3294  *
3295  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
3296  * times, for each packet that should be processed.
3297  *
3298  * Returns: a #GstFlowReturn.
3299  */
3300 GstFlowReturn
3301 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
3302     guint64 ntpnstime, GstClockTime running_time)
3303 {
3304   GstFlowReturn result = GST_FLOW_OK;
3305   ReportData data = { GST_RTCP_BUFFER_INIT };
3306   GHashTable *table_copy;
3307   ReportOutput *output;
3308
3309   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3310
3311   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT
3312       ", running-time %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
3313       GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (running_time));
3314
3315   data.sess = sess;
3316   data.current_time = current_time;
3317   data.ntpnstime = ntpnstime;
3318   data.running_time = running_time;
3319   data.num_to_report = 0;
3320   data.may_suppress = FALSE;
3321   g_queue_init (&data.output);
3322
3323   RTP_SESSION_LOCK (sess);
3324   /* get a new interval, we need this for various cleanups etc */
3325   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
3326
3327   /* we need an internal source now */
3328   if (sess->stats.internal_sources == 0) {
3329     RTPSource *source;
3330     gboolean created;
3331
3332     source = obtain_internal_source (sess, sess->suggested_ssrc, &created);
3333     g_object_unref (source);
3334   }
3335
3336   /* Make a local copy of the hashtable. We need to do this because the
3337    * cleanup stage below releases the session lock. */
3338   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
3339       (GDestroyNotify) g_object_unref);
3340   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3341       (GHFunc) clone_ssrcs_hashtable, table_copy);
3342
3343   /* Clean up the session, mark the source for removing, this might release the
3344    * session lock. */
3345   g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
3346   g_hash_table_destroy (table_copy);
3347
3348   /* Now remove the marked sources */
3349   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
3350       (GHRFunc) remove_closing_sources, NULL);
3351
3352   /* see if we need to generate SR or RR packets */
3353   if (!is_rtcp_time (sess, current_time, &data))
3354     goto done;
3355
3356   GST_DEBUG ("doing RTCP generation %u for %u sources", sess->generation,
3357       data.num_to_report);
3358
3359   /* generate RTCP for all internal sources */
3360   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3361       (GHFunc) generate_rtcp, &data);
3362
3363   /* we keep track of the last report time in order to timeout inactive
3364    * receivers or senders */
3365   if (!data.is_early && !data.may_suppress)
3366     sess->last_rtcp_send_time = data.current_time;
3367   sess->first_rtcp = FALSE;
3368   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
3369
3370 done:
3371   RTP_SESSION_UNLOCK (sess);
3372
3373   /* push out the RTCP packets */
3374   while ((output = g_queue_pop_head (&data.output))) {
3375     gboolean do_not_suppress;
3376     GstBuffer *buffer = output->buffer;
3377     RTPSource *source = output->source;
3378
3379     /* Give the user a change to add its own packet */
3380     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
3381         buffer, data.is_early, &do_not_suppress);
3382
3383     if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) {
3384       guint packet_size;
3385
3386       packet_size = gst_buffer_get_size (buffer) + sess->header_len;
3387
3388       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
3389       GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
3390           sess->stats.avg_rtcp_packet_size, packet_size);
3391       result =
3392           sess->callbacks.send_rtcp (sess, source, buffer, output->is_bye,
3393           sess->send_rtcp_user_data);
3394     } else {
3395       GST_DEBUG ("freeing packet callback: %p"
3396           " do_not_suppress: %d may_suppress: %d",
3397           sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress);
3398       gst_buffer_unref (buffer);
3399     }
3400     g_object_unref (source);
3401     g_slice_free (ReportOutput, output);
3402   }
3403   return result;
3404 }
3405
3406 /**
3407  * rtp_session_request_early_rtcp:
3408  * @sess: an #RTPSession
3409  * @current_time: the current system time
3410  * @max_delay: maximum delay
3411  *
3412  * Request transmission of early RTCP
3413  */
3414 void
3415 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
3416     GstClockTimeDiff max_delay)
3417 {
3418   GstClockTime T_dither_max;
3419
3420   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
3421
3422   RTP_SESSION_LOCK (sess);
3423
3424   /* Check if already requested */
3425   /*  RFC 4585 section 3.5.2 step 2 */
3426   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
3427     goto dont_send;
3428
3429   if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time))
3430     goto dont_send;
3431
3432   /* Ignore the request a scheduled packet will be in time anyway */
3433   if (current_time + max_delay > sess->next_rtcp_check_time)
3434     goto dont_send;
3435
3436   /*  RFC 4585 section 3.5.2 step 2b */
3437   /* If the total sources is <=2, then there is only us and one peer */
3438   if (sess->total_sources <= 2) {
3439     T_dither_max = 0;
3440   } else {
3441     /* Divide by 2 because l = 0.5 */
3442     T_dither_max = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
3443     T_dither_max /= 2;
3444   }
3445
3446   /*  RFC 4585 section 3.5.2 step 3 */
3447   if (current_time + T_dither_max > sess->next_rtcp_check_time)
3448     goto dont_send;
3449
3450   /*  RFC 4585 section 3.5.2 step 4
3451    * Don't send if allow_early is FALSE, but not if we are in
3452    * immediate mode, meaning we are part of a group of at most the
3453    * application-specific threshold.
3454    */
3455   if (sess->total_sources > sess->rtcp_immediate_feedback_threshold &&
3456       sess->allow_early == FALSE)
3457     goto dont_send;
3458
3459   if (T_dither_max) {
3460     /* Schedule an early transmission later */
3461     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
3462         current_time;
3463   } else {
3464     /* If no dithering, schedule it for NOW */
3465     sess->next_early_rtcp_time = current_time;
3466   }
3467
3468   RTP_SESSION_UNLOCK (sess);
3469
3470   /* notify app of need to send packet early
3471    * and therefore of timeout change */
3472   if (sess->callbacks.reconsider)
3473     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
3474
3475   return;
3476
3477 dont_send:
3478
3479   RTP_SESSION_UNLOCK (sess);
3480 }
3481
3482 gboolean
3483 rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc, GstClockTime now,
3484     gboolean fir, gint count)
3485 {
3486   RTPSource *src = find_source (sess, ssrc);
3487
3488   if (!src)
3489     return FALSE;
3490
3491   if (fir) {
3492     src->send_pli = FALSE;
3493     src->send_fir = TRUE;
3494
3495     if (count == -1 || count != src->last_fir_count)
3496       src->current_send_fir_seqnum++;
3497     src->last_fir_count = count;
3498   } else if (!src->send_fir) {
3499     src->send_pli = TRUE;
3500   }
3501
3502   rtp_session_request_early_rtcp (sess, now, 200 * GST_MSECOND);
3503
3504   return TRUE;
3505 }
3506
3507 static void
3508 rtp_session_send_rtcp (RTPSession * sess, GstClockTimeDiff max_delay)
3509 {
3510   GstClockTime now;
3511
3512   if (!sess->callbacks.send_rtcp)
3513     return;
3514
3515   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
3516
3517   rtp_session_request_early_rtcp (sess, now, max_delay);
3518 }