rtpsession: Implement sending of reduced size RTCP packets
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
21  * with newer GLib versions (>= 2.31.0) */
22 #define GLIB_DISABLE_DEPRECATION_WARNINGS
23
24 #include <string.h>
25
26 #include <gst/rtp/gstrtpbuffer.h>
27 #include <gst/rtp/gstrtcpbuffer.h>
28
29 #include <gst/glib-compat-private.h>
30
31 #include "rtpsession.h"
32
33 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
34 #define GST_CAT_DEFAULT rtp_session_debug
35
36 /* signals and args */
37 enum
38 {
39   SIGNAL_GET_SOURCE_BY_SSRC,
40   SIGNAL_ON_NEW_SSRC,
41   SIGNAL_ON_SSRC_COLLISION,
42   SIGNAL_ON_SSRC_VALIDATED,
43   SIGNAL_ON_SSRC_ACTIVE,
44   SIGNAL_ON_SSRC_SDES,
45   SIGNAL_ON_BYE_SSRC,
46   SIGNAL_ON_BYE_TIMEOUT,
47   SIGNAL_ON_TIMEOUT,
48   SIGNAL_ON_SENDER_TIMEOUT,
49   SIGNAL_ON_SENDING_RTCP,
50   SIGNAL_ON_FEEDBACK_RTCP,
51   SIGNAL_SEND_RTCP,
52   SIGNAL_SEND_RTCP_FULL,
53   SIGNAL_ON_RECEIVING_RTCP,
54   SIGNAL_ON_NEW_SENDER_SSRC,
55   SIGNAL_ON_SENDER_SSRC_ACTIVE,
56   LAST_SIGNAL
57 };
58
59 #define DEFAULT_INTERNAL_SOURCE      NULL
60 #define DEFAULT_BANDWIDTH            0.0
61 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_FRACTION
62 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
63 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
64 #define DEFAULT_RTCP_MTU             1400
65 #define DEFAULT_SDES                 NULL
66 #define DEFAULT_NUM_SOURCES          0
67 #define DEFAULT_NUM_ACTIVE_SOURCES   0
68 #define DEFAULT_SOURCES              NULL
69 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
70 #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
71 #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
72 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
73 #define DEFAULT_MAX_DROPOUT_TIME     60000
74 #define DEFAULT_MAX_MISORDER_TIME    2000
75 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
76 #define DEFAULT_RTCP_REDUCED_SIZE    FALSE
77
78 enum
79 {
80   PROP_0,
81   PROP_INTERNAL_SSRC,
82   PROP_INTERNAL_SOURCE,
83   PROP_BANDWIDTH,
84   PROP_RTCP_FRACTION,
85   PROP_RTCP_RR_BANDWIDTH,
86   PROP_RTCP_RS_BANDWIDTH,
87   PROP_RTCP_MTU,
88   PROP_SDES,
89   PROP_NUM_SOURCES,
90   PROP_NUM_ACTIVE_SOURCES,
91   PROP_SOURCES,
92   PROP_FAVOR_NEW,
93   PROP_RTCP_MIN_INTERVAL,
94   PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
95   PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
96   PROP_PROBATION,
97   PROP_MAX_DROPOUT_TIME,
98   PROP_MAX_MISORDER_TIME,
99   PROP_STATS,
100   PROP_RTP_PROFILE,
101   PROP_RTCP_REDUCED_SIZE
102 };
103
104 /* update average packet size */
105 #define INIT_AVG(avg, val) \
106    (avg) = (val);
107 #define UPDATE_AVG(avg, val)            \
108   if ((avg) == 0)                       \
109    (avg) = (val);                       \
110   else                                  \
111    (avg) = ((val) + (15 * (avg))) >> 4;
112
113
114 /* GObject vmethods */
115 static void rtp_session_finalize (GObject * object);
116 static void rtp_session_set_property (GObject * object, guint prop_id,
117     const GValue * value, GParamSpec * pspec);
118 static void rtp_session_get_property (GObject * object, guint prop_id,
119     GValue * value, GParamSpec * pspec);
120
121 static gboolean rtp_session_send_rtcp (RTPSession * sess,
122     GstClockTime max_delay);
123
124 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
125
126 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
127
128 static guint32 rtp_session_create_new_ssrc (RTPSession * sess);
129 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
130     gboolean * created, RTPPacketInfo * pinfo, gboolean rtp);
131 static RTPSource *obtain_internal_source (RTPSession * sess,
132     guint32 ssrc, gboolean * created, GstClockTime current_time);
133 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
134     GstClockTime current_time);
135 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
136     gboolean deterministic, gboolean first);
137
138 static gboolean
139 accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
140     const GValue * handler_return, gpointer data)
141 {
142   if (g_value_get_boolean (handler_return))
143     g_value_set_boolean (return_accu, TRUE);
144
145   return TRUE;
146 }
147
148 static void
149 rtp_session_class_init (RTPSessionClass * klass)
150 {
151   GObjectClass *gobject_class;
152
153   gobject_class = (GObjectClass *) klass;
154
155   gobject_class->finalize = rtp_session_finalize;
156   gobject_class->set_property = rtp_session_set_property;
157   gobject_class->get_property = rtp_session_get_property;
158
159   /**
160    * RTPSession::get-source-by-ssrc:
161    * @session: the object which received the signal
162    * @ssrc: the SSRC of the RTPSource
163    *
164    * Request the #RTPSource object with SSRC @ssrc in @session.
165    */
166   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
167       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
168       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
169           get_source_by_ssrc), NULL, NULL, g_cclosure_marshal_generic,
170       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
171
172   /**
173    * RTPSession::on-new-ssrc:
174    * @session: the object which received the signal
175    * @src: the new RTPSource
176    *
177    * Notify of a new SSRC that entered @session.
178    */
179   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
180       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
181       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
182       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
183       RTP_TYPE_SOURCE);
184   /**
185    * RTPSession::on-ssrc-collision:
186    * @session: the object which received the signal
187    * @src: the #RTPSource that caused a collision
188    *
189    * Notify when we have an SSRC collision
190    */
191   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
192       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
193       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
194       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
195       RTP_TYPE_SOURCE);
196   /**
197    * RTPSession::on-ssrc-validated:
198    * @session: the object which received the signal
199    * @src: the new validated RTPSource
200    *
201    * Notify of a new SSRC that became validated.
202    */
203   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
204       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
205       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
206       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
207       RTP_TYPE_SOURCE);
208   /**
209    * RTPSession::on-ssrc-active:
210    * @session: the object which received the signal
211    * @src: the active RTPSource
212    *
213    * Notify of a SSRC that is active, i.e., sending RTCP.
214    */
215   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
216       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
217       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
218       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
219       RTP_TYPE_SOURCE);
220   /**
221    * RTPSession::on-ssrc-sdes:
222    * @session: the object which received the signal
223    * @src: the RTPSource
224    *
225    * Notify that a new SDES was received for SSRC.
226    */
227   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
228       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
229       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
230       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
231       RTP_TYPE_SOURCE);
232   /**
233    * RTPSession::on-bye-ssrc:
234    * @session: the object which received the signal
235    * @src: the RTPSource that went away
236    *
237    * Notify of an SSRC that became inactive because of a BYE packet.
238    */
239   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
240       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
241       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
242       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
243       RTP_TYPE_SOURCE);
244   /**
245    * RTPSession::on-bye-timeout:
246    * @session: the object which received the signal
247    * @src: the RTPSource that timed out
248    *
249    * Notify of an SSRC that has timed out because of BYE
250    */
251   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
252       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
253       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
254       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
255       RTP_TYPE_SOURCE);
256   /**
257    * RTPSession::on-timeout:
258    * @session: the object which received the signal
259    * @src: the RTPSource that timed out
260    *
261    * Notify of an SSRC that has timed out
262    */
263   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
264       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
265       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
266       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
267       RTP_TYPE_SOURCE);
268   /**
269    * RTPSession::on-sender-timeout:
270    * @session: the object which received the signal
271    * @src: the RTPSource that timed out
272    *
273    * Notify of an SSRC that was a sender but timed out and became a receiver.
274    */
275   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
276       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
277       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
278       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
279       RTP_TYPE_SOURCE);
280
281   /**
282    * RTPSession::on-sending-rtcp
283    * @session: the object which received the signal
284    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
285    * @early: %TRUE if the packet is early, %FALSE if it is regular
286    *
287    * This signal is emitted before sending an RTCP packet, it can be used
288    * to add extra RTCP Packets.
289    *
290    * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
291    * if suppressing it is acceptable
292    */
293   rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
294       g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
295       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
296       accumulate_trues, NULL, g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 2,
297       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE, G_TYPE_BOOLEAN);
298
299   /**
300    * RTPSession::on-feedback-rtcp:
301    * @session: the object which received the signal
302    * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
303    *  %GST_RTCP_TYPE_RTPFB
304    * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
305    * @sender_ssrc: The SSRC of the sender
306    * @media_ssrc: The SSRC of the media this refers to
307    * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
308    * there was no FCI
309    *
310    * Notify that a RTCP feedback packet has been received
311    */
312   rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
313       g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
314       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
315       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 5, G_TYPE_UINT,
316       G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, GST_TYPE_BUFFER);
317
318   /**
319    * RTPSession::send-rtcp:
320    * @session: the object which received the signal
321    * @max_delay: The maximum delay after which the feedback will not be useful
322    *  anymore
323    *
324    * Requests that the #RTPSession initiate a new RTCP packet as soon as
325    * possible within the requested delay.
326    *
327    * This sets feedback to %TRUE if not already done before.
328    */
329   rtp_session_signals[SIGNAL_SEND_RTCP] =
330       g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
331       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
332       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
333       g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT64);
334
335   /**
336    * RTPSession::send-rtcp-full:
337    * @session: the object which received the signal
338    * @max_delay: The maximum delay after which the feedback will not be useful
339    *  anymore
340    *
341    * Requests that the #RTPSession initiate a new RTCP packet as soon as
342    * possible within the requested delay.
343    *
344    * This sets feedback to %TRUE if not already done before.
345    *
346    * Returns: TRUE if the new RTCP packet could be scheduled within the
347    * requested delay, FALSE otherwise.
348    *
349    * Since: 1.6
350    */
351   rtp_session_signals[SIGNAL_SEND_RTCP_FULL] =
352       g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass),
353       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
354       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
355       g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
356
357   /**
358    * RTPSession::on-receiving-rtcp
359    * @session: the object which received the signal
360    * @buffer: the #GstBuffer containing the RTCP packet that was received
361    *
362    * This signal is emitted when receiving an RTCP packet before it is handled
363    * by the session. It can be used to extract custom information from RTCP packets.
364    *
365    * Since: 1.6
366    */
367   rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] =
368       g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass),
369       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp),
370       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1,
371       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
372
373   /**
374    * RTPSession::on-new-sender-ssrc:
375    * @session: the object which received the signal
376    * @src: the new sender RTPSource
377    *
378    * Notify of a new sender SSRC that entered @session.
379    *
380    * Since: 1.8
381    */
382   rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
383       g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
384       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_sender_ssrc),
385       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
386       RTP_TYPE_SOURCE);
387
388   /**
389    * RTPSession::on-sender-ssrc-active:
390    * @session: the object which received the signal
391    * @src: the active sender RTPSource
392    *
393    * Notify of a sender SSRC that is active, i.e., sending RTCP.
394    *
395    * Since: 1.8
396    */
397   rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
398       g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
399       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass,
400           on_sender_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
401       G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
402
403   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
404       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
405           "The internal SSRC used for the session (deprecated)",
406           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
407
408   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
409       g_param_spec_object ("internal-source", "Internal Source",
410           "The internal source element of the session (deprecated)",
411           RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
412
413   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
414       g_param_spec_double ("bandwidth", "Bandwidth",
415           "The bandwidth of the session (0 for auto-discover)",
416           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
417           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
418
419   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
420       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
421           "The fraction of the bandwidth used for RTCP (or as a real fraction of the RTP bandwidth if < 1)",
422           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
423           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
424
425   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
426       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
427           "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
428           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
429           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
430
431   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
432       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
433           "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
434           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
435           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
436
437   g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
438       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
439           "The maximum size of the RTCP packets",
440           16, G_MAXINT16, DEFAULT_RTCP_MTU,
441           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
442
443   g_object_class_install_property (gobject_class, PROP_SDES,
444       g_param_spec_boxed ("sdes", "SDES",
445           "The SDES items of this session",
446           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
447
448   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
449       g_param_spec_uint ("num-sources", "Num Sources",
450           "The number of sources in the session", 0, G_MAXUINT,
451           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
452
453   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
454       g_param_spec_uint ("num-active-sources", "Num Active Sources",
455           "The number of active sources in the session", 0, G_MAXUINT,
456           DEFAULT_NUM_ACTIVE_SOURCES,
457           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
458   /**
459    * RTPSource::sources
460    *
461    * Get a GValue Array of all sources in the session.
462    *
463    * <example>
464    * <title>Getting the #RTPSources of a session
465    * <programlisting>
466    * {
467    *   GValueArray *arr;
468    *   GValue *val;
469    *   guint i;
470    *
471    *   g_object_get (sess, "sources", &arr, NULL);
472    *
473    *   for (i = 0; i < arr->n_values; i++) {
474    *     RTPSource *source;
475    *
476    *     val = g_value_array_get_nth (arr, i);
477    *     source = g_value_get_object (val);
478    *   }
479    *   g_value_array_free (arr);
480    * }
481    * </programlisting>
482    * </example>
483    */
484   g_object_class_install_property (gobject_class, PROP_SOURCES,
485       g_param_spec_boxed ("sources", "Sources",
486           "An array of all known sources in the session",
487           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
488
489   g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
490       g_param_spec_boolean ("favor-new", "Favor new sources",
491           "Resolve SSRC conflict in favor of new sources", FALSE,
492           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
493
494   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
495       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
496           "Minimum interval between Regular RTCP packet (in ns)",
497           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
498           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
499
500   g_object_class_install_property (gobject_class,
501       PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
502       g_param_spec_uint64 ("rtcp-feedback-retention-window",
503           "RTCP Feedback retention window",
504           "Duration during which RTCP Feedback packets are retained (in ns)",
505           0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
506           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
507
508   g_object_class_install_property (gobject_class,
509       PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
510       g_param_spec_uint ("rtcp-immediate-feedback-threshold",
511           "RTCP Immediate Feedback threshold",
512           "The maximum number of members of a RTP session for which immediate"
513           " feedback is used (DEPRECATED: has no effect and is not needed)",
514           0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
515           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
516
517   g_object_class_install_property (gobject_class, PROP_PROBATION,
518       g_param_spec_uint ("probation", "Number of probations",
519           "Consecutive packet sequence numbers to accept the source",
520           0, G_MAXUINT, DEFAULT_PROBATION,
521           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
522
523   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
524       g_param_spec_uint ("max-dropout-time", "Max dropout time",
525           "The maximum time (milliseconds) of missing packets tolerated.",
526           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
527           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
528
529   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
530       g_param_spec_uint ("max-misorder-time", "Max misorder time",
531           "The maximum time (milliseconds) of misordered packets tolerated.",
532           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
533           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
534
535   /**
536    * RTPSession::stats:
537    *
538    * Various session statistics. This property returns a GstStructure
539    * with name application/x-rtp-session-stats with the following fields:
540    *
541    *  "rtx-drop-count"  G_TYPE_UINT   The number of retransmission events
542    *      dropped (due to bandwidth constraints)
543    *  "sent-nack-count" G_TYPE_UINT   Number of NACKs sent
544    *  "recv-nack-count" G_TYPE_UINT   Number of NACKs received
545    *
546    * Since: 1.4
547    */
548   g_object_class_install_property (gobject_class, PROP_STATS,
549       g_param_spec_boxed ("stats", "Statistics",
550           "Various statistics", GST_TYPE_STRUCTURE,
551           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
552
553   g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
554       g_param_spec_enum ("rtp-profile", "RTP Profile",
555           "RTP profile to use for this session", GST_TYPE_RTP_PROFILE,
556           DEFAULT_RTP_PROFILE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
557
558   g_object_class_install_property (gobject_class, PROP_RTCP_REDUCED_SIZE,
559       g_param_spec_boolean ("rtcp-reduced-size", "RTCP Reduced Size",
560           "Use Reduced Size RTCP for feedback packets",
561           DEFAULT_RTCP_REDUCED_SIZE,
562           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
563
564   klass->get_source_by_ssrc =
565       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
566   klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
567
568   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
569 }
570
571 static void
572 rtp_session_init (RTPSession * sess)
573 {
574   gint i;
575   gchar *str;
576
577   g_mutex_init (&sess->lock);
578   sess->key = g_random_int ();
579   sess->mask_idx = 0;
580   sess->mask = 0;
581
582   /* TODO: We currently only use the first hash table but this is the
583    * beginning of an implementation for RFC2762
584    for (i = 0; i < 32; i++) {
585    */
586   for (i = 0; i < 1; i++) {
587     sess->ssrcs[i] =
588         g_hash_table_new_full (NULL, NULL, NULL,
589         (GDestroyNotify) g_object_unref);
590   }
591
592   rtp_stats_init_defaults (&sess->stats);
593   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
594   rtp_stats_set_min_interval (&sess->stats,
595       (gdouble) DEFAULT_RTCP_MIN_INTERVAL / GST_SECOND);
596
597   sess->recalc_bandwidth = TRUE;
598   sess->bandwidth = DEFAULT_BANDWIDTH;
599   sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
600   sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
601   sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
602
603   /* default UDP header length */
604   sess->header_len = 28;
605   sess->mtu = DEFAULT_RTCP_MTU;
606
607   sess->probation = DEFAULT_PROBATION;
608   sess->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
609   sess->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
610
611   /* some default SDES entries */
612   sess->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
613
614   /* we do not want to leak details like the username or hostname here */
615   str = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
616   gst_structure_set (sess->sdes, "cname", G_TYPE_STRING, str, NULL);
617   g_free (str);
618
619 #if 0
620   /* we do not want to leak the user's real name here */
621   str = g_strdup_printf ("Anon%u", g_random_int ());
622   gst_structure_set (sdes, "name", G_TYPE_STRING, str, NULL);
623   g_free (str);
624 #endif
625
626   gst_structure_set (sess->sdes, "tool", G_TYPE_STRING, "GStreamer", NULL);
627
628   /* this is the SSRC we suggest */
629   sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
630   sess->internal_ssrc_set = FALSE;
631
632   sess->first_rtcp = TRUE;
633   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
634   sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE;
635   sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
636   sess->last_rtcp_interval = GST_CLOCK_TIME_NONE;
637
638   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
639   sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
640   sess->rtcp_immediate_feedback_threshold =
641       DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
642   sess->rtp_profile = DEFAULT_RTP_PROFILE;
643   sess->reduced_size_rtcp = DEFAULT_RTCP_REDUCED_SIZE;
644
645   sess->last_keyframe_request = GST_CLOCK_TIME_NONE;
646
647   sess->is_doing_ptp = TRUE;
648 }
649
650 static void
651 rtp_session_finalize (GObject * object)
652 {
653   RTPSession *sess;
654   gint i;
655
656   sess = RTP_SESSION_CAST (object);
657
658   gst_structure_free (sess->sdes);
659
660   g_list_free_full (sess->conflicting_addresses,
661       (GDestroyNotify) rtp_conflicting_address_free);
662
663   /* TODO: Change this again when implementing RFC 2762
664    * for (i = 0; i < 32; i++)
665    */
666   for (i = 0; i < 1; i++)
667     g_hash_table_destroy (sess->ssrcs[i]);
668
669   g_mutex_clear (&sess->lock);
670
671   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
672 }
673
674 static void
675 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
676 {
677   GValue value = { 0 };
678
679   g_value_init (&value, RTP_TYPE_SOURCE);
680   g_value_take_object (&value, source);
681   /* copies the value */
682   g_value_array_append (arr, &value);
683 }
684
685 static GValueArray *
686 rtp_session_create_sources (RTPSession * sess)
687 {
688   GValueArray *res;
689   guint size;
690
691   RTP_SESSION_LOCK (sess);
692   /* get number of elements in the table */
693   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
694   /* create the result value array */
695   res = g_value_array_new (size);
696
697   /* and copy all values into the array */
698   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
699   RTP_SESSION_UNLOCK (sess);
700
701   return res;
702 }
703
704 static GstStructure *
705 rtp_session_create_stats (RTPSession * sess)
706 {
707   GstStructure *s;
708
709   s = gst_structure_new ("application/x-rtp-session-stats",
710       "rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped,
711       "sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent,
712       "recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL);
713
714   return s;
715 }
716
717 static void
718 rtp_session_set_property (GObject * object, guint prop_id,
719     const GValue * value, GParamSpec * pspec)
720 {
721   RTPSession *sess;
722
723   sess = RTP_SESSION (object);
724
725   switch (prop_id) {
726     case PROP_INTERNAL_SSRC:
727       RTP_SESSION_LOCK (sess);
728       sess->suggested_ssrc = g_value_get_uint (value);
729       sess->internal_ssrc_set = TRUE;
730       sess->internal_ssrc_from_caps_or_property = TRUE;
731       RTP_SESSION_UNLOCK (sess);
732       if (sess->callbacks.reconfigure)
733         sess->callbacks.reconfigure (sess, sess->reconfigure_user_data);
734       break;
735     case PROP_BANDWIDTH:
736       RTP_SESSION_LOCK (sess);
737       sess->bandwidth = g_value_get_double (value);
738       sess->recalc_bandwidth = TRUE;
739       RTP_SESSION_UNLOCK (sess);
740       break;
741     case PROP_RTCP_FRACTION:
742       RTP_SESSION_LOCK (sess);
743       sess->rtcp_bandwidth = g_value_get_double (value);
744       sess->recalc_bandwidth = TRUE;
745       RTP_SESSION_UNLOCK (sess);
746       break;
747     case PROP_RTCP_RR_BANDWIDTH:
748       RTP_SESSION_LOCK (sess);
749       sess->rtcp_rr_bandwidth = g_value_get_int (value);
750       sess->recalc_bandwidth = TRUE;
751       RTP_SESSION_UNLOCK (sess);
752       break;
753     case PROP_RTCP_RS_BANDWIDTH:
754       RTP_SESSION_LOCK (sess);
755       sess->rtcp_rs_bandwidth = g_value_get_int (value);
756       sess->recalc_bandwidth = TRUE;
757       RTP_SESSION_UNLOCK (sess);
758       break;
759     case PROP_RTCP_MTU:
760       sess->mtu = g_value_get_uint (value);
761       break;
762     case PROP_SDES:
763       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
764       break;
765     case PROP_FAVOR_NEW:
766       sess->favor_new = g_value_get_boolean (value);
767       break;
768     case PROP_RTCP_MIN_INTERVAL:
769       rtp_stats_set_min_interval (&sess->stats,
770           (gdouble) g_value_get_uint64 (value) / GST_SECOND);
771       /* trigger reconsideration */
772       RTP_SESSION_LOCK (sess);
773       sess->next_rtcp_check_time = 0;
774       RTP_SESSION_UNLOCK (sess);
775       if (sess->callbacks.reconsider)
776         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
777       break;
778     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
779       sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
780       break;
781     case PROP_PROBATION:
782       sess->probation = g_value_get_uint (value);
783       break;
784     case PROP_MAX_DROPOUT_TIME:
785       sess->max_dropout_time = g_value_get_uint (value);
786       break;
787     case PROP_MAX_MISORDER_TIME:
788       sess->max_misorder_time = g_value_get_uint (value);
789       break;
790     case PROP_RTP_PROFILE:
791       sess->rtp_profile = g_value_get_enum (value);
792       /* trigger reconsideration */
793       RTP_SESSION_LOCK (sess);
794       sess->next_rtcp_check_time = 0;
795       RTP_SESSION_UNLOCK (sess);
796       if (sess->callbacks.reconsider)
797         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
798       break;
799     case PROP_RTCP_REDUCED_SIZE:
800       sess->reduced_size_rtcp = g_value_get_boolean (value);
801       break;
802     default:
803       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
804       break;
805   }
806 }
807
808 static void
809 rtp_session_get_property (GObject * object, guint prop_id,
810     GValue * value, GParamSpec * pspec)
811 {
812   RTPSession *sess;
813
814   sess = RTP_SESSION (object);
815
816   switch (prop_id) {
817     case PROP_INTERNAL_SSRC:
818       g_value_set_uint (value, rtp_session_suggest_ssrc (sess, NULL));
819       break;
820     case PROP_INTERNAL_SOURCE:
821       /* FIXME, return a random source */
822       g_value_set_object (value, NULL);
823       break;
824     case PROP_BANDWIDTH:
825       g_value_set_double (value, sess->bandwidth);
826       break;
827     case PROP_RTCP_FRACTION:
828       g_value_set_double (value, sess->rtcp_bandwidth);
829       break;
830     case PROP_RTCP_RR_BANDWIDTH:
831       g_value_set_int (value, sess->rtcp_rr_bandwidth);
832       break;
833     case PROP_RTCP_RS_BANDWIDTH:
834       g_value_set_int (value, sess->rtcp_rs_bandwidth);
835       break;
836     case PROP_RTCP_MTU:
837       g_value_set_uint (value, sess->mtu);
838       break;
839     case PROP_SDES:
840       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
841       break;
842     case PROP_NUM_SOURCES:
843       g_value_set_uint (value, rtp_session_get_num_sources (sess));
844       break;
845     case PROP_NUM_ACTIVE_SOURCES:
846       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
847       break;
848     case PROP_SOURCES:
849       g_value_take_boxed (value, rtp_session_create_sources (sess));
850       break;
851     case PROP_FAVOR_NEW:
852       g_value_set_boolean (value, sess->favor_new);
853       break;
854     case PROP_RTCP_MIN_INTERVAL:
855       g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
856       break;
857     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
858       g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
859       break;
860     case PROP_PROBATION:
861       g_value_set_uint (value, sess->probation);
862       break;
863     case PROP_MAX_DROPOUT_TIME:
864       g_value_set_uint (value, sess->max_dropout_time);
865       break;
866     case PROP_MAX_MISORDER_TIME:
867       g_value_set_uint (value, sess->max_misorder_time);
868       break;
869     case PROP_STATS:
870       g_value_take_boxed (value, rtp_session_create_stats (sess));
871       break;
872     case PROP_RTP_PROFILE:
873       g_value_set_enum (value, sess->rtp_profile);
874       break;
875     case PROP_RTCP_REDUCED_SIZE:
876       g_value_set_boolean (value, sess->reduced_size_rtcp);
877       break;
878     default:
879       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
880       break;
881   }
882 }
883
884 static void
885 on_new_ssrc (RTPSession * sess, RTPSource * source)
886 {
887   g_object_ref (source);
888   RTP_SESSION_UNLOCK (sess);
889   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
890   RTP_SESSION_LOCK (sess);
891   g_object_unref (source);
892 }
893
894 static void
895 on_ssrc_collision (RTPSession * sess, RTPSource * source)
896 {
897   g_object_ref (source);
898   RTP_SESSION_UNLOCK (sess);
899   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
900       source);
901   RTP_SESSION_LOCK (sess);
902   g_object_unref (source);
903 }
904
905 static void
906 on_ssrc_validated (RTPSession * sess, RTPSource * source)
907 {
908   g_object_ref (source);
909   RTP_SESSION_UNLOCK (sess);
910   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
911       source);
912   RTP_SESSION_LOCK (sess);
913   g_object_unref (source);
914 }
915
916 static void
917 on_ssrc_active (RTPSession * sess, RTPSource * source)
918 {
919   g_object_ref (source);
920   RTP_SESSION_UNLOCK (sess);
921   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
922   RTP_SESSION_LOCK (sess);
923   g_object_unref (source);
924 }
925
926 static void
927 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
928 {
929   g_object_ref (source);
930   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
931   RTP_SESSION_UNLOCK (sess);
932   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
933   RTP_SESSION_LOCK (sess);
934   g_object_unref (source);
935 }
936
937 static void
938 on_bye_ssrc (RTPSession * sess, RTPSource * source)
939 {
940   g_object_ref (source);
941   RTP_SESSION_UNLOCK (sess);
942   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
943   RTP_SESSION_LOCK (sess);
944   g_object_unref (source);
945 }
946
947 static void
948 on_bye_timeout (RTPSession * sess, RTPSource * source)
949 {
950   g_object_ref (source);
951   RTP_SESSION_UNLOCK (sess);
952   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
953   RTP_SESSION_LOCK (sess);
954   g_object_unref (source);
955 }
956
957 static void
958 on_timeout (RTPSession * sess, RTPSource * source)
959 {
960   g_object_ref (source);
961   RTP_SESSION_UNLOCK (sess);
962   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
963   RTP_SESSION_LOCK (sess);
964   g_object_unref (source);
965 }
966
967 static void
968 on_sender_timeout (RTPSession * sess, RTPSource * source)
969 {
970   g_object_ref (source);
971   RTP_SESSION_UNLOCK (sess);
972   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
973       source);
974   RTP_SESSION_LOCK (sess);
975   g_object_unref (source);
976 }
977
978 static void
979 on_new_sender_ssrc (RTPSession * sess, RTPSource * source)
980 {
981   g_object_ref (source);
982   RTP_SESSION_UNLOCK (sess);
983   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
984       source);
985   RTP_SESSION_LOCK (sess);
986   g_object_unref (source);
987 }
988
989 static void
990 on_sender_ssrc_active (RTPSession * sess, RTPSource * source)
991 {
992   g_object_ref (source);
993   RTP_SESSION_UNLOCK (sess);
994   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE], 0,
995       source);
996   RTP_SESSION_LOCK (sess);
997   g_object_unref (source);
998 }
999
1000 /**
1001  * rtp_session_new:
1002  *
1003  * Create a new session object.
1004  *
1005  * Returns: a new #RTPSession. g_object_unref() after usage.
1006  */
1007 RTPSession *
1008 rtp_session_new (void)
1009 {
1010   RTPSession *sess;
1011
1012   sess = g_object_new (RTP_TYPE_SESSION, NULL);
1013
1014   return sess;
1015 }
1016
1017 /**
1018  * rtp_session_set_callbacks:
1019  * @sess: an #RTPSession
1020  * @callbacks: callbacks to configure
1021  * @user_data: user data passed in the callbacks
1022  *
1023  * Configure a set of callbacks to be notified of actions.
1024  */
1025 void
1026 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
1027     gpointer user_data)
1028 {
1029   g_return_if_fail (RTP_IS_SESSION (sess));
1030
1031   if (callbacks->process_rtp) {
1032     sess->callbacks.process_rtp = callbacks->process_rtp;
1033     sess->process_rtp_user_data = user_data;
1034   }
1035   if (callbacks->send_rtp) {
1036     sess->callbacks.send_rtp = callbacks->send_rtp;
1037     sess->send_rtp_user_data = user_data;
1038   }
1039   if (callbacks->send_rtcp) {
1040     sess->callbacks.send_rtcp = callbacks->send_rtcp;
1041     sess->send_rtcp_user_data = user_data;
1042   }
1043   if (callbacks->sync_rtcp) {
1044     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
1045     sess->sync_rtcp_user_data = user_data;
1046   }
1047   if (callbacks->clock_rate) {
1048     sess->callbacks.clock_rate = callbacks->clock_rate;
1049     sess->clock_rate_user_data = user_data;
1050   }
1051   if (callbacks->reconsider) {
1052     sess->callbacks.reconsider = callbacks->reconsider;
1053     sess->reconsider_user_data = user_data;
1054   }
1055   if (callbacks->request_key_unit) {
1056     sess->callbacks.request_key_unit = callbacks->request_key_unit;
1057     sess->request_key_unit_user_data = user_data;
1058   }
1059   if (callbacks->request_time) {
1060     sess->callbacks.request_time = callbacks->request_time;
1061     sess->request_time_user_data = user_data;
1062   }
1063   if (callbacks->notify_nack) {
1064     sess->callbacks.notify_nack = callbacks->notify_nack;
1065     sess->notify_nack_user_data = user_data;
1066   }
1067   if (callbacks->reconfigure) {
1068     sess->callbacks.reconfigure = callbacks->reconfigure;
1069     sess->reconfigure_user_data = user_data;
1070   }
1071 }
1072
1073 /**
1074  * rtp_session_set_process_rtp_callback:
1075  * @sess: an #RTPSession
1076  * @callback: callback to set
1077  * @user_data: user data passed in the callback
1078  *
1079  * Configure only the process_rtp callback to be notified of the process_rtp action.
1080  */
1081 void
1082 rtp_session_set_process_rtp_callback (RTPSession * sess,
1083     RTPSessionProcessRTP callback, gpointer user_data)
1084 {
1085   g_return_if_fail (RTP_IS_SESSION (sess));
1086
1087   sess->callbacks.process_rtp = callback;
1088   sess->process_rtp_user_data = user_data;
1089 }
1090
1091 /**
1092  * rtp_session_set_send_rtp_callback:
1093  * @sess: an #RTPSession
1094  * @callback: callback to set
1095  * @user_data: user data passed in the callback
1096  *
1097  * Configure only the send_rtp callback to be notified of the send_rtp action.
1098  */
1099 void
1100 rtp_session_set_send_rtp_callback (RTPSession * sess,
1101     RTPSessionSendRTP callback, gpointer user_data)
1102 {
1103   g_return_if_fail (RTP_IS_SESSION (sess));
1104
1105   sess->callbacks.send_rtp = callback;
1106   sess->send_rtp_user_data = user_data;
1107 }
1108
1109 /**
1110  * rtp_session_set_send_rtcp_callback:
1111  * @sess: an #RTPSession
1112  * @callback: callback to set
1113  * @user_data: user data passed in the callback
1114  *
1115  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
1116  */
1117 void
1118 rtp_session_set_send_rtcp_callback (RTPSession * sess,
1119     RTPSessionSendRTCP callback, gpointer user_data)
1120 {
1121   g_return_if_fail (RTP_IS_SESSION (sess));
1122
1123   sess->callbacks.send_rtcp = callback;
1124   sess->send_rtcp_user_data = user_data;
1125 }
1126
1127 /**
1128  * rtp_session_set_sync_rtcp_callback:
1129  * @sess: an #RTPSession
1130  * @callback: callback to set
1131  * @user_data: user data passed in the callback
1132  *
1133  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
1134  */
1135 void
1136 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
1137     RTPSessionSyncRTCP callback, gpointer user_data)
1138 {
1139   g_return_if_fail (RTP_IS_SESSION (sess));
1140
1141   sess->callbacks.sync_rtcp = callback;
1142   sess->sync_rtcp_user_data = user_data;
1143 }
1144
1145 /**
1146  * rtp_session_set_clock_rate_callback:
1147  * @sess: an #RTPSession
1148  * @callback: callback to set
1149  * @user_data: user data passed in the callback
1150  *
1151  * Configure only the clock_rate callback to be notified of the clock_rate action.
1152  */
1153 void
1154 rtp_session_set_clock_rate_callback (RTPSession * sess,
1155     RTPSessionClockRate callback, gpointer user_data)
1156 {
1157   g_return_if_fail (RTP_IS_SESSION (sess));
1158
1159   sess->callbacks.clock_rate = callback;
1160   sess->clock_rate_user_data = user_data;
1161 }
1162
1163 /**
1164  * rtp_session_set_reconsider_callback:
1165  * @sess: an #RTPSession
1166  * @callback: callback to set
1167  * @user_data: user data passed in the callback
1168  *
1169  * Configure only the reconsider callback to be notified of the reconsider action.
1170  */
1171 void
1172 rtp_session_set_reconsider_callback (RTPSession * sess,
1173     RTPSessionReconsider callback, gpointer user_data)
1174 {
1175   g_return_if_fail (RTP_IS_SESSION (sess));
1176
1177   sess->callbacks.reconsider = callback;
1178   sess->reconsider_user_data = user_data;
1179 }
1180
1181 /**
1182  * rtp_session_set_request_time_callback:
1183  * @sess: an #RTPSession
1184  * @callback: callback to set
1185  * @user_data: user data passed in the callback
1186  *
1187  * Configure only the request_time callback
1188  */
1189 void
1190 rtp_session_set_request_time_callback (RTPSession * sess,
1191     RTPSessionRequestTime callback, gpointer user_data)
1192 {
1193   g_return_if_fail (RTP_IS_SESSION (sess));
1194
1195   sess->callbacks.request_time = callback;
1196   sess->request_time_user_data = user_data;
1197 }
1198
1199 /**
1200  * rtp_session_set_bandwidth:
1201  * @sess: an #RTPSession
1202  * @bandwidth: the bandwidth allocated
1203  *
1204  * Set the session bandwidth in bytes per second.
1205  */
1206 void
1207 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
1208 {
1209   g_return_if_fail (RTP_IS_SESSION (sess));
1210
1211   RTP_SESSION_LOCK (sess);
1212   sess->stats.bandwidth = bandwidth;
1213   RTP_SESSION_UNLOCK (sess);
1214 }
1215
1216 /**
1217  * rtp_session_get_bandwidth:
1218  * @sess: an #RTPSession
1219  *
1220  * Get the session bandwidth.
1221  *
1222  * Returns: the session bandwidth.
1223  */
1224 gdouble
1225 rtp_session_get_bandwidth (RTPSession * sess)
1226 {
1227   gdouble result;
1228
1229   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1230
1231   RTP_SESSION_LOCK (sess);
1232   result = sess->stats.bandwidth;
1233   RTP_SESSION_UNLOCK (sess);
1234
1235   return result;
1236 }
1237
1238 /**
1239  * rtp_session_set_rtcp_fraction:
1240  * @sess: an #RTPSession
1241  * @bandwidth: the RTCP bandwidth
1242  *
1243  * Set the bandwidth in bytes per second that should be used for RTCP
1244  * messages.
1245  */
1246 void
1247 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
1248 {
1249   g_return_if_fail (RTP_IS_SESSION (sess));
1250
1251   RTP_SESSION_LOCK (sess);
1252   sess->stats.rtcp_bandwidth = bandwidth;
1253   RTP_SESSION_UNLOCK (sess);
1254 }
1255
1256 /**
1257  * rtp_session_get_rtcp_fraction:
1258  * @sess: an #RTPSession
1259  *
1260  * Get the session bandwidth used for RTCP.
1261  *
1262  * Returns: The bandwidth used for RTCP messages.
1263  */
1264 gdouble
1265 rtp_session_get_rtcp_fraction (RTPSession * sess)
1266 {
1267   gdouble result;
1268
1269   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
1270
1271   RTP_SESSION_LOCK (sess);
1272   result = sess->stats.rtcp_bandwidth;
1273   RTP_SESSION_UNLOCK (sess);
1274
1275   return result;
1276 }
1277
1278 /**
1279  * rtp_session_get_sdes_struct:
1280  * @sess: an #RTSPSession
1281  *
1282  * Get the SDES data as a #GstStructure
1283  *
1284  * Returns: a GstStructure with SDES items for @sess. This function returns a
1285  * copy of the SDES structure, use gst_structure_free() after usage.
1286  */
1287 GstStructure *
1288 rtp_session_get_sdes_struct (RTPSession * sess)
1289 {
1290   GstStructure *result = NULL;
1291
1292   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1293
1294   RTP_SESSION_LOCK (sess);
1295   if (sess->sdes)
1296     result = gst_structure_copy (sess->sdes);
1297   RTP_SESSION_UNLOCK (sess);
1298
1299   return result;
1300 }
1301
1302 /**
1303  * rtp_session_set_sdes_struct:
1304  * @sess: an #RTSPSession
1305  * @sdes: a #GstStructure
1306  *
1307  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
1308  */
1309 void
1310 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
1311 {
1312   g_return_if_fail (sdes);
1313   g_return_if_fail (RTP_IS_SESSION (sess));
1314
1315   RTP_SESSION_LOCK (sess);
1316   if (sess->sdes)
1317     gst_structure_free (sess->sdes);
1318   sess->sdes = gst_structure_copy (sdes);
1319   RTP_SESSION_UNLOCK (sess);
1320 }
1321
1322 static GstFlowReturn
1323 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
1324 {
1325   GstFlowReturn result = GST_FLOW_OK;
1326
1327   if (source->internal) {
1328     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
1329
1330     RTP_SESSION_UNLOCK (session);
1331
1332     if (session->callbacks.send_rtp)
1333       result =
1334           session->callbacks.send_rtp (session, source, data,
1335           session->send_rtp_user_data);
1336     else {
1337       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1338     }
1339   } else {
1340     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
1341     RTP_SESSION_UNLOCK (session);
1342
1343     if (session->callbacks.process_rtp)
1344       result =
1345           session->callbacks.process_rtp (session, source,
1346           GST_BUFFER_CAST (data), session->process_rtp_user_data);
1347     else
1348       gst_buffer_unref (GST_BUFFER_CAST (data));
1349   }
1350   RTP_SESSION_LOCK (session);
1351
1352   return result;
1353 }
1354
1355 static gint
1356 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
1357 {
1358   gint result;
1359
1360   RTP_SESSION_UNLOCK (session);
1361
1362   if (session->callbacks.clock_rate)
1363     result =
1364         session->callbacks.clock_rate (session, pt,
1365         session->clock_rate_user_data);
1366   else
1367     result = -1;
1368
1369   RTP_SESSION_LOCK (session);
1370
1371   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
1372
1373   return result;
1374 }
1375
1376 static RTPSourceCallbacks callbacks = {
1377   (RTPSourcePushRTP) source_push_rtp,
1378   (RTPSourceClockRate) source_clock_rate,
1379 };
1380
1381
1382 /**
1383  * rtp_session_find_conflicting_address:
1384  * @session: The session the packet came in
1385  * @address: address to check for
1386  * @time: The time when the packet that is possibly in conflict arrived
1387  *
1388  * Checks if an address which has a conflict is already known. If it is
1389  * a known conflict, remember the time
1390  *
1391  * Returns: TRUE if it was a known conflict, FALSE otherwise
1392  */
1393 static gboolean
1394 rtp_session_find_conflicting_address (RTPSession * session,
1395     GSocketAddress * address, GstClockTime time)
1396 {
1397   return find_conflicting_address (session->conflicting_addresses, address,
1398       time);
1399 }
1400
1401 /**
1402  * rtp_session_add_conflicting_address:
1403  * @session: The session the packet came in
1404  * @address: address to remember
1405  * @time: The time when the packet that is in conflict arrived
1406  *
1407  * Adds a new conflict address
1408  */
1409 static void
1410 rtp_session_add_conflicting_address (RTPSession * sess,
1411     GSocketAddress * address, GstClockTime time)
1412 {
1413   sess->conflicting_addresses =
1414       add_conflicting_address (sess->conflicting_addresses, address, time);
1415 }
1416
1417
1418 static gboolean
1419 check_collision (RTPSession * sess, RTPSource * source,
1420     RTPPacketInfo * pinfo, gboolean rtp)
1421 {
1422   guint32 ssrc;
1423
1424   /* If we have no pinfo address, we can't do collision checking */
1425   if (!pinfo->address)
1426     return FALSE;
1427
1428   ssrc = rtp_source_get_ssrc (source);
1429
1430   if (!source->internal) {
1431     GSocketAddress *from;
1432
1433     /* This is not our local source, but lets check if two remote
1434      * source collide */
1435     if (rtp) {
1436       from = source->rtp_from;
1437     } else {
1438       from = source->rtcp_from;
1439     }
1440
1441     if (from) {
1442       if (__g_socket_address_equal (from, pinfo->address)) {
1443         /* Address is the same */
1444         return FALSE;
1445       } else {
1446         GST_LOG ("we have a third-party collision or loop ssrc:%x", ssrc);
1447         if (sess->favor_new) {
1448           if (rtp_source_find_conflicting_address (source,
1449                   pinfo->address, pinfo->current_time)) {
1450             gchar *buf1;
1451
1452             buf1 = __g_socket_address_to_string (pinfo->address);
1453             GST_LOG ("Known conflict on %x for %s, dropping packet", ssrc,
1454                 buf1);
1455             g_free (buf1);
1456
1457             return TRUE;
1458           } else {
1459             gchar *buf1, *buf2;
1460
1461             /* Current address is not a known conflict, lets assume this is
1462              * a new source. Save old address in possible conflict list
1463              */
1464             rtp_source_add_conflicting_address (source, from,
1465                 pinfo->current_time);
1466
1467             buf1 = __g_socket_address_to_string (from);
1468             buf2 = __g_socket_address_to_string (pinfo->address);
1469
1470             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1471                 " saving old as known conflict", ssrc, buf1, buf2);
1472
1473             if (rtp)
1474               rtp_source_set_rtp_from (source, pinfo->address);
1475             else
1476               rtp_source_set_rtcp_from (source, pinfo->address);
1477
1478             g_free (buf1);
1479             g_free (buf2);
1480
1481             return FALSE;
1482           }
1483         } else {
1484           /* Don't need to save old addresses, we ignore new sources */
1485           return TRUE;
1486         }
1487       }
1488     } else {
1489       /* We don't already have a from address for RTP, just set it */
1490       if (rtp)
1491         rtp_source_set_rtp_from (source, pinfo->address);
1492       else
1493         rtp_source_set_rtcp_from (source, pinfo->address);
1494       return FALSE;
1495     }
1496
1497     /* FIXME: Log 3rd party collision somehow
1498      * Maybe should be done in upper layer, only the SDES can tell us
1499      * if its a collision or a loop
1500      */
1501   } else {
1502     /* This is sending with our ssrc, is it an address we already know */
1503     if (rtp_session_find_conflicting_address (sess, pinfo->address,
1504             pinfo->current_time)) {
1505       /* Its a known conflict, its probably a loop, not a collision
1506        * lets just drop the incoming packet
1507        */
1508       GST_DEBUG ("Our packets are being looped back to us, dropping");
1509     } else {
1510       /* Its a new collision, lets change our SSRC */
1511       rtp_session_add_conflicting_address (sess, pinfo->address,
1512           pinfo->current_time);
1513
1514       GST_DEBUG ("Collision for SSRC %x", ssrc);
1515       /* mark the source BYE */
1516       rtp_source_mark_bye (source, "SSRC Collision");
1517       /* if we were suggesting this SSRC, change to something else */
1518       if (sess->suggested_ssrc == ssrc) {
1519         sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
1520         sess->internal_ssrc_set = TRUE;
1521       }
1522
1523       on_ssrc_collision (sess, source);
1524
1525       rtp_session_schedule_bye_locked (sess, pinfo->current_time);
1526     }
1527   }
1528
1529   return TRUE;
1530 }
1531
1532 typedef struct
1533 {
1534   gboolean is_doing_ptp;
1535   GSocketAddress *new_addr;
1536 } CompareAddrData;
1537
1538 /* check if the two given ip addr are the same (do not care about the port) */
1539 static gboolean
1540 ip_addr_equal (GSocketAddress * a, GSocketAddress * b)
1541 {
1542   return
1543       g_inet_address_equal (g_inet_socket_address_get_address
1544       (G_INET_SOCKET_ADDRESS (a)),
1545       g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (b)));
1546 }
1547
1548 static void
1549 compare_rtp_source_addr (const gchar * key, RTPSource * source,
1550     CompareAddrData * data)
1551 {
1552   /* only compare ip addr of remote sources which are also not closing */
1553   if (!source->internal && !source->closing && source->rtp_from) {
1554     /* look for the first rtp source */
1555     if (!data->new_addr)
1556       data->new_addr = source->rtp_from;
1557     /* compare current ip addr with the first one */
1558     else
1559       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtp_from);
1560   }
1561 }
1562
1563 static void
1564 compare_rtcp_source_addr (const gchar * key, RTPSource * source,
1565     CompareAddrData * data)
1566 {
1567   /* only compare ip addr of remote sources which are also not closing */
1568   if (!source->internal && !source->closing && source->rtcp_from) {
1569     /* look for the first rtcp source */
1570     if (!data->new_addr)
1571       data->new_addr = source->rtcp_from;
1572     else
1573       /* compare current ip addr with the first one */
1574       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtcp_from);
1575   }
1576 }
1577
1578 /* loop over our non-internal source to know if the session
1579  * is doing point-to-point */
1580 static void
1581 session_update_ptp (RTPSession * sess)
1582 {
1583   /* to know if the session is doing point to point, the ip addr
1584    * of each non-internal (=remotes) source have to be compared
1585    * to each other.
1586    */
1587   gboolean is_doing_rtp_ptp;
1588   gboolean is_doing_rtcp_ptp;
1589   CompareAddrData data;
1590
1591   /* compare the first remote source's ip addr that receive rtp packets
1592    * with other remote rtp source.
1593    * it's enough because the session just needs to know if they are all
1594    * equals or not
1595    */
1596   data.is_doing_ptp = TRUE;
1597   data.new_addr = NULL;
1598   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1599       (GHFunc) compare_rtp_source_addr, (gpointer) & data);
1600   is_doing_rtp_ptp = data.is_doing_ptp;
1601
1602   /* same but about rtcp */
1603   data.is_doing_ptp = TRUE;
1604   data.new_addr = NULL;
1605   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1606       (GHFunc) compare_rtcp_source_addr, (gpointer) & data);
1607   is_doing_rtcp_ptp = data.is_doing_ptp;
1608
1609   /* the session is doing point-to-point if all rtp remote have the same
1610    * ip addr and if all rtcp remote sources have the same ip addr */
1611   sess->is_doing_ptp = is_doing_rtp_ptp && is_doing_rtcp_ptp;
1612
1613   GST_DEBUG ("doing point-to-point: %d", sess->is_doing_ptp);
1614 }
1615
1616 static void
1617 add_source (RTPSession * sess, RTPSource * src)
1618 {
1619   g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1620       GINT_TO_POINTER (src->ssrc), src);
1621   /* report the new source ASAP */
1622   src->generation = sess->generation;
1623   /* we have one more source now */
1624   sess->total_sources++;
1625   if (RTP_SOURCE_IS_ACTIVE (src))
1626     sess->stats.active_sources++;
1627   if (src->internal) {
1628     sess->stats.internal_sources++;
1629     if (!sess->internal_ssrc_from_caps_or_property
1630         && sess->suggested_ssrc != src->ssrc) {
1631       sess->suggested_ssrc = src->ssrc;
1632       sess->internal_ssrc_set = TRUE;
1633     }
1634   }
1635
1636   /* update point-to-point status */
1637   if (!src->internal)
1638     session_update_ptp (sess);
1639 }
1640
1641 static RTPSource *
1642 find_source (RTPSession * sess, guint32 ssrc)
1643 {
1644   return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1645       GINT_TO_POINTER (ssrc));
1646 }
1647
1648 /* must be called with the session lock, the returned source needs to be
1649  * unreffed after usage. */
1650 static RTPSource *
1651 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1652     RTPPacketInfo * pinfo, gboolean rtp)
1653 {
1654   RTPSource *source;
1655
1656   source = find_source (sess, ssrc);
1657   if (source == NULL) {
1658     /* make new Source in probation and insert */
1659     source = rtp_source_new (ssrc);
1660
1661     GST_DEBUG ("creating new source %08x %p", ssrc, source);
1662
1663     /* for RTP packets we need to set the source in probation. Receiving RTCP
1664      * packets of an SSRC, on the other hand, is a strong indication that we
1665      * are dealing with a valid source. */
1666     g_object_set (source, "probation", rtp ? sess->probation : 0,
1667         "max-dropout-time", sess->max_dropout_time, "max-misorder-time",
1668         sess->max_misorder_time, NULL);
1669
1670     /* store from address, if any */
1671     if (pinfo->address) {
1672       if (rtp)
1673         rtp_source_set_rtp_from (source, pinfo->address);
1674       else
1675         rtp_source_set_rtcp_from (source, pinfo->address);
1676     }
1677
1678     /* configure a callback on the source */
1679     rtp_source_set_callbacks (source, &callbacks, sess);
1680
1681     add_source (sess, source);
1682     *created = TRUE;
1683   } else {
1684     *created = FALSE;
1685     /* check for collision, this updates the address when not previously set */
1686     if (check_collision (sess, source, pinfo, rtp)) {
1687       return NULL;
1688     }
1689     /* Receiving RTCP packets of an SSRC is a strong indication that we
1690      * are dealing with a valid source. */
1691     if (!rtp)
1692       g_object_set (source, "probation", 0, NULL);
1693   }
1694   /* update last activity */
1695   source->last_activity = pinfo->current_time;
1696   if (rtp)
1697     source->last_rtp_activity = pinfo->current_time;
1698   g_object_ref (source);
1699
1700   return source;
1701 }
1702
1703 /* must be called with the session lock, the returned source needs to be
1704  * unreffed after usage. */
1705 static RTPSource *
1706 obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1707     GstClockTime current_time)
1708 {
1709   RTPSource *source;
1710
1711   source = find_source (sess, ssrc);
1712   if (source == NULL) {
1713     /* make new internal Source and insert */
1714     source = rtp_source_new (ssrc);
1715
1716     GST_DEBUG ("creating new internal source %08x %p", ssrc, source);
1717
1718     source->validated = TRUE;
1719     source->internal = TRUE;
1720     source->probation = FALSE;
1721     rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes));
1722     rtp_source_set_callbacks (source, &callbacks, sess);
1723
1724     add_source (sess, source);
1725     *created = TRUE;
1726   } else {
1727     *created = FALSE;
1728   }
1729   /* update last activity */
1730   if (current_time != GST_CLOCK_TIME_NONE) {
1731     source->last_activity = current_time;
1732     source->last_rtp_activity = current_time;
1733   }
1734   g_object_ref (source);
1735
1736   return source;
1737 }
1738
1739 /**
1740  * rtp_session_suggest_ssrc:
1741  * @sess: a #RTPSession
1742  * @is_random: if the suggested ssrc is random
1743  *
1744  * Suggest an unused SSRC in @sess.
1745  *
1746  * Returns: a free unused SSRC
1747  */
1748 guint32
1749 rtp_session_suggest_ssrc (RTPSession * sess, gboolean * is_random)
1750 {
1751   guint32 result;
1752
1753   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1754
1755   RTP_SESSION_LOCK (sess);
1756   result = sess->suggested_ssrc;
1757   if (is_random)
1758     *is_random = !sess->internal_ssrc_set;
1759   RTP_SESSION_UNLOCK (sess);
1760
1761   return result;
1762 }
1763
1764 /**
1765  * rtp_session_add_source:
1766  * @sess: a #RTPSession
1767  * @src: #RTPSource to add
1768  *
1769  * Add @src to @session.
1770  *
1771  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1772  * existed in the session.
1773  */
1774 gboolean
1775 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1776 {
1777   gboolean result = FALSE;
1778   RTPSource *find;
1779
1780   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1781   g_return_val_if_fail (src != NULL, FALSE);
1782
1783   RTP_SESSION_LOCK (sess);
1784   find = find_source (sess, src->ssrc);
1785   if (find == NULL) {
1786     add_source (sess, src);
1787     result = TRUE;
1788   }
1789   RTP_SESSION_UNLOCK (sess);
1790
1791   return result;
1792 }
1793
1794 /**
1795  * rtp_session_get_num_sources:
1796  * @sess: an #RTPSession
1797  *
1798  * Get the number of sources in @sess.
1799  *
1800  * Returns: The number of sources in @sess.
1801  */
1802 guint
1803 rtp_session_get_num_sources (RTPSession * sess)
1804 {
1805   guint result;
1806
1807   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1808
1809   RTP_SESSION_LOCK (sess);
1810   result = sess->total_sources;
1811   RTP_SESSION_UNLOCK (sess);
1812
1813   return result;
1814 }
1815
1816 /**
1817  * rtp_session_get_num_active_sources:
1818  * @sess: an #RTPSession
1819  *
1820  * Get the number of active sources in @sess. A source is considered active when
1821  * it has been validated and has not yet received a BYE RTCP message.
1822  *
1823  * Returns: The number of active sources in @sess.
1824  */
1825 guint
1826 rtp_session_get_num_active_sources (RTPSession * sess)
1827 {
1828   guint result;
1829
1830   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1831
1832   RTP_SESSION_LOCK (sess);
1833   result = sess->stats.active_sources;
1834   RTP_SESSION_UNLOCK (sess);
1835
1836   return result;
1837 }
1838
1839 /**
1840  * rtp_session_get_source_by_ssrc:
1841  * @sess: an #RTPSession
1842  * @ssrc: an SSRC
1843  *
1844  * Find the source with @ssrc in @sess.
1845  *
1846  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
1847  * g_object_unref() after usage.
1848  */
1849 RTPSource *
1850 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
1851 {
1852   RTPSource *result;
1853
1854   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1855
1856   RTP_SESSION_LOCK (sess);
1857   result = find_source (sess, ssrc);
1858   if (result != NULL)
1859     g_object_ref (result);
1860   RTP_SESSION_UNLOCK (sess);
1861
1862   return result;
1863 }
1864
1865 /* should be called with the SESSION lock */
1866 static guint32
1867 rtp_session_create_new_ssrc (RTPSession * sess)
1868 {
1869   guint32 ssrc;
1870
1871   while (TRUE) {
1872     ssrc = g_random_int ();
1873
1874     /* see if it exists in the session, we're done if it doesn't */
1875     if (find_source (sess, ssrc) == NULL)
1876       break;
1877   }
1878   return ssrc;
1879 }
1880
1881
1882 /**
1883  * rtp_session_create_source:
1884  * @sess: an #RTPSession
1885  *
1886  * Create an #RTPSource for use in @sess. This function will create a source
1887  * with an ssrc that is currently not used by any participants in the session.
1888  *
1889  * Returns: an #RTPSource.
1890  */
1891 RTPSource *
1892 rtp_session_create_source (RTPSession * sess)
1893 {
1894   guint32 ssrc;
1895   RTPSource *source;
1896
1897   RTP_SESSION_LOCK (sess);
1898   ssrc = rtp_session_create_new_ssrc (sess);
1899   source = rtp_source_new (ssrc);
1900   rtp_source_set_callbacks (source, &callbacks, sess);
1901   /* we need an additional ref for the source in the hashtable */
1902   g_object_ref (source);
1903   add_source (sess, source);
1904   RTP_SESSION_UNLOCK (sess);
1905
1906   return source;
1907 }
1908
1909 static gboolean
1910 update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
1911 {
1912   GstNetAddressMeta *meta;
1913
1914   /* get packet size including header overhead */
1915   pinfo->bytes += gst_buffer_get_size (*buffer) + pinfo->header_len;
1916   pinfo->packets++;
1917
1918   if (pinfo->rtp) {
1919     GstRTPBuffer rtp = { NULL };
1920
1921     if (!gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtp))
1922       goto invalid_packet;
1923
1924     pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtp);
1925     if (idx == 0) {
1926       gint i;
1927
1928       /* only keep info for first buffer */
1929       pinfo->ssrc = gst_rtp_buffer_get_ssrc (&rtp);
1930       pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
1931       pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
1932       pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
1933       /* copy available csrc */
1934       pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
1935       for (i = 0; i < pinfo->csrc_count; i++)
1936         pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
1937     }
1938     gst_rtp_buffer_unmap (&rtp);
1939   }
1940
1941   if (idx == 0) {
1942     /* for netbuffer we can store the IP address to check for collisions */
1943     meta = gst_buffer_get_net_address_meta (*buffer);
1944     if (pinfo->address)
1945       g_object_unref (pinfo->address);
1946     if (meta) {
1947       pinfo->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
1948     } else {
1949       pinfo->address = NULL;
1950     }
1951   }
1952   return TRUE;
1953
1954   /* ERRORS */
1955 invalid_packet:
1956   {
1957     GST_DEBUG ("invalid RTP packet received");
1958     return FALSE;
1959   }
1960 }
1961
1962 /* update the RTPPacketInfo structure with the current time and other bits
1963  * about the current buffer we are handling.
1964  * This function is typically called when a validated packet is received.
1965  * This function should be called with the SESSION_LOCK
1966  */
1967 static gboolean
1968 update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
1969     gboolean send, gboolean rtp, gboolean is_list, gpointer data,
1970     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
1971 {
1972   gboolean res;
1973
1974   pinfo->send = send;
1975   pinfo->rtp = rtp;
1976   pinfo->is_list = is_list;
1977   pinfo->data = data;
1978   pinfo->current_time = current_time;
1979   pinfo->running_time = running_time;
1980   pinfo->ntpnstime = ntpnstime;
1981   pinfo->header_len = sess->header_len;
1982   pinfo->bytes = 0;
1983   pinfo->payload_len = 0;
1984   pinfo->packets = 0;
1985
1986   if (is_list) {
1987     GstBufferList *list = GST_BUFFER_LIST_CAST (data);
1988     res =
1989         gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
1990         pinfo);
1991   } else {
1992     GstBuffer *buffer = GST_BUFFER_CAST (data);
1993     res = update_packet (&buffer, 0, pinfo);
1994   }
1995   return res;
1996 }
1997
1998 static void
1999 clean_packet_info (RTPPacketInfo * pinfo)
2000 {
2001   if (pinfo->address)
2002     g_object_unref (pinfo->address);
2003   if (pinfo->data) {
2004     gst_mini_object_unref (pinfo->data);
2005     pinfo->data = NULL;
2006   }
2007 }
2008
2009 static gboolean
2010 source_update_active (RTPSession * sess, RTPSource * source,
2011     gboolean prevactive)
2012 {
2013   gboolean active = RTP_SOURCE_IS_ACTIVE (source);
2014   guint32 ssrc = source->ssrc;
2015
2016   if (prevactive == active)
2017     return FALSE;
2018
2019   if (active) {
2020     sess->stats.active_sources++;
2021     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
2022         sess->stats.active_sources);
2023   } else {
2024     sess->stats.active_sources--;
2025     GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
2026         sess->stats.active_sources);
2027   }
2028   return TRUE;
2029 }
2030
2031 static gboolean
2032 source_update_sender (RTPSession * sess, RTPSource * source,
2033     gboolean prevsender)
2034 {
2035   gboolean sender = RTP_SOURCE_IS_SENDER (source);
2036   guint32 ssrc = source->ssrc;
2037
2038   if (prevsender == sender)
2039     return FALSE;
2040
2041   if (sender) {
2042     sess->stats.sender_sources++;
2043     if (source->internal)
2044       sess->stats.internal_sender_sources++;
2045     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
2046         sess->stats.sender_sources);
2047   } else {
2048     sess->stats.sender_sources--;
2049     if (source->internal)
2050       sess->stats.internal_sender_sources--;
2051     GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
2052         sess->stats.sender_sources);
2053   }
2054   return TRUE;
2055 }
2056
2057 /**
2058  * rtp_session_process_rtp:
2059  * @sess: and #RTPSession
2060  * @buffer: an RTP buffer
2061  * @current_time: the current system time
2062  * @running_time: the running_time of @buffer
2063  *
2064  * Process an RTP buffer in the session manager. This function takes ownership
2065  * of @buffer.
2066  *
2067  * Returns: a #GstFlowReturn.
2068  */
2069 GstFlowReturn
2070 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
2071     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
2072 {
2073   GstFlowReturn result;
2074   guint32 ssrc;
2075   RTPSource *source;
2076   gboolean created;
2077   gboolean prevsender, prevactive;
2078   RTPPacketInfo pinfo = { 0, };
2079   guint64 oldrate;
2080
2081   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2082   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2083
2084   RTP_SESSION_LOCK (sess);
2085
2086   /* update pinfo stats */
2087   if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer,
2088           current_time, running_time, ntpnstime)) {
2089     GST_DEBUG ("invalid RTP packet received");
2090     RTP_SESSION_UNLOCK (sess);
2091     return rtp_session_process_rtcp (sess, buffer, current_time, ntpnstime);
2092   }
2093
2094   ssrc = pinfo.ssrc;
2095
2096   source = obtain_source (sess, ssrc, &created, &pinfo, TRUE);
2097   if (!source)
2098     goto collision;
2099
2100   prevsender = RTP_SOURCE_IS_SENDER (source);
2101   prevactive = RTP_SOURCE_IS_ACTIVE (source);
2102   oldrate = source->bitrate;
2103
2104   /* let source process the packet */
2105   result = rtp_source_process_rtp (source, &pinfo);
2106
2107   /* source became active */
2108   if (source_update_active (sess, source, prevactive))
2109     on_ssrc_validated (sess, source);
2110
2111   source_update_sender (sess, source, prevsender);
2112
2113   if (oldrate != source->bitrate)
2114     sess->recalc_bandwidth = TRUE;
2115
2116   if (created)
2117     on_new_ssrc (sess, source);
2118
2119   if (source->validated) {
2120     gboolean created;
2121     gint i;
2122
2123     /* for validated sources, we add the CSRCs as well */
2124     for (i = 0; i < pinfo.csrc_count; i++) {
2125       guint32 csrc;
2126       RTPSource *csrc_src;
2127
2128       csrc = pinfo.csrcs[i];
2129
2130       /* get source */
2131       csrc_src = obtain_source (sess, csrc, &created, &pinfo, TRUE);
2132       if (!csrc_src)
2133         continue;
2134
2135       if (created) {
2136         GST_DEBUG ("created new CSRC: %08x", csrc);
2137         rtp_source_set_as_csrc (csrc_src);
2138         source_update_active (sess, csrc_src, FALSE);
2139         on_new_ssrc (sess, csrc_src);
2140       }
2141       g_object_unref (csrc_src);
2142     }
2143   }
2144   g_object_unref (source);
2145
2146   RTP_SESSION_UNLOCK (sess);
2147
2148   clean_packet_info (&pinfo);
2149
2150   return result;
2151
2152   /* ERRORS */
2153 collision:
2154   {
2155     RTP_SESSION_UNLOCK (sess);
2156     clean_packet_info (&pinfo);
2157     GST_DEBUG ("ignoring packet because its collisioning");
2158     return GST_FLOW_OK;
2159   }
2160 }
2161
2162 static void
2163 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
2164     GstRTCPPacket * packet, RTPPacketInfo * pinfo)
2165 {
2166   guint count, i;
2167
2168   count = gst_rtcp_packet_get_rb_count (packet);
2169   for (i = 0; i < count; i++) {
2170     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
2171     guint8 fractionlost;
2172     gint32 packetslost;
2173     RTPSource *src;
2174
2175     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
2176         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2177
2178     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
2179
2180     /* find our own source */
2181     src = find_source (sess, ssrc);
2182     if (src == NULL)
2183       continue;
2184
2185     if (src->internal && RTP_SOURCE_IS_ACTIVE (src)) {
2186       /* only deal with report blocks for our session, we update the stats of
2187        * the sender of the RTCP message. We could also compare our stats against
2188        * the other sender to see if we are better or worse. */
2189       /* FIXME, need to keep track who the RB block is from */
2190       rtp_source_process_rb (source, pinfo->ntpnstime, fractionlost,
2191           packetslost, exthighestseq, jitter, lsr, dlsr);
2192     }
2193   }
2194   on_ssrc_active (sess, source);
2195 }
2196
2197 /* A Sender report contains statistics about how the sender is doing. This
2198  * includes timing informataion such as the relation between RTP and NTP
2199  * timestamps and the number of packets/bytes it sent to us.
2200  *
2201  * In this report is also included a set of report blocks related to how this
2202  * sender is receiving data (in case we (or somebody else) is also sending stuff
2203  * to it). This info includes the packet loss, jitter and seqnum. It also
2204  * contains information to calculate the round trip time (LSR/DLSR).
2205  */
2206 static void
2207 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
2208     RTPPacketInfo * pinfo, gboolean * do_sync)
2209 {
2210   guint32 senderssrc, rtptime, packet_count, octet_count;
2211   guint64 ntptime;
2212   RTPSource *source;
2213   gboolean created, prevsender;
2214
2215   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
2216       &packet_count, &octet_count);
2217
2218   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
2219       senderssrc, GST_TIME_ARGS (pinfo->current_time));
2220
2221   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2222   if (!source)
2223     return;
2224
2225   /* skip non-bye packets for sources that are marked BYE */
2226   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2227     goto out;
2228
2229   /* don't try to do lip-sync for sources that sent a BYE */
2230   if (RTP_SOURCE_IS_MARKED_BYE (source))
2231     *do_sync = FALSE;
2232   else
2233     *do_sync = TRUE;
2234
2235   prevsender = RTP_SOURCE_IS_SENDER (source);
2236
2237   /* first update the source */
2238   rtp_source_process_sr (source, pinfo->current_time, ntptime, rtptime,
2239       packet_count, octet_count);
2240
2241   source_update_sender (sess, source, prevsender);
2242
2243   if (created)
2244     on_new_ssrc (sess, source);
2245
2246   rtp_session_process_rb (sess, source, packet, pinfo);
2247
2248 out:
2249   g_object_unref (source);
2250 }
2251
2252 /* A receiver report contains statistics about how a receiver is doing. It
2253  * includes stuff like packet loss, jitter and the seqnum it received last. It
2254  * also contains info to calculate the round trip time.
2255  *
2256  * We are only interested in how the sender of this report is doing wrt to us.
2257  */
2258 static void
2259 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
2260     RTPPacketInfo * pinfo)
2261 {
2262   guint32 senderssrc;
2263   RTPSource *source;
2264   gboolean created;
2265
2266   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
2267
2268   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
2269
2270   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2271   if (!source)
2272     return;
2273
2274   /* skip non-bye packets for sources that are marked BYE */
2275   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2276     goto out;
2277
2278   if (created)
2279     on_new_ssrc (sess, source);
2280
2281   rtp_session_process_rb (sess, source, packet, pinfo);
2282
2283 out:
2284   g_object_unref (source);
2285 }
2286
2287 /* Get SDES items and store them in the SSRC */
2288 static void
2289 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
2290     RTPPacketInfo * pinfo)
2291 {
2292   guint items, i, j;
2293   gboolean more_items, more_entries;
2294
2295   items = gst_rtcp_packet_sdes_get_item_count (packet);
2296   GST_DEBUG ("got SDES packet with %d items", items);
2297
2298   more_items = gst_rtcp_packet_sdes_first_item (packet);
2299   i = 0;
2300   while (more_items) {
2301     guint32 ssrc;
2302     gboolean changed, created, prevactive;
2303     RTPSource *source;
2304     GstStructure *sdes;
2305
2306     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
2307
2308     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
2309
2310     changed = FALSE;
2311
2312     /* find src, no probation when dealing with RTCP */
2313     source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
2314     if (!source)
2315       return;
2316
2317     /* skip non-bye packets for sources that are marked BYE */
2318     if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2319       goto next;
2320
2321     sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
2322
2323     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
2324     j = 0;
2325     while (more_entries) {
2326       GstRTCPSDESType type;
2327       guint8 len;
2328       guint8 *data;
2329       gchar *name;
2330       gchar *value;
2331
2332       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
2333
2334       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
2335           data);
2336
2337       if (type == GST_RTCP_SDES_PRIV) {
2338         name = g_strndup ((const gchar *) &data[1], data[0]);
2339         len -= data[0] + 1;
2340         data += data[0] + 1;
2341       } else {
2342         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
2343       }
2344
2345       value = g_strndup ((const gchar *) data, len);
2346
2347       gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
2348
2349       g_free (name);
2350       g_free (value);
2351
2352       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
2353       j++;
2354     }
2355
2356     /* takes ownership of sdes */
2357     changed = rtp_source_set_sdes_struct (source, sdes);
2358
2359     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2360     source->validated = TRUE;
2361
2362     if (created)
2363       on_new_ssrc (sess, source);
2364
2365     /* source became active */
2366     if (source_update_active (sess, source, prevactive))
2367       on_ssrc_validated (sess, source);
2368
2369     if (changed)
2370       on_ssrc_sdes (sess, source);
2371
2372   next:
2373     g_object_unref (source);
2374
2375     more_items = gst_rtcp_packet_sdes_next_item (packet);
2376     i++;
2377   }
2378 }
2379
2380 /* BYE is sent when a client leaves the session
2381  */
2382 static void
2383 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
2384     RTPPacketInfo * pinfo)
2385 {
2386   guint count, i;
2387   gchar *reason;
2388   gboolean reconsider = FALSE;
2389
2390   reason = gst_rtcp_packet_bye_get_reason (packet);
2391   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
2392
2393   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
2394   for (i = 0; i < count; i++) {
2395     guint32 ssrc;
2396     RTPSource *source;
2397     gboolean created, prevactive, prevsender;
2398     guint pmembers, members;
2399
2400     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
2401     GST_DEBUG ("SSRC: %08x", ssrc);
2402
2403     /* find src and mark bye, no probation when dealing with RTCP */
2404     source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
2405     if (!source)
2406       return;
2407
2408     if (source->internal) {
2409       /* our own source, something weird with this packet */
2410       g_object_unref (source);
2411       continue;
2412     }
2413
2414     /* store time for when we need to time out this source */
2415     source->bye_time = pinfo->current_time;
2416
2417     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2418     prevsender = RTP_SOURCE_IS_SENDER (source);
2419
2420     /* mark the source BYE */
2421     rtp_source_mark_bye (source, reason);
2422
2423     pmembers = sess->stats.active_sources;
2424
2425     source_update_active (sess, source, prevactive);
2426     source_update_sender (sess, source, prevsender);
2427
2428     members = sess->stats.active_sources;
2429
2430     if (!sess->scheduled_bye && members < pmembers) {
2431       /* some members went away since the previous timeout estimate.
2432        * Perform reverse reconsideration but only when we are not scheduling a
2433        * BYE ourselves. */
2434       if (sess->next_rtcp_check_time != GST_CLOCK_TIME_NONE &&
2435           pinfo->current_time < sess->next_rtcp_check_time) {
2436         GstClockTime time_remaining;
2437
2438         /* Scale our next RTCP check time according to the change of numbers
2439          * of members. But only if a) this is the first RTCP, or b) this is not
2440          * a feedback session, or c) this is a feedback session but we schedule
2441          * for every RTCP interval (aka no t-rr-interval set).
2442          *
2443          * FIXME: a) and b) are not great as we will possibly go below Tmin
2444          * for non-feedback profiles and in case of a) below
2445          * Tmin/t-rr-interval in any case.
2446          */
2447         if (sess->last_rtcp_send_time == GST_CLOCK_TIME_NONE ||
2448             !(sess->rtp_profile == GST_RTP_PROFILE_AVPF
2449                 || sess->rtp_profile == GST_RTP_PROFILE_SAVPF) ||
2450             sess->next_rtcp_check_time - sess->last_rtcp_send_time ==
2451             sess->last_rtcp_interval) {
2452           time_remaining = sess->next_rtcp_check_time - pinfo->current_time;
2453           sess->next_rtcp_check_time =
2454               gst_util_uint64_scale (time_remaining, members, pmembers);
2455           sess->next_rtcp_check_time += pinfo->current_time;
2456         }
2457         sess->last_rtcp_interval =
2458             gst_util_uint64_scale (sess->last_rtcp_interval, members, pmembers);
2459
2460         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
2461             GST_TIME_ARGS (sess->next_rtcp_check_time));
2462
2463         /* mark pending reconsider. We only want to signal the reconsideration
2464          * once after we handled all the source in the bye packet */
2465         reconsider = TRUE;
2466       }
2467     }
2468
2469     if (created)
2470       on_new_ssrc (sess, source);
2471
2472     on_bye_ssrc (sess, source);
2473
2474     g_object_unref (source);
2475   }
2476   if (reconsider) {
2477     RTP_SESSION_UNLOCK (sess);
2478     /* notify app of reconsideration */
2479     if (sess->callbacks.reconsider)
2480       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2481     RTP_SESSION_LOCK (sess);
2482   }
2483   g_free (reason);
2484 }
2485
2486 static void
2487 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
2488     RTPPacketInfo * pinfo)
2489 {
2490   GST_DEBUG ("received APP");
2491 }
2492
2493 static gboolean
2494 rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
2495     gboolean fir, GstClockTime current_time)
2496 {
2497   guint32 round_trip = 0;
2498
2499   rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, &round_trip);
2500
2501   if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
2502     GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
2503         GST_SECOND, 65536);
2504
2505     if (current_time - sess->last_keyframe_request < 2 * round_trip_in_ns) {
2506       GST_DEBUG ("Ignoring %s request because one was send without one "
2507           "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
2508           fir ? "FIR" : "PLI",
2509           GST_TIME_ARGS (current_time - sess->last_keyframe_request),
2510           GST_TIME_ARGS (round_trip_in_ns));
2511       return FALSE;
2512     }
2513   }
2514
2515   sess->last_keyframe_request = current_time;
2516
2517   GST_LOG ("received %s request from %X %p(%p)", fir ? "FIR" : "PLI",
2518       rtp_source_get_ssrc (src), sess->callbacks.process_rtp,
2519       sess->callbacks.request_key_unit);
2520
2521   RTP_SESSION_UNLOCK (sess);
2522   sess->callbacks.request_key_unit (sess, fir,
2523       sess->request_key_unit_user_data);
2524   RTP_SESSION_LOCK (sess);
2525
2526   return TRUE;
2527 }
2528
2529 static void
2530 rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
2531     guint32 media_ssrc, GstClockTime current_time)
2532 {
2533   RTPSource *src;
2534
2535   if (!sess->callbacks.request_key_unit)
2536     return;
2537
2538   src = find_source (sess, sender_ssrc);
2539   if (src == NULL)
2540     return;
2541
2542   rtp_session_request_local_key_unit (sess, src, FALSE, current_time);
2543 }
2544
2545 static void
2546 rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
2547     guint8 * fci_data, guint fci_length, GstClockTime current_time)
2548 {
2549   RTPSource *src;
2550   guint32 ssrc;
2551   guint position = 0;
2552   gboolean our_request = FALSE;
2553
2554   if (!sess->callbacks.request_key_unit)
2555     return;
2556
2557   if (fci_length < 8)
2558     return;
2559
2560   src = find_source (sess, sender_ssrc);
2561
2562   /* Hack because Google fails to set the sender_ssrc correctly */
2563   if (!src && sender_ssrc == 1) {
2564     GHashTableIter iter;
2565
2566     /* we can't find the source if there are multiple */
2567     if (sess->stats.sender_sources > sess->stats.internal_sender_sources + 1)
2568       return;
2569
2570     g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
2571     while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
2572       if (!src->internal && rtp_source_is_sender (src))
2573         break;
2574       src = NULL;
2575     }
2576   }
2577   if (!src)
2578     return;
2579
2580   for (position = 0; position < fci_length; position += 8) {
2581     guint8 *data = fci_data + position;
2582     RTPSource *own;
2583
2584     ssrc = GST_READ_UINT32_BE (data);
2585
2586     own = find_source (sess, ssrc);
2587     if (own == NULL)
2588       continue;
2589
2590     if (own->internal) {
2591       our_request = TRUE;
2592       break;
2593     }
2594   }
2595   if (!our_request)
2596     return;
2597
2598   rtp_session_request_local_key_unit (sess, src, TRUE, current_time);
2599 }
2600
2601 static void
2602 rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
2603     guint32 media_ssrc, guint8 * fci_data, guint fci_length,
2604     GstClockTime current_time)
2605 {
2606   sess->stats.nacks_received++;
2607
2608   if (!sess->callbacks.notify_nack)
2609     return;
2610
2611   while (fci_length > 0) {
2612     guint16 seqnum, blp;
2613
2614     seqnum = GST_READ_UINT16_BE (fci_data);
2615     blp = GST_READ_UINT16_BE (fci_data + 2);
2616
2617     GST_DEBUG ("NACK #%u, blp %04x, SSRC 0x%08x", seqnum, blp, media_ssrc);
2618
2619     RTP_SESSION_UNLOCK (sess);
2620     sess->callbacks.notify_nack (sess, seqnum, blp, media_ssrc,
2621         sess->notify_nack_user_data);
2622     RTP_SESSION_LOCK (sess);
2623
2624     fci_data += 4;
2625     fci_length -= 4;
2626   }
2627 }
2628
2629 static void
2630 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
2631     RTPPacketInfo * pinfo, GstClockTime current_time)
2632 {
2633   GstRTCPType type = gst_rtcp_packet_get_type (packet);
2634   GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
2635   guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
2636   guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
2637   guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet);
2638   guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
2639   RTPSource *src;
2640
2641   src = find_source (sess, media_ssrc);
2642
2643   /* skip non-bye packets for sources that are marked BYE */
2644   if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src))
2645     return;
2646
2647   GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
2648       "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
2649
2650   if (g_signal_has_handler_pending (sess,
2651           rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
2652     GstBuffer *fci_buffer = NULL;
2653
2654     if (fci_length > 0) {
2655       fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
2656           GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data,
2657           fci_length);
2658       GST_BUFFER_PTS (fci_buffer) = pinfo->running_time;
2659     }
2660
2661     RTP_SESSION_UNLOCK (sess);
2662     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
2663         type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
2664     RTP_SESSION_LOCK (sess);
2665
2666     if (fci_buffer)
2667       gst_buffer_unref (fci_buffer);
2668   }
2669
2670   if (src && sess->rtcp_feedback_retention_window) {
2671     rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
2672   }
2673
2674   if ((src && src->internal) ||
2675       /* PSFB FIR puts the media ssrc inside the FCI */
2676       (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
2677     switch (type) {
2678       case GST_RTCP_TYPE_PSFB:
2679         switch (fbtype) {
2680           case GST_RTCP_PSFB_TYPE_PLI:
2681             if (src)
2682               src->stats.recv_pli_count++;
2683             rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
2684                 current_time);
2685             break;
2686           case GST_RTCP_PSFB_TYPE_FIR:
2687             if (src)
2688               src->stats.recv_fir_count++;
2689             rtp_session_process_fir (sess, sender_ssrc, fci_data, fci_length,
2690                 current_time);
2691             break;
2692           default:
2693             break;
2694         }
2695         break;
2696       case GST_RTCP_TYPE_RTPFB:
2697         switch (fbtype) {
2698           case GST_RTCP_RTPFB_TYPE_NACK:
2699             rtp_session_process_nack (sess, sender_ssrc, media_ssrc,
2700                 fci_data, fci_length, current_time);
2701             break;
2702           default:
2703             break;
2704         }
2705       default:
2706         break;
2707     }
2708   }
2709 }
2710
2711 /**
2712  * rtp_session_process_rtcp:
2713  * @sess: and #RTPSession
2714  * @buffer: an RTCP buffer
2715  * @current_time: the current system time
2716  * @ntpnstime: the current NTP time in nanoseconds
2717  *
2718  * Process an RTCP buffer in the session manager. This function takes ownership
2719  * of @buffer.
2720  *
2721  * Returns: a #GstFlowReturn.
2722  */
2723 GstFlowReturn
2724 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
2725     GstClockTime current_time, guint64 ntpnstime)
2726 {
2727   GstRTCPPacket packet;
2728   gboolean more, is_bye = FALSE, do_sync = FALSE;
2729   RTPPacketInfo pinfo = { 0, };
2730   GstFlowReturn result = GST_FLOW_OK;
2731   GstRTCPBuffer rtcp = { NULL, };
2732
2733   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2734   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2735
2736   if (!gst_rtcp_buffer_validate_reduced (buffer))
2737     goto invalid_packet;
2738
2739   GST_DEBUG ("received RTCP packet");
2740
2741   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP], 0,
2742       buffer);
2743
2744   RTP_SESSION_LOCK (sess);
2745   /* update pinfo stats */
2746   update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
2747       -1, ntpnstime);
2748
2749   /* start processing the compound packet */
2750   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2751   more = gst_rtcp_buffer_get_first_packet (&rtcp, &packet);
2752   while (more) {
2753     GstRTCPType type;
2754
2755     type = gst_rtcp_packet_get_type (&packet);
2756
2757     switch (type) {
2758       case GST_RTCP_TYPE_SR:
2759         rtp_session_process_sr (sess, &packet, &pinfo, &do_sync);
2760         break;
2761       case GST_RTCP_TYPE_RR:
2762         rtp_session_process_rr (sess, &packet, &pinfo);
2763         break;
2764       case GST_RTCP_TYPE_SDES:
2765         rtp_session_process_sdes (sess, &packet, &pinfo);
2766         break;
2767       case GST_RTCP_TYPE_BYE:
2768         is_bye = TRUE;
2769         /* don't try to attempt lip-sync anymore for streams with a BYE */
2770         do_sync = FALSE;
2771         rtp_session_process_bye (sess, &packet, &pinfo);
2772         break;
2773       case GST_RTCP_TYPE_APP:
2774         rtp_session_process_app (sess, &packet, &pinfo);
2775         break;
2776       case GST_RTCP_TYPE_RTPFB:
2777       case GST_RTCP_TYPE_PSFB:
2778         rtp_session_process_feedback (sess, &packet, &pinfo, current_time);
2779         break;
2780       default:
2781         GST_WARNING ("got unknown RTCP packet");
2782         break;
2783     }
2784     more = gst_rtcp_packet_move_to_next (&packet);
2785   }
2786
2787   gst_rtcp_buffer_unmap (&rtcp);
2788
2789   /* if we are scheduling a BYE, we only want to count bye packets, else we
2790    * count everything */
2791   if (sess->scheduled_bye && is_bye) {
2792     sess->bye_stats.bye_members++;
2793     UPDATE_AVG (sess->bye_stats.avg_rtcp_packet_size, pinfo.bytes);
2794   }
2795
2796   /* keep track of average packet size */
2797   UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
2798
2799   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
2800       sess->stats.avg_rtcp_packet_size, pinfo.bytes);
2801   RTP_SESSION_UNLOCK (sess);
2802
2803   pinfo.data = NULL;
2804   clean_packet_info (&pinfo);
2805
2806   /* notify caller of sr packets in the callback */
2807   if (do_sync && sess->callbacks.sync_rtcp) {
2808     result = sess->callbacks.sync_rtcp (sess, buffer,
2809         sess->sync_rtcp_user_data);
2810   } else
2811     gst_buffer_unref (buffer);
2812
2813   return result;
2814
2815   /* ERRORS */
2816 invalid_packet:
2817   {
2818     GST_DEBUG ("invalid RTCP packet received");
2819     gst_buffer_unref (buffer);
2820     return GST_FLOW_OK;
2821   }
2822 }
2823
2824 /**
2825  * rtp_session_update_send_caps:
2826  * @sess: an #RTPSession
2827  * @caps: a #GstCaps
2828  *
2829  * Update the caps of the sender in the rtp session.
2830  */
2831 void
2832 rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
2833 {
2834   GstStructure *s;
2835   guint ssrc;
2836
2837   g_return_if_fail (RTP_IS_SESSION (sess));
2838   g_return_if_fail (GST_IS_CAPS (caps));
2839
2840   GST_LOG ("received caps %" GST_PTR_FORMAT, caps);
2841
2842   s = gst_caps_get_structure (caps, 0);
2843
2844   if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
2845     RTPSource *source;
2846     gboolean created;
2847
2848     RTP_SESSION_LOCK (sess);
2849     source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
2850     sess->suggested_ssrc = ssrc;
2851     sess->internal_ssrc_set = TRUE;
2852     sess->internal_ssrc_from_caps_or_property = TRUE;
2853     if (source) {
2854       rtp_source_update_caps (source, caps);
2855
2856       if (created)
2857         on_new_sender_ssrc (sess, source);
2858
2859       g_object_unref (source);
2860     }
2861
2862     if (gst_structure_get_uint (s, "rtx-ssrc", &ssrc)) {
2863       source =
2864           obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
2865       if (source) {
2866         rtp_source_update_caps (source, caps);
2867         g_object_unref (source);
2868       }
2869     }
2870     RTP_SESSION_UNLOCK (sess);
2871   } else {
2872     sess->internal_ssrc_from_caps_or_property = FALSE;
2873   }
2874 }
2875
2876 /**
2877  * rtp_session_send_rtp:
2878  * @sess: an #RTPSession
2879  * @data: pointer to either an RTP buffer or a list of RTP buffers
2880  * @is_list: TRUE when @data is a buffer list
2881  * @current_time: the current system time
2882  * @running_time: the running time of @data
2883  *
2884  * Send the RTP buffer in the session manager. This function takes ownership of
2885  * @buffer.
2886  *
2887  * Returns: a #GstFlowReturn.
2888  */
2889 GstFlowReturn
2890 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
2891     GstClockTime current_time, GstClockTime running_time)
2892 {
2893   GstFlowReturn result;
2894   RTPSource *source;
2895   gboolean prevsender;
2896   guint64 oldrate;
2897   RTPPacketInfo pinfo = { 0, };
2898   gboolean created;
2899
2900   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2901   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
2902
2903   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
2904
2905   RTP_SESSION_LOCK (sess);
2906   if (!update_packet_info (sess, &pinfo, TRUE, TRUE, is_list, data,
2907           current_time, running_time, -1))
2908     goto invalid_packet;
2909
2910   source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
2911   if (created)
2912     on_new_sender_ssrc (sess, source);
2913
2914   prevsender = RTP_SOURCE_IS_SENDER (source);
2915   oldrate = source->bitrate;
2916
2917   /* we use our own source to send */
2918   result = rtp_source_send_rtp (source, &pinfo);
2919
2920   source_update_sender (sess, source, prevsender);
2921
2922   if (oldrate != source->bitrate)
2923     sess->recalc_bandwidth = TRUE;
2924   RTP_SESSION_UNLOCK (sess);
2925
2926   g_object_unref (source);
2927   clean_packet_info (&pinfo);
2928
2929   return result;
2930
2931 invalid_packet:
2932   {
2933     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
2934     RTP_SESSION_UNLOCK (sess);
2935     GST_DEBUG ("invalid RTP packet received");
2936     return GST_FLOW_OK;
2937   }
2938 }
2939
2940 static void
2941 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
2942 {
2943   *bandwidth += source->bitrate;
2944 }
2945
2946 /* must be called with session lock */
2947 static GstClockTime
2948 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
2949     gboolean first)
2950 {
2951   GstClockTime result;
2952   RTPSessionStats *stats;
2953
2954   /* recalculate bandwidth when it changed */
2955   if (sess->recalc_bandwidth) {
2956     gdouble bandwidth;
2957
2958     if (sess->bandwidth > 0)
2959       bandwidth = sess->bandwidth;
2960     else {
2961       /* If it is <= 0, then try to estimate the actual bandwidth */
2962       bandwidth = 0;
2963
2964       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2965           (GHFunc) add_bitrates, &bandwidth);
2966     }
2967     if (bandwidth < RTP_STATS_BANDWIDTH)
2968       bandwidth = RTP_STATS_BANDWIDTH;
2969
2970     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
2971         sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
2972
2973     sess->recalc_bandwidth = FALSE;
2974   }
2975
2976   if (sess->scheduled_bye) {
2977     stats = &sess->bye_stats;
2978     result = rtp_stats_calculate_bye_interval (stats);
2979   } else {
2980     session_update_ptp (sess);
2981
2982     stats = &sess->stats;
2983     result = rtp_stats_calculate_rtcp_interval (stats,
2984         stats->internal_sender_sources > 0, sess->rtp_profile,
2985         sess->is_doing_ptp, first);
2986   }
2987
2988   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
2989       GST_TIME_ARGS (result), first);
2990
2991   if (!deterministic && result != GST_CLOCK_TIME_NONE)
2992     result = rtp_stats_add_rtcp_jitter (stats, result);
2993
2994   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
2995
2996   return result;
2997 }
2998
2999 static void
3000 source_mark_bye (const gchar * key, RTPSource * source, const gchar * reason)
3001 {
3002   if (source->internal)
3003     rtp_source_mark_bye (source, reason);
3004 }
3005
3006 /**
3007  * rtp_session_mark_all_bye:
3008  * @sess: an #RTPSession
3009  * @reason: a reason
3010  *
3011  * Mark all internal sources of the session as BYE with @reason.
3012  */
3013 void
3014 rtp_session_mark_all_bye (RTPSession * sess, const gchar * reason)
3015 {
3016   g_return_if_fail (RTP_IS_SESSION (sess));
3017
3018   RTP_SESSION_LOCK (sess);
3019   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3020       (GHFunc) source_mark_bye, (gpointer) reason);
3021   RTP_SESSION_UNLOCK (sess);
3022 }
3023
3024 /* Stop the current @sess and schedule a BYE message for the other members.
3025  * One must have the session lock to call this function
3026  */
3027 static GstFlowReturn
3028 rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time)
3029 {
3030   GstFlowReturn result = GST_FLOW_OK;
3031   GstClockTime interval;
3032
3033   /* nothing to do it we already scheduled bye */
3034   if (sess->scheduled_bye)
3035     goto done;
3036
3037   /* we schedule BYE now */
3038   sess->scheduled_bye = TRUE;
3039   /* at least one member wants to send a BYE */
3040   memcpy (&sess->bye_stats, &sess->stats, sizeof (RTPSessionStats));
3041   INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100);
3042   sess->bye_stats.bye_members = 1;
3043   sess->first_rtcp = TRUE;
3044
3045   /* reschedule transmission */
3046   sess->last_rtcp_send_time = current_time;
3047   sess->last_rtcp_check_time = current_time;
3048   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3049
3050   if (interval != GST_CLOCK_TIME_NONE)
3051     sess->next_rtcp_check_time = current_time + interval;
3052   else
3053     sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
3054   sess->last_rtcp_interval = interval;
3055
3056   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
3057       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
3058
3059   RTP_SESSION_UNLOCK (sess);
3060   /* notify app of reconsideration */
3061   if (sess->callbacks.reconsider)
3062     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
3063   RTP_SESSION_LOCK (sess);
3064 done:
3065
3066   return result;
3067 }
3068
3069 /**
3070  * rtp_session_schedule_bye:
3071  * @sess: an #RTPSession
3072  * @current_time: the current system time
3073  *
3074  * Schedule a BYE message for all sources marked as BYE in @sess.
3075  *
3076  * Returns: a #GstFlowReturn.
3077  */
3078 GstFlowReturn
3079 rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
3080 {
3081   GstFlowReturn result;
3082
3083   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3084
3085   RTP_SESSION_LOCK (sess);
3086   result = rtp_session_schedule_bye_locked (sess, current_time);
3087   RTP_SESSION_UNLOCK (sess);
3088
3089   return result;
3090 }
3091
3092 /**
3093  * rtp_session_next_timeout:
3094  * @sess: an #RTPSession
3095  * @current_time: the current system time
3096  *
3097  * Get the next time we should perform session maintenance tasks.
3098  *
3099  * Returns: a time when rtp_session_on_timeout() should be called with the
3100  * current system time.
3101  */
3102 GstClockTime
3103 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
3104 {
3105   GstClockTime result, interval = 0;
3106
3107   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
3108
3109   RTP_SESSION_LOCK (sess);
3110
3111   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
3112     GST_DEBUG ("have early rtcp time");
3113     result = sess->next_early_rtcp_time;
3114     goto early_exit;
3115   }
3116
3117   result = sess->next_rtcp_check_time;
3118
3119   GST_DEBUG ("current time: %" GST_TIME_FORMAT
3120       ", next time: %" GST_TIME_FORMAT,
3121       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
3122
3123   if (result == GST_CLOCK_TIME_NONE || result < current_time) {
3124     GST_DEBUG ("take current time as base");
3125     /* our previous check time expired, start counting from the current time
3126      * again. */
3127     result = current_time;
3128   }
3129
3130   if (sess->scheduled_bye) {
3131     if (sess->bye_stats.active_sources >= 50) {
3132       GST_DEBUG ("reconsider BYE, more than 50 sources");
3133       /* reconsider BYE if members >= 50 */
3134       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3135       sess->last_rtcp_interval = interval;
3136     }
3137   } else {
3138     if (sess->first_rtcp) {
3139       GST_DEBUG ("first RTCP packet");
3140       /* we are called for the first time */
3141       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3142       sess->last_rtcp_interval = interval;
3143     } else if (sess->next_rtcp_check_time < current_time) {
3144       GST_DEBUG ("old check time expired, getting new timeout");
3145       /* get a new timeout when we need to */
3146       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
3147       sess->last_rtcp_interval = interval;
3148
3149       if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF
3150               || sess->rtp_profile == GST_RTP_PROFILE_SAVPF)
3151           && interval != GST_CLOCK_TIME_NONE) {
3152         /* Apply the rules from RFC 4585 section 3.5.3 */
3153         if (sess->stats.min_interval != 0) {
3154           GstClockTime T_rr_current_interval = g_random_double_range (0.5,
3155               1.5) * sess->stats.min_interval * GST_SECOND;
3156
3157           if (T_rr_current_interval > interval) {
3158             GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT
3159                 " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval),
3160                 GST_TIME_ARGS (interval));
3161             interval = T_rr_current_interval;
3162           }
3163         }
3164       }
3165     }
3166   }
3167
3168   if (interval != GST_CLOCK_TIME_NONE)
3169     result += interval;
3170   else
3171     result = GST_CLOCK_TIME_NONE;
3172
3173   sess->next_rtcp_check_time = result;
3174
3175 early_exit:
3176
3177   GST_DEBUG ("current time: %" GST_TIME_FORMAT
3178       ", next time: %" GST_TIME_FORMAT,
3179       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
3180   RTP_SESSION_UNLOCK (sess);
3181
3182   return result;
3183 }
3184
3185 typedef struct
3186 {
3187   RTPSource *source;
3188   gboolean is_bye;
3189   GstBuffer *buffer;
3190 } ReportOutput;
3191
3192 typedef struct
3193 {
3194   GstRTCPBuffer rtcpbuf;
3195   RTPSession *sess;
3196   RTPSource *source;
3197   guint num_to_report;
3198   gboolean have_fir;
3199   gboolean have_pli;
3200   gboolean have_nack;
3201   GstBuffer *rtcp;
3202   GstClockTime current_time;
3203   guint64 ntpnstime;
3204   GstClockTime running_time;
3205   GstClockTime interval;
3206   GstRTCPPacket packet;
3207   gboolean has_sdes;
3208   gboolean is_early;
3209   gboolean may_suppress;
3210   GQueue output;
3211   guint nacked_seqnums;
3212 } ReportData;
3213
3214 static void
3215 session_start_rtcp (RTPSession * sess, ReportData * data)
3216 {
3217   GstRTCPPacket *packet = &data->packet;
3218   RTPSource *own = data->source;
3219   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3220
3221   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
3222   data->has_sdes = FALSE;
3223
3224   gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
3225
3226   if (data->is_early && sess->reduced_size_rtcp)
3227     return;
3228
3229   if (RTP_SOURCE_IS_SENDER (own)) {
3230     guint64 ntptime;
3231     guint32 rtptime;
3232     guint32 packet_count, octet_count;
3233
3234     /* we are a sender, create SR */
3235     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
3236     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SR, packet);
3237
3238     /* get latest stats */
3239     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
3240         &ntptime, &rtptime, &packet_count, &octet_count);
3241     /* store stats */
3242     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
3243         packet_count, octet_count);
3244
3245     /* fill in sender report info */
3246     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
3247         ntptime, rtptime, packet_count, octet_count);
3248   } else {
3249     /* we are only receiver, create RR */
3250     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
3251     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RR, packet);
3252     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
3253   }
3254 }
3255
3256 /* construct a Sender or Receiver Report */
3257 static void
3258 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
3259 {
3260   RTPSession *sess = data->sess;
3261   GstRTCPPacket *packet = &data->packet;
3262   guint8 fractionlost;
3263   gint32 packetslost;
3264   guint32 exthighestseq, jitter;
3265   guint32 lsr, dlsr;
3266
3267   /* don't report for sources in future generations */
3268   if (((gint16) (source->generation - sess->generation)) > 0) {
3269     GST_DEBUG ("source %08x generation %u > %u", source->ssrc,
3270         source->generation, sess->generation);
3271     return;
3272   }
3273
3274   if (g_hash_table_contains (source->reported_in_sr_of,
3275           GUINT_TO_POINTER (data->source->ssrc))) {
3276     GST_DEBUG ("source %08x already reported in this generation", source->ssrc);
3277     return;
3278   }
3279
3280   if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
3281     GST_DEBUG ("max RB count reached");
3282     return;
3283   }
3284
3285   /* only report about other sender */
3286   if (source == data->source)
3287     goto reported;
3288
3289   if (!RTP_SOURCE_IS_SENDER (source)) {
3290     GST_DEBUG ("source %08x not sender", source->ssrc);
3291     goto reported;
3292   }
3293
3294   GST_DEBUG ("create RB for SSRC %08x", source->ssrc);
3295
3296   /* get new stats */
3297   rtp_source_get_new_rb (source, data->current_time, &fractionlost,
3298       &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
3299
3300   /* store last generated RR packet */
3301   source->last_rr.is_valid = TRUE;
3302   source->last_rr.fractionlost = fractionlost;
3303   source->last_rr.packetslost = packetslost;
3304   source->last_rr.exthighestseq = exthighestseq;
3305   source->last_rr.jitter = jitter;
3306   source->last_rr.lsr = lsr;
3307   source->last_rr.dlsr = dlsr;
3308
3309   /* packet is not yet filled, add report block for this source. */
3310   gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
3311       exthighestseq, jitter, lsr, dlsr);
3312
3313 reported:
3314   g_hash_table_add (source->reported_in_sr_of,
3315       GUINT_TO_POINTER (data->source->ssrc));
3316 }
3317
3318 /* construct FIR */
3319 static void
3320 session_add_fir (const gchar * key, RTPSource * source, ReportData * data)
3321 {
3322   GstRTCPPacket *packet = &data->packet;
3323   guint16 len;
3324   guint8 *fci_data;
3325
3326   if (!source->send_fir)
3327     return;
3328
3329   len = gst_rtcp_packet_fb_get_fci_length (packet);
3330   if (!gst_rtcp_packet_fb_set_fci_length (packet, len + 2))
3331     /* exit because the packet is full, will put next request in a
3332      * further packet */
3333     return;
3334
3335   fci_data = gst_rtcp_packet_fb_get_fci (packet) + (len * 4);
3336
3337   GST_WRITE_UINT32_BE (fci_data, source->ssrc);
3338   fci_data += 4;
3339   fci_data[0] = source->current_send_fir_seqnum;
3340   fci_data[1] = fci_data[2] = fci_data[3] = 0;
3341
3342   source->send_fir = FALSE;
3343   source->stats.sent_fir_count++;
3344 }
3345
3346 static void
3347 session_fir (RTPSession * sess, ReportData * data)
3348 {
3349   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3350   GstRTCPPacket *packet = &data->packet;
3351
3352   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3353     return;
3354
3355   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_FIR);
3356   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3357   gst_rtcp_packet_fb_set_media_ssrc (packet, 0);
3358
3359   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3360       (GHFunc) session_add_fir, data);
3361
3362   if (gst_rtcp_packet_fb_get_fci_length (packet) == 0)
3363     gst_rtcp_packet_remove (packet);
3364   else
3365     data->may_suppress = FALSE;
3366 }
3367
3368 static gboolean
3369 has_pli_compare_func (gconstpointer a, gconstpointer ignored)
3370 {
3371   GstRTCPPacket packet;
3372   GstRTCPBuffer rtcp = { NULL, };
3373   gboolean ret = FALSE;
3374
3375   gst_rtcp_buffer_map ((GstBuffer *) a, GST_MAP_READ, &rtcp);
3376
3377   if (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
3378     if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
3379         gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
3380       ret = TRUE;
3381   }
3382
3383   gst_rtcp_buffer_unmap (&rtcp);
3384
3385   return ret;
3386 }
3387
3388 /* construct PLI */
3389 static void
3390 session_pli (const gchar * key, RTPSource * source, ReportData * data)
3391 {
3392   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3393   GstRTCPPacket *packet = &data->packet;
3394
3395   if (!source->send_pli)
3396     return;
3397
3398   if (rtp_source_has_retained (source, has_pli_compare_func, NULL))
3399     return;
3400
3401   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3402     /* exit because the packet is full, will put next request in a
3403      * further packet */
3404     return;
3405
3406   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_PLI);
3407   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3408   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
3409
3410   source->send_pli = FALSE;
3411   data->may_suppress = FALSE;
3412
3413   source->stats.sent_pli_count++;
3414 }
3415
3416 /* construct NACK */
3417 static void
3418 session_nack (const gchar * key, RTPSource * source, ReportData * data)
3419 {
3420   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3421   GstRTCPPacket *packet = &data->packet;
3422   guint32 *nacks;
3423   guint n_nacks, i;
3424   guint8 *fci_data;
3425
3426   if (!source->send_nack)
3427     return;
3428
3429   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet))
3430     /* exit because the packet is full, will put next request in a
3431      * further packet */
3432     return;
3433
3434   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_RTPFB_TYPE_NACK);
3435   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3436   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
3437
3438   nacks = rtp_source_get_nacks (source, &n_nacks);
3439   GST_DEBUG ("%u NACKs", n_nacks);
3440   if (!gst_rtcp_packet_fb_set_fci_length (packet, n_nacks))
3441     return;
3442
3443   fci_data = gst_rtcp_packet_fb_get_fci (packet);
3444   for (i = 0; i < n_nacks; i++) {
3445     GST_WRITE_UINT32_BE (fci_data, nacks[i]);
3446     fci_data += 4;
3447     data->nacked_seqnums++;
3448   }
3449
3450   rtp_source_clear_nacks (source);
3451   data->may_suppress = FALSE;
3452 }
3453
3454 /* perform cleanup of sources that timed out */
3455 static void
3456 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
3457 {
3458   gboolean remove = FALSE;
3459   gboolean byetimeout = FALSE;
3460   gboolean sendertimeout = FALSE;
3461   gboolean is_sender, is_active;
3462   RTPSession *sess = data->sess;
3463   GstClockTime interval, binterval;
3464   GstClockTime btime;
3465
3466   GST_DEBUG ("look at %08x, generation %u", source->ssrc, source->generation);
3467
3468   /* check for outdated collisions */
3469   if (source->internal) {
3470     GST_DEBUG ("Timing out collisions for %x", source->ssrc);
3471     rtp_source_timeout (source, data->current_time,
3472         data->running_time - sess->rtcp_feedback_retention_window);
3473   }
3474
3475   /* nothing else to do when without RTCP */
3476   if (data->interval == GST_CLOCK_TIME_NONE)
3477     return;
3478
3479   is_sender = RTP_SOURCE_IS_SENDER (source);
3480   is_active = RTP_SOURCE_IS_ACTIVE (source);
3481
3482   /* our own rtcp interval may have been forced low by secondary configuration,
3483    * while sender side may still operate with higher interval,
3484    * so do not just take our interval to decide on timing out sender,
3485    * but take (if data->interval <= 5 * GST_SECOND):
3486    *   interval = CLAMP (sender_interval, data->interval, 5 * GST_SECOND)
3487    * where sender_interval is difference between last 2 received RTCP reports
3488    */
3489   if (data->interval >= 5 * GST_SECOND || source->internal) {
3490     binterval = data->interval;
3491   } else {
3492     GST_LOG ("prev_rtcp %" GST_TIME_FORMAT ", last_rtcp %" GST_TIME_FORMAT,
3493         GST_TIME_ARGS (source->stats.prev_rtcptime),
3494         GST_TIME_ARGS (source->stats.last_rtcptime));
3495     /* if not received enough yet, fallback to larger default */
3496     if (source->stats.last_rtcptime > source->stats.prev_rtcptime)
3497       binterval = source->stats.last_rtcptime - source->stats.prev_rtcptime;
3498     else
3499       binterval = 5 * GST_SECOND;
3500     binterval = CLAMP (binterval, data->interval, 5 * GST_SECOND);
3501   }
3502   GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
3503       GST_TIME_ARGS (binterval));
3504
3505   if (!source->internal && source->marked_bye) {
3506     /* if we received a BYE from the source, remove the source after some
3507      * time. */
3508     if (data->current_time > source->bye_time &&
3509         data->current_time - source->bye_time > sess->stats.bye_timeout) {
3510       GST_DEBUG ("removing BYE source %08x", source->ssrc);
3511       remove = TRUE;
3512       byetimeout = TRUE;
3513     }
3514   }
3515
3516   if (source->internal && source->sent_bye) {
3517     GST_DEBUG ("removing internal source that has sent BYE %08x", source->ssrc);
3518     remove = TRUE;
3519   }
3520
3521   /* sources that were inactive for more than 5 times the deterministic reporting
3522    * interval get timed out. the min timeout is 5 seconds. */
3523   /* mind old time that might pre-date last time going to PLAYING */
3524   btime = MAX (source->last_activity, sess->start_time);
3525   if (data->current_time > btime) {
3526     interval = MAX (binterval * 5, 5 * GST_SECOND);
3527     if (data->current_time - btime > interval) {
3528       GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
3529           source->ssrc, GST_TIME_ARGS (btime));
3530       if (source->internal) {
3531         /* this is an internal source that is not using our suggested ssrc.
3532          * since there must be another source using this ssrc, we can remove
3533          * this one instead of making it a receiver forever */
3534         if (source->ssrc != sess->suggested_ssrc) {
3535           rtp_source_mark_bye (source, "timed out");
3536           /* do not schedule bye here, since we are inside the RTCP timeout
3537            * processing and scheduling bye will interfere with SR/RR sending */
3538         }
3539       } else {
3540         remove = TRUE;
3541       }
3542     }
3543   }
3544
3545   /* senders that did not send for a long time become a receiver, this also
3546    * holds for our own sources. */
3547   if (is_sender) {
3548     /* mind old time that might pre-date last time going to PLAYING */
3549     btime = MAX (source->last_rtp_activity, sess->start_time);
3550     if (data->current_time > btime) {
3551       interval = MAX (binterval * 2, 5 * GST_SECOND);
3552       if (data->current_time - btime > interval) {
3553         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
3554             GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
3555         sendertimeout = TRUE;
3556       }
3557     }
3558   }
3559
3560   if (remove) {
3561     sess->total_sources--;
3562     if (is_sender) {
3563       sess->stats.sender_sources--;
3564       if (source->internal)
3565         sess->stats.internal_sender_sources--;
3566     }
3567     if (is_active)
3568       sess->stats.active_sources--;
3569
3570     if (source->internal)
3571       sess->stats.internal_sources--;
3572
3573     if (byetimeout)
3574       on_bye_timeout (sess, source);
3575     else
3576       on_timeout (sess, source);
3577   } else {
3578     if (sendertimeout) {
3579       source->is_sender = FALSE;
3580       sess->stats.sender_sources--;
3581       if (source->internal)
3582         sess->stats.internal_sender_sources--;
3583
3584       on_sender_timeout (sess, source);
3585     }
3586     /* count how many source to report in this generation */
3587     if (((gint16) (source->generation - sess->generation)) <= 0)
3588       data->num_to_report++;
3589   }
3590   source->closing = remove;
3591 }
3592
3593 static void
3594 session_sdes (RTPSession * sess, ReportData * data)
3595 {
3596   GstRTCPPacket *packet = &data->packet;
3597   const GstStructure *sdes;
3598   gint i, n_fields;
3599   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3600
3601   /* add SDES packet */
3602   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SDES, packet);
3603
3604   gst_rtcp_packet_sdes_add_item (packet, data->source->ssrc);
3605
3606   sdes = rtp_source_get_sdes_struct (data->source);
3607
3608   /* add all fields in the structure, the order is not important. */
3609   n_fields = gst_structure_n_fields (sdes);
3610   for (i = 0; i < n_fields; ++i) {
3611     const gchar *field;
3612     const gchar *value;
3613     GstRTCPSDESType type;
3614
3615     field = gst_structure_nth_field_name (sdes, i);
3616     if (field == NULL)
3617       continue;
3618     value = gst_structure_get_string (sdes, field);
3619     if (value == NULL)
3620       continue;
3621     type = gst_rtcp_sdes_name_to_type (field);
3622
3623     /* Early packets are minimal and only include the CNAME */
3624     if (data->is_early && type != GST_RTCP_SDES_CNAME)
3625       continue;
3626
3627     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
3628       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
3629           (const guint8 *) value);
3630     } else if (type == GST_RTCP_SDES_PRIV) {
3631       gsize prefix_len;
3632       gsize value_len;
3633       gsize data_len;
3634       guint8 data[256];
3635
3636       /* don't accept entries that are too big */
3637       prefix_len = strlen (field);
3638       if (prefix_len > 255)
3639         continue;
3640       value_len = strlen (value);
3641       if (value_len > 255)
3642         continue;
3643       data_len = 1 + prefix_len + value_len;
3644       if (data_len > 255)
3645         continue;
3646
3647       data[0] = prefix_len;
3648       memcpy (&data[1], field, prefix_len);
3649       memcpy (&data[1 + prefix_len], value, value_len);
3650
3651       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
3652     }
3653   }
3654
3655   data->has_sdes = TRUE;
3656 }
3657
3658 /* schedule a BYE packet */
3659 static void
3660 make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data)
3661 {
3662   GstRTCPPacket *packet = &data->packet;
3663   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3664
3665   /* add SDES */
3666   session_sdes (sess, data);
3667   /* add a BYE packet */
3668   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_BYE, packet);
3669   gst_rtcp_packet_bye_add_ssrc (packet, source->ssrc);
3670   if (source->bye_reason)
3671     gst_rtcp_packet_bye_set_reason (packet, source->bye_reason);
3672
3673   /* we have a BYE packet now */
3674   source->sent_bye = TRUE;
3675 }
3676
3677 static gboolean
3678 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
3679 {
3680   GstClockTime new_send_time;
3681   GstClockTime interval;
3682   RTPSessionStats *stats;
3683
3684   if (sess->scheduled_bye)
3685     stats = &sess->bye_stats;
3686   else
3687     stats = &sess->stats;
3688
3689   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
3690     data->is_early = TRUE;
3691   else
3692     data->is_early = FALSE;
3693
3694   if (data->is_early && sess->next_early_rtcp_time < current_time) {
3695     GST_DEBUG ("early feedback %" GST_TIME_FORMAT " < now %"
3696         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time),
3697         GST_TIME_ARGS (current_time));
3698   } else if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
3699       sess->next_rtcp_check_time > current_time) {
3700     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
3701         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
3702         GST_TIME_ARGS (current_time));
3703     return FALSE;
3704   }
3705
3706   /* take interval and add jitter */
3707   interval = data->interval;
3708   if (interval != GST_CLOCK_TIME_NONE)
3709     interval = rtp_stats_add_rtcp_jitter (stats, interval);
3710
3711   if (sess->last_rtcp_check_time != GST_CLOCK_TIME_NONE) {
3712     /* perform forward reconsideration */
3713     if (interval != GST_CLOCK_TIME_NONE) {
3714       GstClockTime elapsed;
3715
3716       /* get elapsed time since we last reported */
3717       elapsed = current_time - sess->last_rtcp_check_time;
3718
3719       GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
3720           GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
3721       new_send_time = interval + sess->last_rtcp_check_time;
3722     } else {
3723       new_send_time = sess->last_rtcp_check_time;
3724     }
3725   } else {
3726     /* If this is the first RTCP packet, we can reconsider anything based
3727      * on the last RTCP send time because there was none.
3728      */
3729     g_warn_if_fail (!data->is_early);
3730     data->is_early = FALSE;
3731     new_send_time = current_time;
3732   }
3733
3734   if (!data->is_early) {
3735     /* check if reconsideration */
3736     if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
3737       GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
3738           GST_TIME_ARGS (new_send_time));
3739       /* store new check time */
3740       sess->next_rtcp_check_time = new_send_time;
3741       sess->last_rtcp_interval = interval;
3742       return FALSE;
3743     }
3744
3745     sess->last_rtcp_interval = interval;
3746     if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF
3747             || sess->rtp_profile == GST_RTP_PROFILE_SAVPF)
3748         && interval != GST_CLOCK_TIME_NONE) {
3749       /* Apply the rules from RFC 4585 section 3.5.3 */
3750       if (stats->min_interval != 0 && !sess->first_rtcp) {
3751         GstClockTime T_rr_current_interval =
3752             g_random_double_range (0.5, 1.5) * stats->min_interval * GST_SECOND;
3753
3754         if (T_rr_current_interval > interval) {
3755           GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT
3756               " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval),
3757               GST_TIME_ARGS (interval));
3758           interval = T_rr_current_interval;
3759         }
3760       }
3761     }
3762     sess->next_rtcp_check_time = current_time + interval;
3763   }
3764
3765
3766   GST_DEBUG ("can send RTCP now, next %" GST_TIME_FORMAT,
3767       GST_TIME_ARGS (sess->next_rtcp_check_time));
3768
3769   return TRUE;
3770 }
3771
3772 static void
3773 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
3774 {
3775   g_hash_table_insert (hash_table, key, g_object_ref (source));
3776 }
3777
3778 static gboolean
3779 remove_closing_sources (const gchar * key, RTPSource * source,
3780     ReportData * data)
3781 {
3782   if (source->closing)
3783     return TRUE;
3784
3785   if (source->send_fir)
3786     data->have_fir = TRUE;
3787   if (source->send_pli)
3788     data->have_pli = TRUE;
3789   if (source->send_nack)
3790     data->have_nack = TRUE;
3791
3792   return FALSE;
3793 }
3794
3795 static void
3796 generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
3797 {
3798   RTPSession *sess = data->sess;
3799   gboolean is_bye = FALSE;
3800   ReportOutput *output;
3801
3802   /* only generate RTCP for active internal sources */
3803   if (!source->internal || source->sent_bye)
3804     return;
3805
3806   /* ignore other sources when we do the timeout after a scheduled BYE */
3807   if (sess->scheduled_bye && !source->marked_bye)
3808     return;
3809
3810   data->source = source;
3811
3812   /* open packet */
3813   session_start_rtcp (sess, data);
3814
3815   if (source->marked_bye) {
3816     /* send BYE */
3817     make_source_bye (sess, source, data);
3818     is_bye = TRUE;
3819   } else if (!data->is_early) {
3820     /* loop over all known sources and add report blocks. If we are early, we
3821      * just make a minimal RTCP packet and skip this step */
3822     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3823         (GHFunc) session_report_blocks, data);
3824   }
3825   if (!data->has_sdes && (!data->is_early || !sess->reduced_size_rtcp))
3826     session_sdes (sess, data);
3827
3828   if (data->have_fir)
3829     session_fir (sess, data);
3830
3831   if (data->have_pli)
3832     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3833         (GHFunc) session_pli, data);
3834
3835   if (data->have_nack)
3836     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3837         (GHFunc) session_nack, data);
3838
3839   gst_rtcp_buffer_unmap (&data->rtcpbuf);
3840
3841   output = g_slice_new (ReportOutput);
3842   output->source = g_object_ref (source);
3843   output->is_bye = is_bye;
3844   output->buffer = data->rtcp;
3845   /* queue the RTCP packet to push later */
3846   g_queue_push_tail (&data->output, output);
3847 }
3848
3849 static void
3850 update_generation (const gchar * key, RTPSource * source, ReportData * data)
3851 {
3852   RTPSession *sess = data->sess;
3853
3854   if (g_hash_table_size (source->reported_in_sr_of) >=
3855       sess->stats.internal_sources) {
3856     /* source is reported, move to next generation */
3857     source->generation = sess->generation + 1;
3858     g_hash_table_remove_all (source->reported_in_sr_of);
3859
3860     GST_LOG ("reported source %x, new generation: %d", source->ssrc,
3861         source->generation);
3862
3863     /* if we reported all sources in this generation, move to next */
3864     if (--data->num_to_report == 0) {
3865       sess->generation++;
3866       GST_DEBUG ("all reported, generation now %u", sess->generation);
3867     }
3868   }
3869 }
3870
3871 /**
3872  * rtp_session_on_timeout:
3873  * @sess: an #RTPSession
3874  * @current_time: the current system time
3875  * @ntpnstime: the current NTP time in nanoseconds
3876  * @running_time: the current running_time of the pipeline
3877  *
3878  * Perform maintenance actions after the timeout obtained with
3879  * rtp_session_next_timeout() expired.
3880  *
3881  * This function will perform timeouts of receivers and senders, send a BYE
3882  * packet or generate RTCP packets with current session stats.
3883  *
3884  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
3885  * times, for each packet that should be processed.
3886  *
3887  * Returns: a #GstFlowReturn.
3888  */
3889 GstFlowReturn
3890 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
3891     guint64 ntpnstime, GstClockTime running_time)
3892 {
3893   GstFlowReturn result = GST_FLOW_OK;
3894   ReportData data = { GST_RTCP_BUFFER_INIT };
3895   GHashTable *table_copy;
3896   ReportOutput *output;
3897
3898   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3899
3900   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT
3901       ", running-time %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
3902       GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (running_time));
3903
3904   data.sess = sess;
3905   data.current_time = current_time;
3906   data.ntpnstime = ntpnstime;
3907   data.running_time = running_time;
3908   data.num_to_report = 0;
3909   data.may_suppress = FALSE;
3910   data.nacked_seqnums = 0;
3911   g_queue_init (&data.output);
3912
3913   RTP_SESSION_LOCK (sess);
3914   /* get a new interval, we need this for various cleanups etc */
3915   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
3916
3917   GST_DEBUG ("interval %" GST_TIME_FORMAT, GST_TIME_ARGS (data.interval));
3918
3919   /* we need an internal source now */
3920   if (sess->stats.internal_sources == 0) {
3921     RTPSource *source;
3922     gboolean created;
3923
3924     source = obtain_internal_source (sess, sess->suggested_ssrc, &created,
3925         current_time);
3926     sess->internal_ssrc_set = TRUE;
3927
3928     if (created)
3929       on_new_sender_ssrc (sess, source);
3930
3931     g_object_unref (source);
3932   }
3933
3934   sess->conflicting_addresses =
3935       timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
3936
3937   /* Make a local copy of the hashtable. We need to do this because the
3938    * cleanup stage below releases the session lock. */
3939   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
3940       (GDestroyNotify) g_object_unref);
3941   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3942       (GHFunc) clone_ssrcs_hashtable, table_copy);
3943
3944   /* Clean up the session, mark the source for removing, this might release the
3945    * session lock. */
3946   g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
3947   g_hash_table_destroy (table_copy);
3948
3949   /* Now remove the marked sources */
3950   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
3951       (GHRFunc) remove_closing_sources, &data);
3952
3953   /* update point-to-point status */
3954   session_update_ptp (sess);
3955
3956   /* see if we need to generate SR or RR packets */
3957   if (!is_rtcp_time (sess, current_time, &data))
3958     goto done;
3959
3960   GST_DEBUG
3961       ("doing RTCP generation %u for %u sources, early %d, may suppress %d",
3962       sess->generation, data.num_to_report, data.is_early, data.may_suppress);
3963
3964   /* generate RTCP for all internal sources */
3965   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3966       (GHFunc) generate_rtcp, &data);
3967
3968   /* update the generation for all the sources that have been reported */
3969   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3970       (GHFunc) update_generation, &data);
3971
3972   /* we keep track of the last report time in order to timeout inactive
3973    * receivers or senders */
3974   if (!data.is_early) {
3975     GST_DEBUG ("Time since last regular RTCP: %" GST_TIME_FORMAT " - %"
3976         GST_TIME_FORMAT " = %" GST_TIME_FORMAT,
3977         GST_TIME_ARGS (data.current_time),
3978         GST_TIME_ARGS (sess->last_rtcp_send_time),
3979         GST_TIME_ARGS (data.current_time - sess->last_rtcp_send_time));
3980     sess->last_rtcp_send_time = data.current_time;
3981   }
3982
3983   GST_DEBUG ("Time since last RTCP: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT
3984       " = %" GST_TIME_FORMAT, GST_TIME_ARGS (data.current_time),
3985       GST_TIME_ARGS (sess->last_rtcp_send_time),
3986       GST_TIME_ARGS (data.current_time - sess->last_rtcp_check_time));
3987   sess->last_rtcp_check_time = data.current_time;
3988   sess->first_rtcp = FALSE;
3989   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
3990   sess->scheduled_bye = FALSE;
3991
3992 done:
3993   RTP_SESSION_UNLOCK (sess);
3994
3995   /* push out the RTCP packets */
3996   while ((output = g_queue_pop_head (&data.output))) {
3997     gboolean do_not_suppress;
3998     GstBuffer *buffer = output->buffer;
3999     RTPSource *source = output->source;
4000
4001     /* Give the user a change to add its own packet */
4002     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
4003         buffer, data.is_early, &do_not_suppress);
4004
4005     if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) {
4006       guint packet_size;
4007
4008       packet_size = gst_buffer_get_size (buffer) + sess->header_len;
4009
4010       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
4011       GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
4012           sess->stats.avg_rtcp_packet_size, packet_size);
4013       result =
4014           sess->callbacks.send_rtcp (sess, source, buffer, output->is_bye,
4015           sess->send_rtcp_user_data);
4016       sess->stats.nacks_sent += data.nacked_seqnums;
4017
4018       RTP_SESSION_LOCK (sess);
4019       on_sender_ssrc_active (sess, source);
4020       RTP_SESSION_UNLOCK (sess);
4021     } else {
4022       GST_DEBUG ("freeing packet callback: %p"
4023           " do_not_suppress: %d may_suppress: %d", sess->callbacks.send_rtcp,
4024           do_not_suppress, data.may_suppress);
4025       sess->stats.nacks_dropped += data.nacked_seqnums;
4026       gst_buffer_unref (buffer);
4027     }
4028     g_object_unref (source);
4029     g_slice_free (ReportOutput, output);
4030   }
4031   return result;
4032 }
4033
4034 /**
4035  * rtp_session_request_early_rtcp:
4036  * @sess: an #RTPSession
4037  * @current_time: the current system time
4038  * @max_delay: maximum delay
4039  *
4040  * Request transmission of early RTCP
4041  *
4042  * Returns: %TRUE if the related RTCP can be scheduled.
4043  */
4044 gboolean
4045 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
4046     GstClockTime max_delay)
4047 {
4048   GstClockTime T_dither_max, T_rr, offset = 0;
4049   gboolean ret;
4050   gboolean allow_early;
4051
4052   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
4053
4054   RTP_SESSION_LOCK (sess);
4055
4056   /* We assume a feedback profile if something is requesting RTCP
4057    * to be sent */
4058   sess->rtp_profile = GST_RTP_PROFILE_AVPF;
4059
4060   /* Check if already requested */
4061   /*  RFC 4585 section 3.5.2 step 2 */
4062   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
4063     GST_LOG_OBJECT (sess, "already have next early rtcp time");
4064     ret = (current_time + max_delay > sess->next_early_rtcp_time);
4065     goto end;
4066   }
4067
4068   if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) {
4069     GST_LOG_OBJECT (sess, "no next RTCP check time");
4070     ret = FALSE;
4071     goto end;
4072   }
4073
4074   /* RFC 4585 section 3.5.3 step 1
4075    * If no regular RTCP packet has been sent before, then a regular
4076    * RTCP packet has to be scheduled first and FB messages might be
4077    * included there
4078    */
4079   if (!GST_CLOCK_TIME_IS_VALID (sess->last_rtcp_send_time)) {
4080     GST_LOG_OBJECT (sess, "no RTCP sent yet");
4081
4082     if (current_time + max_delay > sess->next_rtcp_check_time) {
4083       GST_LOG_OBJECT (sess,
4084           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
4085           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4086           GST_TIME_ARGS (max_delay),
4087           GST_TIME_ARGS (sess->next_rtcp_check_time));
4088       ret = TRUE;
4089     } else {
4090       GST_LOG_OBJECT (sess,
4091           "can't allow early feedback, next scheduled time is too late %"
4092           GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
4093           GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
4094           GST_TIME_ARGS (sess->next_rtcp_check_time));
4095       ret = FALSE;
4096     }
4097     goto end;
4098   }
4099
4100   T_rr = sess->last_rtcp_interval;
4101
4102   /*  RFC 4585 section 3.5.2 step 2b */
4103   /* If the total sources is <=2, then there is only us and one peer */
4104   /* When there is one auxiliary stream the session can still do point
4105    * to point.
4106    */
4107   if (sess->is_doing_ptp) {
4108     T_dither_max = 0;
4109   } else {
4110     /* Divide by 2 because l = 0.5 */
4111     T_dither_max = T_rr;
4112     T_dither_max /= 2;
4113   }
4114
4115   /*  RFC 4585 section 3.5.2 step 3 */
4116   if (current_time + T_dither_max > sess->next_rtcp_check_time) {
4117     GST_LOG_OBJECT (sess,
4118         "don't send because of dither, next scheduled time is too soon %"
4119         GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT,
4120         GST_TIME_ARGS (current_time), GST_TIME_ARGS (T_dither_max),
4121         GST_TIME_ARGS (sess->next_rtcp_check_time));
4122     ret = T_dither_max <= max_delay;
4123     goto end;
4124   }
4125
4126   /*  RFC 4585 section 3.5.2 step 4a and
4127    *  RFC 4585 section 3.5.2 step 6 */
4128   allow_early = FALSE;
4129   if (sess->last_rtcp_check_time == sess->last_rtcp_send_time) {
4130     /* Last time we sent a full RTCP packet, we can now immediately
4131      * send an early one as allow_early was reset to TRUE */
4132     allow_early = TRUE;
4133   } else if (sess->last_rtcp_check_time + T_rr <= current_time + max_delay) {
4134     /* Last packet we sent was an early RTCP packet and more than
4135      * T_rr has passed since then, meaning we would have suppressed
4136      * a regular RTCP packet already and reset allow_early to TRUE */
4137     allow_early = TRUE;
4138
4139     /* We have to offset a bit as T_rr has not passed yet, but will before
4140      * max_delay */
4141     if (sess->last_rtcp_check_time + T_rr > current_time)
4142       offset = (sess->last_rtcp_check_time + T_rr) - current_time;
4143   } else {
4144     GST_DEBUG_OBJECT (sess,
4145         "can't allow early RTCP yet: last regular %" GST_TIME_FORMAT ", %"
4146         GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT " + %"
4147         GST_TIME_FORMAT, GST_TIME_ARGS (sess->last_rtcp_send_time),
4148         GST_TIME_ARGS (sess->last_rtcp_check_time), GST_TIME_ARGS (T_rr),
4149         GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay));
4150   }
4151
4152   if (!allow_early) {
4153     /* Ignore the request a scheduled packet will be in time anyway */
4154     if (current_time + max_delay > sess->next_rtcp_check_time) {
4155       GST_LOG_OBJECT (sess,
4156           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
4157           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4158           GST_TIME_ARGS (max_delay),
4159           GST_TIME_ARGS (sess->next_rtcp_check_time));
4160       ret = TRUE;
4161     } else {
4162       GST_LOG_OBJECT (sess,
4163           "can't allow early feedback and next scheduled time is too late %"
4164           GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
4165           GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
4166           GST_TIME_ARGS (sess->next_rtcp_check_time));
4167       ret = FALSE;
4168     }
4169     goto end;
4170   }
4171
4172   /*  RFC 4585 section 3.5.2 step 4b */
4173   if (T_dither_max) {
4174     /* Schedule an early transmission later */
4175     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
4176         current_time + offset;
4177   } else {
4178     /* If no dithering, schedule it for NOW */
4179     sess->next_early_rtcp_time = current_time + offset;
4180   }
4181
4182   GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT
4183       ", next regular RTCP time %" GST_TIME_FORMAT,
4184       GST_TIME_ARGS (sess->next_early_rtcp_time),
4185       GST_TIME_ARGS (sess->next_rtcp_check_time));
4186   RTP_SESSION_UNLOCK (sess);
4187
4188   /* notify app of need to send packet early
4189    * and therefore of timeout change */
4190   if (sess->callbacks.reconsider)
4191     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
4192
4193   return TRUE;
4194
4195 end:
4196
4197   RTP_SESSION_UNLOCK (sess);
4198
4199   return ret;
4200 }
4201
4202 static gboolean
4203 rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
4204 {
4205   GstClockTime now;
4206
4207   if (!sess->callbacks.send_rtcp)
4208     return FALSE;
4209
4210   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
4211
4212   return rtp_session_request_early_rtcp (sess, now, max_delay);
4213 }
4214
4215 gboolean
4216 rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
4217     gboolean fir, gint count)
4218 {
4219   RTPSource *src;
4220
4221   if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
4222     GST_DEBUG ("FIR/PLI not sent");
4223     return FALSE;
4224   }
4225
4226   RTP_SESSION_LOCK (sess);
4227   src = find_source (sess, ssrc);
4228   if (src == NULL)
4229     goto no_source;
4230
4231   if (fir) {
4232     src->send_pli = FALSE;
4233     src->send_fir = TRUE;
4234
4235     if (count == -1 || count != src->last_fir_count)
4236       src->current_send_fir_seqnum++;
4237     src->last_fir_count = count;
4238   } else if (!src->send_fir) {
4239     src->send_pli = TRUE;
4240   }
4241   RTP_SESSION_UNLOCK (sess);
4242
4243   return TRUE;
4244
4245   /* ERRORS */
4246 no_source:
4247   {
4248     RTP_SESSION_UNLOCK (sess);
4249     return FALSE;
4250   }
4251 }
4252
4253 /**
4254  * rtp_session_request_nack:
4255  * @sess: a #RTPSession
4256  * @ssrc: the SSRC
4257  * @seqnum: the missing seqnum
4258  * @max_delay: max delay to request NACK
4259  *
4260  * Request scheduling of a NACK feedback packet for @seqnum in @ssrc.
4261  *
4262  * Returns: %TRUE if the NACK feedback could be scheduled
4263  */
4264 gboolean
4265 rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
4266     GstClockTime max_delay)
4267 {
4268   RTPSource *source;
4269
4270   if (!rtp_session_send_rtcp (sess, max_delay)) {
4271     GST_DEBUG ("NACK not sent");
4272     return FALSE;
4273   }
4274
4275   RTP_SESSION_LOCK (sess);
4276   source = find_source (sess, ssrc);
4277   if (source == NULL)
4278     goto no_source;
4279
4280   GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum);
4281   rtp_source_register_nack (source, seqnum);
4282   RTP_SESSION_UNLOCK (sess);
4283
4284   return TRUE;
4285
4286   /* ERRORS */
4287 no_source:
4288   {
4289     RTP_SESSION_UNLOCK (sess);
4290     return FALSE;
4291   }
4292 }