rtp/rtpmanager: Fix coverity issues
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
21  * with newer GLib versions (>= 2.31.0) */
22 #define GLIB_DISABLE_DEPRECATION_WARNINGS
23
24 #include <string.h>
25 #include <stdlib.h>
26
27 #include <gst/rtp/gstrtpbuffer.h>
28 #include <gst/rtp/gstrtcpbuffer.h>
29
30 #include <gst/glib-compat-private.h>
31
32 #include "rtpsession.h"
33 #include "gstrtputils.h"
34
35 GST_DEBUG_CATEGORY (rtp_session_debug);
36 #define GST_CAT_DEFAULT rtp_session_debug
37
38 /* signals and args */
39 enum
40 {
41   SIGNAL_GET_SOURCE_BY_SSRC,
42   SIGNAL_ON_NEW_SSRC,
43   SIGNAL_ON_SSRC_COLLISION,
44   SIGNAL_ON_SSRC_VALIDATED,
45   SIGNAL_ON_SSRC_ACTIVE,
46   SIGNAL_ON_SSRC_SDES,
47   SIGNAL_ON_BYE_SSRC,
48   SIGNAL_ON_BYE_TIMEOUT,
49   SIGNAL_ON_TIMEOUT,
50   SIGNAL_ON_SENDER_TIMEOUT,
51   SIGNAL_ON_SENDING_RTCP,
52   SIGNAL_ON_APP_RTCP,
53   SIGNAL_ON_FEEDBACK_RTCP,
54   SIGNAL_SEND_RTCP,
55   SIGNAL_SEND_RTCP_FULL,
56   SIGNAL_ON_RECEIVING_RTCP,
57   SIGNAL_ON_NEW_SENDER_SSRC,
58   SIGNAL_ON_SENDER_SSRC_ACTIVE,
59   SIGNAL_ON_SENDING_NACKS,
60   LAST_SIGNAL
61 };
62
63 #define DEFAULT_INTERNAL_SOURCE      NULL
64 #define DEFAULT_BANDWIDTH            0.0
65 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_FRACTION
66 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
67 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
68 #define DEFAULT_RTCP_MTU             1400
69 #define DEFAULT_SDES                 NULL
70 #define DEFAULT_NUM_SOURCES          0
71 #define DEFAULT_NUM_ACTIVE_SOURCES   0
72 #define DEFAULT_SOURCES              NULL
73 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
74 #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
75 #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
76 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
77 #define DEFAULT_MAX_DROPOUT_TIME     60000
78 #define DEFAULT_MAX_MISORDER_TIME    2000
79 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
80 #define DEFAULT_RTCP_REDUCED_SIZE    FALSE
81 #define DEFAULT_RTCP_DISABLE_SR_TIMESTAMP FALSE
82 #define DEFAULT_FAVOR_NEW            FALSE
83 #define DEFAULT_TWCC_FEEDBACK_INTERVAL GST_CLOCK_TIME_NONE
84 #define DEFAULT_UPDATE_NTP64_HEADER_EXT TRUE
85
86 enum
87 {
88   PROP_0,
89   PROP_INTERNAL_SSRC,
90   PROP_INTERNAL_SOURCE,
91   PROP_BANDWIDTH,
92   PROP_RTCP_FRACTION,
93   PROP_RTCP_RR_BANDWIDTH,
94   PROP_RTCP_RS_BANDWIDTH,
95   PROP_RTCP_MTU,
96   PROP_SDES,
97   PROP_NUM_SOURCES,
98   PROP_NUM_ACTIVE_SOURCES,
99   PROP_SOURCES,
100   PROP_FAVOR_NEW,
101   PROP_RTCP_MIN_INTERVAL,
102   PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
103   PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
104   PROP_PROBATION,
105   PROP_MAX_DROPOUT_TIME,
106   PROP_MAX_MISORDER_TIME,
107   PROP_STATS,
108   PROP_RTP_PROFILE,
109   PROP_RTCP_REDUCED_SIZE,
110   PROP_RTCP_DISABLE_SR_TIMESTAMP,
111   PROP_TWCC_FEEDBACK_INTERVAL,
112   PROP_UPDATE_NTP64_HEADER_EXT,
113   PROP_LAST,
114 };
115
116 static GParamSpec *properties[PROP_LAST];
117
118 /* update average packet size */
119 #define INIT_AVG(avg, val) \
120    (avg) = (val);
121 #define UPDATE_AVG(avg, val)            \
122   if ((avg) == 0)                       \
123    (avg) = (val);                       \
124   else                                  \
125    (avg) = ((val) + (15 * (avg))) >> 4;
126
127 /* GObject vmethods */
128 static void rtp_session_finalize (GObject * object);
129 static void rtp_session_set_property (GObject * object, guint prop_id,
130     const GValue * value, GParamSpec * pspec);
131 static void rtp_session_get_property (GObject * object, guint prop_id,
132     GValue * value, GParamSpec * pspec);
133
134 static gboolean rtp_session_send_rtcp (RTPSession * sess,
135     GstClockTime max_delay);
136 static gboolean rtp_session_send_rtcp_with_deadline (RTPSession * sess,
137     GstClockTime deadline);
138
139 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
140
141 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
142
143 static guint32 rtp_session_create_new_ssrc (RTPSession * sess);
144 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
145     gboolean * created, RTPPacketInfo * pinfo, gboolean rtp);
146 static RTPSource *obtain_internal_source (RTPSession * sess,
147     guint32 ssrc, gboolean * created, GstClockTime current_time);
148 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
149     GstClockTime current_time);
150 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
151     gboolean deterministic, gboolean first);
152
153 static gboolean
154 accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
155     const GValue * handler_return, gpointer data)
156 {
157   if (g_value_get_boolean (handler_return))
158     g_value_set_boolean (return_accu, TRUE);
159
160   return TRUE;
161 }
162
163 static void
164 rtp_session_class_init (RTPSessionClass * klass)
165 {
166   GObjectClass *gobject_class;
167
168   gobject_class = (GObjectClass *) klass;
169
170   gobject_class->finalize = rtp_session_finalize;
171   gobject_class->set_property = rtp_session_set_property;
172   gobject_class->get_property = rtp_session_get_property;
173
174   /**
175    * RTPSession::get-source-by-ssrc:
176    * @session: the object which received the signal
177    * @ssrc: the SSRC of the RTPSource
178    *
179    * Request the #RTPSource object with SSRC @ssrc in @session.
180    */
181   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
182       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
183       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
184           get_source_by_ssrc), NULL, NULL, NULL,
185       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
186
187   /**
188    * RTPSession::on-new-ssrc:
189    * @session: the object which received the signal
190    * @src: the new RTPSource
191    *
192    * Notify of a new SSRC that entered @session.
193    */
194   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
195       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
196       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
197       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
198   /**
199    * RTPSession::on-ssrc-collision:
200    * @session: the object which received the signal
201    * @src: the #RTPSource that caused a collision
202    *
203    * Notify when we have an SSRC collision
204    */
205   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
206       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
207       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
208       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
209   /**
210    * RTPSession::on-ssrc-validated:
211    * @session: the object which received the signal
212    * @src: the new validated RTPSource
213    *
214    * Notify of a new SSRC that became validated.
215    */
216   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
217       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
218       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
219       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
220   /**
221    * RTPSession::on-ssrc-active:
222    * @session: the object which received the signal
223    * @src: the active RTPSource
224    *
225    * Notify of a SSRC that is active, i.e., sending RTCP.
226    */
227   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
228       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
229       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
230       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
231   /**
232    * RTPSession::on-ssrc-sdes:
233    * @session: the object which received the signal
234    * @src: the RTPSource
235    *
236    * Notify that a new SDES was received for SSRC.
237    */
238   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
239       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
240       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
241       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
242   /**
243    * RTPSession::on-bye-ssrc:
244    * @session: the object which received the signal
245    * @src: the RTPSource that went away
246    *
247    * Notify of an SSRC that became inactive because of a BYE packet.
248    */
249   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
250       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
251       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
252       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
253   /**
254    * RTPSession::on-bye-timeout:
255    * @session: the object which received the signal
256    * @src: the RTPSource that timed out
257    *
258    * Notify of an SSRC that has timed out because of BYE
259    */
260   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
261       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
262       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
263       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
264   /**
265    * RTPSession::on-timeout:
266    * @session: the object which received the signal
267    * @src: the RTPSource that timed out
268    *
269    * Notify of an SSRC that has timed out
270    */
271   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
272       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
273       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
274       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
275   /**
276    * RTPSession::on-sender-timeout:
277    * @session: the object which received the signal
278    * @src: the RTPSource that timed out
279    *
280    * Notify of an SSRC that was a sender but timed out and became a receiver.
281    */
282   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
283       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
284       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
285       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
286
287   /**
288    * RTPSession::on-sending-rtcp
289    * @session: the object which received the signal
290    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
291    * @early: %TRUE if the packet is early, %FALSE if it is regular
292    *
293    * This signal is emitted before sending an RTCP packet, it can be used
294    * to add extra RTCP Packets.
295    *
296    * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
297    * if suppressing it is acceptable
298    */
299   rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
300       g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
301       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
302       accumulate_trues, NULL, NULL, G_TYPE_BOOLEAN, 2,
303       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE, G_TYPE_BOOLEAN);
304
305   /**
306    * RTPSession::on-app-rtcp:
307    * @session: the object which received the signal
308    * @subtype: The subtype of the packet
309    * @ssrc: The SSRC/CSRC of the packet
310    * @name: The name of the packet
311    * @data: a #GstBuffer with the application-dependant data or %NULL if
312    * there was no data
313    *
314    * Notify that a RTCP APP packet has been received
315    */
316   rtp_session_signals[SIGNAL_ON_APP_RTCP] =
317       g_signal_new ("on-app-rtcp", G_TYPE_FROM_CLASS (klass),
318       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_app_rtcp),
319       NULL, NULL, NULL, G_TYPE_NONE, 4, G_TYPE_UINT, G_TYPE_UINT,
320       G_TYPE_STRING, GST_TYPE_BUFFER);
321
322   /**
323    * RTPSession::on-feedback-rtcp:
324    * @session: the object which received the signal
325    * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
326    *  %GST_RTCP_TYPE_RTPFB
327    * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
328    * @sender_ssrc: The SSRC of the sender
329    * @media_ssrc: The SSRC of the media this refers to
330    * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
331    * there was no FCI
332    *
333    * Notify that a RTCP feedback packet has been received
334    */
335   rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
336       g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
337       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
338       NULL, NULL, NULL, G_TYPE_NONE, 5, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT,
339       G_TYPE_UINT, GST_TYPE_BUFFER);
340
341   /**
342    * RTPSession::send-rtcp:
343    * @session: the object which received the signal
344    * @max_delay: The maximum delay after which the feedback will not be useful
345    *  anymore
346    *
347    * Requests that the #RTPSession initiate a new RTCP packet as soon as
348    * possible within the requested delay.
349    *
350    * This sets feedback to %TRUE if not already done before.
351    */
352   rtp_session_signals[SIGNAL_SEND_RTCP] =
353       g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
354       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
355       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
356       NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
357
358   /**
359    * RTPSession::send-rtcp-full:
360    * @session: the object which received the signal
361    * @max_delay: The maximum delay after which the feedback will not be useful
362    *  anymore
363    *
364    * Requests that the #RTPSession initiate a new RTCP packet as soon as
365    * possible within the requested delay.
366    *
367    * This sets feedback to %TRUE if not already done before.
368    *
369    * Returns: TRUE if the new RTCP packet could be scheduled within the
370    * requested delay, FALSE otherwise.
371    *
372    * Since: 1.6
373    */
374   rtp_session_signals[SIGNAL_SEND_RTCP_FULL] =
375       g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass),
376       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
377       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
378       NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
379
380   /**
381    * RTPSession::on-receiving-rtcp
382    * @session: the object which received the signal
383    * @buffer: the #GstBuffer containing the RTCP packet that was received
384    *
385    * This signal is emitted when receiving an RTCP packet before it is handled
386    * by the session. It can be used to extract custom information from RTCP packets.
387    *
388    * Since: 1.6
389    */
390   rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] =
391       g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass),
392       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp),
393       NULL, NULL, NULL, G_TYPE_NONE, 1,
394       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
395
396   /**
397    * RTPSession::on-new-sender-ssrc:
398    * @session: the object which received the signal
399    * @src: the new sender RTPSource
400    *
401    * Notify of a new sender SSRC that entered @session.
402    *
403    * Since: 1.8
404    */
405   rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
406       g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
407       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_sender_ssrc),
408       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
409
410   /**
411    * RTPSession::on-sender-ssrc-active:
412    * @session: the object which received the signal
413    * @src: the active sender RTPSource
414    *
415    * Notify of a sender SSRC that is active, i.e., sending RTCP.
416    *
417    * Since: 1.8
418    */
419   rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
420       g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
421       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass,
422           on_sender_ssrc_active), NULL, NULL, NULL,
423       G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
424
425   /**
426    * RTPSession::on-sending-nack
427    * @session: the object which received the signal
428    * @sender_ssrc: the sender ssrc
429    * @media_ssrc: the media ssrc
430    * @nacks: (element-type guint16): the list of seqnum to be nacked
431    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
432    *
433    * This signal is emitted before NACK packets are added into the RTCP
434    * packet. This signal can be used to override the conversion of the NACK
435    * seqnum array into packets. This can be used if your protocol uses
436    * different type of NACK (e.g. based on RTCP APP).
437    *
438    * The handler should transform the seqnum from @nacks array into packets.
439    * @nacks seqnum must be consumed from the start. The remaining will be
440    * rescheduled for later base on bandwidth. Only one handler will be
441    * signalled.
442    *
443    * A handler may return 0 to signal that generic NACKs should be created
444    * for this set. This can be useful if the signal is used for other purpose
445    * or if the other type of NACK would use more space.
446    *
447    * Returns: the number of NACK seqnum that was consumed from @nacks.
448    *
449    * Since: 1.16
450    */
451   rtp_session_signals[SIGNAL_ON_SENDING_NACKS] =
452       g_signal_new ("on-sending-nacks", G_TYPE_FROM_CLASS (klass),
453       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_nacks),
454       g_signal_accumulator_first_wins, NULL, NULL,
455       G_TYPE_UINT, 4, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_ARRAY,
456       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
457
458   properties[PROP_INTERNAL_SSRC] =
459       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
460       "The internal SSRC used for the session (deprecated)",
461       0, G_MAXUINT, 0,
462       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | GST_PARAM_DOC_SHOW_DEFAULT);
463
464   properties[PROP_INTERNAL_SOURCE] =
465       g_param_spec_object ("internal-source", "Internal Source",
466       "The internal source element of the session (deprecated)",
467       RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
468
469   properties[PROP_BANDWIDTH] =
470       g_param_spec_double ("bandwidth", "Bandwidth",
471       "The bandwidth of the session in bits per second (0 for auto-discover)",
472       0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
473       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
474
475   properties[PROP_RTCP_FRACTION] =
476       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
477       "The fraction of the bandwidth used for RTCP in bits per second (or as a real fraction of the RTP bandwidth if < 1)",
478       0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
479       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
480
481   properties[PROP_RTCP_RR_BANDWIDTH] =
482       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
483       "The RTCP bandwidth used for receivers in bits per second (-1 = default)",
484       -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
485       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
486
487   properties[PROP_RTCP_RS_BANDWIDTH] =
488       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
489       "The RTCP bandwidth used for senders in bits per second (-1 = default)",
490       -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
491       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
492
493   properties[PROP_RTCP_MTU] =
494       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
495       "The maximum size of the RTCP packets",
496       16, G_MAXINT16, DEFAULT_RTCP_MTU,
497       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
498
499   properties[PROP_SDES] =
500       g_param_spec_boxed ("sdes", "SDES",
501       "The SDES items of this session",
502       GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
503       | GST_PARAM_DOC_SHOW_DEFAULT);
504
505   properties[PROP_NUM_SOURCES] =
506       g_param_spec_uint ("num-sources", "Num Sources",
507       "The number of sources in the session", 0, G_MAXUINT,
508       DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
509
510   properties[PROP_NUM_ACTIVE_SOURCES] =
511       g_param_spec_uint ("num-active-sources", "Num Active Sources",
512       "The number of active sources in the session", 0, G_MAXUINT,
513       DEFAULT_NUM_ACTIVE_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
514   /**
515    * RTPSource:sources
516    *
517    * Get a GValue Array of all sources in the session.
518    *
519    * ## Getting the #RTPSources of a session
520    *
521    * ``` C
522    * {
523    *   GValueArray *arr;
524    *   GValue *val;
525    *   guint i;
526    *
527    *   g_object_get (sess, "sources", &arr, NULL);
528    *
529    *   for (i = 0; i < arr->n_values; i++) {
530    *     RTPSource *source;
531    *
532    *     val = g_value_array_get_nth (arr, i);
533    *     source = g_value_get_object (val);
534    *   }
535    *   g_value_array_free (arr);
536    * }
537    * ```
538    */
539   properties[PROP_SOURCES] =
540       g_param_spec_boxed ("sources", "Sources",
541       "An array of all known sources in the session",
542       G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
543
544   properties[PROP_FAVOR_NEW] =
545       g_param_spec_boolean ("favor-new", "Favor new sources",
546       "Resolve SSRC conflict in favor of new sources", DEFAULT_FAVOR_NEW,
547       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
548
549   properties[PROP_RTCP_MIN_INTERVAL] =
550       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
551       "Minimum interval between Regular RTCP packet (in ns)",
552       0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
553       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
554
555   properties[PROP_RTCP_FEEDBACK_RETENTION_WINDOW] =
556       g_param_spec_uint64 ("rtcp-feedback-retention-window",
557       "RTCP Feedback retention window",
558       "Duration during which RTCP Feedback packets are retained (in ns)",
559       0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
560       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
561
562   properties[PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD] =
563       g_param_spec_uint ("rtcp-immediate-feedback-threshold",
564       "RTCP Immediate Feedback threshold",
565       "The maximum number of members of a RTP session for which immediate"
566       " feedback is used (DEPRECATED: has no effect and is not needed)",
567       0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
568       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED);
569
570   properties[PROP_PROBATION] =
571       g_param_spec_uint ("probation", "Number of probations",
572       "Consecutive packet sequence numbers to accept the source",
573       0, G_MAXUINT, DEFAULT_PROBATION,
574       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
575
576   properties[PROP_MAX_DROPOUT_TIME] =
577       g_param_spec_uint ("max-dropout-time", "Max dropout time",
578       "The maximum time (milliseconds) of missing packets tolerated.",
579       0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
580       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
581
582   properties[PROP_MAX_MISORDER_TIME] =
583       g_param_spec_uint ("max-misorder-time", "Max misorder time",
584       "The maximum time (milliseconds) of misordered packets tolerated.",
585       0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
586       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
587
588   /**
589    * RTPSession:stats:
590    *
591    * Various session statistics. This property returns a GstStructure
592    * with name application/x-rtp-session-stats with the following fields:
593    *
594    * * "rtx-drop-count"  G_TYPE_UINT   The number of retransmission events
595    *      dropped (due to bandwidth constraints)
596    * *  "sent-nack-count" G_TYPE_UINT   Number of NACKs sent
597    * *  "recv-nack-count" G_TYPE_UINT   Number of NACKs received
598    * *  "source-stats"    G_TYPE_BOXED  GValueArray of #RTPSource:stats for all
599    *      RTP sources (Since 1.8)
600    *
601    * Since: 1.4
602    */
603   properties[PROP_STATS] =
604       g_param_spec_boxed ("stats", "Statistics",
605       "Various statistics", GST_TYPE_STRUCTURE,
606       G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
607
608   properties[PROP_RTP_PROFILE] =
609       g_param_spec_enum ("rtp-profile", "RTP Profile",
610       "RTP profile to use for this session", GST_TYPE_RTP_PROFILE,
611       DEFAULT_RTP_PROFILE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
612
613   properties[PROP_RTCP_REDUCED_SIZE] =
614       g_param_spec_boolean ("rtcp-reduced-size", "RTCP Reduced Size",
615       "Use Reduced Size RTCP for feedback packets",
616       DEFAULT_RTCP_REDUCED_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
617
618   /**
619    * RTPSession:disable-sr-timestamp:
620    *
621    * Whether sender reports should be timestamped.
622    *
623    * Since: 1.16
624    */
625   properties[PROP_RTCP_DISABLE_SR_TIMESTAMP] =
626       g_param_spec_boolean ("disable-sr-timestamp",
627       "Disable Sender Report Timestamp",
628       "Whether sender reports should be timestamped",
629       DEFAULT_RTCP_DISABLE_SR_TIMESTAMP,
630       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
631
632   /**
633    * RTPSession:twcc-feedback-interval:
634    *
635    * The interval to send TWCC reports on.
636    * This overrides the default behavior of sending reports
637    * based on marker-bits.
638    *
639    * Since: 1.20
640    */
641   properties[PROP_TWCC_FEEDBACK_INTERVAL] =
642       g_param_spec_uint64 ("twcc-feedback-interval",
643       "TWCC Feedback Interval",
644       "The interval to send TWCC reports on",
645       0, G_MAXUINT64, DEFAULT_TWCC_FEEDBACK_INTERVAL,
646       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
647
648   /**
649    * RTPSession:update-ntp64-header-ext:
650    *
651    * Whether RTP NTP header extension should be updated with actual
652    * NTP time. If not, use the NTP time from buffer timestamp metadata
653    *
654    * Since: 1.22
655    */
656   properties[PROP_UPDATE_NTP64_HEADER_EXT] =
657       g_param_spec_boolean ("update-ntp64-header-ext",
658       "Update NTP-64 RTP Header Extension",
659       "Whether RTP NTP header extension should be updated with actual NTP time",
660       DEFAULT_UPDATE_NTP64_HEADER_EXT,
661       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
662
663   g_object_class_install_properties (gobject_class, PROP_LAST, properties);
664
665   klass->get_source_by_ssrc =
666       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
667   klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
668
669   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
670 }
671
672 static void
673 rtp_session_init (RTPSession * sess)
674 {
675   gint i;
676   gchar *str;
677
678   g_mutex_init (&sess->lock);
679   sess->key = g_random_int ();
680   sess->mask_idx = 0;
681   sess->mask = 0;
682
683   /* TODO: We currently only use the first hash table but this is the
684    * beginning of an implementation for RFC2762
685    for (i = 0; i < 32; i++) {
686    */
687   for (i = 0; i < 1; i++) {
688     sess->ssrcs[i] =
689         g_hash_table_new_full (NULL, NULL, NULL,
690         (GDestroyNotify) g_object_unref);
691   }
692
693   rtp_stats_init_defaults (&sess->stats);
694   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
695   rtp_stats_set_min_interval (&sess->stats,
696       (gdouble) DEFAULT_RTCP_MIN_INTERVAL / GST_SECOND);
697
698   sess->recalc_bandwidth = TRUE;
699   sess->bandwidth = DEFAULT_BANDWIDTH;
700   sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
701   sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
702   sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
703
704   /* default UDP header length */
705   sess->header_len = UDP_IP_HEADER_OVERHEAD;
706   sess->mtu = DEFAULT_RTCP_MTU;
707
708   sess->update_ntp64_header_ext = DEFAULT_UPDATE_NTP64_HEADER_EXT;
709
710   sess->probation = DEFAULT_PROBATION;
711   sess->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
712   sess->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
713   sess->favor_new = DEFAULT_FAVOR_NEW;
714
715   /* some default SDES entries */
716   sess->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
717
718   /* we do not want to leak details like the username or hostname here */
719   str = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
720   gst_structure_set (sess->sdes, "cname", G_TYPE_STRING, str, NULL);
721   g_free (str);
722
723 #if 0
724   /* we do not want to leak the user's real name here */
725   str = g_strdup_printf ("Anon%u", g_random_int ());
726   gst_structure_set (sdes, "name", G_TYPE_STRING, str, NULL);
727   g_free (str);
728 #endif
729
730   gst_structure_set (sess->sdes, "tool", G_TYPE_STRING, "GStreamer", NULL);
731
732   /* this is the SSRC we suggest */
733   sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
734   sess->internal_ssrc_set = FALSE;
735
736   sess->first_rtcp = TRUE;
737   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
738   sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE;
739   sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
740   sess->last_rtcp_interval = GST_CLOCK_TIME_NONE;
741
742   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
743   sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
744   sess->rtcp_immediate_feedback_threshold =
745       DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
746   sess->rtp_profile = DEFAULT_RTP_PROFILE;
747   sess->reduced_size_rtcp = DEFAULT_RTCP_REDUCED_SIZE;
748   sess->timestamp_sender_reports = !DEFAULT_RTCP_DISABLE_SR_TIMESTAMP;
749
750   sess->is_doing_ptp = TRUE;
751
752   sess->twcc = rtp_twcc_manager_new (sess->mtu);
753   sess->twcc_stats = rtp_twcc_stats_new ();
754 }
755
756 static void
757 rtp_session_finalize (GObject * object)
758 {
759   RTPSession *sess;
760   gint i;
761
762   sess = RTP_SESSION_CAST (object);
763
764   gst_structure_free (sess->sdes);
765
766   g_list_free_full (sess->conflicting_addresses,
767       (GDestroyNotify) rtp_conflicting_address_free);
768
769   /* TODO: Change this again when implementing RFC 2762
770    * for (i = 0; i < 32; i++)
771    */
772   for (i = 0; i < 1; i++)
773     g_hash_table_destroy (sess->ssrcs[i]);
774
775   g_object_unref (sess->twcc);
776   rtp_twcc_stats_free (sess->twcc_stats);
777
778   g_mutex_clear (&sess->lock);
779
780   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
781 }
782
783 static void
784 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
785 {
786   GValue value = { 0 };
787
788   g_value_init (&value, RTP_TYPE_SOURCE);
789   g_value_take_object (&value, source);
790   /* copies the value */
791   g_value_array_append (arr, &value);
792 }
793
794 static GValueArray *
795 rtp_session_create_sources (RTPSession * sess)
796 {
797   GValueArray *res;
798   guint size;
799
800   RTP_SESSION_LOCK (sess);
801   /* get number of elements in the table */
802   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
803   /* create the result value array */
804   res = g_value_array_new (size);
805
806   /* and copy all values into the array */
807   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
808   RTP_SESSION_UNLOCK (sess);
809
810   return res;
811 }
812
813 static void
814 create_source_stats (gpointer key, RTPSource * source, GValueArray * arr)
815 {
816   GValue *value;
817   GstStructure *s;
818
819   g_object_get (source, "stats", &s, NULL);
820
821   g_value_array_append (arr, NULL);
822   value = g_value_array_get_nth (arr, arr->n_values - 1);
823   g_value_init (value, GST_TYPE_STRUCTURE);
824   g_value_take_boxed (value, s);
825 }
826
827 static GstStructure *
828 rtp_session_create_stats (RTPSession * sess)
829 {
830   GstStructure *s;
831   GValueArray *source_stats;
832   GValue source_stats_v = G_VALUE_INIT;
833   guint size;
834
835   RTP_SESSION_LOCK (sess);
836   s = gst_structure_new ("application/x-rtp-session-stats",
837       "rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped,
838       "sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent,
839       "recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL);
840
841   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
842   source_stats = g_value_array_new (size);
843   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
844       (GHFunc) create_source_stats, source_stats);
845   RTP_SESSION_UNLOCK (sess);
846
847   g_value_init (&source_stats_v, G_TYPE_VALUE_ARRAY);
848   g_value_take_boxed (&source_stats_v, source_stats);
849   gst_structure_take_value (s, "source-stats", &source_stats_v);
850
851   return s;
852 }
853
854 static void
855 rtp_session_set_property (GObject * object, guint prop_id,
856     const GValue * value, GParamSpec * pspec)
857 {
858   RTPSession *sess;
859
860   sess = RTP_SESSION (object);
861
862   switch (prop_id) {
863     case PROP_INTERNAL_SSRC:
864       RTP_SESSION_LOCK (sess);
865       sess->suggested_ssrc = g_value_get_uint (value);
866       sess->internal_ssrc_set = TRUE;
867       sess->internal_ssrc_from_caps_or_property = TRUE;
868       RTP_SESSION_UNLOCK (sess);
869       if (sess->callbacks.reconfigure)
870         sess->callbacks.reconfigure (sess, sess->reconfigure_user_data);
871       break;
872     case PROP_BANDWIDTH:
873       RTP_SESSION_LOCK (sess);
874       sess->bandwidth = g_value_get_double (value);
875       sess->recalc_bandwidth = TRUE;
876       RTP_SESSION_UNLOCK (sess);
877       break;
878     case PROP_RTCP_FRACTION:
879       RTP_SESSION_LOCK (sess);
880       sess->rtcp_bandwidth = g_value_get_double (value);
881       sess->recalc_bandwidth = TRUE;
882       RTP_SESSION_UNLOCK (sess);
883       break;
884     case PROP_RTCP_RR_BANDWIDTH:
885       RTP_SESSION_LOCK (sess);
886       sess->rtcp_rr_bandwidth = g_value_get_int (value);
887       sess->recalc_bandwidth = TRUE;
888       RTP_SESSION_UNLOCK (sess);
889       break;
890     case PROP_RTCP_RS_BANDWIDTH:
891       RTP_SESSION_LOCK (sess);
892       sess->rtcp_rs_bandwidth = g_value_get_int (value);
893       sess->recalc_bandwidth = TRUE;
894       RTP_SESSION_UNLOCK (sess);
895       break;
896     case PROP_RTCP_MTU:
897       sess->mtu = g_value_get_uint (value);
898       rtp_twcc_manager_set_mtu (sess->twcc, sess->mtu);
899       break;
900     case PROP_SDES:
901       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
902       break;
903     case PROP_FAVOR_NEW:
904       sess->favor_new = g_value_get_boolean (value);
905       break;
906     case PROP_RTCP_MIN_INTERVAL:
907       rtp_stats_set_min_interval (&sess->stats,
908           (gdouble) g_value_get_uint64 (value) / GST_SECOND);
909       /* trigger reconsideration */
910       RTP_SESSION_LOCK (sess);
911       sess->next_rtcp_check_time = 0;
912       RTP_SESSION_UNLOCK (sess);
913       if (sess->callbacks.reconsider)
914         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
915       break;
916     case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
917       sess->rtcp_feedback_retention_window = g_value_get_uint64 (value);
918       break;
919     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
920       sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
921       break;
922     case PROP_PROBATION:
923       sess->probation = g_value_get_uint (value);
924       break;
925     case PROP_MAX_DROPOUT_TIME:
926       sess->max_dropout_time = g_value_get_uint (value);
927       break;
928     case PROP_MAX_MISORDER_TIME:
929       sess->max_misorder_time = g_value_get_uint (value);
930       break;
931     case PROP_RTP_PROFILE:
932       sess->rtp_profile = g_value_get_enum (value);
933       /* trigger reconsideration */
934       RTP_SESSION_LOCK (sess);
935       sess->next_rtcp_check_time = 0;
936       RTP_SESSION_UNLOCK (sess);
937       if (sess->callbacks.reconsider)
938         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
939       break;
940     case PROP_RTCP_REDUCED_SIZE:
941       sess->reduced_size_rtcp = g_value_get_boolean (value);
942       break;
943     case PROP_RTCP_DISABLE_SR_TIMESTAMP:
944       sess->timestamp_sender_reports = !g_value_get_boolean (value);
945       break;
946     case PROP_TWCC_FEEDBACK_INTERVAL:
947       rtp_twcc_manager_set_feedback_interval (sess->twcc,
948           g_value_get_uint64 (value));
949       break;
950     case PROP_UPDATE_NTP64_HEADER_EXT:
951       sess->update_ntp64_header_ext = g_value_get_boolean (value);
952       break;
953     default:
954       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
955       break;
956   }
957 }
958
959 static void
960 rtp_session_get_property (GObject * object, guint prop_id,
961     GValue * value, GParamSpec * pspec)
962 {
963   RTPSession *sess;
964
965   sess = RTP_SESSION (object);
966
967   switch (prop_id) {
968     case PROP_INTERNAL_SSRC:
969       g_value_set_uint (value, rtp_session_suggest_ssrc (sess, NULL));
970       break;
971     case PROP_INTERNAL_SOURCE:
972       /* FIXME, return a random source */
973       g_value_set_object (value, NULL);
974       break;
975     case PROP_BANDWIDTH:
976       g_value_set_double (value, sess->bandwidth);
977       break;
978     case PROP_RTCP_FRACTION:
979       g_value_set_double (value, sess->rtcp_bandwidth);
980       break;
981     case PROP_RTCP_RR_BANDWIDTH:
982       g_value_set_int (value, sess->rtcp_rr_bandwidth);
983       break;
984     case PROP_RTCP_RS_BANDWIDTH:
985       g_value_set_int (value, sess->rtcp_rs_bandwidth);
986       break;
987     case PROP_RTCP_MTU:
988       g_value_set_uint (value, sess->mtu);
989       break;
990     case PROP_SDES:
991       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
992       break;
993     case PROP_NUM_SOURCES:
994       g_value_set_uint (value, rtp_session_get_num_sources (sess));
995       break;
996     case PROP_NUM_ACTIVE_SOURCES:
997       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
998       break;
999     case PROP_SOURCES:
1000       g_value_take_boxed (value, rtp_session_create_sources (sess));
1001       break;
1002     case PROP_FAVOR_NEW:
1003       g_value_set_boolean (value, sess->favor_new);
1004       break;
1005     case PROP_RTCP_MIN_INTERVAL:
1006       g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
1007       break;
1008     case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
1009       g_value_set_uint64 (value, sess->rtcp_feedback_retention_window);
1010       break;
1011     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
1012       g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
1013       break;
1014     case PROP_PROBATION:
1015       g_value_set_uint (value, sess->probation);
1016       break;
1017     case PROP_MAX_DROPOUT_TIME:
1018       g_value_set_uint (value, sess->max_dropout_time);
1019       break;
1020     case PROP_MAX_MISORDER_TIME:
1021       g_value_set_uint (value, sess->max_misorder_time);
1022       break;
1023     case PROP_STATS:
1024       g_value_take_boxed (value, rtp_session_create_stats (sess));
1025       break;
1026     case PROP_RTP_PROFILE:
1027       g_value_set_enum (value, sess->rtp_profile);
1028       break;
1029     case PROP_RTCP_REDUCED_SIZE:
1030       g_value_set_boolean (value, sess->reduced_size_rtcp);
1031       break;
1032     case PROP_RTCP_DISABLE_SR_TIMESTAMP:
1033       g_value_set_boolean (value, !sess->timestamp_sender_reports);
1034       break;
1035     case PROP_TWCC_FEEDBACK_INTERVAL:
1036       g_value_set_uint64 (value,
1037           rtp_twcc_manager_get_feedback_interval (sess->twcc));
1038       break;
1039     case PROP_UPDATE_NTP64_HEADER_EXT:
1040       g_value_set_boolean (value, sess->update_ntp64_header_ext);
1041       break;
1042     default:
1043       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1044       break;
1045   }
1046 }
1047
1048 static void
1049 on_new_ssrc (RTPSession * sess, RTPSource * source)
1050 {
1051   g_object_ref (source);
1052   RTP_SESSION_UNLOCK (sess);
1053   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
1054   RTP_SESSION_LOCK (sess);
1055   g_object_unref (source);
1056 }
1057
1058 static void
1059 on_ssrc_collision (RTPSession * sess, RTPSource * source)
1060 {
1061   g_object_ref (source);
1062   RTP_SESSION_UNLOCK (sess);
1063   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
1064       source);
1065   RTP_SESSION_LOCK (sess);
1066   g_object_unref (source);
1067 }
1068
1069 static void
1070 on_ssrc_validated (RTPSession * sess, RTPSource * source)
1071 {
1072   g_object_ref (source);
1073   RTP_SESSION_UNLOCK (sess);
1074   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
1075       source);
1076   RTP_SESSION_LOCK (sess);
1077   g_object_unref (source);
1078 }
1079
1080 static void
1081 on_ssrc_active (RTPSession * sess, RTPSource * source)
1082 {
1083   g_object_ref (source);
1084   RTP_SESSION_UNLOCK (sess);
1085   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
1086   RTP_SESSION_LOCK (sess);
1087   g_object_unref (source);
1088 }
1089
1090 static void
1091 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
1092 {
1093   g_object_ref (source);
1094   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
1095   RTP_SESSION_UNLOCK (sess);
1096   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
1097   RTP_SESSION_LOCK (sess);
1098   g_object_unref (source);
1099 }
1100
1101 static void
1102 on_bye_ssrc (RTPSession * sess, RTPSource * source)
1103 {
1104   g_object_ref (source);
1105   RTP_SESSION_UNLOCK (sess);
1106   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
1107   RTP_SESSION_LOCK (sess);
1108   g_object_unref (source);
1109 }
1110
1111 static void
1112 on_bye_timeout (RTPSession * sess, RTPSource * source)
1113 {
1114   g_object_ref (source);
1115   RTP_SESSION_UNLOCK (sess);
1116   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
1117   RTP_SESSION_LOCK (sess);
1118   g_object_unref (source);
1119 }
1120
1121 static void
1122 on_timeout (RTPSession * sess, RTPSource * source)
1123 {
1124   g_object_ref (source);
1125   RTP_SESSION_UNLOCK (sess);
1126   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
1127   RTP_SESSION_LOCK (sess);
1128   g_object_unref (source);
1129 }
1130
1131 static void
1132 on_sender_timeout (RTPSession * sess, RTPSource * source)
1133 {
1134   g_object_ref (source);
1135   RTP_SESSION_UNLOCK (sess);
1136   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
1137       source);
1138   RTP_SESSION_LOCK (sess);
1139   g_object_unref (source);
1140 }
1141
1142 static void
1143 on_new_sender_ssrc (RTPSession * sess, RTPSource * source)
1144 {
1145   g_object_ref (source);
1146   RTP_SESSION_UNLOCK (sess);
1147   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
1148       source);
1149   RTP_SESSION_LOCK (sess);
1150   g_object_unref (source);
1151 }
1152
1153 static void
1154 on_sender_ssrc_active (RTPSession * sess, RTPSource * source)
1155 {
1156   g_object_ref (source);
1157   RTP_SESSION_UNLOCK (sess);
1158   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE], 0,
1159       source);
1160   RTP_SESSION_LOCK (sess);
1161   g_object_unref (source);
1162 }
1163
1164 /**
1165  * rtp_session_new:
1166  *
1167  * Create a new session object.
1168  *
1169  * Returns: a new #RTPSession. g_object_unref() after usage.
1170  */
1171 RTPSession *
1172 rtp_session_new (void)
1173 {
1174   RTPSession *sess;
1175
1176   sess = g_object_new (RTP_TYPE_SESSION, NULL);
1177
1178   return sess;
1179 }
1180
1181 /**
1182  * rtp_session_reset:
1183  * @sess: an #RTPSession
1184  *
1185  * Reset the sources of @sess.
1186  */
1187 void
1188 rtp_session_reset (RTPSession * sess)
1189 {
1190   g_return_if_fail (RTP_IS_SESSION (sess));
1191
1192   /* remove all sources */
1193   g_hash_table_remove_all (sess->ssrcs[sess->mask_idx]);
1194   sess->total_sources = 0;
1195   sess->stats.sender_sources = 0;
1196   sess->stats.internal_sender_sources = 0;
1197   sess->stats.internal_sources = 0;
1198   sess->stats.active_sources = 0;
1199
1200   sess->generation = 0;
1201   sess->first_rtcp = TRUE;
1202   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
1203   sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE;
1204   sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
1205   sess->last_rtcp_interval = GST_CLOCK_TIME_NONE;
1206   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
1207   sess->scheduled_bye = FALSE;
1208
1209   /* reset session stats */
1210   sess->stats.bye_members = 0;
1211   sess->stats.nacks_dropped = 0;
1212   sess->stats.nacks_sent = 0;
1213   sess->stats.nacks_received = 0;
1214
1215   sess->is_doing_ptp = TRUE;
1216
1217   g_list_free_full (sess->conflicting_addresses,
1218       (GDestroyNotify) rtp_conflicting_address_free);
1219   sess->conflicting_addresses = NULL;
1220 }
1221
1222 /**
1223  * rtp_session_set_callbacks:
1224  * @sess: an #RTPSession
1225  * @callbacks: callbacks to configure
1226  * @user_data: user data passed in the callbacks
1227  *
1228  * Configure a set of callbacks to be notified of actions.
1229  */
1230 void
1231 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
1232     gpointer user_data)
1233 {
1234   g_return_if_fail (RTP_IS_SESSION (sess));
1235
1236   if (callbacks->process_rtp) {
1237     sess->callbacks.process_rtp = callbacks->process_rtp;
1238     sess->process_rtp_user_data = user_data;
1239   }
1240   if (callbacks->send_rtp) {
1241     sess->callbacks.send_rtp = callbacks->send_rtp;
1242     sess->send_rtp_user_data = user_data;
1243   }
1244   if (callbacks->send_rtcp) {
1245     sess->callbacks.send_rtcp = callbacks->send_rtcp;
1246     sess->send_rtcp_user_data = user_data;
1247   }
1248   if (callbacks->sync_rtcp) {
1249     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
1250     sess->sync_rtcp_user_data = user_data;
1251   }
1252   if (callbacks->caps) {
1253     sess->callbacks.caps = callbacks->caps;
1254     sess->caps_user_data = user_data;
1255   }
1256   if (callbacks->reconsider) {
1257     sess->callbacks.reconsider = callbacks->reconsider;
1258     sess->reconsider_user_data = user_data;
1259   }
1260   if (callbacks->request_key_unit) {
1261     sess->callbacks.request_key_unit = callbacks->request_key_unit;
1262     sess->request_key_unit_user_data = user_data;
1263   }
1264   if (callbacks->request_time) {
1265     sess->callbacks.request_time = callbacks->request_time;
1266     sess->request_time_user_data = user_data;
1267   }
1268   if (callbacks->notify_nack) {
1269     sess->callbacks.notify_nack = callbacks->notify_nack;
1270     sess->notify_nack_user_data = user_data;
1271   }
1272   if (callbacks->notify_twcc) {
1273     sess->callbacks.notify_twcc = callbacks->notify_twcc;
1274     sess->notify_twcc_user_data = user_data;
1275   }
1276   if (callbacks->reconfigure) {
1277     sess->callbacks.reconfigure = callbacks->reconfigure;
1278     sess->reconfigure_user_data = user_data;
1279   }
1280   if (callbacks->notify_early_rtcp) {
1281     sess->callbacks.notify_early_rtcp = callbacks->notify_early_rtcp;
1282     sess->notify_early_rtcp_user_data = user_data;
1283   }
1284 }
1285
1286 /**
1287  * rtp_session_set_process_rtp_callback:
1288  * @sess: an #RTPSession
1289  * @callback: callback to set
1290  * @user_data: user data passed in the callback
1291  *
1292  * Configure only the process_rtp callback to be notified of the process_rtp action.
1293  */
1294 void
1295 rtp_session_set_process_rtp_callback (RTPSession * sess,
1296     RTPSessionProcessRTP callback, gpointer user_data)
1297 {
1298   g_return_if_fail (RTP_IS_SESSION (sess));
1299
1300   sess->callbacks.process_rtp = callback;
1301   sess->process_rtp_user_data = user_data;
1302 }
1303
1304 /**
1305  * rtp_session_set_send_rtp_callback:
1306  * @sess: an #RTPSession
1307  * @callback: callback to set
1308  * @user_data: user data passed in the callback
1309  *
1310  * Configure only the send_rtp callback to be notified of the send_rtp action.
1311  */
1312 void
1313 rtp_session_set_send_rtp_callback (RTPSession * sess,
1314     RTPSessionSendRTP callback, gpointer user_data)
1315 {
1316   g_return_if_fail (RTP_IS_SESSION (sess));
1317
1318   sess->callbacks.send_rtp = callback;
1319   sess->send_rtp_user_data = user_data;
1320 }
1321
1322 /**
1323  * rtp_session_set_send_rtcp_callback:
1324  * @sess: an #RTPSession
1325  * @callback: callback to set
1326  * @user_data: user data passed in the callback
1327  *
1328  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
1329  */
1330 void
1331 rtp_session_set_send_rtcp_callback (RTPSession * sess,
1332     RTPSessionSendRTCP callback, gpointer user_data)
1333 {
1334   g_return_if_fail (RTP_IS_SESSION (sess));
1335
1336   sess->callbacks.send_rtcp = callback;
1337   sess->send_rtcp_user_data = user_data;
1338 }
1339
1340 /**
1341  * rtp_session_set_sync_rtcp_callback:
1342  * @sess: an #RTPSession
1343  * @callback: callback to set
1344  * @user_data: user data passed in the callback
1345  *
1346  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
1347  */
1348 void
1349 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
1350     RTPSessionSyncRTCP callback, gpointer user_data)
1351 {
1352   g_return_if_fail (RTP_IS_SESSION (sess));
1353
1354   sess->callbacks.sync_rtcp = callback;
1355   sess->sync_rtcp_user_data = user_data;
1356 }
1357
1358 /**
1359  * rtp_session_set_caps_callback:
1360  * @sess: an #RTPSession
1361  * @callback: callback to set
1362  * @user_data: user data passed in the callback
1363  *
1364  * Configure only the clock_rate callback to be notified of the clock_rate action.
1365  */
1366 void
1367 rtp_session_set_caps_callback (RTPSession * sess,
1368     RTPSessionCaps callback, gpointer user_data)
1369 {
1370   g_return_if_fail (RTP_IS_SESSION (sess));
1371
1372   sess->callbacks.caps = callback;
1373   sess->caps_user_data = user_data;
1374 }
1375
1376 /**
1377  * rtp_session_set_reconsider_callback:
1378  * @sess: an #RTPSession
1379  * @callback: callback to set
1380  * @user_data: user data passed in the callback
1381  *
1382  * Configure only the reconsider callback to be notified of the reconsider action.
1383  */
1384 void
1385 rtp_session_set_reconsider_callback (RTPSession * sess,
1386     RTPSessionReconsider callback, gpointer user_data)
1387 {
1388   g_return_if_fail (RTP_IS_SESSION (sess));
1389
1390   sess->callbacks.reconsider = callback;
1391   sess->reconsider_user_data = user_data;
1392 }
1393
1394 /**
1395  * rtp_session_set_request_time_callback:
1396  * @sess: an #RTPSession
1397  * @callback: callback to set
1398  * @user_data: user data passed in the callback
1399  *
1400  * Configure only the request_time callback
1401  */
1402 void
1403 rtp_session_set_request_time_callback (RTPSession * sess,
1404     RTPSessionRequestTime callback, gpointer user_data)
1405 {
1406   g_return_if_fail (RTP_IS_SESSION (sess));
1407
1408   sess->callbacks.request_time = callback;
1409   sess->request_time_user_data = user_data;
1410 }
1411
1412 /**
1413  * rtp_session_set_bandwidth:
1414  * @sess: an #RTPSession
1415  * @bandwidth: the bandwidth allocated
1416  *
1417  * Set the session bandwidth in bits per second.
1418  */
1419 void
1420 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
1421 {
1422   g_return_if_fail (RTP_IS_SESSION (sess));
1423
1424   RTP_SESSION_LOCK (sess);
1425   sess->stats.bandwidth = bandwidth;
1426   RTP_SESSION_UNLOCK (sess);
1427 }
1428
1429 /**
1430  * rtp_session_get_bandwidth:
1431  * @sess: an #RTPSession
1432  *
1433  * Get the session bandwidth.
1434  *
1435  * Returns: the session bandwidth.
1436  */
1437 gdouble
1438 rtp_session_get_bandwidth (RTPSession * sess)
1439 {
1440   gdouble result;
1441
1442   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1443
1444   RTP_SESSION_LOCK (sess);
1445   result = sess->stats.bandwidth;
1446   RTP_SESSION_UNLOCK (sess);
1447
1448   return result;
1449 }
1450
1451 /**
1452  * rtp_session_set_rtcp_fraction:
1453  * @sess: an #RTPSession
1454  * @bandwidth: the RTCP bandwidth
1455  *
1456  * Set the bandwidth in bits per second that should be used for RTCP
1457  * messages.
1458  */
1459 void
1460 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
1461 {
1462   g_return_if_fail (RTP_IS_SESSION (sess));
1463
1464   RTP_SESSION_LOCK (sess);
1465   sess->stats.rtcp_bandwidth = bandwidth;
1466   RTP_SESSION_UNLOCK (sess);
1467 }
1468
1469 /**
1470  * rtp_session_get_rtcp_fraction:
1471  * @sess: an #RTPSession
1472  *
1473  * Get the session bandwidth used for RTCP.
1474  *
1475  * Returns: The bandwidth used for RTCP messages.
1476  */
1477 gdouble
1478 rtp_session_get_rtcp_fraction (RTPSession * sess)
1479 {
1480   gdouble result;
1481
1482   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
1483
1484   RTP_SESSION_LOCK (sess);
1485   result = sess->stats.rtcp_bandwidth;
1486   RTP_SESSION_UNLOCK (sess);
1487
1488   return result;
1489 }
1490
1491 /**
1492  * rtp_session_get_sdes_struct:
1493  * @sess: an #RTSPSession
1494  *
1495  * Get the SDES data as a #GstStructure
1496  *
1497  * Returns: a GstStructure with SDES items for @sess. This function returns a
1498  * copy of the SDES structure, use gst_structure_free() after usage.
1499  */
1500 GstStructure *
1501 rtp_session_get_sdes_struct (RTPSession * sess)
1502 {
1503   GstStructure *result = NULL;
1504
1505   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1506
1507   RTP_SESSION_LOCK (sess);
1508   if (sess->sdes)
1509     result = gst_structure_copy (sess->sdes);
1510   RTP_SESSION_UNLOCK (sess);
1511
1512   return result;
1513 }
1514
1515 static void
1516 source_set_sdes (const gchar * key, RTPSource * source, GstStructure * sdes)
1517 {
1518   rtp_source_set_sdes_struct (source, gst_structure_copy (sdes));
1519 }
1520
1521 /**
1522  * rtp_session_set_sdes_struct:
1523  * @sess: an #RTSPSession
1524  * @sdes: a #GstStructure
1525  *
1526  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
1527  */
1528 void
1529 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
1530 {
1531   g_return_if_fail (sdes);
1532   g_return_if_fail (RTP_IS_SESSION (sess));
1533
1534   RTP_SESSION_LOCK (sess);
1535   if (sess->sdes)
1536     gst_structure_free (sess->sdes);
1537   sess->sdes = gst_structure_copy (sdes);
1538
1539   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1540       (GHFunc) source_set_sdes, sess->sdes);
1541   RTP_SESSION_UNLOCK (sess);
1542 }
1543
1544 static GstFlowReturn
1545 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
1546 {
1547   GstFlowReturn result = GST_FLOW_OK;
1548
1549   if (source->internal) {
1550     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
1551
1552     RTP_SESSION_UNLOCK (session);
1553
1554     if (session->callbacks.send_rtp)
1555       result =
1556           session->callbacks.send_rtp (session, source, data,
1557           session->send_rtp_user_data);
1558     else {
1559       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1560     }
1561   } else {
1562     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
1563     RTP_SESSION_UNLOCK (session);
1564
1565     if (session->callbacks.process_rtp)
1566       result =
1567           session->callbacks.process_rtp (session, source,
1568           GST_BUFFER_CAST (data), session->process_rtp_user_data);
1569     else
1570       gst_buffer_unref (GST_BUFFER_CAST (data));
1571   }
1572   RTP_SESSION_LOCK (session);
1573
1574   return result;
1575 }
1576
1577 static GstCaps *
1578 source_caps (RTPSource * source, guint8 pt, RTPSession * session)
1579 {
1580   GstCaps *result = NULL;
1581
1582   RTP_SESSION_UNLOCK (session);
1583
1584   if (session->callbacks.caps)
1585     result = session->callbacks.caps (session, pt, session->caps_user_data);
1586
1587   RTP_SESSION_LOCK (session);
1588
1589   GST_DEBUG ("got caps %" GST_PTR_FORMAT " for pt %d", result, pt);
1590
1591   return result;
1592 }
1593
1594 static RTPSourceCallbacks callbacks = {
1595   (RTPSourcePushRTP) source_push_rtp,
1596   (RTPSourceCaps) source_caps,
1597 };
1598
1599
1600 /**
1601  * rtp_session_find_conflicting_address:
1602  * @session: The session the packet came in
1603  * @address: address to check for
1604  * @time: The time when the packet that is possibly in conflict arrived
1605  *
1606  * Checks if an address which has a conflict is already known. If it is
1607  * a known conflict, remember the time
1608  *
1609  * Returns: TRUE if it was a known conflict, FALSE otherwise
1610  */
1611 static gboolean
1612 rtp_session_find_conflicting_address (RTPSession * session,
1613     GSocketAddress * address, GstClockTime time)
1614 {
1615   return find_conflicting_address (session->conflicting_addresses, address,
1616       time);
1617 }
1618
1619 /**
1620  * rtp_session_add_conflicting_address:
1621  * @session: The session the packet came in
1622  * @address: address to remember
1623  * @time: The time when the packet that is in conflict arrived
1624  *
1625  * Adds a new conflict address
1626  */
1627 static void
1628 rtp_session_add_conflicting_address (RTPSession * sess,
1629     GSocketAddress * address, GstClockTime time)
1630 {
1631   sess->conflicting_addresses =
1632       add_conflicting_address (sess->conflicting_addresses, address, time);
1633 }
1634
1635 static void
1636 rtp_session_have_conflict (RTPSession * sess, RTPSource * source,
1637     GSocketAddress * address, GstClockTime current_time)
1638 {
1639   guint32 ssrc = rtp_source_get_ssrc (source);
1640
1641   /* Its a new collision, lets change our SSRC */
1642   rtp_session_add_conflicting_address (sess, address, current_time);
1643
1644   /* mark the source BYE */
1645   rtp_source_mark_bye (source, "SSRC Collision");
1646   /* if we were suggesting this SSRC, change to something else */
1647   if (sess->suggested_ssrc == ssrc) {
1648     sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
1649     sess->internal_ssrc_set = TRUE;
1650   }
1651
1652   on_ssrc_collision (sess, source);
1653
1654   rtp_session_schedule_bye_locked (sess, current_time);
1655 }
1656
1657 static gboolean
1658 check_collision (RTPSession * sess, RTPSource * source,
1659     RTPPacketInfo * pinfo, gboolean rtp)
1660 {
1661   guint32 ssrc;
1662
1663   /* If we have no pinfo address, we can't do collision checking */
1664   if (!pinfo->address)
1665     return FALSE;
1666
1667   ssrc = rtp_source_get_ssrc (source);
1668
1669   if (!source->internal) {
1670     GSocketAddress *from;
1671
1672     /* This is not our local source, but lets check if two remote
1673      * source collide */
1674     if (rtp) {
1675       from = source->rtp_from;
1676     } else {
1677       from = source->rtcp_from;
1678     }
1679
1680     if (from) {
1681       if (__g_socket_address_equal (from, pinfo->address)) {
1682         /* Address is the same */
1683         return FALSE;
1684       } else {
1685         GST_LOG ("we have a third-party collision or loop ssrc:%x", ssrc);
1686         if (sess->favor_new) {
1687           if (rtp_source_find_conflicting_address (source,
1688                   pinfo->address, pinfo->current_time)) {
1689             gchar *buf1;
1690
1691             buf1 = __g_socket_address_to_string (pinfo->address);
1692             GST_LOG ("Known conflict on %x for %s, dropping packet", ssrc,
1693                 buf1);
1694             g_free (buf1);
1695
1696             return TRUE;
1697           } else {
1698             gchar *buf1, *buf2;
1699
1700             /* Current address is not a known conflict, lets assume this is
1701              * a new source. Save old address in possible conflict list
1702              */
1703             rtp_source_add_conflicting_address (source, from,
1704                 pinfo->current_time);
1705
1706             buf1 = __g_socket_address_to_string (from);
1707             buf2 = __g_socket_address_to_string (pinfo->address);
1708
1709             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1710                 " saving old as known conflict", ssrc, buf1, buf2);
1711
1712             if (rtp)
1713               rtp_source_set_rtp_from (source, pinfo->address);
1714             else
1715               rtp_source_set_rtcp_from (source, pinfo->address);
1716
1717             g_free (buf1);
1718             g_free (buf2);
1719
1720             return FALSE;
1721           }
1722         } else {
1723           /* Don't need to save old addresses, we ignore new sources */
1724           return TRUE;
1725         }
1726       }
1727     } else {
1728       /* We don't already have a from address for RTP, just set it */
1729       if (rtp)
1730         rtp_source_set_rtp_from (source, pinfo->address);
1731       else
1732         rtp_source_set_rtcp_from (source, pinfo->address);
1733       return FALSE;
1734     }
1735
1736     /* FIXME: Log 3rd party collision somehow
1737      * Maybe should be done in upper layer, only the SDES can tell us
1738      * if its a collision or a loop
1739      */
1740   } else {
1741     /* This is sending with our ssrc, is it an address we already know */
1742     if (rtp_session_find_conflicting_address (sess, pinfo->address,
1743             pinfo->current_time)) {
1744       /* Its a known conflict, its probably a loop, not a collision
1745        * lets just drop the incoming packet
1746        */
1747       GST_DEBUG ("Our packets are being looped back to us, dropping");
1748     } else {
1749       GST_DEBUG ("Collision for SSRC %x from new incoming packet,"
1750           " change our sender ssrc", ssrc);
1751
1752       rtp_session_have_conflict (sess, source, pinfo->address,
1753           pinfo->current_time);
1754     }
1755   }
1756
1757   return TRUE;
1758 }
1759
1760 typedef struct
1761 {
1762   gboolean is_doing_ptp;
1763   GSocketAddress *new_addr;
1764 } CompareAddrData;
1765
1766 /* check if the two given ip addr are the same (do not care about the port) */
1767 static gboolean
1768 ip_addr_equal (GSocketAddress * a, GSocketAddress * b)
1769 {
1770   return
1771       g_inet_address_equal (g_inet_socket_address_get_address
1772       (G_INET_SOCKET_ADDRESS (a)),
1773       g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (b)));
1774 }
1775
1776 static void
1777 compare_rtp_source_addr (const gchar * key, RTPSource * source,
1778     CompareAddrData * data)
1779 {
1780   /* only compare ip addr of remote sources which are also not closing */
1781   if (!source->internal && !source->closing && source->rtp_from) {
1782     /* look for the first rtp source */
1783     if (!data->new_addr)
1784       data->new_addr = source->rtp_from;
1785     /* compare current ip addr with the first one */
1786     else
1787       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtp_from);
1788   }
1789 }
1790
1791 static void
1792 compare_rtcp_source_addr (const gchar * key, RTPSource * source,
1793     CompareAddrData * data)
1794 {
1795   /* only compare ip addr of remote sources which are also not closing */
1796   if (!source->internal && !source->closing && source->rtcp_from) {
1797     /* look for the first rtcp source */
1798     if (!data->new_addr)
1799       data->new_addr = source->rtcp_from;
1800     else
1801       /* compare current ip addr with the first one */
1802       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtcp_from);
1803   }
1804 }
1805
1806 /* loop over our non-internal source to know if the session
1807  * is doing point-to-point */
1808 static void
1809 session_update_ptp (RTPSession * sess)
1810 {
1811   /* to know if the session is doing point to point, the ip addr
1812    * of each non-internal (=remotes) source have to be compared
1813    * to each other.
1814    */
1815   gboolean is_doing_rtp_ptp;
1816   gboolean is_doing_rtcp_ptp;
1817   CompareAddrData data;
1818
1819   /* compare the first remote source's ip addr that receive rtp packets
1820    * with other remote rtp source.
1821    * it's enough because the session just needs to know if they are all
1822    * equals or not
1823    */
1824   data.is_doing_ptp = TRUE;
1825   data.new_addr = NULL;
1826   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1827       (GHFunc) compare_rtp_source_addr, (gpointer) & data);
1828   is_doing_rtp_ptp = data.is_doing_ptp;
1829
1830   /* same but about rtcp */
1831   data.is_doing_ptp = TRUE;
1832   data.new_addr = NULL;
1833   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1834       (GHFunc) compare_rtcp_source_addr, (gpointer) & data);
1835   is_doing_rtcp_ptp = data.is_doing_ptp;
1836
1837   /* the session is doing point-to-point if all rtp remote have the same
1838    * ip addr and if all rtcp remote sources have the same ip addr */
1839   sess->is_doing_ptp = is_doing_rtp_ptp && is_doing_rtcp_ptp;
1840
1841   GST_DEBUG ("doing point-to-point: %d", sess->is_doing_ptp);
1842 }
1843
1844 static void
1845 add_source (RTPSession * sess, RTPSource * src)
1846 {
1847   g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1848       GINT_TO_POINTER (src->ssrc), src);
1849   /* report the new source ASAP */
1850   src->generation = sess->generation;
1851   /* we have one more source now */
1852   sess->total_sources++;
1853   if (RTP_SOURCE_IS_ACTIVE (src))
1854     sess->stats.active_sources++;
1855   if (src->internal) {
1856     sess->stats.internal_sources++;
1857     if (!sess->internal_ssrc_from_caps_or_property
1858         && sess->suggested_ssrc != src->ssrc) {
1859       sess->suggested_ssrc = src->ssrc;
1860       sess->internal_ssrc_set = TRUE;
1861     }
1862   }
1863
1864   /* update point-to-point status */
1865   if (!src->internal)
1866     session_update_ptp (sess);
1867 }
1868
1869 static RTPSource *
1870 find_source (RTPSession * sess, guint32 ssrc)
1871 {
1872   return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1873       GINT_TO_POINTER (ssrc));
1874 }
1875
1876 /* must be called with the session lock, the returned source needs to be
1877  * unreffed after usage. */
1878 static RTPSource *
1879 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1880     RTPPacketInfo * pinfo, gboolean rtp)
1881 {
1882   RTPSource *source;
1883
1884   source = find_source (sess, ssrc);
1885   if (source == NULL) {
1886     /* make new Source in probation and insert */
1887     source = rtp_source_new (ssrc);
1888
1889     GST_DEBUG ("creating new source %08x %p", ssrc, source);
1890
1891     /* for RTP packets we need to set the source in probation. Receiving RTCP
1892      * packets of an SSRC, on the other hand, is a strong indication that we
1893      * are dealing with a valid source. */
1894     g_object_set (source, "probation", rtp ? sess->probation : 0,
1895         "max-dropout-time", sess->max_dropout_time, "max-misorder-time",
1896         sess->max_misorder_time, NULL);
1897
1898     /* store from address, if any */
1899     if (pinfo->address) {
1900       if (rtp)
1901         rtp_source_set_rtp_from (source, pinfo->address);
1902       else
1903         rtp_source_set_rtcp_from (source, pinfo->address);
1904     }
1905
1906     /* configure a callback on the source */
1907     rtp_source_set_callbacks (source, &callbacks, sess);
1908
1909     add_source (sess, source);
1910     *created = TRUE;
1911   } else {
1912     *created = FALSE;
1913     /* check for collision, this updates the address when not previously set */
1914     if (check_collision (sess, source, pinfo, rtp)) {
1915       return NULL;
1916     }
1917     /* Receiving RTCP packets of an SSRC is a strong indication that we
1918      * are dealing with a valid source. */
1919     if (!rtp)
1920       g_object_set (source, "probation", 0, NULL);
1921   }
1922   /* update last activity */
1923   source->last_activity = pinfo->current_time;
1924   if (rtp)
1925     source->last_rtp_activity = pinfo->current_time;
1926   g_object_ref (source);
1927
1928   return source;
1929 }
1930
1931 /* must be called with the session lock, the returned source needs to be
1932  * unreffed after usage. */
1933 static RTPSource *
1934 obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1935     GstClockTime current_time)
1936 {
1937   RTPSource *source;
1938
1939   source = find_source (sess, ssrc);
1940   if (source == NULL) {
1941     /* make new internal Source and insert */
1942     source = rtp_source_new (ssrc);
1943
1944     GST_DEBUG ("creating new internal source %08x %p", ssrc, source);
1945
1946     source->validated = TRUE;
1947     source->internal = TRUE;
1948     source->probation = 0;
1949     source->curr_probation = 0;
1950     rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes));
1951     rtp_source_set_callbacks (source, &callbacks, sess);
1952
1953     add_source (sess, source);
1954     *created = TRUE;
1955   } else {
1956     *created = FALSE;
1957   }
1958   /* update last activity */
1959   if (current_time != GST_CLOCK_TIME_NONE) {
1960     source->last_activity = current_time;
1961     source->last_rtp_activity = current_time;
1962   }
1963   g_object_ref (source);
1964
1965   return source;
1966 }
1967
1968 /**
1969  * rtp_session_suggest_ssrc:
1970  * @sess: a #RTPSession
1971  * @is_random: if the suggested ssrc is random
1972  *
1973  * Suggest an unused SSRC in @sess.
1974  *
1975  * Returns: a free unused SSRC
1976  */
1977 guint32
1978 rtp_session_suggest_ssrc (RTPSession * sess, gboolean * is_random)
1979 {
1980   guint32 result;
1981
1982   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1983
1984   RTP_SESSION_LOCK (sess);
1985   result = sess->suggested_ssrc;
1986   if (is_random)
1987     *is_random = !sess->internal_ssrc_set;
1988   RTP_SESSION_UNLOCK (sess);
1989
1990   return result;
1991 }
1992
1993 /**
1994  * rtp_session_add_source:
1995  * @sess: a #RTPSession
1996  * @src: #RTPSource to add
1997  *
1998  * Add @src to @session.
1999  *
2000  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
2001  * existed in the session.
2002  */
2003 gboolean
2004 rtp_session_add_source (RTPSession * sess, RTPSource * src)
2005 {
2006   gboolean result = FALSE;
2007   RTPSource *find;
2008
2009   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
2010   g_return_val_if_fail (src != NULL, FALSE);
2011
2012   RTP_SESSION_LOCK (sess);
2013   find = find_source (sess, src->ssrc);
2014   if (find == NULL) {
2015     add_source (sess, src);
2016     result = TRUE;
2017   }
2018   RTP_SESSION_UNLOCK (sess);
2019
2020   return result;
2021 }
2022
2023 /**
2024  * rtp_session_get_num_sources:
2025  * @sess: an #RTPSession
2026  *
2027  * Get the number of sources in @sess.
2028  *
2029  * Returns: The number of sources in @sess.
2030  */
2031 guint
2032 rtp_session_get_num_sources (RTPSession * sess)
2033 {
2034   guint result;
2035
2036   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
2037
2038   RTP_SESSION_LOCK (sess);
2039   result = sess->total_sources;
2040   RTP_SESSION_UNLOCK (sess);
2041
2042   return result;
2043 }
2044
2045 /**
2046  * rtp_session_get_num_active_sources:
2047  * @sess: an #RTPSession
2048  *
2049  * Get the number of active sources in @sess. A source is considered active when
2050  * it has been validated and has not yet received a BYE RTCP message.
2051  *
2052  * Returns: The number of active sources in @sess.
2053  */
2054 guint
2055 rtp_session_get_num_active_sources (RTPSession * sess)
2056 {
2057   guint result;
2058
2059   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
2060
2061   RTP_SESSION_LOCK (sess);
2062   result = sess->stats.active_sources;
2063   RTP_SESSION_UNLOCK (sess);
2064
2065   return result;
2066 }
2067
2068 /**
2069  * rtp_session_get_source_by_ssrc:
2070  * @sess: an #RTPSession
2071  * @ssrc: an SSRC
2072  *
2073  * Find the source with @ssrc in @sess.
2074  *
2075  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
2076  * g_object_unref() after usage.
2077  */
2078 RTPSource *
2079 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
2080 {
2081   RTPSource *result;
2082
2083   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
2084
2085   RTP_SESSION_LOCK (sess);
2086   result = find_source (sess, ssrc);
2087   if (result != NULL)
2088     g_object_ref (result);
2089   RTP_SESSION_UNLOCK (sess);
2090
2091   return result;
2092 }
2093
2094 /* should be called with the SESSION lock */
2095 static guint32
2096 rtp_session_create_new_ssrc (RTPSession * sess)
2097 {
2098   guint32 ssrc;
2099
2100   while (TRUE) {
2101     ssrc = g_random_int ();
2102
2103     /* see if it exists in the session, we're done if it doesn't */
2104     if (find_source (sess, ssrc) == NULL)
2105       break;
2106   }
2107   return ssrc;
2108 }
2109
2110 static gboolean
2111 update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
2112 {
2113   GstNetAddressMeta *meta;
2114
2115   /* get packet size including header overhead */
2116   pinfo->bytes += gst_buffer_get_size (*buffer) + pinfo->header_len;
2117   pinfo->packets++;
2118
2119   if (pinfo->rtp) {
2120     GstRTPBuffer rtp = { NULL };
2121
2122     if (!gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtp))
2123       goto invalid_packet;
2124
2125     pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtp);
2126     if (idx == 0) {
2127       gint i;
2128
2129       /* only keep info for first buffer */
2130       pinfo->ssrc = gst_rtp_buffer_get_ssrc (&rtp);
2131       pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
2132       pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
2133       pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2134       pinfo->marker = gst_rtp_buffer_get_marker (&rtp);
2135       /* copy available csrc */
2136       pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
2137       for (i = 0; i < pinfo->csrc_count; i++)
2138         pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
2139
2140       /* RTP header extensions */
2141       pinfo->header_ext = gst_rtp_buffer_get_extension_bytes (&rtp,
2142           &pinfo->header_ext_bit_pattern);
2143     }
2144
2145     if (pinfo->ntp64_ext_id != 0 && pinfo->send && !pinfo->have_ntp64_ext) {
2146       guint8 *data;
2147       guint size;
2148
2149       /* Remember here that there is a 64-bit NTP header extension on this buffer
2150        * or any of the other buffers in the buffer list.
2151        * Later we update this after making the buffer(list) writable.
2152        */
2153       if ((gst_rtp_buffer_get_extension_onebyte_header (&rtp,
2154                   pinfo->ntp64_ext_id, 0, (gpointer *) & data, &size)
2155               && size == 8)
2156           || (gst_rtp_buffer_get_extension_twobytes_header (&rtp, NULL,
2157                   pinfo->ntp64_ext_id, 0, (gpointer *) & data, &size)
2158               && size == 8)) {
2159         pinfo->have_ntp64_ext = TRUE;
2160       }
2161     }
2162
2163     gst_rtp_buffer_unmap (&rtp);
2164   }
2165
2166   if (idx == 0) {
2167     /* for netbuffer we can store the IP address to check for collisions */
2168     meta = gst_buffer_get_net_address_meta (*buffer);
2169     if (pinfo->address)
2170       g_object_unref (pinfo->address);
2171     if (meta) {
2172       pinfo->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
2173     } else {
2174       pinfo->address = NULL;
2175     }
2176   }
2177   return TRUE;
2178
2179   /* ERRORS */
2180 invalid_packet:
2181   {
2182     GST_DEBUG ("invalid RTP packet received");
2183     return FALSE;
2184   }
2185 }
2186
2187 /* update the RTPPacketInfo structure with the current time and other bits
2188  * about the current buffer we are handling.
2189  * This function is typically called when a validated packet is received.
2190  * This function should be called with the RTP_SESSION_LOCK
2191  */
2192 static gboolean
2193 update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
2194     gboolean send, gboolean rtp, gboolean is_list, gpointer data,
2195     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
2196 {
2197   gboolean res;
2198
2199   pinfo->send = send;
2200   pinfo->rtp = rtp;
2201   pinfo->is_list = is_list;
2202   pinfo->data = data;
2203   pinfo->current_time = current_time;
2204   pinfo->running_time = running_time;
2205   pinfo->ntpnstime = ntpnstime;
2206   pinfo->header_len = sess->header_len;
2207   pinfo->bytes = 0;
2208   pinfo->payload_len = 0;
2209   pinfo->packets = 0;
2210   pinfo->marker = FALSE;
2211   pinfo->ntp64_ext_id = send ? sess->send_ntp64_ext_id : 0;
2212   pinfo->have_ntp64_ext = FALSE;
2213
2214   if (is_list) {
2215     GstBufferList *list = GST_BUFFER_LIST_CAST (data);
2216     res =
2217         gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
2218         pinfo);
2219     pinfo->arrival_time = GST_CLOCK_TIME_NONE;
2220   } else {
2221     GstBuffer *buffer = GST_BUFFER_CAST (data);
2222     res = update_packet (&buffer, 0, pinfo);
2223     pinfo->arrival_time = GST_BUFFER_DTS (buffer);
2224   }
2225
2226   return res;
2227 }
2228
2229 static void
2230 clean_packet_info (RTPPacketInfo * pinfo)
2231 {
2232   if (pinfo->address)
2233     g_object_unref (pinfo->address);
2234   if (pinfo->data) {
2235     gst_mini_object_unref (pinfo->data);
2236     pinfo->data = NULL;
2237   }
2238   if (pinfo->header_ext)
2239     g_bytes_unref (pinfo->header_ext);
2240 }
2241
2242 static gboolean
2243 source_update_active (RTPSession * sess, RTPSource * source,
2244     gboolean prevactive)
2245 {
2246   gboolean active = RTP_SOURCE_IS_ACTIVE (source);
2247   guint32 ssrc = source->ssrc;
2248
2249   if (prevactive == active)
2250     return FALSE;
2251
2252   if (active) {
2253     sess->stats.active_sources++;
2254     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
2255         sess->stats.active_sources);
2256   } else {
2257     sess->stats.active_sources--;
2258     GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
2259         sess->stats.active_sources);
2260   }
2261   return TRUE;
2262 }
2263
2264 static void
2265 process_twcc_packet (RTPSession * sess, RTPPacketInfo * pinfo)
2266 {
2267   if (rtp_twcc_manager_recv_packet (sess->twcc, pinfo)) {
2268     RTP_SESSION_UNLOCK (sess);
2269
2270     /* TODO: find a better rational for this number, and possibly tune it based
2271        on factors like framerate / bandwidth etc */
2272     if (!rtp_session_send_rtcp (sess, 100 * GST_MSECOND)) {
2273       GST_INFO ("Could not schedule TWCC straight away");
2274     }
2275     RTP_SESSION_LOCK (sess);
2276   }
2277 }
2278
2279 static gboolean
2280 source_update_sender (RTPSession * sess, RTPSource * source,
2281     gboolean prevsender)
2282 {
2283   gboolean sender = RTP_SOURCE_IS_SENDER (source);
2284   guint32 ssrc = source->ssrc;
2285
2286   if (prevsender == sender)
2287     return FALSE;
2288
2289   if (sender) {
2290     sess->stats.sender_sources++;
2291     if (source->internal)
2292       sess->stats.internal_sender_sources++;
2293     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
2294         sess->stats.sender_sources);
2295   } else {
2296     sess->stats.sender_sources--;
2297     if (source->internal)
2298       sess->stats.internal_sender_sources--;
2299     GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
2300         sess->stats.sender_sources);
2301   }
2302   return TRUE;
2303 }
2304
2305 /**
2306  * rtp_session_process_rtp:
2307  * @sess: and #RTPSession
2308  * @buffer: an RTP buffer
2309  * @current_time: the current system time
2310  * @running_time: the running_time of @buffer
2311  *
2312  * Process an RTP buffer in the session manager. This function takes ownership
2313  * of @buffer.
2314  *
2315  * Returns: a #GstFlowReturn.
2316  */
2317 GstFlowReturn
2318 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
2319     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
2320 {
2321   GstFlowReturn result;
2322   guint32 ssrc;
2323   RTPSource *source;
2324   gboolean created;
2325   gboolean prevsender, prevactive;
2326   RTPPacketInfo pinfo = { 0, };
2327   guint64 oldrate;
2328
2329   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2330   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2331
2332   RTP_SESSION_LOCK (sess);
2333
2334   /* update pinfo stats */
2335   if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer,
2336           current_time, running_time, ntpnstime)) {
2337     GST_DEBUG ("invalid RTP packet received");
2338     RTP_SESSION_UNLOCK (sess);
2339     return rtp_session_process_rtcp (sess, buffer, current_time, running_time,
2340         ntpnstime);
2341   }
2342
2343   ssrc = pinfo.ssrc;
2344
2345   source = obtain_source (sess, ssrc, &created, &pinfo, TRUE);
2346   if (!source)
2347     goto collision;
2348
2349   prevsender = RTP_SOURCE_IS_SENDER (source);
2350   prevactive = RTP_SOURCE_IS_ACTIVE (source);
2351   oldrate = source->bitrate;
2352
2353   if (created)
2354     on_new_ssrc (sess, source);
2355
2356   /* let source process the packet */
2357   result = rtp_source_process_rtp (source, &pinfo);
2358   process_twcc_packet (sess, &pinfo);
2359
2360   /* source became active */
2361   if (source_update_active (sess, source, prevactive))
2362     on_ssrc_validated (sess, source);
2363
2364   source_update_sender (sess, source, prevsender);
2365
2366   if (oldrate != source->bitrate)
2367     sess->recalc_bandwidth = TRUE;
2368
2369
2370   if (source->validated) {
2371     gboolean created;
2372     gint i;
2373
2374     /* for validated sources, we add the CSRCs as well */
2375     for (i = 0; i < pinfo.csrc_count; i++) {
2376       guint32 csrc;
2377       RTPSource *csrc_src;
2378
2379       csrc = pinfo.csrcs[i];
2380
2381       /* get source */
2382       csrc_src = obtain_source (sess, csrc, &created, &pinfo, TRUE);
2383       if (!csrc_src)
2384         continue;
2385
2386       if (created) {
2387         GST_DEBUG ("created new CSRC: %08x", csrc);
2388         rtp_source_set_as_csrc (csrc_src);
2389         source_update_active (sess, csrc_src, FALSE);
2390         on_new_ssrc (sess, csrc_src);
2391       }
2392       g_object_unref (csrc_src);
2393     }
2394   }
2395   g_object_unref (source);
2396
2397   RTP_SESSION_UNLOCK (sess);
2398
2399   clean_packet_info (&pinfo);
2400
2401   return result;
2402
2403   /* ERRORS */
2404 collision:
2405   {
2406     RTP_SESSION_UNLOCK (sess);
2407     clean_packet_info (&pinfo);
2408     GST_DEBUG ("ignoring packet because its collisioning");
2409     return GST_FLOW_OK;
2410   }
2411 }
2412
2413 static void
2414 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
2415     GstRTCPPacket * packet, RTPPacketInfo * pinfo)
2416 {
2417   guint count, i;
2418
2419   count = gst_rtcp_packet_get_rb_count (packet);
2420   for (i = 0; i < count; i++) {
2421     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
2422     guint8 fractionlost;
2423     gint32 packetslost;
2424     RTPSource *src;
2425
2426     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
2427         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2428
2429     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
2430
2431     /* find our own source */
2432     src = find_source (sess, ssrc);
2433     if (src == NULL)
2434       continue;
2435
2436     if (src->internal && RTP_SOURCE_IS_ACTIVE (src)) {
2437       /* only deal with report blocks for our session, we update the stats of
2438        * the sender of the RTCP message. We could also compare our stats against
2439        * the other sender to see if we are better or worse. */
2440       /* FIXME, need to keep track who the RB block is from */
2441       rtp_source_process_rb (source, ssrc, pinfo->ntpnstime, fractionlost,
2442           packetslost, exthighestseq, jitter, lsr, dlsr);
2443     }
2444   }
2445   on_ssrc_active (sess, source);
2446 }
2447
2448 /* A Sender report contains statistics about how the sender is doing. This
2449  * includes timing informataion such as the relation between RTP and NTP
2450  * timestamps and the number of packets/bytes it sent to us.
2451  *
2452  * In this report is also included a set of report blocks related to how this
2453  * sender is receiving data (in case we (or somebody else) is also sending stuff
2454  * to it). This info includes the packet loss, jitter and seqnum. It also
2455  * contains information to calculate the round trip time (LSR/DLSR).
2456  */
2457 static void
2458 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
2459     RTPPacketInfo * pinfo, gboolean * do_sync)
2460 {
2461   guint32 senderssrc, rtptime, packet_count, octet_count;
2462   guint64 ntptime;
2463   RTPSource *source;
2464   gboolean created, prevsender;
2465
2466   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
2467       &packet_count, &octet_count);
2468
2469   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
2470       senderssrc, GST_TIME_ARGS (pinfo->current_time));
2471
2472   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2473   if (!source)
2474     return;
2475
2476   /* skip non-bye packets for sources that are marked BYE */
2477   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2478     goto out;
2479
2480   /* don't try to do lip-sync for sources that sent a BYE */
2481   if (RTP_SOURCE_IS_MARKED_BYE (source))
2482     *do_sync = FALSE;
2483   else
2484     *do_sync = TRUE;
2485
2486   prevsender = RTP_SOURCE_IS_SENDER (source);
2487
2488   /* first update the source */
2489   rtp_source_process_sr (source, pinfo->current_time, ntptime, rtptime,
2490       packet_count, octet_count);
2491
2492   source_update_sender (sess, source, prevsender);
2493
2494   if (created)
2495     on_new_ssrc (sess, source);
2496
2497   rtp_session_process_rb (sess, source, packet, pinfo);
2498
2499 out:
2500   g_object_unref (source);
2501 }
2502
2503 /* A receiver report contains statistics about how a receiver is doing. It
2504  * includes stuff like packet loss, jitter and the seqnum it received last. It
2505  * also contains info to calculate the round trip time.
2506  *
2507  * We are only interested in how the sender of this report is doing wrt to us.
2508  */
2509 static void
2510 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
2511     RTPPacketInfo * pinfo)
2512 {
2513   guint32 senderssrc;
2514   RTPSource *source;
2515   gboolean created;
2516
2517   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
2518
2519   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
2520
2521   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2522   if (!source)
2523     return;
2524
2525   /* skip non-bye packets for sources that are marked BYE */
2526   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2527     goto out;
2528
2529   if (created)
2530     on_new_ssrc (sess, source);
2531
2532   rtp_session_process_rb (sess, source, packet, pinfo);
2533
2534 out:
2535   g_object_unref (source);
2536 }
2537
2538 /* Get SDES items and store them in the SSRC */
2539 static void
2540 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
2541     RTPPacketInfo * pinfo)
2542 {
2543   guint items, i, j;
2544   gboolean more_items, more_entries;
2545
2546   items = gst_rtcp_packet_sdes_get_item_count (packet);
2547   GST_DEBUG ("got SDES packet with %d items", items);
2548
2549   more_items = gst_rtcp_packet_sdes_first_item (packet);
2550   i = 0;
2551   while (more_items) {
2552     guint32 ssrc;
2553     gboolean changed, created, prevactive;
2554     RTPSource *source;
2555     GstStructure *sdes;
2556
2557     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
2558
2559     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
2560
2561     changed = FALSE;
2562
2563     /* find src, no probation when dealing with RTCP */
2564     source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
2565     if (!source)
2566       return;
2567
2568     /* skip non-bye packets for sources that are marked BYE */
2569     if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2570       goto next;
2571
2572     sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
2573
2574     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
2575     j = 0;
2576     while (more_entries) {
2577       GstRTCPSDESType type;
2578       guint8 len;
2579       guint8 *data;
2580       gchar *name;
2581       gchar *value;
2582
2583       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
2584
2585       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
2586           data);
2587
2588       if (type == GST_RTCP_SDES_PRIV) {
2589         name = g_strndup ((const gchar *) &data[1], data[0]);
2590         len -= data[0] + 1;
2591         data += data[0] + 1;
2592       } else {
2593         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
2594       }
2595
2596       value = g_strndup ((const gchar *) data, len);
2597
2598       if (g_utf8_validate (value, -1, NULL)) {
2599         gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
2600       } else {
2601         GST_WARNING ("ignore SDES field %s with non-utf8 data %s", name, value);
2602       }
2603
2604       g_free (name);
2605       g_free (value);
2606
2607       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
2608       j++;
2609     }
2610
2611     /* takes ownership of sdes */
2612     changed = rtp_source_set_sdes_struct (source, sdes);
2613
2614     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2615     source->validated = TRUE;
2616
2617     if (created)
2618       on_new_ssrc (sess, source);
2619
2620     /* source became active */
2621     if (source_update_active (sess, source, prevactive))
2622       on_ssrc_validated (sess, source);
2623
2624     if (changed)
2625       on_ssrc_sdes (sess, source);
2626
2627   next:
2628     g_object_unref (source);
2629
2630     more_items = gst_rtcp_packet_sdes_next_item (packet);
2631     i++;
2632   }
2633 }
2634
2635 /* BYE is sent when a client leaves the session
2636  */
2637 static void
2638 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
2639     RTPPacketInfo * pinfo)
2640 {
2641   guint count, i;
2642   gchar *reason;
2643   gboolean reconsider = FALSE;
2644
2645   reason = gst_rtcp_packet_bye_get_reason (packet);
2646   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
2647
2648   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
2649   for (i = 0; i < count; i++) {
2650     guint32 ssrc;
2651     RTPSource *source;
2652     gboolean prevactive, prevsender;
2653     guint pmembers, members;
2654
2655     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
2656     GST_DEBUG ("SSRC: %08x", ssrc);
2657
2658     /* find src and mark bye, no probation when dealing with RTCP */
2659     source = find_source (sess, ssrc);
2660     if (!source || source->internal) {
2661       GST_DEBUG ("Ignoring suspicious BYE packet (reason: %s)",
2662           !source ? "can't find source" : "has internal source SSRC");
2663       break;
2664     }
2665
2666     /* store time for when we need to time out this source */
2667     source->bye_time = pinfo->current_time;
2668
2669     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2670     prevsender = RTP_SOURCE_IS_SENDER (source);
2671
2672     /* mark the source BYE */
2673     rtp_source_mark_bye (source, reason);
2674
2675     pmembers = sess->stats.active_sources;
2676
2677     source_update_active (sess, source, prevactive);
2678     source_update_sender (sess, source, prevsender);
2679
2680     members = sess->stats.active_sources;
2681
2682     if (!sess->scheduled_bye && members < pmembers) {
2683       /* some members went away since the previous timeout estimate.
2684        * Perform reverse reconsideration but only when we are not scheduling a
2685        * BYE ourselves. */
2686       if (sess->next_rtcp_check_time != GST_CLOCK_TIME_NONE &&
2687           pinfo->current_time < sess->next_rtcp_check_time) {
2688         GstClockTime time_remaining;
2689
2690         /* Scale our next RTCP check time according to the change of numbers
2691          * of members. But only if a) this is the first RTCP, or b) this is not
2692          * a feedback session, or c) this is a feedback session but we schedule
2693          * for every RTCP interval (aka no t-rr-interval set).
2694          *
2695          * FIXME: a) and b) are not great as we will possibly go below Tmin
2696          * for non-feedback profiles and in case of a) below
2697          * Tmin/t-rr-interval in any case.
2698          */
2699         if (sess->last_rtcp_send_time == GST_CLOCK_TIME_NONE ||
2700             !(sess->rtp_profile == GST_RTP_PROFILE_AVPF
2701                 || sess->rtp_profile == GST_RTP_PROFILE_SAVPF) ||
2702             sess->next_rtcp_check_time - sess->last_rtcp_send_time ==
2703             sess->last_rtcp_interval) {
2704           time_remaining = sess->next_rtcp_check_time - pinfo->current_time;
2705           sess->next_rtcp_check_time =
2706               gst_util_uint64_scale (time_remaining, members, pmembers);
2707           sess->next_rtcp_check_time += pinfo->current_time;
2708         }
2709         sess->last_rtcp_interval =
2710             gst_util_uint64_scale (sess->last_rtcp_interval, members, pmembers);
2711
2712         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
2713             GST_TIME_ARGS (sess->next_rtcp_check_time));
2714
2715         /* mark pending reconsider. We only want to signal the reconsideration
2716          * once after we handled all the source in the bye packet */
2717         reconsider = TRUE;
2718       }
2719     }
2720
2721     on_bye_ssrc (sess, source);
2722   }
2723   if (reconsider) {
2724     RTP_SESSION_UNLOCK (sess);
2725     /* notify app of reconsideration */
2726     if (sess->callbacks.reconsider)
2727       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2728     RTP_SESSION_LOCK (sess);
2729   }
2730
2731   g_free (reason);
2732 }
2733
2734 static void
2735 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
2736     RTPPacketInfo * pinfo)
2737 {
2738   GST_DEBUG ("received APP");
2739
2740   if (g_signal_has_handler_pending (sess,
2741           rtp_session_signals[SIGNAL_ON_APP_RTCP], 0, TRUE)) {
2742     GstBuffer *data_buffer = NULL;
2743     guint16 data_length;
2744     gchar name[5];
2745
2746     data_length = gst_rtcp_packet_app_get_data_length (packet) * 4;
2747     if (data_length > 0) {
2748       guint8 *data = gst_rtcp_packet_app_get_data (packet);
2749       data_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
2750           GST_BUFFER_COPY_MEMORY, data - packet->rtcp->map.data, data_length);
2751       GST_BUFFER_PTS (data_buffer) = pinfo->running_time;
2752     }
2753
2754     memcpy (name, gst_rtcp_packet_app_get_name (packet), 4);
2755     name[4] = '\0';
2756
2757     RTP_SESSION_UNLOCK (sess);
2758     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_APP_RTCP], 0,
2759         gst_rtcp_packet_app_get_subtype (packet),
2760         gst_rtcp_packet_app_get_ssrc (packet), name, data_buffer);
2761     RTP_SESSION_LOCK (sess);
2762
2763     if (data_buffer)
2764       gst_buffer_unref (data_buffer);
2765   }
2766 }
2767
2768 static gboolean
2769 rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
2770     const guint32 * ssrcs, guint num_ssrcs, gboolean fir,
2771     GstClockTime current_time)
2772 {
2773   guint32 round_trip = 0;
2774   gint i;
2775
2776   g_return_val_if_fail (ssrcs != NULL && num_ssrcs > 0, FALSE);
2777
2778   rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
2779       &round_trip);
2780
2781   if (src->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
2782     GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
2783         GST_SECOND, 65536);
2784
2785     /* Sanity check to avoid always ignoring PLI/FIR if we receive RTCP
2786      * packets with erroneous values resulting in crazy high RTT. */
2787     if (round_trip_in_ns > 5 * GST_SECOND)
2788       round_trip_in_ns = GST_SECOND / 2;
2789
2790     if (current_time - src->last_keyframe_request < 2 * round_trip_in_ns) {
2791       GST_DEBUG ("Ignoring %s request from %X because one was send without one "
2792           "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
2793           fir ? "FIR" : "PLI", rtp_source_get_ssrc (src),
2794           GST_TIME_ARGS (current_time - src->last_keyframe_request),
2795           GST_TIME_ARGS (round_trip_in_ns));
2796       return FALSE;
2797     }
2798   }
2799
2800   src->last_keyframe_request = current_time;
2801
2802   for (i = 0; i < num_ssrcs; ++i) {
2803     GST_LOG ("received %s request from %X about %X %p(%p)",
2804         fir ? "FIR" : "PLI",
2805         rtp_source_get_ssrc (src), ssrcs[i], sess->callbacks.process_rtp,
2806         sess->callbacks.request_key_unit);
2807
2808     RTP_SESSION_UNLOCK (sess);
2809     sess->callbacks.request_key_unit (sess, ssrcs[i], fir,
2810         sess->request_key_unit_user_data);
2811     RTP_SESSION_LOCK (sess);
2812   }
2813
2814   return TRUE;
2815 }
2816
2817 static void
2818 rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
2819     guint32 media_ssrc, GstClockTime current_time)
2820 {
2821   RTPSource *src;
2822
2823   if (!sess->callbacks.request_key_unit)
2824     return;
2825
2826   src = find_source (sess, sender_ssrc);
2827   if (src == NULL) {
2828     /* try to find a src with media_ssrc instead */
2829     src = find_source (sess, media_ssrc);
2830     if (src == NULL)
2831       return;
2832   }
2833
2834   rtp_session_request_local_key_unit (sess, src, &media_ssrc, 1, FALSE,
2835       current_time);
2836 }
2837
2838 static void
2839 rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
2840     guint8 * fci_data, guint fci_length, GstClockTime current_time)
2841 {
2842   RTPSource *src;
2843   guint32 ssrc;
2844   guint position = 0;
2845   guint32 ssrcs[32];
2846   guint num_ssrcs = 0;
2847
2848   if (!sess->callbacks.request_key_unit)
2849     return;
2850
2851   if (fci_length < 8)
2852     return;
2853
2854   src = find_source (sess, sender_ssrc);
2855
2856   /* Hack because Google fails to set the sender_ssrc correctly */
2857   if (!src && sender_ssrc == 1) {
2858     GHashTableIter iter;
2859
2860     /* we can't find the source if there are multiple */
2861     if (sess->stats.sender_sources > sess->stats.internal_sender_sources + 1)
2862       return;
2863
2864     g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
2865     while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
2866       if (!src->internal && rtp_source_is_sender (src))
2867         break;
2868       src = NULL;
2869     }
2870   }
2871   if (!src)
2872     return;
2873
2874   for (position = 0; position < fci_length; position += 8) {
2875     guint8 *data = fci_data + position;
2876     RTPSource *own;
2877
2878     ssrc = GST_READ_UINT32_BE (data);
2879
2880     own = find_source (sess, ssrc);
2881     if (own == NULL)
2882       continue;
2883
2884     if (own->internal && num_ssrcs < 32) {
2885       ssrcs[num_ssrcs++] = ssrc;
2886     }
2887   }
2888   if (num_ssrcs == 0)
2889     return;
2890
2891   rtp_session_request_local_key_unit (sess, src, ssrcs, num_ssrcs, TRUE,
2892       current_time);
2893 }
2894
2895 static void
2896 rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
2897     guint32 media_ssrc, guint8 * fci_data, guint fci_length,
2898     GstClockTime current_time)
2899 {
2900   sess->stats.nacks_received++;
2901
2902   if (!sess->callbacks.notify_nack)
2903     return;
2904
2905   while (fci_length > 0) {
2906     guint16 seqnum, blp;
2907
2908     seqnum = GST_READ_UINT16_BE (fci_data);
2909     blp = GST_READ_UINT16_BE (fci_data + 2);
2910
2911     GST_DEBUG ("NACK #%u, blp %04x, SSRC 0x%08x", seqnum, blp, media_ssrc);
2912
2913     RTP_SESSION_UNLOCK (sess);
2914     sess->callbacks.notify_nack (sess, seqnum, blp, media_ssrc,
2915         sess->notify_nack_user_data);
2916     RTP_SESSION_LOCK (sess);
2917
2918     fci_data += 4;
2919     fci_length -= 4;
2920   }
2921 }
2922
2923 static void
2924 rtp_session_process_sr_req (RTPSession * sess, guint32 sender_ssrc,
2925     guint32 media_ssrc)
2926 {
2927   RTPSource *src;
2928
2929   /* Request a new SR in feedback profiles ASAP */
2930   if (sess->rtp_profile != GST_RTP_PROFILE_AVPF
2931       && sess->rtp_profile != GST_RTP_PROFILE_SAVPF)
2932     return;
2933
2934   src = find_source (sess, sender_ssrc);
2935   /* Our own RTCP packet */
2936   if (src && src->internal)
2937     return;
2938
2939   src = find_source (sess, media_ssrc);
2940   /* Not an SSRC we're producing */
2941   if (!src || !src->internal)
2942     return;
2943
2944   GST_DEBUG_OBJECT (sess, "Handling RTCP-SR-REQ");
2945   /* FIXME: 5s max_delay hard-coded here as we have to give some
2946    * high enough value */
2947   sess->sr_req_pending = TRUE;
2948 #ifdef TIZEN_FEATURE_BUG_FIX
2949   RTP_SESSION_UNLOCK (sess);
2950 #endif
2951   rtp_session_send_rtcp (sess, 5 * GST_SECOND);
2952 #ifdef TIZEN_FEATURE_BUG_FIX
2953   RTP_SESSION_LOCK (sess);
2954 #endif
2955 }
2956
2957 static void
2958 rtp_session_process_twcc (RTPSession * sess, guint32 sender_ssrc,
2959     guint32 media_ssrc, guint8 * fci_data, guint fci_length)
2960 {
2961   GArray *twcc_packets;
2962   GstStructure *twcc_packets_s;
2963   GstStructure *twcc_stats_s;
2964
2965   twcc_packets = rtp_twcc_manager_parse_fci (sess->twcc,
2966       fci_data, fci_length * sizeof (guint32));
2967   if (twcc_packets == NULL)
2968     return;
2969
2970   twcc_packets_s = rtp_twcc_stats_get_packets_structure (twcc_packets);
2971   twcc_stats_s =
2972       rtp_twcc_stats_process_packets (sess->twcc_stats, twcc_packets);
2973
2974   GST_DEBUG_OBJECT (sess, "Parsed TWCC: %" GST_PTR_FORMAT, twcc_packets_s);
2975   GST_INFO_OBJECT (sess, "Current TWCC stats %" GST_PTR_FORMAT, twcc_stats_s);
2976
2977   g_array_unref (twcc_packets);
2978
2979   RTP_SESSION_UNLOCK (sess);
2980   if (sess->callbacks.notify_twcc)
2981     sess->callbacks.notify_twcc (sess, twcc_packets_s, twcc_stats_s,
2982         sess->notify_twcc_user_data);
2983   RTP_SESSION_LOCK (sess);
2984 }
2985
2986 static void
2987 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
2988     RTPPacketInfo * pinfo, GstClockTime current_time)
2989 {
2990   GstRTCPType type;
2991   GstRTCPFBType fbtype;
2992   guint32 sender_ssrc, media_ssrc;
2993   guint8 *fci_data;
2994   guint fci_length;
2995   RTPSource *src;
2996
2997   /* The feedback packet must include both sender SSRC and media SSRC */
2998   if (packet->length < 2)
2999     return;
3000
3001   type = gst_rtcp_packet_get_type (packet);
3002   fbtype = gst_rtcp_packet_fb_get_type (packet);
3003   sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
3004   media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
3005
3006   src = find_source (sess, media_ssrc);
3007
3008   /* skip non-bye packets for sources that are marked BYE */
3009   if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src))
3010     return;
3011
3012   if (src)
3013     g_object_ref (src);
3014
3015   fci_data = gst_rtcp_packet_fb_get_fci (packet);
3016   fci_length = gst_rtcp_packet_fb_get_fci_length (packet) * sizeof (guint32);
3017
3018   GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
3019       "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
3020
3021   if (g_signal_has_handler_pending (sess,
3022           rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
3023     GstBuffer *fci_buffer = NULL;
3024
3025     if (fci_length > 0) {
3026       fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
3027           GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data,
3028           fci_length);
3029       GST_BUFFER_PTS (fci_buffer) = pinfo->running_time;
3030     }
3031
3032     RTP_SESSION_UNLOCK (sess);
3033     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
3034         type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
3035     RTP_SESSION_LOCK (sess);
3036
3037     if (fci_buffer)
3038       gst_buffer_unref (fci_buffer);
3039   }
3040
3041   if (src && sess->rtcp_feedback_retention_window != GST_CLOCK_TIME_NONE) {
3042     rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
3043   }
3044
3045   if ((src && src->internal) ||
3046       /* PSFB FIR puts the media ssrc inside the FCI */
3047       (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR) ||
3048       /* TWCC is for all sources, so a single media-ssrc is not enough */
3049       (type == GST_RTCP_TYPE_RTPFB && fbtype == GST_RTCP_RTPFB_TYPE_TWCC)) {
3050     switch (type) {
3051       case GST_RTCP_TYPE_PSFB:
3052         switch (fbtype) {
3053           case GST_RTCP_PSFB_TYPE_PLI:
3054             if (src)
3055               src->stats.recv_pli_count++;
3056             rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
3057                 current_time);
3058             break;
3059           case GST_RTCP_PSFB_TYPE_FIR:
3060             if (src)
3061               src->stats.recv_fir_count++;
3062             rtp_session_process_fir (sess, sender_ssrc, fci_data, fci_length,
3063                 current_time);
3064             break;
3065           default:
3066             break;
3067         }
3068         break;
3069       case GST_RTCP_TYPE_RTPFB:
3070         switch (fbtype) {
3071           case GST_RTCP_RTPFB_TYPE_NACK:
3072             if (src)
3073               src->stats.recv_nack_count++;
3074             rtp_session_process_nack (sess, sender_ssrc, media_ssrc,
3075                 fci_data, fci_length, current_time);
3076             break;
3077           case GST_RTCP_RTPFB_TYPE_RTCP_SR_REQ:
3078             rtp_session_process_sr_req (sess, sender_ssrc, media_ssrc);
3079             break;
3080           case GST_RTCP_RTPFB_TYPE_TWCC:
3081             rtp_session_process_twcc (sess, sender_ssrc, media_ssrc,
3082                 fci_data, fci_length);
3083             break;
3084           default:
3085             break;
3086         }
3087       default:
3088         break;
3089     }
3090   }
3091
3092   if (src)
3093     g_object_unref (src);
3094 }
3095
3096 /**
3097  * rtp_session_process_rtcp:
3098  * @sess: and #RTPSession
3099  * @buffer: an RTCP buffer
3100  * @current_time: the current system time
3101  * @ntpnstime: the current NTP time in nanoseconds
3102  *
3103  * Process an RTCP buffer in the session manager. This function takes ownership
3104  * of @buffer.
3105  *
3106  * Returns: a #GstFlowReturn.
3107  */
3108 GstFlowReturn
3109 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
3110     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
3111 {
3112   GstRTCPPacket packet;
3113   gboolean more, is_bye = FALSE, do_sync = FALSE, has_report = FALSE;
3114   RTPPacketInfo pinfo = { 0, };
3115   GstFlowReturn result = GST_FLOW_OK;
3116   GstRTCPBuffer rtcp = { NULL, };
3117
3118   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3119   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3120
3121   if (!gst_rtcp_buffer_validate_reduced (buffer))
3122     goto invalid_packet;
3123
3124   GST_DEBUG ("received RTCP packet");
3125
3126   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP], 0,
3127       buffer);
3128
3129   RTP_SESSION_LOCK (sess);
3130   /* update pinfo stats */
3131   update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
3132       running_time, ntpnstime);
3133
3134   /* start processing the compound packet */
3135   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
3136   more = gst_rtcp_buffer_get_first_packet (&rtcp, &packet);
3137   while (more) {
3138     GstRTCPType type;
3139
3140     type = gst_rtcp_packet_get_type (&packet);
3141
3142     switch (type) {
3143       case GST_RTCP_TYPE_SR:
3144         has_report = TRUE;
3145         rtp_session_process_sr (sess, &packet, &pinfo, &do_sync);
3146         break;
3147       case GST_RTCP_TYPE_RR:
3148         has_report = TRUE;
3149         rtp_session_process_rr (sess, &packet, &pinfo);
3150         break;
3151       case GST_RTCP_TYPE_SDES:
3152         rtp_session_process_sdes (sess, &packet, &pinfo);
3153         break;
3154       case GST_RTCP_TYPE_BYE:
3155         is_bye = TRUE;
3156         /* don't try to attempt lip-sync anymore for streams with a BYE */
3157         do_sync = FALSE;
3158         rtp_session_process_bye (sess, &packet, &pinfo);
3159         break;
3160       case GST_RTCP_TYPE_APP:
3161         rtp_session_process_app (sess, &packet, &pinfo);
3162         break;
3163       case GST_RTCP_TYPE_RTPFB:
3164       case GST_RTCP_TYPE_PSFB:
3165         rtp_session_process_feedback (sess, &packet, &pinfo, current_time);
3166         break;
3167       case GST_RTCP_TYPE_XR:
3168         /* FIXME: This block is added to downgrade warning level.
3169          * Once the parser is implemented, it should be replaced with
3170          * a proper process function. */
3171         GST_DEBUG ("got RTCP XR packet, but ignored");
3172         break;
3173       default:
3174         GST_WARNING ("got unknown RTCP packet type: %d", type);
3175         break;
3176     }
3177     more = gst_rtcp_packet_move_to_next (&packet);
3178   }
3179
3180   gst_rtcp_buffer_unmap (&rtcp);
3181
3182   /* if we are scheduling a BYE, we only want to count bye packets, else we
3183    * count everything */
3184   if (sess->scheduled_bye && is_bye) {
3185     sess->bye_stats.bye_members++;
3186     UPDATE_AVG (sess->bye_stats.avg_rtcp_packet_size, pinfo.bytes);
3187   }
3188
3189   /* keep track of average packet size */
3190   UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
3191
3192   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
3193       sess->stats.avg_rtcp_packet_size, pinfo.bytes);
3194   RTP_SESSION_UNLOCK (sess);
3195
3196   if (has_report) {
3197     g_object_notify_by_pspec (G_OBJECT (sess), properties[PROP_STATS]);
3198   }
3199
3200   pinfo.data = NULL;
3201   clean_packet_info (&pinfo);
3202
3203   /* notify caller of sr packets in the callback */
3204   if (do_sync && sess->callbacks.sync_rtcp) {
3205     result = sess->callbacks.sync_rtcp (sess, buffer,
3206         sess->sync_rtcp_user_data);
3207   } else
3208     gst_buffer_unref (buffer);
3209
3210   return result;
3211
3212   /* ERRORS */
3213 invalid_packet:
3214   {
3215     GST_DEBUG ("invalid RTCP packet received");
3216     gst_buffer_unref (buffer);
3217     return GST_FLOW_OK;
3218   }
3219 }
3220
3221 /**
3222  * rtp_session_update_send_caps:
3223  * @sess: an #RTPSession
3224  * @caps: a #GstCaps
3225  *
3226  * Update the caps of the sender in the rtp session.
3227  */
3228 void
3229 rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
3230 {
3231   GstStructure *s;
3232   guint ssrc;
3233
3234   g_return_if_fail (RTP_IS_SESSION (sess));
3235   g_return_if_fail (GST_IS_CAPS (caps));
3236
3237   GST_LOG ("received caps %" GST_PTR_FORMAT, caps);
3238
3239   s = gst_caps_get_structure (caps, 0);
3240
3241   if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
3242     RTPSource *source;
3243     gboolean created;
3244
3245     RTP_SESSION_LOCK (sess);
3246     source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
3247     sess->suggested_ssrc = ssrc;
3248     sess->internal_ssrc_set = TRUE;
3249     sess->internal_ssrc_from_caps_or_property = TRUE;
3250     if (source) {
3251       rtp_source_update_send_caps (source, caps);
3252
3253       if (created)
3254         on_new_sender_ssrc (sess, source);
3255
3256       g_object_unref (source);
3257     }
3258
3259     if (gst_structure_get_uint (s, "rtx-ssrc", &ssrc)) {
3260       source =
3261           obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
3262       if (source) {
3263         rtp_source_update_send_caps (source, caps);
3264
3265         if (created)
3266           on_new_sender_ssrc (sess, source);
3267
3268         g_object_unref (source);
3269       }
3270     }
3271     RTP_SESSION_UNLOCK (sess);
3272   } else {
3273     sess->internal_ssrc_from_caps_or_property = FALSE;
3274   }
3275
3276   sess->send_ntp64_ext_id =
3277       gst_rtp_get_extmap_id_for_attribute (s,
3278       GST_RTP_HDREXT_BASE GST_RTP_HDREXT_NTP_64);
3279
3280   rtp_twcc_manager_parse_send_ext_id (sess->twcc, s);
3281 }
3282
3283 static void
3284 update_ntp64_header_ext_data (RTPPacketInfo * pinfo, GstBuffer * buffer)
3285 {
3286   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
3287
3288   if (gst_rtp_buffer_map (buffer, GST_MAP_READWRITE, &rtp)) {
3289     guint16 bits;
3290     guint8 *data;
3291     guint wordlen;
3292
3293     if (gst_rtp_buffer_get_extension_data (&rtp, &bits, (gpointer *) & data,
3294             &wordlen)) {
3295       gsize len = wordlen * 4;
3296
3297       /* One-byte header */
3298       if (bits == 0xBEDE) {
3299         /* One-byte header extension */
3300         while (TRUE) {
3301           guint8 ext_id, ext_len;
3302
3303           if (len < 1)
3304             break;
3305
3306           ext_id = GST_READ_UINT8 (data) >> 4;
3307           ext_len = (GST_READ_UINT8 (data) & 0xF) + 1;
3308           data += 1;
3309           len -= 1;
3310           if (ext_id == 0) {
3311             /* Skip padding */
3312             continue;
3313           } else if (ext_id == 15) {
3314             /* Stop parsing */
3315             break;
3316           }
3317
3318           /* extension doesn't fit into the header */
3319           if (ext_len > len)
3320             break;
3321
3322           if (ext_id == pinfo->ntp64_ext_id && ext_len == 8) {
3323             if (pinfo->ntpnstime != GST_CLOCK_TIME_NONE) {
3324               guint64 ntptime = gst_util_uint64_scale (pinfo->ntpnstime,
3325                   G_GUINT64_CONSTANT (1) << 32,
3326                   GST_SECOND);
3327
3328               GST_WRITE_UINT64_BE (data, ntptime);
3329             } else {
3330               /* Replace extension with padding */
3331               memset (data - 1, 0, 1 + ext_len);
3332             }
3333           }
3334
3335           /* skip to the next extension */
3336           data += ext_len;
3337           len -= ext_len;
3338         }
3339       } else if ((bits >> 4) == 0x100) {
3340         /* Two-byte header extension */
3341
3342         while (TRUE) {
3343           guint8 ext_id, ext_len;
3344
3345           if (len < 1)
3346             break;
3347
3348           ext_id = GST_READ_UINT8 (data);
3349           data += 1;
3350           len -= 1;
3351           if (ext_id == 0) {
3352             /* Skip padding */
3353             continue;
3354           }
3355
3356           ext_len = GST_READ_UINT8 (data);
3357           data += 1;
3358           len -= 1;
3359
3360           /* extension doesn't fit into the header */
3361           if (ext_len > len)
3362             break;
3363
3364           if (ext_id == pinfo->ntp64_ext_id && ext_len == 8) {
3365             if (pinfo->ntpnstime != GST_CLOCK_TIME_NONE) {
3366               guint64 ntptime = gst_util_uint64_scale (pinfo->ntpnstime,
3367                   G_GUINT64_CONSTANT (1) << 32,
3368                   GST_SECOND);
3369
3370               GST_WRITE_UINT64_BE (data, ntptime);
3371             } else {
3372               /* Replace extension with padding */
3373               memset (data - 2, 0, 2 + ext_len);
3374             }
3375           }
3376
3377           /* skip to the next extension */
3378           data += ext_len;
3379           len -= ext_len;
3380         }
3381       }
3382     }
3383     gst_rtp_buffer_unmap (&rtp);
3384   }
3385 }
3386
3387 static void
3388 update_ntp64_header_ext (RTPPacketInfo * pinfo)
3389 {
3390   /* Early return if we don't know the header extension id or the packets
3391    * don't contain the header extension */
3392   if (pinfo->ntp64_ext_id == 0 || !pinfo->have_ntp64_ext)
3393     return;
3394
3395   /* If no NTP time is known then the header extension will be replaced with
3396    * padding, otherwise it will be updated */
3397   GST_TRACE
3398       ("Updating NTP-64 header extension for SSRC %08x packet with RTP time %u and running time %"
3399       GST_TIME_FORMAT " to %" GST_TIME_FORMAT, pinfo->ssrc, pinfo->rtptime,
3400       GST_TIME_ARGS (pinfo->running_time), GST_TIME_ARGS (pinfo->ntpnstime));
3401
3402   if (GST_IS_BUFFER_LIST (pinfo->data)) {
3403     GstBufferList *list;
3404     guint i = 0;
3405
3406     pinfo->data = gst_buffer_list_make_writable (pinfo->data);
3407
3408     list = GST_BUFFER_LIST (pinfo->data);
3409
3410     for (i = 0; i < gst_buffer_list_length (list); i++) {
3411       GstBuffer *buffer = gst_buffer_list_get_writable (list, i);
3412
3413       update_ntp64_header_ext_data (pinfo, buffer);
3414     }
3415   } else {
3416     pinfo->data = gst_buffer_make_writable (pinfo->data);
3417     update_ntp64_header_ext_data (pinfo, pinfo->data);
3418   }
3419 }
3420
3421 /**
3422  * rtp_session_send_rtp:
3423  * @sess: an #RTPSession
3424  * @data: pointer to either an RTP buffer or a list of RTP buffers
3425  * @is_list: TRUE when @data is a buffer list
3426  * @current_time: the current system time
3427  * @running_time: the running time of @data
3428  *
3429  * Send the RTP data (a buffer or buffer list) in the session manager. This
3430  * function takes ownership of @data.
3431  *
3432  * Returns: a #GstFlowReturn.
3433  */
3434 GstFlowReturn
3435 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
3436     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
3437 {
3438   GstFlowReturn result;
3439   RTPSource *source;
3440   gboolean prevsender;
3441   guint64 oldrate;
3442   RTPPacketInfo pinfo = { 0, };
3443   gboolean created;
3444
3445   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3446   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
3447
3448   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
3449
3450   RTP_SESSION_LOCK (sess);
3451   if (!update_packet_info (sess, &pinfo, TRUE, TRUE, is_list, data,
3452           current_time, running_time, ntpnstime))
3453     goto invalid_packet;
3454
3455   /* Update any 64-bit NTP header extensions with the actual NTP time here */
3456   if (sess->update_ntp64_header_ext)
3457     update_ntp64_header_ext (&pinfo);
3458
3459   rtp_twcc_manager_send_packet (sess->twcc, &pinfo);
3460
3461   source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
3462   if (created)
3463     on_new_sender_ssrc (sess, source);
3464
3465   if (!source->internal) {
3466     GSocketAddress *from;
3467
3468     if (source->rtp_from)
3469       from = source->rtp_from;
3470     else
3471       from = source->rtcp_from;
3472     if (from) {
3473       if (rtp_session_find_conflicting_address (sess, from, current_time)) {
3474         /* Its a known conflict, its probably a loop, not a collision
3475          * lets just drop the incoming packet
3476          */
3477         GST_LOG ("Our packets are being looped back to us, ignoring collision");
3478       } else {
3479         GST_DEBUG ("Collision for SSRC %x, change our sender ssrc", pinfo.ssrc);
3480
3481         rtp_session_have_conflict (sess, source, from, current_time);
3482       }
3483     } else {
3484       GST_LOG ("Ignoring collision on sent SSRC %x because remote source"
3485           " doesn't have an address", pinfo.ssrc);
3486     }
3487
3488     /* the the sending source is not internal, we have to drop the packet,
3489        or else we will end up receving it ourselves! */
3490     goto collision;
3491   }
3492
3493   prevsender = RTP_SOURCE_IS_SENDER (source);
3494   oldrate = source->bitrate;
3495
3496   /* we use our own source to send */
3497   result = rtp_source_send_rtp (source, &pinfo);
3498
3499   source_update_sender (sess, source, prevsender);
3500
3501   if (oldrate != source->bitrate)
3502     sess->recalc_bandwidth = TRUE;
3503   RTP_SESSION_UNLOCK (sess);
3504
3505   g_object_unref (source);
3506   clean_packet_info (&pinfo);
3507
3508   return result;
3509
3510 invalid_packet:
3511   {
3512     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
3513     RTP_SESSION_UNLOCK (sess);
3514     GST_DEBUG ("invalid RTP packet received");
3515     return GST_FLOW_OK;
3516   }
3517 collision:
3518   {
3519     g_object_unref (source);
3520     clean_packet_info (&pinfo);
3521     RTP_SESSION_UNLOCK (sess);
3522     GST_WARNING ("non-internal source with same ssrc %08x, drop packet",
3523         pinfo.ssrc);
3524     return GST_FLOW_OK;
3525   }
3526 }
3527
3528 static void
3529 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
3530 {
3531   *bandwidth += source->bitrate;
3532 }
3533
3534 /* must be called with session lock */
3535 static GstClockTime
3536 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
3537     gboolean first)
3538 {
3539   GstClockTime result;
3540   RTPSessionStats *stats;
3541
3542   /* recalculate bandwidth when it changed */
3543   if (sess->recalc_bandwidth) {
3544     gdouble bandwidth;
3545
3546     if (sess->bandwidth > 0)
3547       bandwidth = sess->bandwidth;
3548     else {
3549       /* If it is <= 0, then try to estimate the actual bandwidth */
3550       bandwidth = 0;
3551
3552       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3553           (GHFunc) add_bitrates, &bandwidth);
3554     }
3555     if (bandwidth < RTP_STATS_BANDWIDTH)
3556       bandwidth = RTP_STATS_BANDWIDTH;
3557
3558     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
3559         sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
3560
3561     sess->recalc_bandwidth = FALSE;
3562   }
3563
3564   if (sess->scheduled_bye) {
3565     stats = &sess->bye_stats;
3566     result = rtp_stats_calculate_bye_interval (stats);
3567   } else {
3568     session_update_ptp (sess);
3569
3570     stats = &sess->stats;
3571     result = rtp_stats_calculate_rtcp_interval (stats,
3572         stats->internal_sender_sources > 0, sess->rtp_profile,
3573         sess->is_doing_ptp, first);
3574   }
3575
3576   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
3577       GST_TIME_ARGS (result), first);
3578
3579   if (!deterministic && result != GST_CLOCK_TIME_NONE)
3580     result = rtp_stats_add_rtcp_jitter (stats, result);
3581
3582   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
3583
3584   return result;
3585 }
3586
3587 static void
3588 source_mark_bye (const gchar * key, RTPSource * source, const gchar * reason)
3589 {
3590   if (source->internal)
3591     rtp_source_mark_bye (source, reason);
3592 }
3593
3594 /**
3595  * rtp_session_mark_all_bye:
3596  * @sess: an #RTPSession
3597  * @reason: a reason
3598  *
3599  * Mark all internal sources of the session as BYE with @reason.
3600  */
3601 void
3602 rtp_session_mark_all_bye (RTPSession * sess, const gchar * reason)
3603 {
3604   g_return_if_fail (RTP_IS_SESSION (sess));
3605
3606   RTP_SESSION_LOCK (sess);
3607   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3608       (GHFunc) source_mark_bye, (gpointer) reason);
3609   RTP_SESSION_UNLOCK (sess);
3610 }
3611
3612 /* Stop the current @sess and schedule a BYE message for the other members.
3613  * One must have the session lock to call this function
3614  */
3615 static GstFlowReturn
3616 rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time)
3617 {
3618   GstFlowReturn result = GST_FLOW_OK;
3619   GstClockTime interval;
3620
3621   /* nothing to do it we already scheduled bye */
3622   if (sess->scheduled_bye)
3623     goto done;
3624
3625   /* we schedule BYE now */
3626   sess->scheduled_bye = TRUE;
3627   /* at least one member wants to send a BYE */
3628   memcpy (&sess->bye_stats, &sess->stats, sizeof (RTPSessionStats));
3629   INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100);
3630   sess->bye_stats.bye_members = 1;
3631   sess->first_rtcp = TRUE;
3632
3633   /* reschedule transmission */
3634   sess->last_rtcp_send_time = current_time;
3635   sess->last_rtcp_check_time = current_time;
3636   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3637
3638   if (interval != GST_CLOCK_TIME_NONE)
3639     sess->next_rtcp_check_time = current_time + interval;
3640   else
3641     sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
3642   sess->last_rtcp_interval = interval;
3643
3644   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
3645       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
3646
3647   RTP_SESSION_UNLOCK (sess);
3648   /* notify app of reconsideration */
3649   if (sess->callbacks.reconsider)
3650     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
3651   RTP_SESSION_LOCK (sess);
3652 done:
3653
3654   return result;
3655 }
3656
3657 /**
3658  * rtp_session_schedule_bye:
3659  * @sess: an #RTPSession
3660  * @current_time: the current system time
3661  *
3662  * Schedule a BYE message for all sources marked as BYE in @sess.
3663  *
3664  * Returns: a #GstFlowReturn.
3665  */
3666 GstFlowReturn
3667 rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
3668 {
3669   GstFlowReturn result;
3670
3671   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3672
3673   RTP_SESSION_LOCK (sess);
3674   result = rtp_session_schedule_bye_locked (sess, current_time);
3675   RTP_SESSION_UNLOCK (sess);
3676
3677   return result;
3678 }
3679
3680 /**
3681  * rtp_session_next_timeout:
3682  * @sess: an #RTPSession
3683  * @current_time: the current system time
3684  *
3685  * Get the next time we should perform session maintenance tasks.
3686  *
3687  * Returns: a time when rtp_session_on_timeout() should be called with the
3688  * current system time.
3689  */
3690 GstClockTime
3691 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
3692 {
3693   GstClockTime result, interval = 0;
3694
3695   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
3696
3697   RTP_SESSION_LOCK (sess);
3698
3699   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
3700     GST_DEBUG ("have early rtcp time");
3701     result = sess->next_early_rtcp_time;
3702     goto early_exit;
3703   }
3704
3705   result = sess->next_rtcp_check_time;
3706
3707   GST_DEBUG ("current time: %" GST_TIME_FORMAT
3708       ", next time: %" GST_TIME_FORMAT,
3709       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
3710
3711   if (result == GST_CLOCK_TIME_NONE || result < current_time) {
3712     GST_DEBUG ("take current time as base");
3713     /* our previous check time expired, start counting from the current time
3714      * again. */
3715     result = current_time;
3716   }
3717
3718   if (sess->scheduled_bye) {
3719     if (sess->bye_stats.active_sources >= 50) {
3720       GST_DEBUG ("reconsider BYE, more than 50 sources");
3721       /* reconsider BYE if members >= 50 */
3722       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3723       sess->last_rtcp_interval = interval;
3724     }
3725   } else {
3726     if (sess->first_rtcp) {
3727       GST_DEBUG ("first RTCP packet");
3728       /* we are called for the first time */
3729       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3730       sess->last_rtcp_interval = interval;
3731     } else if (sess->next_rtcp_check_time < current_time) {
3732       GST_DEBUG ("old check time expired, getting new timeout");
3733       /* get a new timeout when we need to */
3734       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
3735       sess->last_rtcp_interval = interval;
3736
3737       if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF
3738               || sess->rtp_profile == GST_RTP_PROFILE_SAVPF)
3739           && interval != GST_CLOCK_TIME_NONE) {
3740         /* Apply the rules from RFC 4585 section 3.5.3 */
3741         if (sess->stats.min_interval != 0) {
3742           GstClockTime T_rr_current_interval = g_random_double_range (0.5,
3743               1.5) * sess->stats.min_interval * GST_SECOND;
3744
3745           if (T_rr_current_interval > interval) {
3746             GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT
3747                 " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval),
3748                 GST_TIME_ARGS (interval));
3749             interval = T_rr_current_interval;
3750           }
3751         }
3752       }
3753     }
3754   }
3755
3756   if (interval != GST_CLOCK_TIME_NONE)
3757     result += interval;
3758   else
3759     result = GST_CLOCK_TIME_NONE;
3760
3761   sess->next_rtcp_check_time = result;
3762
3763 early_exit:
3764
3765   GST_DEBUG ("current time: %" GST_TIME_FORMAT
3766       ", next time: %" GST_TIME_FORMAT,
3767       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
3768   RTP_SESSION_UNLOCK (sess);
3769
3770   return result;
3771 }
3772
3773 typedef struct
3774 {
3775   RTPSource *source;
3776   gboolean is_bye;
3777   GstBuffer *buffer;
3778 } ReportOutput;
3779
3780 typedef struct
3781 {
3782   GstRTCPBuffer rtcpbuf;
3783   RTPSession *sess;
3784   RTPSource *source;
3785   guint num_to_report;
3786   gboolean have_fir;
3787   gboolean have_pli;
3788   gboolean have_nack;
3789   GstBuffer *rtcp;
3790   GstClockTime current_time;
3791   guint64 ntpnstime;
3792   GstClockTime running_time;
3793   GstClockTime interval;
3794   GstRTCPPacket packet;
3795   gboolean has_sdes;
3796   gboolean is_early;
3797   gboolean may_suppress;
3798   GQueue output;
3799   guint nacked_seqnums;
3800 } ReportData;
3801
3802 static void
3803 session_start_rtcp (RTPSession * sess, ReportData * data)
3804 {
3805   GstRTCPPacket *packet = &data->packet;
3806   RTPSource *own = data->source;
3807   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3808
3809   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
3810   data->has_sdes = FALSE;
3811
3812   gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
3813
3814   if (RTP_SOURCE_IS_SENDER (own) && (!data->is_early || !sess->reduced_size_rtcp
3815           || sess->sr_req_pending)) {
3816     guint64 ntptime;
3817     guint32 rtptime;
3818     guint32 packet_count, octet_count;
3819
3820     sess->sr_req_pending = FALSE;
3821
3822     /* we are a sender, create SR */
3823     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
3824     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SR, packet);
3825
3826     /* get latest stats */
3827     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
3828         &ntptime, &rtptime, &packet_count, &octet_count);
3829     /* store stats */
3830     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
3831         packet_count, octet_count);
3832
3833     /* fill in sender report info */
3834     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
3835         sess->timestamp_sender_reports ? ntptime : 0,
3836         sess->timestamp_sender_reports ? rtptime : 0,
3837         packet_count, octet_count);
3838   } else if (!data->is_early || !sess->reduced_size_rtcp) {
3839     /* we are only receiver, create RR */
3840     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
3841     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RR, packet);
3842     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
3843   }
3844 }
3845
3846 /* construct a Sender or Receiver Report */
3847 static void
3848 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
3849 {
3850   RTPSession *sess = data->sess;
3851   GstRTCPPacket *packet = &data->packet;
3852   guint8 fractionlost;
3853   gint32 packetslost;
3854   guint32 exthighestseq, jitter;
3855   guint32 lsr, dlsr;
3856
3857   /* don't report for sources in future generations */
3858   if (((gint16) (source->generation - sess->generation)) > 0) {
3859     GST_DEBUG ("source %08x generation %u > %u", source->ssrc,
3860         source->generation, sess->generation);
3861     return;
3862   }
3863
3864   if (g_hash_table_contains (source->reported_in_sr_of,
3865           GUINT_TO_POINTER (data->source->ssrc))) {
3866     GST_DEBUG ("source %08x already reported in this generation", source->ssrc);
3867     return;
3868   }
3869
3870   if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
3871     GST_DEBUG ("max RB count reached");
3872     return;
3873   }
3874
3875   /* only report about remote sources */
3876   if (source->internal)
3877     goto reported;
3878
3879   if (!RTP_SOURCE_IS_SENDER (source)) {
3880     GST_DEBUG ("source %08x not sender", source->ssrc);
3881     goto reported;
3882   }
3883
3884   if (source->disable_rtcp) {
3885     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
3886     goto reported;
3887   }
3888
3889   GST_DEBUG ("create RB for SSRC %08x", source->ssrc);
3890
3891   /* get new stats */
3892   rtp_source_get_new_rb (source, data->current_time, &fractionlost,
3893       &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
3894
3895   /* store last generated RR packet */
3896   source->last_rr.is_valid = TRUE;
3897   source->last_rr.ssrc = data->source->ssrc;
3898   source->last_rr.fractionlost = fractionlost;
3899   source->last_rr.packetslost = packetslost;
3900   source->last_rr.exthighestseq = exthighestseq;
3901   source->last_rr.jitter = jitter;
3902   source->last_rr.lsr = lsr;
3903   source->last_rr.dlsr = dlsr;
3904
3905   /* packet is not yet filled, add report block for this source. */
3906   gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
3907       exthighestseq, jitter, lsr, dlsr);
3908
3909 reported:
3910   g_hash_table_add (source->reported_in_sr_of,
3911       GUINT_TO_POINTER (data->source->ssrc));
3912 }
3913
3914 /* construct FIR */
3915 static void
3916 session_add_fir (const gchar * key, RTPSource * source, ReportData * data)
3917 {
3918   GstRTCPPacket *packet = &data->packet;
3919   guint16 len;
3920   guint8 *fci_data;
3921
3922   if (!source->send_fir)
3923     return;
3924
3925   len = gst_rtcp_packet_fb_get_fci_length (packet);
3926   if (!gst_rtcp_packet_fb_set_fci_length (packet, len + 2))
3927     /* exit because the packet is full, will put next request in a
3928      * further packet */
3929     return;
3930
3931   fci_data = gst_rtcp_packet_fb_get_fci (packet) + (len * 4);
3932
3933   GST_WRITE_UINT32_BE (fci_data, source->ssrc);
3934   fci_data += 4;
3935   fci_data[0] = source->current_send_fir_seqnum;
3936   fci_data[1] = fci_data[2] = fci_data[3] = 0;
3937
3938   source->send_fir = FALSE;
3939   source->stats.sent_fir_count++;
3940 }
3941
3942 static void
3943 session_fir (RTPSession * sess, ReportData * data)
3944 {
3945   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3946   GstRTCPPacket *packet = &data->packet;
3947
3948   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3949     return;
3950
3951   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_FIR);
3952   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3953   gst_rtcp_packet_fb_set_media_ssrc (packet, 0);
3954
3955   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3956       (GHFunc) session_add_fir, data);
3957
3958   if (gst_rtcp_packet_fb_get_fci_length (packet) == 0)
3959     gst_rtcp_packet_remove (packet);
3960   else
3961     data->may_suppress = FALSE;
3962 }
3963
3964 static gboolean
3965 has_pli_compare_func (gconstpointer a, gconstpointer ignored)
3966 {
3967   GstRTCPPacket packet;
3968   GstRTCPBuffer rtcp = { NULL, };
3969   gboolean ret = FALSE;
3970
3971   gst_rtcp_buffer_map ((GstBuffer *) a, GST_MAP_READ, &rtcp);
3972
3973   if (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
3974     if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
3975         gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
3976       ret = TRUE;
3977   }
3978
3979   gst_rtcp_buffer_unmap (&rtcp);
3980
3981   return ret;
3982 }
3983
3984 /* construct PLI */
3985 static void
3986 session_pli (const gchar * key, RTPSource * source, ReportData * data)
3987 {
3988   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3989   GstRTCPPacket *packet = &data->packet;
3990
3991   if (!source->send_pli)
3992     return;
3993
3994   if (rtp_source_has_retained (source, has_pli_compare_func, NULL))
3995     return;
3996
3997   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3998     /* exit because the packet is full, will put next request in a
3999      * further packet */
4000     return;
4001
4002   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_PLI);
4003   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
4004   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
4005
4006   source->send_pli = FALSE;
4007   data->may_suppress = FALSE;
4008
4009   source->stats.sent_pli_count++;
4010 }
4011
4012 /* construct NACK */
4013 static void
4014 session_nack (const gchar * key, RTPSource * source, ReportData * data)
4015 {
4016   RTPSession *sess = data->sess;
4017   GstRTCPBuffer *rtcp = &data->rtcpbuf;
4018   GstRTCPPacket *packet = &data->packet;
4019   guint16 *nacks;
4020   GstClockTime *nack_deadlines;
4021   guint n_nacks, i = 0;
4022   guint nacked_seqnums = 0;
4023   guint16 n_fb_nacks = 0;
4024   guint8 *fci_data;
4025
4026   if (!source->send_nack)
4027     return;
4028
4029   nacks = rtp_source_get_nacks (source, &n_nacks);
4030   nack_deadlines = rtp_source_get_nack_deadlines (source, NULL);
4031   GST_DEBUG ("%u NACKs current time %" GST_TIME_FORMAT, n_nacks,
4032       GST_TIME_ARGS (data->current_time));
4033
4034   /* cleanup expired nacks */
4035   for (i = 0; i < n_nacks; i++) {
4036     GST_DEBUG ("#%u deadline %" GST_TIME_FORMAT, nacks[i],
4037         GST_TIME_ARGS (nack_deadlines[i]));
4038     if (nack_deadlines[i] >= data->current_time)
4039       break;
4040   }
4041
4042   if (data->is_early) {
4043     /* don't remove them all if this is an early RTCP packet. It may happen
4044      * that the NACKs are late due to high RTT, not sending NACKs at all would
4045      * keep the RTX RTT stats high and maintain a dropping state. */
4046     i = MIN (n_nacks - 1, i);
4047   }
4048
4049   if (i) {
4050     GST_WARNING ("Removing %u expired NACKS", i);
4051     rtp_source_clear_nacks (source, i);
4052     n_nacks -= i;
4053     if (n_nacks == 0)
4054       return;
4055   }
4056
4057   /* allow overriding NACK to packet conversion */
4058   if (g_signal_has_handler_pending (sess,
4059           rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0, TRUE)) {
4060     /* this is needed as it will actually resize the buffer */
4061     gst_rtcp_buffer_unmap (rtcp);
4062
4063     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0,
4064         data->source->ssrc, source->ssrc, source->nacks, data->rtcp,
4065         &nacked_seqnums);
4066
4067     /* and now remap for the remaining work */
4068     gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
4069
4070     if (nacked_seqnums > 0)
4071       goto done;
4072   }
4073
4074   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet))
4075     /* exit because the packet is full, will put next request in a
4076      * further packet */
4077     return;
4078
4079   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_RTPFB_TYPE_NACK);
4080   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
4081   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
4082
4083   if (!gst_rtcp_packet_fb_set_fci_length (packet, 1)) {
4084     gst_rtcp_packet_remove (packet);
4085     GST_WARNING ("no nacks fit in the packet");
4086     return;
4087   }
4088
4089   fci_data = gst_rtcp_packet_fb_get_fci (packet);
4090   for (i = 0; i < n_nacks; i = nacked_seqnums) {
4091     guint16 seqnum = nacks[i];
4092     guint16 blp = 0;
4093     guint j;
4094
4095     if (!gst_rtcp_packet_fb_set_fci_length (packet, n_fb_nacks + 1))
4096       break;
4097
4098     n_fb_nacks++;
4099     nacked_seqnums++;
4100
4101     for (j = i + 1; j < n_nacks; j++) {
4102       gint diff;
4103
4104       diff = gst_rtp_buffer_compare_seqnum (seqnum, nacks[j]);
4105       GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, nacks[j], diff);
4106       if (diff > 16)
4107         break;
4108
4109       blp |= 1 << (diff - 1);
4110       nacked_seqnums++;
4111     }
4112
4113     GST_WRITE_UINT32_BE (fci_data, seqnum << 16 | blp);
4114     fci_data += 4;
4115   }
4116
4117   GST_DEBUG ("Sent %u seqnums into %u FB NACKs", nacked_seqnums, n_fb_nacks);
4118   source->stats.sent_nack_count += n_fb_nacks;
4119
4120 done:
4121   data->nacked_seqnums += nacked_seqnums;
4122   rtp_source_clear_nacks (source, nacked_seqnums);
4123   data->may_suppress = FALSE;
4124 }
4125
4126 /* perform cleanup of sources that timed out */
4127 static void
4128 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
4129 {
4130   gboolean remove = FALSE;
4131   gboolean byetimeout = FALSE;
4132   gboolean sendertimeout = FALSE;
4133   gboolean is_sender, is_active;
4134   RTPSession *sess = data->sess;
4135   GstClockTime interval, binterval;
4136   GstClockTime btime;
4137
4138   GST_DEBUG ("look at %08x, generation %u", source->ssrc, source->generation);
4139
4140   /* check for outdated collisions */
4141   if (source->internal) {
4142     GST_DEBUG ("Timing out collisions for %x", source->ssrc);
4143     rtp_source_timeout (source, data->current_time, data->running_time,
4144         sess->rtcp_feedback_retention_window);
4145   }
4146
4147   /* nothing else to do when without RTCP */
4148   if (data->interval == GST_CLOCK_TIME_NONE)
4149     return;
4150
4151   is_sender = RTP_SOURCE_IS_SENDER (source);
4152   is_active = RTP_SOURCE_IS_ACTIVE (source);
4153
4154   /* our own rtcp interval may have been forced low by secondary configuration,
4155    * while sender side may still operate with higher interval,
4156    * so do not just take our interval to decide on timing out sender,
4157    * but take (if data->interval <= 5 * GST_SECOND):
4158    *   interval = CLAMP (sender_interval, data->interval, 5 * GST_SECOND)
4159    * where sender_interval is difference between last 2 received RTCP reports
4160    */
4161   if (data->interval >= 5 * GST_SECOND || source->internal) {
4162     binterval = data->interval;
4163   } else {
4164     GST_LOG ("prev_rtcp %" GST_TIME_FORMAT ", last_rtcp %" GST_TIME_FORMAT,
4165         GST_TIME_ARGS (source->stats.prev_rtcptime),
4166         GST_TIME_ARGS (source->stats.last_rtcptime));
4167     /* if not received enough yet, fallback to larger default */
4168     if (source->stats.last_rtcptime > source->stats.prev_rtcptime)
4169       binterval = source->stats.last_rtcptime - source->stats.prev_rtcptime;
4170     else
4171       binterval = 5 * GST_SECOND;
4172     binterval = CLAMP (binterval, data->interval, 5 * GST_SECOND);
4173   }
4174   GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
4175       GST_TIME_ARGS (binterval));
4176
4177   if (!source->internal && source->marked_bye) {
4178     /* if we received a BYE from the source, remove the source after some
4179      * time. */
4180     if (data->current_time > source->bye_time &&
4181         data->current_time - source->bye_time > sess->stats.bye_timeout) {
4182       GST_DEBUG ("removing BYE source %08x", source->ssrc);
4183       remove = TRUE;
4184       byetimeout = TRUE;
4185     }
4186   }
4187
4188   if (source->internal && source->sent_bye) {
4189     GST_DEBUG ("removing internal source that has sent BYE %08x", source->ssrc);
4190     remove = TRUE;
4191   }
4192
4193   /* sources that were inactive for more than 5 times the deterministic reporting
4194    * interval get timed out. the min timeout is 5 seconds. */
4195   /* mind old time that might pre-date last time going to PLAYING */
4196   btime = MAX (source->last_activity, sess->start_time);
4197   if (data->current_time > btime) {
4198     interval = MAX (binterval * 5, 5 * GST_SECOND);
4199     if (data->current_time - btime > interval) {
4200       GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
4201           source->ssrc, GST_TIME_ARGS (btime));
4202       if (source->internal) {
4203         /* this is an internal source that is not using our suggested ssrc.
4204          * since there must be another source using this ssrc, we can remove
4205          * this one instead of making it a receiver forever */
4206         if (source->ssrc != sess->suggested_ssrc
4207             && source->media_ssrc != sess->suggested_ssrc) {
4208           rtp_source_mark_bye (source, "timed out");
4209           /* do not schedule bye here, since we are inside the RTCP timeout
4210            * processing and scheduling bye will interfere with SR/RR sending */
4211         }
4212       } else {
4213         remove = TRUE;
4214       }
4215     }
4216   }
4217
4218   /* senders that did not send for a long time become a receiver, this also
4219    * holds for our own sources. */
4220   if (is_sender) {
4221     /* mind old time that might pre-date last time going to PLAYING */
4222     btime = MAX (source->last_rtp_activity, sess->start_time);
4223     if (data->current_time > btime) {
4224       interval = MAX (binterval * 2, 5 * GST_SECOND);
4225       if (data->current_time - btime > interval) {
4226         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
4227             GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
4228         sendertimeout = TRUE;
4229       }
4230     }
4231   }
4232
4233   if (remove) {
4234     sess->total_sources--;
4235     if (is_sender) {
4236       sess->stats.sender_sources--;
4237       if (source->internal)
4238         sess->stats.internal_sender_sources--;
4239     }
4240     if (is_active)
4241       sess->stats.active_sources--;
4242
4243     if (source->internal)
4244       sess->stats.internal_sources--;
4245
4246     if (byetimeout)
4247       on_bye_timeout (sess, source);
4248     else
4249       on_timeout (sess, source);
4250   } else {
4251     if (sendertimeout) {
4252       source->is_sender = FALSE;
4253       sess->stats.sender_sources--;
4254       if (source->internal)
4255         sess->stats.internal_sender_sources--;
4256
4257       on_sender_timeout (sess, source);
4258     }
4259     /* count how many source to report in this generation */
4260     if (((gint16) (source->generation - sess->generation)) <= 0)
4261       data->num_to_report++;
4262   }
4263   source->closing = remove;
4264 }
4265
4266 static void
4267 session_sdes (RTPSession * sess, ReportData * data)
4268 {
4269   GstRTCPPacket *packet = &data->packet;
4270   const GstStructure *sdes;
4271   gint i, n_fields;
4272   GstRTCPBuffer *rtcp = &data->rtcpbuf;
4273
4274   /* add SDES packet */
4275   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SDES, packet);
4276
4277   gst_rtcp_packet_sdes_add_item (packet, data->source->ssrc);
4278
4279   sdes = rtp_source_get_sdes_struct (data->source);
4280
4281   /* add all fields in the structure, the order is not important. */
4282   n_fields = gst_structure_n_fields (sdes);
4283   for (i = 0; i < n_fields; ++i) {
4284     const gchar *field;
4285     const gchar *value;
4286     GstRTCPSDESType type;
4287
4288     field = gst_structure_nth_field_name (sdes, i);
4289     if (field == NULL)
4290       continue;
4291     value = gst_structure_get_string (sdes, field);
4292     if (value == NULL)
4293       continue;
4294     type = gst_rtcp_sdes_name_to_type (field);
4295
4296     /* Early packets are minimal and only include the CNAME */
4297     if (data->is_early && type != GST_RTCP_SDES_CNAME)
4298       continue;
4299
4300     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
4301       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
4302           (const guint8 *) value);
4303     } else if (type == GST_RTCP_SDES_PRIV) {
4304       gsize prefix_len;
4305       gsize value_len;
4306       gsize data_len;
4307       guint8 data[256];
4308
4309       /* don't accept entries that are too big */
4310       prefix_len = strlen (field);
4311       if (prefix_len > 255)
4312         continue;
4313       value_len = strlen (value);
4314       if (value_len > 255)
4315         continue;
4316       data_len = 1 + prefix_len + value_len;
4317       if (data_len > 255)
4318         continue;
4319
4320       data[0] = prefix_len;
4321       memcpy (&data[1], field, prefix_len);
4322       memcpy (&data[1 + prefix_len], value, value_len);
4323
4324       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
4325     }
4326   }
4327
4328   data->has_sdes = TRUE;
4329 }
4330
4331 /* schedule a BYE packet */
4332 static void
4333 make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data)
4334 {
4335   GstRTCPPacket *packet = &data->packet;
4336   GstRTCPBuffer *rtcp = &data->rtcpbuf;
4337
4338   /* add SDES */
4339   session_sdes (sess, data);
4340   /* add a BYE packet */
4341   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_BYE, packet);
4342   gst_rtcp_packet_bye_add_ssrc (packet, source->ssrc);
4343   if (source->bye_reason)
4344     gst_rtcp_packet_bye_set_reason (packet, source->bye_reason);
4345
4346   /* we have a BYE packet now */
4347   source->sent_bye = TRUE;
4348 }
4349
4350 static gboolean
4351 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
4352 {
4353   GstClockTime new_send_time;
4354   GstClockTime interval;
4355   RTPSessionStats *stats;
4356
4357   if (sess->scheduled_bye)
4358     stats = &sess->bye_stats;
4359   else
4360     stats = &sess->stats;
4361
4362   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
4363     data->is_early = TRUE;
4364   else
4365     data->is_early = FALSE;
4366
4367   if (data->is_early && sess->next_early_rtcp_time <= current_time) {
4368     GST_DEBUG ("early feedback %" GST_TIME_FORMAT " <= now %"
4369         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time),
4370         GST_TIME_ARGS (current_time));
4371   } else if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
4372       sess->next_rtcp_check_time > current_time) {
4373     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
4374         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
4375         GST_TIME_ARGS (current_time));
4376     return FALSE;
4377   }
4378
4379   /* take interval and add jitter */
4380   interval = data->interval;
4381   if (interval != GST_CLOCK_TIME_NONE)
4382     interval = rtp_stats_add_rtcp_jitter (stats, interval);
4383
4384   if (sess->last_rtcp_check_time != GST_CLOCK_TIME_NONE) {
4385     /* perform forward reconsideration */
4386     if (interval != GST_CLOCK_TIME_NONE) {
4387       GstClockTime elapsed;
4388
4389       /* get elapsed time since we last reported */
4390       elapsed = current_time - sess->last_rtcp_check_time;
4391
4392       GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
4393           GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
4394       new_send_time = interval + sess->last_rtcp_check_time;
4395     } else {
4396       new_send_time = sess->last_rtcp_check_time;
4397     }
4398   } else {
4399     /* If this is the first RTCP packet, we can reconsider anything based
4400      * on the last RTCP send time because there was none.
4401      */
4402     g_warn_if_fail (!data->is_early);
4403     data->is_early = FALSE;
4404     new_send_time = current_time;
4405   }
4406
4407   if (!data->is_early) {
4408     /* check if reconsideration */
4409     if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
4410       GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
4411           GST_TIME_ARGS (new_send_time));
4412       /* store new check time */
4413       sess->next_rtcp_check_time = new_send_time;
4414       sess->last_rtcp_interval = interval;
4415       return FALSE;
4416     }
4417
4418     sess->last_rtcp_interval = interval;
4419     if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF
4420             || sess->rtp_profile == GST_RTP_PROFILE_SAVPF)
4421         && interval != GST_CLOCK_TIME_NONE) {
4422       /* Apply the rules from RFC 4585 section 3.5.3 */
4423       if (stats->min_interval != 0 && !sess->first_rtcp) {
4424         GstClockTime T_rr_current_interval =
4425             g_random_double_range (0.5, 1.5) * stats->min_interval * GST_SECOND;
4426
4427         if (T_rr_current_interval > interval) {
4428           GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT
4429               " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval),
4430               GST_TIME_ARGS (interval));
4431           interval = T_rr_current_interval;
4432         }
4433       }
4434     }
4435     sess->next_rtcp_check_time = current_time + interval;
4436   }
4437
4438
4439   GST_DEBUG ("can send RTCP now, next %" GST_TIME_FORMAT,
4440       GST_TIME_ARGS (sess->next_rtcp_check_time));
4441
4442   return TRUE;
4443 }
4444
4445 static void
4446 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
4447 {
4448   g_hash_table_insert (hash_table, key, g_object_ref (source));
4449 }
4450
4451 static gboolean
4452 remove_closing_sources (const gchar * key, RTPSource * source,
4453     ReportData * data)
4454 {
4455   if (source->closing)
4456     return TRUE;
4457
4458   if (source->send_fir)
4459     data->have_fir = TRUE;
4460   if (source->send_pli)
4461     data->have_pli = TRUE;
4462   if (source->send_nack)
4463     data->have_nack = TRUE;
4464
4465   return FALSE;
4466 }
4467
4468 static void
4469 generate_twcc (const gchar * key, RTPSource * source, ReportData * data)
4470 {
4471   RTPSession *sess = data->sess;
4472   GstBuffer *buf;
4473
4474   /* only generate RTCP for active internal sources */
4475   if (!source->internal || source->sent_bye)
4476     return;
4477
4478   /* ignore other sources when we do the timeout after a scheduled BYE */
4479   if (sess->scheduled_bye && !source->marked_bye)
4480     return;
4481
4482   /* skip if RTCP is disabled */
4483   if (source->disable_rtcp) {
4484     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
4485     return;
4486   }
4487
4488   GST_DEBUG ("generating TWCC feedback for source %08x", source->ssrc);
4489
4490   while ((buf = rtp_twcc_manager_get_feedback (sess->twcc, source->ssrc))) {
4491     ReportOutput *output = g_slice_new (ReportOutput);
4492     output->source = g_object_ref (source);
4493     output->is_bye = FALSE;
4494     output->buffer = buf;
4495     /* queue the RTCP packet to push later */
4496     g_queue_push_tail (&data->output, output);
4497   }
4498 }
4499
4500
4501 static void
4502 generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
4503 {
4504   RTPSession *sess = data->sess;
4505   gboolean is_bye = FALSE;
4506   ReportOutput *output;
4507   gboolean sr_req_pending = sess->sr_req_pending;
4508
4509   /* only generate RTCP for active internal sources */
4510   if (!source->internal || source->sent_bye)
4511     return;
4512
4513   /* ignore other sources when we do the timeout after a scheduled BYE */
4514   if (sess->scheduled_bye && !source->marked_bye)
4515     return;
4516
4517   /* skip if RTCP is disabled */
4518   if (source->disable_rtcp) {
4519     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
4520     return;
4521   }
4522
4523   data->source = source;
4524
4525   /* open packet */
4526   session_start_rtcp (sess, data);
4527
4528   if (source->marked_bye) {
4529     /* send BYE */
4530     make_source_bye (sess, source, data);
4531     is_bye = TRUE;
4532   } else if (!data->is_early) {
4533     /* loop over all known sources and add report blocks. If we are early, we
4534      * just make a minimal RTCP packet and skip this step */
4535     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4536         (GHFunc) session_report_blocks, data);
4537   }
4538   if (!data->has_sdes && (!data->is_early || !sess->reduced_size_rtcp
4539           || sr_req_pending))
4540     session_sdes (sess, data);
4541
4542   if (data->have_fir)
4543     session_fir (sess, data);
4544
4545   if (data->have_pli)
4546     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4547         (GHFunc) session_pli, data);
4548
4549   if (data->have_nack)
4550     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4551         (GHFunc) session_nack, data);
4552
4553   gst_rtcp_buffer_unmap (&data->rtcpbuf);
4554
4555   output = g_slice_new (ReportOutput);
4556   output->source = g_object_ref (source);
4557   output->is_bye = is_bye;
4558   output->buffer = data->rtcp;
4559   /* queue the RTCP packet to push later */
4560   g_queue_push_tail (&data->output, output);
4561 }
4562
4563 static void
4564 update_generation (const gchar * key, RTPSource * source, ReportData * data)
4565 {
4566   RTPSession *sess = data->sess;
4567
4568   if (g_hash_table_size (source->reported_in_sr_of) >=
4569       sess->stats.internal_sources) {
4570     /* source is reported, move to next generation */
4571     source->generation = sess->generation + 1;
4572     g_hash_table_remove_all (source->reported_in_sr_of);
4573
4574     GST_LOG ("reported source %x, new generation: %d", source->ssrc,
4575         source->generation);
4576
4577     /* if we reported all sources in this generation, move to next */
4578     if (--data->num_to_report == 0) {
4579       sess->generation++;
4580       GST_DEBUG ("all reported, generation now %u", sess->generation);
4581     }
4582   }
4583 }
4584
4585 static void
4586 schedule_remaining_nacks (const gchar * key, RTPSource * source,
4587     ReportData * data)
4588 {
4589   RTPSession *sess = data->sess;
4590   GstClockTime *nack_deadlines;
4591   GstClockTime deadline;
4592   guint n_nacks;
4593
4594   if (!source->send_nack)
4595     return;
4596
4597   /* the scheduling is entirely based on available bandwidth, just take the
4598    * biggest seqnum, which will have the largest deadline to request early
4599    * RTCP. */
4600   nack_deadlines = rtp_source_get_nack_deadlines (source, &n_nacks);
4601   deadline = nack_deadlines[n_nacks - 1];
4602   RTP_SESSION_UNLOCK (sess);
4603   rtp_session_send_rtcp_with_deadline (sess, deadline);
4604   RTP_SESSION_LOCK (sess);
4605 }
4606
4607 static gboolean
4608 rtp_session_are_all_sources_bye (RTPSession * sess)
4609 {
4610   GHashTableIter iter;
4611   RTPSource *src;
4612
4613   RTP_SESSION_LOCK (sess);
4614   g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
4615   while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
4616     if (src->internal && !src->sent_bye) {
4617       RTP_SESSION_UNLOCK (sess);
4618       return FALSE;
4619     }
4620   }
4621   RTP_SESSION_UNLOCK (sess);
4622
4623   return TRUE;
4624 }
4625
4626 /**
4627  * rtp_session_on_timeout:
4628  * @sess: an #RTPSession
4629  * @current_time: the current system time
4630  * @ntpnstime: the current NTP time in nanoseconds
4631  * @running_time: the current running_time of the pipeline
4632  *
4633  * Perform maintenance actions after the timeout obtained with
4634  * rtp_session_next_timeout() expired.
4635  *
4636  * This function will perform timeouts of receivers and senders, send a BYE
4637  * packet or generate RTCP packets with current session stats.
4638  *
4639  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
4640  * times, for each packet that should be processed.
4641  *
4642  * Returns: a #GstFlowReturn.
4643  */
4644 GstFlowReturn
4645 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
4646     guint64 ntpnstime, GstClockTime running_time)
4647 {
4648   GstFlowReturn result = GST_FLOW_OK;
4649   ReportData data = { GST_RTCP_BUFFER_INIT };
4650   GHashTable *table_copy;
4651   ReportOutput *output;
4652   gboolean all_empty = FALSE;
4653
4654   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
4655
4656   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT
4657       ", running-time %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4658       GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (running_time));
4659
4660   data.sess = sess;
4661   data.current_time = current_time;
4662   data.ntpnstime = ntpnstime;
4663   data.running_time = running_time;
4664   data.num_to_report = 0;
4665   data.may_suppress = FALSE;
4666   data.nacked_seqnums = 0;
4667   g_queue_init (&data.output);
4668
4669   RTP_SESSION_LOCK (sess);
4670   /* get a new interval, we need this for various cleanups etc */
4671   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
4672
4673   GST_DEBUG ("interval %" GST_TIME_FORMAT, GST_TIME_ARGS (data.interval));
4674
4675   /* we need an internal source now */
4676   if (sess->stats.internal_sources == 0) {
4677     RTPSource *source;
4678     gboolean created;
4679
4680     source = obtain_internal_source (sess, sess->suggested_ssrc, &created,
4681         current_time);
4682     sess->internal_ssrc_set = TRUE;
4683
4684     if (created)
4685       on_new_sender_ssrc (sess, source);
4686
4687     g_object_unref (source);
4688   }
4689
4690   sess->conflicting_addresses =
4691       timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
4692
4693   /* Make a local copy of the hashtable. We need to do this because the
4694    * cleanup stage below releases the session lock. */
4695   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
4696       (GDestroyNotify) g_object_unref);
4697   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4698       (GHFunc) clone_ssrcs_hashtable, table_copy);
4699
4700   /* Clean up the session, mark the source for removing, this might release the
4701    * session lock. */
4702   g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
4703   g_hash_table_destroy (table_copy);
4704
4705   /* Now remove the marked sources */
4706   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
4707       (GHRFunc) remove_closing_sources, &data);
4708
4709   /* update point-to-point status */
4710   session_update_ptp (sess);
4711
4712   /* see if we need to generate SR or RR packets */
4713   if (!is_rtcp_time (sess, current_time, &data))
4714     goto done;
4715
4716   /* check if all the buffers are empty after generation */
4717   all_empty = TRUE;
4718
4719   GST_DEBUG
4720       ("doing RTCP generation %u for %u sources, early %d, may suppress %d",
4721       sess->generation, data.num_to_report, data.is_early, data.may_suppress);
4722
4723   /* generate RTCP for all internal sources */
4724   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4725       (GHFunc) generate_rtcp, &data);
4726
4727   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4728       (GHFunc) generate_twcc, &data);
4729
4730   /* update the generation for all the sources that have been reported */
4731   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4732       (GHFunc) update_generation, &data);
4733
4734   /* we keep track of the last report time in order to timeout inactive
4735    * receivers or senders */
4736   if (!data.is_early) {
4737     GST_DEBUG ("Time since last regular RTCP: %" GST_TIME_FORMAT " - %"
4738         GST_TIME_FORMAT " = %" GST_TIME_FORMAT,
4739         GST_TIME_ARGS (data.current_time),
4740         GST_TIME_ARGS (sess->last_rtcp_send_time),
4741         GST_TIME_ARGS (data.current_time - sess->last_rtcp_send_time));
4742     sess->last_rtcp_send_time = data.current_time;
4743   }
4744
4745   GST_DEBUG ("Time since last RTCP: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT
4746       " = %" GST_TIME_FORMAT, GST_TIME_ARGS (data.current_time),
4747       GST_TIME_ARGS (sess->last_rtcp_check_time),
4748       GST_TIME_ARGS (data.current_time - sess->last_rtcp_check_time));
4749   sess->last_rtcp_check_time = data.current_time;
4750   sess->first_rtcp = FALSE;
4751   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
4752   sess->scheduled_bye = FALSE;
4753
4754 done:
4755   RTP_SESSION_UNLOCK (sess);
4756
4757   /* notify about updated statistics */
4758   g_object_notify_by_pspec (G_OBJECT (sess), properties[PROP_STATS]);
4759
4760   /* push out the RTCP packets */
4761   while ((output = g_queue_pop_head (&data.output))) {
4762     gboolean do_not_suppress, empty_buffer;
4763     GstBuffer *buffer = output->buffer;
4764     RTPSource *source = output->source;
4765
4766     /* Give the user a change to add its own packet */
4767     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
4768         buffer, data.is_early, &do_not_suppress);
4769
4770     empty_buffer = gst_buffer_get_size (buffer) == 0;
4771
4772     if (!empty_buffer)
4773       all_empty = FALSE;
4774
4775     if (sess->callbacks.send_rtcp &&
4776         !empty_buffer && (do_not_suppress || !data.may_suppress)) {
4777       guint packet_size;
4778
4779       packet_size = gst_buffer_get_size (buffer) + sess->header_len;
4780
4781       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
4782       GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
4783           sess->stats.avg_rtcp_packet_size, packet_size);
4784       result =
4785           sess->callbacks.send_rtcp (sess, source, buffer,
4786           rtp_session_are_all_sources_bye (sess), sess->send_rtcp_user_data);
4787
4788       RTP_SESSION_LOCK (sess);
4789       sess->stats.nacks_sent += data.nacked_seqnums;
4790       on_sender_ssrc_active (sess, source);
4791       RTP_SESSION_UNLOCK (sess);
4792     } else {
4793       GST_DEBUG ("freeing packet callback: %p"
4794           " empty_buffer: %d, "
4795           " do_not_suppress: %d may_suppress: %d", sess->callbacks.send_rtcp,
4796           empty_buffer, do_not_suppress, data.may_suppress);
4797       if (!empty_buffer) {
4798         RTP_SESSION_LOCK (sess);
4799         sess->stats.nacks_dropped += data.nacked_seqnums;
4800         RTP_SESSION_UNLOCK (sess);
4801       }
4802       gst_buffer_unref (buffer);
4803     }
4804     g_object_unref (source);
4805     g_slice_free (ReportOutput, output);
4806   }
4807
4808   if (all_empty)
4809     GST_ERROR ("generated empty RTCP messages for all the sources");
4810
4811   /* schedule remaining nacks */
4812   RTP_SESSION_LOCK (sess);
4813   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4814       (GHFunc) schedule_remaining_nacks, &data);
4815   RTP_SESSION_UNLOCK (sess);
4816
4817   return result;
4818 }
4819
4820 /**
4821  * rtp_session_request_early_rtcp:
4822  * @sess: an #RTPSession
4823  * @current_time: the current system time
4824  * @max_delay: maximum delay
4825  *
4826  * Request transmission of early RTCP
4827  *
4828  * Returns: %TRUE if the related RTCP can be scheduled.
4829  */
4830 gboolean
4831 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
4832     GstClockTime max_delay)
4833 {
4834   GstClockTime T_dither_max, T_rr, offset = 0;
4835   gboolean ret;
4836   gboolean allow_early;
4837
4838   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
4839
4840   RTP_SESSION_LOCK (sess);
4841
4842   /* We assume a feedback profile if something is requesting RTCP
4843    * to be sent */
4844   sess->rtp_profile = GST_RTP_PROFILE_AVPF;
4845
4846   /* Check if already requested */
4847   /*  RFC 4585 section 3.5.2 step 2 */
4848   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
4849     GST_LOG_OBJECT (sess, "already have next early rtcp time");
4850     ret = (current_time + max_delay > sess->next_early_rtcp_time);
4851     goto end;
4852   }
4853
4854   if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) {
4855     GST_LOG_OBJECT (sess, "no next RTCP check time");
4856     ret = FALSE;
4857     goto end;
4858   }
4859
4860   /* RFC 4585 section 3.5.3 step 1
4861    * If no regular RTCP packet has been sent before, then a regular
4862    * RTCP packet has to be scheduled first and FB messages might be
4863    * included there
4864    */
4865   if (!GST_CLOCK_TIME_IS_VALID (sess->last_rtcp_send_time)) {
4866     GST_LOG_OBJECT (sess, "no RTCP sent yet");
4867
4868     if (current_time + max_delay > sess->next_rtcp_check_time) {
4869       GST_LOG_OBJECT (sess,
4870           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
4871           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4872           GST_TIME_ARGS (max_delay),
4873           GST_TIME_ARGS (sess->next_rtcp_check_time));
4874       ret = TRUE;
4875     } else {
4876       GST_LOG_OBJECT (sess,
4877           "can't allow early feedback, next scheduled time is too late %"
4878           GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
4879           GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
4880           GST_TIME_ARGS (sess->next_rtcp_check_time));
4881       ret = FALSE;
4882     }
4883     goto end;
4884   }
4885
4886   T_rr = sess->last_rtcp_interval;
4887
4888   /*  RFC 4585 section 3.5.2 step 2b */
4889   /* If the total sources is <=2, then there is only us and one peer */
4890   /* When there is one auxiliary stream the session can still do point
4891    * to point.
4892    */
4893   if (sess->is_doing_ptp) {
4894     T_dither_max = 0;
4895   } else {
4896     /* Divide by 2 because l = 0.5 */
4897     T_dither_max = T_rr;
4898     T_dither_max /= 2;
4899   }
4900
4901   /*  RFC 4585 section 3.5.2 step 3 */
4902   if (current_time + T_dither_max > sess->next_rtcp_check_time) {
4903     GST_LOG_OBJECT (sess,
4904         "don't send because of dither, next scheduled time is too soon %"
4905         GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT,
4906         GST_TIME_ARGS (current_time), GST_TIME_ARGS (T_dither_max),
4907         GST_TIME_ARGS (sess->next_rtcp_check_time));
4908     ret = T_dither_max <= max_delay;
4909     goto end;
4910   }
4911
4912   /*  RFC 4585 section 3.5.2 step 4a and
4913    *  RFC 4585 section 3.5.2 step 6 */
4914   allow_early = FALSE;
4915   if (sess->last_rtcp_check_time == sess->last_rtcp_send_time) {
4916     /* Last time we sent a full RTCP packet, we can now immediately
4917      * send an early one as allow_early was reset to TRUE */
4918     allow_early = TRUE;
4919   } else if (sess->last_rtcp_check_time + T_rr <= current_time + max_delay) {
4920     /* Last packet we sent was an early RTCP packet and more than
4921      * T_rr has passed since then, meaning we would have suppressed
4922      * a regular RTCP packet already and reset allow_early to TRUE */
4923     allow_early = TRUE;
4924
4925     /* We have to offset a bit as T_rr has not passed yet, but will before
4926      * max_delay */
4927     if (sess->last_rtcp_check_time + T_rr > current_time)
4928       offset = (sess->last_rtcp_check_time + T_rr) - current_time;
4929   } else {
4930     GST_DEBUG_OBJECT (sess,
4931         "can't allow early RTCP yet: last regular %" GST_TIME_FORMAT ", %"
4932         GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT " + %"
4933         GST_TIME_FORMAT, GST_TIME_ARGS (sess->last_rtcp_send_time),
4934         GST_TIME_ARGS (sess->last_rtcp_check_time), GST_TIME_ARGS (T_rr),
4935         GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay));
4936   }
4937
4938   if (!allow_early) {
4939     /* Ignore the request a scheduled packet will be in time anyway */
4940     if (current_time + max_delay > sess->next_rtcp_check_time) {
4941       GST_LOG_OBJECT (sess,
4942           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
4943           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4944           GST_TIME_ARGS (max_delay),
4945           GST_TIME_ARGS (sess->next_rtcp_check_time));
4946       ret = TRUE;
4947     } else {
4948       GST_LOG_OBJECT (sess,
4949           "can't allow early feedback and next scheduled time is too late %"
4950           GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
4951           GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
4952           GST_TIME_ARGS (sess->next_rtcp_check_time));
4953       ret = FALSE;
4954     }
4955     goto end;
4956   }
4957
4958   /*  RFC 4585 section 3.5.2 step 4b */
4959   if (T_dither_max) {
4960     /* Schedule an early transmission later */
4961     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
4962         current_time + offset;
4963   } else {
4964     /* If no dithering, schedule it for NOW */
4965     sess->next_early_rtcp_time = current_time + offset;
4966   }
4967
4968   GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT
4969       ", next regular RTCP time %" GST_TIME_FORMAT,
4970       GST_TIME_ARGS (sess->next_early_rtcp_time),
4971       GST_TIME_ARGS (sess->next_rtcp_check_time));
4972   RTP_SESSION_UNLOCK (sess);
4973
4974   /* notify app of need to send packet early
4975    * and therefore of timeout change */
4976   if (sess->callbacks.reconsider)
4977     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
4978
4979   return TRUE;
4980
4981 end:
4982
4983   RTP_SESSION_UNLOCK (sess);
4984
4985   return ret;
4986 }
4987
4988 static gboolean
4989 rtp_session_send_rtcp_internal (RTPSession * sess, GstClockTime now,
4990     GstClockTime max_delay)
4991 {
4992   /* notify the application that we intend to send early RTCP */
4993   if (sess->callbacks.notify_early_rtcp)
4994     sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data);
4995
4996   return rtp_session_request_early_rtcp (sess, now, max_delay);
4997 }
4998
4999 static gboolean
5000 rtp_session_send_rtcp_with_deadline (RTPSession * sess, GstClockTime deadline)
5001 {
5002   GstClockTime now, max_delay;
5003
5004   if (!sess->callbacks.send_rtcp)
5005     return FALSE;
5006
5007   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
5008
5009   if (deadline < now)
5010     return FALSE;
5011
5012   max_delay = deadline - now;
5013
5014   return rtp_session_send_rtcp_internal (sess, now, max_delay);
5015 }
5016
5017 static gboolean
5018 rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
5019 {
5020   GstClockTime now;
5021
5022   if (!sess->callbacks.send_rtcp)
5023     return FALSE;
5024
5025   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
5026
5027   return rtp_session_send_rtcp_internal (sess, now, max_delay);
5028 }
5029
5030 gboolean
5031 rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
5032     gboolean fir, gint count)
5033 {
5034   RTPSource *src;
5035
5036   RTP_SESSION_LOCK (sess);
5037   src = find_source (sess, ssrc);
5038   if (src == NULL)
5039     goto no_source;
5040
5041   if (fir) {
5042     src->send_pli = FALSE;
5043     src->send_fir = TRUE;
5044
5045     if (count == -1 || count != src->last_fir_count)
5046       src->current_send_fir_seqnum++;
5047     src->last_fir_count = count;
5048   } else if (!src->send_fir) {
5049     src->send_pli = TRUE;
5050   }
5051   RTP_SESSION_UNLOCK (sess);
5052
5053   if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
5054     GST_DEBUG ("FIR/PLI not sent early, sending with next regular RTCP");
5055   }
5056
5057   return TRUE;
5058
5059   /* ERRORS */
5060 no_source:
5061   {
5062     RTP_SESSION_UNLOCK (sess);
5063     return FALSE;
5064   }
5065 }
5066
5067 /**
5068  * rtp_session_request_nack:
5069  * @sess: a #RTPSession
5070  * @ssrc: the SSRC
5071  * @seqnum: the missing seqnum
5072  * @max_delay: max delay to request NACK
5073  *
5074  * Request scheduling of a NACK feedback packet for @seqnum in @ssrc.
5075  *
5076  * Returns: %TRUE if the NACK feedback could be scheduled
5077  */
5078 gboolean
5079 rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
5080     GstClockTime max_delay)
5081 {
5082   RTPSource *source;
5083   GstClockTime now;
5084
5085   if (!sess->callbacks.send_rtcp)
5086     return FALSE;
5087
5088   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
5089
5090   RTP_SESSION_LOCK (sess);
5091   source = find_source (sess, ssrc);
5092   if (source == NULL)
5093     goto no_source;
5094
5095   GST_DEBUG ("request NACK for SSRC %08x, #%u, deadline %" GST_TIME_FORMAT,
5096       ssrc, seqnum, GST_TIME_ARGS (now + max_delay));
5097   rtp_source_register_nack (source, seqnum, now + max_delay);
5098   RTP_SESSION_UNLOCK (sess);
5099
5100   if (!rtp_session_send_rtcp_internal (sess, now, max_delay)) {
5101     GST_DEBUG ("NACK not sent early, sending with next regular RTCP");
5102   }
5103
5104   return TRUE;
5105
5106   /* ERRORS */
5107 no_source:
5108   {
5109     RTP_SESSION_UNLOCK (sess);
5110     return FALSE;
5111   }
5112 }
5113
5114 /**
5115  * rtp_session_update_recv_caps_structure:
5116  * @sess: an #RTPSession
5117  * @s: a #GstStructure from a #GstCaps
5118  *
5119  * Update the caps of the receiver in the rtp session.
5120  */
5121 void
5122 rtp_session_update_recv_caps_structure (RTPSession * sess,
5123     const GstStructure * s)
5124 {
5125   rtp_twcc_manager_parse_recv_ext_id (sess->twcc, s);
5126 }