good:v4l2bufferpool: Add missed header file
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
21  * with newer GLib versions (>= 2.31.0) */
22 #define GLIB_DISABLE_DEPRECATION_WARNINGS
23
24 #include <string.h>
25 #include <stdlib.h>
26
27 #include <gst/rtp/gstrtpbuffer.h>
28 #include <gst/rtp/gstrtcpbuffer.h>
29
30 #include <gst/glib-compat-private.h>
31
32 #include "rtpsession.h"
33 #include "gstrtputils.h"
34
35 GST_DEBUG_CATEGORY (rtp_session_debug);
36 #define GST_CAT_DEFAULT rtp_session_debug
37
38 /* signals and args */
39 enum
40 {
41   SIGNAL_GET_SOURCE_BY_SSRC,
42   SIGNAL_ON_NEW_SSRC,
43   SIGNAL_ON_SSRC_COLLISION,
44   SIGNAL_ON_SSRC_VALIDATED,
45   SIGNAL_ON_SSRC_ACTIVE,
46   SIGNAL_ON_SSRC_SDES,
47   SIGNAL_ON_BYE_SSRC,
48   SIGNAL_ON_BYE_TIMEOUT,
49   SIGNAL_ON_TIMEOUT,
50   SIGNAL_ON_SENDER_TIMEOUT,
51   SIGNAL_ON_SENDING_RTCP,
52   SIGNAL_ON_APP_RTCP,
53   SIGNAL_ON_FEEDBACK_RTCP,
54   SIGNAL_SEND_RTCP,
55   SIGNAL_SEND_RTCP_FULL,
56   SIGNAL_ON_RECEIVING_RTCP,
57   SIGNAL_ON_NEW_SENDER_SSRC,
58   SIGNAL_ON_SENDER_SSRC_ACTIVE,
59   SIGNAL_ON_SENDING_NACKS,
60   LAST_SIGNAL
61 };
62
63 #define DEFAULT_INTERNAL_SOURCE      NULL
64 #define DEFAULT_BANDWIDTH            0.0
65 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_FRACTION
66 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
67 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
68 #define DEFAULT_RTCP_MTU             1400
69 #define DEFAULT_SDES                 NULL
70 #define DEFAULT_NUM_SOURCES          0
71 #define DEFAULT_NUM_ACTIVE_SOURCES   0
72 #define DEFAULT_SOURCES              NULL
73 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
74 #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
75 #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
76 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
77 #define DEFAULT_MAX_DROPOUT_TIME     60000
78 #define DEFAULT_MAX_MISORDER_TIME    2000
79 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
80 #define DEFAULT_RTCP_REDUCED_SIZE    FALSE
81 #define DEFAULT_RTCP_DISABLE_SR_TIMESTAMP FALSE
82 #define DEFAULT_FAVOR_NEW            FALSE
83 #define DEFAULT_TWCC_FEEDBACK_INTERVAL GST_CLOCK_TIME_NONE
84 #define DEFAULT_UPDATE_NTP64_HEADER_EXT TRUE
85
86 enum
87 {
88   PROP_0,
89   PROP_INTERNAL_SSRC,
90   PROP_INTERNAL_SOURCE,
91   PROP_BANDWIDTH,
92   PROP_RTCP_FRACTION,
93   PROP_RTCP_RR_BANDWIDTH,
94   PROP_RTCP_RS_BANDWIDTH,
95   PROP_RTCP_MTU,
96   PROP_SDES,
97   PROP_NUM_SOURCES,
98   PROP_NUM_ACTIVE_SOURCES,
99   PROP_SOURCES,
100   PROP_FAVOR_NEW,
101   PROP_RTCP_MIN_INTERVAL,
102   PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
103   PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
104   PROP_PROBATION,
105   PROP_MAX_DROPOUT_TIME,
106   PROP_MAX_MISORDER_TIME,
107   PROP_STATS,
108   PROP_RTP_PROFILE,
109   PROP_RTCP_REDUCED_SIZE,
110   PROP_RTCP_DISABLE_SR_TIMESTAMP,
111   PROP_TWCC_FEEDBACK_INTERVAL,
112   PROP_UPDATE_NTP64_HEADER_EXT,
113   PROP_LAST,
114 };
115
116 static GParamSpec *properties[PROP_LAST];
117
118 /* update average packet size */
119 #define INIT_AVG(avg, val) \
120    (avg) = (val);
121 #define UPDATE_AVG(avg, val)            \
122   if ((avg) == 0)                       \
123    (avg) = (val);                       \
124   else                                  \
125    (avg) = ((val) + (15 * (avg))) >> 4;
126
127 /* GObject vmethods */
128 static void rtp_session_finalize (GObject * object);
129 static void rtp_session_set_property (GObject * object, guint prop_id,
130     const GValue * value, GParamSpec * pspec);
131 static void rtp_session_get_property (GObject * object, guint prop_id,
132     GValue * value, GParamSpec * pspec);
133
134 static gboolean rtp_session_send_rtcp (RTPSession * sess,
135     GstClockTime max_delay);
136 static gboolean rtp_session_send_rtcp_with_deadline (RTPSession * sess,
137     GstClockTime deadline);
138
139 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
140
141 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
142
143 static guint32 rtp_session_create_new_ssrc (RTPSession * sess);
144 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
145     gboolean * created, RTPPacketInfo * pinfo, gboolean rtp);
146 static RTPSource *obtain_internal_source (RTPSession * sess,
147     guint32 ssrc, gboolean * created, GstClockTime current_time);
148 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
149     GstClockTime current_time);
150 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
151     gboolean deterministic, gboolean first);
152
153 static gboolean
154 accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
155     const GValue * handler_return, gpointer data)
156 {
157   if (g_value_get_boolean (handler_return))
158     g_value_set_boolean (return_accu, TRUE);
159
160   return TRUE;
161 }
162
163 static void
164 rtp_session_class_init (RTPSessionClass * klass)
165 {
166   GObjectClass *gobject_class;
167
168   gobject_class = (GObjectClass *) klass;
169
170   gobject_class->finalize = rtp_session_finalize;
171   gobject_class->set_property = rtp_session_set_property;
172   gobject_class->get_property = rtp_session_get_property;
173
174   /**
175    * RTPSession::get-source-by-ssrc:
176    * @session: the object which received the signal
177    * @ssrc: the SSRC of the RTPSource
178    *
179    * Request the #RTPSource object with SSRC @ssrc in @session.
180    */
181   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
182       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
183       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
184           get_source_by_ssrc), NULL, NULL, NULL,
185       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
186
187   /**
188    * RTPSession::on-new-ssrc:
189    * @session: the object which received the signal
190    * @src: the new RTPSource
191    *
192    * Notify of a new SSRC that entered @session.
193    */
194   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
195       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
196       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
197       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
198   /**
199    * RTPSession::on-ssrc-collision:
200    * @session: the object which received the signal
201    * @src: the #RTPSource that caused a collision
202    *
203    * Notify when we have an SSRC collision
204    */
205   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
206       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
207       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
208       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
209   /**
210    * RTPSession::on-ssrc-validated:
211    * @session: the object which received the signal
212    * @src: the new validated RTPSource
213    *
214    * Notify of a new SSRC that became validated.
215    */
216   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
217       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
218       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
219       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
220   /**
221    * RTPSession::on-ssrc-active:
222    * @session: the object which received the signal
223    * @src: the active RTPSource
224    *
225    * Notify of a SSRC that is active, i.e., sending RTCP.
226    */
227   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
228       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
229       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
230       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
231   /**
232    * RTPSession::on-ssrc-sdes:
233    * @session: the object which received the signal
234    * @src: the RTPSource
235    *
236    * Notify that a new SDES was received for SSRC.
237    */
238   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
239       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
240       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
241       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
242   /**
243    * RTPSession::on-bye-ssrc:
244    * @session: the object which received the signal
245    * @src: the RTPSource that went away
246    *
247    * Notify of an SSRC that became inactive because of a BYE packet.
248    */
249   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
250       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
251       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
252       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
253   /**
254    * RTPSession::on-bye-timeout:
255    * @session: the object which received the signal
256    * @src: the RTPSource that timed out
257    *
258    * Notify of an SSRC that has timed out because of BYE
259    */
260   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
261       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
262       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
263       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
264   /**
265    * RTPSession::on-timeout:
266    * @session: the object which received the signal
267    * @src: the RTPSource that timed out
268    *
269    * Notify of an SSRC that has timed out
270    */
271   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
272       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
273       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
274       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
275   /**
276    * RTPSession::on-sender-timeout:
277    * @session: the object which received the signal
278    * @src: the RTPSource that timed out
279    *
280    * Notify of an SSRC that was a sender but timed out and became a receiver.
281    */
282   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
283       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
284       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
285       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
286
287   /**
288    * RTPSession::on-sending-rtcp
289    * @session: the object which received the signal
290    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
291    * @early: %TRUE if the packet is early, %FALSE if it is regular
292    *
293    * This signal is emitted before sending an RTCP packet, it can be used
294    * to add extra RTCP Packets.
295    *
296    * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
297    * if suppressing it is acceptable
298    */
299   rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
300       g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
301       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
302       accumulate_trues, NULL, NULL, G_TYPE_BOOLEAN, 2,
303       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE, G_TYPE_BOOLEAN);
304
305   /**
306    * RTPSession::on-app-rtcp:
307    * @session: the object which received the signal
308    * @subtype: The subtype of the packet
309    * @ssrc: The SSRC/CSRC of the packet
310    * @name: The name of the packet
311    * @data: a #GstBuffer with the application-dependant data or %NULL if
312    * there was no data
313    *
314    * Notify that a RTCP APP packet has been received
315    */
316   rtp_session_signals[SIGNAL_ON_APP_RTCP] =
317       g_signal_new ("on-app-rtcp", G_TYPE_FROM_CLASS (klass),
318       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_app_rtcp),
319       NULL, NULL, NULL, G_TYPE_NONE, 4, G_TYPE_UINT, G_TYPE_UINT,
320       G_TYPE_STRING, GST_TYPE_BUFFER);
321
322   /**
323    * RTPSession::on-feedback-rtcp:
324    * @session: the object which received the signal
325    * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
326    *  %GST_RTCP_TYPE_RTPFB
327    * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
328    * @sender_ssrc: The SSRC of the sender
329    * @media_ssrc: The SSRC of the media this refers to
330    * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
331    * there was no FCI
332    *
333    * Notify that a RTCP feedback packet has been received
334    */
335   rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
336       g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
337       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
338       NULL, NULL, NULL, G_TYPE_NONE, 5, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT,
339       G_TYPE_UINT, GST_TYPE_BUFFER);
340
341   /**
342    * RTPSession::send-rtcp:
343    * @session: the object which received the signal
344    * @max_delay: The maximum delay after which the feedback will not be useful
345    *  anymore
346    *
347    * Requests that the #RTPSession initiate a new RTCP packet as soon as
348    * possible within the requested delay.
349    *
350    * This sets feedback to %TRUE if not already done before.
351    */
352   rtp_session_signals[SIGNAL_SEND_RTCP] =
353       g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
354       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
355       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
356       NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
357
358   /**
359    * RTPSession::send-rtcp-full:
360    * @session: the object which received the signal
361    * @max_delay: The maximum delay after which the feedback will not be useful
362    *  anymore
363    *
364    * Requests that the #RTPSession initiate a new RTCP packet as soon as
365    * possible within the requested delay.
366    *
367    * This sets feedback to %TRUE if not already done before.
368    *
369    * Returns: TRUE if the new RTCP packet could be scheduled within the
370    * requested delay, FALSE otherwise.
371    *
372    * Since: 1.6
373    */
374   rtp_session_signals[SIGNAL_SEND_RTCP_FULL] =
375       g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass),
376       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
377       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
378       NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
379
380   /**
381    * RTPSession::on-receiving-rtcp
382    * @session: the object which received the signal
383    * @buffer: the #GstBuffer containing the RTCP packet that was received
384    *
385    * This signal is emitted when receiving an RTCP packet before it is handled
386    * by the session. It can be used to extract custom information from RTCP packets.
387    *
388    * Since: 1.6
389    */
390   rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] =
391       g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass),
392       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp),
393       NULL, NULL, NULL, G_TYPE_NONE, 1,
394       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
395
396   /**
397    * RTPSession::on-new-sender-ssrc:
398    * @session: the object which received the signal
399    * @src: the new sender RTPSource
400    *
401    * Notify of a new sender SSRC that entered @session.
402    *
403    * Since: 1.8
404    */
405   rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
406       g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
407       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_sender_ssrc),
408       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
409
410   /**
411    * RTPSession::on-sender-ssrc-active:
412    * @session: the object which received the signal
413    * @src: the active sender RTPSource
414    *
415    * Notify of a sender SSRC that is active, i.e., sending RTCP.
416    *
417    * Since: 1.8
418    */
419   rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
420       g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
421       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass,
422           on_sender_ssrc_active), NULL, NULL, NULL,
423       G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
424
425   /**
426    * RTPSession::on-sending-nack
427    * @session: the object which received the signal
428    * @sender_ssrc: the sender ssrc
429    * @media_ssrc: the media ssrc
430    * @nacks: (element-type guint16): the list of seqnum to be nacked
431    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
432    *
433    * This signal is emitted before NACK packets are added into the RTCP
434    * packet. This signal can be used to override the conversion of the NACK
435    * seqnum array into packets. This can be used if your protocol uses
436    * different type of NACK (e.g. based on RTCP APP).
437    *
438    * The handler should transform the seqnum from @nacks array into packets.
439    * @nacks seqnum must be consumed from the start. The remaining will be
440    * rescheduled for later base on bandwidth. Only one handler will be
441    * signalled.
442    *
443    * A handler may return 0 to signal that generic NACKs should be created
444    * for this set. This can be useful if the signal is used for other purpose
445    * or if the other type of NACK would use more space.
446    *
447    * Returns: the number of NACK seqnum that was consumed from @nacks.
448    *
449    * Since: 1.16
450    */
451   rtp_session_signals[SIGNAL_ON_SENDING_NACKS] =
452       g_signal_new ("on-sending-nacks", G_TYPE_FROM_CLASS (klass),
453       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_nacks),
454       g_signal_accumulator_first_wins, NULL, NULL,
455       G_TYPE_UINT, 4, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_ARRAY,
456       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
457
458   properties[PROP_INTERNAL_SSRC] =
459       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
460       "The internal SSRC used for the session (deprecated)",
461       0, G_MAXUINT, 0,
462       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | GST_PARAM_DOC_SHOW_DEFAULT);
463
464   properties[PROP_INTERNAL_SOURCE] =
465       g_param_spec_object ("internal-source", "Internal Source",
466       "The internal source element of the session (deprecated)",
467       RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
468
469   properties[PROP_BANDWIDTH] =
470       g_param_spec_double ("bandwidth", "Bandwidth",
471       "The bandwidth of the session in bits per second (0 for auto-discover)",
472       0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
473       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
474
475   properties[PROP_RTCP_FRACTION] =
476       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
477       "The fraction of the bandwidth used for RTCP in bits per second (or as a real fraction of the RTP bandwidth if < 1)",
478       0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
479       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
480
481   properties[PROP_RTCP_RR_BANDWIDTH] =
482       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
483       "The RTCP bandwidth used for receivers in bits per second (-1 = default)",
484       -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
485       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
486
487   properties[PROP_RTCP_RS_BANDWIDTH] =
488       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
489       "The RTCP bandwidth used for senders in bits per second (-1 = default)",
490       -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
491       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
492
493   properties[PROP_RTCP_MTU] =
494       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
495       "The maximum size of the RTCP packets",
496       16, G_MAXINT16, DEFAULT_RTCP_MTU,
497       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
498
499   properties[PROP_SDES] =
500       g_param_spec_boxed ("sdes", "SDES",
501       "The SDES items of this session",
502       GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
503       | GST_PARAM_DOC_SHOW_DEFAULT);
504
505   properties[PROP_NUM_SOURCES] =
506       g_param_spec_uint ("num-sources", "Num Sources",
507       "The number of sources in the session", 0, G_MAXUINT,
508       DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
509
510   properties[PROP_NUM_ACTIVE_SOURCES] =
511       g_param_spec_uint ("num-active-sources", "Num Active Sources",
512       "The number of active sources in the session", 0, G_MAXUINT,
513       DEFAULT_NUM_ACTIVE_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
514   /**
515    * RTPSource:sources
516    *
517    * Get a GValue Array of all sources in the session.
518    *
519    * ## Getting the #RTPSources of a session
520    *
521    * ``` C
522    * {
523    *   GValueArray *arr;
524    *   GValue *val;
525    *   guint i;
526    *
527    *   g_object_get (sess, "sources", &arr, NULL);
528    *
529    *   for (i = 0; i < arr->n_values; i++) {
530    *     RTPSource *source;
531    *
532    *     val = g_value_array_get_nth (arr, i);
533    *     source = g_value_get_object (val);
534    *   }
535    *   g_value_array_free (arr);
536    * }
537    * ```
538    */
539   properties[PROP_SOURCES] =
540       g_param_spec_boxed ("sources", "Sources",
541       "An array of all known sources in the session",
542       G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
543
544   properties[PROP_FAVOR_NEW] =
545       g_param_spec_boolean ("favor-new", "Favor new sources",
546       "Resolve SSRC conflict in favor of new sources", DEFAULT_FAVOR_NEW,
547       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
548
549   properties[PROP_RTCP_MIN_INTERVAL] =
550       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
551       "Minimum interval between Regular RTCP packet (in ns)",
552       0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
553       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
554
555   properties[PROP_RTCP_FEEDBACK_RETENTION_WINDOW] =
556       g_param_spec_uint64 ("rtcp-feedback-retention-window",
557       "RTCP Feedback retention window",
558       "Duration during which RTCP Feedback packets are retained (in ns)",
559       0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
560       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
561
562   properties[PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD] =
563       g_param_spec_uint ("rtcp-immediate-feedback-threshold",
564       "RTCP Immediate Feedback threshold",
565       "The maximum number of members of a RTP session for which immediate"
566       " feedback is used (DEPRECATED: has no effect and is not needed)",
567       0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
568       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED);
569
570   properties[PROP_PROBATION] =
571       g_param_spec_uint ("probation", "Number of probations",
572       "Consecutive packet sequence numbers to accept the source",
573       0, G_MAXUINT, DEFAULT_PROBATION,
574       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
575
576   properties[PROP_MAX_DROPOUT_TIME] =
577       g_param_spec_uint ("max-dropout-time", "Max dropout time",
578       "The maximum time (milliseconds) of missing packets tolerated.",
579       0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
580       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
581
582   properties[PROP_MAX_MISORDER_TIME] =
583       g_param_spec_uint ("max-misorder-time", "Max misorder time",
584       "The maximum time (milliseconds) of misordered packets tolerated.",
585       0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
586       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
587
588   /**
589    * RTPSession:stats:
590    *
591    * Various session statistics. This property returns a GstStructure
592    * with name application/x-rtp-session-stats with the following fields:
593    *
594    * * "rtx-drop-count"  G_TYPE_UINT   The number of retransmission events
595    *      dropped (due to bandwidth constraints)
596    * *  "sent-nack-count" G_TYPE_UINT   Number of NACKs sent
597    * *  "recv-nack-count" G_TYPE_UINT   Number of NACKs received
598    * *  "source-stats"    G_TYPE_BOXED  GValueArray of #RTPSource:stats for all
599    *      RTP sources (Since 1.8)
600    *
601    * Since: 1.4
602    */
603   properties[PROP_STATS] =
604       g_param_spec_boxed ("stats", "Statistics",
605       "Various statistics", GST_TYPE_STRUCTURE,
606       G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
607
608   properties[PROP_RTP_PROFILE] =
609       g_param_spec_enum ("rtp-profile", "RTP Profile",
610       "RTP profile to use for this session", GST_TYPE_RTP_PROFILE,
611       DEFAULT_RTP_PROFILE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
612
613   properties[PROP_RTCP_REDUCED_SIZE] =
614       g_param_spec_boolean ("rtcp-reduced-size", "RTCP Reduced Size",
615       "Use Reduced Size RTCP for feedback packets",
616       DEFAULT_RTCP_REDUCED_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
617
618   /**
619    * RTPSession:disable-sr-timestamp:
620    *
621    * Whether sender reports should be timestamped.
622    *
623    * Since: 1.16
624    */
625   properties[PROP_RTCP_DISABLE_SR_TIMESTAMP] =
626       g_param_spec_boolean ("disable-sr-timestamp",
627       "Disable Sender Report Timestamp",
628       "Whether sender reports should be timestamped",
629       DEFAULT_RTCP_DISABLE_SR_TIMESTAMP,
630       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
631
632   /**
633    * RTPSession:twcc-feedback-interval:
634    *
635    * The interval to send TWCC reports on.
636    * This overrides the default behavior of sending reports
637    * based on marker-bits.
638    *
639    * Since: 1.20
640    */
641   properties[PROP_TWCC_FEEDBACK_INTERVAL] =
642       g_param_spec_uint64 ("twcc-feedback-interval",
643       "TWCC Feedback Interval",
644       "The interval to send TWCC reports on",
645       0, G_MAXUINT64, DEFAULT_TWCC_FEEDBACK_INTERVAL,
646       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
647
648   /**
649    * RTPSession:update-ntp64-header-ext:
650    *
651    * Whether RTP NTP header extension should be updated with actual
652    * NTP time. If not, use the NTP time from buffer timestamp metadata
653    *
654    * Since: 1.22
655    */
656   properties[PROP_UPDATE_NTP64_HEADER_EXT] =
657       g_param_spec_boolean ("update-ntp64-header-ext",
658       "Update NTP-64 RTP Header Extension",
659       "Whether RTP NTP header extension should be updated with actual NTP time",
660       DEFAULT_UPDATE_NTP64_HEADER_EXT,
661       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
662
663   g_object_class_install_properties (gobject_class, PROP_LAST, properties);
664
665   klass->get_source_by_ssrc =
666       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
667   klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
668
669   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
670 }
671
672 static void
673 rtp_session_init (RTPSession * sess)
674 {
675   gint i;
676   gchar *str;
677
678   g_mutex_init (&sess->lock);
679   sess->key = g_random_int ();
680   sess->mask_idx = 0;
681   sess->mask = 0;
682
683   /* TODO: We currently only use the first hash table but this is the
684    * beginning of an implementation for RFC2762
685    for (i = 0; i < 32; i++) {
686    */
687   for (i = 0; i < 1; i++) {
688     sess->ssrcs[i] =
689         g_hash_table_new_full (NULL, NULL, NULL,
690         (GDestroyNotify) g_object_unref);
691   }
692
693   rtp_stats_init_defaults (&sess->stats);
694   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
695   rtp_stats_set_min_interval (&sess->stats,
696       (gdouble) DEFAULT_RTCP_MIN_INTERVAL / GST_SECOND);
697
698   sess->recalc_bandwidth = TRUE;
699   sess->bandwidth = DEFAULT_BANDWIDTH;
700   sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
701   sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
702   sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
703
704   /* default UDP header length */
705   sess->header_len = UDP_IP_HEADER_OVERHEAD;
706   sess->mtu = DEFAULT_RTCP_MTU;
707
708   sess->update_ntp64_header_ext = DEFAULT_UPDATE_NTP64_HEADER_EXT;
709
710   sess->probation = DEFAULT_PROBATION;
711   sess->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
712   sess->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
713   sess->favor_new = DEFAULT_FAVOR_NEW;
714
715   /* some default SDES entries */
716   sess->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
717
718   /* we do not want to leak details like the username or hostname here */
719   str = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
720   gst_structure_set (sess->sdes, "cname", G_TYPE_STRING, str, NULL);
721   g_free (str);
722
723 #if 0
724   /* we do not want to leak the user's real name here */
725   str = g_strdup_printf ("Anon%u", g_random_int ());
726   gst_structure_set (sdes, "name", G_TYPE_STRING, str, NULL);
727   g_free (str);
728 #endif
729
730   gst_structure_set (sess->sdes, "tool", G_TYPE_STRING, "GStreamer", NULL);
731
732   /* this is the SSRC we suggest */
733   sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
734   sess->internal_ssrc_set = FALSE;
735
736   sess->first_rtcp = TRUE;
737   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
738   sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE;
739   sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
740   sess->last_rtcp_interval = GST_CLOCK_TIME_NONE;
741
742   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
743   sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
744   sess->rtcp_immediate_feedback_threshold =
745       DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
746   sess->rtp_profile = DEFAULT_RTP_PROFILE;
747   sess->reduced_size_rtcp = DEFAULT_RTCP_REDUCED_SIZE;
748   sess->timestamp_sender_reports = !DEFAULT_RTCP_DISABLE_SR_TIMESTAMP;
749
750   sess->is_doing_ptp = TRUE;
751
752   sess->twcc = rtp_twcc_manager_new (sess->mtu);
753   sess->twcc_stats = rtp_twcc_stats_new ();
754 }
755
756 static void
757 rtp_session_finalize (GObject * object)
758 {
759   RTPSession *sess;
760   gint i;
761
762   sess = RTP_SESSION_CAST (object);
763
764   gst_structure_free (sess->sdes);
765
766   g_list_free_full (sess->conflicting_addresses,
767       (GDestroyNotify) rtp_conflicting_address_free);
768
769   /* TODO: Change this again when implementing RFC 2762
770    * for (i = 0; i < 32; i++)
771    */
772   for (i = 0; i < 1; i++)
773     g_hash_table_destroy (sess->ssrcs[i]);
774
775   g_object_unref (sess->twcc);
776   rtp_twcc_stats_free (sess->twcc_stats);
777
778   g_mutex_clear (&sess->lock);
779
780   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
781 }
782
783 static void
784 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
785 {
786   GValue value = { 0 };
787
788   g_value_init (&value, RTP_TYPE_SOURCE);
789   g_value_take_object (&value, source);
790   /* copies the value */
791   g_value_array_append (arr, &value);
792 }
793
794 static GValueArray *
795 rtp_session_create_sources (RTPSession * sess)
796 {
797   GValueArray *res;
798   guint size;
799
800   RTP_SESSION_LOCK (sess);
801   /* get number of elements in the table */
802   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
803   /* create the result value array */
804   res = g_value_array_new (size);
805
806   /* and copy all values into the array */
807   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
808   RTP_SESSION_UNLOCK (sess);
809
810   return res;
811 }
812
813 static void
814 create_source_stats (gpointer key, RTPSource * source, GValueArray * arr)
815 {
816   GValue *value;
817   GstStructure *s;
818
819   g_object_get (source, "stats", &s, NULL);
820
821   g_value_array_append (arr, NULL);
822   value = g_value_array_get_nth (arr, arr->n_values - 1);
823   g_value_init (value, GST_TYPE_STRUCTURE);
824   g_value_take_boxed (value, s);
825 }
826
827 static GstStructure *
828 rtp_session_create_stats (RTPSession * sess)
829 {
830   GstStructure *s;
831   GValueArray *source_stats;
832   GValue source_stats_v = G_VALUE_INIT;
833   guint size;
834
835   RTP_SESSION_LOCK (sess);
836   s = gst_structure_new ("application/x-rtp-session-stats",
837       "rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped,
838       "sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent,
839       "recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL);
840
841   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
842   source_stats = g_value_array_new (size);
843   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
844       (GHFunc) create_source_stats, source_stats);
845   RTP_SESSION_UNLOCK (sess);
846
847   g_value_init (&source_stats_v, G_TYPE_VALUE_ARRAY);
848   g_value_take_boxed (&source_stats_v, source_stats);
849   gst_structure_take_value (s, "source-stats", &source_stats_v);
850
851   return s;
852 }
853
854 static void
855 rtp_session_set_property (GObject * object, guint prop_id,
856     const GValue * value, GParamSpec * pspec)
857 {
858   RTPSession *sess;
859
860   sess = RTP_SESSION (object);
861
862   switch (prop_id) {
863     case PROP_INTERNAL_SSRC:
864       RTP_SESSION_LOCK (sess);
865       sess->suggested_ssrc = g_value_get_uint (value);
866       sess->internal_ssrc_set = TRUE;
867       sess->internal_ssrc_from_caps_or_property = TRUE;
868       RTP_SESSION_UNLOCK (sess);
869       if (sess->callbacks.reconfigure)
870         sess->callbacks.reconfigure (sess, sess->reconfigure_user_data);
871       break;
872     case PROP_BANDWIDTH:
873       RTP_SESSION_LOCK (sess);
874       sess->bandwidth = g_value_get_double (value);
875       sess->recalc_bandwidth = TRUE;
876       RTP_SESSION_UNLOCK (sess);
877       break;
878     case PROP_RTCP_FRACTION:
879       RTP_SESSION_LOCK (sess);
880       sess->rtcp_bandwidth = g_value_get_double (value);
881       sess->recalc_bandwidth = TRUE;
882       RTP_SESSION_UNLOCK (sess);
883       break;
884     case PROP_RTCP_RR_BANDWIDTH:
885       RTP_SESSION_LOCK (sess);
886       sess->rtcp_rr_bandwidth = g_value_get_int (value);
887       sess->recalc_bandwidth = TRUE;
888       RTP_SESSION_UNLOCK (sess);
889       break;
890     case PROP_RTCP_RS_BANDWIDTH:
891       RTP_SESSION_LOCK (sess);
892       sess->rtcp_rs_bandwidth = g_value_get_int (value);
893       sess->recalc_bandwidth = TRUE;
894       RTP_SESSION_UNLOCK (sess);
895       break;
896     case PROP_RTCP_MTU:
897       sess->mtu = g_value_get_uint (value);
898       rtp_twcc_manager_set_mtu (sess->twcc, sess->mtu);
899       break;
900     case PROP_SDES:
901       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
902       break;
903     case PROP_FAVOR_NEW:
904       sess->favor_new = g_value_get_boolean (value);
905       break;
906     case PROP_RTCP_MIN_INTERVAL:
907       rtp_stats_set_min_interval (&sess->stats,
908           (gdouble) g_value_get_uint64 (value) / GST_SECOND);
909       /* trigger reconsideration */
910       RTP_SESSION_LOCK (sess);
911       sess->next_rtcp_check_time = 0;
912       RTP_SESSION_UNLOCK (sess);
913       if (sess->callbacks.reconsider)
914         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
915       break;
916     case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
917       sess->rtcp_feedback_retention_window = g_value_get_uint64 (value);
918       break;
919     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
920       sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
921       break;
922     case PROP_PROBATION:
923       sess->probation = g_value_get_uint (value);
924       break;
925     case PROP_MAX_DROPOUT_TIME:
926       sess->max_dropout_time = g_value_get_uint (value);
927       break;
928     case PROP_MAX_MISORDER_TIME:
929       sess->max_misorder_time = g_value_get_uint (value);
930       break;
931     case PROP_RTP_PROFILE:
932       sess->rtp_profile = g_value_get_enum (value);
933       /* trigger reconsideration */
934       RTP_SESSION_LOCK (sess);
935       sess->next_rtcp_check_time = 0;
936       RTP_SESSION_UNLOCK (sess);
937       if (sess->callbacks.reconsider)
938         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
939       break;
940     case PROP_RTCP_REDUCED_SIZE:
941       sess->reduced_size_rtcp = g_value_get_boolean (value);
942       break;
943     case PROP_RTCP_DISABLE_SR_TIMESTAMP:
944       sess->timestamp_sender_reports = !g_value_get_boolean (value);
945       break;
946     case PROP_TWCC_FEEDBACK_INTERVAL:
947       rtp_twcc_manager_set_feedback_interval (sess->twcc,
948           g_value_get_uint64 (value));
949       break;
950     case PROP_UPDATE_NTP64_HEADER_EXT:
951       sess->update_ntp64_header_ext = g_value_get_boolean (value);
952       break;
953     default:
954       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
955       break;
956   }
957 }
958
959 static void
960 rtp_session_get_property (GObject * object, guint prop_id,
961     GValue * value, GParamSpec * pspec)
962 {
963   RTPSession *sess;
964
965   sess = RTP_SESSION (object);
966
967   switch (prop_id) {
968     case PROP_INTERNAL_SSRC:
969       g_value_set_uint (value, rtp_session_suggest_ssrc (sess, NULL));
970       break;
971     case PROP_INTERNAL_SOURCE:
972       /* FIXME, return a random source */
973       g_value_set_object (value, NULL);
974       break;
975     case PROP_BANDWIDTH:
976       g_value_set_double (value, sess->bandwidth);
977       break;
978     case PROP_RTCP_FRACTION:
979       g_value_set_double (value, sess->rtcp_bandwidth);
980       break;
981     case PROP_RTCP_RR_BANDWIDTH:
982       g_value_set_int (value, sess->rtcp_rr_bandwidth);
983       break;
984     case PROP_RTCP_RS_BANDWIDTH:
985       g_value_set_int (value, sess->rtcp_rs_bandwidth);
986       break;
987     case PROP_RTCP_MTU:
988       g_value_set_uint (value, sess->mtu);
989       break;
990     case PROP_SDES:
991       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
992       break;
993     case PROP_NUM_SOURCES:
994       g_value_set_uint (value, rtp_session_get_num_sources (sess));
995       break;
996     case PROP_NUM_ACTIVE_SOURCES:
997       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
998       break;
999     case PROP_SOURCES:
1000       g_value_take_boxed (value, rtp_session_create_sources (sess));
1001       break;
1002     case PROP_FAVOR_NEW:
1003       g_value_set_boolean (value, sess->favor_new);
1004       break;
1005     case PROP_RTCP_MIN_INTERVAL:
1006       g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
1007       break;
1008     case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
1009       g_value_set_uint64 (value, sess->rtcp_feedback_retention_window);
1010       break;
1011     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
1012       g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
1013       break;
1014     case PROP_PROBATION:
1015       g_value_set_uint (value, sess->probation);
1016       break;
1017     case PROP_MAX_DROPOUT_TIME:
1018       g_value_set_uint (value, sess->max_dropout_time);
1019       break;
1020     case PROP_MAX_MISORDER_TIME:
1021       g_value_set_uint (value, sess->max_misorder_time);
1022       break;
1023     case PROP_STATS:
1024       g_value_take_boxed (value, rtp_session_create_stats (sess));
1025       break;
1026     case PROP_RTP_PROFILE:
1027       g_value_set_enum (value, sess->rtp_profile);
1028       break;
1029     case PROP_RTCP_REDUCED_SIZE:
1030       g_value_set_boolean (value, sess->reduced_size_rtcp);
1031       break;
1032     case PROP_RTCP_DISABLE_SR_TIMESTAMP:
1033       g_value_set_boolean (value, !sess->timestamp_sender_reports);
1034       break;
1035     case PROP_TWCC_FEEDBACK_INTERVAL:
1036       g_value_set_uint64 (value,
1037           rtp_twcc_manager_get_feedback_interval (sess->twcc));
1038       break;
1039     case PROP_UPDATE_NTP64_HEADER_EXT:
1040       g_value_set_boolean (value, sess->update_ntp64_header_ext);
1041       break;
1042     default:
1043       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1044       break;
1045   }
1046 }
1047
1048 static void
1049 on_new_ssrc (RTPSession * sess, RTPSource * source)
1050 {
1051   g_object_ref (source);
1052   RTP_SESSION_UNLOCK (sess);
1053   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
1054   RTP_SESSION_LOCK (sess);
1055   g_object_unref (source);
1056 }
1057
1058 static void
1059 on_ssrc_collision (RTPSession * sess, RTPSource * source)
1060 {
1061   g_object_ref (source);
1062   RTP_SESSION_UNLOCK (sess);
1063   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
1064       source);
1065   RTP_SESSION_LOCK (sess);
1066   g_object_unref (source);
1067 }
1068
1069 static void
1070 on_ssrc_validated (RTPSession * sess, RTPSource * source)
1071 {
1072   g_object_ref (source);
1073   RTP_SESSION_UNLOCK (sess);
1074   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
1075       source);
1076   RTP_SESSION_LOCK (sess);
1077   g_object_unref (source);
1078 }
1079
1080 static void
1081 on_ssrc_active (RTPSession * sess, RTPSource * source)
1082 {
1083   g_object_ref (source);
1084   RTP_SESSION_UNLOCK (sess);
1085   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
1086   RTP_SESSION_LOCK (sess);
1087   g_object_unref (source);
1088 }
1089
1090 static void
1091 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
1092 {
1093   g_object_ref (source);
1094   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
1095   RTP_SESSION_UNLOCK (sess);
1096   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
1097   RTP_SESSION_LOCK (sess);
1098   g_object_unref (source);
1099 }
1100
1101 static void
1102 on_bye_ssrc (RTPSession * sess, RTPSource * source)
1103 {
1104   g_object_ref (source);
1105   RTP_SESSION_UNLOCK (sess);
1106   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
1107   RTP_SESSION_LOCK (sess);
1108   g_object_unref (source);
1109 }
1110
1111 static void
1112 on_bye_timeout (RTPSession * sess, RTPSource * source)
1113 {
1114   g_object_ref (source);
1115   RTP_SESSION_UNLOCK (sess);
1116   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
1117   RTP_SESSION_LOCK (sess);
1118   g_object_unref (source);
1119 }
1120
1121 static void
1122 on_timeout (RTPSession * sess, RTPSource * source)
1123 {
1124   g_object_ref (source);
1125   RTP_SESSION_UNLOCK (sess);
1126   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
1127   RTP_SESSION_LOCK (sess);
1128   g_object_unref (source);
1129 }
1130
1131 static void
1132 on_sender_timeout (RTPSession * sess, RTPSource * source)
1133 {
1134   g_object_ref (source);
1135   RTP_SESSION_UNLOCK (sess);
1136   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
1137       source);
1138   RTP_SESSION_LOCK (sess);
1139   g_object_unref (source);
1140 }
1141
1142 static void
1143 on_new_sender_ssrc (RTPSession * sess, RTPSource * source)
1144 {
1145   g_object_ref (source);
1146   RTP_SESSION_UNLOCK (sess);
1147   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
1148       source);
1149   RTP_SESSION_LOCK (sess);
1150   g_object_unref (source);
1151 }
1152
1153 static void
1154 on_sender_ssrc_active (RTPSession * sess, RTPSource * source)
1155 {
1156   g_object_ref (source);
1157   RTP_SESSION_UNLOCK (sess);
1158   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE], 0,
1159       source);
1160   RTP_SESSION_LOCK (sess);
1161   g_object_unref (source);
1162 }
1163
1164 /**
1165  * rtp_session_new:
1166  *
1167  * Create a new session object.
1168  *
1169  * Returns: a new #RTPSession. g_object_unref() after usage.
1170  */
1171 RTPSession *
1172 rtp_session_new (void)
1173 {
1174   RTPSession *sess;
1175
1176   sess = g_object_new (RTP_TYPE_SESSION, NULL);
1177
1178   return sess;
1179 }
1180
1181 /**
1182  * rtp_session_reset:
1183  * @sess: an #RTPSession
1184  *
1185  * Reset the sources of @sess.
1186  */
1187 void
1188 rtp_session_reset (RTPSession * sess)
1189 {
1190   g_return_if_fail (RTP_IS_SESSION (sess));
1191
1192   /* remove all sources */
1193   g_hash_table_remove_all (sess->ssrcs[sess->mask_idx]);
1194   sess->total_sources = 0;
1195   sess->stats.sender_sources = 0;
1196   sess->stats.internal_sender_sources = 0;
1197   sess->stats.internal_sources = 0;
1198   sess->stats.active_sources = 0;
1199
1200   sess->generation = 0;
1201   sess->first_rtcp = TRUE;
1202   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
1203   sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE;
1204   sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
1205   sess->last_rtcp_interval = GST_CLOCK_TIME_NONE;
1206   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
1207   sess->scheduled_bye = FALSE;
1208
1209   /* reset session stats */
1210   sess->stats.bye_members = 0;
1211   sess->stats.nacks_dropped = 0;
1212   sess->stats.nacks_sent = 0;
1213   sess->stats.nacks_received = 0;
1214
1215   sess->is_doing_ptp = TRUE;
1216
1217   g_list_free_full (sess->conflicting_addresses,
1218       (GDestroyNotify) rtp_conflicting_address_free);
1219   sess->conflicting_addresses = NULL;
1220 }
1221
1222 /**
1223  * rtp_session_set_callbacks:
1224  * @sess: an #RTPSession
1225  * @callbacks: callbacks to configure
1226  * @user_data: user data passed in the callbacks
1227  *
1228  * Configure a set of callbacks to be notified of actions.
1229  */
1230 void
1231 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
1232     gpointer user_data)
1233 {
1234   g_return_if_fail (RTP_IS_SESSION (sess));
1235
1236   if (callbacks->process_rtp) {
1237     sess->callbacks.process_rtp = callbacks->process_rtp;
1238     sess->process_rtp_user_data = user_data;
1239   }
1240   if (callbacks->send_rtp) {
1241     sess->callbacks.send_rtp = callbacks->send_rtp;
1242     sess->send_rtp_user_data = user_data;
1243   }
1244   if (callbacks->send_rtcp) {
1245     sess->callbacks.send_rtcp = callbacks->send_rtcp;
1246     sess->send_rtcp_user_data = user_data;
1247   }
1248   if (callbacks->sync_rtcp) {
1249     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
1250     sess->sync_rtcp_user_data = user_data;
1251   }
1252   if (callbacks->caps) {
1253     sess->callbacks.caps = callbacks->caps;
1254     sess->caps_user_data = user_data;
1255   }
1256   if (callbacks->reconsider) {
1257     sess->callbacks.reconsider = callbacks->reconsider;
1258     sess->reconsider_user_data = user_data;
1259   }
1260   if (callbacks->request_key_unit) {
1261     sess->callbacks.request_key_unit = callbacks->request_key_unit;
1262     sess->request_key_unit_user_data = user_data;
1263   }
1264   if (callbacks->request_time) {
1265     sess->callbacks.request_time = callbacks->request_time;
1266     sess->request_time_user_data = user_data;
1267   }
1268   if (callbacks->notify_nack) {
1269     sess->callbacks.notify_nack = callbacks->notify_nack;
1270     sess->notify_nack_user_data = user_data;
1271   }
1272   if (callbacks->notify_twcc) {
1273     sess->callbacks.notify_twcc = callbacks->notify_twcc;
1274     sess->notify_twcc_user_data = user_data;
1275   }
1276   if (callbacks->reconfigure) {
1277     sess->callbacks.reconfigure = callbacks->reconfigure;
1278     sess->reconfigure_user_data = user_data;
1279   }
1280   if (callbacks->notify_early_rtcp) {
1281     sess->callbacks.notify_early_rtcp = callbacks->notify_early_rtcp;
1282     sess->notify_early_rtcp_user_data = user_data;
1283   }
1284 }
1285
1286 /**
1287  * rtp_session_set_process_rtp_callback:
1288  * @sess: an #RTPSession
1289  * @callback: callback to set
1290  * @user_data: user data passed in the callback
1291  *
1292  * Configure only the process_rtp callback to be notified of the process_rtp action.
1293  */
1294 void
1295 rtp_session_set_process_rtp_callback (RTPSession * sess,
1296     RTPSessionProcessRTP callback, gpointer user_data)
1297 {
1298   g_return_if_fail (RTP_IS_SESSION (sess));
1299
1300   sess->callbacks.process_rtp = callback;
1301   sess->process_rtp_user_data = user_data;
1302 }
1303
1304 /**
1305  * rtp_session_set_send_rtp_callback:
1306  * @sess: an #RTPSession
1307  * @callback: callback to set
1308  * @user_data: user data passed in the callback
1309  *
1310  * Configure only the send_rtp callback to be notified of the send_rtp action.
1311  */
1312 void
1313 rtp_session_set_send_rtp_callback (RTPSession * sess,
1314     RTPSessionSendRTP callback, gpointer user_data)
1315 {
1316   g_return_if_fail (RTP_IS_SESSION (sess));
1317
1318   sess->callbacks.send_rtp = callback;
1319   sess->send_rtp_user_data = user_data;
1320 }
1321
1322 /**
1323  * rtp_session_set_send_rtcp_callback:
1324  * @sess: an #RTPSession
1325  * @callback: callback to set
1326  * @user_data: user data passed in the callback
1327  *
1328  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
1329  */
1330 void
1331 rtp_session_set_send_rtcp_callback (RTPSession * sess,
1332     RTPSessionSendRTCP callback, gpointer user_data)
1333 {
1334   g_return_if_fail (RTP_IS_SESSION (sess));
1335
1336   sess->callbacks.send_rtcp = callback;
1337   sess->send_rtcp_user_data = user_data;
1338 }
1339
1340 /**
1341  * rtp_session_set_sync_rtcp_callback:
1342  * @sess: an #RTPSession
1343  * @callback: callback to set
1344  * @user_data: user data passed in the callback
1345  *
1346  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
1347  */
1348 void
1349 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
1350     RTPSessionSyncRTCP callback, gpointer user_data)
1351 {
1352   g_return_if_fail (RTP_IS_SESSION (sess));
1353
1354   sess->callbacks.sync_rtcp = callback;
1355   sess->sync_rtcp_user_data = user_data;
1356 }
1357
1358 /**
1359  * rtp_session_set_caps_callback:
1360  * @sess: an #RTPSession
1361  * @callback: callback to set
1362  * @user_data: user data passed in the callback
1363  *
1364  * Configure only the clock_rate callback to be notified of the clock_rate action.
1365  */
1366 void
1367 rtp_session_set_caps_callback (RTPSession * sess,
1368     RTPSessionCaps callback, gpointer user_data)
1369 {
1370   g_return_if_fail (RTP_IS_SESSION (sess));
1371
1372   sess->callbacks.caps = callback;
1373   sess->caps_user_data = user_data;
1374 }
1375
1376 /**
1377  * rtp_session_set_reconsider_callback:
1378  * @sess: an #RTPSession
1379  * @callback: callback to set
1380  * @user_data: user data passed in the callback
1381  *
1382  * Configure only the reconsider callback to be notified of the reconsider action.
1383  */
1384 void
1385 rtp_session_set_reconsider_callback (RTPSession * sess,
1386     RTPSessionReconsider callback, gpointer user_data)
1387 {
1388   g_return_if_fail (RTP_IS_SESSION (sess));
1389
1390   sess->callbacks.reconsider = callback;
1391   sess->reconsider_user_data = user_data;
1392 }
1393
1394 /**
1395  * rtp_session_set_request_time_callback:
1396  * @sess: an #RTPSession
1397  * @callback: callback to set
1398  * @user_data: user data passed in the callback
1399  *
1400  * Configure only the request_time callback
1401  */
1402 void
1403 rtp_session_set_request_time_callback (RTPSession * sess,
1404     RTPSessionRequestTime callback, gpointer user_data)
1405 {
1406   g_return_if_fail (RTP_IS_SESSION (sess));
1407
1408   sess->callbacks.request_time = callback;
1409   sess->request_time_user_data = user_data;
1410 }
1411
1412 /**
1413  * rtp_session_set_bandwidth:
1414  * @sess: an #RTPSession
1415  * @bandwidth: the bandwidth allocated
1416  *
1417  * Set the session bandwidth in bits per second.
1418  */
1419 void
1420 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
1421 {
1422   g_return_if_fail (RTP_IS_SESSION (sess));
1423
1424   RTP_SESSION_LOCK (sess);
1425   sess->stats.bandwidth = bandwidth;
1426   RTP_SESSION_UNLOCK (sess);
1427 }
1428
1429 /**
1430  * rtp_session_get_bandwidth:
1431  * @sess: an #RTPSession
1432  *
1433  * Get the session bandwidth.
1434  *
1435  * Returns: the session bandwidth.
1436  */
1437 gdouble
1438 rtp_session_get_bandwidth (RTPSession * sess)
1439 {
1440   gdouble result;
1441
1442   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1443
1444   RTP_SESSION_LOCK (sess);
1445   result = sess->stats.bandwidth;
1446   RTP_SESSION_UNLOCK (sess);
1447
1448   return result;
1449 }
1450
1451 /**
1452  * rtp_session_set_rtcp_fraction:
1453  * @sess: an #RTPSession
1454  * @bandwidth: the RTCP bandwidth
1455  *
1456  * Set the bandwidth in bits per second that should be used for RTCP
1457  * messages.
1458  */
1459 void
1460 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
1461 {
1462   g_return_if_fail (RTP_IS_SESSION (sess));
1463
1464   RTP_SESSION_LOCK (sess);
1465   sess->stats.rtcp_bandwidth = bandwidth;
1466   RTP_SESSION_UNLOCK (sess);
1467 }
1468
1469 /**
1470  * rtp_session_get_rtcp_fraction:
1471  * @sess: an #RTPSession
1472  *
1473  * Get the session bandwidth used for RTCP.
1474  *
1475  * Returns: The bandwidth used for RTCP messages.
1476  */
1477 gdouble
1478 rtp_session_get_rtcp_fraction (RTPSession * sess)
1479 {
1480   gdouble result;
1481
1482   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
1483
1484   RTP_SESSION_LOCK (sess);
1485   result = sess->stats.rtcp_bandwidth;
1486   RTP_SESSION_UNLOCK (sess);
1487
1488   return result;
1489 }
1490
1491 /**
1492  * rtp_session_get_sdes_struct:
1493  * @sess: an #RTSPSession
1494  *
1495  * Get the SDES data as a #GstStructure
1496  *
1497  * Returns: a GstStructure with SDES items for @sess. This function returns a
1498  * copy of the SDES structure, use gst_structure_free() after usage.
1499  */
1500 GstStructure *
1501 rtp_session_get_sdes_struct (RTPSession * sess)
1502 {
1503   GstStructure *result = NULL;
1504
1505   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1506
1507   RTP_SESSION_LOCK (sess);
1508   if (sess->sdes)
1509     result = gst_structure_copy (sess->sdes);
1510   RTP_SESSION_UNLOCK (sess);
1511
1512   return result;
1513 }
1514
1515 static void
1516 source_set_sdes (const gchar * key, RTPSource * source, GstStructure * sdes)
1517 {
1518   rtp_source_set_sdes_struct (source, gst_structure_copy (sdes));
1519 }
1520
1521 /**
1522  * rtp_session_set_sdes_struct:
1523  * @sess: an #RTSPSession
1524  * @sdes: a #GstStructure
1525  *
1526  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
1527  */
1528 void
1529 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
1530 {
1531   g_return_if_fail (sdes);
1532   g_return_if_fail (RTP_IS_SESSION (sess));
1533
1534   RTP_SESSION_LOCK (sess);
1535   if (sess->sdes)
1536     gst_structure_free (sess->sdes);
1537   sess->sdes = gst_structure_copy (sdes);
1538
1539   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1540       (GHFunc) source_set_sdes, sess->sdes);
1541   RTP_SESSION_UNLOCK (sess);
1542 }
1543
1544 static GstFlowReturn
1545 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
1546 {
1547   GstFlowReturn result = GST_FLOW_OK;
1548
1549   if (source->internal) {
1550     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
1551
1552     RTP_SESSION_UNLOCK (session);
1553
1554     if (session->callbacks.send_rtp)
1555       result =
1556           session->callbacks.send_rtp (session, source, data,
1557           session->send_rtp_user_data);
1558     else {
1559       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1560     }
1561   } else {
1562     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
1563     RTP_SESSION_UNLOCK (session);
1564
1565     if (session->callbacks.process_rtp)
1566       result =
1567           session->callbacks.process_rtp (session, source,
1568           GST_BUFFER_CAST (data), session->process_rtp_user_data);
1569     else
1570       gst_buffer_unref (GST_BUFFER_CAST (data));
1571   }
1572   RTP_SESSION_LOCK (session);
1573
1574   return result;
1575 }
1576
1577 static GstCaps *
1578 source_caps (RTPSource * source, guint8 pt, RTPSession * session)
1579 {
1580   GstCaps *result = NULL;
1581
1582   RTP_SESSION_UNLOCK (session);
1583
1584   if (session->callbacks.caps)
1585     result = session->callbacks.caps (session, pt, session->caps_user_data);
1586
1587   RTP_SESSION_LOCK (session);
1588
1589   GST_DEBUG ("got caps %" GST_PTR_FORMAT " for pt %d", result, pt);
1590
1591   return result;
1592 }
1593
1594 static RTPSourceCallbacks callbacks = {
1595   (RTPSourcePushRTP) source_push_rtp,
1596   (RTPSourceCaps) source_caps,
1597 };
1598
1599
1600 /**
1601  * rtp_session_find_conflicting_address:
1602  * @session: The session the packet came in
1603  * @address: address to check for
1604  * @time: The time when the packet that is possibly in conflict arrived
1605  *
1606  * Checks if an address which has a conflict is already known. If it is
1607  * a known conflict, remember the time
1608  *
1609  * Returns: TRUE if it was a known conflict, FALSE otherwise
1610  */
1611 static gboolean
1612 rtp_session_find_conflicting_address (RTPSession * session,
1613     GSocketAddress * address, GstClockTime time)
1614 {
1615   return find_conflicting_address (session->conflicting_addresses, address,
1616       time);
1617 }
1618
1619 /**
1620  * rtp_session_add_conflicting_address:
1621  * @session: The session the packet came in
1622  * @address: address to remember
1623  * @time: The time when the packet that is in conflict arrived
1624  *
1625  * Adds a new conflict address
1626  */
1627 static void
1628 rtp_session_add_conflicting_address (RTPSession * sess,
1629     GSocketAddress * address, GstClockTime time)
1630 {
1631   sess->conflicting_addresses =
1632       add_conflicting_address (sess->conflicting_addresses, address, time);
1633 }
1634
1635 static void
1636 rtp_session_have_conflict (RTPSession * sess, RTPSource * source,
1637     GSocketAddress * address, GstClockTime current_time)
1638 {
1639   guint32 ssrc = rtp_source_get_ssrc (source);
1640
1641   /* Its a new collision, lets change our SSRC */
1642   rtp_session_add_conflicting_address (sess, address, current_time);
1643
1644   /* mark the source BYE */
1645   rtp_source_mark_bye (source, "SSRC Collision");
1646   /* if we were suggesting this SSRC, change to something else */
1647   if (sess->suggested_ssrc == ssrc) {
1648     sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
1649     sess->internal_ssrc_set = TRUE;
1650   }
1651
1652   on_ssrc_collision (sess, source);
1653
1654   rtp_session_schedule_bye_locked (sess, current_time);
1655 }
1656
1657 static gboolean
1658 check_collision (RTPSession * sess, RTPSource * source,
1659     RTPPacketInfo * pinfo, gboolean rtp)
1660 {
1661   guint32 ssrc;
1662
1663   /* If we have no pinfo address, we can't do collision checking */
1664   if (!pinfo->address)
1665     return FALSE;
1666
1667   ssrc = rtp_source_get_ssrc (source);
1668
1669   if (!source->internal) {
1670     GSocketAddress *from;
1671
1672     /* This is not our local source, but lets check if two remote
1673      * source collide */
1674     if (rtp) {
1675       from = source->rtp_from;
1676     } else {
1677       from = source->rtcp_from;
1678     }
1679
1680     if (from) {
1681       if (__g_socket_address_equal (from, pinfo->address)) {
1682         /* Address is the same */
1683         return FALSE;
1684       } else {
1685         GST_LOG ("we have a third-party collision or loop ssrc:%x", ssrc);
1686         if (sess->favor_new) {
1687           if (rtp_source_find_conflicting_address (source,
1688                   pinfo->address, pinfo->current_time)) {
1689             gchar *buf1;
1690
1691             buf1 = __g_socket_address_to_string (pinfo->address);
1692             GST_LOG ("Known conflict on %x for %s, dropping packet", ssrc,
1693                 buf1);
1694             g_free (buf1);
1695
1696             return TRUE;
1697           } else {
1698             gchar *buf1, *buf2;
1699
1700             /* Current address is not a known conflict, lets assume this is
1701              * a new source. Save old address in possible conflict list
1702              */
1703             rtp_source_add_conflicting_address (source, from,
1704                 pinfo->current_time);
1705
1706             buf1 = __g_socket_address_to_string (from);
1707             buf2 = __g_socket_address_to_string (pinfo->address);
1708
1709             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1710                 " saving old as known conflict", ssrc, buf1, buf2);
1711
1712             if (rtp)
1713               rtp_source_set_rtp_from (source, pinfo->address);
1714             else
1715               rtp_source_set_rtcp_from (source, pinfo->address);
1716
1717             g_free (buf1);
1718             g_free (buf2);
1719
1720             return FALSE;
1721           }
1722         } else {
1723           /* Don't need to save old addresses, we ignore new sources */
1724           return TRUE;
1725         }
1726       }
1727     } else {
1728       /* We don't already have a from address for RTP, just set it */
1729       if (rtp)
1730         rtp_source_set_rtp_from (source, pinfo->address);
1731       else
1732         rtp_source_set_rtcp_from (source, pinfo->address);
1733       return FALSE;
1734     }
1735
1736     /* FIXME: Log 3rd party collision somehow
1737      * Maybe should be done in upper layer, only the SDES can tell us
1738      * if its a collision or a loop
1739      */
1740   } else {
1741     /* This is sending with our ssrc, is it an address we already know */
1742     if (rtp_session_find_conflicting_address (sess, pinfo->address,
1743             pinfo->current_time)) {
1744       /* Its a known conflict, its probably a loop, not a collision
1745        * lets just drop the incoming packet
1746        */
1747       GST_DEBUG ("Our packets are being looped back to us, dropping");
1748     } else {
1749       GST_DEBUG ("Collision for SSRC %x from new incoming packet,"
1750           " change our sender ssrc", ssrc);
1751
1752       rtp_session_have_conflict (sess, source, pinfo->address,
1753           pinfo->current_time);
1754     }
1755   }
1756
1757   return TRUE;
1758 }
1759
1760 typedef struct
1761 {
1762   gboolean is_doing_ptp;
1763   GSocketAddress *new_addr;
1764 } CompareAddrData;
1765
1766 /* check if the two given ip addr are the same (do not care about the port) */
1767 static gboolean
1768 ip_addr_equal (GSocketAddress * a, GSocketAddress * b)
1769 {
1770   return
1771       g_inet_address_equal (g_inet_socket_address_get_address
1772       (G_INET_SOCKET_ADDRESS (a)),
1773       g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (b)));
1774 }
1775
1776 static void
1777 compare_rtp_source_addr (const gchar * key, RTPSource * source,
1778     CompareAddrData * data)
1779 {
1780   /* only compare ip addr of remote sources which are also not closing */
1781   if (!source->internal && !source->closing && source->rtp_from) {
1782     /* look for the first rtp source */
1783     if (!data->new_addr)
1784       data->new_addr = source->rtp_from;
1785     /* compare current ip addr with the first one */
1786     else
1787       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtp_from);
1788   }
1789 }
1790
1791 static void
1792 compare_rtcp_source_addr (const gchar * key, RTPSource * source,
1793     CompareAddrData * data)
1794 {
1795   /* only compare ip addr of remote sources which are also not closing */
1796   if (!source->internal && !source->closing && source->rtcp_from) {
1797     /* look for the first rtcp source */
1798     if (!data->new_addr)
1799       data->new_addr = source->rtcp_from;
1800     else
1801       /* compare current ip addr with the first one */
1802       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtcp_from);
1803   }
1804 }
1805
1806 /* loop over our non-internal source to know if the session
1807  * is doing point-to-point */
1808 static void
1809 session_update_ptp (RTPSession * sess)
1810 {
1811   /* to know if the session is doing point to point, the ip addr
1812    * of each non-internal (=remotes) source have to be compared
1813    * to each other.
1814    */
1815   gboolean is_doing_rtp_ptp;
1816   gboolean is_doing_rtcp_ptp;
1817   CompareAddrData data;
1818
1819   /* compare the first remote source's ip addr that receive rtp packets
1820    * with other remote rtp source.
1821    * it's enough because the session just needs to know if they are all
1822    * equals or not
1823    */
1824   data.is_doing_ptp = TRUE;
1825   data.new_addr = NULL;
1826   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1827       (GHFunc) compare_rtp_source_addr, (gpointer) & data);
1828   is_doing_rtp_ptp = data.is_doing_ptp;
1829
1830   /* same but about rtcp */
1831   data.is_doing_ptp = TRUE;
1832   data.new_addr = NULL;
1833   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1834       (GHFunc) compare_rtcp_source_addr, (gpointer) & data);
1835   is_doing_rtcp_ptp = data.is_doing_ptp;
1836
1837   /* the session is doing point-to-point if all rtp remote have the same
1838    * ip addr and if all rtcp remote sources have the same ip addr */
1839   sess->is_doing_ptp = is_doing_rtp_ptp && is_doing_rtcp_ptp;
1840
1841   GST_DEBUG ("doing point-to-point: %d", sess->is_doing_ptp);
1842 }
1843
1844 static void
1845 add_source (RTPSession * sess, RTPSource * src)
1846 {
1847   g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1848       GINT_TO_POINTER (src->ssrc), src);
1849   /* report the new source ASAP */
1850   src->generation = sess->generation;
1851   /* we have one more source now */
1852   sess->total_sources++;
1853   if (RTP_SOURCE_IS_ACTIVE (src))
1854     sess->stats.active_sources++;
1855   if (src->internal) {
1856     sess->stats.internal_sources++;
1857     if (!sess->internal_ssrc_from_caps_or_property
1858         && sess->suggested_ssrc != src->ssrc) {
1859       sess->suggested_ssrc = src->ssrc;
1860       sess->internal_ssrc_set = TRUE;
1861     }
1862   }
1863
1864   /* update point-to-point status */
1865   if (!src->internal)
1866     session_update_ptp (sess);
1867 }
1868
1869 static RTPSource *
1870 find_source (RTPSession * sess, guint32 ssrc)
1871 {
1872   return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1873       GINT_TO_POINTER (ssrc));
1874 }
1875
1876 /* must be called with the session lock, the returned source needs to be
1877  * unreffed after usage. */
1878 static RTPSource *
1879 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1880     RTPPacketInfo * pinfo, gboolean rtp)
1881 {
1882   RTPSource *source;
1883
1884   source = find_source (sess, ssrc);
1885   if (source == NULL) {
1886     /* make new Source in probation and insert */
1887     source = rtp_source_new (ssrc);
1888
1889     GST_DEBUG ("creating new source %08x %p", ssrc, source);
1890
1891     /* for RTP packets we need to set the source in probation. Receiving RTCP
1892      * packets of an SSRC, on the other hand, is a strong indication that we
1893      * are dealing with a valid source. */
1894     g_object_set (source, "probation", rtp ? sess->probation : 0,
1895         "max-dropout-time", sess->max_dropout_time, "max-misorder-time",
1896         sess->max_misorder_time, NULL);
1897
1898     /* store from address, if any */
1899     if (pinfo->address) {
1900       if (rtp)
1901         rtp_source_set_rtp_from (source, pinfo->address);
1902       else
1903         rtp_source_set_rtcp_from (source, pinfo->address);
1904     }
1905
1906     /* configure a callback on the source */
1907     rtp_source_set_callbacks (source, &callbacks, sess);
1908
1909     add_source (sess, source);
1910     *created = TRUE;
1911   } else {
1912     *created = FALSE;
1913     /* check for collision, this updates the address when not previously set */
1914     if (check_collision (sess, source, pinfo, rtp)) {
1915       return NULL;
1916     }
1917     /* Receiving RTCP packets of an SSRC is a strong indication that we
1918      * are dealing with a valid source. */
1919     if (!rtp)
1920       g_object_set (source, "probation", 0, NULL);
1921   }
1922   /* update last activity */
1923   source->last_activity = pinfo->current_time;
1924   if (rtp)
1925     source->last_rtp_activity = pinfo->current_time;
1926   g_object_ref (source);
1927
1928   return source;
1929 }
1930
1931 /* must be called with the session lock, the returned source needs to be
1932  * unreffed after usage. */
1933 static RTPSource *
1934 obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1935     GstClockTime current_time)
1936 {
1937   RTPSource *source;
1938
1939   source = find_source (sess, ssrc);
1940   if (source == NULL) {
1941     /* make new internal Source and insert */
1942     source = rtp_source_new (ssrc);
1943
1944     GST_DEBUG ("creating new internal source %08x %p", ssrc, source);
1945
1946     source->validated = TRUE;
1947     source->internal = TRUE;
1948     source->probation = 0;
1949     source->curr_probation = 0;
1950     rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes));
1951     rtp_source_set_callbacks (source, &callbacks, sess);
1952
1953     add_source (sess, source);
1954     *created = TRUE;
1955   } else {
1956     *created = FALSE;
1957   }
1958   /* update last activity */
1959   if (current_time != GST_CLOCK_TIME_NONE) {
1960     source->last_activity = current_time;
1961     source->last_rtp_activity = current_time;
1962   }
1963   g_object_ref (source);
1964
1965   return source;
1966 }
1967
1968 /**
1969  * rtp_session_suggest_ssrc:
1970  * @sess: a #RTPSession
1971  * @is_random: if the suggested ssrc is random
1972  *
1973  * Suggest an unused SSRC in @sess.
1974  *
1975  * Returns: a free unused SSRC
1976  */
1977 guint32
1978 rtp_session_suggest_ssrc (RTPSession * sess, gboolean * is_random)
1979 {
1980   guint32 result;
1981
1982   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1983
1984   RTP_SESSION_LOCK (sess);
1985   result = sess->suggested_ssrc;
1986   if (is_random)
1987     *is_random = !sess->internal_ssrc_set;
1988   RTP_SESSION_UNLOCK (sess);
1989
1990   return result;
1991 }
1992
1993 /**
1994  * rtp_session_add_source:
1995  * @sess: a #RTPSession
1996  * @src: #RTPSource to add
1997  *
1998  * Add @src to @session.
1999  *
2000  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
2001  * existed in the session.
2002  */
2003 gboolean
2004 rtp_session_add_source (RTPSession * sess, RTPSource * src)
2005 {
2006   gboolean result = FALSE;
2007   RTPSource *find;
2008
2009   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
2010   g_return_val_if_fail (src != NULL, FALSE);
2011
2012   RTP_SESSION_LOCK (sess);
2013   find = find_source (sess, src->ssrc);
2014   if (find == NULL) {
2015     add_source (sess, src);
2016     result = TRUE;
2017   }
2018   RTP_SESSION_UNLOCK (sess);
2019
2020   return result;
2021 }
2022
2023 /**
2024  * rtp_session_get_num_sources:
2025  * @sess: an #RTPSession
2026  *
2027  * Get the number of sources in @sess.
2028  *
2029  * Returns: The number of sources in @sess.
2030  */
2031 guint
2032 rtp_session_get_num_sources (RTPSession * sess)
2033 {
2034   guint result;
2035
2036   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
2037
2038   RTP_SESSION_LOCK (sess);
2039   result = sess->total_sources;
2040   RTP_SESSION_UNLOCK (sess);
2041
2042   return result;
2043 }
2044
2045 /**
2046  * rtp_session_get_num_active_sources:
2047  * @sess: an #RTPSession
2048  *
2049  * Get the number of active sources in @sess. A source is considered active when
2050  * it has been validated and has not yet received a BYE RTCP message.
2051  *
2052  * Returns: The number of active sources in @sess.
2053  */
2054 guint
2055 rtp_session_get_num_active_sources (RTPSession * sess)
2056 {
2057   guint result;
2058
2059   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
2060
2061   RTP_SESSION_LOCK (sess);
2062   result = sess->stats.active_sources;
2063   RTP_SESSION_UNLOCK (sess);
2064
2065   return result;
2066 }
2067
2068 /**
2069  * rtp_session_get_source_by_ssrc:
2070  * @sess: an #RTPSession
2071  * @ssrc: an SSRC
2072  *
2073  * Find the source with @ssrc in @sess.
2074  *
2075  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
2076  * g_object_unref() after usage.
2077  */
2078 RTPSource *
2079 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
2080 {
2081   RTPSource *result;
2082
2083   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
2084
2085   RTP_SESSION_LOCK (sess);
2086   result = find_source (sess, ssrc);
2087   if (result != NULL)
2088     g_object_ref (result);
2089   RTP_SESSION_UNLOCK (sess);
2090
2091   return result;
2092 }
2093
2094 /* should be called with the SESSION lock */
2095 static guint32
2096 rtp_session_create_new_ssrc (RTPSession * sess)
2097 {
2098   guint32 ssrc;
2099
2100   while (TRUE) {
2101     ssrc = g_random_int ();
2102
2103     /* see if it exists in the session, we're done if it doesn't */
2104     if (find_source (sess, ssrc) == NULL)
2105       break;
2106   }
2107   return ssrc;
2108 }
2109
2110 static gboolean
2111 update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
2112 {
2113   GstNetAddressMeta *meta;
2114
2115   /* get packet size including header overhead */
2116   pinfo->bytes += gst_buffer_get_size (*buffer) + pinfo->header_len;
2117   pinfo->packets++;
2118
2119   if (pinfo->rtp) {
2120     GstRTPBuffer rtp = { NULL };
2121
2122     if (!gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtp))
2123       goto invalid_packet;
2124
2125     pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtp);
2126     if (idx == 0) {
2127       gint i;
2128
2129       /* only keep info for first buffer */
2130       pinfo->ssrc = gst_rtp_buffer_get_ssrc (&rtp);
2131       pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
2132       pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
2133       pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2134       pinfo->marker = gst_rtp_buffer_get_marker (&rtp);
2135       /* copy available csrc */
2136       pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
2137       for (i = 0; i < pinfo->csrc_count; i++)
2138         pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
2139
2140       /* RTP header extensions */
2141       pinfo->header_ext = gst_rtp_buffer_get_extension_bytes (&rtp,
2142           &pinfo->header_ext_bit_pattern);
2143     }
2144
2145     if (pinfo->ntp64_ext_id != 0 && pinfo->send && !pinfo->have_ntp64_ext) {
2146       guint8 *data;
2147       guint size;
2148
2149       /* Remember here that there is a 64-bit NTP header extension on this buffer
2150        * or any of the other buffers in the buffer list.
2151        * Later we update this after making the buffer(list) writable.
2152        */
2153       if ((gst_rtp_buffer_get_extension_onebyte_header (&rtp,
2154                   pinfo->ntp64_ext_id, 0, (gpointer *) & data, &size)
2155               && size == 8)
2156           || (gst_rtp_buffer_get_extension_twobytes_header (&rtp, NULL,
2157                   pinfo->ntp64_ext_id, 0, (gpointer *) & data, &size)
2158               && size == 8)) {
2159         pinfo->have_ntp64_ext = TRUE;
2160       }
2161     }
2162
2163     gst_rtp_buffer_unmap (&rtp);
2164   }
2165
2166   if (idx == 0) {
2167     /* for netbuffer we can store the IP address to check for collisions */
2168     meta = gst_buffer_get_net_address_meta (*buffer);
2169     if (pinfo->address)
2170       g_object_unref (pinfo->address);
2171     if (meta) {
2172       pinfo->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
2173     } else {
2174       pinfo->address = NULL;
2175     }
2176   }
2177   return TRUE;
2178
2179   /* ERRORS */
2180 invalid_packet:
2181   {
2182     GST_DEBUG ("invalid RTP packet received");
2183     return FALSE;
2184   }
2185 }
2186
2187 /* update the RTPPacketInfo structure with the current time and other bits
2188  * about the current buffer we are handling.
2189  * This function is typically called when a validated packet is received.
2190  * This function should be called with the RTP_SESSION_LOCK
2191  */
2192 static gboolean
2193 update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
2194     gboolean send, gboolean rtp, gboolean is_list, gpointer data,
2195     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
2196 {
2197   gboolean res;
2198
2199   pinfo->send = send;
2200   pinfo->rtp = rtp;
2201   pinfo->is_list = is_list;
2202   pinfo->data = data;
2203   pinfo->current_time = current_time;
2204   pinfo->running_time = running_time;
2205   pinfo->ntpnstime = ntpnstime;
2206   pinfo->header_len = sess->header_len;
2207   pinfo->bytes = 0;
2208   pinfo->payload_len = 0;
2209   pinfo->packets = 0;
2210   pinfo->marker = FALSE;
2211   pinfo->ntp64_ext_id = send ? sess->send_ntp64_ext_id : 0;
2212   pinfo->have_ntp64_ext = FALSE;
2213
2214   if (is_list) {
2215     GstBufferList *list = GST_BUFFER_LIST_CAST (data);
2216     res =
2217         gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
2218         pinfo);
2219     pinfo->arrival_time = GST_CLOCK_TIME_NONE;
2220   } else {
2221     GstBuffer *buffer = GST_BUFFER_CAST (data);
2222     res = update_packet (&buffer, 0, pinfo);
2223     pinfo->arrival_time = GST_BUFFER_DTS (buffer);
2224   }
2225
2226   return res;
2227 }
2228
2229 static void
2230 clean_packet_info (RTPPacketInfo * pinfo)
2231 {
2232   if (pinfo->address)
2233     g_object_unref (pinfo->address);
2234   if (pinfo->data) {
2235     gst_mini_object_unref (pinfo->data);
2236     pinfo->data = NULL;
2237   }
2238   if (pinfo->header_ext)
2239     g_bytes_unref (pinfo->header_ext);
2240 }
2241
2242 static gboolean
2243 source_update_active (RTPSession * sess, RTPSource * source,
2244     gboolean prevactive)
2245 {
2246   gboolean active = RTP_SOURCE_IS_ACTIVE (source);
2247   guint32 ssrc = source->ssrc;
2248
2249   if (prevactive == active)
2250     return FALSE;
2251
2252   if (active) {
2253     sess->stats.active_sources++;
2254     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
2255         sess->stats.active_sources);
2256   } else {
2257     sess->stats.active_sources--;
2258     GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
2259         sess->stats.active_sources);
2260   }
2261   return TRUE;
2262 }
2263
2264 static void
2265 process_twcc_packet (RTPSession * sess, RTPPacketInfo * pinfo)
2266 {
2267   if (rtp_twcc_manager_recv_packet (sess->twcc, pinfo)) {
2268     RTP_SESSION_UNLOCK (sess);
2269
2270     /* TODO: find a better rational for this number, and possibly tune it based
2271        on factors like framerate / bandwidth etc */
2272     if (!rtp_session_send_rtcp (sess, 100 * GST_MSECOND)) {
2273       GST_INFO ("Could not schedule TWCC straight away");
2274     }
2275     RTP_SESSION_LOCK (sess);
2276   }
2277 }
2278
2279 static gboolean
2280 source_update_sender (RTPSession * sess, RTPSource * source,
2281     gboolean prevsender)
2282 {
2283   gboolean sender = RTP_SOURCE_IS_SENDER (source);
2284   guint32 ssrc = source->ssrc;
2285
2286   if (prevsender == sender)
2287     return FALSE;
2288
2289   if (sender) {
2290     sess->stats.sender_sources++;
2291     if (source->internal)
2292       sess->stats.internal_sender_sources++;
2293     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
2294         sess->stats.sender_sources);
2295   } else {
2296     sess->stats.sender_sources--;
2297     if (source->internal)
2298       sess->stats.internal_sender_sources--;
2299     GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
2300         sess->stats.sender_sources);
2301   }
2302   return TRUE;
2303 }
2304
2305 /**
2306  * rtp_session_process_rtp:
2307  * @sess: and #RTPSession
2308  * @buffer: an RTP buffer
2309  * @current_time: the current system time
2310  * @running_time: the running_time of @buffer
2311  *
2312  * Process an RTP buffer in the session manager. This function takes ownership
2313  * of @buffer.
2314  *
2315  * Returns: a #GstFlowReturn.
2316  */
2317 GstFlowReturn
2318 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
2319     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
2320 {
2321   GstFlowReturn result;
2322   guint32 ssrc;
2323   RTPSource *source;
2324   gboolean created;
2325   gboolean prevsender, prevactive;
2326   RTPPacketInfo pinfo = { 0, };
2327   guint64 oldrate;
2328
2329   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2330   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2331
2332   RTP_SESSION_LOCK (sess);
2333
2334   /* update pinfo stats */
2335   if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer,
2336           current_time, running_time, ntpnstime)) {
2337     GST_DEBUG ("invalid RTP packet received");
2338     RTP_SESSION_UNLOCK (sess);
2339     return rtp_session_process_rtcp (sess, buffer, current_time, running_time,
2340         ntpnstime);
2341   }
2342
2343   ssrc = pinfo.ssrc;
2344
2345   source = obtain_source (sess, ssrc, &created, &pinfo, TRUE);
2346   if (!source)
2347     goto collision;
2348
2349   prevsender = RTP_SOURCE_IS_SENDER (source);
2350   prevactive = RTP_SOURCE_IS_ACTIVE (source);
2351   oldrate = source->bitrate;
2352
2353   if (created)
2354     on_new_ssrc (sess, source);
2355
2356   /* let source process the packet */
2357   result = rtp_source_process_rtp (source, &pinfo);
2358   process_twcc_packet (sess, &pinfo);
2359
2360   /* source became active */
2361   if (source_update_active (sess, source, prevactive))
2362     on_ssrc_validated (sess, source);
2363
2364   source_update_sender (sess, source, prevsender);
2365
2366   if (oldrate != source->bitrate)
2367     sess->recalc_bandwidth = TRUE;
2368
2369
2370   if (source->validated) {
2371     gboolean created;
2372     gint i;
2373
2374     /* for validated sources, we add the CSRCs as well */
2375     for (i = 0; i < pinfo.csrc_count; i++) {
2376       guint32 csrc;
2377       RTPSource *csrc_src;
2378
2379       csrc = pinfo.csrcs[i];
2380
2381       /* get source */
2382       csrc_src = obtain_source (sess, csrc, &created, &pinfo, TRUE);
2383       if (!csrc_src)
2384         continue;
2385
2386       if (created) {
2387         GST_DEBUG ("created new CSRC: %08x", csrc);
2388         rtp_source_set_as_csrc (csrc_src);
2389         source_update_active (sess, csrc_src, FALSE);
2390         on_new_ssrc (sess, csrc_src);
2391       }
2392       g_object_unref (csrc_src);
2393     }
2394   }
2395   g_object_unref (source);
2396
2397   RTP_SESSION_UNLOCK (sess);
2398
2399   clean_packet_info (&pinfo);
2400
2401   return result;
2402
2403   /* ERRORS */
2404 collision:
2405   {
2406     RTP_SESSION_UNLOCK (sess);
2407     clean_packet_info (&pinfo);
2408     GST_DEBUG ("ignoring packet because its collisioning");
2409     return GST_FLOW_OK;
2410   }
2411 }
2412
2413 static void
2414 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
2415     GstRTCPPacket * packet, RTPPacketInfo * pinfo)
2416 {
2417   guint count, i;
2418
2419   count = gst_rtcp_packet_get_rb_count (packet);
2420   for (i = 0; i < count; i++) {
2421     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
2422     guint8 fractionlost;
2423     gint32 packetslost;
2424     RTPSource *src;
2425
2426     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
2427         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2428
2429     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
2430
2431     /* find our own source */
2432     src = find_source (sess, ssrc);
2433     if (src == NULL)
2434       continue;
2435
2436     if (src->internal && RTP_SOURCE_IS_ACTIVE (src)) {
2437       /* only deal with report blocks for our session, we update the stats of
2438        * the sender of the RTCP message. We could also compare our stats against
2439        * the other sender to see if we are better or worse. */
2440       /* FIXME, need to keep track who the RB block is from */
2441       rtp_source_process_rb (source, ssrc, pinfo->ntpnstime, fractionlost,
2442           packetslost, exthighestseq, jitter, lsr, dlsr);
2443     }
2444   }
2445   on_ssrc_active (sess, source);
2446 }
2447
2448 /* A Sender report contains statistics about how the sender is doing. This
2449  * includes timing informataion such as the relation between RTP and NTP
2450  * timestamps and the number of packets/bytes it sent to us.
2451  *
2452  * In this report is also included a set of report blocks related to how this
2453  * sender is receiving data (in case we (or somebody else) is also sending stuff
2454  * to it). This info includes the packet loss, jitter and seqnum. It also
2455  * contains information to calculate the round trip time (LSR/DLSR).
2456  */
2457 static void
2458 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
2459     RTPPacketInfo * pinfo, gboolean * do_sync)
2460 {
2461   guint32 senderssrc, rtptime, packet_count, octet_count;
2462   guint64 ntptime;
2463   RTPSource *source;
2464   gboolean created, prevsender;
2465
2466   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
2467       &packet_count, &octet_count);
2468
2469   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
2470       senderssrc, GST_TIME_ARGS (pinfo->current_time));
2471
2472   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2473   if (!source)
2474     return;
2475
2476   /* skip non-bye packets for sources that are marked BYE */
2477   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2478     goto out;
2479
2480   /* don't try to do lip-sync for sources that sent a BYE */
2481   if (RTP_SOURCE_IS_MARKED_BYE (source))
2482     *do_sync = FALSE;
2483   else
2484     *do_sync = TRUE;
2485
2486   prevsender = RTP_SOURCE_IS_SENDER (source);
2487
2488   /* first update the source */
2489   rtp_source_process_sr (source, pinfo->current_time, ntptime, rtptime,
2490       packet_count, octet_count);
2491
2492   source_update_sender (sess, source, prevsender);
2493
2494   if (created)
2495     on_new_ssrc (sess, source);
2496
2497   rtp_session_process_rb (sess, source, packet, pinfo);
2498
2499 out:
2500   g_object_unref (source);
2501 }
2502
2503 /* A receiver report contains statistics about how a receiver is doing. It
2504  * includes stuff like packet loss, jitter and the seqnum it received last. It
2505  * also contains info to calculate the round trip time.
2506  *
2507  * We are only interested in how the sender of this report is doing wrt to us.
2508  */
2509 static void
2510 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
2511     RTPPacketInfo * pinfo)
2512 {
2513   guint32 senderssrc;
2514   RTPSource *source;
2515   gboolean created;
2516
2517   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
2518
2519   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
2520
2521   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2522   if (!source)
2523     return;
2524
2525   /* skip non-bye packets for sources that are marked BYE */
2526   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2527     goto out;
2528
2529   if (created)
2530     on_new_ssrc (sess, source);
2531
2532   rtp_session_process_rb (sess, source, packet, pinfo);
2533
2534 out:
2535   g_object_unref (source);
2536 }
2537
2538 /* Get SDES items and store them in the SSRC */
2539 static void
2540 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
2541     RTPPacketInfo * pinfo)
2542 {
2543   guint items, i, j;
2544   gboolean more_items, more_entries;
2545
2546   items = gst_rtcp_packet_sdes_get_item_count (packet);
2547   GST_DEBUG ("got SDES packet with %d items", items);
2548
2549   more_items = gst_rtcp_packet_sdes_first_item (packet);
2550   i = 0;
2551   while (more_items) {
2552     guint32 ssrc;
2553     gboolean changed, created, prevactive;
2554     RTPSource *source;
2555     GstStructure *sdes;
2556
2557     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
2558
2559     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
2560
2561     changed = FALSE;
2562
2563     /* find src, no probation when dealing with RTCP */
2564     source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
2565     if (!source)
2566       return;
2567
2568     /* skip non-bye packets for sources that are marked BYE */
2569     if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2570       goto next;
2571
2572     sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
2573
2574     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
2575     j = 0;
2576     while (more_entries) {
2577       GstRTCPSDESType type;
2578       guint8 len;
2579       guint8 *data;
2580       gchar *name;
2581       gchar *value;
2582
2583       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
2584
2585       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
2586           data);
2587
2588       if (type == GST_RTCP_SDES_PRIV) {
2589         name = g_strndup ((const gchar *) &data[1], data[0]);
2590         len -= data[0] + 1;
2591         data += data[0] + 1;
2592       } else {
2593         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
2594       }
2595
2596       value = g_strndup ((const gchar *) data, len);
2597
2598       if (g_utf8_validate (value, -1, NULL)) {
2599         gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
2600       } else {
2601         GST_WARNING ("ignore SDES field %s with non-utf8 data %s", name, value);
2602       }
2603
2604       g_free (name);
2605       g_free (value);
2606
2607       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
2608       j++;
2609     }
2610
2611     /* takes ownership of sdes */
2612     changed = rtp_source_set_sdes_struct (source, sdes);
2613
2614     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2615     source->validated = TRUE;
2616
2617     if (created)
2618       on_new_ssrc (sess, source);
2619
2620     /* source became active */
2621     if (source_update_active (sess, source, prevactive))
2622       on_ssrc_validated (sess, source);
2623
2624     if (changed)
2625       on_ssrc_sdes (sess, source);
2626
2627   next:
2628     g_object_unref (source);
2629
2630     more_items = gst_rtcp_packet_sdes_next_item (packet);
2631     i++;
2632   }
2633 }
2634
2635 /* BYE is sent when a client leaves the session
2636  */
2637 static void
2638 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
2639     RTPPacketInfo * pinfo)
2640 {
2641   guint count, i;
2642   gchar *reason;
2643   gboolean reconsider = FALSE;
2644
2645   reason = gst_rtcp_packet_bye_get_reason (packet);
2646   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
2647
2648   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
2649   for (i = 0; i < count; i++) {
2650     guint32 ssrc;
2651     RTPSource *source;
2652     gboolean prevactive, prevsender;
2653     guint pmembers, members;
2654
2655     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
2656     GST_DEBUG ("SSRC: %08x", ssrc);
2657
2658     /* find src and mark bye, no probation when dealing with RTCP */
2659     source = find_source (sess, ssrc);
2660     if (!source || source->internal) {
2661       GST_DEBUG ("Ignoring suspicious BYE packet (reason: %s)",
2662           !source ? "can't find source" : "has internal source SSRC");
2663       break;
2664     }
2665
2666     /* store time for when we need to time out this source */
2667     source->bye_time = pinfo->current_time;
2668
2669     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2670     prevsender = RTP_SOURCE_IS_SENDER (source);
2671
2672     /* mark the source BYE */
2673     rtp_source_mark_bye (source, reason);
2674
2675     pmembers = sess->stats.active_sources;
2676
2677     source_update_active (sess, source, prevactive);
2678     source_update_sender (sess, source, prevsender);
2679
2680     members = sess->stats.active_sources;
2681
2682     if (!sess->scheduled_bye && members < pmembers) {
2683       /* some members went away since the previous timeout estimate.
2684        * Perform reverse reconsideration but only when we are not scheduling a
2685        * BYE ourselves. */
2686       if (sess->next_rtcp_check_time != GST_CLOCK_TIME_NONE &&
2687           pinfo->current_time < sess->next_rtcp_check_time) {
2688         GstClockTime time_remaining;
2689
2690         /* Scale our next RTCP check time according to the change of numbers
2691          * of members. But only if a) this is the first RTCP, or b) this is not
2692          * a feedback session, or c) this is a feedback session but we schedule
2693          * for every RTCP interval (aka no t-rr-interval set).
2694          *
2695          * FIXME: a) and b) are not great as we will possibly go below Tmin
2696          * for non-feedback profiles and in case of a) below
2697          * Tmin/t-rr-interval in any case.
2698          */
2699         if (sess->last_rtcp_send_time == GST_CLOCK_TIME_NONE ||
2700             !(sess->rtp_profile == GST_RTP_PROFILE_AVPF
2701                 || sess->rtp_profile == GST_RTP_PROFILE_SAVPF) ||
2702             sess->next_rtcp_check_time - sess->last_rtcp_send_time ==
2703             sess->last_rtcp_interval) {
2704           time_remaining = sess->next_rtcp_check_time - pinfo->current_time;
2705           sess->next_rtcp_check_time =
2706               gst_util_uint64_scale (time_remaining, members, pmembers);
2707           sess->next_rtcp_check_time += pinfo->current_time;
2708         }
2709         sess->last_rtcp_interval =
2710             gst_util_uint64_scale (sess->last_rtcp_interval, members, pmembers);
2711
2712         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
2713             GST_TIME_ARGS (sess->next_rtcp_check_time));
2714
2715         /* mark pending reconsider. We only want to signal the reconsideration
2716          * once after we handled all the source in the bye packet */
2717         reconsider = TRUE;
2718       }
2719     }
2720
2721     on_bye_ssrc (sess, source);
2722   }
2723   if (reconsider) {
2724     RTP_SESSION_UNLOCK (sess);
2725     /* notify app of reconsideration */
2726     if (sess->callbacks.reconsider)
2727       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2728     RTP_SESSION_LOCK (sess);
2729   }
2730
2731   g_free (reason);
2732 }
2733
2734 static void
2735 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
2736     RTPPacketInfo * pinfo)
2737 {
2738   GST_DEBUG ("received APP");
2739
2740   if (g_signal_has_handler_pending (sess,
2741           rtp_session_signals[SIGNAL_ON_APP_RTCP], 0, TRUE)) {
2742     GstBuffer *data_buffer = NULL;
2743     guint16 data_length;
2744     gchar name[5];
2745
2746     data_length = gst_rtcp_packet_app_get_data_length (packet) * 4;
2747     if (data_length > 0) {
2748       guint8 *data = gst_rtcp_packet_app_get_data (packet);
2749       data_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
2750           GST_BUFFER_COPY_MEMORY, data - packet->rtcp->map.data, data_length);
2751       GST_BUFFER_PTS (data_buffer) = pinfo->running_time;
2752     }
2753
2754     memcpy (name, gst_rtcp_packet_app_get_name (packet), 4);
2755     name[4] = '\0';
2756
2757     RTP_SESSION_UNLOCK (sess);
2758     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_APP_RTCP], 0,
2759         gst_rtcp_packet_app_get_subtype (packet),
2760         gst_rtcp_packet_app_get_ssrc (packet), name, data_buffer);
2761     RTP_SESSION_LOCK (sess);
2762
2763     if (data_buffer)
2764       gst_buffer_unref (data_buffer);
2765   }
2766 }
2767
2768 static gboolean
2769 rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
2770     const guint32 * ssrcs, guint num_ssrcs, gboolean fir,
2771     GstClockTime current_time)
2772 {
2773   guint32 round_trip = 0;
2774   gint i;
2775
2776   g_return_val_if_fail (ssrcs != NULL && num_ssrcs > 0, FALSE);
2777
2778   rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
2779       &round_trip);
2780
2781   if (src->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
2782     GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
2783         GST_SECOND, 65536);
2784
2785     /* Sanity check to avoid always ignoring PLI/FIR if we receive RTCP
2786      * packets with erroneous values resulting in crazy high RTT. */
2787     if (round_trip_in_ns > 5 * GST_SECOND)
2788       round_trip_in_ns = GST_SECOND / 2;
2789
2790     if (current_time - src->last_keyframe_request < 2 * round_trip_in_ns) {
2791       GST_DEBUG ("Ignoring %s request from %X because one was send without one "
2792           "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
2793           fir ? "FIR" : "PLI", rtp_source_get_ssrc (src),
2794           GST_TIME_ARGS (current_time - src->last_keyframe_request),
2795           GST_TIME_ARGS (round_trip_in_ns));
2796       return FALSE;
2797     }
2798   }
2799
2800   src->last_keyframe_request = current_time;
2801
2802   for (i = 0; i < num_ssrcs; ++i) {
2803     GST_LOG ("received %s request from %X about %X %p(%p)",
2804         fir ? "FIR" : "PLI",
2805         rtp_source_get_ssrc (src), ssrcs[i], sess->callbacks.process_rtp,
2806         sess->callbacks.request_key_unit);
2807
2808     RTP_SESSION_UNLOCK (sess);
2809     sess->callbacks.request_key_unit (sess, ssrcs[i], fir,
2810         sess->request_key_unit_user_data);
2811     RTP_SESSION_LOCK (sess);
2812   }
2813
2814   return TRUE;
2815 }
2816
2817 static void
2818 rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
2819     guint32 media_ssrc, GstClockTime current_time)
2820 {
2821   RTPSource *src;
2822
2823   if (!sess->callbacks.request_key_unit)
2824     return;
2825
2826   src = find_source (sess, sender_ssrc);
2827   if (src == NULL) {
2828     /* try to find a src with media_ssrc instead */
2829     src = find_source (sess, media_ssrc);
2830     if (src == NULL)
2831       return;
2832   }
2833
2834   rtp_session_request_local_key_unit (sess, src, &media_ssrc, 1, FALSE,
2835       current_time);
2836 }
2837
2838 static void
2839 rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
2840     guint8 * fci_data, guint fci_length, GstClockTime current_time)
2841 {
2842   RTPSource *src;
2843   guint32 ssrc;
2844   guint position = 0;
2845   guint32 ssrcs[32];
2846   guint num_ssrcs = 0;
2847
2848   if (!sess->callbacks.request_key_unit)
2849     return;
2850
2851   if (fci_length < 8)
2852     return;
2853
2854   src = find_source (sess, sender_ssrc);
2855
2856   /* Hack because Google fails to set the sender_ssrc correctly */
2857   if (!src && sender_ssrc == 1) {
2858     GHashTableIter iter;
2859
2860     /* we can't find the source if there are multiple */
2861     if (sess->stats.sender_sources > sess->stats.internal_sender_sources + 1)
2862       return;
2863
2864     g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
2865     while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
2866       if (!src->internal && rtp_source_is_sender (src))
2867         break;
2868       src = NULL;
2869     }
2870   }
2871   if (!src)
2872     return;
2873
2874   for (position = 0; position < fci_length; position += 8) {
2875     guint8 *data = fci_data + position;
2876     RTPSource *own;
2877
2878     ssrc = GST_READ_UINT32_BE (data);
2879
2880     own = find_source (sess, ssrc);
2881     if (own == NULL)
2882       continue;
2883
2884     if (own->internal && num_ssrcs < 32) {
2885       ssrcs[num_ssrcs++] = ssrc;
2886     }
2887   }
2888   if (num_ssrcs == 0)
2889     return;
2890
2891   rtp_session_request_local_key_unit (sess, src, ssrcs, num_ssrcs, TRUE,
2892       current_time);
2893 }
2894
2895 static void
2896 rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
2897     guint32 media_ssrc, guint8 * fci_data, guint fci_length,
2898     GstClockTime current_time)
2899 {
2900   sess->stats.nacks_received++;
2901
2902   if (!sess->callbacks.notify_nack)
2903     return;
2904
2905   while (fci_length > 0) {
2906     guint16 seqnum, blp;
2907
2908     seqnum = GST_READ_UINT16_BE (fci_data);
2909     blp = GST_READ_UINT16_BE (fci_data + 2);
2910
2911     GST_DEBUG ("NACK #%u, blp %04x, SSRC 0x%08x", seqnum, blp, media_ssrc);
2912
2913     RTP_SESSION_UNLOCK (sess);
2914     sess->callbacks.notify_nack (sess, seqnum, blp, media_ssrc,
2915         sess->notify_nack_user_data);
2916     RTP_SESSION_LOCK (sess);
2917
2918     fci_data += 4;
2919     fci_length -= 4;
2920   }
2921 }
2922
2923 static void
2924 rtp_session_process_sr_req (RTPSession * sess, guint32 sender_ssrc,
2925     guint32 media_ssrc)
2926 {
2927   RTPSource *src;
2928
2929   /* Request a new SR in feedback profiles ASAP */
2930   if (sess->rtp_profile != GST_RTP_PROFILE_AVPF
2931       && sess->rtp_profile != GST_RTP_PROFILE_SAVPF)
2932     return;
2933
2934   src = find_source (sess, sender_ssrc);
2935   /* Our own RTCP packet */
2936   if (src && src->internal)
2937     return;
2938
2939   src = find_source (sess, media_ssrc);
2940   /* Not an SSRC we're producing */
2941   if (!src || !src->internal)
2942     return;
2943
2944   GST_DEBUG_OBJECT (sess, "Handling RTCP-SR-REQ");
2945   /* FIXME: 5s max_delay hard-coded here as we have to give some
2946    * high enough value */
2947   sess->sr_req_pending = TRUE;
2948   rtp_session_send_rtcp (sess, 5 * GST_SECOND);
2949 }
2950
2951 static void
2952 rtp_session_process_twcc (RTPSession * sess, guint32 sender_ssrc,
2953     guint32 media_ssrc, guint8 * fci_data, guint fci_length)
2954 {
2955   GArray *twcc_packets;
2956   GstStructure *twcc_packets_s;
2957   GstStructure *twcc_stats_s;
2958
2959   twcc_packets = rtp_twcc_manager_parse_fci (sess->twcc,
2960       fci_data, fci_length * sizeof (guint32));
2961   if (twcc_packets == NULL)
2962     return;
2963
2964   twcc_packets_s = rtp_twcc_stats_get_packets_structure (twcc_packets);
2965   twcc_stats_s =
2966       rtp_twcc_stats_process_packets (sess->twcc_stats, twcc_packets);
2967
2968   GST_DEBUG_OBJECT (sess, "Parsed TWCC: %" GST_PTR_FORMAT, twcc_packets_s);
2969   GST_INFO_OBJECT (sess, "Current TWCC stats %" GST_PTR_FORMAT, twcc_stats_s);
2970
2971   g_array_unref (twcc_packets);
2972
2973   RTP_SESSION_UNLOCK (sess);
2974   if (sess->callbacks.notify_twcc)
2975     sess->callbacks.notify_twcc (sess, twcc_packets_s, twcc_stats_s,
2976         sess->notify_twcc_user_data);
2977   RTP_SESSION_LOCK (sess);
2978 }
2979
2980 static void
2981 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
2982     RTPPacketInfo * pinfo, GstClockTime current_time)
2983 {
2984   GstRTCPType type;
2985   GstRTCPFBType fbtype;
2986   guint32 sender_ssrc, media_ssrc;
2987   guint8 *fci_data;
2988   guint fci_length;
2989   RTPSource *src;
2990
2991   /* The feedback packet must include both sender SSRC and media SSRC */
2992   if (packet->length < 2)
2993     return;
2994
2995   type = gst_rtcp_packet_get_type (packet);
2996   fbtype = gst_rtcp_packet_fb_get_type (packet);
2997   sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
2998   media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
2999
3000   src = find_source (sess, media_ssrc);
3001
3002   /* skip non-bye packets for sources that are marked BYE */
3003   if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src))
3004     return;
3005
3006   if (src)
3007     g_object_ref (src);
3008
3009   fci_data = gst_rtcp_packet_fb_get_fci (packet);
3010   fci_length = gst_rtcp_packet_fb_get_fci_length (packet) * sizeof (guint32);
3011
3012   GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
3013       "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
3014
3015   if (g_signal_has_handler_pending (sess,
3016           rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
3017     GstBuffer *fci_buffer = NULL;
3018
3019     if (fci_length > 0) {
3020       fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
3021           GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data,
3022           fci_length);
3023       GST_BUFFER_PTS (fci_buffer) = pinfo->running_time;
3024     }
3025
3026     RTP_SESSION_UNLOCK (sess);
3027     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
3028         type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
3029     RTP_SESSION_LOCK (sess);
3030
3031     if (fci_buffer)
3032       gst_buffer_unref (fci_buffer);
3033   }
3034
3035   if (src && sess->rtcp_feedback_retention_window != GST_CLOCK_TIME_NONE) {
3036     rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
3037   }
3038
3039   if ((src && src->internal) ||
3040       /* PSFB FIR puts the media ssrc inside the FCI */
3041       (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR) ||
3042       /* TWCC is for all sources, so a single media-ssrc is not enough */
3043       (type == GST_RTCP_TYPE_RTPFB && fbtype == GST_RTCP_RTPFB_TYPE_TWCC)) {
3044     switch (type) {
3045       case GST_RTCP_TYPE_PSFB:
3046         switch (fbtype) {
3047           case GST_RTCP_PSFB_TYPE_PLI:
3048             if (src)
3049               src->stats.recv_pli_count++;
3050             rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
3051                 current_time);
3052             break;
3053           case GST_RTCP_PSFB_TYPE_FIR:
3054             if (src)
3055               src->stats.recv_fir_count++;
3056             rtp_session_process_fir (sess, sender_ssrc, fci_data, fci_length,
3057                 current_time);
3058             break;
3059           default:
3060             break;
3061         }
3062         break;
3063       case GST_RTCP_TYPE_RTPFB:
3064         switch (fbtype) {
3065           case GST_RTCP_RTPFB_TYPE_NACK:
3066             if (src)
3067               src->stats.recv_nack_count++;
3068             rtp_session_process_nack (sess, sender_ssrc, media_ssrc,
3069                 fci_data, fci_length, current_time);
3070             break;
3071           case GST_RTCP_RTPFB_TYPE_RTCP_SR_REQ:
3072             rtp_session_process_sr_req (sess, sender_ssrc, media_ssrc);
3073             break;
3074           case GST_RTCP_RTPFB_TYPE_TWCC:
3075             rtp_session_process_twcc (sess, sender_ssrc, media_ssrc,
3076                 fci_data, fci_length);
3077             break;
3078           default:
3079             break;
3080         }
3081       default:
3082         break;
3083     }
3084   }
3085
3086   if (src)
3087     g_object_unref (src);
3088 }
3089
3090 /**
3091  * rtp_session_process_rtcp:
3092  * @sess: and #RTPSession
3093  * @buffer: an RTCP buffer
3094  * @current_time: the current system time
3095  * @ntpnstime: the current NTP time in nanoseconds
3096  *
3097  * Process an RTCP buffer in the session manager. This function takes ownership
3098  * of @buffer.
3099  *
3100  * Returns: a #GstFlowReturn.
3101  */
3102 GstFlowReturn
3103 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
3104     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
3105 {
3106   GstRTCPPacket packet;
3107   gboolean more, is_bye = FALSE, do_sync = FALSE, has_report = FALSE;
3108   RTPPacketInfo pinfo = { 0, };
3109   GstFlowReturn result = GST_FLOW_OK;
3110   GstRTCPBuffer rtcp = { NULL, };
3111
3112   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3113   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3114
3115   if (!gst_rtcp_buffer_validate_reduced (buffer))
3116     goto invalid_packet;
3117
3118   GST_DEBUG ("received RTCP packet");
3119
3120   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP], 0,
3121       buffer);
3122
3123   RTP_SESSION_LOCK (sess);
3124   /* update pinfo stats */
3125   update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
3126       running_time, ntpnstime);
3127
3128   /* start processing the compound packet */
3129   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
3130   more = gst_rtcp_buffer_get_first_packet (&rtcp, &packet);
3131   while (more) {
3132     GstRTCPType type;
3133
3134     type = gst_rtcp_packet_get_type (&packet);
3135
3136     switch (type) {
3137       case GST_RTCP_TYPE_SR:
3138         has_report = TRUE;
3139         rtp_session_process_sr (sess, &packet, &pinfo, &do_sync);
3140         break;
3141       case GST_RTCP_TYPE_RR:
3142         has_report = TRUE;
3143         rtp_session_process_rr (sess, &packet, &pinfo);
3144         break;
3145       case GST_RTCP_TYPE_SDES:
3146         rtp_session_process_sdes (sess, &packet, &pinfo);
3147         break;
3148       case GST_RTCP_TYPE_BYE:
3149         is_bye = TRUE;
3150         /* don't try to attempt lip-sync anymore for streams with a BYE */
3151         do_sync = FALSE;
3152         rtp_session_process_bye (sess, &packet, &pinfo);
3153         break;
3154       case GST_RTCP_TYPE_APP:
3155         rtp_session_process_app (sess, &packet, &pinfo);
3156         break;
3157       case GST_RTCP_TYPE_RTPFB:
3158       case GST_RTCP_TYPE_PSFB:
3159         rtp_session_process_feedback (sess, &packet, &pinfo, current_time);
3160         break;
3161       case GST_RTCP_TYPE_XR:
3162         /* FIXME: This block is added to downgrade warning level.
3163          * Once the parser is implemented, it should be replaced with
3164          * a proper process function. */
3165         GST_DEBUG ("got RTCP XR packet, but ignored");
3166         break;
3167       default:
3168         GST_WARNING ("got unknown RTCP packet type: %d", type);
3169         break;
3170     }
3171     more = gst_rtcp_packet_move_to_next (&packet);
3172   }
3173
3174   gst_rtcp_buffer_unmap (&rtcp);
3175
3176   /* if we are scheduling a BYE, we only want to count bye packets, else we
3177    * count everything */
3178   if (sess->scheduled_bye && is_bye) {
3179     sess->bye_stats.bye_members++;
3180     UPDATE_AVG (sess->bye_stats.avg_rtcp_packet_size, pinfo.bytes);
3181   }
3182
3183   /* keep track of average packet size */
3184   UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
3185
3186   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
3187       sess->stats.avg_rtcp_packet_size, pinfo.bytes);
3188   RTP_SESSION_UNLOCK (sess);
3189
3190   if (has_report) {
3191     g_object_notify_by_pspec (G_OBJECT (sess), properties[PROP_STATS]);
3192   }
3193
3194   pinfo.data = NULL;
3195   clean_packet_info (&pinfo);
3196
3197   /* notify caller of sr packets in the callback */
3198   if (do_sync && sess->callbacks.sync_rtcp) {
3199     result = sess->callbacks.sync_rtcp (sess, buffer,
3200         sess->sync_rtcp_user_data);
3201   } else
3202     gst_buffer_unref (buffer);
3203
3204   return result;
3205
3206   /* ERRORS */
3207 invalid_packet:
3208   {
3209     GST_DEBUG ("invalid RTCP packet received");
3210     gst_buffer_unref (buffer);
3211     return GST_FLOW_OK;
3212   }
3213 }
3214
3215 /**
3216  * rtp_session_update_send_caps:
3217  * @sess: an #RTPSession
3218  * @caps: a #GstCaps
3219  *
3220  * Update the caps of the sender in the rtp session.
3221  */
3222 void
3223 rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
3224 {
3225   GstStructure *s;
3226   guint ssrc;
3227
3228   g_return_if_fail (RTP_IS_SESSION (sess));
3229   g_return_if_fail (GST_IS_CAPS (caps));
3230
3231   GST_LOG ("received caps %" GST_PTR_FORMAT, caps);
3232
3233   s = gst_caps_get_structure (caps, 0);
3234
3235   if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
3236     RTPSource *source;
3237     gboolean created;
3238
3239     RTP_SESSION_LOCK (sess);
3240     source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
3241     sess->suggested_ssrc = ssrc;
3242     sess->internal_ssrc_set = TRUE;
3243     sess->internal_ssrc_from_caps_or_property = TRUE;
3244     if (source) {
3245       rtp_source_update_send_caps (source, caps);
3246
3247       if (created)
3248         on_new_sender_ssrc (sess, source);
3249
3250       g_object_unref (source);
3251     }
3252
3253     if (gst_structure_get_uint (s, "rtx-ssrc", &ssrc)) {
3254       source =
3255           obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
3256       if (source) {
3257         rtp_source_update_send_caps (source, caps);
3258
3259         if (created)
3260           on_new_sender_ssrc (sess, source);
3261
3262         g_object_unref (source);
3263       }
3264     }
3265     RTP_SESSION_UNLOCK (sess);
3266   } else {
3267     sess->internal_ssrc_from_caps_or_property = FALSE;
3268   }
3269
3270   sess->send_ntp64_ext_id =
3271       gst_rtp_get_extmap_id_for_attribute (s,
3272       GST_RTP_HDREXT_BASE GST_RTP_HDREXT_NTP_64);
3273
3274   rtp_twcc_manager_parse_send_ext_id (sess->twcc, s);
3275 }
3276
3277 static void
3278 update_ntp64_header_ext_data (RTPPacketInfo * pinfo, GstBuffer * buffer)
3279 {
3280   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
3281
3282   if (gst_rtp_buffer_map (buffer, GST_MAP_READWRITE, &rtp)) {
3283     guint16 bits;
3284     guint8 *data;
3285     guint wordlen;
3286
3287     if (gst_rtp_buffer_get_extension_data (&rtp, &bits, (gpointer *) & data,
3288             &wordlen)) {
3289       gsize len = wordlen * 4;
3290
3291       /* One-byte header */
3292       if (bits == 0xBEDE) {
3293         /* One-byte header extension */
3294         while (TRUE) {
3295           guint8 ext_id, ext_len;
3296
3297           if (len < 1)
3298             break;
3299
3300           ext_id = GST_READ_UINT8 (data) >> 4;
3301           ext_len = (GST_READ_UINT8 (data) & 0xF) + 1;
3302           data += 1;
3303           len -= 1;
3304           if (ext_id == 0) {
3305             /* Skip padding */
3306             continue;
3307           } else if (ext_id == 15) {
3308             /* Stop parsing */
3309             break;
3310           }
3311
3312           /* extension doesn't fit into the header */
3313           if (ext_len > len)
3314             break;
3315
3316           if (ext_id == pinfo->ntp64_ext_id && ext_len == 8) {
3317             if (pinfo->ntpnstime != GST_CLOCK_TIME_NONE) {
3318               guint64 ntptime = gst_util_uint64_scale (pinfo->ntpnstime,
3319                   G_GUINT64_CONSTANT (1) << 32,
3320                   GST_SECOND);
3321
3322               GST_WRITE_UINT64_BE (data, ntptime);
3323             } else {
3324               /* Replace extension with padding */
3325               memset (data - 1, 0, 1 + ext_len);
3326             }
3327           }
3328
3329           /* skip to the next extension */
3330           data += ext_len;
3331           len -= ext_len;
3332         }
3333       } else if ((bits >> 4) == 0x100) {
3334         /* Two-byte header extension */
3335
3336         while (TRUE) {
3337           guint8 ext_id, ext_len;
3338
3339           if (len < 1)
3340             break;
3341
3342           ext_id = GST_READ_UINT8 (data);
3343           data += 1;
3344           len -= 1;
3345           if (ext_id == 0) {
3346             /* Skip padding */
3347             continue;
3348           }
3349
3350           ext_len = GST_READ_UINT8 (data);
3351           data += 1;
3352           len -= 1;
3353
3354           /* extension doesn't fit into the header */
3355           if (ext_len > len)
3356             break;
3357
3358           if (ext_id == pinfo->ntp64_ext_id && ext_len == 8) {
3359             if (pinfo->ntpnstime != GST_CLOCK_TIME_NONE) {
3360               guint64 ntptime = gst_util_uint64_scale (pinfo->ntpnstime,
3361                   G_GUINT64_CONSTANT (1) << 32,
3362                   GST_SECOND);
3363
3364               GST_WRITE_UINT64_BE (data, ntptime);
3365             } else {
3366               /* Replace extension with padding */
3367               memset (data - 2, 0, 2 + ext_len);
3368             }
3369           }
3370
3371           /* skip to the next extension */
3372           data += ext_len;
3373           len -= ext_len;
3374         }
3375       }
3376     }
3377     gst_rtp_buffer_unmap (&rtp);
3378   }
3379 }
3380
3381 static void
3382 update_ntp64_header_ext (RTPPacketInfo * pinfo)
3383 {
3384   /* Early return if we don't know the header extension id or the packets
3385    * don't contain the header extension */
3386   if (pinfo->ntp64_ext_id == 0 || !pinfo->have_ntp64_ext)
3387     return;
3388
3389   /* If no NTP time is known then the header extension will be replaced with
3390    * padding, otherwise it will be updated */
3391   GST_TRACE
3392       ("Updating NTP-64 header extension for SSRC %08x packet with RTP time %u and running time %"
3393       GST_TIME_FORMAT " to %" GST_TIME_FORMAT, pinfo->ssrc, pinfo->rtptime,
3394       GST_TIME_ARGS (pinfo->running_time), GST_TIME_ARGS (pinfo->ntpnstime));
3395
3396   if (GST_IS_BUFFER_LIST (pinfo->data)) {
3397     GstBufferList *list;
3398     guint i = 0;
3399
3400     pinfo->data = gst_buffer_list_make_writable (pinfo->data);
3401
3402     list = GST_BUFFER_LIST (pinfo->data);
3403
3404     for (i = 0; i < gst_buffer_list_length (list); i++) {
3405       GstBuffer *buffer = gst_buffer_list_get_writable (list, i);
3406
3407       update_ntp64_header_ext_data (pinfo, buffer);
3408     }
3409   } else {
3410     pinfo->data = gst_buffer_make_writable (pinfo->data);
3411     update_ntp64_header_ext_data (pinfo, pinfo->data);
3412   }
3413 }
3414
3415 /**
3416  * rtp_session_send_rtp:
3417  * @sess: an #RTPSession
3418  * @data: pointer to either an RTP buffer or a list of RTP buffers
3419  * @is_list: TRUE when @data is a buffer list
3420  * @current_time: the current system time
3421  * @running_time: the running time of @data
3422  *
3423  * Send the RTP data (a buffer or buffer list) in the session manager. This
3424  * function takes ownership of @data.
3425  *
3426  * Returns: a #GstFlowReturn.
3427  */
3428 GstFlowReturn
3429 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
3430     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
3431 {
3432   GstFlowReturn result;
3433   RTPSource *source;
3434   gboolean prevsender;
3435   guint64 oldrate;
3436   RTPPacketInfo pinfo = { 0, };
3437   gboolean created;
3438
3439   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3440   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
3441
3442   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
3443
3444   RTP_SESSION_LOCK (sess);
3445   if (!update_packet_info (sess, &pinfo, TRUE, TRUE, is_list, data,
3446           current_time, running_time, ntpnstime))
3447     goto invalid_packet;
3448
3449   /* Update any 64-bit NTP header extensions with the actual NTP time here */
3450   if (sess->update_ntp64_header_ext)
3451     update_ntp64_header_ext (&pinfo);
3452
3453   rtp_twcc_manager_send_packet (sess->twcc, &pinfo);
3454
3455   source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
3456   if (created)
3457     on_new_sender_ssrc (sess, source);
3458
3459   if (!source->internal) {
3460     GSocketAddress *from;
3461
3462     if (source->rtp_from)
3463       from = source->rtp_from;
3464     else
3465       from = source->rtcp_from;
3466     if (from) {
3467       if (rtp_session_find_conflicting_address (sess, from, current_time)) {
3468         /* Its a known conflict, its probably a loop, not a collision
3469          * lets just drop the incoming packet
3470          */
3471         GST_LOG ("Our packets are being looped back to us, ignoring collision");
3472       } else {
3473         GST_DEBUG ("Collision for SSRC %x, change our sender ssrc", pinfo.ssrc);
3474
3475         rtp_session_have_conflict (sess, source, from, current_time);
3476       }
3477     } else {
3478       GST_LOG ("Ignoring collision on sent SSRC %x because remote source"
3479           " doesn't have an address", pinfo.ssrc);
3480     }
3481
3482     /* the the sending source is not internal, we have to drop the packet,
3483        or else we will end up receving it ourselves! */
3484     goto collision;
3485   }
3486
3487   prevsender = RTP_SOURCE_IS_SENDER (source);
3488   oldrate = source->bitrate;
3489
3490   /* we use our own source to send */
3491   result = rtp_source_send_rtp (source, &pinfo);
3492
3493   source_update_sender (sess, source, prevsender);
3494
3495   if (oldrate != source->bitrate)
3496     sess->recalc_bandwidth = TRUE;
3497   RTP_SESSION_UNLOCK (sess);
3498
3499   g_object_unref (source);
3500   clean_packet_info (&pinfo);
3501
3502   return result;
3503
3504 invalid_packet:
3505   {
3506     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
3507     RTP_SESSION_UNLOCK (sess);
3508     GST_DEBUG ("invalid RTP packet received");
3509     return GST_FLOW_OK;
3510   }
3511 collision:
3512   {
3513     g_object_unref (source);
3514     clean_packet_info (&pinfo);
3515     RTP_SESSION_UNLOCK (sess);
3516     GST_WARNING ("non-internal source with same ssrc %08x, drop packet",
3517         pinfo.ssrc);
3518     return GST_FLOW_OK;
3519   }
3520 }
3521
3522 static void
3523 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
3524 {
3525   *bandwidth += source->bitrate;
3526 }
3527
3528 /* must be called with session lock */
3529 static GstClockTime
3530 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
3531     gboolean first)
3532 {
3533   GstClockTime result;
3534   RTPSessionStats *stats;
3535
3536   /* recalculate bandwidth when it changed */
3537   if (sess->recalc_bandwidth) {
3538     gdouble bandwidth;
3539
3540     if (sess->bandwidth > 0)
3541       bandwidth = sess->bandwidth;
3542     else {
3543       /* If it is <= 0, then try to estimate the actual bandwidth */
3544       bandwidth = 0;
3545
3546       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3547           (GHFunc) add_bitrates, &bandwidth);
3548     }
3549     if (bandwidth < RTP_STATS_BANDWIDTH)
3550       bandwidth = RTP_STATS_BANDWIDTH;
3551
3552     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
3553         sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
3554
3555     sess->recalc_bandwidth = FALSE;
3556   }
3557
3558   if (sess->scheduled_bye) {
3559     stats = &sess->bye_stats;
3560     result = rtp_stats_calculate_bye_interval (stats);
3561   } else {
3562     session_update_ptp (sess);
3563
3564     stats = &sess->stats;
3565     result = rtp_stats_calculate_rtcp_interval (stats,
3566         stats->internal_sender_sources > 0, sess->rtp_profile,
3567         sess->is_doing_ptp, first);
3568   }
3569
3570   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
3571       GST_TIME_ARGS (result), first);
3572
3573   if (!deterministic && result != GST_CLOCK_TIME_NONE)
3574     result = rtp_stats_add_rtcp_jitter (stats, result);
3575
3576   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
3577
3578   return result;
3579 }
3580
3581 static void
3582 source_mark_bye (const gchar * key, RTPSource * source, const gchar * reason)
3583 {
3584   if (source->internal)
3585     rtp_source_mark_bye (source, reason);
3586 }
3587
3588 /**
3589  * rtp_session_mark_all_bye:
3590  * @sess: an #RTPSession
3591  * @reason: a reason
3592  *
3593  * Mark all internal sources of the session as BYE with @reason.
3594  */
3595 void
3596 rtp_session_mark_all_bye (RTPSession * sess, const gchar * reason)
3597 {
3598   g_return_if_fail (RTP_IS_SESSION (sess));
3599
3600   RTP_SESSION_LOCK (sess);
3601   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3602       (GHFunc) source_mark_bye, (gpointer) reason);
3603   RTP_SESSION_UNLOCK (sess);
3604 }
3605
3606 /* Stop the current @sess and schedule a BYE message for the other members.
3607  * One must have the session lock to call this function
3608  */
3609 static GstFlowReturn
3610 rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time)
3611 {
3612   GstFlowReturn result = GST_FLOW_OK;
3613   GstClockTime interval;
3614
3615   /* nothing to do it we already scheduled bye */
3616   if (sess->scheduled_bye)
3617     goto done;
3618
3619   /* we schedule BYE now */
3620   sess->scheduled_bye = TRUE;
3621   /* at least one member wants to send a BYE */
3622   memcpy (&sess->bye_stats, &sess->stats, sizeof (RTPSessionStats));
3623   INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100);
3624   sess->bye_stats.bye_members = 1;
3625   sess->first_rtcp = TRUE;
3626
3627   /* reschedule transmission */
3628   sess->last_rtcp_send_time = current_time;
3629   sess->last_rtcp_check_time = current_time;
3630   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3631
3632   if (interval != GST_CLOCK_TIME_NONE)
3633     sess->next_rtcp_check_time = current_time + interval;
3634   else
3635     sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
3636   sess->last_rtcp_interval = interval;
3637
3638   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
3639       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
3640
3641   RTP_SESSION_UNLOCK (sess);
3642   /* notify app of reconsideration */
3643   if (sess->callbacks.reconsider)
3644     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
3645   RTP_SESSION_LOCK (sess);
3646 done:
3647
3648   return result;
3649 }
3650
3651 /**
3652  * rtp_session_schedule_bye:
3653  * @sess: an #RTPSession
3654  * @current_time: the current system time
3655  *
3656  * Schedule a BYE message for all sources marked as BYE in @sess.
3657  *
3658  * Returns: a #GstFlowReturn.
3659  */
3660 GstFlowReturn
3661 rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
3662 {
3663   GstFlowReturn result;
3664
3665   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3666
3667   RTP_SESSION_LOCK (sess);
3668   result = rtp_session_schedule_bye_locked (sess, current_time);
3669   RTP_SESSION_UNLOCK (sess);
3670
3671   return result;
3672 }
3673
3674 /**
3675  * rtp_session_next_timeout:
3676  * @sess: an #RTPSession
3677  * @current_time: the current system time
3678  *
3679  * Get the next time we should perform session maintenance tasks.
3680  *
3681  * Returns: a time when rtp_session_on_timeout() should be called with the
3682  * current system time.
3683  */
3684 GstClockTime
3685 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
3686 {
3687   GstClockTime result, interval = 0;
3688
3689   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
3690
3691   RTP_SESSION_LOCK (sess);
3692
3693   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
3694     GST_DEBUG ("have early rtcp time");
3695     result = sess->next_early_rtcp_time;
3696     goto early_exit;
3697   }
3698
3699   result = sess->next_rtcp_check_time;
3700
3701   GST_DEBUG ("current time: %" GST_TIME_FORMAT
3702       ", next time: %" GST_TIME_FORMAT,
3703       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
3704
3705   if (result == GST_CLOCK_TIME_NONE || result < current_time) {
3706     GST_DEBUG ("take current time as base");
3707     /* our previous check time expired, start counting from the current time
3708      * again. */
3709     result = current_time;
3710   }
3711
3712   if (sess->scheduled_bye) {
3713     if (sess->bye_stats.active_sources >= 50) {
3714       GST_DEBUG ("reconsider BYE, more than 50 sources");
3715       /* reconsider BYE if members >= 50 */
3716       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3717       sess->last_rtcp_interval = interval;
3718     }
3719   } else {
3720     if (sess->first_rtcp) {
3721       GST_DEBUG ("first RTCP packet");
3722       /* we are called for the first time */
3723       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3724       sess->last_rtcp_interval = interval;
3725     } else if (sess->next_rtcp_check_time < current_time) {
3726       GST_DEBUG ("old check time expired, getting new timeout");
3727       /* get a new timeout when we need to */
3728       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
3729       sess->last_rtcp_interval = interval;
3730
3731       if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF
3732               || sess->rtp_profile == GST_RTP_PROFILE_SAVPF)
3733           && interval != GST_CLOCK_TIME_NONE) {
3734         /* Apply the rules from RFC 4585 section 3.5.3 */
3735         if (sess->stats.min_interval != 0) {
3736           GstClockTime T_rr_current_interval = g_random_double_range (0.5,
3737               1.5) * sess->stats.min_interval * GST_SECOND;
3738
3739           if (T_rr_current_interval > interval) {
3740             GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT
3741                 " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval),
3742                 GST_TIME_ARGS (interval));
3743             interval = T_rr_current_interval;
3744           }
3745         }
3746       }
3747     }
3748   }
3749
3750   if (interval != GST_CLOCK_TIME_NONE)
3751     result += interval;
3752   else
3753     result = GST_CLOCK_TIME_NONE;
3754
3755   sess->next_rtcp_check_time = result;
3756
3757 early_exit:
3758
3759   GST_DEBUG ("current time: %" GST_TIME_FORMAT
3760       ", next time: %" GST_TIME_FORMAT,
3761       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
3762   RTP_SESSION_UNLOCK (sess);
3763
3764   return result;
3765 }
3766
3767 typedef struct
3768 {
3769   RTPSource *source;
3770   gboolean is_bye;
3771   GstBuffer *buffer;
3772 } ReportOutput;
3773
3774 typedef struct
3775 {
3776   GstRTCPBuffer rtcpbuf;
3777   RTPSession *sess;
3778   RTPSource *source;
3779   guint num_to_report;
3780   gboolean have_fir;
3781   gboolean have_pli;
3782   gboolean have_nack;
3783   GstBuffer *rtcp;
3784   GstClockTime current_time;
3785   guint64 ntpnstime;
3786   GstClockTime running_time;
3787   GstClockTime interval;
3788   GstRTCPPacket packet;
3789   gboolean has_sdes;
3790   gboolean is_early;
3791   gboolean may_suppress;
3792   GQueue output;
3793   guint nacked_seqnums;
3794 } ReportData;
3795
3796 static void
3797 session_start_rtcp (RTPSession * sess, ReportData * data)
3798 {
3799   GstRTCPPacket *packet = &data->packet;
3800   RTPSource *own = data->source;
3801   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3802
3803   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
3804   data->has_sdes = FALSE;
3805
3806   gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
3807
3808   if (RTP_SOURCE_IS_SENDER (own) && (!data->is_early || !sess->reduced_size_rtcp
3809           || sess->sr_req_pending)) {
3810     guint64 ntptime;
3811     guint32 rtptime;
3812     guint32 packet_count, octet_count;
3813
3814     sess->sr_req_pending = FALSE;
3815
3816     /* we are a sender, create SR */
3817     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
3818     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SR, packet);
3819
3820     /* get latest stats */
3821     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
3822         &ntptime, &rtptime, &packet_count, &octet_count);
3823     /* store stats */
3824     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
3825         packet_count, octet_count);
3826
3827     /* fill in sender report info */
3828     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
3829         sess->timestamp_sender_reports ? ntptime : 0,
3830         sess->timestamp_sender_reports ? rtptime : 0,
3831         packet_count, octet_count);
3832   } else if (!data->is_early || !sess->reduced_size_rtcp) {
3833     /* we are only receiver, create RR */
3834     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
3835     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RR, packet);
3836     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
3837   }
3838 }
3839
3840 /* construct a Sender or Receiver Report */
3841 static void
3842 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
3843 {
3844   RTPSession *sess = data->sess;
3845   GstRTCPPacket *packet = &data->packet;
3846   guint8 fractionlost;
3847   gint32 packetslost;
3848   guint32 exthighestseq, jitter;
3849   guint32 lsr, dlsr;
3850
3851   /* don't report for sources in future generations */
3852   if (((gint16) (source->generation - sess->generation)) > 0) {
3853     GST_DEBUG ("source %08x generation %u > %u", source->ssrc,
3854         source->generation, sess->generation);
3855     return;
3856   }
3857
3858   if (g_hash_table_contains (source->reported_in_sr_of,
3859           GUINT_TO_POINTER (data->source->ssrc))) {
3860     GST_DEBUG ("source %08x already reported in this generation", source->ssrc);
3861     return;
3862   }
3863
3864   if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
3865     GST_DEBUG ("max RB count reached");
3866     return;
3867   }
3868
3869   /* only report about remote sources */
3870   if (source->internal)
3871     goto reported;
3872
3873   if (!RTP_SOURCE_IS_SENDER (source)) {
3874     GST_DEBUG ("source %08x not sender", source->ssrc);
3875     goto reported;
3876   }
3877
3878   if (source->disable_rtcp) {
3879     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
3880     goto reported;
3881   }
3882
3883   GST_DEBUG ("create RB for SSRC %08x", source->ssrc);
3884
3885   /* get new stats */
3886   rtp_source_get_new_rb (source, data->current_time, &fractionlost,
3887       &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
3888
3889   /* store last generated RR packet */
3890   source->last_rr.is_valid = TRUE;
3891   source->last_rr.ssrc = data->source->ssrc;
3892   source->last_rr.fractionlost = fractionlost;
3893   source->last_rr.packetslost = packetslost;
3894   source->last_rr.exthighestseq = exthighestseq;
3895   source->last_rr.jitter = jitter;
3896   source->last_rr.lsr = lsr;
3897   source->last_rr.dlsr = dlsr;
3898
3899   /* packet is not yet filled, add report block for this source. */
3900   gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
3901       exthighestseq, jitter, lsr, dlsr);
3902
3903 reported:
3904   g_hash_table_add (source->reported_in_sr_of,
3905       GUINT_TO_POINTER (data->source->ssrc));
3906 }
3907
3908 /* construct FIR */
3909 static void
3910 session_add_fir (const gchar * key, RTPSource * source, ReportData * data)
3911 {
3912   GstRTCPPacket *packet = &data->packet;
3913   guint16 len;
3914   guint8 *fci_data;
3915
3916   if (!source->send_fir)
3917     return;
3918
3919   len = gst_rtcp_packet_fb_get_fci_length (packet);
3920   if (!gst_rtcp_packet_fb_set_fci_length (packet, len + 2))
3921     /* exit because the packet is full, will put next request in a
3922      * further packet */
3923     return;
3924
3925   fci_data = gst_rtcp_packet_fb_get_fci (packet) + (len * 4);
3926
3927   GST_WRITE_UINT32_BE (fci_data, source->ssrc);
3928   fci_data += 4;
3929   fci_data[0] = source->current_send_fir_seqnum;
3930   fci_data[1] = fci_data[2] = fci_data[3] = 0;
3931
3932   source->send_fir = FALSE;
3933   source->stats.sent_fir_count++;
3934 }
3935
3936 static void
3937 session_fir (RTPSession * sess, ReportData * data)
3938 {
3939   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3940   GstRTCPPacket *packet = &data->packet;
3941
3942   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3943     return;
3944
3945   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_FIR);
3946   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3947   gst_rtcp_packet_fb_set_media_ssrc (packet, 0);
3948
3949   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3950       (GHFunc) session_add_fir, data);
3951
3952   if (gst_rtcp_packet_fb_get_fci_length (packet) == 0)
3953     gst_rtcp_packet_remove (packet);
3954   else
3955     data->may_suppress = FALSE;
3956 }
3957
3958 static gboolean
3959 has_pli_compare_func (gconstpointer a, gconstpointer ignored)
3960 {
3961   GstRTCPPacket packet;
3962   GstRTCPBuffer rtcp = { NULL, };
3963   gboolean ret = FALSE;
3964
3965   gst_rtcp_buffer_map ((GstBuffer *) a, GST_MAP_READ, &rtcp);
3966
3967   if (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
3968     if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
3969         gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
3970       ret = TRUE;
3971   }
3972
3973   gst_rtcp_buffer_unmap (&rtcp);
3974
3975   return ret;
3976 }
3977
3978 /* construct PLI */
3979 static void
3980 session_pli (const gchar * key, RTPSource * source, ReportData * data)
3981 {
3982   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3983   GstRTCPPacket *packet = &data->packet;
3984
3985   if (!source->send_pli)
3986     return;
3987
3988   if (rtp_source_has_retained (source, has_pli_compare_func, NULL))
3989     return;
3990
3991   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3992     /* exit because the packet is full, will put next request in a
3993      * further packet */
3994     return;
3995
3996   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_PLI);
3997   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3998   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
3999
4000   source->send_pli = FALSE;
4001   data->may_suppress = FALSE;
4002
4003   source->stats.sent_pli_count++;
4004 }
4005
4006 /* construct NACK */
4007 static void
4008 session_nack (const gchar * key, RTPSource * source, ReportData * data)
4009 {
4010   RTPSession *sess = data->sess;
4011   GstRTCPBuffer *rtcp = &data->rtcpbuf;
4012   GstRTCPPacket *packet = &data->packet;
4013   guint16 *nacks;
4014   GstClockTime *nack_deadlines;
4015   guint n_nacks, i = 0;
4016   guint nacked_seqnums = 0;
4017   guint16 n_fb_nacks = 0;
4018   guint8 *fci_data;
4019
4020   if (!source->send_nack)
4021     return;
4022
4023   nacks = rtp_source_get_nacks (source, &n_nacks);
4024   nack_deadlines = rtp_source_get_nack_deadlines (source, NULL);
4025   GST_DEBUG ("%u NACKs current time %" GST_TIME_FORMAT, n_nacks,
4026       GST_TIME_ARGS (data->current_time));
4027
4028   /* cleanup expired nacks */
4029   for (i = 0; i < n_nacks; i++) {
4030     GST_DEBUG ("#%u deadline %" GST_TIME_FORMAT, nacks[i],
4031         GST_TIME_ARGS (nack_deadlines[i]));
4032     if (nack_deadlines[i] >= data->current_time)
4033       break;
4034   }
4035
4036   if (data->is_early) {
4037     /* don't remove them all if this is an early RTCP packet. It may happen
4038      * that the NACKs are late due to high RTT, not sending NACKs at all would
4039      * keep the RTX RTT stats high and maintain a dropping state. */
4040     i = MIN (n_nacks - 1, i);
4041   }
4042
4043   if (i) {
4044     GST_WARNING ("Removing %u expired NACKS", i);
4045     rtp_source_clear_nacks (source, i);
4046     n_nacks -= i;
4047     if (n_nacks == 0)
4048       return;
4049   }
4050
4051   /* allow overriding NACK to packet conversion */
4052   if (g_signal_has_handler_pending (sess,
4053           rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0, TRUE)) {
4054     /* this is needed as it will actually resize the buffer */
4055     gst_rtcp_buffer_unmap (rtcp);
4056
4057     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0,
4058         data->source->ssrc, source->ssrc, source->nacks, data->rtcp,
4059         &nacked_seqnums);
4060
4061     /* and now remap for the remaining work */
4062     gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
4063
4064     if (nacked_seqnums > 0)
4065       goto done;
4066   }
4067
4068   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet))
4069     /* exit because the packet is full, will put next request in a
4070      * further packet */
4071     return;
4072
4073   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_RTPFB_TYPE_NACK);
4074   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
4075   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
4076
4077   if (!gst_rtcp_packet_fb_set_fci_length (packet, 1)) {
4078     gst_rtcp_packet_remove (packet);
4079     GST_WARNING ("no nacks fit in the packet");
4080     return;
4081   }
4082
4083   fci_data = gst_rtcp_packet_fb_get_fci (packet);
4084   for (i = 0; i < n_nacks; i = nacked_seqnums) {
4085     guint16 seqnum = nacks[i];
4086     guint16 blp = 0;
4087     guint j;
4088
4089     if (!gst_rtcp_packet_fb_set_fci_length (packet, n_fb_nacks + 1))
4090       break;
4091
4092     n_fb_nacks++;
4093     nacked_seqnums++;
4094
4095     for (j = i + 1; j < n_nacks; j++) {
4096       gint diff;
4097
4098       diff = gst_rtp_buffer_compare_seqnum (seqnum, nacks[j]);
4099       GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, nacks[j], diff);
4100       if (diff > 16)
4101         break;
4102
4103       blp |= 1 << (diff - 1);
4104       nacked_seqnums++;
4105     }
4106
4107     GST_WRITE_UINT32_BE (fci_data, seqnum << 16 | blp);
4108     fci_data += 4;
4109   }
4110
4111   GST_DEBUG ("Sent %u seqnums into %u FB NACKs", nacked_seqnums, n_fb_nacks);
4112   source->stats.sent_nack_count += n_fb_nacks;
4113
4114 done:
4115   data->nacked_seqnums += nacked_seqnums;
4116   rtp_source_clear_nacks (source, nacked_seqnums);
4117   data->may_suppress = FALSE;
4118 }
4119
4120 /* perform cleanup of sources that timed out */
4121 static void
4122 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
4123 {
4124   gboolean remove = FALSE;
4125   gboolean byetimeout = FALSE;
4126   gboolean sendertimeout = FALSE;
4127   gboolean is_sender, is_active;
4128   RTPSession *sess = data->sess;
4129   GstClockTime interval, binterval;
4130   GstClockTime btime;
4131
4132   GST_DEBUG ("look at %08x, generation %u", source->ssrc, source->generation);
4133
4134   /* check for outdated collisions */
4135   if (source->internal) {
4136     GST_DEBUG ("Timing out collisions for %x", source->ssrc);
4137     rtp_source_timeout (source, data->current_time, data->running_time,
4138         sess->rtcp_feedback_retention_window);
4139   }
4140
4141   /* nothing else to do when without RTCP */
4142   if (data->interval == GST_CLOCK_TIME_NONE)
4143     return;
4144
4145   is_sender = RTP_SOURCE_IS_SENDER (source);
4146   is_active = RTP_SOURCE_IS_ACTIVE (source);
4147
4148   /* our own rtcp interval may have been forced low by secondary configuration,
4149    * while sender side may still operate with higher interval,
4150    * so do not just take our interval to decide on timing out sender,
4151    * but take (if data->interval <= 5 * GST_SECOND):
4152    *   interval = CLAMP (sender_interval, data->interval, 5 * GST_SECOND)
4153    * where sender_interval is difference between last 2 received RTCP reports
4154    */
4155   if (data->interval >= 5 * GST_SECOND || source->internal) {
4156     binterval = data->interval;
4157   } else {
4158     GST_LOG ("prev_rtcp %" GST_TIME_FORMAT ", last_rtcp %" GST_TIME_FORMAT,
4159         GST_TIME_ARGS (source->stats.prev_rtcptime),
4160         GST_TIME_ARGS (source->stats.last_rtcptime));
4161     /* if not received enough yet, fallback to larger default */
4162     if (source->stats.last_rtcptime > source->stats.prev_rtcptime)
4163       binterval = source->stats.last_rtcptime - source->stats.prev_rtcptime;
4164     else
4165       binterval = 5 * GST_SECOND;
4166     binterval = CLAMP (binterval, data->interval, 5 * GST_SECOND);
4167   }
4168   GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
4169       GST_TIME_ARGS (binterval));
4170
4171   if (!source->internal && source->marked_bye) {
4172     /* if we received a BYE from the source, remove the source after some
4173      * time. */
4174     if (data->current_time > source->bye_time &&
4175         data->current_time - source->bye_time > sess->stats.bye_timeout) {
4176       GST_DEBUG ("removing BYE source %08x", source->ssrc);
4177       remove = TRUE;
4178       byetimeout = TRUE;
4179     }
4180   }
4181
4182   if (source->internal && source->sent_bye) {
4183     GST_DEBUG ("removing internal source that has sent BYE %08x", source->ssrc);
4184     remove = TRUE;
4185   }
4186
4187   /* sources that were inactive for more than 5 times the deterministic reporting
4188    * interval get timed out. the min timeout is 5 seconds. */
4189   /* mind old time that might pre-date last time going to PLAYING */
4190   btime = MAX (source->last_activity, sess->start_time);
4191   if (data->current_time > btime) {
4192     interval = MAX (binterval * 5, 5 * GST_SECOND);
4193     if (data->current_time - btime > interval) {
4194       GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
4195           source->ssrc, GST_TIME_ARGS (btime));
4196       if (source->internal) {
4197         /* this is an internal source that is not using our suggested ssrc.
4198          * since there must be another source using this ssrc, we can remove
4199          * this one instead of making it a receiver forever */
4200         if (source->ssrc != sess->suggested_ssrc
4201             && source->media_ssrc != sess->suggested_ssrc) {
4202           rtp_source_mark_bye (source, "timed out");
4203           /* do not schedule bye here, since we are inside the RTCP timeout
4204            * processing and scheduling bye will interfere with SR/RR sending */
4205         }
4206       } else {
4207         remove = TRUE;
4208       }
4209     }
4210   }
4211
4212   /* senders that did not send for a long time become a receiver, this also
4213    * holds for our own sources. */
4214   if (is_sender) {
4215     /* mind old time that might pre-date last time going to PLAYING */
4216     btime = MAX (source->last_rtp_activity, sess->start_time);
4217     if (data->current_time > btime) {
4218       interval = MAX (binterval * 2, 5 * GST_SECOND);
4219       if (data->current_time - btime > interval) {
4220         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
4221             GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
4222         sendertimeout = TRUE;
4223       }
4224     }
4225   }
4226
4227   if (remove) {
4228     sess->total_sources--;
4229     if (is_sender) {
4230       sess->stats.sender_sources--;
4231       if (source->internal)
4232         sess->stats.internal_sender_sources--;
4233     }
4234     if (is_active)
4235       sess->stats.active_sources--;
4236
4237     if (source->internal)
4238       sess->stats.internal_sources--;
4239
4240     if (byetimeout)
4241       on_bye_timeout (sess, source);
4242     else
4243       on_timeout (sess, source);
4244   } else {
4245     if (sendertimeout) {
4246       source->is_sender = FALSE;
4247       sess->stats.sender_sources--;
4248       if (source->internal)
4249         sess->stats.internal_sender_sources--;
4250
4251       on_sender_timeout (sess, source);
4252     }
4253     /* count how many source to report in this generation */
4254     if (((gint16) (source->generation - sess->generation)) <= 0)
4255       data->num_to_report++;
4256   }
4257   source->closing = remove;
4258 }
4259
4260 static void
4261 session_sdes (RTPSession * sess, ReportData * data)
4262 {
4263   GstRTCPPacket *packet = &data->packet;
4264   const GstStructure *sdes;
4265   gint i, n_fields;
4266   GstRTCPBuffer *rtcp = &data->rtcpbuf;
4267
4268   /* add SDES packet */
4269   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SDES, packet);
4270
4271   gst_rtcp_packet_sdes_add_item (packet, data->source->ssrc);
4272
4273   sdes = rtp_source_get_sdes_struct (data->source);
4274
4275   /* add all fields in the structure, the order is not important. */
4276   n_fields = gst_structure_n_fields (sdes);
4277   for (i = 0; i < n_fields; ++i) {
4278     const gchar *field;
4279     const gchar *value;
4280     GstRTCPSDESType type;
4281
4282     field = gst_structure_nth_field_name (sdes, i);
4283     if (field == NULL)
4284       continue;
4285     value = gst_structure_get_string (sdes, field);
4286     if (value == NULL)
4287       continue;
4288     type = gst_rtcp_sdes_name_to_type (field);
4289
4290     /* Early packets are minimal and only include the CNAME */
4291     if (data->is_early && type != GST_RTCP_SDES_CNAME)
4292       continue;
4293
4294     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
4295       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
4296           (const guint8 *) value);
4297     } else if (type == GST_RTCP_SDES_PRIV) {
4298       gsize prefix_len;
4299       gsize value_len;
4300       gsize data_len;
4301       guint8 data[256];
4302
4303       /* don't accept entries that are too big */
4304       prefix_len = strlen (field);
4305       if (prefix_len > 255)
4306         continue;
4307       value_len = strlen (value);
4308       if (value_len > 255)
4309         continue;
4310       data_len = 1 + prefix_len + value_len;
4311       if (data_len > 255)
4312         continue;
4313
4314       data[0] = prefix_len;
4315       memcpy (&data[1], field, prefix_len);
4316       memcpy (&data[1 + prefix_len], value, value_len);
4317
4318       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
4319     }
4320   }
4321
4322   data->has_sdes = TRUE;
4323 }
4324
4325 /* schedule a BYE packet */
4326 static void
4327 make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data)
4328 {
4329   GstRTCPPacket *packet = &data->packet;
4330   GstRTCPBuffer *rtcp = &data->rtcpbuf;
4331
4332   /* add SDES */
4333   session_sdes (sess, data);
4334   /* add a BYE packet */
4335   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_BYE, packet);
4336   gst_rtcp_packet_bye_add_ssrc (packet, source->ssrc);
4337   if (source->bye_reason)
4338     gst_rtcp_packet_bye_set_reason (packet, source->bye_reason);
4339
4340   /* we have a BYE packet now */
4341   source->sent_bye = TRUE;
4342 }
4343
4344 static gboolean
4345 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
4346 {
4347   GstClockTime new_send_time;
4348   GstClockTime interval;
4349   RTPSessionStats *stats;
4350
4351   if (sess->scheduled_bye)
4352     stats = &sess->bye_stats;
4353   else
4354     stats = &sess->stats;
4355
4356   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
4357     data->is_early = TRUE;
4358   else
4359     data->is_early = FALSE;
4360
4361   if (data->is_early && sess->next_early_rtcp_time <= current_time) {
4362     GST_DEBUG ("early feedback %" GST_TIME_FORMAT " <= now %"
4363         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time),
4364         GST_TIME_ARGS (current_time));
4365   } else if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
4366       sess->next_rtcp_check_time > current_time) {
4367     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
4368         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
4369         GST_TIME_ARGS (current_time));
4370     return FALSE;
4371   }
4372
4373   /* take interval and add jitter */
4374   interval = data->interval;
4375   if (interval != GST_CLOCK_TIME_NONE)
4376     interval = rtp_stats_add_rtcp_jitter (stats, interval);
4377
4378   if (sess->last_rtcp_check_time != GST_CLOCK_TIME_NONE) {
4379     /* perform forward reconsideration */
4380     if (interval != GST_CLOCK_TIME_NONE) {
4381       GstClockTime elapsed;
4382
4383       /* get elapsed time since we last reported */
4384       elapsed = current_time - sess->last_rtcp_check_time;
4385
4386       GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
4387           GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
4388       new_send_time = interval + sess->last_rtcp_check_time;
4389     } else {
4390       new_send_time = sess->last_rtcp_check_time;
4391     }
4392   } else {
4393     /* If this is the first RTCP packet, we can reconsider anything based
4394      * on the last RTCP send time because there was none.
4395      */
4396     g_warn_if_fail (!data->is_early);
4397     data->is_early = FALSE;
4398     new_send_time = current_time;
4399   }
4400
4401   if (!data->is_early) {
4402     /* check if reconsideration */
4403     if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
4404       GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
4405           GST_TIME_ARGS (new_send_time));
4406       /* store new check time */
4407       sess->next_rtcp_check_time = new_send_time;
4408       sess->last_rtcp_interval = interval;
4409       return FALSE;
4410     }
4411
4412     sess->last_rtcp_interval = interval;
4413     if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF
4414             || sess->rtp_profile == GST_RTP_PROFILE_SAVPF)
4415         && interval != GST_CLOCK_TIME_NONE) {
4416       /* Apply the rules from RFC 4585 section 3.5.3 */
4417       if (stats->min_interval != 0 && !sess->first_rtcp) {
4418         GstClockTime T_rr_current_interval =
4419             g_random_double_range (0.5, 1.5) * stats->min_interval * GST_SECOND;
4420
4421         if (T_rr_current_interval > interval) {
4422           GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT
4423               " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval),
4424               GST_TIME_ARGS (interval));
4425           interval = T_rr_current_interval;
4426         }
4427       }
4428     }
4429     sess->next_rtcp_check_time = current_time + interval;
4430   }
4431
4432
4433   GST_DEBUG ("can send RTCP now, next %" GST_TIME_FORMAT,
4434       GST_TIME_ARGS (sess->next_rtcp_check_time));
4435
4436   return TRUE;
4437 }
4438
4439 static void
4440 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
4441 {
4442   g_hash_table_insert (hash_table, key, g_object_ref (source));
4443 }
4444
4445 static gboolean
4446 remove_closing_sources (const gchar * key, RTPSource * source,
4447     ReportData * data)
4448 {
4449   if (source->closing)
4450     return TRUE;
4451
4452   if (source->send_fir)
4453     data->have_fir = TRUE;
4454   if (source->send_pli)
4455     data->have_pli = TRUE;
4456   if (source->send_nack)
4457     data->have_nack = TRUE;
4458
4459   return FALSE;
4460 }
4461
4462 static void
4463 generate_twcc (const gchar * key, RTPSource * source, ReportData * data)
4464 {
4465   RTPSession *sess = data->sess;
4466   GstBuffer *buf;
4467
4468   /* only generate RTCP for active internal sources */
4469   if (!source->internal || source->sent_bye)
4470     return;
4471
4472   /* ignore other sources when we do the timeout after a scheduled BYE */
4473   if (sess->scheduled_bye && !source->marked_bye)
4474     return;
4475
4476   /* skip if RTCP is disabled */
4477   if (source->disable_rtcp) {
4478     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
4479     return;
4480   }
4481
4482   GST_DEBUG ("generating TWCC feedback for source %08x", source->ssrc);
4483
4484   while ((buf = rtp_twcc_manager_get_feedback (sess->twcc, source->ssrc))) {
4485     ReportOutput *output = g_slice_new (ReportOutput);
4486     output->source = g_object_ref (source);
4487     output->is_bye = FALSE;
4488     output->buffer = buf;
4489     /* queue the RTCP packet to push later */
4490     g_queue_push_tail (&data->output, output);
4491   }
4492 }
4493
4494
4495 static void
4496 generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
4497 {
4498   RTPSession *sess = data->sess;
4499   gboolean is_bye = FALSE;
4500   ReportOutput *output;
4501   gboolean sr_req_pending = sess->sr_req_pending;
4502
4503   /* only generate RTCP for active internal sources */
4504   if (!source->internal || source->sent_bye)
4505     return;
4506
4507   /* ignore other sources when we do the timeout after a scheduled BYE */
4508   if (sess->scheduled_bye && !source->marked_bye)
4509     return;
4510
4511   /* skip if RTCP is disabled */
4512   if (source->disable_rtcp) {
4513     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
4514     return;
4515   }
4516
4517   data->source = source;
4518
4519   /* open packet */
4520   session_start_rtcp (sess, data);
4521
4522   if (source->marked_bye) {
4523     /* send BYE */
4524     make_source_bye (sess, source, data);
4525     is_bye = TRUE;
4526   } else if (!data->is_early) {
4527     /* loop over all known sources and add report blocks. If we are early, we
4528      * just make a minimal RTCP packet and skip this step */
4529     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4530         (GHFunc) session_report_blocks, data);
4531   }
4532   if (!data->has_sdes && (!data->is_early || !sess->reduced_size_rtcp
4533           || sr_req_pending))
4534     session_sdes (sess, data);
4535
4536   if (data->have_fir)
4537     session_fir (sess, data);
4538
4539   if (data->have_pli)
4540     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4541         (GHFunc) session_pli, data);
4542
4543   if (data->have_nack)
4544     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4545         (GHFunc) session_nack, data);
4546
4547   gst_rtcp_buffer_unmap (&data->rtcpbuf);
4548
4549   output = g_slice_new (ReportOutput);
4550   output->source = g_object_ref (source);
4551   output->is_bye = is_bye;
4552   output->buffer = data->rtcp;
4553   /* queue the RTCP packet to push later */
4554   g_queue_push_tail (&data->output, output);
4555 }
4556
4557 static void
4558 update_generation (const gchar * key, RTPSource * source, ReportData * data)
4559 {
4560   RTPSession *sess = data->sess;
4561
4562   if (g_hash_table_size (source->reported_in_sr_of) >=
4563       sess->stats.internal_sources) {
4564     /* source is reported, move to next generation */
4565     source->generation = sess->generation + 1;
4566     g_hash_table_remove_all (source->reported_in_sr_of);
4567
4568     GST_LOG ("reported source %x, new generation: %d", source->ssrc,
4569         source->generation);
4570
4571     /* if we reported all sources in this generation, move to next */
4572     if (--data->num_to_report == 0) {
4573       sess->generation++;
4574       GST_DEBUG ("all reported, generation now %u", sess->generation);
4575     }
4576   }
4577 }
4578
4579 static void
4580 schedule_remaining_nacks (const gchar * key, RTPSource * source,
4581     ReportData * data)
4582 {
4583   RTPSession *sess = data->sess;
4584   GstClockTime *nack_deadlines;
4585   GstClockTime deadline;
4586   guint n_nacks;
4587
4588   if (!source->send_nack)
4589     return;
4590
4591   /* the scheduling is entirely based on available bandwidth, just take the
4592    * biggest seqnum, which will have the largest deadline to request early
4593    * RTCP. */
4594   nack_deadlines = rtp_source_get_nack_deadlines (source, &n_nacks);
4595   deadline = nack_deadlines[n_nacks - 1];
4596   RTP_SESSION_UNLOCK (sess);
4597   rtp_session_send_rtcp_with_deadline (sess, deadline);
4598   RTP_SESSION_LOCK (sess);
4599 }
4600
4601 static gboolean
4602 rtp_session_are_all_sources_bye (RTPSession * sess)
4603 {
4604   GHashTableIter iter;
4605   RTPSource *src;
4606
4607   RTP_SESSION_LOCK (sess);
4608   g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
4609   while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
4610     if (src->internal && !src->sent_bye) {
4611       RTP_SESSION_UNLOCK (sess);
4612       return FALSE;
4613     }
4614   }
4615   RTP_SESSION_UNLOCK (sess);
4616
4617   return TRUE;
4618 }
4619
4620 /**
4621  * rtp_session_on_timeout:
4622  * @sess: an #RTPSession
4623  * @current_time: the current system time
4624  * @ntpnstime: the current NTP time in nanoseconds
4625  * @running_time: the current running_time of the pipeline
4626  *
4627  * Perform maintenance actions after the timeout obtained with
4628  * rtp_session_next_timeout() expired.
4629  *
4630  * This function will perform timeouts of receivers and senders, send a BYE
4631  * packet or generate RTCP packets with current session stats.
4632  *
4633  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
4634  * times, for each packet that should be processed.
4635  *
4636  * Returns: a #GstFlowReturn.
4637  */
4638 GstFlowReturn
4639 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
4640     guint64 ntpnstime, GstClockTime running_time)
4641 {
4642   GstFlowReturn result = GST_FLOW_OK;
4643   ReportData data = { GST_RTCP_BUFFER_INIT };
4644   GHashTable *table_copy;
4645   ReportOutput *output;
4646   gboolean all_empty = FALSE;
4647
4648   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
4649
4650   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT
4651       ", running-time %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4652       GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (running_time));
4653
4654   data.sess = sess;
4655   data.current_time = current_time;
4656   data.ntpnstime = ntpnstime;
4657   data.running_time = running_time;
4658   data.num_to_report = 0;
4659   data.may_suppress = FALSE;
4660   data.nacked_seqnums = 0;
4661   g_queue_init (&data.output);
4662
4663   RTP_SESSION_LOCK (sess);
4664   /* get a new interval, we need this for various cleanups etc */
4665   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
4666
4667   GST_DEBUG ("interval %" GST_TIME_FORMAT, GST_TIME_ARGS (data.interval));
4668
4669   /* we need an internal source now */
4670   if (sess->stats.internal_sources == 0) {
4671     RTPSource *source;
4672     gboolean created;
4673
4674     source = obtain_internal_source (sess, sess->suggested_ssrc, &created,
4675         current_time);
4676     sess->internal_ssrc_set = TRUE;
4677
4678     if (created)
4679       on_new_sender_ssrc (sess, source);
4680
4681     g_object_unref (source);
4682   }
4683
4684   sess->conflicting_addresses =
4685       timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
4686
4687   /* Make a local copy of the hashtable. We need to do this because the
4688    * cleanup stage below releases the session lock. */
4689   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
4690       (GDestroyNotify) g_object_unref);
4691   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4692       (GHFunc) clone_ssrcs_hashtable, table_copy);
4693
4694   /* Clean up the session, mark the source for removing, this might release the
4695    * session lock. */
4696   g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
4697   g_hash_table_destroy (table_copy);
4698
4699   /* Now remove the marked sources */
4700   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
4701       (GHRFunc) remove_closing_sources, &data);
4702
4703   /* update point-to-point status */
4704   session_update_ptp (sess);
4705
4706   /* see if we need to generate SR or RR packets */
4707   if (!is_rtcp_time (sess, current_time, &data))
4708     goto done;
4709
4710   /* check if all the buffers are empty after generation */
4711   all_empty = TRUE;
4712
4713   GST_DEBUG
4714       ("doing RTCP generation %u for %u sources, early %d, may suppress %d",
4715       sess->generation, data.num_to_report, data.is_early, data.may_suppress);
4716
4717   /* generate RTCP for all internal sources */
4718   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4719       (GHFunc) generate_rtcp, &data);
4720
4721   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4722       (GHFunc) generate_twcc, &data);
4723
4724   /* update the generation for all the sources that have been reported */
4725   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4726       (GHFunc) update_generation, &data);
4727
4728   /* we keep track of the last report time in order to timeout inactive
4729    * receivers or senders */
4730   if (!data.is_early) {
4731     GST_DEBUG ("Time since last regular RTCP: %" GST_TIME_FORMAT " - %"
4732         GST_TIME_FORMAT " = %" GST_TIME_FORMAT,
4733         GST_TIME_ARGS (data.current_time),
4734         GST_TIME_ARGS (sess->last_rtcp_send_time),
4735         GST_TIME_ARGS (data.current_time - sess->last_rtcp_send_time));
4736     sess->last_rtcp_send_time = data.current_time;
4737   }
4738
4739   GST_DEBUG ("Time since last RTCP: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT
4740       " = %" GST_TIME_FORMAT, GST_TIME_ARGS (data.current_time),
4741       GST_TIME_ARGS (sess->last_rtcp_check_time),
4742       GST_TIME_ARGS (data.current_time - sess->last_rtcp_check_time));
4743   sess->last_rtcp_check_time = data.current_time;
4744   sess->first_rtcp = FALSE;
4745   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
4746   sess->scheduled_bye = FALSE;
4747
4748 done:
4749   RTP_SESSION_UNLOCK (sess);
4750
4751   /* notify about updated statistics */
4752   g_object_notify_by_pspec (G_OBJECT (sess), properties[PROP_STATS]);
4753
4754   /* push out the RTCP packets */
4755   while ((output = g_queue_pop_head (&data.output))) {
4756     gboolean do_not_suppress, empty_buffer;
4757     GstBuffer *buffer = output->buffer;
4758     RTPSource *source = output->source;
4759
4760     /* Give the user a change to add its own packet */
4761     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
4762         buffer, data.is_early, &do_not_suppress);
4763
4764     empty_buffer = gst_buffer_get_size (buffer) == 0;
4765
4766     if (!empty_buffer)
4767       all_empty = FALSE;
4768
4769     if (sess->callbacks.send_rtcp &&
4770         !empty_buffer && (do_not_suppress || !data.may_suppress)) {
4771       guint packet_size;
4772
4773       packet_size = gst_buffer_get_size (buffer) + sess->header_len;
4774
4775       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
4776       GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
4777           sess->stats.avg_rtcp_packet_size, packet_size);
4778       result =
4779           sess->callbacks.send_rtcp (sess, source, buffer,
4780           rtp_session_are_all_sources_bye (sess), sess->send_rtcp_user_data);
4781
4782       RTP_SESSION_LOCK (sess);
4783       sess->stats.nacks_sent += data.nacked_seqnums;
4784       on_sender_ssrc_active (sess, source);
4785       RTP_SESSION_UNLOCK (sess);
4786     } else {
4787       GST_DEBUG ("freeing packet callback: %p"
4788           " empty_buffer: %d, "
4789           " do_not_suppress: %d may_suppress: %d", sess->callbacks.send_rtcp,
4790           empty_buffer, do_not_suppress, data.may_suppress);
4791       if (!empty_buffer) {
4792         RTP_SESSION_LOCK (sess);
4793         sess->stats.nacks_dropped += data.nacked_seqnums;
4794         RTP_SESSION_UNLOCK (sess);
4795       }
4796       gst_buffer_unref (buffer);
4797     }
4798     g_object_unref (source);
4799     g_slice_free (ReportOutput, output);
4800   }
4801
4802   if (all_empty)
4803     GST_ERROR ("generated empty RTCP messages for all the sources");
4804
4805   /* schedule remaining nacks */
4806   RTP_SESSION_LOCK (sess);
4807   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4808       (GHFunc) schedule_remaining_nacks, &data);
4809   RTP_SESSION_UNLOCK (sess);
4810
4811   return result;
4812 }
4813
4814 /**
4815  * rtp_session_request_early_rtcp:
4816  * @sess: an #RTPSession
4817  * @current_time: the current system time
4818  * @max_delay: maximum delay
4819  *
4820  * Request transmission of early RTCP
4821  *
4822  * Returns: %TRUE if the related RTCP can be scheduled.
4823  */
4824 gboolean
4825 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
4826     GstClockTime max_delay)
4827 {
4828   GstClockTime T_dither_max, T_rr, offset = 0;
4829   gboolean ret;
4830   gboolean allow_early;
4831
4832   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
4833
4834   RTP_SESSION_LOCK (sess);
4835
4836   /* We assume a feedback profile if something is requesting RTCP
4837    * to be sent */
4838   sess->rtp_profile = GST_RTP_PROFILE_AVPF;
4839
4840   /* Check if already requested */
4841   /*  RFC 4585 section 3.5.2 step 2 */
4842   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
4843     GST_LOG_OBJECT (sess, "already have next early rtcp time");
4844     ret = (current_time + max_delay > sess->next_early_rtcp_time);
4845     goto end;
4846   }
4847
4848   if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) {
4849     GST_LOG_OBJECT (sess, "no next RTCP check time");
4850     ret = FALSE;
4851     goto end;
4852   }
4853
4854   /* RFC 4585 section 3.5.3 step 1
4855    * If no regular RTCP packet has been sent before, then a regular
4856    * RTCP packet has to be scheduled first and FB messages might be
4857    * included there
4858    */
4859   if (!GST_CLOCK_TIME_IS_VALID (sess->last_rtcp_send_time)) {
4860     GST_LOG_OBJECT (sess, "no RTCP sent yet");
4861
4862     if (current_time + max_delay > sess->next_rtcp_check_time) {
4863       GST_LOG_OBJECT (sess,
4864           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
4865           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4866           GST_TIME_ARGS (max_delay),
4867           GST_TIME_ARGS (sess->next_rtcp_check_time));
4868       ret = TRUE;
4869     } else {
4870       GST_LOG_OBJECT (sess,
4871           "can't allow early feedback, next scheduled time is too late %"
4872           GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
4873           GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
4874           GST_TIME_ARGS (sess->next_rtcp_check_time));
4875       ret = FALSE;
4876     }
4877     goto end;
4878   }
4879
4880   T_rr = sess->last_rtcp_interval;
4881
4882   /*  RFC 4585 section 3.5.2 step 2b */
4883   /* If the total sources is <=2, then there is only us and one peer */
4884   /* When there is one auxiliary stream the session can still do point
4885    * to point.
4886    */
4887   if (sess->is_doing_ptp) {
4888     T_dither_max = 0;
4889   } else {
4890     /* Divide by 2 because l = 0.5 */
4891     T_dither_max = T_rr;
4892     T_dither_max /= 2;
4893   }
4894
4895   /*  RFC 4585 section 3.5.2 step 3 */
4896   if (current_time + T_dither_max > sess->next_rtcp_check_time) {
4897     GST_LOG_OBJECT (sess,
4898         "don't send because of dither, next scheduled time is too soon %"
4899         GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT,
4900         GST_TIME_ARGS (current_time), GST_TIME_ARGS (T_dither_max),
4901         GST_TIME_ARGS (sess->next_rtcp_check_time));
4902     ret = T_dither_max <= max_delay;
4903     goto end;
4904   }
4905
4906   /*  RFC 4585 section 3.5.2 step 4a and
4907    *  RFC 4585 section 3.5.2 step 6 */
4908   allow_early = FALSE;
4909   if (sess->last_rtcp_check_time == sess->last_rtcp_send_time) {
4910     /* Last time we sent a full RTCP packet, we can now immediately
4911      * send an early one as allow_early was reset to TRUE */
4912     allow_early = TRUE;
4913   } else if (sess->last_rtcp_check_time + T_rr <= current_time + max_delay) {
4914     /* Last packet we sent was an early RTCP packet and more than
4915      * T_rr has passed since then, meaning we would have suppressed
4916      * a regular RTCP packet already and reset allow_early to TRUE */
4917     allow_early = TRUE;
4918
4919     /* We have to offset a bit as T_rr has not passed yet, but will before
4920      * max_delay */
4921     if (sess->last_rtcp_check_time + T_rr > current_time)
4922       offset = (sess->last_rtcp_check_time + T_rr) - current_time;
4923   } else {
4924     GST_DEBUG_OBJECT (sess,
4925         "can't allow early RTCP yet: last regular %" GST_TIME_FORMAT ", %"
4926         GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT " + %"
4927         GST_TIME_FORMAT, GST_TIME_ARGS (sess->last_rtcp_send_time),
4928         GST_TIME_ARGS (sess->last_rtcp_check_time), GST_TIME_ARGS (T_rr),
4929         GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay));
4930   }
4931
4932   if (!allow_early) {
4933     /* Ignore the request a scheduled packet will be in time anyway */
4934     if (current_time + max_delay > sess->next_rtcp_check_time) {
4935       GST_LOG_OBJECT (sess,
4936           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
4937           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4938           GST_TIME_ARGS (max_delay),
4939           GST_TIME_ARGS (sess->next_rtcp_check_time));
4940       ret = TRUE;
4941     } else {
4942       GST_LOG_OBJECT (sess,
4943           "can't allow early feedback and next scheduled time is too late %"
4944           GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
4945           GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
4946           GST_TIME_ARGS (sess->next_rtcp_check_time));
4947       ret = FALSE;
4948     }
4949     goto end;
4950   }
4951
4952   /*  RFC 4585 section 3.5.2 step 4b */
4953   if (T_dither_max) {
4954     /* Schedule an early transmission later */
4955     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
4956         current_time + offset;
4957   } else {
4958     /* If no dithering, schedule it for NOW */
4959     sess->next_early_rtcp_time = current_time + offset;
4960   }
4961
4962   GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT
4963       ", next regular RTCP time %" GST_TIME_FORMAT,
4964       GST_TIME_ARGS (sess->next_early_rtcp_time),
4965       GST_TIME_ARGS (sess->next_rtcp_check_time));
4966   RTP_SESSION_UNLOCK (sess);
4967
4968   /* notify app of need to send packet early
4969    * and therefore of timeout change */
4970   if (sess->callbacks.reconsider)
4971     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
4972
4973   return TRUE;
4974
4975 end:
4976
4977   RTP_SESSION_UNLOCK (sess);
4978
4979   return ret;
4980 }
4981
4982 static gboolean
4983 rtp_session_send_rtcp_internal (RTPSession * sess, GstClockTime now,
4984     GstClockTime max_delay)
4985 {
4986   /* notify the application that we intend to send early RTCP */
4987   if (sess->callbacks.notify_early_rtcp)
4988     sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data);
4989
4990   return rtp_session_request_early_rtcp (sess, now, max_delay);
4991 }
4992
4993 static gboolean
4994 rtp_session_send_rtcp_with_deadline (RTPSession * sess, GstClockTime deadline)
4995 {
4996   GstClockTime now, max_delay;
4997
4998   if (!sess->callbacks.send_rtcp)
4999     return FALSE;
5000
5001   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
5002
5003   if (deadline < now)
5004     return FALSE;
5005
5006   max_delay = deadline - now;
5007
5008   return rtp_session_send_rtcp_internal (sess, now, max_delay);
5009 }
5010
5011 static gboolean
5012 rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
5013 {
5014   GstClockTime now;
5015
5016   if (!sess->callbacks.send_rtcp)
5017     return FALSE;
5018
5019   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
5020
5021   return rtp_session_send_rtcp_internal (sess, now, max_delay);
5022 }
5023
5024 gboolean
5025 rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
5026     gboolean fir, gint count)
5027 {
5028   RTPSource *src;
5029
5030   RTP_SESSION_LOCK (sess);
5031   src = find_source (sess, ssrc);
5032   if (src == NULL)
5033     goto no_source;
5034
5035   if (fir) {
5036     src->send_pli = FALSE;
5037     src->send_fir = TRUE;
5038
5039     if (count == -1 || count != src->last_fir_count)
5040       src->current_send_fir_seqnum++;
5041     src->last_fir_count = count;
5042   } else if (!src->send_fir) {
5043     src->send_pli = TRUE;
5044   }
5045   RTP_SESSION_UNLOCK (sess);
5046
5047   if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
5048     GST_DEBUG ("FIR/PLI not sent early, sending with next regular RTCP");
5049   }
5050
5051   return TRUE;
5052
5053   /* ERRORS */
5054 no_source:
5055   {
5056     RTP_SESSION_UNLOCK (sess);
5057     return FALSE;
5058   }
5059 }
5060
5061 /**
5062  * rtp_session_request_nack:
5063  * @sess: a #RTPSession
5064  * @ssrc: the SSRC
5065  * @seqnum: the missing seqnum
5066  * @max_delay: max delay to request NACK
5067  *
5068  * Request scheduling of a NACK feedback packet for @seqnum in @ssrc.
5069  *
5070  * Returns: %TRUE if the NACK feedback could be scheduled
5071  */
5072 gboolean
5073 rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
5074     GstClockTime max_delay)
5075 {
5076   RTPSource *source;
5077   GstClockTime now;
5078
5079   if (!sess->callbacks.send_rtcp)
5080     return FALSE;
5081
5082   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
5083
5084   RTP_SESSION_LOCK (sess);
5085   source = find_source (sess, ssrc);
5086   if (source == NULL)
5087     goto no_source;
5088
5089   GST_DEBUG ("request NACK for SSRC %08x, #%u, deadline %" GST_TIME_FORMAT,
5090       ssrc, seqnum, GST_TIME_ARGS (now + max_delay));
5091   rtp_source_register_nack (source, seqnum, now + max_delay);
5092   RTP_SESSION_UNLOCK (sess);
5093
5094   if (!rtp_session_send_rtcp_internal (sess, now, max_delay)) {
5095     GST_DEBUG ("NACK not sent early, sending with next regular RTCP");
5096   }
5097
5098   return TRUE;
5099
5100   /* ERRORS */
5101 no_source:
5102   {
5103     RTP_SESSION_UNLOCK (sess);
5104     return FALSE;
5105   }
5106 }
5107
5108 /**
5109  * rtp_session_update_recv_caps_structure:
5110  * @sess: an #RTPSession
5111  * @s: a #GstStructure from a #GstCaps
5112  *
5113  * Update the caps of the receiver in the rtp session.
5114  */
5115 void
5116 rtp_session_update_recv_caps_structure (RTPSession * sess,
5117     const GstStructure * s)
5118 {
5119   rtp_twcc_manager_parse_recv_ext_id (sess->twcc, s);
5120 }