rtpsession: Handle RTCP-SR-REQ (RFC6051) RTCP feedback message
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
21  * with newer GLib versions (>= 2.31.0) */
22 #define GLIB_DISABLE_DEPRECATION_WARNINGS
23
24 #include <string.h>
25 #include <stdlib.h>
26
27 #include <gst/rtp/gstrtpbuffer.h>
28 #include <gst/rtp/gstrtcpbuffer.h>
29
30 #include <gst/glib-compat-private.h>
31
32 #include "rtpsession.h"
33
34 GST_DEBUG_CATEGORY (rtp_session_debug);
35 #define GST_CAT_DEFAULT rtp_session_debug
36
37 /* signals and args */
38 enum
39 {
40   SIGNAL_GET_SOURCE_BY_SSRC,
41   SIGNAL_ON_NEW_SSRC,
42   SIGNAL_ON_SSRC_COLLISION,
43   SIGNAL_ON_SSRC_VALIDATED,
44   SIGNAL_ON_SSRC_ACTIVE,
45   SIGNAL_ON_SSRC_SDES,
46   SIGNAL_ON_BYE_SSRC,
47   SIGNAL_ON_BYE_TIMEOUT,
48   SIGNAL_ON_TIMEOUT,
49   SIGNAL_ON_SENDER_TIMEOUT,
50   SIGNAL_ON_SENDING_RTCP,
51   SIGNAL_ON_APP_RTCP,
52   SIGNAL_ON_FEEDBACK_RTCP,
53   SIGNAL_SEND_RTCP,
54   SIGNAL_SEND_RTCP_FULL,
55   SIGNAL_ON_RECEIVING_RTCP,
56   SIGNAL_ON_NEW_SENDER_SSRC,
57   SIGNAL_ON_SENDER_SSRC_ACTIVE,
58   SIGNAL_ON_SENDING_NACKS,
59   LAST_SIGNAL
60 };
61
62 #define DEFAULT_INTERNAL_SOURCE      NULL
63 #define DEFAULT_BANDWIDTH            0.0
64 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_FRACTION
65 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
66 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
67 #define DEFAULT_RTCP_MTU             1400
68 #define DEFAULT_SDES                 NULL
69 #define DEFAULT_NUM_SOURCES          0
70 #define DEFAULT_NUM_ACTIVE_SOURCES   0
71 #define DEFAULT_SOURCES              NULL
72 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
73 #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
74 #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
75 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
76 #define DEFAULT_MAX_DROPOUT_TIME     60000
77 #define DEFAULT_MAX_MISORDER_TIME    2000
78 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
79 #define DEFAULT_RTCP_REDUCED_SIZE    FALSE
80 #define DEFAULT_RTCP_DISABLE_SR_TIMESTAMP FALSE
81 #define DEFAULT_TWCC_FEEDBACK_INTERVAL GST_CLOCK_TIME_NONE
82
83 enum
84 {
85   PROP_0,
86   PROP_INTERNAL_SSRC,
87   PROP_INTERNAL_SOURCE,
88   PROP_BANDWIDTH,
89   PROP_RTCP_FRACTION,
90   PROP_RTCP_RR_BANDWIDTH,
91   PROP_RTCP_RS_BANDWIDTH,
92   PROP_RTCP_MTU,
93   PROP_SDES,
94   PROP_NUM_SOURCES,
95   PROP_NUM_ACTIVE_SOURCES,
96   PROP_SOURCES,
97   PROP_FAVOR_NEW,
98   PROP_RTCP_MIN_INTERVAL,
99   PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
100   PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
101   PROP_PROBATION,
102   PROP_MAX_DROPOUT_TIME,
103   PROP_MAX_MISORDER_TIME,
104   PROP_STATS,
105   PROP_RTP_PROFILE,
106   PROP_RTCP_REDUCED_SIZE,
107   PROP_RTCP_DISABLE_SR_TIMESTAMP,
108   PROP_TWCC_FEEDBACK_INTERVAL,
109 };
110
111 /* update average packet size */
112 #define INIT_AVG(avg, val) \
113    (avg) = (val);
114 #define UPDATE_AVG(avg, val)            \
115   if ((avg) == 0)                       \
116    (avg) = (val);                       \
117   else                                  \
118    (avg) = ((val) + (15 * (avg))) >> 4;
119
120 /* GObject vmethods */
121 static void rtp_session_finalize (GObject * object);
122 static void rtp_session_set_property (GObject * object, guint prop_id,
123     const GValue * value, GParamSpec * pspec);
124 static void rtp_session_get_property (GObject * object, guint prop_id,
125     GValue * value, GParamSpec * pspec);
126
127 static gboolean rtp_session_send_rtcp (RTPSession * sess,
128     GstClockTime max_delay);
129 static gboolean rtp_session_send_rtcp_with_deadline (RTPSession * sess,
130     GstClockTime deadline);
131
132 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
133
134 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
135
136 static guint32 rtp_session_create_new_ssrc (RTPSession * sess);
137 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
138     gboolean * created, RTPPacketInfo * pinfo, gboolean rtp);
139 static RTPSource *obtain_internal_source (RTPSession * sess,
140     guint32 ssrc, gboolean * created, GstClockTime current_time);
141 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
142     GstClockTime current_time);
143 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
144     gboolean deterministic, gboolean first);
145
146 static gboolean
147 accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
148     const GValue * handler_return, gpointer data)
149 {
150   if (g_value_get_boolean (handler_return))
151     g_value_set_boolean (return_accu, TRUE);
152
153   return TRUE;
154 }
155
156 static void
157 rtp_session_class_init (RTPSessionClass * klass)
158 {
159   GObjectClass *gobject_class;
160
161   gobject_class = (GObjectClass *) klass;
162
163   gobject_class->finalize = rtp_session_finalize;
164   gobject_class->set_property = rtp_session_set_property;
165   gobject_class->get_property = rtp_session_get_property;
166
167   /**
168    * RTPSession::get-source-by-ssrc:
169    * @session: the object which received the signal
170    * @ssrc: the SSRC of the RTPSource
171    *
172    * Request the #RTPSource object with SSRC @ssrc in @session.
173    */
174   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
175       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
176       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
177           get_source_by_ssrc), NULL, NULL, NULL,
178       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
179
180   /**
181    * RTPSession::on-new-ssrc:
182    * @session: the object which received the signal
183    * @src: the new RTPSource
184    *
185    * Notify of a new SSRC that entered @session.
186    */
187   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
188       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
189       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
190       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
191   /**
192    * RTPSession::on-ssrc-collision:
193    * @session: the object which received the signal
194    * @src: the #RTPSource that caused a collision
195    *
196    * Notify when we have an SSRC collision
197    */
198   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
199       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
200       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
201       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
202   /**
203    * RTPSession::on-ssrc-validated:
204    * @session: the object which received the signal
205    * @src: the new validated RTPSource
206    *
207    * Notify of a new SSRC that became validated.
208    */
209   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
210       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
211       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
212       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
213   /**
214    * RTPSession::on-ssrc-active:
215    * @session: the object which received the signal
216    * @src: the active RTPSource
217    *
218    * Notify of a SSRC that is active, i.e., sending RTCP.
219    */
220   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
221       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
222       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
223       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
224   /**
225    * RTPSession::on-ssrc-sdes:
226    * @session: the object which received the signal
227    * @src: the RTPSource
228    *
229    * Notify that a new SDES was received for SSRC.
230    */
231   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
232       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
233       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
234       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
235   /**
236    * RTPSession::on-bye-ssrc:
237    * @session: the object which received the signal
238    * @src: the RTPSource that went away
239    *
240    * Notify of an SSRC that became inactive because of a BYE packet.
241    */
242   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
243       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
244       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
245       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
246   /**
247    * RTPSession::on-bye-timeout:
248    * @session: the object which received the signal
249    * @src: the RTPSource that timed out
250    *
251    * Notify of an SSRC that has timed out because of BYE
252    */
253   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
254       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
255       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
256       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
257   /**
258    * RTPSession::on-timeout:
259    * @session: the object which received the signal
260    * @src: the RTPSource that timed out
261    *
262    * Notify of an SSRC that has timed out
263    */
264   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
265       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
266       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
267       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
268   /**
269    * RTPSession::on-sender-timeout:
270    * @session: the object which received the signal
271    * @src: the RTPSource that timed out
272    *
273    * Notify of an SSRC that was a sender but timed out and became a receiver.
274    */
275   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
276       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
277       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
278       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
279
280   /**
281    * RTPSession::on-sending-rtcp
282    * @session: the object which received the signal
283    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
284    * @early: %TRUE if the packet is early, %FALSE if it is regular
285    *
286    * This signal is emitted before sending an RTCP packet, it can be used
287    * to add extra RTCP Packets.
288    *
289    * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
290    * if suppressing it is acceptable
291    */
292   rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
293       g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
294       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
295       accumulate_trues, NULL, NULL, G_TYPE_BOOLEAN, 2,
296       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE, G_TYPE_BOOLEAN);
297
298   /**
299    * RTPSession::on-app-rtcp:
300    * @session: the object which received the signal
301    * @subtype: The subtype of the packet
302    * @ssrc: The SSRC/CSRC of the packet
303    * @name: The name of the packet
304    * @data: a #GstBuffer with the application-dependant data or %NULL if
305    * there was no data
306    *
307    * Notify that a RTCP APP packet has been received
308    */
309   rtp_session_signals[SIGNAL_ON_APP_RTCP] =
310       g_signal_new ("on-app-rtcp", G_TYPE_FROM_CLASS (klass),
311       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_app_rtcp),
312       NULL, NULL, NULL, G_TYPE_NONE, 4, G_TYPE_UINT, G_TYPE_UINT,
313       G_TYPE_STRING, GST_TYPE_BUFFER);
314
315   /**
316    * RTPSession::on-feedback-rtcp:
317    * @session: the object which received the signal
318    * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
319    *  %GST_RTCP_TYPE_RTPFB
320    * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
321    * @sender_ssrc: The SSRC of the sender
322    * @media_ssrc: The SSRC of the media this refers to
323    * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
324    * there was no FCI
325    *
326    * Notify that a RTCP feedback packet has been received
327    */
328   rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
329       g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
330       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
331       NULL, NULL, NULL, G_TYPE_NONE, 5, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT,
332       G_TYPE_UINT, GST_TYPE_BUFFER);
333
334   /**
335    * RTPSession::send-rtcp:
336    * @session: the object which received the signal
337    * @max_delay: The maximum delay after which the feedback will not be useful
338    *  anymore
339    *
340    * Requests that the #RTPSession initiate a new RTCP packet as soon as
341    * possible within the requested delay.
342    *
343    * This sets feedback to %TRUE if not already done before.
344    */
345   rtp_session_signals[SIGNAL_SEND_RTCP] =
346       g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
347       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
348       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
349       NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
350
351   /**
352    * RTPSession::send-rtcp-full:
353    * @session: the object which received the signal
354    * @max_delay: The maximum delay after which the feedback will not be useful
355    *  anymore
356    *
357    * Requests that the #RTPSession initiate a new RTCP packet as soon as
358    * possible within the requested delay.
359    *
360    * This sets feedback to %TRUE if not already done before.
361    *
362    * Returns: TRUE if the new RTCP packet could be scheduled within the
363    * requested delay, FALSE otherwise.
364    *
365    * Since: 1.6
366    */
367   rtp_session_signals[SIGNAL_SEND_RTCP_FULL] =
368       g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass),
369       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
370       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
371       NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
372
373   /**
374    * RTPSession::on-receiving-rtcp
375    * @session: the object which received the signal
376    * @buffer: the #GstBuffer containing the RTCP packet that was received
377    *
378    * This signal is emitted when receiving an RTCP packet before it is handled
379    * by the session. It can be used to extract custom information from RTCP packets.
380    *
381    * Since: 1.6
382    */
383   rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] =
384       g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass),
385       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp),
386       NULL, NULL, NULL, G_TYPE_NONE, 1,
387       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
388
389   /**
390    * RTPSession::on-new-sender-ssrc:
391    * @session: the object which received the signal
392    * @src: the new sender RTPSource
393    *
394    * Notify of a new sender SSRC that entered @session.
395    *
396    * Since: 1.8
397    */
398   rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
399       g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
400       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_sender_ssrc),
401       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
402
403   /**
404    * RTPSession::on-sender-ssrc-active:
405    * @session: the object which received the signal
406    * @src: the active sender RTPSource
407    *
408    * Notify of a sender SSRC that is active, i.e., sending RTCP.
409    *
410    * Since: 1.8
411    */
412   rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
413       g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
414       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass,
415           on_sender_ssrc_active), NULL, NULL, NULL,
416       G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
417
418   /**
419    * RTPSession::on-sending-nack
420    * @session: the object which received the signal
421    * @sender_ssrc: the sender ssrc
422    * @media_ssrc: the media ssrc
423    * @nacks: (element-type guint16): the list of seqnum to be nacked
424    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
425    *
426    * This signal is emitted before NACK packets are added into the RTCP
427    * packet. This signal can be used to override the conversion of the NACK
428    * seqnum array into packets. This can be used if your protocol uses
429    * different type of NACK (e.g. based on RTCP APP).
430    *
431    * The handler should transform the seqnum from @nacks array into packets.
432    * @nacks seqnum must be consumed from the start. The remaining will be
433    * rescheduled for later base on bandwidth. Only one handler will be
434    * signalled.
435    *
436    * A handler may return 0 to signal that generic NACKs should be created
437    * for this set. This can be useful if the signal is used for other purpose
438    * or if the other type of NACK would use more space.
439    *
440    * Returns: the number of NACK seqnum that was consumed from @nacks.
441    *
442    * Since: 1.16
443    */
444   rtp_session_signals[SIGNAL_ON_SENDING_NACKS] =
445       g_signal_new ("on-sending-nacks", G_TYPE_FROM_CLASS (klass),
446       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_nacks),
447       g_signal_accumulator_first_wins, NULL, NULL,
448       G_TYPE_UINT, 4, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_ARRAY,
449       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
450
451   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
452       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
453           "The internal SSRC used for the session (deprecated)",
454           0, G_MAXUINT, 0,
455           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
456           GST_PARAM_DOC_SHOW_DEFAULT));
457
458   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
459       g_param_spec_object ("internal-source", "Internal Source",
460           "The internal source element of the session (deprecated)",
461           RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
462
463   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
464       g_param_spec_double ("bandwidth", "Bandwidth",
465           "The bandwidth of the session in bits per second (0 for auto-discover)",
466           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
467           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
468
469   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
470       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
471           "The fraction of the bandwidth used for RTCP in bits per second (or as a real fraction of the RTP bandwidth if < 1)",
472           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
473           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
474
475   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
476       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
477           "The RTCP bandwidth used for receivers in bits per second (-1 = default)",
478           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
479           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
480
481   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
482       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
483           "The RTCP bandwidth used for senders in bits per second (-1 = default)",
484           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
485           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
486
487   g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
488       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
489           "The maximum size of the RTCP packets",
490           16, G_MAXINT16, DEFAULT_RTCP_MTU,
491           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
492
493   g_object_class_install_property (gobject_class, PROP_SDES,
494       g_param_spec_boxed ("sdes", "SDES",
495           "The SDES items of this session",
496           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
497           | GST_PARAM_DOC_SHOW_DEFAULT));
498
499   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
500       g_param_spec_uint ("num-sources", "Num Sources",
501           "The number of sources in the session", 0, G_MAXUINT,
502           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
503
504   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
505       g_param_spec_uint ("num-active-sources", "Num Active Sources",
506           "The number of active sources in the session", 0, G_MAXUINT,
507           DEFAULT_NUM_ACTIVE_SOURCES,
508           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
509   /**
510    * RTPSource:sources
511    *
512    * Get a GValue Array of all sources in the session.
513    *
514    * ## Getting the #RTPSources of a session
515    *
516    * ``` C
517    * {
518    *   GValueArray *arr;
519    *   GValue *val;
520    *   guint i;
521    *
522    *   g_object_get (sess, "sources", &arr, NULL);
523    *
524    *   for (i = 0; i < arr->n_values; i++) {
525    *     RTPSource *source;
526    *
527    *     val = g_value_array_get_nth (arr, i);
528    *     source = g_value_get_object (val);
529    *   }
530    *   g_value_array_free (arr);
531    * }
532    * ```
533    */
534   g_object_class_install_property (gobject_class, PROP_SOURCES,
535       g_param_spec_boxed ("sources", "Sources",
536           "An array of all known sources in the session",
537           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
538
539   g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
540       g_param_spec_boolean ("favor-new", "Favor new sources",
541           "Resolve SSRC conflict in favor of new sources", FALSE,
542           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
543
544   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
545       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
546           "Minimum interval between Regular RTCP packet (in ns)",
547           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
548           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
549
550   g_object_class_install_property (gobject_class,
551       PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
552       g_param_spec_uint64 ("rtcp-feedback-retention-window",
553           "RTCP Feedback retention window",
554           "Duration during which RTCP Feedback packets are retained (in ns)",
555           0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
556           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
557
558   g_object_class_install_property (gobject_class,
559       PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
560       g_param_spec_uint ("rtcp-immediate-feedback-threshold",
561           "RTCP Immediate Feedback threshold",
562           "The maximum number of members of a RTP session for which immediate"
563           " feedback is used (DEPRECATED: has no effect and is not needed)",
564           0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
565           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
566
567   g_object_class_install_property (gobject_class, PROP_PROBATION,
568       g_param_spec_uint ("probation", "Number of probations",
569           "Consecutive packet sequence numbers to accept the source",
570           0, G_MAXUINT, DEFAULT_PROBATION,
571           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
572
573   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
574       g_param_spec_uint ("max-dropout-time", "Max dropout time",
575           "The maximum time (milliseconds) of missing packets tolerated.",
576           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
577           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
578
579   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
580       g_param_spec_uint ("max-misorder-time", "Max misorder time",
581           "The maximum time (milliseconds) of misordered packets tolerated.",
582           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
583           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
584
585   /**
586    * RTPSession:stats:
587    *
588    * Various session statistics. This property returns a GstStructure
589    * with name application/x-rtp-session-stats with the following fields:
590    *
591    * * "rtx-drop-count"  G_TYPE_UINT   The number of retransmission events
592    *      dropped (due to bandwidth constraints)
593    * *  "sent-nack-count" G_TYPE_UINT   Number of NACKs sent
594    * *  "recv-nack-count" G_TYPE_UINT   Number of NACKs received
595    * *  "source-stats"    G_TYPE_BOXED  GValueArray of #RTPSource:stats for all
596    *      RTP sources (Since 1.8)
597    *
598    * Since: 1.4
599    */
600   g_object_class_install_property (gobject_class, PROP_STATS,
601       g_param_spec_boxed ("stats", "Statistics",
602           "Various statistics", GST_TYPE_STRUCTURE,
603           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
604
605   g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
606       g_param_spec_enum ("rtp-profile", "RTP Profile",
607           "RTP profile to use for this session", GST_TYPE_RTP_PROFILE,
608           DEFAULT_RTP_PROFILE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
609
610   g_object_class_install_property (gobject_class, PROP_RTCP_REDUCED_SIZE,
611       g_param_spec_boolean ("rtcp-reduced-size", "RTCP Reduced Size",
612           "Use Reduced Size RTCP for feedback packets",
613           DEFAULT_RTCP_REDUCED_SIZE,
614           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
615
616   /**
617    * RTPSession:disable-sr-timestamp:
618    *
619    * Whether sender reports should be timestamped.
620    *
621    * Since: 1.16
622    */
623   g_object_class_install_property (gobject_class,
624       PROP_RTCP_DISABLE_SR_TIMESTAMP,
625       g_param_spec_boolean ("disable-sr-timestamp",
626           "Disable Sender Report Timestamp",
627           "Whether sender reports should be timestamped",
628           DEFAULT_RTCP_DISABLE_SR_TIMESTAMP,
629           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
630
631   /**
632    * RTPSession:twcc-feedback-interval:
633    *
634    * The interval to send TWCC reports on.
635    * This overrides the default behavior of sending reports
636    * based on marker-bits.
637    *
638    * Since: 1.20
639    */
640   g_object_class_install_property (gobject_class,
641       PROP_TWCC_FEEDBACK_INTERVAL,
642       g_param_spec_uint64 ("twcc-feedback-interval",
643           "TWCC Feedback Interval",
644           "The interval to send TWCC reports on",
645           0, G_MAXUINT64, DEFAULT_TWCC_FEEDBACK_INTERVAL,
646           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
647
648   klass->get_source_by_ssrc =
649       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
650   klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
651
652   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
653 }
654
655 static void
656 rtp_session_init (RTPSession * sess)
657 {
658   gint i;
659   gchar *str;
660
661   g_mutex_init (&sess->lock);
662   sess->key = g_random_int ();
663   sess->mask_idx = 0;
664   sess->mask = 0;
665
666   /* TODO: We currently only use the first hash table but this is the
667    * beginning of an implementation for RFC2762
668    for (i = 0; i < 32; i++) {
669    */
670   for (i = 0; i < 1; i++) {
671     sess->ssrcs[i] =
672         g_hash_table_new_full (NULL, NULL, NULL,
673         (GDestroyNotify) g_object_unref);
674   }
675
676   rtp_stats_init_defaults (&sess->stats);
677   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
678   rtp_stats_set_min_interval (&sess->stats,
679       (gdouble) DEFAULT_RTCP_MIN_INTERVAL / GST_SECOND);
680
681   sess->recalc_bandwidth = TRUE;
682   sess->bandwidth = DEFAULT_BANDWIDTH;
683   sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
684   sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
685   sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
686
687   /* default UDP header length */
688   sess->header_len = UDP_IP_HEADER_OVERHEAD;
689   sess->mtu = DEFAULT_RTCP_MTU;
690
691   sess->probation = DEFAULT_PROBATION;
692   sess->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
693   sess->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
694
695   /* some default SDES entries */
696   sess->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
697
698   /* we do not want to leak details like the username or hostname here */
699   str = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
700   gst_structure_set (sess->sdes, "cname", G_TYPE_STRING, str, NULL);
701   g_free (str);
702
703 #if 0
704   /* we do not want to leak the user's real name here */
705   str = g_strdup_printf ("Anon%u", g_random_int ());
706   gst_structure_set (sdes, "name", G_TYPE_STRING, str, NULL);
707   g_free (str);
708 #endif
709
710   gst_structure_set (sess->sdes, "tool", G_TYPE_STRING, "GStreamer", NULL);
711
712   /* this is the SSRC we suggest */
713   sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
714   sess->internal_ssrc_set = FALSE;
715
716   sess->first_rtcp = TRUE;
717   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
718   sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE;
719   sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
720   sess->last_rtcp_interval = GST_CLOCK_TIME_NONE;
721
722   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
723   sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
724   sess->rtcp_immediate_feedback_threshold =
725       DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
726   sess->rtp_profile = DEFAULT_RTP_PROFILE;
727   sess->reduced_size_rtcp = DEFAULT_RTCP_REDUCED_SIZE;
728   sess->timestamp_sender_reports = !DEFAULT_RTCP_DISABLE_SR_TIMESTAMP;
729
730   sess->is_doing_ptp = TRUE;
731
732   sess->twcc = rtp_twcc_manager_new (sess->mtu);
733   sess->twcc_stats = rtp_twcc_stats_new ();
734 }
735
736 static void
737 rtp_session_finalize (GObject * object)
738 {
739   RTPSession *sess;
740   gint i;
741
742   sess = RTP_SESSION_CAST (object);
743
744   gst_structure_free (sess->sdes);
745
746   g_list_free_full (sess->conflicting_addresses,
747       (GDestroyNotify) rtp_conflicting_address_free);
748
749   /* TODO: Change this again when implementing RFC 2762
750    * for (i = 0; i < 32; i++)
751    */
752   for (i = 0; i < 1; i++)
753     g_hash_table_destroy (sess->ssrcs[i]);
754
755   g_object_unref (sess->twcc);
756   rtp_twcc_stats_free (sess->twcc_stats);
757
758   g_mutex_clear (&sess->lock);
759
760   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
761 }
762
763 static void
764 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
765 {
766   GValue value = { 0 };
767
768   g_value_init (&value, RTP_TYPE_SOURCE);
769   g_value_take_object (&value, source);
770   /* copies the value */
771   g_value_array_append (arr, &value);
772 }
773
774 static GValueArray *
775 rtp_session_create_sources (RTPSession * sess)
776 {
777   GValueArray *res;
778   guint size;
779
780   RTP_SESSION_LOCK (sess);
781   /* get number of elements in the table */
782   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
783   /* create the result value array */
784   res = g_value_array_new (size);
785
786   /* and copy all values into the array */
787   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
788   RTP_SESSION_UNLOCK (sess);
789
790   return res;
791 }
792
793 static void
794 create_source_stats (gpointer key, RTPSource * source, GValueArray * arr)
795 {
796   GValue *value;
797   GstStructure *s;
798
799   g_object_get (source, "stats", &s, NULL);
800
801   g_value_array_append (arr, NULL);
802   value = g_value_array_get_nth (arr, arr->n_values - 1);
803   g_value_init (value, GST_TYPE_STRUCTURE);
804   g_value_take_boxed (value, s);
805 }
806
807 static GstStructure *
808 rtp_session_create_stats (RTPSession * sess)
809 {
810   GstStructure *s;
811   GValueArray *source_stats;
812   GValue source_stats_v = G_VALUE_INIT;
813   guint size;
814
815   RTP_SESSION_LOCK (sess);
816   s = gst_structure_new ("application/x-rtp-session-stats",
817       "rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped,
818       "sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent,
819       "recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL);
820
821   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
822   source_stats = g_value_array_new (size);
823   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
824       (GHFunc) create_source_stats, source_stats);
825   RTP_SESSION_UNLOCK (sess);
826
827   g_value_init (&source_stats_v, G_TYPE_VALUE_ARRAY);
828   g_value_take_boxed (&source_stats_v, source_stats);
829   gst_structure_take_value (s, "source-stats", &source_stats_v);
830
831   return s;
832 }
833
834 static void
835 rtp_session_set_property (GObject * object, guint prop_id,
836     const GValue * value, GParamSpec * pspec)
837 {
838   RTPSession *sess;
839
840   sess = RTP_SESSION (object);
841
842   switch (prop_id) {
843     case PROP_INTERNAL_SSRC:
844       RTP_SESSION_LOCK (sess);
845       sess->suggested_ssrc = g_value_get_uint (value);
846       sess->internal_ssrc_set = TRUE;
847       sess->internal_ssrc_from_caps_or_property = TRUE;
848       RTP_SESSION_UNLOCK (sess);
849       if (sess->callbacks.reconfigure)
850         sess->callbacks.reconfigure (sess, sess->reconfigure_user_data);
851       break;
852     case PROP_BANDWIDTH:
853       RTP_SESSION_LOCK (sess);
854       sess->bandwidth = g_value_get_double (value);
855       sess->recalc_bandwidth = TRUE;
856       RTP_SESSION_UNLOCK (sess);
857       break;
858     case PROP_RTCP_FRACTION:
859       RTP_SESSION_LOCK (sess);
860       sess->rtcp_bandwidth = g_value_get_double (value);
861       sess->recalc_bandwidth = TRUE;
862       RTP_SESSION_UNLOCK (sess);
863       break;
864     case PROP_RTCP_RR_BANDWIDTH:
865       RTP_SESSION_LOCK (sess);
866       sess->rtcp_rr_bandwidth = g_value_get_int (value);
867       sess->recalc_bandwidth = TRUE;
868       RTP_SESSION_UNLOCK (sess);
869       break;
870     case PROP_RTCP_RS_BANDWIDTH:
871       RTP_SESSION_LOCK (sess);
872       sess->rtcp_rs_bandwidth = g_value_get_int (value);
873       sess->recalc_bandwidth = TRUE;
874       RTP_SESSION_UNLOCK (sess);
875       break;
876     case PROP_RTCP_MTU:
877       sess->mtu = g_value_get_uint (value);
878       rtp_twcc_manager_set_mtu (sess->twcc, sess->mtu);
879       break;
880     case PROP_SDES:
881       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
882       break;
883     case PROP_FAVOR_NEW:
884       sess->favor_new = g_value_get_boolean (value);
885       break;
886     case PROP_RTCP_MIN_INTERVAL:
887       rtp_stats_set_min_interval (&sess->stats,
888           (gdouble) g_value_get_uint64 (value) / GST_SECOND);
889       /* trigger reconsideration */
890       RTP_SESSION_LOCK (sess);
891       sess->next_rtcp_check_time = 0;
892       RTP_SESSION_UNLOCK (sess);
893       if (sess->callbacks.reconsider)
894         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
895       break;
896     case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
897       sess->rtcp_feedback_retention_window = g_value_get_uint64 (value);
898       break;
899     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
900       sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
901       break;
902     case PROP_PROBATION:
903       sess->probation = g_value_get_uint (value);
904       break;
905     case PROP_MAX_DROPOUT_TIME:
906       sess->max_dropout_time = g_value_get_uint (value);
907       break;
908     case PROP_MAX_MISORDER_TIME:
909       sess->max_misorder_time = g_value_get_uint (value);
910       break;
911     case PROP_RTP_PROFILE:
912       sess->rtp_profile = g_value_get_enum (value);
913       /* trigger reconsideration */
914       RTP_SESSION_LOCK (sess);
915       sess->next_rtcp_check_time = 0;
916       RTP_SESSION_UNLOCK (sess);
917       if (sess->callbacks.reconsider)
918         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
919       break;
920     case PROP_RTCP_REDUCED_SIZE:
921       sess->reduced_size_rtcp = g_value_get_boolean (value);
922       break;
923     case PROP_RTCP_DISABLE_SR_TIMESTAMP:
924       sess->timestamp_sender_reports = !g_value_get_boolean (value);
925       break;
926     case PROP_TWCC_FEEDBACK_INTERVAL:
927       rtp_twcc_manager_set_feedback_interval (sess->twcc,
928           g_value_get_uint64 (value));
929       break;
930     default:
931       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
932       break;
933   }
934 }
935
936 static void
937 rtp_session_get_property (GObject * object, guint prop_id,
938     GValue * value, GParamSpec * pspec)
939 {
940   RTPSession *sess;
941
942   sess = RTP_SESSION (object);
943
944   switch (prop_id) {
945     case PROP_INTERNAL_SSRC:
946       g_value_set_uint (value, rtp_session_suggest_ssrc (sess, NULL));
947       break;
948     case PROP_INTERNAL_SOURCE:
949       /* FIXME, return a random source */
950       g_value_set_object (value, NULL);
951       break;
952     case PROP_BANDWIDTH:
953       g_value_set_double (value, sess->bandwidth);
954       break;
955     case PROP_RTCP_FRACTION:
956       g_value_set_double (value, sess->rtcp_bandwidth);
957       break;
958     case PROP_RTCP_RR_BANDWIDTH:
959       g_value_set_int (value, sess->rtcp_rr_bandwidth);
960       break;
961     case PROP_RTCP_RS_BANDWIDTH:
962       g_value_set_int (value, sess->rtcp_rs_bandwidth);
963       break;
964     case PROP_RTCP_MTU:
965       g_value_set_uint (value, sess->mtu);
966       break;
967     case PROP_SDES:
968       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
969       break;
970     case PROP_NUM_SOURCES:
971       g_value_set_uint (value, rtp_session_get_num_sources (sess));
972       break;
973     case PROP_NUM_ACTIVE_SOURCES:
974       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
975       break;
976     case PROP_SOURCES:
977       g_value_take_boxed (value, rtp_session_create_sources (sess));
978       break;
979     case PROP_FAVOR_NEW:
980       g_value_set_boolean (value, sess->favor_new);
981       break;
982     case PROP_RTCP_MIN_INTERVAL:
983       g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
984       break;
985     case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
986       g_value_set_uint64 (value, sess->rtcp_feedback_retention_window);
987       break;
988     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
989       g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
990       break;
991     case PROP_PROBATION:
992       g_value_set_uint (value, sess->probation);
993       break;
994     case PROP_MAX_DROPOUT_TIME:
995       g_value_set_uint (value, sess->max_dropout_time);
996       break;
997     case PROP_MAX_MISORDER_TIME:
998       g_value_set_uint (value, sess->max_misorder_time);
999       break;
1000     case PROP_STATS:
1001       g_value_take_boxed (value, rtp_session_create_stats (sess));
1002       break;
1003     case PROP_RTP_PROFILE:
1004       g_value_set_enum (value, sess->rtp_profile);
1005       break;
1006     case PROP_RTCP_REDUCED_SIZE:
1007       g_value_set_boolean (value, sess->reduced_size_rtcp);
1008       break;
1009     case PROP_RTCP_DISABLE_SR_TIMESTAMP:
1010       g_value_set_boolean (value, !sess->timestamp_sender_reports);
1011       break;
1012     case PROP_TWCC_FEEDBACK_INTERVAL:
1013       g_value_set_uint64 (value,
1014           rtp_twcc_manager_get_feedback_interval (sess->twcc));
1015       break;
1016     default:
1017       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1018       break;
1019   }
1020 }
1021
1022 static void
1023 on_new_ssrc (RTPSession * sess, RTPSource * source)
1024 {
1025   g_object_ref (source);
1026   RTP_SESSION_UNLOCK (sess);
1027   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
1028   RTP_SESSION_LOCK (sess);
1029   g_object_unref (source);
1030 }
1031
1032 static void
1033 on_ssrc_collision (RTPSession * sess, RTPSource * source)
1034 {
1035   g_object_ref (source);
1036   RTP_SESSION_UNLOCK (sess);
1037   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
1038       source);
1039   RTP_SESSION_LOCK (sess);
1040   g_object_unref (source);
1041 }
1042
1043 static void
1044 on_ssrc_validated (RTPSession * sess, RTPSource * source)
1045 {
1046   g_object_ref (source);
1047   RTP_SESSION_UNLOCK (sess);
1048   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
1049       source);
1050   RTP_SESSION_LOCK (sess);
1051   g_object_unref (source);
1052 }
1053
1054 static void
1055 on_ssrc_active (RTPSession * sess, RTPSource * source)
1056 {
1057   g_object_ref (source);
1058   RTP_SESSION_UNLOCK (sess);
1059   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
1060   RTP_SESSION_LOCK (sess);
1061   g_object_unref (source);
1062 }
1063
1064 static void
1065 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
1066 {
1067   g_object_ref (source);
1068   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
1069   RTP_SESSION_UNLOCK (sess);
1070   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
1071   RTP_SESSION_LOCK (sess);
1072   g_object_unref (source);
1073 }
1074
1075 static void
1076 on_bye_ssrc (RTPSession * sess, RTPSource * source)
1077 {
1078   g_object_ref (source);
1079   RTP_SESSION_UNLOCK (sess);
1080   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
1081   RTP_SESSION_LOCK (sess);
1082   g_object_unref (source);
1083 }
1084
1085 static void
1086 on_bye_timeout (RTPSession * sess, RTPSource * source)
1087 {
1088   g_object_ref (source);
1089   RTP_SESSION_UNLOCK (sess);
1090   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
1091   RTP_SESSION_LOCK (sess);
1092   g_object_unref (source);
1093 }
1094
1095 static void
1096 on_timeout (RTPSession * sess, RTPSource * source)
1097 {
1098   g_object_ref (source);
1099   RTP_SESSION_UNLOCK (sess);
1100   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
1101   RTP_SESSION_LOCK (sess);
1102   g_object_unref (source);
1103 }
1104
1105 static void
1106 on_sender_timeout (RTPSession * sess, RTPSource * source)
1107 {
1108   g_object_ref (source);
1109   RTP_SESSION_UNLOCK (sess);
1110   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
1111       source);
1112   RTP_SESSION_LOCK (sess);
1113   g_object_unref (source);
1114 }
1115
1116 static void
1117 on_new_sender_ssrc (RTPSession * sess, RTPSource * source)
1118 {
1119   g_object_ref (source);
1120   RTP_SESSION_UNLOCK (sess);
1121   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
1122       source);
1123   RTP_SESSION_LOCK (sess);
1124   g_object_unref (source);
1125 }
1126
1127 static void
1128 on_sender_ssrc_active (RTPSession * sess, RTPSource * source)
1129 {
1130   g_object_ref (source);
1131   RTP_SESSION_UNLOCK (sess);
1132   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE], 0,
1133       source);
1134   RTP_SESSION_LOCK (sess);
1135   g_object_unref (source);
1136 }
1137
1138 /**
1139  * rtp_session_new:
1140  *
1141  * Create a new session object.
1142  *
1143  * Returns: a new #RTPSession. g_object_unref() after usage.
1144  */
1145 RTPSession *
1146 rtp_session_new (void)
1147 {
1148   RTPSession *sess;
1149
1150   sess = g_object_new (RTP_TYPE_SESSION, NULL);
1151
1152   return sess;
1153 }
1154
1155 /**
1156  * rtp_session_reset:
1157  * @sess: an #RTPSession
1158  *
1159  * Reset the sources of @sess.
1160  */
1161 void
1162 rtp_session_reset (RTPSession * sess)
1163 {
1164   g_return_if_fail (RTP_IS_SESSION (sess));
1165
1166   /* remove all sources */
1167   g_hash_table_remove_all (sess->ssrcs[sess->mask_idx]);
1168   sess->total_sources = 0;
1169   sess->stats.sender_sources = 0;
1170   sess->stats.internal_sender_sources = 0;
1171   sess->stats.internal_sources = 0;
1172   sess->stats.active_sources = 0;
1173
1174   sess->generation = 0;
1175   sess->first_rtcp = TRUE;
1176   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
1177   sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE;
1178   sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
1179   sess->last_rtcp_interval = GST_CLOCK_TIME_NONE;
1180   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
1181   sess->scheduled_bye = FALSE;
1182
1183   /* reset session stats */
1184   sess->stats.bye_members = 0;
1185   sess->stats.nacks_dropped = 0;
1186   sess->stats.nacks_sent = 0;
1187   sess->stats.nacks_received = 0;
1188
1189   sess->is_doing_ptp = TRUE;
1190
1191   g_list_free_full (sess->conflicting_addresses,
1192       (GDestroyNotify) rtp_conflicting_address_free);
1193   sess->conflicting_addresses = NULL;
1194 }
1195
1196 /**
1197  * rtp_session_set_callbacks:
1198  * @sess: an #RTPSession
1199  * @callbacks: callbacks to configure
1200  * @user_data: user data passed in the callbacks
1201  *
1202  * Configure a set of callbacks to be notified of actions.
1203  */
1204 void
1205 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
1206     gpointer user_data)
1207 {
1208   g_return_if_fail (RTP_IS_SESSION (sess));
1209
1210   if (callbacks->process_rtp) {
1211     sess->callbacks.process_rtp = callbacks->process_rtp;
1212     sess->process_rtp_user_data = user_data;
1213   }
1214   if (callbacks->send_rtp) {
1215     sess->callbacks.send_rtp = callbacks->send_rtp;
1216     sess->send_rtp_user_data = user_data;
1217   }
1218   if (callbacks->send_rtcp) {
1219     sess->callbacks.send_rtcp = callbacks->send_rtcp;
1220     sess->send_rtcp_user_data = user_data;
1221   }
1222   if (callbacks->sync_rtcp) {
1223     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
1224     sess->sync_rtcp_user_data = user_data;
1225   }
1226   if (callbacks->clock_rate) {
1227     sess->callbacks.clock_rate = callbacks->clock_rate;
1228     sess->clock_rate_user_data = user_data;
1229   }
1230   if (callbacks->reconsider) {
1231     sess->callbacks.reconsider = callbacks->reconsider;
1232     sess->reconsider_user_data = user_data;
1233   }
1234   if (callbacks->request_key_unit) {
1235     sess->callbacks.request_key_unit = callbacks->request_key_unit;
1236     sess->request_key_unit_user_data = user_data;
1237   }
1238   if (callbacks->request_time) {
1239     sess->callbacks.request_time = callbacks->request_time;
1240     sess->request_time_user_data = user_data;
1241   }
1242   if (callbacks->notify_nack) {
1243     sess->callbacks.notify_nack = callbacks->notify_nack;
1244     sess->notify_nack_user_data = user_data;
1245   }
1246   if (callbacks->notify_twcc) {
1247     sess->callbacks.notify_twcc = callbacks->notify_twcc;
1248     sess->notify_twcc_user_data = user_data;
1249   }
1250   if (callbacks->reconfigure) {
1251     sess->callbacks.reconfigure = callbacks->reconfigure;
1252     sess->reconfigure_user_data = user_data;
1253   }
1254   if (callbacks->notify_early_rtcp) {
1255     sess->callbacks.notify_early_rtcp = callbacks->notify_early_rtcp;
1256     sess->notify_early_rtcp_user_data = user_data;
1257   }
1258 }
1259
1260 /**
1261  * rtp_session_set_process_rtp_callback:
1262  * @sess: an #RTPSession
1263  * @callback: callback to set
1264  * @user_data: user data passed in the callback
1265  *
1266  * Configure only the process_rtp callback to be notified of the process_rtp action.
1267  */
1268 void
1269 rtp_session_set_process_rtp_callback (RTPSession * sess,
1270     RTPSessionProcessRTP callback, gpointer user_data)
1271 {
1272   g_return_if_fail (RTP_IS_SESSION (sess));
1273
1274   sess->callbacks.process_rtp = callback;
1275   sess->process_rtp_user_data = user_data;
1276 }
1277
1278 /**
1279  * rtp_session_set_send_rtp_callback:
1280  * @sess: an #RTPSession
1281  * @callback: callback to set
1282  * @user_data: user data passed in the callback
1283  *
1284  * Configure only the send_rtp callback to be notified of the send_rtp action.
1285  */
1286 void
1287 rtp_session_set_send_rtp_callback (RTPSession * sess,
1288     RTPSessionSendRTP callback, gpointer user_data)
1289 {
1290   g_return_if_fail (RTP_IS_SESSION (sess));
1291
1292   sess->callbacks.send_rtp = callback;
1293   sess->send_rtp_user_data = user_data;
1294 }
1295
1296 /**
1297  * rtp_session_set_send_rtcp_callback:
1298  * @sess: an #RTPSession
1299  * @callback: callback to set
1300  * @user_data: user data passed in the callback
1301  *
1302  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
1303  */
1304 void
1305 rtp_session_set_send_rtcp_callback (RTPSession * sess,
1306     RTPSessionSendRTCP callback, gpointer user_data)
1307 {
1308   g_return_if_fail (RTP_IS_SESSION (sess));
1309
1310   sess->callbacks.send_rtcp = callback;
1311   sess->send_rtcp_user_data = user_data;
1312 }
1313
1314 /**
1315  * rtp_session_set_sync_rtcp_callback:
1316  * @sess: an #RTPSession
1317  * @callback: callback to set
1318  * @user_data: user data passed in the callback
1319  *
1320  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
1321  */
1322 void
1323 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
1324     RTPSessionSyncRTCP callback, gpointer user_data)
1325 {
1326   g_return_if_fail (RTP_IS_SESSION (sess));
1327
1328   sess->callbacks.sync_rtcp = callback;
1329   sess->sync_rtcp_user_data = user_data;
1330 }
1331
1332 /**
1333  * rtp_session_set_clock_rate_callback:
1334  * @sess: an #RTPSession
1335  * @callback: callback to set
1336  * @user_data: user data passed in the callback
1337  *
1338  * Configure only the clock_rate callback to be notified of the clock_rate action.
1339  */
1340 void
1341 rtp_session_set_clock_rate_callback (RTPSession * sess,
1342     RTPSessionClockRate callback, gpointer user_data)
1343 {
1344   g_return_if_fail (RTP_IS_SESSION (sess));
1345
1346   sess->callbacks.clock_rate = callback;
1347   sess->clock_rate_user_data = user_data;
1348 }
1349
1350 /**
1351  * rtp_session_set_reconsider_callback:
1352  * @sess: an #RTPSession
1353  * @callback: callback to set
1354  * @user_data: user data passed in the callback
1355  *
1356  * Configure only the reconsider callback to be notified of the reconsider action.
1357  */
1358 void
1359 rtp_session_set_reconsider_callback (RTPSession * sess,
1360     RTPSessionReconsider callback, gpointer user_data)
1361 {
1362   g_return_if_fail (RTP_IS_SESSION (sess));
1363
1364   sess->callbacks.reconsider = callback;
1365   sess->reconsider_user_data = user_data;
1366 }
1367
1368 /**
1369  * rtp_session_set_request_time_callback:
1370  * @sess: an #RTPSession
1371  * @callback: callback to set
1372  * @user_data: user data passed in the callback
1373  *
1374  * Configure only the request_time callback
1375  */
1376 void
1377 rtp_session_set_request_time_callback (RTPSession * sess,
1378     RTPSessionRequestTime callback, gpointer user_data)
1379 {
1380   g_return_if_fail (RTP_IS_SESSION (sess));
1381
1382   sess->callbacks.request_time = callback;
1383   sess->request_time_user_data = user_data;
1384 }
1385
1386 /**
1387  * rtp_session_set_bandwidth:
1388  * @sess: an #RTPSession
1389  * @bandwidth: the bandwidth allocated
1390  *
1391  * Set the session bandwidth in bits per second.
1392  */
1393 void
1394 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
1395 {
1396   g_return_if_fail (RTP_IS_SESSION (sess));
1397
1398   RTP_SESSION_LOCK (sess);
1399   sess->stats.bandwidth = bandwidth;
1400   RTP_SESSION_UNLOCK (sess);
1401 }
1402
1403 /**
1404  * rtp_session_get_bandwidth:
1405  * @sess: an #RTPSession
1406  *
1407  * Get the session bandwidth.
1408  *
1409  * Returns: the session bandwidth.
1410  */
1411 gdouble
1412 rtp_session_get_bandwidth (RTPSession * sess)
1413 {
1414   gdouble result;
1415
1416   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1417
1418   RTP_SESSION_LOCK (sess);
1419   result = sess->stats.bandwidth;
1420   RTP_SESSION_UNLOCK (sess);
1421
1422   return result;
1423 }
1424
1425 /**
1426  * rtp_session_set_rtcp_fraction:
1427  * @sess: an #RTPSession
1428  * @bandwidth: the RTCP bandwidth
1429  *
1430  * Set the bandwidth in bits per second that should be used for RTCP
1431  * messages.
1432  */
1433 void
1434 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
1435 {
1436   g_return_if_fail (RTP_IS_SESSION (sess));
1437
1438   RTP_SESSION_LOCK (sess);
1439   sess->stats.rtcp_bandwidth = bandwidth;
1440   RTP_SESSION_UNLOCK (sess);
1441 }
1442
1443 /**
1444  * rtp_session_get_rtcp_fraction:
1445  * @sess: an #RTPSession
1446  *
1447  * Get the session bandwidth used for RTCP.
1448  *
1449  * Returns: The bandwidth used for RTCP messages.
1450  */
1451 gdouble
1452 rtp_session_get_rtcp_fraction (RTPSession * sess)
1453 {
1454   gdouble result;
1455
1456   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
1457
1458   RTP_SESSION_LOCK (sess);
1459   result = sess->stats.rtcp_bandwidth;
1460   RTP_SESSION_UNLOCK (sess);
1461
1462   return result;
1463 }
1464
1465 /**
1466  * rtp_session_get_sdes_struct:
1467  * @sess: an #RTSPSession
1468  *
1469  * Get the SDES data as a #GstStructure
1470  *
1471  * Returns: a GstStructure with SDES items for @sess. This function returns a
1472  * copy of the SDES structure, use gst_structure_free() after usage.
1473  */
1474 GstStructure *
1475 rtp_session_get_sdes_struct (RTPSession * sess)
1476 {
1477   GstStructure *result = NULL;
1478
1479   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1480
1481   RTP_SESSION_LOCK (sess);
1482   if (sess->sdes)
1483     result = gst_structure_copy (sess->sdes);
1484   RTP_SESSION_UNLOCK (sess);
1485
1486   return result;
1487 }
1488
1489 static void
1490 source_set_sdes (const gchar * key, RTPSource * source, GstStructure * sdes)
1491 {
1492   rtp_source_set_sdes_struct (source, gst_structure_copy (sdes));
1493 }
1494
1495 /**
1496  * rtp_session_set_sdes_struct:
1497  * @sess: an #RTSPSession
1498  * @sdes: a #GstStructure
1499  *
1500  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
1501  */
1502 void
1503 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
1504 {
1505   g_return_if_fail (sdes);
1506   g_return_if_fail (RTP_IS_SESSION (sess));
1507
1508   RTP_SESSION_LOCK (sess);
1509   if (sess->sdes)
1510     gst_structure_free (sess->sdes);
1511   sess->sdes = gst_structure_copy (sdes);
1512
1513   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1514       (GHFunc) source_set_sdes, sess->sdes);
1515   RTP_SESSION_UNLOCK (sess);
1516 }
1517
1518 static GstFlowReturn
1519 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
1520 {
1521   GstFlowReturn result = GST_FLOW_OK;
1522
1523   if (source->internal) {
1524     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
1525
1526     RTP_SESSION_UNLOCK (session);
1527
1528     if (session->callbacks.send_rtp)
1529       result =
1530           session->callbacks.send_rtp (session, source, data,
1531           session->send_rtp_user_data);
1532     else {
1533       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1534     }
1535   } else {
1536     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
1537     RTP_SESSION_UNLOCK (session);
1538
1539     if (session->callbacks.process_rtp)
1540       result =
1541           session->callbacks.process_rtp (session, source,
1542           GST_BUFFER_CAST (data), session->process_rtp_user_data);
1543     else
1544       gst_buffer_unref (GST_BUFFER_CAST (data));
1545   }
1546   RTP_SESSION_LOCK (session);
1547
1548   return result;
1549 }
1550
1551 static gint
1552 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
1553 {
1554   gint result;
1555
1556   RTP_SESSION_UNLOCK (session);
1557
1558   if (session->callbacks.clock_rate)
1559     result =
1560         session->callbacks.clock_rate (session, pt,
1561         session->clock_rate_user_data);
1562   else
1563     result = -1;
1564
1565   RTP_SESSION_LOCK (session);
1566
1567   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
1568
1569   return result;
1570 }
1571
1572 static RTPSourceCallbacks callbacks = {
1573   (RTPSourcePushRTP) source_push_rtp,
1574   (RTPSourceClockRate) source_clock_rate,
1575 };
1576
1577
1578 /**
1579  * rtp_session_find_conflicting_address:
1580  * @session: The session the packet came in
1581  * @address: address to check for
1582  * @time: The time when the packet that is possibly in conflict arrived
1583  *
1584  * Checks if an address which has a conflict is already known. If it is
1585  * a known conflict, remember the time
1586  *
1587  * Returns: TRUE if it was a known conflict, FALSE otherwise
1588  */
1589 static gboolean
1590 rtp_session_find_conflicting_address (RTPSession * session,
1591     GSocketAddress * address, GstClockTime time)
1592 {
1593   return find_conflicting_address (session->conflicting_addresses, address,
1594       time);
1595 }
1596
1597 /**
1598  * rtp_session_add_conflicting_address:
1599  * @session: The session the packet came in
1600  * @address: address to remember
1601  * @time: The time when the packet that is in conflict arrived
1602  *
1603  * Adds a new conflict address
1604  */
1605 static void
1606 rtp_session_add_conflicting_address (RTPSession * sess,
1607     GSocketAddress * address, GstClockTime time)
1608 {
1609   sess->conflicting_addresses =
1610       add_conflicting_address (sess->conflicting_addresses, address, time);
1611 }
1612
1613 static void
1614 rtp_session_have_conflict (RTPSession * sess, RTPSource * source,
1615     GSocketAddress * address, GstClockTime current_time)
1616 {
1617   guint32 ssrc = rtp_source_get_ssrc (source);
1618
1619   /* Its a new collision, lets change our SSRC */
1620   rtp_session_add_conflicting_address (sess, address, current_time);
1621
1622   /* mark the source BYE */
1623   rtp_source_mark_bye (source, "SSRC Collision");
1624   /* if we were suggesting this SSRC, change to something else */
1625   if (sess->suggested_ssrc == ssrc) {
1626     sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
1627     sess->internal_ssrc_set = TRUE;
1628   }
1629
1630   on_ssrc_collision (sess, source);
1631
1632   rtp_session_schedule_bye_locked (sess, current_time);
1633 }
1634
1635 static gboolean
1636 check_collision (RTPSession * sess, RTPSource * source,
1637     RTPPacketInfo * pinfo, gboolean rtp)
1638 {
1639   guint32 ssrc;
1640
1641   /* If we have no pinfo address, we can't do collision checking */
1642   if (!pinfo->address)
1643     return FALSE;
1644
1645   ssrc = rtp_source_get_ssrc (source);
1646
1647   if (!source->internal) {
1648     GSocketAddress *from;
1649
1650     /* This is not our local source, but lets check if two remote
1651      * source collide */
1652     if (rtp) {
1653       from = source->rtp_from;
1654     } else {
1655       from = source->rtcp_from;
1656     }
1657
1658     if (from) {
1659       if (__g_socket_address_equal (from, pinfo->address)) {
1660         /* Address is the same */
1661         return FALSE;
1662       } else {
1663         GST_LOG ("we have a third-party collision or loop ssrc:%x", ssrc);
1664         if (sess->favor_new) {
1665           if (rtp_source_find_conflicting_address (source,
1666                   pinfo->address, pinfo->current_time)) {
1667             gchar *buf1;
1668
1669             buf1 = __g_socket_address_to_string (pinfo->address);
1670             GST_LOG ("Known conflict on %x for %s, dropping packet", ssrc,
1671                 buf1);
1672             g_free (buf1);
1673
1674             return TRUE;
1675           } else {
1676             gchar *buf1, *buf2;
1677
1678             /* Current address is not a known conflict, lets assume this is
1679              * a new source. Save old address in possible conflict list
1680              */
1681             rtp_source_add_conflicting_address (source, from,
1682                 pinfo->current_time);
1683
1684             buf1 = __g_socket_address_to_string (from);
1685             buf2 = __g_socket_address_to_string (pinfo->address);
1686
1687             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1688                 " saving old as known conflict", ssrc, buf1, buf2);
1689
1690             if (rtp)
1691               rtp_source_set_rtp_from (source, pinfo->address);
1692             else
1693               rtp_source_set_rtcp_from (source, pinfo->address);
1694
1695             g_free (buf1);
1696             g_free (buf2);
1697
1698             return FALSE;
1699           }
1700         } else {
1701           /* Don't need to save old addresses, we ignore new sources */
1702           return TRUE;
1703         }
1704       }
1705     } else {
1706       /* We don't already have a from address for RTP, just set it */
1707       if (rtp)
1708         rtp_source_set_rtp_from (source, pinfo->address);
1709       else
1710         rtp_source_set_rtcp_from (source, pinfo->address);
1711       return FALSE;
1712     }
1713
1714     /* FIXME: Log 3rd party collision somehow
1715      * Maybe should be done in upper layer, only the SDES can tell us
1716      * if its a collision or a loop
1717      */
1718   } else {
1719     /* This is sending with our ssrc, is it an address we already know */
1720     if (rtp_session_find_conflicting_address (sess, pinfo->address,
1721             pinfo->current_time)) {
1722       /* Its a known conflict, its probably a loop, not a collision
1723        * lets just drop the incoming packet
1724        */
1725       GST_DEBUG ("Our packets are being looped back to us, dropping");
1726     } else {
1727       GST_DEBUG ("Collision for SSRC %x from new incoming packet,"
1728           " change our sender ssrc", ssrc);
1729
1730       rtp_session_have_conflict (sess, source, pinfo->address,
1731           pinfo->current_time);
1732     }
1733   }
1734
1735   return TRUE;
1736 }
1737
1738 typedef struct
1739 {
1740   gboolean is_doing_ptp;
1741   GSocketAddress *new_addr;
1742 } CompareAddrData;
1743
1744 /* check if the two given ip addr are the same (do not care about the port) */
1745 static gboolean
1746 ip_addr_equal (GSocketAddress * a, GSocketAddress * b)
1747 {
1748   return
1749       g_inet_address_equal (g_inet_socket_address_get_address
1750       (G_INET_SOCKET_ADDRESS (a)),
1751       g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (b)));
1752 }
1753
1754 static void
1755 compare_rtp_source_addr (const gchar * key, RTPSource * source,
1756     CompareAddrData * data)
1757 {
1758   /* only compare ip addr of remote sources which are also not closing */
1759   if (!source->internal && !source->closing && source->rtp_from) {
1760     /* look for the first rtp source */
1761     if (!data->new_addr)
1762       data->new_addr = source->rtp_from;
1763     /* compare current ip addr with the first one */
1764     else
1765       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtp_from);
1766   }
1767 }
1768
1769 static void
1770 compare_rtcp_source_addr (const gchar * key, RTPSource * source,
1771     CompareAddrData * data)
1772 {
1773   /* only compare ip addr of remote sources which are also not closing */
1774   if (!source->internal && !source->closing && source->rtcp_from) {
1775     /* look for the first rtcp source */
1776     if (!data->new_addr)
1777       data->new_addr = source->rtcp_from;
1778     else
1779       /* compare current ip addr with the first one */
1780       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtcp_from);
1781   }
1782 }
1783
1784 /* loop over our non-internal source to know if the session
1785  * is doing point-to-point */
1786 static void
1787 session_update_ptp (RTPSession * sess)
1788 {
1789   /* to know if the session is doing point to point, the ip addr
1790    * of each non-internal (=remotes) source have to be compared
1791    * to each other.
1792    */
1793   gboolean is_doing_rtp_ptp;
1794   gboolean is_doing_rtcp_ptp;
1795   CompareAddrData data;
1796
1797   /* compare the first remote source's ip addr that receive rtp packets
1798    * with other remote rtp source.
1799    * it's enough because the session just needs to know if they are all
1800    * equals or not
1801    */
1802   data.is_doing_ptp = TRUE;
1803   data.new_addr = NULL;
1804   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1805       (GHFunc) compare_rtp_source_addr, (gpointer) & data);
1806   is_doing_rtp_ptp = data.is_doing_ptp;
1807
1808   /* same but about rtcp */
1809   data.is_doing_ptp = TRUE;
1810   data.new_addr = NULL;
1811   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1812       (GHFunc) compare_rtcp_source_addr, (gpointer) & data);
1813   is_doing_rtcp_ptp = data.is_doing_ptp;
1814
1815   /* the session is doing point-to-point if all rtp remote have the same
1816    * ip addr and if all rtcp remote sources have the same ip addr */
1817   sess->is_doing_ptp = is_doing_rtp_ptp && is_doing_rtcp_ptp;
1818
1819   GST_DEBUG ("doing point-to-point: %d", sess->is_doing_ptp);
1820 }
1821
1822 static void
1823 add_source (RTPSession * sess, RTPSource * src)
1824 {
1825   g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1826       GINT_TO_POINTER (src->ssrc), src);
1827   /* report the new source ASAP */
1828   src->generation = sess->generation;
1829   /* we have one more source now */
1830   sess->total_sources++;
1831   if (RTP_SOURCE_IS_ACTIVE (src))
1832     sess->stats.active_sources++;
1833   if (src->internal) {
1834     sess->stats.internal_sources++;
1835     if (!sess->internal_ssrc_from_caps_or_property
1836         && sess->suggested_ssrc != src->ssrc) {
1837       sess->suggested_ssrc = src->ssrc;
1838       sess->internal_ssrc_set = TRUE;
1839     }
1840   }
1841
1842   /* update point-to-point status */
1843   if (!src->internal)
1844     session_update_ptp (sess);
1845 }
1846
1847 static RTPSource *
1848 find_source (RTPSession * sess, guint32 ssrc)
1849 {
1850   return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1851       GINT_TO_POINTER (ssrc));
1852 }
1853
1854 /* must be called with the session lock, the returned source needs to be
1855  * unreffed after usage. */
1856 static RTPSource *
1857 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1858     RTPPacketInfo * pinfo, gboolean rtp)
1859 {
1860   RTPSource *source;
1861
1862   source = find_source (sess, ssrc);
1863   if (source == NULL) {
1864     /* make new Source in probation and insert */
1865     source = rtp_source_new (ssrc);
1866
1867     GST_DEBUG ("creating new source %08x %p", ssrc, source);
1868
1869     /* for RTP packets we need to set the source in probation. Receiving RTCP
1870      * packets of an SSRC, on the other hand, is a strong indication that we
1871      * are dealing with a valid source. */
1872     g_object_set (source, "probation", rtp ? sess->probation : 0,
1873         "max-dropout-time", sess->max_dropout_time, "max-misorder-time",
1874         sess->max_misorder_time, NULL);
1875
1876     /* store from address, if any */
1877     if (pinfo->address) {
1878       if (rtp)
1879         rtp_source_set_rtp_from (source, pinfo->address);
1880       else
1881         rtp_source_set_rtcp_from (source, pinfo->address);
1882     }
1883
1884     /* configure a callback on the source */
1885     rtp_source_set_callbacks (source, &callbacks, sess);
1886
1887     add_source (sess, source);
1888     *created = TRUE;
1889   } else {
1890     *created = FALSE;
1891     /* check for collision, this updates the address when not previously set */
1892     if (check_collision (sess, source, pinfo, rtp)) {
1893       return NULL;
1894     }
1895     /* Receiving RTCP packets of an SSRC is a strong indication that we
1896      * are dealing with a valid source. */
1897     if (!rtp)
1898       g_object_set (source, "probation", 0, NULL);
1899   }
1900   /* update last activity */
1901   source->last_activity = pinfo->current_time;
1902   if (rtp)
1903     source->last_rtp_activity = pinfo->current_time;
1904   g_object_ref (source);
1905
1906   return source;
1907 }
1908
1909 /* must be called with the session lock, the returned source needs to be
1910  * unreffed after usage. */
1911 static RTPSource *
1912 obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1913     GstClockTime current_time)
1914 {
1915   RTPSource *source;
1916
1917   source = find_source (sess, ssrc);
1918   if (source == NULL) {
1919     /* make new internal Source and insert */
1920     source = rtp_source_new (ssrc);
1921
1922     GST_DEBUG ("creating new internal source %08x %p", ssrc, source);
1923
1924     source->validated = TRUE;
1925     source->internal = TRUE;
1926     source->probation = FALSE;
1927     rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes));
1928     rtp_source_set_callbacks (source, &callbacks, sess);
1929
1930     add_source (sess, source);
1931     *created = TRUE;
1932   } else {
1933     *created = FALSE;
1934   }
1935   /* update last activity */
1936   if (current_time != GST_CLOCK_TIME_NONE) {
1937     source->last_activity = current_time;
1938     source->last_rtp_activity = current_time;
1939   }
1940   g_object_ref (source);
1941
1942   return source;
1943 }
1944
1945 /**
1946  * rtp_session_suggest_ssrc:
1947  * @sess: a #RTPSession
1948  * @is_random: if the suggested ssrc is random
1949  *
1950  * Suggest an unused SSRC in @sess.
1951  *
1952  * Returns: a free unused SSRC
1953  */
1954 guint32
1955 rtp_session_suggest_ssrc (RTPSession * sess, gboolean * is_random)
1956 {
1957   guint32 result;
1958
1959   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1960
1961   RTP_SESSION_LOCK (sess);
1962   result = sess->suggested_ssrc;
1963   if (is_random)
1964     *is_random = !sess->internal_ssrc_set;
1965   RTP_SESSION_UNLOCK (sess);
1966
1967   return result;
1968 }
1969
1970 /**
1971  * rtp_session_add_source:
1972  * @sess: a #RTPSession
1973  * @src: #RTPSource to add
1974  *
1975  * Add @src to @session.
1976  *
1977  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1978  * existed in the session.
1979  */
1980 gboolean
1981 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1982 {
1983   gboolean result = FALSE;
1984   RTPSource *find;
1985
1986   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1987   g_return_val_if_fail (src != NULL, FALSE);
1988
1989   RTP_SESSION_LOCK (sess);
1990   find = find_source (sess, src->ssrc);
1991   if (find == NULL) {
1992     add_source (sess, src);
1993     result = TRUE;
1994   }
1995   RTP_SESSION_UNLOCK (sess);
1996
1997   return result;
1998 }
1999
2000 /**
2001  * rtp_session_get_num_sources:
2002  * @sess: an #RTPSession
2003  *
2004  * Get the number of sources in @sess.
2005  *
2006  * Returns: The number of sources in @sess.
2007  */
2008 guint
2009 rtp_session_get_num_sources (RTPSession * sess)
2010 {
2011   guint result;
2012
2013   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
2014
2015   RTP_SESSION_LOCK (sess);
2016   result = sess->total_sources;
2017   RTP_SESSION_UNLOCK (sess);
2018
2019   return result;
2020 }
2021
2022 /**
2023  * rtp_session_get_num_active_sources:
2024  * @sess: an #RTPSession
2025  *
2026  * Get the number of active sources in @sess. A source is considered active when
2027  * it has been validated and has not yet received a BYE RTCP message.
2028  *
2029  * Returns: The number of active sources in @sess.
2030  */
2031 guint
2032 rtp_session_get_num_active_sources (RTPSession * sess)
2033 {
2034   guint result;
2035
2036   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
2037
2038   RTP_SESSION_LOCK (sess);
2039   result = sess->stats.active_sources;
2040   RTP_SESSION_UNLOCK (sess);
2041
2042   return result;
2043 }
2044
2045 /**
2046  * rtp_session_get_source_by_ssrc:
2047  * @sess: an #RTPSession
2048  * @ssrc: an SSRC
2049  *
2050  * Find the source with @ssrc in @sess.
2051  *
2052  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
2053  * g_object_unref() after usage.
2054  */
2055 RTPSource *
2056 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
2057 {
2058   RTPSource *result;
2059
2060   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
2061
2062   RTP_SESSION_LOCK (sess);
2063   result = find_source (sess, ssrc);
2064   if (result != NULL)
2065     g_object_ref (result);
2066   RTP_SESSION_UNLOCK (sess);
2067
2068   return result;
2069 }
2070
2071 /* should be called with the SESSION lock */
2072 static guint32
2073 rtp_session_create_new_ssrc (RTPSession * sess)
2074 {
2075   guint32 ssrc;
2076
2077   while (TRUE) {
2078     ssrc = g_random_int ();
2079
2080     /* see if it exists in the session, we're done if it doesn't */
2081     if (find_source (sess, ssrc) == NULL)
2082       break;
2083   }
2084   return ssrc;
2085 }
2086
2087 static gboolean
2088 update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
2089 {
2090   GstNetAddressMeta *meta;
2091
2092   /* get packet size including header overhead */
2093   pinfo->bytes += gst_buffer_get_size (*buffer) + pinfo->header_len;
2094   pinfo->packets++;
2095
2096   if (pinfo->rtp) {
2097     GstRTPBuffer rtp = { NULL };
2098
2099     if (!gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtp))
2100       goto invalid_packet;
2101
2102     pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtp);
2103     if (idx == 0) {
2104       gint i;
2105
2106       /* only keep info for first buffer */
2107       pinfo->ssrc = gst_rtp_buffer_get_ssrc (&rtp);
2108       pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
2109       pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
2110       pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2111       pinfo->marker = gst_rtp_buffer_get_marker (&rtp);
2112       /* copy available csrc */
2113       pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
2114       for (i = 0; i < pinfo->csrc_count; i++)
2115         pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
2116
2117       /* RTP header extensions */
2118       pinfo->header_ext = gst_rtp_buffer_get_extension_bytes (&rtp,
2119           &pinfo->header_ext_bit_pattern);
2120     }
2121
2122     if (pinfo->ntp64_ext_id != 0 && pinfo->send && !pinfo->have_ntp64_ext) {
2123       guint8 *data;
2124       guint size;
2125
2126       /* Remember here that there is a 64-bit NTP header extension on this buffer
2127        * or any of the other buffers in the buffer list.
2128        * Later we update this after making the buffer(list) writable.
2129        */
2130       if ((gst_rtp_buffer_get_extension_onebyte_header (&rtp,
2131                   pinfo->ntp64_ext_id, 0, (gpointer *) & data, &size)
2132               && size == 8)
2133           || (gst_rtp_buffer_get_extension_twobytes_header (&rtp, NULL,
2134                   pinfo->ntp64_ext_id, 0, (gpointer *) & data, &size)
2135               && size == 8)) {
2136         pinfo->have_ntp64_ext = TRUE;
2137       }
2138     }
2139
2140     gst_rtp_buffer_unmap (&rtp);
2141   }
2142
2143   if (idx == 0) {
2144     /* for netbuffer we can store the IP address to check for collisions */
2145     meta = gst_buffer_get_net_address_meta (*buffer);
2146     if (pinfo->address)
2147       g_object_unref (pinfo->address);
2148     if (meta) {
2149       pinfo->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
2150     } else {
2151       pinfo->address = NULL;
2152     }
2153   }
2154   return TRUE;
2155
2156   /* ERRORS */
2157 invalid_packet:
2158   {
2159     GST_DEBUG ("invalid RTP packet received");
2160     return FALSE;
2161   }
2162 }
2163
2164 /* update the RTPPacketInfo structure with the current time and other bits
2165  * about the current buffer we are handling.
2166  * This function is typically called when a validated packet is received.
2167  * This function should be called with the RTP_SESSION_LOCK
2168  */
2169 static gboolean
2170 update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
2171     gboolean send, gboolean rtp, gboolean is_list, gpointer data,
2172     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
2173 {
2174   gboolean res;
2175
2176   pinfo->send = send;
2177   pinfo->rtp = rtp;
2178   pinfo->is_list = is_list;
2179   pinfo->data = data;
2180   pinfo->current_time = current_time;
2181   pinfo->running_time = running_time;
2182   pinfo->ntpnstime = ntpnstime;
2183   pinfo->header_len = sess->header_len;
2184   pinfo->bytes = 0;
2185   pinfo->payload_len = 0;
2186   pinfo->packets = 0;
2187   pinfo->marker = FALSE;
2188   pinfo->ntp64_ext_id = send ? sess->send_ntp64_ext_id : 0;
2189   pinfo->have_ntp64_ext = FALSE;
2190
2191   if (is_list) {
2192     GstBufferList *list = GST_BUFFER_LIST_CAST (data);
2193     res =
2194         gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
2195         pinfo);
2196     pinfo->arrival_time = GST_CLOCK_TIME_NONE;
2197   } else {
2198     GstBuffer *buffer = GST_BUFFER_CAST (data);
2199     res = update_packet (&buffer, 0, pinfo);
2200     pinfo->arrival_time = GST_BUFFER_DTS (buffer);
2201   }
2202
2203   return res;
2204 }
2205
2206 static void
2207 clean_packet_info (RTPPacketInfo * pinfo)
2208 {
2209   if (pinfo->address)
2210     g_object_unref (pinfo->address);
2211   if (pinfo->data) {
2212     gst_mini_object_unref (pinfo->data);
2213     pinfo->data = NULL;
2214   }
2215   if (pinfo->header_ext)
2216     g_bytes_unref (pinfo->header_ext);
2217 }
2218
2219 static gboolean
2220 source_update_active (RTPSession * sess, RTPSource * source,
2221     gboolean prevactive)
2222 {
2223   gboolean active = RTP_SOURCE_IS_ACTIVE (source);
2224   guint32 ssrc = source->ssrc;
2225
2226   if (prevactive == active)
2227     return FALSE;
2228
2229   if (active) {
2230     sess->stats.active_sources++;
2231     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
2232         sess->stats.active_sources);
2233   } else {
2234     sess->stats.active_sources--;
2235     GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
2236         sess->stats.active_sources);
2237   }
2238   return TRUE;
2239 }
2240
2241 static void
2242 process_twcc_packet (RTPSession * sess, RTPPacketInfo * pinfo)
2243 {
2244   if (rtp_twcc_manager_recv_packet (sess->twcc, pinfo)) {
2245     RTP_SESSION_UNLOCK (sess);
2246
2247     /* TODO: find a better rational for this number, and possibly tune it based
2248        on factors like framerate / bandwidth etc */
2249     if (!rtp_session_send_rtcp (sess, 100 * GST_MSECOND)) {
2250       GST_INFO ("Could not schedule TWCC straight away");
2251     }
2252     RTP_SESSION_LOCK (sess);
2253   }
2254 }
2255
2256 static gboolean
2257 source_update_sender (RTPSession * sess, RTPSource * source,
2258     gboolean prevsender)
2259 {
2260   gboolean sender = RTP_SOURCE_IS_SENDER (source);
2261   guint32 ssrc = source->ssrc;
2262
2263   if (prevsender == sender)
2264     return FALSE;
2265
2266   if (sender) {
2267     sess->stats.sender_sources++;
2268     if (source->internal)
2269       sess->stats.internal_sender_sources++;
2270     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
2271         sess->stats.sender_sources);
2272   } else {
2273     sess->stats.sender_sources--;
2274     if (source->internal)
2275       sess->stats.internal_sender_sources--;
2276     GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
2277         sess->stats.sender_sources);
2278   }
2279   return TRUE;
2280 }
2281
2282 /**
2283  * rtp_session_process_rtp:
2284  * @sess: and #RTPSession
2285  * @buffer: an RTP buffer
2286  * @current_time: the current system time
2287  * @running_time: the running_time of @buffer
2288  *
2289  * Process an RTP buffer in the session manager. This function takes ownership
2290  * of @buffer.
2291  *
2292  * Returns: a #GstFlowReturn.
2293  */
2294 GstFlowReturn
2295 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
2296     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
2297 {
2298   GstFlowReturn result;
2299   guint32 ssrc;
2300   RTPSource *source;
2301   gboolean created;
2302   gboolean prevsender, prevactive;
2303   RTPPacketInfo pinfo = { 0, };
2304   guint64 oldrate;
2305
2306   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2307   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2308
2309   RTP_SESSION_LOCK (sess);
2310
2311   /* update pinfo stats */
2312   if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer,
2313           current_time, running_time, ntpnstime)) {
2314     GST_DEBUG ("invalid RTP packet received");
2315     RTP_SESSION_UNLOCK (sess);
2316     return rtp_session_process_rtcp (sess, buffer, current_time, running_time,
2317         ntpnstime);
2318   }
2319
2320   ssrc = pinfo.ssrc;
2321
2322   source = obtain_source (sess, ssrc, &created, &pinfo, TRUE);
2323   if (!source)
2324     goto collision;
2325
2326   prevsender = RTP_SOURCE_IS_SENDER (source);
2327   prevactive = RTP_SOURCE_IS_ACTIVE (source);
2328   oldrate = source->bitrate;
2329
2330   if (created)
2331     on_new_ssrc (sess, source);
2332
2333   /* let source process the packet */
2334   result = rtp_source_process_rtp (source, &pinfo);
2335   process_twcc_packet (sess, &pinfo);
2336
2337   /* source became active */
2338   if (source_update_active (sess, source, prevactive))
2339     on_ssrc_validated (sess, source);
2340
2341   source_update_sender (sess, source, prevsender);
2342
2343   if (oldrate != source->bitrate)
2344     sess->recalc_bandwidth = TRUE;
2345
2346
2347   if (source->validated) {
2348     gboolean created;
2349     gint i;
2350
2351     /* for validated sources, we add the CSRCs as well */
2352     for (i = 0; i < pinfo.csrc_count; i++) {
2353       guint32 csrc;
2354       RTPSource *csrc_src;
2355
2356       csrc = pinfo.csrcs[i];
2357
2358       /* get source */
2359       csrc_src = obtain_source (sess, csrc, &created, &pinfo, TRUE);
2360       if (!csrc_src)
2361         continue;
2362
2363       if (created) {
2364         GST_DEBUG ("created new CSRC: %08x", csrc);
2365         rtp_source_set_as_csrc (csrc_src);
2366         source_update_active (sess, csrc_src, FALSE);
2367         on_new_ssrc (sess, csrc_src);
2368       }
2369       g_object_unref (csrc_src);
2370     }
2371   }
2372   g_object_unref (source);
2373
2374   RTP_SESSION_UNLOCK (sess);
2375
2376   clean_packet_info (&pinfo);
2377
2378   return result;
2379
2380   /* ERRORS */
2381 collision:
2382   {
2383     RTP_SESSION_UNLOCK (sess);
2384     clean_packet_info (&pinfo);
2385     GST_DEBUG ("ignoring packet because its collisioning");
2386     return GST_FLOW_OK;
2387   }
2388 }
2389
2390 static void
2391 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
2392     GstRTCPPacket * packet, RTPPacketInfo * pinfo)
2393 {
2394   guint count, i;
2395
2396   count = gst_rtcp_packet_get_rb_count (packet);
2397   for (i = 0; i < count; i++) {
2398     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
2399     guint8 fractionlost;
2400     gint32 packetslost;
2401     RTPSource *src;
2402
2403     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
2404         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2405
2406     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
2407
2408     /* find our own source */
2409     src = find_source (sess, ssrc);
2410     if (src == NULL)
2411       continue;
2412
2413     if (src->internal && RTP_SOURCE_IS_ACTIVE (src)) {
2414       /* only deal with report blocks for our session, we update the stats of
2415        * the sender of the RTCP message. We could also compare our stats against
2416        * the other sender to see if we are better or worse. */
2417       /* FIXME, need to keep track who the RB block is from */
2418       rtp_source_process_rb (source, ssrc, pinfo->ntpnstime, fractionlost,
2419           packetslost, exthighestseq, jitter, lsr, dlsr);
2420     }
2421   }
2422   on_ssrc_active (sess, source);
2423 }
2424
2425 /* A Sender report contains statistics about how the sender is doing. This
2426  * includes timing informataion such as the relation between RTP and NTP
2427  * timestamps and the number of packets/bytes it sent to us.
2428  *
2429  * In this report is also included a set of report blocks related to how this
2430  * sender is receiving data (in case we (or somebody else) is also sending stuff
2431  * to it). This info includes the packet loss, jitter and seqnum. It also
2432  * contains information to calculate the round trip time (LSR/DLSR).
2433  */
2434 static void
2435 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
2436     RTPPacketInfo * pinfo, gboolean * do_sync)
2437 {
2438   guint32 senderssrc, rtptime, packet_count, octet_count;
2439   guint64 ntptime;
2440   RTPSource *source;
2441   gboolean created, prevsender;
2442
2443   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
2444       &packet_count, &octet_count);
2445
2446   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
2447       senderssrc, GST_TIME_ARGS (pinfo->current_time));
2448
2449   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2450   if (!source)
2451     return;
2452
2453   /* skip non-bye packets for sources that are marked BYE */
2454   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2455     goto out;
2456
2457   /* don't try to do lip-sync for sources that sent a BYE */
2458   if (RTP_SOURCE_IS_MARKED_BYE (source))
2459     *do_sync = FALSE;
2460   else
2461     *do_sync = TRUE;
2462
2463   prevsender = RTP_SOURCE_IS_SENDER (source);
2464
2465   /* first update the source */
2466   rtp_source_process_sr (source, pinfo->current_time, ntptime, rtptime,
2467       packet_count, octet_count);
2468
2469   source_update_sender (sess, source, prevsender);
2470
2471   if (created)
2472     on_new_ssrc (sess, source);
2473
2474   rtp_session_process_rb (sess, source, packet, pinfo);
2475
2476 out:
2477   g_object_unref (source);
2478 }
2479
2480 /* A receiver report contains statistics about how a receiver is doing. It
2481  * includes stuff like packet loss, jitter and the seqnum it received last. It
2482  * also contains info to calculate the round trip time.
2483  *
2484  * We are only interested in how the sender of this report is doing wrt to us.
2485  */
2486 static void
2487 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
2488     RTPPacketInfo * pinfo)
2489 {
2490   guint32 senderssrc;
2491   RTPSource *source;
2492   gboolean created;
2493
2494   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
2495
2496   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
2497
2498   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2499   if (!source)
2500     return;
2501
2502   /* skip non-bye packets for sources that are marked BYE */
2503   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2504     goto out;
2505
2506   if (created)
2507     on_new_ssrc (sess, source);
2508
2509   rtp_session_process_rb (sess, source, packet, pinfo);
2510
2511 out:
2512   g_object_unref (source);
2513 }
2514
2515 /* Get SDES items and store them in the SSRC */
2516 static void
2517 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
2518     RTPPacketInfo * pinfo)
2519 {
2520   guint items, i, j;
2521   gboolean more_items, more_entries;
2522
2523   items = gst_rtcp_packet_sdes_get_item_count (packet);
2524   GST_DEBUG ("got SDES packet with %d items", items);
2525
2526   more_items = gst_rtcp_packet_sdes_first_item (packet);
2527   i = 0;
2528   while (more_items) {
2529     guint32 ssrc;
2530     gboolean changed, created, prevactive;
2531     RTPSource *source;
2532     GstStructure *sdes;
2533
2534     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
2535
2536     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
2537
2538     changed = FALSE;
2539
2540     /* find src, no probation when dealing with RTCP */
2541     source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
2542     if (!source)
2543       return;
2544
2545     /* skip non-bye packets for sources that are marked BYE */
2546     if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2547       goto next;
2548
2549     sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
2550
2551     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
2552     j = 0;
2553     while (more_entries) {
2554       GstRTCPSDESType type;
2555       guint8 len;
2556       guint8 *data;
2557       gchar *name;
2558       gchar *value;
2559
2560       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
2561
2562       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
2563           data);
2564
2565       if (type == GST_RTCP_SDES_PRIV) {
2566         name = g_strndup ((const gchar *) &data[1], data[0]);
2567         len -= data[0] + 1;
2568         data += data[0] + 1;
2569       } else {
2570         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
2571       }
2572
2573       value = g_strndup ((const gchar *) data, len);
2574
2575       if (g_utf8_validate (value, -1, NULL)) {
2576         gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
2577       } else {
2578         GST_WARNING ("ignore SDES field %s with non-utf8 data %s", name, value);
2579       }
2580
2581       g_free (name);
2582       g_free (value);
2583
2584       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
2585       j++;
2586     }
2587
2588     /* takes ownership of sdes */
2589     changed = rtp_source_set_sdes_struct (source, sdes);
2590
2591     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2592     source->validated = TRUE;
2593
2594     if (created)
2595       on_new_ssrc (sess, source);
2596
2597     /* source became active */
2598     if (source_update_active (sess, source, prevactive))
2599       on_ssrc_validated (sess, source);
2600
2601     if (changed)
2602       on_ssrc_sdes (sess, source);
2603
2604   next:
2605     g_object_unref (source);
2606
2607     more_items = gst_rtcp_packet_sdes_next_item (packet);
2608     i++;
2609   }
2610 }
2611
2612 /* BYE is sent when a client leaves the session
2613  */
2614 static void
2615 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
2616     RTPPacketInfo * pinfo)
2617 {
2618   guint count, i;
2619   gchar *reason;
2620   gboolean reconsider = FALSE;
2621
2622   reason = gst_rtcp_packet_bye_get_reason (packet);
2623   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
2624
2625   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
2626   for (i = 0; i < count; i++) {
2627     guint32 ssrc;
2628     RTPSource *source;
2629     gboolean prevactive, prevsender;
2630     guint pmembers, members;
2631
2632     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
2633     GST_DEBUG ("SSRC: %08x", ssrc);
2634
2635     /* find src and mark bye, no probation when dealing with RTCP */
2636     source = find_source (sess, ssrc);
2637     if (!source || source->internal) {
2638       GST_DEBUG ("Ignoring suspicious BYE packet (reason: %s)",
2639           !source ? "can't find source" : "has internal source SSRC");
2640       break;
2641     }
2642
2643     /* store time for when we need to time out this source */
2644     source->bye_time = pinfo->current_time;
2645
2646     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2647     prevsender = RTP_SOURCE_IS_SENDER (source);
2648
2649     /* mark the source BYE */
2650     rtp_source_mark_bye (source, reason);
2651
2652     pmembers = sess->stats.active_sources;
2653
2654     source_update_active (sess, source, prevactive);
2655     source_update_sender (sess, source, prevsender);
2656
2657     members = sess->stats.active_sources;
2658
2659     if (!sess->scheduled_bye && members < pmembers) {
2660       /* some members went away since the previous timeout estimate.
2661        * Perform reverse reconsideration but only when we are not scheduling a
2662        * BYE ourselves. */
2663       if (sess->next_rtcp_check_time != GST_CLOCK_TIME_NONE &&
2664           pinfo->current_time < sess->next_rtcp_check_time) {
2665         GstClockTime time_remaining;
2666
2667         /* Scale our next RTCP check time according to the change of numbers
2668          * of members. But only if a) this is the first RTCP, or b) this is not
2669          * a feedback session, or c) this is a feedback session but we schedule
2670          * for every RTCP interval (aka no t-rr-interval set).
2671          *
2672          * FIXME: a) and b) are not great as we will possibly go below Tmin
2673          * for non-feedback profiles and in case of a) below
2674          * Tmin/t-rr-interval in any case.
2675          */
2676         if (sess->last_rtcp_send_time == GST_CLOCK_TIME_NONE ||
2677             !(sess->rtp_profile == GST_RTP_PROFILE_AVPF
2678                 || sess->rtp_profile == GST_RTP_PROFILE_SAVPF) ||
2679             sess->next_rtcp_check_time - sess->last_rtcp_send_time ==
2680             sess->last_rtcp_interval) {
2681           time_remaining = sess->next_rtcp_check_time - pinfo->current_time;
2682           sess->next_rtcp_check_time =
2683               gst_util_uint64_scale (time_remaining, members, pmembers);
2684           sess->next_rtcp_check_time += pinfo->current_time;
2685         }
2686         sess->last_rtcp_interval =
2687             gst_util_uint64_scale (sess->last_rtcp_interval, members, pmembers);
2688
2689         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
2690             GST_TIME_ARGS (sess->next_rtcp_check_time));
2691
2692         /* mark pending reconsider. We only want to signal the reconsideration
2693          * once after we handled all the source in the bye packet */
2694         reconsider = TRUE;
2695       }
2696     }
2697
2698     on_bye_ssrc (sess, source);
2699   }
2700   if (reconsider) {
2701     RTP_SESSION_UNLOCK (sess);
2702     /* notify app of reconsideration */
2703     if (sess->callbacks.reconsider)
2704       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2705     RTP_SESSION_LOCK (sess);
2706   }
2707
2708   g_free (reason);
2709 }
2710
2711 static void
2712 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
2713     RTPPacketInfo * pinfo)
2714 {
2715   GST_DEBUG ("received APP");
2716
2717   if (g_signal_has_handler_pending (sess,
2718           rtp_session_signals[SIGNAL_ON_APP_RTCP], 0, TRUE)) {
2719     GstBuffer *data_buffer = NULL;
2720     guint16 data_length;
2721     gchar name[5];
2722
2723     data_length = gst_rtcp_packet_app_get_data_length (packet) * 4;
2724     if (data_length > 0) {
2725       guint8 *data = gst_rtcp_packet_app_get_data (packet);
2726       data_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
2727           GST_BUFFER_COPY_MEMORY, data - packet->rtcp->map.data, data_length);
2728       GST_BUFFER_PTS (data_buffer) = pinfo->running_time;
2729     }
2730
2731     memcpy (name, gst_rtcp_packet_app_get_name (packet), 4);
2732     name[4] = '\0';
2733
2734     RTP_SESSION_UNLOCK (sess);
2735     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_APP_RTCP], 0,
2736         gst_rtcp_packet_app_get_subtype (packet),
2737         gst_rtcp_packet_app_get_ssrc (packet), name, data_buffer);
2738     RTP_SESSION_LOCK (sess);
2739
2740     if (data_buffer)
2741       gst_buffer_unref (data_buffer);
2742   }
2743 }
2744
2745 static gboolean
2746 rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
2747     guint32 media_ssrc, gboolean fir, GstClockTime current_time)
2748 {
2749   guint32 round_trip = 0;
2750
2751   rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
2752       &round_trip);
2753
2754   if (src->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
2755     GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
2756         GST_SECOND, 65536);
2757
2758     /* Sanity check to avoid always ignoring PLI/FIR if we receive RTCP
2759      * packets with erroneous values resulting in crazy high RTT. */
2760     if (round_trip_in_ns > 5 * GST_SECOND)
2761       round_trip_in_ns = GST_SECOND / 2;
2762
2763     if (current_time - src->last_keyframe_request < 2 * round_trip_in_ns) {
2764       GST_DEBUG ("Ignoring %s request from %X because one was send without one "
2765           "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
2766           fir ? "FIR" : "PLI", rtp_source_get_ssrc (src),
2767           GST_TIME_ARGS (current_time - src->last_keyframe_request),
2768           GST_TIME_ARGS (round_trip_in_ns));
2769       return FALSE;
2770     }
2771   }
2772
2773   src->last_keyframe_request = current_time;
2774
2775   GST_LOG ("received %s request from %X about %X %p(%p)", fir ? "FIR" : "PLI",
2776       rtp_source_get_ssrc (src), media_ssrc, sess->callbacks.process_rtp,
2777       sess->callbacks.request_key_unit);
2778
2779   RTP_SESSION_UNLOCK (sess);
2780   sess->callbacks.request_key_unit (sess, media_ssrc, fir,
2781       sess->request_key_unit_user_data);
2782   RTP_SESSION_LOCK (sess);
2783
2784   return TRUE;
2785 }
2786
2787 static void
2788 rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
2789     guint32 media_ssrc, GstClockTime current_time)
2790 {
2791   RTPSource *src;
2792
2793   if (!sess->callbacks.request_key_unit)
2794     return;
2795
2796   src = find_source (sess, sender_ssrc);
2797   if (src == NULL) {
2798     /* try to find a src with media_ssrc instead */
2799     src = find_source (sess, media_ssrc);
2800     if (src == NULL)
2801       return;
2802   }
2803
2804   rtp_session_request_local_key_unit (sess, src, media_ssrc, FALSE,
2805       current_time);
2806 }
2807
2808 static void
2809 rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
2810     guint32 media_ssrc, guint8 * fci_data, guint fci_length,
2811     GstClockTime current_time)
2812 {
2813   RTPSource *src;
2814   guint32 ssrc;
2815   guint position = 0;
2816   gboolean our_request = FALSE;
2817
2818   if (!sess->callbacks.request_key_unit)
2819     return;
2820
2821   if (fci_length < 8)
2822     return;
2823
2824   src = find_source (sess, sender_ssrc);
2825
2826   /* Hack because Google fails to set the sender_ssrc correctly */
2827   if (!src && sender_ssrc == 1) {
2828     GHashTableIter iter;
2829
2830     /* we can't find the source if there are multiple */
2831     if (sess->stats.sender_sources > sess->stats.internal_sender_sources + 1)
2832       return;
2833
2834     g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
2835     while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
2836       if (!src->internal && rtp_source_is_sender (src))
2837         break;
2838       src = NULL;
2839     }
2840   }
2841   if (!src)
2842     return;
2843
2844   for (position = 0; position < fci_length; position += 8) {
2845     guint8 *data = fci_data + position;
2846     RTPSource *own;
2847
2848     ssrc = GST_READ_UINT32_BE (data);
2849
2850     own = find_source (sess, ssrc);
2851     if (own == NULL)
2852       continue;
2853
2854     if (own->internal) {
2855       our_request = TRUE;
2856       break;
2857     }
2858   }
2859   if (!our_request)
2860     return;
2861
2862   rtp_session_request_local_key_unit (sess, src, media_ssrc, TRUE,
2863       current_time);
2864 }
2865
2866 static void
2867 rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
2868     guint32 media_ssrc, guint8 * fci_data, guint fci_length,
2869     GstClockTime current_time)
2870 {
2871   sess->stats.nacks_received++;
2872
2873   if (!sess->callbacks.notify_nack)
2874     return;
2875
2876   while (fci_length > 0) {
2877     guint16 seqnum, blp;
2878
2879     seqnum = GST_READ_UINT16_BE (fci_data);
2880     blp = GST_READ_UINT16_BE (fci_data + 2);
2881
2882     GST_DEBUG ("NACK #%u, blp %04x, SSRC 0x%08x", seqnum, blp, media_ssrc);
2883
2884     RTP_SESSION_UNLOCK (sess);
2885     sess->callbacks.notify_nack (sess, seqnum, blp, media_ssrc,
2886         sess->notify_nack_user_data);
2887     RTP_SESSION_LOCK (sess);
2888
2889     fci_data += 4;
2890     fci_length -= 4;
2891   }
2892 }
2893
2894 static void
2895 rtp_session_process_sr_req (RTPSession * sess, guint32 sender_ssrc,
2896     guint32 media_ssrc)
2897 {
2898   RTPSource *src;
2899
2900   /* Request a new SR in feedback profiles ASAP */
2901   if (sess->rtp_profile != GST_RTP_PROFILE_AVPF
2902       && sess->rtp_profile != GST_RTP_PROFILE_SAVPF)
2903     return;
2904
2905   src = find_source (sess, sender_ssrc);
2906   /* Our own RTCP packet */
2907   if (src && src->internal)
2908     return;
2909
2910   src = find_source (sess, media_ssrc);
2911   /* Not an SSRC we're producing */
2912   if (!src || !src->internal)
2913     return;
2914
2915   GST_DEBUG_OBJECT (sess, "Handling RTCP-SR-REQ");
2916   /* FIXME: 5s max_delay hard-coded here as we have to give some
2917    * high enough value */
2918   sess->sr_req_pending = TRUE;
2919   rtp_session_send_rtcp (sess, 5 * GST_SECOND);
2920 }
2921
2922 static void
2923 rtp_session_process_twcc (RTPSession * sess, guint32 sender_ssrc,
2924     guint32 media_ssrc, guint8 * fci_data, guint fci_length)
2925 {
2926   GArray *twcc_packets;
2927   GstStructure *twcc_packets_s;
2928   GstStructure *twcc_stats_s;
2929
2930   twcc_packets = rtp_twcc_manager_parse_fci (sess->twcc,
2931       fci_data, fci_length * sizeof (guint32));
2932   if (twcc_packets == NULL)
2933     return;
2934
2935   twcc_packets_s = rtp_twcc_stats_get_packets_structure (twcc_packets);
2936   twcc_stats_s =
2937       rtp_twcc_stats_process_packets (sess->twcc_stats, twcc_packets);
2938
2939   GST_DEBUG_OBJECT (sess, "Parsed TWCC: %" GST_PTR_FORMAT, twcc_packets_s);
2940   GST_INFO_OBJECT (sess, "Current TWCC stats %" GST_PTR_FORMAT, twcc_stats_s);
2941
2942   g_array_unref (twcc_packets);
2943
2944   RTP_SESSION_UNLOCK (sess);
2945   if (sess->callbacks.notify_twcc)
2946     sess->callbacks.notify_twcc (sess, twcc_packets_s, twcc_stats_s,
2947         sess->notify_twcc_user_data);
2948   RTP_SESSION_LOCK (sess);
2949 }
2950
2951 static void
2952 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
2953     RTPPacketInfo * pinfo, GstClockTime current_time)
2954 {
2955   GstRTCPType type;
2956   GstRTCPFBType fbtype;
2957   guint32 sender_ssrc, media_ssrc;
2958   guint8 *fci_data;
2959   guint fci_length;
2960   RTPSource *src;
2961
2962   /* The feedback packet must include both sender SSRC and media SSRC */
2963   if (packet->length < 2)
2964     return;
2965
2966   type = gst_rtcp_packet_get_type (packet);
2967   fbtype = gst_rtcp_packet_fb_get_type (packet);
2968   sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
2969   media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
2970
2971   src = find_source (sess, media_ssrc);
2972
2973   /* skip non-bye packets for sources that are marked BYE */
2974   if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src))
2975     return;
2976
2977   if (src)
2978     g_object_ref (src);
2979
2980   fci_data = gst_rtcp_packet_fb_get_fci (packet);
2981   fci_length = gst_rtcp_packet_fb_get_fci_length (packet) * sizeof (guint32);
2982
2983   GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
2984       "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
2985
2986   if (g_signal_has_handler_pending (sess,
2987           rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
2988     GstBuffer *fci_buffer = NULL;
2989
2990     if (fci_length > 0) {
2991       fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
2992           GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data,
2993           fci_length);
2994       GST_BUFFER_PTS (fci_buffer) = pinfo->running_time;
2995     }
2996
2997     RTP_SESSION_UNLOCK (sess);
2998     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
2999         type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
3000     RTP_SESSION_LOCK (sess);
3001
3002     if (fci_buffer)
3003       gst_buffer_unref (fci_buffer);
3004   }
3005
3006   if (src && sess->rtcp_feedback_retention_window != GST_CLOCK_TIME_NONE) {
3007     rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
3008   }
3009
3010   if ((src && src->internal) ||
3011       /* PSFB FIR puts the media ssrc inside the FCI */
3012       (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR) ||
3013       /* TWCC is for all sources, so a single media-ssrc is not enough */
3014       (type == GST_RTCP_TYPE_RTPFB && fbtype == GST_RTCP_RTPFB_TYPE_TWCC)) {
3015     switch (type) {
3016       case GST_RTCP_TYPE_PSFB:
3017         switch (fbtype) {
3018           case GST_RTCP_PSFB_TYPE_PLI:
3019             if (src)
3020               src->stats.recv_pli_count++;
3021             rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
3022                 current_time);
3023             break;
3024           case GST_RTCP_PSFB_TYPE_FIR:
3025             if (src)
3026               src->stats.recv_fir_count++;
3027             rtp_session_process_fir (sess, sender_ssrc, media_ssrc, fci_data,
3028                 fci_length, current_time);
3029             break;
3030           default:
3031             break;
3032         }
3033         break;
3034       case GST_RTCP_TYPE_RTPFB:
3035         switch (fbtype) {
3036           case GST_RTCP_RTPFB_TYPE_NACK:
3037             if (src)
3038               src->stats.recv_nack_count++;
3039             rtp_session_process_nack (sess, sender_ssrc, media_ssrc,
3040                 fci_data, fci_length, current_time);
3041             break;
3042           case GST_RTCP_RTPFB_TYPE_RTCP_SR_REQ:
3043             rtp_session_process_sr_req (sess, sender_ssrc, media_ssrc);
3044             break;
3045           case GST_RTCP_RTPFB_TYPE_TWCC:
3046             rtp_session_process_twcc (sess, sender_ssrc, media_ssrc,
3047                 fci_data, fci_length);
3048             break;
3049           default:
3050             break;
3051         }
3052       default:
3053         break;
3054     }
3055   }
3056
3057   if (src)
3058     g_object_unref (src);
3059 }
3060
3061 /**
3062  * rtp_session_process_rtcp:
3063  * @sess: and #RTPSession
3064  * @buffer: an RTCP buffer
3065  * @current_time: the current system time
3066  * @ntpnstime: the current NTP time in nanoseconds
3067  *
3068  * Process an RTCP buffer in the session manager. This function takes ownership
3069  * of @buffer.
3070  *
3071  * Returns: a #GstFlowReturn.
3072  */
3073 GstFlowReturn
3074 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
3075     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
3076 {
3077   GstRTCPPacket packet;
3078   gboolean more, is_bye = FALSE, do_sync = FALSE;
3079   RTPPacketInfo pinfo = { 0, };
3080   GstFlowReturn result = GST_FLOW_OK;
3081   GstRTCPBuffer rtcp = { NULL, };
3082
3083   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3084   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3085
3086   if (!gst_rtcp_buffer_validate_reduced (buffer))
3087     goto invalid_packet;
3088
3089   GST_DEBUG ("received RTCP packet");
3090
3091   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP], 0,
3092       buffer);
3093
3094   RTP_SESSION_LOCK (sess);
3095   /* update pinfo stats */
3096   update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
3097       running_time, ntpnstime);
3098
3099   /* start processing the compound packet */
3100   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
3101   more = gst_rtcp_buffer_get_first_packet (&rtcp, &packet);
3102   while (more) {
3103     GstRTCPType type;
3104
3105     type = gst_rtcp_packet_get_type (&packet);
3106
3107     switch (type) {
3108       case GST_RTCP_TYPE_SR:
3109         rtp_session_process_sr (sess, &packet, &pinfo, &do_sync);
3110         break;
3111       case GST_RTCP_TYPE_RR:
3112         rtp_session_process_rr (sess, &packet, &pinfo);
3113         break;
3114       case GST_RTCP_TYPE_SDES:
3115         rtp_session_process_sdes (sess, &packet, &pinfo);
3116         break;
3117       case GST_RTCP_TYPE_BYE:
3118         is_bye = TRUE;
3119         /* don't try to attempt lip-sync anymore for streams with a BYE */
3120         do_sync = FALSE;
3121         rtp_session_process_bye (sess, &packet, &pinfo);
3122         break;
3123       case GST_RTCP_TYPE_APP:
3124         rtp_session_process_app (sess, &packet, &pinfo);
3125         break;
3126       case GST_RTCP_TYPE_RTPFB:
3127       case GST_RTCP_TYPE_PSFB:
3128         rtp_session_process_feedback (sess, &packet, &pinfo, current_time);
3129         break;
3130       case GST_RTCP_TYPE_XR:
3131         /* FIXME: This block is added to downgrade warning level.
3132          * Once the parser is implemented, it should be replaced with
3133          * a proper process function. */
3134         GST_DEBUG ("got RTCP XR packet, but ignored");
3135         break;
3136       default:
3137         GST_WARNING ("got unknown RTCP packet type: %d", type);
3138         break;
3139     }
3140     more = gst_rtcp_packet_move_to_next (&packet);
3141   }
3142
3143   gst_rtcp_buffer_unmap (&rtcp);
3144
3145   /* if we are scheduling a BYE, we only want to count bye packets, else we
3146    * count everything */
3147   if (sess->scheduled_bye && is_bye) {
3148     sess->bye_stats.bye_members++;
3149     UPDATE_AVG (sess->bye_stats.avg_rtcp_packet_size, pinfo.bytes);
3150   }
3151
3152   /* keep track of average packet size */
3153   UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
3154
3155   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
3156       sess->stats.avg_rtcp_packet_size, pinfo.bytes);
3157   RTP_SESSION_UNLOCK (sess);
3158
3159   pinfo.data = NULL;
3160   clean_packet_info (&pinfo);
3161
3162   /* notify caller of sr packets in the callback */
3163   if (do_sync && sess->callbacks.sync_rtcp) {
3164     result = sess->callbacks.sync_rtcp (sess, buffer,
3165         sess->sync_rtcp_user_data);
3166   } else
3167     gst_buffer_unref (buffer);
3168
3169   return result;
3170
3171   /* ERRORS */
3172 invalid_packet:
3173   {
3174     GST_DEBUG ("invalid RTCP packet received");
3175     gst_buffer_unref (buffer);
3176     return GST_FLOW_OK;
3177   }
3178 }
3179
3180 static guint8
3181 _get_extmap_id_for_attribute (const GstStructure * s, const gchar * ext_name)
3182 {
3183   guint i;
3184   guint8 extmap_id = 0;
3185   guint n_fields = gst_structure_n_fields (s);
3186
3187   for (i = 0; i < n_fields; i++) {
3188     const gchar *field_name = gst_structure_nth_field_name (s, i);
3189     if (g_str_has_prefix (field_name, "extmap-")) {
3190       const gchar *str = gst_structure_get_string (s, field_name);
3191       if (str && g_strcmp0 (str, ext_name) == 0) {
3192         gint64 id = g_ascii_strtoll (field_name + 7, NULL, 10);
3193         if (id > 0 && id < 15) {
3194           extmap_id = id;
3195           break;
3196         }
3197       }
3198     }
3199   }
3200   return extmap_id;
3201 }
3202
3203 /**
3204  * rtp_session_update_send_caps:
3205  * @sess: an #RTPSession
3206  * @caps: a #GstCaps
3207  *
3208  * Update the caps of the sender in the rtp session.
3209  */
3210 void
3211 rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
3212 {
3213   GstStructure *s;
3214   guint ssrc;
3215
3216   g_return_if_fail (RTP_IS_SESSION (sess));
3217   g_return_if_fail (GST_IS_CAPS (caps));
3218
3219   GST_LOG ("received caps %" GST_PTR_FORMAT, caps);
3220
3221   s = gst_caps_get_structure (caps, 0);
3222
3223   if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
3224     RTPSource *source;
3225     gboolean created;
3226
3227     RTP_SESSION_LOCK (sess);
3228     source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
3229     sess->suggested_ssrc = ssrc;
3230     sess->internal_ssrc_set = TRUE;
3231     sess->internal_ssrc_from_caps_or_property = TRUE;
3232     if (source) {
3233       rtp_source_update_caps (source, caps);
3234
3235       if (created)
3236         on_new_sender_ssrc (sess, source);
3237
3238       g_object_unref (source);
3239     }
3240
3241     if (gst_structure_get_uint (s, "rtx-ssrc", &ssrc)) {
3242       source =
3243           obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
3244       if (source) {
3245         rtp_source_update_caps (source, caps);
3246
3247         if (created)
3248           on_new_sender_ssrc (sess, source);
3249
3250         g_object_unref (source);
3251       }
3252     }
3253     RTP_SESSION_UNLOCK (sess);
3254   } else {
3255     sess->internal_ssrc_from_caps_or_property = FALSE;
3256   }
3257
3258   sess->send_ntp64_ext_id =
3259       _get_extmap_id_for_attribute (s,
3260       GST_RTP_HDREXT_BASE GST_RTP_HDREXT_NTP_64);
3261
3262   rtp_twcc_manager_parse_send_ext_id (sess->twcc, s);
3263 }
3264
3265 static void
3266 update_ntp64_header_ext_data (RTPPacketInfo * pinfo, GstBuffer * buffer)
3267 {
3268   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
3269
3270   if (gst_rtp_buffer_map (buffer, GST_MAP_READWRITE, &rtp)) {
3271     guint16 bits;
3272     guint8 *data;
3273     guint wordlen;
3274
3275     if (gst_rtp_buffer_get_extension_data (&rtp, &bits, (gpointer *) & data,
3276             &wordlen)) {
3277       gsize len = wordlen * 4;
3278
3279       /* One-byte header */
3280       if (bits == 0xBEDE) {
3281         /* One-byte header extension */
3282         while (TRUE) {
3283           guint8 ext_id, ext_len;
3284
3285           if (len < 1)
3286             break;
3287
3288           ext_id = GST_READ_UINT8 (data) >> 4;
3289           ext_len = (GST_READ_UINT8 (data) & 0xF) + 1;
3290           data += 1;
3291           len -= 1;
3292           if (ext_id == 0) {
3293             /* Skip padding */
3294             continue;
3295           } else if (ext_id == 15) {
3296             /* Stop parsing */
3297             break;
3298           }
3299
3300           /* extension doesn't fit into the header */
3301           if (ext_len > len)
3302             break;
3303
3304           if (ext_id == pinfo->ntp64_ext_id && ext_len == 8) {
3305             if (pinfo->ntpnstime != GST_CLOCK_TIME_NONE) {
3306               guint64 ntptime = gst_util_uint64_scale (pinfo->ntpnstime,
3307                   G_GUINT64_CONSTANT (1) << 32,
3308                   GST_SECOND);
3309
3310               GST_WRITE_UINT64_BE (data, ntptime);
3311             } else {
3312               /* Replace extension with padding */
3313               memset (data - 1, 0, 1 + ext_len);
3314             }
3315           }
3316
3317           /* skip to the next extension */
3318           data += ext_len;
3319           len -= ext_len;
3320         }
3321       } else if ((bits >> 4) == 0x100) {
3322         /* Two-byte header extension */
3323
3324         while (TRUE) {
3325           guint8 ext_id, ext_len;
3326
3327           if (len < 1)
3328             break;
3329
3330           ext_id = GST_READ_UINT8 (data);
3331           data += 1;
3332           len -= 1;
3333           if (ext_id == 0) {
3334             /* Skip padding */
3335             continue;
3336           }
3337
3338           ext_len = GST_READ_UINT8 (data);
3339           data += 1;
3340           len -= 1;
3341
3342           /* extension doesn't fit into the header */
3343           if (ext_len > len)
3344             break;
3345
3346           if (ext_id == pinfo->ntp64_ext_id && ext_len == 8) {
3347             if (pinfo->ntpnstime != GST_CLOCK_TIME_NONE) {
3348               guint64 ntptime = gst_util_uint64_scale (pinfo->ntpnstime,
3349                   G_GUINT64_CONSTANT (1) << 32,
3350                   GST_SECOND);
3351
3352               GST_WRITE_UINT64_BE (data, ntptime);
3353             } else {
3354               /* Replace extension with padding */
3355               memset (data - 2, 0, 2 + ext_len);
3356             }
3357           }
3358
3359           /* skip to the next extension */
3360           data += ext_len;
3361           len -= ext_len;
3362         }
3363       }
3364     }
3365     gst_rtp_buffer_unmap (&rtp);
3366   }
3367 }
3368
3369 static void
3370 update_ntp64_header_ext (RTPPacketInfo * pinfo)
3371 {
3372   /* Early return if we don't know the header extension id or the packets
3373    * don't contain the header extension */
3374   if (pinfo->ntp64_ext_id == 0 || !pinfo->have_ntp64_ext)
3375     return;
3376
3377   /* If no NTP time is known then the header extension will be replaced with
3378    * padding, otherwise it will be updated */
3379   GST_TRACE
3380       ("Updating NTP-64 header extension for SSRC %08x packet with RTP time %u and running time %"
3381       GST_TIME_FORMAT " to %" GST_TIME_FORMAT, pinfo->ssrc, pinfo->rtptime,
3382       GST_TIME_ARGS (pinfo->running_time), GST_TIME_ARGS (pinfo->ntpnstime));
3383
3384   if (GST_IS_BUFFER_LIST (pinfo->data)) {
3385     GstBufferList *list;
3386     guint i = 0;
3387
3388     pinfo->data = gst_buffer_list_make_writable (pinfo->data);
3389
3390     list = GST_BUFFER_LIST (pinfo->data);
3391
3392     for (i = 0; i < gst_buffer_list_length (list); i++) {
3393       GstBuffer *buffer = gst_buffer_list_get_writable (list, i);
3394
3395       update_ntp64_header_ext_data (pinfo, buffer);
3396     }
3397   } else {
3398     pinfo->data = gst_buffer_make_writable (pinfo->data);
3399     update_ntp64_header_ext_data (pinfo, pinfo->data);
3400   }
3401 }
3402
3403 /**
3404  * rtp_session_send_rtp:
3405  * @sess: an #RTPSession
3406  * @data: pointer to either an RTP buffer or a list of RTP buffers
3407  * @is_list: TRUE when @data is a buffer list
3408  * @current_time: the current system time
3409  * @running_time: the running time of @data
3410  *
3411  * Send the RTP data (a buffer or buffer list) in the session manager. This
3412  * function takes ownership of @data.
3413  *
3414  * Returns: a #GstFlowReturn.
3415  */
3416 GstFlowReturn
3417 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
3418     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
3419 {
3420   GstFlowReturn result;
3421   RTPSource *source;
3422   gboolean prevsender;
3423   guint64 oldrate;
3424   RTPPacketInfo pinfo = { 0, };
3425   gboolean created;
3426
3427   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3428   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
3429
3430   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
3431
3432   RTP_SESSION_LOCK (sess);
3433   if (!update_packet_info (sess, &pinfo, TRUE, TRUE, is_list, data,
3434           current_time, running_time, ntpnstime))
3435     goto invalid_packet;
3436
3437   /* Update any 64-bit NTP header extensions with the actual NTP time here */
3438   update_ntp64_header_ext (&pinfo);
3439   rtp_twcc_manager_send_packet (sess->twcc, &pinfo);
3440
3441   source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
3442   if (created)
3443     on_new_sender_ssrc (sess, source);
3444
3445   if (!source->internal) {
3446     GSocketAddress *from;
3447
3448     if (source->rtp_from)
3449       from = source->rtp_from;
3450     else
3451       from = source->rtcp_from;
3452     if (from) {
3453       if (rtp_session_find_conflicting_address (sess, from, current_time)) {
3454         /* Its a known conflict, its probably a loop, not a collision
3455          * lets just drop the incoming packet
3456          */
3457         GST_LOG ("Our packets are being looped back to us, ignoring collision");
3458       } else {
3459         GST_DEBUG ("Collision for SSRC %x, change our sender ssrc", pinfo.ssrc);
3460
3461         rtp_session_have_conflict (sess, source, from, current_time);
3462       }
3463     } else {
3464       GST_LOG ("Ignoring collision on sent SSRC %x because remote source"
3465           " doesn't have an address", pinfo.ssrc);
3466     }
3467
3468     /* the the sending source is not internal, we have to drop the packet,
3469        or else we will end up receving it ourselves! */
3470     goto collision;
3471   }
3472
3473   prevsender = RTP_SOURCE_IS_SENDER (source);
3474   oldrate = source->bitrate;
3475
3476   /* we use our own source to send */
3477   result = rtp_source_send_rtp (source, &pinfo);
3478
3479   source_update_sender (sess, source, prevsender);
3480
3481   if (oldrate != source->bitrate)
3482     sess->recalc_bandwidth = TRUE;
3483   RTP_SESSION_UNLOCK (sess);
3484
3485   g_object_unref (source);
3486   clean_packet_info (&pinfo);
3487
3488   return result;
3489
3490 invalid_packet:
3491   {
3492     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
3493     RTP_SESSION_UNLOCK (sess);
3494     GST_DEBUG ("invalid RTP packet received");
3495     return GST_FLOW_OK;
3496   }
3497 collision:
3498   {
3499     g_object_unref (source);
3500     clean_packet_info (&pinfo);
3501     RTP_SESSION_UNLOCK (sess);
3502     GST_WARNING ("non-internal source with same ssrc %08x, drop packet",
3503         pinfo.ssrc);
3504     return GST_FLOW_OK;
3505   }
3506 }
3507
3508 static void
3509 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
3510 {
3511   *bandwidth += source->bitrate;
3512 }
3513
3514 /* must be called with session lock */
3515 static GstClockTime
3516 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
3517     gboolean first)
3518 {
3519   GstClockTime result;
3520   RTPSessionStats *stats;
3521
3522   /* recalculate bandwidth when it changed */
3523   if (sess->recalc_bandwidth) {
3524     gdouble bandwidth;
3525
3526     if (sess->bandwidth > 0)
3527       bandwidth = sess->bandwidth;
3528     else {
3529       /* If it is <= 0, then try to estimate the actual bandwidth */
3530       bandwidth = 0;
3531
3532       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3533           (GHFunc) add_bitrates, &bandwidth);
3534     }
3535     if (bandwidth < RTP_STATS_BANDWIDTH)
3536       bandwidth = RTP_STATS_BANDWIDTH;
3537
3538     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
3539         sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
3540
3541     sess->recalc_bandwidth = FALSE;
3542   }
3543
3544   if (sess->scheduled_bye) {
3545     stats = &sess->bye_stats;
3546     result = rtp_stats_calculate_bye_interval (stats);
3547   } else {
3548     session_update_ptp (sess);
3549
3550     stats = &sess->stats;
3551     result = rtp_stats_calculate_rtcp_interval (stats,
3552         stats->internal_sender_sources > 0, sess->rtp_profile,
3553         sess->is_doing_ptp, first);
3554   }
3555
3556   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
3557       GST_TIME_ARGS (result), first);
3558
3559   if (!deterministic && result != GST_CLOCK_TIME_NONE)
3560     result = rtp_stats_add_rtcp_jitter (stats, result);
3561
3562   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
3563
3564   return result;
3565 }
3566
3567 static void
3568 source_mark_bye (const gchar * key, RTPSource * source, const gchar * reason)
3569 {
3570   if (source->internal)
3571     rtp_source_mark_bye (source, reason);
3572 }
3573
3574 /**
3575  * rtp_session_mark_all_bye:
3576  * @sess: an #RTPSession
3577  * @reason: a reason
3578  *
3579  * Mark all internal sources of the session as BYE with @reason.
3580  */
3581 void
3582 rtp_session_mark_all_bye (RTPSession * sess, const gchar * reason)
3583 {
3584   g_return_if_fail (RTP_IS_SESSION (sess));
3585
3586   RTP_SESSION_LOCK (sess);
3587   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3588       (GHFunc) source_mark_bye, (gpointer) reason);
3589   RTP_SESSION_UNLOCK (sess);
3590 }
3591
3592 /* Stop the current @sess and schedule a BYE message for the other members.
3593  * One must have the session lock to call this function
3594  */
3595 static GstFlowReturn
3596 rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time)
3597 {
3598   GstFlowReturn result = GST_FLOW_OK;
3599   GstClockTime interval;
3600
3601   /* nothing to do it we already scheduled bye */
3602   if (sess->scheduled_bye)
3603     goto done;
3604
3605   /* we schedule BYE now */
3606   sess->scheduled_bye = TRUE;
3607   /* at least one member wants to send a BYE */
3608   memcpy (&sess->bye_stats, &sess->stats, sizeof (RTPSessionStats));
3609   INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100);
3610   sess->bye_stats.bye_members = 1;
3611   sess->first_rtcp = TRUE;
3612
3613   /* reschedule transmission */
3614   sess->last_rtcp_send_time = current_time;
3615   sess->last_rtcp_check_time = current_time;
3616   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3617
3618   if (interval != GST_CLOCK_TIME_NONE)
3619     sess->next_rtcp_check_time = current_time + interval;
3620   else
3621     sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
3622   sess->last_rtcp_interval = interval;
3623
3624   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
3625       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
3626
3627   RTP_SESSION_UNLOCK (sess);
3628   /* notify app of reconsideration */
3629   if (sess->callbacks.reconsider)
3630     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
3631   RTP_SESSION_LOCK (sess);
3632 done:
3633
3634   return result;
3635 }
3636
3637 /**
3638  * rtp_session_schedule_bye:
3639  * @sess: an #RTPSession
3640  * @current_time: the current system time
3641  *
3642  * Schedule a BYE message for all sources marked as BYE in @sess.
3643  *
3644  * Returns: a #GstFlowReturn.
3645  */
3646 GstFlowReturn
3647 rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
3648 {
3649   GstFlowReturn result;
3650
3651   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3652
3653   RTP_SESSION_LOCK (sess);
3654   result = rtp_session_schedule_bye_locked (sess, current_time);
3655   RTP_SESSION_UNLOCK (sess);
3656
3657   return result;
3658 }
3659
3660 /**
3661  * rtp_session_next_timeout:
3662  * @sess: an #RTPSession
3663  * @current_time: the current system time
3664  *
3665  * Get the next time we should perform session maintenance tasks.
3666  *
3667  * Returns: a time when rtp_session_on_timeout() should be called with the
3668  * current system time.
3669  */
3670 GstClockTime
3671 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
3672 {
3673   GstClockTime result, interval = 0;
3674
3675   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
3676
3677   RTP_SESSION_LOCK (sess);
3678
3679   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
3680     GST_DEBUG ("have early rtcp time");
3681     result = sess->next_early_rtcp_time;
3682     goto early_exit;
3683   }
3684
3685   result = sess->next_rtcp_check_time;
3686
3687   GST_DEBUG ("current time: %" GST_TIME_FORMAT
3688       ", next time: %" GST_TIME_FORMAT,
3689       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
3690
3691   if (result == GST_CLOCK_TIME_NONE || result < current_time) {
3692     GST_DEBUG ("take current time as base");
3693     /* our previous check time expired, start counting from the current time
3694      * again. */
3695     result = current_time;
3696   }
3697
3698   if (sess->scheduled_bye) {
3699     if (sess->bye_stats.active_sources >= 50) {
3700       GST_DEBUG ("reconsider BYE, more than 50 sources");
3701       /* reconsider BYE if members >= 50 */
3702       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3703       sess->last_rtcp_interval = interval;
3704     }
3705   } else {
3706     if (sess->first_rtcp) {
3707       GST_DEBUG ("first RTCP packet");
3708       /* we are called for the first time */
3709       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3710       sess->last_rtcp_interval = interval;
3711     } else if (sess->next_rtcp_check_time < current_time) {
3712       GST_DEBUG ("old check time expired, getting new timeout");
3713       /* get a new timeout when we need to */
3714       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
3715       sess->last_rtcp_interval = interval;
3716
3717       if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF
3718               || sess->rtp_profile == GST_RTP_PROFILE_SAVPF)
3719           && interval != GST_CLOCK_TIME_NONE) {
3720         /* Apply the rules from RFC 4585 section 3.5.3 */
3721         if (sess->stats.min_interval != 0) {
3722           GstClockTime T_rr_current_interval = g_random_double_range (0.5,
3723               1.5) * sess->stats.min_interval * GST_SECOND;
3724
3725           if (T_rr_current_interval > interval) {
3726             GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT
3727                 " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval),
3728                 GST_TIME_ARGS (interval));
3729             interval = T_rr_current_interval;
3730           }
3731         }
3732       }
3733     }
3734   }
3735
3736   if (interval != GST_CLOCK_TIME_NONE)
3737     result += interval;
3738   else
3739     result = GST_CLOCK_TIME_NONE;
3740
3741   sess->next_rtcp_check_time = result;
3742
3743 early_exit:
3744
3745   GST_DEBUG ("current time: %" GST_TIME_FORMAT
3746       ", next time: %" GST_TIME_FORMAT,
3747       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
3748   RTP_SESSION_UNLOCK (sess);
3749
3750   return result;
3751 }
3752
3753 typedef struct
3754 {
3755   RTPSource *source;
3756   gboolean is_bye;
3757   GstBuffer *buffer;
3758 } ReportOutput;
3759
3760 typedef struct
3761 {
3762   GstRTCPBuffer rtcpbuf;
3763   RTPSession *sess;
3764   RTPSource *source;
3765   guint num_to_report;
3766   gboolean have_fir;
3767   gboolean have_pli;
3768   gboolean have_nack;
3769   GstBuffer *rtcp;
3770   GstClockTime current_time;
3771   guint64 ntpnstime;
3772   GstClockTime running_time;
3773   GstClockTime interval;
3774   GstRTCPPacket packet;
3775   gboolean has_sdes;
3776   gboolean is_early;
3777   gboolean may_suppress;
3778   GQueue output;
3779   guint nacked_seqnums;
3780 } ReportData;
3781
3782 static void
3783 session_start_rtcp (RTPSession * sess, ReportData * data)
3784 {
3785   GstRTCPPacket *packet = &data->packet;
3786   RTPSource *own = data->source;
3787   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3788
3789   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
3790   data->has_sdes = FALSE;
3791
3792   gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
3793
3794   if (RTP_SOURCE_IS_SENDER (own) && (!data->is_early || !sess->reduced_size_rtcp
3795           || sess->sr_req_pending)) {
3796     guint64 ntptime;
3797     guint32 rtptime;
3798     guint32 packet_count, octet_count;
3799
3800     sess->sr_req_pending = FALSE;
3801
3802     /* we are a sender, create SR */
3803     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
3804     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SR, packet);
3805
3806     /* get latest stats */
3807     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
3808         &ntptime, &rtptime, &packet_count, &octet_count);
3809     /* store stats */
3810     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
3811         packet_count, octet_count);
3812
3813     /* fill in sender report info */
3814     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
3815         sess->timestamp_sender_reports ? ntptime : 0,
3816         sess->timestamp_sender_reports ? rtptime : 0,
3817         packet_count, octet_count);
3818   } else if (!data->is_early || !sess->reduced_size_rtcp) {
3819     /* we are only receiver, create RR */
3820     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
3821     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RR, packet);
3822     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
3823   }
3824 }
3825
3826 /* construct a Sender or Receiver Report */
3827 static void
3828 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
3829 {
3830   RTPSession *sess = data->sess;
3831   GstRTCPPacket *packet = &data->packet;
3832   guint8 fractionlost;
3833   gint32 packetslost;
3834   guint32 exthighestseq, jitter;
3835   guint32 lsr, dlsr;
3836
3837   /* don't report for sources in future generations */
3838   if (((gint16) (source->generation - sess->generation)) > 0) {
3839     GST_DEBUG ("source %08x generation %u > %u", source->ssrc,
3840         source->generation, sess->generation);
3841     return;
3842   }
3843
3844   if (g_hash_table_contains (source->reported_in_sr_of,
3845           GUINT_TO_POINTER (data->source->ssrc))) {
3846     GST_DEBUG ("source %08x already reported in this generation", source->ssrc);
3847     return;
3848   }
3849
3850   if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
3851     GST_DEBUG ("max RB count reached");
3852     return;
3853   }
3854
3855   /* only report about remote sources */
3856   if (source->internal)
3857     goto reported;
3858
3859   if (!RTP_SOURCE_IS_SENDER (source)) {
3860     GST_DEBUG ("source %08x not sender", source->ssrc);
3861     goto reported;
3862   }
3863
3864   if (source->disable_rtcp) {
3865     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
3866     goto reported;
3867   }
3868
3869   GST_DEBUG ("create RB for SSRC %08x", source->ssrc);
3870
3871   /* get new stats */
3872   rtp_source_get_new_rb (source, data->current_time, &fractionlost,
3873       &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
3874
3875   /* store last generated RR packet */
3876   source->last_rr.is_valid = TRUE;
3877   source->last_rr.ssrc = data->source->ssrc;
3878   source->last_rr.fractionlost = fractionlost;
3879   source->last_rr.packetslost = packetslost;
3880   source->last_rr.exthighestseq = exthighestseq;
3881   source->last_rr.jitter = jitter;
3882   source->last_rr.lsr = lsr;
3883   source->last_rr.dlsr = dlsr;
3884
3885   /* packet is not yet filled, add report block for this source. */
3886   gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
3887       exthighestseq, jitter, lsr, dlsr);
3888
3889 reported:
3890   g_hash_table_add (source->reported_in_sr_of,
3891       GUINT_TO_POINTER (data->source->ssrc));
3892 }
3893
3894 /* construct FIR */
3895 static void
3896 session_add_fir (const gchar * key, RTPSource * source, ReportData * data)
3897 {
3898   GstRTCPPacket *packet = &data->packet;
3899   guint16 len;
3900   guint8 *fci_data;
3901
3902   if (!source->send_fir)
3903     return;
3904
3905   len = gst_rtcp_packet_fb_get_fci_length (packet);
3906   if (!gst_rtcp_packet_fb_set_fci_length (packet, len + 2))
3907     /* exit because the packet is full, will put next request in a
3908      * further packet */
3909     return;
3910
3911   fci_data = gst_rtcp_packet_fb_get_fci (packet) + (len * 4);
3912
3913   GST_WRITE_UINT32_BE (fci_data, source->ssrc);
3914   fci_data += 4;
3915   fci_data[0] = source->current_send_fir_seqnum;
3916   fci_data[1] = fci_data[2] = fci_data[3] = 0;
3917
3918   source->send_fir = FALSE;
3919   source->stats.sent_fir_count++;
3920 }
3921
3922 static void
3923 session_fir (RTPSession * sess, ReportData * data)
3924 {
3925   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3926   GstRTCPPacket *packet = &data->packet;
3927
3928   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3929     return;
3930
3931   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_FIR);
3932   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3933   gst_rtcp_packet_fb_set_media_ssrc (packet, 0);
3934
3935   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3936       (GHFunc) session_add_fir, data);
3937
3938   if (gst_rtcp_packet_fb_get_fci_length (packet) == 0)
3939     gst_rtcp_packet_remove (packet);
3940   else
3941     data->may_suppress = FALSE;
3942 }
3943
3944 static gboolean
3945 has_pli_compare_func (gconstpointer a, gconstpointer ignored)
3946 {
3947   GstRTCPPacket packet;
3948   GstRTCPBuffer rtcp = { NULL, };
3949   gboolean ret = FALSE;
3950
3951   gst_rtcp_buffer_map ((GstBuffer *) a, GST_MAP_READ, &rtcp);
3952
3953   if (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
3954     if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
3955         gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
3956       ret = TRUE;
3957   }
3958
3959   gst_rtcp_buffer_unmap (&rtcp);
3960
3961   return ret;
3962 }
3963
3964 /* construct PLI */
3965 static void
3966 session_pli (const gchar * key, RTPSource * source, ReportData * data)
3967 {
3968   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3969   GstRTCPPacket *packet = &data->packet;
3970
3971   if (!source->send_pli)
3972     return;
3973
3974   if (rtp_source_has_retained (source, has_pli_compare_func, NULL))
3975     return;
3976
3977   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3978     /* exit because the packet is full, will put next request in a
3979      * further packet */
3980     return;
3981
3982   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_PLI);
3983   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3984   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
3985
3986   source->send_pli = FALSE;
3987   data->may_suppress = FALSE;
3988
3989   source->stats.sent_pli_count++;
3990 }
3991
3992 /* construct NACK */
3993 static void
3994 session_nack (const gchar * key, RTPSource * source, ReportData * data)
3995 {
3996   RTPSession *sess = data->sess;
3997   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3998   GstRTCPPacket *packet = &data->packet;
3999   guint16 *nacks;
4000   GstClockTime *nack_deadlines;
4001   guint n_nacks, i = 0;
4002   guint nacked_seqnums = 0;
4003   guint16 n_fb_nacks = 0;
4004   guint8 *fci_data;
4005
4006   if (!source->send_nack)
4007     return;
4008
4009   nacks = rtp_source_get_nacks (source, &n_nacks);
4010   nack_deadlines = rtp_source_get_nack_deadlines (source, NULL);
4011   GST_DEBUG ("%u NACKs current time %" GST_TIME_FORMAT, n_nacks,
4012       GST_TIME_ARGS (data->current_time));
4013
4014   /* cleanup expired nacks */
4015   for (i = 0; i < n_nacks; i++) {
4016     GST_DEBUG ("#%u deadline %" GST_TIME_FORMAT, nacks[i],
4017         GST_TIME_ARGS (nack_deadlines[i]));
4018     if (nack_deadlines[i] >= data->current_time)
4019       break;
4020   }
4021
4022   if (data->is_early) {
4023     /* don't remove them all if this is an early RTCP packet. It may happen
4024      * that the NACKs are late due to high RTT, not sending NACKs at all would
4025      * keep the RTX RTT stats high and maintain a dropping state. */
4026     i = MIN (n_nacks - 1, i);
4027   }
4028
4029   if (i) {
4030     GST_WARNING ("Removing %u expired NACKS", i);
4031     rtp_source_clear_nacks (source, i);
4032     n_nacks -= i;
4033     if (n_nacks == 0)
4034       return;
4035   }
4036
4037   /* allow overriding NACK to packet conversion */
4038   if (g_signal_has_handler_pending (sess,
4039           rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0, TRUE)) {
4040     /* this is needed as it will actually resize the buffer */
4041     gst_rtcp_buffer_unmap (rtcp);
4042
4043     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0,
4044         data->source->ssrc, source->ssrc, source->nacks, data->rtcp,
4045         &nacked_seqnums);
4046
4047     /* and now remap for the remaining work */
4048     gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
4049
4050     if (nacked_seqnums > 0)
4051       goto done;
4052   }
4053
4054   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet))
4055     /* exit because the packet is full, will put next request in a
4056      * further packet */
4057     return;
4058
4059   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_RTPFB_TYPE_NACK);
4060   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
4061   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
4062
4063   if (!gst_rtcp_packet_fb_set_fci_length (packet, 1)) {
4064     gst_rtcp_packet_remove (packet);
4065     GST_WARNING ("no nacks fit in the packet");
4066     return;
4067   }
4068
4069   fci_data = gst_rtcp_packet_fb_get_fci (packet);
4070   for (i = 0; i < n_nacks; i = nacked_seqnums) {
4071     guint16 seqnum = nacks[i];
4072     guint16 blp = 0;
4073     guint j;
4074
4075     if (!gst_rtcp_packet_fb_set_fci_length (packet, n_fb_nacks + 1))
4076       break;
4077
4078     n_fb_nacks++;
4079     nacked_seqnums++;
4080
4081     for (j = i + 1; j < n_nacks; j++) {
4082       gint diff;
4083
4084       diff = gst_rtp_buffer_compare_seqnum (seqnum, nacks[j]);
4085       GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, nacks[j], diff);
4086       if (diff > 16)
4087         break;
4088
4089       blp |= 1 << (diff - 1);
4090       nacked_seqnums++;
4091     }
4092
4093     GST_WRITE_UINT32_BE (fci_data, seqnum << 16 | blp);
4094     fci_data += 4;
4095   }
4096
4097   GST_DEBUG ("Sent %u seqnums into %u FB NACKs", nacked_seqnums, n_fb_nacks);
4098   source->stats.sent_nack_count += n_fb_nacks;
4099
4100 done:
4101   data->nacked_seqnums += nacked_seqnums;
4102   rtp_source_clear_nacks (source, nacked_seqnums);
4103   data->may_suppress = FALSE;
4104 }
4105
4106 /* perform cleanup of sources that timed out */
4107 static void
4108 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
4109 {
4110   gboolean remove = FALSE;
4111   gboolean byetimeout = FALSE;
4112   gboolean sendertimeout = FALSE;
4113   gboolean is_sender, is_active;
4114   RTPSession *sess = data->sess;
4115   GstClockTime interval, binterval;
4116   GstClockTime btime;
4117
4118   GST_DEBUG ("look at %08x, generation %u", source->ssrc, source->generation);
4119
4120   /* check for outdated collisions */
4121   if (source->internal) {
4122     GST_DEBUG ("Timing out collisions for %x", source->ssrc);
4123     rtp_source_timeout (source, data->current_time, data->running_time,
4124         sess->rtcp_feedback_retention_window);
4125   }
4126
4127   /* nothing else to do when without RTCP */
4128   if (data->interval == GST_CLOCK_TIME_NONE)
4129     return;
4130
4131   is_sender = RTP_SOURCE_IS_SENDER (source);
4132   is_active = RTP_SOURCE_IS_ACTIVE (source);
4133
4134   /* our own rtcp interval may have been forced low by secondary configuration,
4135    * while sender side may still operate with higher interval,
4136    * so do not just take our interval to decide on timing out sender,
4137    * but take (if data->interval <= 5 * GST_SECOND):
4138    *   interval = CLAMP (sender_interval, data->interval, 5 * GST_SECOND)
4139    * where sender_interval is difference between last 2 received RTCP reports
4140    */
4141   if (data->interval >= 5 * GST_SECOND || source->internal) {
4142     binterval = data->interval;
4143   } else {
4144     GST_LOG ("prev_rtcp %" GST_TIME_FORMAT ", last_rtcp %" GST_TIME_FORMAT,
4145         GST_TIME_ARGS (source->stats.prev_rtcptime),
4146         GST_TIME_ARGS (source->stats.last_rtcptime));
4147     /* if not received enough yet, fallback to larger default */
4148     if (source->stats.last_rtcptime > source->stats.prev_rtcptime)
4149       binterval = source->stats.last_rtcptime - source->stats.prev_rtcptime;
4150     else
4151       binterval = 5 * GST_SECOND;
4152     binterval = CLAMP (binterval, data->interval, 5 * GST_SECOND);
4153   }
4154   GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
4155       GST_TIME_ARGS (binterval));
4156
4157   if (!source->internal && source->marked_bye) {
4158     /* if we received a BYE from the source, remove the source after some
4159      * time. */
4160     if (data->current_time > source->bye_time &&
4161         data->current_time - source->bye_time > sess->stats.bye_timeout) {
4162       GST_DEBUG ("removing BYE source %08x", source->ssrc);
4163       remove = TRUE;
4164       byetimeout = TRUE;
4165     }
4166   }
4167
4168   if (source->internal && source->sent_bye) {
4169     GST_DEBUG ("removing internal source that has sent BYE %08x", source->ssrc);
4170     remove = TRUE;
4171   }
4172
4173   /* sources that were inactive for more than 5 times the deterministic reporting
4174    * interval get timed out. the min timeout is 5 seconds. */
4175   /* mind old time that might pre-date last time going to PLAYING */
4176   btime = MAX (source->last_activity, sess->start_time);
4177   if (data->current_time > btime) {
4178     interval = MAX (binterval * 5, 5 * GST_SECOND);
4179     if (data->current_time - btime > interval) {
4180       GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
4181           source->ssrc, GST_TIME_ARGS (btime));
4182       if (source->internal) {
4183         /* this is an internal source that is not using our suggested ssrc.
4184          * since there must be another source using this ssrc, we can remove
4185          * this one instead of making it a receiver forever */
4186         if (source->ssrc != sess->suggested_ssrc) {
4187           rtp_source_mark_bye (source, "timed out");
4188           /* do not schedule bye here, since we are inside the RTCP timeout
4189            * processing and scheduling bye will interfere with SR/RR sending */
4190         }
4191       } else {
4192         remove = TRUE;
4193       }
4194     }
4195   }
4196
4197   /* senders that did not send for a long time become a receiver, this also
4198    * holds for our own sources. */
4199   if (is_sender) {
4200     /* mind old time that might pre-date last time going to PLAYING */
4201     btime = MAX (source->last_rtp_activity, sess->start_time);
4202     if (data->current_time > btime) {
4203       interval = MAX (binterval * 2, 5 * GST_SECOND);
4204       if (data->current_time - btime > interval) {
4205         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
4206             GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
4207         sendertimeout = TRUE;
4208       }
4209     }
4210   }
4211
4212   if (remove) {
4213     sess->total_sources--;
4214     if (is_sender) {
4215       sess->stats.sender_sources--;
4216       if (source->internal)
4217         sess->stats.internal_sender_sources--;
4218     }
4219     if (is_active)
4220       sess->stats.active_sources--;
4221
4222     if (source->internal)
4223       sess->stats.internal_sources--;
4224
4225     if (byetimeout)
4226       on_bye_timeout (sess, source);
4227     else
4228       on_timeout (sess, source);
4229   } else {
4230     if (sendertimeout) {
4231       source->is_sender = FALSE;
4232       sess->stats.sender_sources--;
4233       if (source->internal)
4234         sess->stats.internal_sender_sources--;
4235
4236       on_sender_timeout (sess, source);
4237     }
4238     /* count how many source to report in this generation */
4239     if (((gint16) (source->generation - sess->generation)) <= 0)
4240       data->num_to_report++;
4241   }
4242   source->closing = remove;
4243 }
4244
4245 static void
4246 session_sdes (RTPSession * sess, ReportData * data)
4247 {
4248   GstRTCPPacket *packet = &data->packet;
4249   const GstStructure *sdes;
4250   gint i, n_fields;
4251   GstRTCPBuffer *rtcp = &data->rtcpbuf;
4252
4253   /* add SDES packet */
4254   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SDES, packet);
4255
4256   gst_rtcp_packet_sdes_add_item (packet, data->source->ssrc);
4257
4258   sdes = rtp_source_get_sdes_struct (data->source);
4259
4260   /* add all fields in the structure, the order is not important. */
4261   n_fields = gst_structure_n_fields (sdes);
4262   for (i = 0; i < n_fields; ++i) {
4263     const gchar *field;
4264     const gchar *value;
4265     GstRTCPSDESType type;
4266
4267     field = gst_structure_nth_field_name (sdes, i);
4268     if (field == NULL)
4269       continue;
4270     value = gst_structure_get_string (sdes, field);
4271     if (value == NULL)
4272       continue;
4273     type = gst_rtcp_sdes_name_to_type (field);
4274
4275     /* Early packets are minimal and only include the CNAME */
4276     if (data->is_early && type != GST_RTCP_SDES_CNAME)
4277       continue;
4278
4279     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
4280       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
4281           (const guint8 *) value);
4282     } else if (type == GST_RTCP_SDES_PRIV) {
4283       gsize prefix_len;
4284       gsize value_len;
4285       gsize data_len;
4286       guint8 data[256];
4287
4288       /* don't accept entries that are too big */
4289       prefix_len = strlen (field);
4290       if (prefix_len > 255)
4291         continue;
4292       value_len = strlen (value);
4293       if (value_len > 255)
4294         continue;
4295       data_len = 1 + prefix_len + value_len;
4296       if (data_len > 255)
4297         continue;
4298
4299       data[0] = prefix_len;
4300       memcpy (&data[1], field, prefix_len);
4301       memcpy (&data[1 + prefix_len], value, value_len);
4302
4303       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
4304     }
4305   }
4306
4307   data->has_sdes = TRUE;
4308 }
4309
4310 /* schedule a BYE packet */
4311 static void
4312 make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data)
4313 {
4314   GstRTCPPacket *packet = &data->packet;
4315   GstRTCPBuffer *rtcp = &data->rtcpbuf;
4316
4317   /* add SDES */
4318   session_sdes (sess, data);
4319   /* add a BYE packet */
4320   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_BYE, packet);
4321   gst_rtcp_packet_bye_add_ssrc (packet, source->ssrc);
4322   if (source->bye_reason)
4323     gst_rtcp_packet_bye_set_reason (packet, source->bye_reason);
4324
4325   /* we have a BYE packet now */
4326   source->sent_bye = TRUE;
4327 }
4328
4329 static gboolean
4330 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
4331 {
4332   GstClockTime new_send_time;
4333   GstClockTime interval;
4334   RTPSessionStats *stats;
4335
4336   if (sess->scheduled_bye)
4337     stats = &sess->bye_stats;
4338   else
4339     stats = &sess->stats;
4340
4341   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
4342     data->is_early = TRUE;
4343   else
4344     data->is_early = FALSE;
4345
4346   if (data->is_early && sess->next_early_rtcp_time <= current_time) {
4347     GST_DEBUG ("early feedback %" GST_TIME_FORMAT " <= now %"
4348         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time),
4349         GST_TIME_ARGS (current_time));
4350   } else if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
4351       sess->next_rtcp_check_time > current_time) {
4352     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
4353         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
4354         GST_TIME_ARGS (current_time));
4355     return FALSE;
4356   }
4357
4358   /* take interval and add jitter */
4359   interval = data->interval;
4360   if (interval != GST_CLOCK_TIME_NONE)
4361     interval = rtp_stats_add_rtcp_jitter (stats, interval);
4362
4363   if (sess->last_rtcp_check_time != GST_CLOCK_TIME_NONE) {
4364     /* perform forward reconsideration */
4365     if (interval != GST_CLOCK_TIME_NONE) {
4366       GstClockTime elapsed;
4367
4368       /* get elapsed time since we last reported */
4369       elapsed = current_time - sess->last_rtcp_check_time;
4370
4371       GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
4372           GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
4373       new_send_time = interval + sess->last_rtcp_check_time;
4374     } else {
4375       new_send_time = sess->last_rtcp_check_time;
4376     }
4377   } else {
4378     /* If this is the first RTCP packet, we can reconsider anything based
4379      * on the last RTCP send time because there was none.
4380      */
4381     g_warn_if_fail (!data->is_early);
4382     data->is_early = FALSE;
4383     new_send_time = current_time;
4384   }
4385
4386   if (!data->is_early) {
4387     /* check if reconsideration */
4388     if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
4389       GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
4390           GST_TIME_ARGS (new_send_time));
4391       /* store new check time */
4392       sess->next_rtcp_check_time = new_send_time;
4393       sess->last_rtcp_interval = interval;
4394       return FALSE;
4395     }
4396
4397     sess->last_rtcp_interval = interval;
4398     if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF
4399             || sess->rtp_profile == GST_RTP_PROFILE_SAVPF)
4400         && interval != GST_CLOCK_TIME_NONE) {
4401       /* Apply the rules from RFC 4585 section 3.5.3 */
4402       if (stats->min_interval != 0 && !sess->first_rtcp) {
4403         GstClockTime T_rr_current_interval =
4404             g_random_double_range (0.5, 1.5) * stats->min_interval * GST_SECOND;
4405
4406         if (T_rr_current_interval > interval) {
4407           GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT
4408               " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval),
4409               GST_TIME_ARGS (interval));
4410           interval = T_rr_current_interval;
4411         }
4412       }
4413     }
4414     sess->next_rtcp_check_time = current_time + interval;
4415   }
4416
4417
4418   GST_DEBUG ("can send RTCP now, next %" GST_TIME_FORMAT,
4419       GST_TIME_ARGS (sess->next_rtcp_check_time));
4420
4421   return TRUE;
4422 }
4423
4424 static void
4425 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
4426 {
4427   g_hash_table_insert (hash_table, key, g_object_ref (source));
4428 }
4429
4430 static gboolean
4431 remove_closing_sources (const gchar * key, RTPSource * source,
4432     ReportData * data)
4433 {
4434   if (source->closing)
4435     return TRUE;
4436
4437   if (source->send_fir)
4438     data->have_fir = TRUE;
4439   if (source->send_pli)
4440     data->have_pli = TRUE;
4441   if (source->send_nack)
4442     data->have_nack = TRUE;
4443
4444   return FALSE;
4445 }
4446
4447 static void
4448 generate_twcc (const gchar * key, RTPSource * source, ReportData * data)
4449 {
4450   RTPSession *sess = data->sess;
4451   GstBuffer *buf;
4452
4453   /* only generate RTCP for active internal sources */
4454   if (!source->internal || source->sent_bye)
4455     return;
4456
4457   /* ignore other sources when we do the timeout after a scheduled BYE */
4458   if (sess->scheduled_bye && !source->marked_bye)
4459     return;
4460
4461   /* skip if RTCP is disabled */
4462   if (source->disable_rtcp) {
4463     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
4464     return;
4465   }
4466
4467   GST_DEBUG ("generating TWCC feedback for source %08x", source->ssrc);
4468
4469   while ((buf = rtp_twcc_manager_get_feedback (sess->twcc, source->ssrc))) {
4470     ReportOutput *output = g_slice_new (ReportOutput);
4471     output->source = g_object_ref (source);
4472     output->is_bye = FALSE;
4473     output->buffer = buf;
4474     /* queue the RTCP packet to push later */
4475     g_queue_push_tail (&data->output, output);
4476   }
4477 }
4478
4479
4480 static void
4481 generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
4482 {
4483   RTPSession *sess = data->sess;
4484   gboolean is_bye = FALSE;
4485   ReportOutput *output;
4486   gboolean sr_req_pending = sess->sr_req_pending;
4487
4488   /* only generate RTCP for active internal sources */
4489   if (!source->internal || source->sent_bye)
4490     return;
4491
4492   /* ignore other sources when we do the timeout after a scheduled BYE */
4493   if (sess->scheduled_bye && !source->marked_bye)
4494     return;
4495
4496   /* skip if RTCP is disabled */
4497   if (source->disable_rtcp) {
4498     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
4499     return;
4500   }
4501
4502   data->source = source;
4503
4504   /* open packet */
4505   session_start_rtcp (sess, data);
4506
4507   if (source->marked_bye) {
4508     /* send BYE */
4509     make_source_bye (sess, source, data);
4510     is_bye = TRUE;
4511   } else if (!data->is_early) {
4512     /* loop over all known sources and add report blocks. If we are early, we
4513      * just make a minimal RTCP packet and skip this step */
4514     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4515         (GHFunc) session_report_blocks, data);
4516   }
4517   if (!data->has_sdes && (!data->is_early || !sess->reduced_size_rtcp
4518           || sr_req_pending))
4519     session_sdes (sess, data);
4520
4521   if (data->have_fir)
4522     session_fir (sess, data);
4523
4524   if (data->have_pli)
4525     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4526         (GHFunc) session_pli, data);
4527
4528   if (data->have_nack)
4529     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4530         (GHFunc) session_nack, data);
4531
4532   gst_rtcp_buffer_unmap (&data->rtcpbuf);
4533
4534   output = g_slice_new (ReportOutput);
4535   output->source = g_object_ref (source);
4536   output->is_bye = is_bye;
4537   output->buffer = data->rtcp;
4538   /* queue the RTCP packet to push later */
4539   g_queue_push_tail (&data->output, output);
4540 }
4541
4542 static void
4543 update_generation (const gchar * key, RTPSource * source, ReportData * data)
4544 {
4545   RTPSession *sess = data->sess;
4546
4547   if (g_hash_table_size (source->reported_in_sr_of) >=
4548       sess->stats.internal_sources) {
4549     /* source is reported, move to next generation */
4550     source->generation = sess->generation + 1;
4551     g_hash_table_remove_all (source->reported_in_sr_of);
4552
4553     GST_LOG ("reported source %x, new generation: %d", source->ssrc,
4554         source->generation);
4555
4556     /* if we reported all sources in this generation, move to next */
4557     if (--data->num_to_report == 0) {
4558       sess->generation++;
4559       GST_DEBUG ("all reported, generation now %u", sess->generation);
4560     }
4561   }
4562 }
4563
4564 static void
4565 schedule_remaining_nacks (const gchar * key, RTPSource * source,
4566     ReportData * data)
4567 {
4568   RTPSession *sess = data->sess;
4569   GstClockTime *nack_deadlines;
4570   GstClockTime deadline;
4571   guint n_nacks;
4572
4573   if (!source->send_nack)
4574     return;
4575
4576   /* the scheduling is entirely based on available bandwidth, just take the
4577    * biggest seqnum, which will have the largest deadline to request early
4578    * RTCP. */
4579   nack_deadlines = rtp_source_get_nack_deadlines (source, &n_nacks);
4580   deadline = nack_deadlines[n_nacks - 1];
4581   RTP_SESSION_UNLOCK (sess);
4582   rtp_session_send_rtcp_with_deadline (sess, deadline);
4583   RTP_SESSION_LOCK (sess);
4584 }
4585
4586 static gboolean
4587 rtp_session_are_all_sources_bye (RTPSession * sess)
4588 {
4589   GHashTableIter iter;
4590   RTPSource *src;
4591
4592   RTP_SESSION_LOCK (sess);
4593   g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
4594   while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
4595     if (src->internal && !src->sent_bye) {
4596       RTP_SESSION_UNLOCK (sess);
4597       return FALSE;
4598     }
4599   }
4600   RTP_SESSION_UNLOCK (sess);
4601
4602   return TRUE;
4603 }
4604
4605 /**
4606  * rtp_session_on_timeout:
4607  * @sess: an #RTPSession
4608  * @current_time: the current system time
4609  * @ntpnstime: the current NTP time in nanoseconds
4610  * @running_time: the current running_time of the pipeline
4611  *
4612  * Perform maintenance actions after the timeout obtained with
4613  * rtp_session_next_timeout() expired.
4614  *
4615  * This function will perform timeouts of receivers and senders, send a BYE
4616  * packet or generate RTCP packets with current session stats.
4617  *
4618  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
4619  * times, for each packet that should be processed.
4620  *
4621  * Returns: a #GstFlowReturn.
4622  */
4623 GstFlowReturn
4624 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
4625     guint64 ntpnstime, GstClockTime running_time)
4626 {
4627   GstFlowReturn result = GST_FLOW_OK;
4628   ReportData data = { GST_RTCP_BUFFER_INIT };
4629   GHashTable *table_copy;
4630   ReportOutput *output;
4631   gboolean all_empty = FALSE;
4632
4633   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
4634
4635   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT
4636       ", running-time %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4637       GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (running_time));
4638
4639   data.sess = sess;
4640   data.current_time = current_time;
4641   data.ntpnstime = ntpnstime;
4642   data.running_time = running_time;
4643   data.num_to_report = 0;
4644   data.may_suppress = FALSE;
4645   data.nacked_seqnums = 0;
4646   g_queue_init (&data.output);
4647
4648   RTP_SESSION_LOCK (sess);
4649   /* get a new interval, we need this for various cleanups etc */
4650   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
4651
4652   GST_DEBUG ("interval %" GST_TIME_FORMAT, GST_TIME_ARGS (data.interval));
4653
4654   /* we need an internal source now */
4655   if (sess->stats.internal_sources == 0) {
4656     RTPSource *source;
4657     gboolean created;
4658
4659     source = obtain_internal_source (sess, sess->suggested_ssrc, &created,
4660         current_time);
4661     sess->internal_ssrc_set = TRUE;
4662
4663     if (created)
4664       on_new_sender_ssrc (sess, source);
4665
4666     g_object_unref (source);
4667   }
4668
4669   sess->conflicting_addresses =
4670       timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
4671
4672   /* Make a local copy of the hashtable. We need to do this because the
4673    * cleanup stage below releases the session lock. */
4674   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
4675       (GDestroyNotify) g_object_unref);
4676   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4677       (GHFunc) clone_ssrcs_hashtable, table_copy);
4678
4679   /* Clean up the session, mark the source for removing, this might release the
4680    * session lock. */
4681   g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
4682   g_hash_table_destroy (table_copy);
4683
4684   /* Now remove the marked sources */
4685   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
4686       (GHRFunc) remove_closing_sources, &data);
4687
4688   /* update point-to-point status */
4689   session_update_ptp (sess);
4690
4691   /* see if we need to generate SR or RR packets */
4692   if (!is_rtcp_time (sess, current_time, &data))
4693     goto done;
4694
4695   /* check if all the buffers are empty after generation */
4696   all_empty = TRUE;
4697
4698   GST_DEBUG
4699       ("doing RTCP generation %u for %u sources, early %d, may suppress %d",
4700       sess->generation, data.num_to_report, data.is_early, data.may_suppress);
4701
4702   /* generate RTCP for all internal sources */
4703   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4704       (GHFunc) generate_rtcp, &data);
4705
4706   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4707       (GHFunc) generate_twcc, &data);
4708
4709   /* update the generation for all the sources that have been reported */
4710   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4711       (GHFunc) update_generation, &data);
4712
4713   /* we keep track of the last report time in order to timeout inactive
4714    * receivers or senders */
4715   if (!data.is_early) {
4716     GST_DEBUG ("Time since last regular RTCP: %" GST_TIME_FORMAT " - %"
4717         GST_TIME_FORMAT " = %" GST_TIME_FORMAT,
4718         GST_TIME_ARGS (data.current_time),
4719         GST_TIME_ARGS (sess->last_rtcp_send_time),
4720         GST_TIME_ARGS (data.current_time - sess->last_rtcp_send_time));
4721     sess->last_rtcp_send_time = data.current_time;
4722   }
4723
4724   GST_DEBUG ("Time since last RTCP: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT
4725       " = %" GST_TIME_FORMAT, GST_TIME_ARGS (data.current_time),
4726       GST_TIME_ARGS (sess->last_rtcp_check_time),
4727       GST_TIME_ARGS (data.current_time - sess->last_rtcp_check_time));
4728   sess->last_rtcp_check_time = data.current_time;
4729   sess->first_rtcp = FALSE;
4730   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
4731   sess->scheduled_bye = FALSE;
4732
4733 done:
4734   RTP_SESSION_UNLOCK (sess);
4735
4736   /* notify about updated statistics */
4737   g_object_notify (G_OBJECT (sess), "stats");
4738
4739   /* push out the RTCP packets */
4740   while ((output = g_queue_pop_head (&data.output))) {
4741     gboolean do_not_suppress, empty_buffer;
4742     GstBuffer *buffer = output->buffer;
4743     RTPSource *source = output->source;
4744
4745     /* Give the user a change to add its own packet */
4746     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
4747         buffer, data.is_early, &do_not_suppress);
4748
4749     empty_buffer = gst_buffer_get_size (buffer) == 0;
4750
4751     if (!empty_buffer)
4752       all_empty = FALSE;
4753
4754     if (sess->callbacks.send_rtcp &&
4755         !empty_buffer && (do_not_suppress || !data.may_suppress)) {
4756       guint packet_size;
4757
4758       packet_size = gst_buffer_get_size (buffer) + sess->header_len;
4759
4760       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
4761       GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
4762           sess->stats.avg_rtcp_packet_size, packet_size);
4763       result =
4764           sess->callbacks.send_rtcp (sess, source, buffer,
4765           rtp_session_are_all_sources_bye (sess), sess->send_rtcp_user_data);
4766
4767       RTP_SESSION_LOCK (sess);
4768       sess->stats.nacks_sent += data.nacked_seqnums;
4769       on_sender_ssrc_active (sess, source);
4770       RTP_SESSION_UNLOCK (sess);
4771     } else {
4772       GST_DEBUG ("freeing packet callback: %p"
4773           " empty_buffer: %d, "
4774           " do_not_suppress: %d may_suppress: %d", sess->callbacks.send_rtcp,
4775           empty_buffer, do_not_suppress, data.may_suppress);
4776       if (!empty_buffer) {
4777         RTP_SESSION_LOCK (sess);
4778         sess->stats.nacks_dropped += data.nacked_seqnums;
4779         RTP_SESSION_UNLOCK (sess);
4780       }
4781       gst_buffer_unref (buffer);
4782     }
4783     g_object_unref (source);
4784     g_slice_free (ReportOutput, output);
4785   }
4786
4787   if (all_empty)
4788     GST_ERROR ("generated empty RTCP messages for all the sources");
4789
4790   /* schedule remaining nacks */
4791   RTP_SESSION_LOCK (sess);
4792   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4793       (GHFunc) schedule_remaining_nacks, &data);
4794   RTP_SESSION_UNLOCK (sess);
4795
4796   return result;
4797 }
4798
4799 /**
4800  * rtp_session_request_early_rtcp:
4801  * @sess: an #RTPSession
4802  * @current_time: the current system time
4803  * @max_delay: maximum delay
4804  *
4805  * Request transmission of early RTCP
4806  *
4807  * Returns: %TRUE if the related RTCP can be scheduled.
4808  */
4809 gboolean
4810 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
4811     GstClockTime max_delay)
4812 {
4813   GstClockTime T_dither_max, T_rr, offset = 0;
4814   gboolean ret;
4815   gboolean allow_early;
4816
4817   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
4818
4819   RTP_SESSION_LOCK (sess);
4820
4821   /* We assume a feedback profile if something is requesting RTCP
4822    * to be sent */
4823   sess->rtp_profile = GST_RTP_PROFILE_AVPF;
4824
4825   /* Check if already requested */
4826   /*  RFC 4585 section 3.5.2 step 2 */
4827   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
4828     GST_LOG_OBJECT (sess, "already have next early rtcp time");
4829     ret = (current_time + max_delay > sess->next_early_rtcp_time);
4830     goto end;
4831   }
4832
4833   if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) {
4834     GST_LOG_OBJECT (sess, "no next RTCP check time");
4835     ret = FALSE;
4836     goto end;
4837   }
4838
4839   /* RFC 4585 section 3.5.3 step 1
4840    * If no regular RTCP packet has been sent before, then a regular
4841    * RTCP packet has to be scheduled first and FB messages might be
4842    * included there
4843    */
4844   if (!GST_CLOCK_TIME_IS_VALID (sess->last_rtcp_send_time)) {
4845     GST_LOG_OBJECT (sess, "no RTCP sent yet");
4846
4847     if (current_time + max_delay > sess->next_rtcp_check_time) {
4848       GST_LOG_OBJECT (sess,
4849           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
4850           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4851           GST_TIME_ARGS (max_delay),
4852           GST_TIME_ARGS (sess->next_rtcp_check_time));
4853       ret = TRUE;
4854     } else {
4855       GST_LOG_OBJECT (sess,
4856           "can't allow early feedback, next scheduled time is too late %"
4857           GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
4858           GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
4859           GST_TIME_ARGS (sess->next_rtcp_check_time));
4860       ret = FALSE;
4861     }
4862     goto end;
4863   }
4864
4865   T_rr = sess->last_rtcp_interval;
4866
4867   /*  RFC 4585 section 3.5.2 step 2b */
4868   /* If the total sources is <=2, then there is only us and one peer */
4869   /* When there is one auxiliary stream the session can still do point
4870    * to point.
4871    */
4872   if (sess->is_doing_ptp) {
4873     T_dither_max = 0;
4874   } else {
4875     /* Divide by 2 because l = 0.5 */
4876     T_dither_max = T_rr;
4877     T_dither_max /= 2;
4878   }
4879
4880   /*  RFC 4585 section 3.5.2 step 3 */
4881   if (current_time + T_dither_max > sess->next_rtcp_check_time) {
4882     GST_LOG_OBJECT (sess,
4883         "don't send because of dither, next scheduled time is too soon %"
4884         GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT,
4885         GST_TIME_ARGS (current_time), GST_TIME_ARGS (T_dither_max),
4886         GST_TIME_ARGS (sess->next_rtcp_check_time));
4887     ret = T_dither_max <= max_delay;
4888     goto end;
4889   }
4890
4891   /*  RFC 4585 section 3.5.2 step 4a and
4892    *  RFC 4585 section 3.5.2 step 6 */
4893   allow_early = FALSE;
4894   if (sess->last_rtcp_check_time == sess->last_rtcp_send_time) {
4895     /* Last time we sent a full RTCP packet, we can now immediately
4896      * send an early one as allow_early was reset to TRUE */
4897     allow_early = TRUE;
4898   } else if (sess->last_rtcp_check_time + T_rr <= current_time + max_delay) {
4899     /* Last packet we sent was an early RTCP packet and more than
4900      * T_rr has passed since then, meaning we would have suppressed
4901      * a regular RTCP packet already and reset allow_early to TRUE */
4902     allow_early = TRUE;
4903
4904     /* We have to offset a bit as T_rr has not passed yet, but will before
4905      * max_delay */
4906     if (sess->last_rtcp_check_time + T_rr > current_time)
4907       offset = (sess->last_rtcp_check_time + T_rr) - current_time;
4908   } else {
4909     GST_DEBUG_OBJECT (sess,
4910         "can't allow early RTCP yet: last regular %" GST_TIME_FORMAT ", %"
4911         GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT " + %"
4912         GST_TIME_FORMAT, GST_TIME_ARGS (sess->last_rtcp_send_time),
4913         GST_TIME_ARGS (sess->last_rtcp_check_time), GST_TIME_ARGS (T_rr),
4914         GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay));
4915   }
4916
4917   if (!allow_early) {
4918     /* Ignore the request a scheduled packet will be in time anyway */
4919     if (current_time + max_delay > sess->next_rtcp_check_time) {
4920       GST_LOG_OBJECT (sess,
4921           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
4922           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4923           GST_TIME_ARGS (max_delay),
4924           GST_TIME_ARGS (sess->next_rtcp_check_time));
4925       ret = TRUE;
4926     } else {
4927       GST_LOG_OBJECT (sess,
4928           "can't allow early feedback and next scheduled time is too late %"
4929           GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
4930           GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
4931           GST_TIME_ARGS (sess->next_rtcp_check_time));
4932       ret = FALSE;
4933     }
4934     goto end;
4935   }
4936
4937   /*  RFC 4585 section 3.5.2 step 4b */
4938   if (T_dither_max) {
4939     /* Schedule an early transmission later */
4940     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
4941         current_time + offset;
4942   } else {
4943     /* If no dithering, schedule it for NOW */
4944     sess->next_early_rtcp_time = current_time + offset;
4945   }
4946
4947   GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT
4948       ", next regular RTCP time %" GST_TIME_FORMAT,
4949       GST_TIME_ARGS (sess->next_early_rtcp_time),
4950       GST_TIME_ARGS (sess->next_rtcp_check_time));
4951   RTP_SESSION_UNLOCK (sess);
4952
4953   /* notify app of need to send packet early
4954    * and therefore of timeout change */
4955   if (sess->callbacks.reconsider)
4956     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
4957
4958   return TRUE;
4959
4960 end:
4961
4962   RTP_SESSION_UNLOCK (sess);
4963
4964   return ret;
4965 }
4966
4967 static gboolean
4968 rtp_session_send_rtcp_internal (RTPSession * sess, GstClockTime now,
4969     GstClockTime max_delay)
4970 {
4971   /* notify the application that we intend to send early RTCP */
4972   if (sess->callbacks.notify_early_rtcp)
4973     sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data);
4974
4975   return rtp_session_request_early_rtcp (sess, now, max_delay);
4976 }
4977
4978 static gboolean
4979 rtp_session_send_rtcp_with_deadline (RTPSession * sess, GstClockTime deadline)
4980 {
4981   GstClockTime now, max_delay;
4982
4983   if (!sess->callbacks.send_rtcp)
4984     return FALSE;
4985
4986   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
4987
4988   if (deadline < now)
4989     return FALSE;
4990
4991   max_delay = deadline - now;
4992
4993   return rtp_session_send_rtcp_internal (sess, now, max_delay);
4994 }
4995
4996 static gboolean
4997 rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
4998 {
4999   GstClockTime now;
5000
5001   if (!sess->callbacks.send_rtcp)
5002     return FALSE;
5003
5004   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
5005
5006   return rtp_session_send_rtcp_internal (sess, now, max_delay);
5007 }
5008
5009 gboolean
5010 rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
5011     gboolean fir, gint count)
5012 {
5013   RTPSource *src;
5014
5015   RTP_SESSION_LOCK (sess);
5016   src = find_source (sess, ssrc);
5017   if (src == NULL)
5018     goto no_source;
5019
5020   if (fir) {
5021     src->send_pli = FALSE;
5022     src->send_fir = TRUE;
5023
5024     if (count == -1 || count != src->last_fir_count)
5025       src->current_send_fir_seqnum++;
5026     src->last_fir_count = count;
5027   } else if (!src->send_fir) {
5028     src->send_pli = TRUE;
5029   }
5030   RTP_SESSION_UNLOCK (sess);
5031
5032   if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
5033     GST_DEBUG ("FIR/PLI not sent early, sending with next regular RTCP");
5034   }
5035
5036   return TRUE;
5037
5038   /* ERRORS */
5039 no_source:
5040   {
5041     RTP_SESSION_UNLOCK (sess);
5042     return FALSE;
5043   }
5044 }
5045
5046 /**
5047  * rtp_session_request_nack:
5048  * @sess: a #RTPSession
5049  * @ssrc: the SSRC
5050  * @seqnum: the missing seqnum
5051  * @max_delay: max delay to request NACK
5052  *
5053  * Request scheduling of a NACK feedback packet for @seqnum in @ssrc.
5054  *
5055  * Returns: %TRUE if the NACK feedback could be scheduled
5056  */
5057 gboolean
5058 rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
5059     GstClockTime max_delay)
5060 {
5061   RTPSource *source;
5062   GstClockTime now;
5063
5064   if (!sess->callbacks.send_rtcp)
5065     return FALSE;
5066
5067   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
5068
5069   RTP_SESSION_LOCK (sess);
5070   source = find_source (sess, ssrc);
5071   if (source == NULL)
5072     goto no_source;
5073
5074   GST_DEBUG ("request NACK for SSRC %08x, #%u, deadline %" GST_TIME_FORMAT,
5075       ssrc, seqnum, GST_TIME_ARGS (now + max_delay));
5076   rtp_source_register_nack (source, seqnum, now + max_delay);
5077   RTP_SESSION_UNLOCK (sess);
5078
5079   if (!rtp_session_send_rtcp_internal (sess, now, max_delay)) {
5080     GST_DEBUG ("NACK not sent early, sending with next regular RTCP");
5081   }
5082
5083   return TRUE;
5084
5085   /* ERRORS */
5086 no_source:
5087   {
5088     RTP_SESSION_UNLOCK (sess);
5089     return FALSE;
5090   }
5091 }
5092
5093 /**
5094  * rtp_session_update_recv_caps_structure:
5095  * @sess: an #RTPSession
5096  * @s: a #GstStructure from a #GstCaps
5097  *
5098  * Update the caps of the receiver in the rtp session.
5099  */
5100 void
5101 rtp_session_update_recv_caps_structure (RTPSession * sess,
5102     const GstStructure * s)
5103 {
5104   rtp_twcc_manager_parse_recv_ext_id (sess->twcc, s);
5105 }