24e752b1c1ae92d3252bb344e5ddcab4a36f7967
[platform/upstream/gstreamer.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
21  * with newer GLib versions (>= 2.31.0) */
22 #define GLIB_DISABLE_DEPRECATION_WARNINGS
23
24 #include <string.h>
25
26 #include <gst/rtp/gstrtpbuffer.h>
27 #include <gst/rtp/gstrtcpbuffer.h>
28 #include <gst/netbuffer/gstnetbuffer.h>
29
30 #include <gst/glib-compat-private.h>
31
32 #include "gstrtpbin-marshal.h"
33 #include "rtpsession.h"
34
35 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
36 #define GST_CAT_DEFAULT rtp_session_debug
37
38 /* signals and args */
39 enum
40 {
41   SIGNAL_GET_SOURCE_BY_SSRC,
42   SIGNAL_ON_NEW_SSRC,
43   SIGNAL_ON_SSRC_COLLISION,
44   SIGNAL_ON_SSRC_VALIDATED,
45   SIGNAL_ON_SSRC_ACTIVE,
46   SIGNAL_ON_SSRC_SDES,
47   SIGNAL_ON_BYE_SSRC,
48   SIGNAL_ON_BYE_TIMEOUT,
49   SIGNAL_ON_TIMEOUT,
50   SIGNAL_ON_SENDER_TIMEOUT,
51   SIGNAL_ON_SENDING_RTCP,
52   SIGNAL_ON_FEEDBACK_RTCP,
53   SIGNAL_SEND_RTCP,
54   LAST_SIGNAL
55 };
56
57 #define DEFAULT_INTERNAL_SOURCE      NULL
58 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
59 #define DEFAULT_RTCP_FRACTION        (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH)
60 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
61 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
62 #define DEFAULT_RTCP_MTU             1400
63 #define DEFAULT_SDES                 NULL
64 #define DEFAULT_NUM_SOURCES          0
65 #define DEFAULT_NUM_ACTIVE_SOURCES   0
66 #define DEFAULT_SOURCES              NULL
67 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
68 #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
69 #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
70
71 enum
72 {
73   PROP_0,
74   PROP_INTERNAL_SSRC,
75   PROP_INTERNAL_SOURCE,
76   PROP_BANDWIDTH,
77   PROP_RTCP_FRACTION,
78   PROP_RTCP_RR_BANDWIDTH,
79   PROP_RTCP_RS_BANDWIDTH,
80   PROP_RTCP_MTU,
81   PROP_SDES,
82   PROP_NUM_SOURCES,
83   PROP_NUM_ACTIVE_SOURCES,
84   PROP_SOURCES,
85   PROP_FAVOR_NEW,
86   PROP_RTCP_MIN_INTERVAL,
87   PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
88   PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
89   PROP_LAST
90 };
91
92 /* update average packet size */
93 #define INIT_AVG(avg, val) \
94    (avg) = (val);
95 #define UPDATE_AVG(avg, val)            \
96   if ((avg) == 0)                       \
97    (avg) = (val);                       \
98   else                                  \
99    (avg) = ((val) + (15 * (avg))) >> 4;
100
101
102 /* The number RTCP intervals after which to timeout entries in the
103  * collision table
104  */
105 #define RTCP_INTERVAL_COLLISION_TIMEOUT 10
106
107 /* GObject vmethods */
108 static void rtp_session_finalize (GObject * object);
109 static void rtp_session_set_property (GObject * object, guint prop_id,
110     const GValue * value, GParamSpec * pspec);
111 static void rtp_session_get_property (GObject * object, guint prop_id,
112     GValue * value, GParamSpec * pspec);
113
114 static gboolean rtp_session_on_sending_rtcp (RTPSession * sess,
115     GstBuffer * buffer, gboolean early);
116 static void rtp_session_send_rtcp (RTPSession * sess,
117     GstClockTimeDiff max_delay);
118
119
120 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
121
122 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
123
124 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
125     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
126 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
127     const gchar * reason, GstClockTime current_time);
128 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
129     gboolean deterministic, gboolean first);
130
131 static gboolean
132 accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
133     const GValue * handler_return, gpointer data)
134 {
135   if (g_value_get_boolean (handler_return))
136     g_value_set_boolean (return_accu, TRUE);
137
138   return TRUE;
139 }
140
141 static void
142 gst_rtp_bin_marshal_BOOLEAN__MINIOBJECT_BOOLEAN (GClosure * closure,
143     GValue * return_value G_GNUC_UNUSED, guint n_param_values,
144     const GValue * param_values, gpointer invocation_hint G_GNUC_UNUSED,
145     gpointer marshal_data)
146 {
147   typedef gboolean (*GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN) (gpointer data1,
148       gpointer arg_1, gboolean arg_2, gpointer data2);
149   register GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN callback;
150   register GCClosure *cc = (GCClosure *) closure;
151   register gpointer data1, data2;
152   gboolean v_return;
153
154   g_return_if_fail (return_value != NULL);
155   g_return_if_fail (n_param_values == 3);
156
157   if (G_CCLOSURE_SWAP_DATA (closure)) {
158     data1 = closure->data;
159     data2 = g_value_peek_pointer (param_values + 0);
160   } else {
161     data1 = g_value_peek_pointer (param_values + 0);
162     data2 = closure->data;
163   }
164   callback =
165       (GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN) (marshal_data ? marshal_data :
166       cc->callback);
167
168   v_return = callback (data1,
169       gst_value_get_mini_object (param_values + 1),
170       g_value_get_boolean (param_values + 2), data2);
171
172   g_value_set_boolean (return_value, v_return);
173 }
174
175 static void
176 gst_rtp_bin_marshal_VOID__UINT_UINT_UINT_UINT_MINIOBJECT (GClosure * closure,
177     GValue * return_value G_GNUC_UNUSED, guint n_param_values,
178     const GValue * param_values, gpointer invocation_hint G_GNUC_UNUSED,
179     gpointer marshal_data)
180 {
181   typedef void (*GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT) (gpointer
182       data1, guint arg_1, guint arg_2, guint arg_3, guint arg_4, gpointer arg_5,
183       gpointer data2);
184   register GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT callback;
185   register GCClosure *cc = (GCClosure *) closure;
186   register gpointer data1, data2;
187
188   g_return_if_fail (n_param_values == 6);
189
190   if (G_CCLOSURE_SWAP_DATA (closure)) {
191     data1 = closure->data;
192     data2 = g_value_peek_pointer (param_values + 0);
193   } else {
194     data1 = g_value_peek_pointer (param_values + 0);
195     data2 = closure->data;
196   }
197   callback =
198       (GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT) (marshal_data ?
199       marshal_data : cc->callback);
200
201   callback (data1,
202       g_value_get_uint (param_values + 1),
203       g_value_get_uint (param_values + 2),
204       g_value_get_uint (param_values + 3),
205       g_value_get_uint (param_values + 4),
206       gst_value_get_mini_object (param_values + 5), data2);
207 }
208
209
210 static void
211 rtp_session_class_init (RTPSessionClass * klass)
212 {
213   GObjectClass *gobject_class;
214
215   gobject_class = (GObjectClass *) klass;
216
217   gobject_class->finalize = rtp_session_finalize;
218   gobject_class->set_property = rtp_session_set_property;
219   gobject_class->get_property = rtp_session_get_property;
220
221   /**
222    * RTPSession::get-source-by-ssrc:
223    * @session: the object which received the signal
224    * @ssrc: the SSRC of the RTPSource
225    *
226    * Request the #RTPSource object with SSRC @ssrc in @session.
227    */
228   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
229       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
230       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
231           get_source_by_ssrc), NULL, NULL, gst_rtp_bin_marshal_OBJECT__UINT,
232       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
233
234   /**
235    * RTPSession::on-new-ssrc:
236    * @session: the object which received the signal
237    * @src: the new RTPSource
238    *
239    * Notify of a new SSRC that entered @session.
240    */
241   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
242       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
243       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
244       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
245       RTP_TYPE_SOURCE);
246   /**
247    * RTPSession::on-ssrc-collision:
248    * @session: the object which received the signal
249    * @src: the #RTPSource that caused a collision
250    *
251    * Notify when we have an SSRC collision
252    */
253   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
254       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
255       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
256       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
257       RTP_TYPE_SOURCE);
258   /**
259    * RTPSession::on-ssrc-validated:
260    * @session: the object which received the signal
261    * @src: the new validated RTPSource
262    *
263    * Notify of a new SSRC that became validated.
264    */
265   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
266       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
267       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
268       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
269       RTP_TYPE_SOURCE);
270   /**
271    * RTPSession::on-ssrc-active:
272    * @session: the object which received the signal
273    * @src: the active RTPSource
274    *
275    * Notify of a SSRC that is active, i.e., sending RTCP.
276    */
277   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
278       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
279       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
280       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
281       RTP_TYPE_SOURCE);
282   /**
283    * RTPSession::on-ssrc-sdes:
284    * @session: the object which received the signal
285    * @src: the RTPSource
286    *
287    * Notify that a new SDES was received for SSRC.
288    */
289   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
290       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
291       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
292       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
293       RTP_TYPE_SOURCE);
294   /**
295    * RTPSession::on-bye-ssrc:
296    * @session: the object which received the signal
297    * @src: the RTPSource that went away
298    *
299    * Notify of an SSRC that became inactive because of a BYE packet.
300    */
301   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
302       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
303       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
304       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
305       RTP_TYPE_SOURCE);
306   /**
307    * RTPSession::on-bye-timeout:
308    * @session: the object which received the signal
309    * @src: the RTPSource that timed out
310    *
311    * Notify of an SSRC that has timed out because of BYE
312    */
313   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
314       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
315       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
316       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
317       RTP_TYPE_SOURCE);
318   /**
319    * RTPSession::on-timeout:
320    * @session: the object which received the signal
321    * @src: the RTPSource that timed out
322    *
323    * Notify of an SSRC that has timed out
324    */
325   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
326       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
327       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
328       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
329       RTP_TYPE_SOURCE);
330   /**
331    * RTPSession::on-sender-timeout:
332    * @session: the object which received the signal
333    * @src: the RTPSource that timed out
334    *
335    * Notify of an SSRC that was a sender but timed out and became a receiver.
336    */
337   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
338       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
339       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
340       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
341       RTP_TYPE_SOURCE);
342
343   /**
344    * RTPSession::on-sending-rtcp
345    * @session: the object which received the signal
346    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
347    * @early: %TRUE if the packet is early, %FALSE if it is regular
348    *
349    * This signal is emitted before sending an RTCP packet, it can be used
350    * to add extra RTCP Packets.
351    *
352    * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
353    * if suppressing it is acceptable
354    */
355   rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
356       g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
357       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
358       accumulate_trues, NULL, gst_rtp_bin_marshal_BOOLEAN__MINIOBJECT_BOOLEAN,
359       G_TYPE_BOOLEAN, 2, GST_TYPE_BUFFER, G_TYPE_BOOLEAN);
360
361   /**
362    * RTPSession::on-feedback-rtcp:
363    * @session: the object which received the signal
364    * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
365    *  %GST_RTCP_TYPE_RTPFB
366    * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
367    * @sender_ssrc: The SSRC of the sender
368    * @media_ssrc: The SSRC of the media this refers to
369    * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
370    * there was no FCI
371    *
372    * Notify that a RTCP feedback packet has been received
373    */
374
375   rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
376       g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
377       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
378       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT_UINT_UINT_MINIOBJECT,
379       G_TYPE_NONE, 5, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT,
380       GST_TYPE_BUFFER);
381
382   /**
383    * RTPSession::send-rtcp:
384    * @session: the object which received the signal
385    * @max_delay: The maximum delay after which the feedback will not be useful
386    *  anymore
387    *
388    * Requests that the #RTPSession initiate a new RTCP packet as soon as
389    * possible within the requested delay.
390    */
391
392   rtp_session_signals[SIGNAL_SEND_RTCP] =
393       g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
394       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
395       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
396       gst_rtp_bin_marshal_VOID__UINT64, G_TYPE_NONE, 1, G_TYPE_UINT64);
397
398   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
399       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
400           "The internal SSRC used for the session",
401           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
402
403   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
404       g_param_spec_object ("internal-source", "Internal Source",
405           "The internal source element of the session",
406           RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
407
408   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
409       g_param_spec_double ("bandwidth", "Bandwidth",
410           "The bandwidth of the session (0 for auto-discover)",
411           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
412           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
413
414   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
415       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
416           "The fraction of the bandwidth used for RTCP (or as a real fraction of the RTP bandwidth if < 1)",
417           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
418           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
419
420   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
421       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
422           "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
423           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
424           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
425
426   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
427       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
428           "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
429           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
430           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
431
432   g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
433       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
434           "The maximum size of the RTCP packets",
435           16, G_MAXINT16, DEFAULT_RTCP_MTU,
436           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
437
438   g_object_class_install_property (gobject_class, PROP_SDES,
439       g_param_spec_boxed ("sdes", "SDES",
440           "The SDES items of this session",
441           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
442
443   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
444       g_param_spec_uint ("num-sources", "Num Sources",
445           "The number of sources in the session", 0, G_MAXUINT,
446           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
447
448   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
449       g_param_spec_uint ("num-active-sources", "Num Active Sources",
450           "The number of active sources in the session", 0, G_MAXUINT,
451           DEFAULT_NUM_ACTIVE_SOURCES,
452           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
453   /**
454    * RTPSource::sources
455    *
456    * Get a GValue Array of all sources in the session.
457    *
458    * <example>
459    * <title>Getting the #RTPSources of a session
460    * <programlisting>
461    * {
462    *   GValueArray *arr;
463    *   GValue *val;
464    *   guint i;
465    *
466    *   g_object_get (sess, "sources", &arr, NULL);
467    *
468    *   for (i = 0; i < arr->n_values; i++) {
469    *     RTPSource *source;
470    *
471    *     val = g_value_array_get_nth (arr, i);
472    *     source = g_value_get_object (val);
473    *   }
474    *   g_value_array_free (arr);
475    * }
476    * </programlisting>
477    * </example>
478    */
479   g_object_class_install_property (gobject_class, PROP_SOURCES,
480       g_param_spec_boxed ("sources", "Sources",
481           "An array of all known sources in the session",
482           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
483
484   g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
485       g_param_spec_boolean ("favor-new", "Favor new sources",
486           "Resolve SSRC conflict in favor of new sources", FALSE,
487           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
488
489   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
490       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
491           "Minimum interval between Regular RTCP packet (in ns)",
492           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
493           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
494
495   g_object_class_install_property (gobject_class,
496       PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
497       g_param_spec_uint64 ("rtcp-feedback-retention-window",
498           "RTCP Feedback retention window",
499           "Duration during which RTCP Feedback packets are retained (in ns)",
500           0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
501           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
502
503   g_object_class_install_property (gobject_class,
504       PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
505       g_param_spec_uint ("rtcp-immediate-feedback-threshold",
506           "RTCP Immediate Feedback threshold",
507           "The maximum number of members of a RTP session for which immediate"
508           " feedback is used",
509           0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
510           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
511
512   klass->get_source_by_ssrc =
513       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
514   klass->on_sending_rtcp = GST_DEBUG_FUNCPTR (rtp_session_on_sending_rtcp);
515   klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
516
517   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
518 }
519
520 static void
521 rtp_session_init (RTPSession * sess)
522 {
523   gint i;
524   gchar *str;
525
526   sess->lock = g_mutex_new ();
527   sess->key = g_random_int ();
528   sess->mask_idx = 0;
529   sess->mask = 0;
530
531   for (i = 0; i < 32; i++) {
532     sess->ssrcs[i] =
533         g_hash_table_new_full (NULL, NULL, NULL,
534         (GDestroyNotify) g_object_unref);
535   }
536   sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL);
537
538   rtp_stats_init_defaults (&sess->stats);
539
540   sess->recalc_bandwidth = TRUE;
541   sess->bandwidth = DEFAULT_BANDWIDTH;
542   sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
543   sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
544   sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
545
546   /* create an active SSRC for this session manager */
547   sess->source = rtp_session_create_source (sess);
548   sess->source->validated = TRUE;
549   sess->source->internal = TRUE;
550   sess->stats.active_sources++;
551   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
552   sess->source->stats.prev_rtcptime = 0;
553   sess->source->stats.last_rtcptime = 1;
554
555   rtp_stats_set_min_interval (&sess->stats,
556       (gdouble) DEFAULT_RTCP_MIN_INTERVAL / GST_SECOND);
557
558   /* default UDP header length */
559   sess->header_len = 28;
560   sess->mtu = DEFAULT_RTCP_MTU;
561
562   /* some default SDES entries */
563
564   /* we do not want to leak details like the username or hostname here */
565   str = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
566   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_CNAME, str);
567   g_free (str);
568
569 #if 0
570   /* we do not want to leak the user's real name here */
571   str = g_strdup_printf ("Anon%u", g_random_int ());
572   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_NAME, str);
573   g_free (str);
574 #endif
575
576   rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_TOOL, "GStreamer");
577
578   sess->first_rtcp = TRUE;
579   sess->allow_early = TRUE;
580   sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
581   sess->rtcp_immediate_feedback_threshold =
582       DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
583
584   sess->last_keyframe_request = GST_CLOCK_TIME_NONE;
585
586   GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
587 }
588
589 static void
590 rtp_session_finalize (GObject * object)
591 {
592   RTPSession *sess;
593   gint i;
594
595   sess = RTP_SESSION_CAST (object);
596
597   g_mutex_free (sess->lock);
598   for (i = 0; i < 32; i++)
599     g_hash_table_destroy (sess->ssrcs[i]);
600
601   g_free (sess->bye_reason);
602
603   g_hash_table_destroy (sess->cnames);
604   g_object_unref (sess->source);
605
606   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
607 }
608
609 static void
610 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
611 {
612   GValue value = { 0 };
613
614   g_value_init (&value, RTP_TYPE_SOURCE);
615   g_value_take_object (&value, source);
616   /* copies the value */
617   g_value_array_append (arr, &value);
618 }
619
620 static GValueArray *
621 rtp_session_create_sources (RTPSession * sess)
622 {
623   GValueArray *res;
624   guint size;
625
626   RTP_SESSION_LOCK (sess);
627   /* get number of elements in the table */
628   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
629   /* create the result value array */
630   res = g_value_array_new (size);
631
632   /* and copy all values into the array */
633   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
634   RTP_SESSION_UNLOCK (sess);
635
636   return res;
637 }
638
639 static void
640 rtp_session_set_property (GObject * object, guint prop_id,
641     const GValue * value, GParamSpec * pspec)
642 {
643   RTPSession *sess;
644
645   sess = RTP_SESSION (object);
646
647   switch (prop_id) {
648     case PROP_INTERNAL_SSRC:
649       rtp_session_set_internal_ssrc (sess, g_value_get_uint (value));
650       break;
651     case PROP_BANDWIDTH:
652       sess->bandwidth = g_value_get_double (value);
653       sess->recalc_bandwidth = TRUE;
654       break;
655     case PROP_RTCP_FRACTION:
656       sess->rtcp_bandwidth = g_value_get_double (value);
657       sess->recalc_bandwidth = TRUE;
658       break;
659     case PROP_RTCP_RR_BANDWIDTH:
660       sess->rtcp_rr_bandwidth = g_value_get_int (value);
661       sess->recalc_bandwidth = TRUE;
662       break;
663     case PROP_RTCP_RS_BANDWIDTH:
664       sess->rtcp_rs_bandwidth = g_value_get_int (value);
665       sess->recalc_bandwidth = TRUE;
666       break;
667     case PROP_RTCP_MTU:
668       sess->mtu = g_value_get_uint (value);
669       break;
670     case PROP_SDES:
671       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
672       break;
673     case PROP_FAVOR_NEW:
674       sess->favor_new = g_value_get_boolean (value);
675       break;
676     case PROP_RTCP_MIN_INTERVAL:
677       rtp_stats_set_min_interval (&sess->stats,
678           (gdouble) g_value_get_uint64 (value) / GST_SECOND);
679       /* trigger reconsideration */
680       RTP_SESSION_LOCK (sess);
681       sess->next_rtcp_check_time = 0;
682       RTP_SESSION_UNLOCK (sess);
683       if (sess->callbacks.reconsider)
684         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
685       break;
686     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
687       sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
688       break;
689     default:
690       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
691       break;
692   }
693 }
694
695 static void
696 rtp_session_get_property (GObject * object, guint prop_id,
697     GValue * value, GParamSpec * pspec)
698 {
699   RTPSession *sess;
700
701   sess = RTP_SESSION (object);
702
703   switch (prop_id) {
704     case PROP_INTERNAL_SSRC:
705       g_value_set_uint (value, rtp_session_get_internal_ssrc (sess));
706       break;
707     case PROP_INTERNAL_SOURCE:
708       g_value_take_object (value, rtp_session_get_internal_source (sess));
709       break;
710     case PROP_BANDWIDTH:
711       g_value_set_double (value, sess->bandwidth);
712       break;
713     case PROP_RTCP_FRACTION:
714       g_value_set_double (value, sess->rtcp_bandwidth);
715       break;
716     case PROP_RTCP_RR_BANDWIDTH:
717       g_value_set_int (value, sess->rtcp_rr_bandwidth);
718       break;
719     case PROP_RTCP_RS_BANDWIDTH:
720       g_value_set_int (value, sess->rtcp_rs_bandwidth);
721       break;
722     case PROP_RTCP_MTU:
723       g_value_set_uint (value, sess->mtu);
724       break;
725     case PROP_SDES:
726       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
727       break;
728     case PROP_NUM_SOURCES:
729       g_value_set_uint (value, rtp_session_get_num_sources (sess));
730       break;
731     case PROP_NUM_ACTIVE_SOURCES:
732       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
733       break;
734     case PROP_SOURCES:
735       g_value_take_boxed (value, rtp_session_create_sources (sess));
736       break;
737     case PROP_FAVOR_NEW:
738       g_value_set_boolean (value, sess->favor_new);
739       break;
740     case PROP_RTCP_MIN_INTERVAL:
741       g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
742       break;
743     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
744       g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
745       break;
746     default:
747       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
748       break;
749   }
750 }
751
752 static void
753 on_new_ssrc (RTPSession * sess, RTPSource * source)
754 {
755   g_object_ref (source);
756   RTP_SESSION_UNLOCK (sess);
757   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
758   RTP_SESSION_LOCK (sess);
759   g_object_unref (source);
760 }
761
762 static void
763 on_ssrc_collision (RTPSession * sess, RTPSource * source)
764 {
765   g_object_ref (source);
766   RTP_SESSION_UNLOCK (sess);
767   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
768       source);
769   RTP_SESSION_LOCK (sess);
770   g_object_unref (source);
771 }
772
773 static void
774 on_ssrc_validated (RTPSession * sess, RTPSource * source)
775 {
776   g_object_ref (source);
777   RTP_SESSION_UNLOCK (sess);
778   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
779       source);
780   RTP_SESSION_LOCK (sess);
781   g_object_unref (source);
782 }
783
784 static void
785 on_ssrc_active (RTPSession * sess, RTPSource * source)
786 {
787   g_object_ref (source);
788   RTP_SESSION_UNLOCK (sess);
789   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
790   RTP_SESSION_LOCK (sess);
791   g_object_unref (source);
792 }
793
794 static void
795 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
796 {
797   g_object_ref (source);
798   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
799   RTP_SESSION_UNLOCK (sess);
800   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
801   RTP_SESSION_LOCK (sess);
802   g_object_unref (source);
803 }
804
805 static void
806 on_bye_ssrc (RTPSession * sess, RTPSource * source)
807 {
808   g_object_ref (source);
809   RTP_SESSION_UNLOCK (sess);
810   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
811   RTP_SESSION_LOCK (sess);
812   g_object_unref (source);
813 }
814
815 static void
816 on_bye_timeout (RTPSession * sess, RTPSource * source)
817 {
818   g_object_ref (source);
819   RTP_SESSION_UNLOCK (sess);
820   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
821   RTP_SESSION_LOCK (sess);
822   g_object_unref (source);
823 }
824
825 static void
826 on_timeout (RTPSession * sess, RTPSource * source)
827 {
828   g_object_ref (source);
829   RTP_SESSION_UNLOCK (sess);
830   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
831   RTP_SESSION_LOCK (sess);
832   g_object_unref (source);
833 }
834
835 static void
836 on_sender_timeout (RTPSession * sess, RTPSource * source)
837 {
838   g_object_ref (source);
839   RTP_SESSION_UNLOCK (sess);
840   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
841       source);
842   RTP_SESSION_LOCK (sess);
843   g_object_unref (source);
844 }
845
846 /**
847  * rtp_session_new:
848  *
849  * Create a new session object.
850  *
851  * Returns: a new #RTPSession. g_object_unref() after usage.
852  */
853 RTPSession *
854 rtp_session_new (void)
855 {
856   RTPSession *sess;
857
858   sess = g_object_new (RTP_TYPE_SESSION, NULL);
859
860   return sess;
861 }
862
863 /**
864  * rtp_session_set_callbacks:
865  * @sess: an #RTPSession
866  * @callbacks: callbacks to configure
867  * @user_data: user data passed in the callbacks
868  *
869  * Configure a set of callbacks to be notified of actions.
870  */
871 void
872 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
873     gpointer user_data)
874 {
875   g_return_if_fail (RTP_IS_SESSION (sess));
876
877   if (callbacks->process_rtp) {
878     sess->callbacks.process_rtp = callbacks->process_rtp;
879     sess->process_rtp_user_data = user_data;
880   }
881   if (callbacks->send_rtp) {
882     sess->callbacks.send_rtp = callbacks->send_rtp;
883     sess->send_rtp_user_data = user_data;
884   }
885   if (callbacks->send_rtcp) {
886     sess->callbacks.send_rtcp = callbacks->send_rtcp;
887     sess->send_rtcp_user_data = user_data;
888   }
889   if (callbacks->sync_rtcp) {
890     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
891     sess->sync_rtcp_user_data = user_data;
892   }
893   if (callbacks->clock_rate) {
894     sess->callbacks.clock_rate = callbacks->clock_rate;
895     sess->clock_rate_user_data = user_data;
896   }
897   if (callbacks->reconsider) {
898     sess->callbacks.reconsider = callbacks->reconsider;
899     sess->reconsider_user_data = user_data;
900   }
901   if (callbacks->request_key_unit) {
902     sess->callbacks.request_key_unit = callbacks->request_key_unit;
903     sess->request_key_unit_user_data = user_data;
904   }
905   if (callbacks->request_time) {
906     sess->callbacks.request_time = callbacks->request_time;
907     sess->request_time_user_data = user_data;
908   }
909 }
910
911 /**
912  * rtp_session_set_process_rtp_callback:
913  * @sess: an #RTPSession
914  * @callback: callback to set
915  * @user_data: user data passed in the callback
916  *
917  * Configure only the process_rtp callback to be notified of the process_rtp action.
918  */
919 void
920 rtp_session_set_process_rtp_callback (RTPSession * sess,
921     RTPSessionProcessRTP callback, gpointer user_data)
922 {
923   g_return_if_fail (RTP_IS_SESSION (sess));
924
925   sess->callbacks.process_rtp = callback;
926   sess->process_rtp_user_data = user_data;
927 }
928
929 /**
930  * rtp_session_set_send_rtp_callback:
931  * @sess: an #RTPSession
932  * @callback: callback to set
933  * @user_data: user data passed in the callback
934  *
935  * Configure only the send_rtp callback to be notified of the send_rtp action.
936  */
937 void
938 rtp_session_set_send_rtp_callback (RTPSession * sess,
939     RTPSessionSendRTP callback, gpointer user_data)
940 {
941   g_return_if_fail (RTP_IS_SESSION (sess));
942
943   sess->callbacks.send_rtp = callback;
944   sess->send_rtp_user_data = user_data;
945 }
946
947 /**
948  * rtp_session_set_send_rtcp_callback:
949  * @sess: an #RTPSession
950  * @callback: callback to set
951  * @user_data: user data passed in the callback
952  *
953  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
954  */
955 void
956 rtp_session_set_send_rtcp_callback (RTPSession * sess,
957     RTPSessionSendRTCP callback, gpointer user_data)
958 {
959   g_return_if_fail (RTP_IS_SESSION (sess));
960
961   sess->callbacks.send_rtcp = callback;
962   sess->send_rtcp_user_data = user_data;
963 }
964
965 /**
966  * rtp_session_set_sync_rtcp_callback:
967  * @sess: an #RTPSession
968  * @callback: callback to set
969  * @user_data: user data passed in the callback
970  *
971  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
972  */
973 void
974 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
975     RTPSessionSyncRTCP callback, gpointer user_data)
976 {
977   g_return_if_fail (RTP_IS_SESSION (sess));
978
979   sess->callbacks.sync_rtcp = callback;
980   sess->sync_rtcp_user_data = user_data;
981 }
982
983 /**
984  * rtp_session_set_clock_rate_callback:
985  * @sess: an #RTPSession
986  * @callback: callback to set
987  * @user_data: user data passed in the callback
988  *
989  * Configure only the clock_rate callback to be notified of the clock_rate action.
990  */
991 void
992 rtp_session_set_clock_rate_callback (RTPSession * sess,
993     RTPSessionClockRate callback, gpointer user_data)
994 {
995   g_return_if_fail (RTP_IS_SESSION (sess));
996
997   sess->callbacks.clock_rate = callback;
998   sess->clock_rate_user_data = user_data;
999 }
1000
1001 /**
1002  * rtp_session_set_reconsider_callback:
1003  * @sess: an #RTPSession
1004  * @callback: callback to set
1005  * @user_data: user data passed in the callback
1006  *
1007  * Configure only the reconsider callback to be notified of the reconsider action.
1008  */
1009 void
1010 rtp_session_set_reconsider_callback (RTPSession * sess,
1011     RTPSessionReconsider callback, gpointer user_data)
1012 {
1013   g_return_if_fail (RTP_IS_SESSION (sess));
1014
1015   sess->callbacks.reconsider = callback;
1016   sess->reconsider_user_data = user_data;
1017 }
1018
1019 /**
1020  * rtp_session_set_request_time_callback:
1021  * @sess: an #RTPSession
1022  * @callback: callback to set
1023  * @user_data: user data passed in the callback
1024  *
1025  * Configure only the request_time callback
1026  */
1027 void
1028 rtp_session_set_request_time_callback (RTPSession * sess,
1029     RTPSessionRequestTime callback, gpointer user_data)
1030 {
1031   g_return_if_fail (RTP_IS_SESSION (sess));
1032
1033   sess->callbacks.request_time = callback;
1034   sess->request_time_user_data = user_data;
1035 }
1036
1037 /**
1038  * rtp_session_set_bandwidth:
1039  * @sess: an #RTPSession
1040  * @bandwidth: the bandwidth allocated
1041  *
1042  * Set the session bandwidth in bytes per second.
1043  */
1044 void
1045 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
1046 {
1047   g_return_if_fail (RTP_IS_SESSION (sess));
1048
1049   RTP_SESSION_LOCK (sess);
1050   sess->stats.bandwidth = bandwidth;
1051   RTP_SESSION_UNLOCK (sess);
1052 }
1053
1054 /**
1055  * rtp_session_get_bandwidth:
1056  * @sess: an #RTPSession
1057  *
1058  * Get the session bandwidth.
1059  *
1060  * Returns: the session bandwidth.
1061  */
1062 gdouble
1063 rtp_session_get_bandwidth (RTPSession * sess)
1064 {
1065   gdouble result;
1066
1067   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1068
1069   RTP_SESSION_LOCK (sess);
1070   result = sess->stats.bandwidth;
1071   RTP_SESSION_UNLOCK (sess);
1072
1073   return result;
1074 }
1075
1076 /**
1077  * rtp_session_set_rtcp_fraction:
1078  * @sess: an #RTPSession
1079  * @bandwidth: the RTCP bandwidth
1080  *
1081  * Set the bandwidth in bytes per second that should be used for RTCP
1082  * messages.
1083  */
1084 void
1085 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
1086 {
1087   g_return_if_fail (RTP_IS_SESSION (sess));
1088
1089   RTP_SESSION_LOCK (sess);
1090   sess->stats.rtcp_bandwidth = bandwidth;
1091   RTP_SESSION_UNLOCK (sess);
1092 }
1093
1094 /**
1095  * rtp_session_get_rtcp_fraction:
1096  * @sess: an #RTPSession
1097  *
1098  * Get the session bandwidth used for RTCP.
1099  *
1100  * Returns: The bandwidth used for RTCP messages.
1101  */
1102 gdouble
1103 rtp_session_get_rtcp_fraction (RTPSession * sess)
1104 {
1105   gdouble result;
1106
1107   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
1108
1109   RTP_SESSION_LOCK (sess);
1110   result = sess->stats.rtcp_bandwidth;
1111   RTP_SESSION_UNLOCK (sess);
1112
1113   return result;
1114 }
1115
1116 /**
1117  * rtp_session_set_sdes_string:
1118  * @sess: an #RTPSession
1119  * @type: the type of the SDES item
1120  * @item: a null-terminated string to set.
1121  *
1122  * Store an SDES item of @type in @sess.
1123  *
1124  * Returns: %FALSE if the data was unchanged @type is invalid.
1125  */
1126 gboolean
1127 rtp_session_set_sdes_string (RTPSession * sess, GstRTCPSDESType type,
1128     const gchar * item)
1129 {
1130   gboolean result;
1131
1132   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1133
1134   RTP_SESSION_LOCK (sess);
1135   result = rtp_source_set_sdes_string (sess->source, type, item);
1136   RTP_SESSION_UNLOCK (sess);
1137
1138   return result;
1139 }
1140
1141 /**
1142  * rtp_session_get_sdes_string:
1143  * @sess: an #RTPSession
1144  * @type: the type of the SDES item
1145  *
1146  * Get the SDES item of @type from @sess.
1147  *
1148  * Returns: a null-terminated copy of the SDES item or NULL when @type was not
1149  * valid. g_free() after usage.
1150  */
1151 gchar *
1152 rtp_session_get_sdes_string (RTPSession * sess, GstRTCPSDESType type)
1153 {
1154   gchar *result;
1155
1156   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1157
1158   RTP_SESSION_LOCK (sess);
1159   result = rtp_source_get_sdes_string (sess->source, type);
1160   RTP_SESSION_UNLOCK (sess);
1161
1162   return result;
1163 }
1164
1165 /**
1166  * rtp_session_get_sdes_struct:
1167  * @sess: an #RTSPSession
1168  *
1169  * Get the SDES data as a #GstStructure
1170  *
1171  * Returns: a GstStructure with SDES items for @sess. This function returns a
1172  * copy of the SDES structure, use gst_structure_free() after usage.
1173  */
1174 GstStructure *
1175 rtp_session_get_sdes_struct (RTPSession * sess)
1176 {
1177   const GstStructure *sdes;
1178   GstStructure *result = NULL;
1179
1180   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1181
1182   RTP_SESSION_LOCK (sess);
1183   sdes = rtp_source_get_sdes_struct (sess->source);
1184   if (sdes)
1185     result = gst_structure_copy (sdes);
1186   RTP_SESSION_UNLOCK (sess);
1187
1188   return result;
1189 }
1190
1191 /**
1192  * rtp_session_set_sdes_struct:
1193  * @sess: an #RTSPSession
1194  * @sdes: a #GstStructure
1195  *
1196  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
1197  */
1198 void
1199 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
1200 {
1201   g_return_if_fail (sdes);
1202   g_return_if_fail (RTP_IS_SESSION (sess));
1203
1204   RTP_SESSION_LOCK (sess);
1205   rtp_source_set_sdes_struct (sess->source, gst_structure_copy (sdes));
1206   RTP_SESSION_UNLOCK (sess);
1207 }
1208
1209 static GstFlowReturn
1210 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
1211 {
1212   GstFlowReturn result = GST_FLOW_OK;
1213
1214   if (source == session->source) {
1215     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
1216
1217     RTP_SESSION_UNLOCK (session);
1218
1219     if (session->callbacks.send_rtp)
1220       result =
1221           session->callbacks.send_rtp (session, source, data,
1222           session->send_rtp_user_data);
1223     else {
1224       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1225     }
1226   } else {
1227     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
1228     RTP_SESSION_UNLOCK (session);
1229
1230     if (session->callbacks.process_rtp)
1231       result =
1232           session->callbacks.process_rtp (session, source,
1233           GST_BUFFER_CAST (data), session->process_rtp_user_data);
1234     else
1235       gst_buffer_unref (GST_BUFFER_CAST (data));
1236   }
1237   RTP_SESSION_LOCK (session);
1238
1239   return result;
1240 }
1241
1242 static gint
1243 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
1244 {
1245   gint result;
1246
1247   RTP_SESSION_UNLOCK (session);
1248
1249   if (session->callbacks.clock_rate)
1250     result =
1251         session->callbacks.clock_rate (session, pt,
1252         session->clock_rate_user_data);
1253   else
1254     result = -1;
1255
1256   RTP_SESSION_LOCK (session);
1257
1258   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
1259
1260   return result;
1261 }
1262
1263 static RTPSourceCallbacks callbacks = {
1264   (RTPSourcePushRTP) source_push_rtp,
1265   (RTPSourceClockRate) source_clock_rate,
1266 };
1267
1268 static gboolean
1269 check_collision (RTPSession * sess, RTPSource * source,
1270     RTPArrivalStats * arrival, gboolean rtp)
1271 {
1272   /* If we have no arrival address, we can't do collision checking */
1273   if (!arrival->have_address)
1274     return FALSE;
1275
1276   if (sess->source != source) {
1277     GstNetAddress *from;
1278     gboolean have_from;
1279
1280     /* This is not our local source, but lets check if two remote
1281      * source collide
1282      */
1283
1284     if (rtp) {
1285       from = &source->rtp_from;
1286       have_from = source->have_rtp_from;
1287     } else {
1288       from = &source->rtcp_from;
1289       have_from = source->have_rtcp_from;
1290     }
1291
1292     if (have_from) {
1293       if (gst_netaddress_equal (from, &arrival->address)) {
1294         /* Address is the same */
1295         return FALSE;
1296       } else {
1297         GST_LOG ("we have a third-party collision or loop ssrc:%x",
1298             rtp_source_get_ssrc (source));
1299         if (sess->favor_new) {
1300           if (rtp_source_find_conflicting_address (source,
1301                   &arrival->address, arrival->current_time)) {
1302             gchar buf1[40];
1303             gst_netaddress_to_string (&arrival->address, buf1, 40);
1304             GST_LOG ("Known conflict on %x for %s, dropping packet",
1305                 rtp_source_get_ssrc (source), buf1);
1306             return TRUE;
1307           } else {
1308             gchar buf1[40], buf2[40];
1309
1310             /* Current address is not a known conflict, lets assume this is
1311              * a new source. Save old address in possible conflict list
1312              */
1313             rtp_source_add_conflicting_address (source, from,
1314                 arrival->current_time);
1315
1316             gst_netaddress_to_string (from, buf1, 40);
1317             gst_netaddress_to_string (&arrival->address, buf2, 40);
1318             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1319                 " saving old as known conflict",
1320                 rtp_source_get_ssrc (source), buf1, buf2);
1321
1322             if (rtp)
1323               rtp_source_set_rtp_from (source, &arrival->address);
1324             else
1325               rtp_source_set_rtcp_from (source, &arrival->address);
1326             return FALSE;
1327           }
1328         } else {
1329           /* Don't need to save old addresses, we ignore new sources */
1330           return TRUE;
1331         }
1332       }
1333     } else {
1334       /* We don't already have a from address for RTP, just set it */
1335       if (rtp)
1336         rtp_source_set_rtp_from (source, &arrival->address);
1337       else
1338         rtp_source_set_rtcp_from (source, &arrival->address);
1339       return FALSE;
1340     }
1341
1342     /* FIXME: Log 3rd party collision somehow
1343      * Maybe should be done in upper layer, only the SDES can tell us
1344      * if its a collision or a loop
1345      */
1346
1347     /* If the source has been inactive for some time, we assume that it has
1348      * simply changed its transport source address. Hence, there is no true
1349      * third-party collision - only a simulated one. */
1350     if (arrival->current_time > source->last_activity) {
1351       GstClockTime inactivity_period =
1352           arrival->current_time - source->last_activity;
1353       if (inactivity_period > 1 * GST_SECOND) {
1354         /* Use new network address */
1355         if (rtp) {
1356           g_assert (source->have_rtp_from);
1357           rtp_source_set_rtp_from (source, &arrival->address);
1358         } else {
1359           g_assert (source->have_rtcp_from);
1360           rtp_source_set_rtcp_from (source, &arrival->address);
1361         }
1362         return FALSE;
1363       }
1364     }
1365   } else {
1366     /* This is sending with our ssrc, is it an address we already know */
1367
1368     if (rtp_source_find_conflicting_address (source, &arrival->address,
1369             arrival->current_time)) {
1370       /* Its a known conflict, its probably a loop, not a collision
1371        * lets just drop the incoming packet
1372        */
1373       GST_DEBUG ("Our packets are being looped back to us, dropping");
1374     } else {
1375       /* Its a new collision, lets change our SSRC */
1376
1377       rtp_source_add_conflicting_address (source, &arrival->address,
1378           arrival->current_time);
1379
1380       GST_DEBUG ("Collision for SSRC %x", rtp_source_get_ssrc (source));
1381       on_ssrc_collision (sess, source);
1382
1383       rtp_session_schedule_bye_locked (sess, "SSRC Collision",
1384           arrival->current_time);
1385
1386       sess->change_ssrc = TRUE;
1387     }
1388   }
1389
1390   return TRUE;
1391 }
1392
1393
1394 /* must be called with the session lock, the returned source needs to be
1395  * unreffed after usage. */
1396 static RTPSource *
1397 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1398     RTPArrivalStats * arrival, gboolean rtp)
1399 {
1400   RTPSource *source;
1401
1402   source =
1403       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1404   if (source == NULL) {
1405     /* make new Source in probation and insert */
1406     source = rtp_source_new (ssrc);
1407
1408     /* for RTP packets we need to set the source in probation. Receiving RTCP
1409      * packets of an SSRC, on the other hand, is a strong indication that we
1410      * are dealing with a valid source. */
1411     if (rtp)
1412       source->probation = RTP_DEFAULT_PROBATION;
1413     else
1414       source->probation = 0;
1415
1416     /* store from address, if any */
1417     if (arrival->have_address) {
1418       if (rtp)
1419         rtp_source_set_rtp_from (source, &arrival->address);
1420       else
1421         rtp_source_set_rtcp_from (source, &arrival->address);
1422     }
1423
1424     /* configure a callback on the source */
1425     rtp_source_set_callbacks (source, &callbacks, sess);
1426
1427     g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1428         source);
1429
1430     /* we have one more source now */
1431     sess->total_sources++;
1432     *created = TRUE;
1433   } else {
1434     *created = FALSE;
1435     /* check for collision, this updates the address when not previously set */
1436     if (check_collision (sess, source, arrival, rtp)) {
1437       return NULL;
1438     }
1439   }
1440   /* update last activity */
1441   source->last_activity = arrival->current_time;
1442   if (rtp)
1443     source->last_rtp_activity = arrival->current_time;
1444   g_object_ref (source);
1445
1446   return source;
1447 }
1448
1449 /**
1450  * rtp_session_get_internal_source:
1451  * @sess: a #RTPSession
1452  *
1453  * Get the internal #RTPSource of @sess.
1454  *
1455  * Returns: The internal #RTPSource. g_object_unref() after usage.
1456  */
1457 RTPSource *
1458 rtp_session_get_internal_source (RTPSession * sess)
1459 {
1460   RTPSource *result;
1461
1462   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1463
1464   result = g_object_ref (sess->source);
1465
1466   return result;
1467 }
1468
1469 /**
1470  * rtp_session_set_internal_ssrc:
1471  * @sess: a #RTPSession
1472  * @ssrc: an SSRC
1473  *
1474  * Set the SSRC of @sess to @ssrc.
1475  */
1476 void
1477 rtp_session_set_internal_ssrc (RTPSession * sess, guint32 ssrc)
1478 {
1479   RTP_SESSION_LOCK (sess);
1480   if (ssrc != sess->source->ssrc) {
1481     g_hash_table_steal (sess->ssrcs[sess->mask_idx],
1482         GINT_TO_POINTER (sess->source->ssrc));
1483
1484     GST_DEBUG ("setting internal SSRC to %08x", ssrc);
1485     /* After this call, any receiver of the old SSRC either in RTP or RTCP
1486      * packets will timeout on the old SSRC, we could potentially schedule a
1487      * BYE RTCP for the old SSRC... */
1488     sess->source->ssrc = ssrc;
1489     rtp_source_reset (sess->source);
1490
1491     /* rehash with the new SSRC */
1492     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1493         GINT_TO_POINTER (sess->source->ssrc), sess->source);
1494   }
1495   RTP_SESSION_UNLOCK (sess);
1496
1497   g_object_notify (G_OBJECT (sess), "internal-ssrc");
1498 }
1499
1500 /**
1501  * rtp_session_get_internal_ssrc:
1502  * @sess: a #RTPSession
1503  *
1504  * Get the internal SSRC of @sess.
1505  *
1506  * Returns: The SSRC of the session.
1507  */
1508 guint32
1509 rtp_session_get_internal_ssrc (RTPSession * sess)
1510 {
1511   guint32 ssrc;
1512
1513   RTP_SESSION_LOCK (sess);
1514   ssrc = sess->source->ssrc;
1515   RTP_SESSION_UNLOCK (sess);
1516
1517   return ssrc;
1518 }
1519
1520 /**
1521  * rtp_session_add_source:
1522  * @sess: a #RTPSession
1523  * @src: #RTPSource to add
1524  *
1525  * Add @src to @session.
1526  *
1527  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1528  * existed in the session.
1529  */
1530 gboolean
1531 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1532 {
1533   gboolean result = FALSE;
1534   RTPSource *find;
1535
1536   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1537   g_return_val_if_fail (src != NULL, FALSE);
1538
1539   RTP_SESSION_LOCK (sess);
1540   find =
1541       g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1542       GINT_TO_POINTER (src->ssrc));
1543   if (find == NULL) {
1544     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1545         GINT_TO_POINTER (src->ssrc), src);
1546     /* we have one more source now */
1547     sess->total_sources++;
1548     result = TRUE;
1549   }
1550   RTP_SESSION_UNLOCK (sess);
1551
1552   return result;
1553 }
1554
1555 /**
1556  * rtp_session_get_num_sources:
1557  * @sess: an #RTPSession
1558  *
1559  * Get the number of sources in @sess.
1560  *
1561  * Returns: The number of sources in @sess.
1562  */
1563 guint
1564 rtp_session_get_num_sources (RTPSession * sess)
1565 {
1566   guint result;
1567
1568   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1569
1570   RTP_SESSION_LOCK (sess);
1571   result = sess->total_sources;
1572   RTP_SESSION_UNLOCK (sess);
1573
1574   return result;
1575 }
1576
1577 /**
1578  * rtp_session_get_num_active_sources:
1579  * @sess: an #RTPSession
1580  *
1581  * Get the number of active sources in @sess. A source is considered active when
1582  * it has been validated and has not yet received a BYE RTCP message.
1583  *
1584  * Returns: The number of active sources in @sess.
1585  */
1586 guint
1587 rtp_session_get_num_active_sources (RTPSession * sess)
1588 {
1589   guint result;
1590
1591   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1592
1593   RTP_SESSION_LOCK (sess);
1594   result = sess->stats.active_sources;
1595   RTP_SESSION_UNLOCK (sess);
1596
1597   return result;
1598 }
1599
1600 /**
1601  * rtp_session_get_source_by_ssrc:
1602  * @sess: an #RTPSession
1603  * @ssrc: an SSRC
1604  *
1605  * Find the source with @ssrc in @sess.
1606  *
1607  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
1608  * g_object_unref() after usage.
1609  */
1610 RTPSource *
1611 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
1612 {
1613   RTPSource *result;
1614
1615   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1616
1617   RTP_SESSION_LOCK (sess);
1618   result =
1619       g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc));
1620   if (result)
1621     g_object_ref (result);
1622   RTP_SESSION_UNLOCK (sess);
1623
1624   return result;
1625 }
1626
1627 /**
1628  * rtp_session_get_source_by_cname:
1629  * @sess: a #RTPSession
1630  * @cname: an CNAME
1631  *
1632  * Find the source with @cname in @sess.
1633  *
1634  * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found.
1635  * g_object_unref() after usage.
1636  */
1637 RTPSource *
1638 rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname)
1639 {
1640   RTPSource *result;
1641
1642   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1643   g_return_val_if_fail (cname != NULL, NULL);
1644
1645   RTP_SESSION_LOCK (sess);
1646   result = g_hash_table_lookup (sess->cnames, cname);
1647   if (result)
1648     g_object_ref (result);
1649   RTP_SESSION_UNLOCK (sess);
1650
1651   return result;
1652 }
1653
1654 /* should be called with the SESSION lock */
1655 static guint32
1656 rtp_session_create_new_ssrc (RTPSession * sess)
1657 {
1658   guint32 ssrc;
1659
1660   while (TRUE) {
1661     ssrc = g_random_int ();
1662
1663     /* see if it exists in the session, we're done if it doesn't */
1664     if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1665             GINT_TO_POINTER (ssrc)) == NULL)
1666       break;
1667   }
1668   return ssrc;
1669 }
1670
1671
1672 /**
1673  * rtp_session_create_source:
1674  * @sess: an #RTPSession
1675  *
1676  * Create an #RTPSource for use in @sess. This function will create a source
1677  * with an ssrc that is currently not used by any participants in the session.
1678  *
1679  * Returns: an #RTPSource.
1680  */
1681 RTPSource *
1682 rtp_session_create_source (RTPSession * sess)
1683 {
1684   guint32 ssrc;
1685   RTPSource *source;
1686
1687   RTP_SESSION_LOCK (sess);
1688   ssrc = rtp_session_create_new_ssrc (sess);
1689   source = rtp_source_new (ssrc);
1690   rtp_source_set_callbacks (source, &callbacks, sess);
1691   /* we need an additional ref for the source in the hashtable */
1692   g_object_ref (source);
1693   g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
1694       source);
1695   /* we have one more source now */
1696   sess->total_sources++;
1697   RTP_SESSION_UNLOCK (sess);
1698
1699   return source;
1700 }
1701
1702 /* update the RTPArrivalStats structure with the current time and other bits
1703  * about the current buffer we are handling.
1704  * This function is typically called when a validated packet is received.
1705  * This function should be called with the SESSION_LOCK
1706  */
1707 static void
1708 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
1709     gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
1710     GstClockTime running_time, guint64 ntpnstime)
1711 {
1712   /* get time of arrival */
1713   arrival->current_time = current_time;
1714   arrival->running_time = running_time;
1715   arrival->ntpnstime = ntpnstime;
1716
1717   /* get packet size including header overhead */
1718   arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
1719
1720   if (rtp) {
1721     arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer);
1722   } else {
1723     arrival->payload_len = 0;
1724   }
1725
1726   /* for netbuffer we can store the IP address to check for collisions */
1727   arrival->have_address = GST_IS_NETBUFFER (buffer);
1728   if (arrival->have_address) {
1729     GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
1730
1731     memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
1732   }
1733 }
1734
1735 /**
1736  * rtp_session_process_rtp:
1737  * @sess: and #RTPSession
1738  * @buffer: an RTP buffer
1739  * @current_time: the current system time
1740  * @running_time: the running_time of @buffer
1741  *
1742  * Process an RTP buffer in the session manager. This function takes ownership
1743  * of @buffer.
1744  *
1745  * Returns: a #GstFlowReturn.
1746  */
1747 GstFlowReturn
1748 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
1749     GstClockTime current_time, GstClockTime running_time)
1750 {
1751   GstFlowReturn result;
1752   guint32 ssrc;
1753   RTPSource *source;
1754   gboolean created;
1755   gboolean prevsender, prevactive;
1756   RTPArrivalStats arrival;
1757   guint32 csrcs[16];
1758   guint8 i, count;
1759   guint64 oldrate;
1760
1761   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1762   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1763
1764   if (!gst_rtp_buffer_validate (buffer))
1765     goto invalid_packet;
1766
1767   RTP_SESSION_LOCK (sess);
1768   /* update arrival stats */
1769   update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
1770       running_time, -1);
1771
1772   /* ignore more RTP packets when we left the session */
1773   if (sess->source->received_bye)
1774     goto ignore;
1775
1776   /* get SSRC and look up in session database */
1777   ssrc = gst_rtp_buffer_get_ssrc (buffer);
1778   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
1779   if (!source)
1780     goto collision;
1781
1782   prevsender = RTP_SOURCE_IS_SENDER (source);
1783   prevactive = RTP_SOURCE_IS_ACTIVE (source);
1784   oldrate = source->bitrate;
1785
1786   /* copy available csrc for later */
1787   count = gst_rtp_buffer_get_csrc_count (buffer);
1788   /* make sure to not overflow our array. An RTP buffer can maximally contain
1789    * 16 CSRCs */
1790   count = MIN (count, 16);
1791
1792   for (i = 0; i < count; i++)
1793     csrcs[i] = gst_rtp_buffer_get_csrc (buffer, i);
1794
1795   /* let source process the packet */
1796   result = rtp_source_process_rtp (source, buffer, &arrival);
1797
1798   /* source became active */
1799   if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
1800     sess->stats.active_sources++;
1801     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
1802         sess->stats.active_sources);
1803     on_ssrc_validated (sess, source);
1804   }
1805   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1806     sess->stats.sender_sources++;
1807     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
1808         sess->stats.sender_sources);
1809   }
1810   if (oldrate != source->bitrate)
1811     sess->recalc_bandwidth = TRUE;
1812
1813   if (created)
1814     on_new_ssrc (sess, source);
1815
1816   if (source->validated) {
1817     gboolean created;
1818
1819     /* for validated sources, we add the CSRCs as well */
1820     for (i = 0; i < count; i++) {
1821       guint32 csrc;
1822       RTPSource *csrc_src;
1823
1824       csrc = csrcs[i];
1825
1826       /* get source */
1827       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
1828       if (!csrc_src)
1829         continue;
1830
1831       if (created) {
1832         GST_DEBUG ("created new CSRC: %08x", csrc);
1833         rtp_source_set_as_csrc (csrc_src);
1834         if (RTP_SOURCE_IS_ACTIVE (csrc_src))
1835           sess->stats.active_sources++;
1836         on_new_ssrc (sess, csrc_src);
1837       }
1838       g_object_unref (csrc_src);
1839     }
1840   }
1841   g_object_unref (source);
1842
1843   RTP_SESSION_UNLOCK (sess);
1844
1845   return result;
1846
1847   /* ERRORS */
1848 invalid_packet:
1849   {
1850     gst_buffer_unref (buffer);
1851     GST_DEBUG ("invalid RTP packet received");
1852     return GST_FLOW_OK;
1853   }
1854 ignore:
1855   {
1856     gst_buffer_unref (buffer);
1857     RTP_SESSION_UNLOCK (sess);
1858     GST_DEBUG ("ignoring RTP packet because we are leaving");
1859     return GST_FLOW_OK;
1860   }
1861 collision:
1862   {
1863     gst_buffer_unref (buffer);
1864     RTP_SESSION_UNLOCK (sess);
1865     GST_DEBUG ("ignoring packet because its collisioning");
1866     return GST_FLOW_OK;
1867   }
1868 }
1869
1870 static void
1871 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
1872     GstRTCPPacket * packet, RTPArrivalStats * arrival)
1873 {
1874   guint count, i;
1875
1876   count = gst_rtcp_packet_get_rb_count (packet);
1877   for (i = 0; i < count; i++) {
1878     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1879     guint8 fractionlost;
1880     gint32 packetslost;
1881
1882     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1883         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1884
1885     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
1886
1887     if (ssrc == sess->source->ssrc) {
1888       /* only deal with report blocks for our session, we update the stats of
1889        * the sender of the RTCP message. We could also compare our stats against
1890        * the other sender to see if we are better or worse. */
1891       rtp_source_process_rb (source, arrival->ntpnstime, fractionlost,
1892           packetslost, exthighestseq, jitter, lsr, dlsr);
1893     }
1894   }
1895   on_ssrc_active (sess, source);
1896 }
1897
1898 /* A Sender report contains statistics about how the sender is doing. This
1899  * includes timing informataion such as the relation between RTP and NTP
1900  * timestamps and the number of packets/bytes it sent to us.
1901  *
1902  * In this report is also included a set of report blocks related to how this
1903  * sender is receiving data (in case we (or somebody else) is also sending stuff
1904  * to it). This info includes the packet loss, jitter and seqnum. It also
1905  * contains information to calculate the round trip time (LSR/DLSR).
1906  */
1907 static void
1908 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
1909     RTPArrivalStats * arrival, gboolean * do_sync)
1910 {
1911   guint32 senderssrc, rtptime, packet_count, octet_count;
1912   guint64 ntptime;
1913   RTPSource *source;
1914   gboolean created, prevsender;
1915
1916   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
1917       &packet_count, &octet_count);
1918
1919   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
1920       senderssrc, GST_TIME_ARGS (arrival->current_time));
1921
1922   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1923   if (!source)
1924     return;
1925
1926   /* don't try to do lip-sync for sources that sent a BYE */
1927   if (rtp_source_received_bye (source))
1928     *do_sync = FALSE;
1929   else
1930     *do_sync = TRUE;
1931
1932   prevsender = RTP_SOURCE_IS_SENDER (source);
1933
1934   /* first update the source */
1935   rtp_source_process_sr (source, arrival->current_time, ntptime, rtptime,
1936       packet_count, octet_count);
1937
1938   if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
1939     sess->stats.sender_sources++;
1940     GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc,
1941         sess->stats.sender_sources);
1942   }
1943
1944   if (created)
1945     on_new_ssrc (sess, source);
1946
1947   rtp_session_process_rb (sess, source, packet, arrival);
1948   g_object_unref (source);
1949 }
1950
1951 /* A receiver report contains statistics about how a receiver is doing. It
1952  * includes stuff like packet loss, jitter and the seqnum it received last. It
1953  * also contains info to calculate the round trip time.
1954  *
1955  * We are only interested in how the sender of this report is doing wrt to us.
1956  */
1957 static void
1958 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
1959     RTPArrivalStats * arrival)
1960 {
1961   guint32 senderssrc;
1962   RTPSource *source;
1963   gboolean created;
1964
1965   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
1966
1967   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
1968
1969   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1970   if (!source)
1971     return;
1972
1973   if (created)
1974     on_new_ssrc (sess, source);
1975
1976   rtp_session_process_rb (sess, source, packet, arrival);
1977   g_object_unref (source);
1978 }
1979
1980 /* Get SDES items and store them in the SSRC */
1981 static void
1982 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
1983     RTPArrivalStats * arrival)
1984 {
1985   guint items, i, j;
1986   gboolean more_items, more_entries;
1987
1988   items = gst_rtcp_packet_sdes_get_item_count (packet);
1989   GST_DEBUG ("got SDES packet with %d items", items);
1990
1991   more_items = gst_rtcp_packet_sdes_first_item (packet);
1992   i = 0;
1993   while (more_items) {
1994     guint32 ssrc;
1995     gboolean changed, created, validated;
1996     RTPSource *source;
1997     GstStructure *sdes;
1998
1999     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
2000
2001     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
2002
2003     changed = FALSE;
2004
2005     /* find src, no probation when dealing with RTCP */
2006     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
2007     if (!source)
2008       return;
2009
2010     sdes = gst_structure_new ("application/x-rtp-source-sdes", NULL);
2011
2012     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
2013     j = 0;
2014     while (more_entries) {
2015       GstRTCPSDESType type;
2016       guint8 len;
2017       guint8 *data;
2018       gchar *name;
2019       gchar *value;
2020
2021       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
2022
2023       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
2024           data);
2025
2026       if (type == GST_RTCP_SDES_PRIV) {
2027         name = g_strndup ((const gchar *) &data[1], data[0]);
2028         len -= data[0] + 1;
2029         data += data[0] + 1;
2030       } else {
2031         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
2032       }
2033
2034       value = g_strndup ((const gchar *) data, len);
2035
2036       gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
2037
2038       g_free (name);
2039       g_free (value);
2040
2041       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
2042       j++;
2043     }
2044
2045     /* takes ownership of sdes */
2046     changed = rtp_source_set_sdes_struct (source, sdes);
2047
2048     validated = !RTP_SOURCE_IS_ACTIVE (source);
2049     source->validated = TRUE;
2050
2051     /* source became active */
2052     if (validated) {
2053       sess->stats.active_sources++;
2054       GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
2055           sess->stats.active_sources);
2056       on_ssrc_validated (sess, source);
2057     }
2058
2059     if (created)
2060       on_new_ssrc (sess, source);
2061     if (changed)
2062       on_ssrc_sdes (sess, source);
2063
2064     g_object_unref (source);
2065
2066     more_items = gst_rtcp_packet_sdes_next_item (packet);
2067     i++;
2068   }
2069 }
2070
2071 /* BYE is sent when a client leaves the session
2072  */
2073 static void
2074 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
2075     RTPArrivalStats * arrival)
2076 {
2077   guint count, i;
2078   gchar *reason;
2079   gboolean reconsider = FALSE;
2080
2081   reason = gst_rtcp_packet_bye_get_reason (packet);
2082   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
2083
2084   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
2085   for (i = 0; i < count; i++) {
2086     guint32 ssrc;
2087     RTPSource *source;
2088     gboolean created, prevactive, prevsender;
2089     guint pmembers, members;
2090
2091     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
2092     GST_DEBUG ("SSRC: %08x", ssrc);
2093
2094     if (ssrc == sess->source->ssrc)
2095       return;
2096
2097     /* find src and mark bye, no probation when dealing with RTCP */
2098     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
2099     if (!source)
2100       return;
2101
2102     /* store time for when we need to time out this source */
2103     source->bye_time = arrival->current_time;
2104
2105     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2106     prevsender = RTP_SOURCE_IS_SENDER (source);
2107
2108     /* let the source handle the rest */
2109     rtp_source_process_bye (source, reason);
2110
2111     pmembers = sess->stats.active_sources;
2112
2113     if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
2114       sess->stats.active_sources--;
2115       GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
2116           sess->stats.active_sources);
2117     }
2118     if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
2119       sess->stats.sender_sources--;
2120       GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
2121           sess->stats.sender_sources);
2122     }
2123     members = sess->stats.active_sources;
2124
2125     if (!sess->source->received_bye && members < pmembers) {
2126       /* some members went away since the previous timeout estimate.
2127        * Perform reverse reconsideration but only when we are not scheduling a
2128        * BYE ourselves. */
2129       if (arrival->current_time < sess->next_rtcp_check_time) {
2130         GstClockTime time_remaining;
2131
2132         time_remaining = sess->next_rtcp_check_time - arrival->current_time;
2133         sess->next_rtcp_check_time =
2134             gst_util_uint64_scale (time_remaining, members, pmembers);
2135
2136         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
2137             GST_TIME_ARGS (sess->next_rtcp_check_time));
2138
2139         sess->next_rtcp_check_time += arrival->current_time;
2140
2141         /* mark pending reconsider. We only want to signal the reconsideration
2142          * once after we handled all the source in the bye packet */
2143         reconsider = TRUE;
2144       }
2145     }
2146
2147     if (created)
2148       on_new_ssrc (sess, source);
2149
2150     on_bye_ssrc (sess, source);
2151
2152     g_object_unref (source);
2153   }
2154   if (reconsider) {
2155     RTP_SESSION_UNLOCK (sess);
2156     /* notify app of reconsideration */
2157     if (sess->callbacks.reconsider)
2158       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2159     RTP_SESSION_LOCK (sess);
2160   }
2161   g_free (reason);
2162 }
2163
2164 static void
2165 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
2166     RTPArrivalStats * arrival)
2167 {
2168   GST_DEBUG ("received APP");
2169 }
2170
2171 static gboolean
2172 rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
2173     gboolean fir, GstClockTime current_time)
2174 {
2175   guint32 round_trip = 0;
2176
2177   rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, &round_trip);
2178
2179   if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
2180     GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
2181         GST_SECOND, 65536);
2182
2183     if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE &&
2184         current_time - sess->last_keyframe_request < 2 * round_trip_in_ns) {
2185       GST_DEBUG ("Ignoring %s request because one was send without one "
2186           "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
2187           fir ? "FIR" : "PLI",
2188           GST_TIME_ARGS (current_time - sess->last_keyframe_request),
2189           GST_TIME_ARGS (round_trip_in_ns));;
2190       return FALSE;
2191     }
2192   }
2193
2194   sess->last_keyframe_request = current_time;
2195
2196   GST_LOG ("received %s request from %X %p(%p)", fir ? "FIR" : "PLI",
2197       rtp_source_get_ssrc (src), sess->callbacks.process_rtp,
2198       sess->callbacks.request_key_unit);
2199
2200   RTP_SESSION_UNLOCK (sess);
2201   sess->callbacks.request_key_unit (sess, fir,
2202       sess->request_key_unit_user_data);
2203   RTP_SESSION_LOCK (sess);
2204
2205   return TRUE;
2206 }
2207
2208 static void
2209 rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
2210     guint32 media_ssrc, GstClockTime current_time)
2211 {
2212   RTPSource *src;
2213
2214   if (!sess->callbacks.request_key_unit)
2215     return;
2216
2217   src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
2218       GINT_TO_POINTER (sender_ssrc));
2219   if (!src)
2220     return;
2221
2222   rtp_session_request_local_key_unit (sess, src, FALSE, current_time);
2223 }
2224
2225 static void
2226 rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
2227     guint8 * fci_data, guint fci_length, GstClockTime current_time)
2228 {
2229   RTPSource *src;
2230   guint32 ssrc;
2231   guint position = 0;
2232   gboolean our_request = FALSE;
2233
2234   if (!sess->callbacks.request_key_unit)
2235     return;
2236
2237   if (fci_length < 8)
2238     return;
2239
2240   src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
2241       GINT_TO_POINTER (sender_ssrc));
2242
2243   /* Hack because Google fails to set the sender_ssrc correctly */
2244   if (!src && sender_ssrc == 1) {
2245     GHashTableIter iter;
2246
2247     if (sess->stats.sender_sources >
2248         RTP_SOURCE_IS_SENDER (sess->source) ? 2 : 1)
2249       return;
2250
2251     g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
2252
2253     while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
2254       if (src != sess->source && rtp_source_is_sender (src))
2255         break;
2256       src = NULL;
2257     }
2258   }
2259
2260   if (!src)
2261     return;
2262
2263   for (position = 0; position < fci_length; position += 8) {
2264     guint8 *data = fci_data + position;
2265
2266     ssrc = GST_READ_UINT32_BE (data);
2267
2268     if (ssrc == rtp_source_get_ssrc (sess->source)) {
2269       our_request = TRUE;
2270       break;
2271     }
2272   }
2273   if (!our_request)
2274     return;
2275
2276   rtp_session_request_local_key_unit (sess, src, TRUE, current_time);
2277 }
2278
2279 static void
2280 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
2281     RTPArrivalStats * arrival, GstClockTime current_time)
2282 {
2283   GstRTCPType type = gst_rtcp_packet_get_type (packet);
2284   GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
2285   guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
2286   guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
2287   guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet);
2288   guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
2289
2290   GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
2291       "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
2292
2293   if (g_signal_has_handler_pending (sess,
2294           rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
2295     GstBuffer *fci_buffer = NULL;
2296
2297     if (fci_length > 0) {
2298       fci_buffer = gst_buffer_create_sub (packet->buffer,
2299           fci_data - GST_BUFFER_DATA (packet->buffer), fci_length);
2300       GST_BUFFER_TIMESTAMP (fci_buffer) = arrival->running_time;
2301     }
2302
2303     RTP_SESSION_UNLOCK (sess);
2304     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
2305         type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
2306     RTP_SESSION_LOCK (sess);
2307
2308     if (fci_buffer)
2309       gst_buffer_unref (fci_buffer);
2310   }
2311
2312   if (sess->rtcp_feedback_retention_window) {
2313     RTPSource *src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
2314         GINT_TO_POINTER (media_ssrc));
2315
2316     if (src)
2317       rtp_source_retain_rtcp_packet (src, packet, arrival->running_time);
2318   }
2319
2320   if (rtp_source_get_ssrc (sess->source) == media_ssrc ||
2321       /* PSFB FIR puts the media ssrc inside the FCI */
2322       (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
2323     switch (type) {
2324       case GST_RTCP_TYPE_PSFB:
2325         switch (fbtype) {
2326           case GST_RTCP_PSFB_TYPE_PLI:
2327             rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
2328                 current_time);
2329             break;
2330           case GST_RTCP_PSFB_TYPE_FIR:
2331             rtp_session_process_fir (sess, sender_ssrc, fci_data, fci_length,
2332                 current_time);
2333             break;
2334           default:
2335             break;
2336         }
2337         break;
2338       case GST_RTCP_TYPE_RTPFB:
2339       default:
2340         break;
2341     }
2342   }
2343 }
2344
2345 /**
2346  * rtp_session_process_rtcp:
2347  * @sess: and #RTPSession
2348  * @buffer: an RTCP buffer
2349  * @current_time: the current system time
2350  * @ntpnstime: the current NTP time in nanoseconds
2351  *
2352  * Process an RTCP buffer in the session manager. This function takes ownership
2353  * of @buffer.
2354  *
2355  * Returns: a #GstFlowReturn.
2356  */
2357 GstFlowReturn
2358 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
2359     GstClockTime current_time, guint64 ntpnstime)
2360 {
2361   GstRTCPPacket packet;
2362   gboolean more, is_bye = FALSE, do_sync = FALSE;
2363   RTPArrivalStats arrival;
2364   GstFlowReturn result = GST_FLOW_OK;
2365
2366   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2367   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2368
2369   if (!gst_rtcp_buffer_validate (buffer))
2370     goto invalid_packet;
2371
2372   GST_DEBUG ("received RTCP packet");
2373
2374   RTP_SESSION_LOCK (sess);
2375   /* update arrival stats */
2376   update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1,
2377       ntpnstime);
2378
2379   if (sess->sent_bye)
2380     goto ignore;
2381
2382   /* start processing the compound packet */
2383   more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
2384   while (more) {
2385     GstRTCPType type;
2386
2387     type = gst_rtcp_packet_get_type (&packet);
2388
2389     /* when we are leaving the session, we should ignore all non-BYE messages */
2390     if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) {
2391       GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
2392       goto next;
2393     }
2394
2395     switch (type) {
2396       case GST_RTCP_TYPE_SR:
2397         rtp_session_process_sr (sess, &packet, &arrival, &do_sync);
2398         break;
2399       case GST_RTCP_TYPE_RR:
2400         rtp_session_process_rr (sess, &packet, &arrival);
2401         break;
2402       case GST_RTCP_TYPE_SDES:
2403         rtp_session_process_sdes (sess, &packet, &arrival);
2404         break;
2405       case GST_RTCP_TYPE_BYE:
2406         is_bye = TRUE;
2407         /* don't try to attempt lip-sync anymore for streams with a BYE */
2408         do_sync = FALSE;
2409         rtp_session_process_bye (sess, &packet, &arrival);
2410         break;
2411       case GST_RTCP_TYPE_APP:
2412         rtp_session_process_app (sess, &packet, &arrival);
2413         break;
2414       case GST_RTCP_TYPE_RTPFB:
2415       case GST_RTCP_TYPE_PSFB:
2416         rtp_session_process_feedback (sess, &packet, &arrival, current_time);
2417         break;
2418       default:
2419         GST_WARNING ("got unknown RTCP packet");
2420         break;
2421     }
2422   next:
2423     more = gst_rtcp_packet_move_to_next (&packet);
2424   }
2425
2426   /* if we are scheduling a BYE, we only want to count bye packets, else we
2427    * count everything */
2428   if (sess->source->received_bye) {
2429     if (is_bye) {
2430       sess->stats.bye_members++;
2431       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
2432     }
2433   } else {
2434     /* keep track of average packet size */
2435     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
2436   }
2437   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
2438       sess->stats.avg_rtcp_packet_size, arrival.bytes);
2439   RTP_SESSION_UNLOCK (sess);
2440
2441   /* notify caller of sr packets in the callback */
2442   if (do_sync && sess->callbacks.sync_rtcp) {
2443     /* make writable, we might want to change the buffer */
2444     buffer = gst_buffer_make_metadata_writable (buffer);
2445
2446     result = sess->callbacks.sync_rtcp (sess, sess->source, buffer,
2447         sess->sync_rtcp_user_data);
2448   } else
2449     gst_buffer_unref (buffer);
2450
2451   return result;
2452
2453   /* ERRORS */
2454 invalid_packet:
2455   {
2456     GST_DEBUG ("invalid RTCP packet received");
2457     gst_buffer_unref (buffer);
2458     return GST_FLOW_OK;
2459   }
2460 ignore:
2461   {
2462     gst_buffer_unref (buffer);
2463     RTP_SESSION_UNLOCK (sess);
2464     GST_DEBUG ("ignoring RTP packet because we left");
2465     return GST_FLOW_OK;
2466   }
2467 }
2468
2469 /**
2470  * rtp_session_send_rtp:
2471  * @sess: an #RTPSession
2472  * @data: pointer to either an RTP buffer or a list of RTP buffers
2473  * @is_list: TRUE when @data is a buffer list
2474  * @current_time: the current system time
2475  * @running_time: the running time of @data
2476  *
2477  * Send the RTP buffer in the session manager. This function takes ownership of
2478  * @buffer.
2479  *
2480  * Returns: a #GstFlowReturn.
2481  */
2482 GstFlowReturn
2483 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
2484     GstClockTime current_time, GstClockTime running_time)
2485 {
2486   GstFlowReturn result;
2487   RTPSource *source;
2488   gboolean prevsender;
2489   gboolean valid_packet;
2490   guint64 oldrate;
2491
2492   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2493   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
2494
2495   if (is_list) {
2496     valid_packet = gst_rtp_buffer_list_validate (GST_BUFFER_LIST_CAST (data));
2497   } else {
2498     valid_packet = gst_rtp_buffer_validate (GST_BUFFER_CAST (data));
2499   }
2500
2501   if (!valid_packet)
2502     goto invalid_packet;
2503
2504   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
2505
2506   RTP_SESSION_LOCK (sess);
2507   source = sess->source;
2508
2509   /* update last activity */
2510   source->last_rtp_activity = current_time;
2511
2512   prevsender = RTP_SOURCE_IS_SENDER (source);
2513   oldrate = source->bitrate;
2514
2515   /* we use our own source to send */
2516   result = rtp_source_send_rtp (source, data, is_list, running_time);
2517
2518   if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
2519     sess->stats.sender_sources++;
2520   if (oldrate != source->bitrate)
2521     sess->recalc_bandwidth = TRUE;
2522   RTP_SESSION_UNLOCK (sess);
2523
2524   return result;
2525
2526   /* ERRORS */
2527 invalid_packet:
2528   {
2529     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
2530     GST_DEBUG ("invalid RTP packet received");
2531     return GST_FLOW_OK;
2532   }
2533 }
2534
2535 static void
2536 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
2537 {
2538   *bandwidth += source->bitrate;
2539 }
2540
2541 static GstClockTime
2542 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
2543     gboolean first)
2544 {
2545   GstClockTime result;
2546
2547   /* recalculate bandwidth when it changed */
2548   if (sess->recalc_bandwidth) {
2549     gdouble bandwidth;
2550
2551     if (sess->bandwidth > 0)
2552       bandwidth = sess->bandwidth;
2553     else {
2554       /* If it is <= 0, then try to estimate the actual bandwidth */
2555       bandwidth = sess->source->bitrate;
2556
2557       g_hash_table_foreach (sess->cnames, (GHFunc) add_bitrates, &bandwidth);
2558       bandwidth /= 8.0;
2559     }
2560     if (bandwidth < 8000)
2561       bandwidth = RTP_STATS_BANDWIDTH;
2562
2563     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
2564         sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
2565
2566     sess->recalc_bandwidth = FALSE;
2567   }
2568
2569   if (sess->source->received_bye) {
2570     result = rtp_stats_calculate_bye_interval (&sess->stats);
2571   } else {
2572     result = rtp_stats_calculate_rtcp_interval (&sess->stats,
2573         RTP_SOURCE_IS_SENDER (sess->source), first);
2574   }
2575
2576   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
2577       GST_TIME_ARGS (result), first);
2578
2579   if (!deterministic && result != GST_CLOCK_TIME_NONE)
2580     result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
2581
2582   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
2583
2584   return result;
2585 }
2586
2587 /* Stop the current @sess and schedule a BYE message for the other members.
2588  * One must have the session lock to call this function
2589  */
2590 static GstFlowReturn
2591 rtp_session_schedule_bye_locked (RTPSession * sess, const gchar * reason,
2592     GstClockTime current_time)
2593 {
2594   GstFlowReturn result = GST_FLOW_OK;
2595   RTPSource *source;
2596   GstClockTime interval;
2597
2598   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2599
2600   source = sess->source;
2601
2602   /* ignore more BYEs */
2603   if (source->received_bye)
2604     goto done;
2605
2606   /* we have BYE now */
2607   source->received_bye = TRUE;
2608   /* at least one member wants to send a BYE */
2609   g_free (sess->bye_reason);
2610   sess->bye_reason = g_strdup (reason);
2611   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
2612   sess->stats.bye_members = 1;
2613   sess->first_rtcp = TRUE;
2614   sess->sent_bye = FALSE;
2615   sess->allow_early = TRUE;
2616
2617   /* reschedule transmission */
2618   sess->last_rtcp_send_time = current_time;
2619   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2620   sess->next_rtcp_check_time = current_time + interval;
2621
2622   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
2623       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
2624
2625   RTP_SESSION_UNLOCK (sess);
2626   /* notify app of reconsideration */
2627   if (sess->callbacks.reconsider)
2628     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2629   RTP_SESSION_LOCK (sess);
2630 done:
2631
2632   return result;
2633 }
2634
2635 /**
2636  * rtp_session_schedule_bye:
2637  * @sess: an #RTPSession
2638  * @reason: a reason or NULL
2639  * @current_time: the current system time
2640  *
2641  * Stop the current @sess and schedule a BYE message for the other members.
2642  *
2643  * Returns: a #GstFlowReturn.
2644  */
2645 GstFlowReturn
2646 rtp_session_schedule_bye (RTPSession * sess, const gchar * reason,
2647     GstClockTime current_time)
2648 {
2649   GstFlowReturn result = GST_FLOW_OK;
2650
2651   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2652
2653   RTP_SESSION_LOCK (sess);
2654   result = rtp_session_schedule_bye_locked (sess, reason, current_time);
2655   RTP_SESSION_UNLOCK (sess);
2656
2657   return result;
2658 }
2659
2660 /**
2661  * rtp_session_next_timeout:
2662  * @sess: an #RTPSession
2663  * @current_time: the current system time
2664  *
2665  * Get the next time we should perform session maintenance tasks.
2666  *
2667  * Returns: a time when rtp_session_on_timeout() should be called with the
2668  * current system time.
2669  */
2670 GstClockTime
2671 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
2672 {
2673   GstClockTime result, interval = 0;
2674
2675   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
2676
2677   RTP_SESSION_LOCK (sess);
2678
2679   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
2680     result = sess->next_early_rtcp_time;
2681     goto early_exit;
2682   }
2683
2684   result = sess->next_rtcp_check_time;
2685
2686   GST_DEBUG ("current time: %" GST_TIME_FORMAT ", next :%" GST_TIME_FORMAT,
2687       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2688
2689   if (result < current_time) {
2690     GST_DEBUG ("take current time as base");
2691     /* our previous check time expired, start counting from the current time
2692      * again. */
2693     result = current_time;
2694   }
2695
2696   if (sess->source->received_bye) {
2697     if (sess->sent_bye) {
2698       GST_DEBUG ("we sent BYE already");
2699       interval = GST_CLOCK_TIME_NONE;
2700     } else if (sess->stats.active_sources >= 50) {
2701       GST_DEBUG ("reconsider BYE, more than 50 sources");
2702       /* reconsider BYE if members >= 50 */
2703       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2704     }
2705   } else {
2706     if (sess->first_rtcp) {
2707       GST_DEBUG ("first RTCP packet");
2708       /* we are called for the first time */
2709       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2710     } else if (sess->next_rtcp_check_time < current_time) {
2711       GST_DEBUG ("old check time expired, getting new timeout");
2712       /* get a new timeout when we need to */
2713       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
2714     }
2715   }
2716
2717   if (interval != GST_CLOCK_TIME_NONE)
2718     result += interval;
2719   else
2720     result = GST_CLOCK_TIME_NONE;
2721
2722   sess->next_rtcp_check_time = result;
2723
2724 early_exit:
2725
2726   GST_DEBUG ("current time: %" GST_TIME_FORMAT
2727       ", next time: %" GST_TIME_FORMAT,
2728       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2729   RTP_SESSION_UNLOCK (sess);
2730
2731   return result;
2732 }
2733
2734 typedef struct
2735 {
2736   RTPSession *sess;
2737   GstBuffer *rtcp;
2738   GstClockTime current_time;
2739   guint64 ntpnstime;
2740   GstClockTime running_time;
2741   GstClockTime interval;
2742   GstRTCPPacket packet;
2743   gboolean is_bye;
2744   gboolean has_sdes;
2745   gboolean is_early;
2746   gboolean may_suppress;
2747 } ReportData;
2748
2749 static void
2750 session_start_rtcp (RTPSession * sess, ReportData * data)
2751 {
2752   GstRTCPPacket *packet = &data->packet;
2753   RTPSource *own = sess->source;
2754
2755   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
2756
2757   if (RTP_SOURCE_IS_SENDER (own)) {
2758     guint64 ntptime;
2759     guint32 rtptime;
2760     guint32 packet_count, octet_count;
2761
2762     /* we are a sender, create SR */
2763     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
2764     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
2765
2766     /* get latest stats */
2767     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
2768         &ntptime, &rtptime, &packet_count, &octet_count);
2769     /* store stats */
2770     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
2771         packet_count, octet_count);
2772
2773     /* fill in sender report info */
2774     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
2775         ntptime, rtptime, packet_count, octet_count);
2776   } else {
2777     /* we are only receiver, create RR */
2778     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
2779     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
2780     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
2781   }
2782 }
2783
2784 /* construct a Sender or Receiver Report */
2785 static void
2786 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
2787 {
2788   RTPSession *sess = data->sess;
2789   GstRTCPPacket *packet = &data->packet;
2790
2791   /* create a new buffer if needed */
2792   if (data->rtcp == NULL) {
2793     session_start_rtcp (sess, data);
2794   } else if (data->is_early) {
2795     /* Put a single RR or SR in minimal compound packets */
2796     return;
2797   }
2798   if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
2799     /* only report about other sender sources */
2800     if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
2801       guint8 fractionlost;
2802       gint32 packetslost;
2803       guint32 exthighestseq, jitter;
2804       guint32 lsr, dlsr;
2805
2806       /* get new stats */
2807       rtp_source_get_new_rb (source, data->current_time, &fractionlost,
2808           &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2809
2810       /* store last generated RR packet */
2811       source->last_rr.is_valid = TRUE;
2812       source->last_rr.fractionlost = fractionlost;
2813       source->last_rr.packetslost = packetslost;
2814       source->last_rr.exthighestseq = exthighestseq;
2815       source->last_rr.jitter = jitter;
2816       source->last_rr.lsr = lsr;
2817       source->last_rr.dlsr = dlsr;
2818
2819       /* packet is not yet filled, add report block for this source. */
2820       gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
2821           exthighestseq, jitter, lsr, dlsr);
2822     }
2823   }
2824 }
2825
2826 /* perform cleanup of sources that timed out */
2827 static void
2828 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
2829 {
2830   gboolean remove = FALSE;
2831   gboolean byetimeout = FALSE;
2832   gboolean sendertimeout = FALSE;
2833   gboolean is_sender, is_active;
2834   RTPSession *sess = data->sess;
2835   GstClockTime interval, binterval;
2836   GstClockTime btime;
2837
2838   is_sender = RTP_SOURCE_IS_SENDER (source);
2839   is_active = RTP_SOURCE_IS_ACTIVE (source);
2840
2841   /* our own rtcp interval may have been forced low by secondary configuration,
2842    * while sender side may still operate with higher interval,
2843    * so do not just take our interval to decide on timing out sender,
2844    * but take (if data->interval <= 5 * GST_SECOND):
2845    *   interval = CLAMP (sender_interval, data->interval, 5 * GST_SECOND)
2846    * where sender_interval is difference between last 2 received RTCP reports
2847    */
2848   if (data->interval >= 5 * GST_SECOND || (source == sess->source)) {
2849     binterval = data->interval;
2850   } else {
2851     GST_LOG ("prev_rtcp %" GST_TIME_FORMAT ", last_rtcp %" GST_TIME_FORMAT,
2852         GST_TIME_ARGS (source->stats.prev_rtcptime),
2853         GST_TIME_ARGS (source->stats.last_rtcptime));
2854     /* if not received enough yet, fallback to larger default */
2855     if (source->stats.last_rtcptime > source->stats.prev_rtcptime)
2856       binterval = source->stats.last_rtcptime - source->stats.prev_rtcptime;
2857     else
2858       binterval = 5 * GST_SECOND;
2859     binterval = CLAMP (binterval, data->interval, 5 * GST_SECOND);
2860   }
2861   GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
2862       GST_TIME_ARGS (binterval));
2863
2864   /* check for our own source, we don't want to delete our own source. */
2865   if (!(source == sess->source)) {
2866     if (source->received_bye) {
2867       /* if we received a BYE from the source, remove the source after some
2868        * time. */
2869       if (data->current_time > source->bye_time &&
2870           data->current_time - source->bye_time > sess->stats.bye_timeout) {
2871         GST_DEBUG ("removing BYE source %08x", source->ssrc);
2872         remove = TRUE;
2873         byetimeout = TRUE;
2874       }
2875     }
2876     /* sources that were inactive for more than 5 times the deterministic reporting
2877      * interval get timed out. the min timeout is 5 seconds. */
2878     /* mind old time that might pre-date last time going to PLAYING */
2879     btime = MAX (source->last_activity, sess->start_time);
2880     if (data->current_time > btime) {
2881       interval = MAX (binterval * 5, 5 * GST_SECOND);
2882       if (data->current_time - btime > interval) {
2883         GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
2884             source->ssrc, GST_TIME_ARGS (btime));
2885         remove = TRUE;
2886       }
2887     }
2888   }
2889
2890   /* senders that did not send for a long time become a receiver, this also
2891    * holds for our own source. */
2892   if (is_sender) {
2893     /* mind old time that might pre-date last time going to PLAYING */
2894     btime = MAX (source->last_rtp_activity, sess->start_time);
2895     if (data->current_time > btime) {
2896       interval = MAX (binterval * 2, 5 * GST_SECOND);
2897       if (data->current_time - btime > interval) {
2898         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
2899             GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
2900         source->is_sender = FALSE;
2901         sess->stats.sender_sources--;
2902         sendertimeout = TRUE;
2903       }
2904     }
2905   }
2906
2907   if (remove) {
2908     sess->total_sources--;
2909     if (is_sender)
2910       sess->stats.sender_sources--;
2911     if (is_active)
2912       sess->stats.active_sources--;
2913
2914     if (byetimeout)
2915       on_bye_timeout (sess, source);
2916     else
2917       on_timeout (sess, source);
2918   } else {
2919     if (sendertimeout)
2920       on_sender_timeout (sess, source);
2921   }
2922
2923   source->closing = remove;
2924 }
2925
2926 static void
2927 session_sdes (RTPSession * sess, ReportData * data)
2928 {
2929   GstRTCPPacket *packet = &data->packet;
2930   const GstStructure *sdes;
2931   gint i, n_fields;
2932
2933   /* add SDES packet */
2934   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
2935
2936   gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
2937
2938   sdes = rtp_source_get_sdes_struct (sess->source);
2939
2940   /* add all fields in the structure, the order is not important. */
2941   n_fields = gst_structure_n_fields (sdes);
2942   for (i = 0; i < n_fields; ++i) {
2943     const gchar *field;
2944     const gchar *value;
2945     GstRTCPSDESType type;
2946
2947     field = gst_structure_nth_field_name (sdes, i);
2948     if (field == NULL)
2949       continue;
2950     value = gst_structure_get_string (sdes, field);
2951     if (value == NULL)
2952       continue;
2953     type = gst_rtcp_sdes_name_to_type (field);
2954
2955     /* Early packets are minimal and only include the CNAME */
2956     if (data->is_early && type != GST_RTCP_SDES_CNAME)
2957       continue;
2958
2959     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
2960       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
2961           (const guint8 *) value);
2962     } else if (type == GST_RTCP_SDES_PRIV) {
2963       gsize prefix_len;
2964       gsize value_len;
2965       gsize data_len;
2966       guint8 data[256];
2967
2968       /* don't accept entries that are too big */
2969       prefix_len = strlen (field);
2970       if (prefix_len > 255)
2971         continue;
2972       value_len = strlen (value);
2973       if (value_len > 255)
2974         continue;
2975       data_len = 1 + prefix_len + value_len;
2976       if (data_len > 255)
2977         continue;
2978
2979       data[0] = prefix_len;
2980       memcpy (&data[1], field, prefix_len);
2981       memcpy (&data[1 + prefix_len], value, value_len);
2982
2983       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
2984     }
2985   }
2986
2987   data->has_sdes = TRUE;
2988 }
2989
2990 /* schedule a BYE packet */
2991 static void
2992 session_bye (RTPSession * sess, ReportData * data)
2993 {
2994   GstRTCPPacket *packet = &data->packet;
2995
2996   /* open packet */
2997   session_start_rtcp (sess, data);
2998
2999   /* add SDES */
3000   session_sdes (sess, data);
3001
3002   /* add a BYE packet */
3003   gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet);
3004   gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc);
3005   if (sess->bye_reason)
3006     gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason);
3007
3008   /* we have a BYE packet now */
3009   data->is_bye = TRUE;
3010 }
3011
3012 static gboolean
3013 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
3014 {
3015   GstClockTime new_send_time, elapsed;
3016
3017   if (data->is_early && sess->next_early_rtcp_time < current_time)
3018     goto early;
3019
3020   /* no need to check yet */
3021   if (sess->next_rtcp_check_time > current_time) {
3022     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
3023         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
3024         GST_TIME_ARGS (current_time));
3025     return FALSE;
3026   }
3027
3028   /* get elapsed time since we last reported */
3029   elapsed = current_time - sess->last_rtcp_send_time;
3030
3031   /* perform forward reconsideration */
3032   new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
3033
3034   GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
3035       GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time), GST_TIME_ARGS (elapsed));
3036
3037   new_send_time += sess->last_rtcp_send_time;
3038
3039   /* check if reconsideration */
3040   if (current_time < new_send_time) {
3041     GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
3042         GST_TIME_ARGS (new_send_time));
3043     /* store new check time */
3044     sess->next_rtcp_check_time = new_send_time;
3045     return FALSE;
3046   }
3047
3048 early:
3049
3050   new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
3051
3052   GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
3053       GST_TIME_ARGS (new_send_time));
3054   sess->next_rtcp_check_time = current_time + new_send_time;
3055
3056   /* Apply the rules from RFC 4585 section 3.5.3 */
3057   if (sess->stats.min_interval != 0 && !sess->first_rtcp) {
3058     GstClockTimeDiff T_rr_current_interval = g_random_double_range (0.5, 1.5) *
3059         sess->stats.min_interval;
3060
3061     /* This will caused the RTCP to be suppressed if no FB packets are added */
3062     if (sess->last_rtcp_send_time + T_rr_current_interval >
3063         sess->next_rtcp_check_time) {
3064       GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT
3065           " last: %" GST_TIME_FORMAT
3066           " + T_rr_current_interval: %" GST_TIME_FORMAT
3067           " >  sess->next_rtcp_check_time: %" GST_TIME_FORMAT,
3068           GST_TIME_ARGS (sess->stats.min_interval),
3069           GST_TIME_ARGS (sess->last_rtcp_send_time),
3070           GST_TIME_ARGS (T_rr_current_interval),
3071           GST_TIME_ARGS (sess->next_rtcp_check_time));
3072       data->may_suppress = TRUE;
3073     }
3074   }
3075
3076   return TRUE;
3077 }
3078
3079 static void
3080 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
3081 {
3082   g_hash_table_insert (hash_table, key, g_object_ref (source));
3083 }
3084
3085 static gboolean
3086 remove_closing_sources (const gchar * key, RTPSource * source, gpointer * data)
3087 {
3088   return source->closing;
3089 }
3090
3091 /**
3092  * rtp_session_on_timeout:
3093  * @sess: an #RTPSession
3094  * @current_time: the current system time
3095  * @ntpnstime: the current NTP time in nanoseconds
3096  * @running_time: the current running_time of the pipeline
3097  *
3098  * Perform maintenance actions after the timeout obtained with
3099  * rtp_session_next_timeout() expired.
3100  *
3101  * This function will perform timeouts of receivers and senders, send a BYE
3102  * packet or generate RTCP packets with current session stats.
3103  *
3104  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
3105  * times, for each packet that should be processed.
3106  *
3107  * Returns: a #GstFlowReturn.
3108  */
3109 GstFlowReturn
3110 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
3111     guint64 ntpnstime, GstClockTime running_time)
3112 {
3113   GstFlowReturn result = GST_FLOW_OK;
3114   ReportData data;
3115   RTPSource *own;
3116   GHashTable *table_copy;
3117   gboolean notify = FALSE;
3118
3119   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3120
3121   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT,
3122       GST_TIME_ARGS (current_time), GST_TIME_ARGS (ntpnstime));
3123
3124   data.sess = sess;
3125   data.rtcp = NULL;
3126   data.current_time = current_time;
3127   data.ntpnstime = ntpnstime;
3128   data.is_bye = FALSE;
3129   data.has_sdes = FALSE;
3130   data.may_suppress = FALSE;
3131   data.running_time = running_time;
3132
3133   own = sess->source;
3134
3135   RTP_SESSION_LOCK (sess);
3136   /* get a new interval, we need this for various cleanups etc */
3137   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
3138
3139   /* Make a local copy of the hashtable. We need to do this because the
3140    * cleanup stage below releases the session lock. */
3141   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
3142       (GDestroyNotify) g_object_unref);
3143   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3144       (GHFunc) clone_ssrcs_hashtable, table_copy);
3145
3146   /* Clean up the session, mark the source for removing, this might release the
3147    * session lock. */
3148   g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
3149   g_hash_table_destroy (table_copy);
3150
3151   /* Now remove the marked sources */
3152   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
3153       (GHRFunc) remove_closing_sources, NULL);
3154
3155   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
3156     data.is_early = TRUE;
3157   else
3158     data.is_early = FALSE;
3159
3160   /* see if we need to generate SR or RR packets */
3161   if (is_rtcp_time (sess, current_time, &data)) {
3162     if (own->received_bye) {
3163       /* generate BYE instead */
3164       GST_DEBUG ("generating BYE message");
3165       session_bye (sess, &data);
3166       sess->sent_bye = TRUE;
3167     } else {
3168       /* loop over all known sources and do something */
3169       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3170           (GHFunc) session_report_blocks, &data);
3171     }
3172   }
3173
3174   if (data.rtcp) {
3175     /* we keep track of the last report time in order to timeout inactive
3176      * receivers or senders */
3177     if (!data.is_early && !data.may_suppress)
3178       sess->last_rtcp_send_time = data.current_time;
3179     sess->first_rtcp = FALSE;
3180     sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
3181
3182     /* add SDES for this source when not already added */
3183     if (!data.has_sdes)
3184       session_sdes (sess, &data);
3185   }
3186
3187   /* check for outdated collisions */
3188   GST_DEBUG ("Timing out collisions");
3189   rtp_source_timeout (sess->source, current_time,
3190       /* "a relatively long time" -- RFC 3550 section 8.2 */
3191       RTP_STATS_MIN_INTERVAL * GST_SECOND * 10,
3192       running_time - sess->rtcp_feedback_retention_window);
3193
3194   if (sess->change_ssrc) {
3195     GST_DEBUG ("need to change our SSRC (%08x)", own->ssrc);
3196     g_hash_table_steal (sess->ssrcs[sess->mask_idx],
3197         GINT_TO_POINTER (own->ssrc));
3198
3199     own->ssrc = rtp_session_create_new_ssrc (sess);
3200     rtp_source_reset (own);
3201
3202     g_hash_table_insert (sess->ssrcs[sess->mask_idx],
3203         GINT_TO_POINTER (own->ssrc), own);
3204
3205     g_free (sess->bye_reason);
3206     sess->bye_reason = NULL;
3207     sess->sent_bye = FALSE;
3208     sess->change_ssrc = FALSE;
3209     notify = TRUE;
3210     GST_DEBUG ("changed our SSRC to %08x", own->ssrc);
3211   }
3212
3213   sess->allow_early = TRUE;
3214
3215   RTP_SESSION_UNLOCK (sess);
3216
3217   if (notify)
3218     g_object_notify (G_OBJECT (sess), "internal-ssrc");
3219
3220   /* push out the RTCP packet */
3221   if (data.rtcp) {
3222     gboolean do_not_suppress;
3223
3224     /* Give the user a change to add its own packet */
3225     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
3226         data.rtcp, data.is_early, &do_not_suppress);
3227
3228     if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) {
3229       guint packet_size;
3230
3231       /* close the RTCP packet */
3232       gst_rtcp_buffer_end (data.rtcp);
3233
3234       packet_size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
3235
3236       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
3237       GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
3238           sess->stats.avg_rtcp_packet_size, packet_size);
3239       result =
3240           sess->callbacks.send_rtcp (sess, own, data.rtcp, sess->sent_bye,
3241           sess->send_rtcp_user_data);
3242     } else {
3243       GST_DEBUG ("freeing packet callback: %p"
3244           " do_not_suppress: %d may_suppress: %d",
3245           sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress);
3246       gst_buffer_unref (data.rtcp);
3247     }
3248   }
3249
3250   return result;
3251 }
3252
3253 void
3254 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
3255     GstClockTimeDiff max_delay)
3256 {
3257   GstClockTime T_dither_max;
3258
3259   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
3260
3261   RTP_SESSION_LOCK (sess);
3262
3263   /* Check if already requested */
3264   /*  RFC 4585 section 3.5.2 step 2 */
3265   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
3266     goto dont_send;
3267
3268   /* Ignore the request a scheduled packet will be in time anyway */
3269   if (current_time + max_delay > sess->next_rtcp_check_time)
3270     goto dont_send;
3271
3272   /*  RFC 4585 section 3.5.2 step 2b */
3273   /* If the total sources is <=2, then there is only us and one peer */
3274   if (sess->total_sources <= 2) {
3275     T_dither_max = 0;
3276   } else {
3277     /* Divide by 2 because l = 0.5 */
3278     T_dither_max = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
3279     T_dither_max /= 2;
3280   }
3281
3282   /*  RFC 4585 section 3.5.2 step 3 */
3283   if (current_time + T_dither_max > sess->next_rtcp_check_time)
3284     goto dont_send;
3285
3286   /*  RFC 4585 section 3.5.2 step 4
3287    * Don't send if allow_early is FALSE, but not if we are in
3288    * immediate mode, meaning we are part of a group of at most the
3289    * application-specific threshold.
3290    */
3291   if (sess->total_sources > sess->rtcp_immediate_feedback_threshold &&
3292       sess->allow_early == FALSE)
3293     goto dont_send;
3294
3295   if (T_dither_max) {
3296     /* Schedule an early transmission later */
3297     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
3298         current_time;
3299   } else {
3300     /* If no dithering, schedule it for NOW */
3301     sess->next_early_rtcp_time = current_time;
3302   }
3303
3304   RTP_SESSION_UNLOCK (sess);
3305
3306   /* notify app of need to send packet early
3307    * and therefore of timeout change */
3308   if (sess->callbacks.reconsider)
3309     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
3310
3311   return;
3312
3313 dont_send:
3314
3315   RTP_SESSION_UNLOCK (sess);
3316 }
3317
3318 gboolean
3319 rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc, GstClockTime now,
3320     gboolean fir, gint count)
3321 {
3322   RTPSource *src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
3323       GUINT_TO_POINTER (ssrc));
3324
3325   if (!src)
3326     return FALSE;
3327
3328   if (fir) {
3329     src->send_pli = FALSE;
3330     src->send_fir = TRUE;
3331
3332     if (count == -1 || count != src->last_fir_count)
3333       src->current_send_fir_seqnum++;
3334     src->last_fir_count = count;
3335   } else if (!src->send_fir) {
3336     src->send_pli = TRUE;
3337   }
3338
3339   rtp_session_request_early_rtcp (sess, now, 200 * GST_MSECOND);
3340
3341   return TRUE;
3342 }
3343
3344 static gboolean
3345 has_pli_compare_func (gconstpointer a, gconstpointer ignored)
3346 {
3347   GstRTCPPacket packet;
3348
3349   packet.buffer = (GstBuffer *) a;
3350   packet.offset = 0;
3351
3352   if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
3353       gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
3354     return TRUE;
3355   else
3356     return FALSE;
3357 }
3358
3359 static gboolean
3360 rtp_session_on_sending_rtcp (RTPSession * sess, GstBuffer * buffer,
3361     gboolean early)
3362 {
3363   gboolean ret = FALSE;
3364   GHashTableIter iter;
3365   gpointer key, value;
3366   gboolean started_fir = FALSE;
3367   GstRTCPPacket fir_rtcppacket;
3368
3369   RTP_SESSION_LOCK (sess);
3370
3371   g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
3372   while (g_hash_table_iter_next (&iter, &key, &value)) {
3373     guint media_ssrc = GPOINTER_TO_UINT (key);
3374     RTPSource *media_src = value;
3375     guint8 *fci_data;
3376
3377     if (media_src->send_fir) {
3378       if (!started_fir) {
3379         if (!gst_rtcp_buffer_add_packet (buffer, GST_RTCP_TYPE_PSFB,
3380                 &fir_rtcppacket))
3381           break;
3382         gst_rtcp_packet_fb_set_type (&fir_rtcppacket, GST_RTCP_PSFB_TYPE_FIR);
3383         gst_rtcp_packet_fb_set_sender_ssrc (&fir_rtcppacket,
3384             rtp_source_get_ssrc (sess->source));
3385         gst_rtcp_packet_fb_set_media_ssrc (&fir_rtcppacket, 0);
3386
3387         if (!gst_rtcp_packet_fb_set_fci_length (&fir_rtcppacket, 2)) {
3388           gst_rtcp_packet_remove (&fir_rtcppacket);
3389           break;
3390         }
3391         ret = TRUE;
3392         started_fir = TRUE;
3393       } else {
3394         if (!gst_rtcp_packet_fb_set_fci_length (&fir_rtcppacket,
3395                 !gst_rtcp_packet_fb_get_fci_length (&fir_rtcppacket) + 2))
3396           break;
3397       }
3398
3399       fci_data = gst_rtcp_packet_fb_get_fci (&fir_rtcppacket) -
3400           ((gst_rtcp_packet_fb_get_fci_length (&fir_rtcppacket) - 2) * 4);
3401
3402       GST_WRITE_UINT32_BE (fci_data, media_ssrc);
3403       fci_data += 4;
3404       fci_data[0] = media_src->current_send_fir_seqnum;
3405       fci_data[1] = fci_data[2] = fci_data[3] = 0;
3406       media_src->send_fir = FALSE;
3407     }
3408   }
3409
3410   g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
3411   while (g_hash_table_iter_next (&iter, &key, &value)) {
3412     guint media_ssrc = GPOINTER_TO_UINT (key);
3413     RTPSource *media_src = value;
3414     GstRTCPPacket pli_rtcppacket;
3415
3416     if (media_src->send_pli && !rtp_source_has_retained (media_src,
3417             has_pli_compare_func, NULL)) {
3418       if (gst_rtcp_buffer_add_packet (buffer, GST_RTCP_TYPE_PSFB,
3419               &pli_rtcppacket)) {
3420         gst_rtcp_packet_fb_set_type (&pli_rtcppacket, GST_RTCP_PSFB_TYPE_PLI);
3421         gst_rtcp_packet_fb_set_sender_ssrc (&pli_rtcppacket,
3422             rtp_source_get_ssrc (sess->source));
3423         gst_rtcp_packet_fb_set_media_ssrc (&pli_rtcppacket, media_ssrc);
3424         ret = TRUE;
3425       } else {
3426         /* Break because the packet is full, will put next request in a
3427          * further packet
3428          */
3429         break;
3430       }
3431     }
3432     media_src->send_pli = FALSE;
3433   }
3434
3435   RTP_SESSION_UNLOCK (sess);
3436
3437   return ret;
3438 }
3439
3440 static void
3441 rtp_session_send_rtcp (RTPSession * sess, GstClockTimeDiff max_delay)
3442 {
3443   GstClockTime now;
3444
3445   if (!sess->callbacks.send_rtcp)
3446     return;
3447
3448   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
3449
3450   rtp_session_request_early_rtcp (sess, now, max_delay);
3451 }