fdb17bed82ebe85c2aae4b1c742e2d96b2e93658
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
21  * with newer GLib versions (>= 2.31.0) */
22 #define GLIB_DISABLE_DEPRECATION_WARNINGS
23
24 #include <string.h>
25 #include <stdlib.h>
26
27 #include <gst/rtp/gstrtpbuffer.h>
28 #include <gst/rtp/gstrtcpbuffer.h>
29
30 #include <gst/glib-compat-private.h>
31
32 #include "rtpsession.h"
33
34 GST_DEBUG_CATEGORY (rtp_session_debug);
35 #define GST_CAT_DEFAULT rtp_session_debug
36
37 /* signals and args */
38 enum
39 {
40   SIGNAL_GET_SOURCE_BY_SSRC,
41   SIGNAL_ON_NEW_SSRC,
42   SIGNAL_ON_SSRC_COLLISION,
43   SIGNAL_ON_SSRC_VALIDATED,
44   SIGNAL_ON_SSRC_ACTIVE,
45   SIGNAL_ON_SSRC_SDES,
46   SIGNAL_ON_BYE_SSRC,
47   SIGNAL_ON_BYE_TIMEOUT,
48   SIGNAL_ON_TIMEOUT,
49   SIGNAL_ON_SENDER_TIMEOUT,
50   SIGNAL_ON_SENDING_RTCP,
51   SIGNAL_ON_APP_RTCP,
52   SIGNAL_ON_FEEDBACK_RTCP,
53   SIGNAL_SEND_RTCP,
54   SIGNAL_SEND_RTCP_FULL,
55   SIGNAL_ON_RECEIVING_RTCP,
56   SIGNAL_ON_NEW_SENDER_SSRC,
57   SIGNAL_ON_SENDER_SSRC_ACTIVE,
58   SIGNAL_ON_SENDING_NACKS,
59   LAST_SIGNAL
60 };
61
62 #define DEFAULT_INTERNAL_SOURCE      NULL
63 #define DEFAULT_BANDWIDTH            0.0
64 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_FRACTION
65 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
66 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
67 #define DEFAULT_RTCP_MTU             1400
68 #define DEFAULT_SDES                 NULL
69 #define DEFAULT_NUM_SOURCES          0
70 #define DEFAULT_NUM_ACTIVE_SOURCES   0
71 #define DEFAULT_SOURCES              NULL
72 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
73 #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
74 #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
75 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
76 #define DEFAULT_MAX_DROPOUT_TIME     60000
77 #define DEFAULT_MAX_MISORDER_TIME    2000
78 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
79 #define DEFAULT_RTCP_REDUCED_SIZE    FALSE
80 #define DEFAULT_RTCP_DISABLE_SR_TIMESTAMP FALSE
81 #define DEFAULT_TWCC_FEEDBACK_INTERVAL GST_CLOCK_TIME_NONE
82
83 enum
84 {
85   PROP_0,
86   PROP_INTERNAL_SSRC,
87   PROP_INTERNAL_SOURCE,
88   PROP_BANDWIDTH,
89   PROP_RTCP_FRACTION,
90   PROP_RTCP_RR_BANDWIDTH,
91   PROP_RTCP_RS_BANDWIDTH,
92   PROP_RTCP_MTU,
93   PROP_SDES,
94   PROP_NUM_SOURCES,
95   PROP_NUM_ACTIVE_SOURCES,
96   PROP_SOURCES,
97   PROP_FAVOR_NEW,
98   PROP_RTCP_MIN_INTERVAL,
99   PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
100   PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
101   PROP_PROBATION,
102   PROP_MAX_DROPOUT_TIME,
103   PROP_MAX_MISORDER_TIME,
104   PROP_STATS,
105   PROP_RTP_PROFILE,
106   PROP_RTCP_REDUCED_SIZE,
107   PROP_RTCP_DISABLE_SR_TIMESTAMP,
108   PROP_TWCC_FEEDBACK_INTERVAL,
109 };
110
111 /* update average packet size */
112 #define INIT_AVG(avg, val) \
113    (avg) = (val);
114 #define UPDATE_AVG(avg, val)            \
115   if ((avg) == 0)                       \
116    (avg) = (val);                       \
117   else                                  \
118    (avg) = ((val) + (15 * (avg))) >> 4;
119
120 /* GObject vmethods */
121 static void rtp_session_finalize (GObject * object);
122 static void rtp_session_set_property (GObject * object, guint prop_id,
123     const GValue * value, GParamSpec * pspec);
124 static void rtp_session_get_property (GObject * object, guint prop_id,
125     GValue * value, GParamSpec * pspec);
126
127 static gboolean rtp_session_send_rtcp (RTPSession * sess,
128     GstClockTime max_delay);
129 static gboolean rtp_session_send_rtcp_with_deadline (RTPSession * sess,
130     GstClockTime deadline);
131
132 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
133
134 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
135
136 static guint32 rtp_session_create_new_ssrc (RTPSession * sess);
137 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
138     gboolean * created, RTPPacketInfo * pinfo, gboolean rtp);
139 static RTPSource *obtain_internal_source (RTPSession * sess,
140     guint32 ssrc, gboolean * created, GstClockTime current_time);
141 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
142     GstClockTime current_time);
143 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
144     gboolean deterministic, gboolean first);
145
146 static gboolean
147 accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
148     const GValue * handler_return, gpointer data)
149 {
150   if (g_value_get_boolean (handler_return))
151     g_value_set_boolean (return_accu, TRUE);
152
153   return TRUE;
154 }
155
156 static void
157 rtp_session_class_init (RTPSessionClass * klass)
158 {
159   GObjectClass *gobject_class;
160
161   gobject_class = (GObjectClass *) klass;
162
163   gobject_class->finalize = rtp_session_finalize;
164   gobject_class->set_property = rtp_session_set_property;
165   gobject_class->get_property = rtp_session_get_property;
166
167   /**
168    * RTPSession::get-source-by-ssrc:
169    * @session: the object which received the signal
170    * @ssrc: the SSRC of the RTPSource
171    *
172    * Request the #RTPSource object with SSRC @ssrc in @session.
173    */
174   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
175       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
176       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
177           get_source_by_ssrc), NULL, NULL, NULL,
178       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
179
180   /**
181    * RTPSession::on-new-ssrc:
182    * @session: the object which received the signal
183    * @src: the new RTPSource
184    *
185    * Notify of a new SSRC that entered @session.
186    */
187   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
188       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
189       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
190       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
191   /**
192    * RTPSession::on-ssrc-collision:
193    * @session: the object which received the signal
194    * @src: the #RTPSource that caused a collision
195    *
196    * Notify when we have an SSRC collision
197    */
198   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
199       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
200       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
201       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
202   /**
203    * RTPSession::on-ssrc-validated:
204    * @session: the object which received the signal
205    * @src: the new validated RTPSource
206    *
207    * Notify of a new SSRC that became validated.
208    */
209   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
210       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
211       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
212       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
213   /**
214    * RTPSession::on-ssrc-active:
215    * @session: the object which received the signal
216    * @src: the active RTPSource
217    *
218    * Notify of a SSRC that is active, i.e., sending RTCP.
219    */
220   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
221       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
222       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
223       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
224   /**
225    * RTPSession::on-ssrc-sdes:
226    * @session: the object which received the signal
227    * @src: the RTPSource
228    *
229    * Notify that a new SDES was received for SSRC.
230    */
231   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
232       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
233       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
234       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
235   /**
236    * RTPSession::on-bye-ssrc:
237    * @session: the object which received the signal
238    * @src: the RTPSource that went away
239    *
240    * Notify of an SSRC that became inactive because of a BYE packet.
241    */
242   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
243       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
244       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
245       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
246   /**
247    * RTPSession::on-bye-timeout:
248    * @session: the object which received the signal
249    * @src: the RTPSource that timed out
250    *
251    * Notify of an SSRC that has timed out because of BYE
252    */
253   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
254       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
255       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
256       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
257   /**
258    * RTPSession::on-timeout:
259    * @session: the object which received the signal
260    * @src: the RTPSource that timed out
261    *
262    * Notify of an SSRC that has timed out
263    */
264   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
265       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
266       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
267       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
268   /**
269    * RTPSession::on-sender-timeout:
270    * @session: the object which received the signal
271    * @src: the RTPSource that timed out
272    *
273    * Notify of an SSRC that was a sender but timed out and became a receiver.
274    */
275   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
276       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
277       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
278       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
279
280   /**
281    * RTPSession::on-sending-rtcp
282    * @session: the object which received the signal
283    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
284    * @early: %TRUE if the packet is early, %FALSE if it is regular
285    *
286    * This signal is emitted before sending an RTCP packet, it can be used
287    * to add extra RTCP Packets.
288    *
289    * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
290    * if suppressing it is acceptable
291    */
292   rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
293       g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
294       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
295       accumulate_trues, NULL, NULL, G_TYPE_BOOLEAN, 2,
296       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE, G_TYPE_BOOLEAN);
297
298   /**
299    * RTPSession::on-app-rtcp:
300    * @session: the object which received the signal
301    * @subtype: The subtype of the packet
302    * @ssrc: The SSRC/CSRC of the packet
303    * @name: The name of the packet
304    * @data: a #GstBuffer with the application-dependant data or %NULL if
305    * there was no data
306    *
307    * Notify that a RTCP APP packet has been received
308    */
309   rtp_session_signals[SIGNAL_ON_APP_RTCP] =
310       g_signal_new ("on-app-rtcp", G_TYPE_FROM_CLASS (klass),
311       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_app_rtcp),
312       NULL, NULL, NULL, G_TYPE_NONE, 4, G_TYPE_UINT, G_TYPE_UINT,
313       G_TYPE_STRING, GST_TYPE_BUFFER);
314
315   /**
316    * RTPSession::on-feedback-rtcp:
317    * @session: the object which received the signal
318    * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
319    *  %GST_RTCP_TYPE_RTPFB
320    * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
321    * @sender_ssrc: The SSRC of the sender
322    * @media_ssrc: The SSRC of the media this refers to
323    * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
324    * there was no FCI
325    *
326    * Notify that a RTCP feedback packet has been received
327    */
328   rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
329       g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
330       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
331       NULL, NULL, NULL, G_TYPE_NONE, 5, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT,
332       G_TYPE_UINT, GST_TYPE_BUFFER);
333
334   /**
335    * RTPSession::send-rtcp:
336    * @session: the object which received the signal
337    * @max_delay: The maximum delay after which the feedback will not be useful
338    *  anymore
339    *
340    * Requests that the #RTPSession initiate a new RTCP packet as soon as
341    * possible within the requested delay.
342    *
343    * This sets feedback to %TRUE if not already done before.
344    */
345   rtp_session_signals[SIGNAL_SEND_RTCP] =
346       g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
347       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
348       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
349       NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
350
351   /**
352    * RTPSession::send-rtcp-full:
353    * @session: the object which received the signal
354    * @max_delay: The maximum delay after which the feedback will not be useful
355    *  anymore
356    *
357    * Requests that the #RTPSession initiate a new RTCP packet as soon as
358    * possible within the requested delay.
359    *
360    * This sets feedback to %TRUE if not already done before.
361    *
362    * Returns: TRUE if the new RTCP packet could be scheduled within the
363    * requested delay, FALSE otherwise.
364    *
365    * Since: 1.6
366    */
367   rtp_session_signals[SIGNAL_SEND_RTCP_FULL] =
368       g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass),
369       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
370       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
371       NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
372
373   /**
374    * RTPSession::on-receiving-rtcp
375    * @session: the object which received the signal
376    * @buffer: the #GstBuffer containing the RTCP packet that was received
377    *
378    * This signal is emitted when receiving an RTCP packet before it is handled
379    * by the session. It can be used to extract custom information from RTCP packets.
380    *
381    * Since: 1.6
382    */
383   rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] =
384       g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass),
385       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp),
386       NULL, NULL, NULL, G_TYPE_NONE, 1,
387       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
388
389   /**
390    * RTPSession::on-new-sender-ssrc:
391    * @session: the object which received the signal
392    * @src: the new sender RTPSource
393    *
394    * Notify of a new sender SSRC that entered @session.
395    *
396    * Since: 1.8
397    */
398   rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
399       g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
400       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_sender_ssrc),
401       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
402
403   /**
404    * RTPSession::on-sender-ssrc-active:
405    * @session: the object which received the signal
406    * @src: the active sender RTPSource
407    *
408    * Notify of a sender SSRC that is active, i.e., sending RTCP.
409    *
410    * Since: 1.8
411    */
412   rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
413       g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
414       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass,
415           on_sender_ssrc_active), NULL, NULL, NULL,
416       G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
417
418   /**
419    * RTPSession::on-sending-nack
420    * @session: the object which received the signal
421    * @sender_ssrc: the sender ssrc
422    * @media_ssrc: the media ssrc
423    * @nacks: (element-type guint16): the list of seqnum to be nacked
424    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
425    *
426    * This signal is emitted before NACK packets are added into the RTCP
427    * packet. This signal can be used to override the conversion of the NACK
428    * seqnum array into packets. This can be used if your protocol uses
429    * different type of NACK (e.g. based on RTCP APP).
430    *
431    * The handler should transform the seqnum from @nacks array into packets.
432    * @nacks seqnum must be consumed from the start. The remaining will be
433    * rescheduled for later base on bandwidth. Only one handler will be
434    * signalled.
435    *
436    * A handler may return 0 to signal that generic NACKs should be created
437    * for this set. This can be useful if the signal is used for other purpose
438    * or if the other type of NACK would use more space.
439    *
440    * Returns: the number of NACK seqnum that was consumed from @nacks.
441    *
442    * Since: 1.16
443    */
444   rtp_session_signals[SIGNAL_ON_SENDING_NACKS] =
445       g_signal_new ("on-sending-nacks", G_TYPE_FROM_CLASS (klass),
446       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_nacks),
447       g_signal_accumulator_first_wins, NULL, NULL,
448       G_TYPE_UINT, 4, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_ARRAY,
449       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
450
451   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
452       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
453           "The internal SSRC used for the session (deprecated)",
454           0, G_MAXUINT, 0,
455           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
456           GST_PARAM_DOC_SHOW_DEFAULT));
457
458   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
459       g_param_spec_object ("internal-source", "Internal Source",
460           "The internal source element of the session (deprecated)",
461           RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
462
463   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
464       g_param_spec_double ("bandwidth", "Bandwidth",
465           "The bandwidth of the session in bits per second (0 for auto-discover)",
466           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
467           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
468
469   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
470       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
471           "The fraction of the bandwidth used for RTCP in bits per second (or as a real fraction of the RTP bandwidth if < 1)",
472           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
473           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
474
475   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
476       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
477           "The RTCP bandwidth used for receivers in bits per second (-1 = default)",
478           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
479           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
480
481   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
482       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
483           "The RTCP bandwidth used for senders in bits per second (-1 = default)",
484           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
485           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
486
487   g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
488       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
489           "The maximum size of the RTCP packets",
490           16, G_MAXINT16, DEFAULT_RTCP_MTU,
491           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
492
493   g_object_class_install_property (gobject_class, PROP_SDES,
494       g_param_spec_boxed ("sdes", "SDES",
495           "The SDES items of this session",
496           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
497           | GST_PARAM_DOC_SHOW_DEFAULT));
498
499   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
500       g_param_spec_uint ("num-sources", "Num Sources",
501           "The number of sources in the session", 0, G_MAXUINT,
502           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
503
504   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
505       g_param_spec_uint ("num-active-sources", "Num Active Sources",
506           "The number of active sources in the session", 0, G_MAXUINT,
507           DEFAULT_NUM_ACTIVE_SOURCES,
508           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
509   /**
510    * RTPSource:sources
511    *
512    * Get a GValue Array of all sources in the session.
513    *
514    * ## Getting the #RTPSources of a session
515    *
516    * ``` C
517    * {
518    *   GValueArray *arr;
519    *   GValue *val;
520    *   guint i;
521    *
522    *   g_object_get (sess, "sources", &arr, NULL);
523    *
524    *   for (i = 0; i < arr->n_values; i++) {
525    *     RTPSource *source;
526    *
527    *     val = g_value_array_get_nth (arr, i);
528    *     source = g_value_get_object (val);
529    *   }
530    *   g_value_array_free (arr);
531    * }
532    * ```
533    */
534   g_object_class_install_property (gobject_class, PROP_SOURCES,
535       g_param_spec_boxed ("sources", "Sources",
536           "An array of all known sources in the session",
537           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
538
539   g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
540       g_param_spec_boolean ("favor-new", "Favor new sources",
541           "Resolve SSRC conflict in favor of new sources", FALSE,
542           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
543
544   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
545       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
546           "Minimum interval between Regular RTCP packet (in ns)",
547           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
548           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
549
550   g_object_class_install_property (gobject_class,
551       PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
552       g_param_spec_uint64 ("rtcp-feedback-retention-window",
553           "RTCP Feedback retention window",
554           "Duration during which RTCP Feedback packets are retained (in ns)",
555           0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
556           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
557
558   g_object_class_install_property (gobject_class,
559       PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
560       g_param_spec_uint ("rtcp-immediate-feedback-threshold",
561           "RTCP Immediate Feedback threshold",
562           "The maximum number of members of a RTP session for which immediate"
563           " feedback is used (DEPRECATED: has no effect and is not needed)",
564           0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
565           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
566
567   g_object_class_install_property (gobject_class, PROP_PROBATION,
568       g_param_spec_uint ("probation", "Number of probations",
569           "Consecutive packet sequence numbers to accept the source",
570           0, G_MAXUINT, DEFAULT_PROBATION,
571           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
572
573   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
574       g_param_spec_uint ("max-dropout-time", "Max dropout time",
575           "The maximum time (milliseconds) of missing packets tolerated.",
576           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
577           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
578
579   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
580       g_param_spec_uint ("max-misorder-time", "Max misorder time",
581           "The maximum time (milliseconds) of misordered packets tolerated.",
582           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
583           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
584
585   /**
586    * RTPSession:stats:
587    *
588    * Various session statistics. This property returns a GstStructure
589    * with name application/x-rtp-session-stats with the following fields:
590    *
591    * * "rtx-drop-count"  G_TYPE_UINT   The number of retransmission events
592    *      dropped (due to bandwidth constraints)
593    * *  "sent-nack-count" G_TYPE_UINT   Number of NACKs sent
594    * *  "recv-nack-count" G_TYPE_UINT   Number of NACKs received
595    * *  "source-stats"    G_TYPE_BOXED  GValueArray of #RTPSource:stats for all
596    *      RTP sources (Since 1.8)
597    *
598    * Since: 1.4
599    */
600   g_object_class_install_property (gobject_class, PROP_STATS,
601       g_param_spec_boxed ("stats", "Statistics",
602           "Various statistics", GST_TYPE_STRUCTURE,
603           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
604
605   g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
606       g_param_spec_enum ("rtp-profile", "RTP Profile",
607           "RTP profile to use for this session", GST_TYPE_RTP_PROFILE,
608           DEFAULT_RTP_PROFILE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
609
610   g_object_class_install_property (gobject_class, PROP_RTCP_REDUCED_SIZE,
611       g_param_spec_boolean ("rtcp-reduced-size", "RTCP Reduced Size",
612           "Use Reduced Size RTCP for feedback packets",
613           DEFAULT_RTCP_REDUCED_SIZE,
614           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
615
616   /**
617    * RTPSession:disable-sr-timestamp:
618    *
619    * Whether sender reports should be timestamped.
620    *
621    * Since: 1.16
622    */
623   g_object_class_install_property (gobject_class,
624       PROP_RTCP_DISABLE_SR_TIMESTAMP,
625       g_param_spec_boolean ("disable-sr-timestamp",
626           "Disable Sender Report Timestamp",
627           "Whether sender reports should be timestamped",
628           DEFAULT_RTCP_DISABLE_SR_TIMESTAMP,
629           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
630
631   /**
632    * RTPSession:twcc-feedback-interval:
633    *
634    * The interval to send TWCC reports on.
635    * This overrides the default behavior of sending reports
636    * based on marker-bits.
637    *
638    * Since: 1.20
639    */
640   g_object_class_install_property (gobject_class,
641       PROP_TWCC_FEEDBACK_INTERVAL,
642       g_param_spec_uint64 ("twcc-feedback-interval",
643           "TWCC Feedback Interval",
644           "The interval to send TWCC reports on",
645           0, G_MAXUINT64, DEFAULT_TWCC_FEEDBACK_INTERVAL,
646           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
647
648   klass->get_source_by_ssrc =
649       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
650   klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
651
652   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
653 }
654
655 static void
656 rtp_session_init (RTPSession * sess)
657 {
658   gint i;
659   gchar *str;
660
661   g_mutex_init (&sess->lock);
662   sess->key = g_random_int ();
663   sess->mask_idx = 0;
664   sess->mask = 0;
665
666   /* TODO: We currently only use the first hash table but this is the
667    * beginning of an implementation for RFC2762
668    for (i = 0; i < 32; i++) {
669    */
670   for (i = 0; i < 1; i++) {
671     sess->ssrcs[i] =
672         g_hash_table_new_full (NULL, NULL, NULL,
673         (GDestroyNotify) g_object_unref);
674   }
675
676   rtp_stats_init_defaults (&sess->stats);
677   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
678   rtp_stats_set_min_interval (&sess->stats,
679       (gdouble) DEFAULT_RTCP_MIN_INTERVAL / GST_SECOND);
680
681   sess->recalc_bandwidth = TRUE;
682   sess->bandwidth = DEFAULT_BANDWIDTH;
683   sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
684   sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
685   sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
686
687   /* default UDP header length */
688   sess->header_len = UDP_IP_HEADER_OVERHEAD;
689   sess->mtu = DEFAULT_RTCP_MTU;
690
691   sess->probation = DEFAULT_PROBATION;
692   sess->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
693   sess->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
694
695   /* some default SDES entries */
696   sess->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
697
698   /* we do not want to leak details like the username or hostname here */
699   str = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
700   gst_structure_set (sess->sdes, "cname", G_TYPE_STRING, str, NULL);
701   g_free (str);
702
703 #if 0
704   /* we do not want to leak the user's real name here */
705   str = g_strdup_printf ("Anon%u", g_random_int ());
706   gst_structure_set (sdes, "name", G_TYPE_STRING, str, NULL);
707   g_free (str);
708 #endif
709
710   gst_structure_set (sess->sdes, "tool", G_TYPE_STRING, "GStreamer", NULL);
711
712   /* this is the SSRC we suggest */
713   sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
714   sess->internal_ssrc_set = FALSE;
715
716   sess->first_rtcp = TRUE;
717   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
718   sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE;
719   sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
720   sess->last_rtcp_interval = GST_CLOCK_TIME_NONE;
721
722   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
723   sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
724   sess->rtcp_immediate_feedback_threshold =
725       DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
726   sess->rtp_profile = DEFAULT_RTP_PROFILE;
727   sess->reduced_size_rtcp = DEFAULT_RTCP_REDUCED_SIZE;
728   sess->timestamp_sender_reports = !DEFAULT_RTCP_DISABLE_SR_TIMESTAMP;
729
730   sess->is_doing_ptp = TRUE;
731
732   sess->twcc = rtp_twcc_manager_new (sess->mtu);
733   sess->twcc_stats = rtp_twcc_stats_new ();
734 }
735
736 static void
737 rtp_session_finalize (GObject * object)
738 {
739   RTPSession *sess;
740   gint i;
741
742   sess = RTP_SESSION_CAST (object);
743
744   gst_structure_free (sess->sdes);
745
746   g_list_free_full (sess->conflicting_addresses,
747       (GDestroyNotify) rtp_conflicting_address_free);
748
749   /* TODO: Change this again when implementing RFC 2762
750    * for (i = 0; i < 32; i++)
751    */
752   for (i = 0; i < 1; i++)
753     g_hash_table_destroy (sess->ssrcs[i]);
754
755   g_object_unref (sess->twcc);
756   rtp_twcc_stats_free (sess->twcc_stats);
757
758   g_mutex_clear (&sess->lock);
759
760   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
761 }
762
763 static void
764 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
765 {
766   GValue value = { 0 };
767
768   g_value_init (&value, RTP_TYPE_SOURCE);
769   g_value_take_object (&value, source);
770   /* copies the value */
771   g_value_array_append (arr, &value);
772 }
773
774 static GValueArray *
775 rtp_session_create_sources (RTPSession * sess)
776 {
777   GValueArray *res;
778   guint size;
779
780   RTP_SESSION_LOCK (sess);
781   /* get number of elements in the table */
782   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
783   /* create the result value array */
784   res = g_value_array_new (size);
785
786   /* and copy all values into the array */
787   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
788   RTP_SESSION_UNLOCK (sess);
789
790   return res;
791 }
792
793 static void
794 create_source_stats (gpointer key, RTPSource * source, GValueArray * arr)
795 {
796   GValue *value;
797   GstStructure *s;
798
799   g_object_get (source, "stats", &s, NULL);
800
801   g_value_array_append (arr, NULL);
802   value = g_value_array_get_nth (arr, arr->n_values - 1);
803   g_value_init (value, GST_TYPE_STRUCTURE);
804   g_value_take_boxed (value, s);
805 }
806
807 static GstStructure *
808 rtp_session_create_stats (RTPSession * sess)
809 {
810   GstStructure *s;
811   GValueArray *source_stats;
812   GValue source_stats_v = G_VALUE_INIT;
813   guint size;
814
815   RTP_SESSION_LOCK (sess);
816   s = gst_structure_new ("application/x-rtp-session-stats",
817       "rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped,
818       "sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent,
819       "recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL);
820
821   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
822   source_stats = g_value_array_new (size);
823   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
824       (GHFunc) create_source_stats, source_stats);
825   RTP_SESSION_UNLOCK (sess);
826
827   g_value_init (&source_stats_v, G_TYPE_VALUE_ARRAY);
828   g_value_take_boxed (&source_stats_v, source_stats);
829   gst_structure_take_value (s, "source-stats", &source_stats_v);
830
831   return s;
832 }
833
834 static void
835 rtp_session_set_property (GObject * object, guint prop_id,
836     const GValue * value, GParamSpec * pspec)
837 {
838   RTPSession *sess;
839
840   sess = RTP_SESSION (object);
841
842   switch (prop_id) {
843     case PROP_INTERNAL_SSRC:
844       RTP_SESSION_LOCK (sess);
845       sess->suggested_ssrc = g_value_get_uint (value);
846       sess->internal_ssrc_set = TRUE;
847       sess->internal_ssrc_from_caps_or_property = TRUE;
848       RTP_SESSION_UNLOCK (sess);
849       if (sess->callbacks.reconfigure)
850         sess->callbacks.reconfigure (sess, sess->reconfigure_user_data);
851       break;
852     case PROP_BANDWIDTH:
853       RTP_SESSION_LOCK (sess);
854       sess->bandwidth = g_value_get_double (value);
855       sess->recalc_bandwidth = TRUE;
856       RTP_SESSION_UNLOCK (sess);
857       break;
858     case PROP_RTCP_FRACTION:
859       RTP_SESSION_LOCK (sess);
860       sess->rtcp_bandwidth = g_value_get_double (value);
861       sess->recalc_bandwidth = TRUE;
862       RTP_SESSION_UNLOCK (sess);
863       break;
864     case PROP_RTCP_RR_BANDWIDTH:
865       RTP_SESSION_LOCK (sess);
866       sess->rtcp_rr_bandwidth = g_value_get_int (value);
867       sess->recalc_bandwidth = TRUE;
868       RTP_SESSION_UNLOCK (sess);
869       break;
870     case PROP_RTCP_RS_BANDWIDTH:
871       RTP_SESSION_LOCK (sess);
872       sess->rtcp_rs_bandwidth = g_value_get_int (value);
873       sess->recalc_bandwidth = TRUE;
874       RTP_SESSION_UNLOCK (sess);
875       break;
876     case PROP_RTCP_MTU:
877       sess->mtu = g_value_get_uint (value);
878       rtp_twcc_manager_set_mtu (sess->twcc, sess->mtu);
879       break;
880     case PROP_SDES:
881       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
882       break;
883     case PROP_FAVOR_NEW:
884       sess->favor_new = g_value_get_boolean (value);
885       break;
886     case PROP_RTCP_MIN_INTERVAL:
887       rtp_stats_set_min_interval (&sess->stats,
888           (gdouble) g_value_get_uint64 (value) / GST_SECOND);
889       /* trigger reconsideration */
890       RTP_SESSION_LOCK (sess);
891       sess->next_rtcp_check_time = 0;
892       RTP_SESSION_UNLOCK (sess);
893       if (sess->callbacks.reconsider)
894         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
895       break;
896     case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
897       sess->rtcp_feedback_retention_window = g_value_get_uint64 (value);
898       break;
899     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
900       sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
901       break;
902     case PROP_PROBATION:
903       sess->probation = g_value_get_uint (value);
904       break;
905     case PROP_MAX_DROPOUT_TIME:
906       sess->max_dropout_time = g_value_get_uint (value);
907       break;
908     case PROP_MAX_MISORDER_TIME:
909       sess->max_misorder_time = g_value_get_uint (value);
910       break;
911     case PROP_RTP_PROFILE:
912       sess->rtp_profile = g_value_get_enum (value);
913       /* trigger reconsideration */
914       RTP_SESSION_LOCK (sess);
915       sess->next_rtcp_check_time = 0;
916       RTP_SESSION_UNLOCK (sess);
917       if (sess->callbacks.reconsider)
918         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
919       break;
920     case PROP_RTCP_REDUCED_SIZE:
921       sess->reduced_size_rtcp = g_value_get_boolean (value);
922       break;
923     case PROP_RTCP_DISABLE_SR_TIMESTAMP:
924       sess->timestamp_sender_reports = !g_value_get_boolean (value);
925       break;
926     case PROP_TWCC_FEEDBACK_INTERVAL:
927       rtp_twcc_manager_set_feedback_interval (sess->twcc,
928           g_value_get_uint64 (value));
929       break;
930     default:
931       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
932       break;
933   }
934 }
935
936 static void
937 rtp_session_get_property (GObject * object, guint prop_id,
938     GValue * value, GParamSpec * pspec)
939 {
940   RTPSession *sess;
941
942   sess = RTP_SESSION (object);
943
944   switch (prop_id) {
945     case PROP_INTERNAL_SSRC:
946       g_value_set_uint (value, rtp_session_suggest_ssrc (sess, NULL));
947       break;
948     case PROP_INTERNAL_SOURCE:
949       /* FIXME, return a random source */
950       g_value_set_object (value, NULL);
951       break;
952     case PROP_BANDWIDTH:
953       g_value_set_double (value, sess->bandwidth);
954       break;
955     case PROP_RTCP_FRACTION:
956       g_value_set_double (value, sess->rtcp_bandwidth);
957       break;
958     case PROP_RTCP_RR_BANDWIDTH:
959       g_value_set_int (value, sess->rtcp_rr_bandwidth);
960       break;
961     case PROP_RTCP_RS_BANDWIDTH:
962       g_value_set_int (value, sess->rtcp_rs_bandwidth);
963       break;
964     case PROP_RTCP_MTU:
965       g_value_set_uint (value, sess->mtu);
966       break;
967     case PROP_SDES:
968       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
969       break;
970     case PROP_NUM_SOURCES:
971       g_value_set_uint (value, rtp_session_get_num_sources (sess));
972       break;
973     case PROP_NUM_ACTIVE_SOURCES:
974       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
975       break;
976     case PROP_SOURCES:
977       g_value_take_boxed (value, rtp_session_create_sources (sess));
978       break;
979     case PROP_FAVOR_NEW:
980       g_value_set_boolean (value, sess->favor_new);
981       break;
982     case PROP_RTCP_MIN_INTERVAL:
983       g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
984       break;
985     case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
986       g_value_set_uint64 (value, sess->rtcp_feedback_retention_window);
987       break;
988     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
989       g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
990       break;
991     case PROP_PROBATION:
992       g_value_set_uint (value, sess->probation);
993       break;
994     case PROP_MAX_DROPOUT_TIME:
995       g_value_set_uint (value, sess->max_dropout_time);
996       break;
997     case PROP_MAX_MISORDER_TIME:
998       g_value_set_uint (value, sess->max_misorder_time);
999       break;
1000     case PROP_STATS:
1001       g_value_take_boxed (value, rtp_session_create_stats (sess));
1002       break;
1003     case PROP_RTP_PROFILE:
1004       g_value_set_enum (value, sess->rtp_profile);
1005       break;
1006     case PROP_RTCP_REDUCED_SIZE:
1007       g_value_set_boolean (value, sess->reduced_size_rtcp);
1008       break;
1009     case PROP_RTCP_DISABLE_SR_TIMESTAMP:
1010       g_value_set_boolean (value, !sess->timestamp_sender_reports);
1011       break;
1012     case PROP_TWCC_FEEDBACK_INTERVAL:
1013       g_value_set_uint64 (value,
1014           rtp_twcc_manager_get_feedback_interval (sess->twcc));
1015       break;
1016     default:
1017       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1018       break;
1019   }
1020 }
1021
1022 static void
1023 on_new_ssrc (RTPSession * sess, RTPSource * source)
1024 {
1025   g_object_ref (source);
1026   RTP_SESSION_UNLOCK (sess);
1027   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
1028   RTP_SESSION_LOCK (sess);
1029   g_object_unref (source);
1030 }
1031
1032 static void
1033 on_ssrc_collision (RTPSession * sess, RTPSource * source)
1034 {
1035   g_object_ref (source);
1036   RTP_SESSION_UNLOCK (sess);
1037   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
1038       source);
1039   RTP_SESSION_LOCK (sess);
1040   g_object_unref (source);
1041 }
1042
1043 static void
1044 on_ssrc_validated (RTPSession * sess, RTPSource * source)
1045 {
1046   g_object_ref (source);
1047   RTP_SESSION_UNLOCK (sess);
1048   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
1049       source);
1050   RTP_SESSION_LOCK (sess);
1051   g_object_unref (source);
1052 }
1053
1054 static void
1055 on_ssrc_active (RTPSession * sess, RTPSource * source)
1056 {
1057   g_object_ref (source);
1058   RTP_SESSION_UNLOCK (sess);
1059   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
1060   RTP_SESSION_LOCK (sess);
1061   g_object_unref (source);
1062 }
1063
1064 static void
1065 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
1066 {
1067   g_object_ref (source);
1068   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
1069   RTP_SESSION_UNLOCK (sess);
1070   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
1071   RTP_SESSION_LOCK (sess);
1072   g_object_unref (source);
1073 }
1074
1075 static void
1076 on_bye_ssrc (RTPSession * sess, RTPSource * source)
1077 {
1078   g_object_ref (source);
1079   RTP_SESSION_UNLOCK (sess);
1080   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
1081   RTP_SESSION_LOCK (sess);
1082   g_object_unref (source);
1083 }
1084
1085 static void
1086 on_bye_timeout (RTPSession * sess, RTPSource * source)
1087 {
1088   g_object_ref (source);
1089   RTP_SESSION_UNLOCK (sess);
1090   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
1091   RTP_SESSION_LOCK (sess);
1092   g_object_unref (source);
1093 }
1094
1095 static void
1096 on_timeout (RTPSession * sess, RTPSource * source)
1097 {
1098   g_object_ref (source);
1099   RTP_SESSION_UNLOCK (sess);
1100   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
1101   RTP_SESSION_LOCK (sess);
1102   g_object_unref (source);
1103 }
1104
1105 static void
1106 on_sender_timeout (RTPSession * sess, RTPSource * source)
1107 {
1108   g_object_ref (source);
1109   RTP_SESSION_UNLOCK (sess);
1110   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
1111       source);
1112   RTP_SESSION_LOCK (sess);
1113   g_object_unref (source);
1114 }
1115
1116 static void
1117 on_new_sender_ssrc (RTPSession * sess, RTPSource * source)
1118 {
1119   g_object_ref (source);
1120   RTP_SESSION_UNLOCK (sess);
1121   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
1122       source);
1123   RTP_SESSION_LOCK (sess);
1124   g_object_unref (source);
1125 }
1126
1127 static void
1128 on_sender_ssrc_active (RTPSession * sess, RTPSource * source)
1129 {
1130   g_object_ref (source);
1131   RTP_SESSION_UNLOCK (sess);
1132   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE], 0,
1133       source);
1134   RTP_SESSION_LOCK (sess);
1135   g_object_unref (source);
1136 }
1137
1138 /**
1139  * rtp_session_new:
1140  *
1141  * Create a new session object.
1142  *
1143  * Returns: a new #RTPSession. g_object_unref() after usage.
1144  */
1145 RTPSession *
1146 rtp_session_new (void)
1147 {
1148   RTPSession *sess;
1149
1150   sess = g_object_new (RTP_TYPE_SESSION, NULL);
1151
1152   return sess;
1153 }
1154
1155 /**
1156  * rtp_session_reset:
1157  * @sess: an #RTPSession
1158  *
1159  * Reset the sources of @sess.
1160  */
1161 void
1162 rtp_session_reset (RTPSession * sess)
1163 {
1164   g_return_if_fail (RTP_IS_SESSION (sess));
1165
1166   /* remove all sources */
1167   g_hash_table_remove_all (sess->ssrcs[sess->mask_idx]);
1168   sess->total_sources = 0;
1169   sess->stats.sender_sources = 0;
1170   sess->stats.internal_sender_sources = 0;
1171   sess->stats.internal_sources = 0;
1172   sess->stats.active_sources = 0;
1173
1174   sess->generation = 0;
1175   sess->first_rtcp = TRUE;
1176   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
1177   sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE;
1178   sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
1179   sess->last_rtcp_interval = GST_CLOCK_TIME_NONE;
1180   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
1181   sess->scheduled_bye = FALSE;
1182
1183   /* reset session stats */
1184   sess->stats.bye_members = 0;
1185   sess->stats.nacks_dropped = 0;
1186   sess->stats.nacks_sent = 0;
1187   sess->stats.nacks_received = 0;
1188
1189   sess->is_doing_ptp = TRUE;
1190
1191   g_list_free_full (sess->conflicting_addresses,
1192       (GDestroyNotify) rtp_conflicting_address_free);
1193   sess->conflicting_addresses = NULL;
1194 }
1195
1196 /**
1197  * rtp_session_set_callbacks:
1198  * @sess: an #RTPSession
1199  * @callbacks: callbacks to configure
1200  * @user_data: user data passed in the callbacks
1201  *
1202  * Configure a set of callbacks to be notified of actions.
1203  */
1204 void
1205 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
1206     gpointer user_data)
1207 {
1208   g_return_if_fail (RTP_IS_SESSION (sess));
1209
1210   if (callbacks->process_rtp) {
1211     sess->callbacks.process_rtp = callbacks->process_rtp;
1212     sess->process_rtp_user_data = user_data;
1213   }
1214   if (callbacks->send_rtp) {
1215     sess->callbacks.send_rtp = callbacks->send_rtp;
1216     sess->send_rtp_user_data = user_data;
1217   }
1218   if (callbacks->send_rtcp) {
1219     sess->callbacks.send_rtcp = callbacks->send_rtcp;
1220     sess->send_rtcp_user_data = user_data;
1221   }
1222   if (callbacks->sync_rtcp) {
1223     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
1224     sess->sync_rtcp_user_data = user_data;
1225   }
1226   if (callbacks->clock_rate) {
1227     sess->callbacks.clock_rate = callbacks->clock_rate;
1228     sess->clock_rate_user_data = user_data;
1229   }
1230   if (callbacks->reconsider) {
1231     sess->callbacks.reconsider = callbacks->reconsider;
1232     sess->reconsider_user_data = user_data;
1233   }
1234   if (callbacks->request_key_unit) {
1235     sess->callbacks.request_key_unit = callbacks->request_key_unit;
1236     sess->request_key_unit_user_data = user_data;
1237   }
1238   if (callbacks->request_time) {
1239     sess->callbacks.request_time = callbacks->request_time;
1240     sess->request_time_user_data = user_data;
1241   }
1242   if (callbacks->notify_nack) {
1243     sess->callbacks.notify_nack = callbacks->notify_nack;
1244     sess->notify_nack_user_data = user_data;
1245   }
1246   if (callbacks->notify_twcc) {
1247     sess->callbacks.notify_twcc = callbacks->notify_twcc;
1248     sess->notify_twcc_user_data = user_data;
1249   }
1250   if (callbacks->reconfigure) {
1251     sess->callbacks.reconfigure = callbacks->reconfigure;
1252     sess->reconfigure_user_data = user_data;
1253   }
1254   if (callbacks->notify_early_rtcp) {
1255     sess->callbacks.notify_early_rtcp = callbacks->notify_early_rtcp;
1256     sess->notify_early_rtcp_user_data = user_data;
1257   }
1258 }
1259
1260 /**
1261  * rtp_session_set_process_rtp_callback:
1262  * @sess: an #RTPSession
1263  * @callback: callback to set
1264  * @user_data: user data passed in the callback
1265  *
1266  * Configure only the process_rtp callback to be notified of the process_rtp action.
1267  */
1268 void
1269 rtp_session_set_process_rtp_callback (RTPSession * sess,
1270     RTPSessionProcessRTP callback, gpointer user_data)
1271 {
1272   g_return_if_fail (RTP_IS_SESSION (sess));
1273
1274   sess->callbacks.process_rtp = callback;
1275   sess->process_rtp_user_data = user_data;
1276 }
1277
1278 /**
1279  * rtp_session_set_send_rtp_callback:
1280  * @sess: an #RTPSession
1281  * @callback: callback to set
1282  * @user_data: user data passed in the callback
1283  *
1284  * Configure only the send_rtp callback to be notified of the send_rtp action.
1285  */
1286 void
1287 rtp_session_set_send_rtp_callback (RTPSession * sess,
1288     RTPSessionSendRTP callback, gpointer user_data)
1289 {
1290   g_return_if_fail (RTP_IS_SESSION (sess));
1291
1292   sess->callbacks.send_rtp = callback;
1293   sess->send_rtp_user_data = user_data;
1294 }
1295
1296 /**
1297  * rtp_session_set_send_rtcp_callback:
1298  * @sess: an #RTPSession
1299  * @callback: callback to set
1300  * @user_data: user data passed in the callback
1301  *
1302  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
1303  */
1304 void
1305 rtp_session_set_send_rtcp_callback (RTPSession * sess,
1306     RTPSessionSendRTCP callback, gpointer user_data)
1307 {
1308   g_return_if_fail (RTP_IS_SESSION (sess));
1309
1310   sess->callbacks.send_rtcp = callback;
1311   sess->send_rtcp_user_data = user_data;
1312 }
1313
1314 /**
1315  * rtp_session_set_sync_rtcp_callback:
1316  * @sess: an #RTPSession
1317  * @callback: callback to set
1318  * @user_data: user data passed in the callback
1319  *
1320  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
1321  */
1322 void
1323 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
1324     RTPSessionSyncRTCP callback, gpointer user_data)
1325 {
1326   g_return_if_fail (RTP_IS_SESSION (sess));
1327
1328   sess->callbacks.sync_rtcp = callback;
1329   sess->sync_rtcp_user_data = user_data;
1330 }
1331
1332 /**
1333  * rtp_session_set_clock_rate_callback:
1334  * @sess: an #RTPSession
1335  * @callback: callback to set
1336  * @user_data: user data passed in the callback
1337  *
1338  * Configure only the clock_rate callback to be notified of the clock_rate action.
1339  */
1340 void
1341 rtp_session_set_clock_rate_callback (RTPSession * sess,
1342     RTPSessionClockRate callback, gpointer user_data)
1343 {
1344   g_return_if_fail (RTP_IS_SESSION (sess));
1345
1346   sess->callbacks.clock_rate = callback;
1347   sess->clock_rate_user_data = user_data;
1348 }
1349
1350 /**
1351  * rtp_session_set_reconsider_callback:
1352  * @sess: an #RTPSession
1353  * @callback: callback to set
1354  * @user_data: user data passed in the callback
1355  *
1356  * Configure only the reconsider callback to be notified of the reconsider action.
1357  */
1358 void
1359 rtp_session_set_reconsider_callback (RTPSession * sess,
1360     RTPSessionReconsider callback, gpointer user_data)
1361 {
1362   g_return_if_fail (RTP_IS_SESSION (sess));
1363
1364   sess->callbacks.reconsider = callback;
1365   sess->reconsider_user_data = user_data;
1366 }
1367
1368 /**
1369  * rtp_session_set_request_time_callback:
1370  * @sess: an #RTPSession
1371  * @callback: callback to set
1372  * @user_data: user data passed in the callback
1373  *
1374  * Configure only the request_time callback
1375  */
1376 void
1377 rtp_session_set_request_time_callback (RTPSession * sess,
1378     RTPSessionRequestTime callback, gpointer user_data)
1379 {
1380   g_return_if_fail (RTP_IS_SESSION (sess));
1381
1382   sess->callbacks.request_time = callback;
1383   sess->request_time_user_data = user_data;
1384 }
1385
1386 /**
1387  * rtp_session_set_bandwidth:
1388  * @sess: an #RTPSession
1389  * @bandwidth: the bandwidth allocated
1390  *
1391  * Set the session bandwidth in bits per second.
1392  */
1393 void
1394 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
1395 {
1396   g_return_if_fail (RTP_IS_SESSION (sess));
1397
1398   RTP_SESSION_LOCK (sess);
1399   sess->stats.bandwidth = bandwidth;
1400   RTP_SESSION_UNLOCK (sess);
1401 }
1402
1403 /**
1404  * rtp_session_get_bandwidth:
1405  * @sess: an #RTPSession
1406  *
1407  * Get the session bandwidth.
1408  *
1409  * Returns: the session bandwidth.
1410  */
1411 gdouble
1412 rtp_session_get_bandwidth (RTPSession * sess)
1413 {
1414   gdouble result;
1415
1416   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1417
1418   RTP_SESSION_LOCK (sess);
1419   result = sess->stats.bandwidth;
1420   RTP_SESSION_UNLOCK (sess);
1421
1422   return result;
1423 }
1424
1425 /**
1426  * rtp_session_set_rtcp_fraction:
1427  * @sess: an #RTPSession
1428  * @bandwidth: the RTCP bandwidth
1429  *
1430  * Set the bandwidth in bits per second that should be used for RTCP
1431  * messages.
1432  */
1433 void
1434 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
1435 {
1436   g_return_if_fail (RTP_IS_SESSION (sess));
1437
1438   RTP_SESSION_LOCK (sess);
1439   sess->stats.rtcp_bandwidth = bandwidth;
1440   RTP_SESSION_UNLOCK (sess);
1441 }
1442
1443 /**
1444  * rtp_session_get_rtcp_fraction:
1445  * @sess: an #RTPSession
1446  *
1447  * Get the session bandwidth used for RTCP.
1448  *
1449  * Returns: The bandwidth used for RTCP messages.
1450  */
1451 gdouble
1452 rtp_session_get_rtcp_fraction (RTPSession * sess)
1453 {
1454   gdouble result;
1455
1456   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
1457
1458   RTP_SESSION_LOCK (sess);
1459   result = sess->stats.rtcp_bandwidth;
1460   RTP_SESSION_UNLOCK (sess);
1461
1462   return result;
1463 }
1464
1465 /**
1466  * rtp_session_get_sdes_struct:
1467  * @sess: an #RTSPSession
1468  *
1469  * Get the SDES data as a #GstStructure
1470  *
1471  * Returns: a GstStructure with SDES items for @sess. This function returns a
1472  * copy of the SDES structure, use gst_structure_free() after usage.
1473  */
1474 GstStructure *
1475 rtp_session_get_sdes_struct (RTPSession * sess)
1476 {
1477   GstStructure *result = NULL;
1478
1479   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1480
1481   RTP_SESSION_LOCK (sess);
1482   if (sess->sdes)
1483     result = gst_structure_copy (sess->sdes);
1484   RTP_SESSION_UNLOCK (sess);
1485
1486   return result;
1487 }
1488
1489 static void
1490 source_set_sdes (const gchar * key, RTPSource * source, GstStructure * sdes)
1491 {
1492   rtp_source_set_sdes_struct (source, gst_structure_copy (sdes));
1493 }
1494
1495 /**
1496  * rtp_session_set_sdes_struct:
1497  * @sess: an #RTSPSession
1498  * @sdes: a #GstStructure
1499  *
1500  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
1501  */
1502 void
1503 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
1504 {
1505   g_return_if_fail (sdes);
1506   g_return_if_fail (RTP_IS_SESSION (sess));
1507
1508   RTP_SESSION_LOCK (sess);
1509   if (sess->sdes)
1510     gst_structure_free (sess->sdes);
1511   sess->sdes = gst_structure_copy (sdes);
1512
1513   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1514       (GHFunc) source_set_sdes, sess->sdes);
1515   RTP_SESSION_UNLOCK (sess);
1516 }
1517
1518 static GstFlowReturn
1519 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
1520 {
1521   GstFlowReturn result = GST_FLOW_OK;
1522
1523   if (source->internal) {
1524     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
1525
1526     RTP_SESSION_UNLOCK (session);
1527
1528     if (session->callbacks.send_rtp)
1529       result =
1530           session->callbacks.send_rtp (session, source, data,
1531           session->send_rtp_user_data);
1532     else {
1533       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1534     }
1535   } else {
1536     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
1537     RTP_SESSION_UNLOCK (session);
1538
1539     if (session->callbacks.process_rtp)
1540       result =
1541           session->callbacks.process_rtp (session, source,
1542           GST_BUFFER_CAST (data), session->process_rtp_user_data);
1543     else
1544       gst_buffer_unref (GST_BUFFER_CAST (data));
1545   }
1546   RTP_SESSION_LOCK (session);
1547
1548   return result;
1549 }
1550
1551 static gint
1552 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
1553 {
1554   gint result;
1555
1556   RTP_SESSION_UNLOCK (session);
1557
1558   if (session->callbacks.clock_rate)
1559     result =
1560         session->callbacks.clock_rate (session, pt,
1561         session->clock_rate_user_data);
1562   else
1563     result = -1;
1564
1565   RTP_SESSION_LOCK (session);
1566
1567   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
1568
1569   return result;
1570 }
1571
1572 static RTPSourceCallbacks callbacks = {
1573   (RTPSourcePushRTP) source_push_rtp,
1574   (RTPSourceClockRate) source_clock_rate,
1575 };
1576
1577
1578 /**
1579  * rtp_session_find_conflicting_address:
1580  * @session: The session the packet came in
1581  * @address: address to check for
1582  * @time: The time when the packet that is possibly in conflict arrived
1583  *
1584  * Checks if an address which has a conflict is already known. If it is
1585  * a known conflict, remember the time
1586  *
1587  * Returns: TRUE if it was a known conflict, FALSE otherwise
1588  */
1589 static gboolean
1590 rtp_session_find_conflicting_address (RTPSession * session,
1591     GSocketAddress * address, GstClockTime time)
1592 {
1593   return find_conflicting_address (session->conflicting_addresses, address,
1594       time);
1595 }
1596
1597 /**
1598  * rtp_session_add_conflicting_address:
1599  * @session: The session the packet came in
1600  * @address: address to remember
1601  * @time: The time when the packet that is in conflict arrived
1602  *
1603  * Adds a new conflict address
1604  */
1605 static void
1606 rtp_session_add_conflicting_address (RTPSession * sess,
1607     GSocketAddress * address, GstClockTime time)
1608 {
1609   sess->conflicting_addresses =
1610       add_conflicting_address (sess->conflicting_addresses, address, time);
1611 }
1612
1613 static void
1614 rtp_session_have_conflict (RTPSession * sess, RTPSource * source,
1615     GSocketAddress * address, GstClockTime current_time)
1616 {
1617   guint32 ssrc = rtp_source_get_ssrc (source);
1618
1619   /* Its a new collision, lets change our SSRC */
1620   rtp_session_add_conflicting_address (sess, address, current_time);
1621
1622   /* mark the source BYE */
1623   rtp_source_mark_bye (source, "SSRC Collision");
1624   /* if we were suggesting this SSRC, change to something else */
1625   if (sess->suggested_ssrc == ssrc) {
1626     sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
1627     sess->internal_ssrc_set = TRUE;
1628   }
1629
1630   on_ssrc_collision (sess, source);
1631
1632   rtp_session_schedule_bye_locked (sess, current_time);
1633 }
1634
1635 static gboolean
1636 check_collision (RTPSession * sess, RTPSource * source,
1637     RTPPacketInfo * pinfo, gboolean rtp)
1638 {
1639   guint32 ssrc;
1640
1641   /* If we have no pinfo address, we can't do collision checking */
1642   if (!pinfo->address)
1643     return FALSE;
1644
1645   ssrc = rtp_source_get_ssrc (source);
1646
1647   if (!source->internal) {
1648     GSocketAddress *from;
1649
1650     /* This is not our local source, but lets check if two remote
1651      * source collide */
1652     if (rtp) {
1653       from = source->rtp_from;
1654     } else {
1655       from = source->rtcp_from;
1656     }
1657
1658     if (from) {
1659       if (__g_socket_address_equal (from, pinfo->address)) {
1660         /* Address is the same */
1661         return FALSE;
1662       } else {
1663         GST_LOG ("we have a third-party collision or loop ssrc:%x", ssrc);
1664         if (sess->favor_new) {
1665           if (rtp_source_find_conflicting_address (source,
1666                   pinfo->address, pinfo->current_time)) {
1667             gchar *buf1;
1668
1669             buf1 = __g_socket_address_to_string (pinfo->address);
1670             GST_LOG ("Known conflict on %x for %s, dropping packet", ssrc,
1671                 buf1);
1672             g_free (buf1);
1673
1674             return TRUE;
1675           } else {
1676             gchar *buf1, *buf2;
1677
1678             /* Current address is not a known conflict, lets assume this is
1679              * a new source. Save old address in possible conflict list
1680              */
1681             rtp_source_add_conflicting_address (source, from,
1682                 pinfo->current_time);
1683
1684             buf1 = __g_socket_address_to_string (from);
1685             buf2 = __g_socket_address_to_string (pinfo->address);
1686
1687             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1688                 " saving old as known conflict", ssrc, buf1, buf2);
1689
1690             if (rtp)
1691               rtp_source_set_rtp_from (source, pinfo->address);
1692             else
1693               rtp_source_set_rtcp_from (source, pinfo->address);
1694
1695             g_free (buf1);
1696             g_free (buf2);
1697
1698             return FALSE;
1699           }
1700         } else {
1701           /* Don't need to save old addresses, we ignore new sources */
1702           return TRUE;
1703         }
1704       }
1705     } else {
1706       /* We don't already have a from address for RTP, just set it */
1707       if (rtp)
1708         rtp_source_set_rtp_from (source, pinfo->address);
1709       else
1710         rtp_source_set_rtcp_from (source, pinfo->address);
1711       return FALSE;
1712     }
1713
1714     /* FIXME: Log 3rd party collision somehow
1715      * Maybe should be done in upper layer, only the SDES can tell us
1716      * if its a collision or a loop
1717      */
1718   } else {
1719     /* This is sending with our ssrc, is it an address we already know */
1720     if (rtp_session_find_conflicting_address (sess, pinfo->address,
1721             pinfo->current_time)) {
1722       /* Its a known conflict, its probably a loop, not a collision
1723        * lets just drop the incoming packet
1724        */
1725       GST_DEBUG ("Our packets are being looped back to us, dropping");
1726     } else {
1727       GST_DEBUG ("Collision for SSRC %x from new incoming packet,"
1728           " change our sender ssrc", ssrc);
1729
1730       rtp_session_have_conflict (sess, source, pinfo->address,
1731           pinfo->current_time);
1732     }
1733   }
1734
1735   return TRUE;
1736 }
1737
1738 typedef struct
1739 {
1740   gboolean is_doing_ptp;
1741   GSocketAddress *new_addr;
1742 } CompareAddrData;
1743
1744 /* check if the two given ip addr are the same (do not care about the port) */
1745 static gboolean
1746 ip_addr_equal (GSocketAddress * a, GSocketAddress * b)
1747 {
1748   return
1749       g_inet_address_equal (g_inet_socket_address_get_address
1750       (G_INET_SOCKET_ADDRESS (a)),
1751       g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (b)));
1752 }
1753
1754 static void
1755 compare_rtp_source_addr (const gchar * key, RTPSource * source,
1756     CompareAddrData * data)
1757 {
1758   /* only compare ip addr of remote sources which are also not closing */
1759   if (!source->internal && !source->closing && source->rtp_from) {
1760     /* look for the first rtp source */
1761     if (!data->new_addr)
1762       data->new_addr = source->rtp_from;
1763     /* compare current ip addr with the first one */
1764     else
1765       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtp_from);
1766   }
1767 }
1768
1769 static void
1770 compare_rtcp_source_addr (const gchar * key, RTPSource * source,
1771     CompareAddrData * data)
1772 {
1773   /* only compare ip addr of remote sources which are also not closing */
1774   if (!source->internal && !source->closing && source->rtcp_from) {
1775     /* look for the first rtcp source */
1776     if (!data->new_addr)
1777       data->new_addr = source->rtcp_from;
1778     else
1779       /* compare current ip addr with the first one */
1780       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtcp_from);
1781   }
1782 }
1783
1784 /* loop over our non-internal source to know if the session
1785  * is doing point-to-point */
1786 static void
1787 session_update_ptp (RTPSession * sess)
1788 {
1789   /* to know if the session is doing point to point, the ip addr
1790    * of each non-internal (=remotes) source have to be compared
1791    * to each other.
1792    */
1793   gboolean is_doing_rtp_ptp;
1794   gboolean is_doing_rtcp_ptp;
1795   CompareAddrData data;
1796
1797   /* compare the first remote source's ip addr that receive rtp packets
1798    * with other remote rtp source.
1799    * it's enough because the session just needs to know if they are all
1800    * equals or not
1801    */
1802   data.is_doing_ptp = TRUE;
1803   data.new_addr = NULL;
1804   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1805       (GHFunc) compare_rtp_source_addr, (gpointer) & data);
1806   is_doing_rtp_ptp = data.is_doing_ptp;
1807
1808   /* same but about rtcp */
1809   data.is_doing_ptp = TRUE;
1810   data.new_addr = NULL;
1811   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1812       (GHFunc) compare_rtcp_source_addr, (gpointer) & data);
1813   is_doing_rtcp_ptp = data.is_doing_ptp;
1814
1815   /* the session is doing point-to-point if all rtp remote have the same
1816    * ip addr and if all rtcp remote sources have the same ip addr */
1817   sess->is_doing_ptp = is_doing_rtp_ptp && is_doing_rtcp_ptp;
1818
1819   GST_DEBUG ("doing point-to-point: %d", sess->is_doing_ptp);
1820 }
1821
1822 static void
1823 add_source (RTPSession * sess, RTPSource * src)
1824 {
1825   g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1826       GINT_TO_POINTER (src->ssrc), src);
1827   /* report the new source ASAP */
1828   src->generation = sess->generation;
1829   /* we have one more source now */
1830   sess->total_sources++;
1831   if (RTP_SOURCE_IS_ACTIVE (src))
1832     sess->stats.active_sources++;
1833   if (src->internal) {
1834     sess->stats.internal_sources++;
1835     if (!sess->internal_ssrc_from_caps_or_property
1836         && sess->suggested_ssrc != src->ssrc) {
1837       sess->suggested_ssrc = src->ssrc;
1838       sess->internal_ssrc_set = TRUE;
1839     }
1840   }
1841
1842   /* update point-to-point status */
1843   if (!src->internal)
1844     session_update_ptp (sess);
1845 }
1846
1847 static RTPSource *
1848 find_source (RTPSession * sess, guint32 ssrc)
1849 {
1850   return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1851       GINT_TO_POINTER (ssrc));
1852 }
1853
1854 /* must be called with the session lock, the returned source needs to be
1855  * unreffed after usage. */
1856 static RTPSource *
1857 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1858     RTPPacketInfo * pinfo, gboolean rtp)
1859 {
1860   RTPSource *source;
1861
1862   source = find_source (sess, ssrc);
1863   if (source == NULL) {
1864     /* make new Source in probation and insert */
1865     source = rtp_source_new (ssrc);
1866
1867     GST_DEBUG ("creating new source %08x %p", ssrc, source);
1868
1869     /* for RTP packets we need to set the source in probation. Receiving RTCP
1870      * packets of an SSRC, on the other hand, is a strong indication that we
1871      * are dealing with a valid source. */
1872     g_object_set (source, "probation", rtp ? sess->probation : 0,
1873         "max-dropout-time", sess->max_dropout_time, "max-misorder-time",
1874         sess->max_misorder_time, NULL);
1875
1876     /* store from address, if any */
1877     if (pinfo->address) {
1878       if (rtp)
1879         rtp_source_set_rtp_from (source, pinfo->address);
1880       else
1881         rtp_source_set_rtcp_from (source, pinfo->address);
1882     }
1883
1884     /* configure a callback on the source */
1885     rtp_source_set_callbacks (source, &callbacks, sess);
1886
1887     add_source (sess, source);
1888     *created = TRUE;
1889   } else {
1890     *created = FALSE;
1891     /* check for collision, this updates the address when not previously set */
1892     if (check_collision (sess, source, pinfo, rtp)) {
1893       return NULL;
1894     }
1895     /* Receiving RTCP packets of an SSRC is a strong indication that we
1896      * are dealing with a valid source. */
1897     if (!rtp)
1898       g_object_set (source, "probation", 0, NULL);
1899   }
1900   /* update last activity */
1901   source->last_activity = pinfo->current_time;
1902   if (rtp)
1903     source->last_rtp_activity = pinfo->current_time;
1904   g_object_ref (source);
1905
1906   return source;
1907 }
1908
1909 /* must be called with the session lock, the returned source needs to be
1910  * unreffed after usage. */
1911 static RTPSource *
1912 obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1913     GstClockTime current_time)
1914 {
1915   RTPSource *source;
1916
1917   source = find_source (sess, ssrc);
1918   if (source == NULL) {
1919     /* make new internal Source and insert */
1920     source = rtp_source_new (ssrc);
1921
1922     GST_DEBUG ("creating new internal source %08x %p", ssrc, source);
1923
1924     source->validated = TRUE;
1925     source->internal = TRUE;
1926     source->probation = FALSE;
1927     rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes));
1928     rtp_source_set_callbacks (source, &callbacks, sess);
1929
1930     add_source (sess, source);
1931     *created = TRUE;
1932   } else {
1933     *created = FALSE;
1934   }
1935   /* update last activity */
1936   if (current_time != GST_CLOCK_TIME_NONE) {
1937     source->last_activity = current_time;
1938     source->last_rtp_activity = current_time;
1939   }
1940   g_object_ref (source);
1941
1942   return source;
1943 }
1944
1945 /**
1946  * rtp_session_suggest_ssrc:
1947  * @sess: a #RTPSession
1948  * @is_random: if the suggested ssrc is random
1949  *
1950  * Suggest an unused SSRC in @sess.
1951  *
1952  * Returns: a free unused SSRC
1953  */
1954 guint32
1955 rtp_session_suggest_ssrc (RTPSession * sess, gboolean * is_random)
1956 {
1957   guint32 result;
1958
1959   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1960
1961   RTP_SESSION_LOCK (sess);
1962   result = sess->suggested_ssrc;
1963   if (is_random)
1964     *is_random = !sess->internal_ssrc_set;
1965   RTP_SESSION_UNLOCK (sess);
1966
1967   return result;
1968 }
1969
1970 /**
1971  * rtp_session_add_source:
1972  * @sess: a #RTPSession
1973  * @src: #RTPSource to add
1974  *
1975  * Add @src to @session.
1976  *
1977  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1978  * existed in the session.
1979  */
1980 gboolean
1981 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1982 {
1983   gboolean result = FALSE;
1984   RTPSource *find;
1985
1986   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1987   g_return_val_if_fail (src != NULL, FALSE);
1988
1989   RTP_SESSION_LOCK (sess);
1990   find = find_source (sess, src->ssrc);
1991   if (find == NULL) {
1992     add_source (sess, src);
1993     result = TRUE;
1994   }
1995   RTP_SESSION_UNLOCK (sess);
1996
1997   return result;
1998 }
1999
2000 /**
2001  * rtp_session_get_num_sources:
2002  * @sess: an #RTPSession
2003  *
2004  * Get the number of sources in @sess.
2005  *
2006  * Returns: The number of sources in @sess.
2007  */
2008 guint
2009 rtp_session_get_num_sources (RTPSession * sess)
2010 {
2011   guint result;
2012
2013   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
2014
2015   RTP_SESSION_LOCK (sess);
2016   result = sess->total_sources;
2017   RTP_SESSION_UNLOCK (sess);
2018
2019   return result;
2020 }
2021
2022 /**
2023  * rtp_session_get_num_active_sources:
2024  * @sess: an #RTPSession
2025  *
2026  * Get the number of active sources in @sess. A source is considered active when
2027  * it has been validated and has not yet received a BYE RTCP message.
2028  *
2029  * Returns: The number of active sources in @sess.
2030  */
2031 guint
2032 rtp_session_get_num_active_sources (RTPSession * sess)
2033 {
2034   guint result;
2035
2036   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
2037
2038   RTP_SESSION_LOCK (sess);
2039   result = sess->stats.active_sources;
2040   RTP_SESSION_UNLOCK (sess);
2041
2042   return result;
2043 }
2044
2045 /**
2046  * rtp_session_get_source_by_ssrc:
2047  * @sess: an #RTPSession
2048  * @ssrc: an SSRC
2049  *
2050  * Find the source with @ssrc in @sess.
2051  *
2052  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
2053  * g_object_unref() after usage.
2054  */
2055 RTPSource *
2056 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
2057 {
2058   RTPSource *result;
2059
2060   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
2061
2062   RTP_SESSION_LOCK (sess);
2063   result = find_source (sess, ssrc);
2064   if (result != NULL)
2065     g_object_ref (result);
2066   RTP_SESSION_UNLOCK (sess);
2067
2068   return result;
2069 }
2070
2071 /* should be called with the SESSION lock */
2072 static guint32
2073 rtp_session_create_new_ssrc (RTPSession * sess)
2074 {
2075   guint32 ssrc;
2076
2077   while (TRUE) {
2078     ssrc = g_random_int ();
2079
2080     /* see if it exists in the session, we're done if it doesn't */
2081     if (find_source (sess, ssrc) == NULL)
2082       break;
2083   }
2084   return ssrc;
2085 }
2086
2087 static gboolean
2088 update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
2089 {
2090   GstNetAddressMeta *meta;
2091
2092   /* get packet size including header overhead */
2093   pinfo->bytes += gst_buffer_get_size (*buffer) + pinfo->header_len;
2094   pinfo->packets++;
2095
2096   if (pinfo->rtp) {
2097     GstRTPBuffer rtp = { NULL };
2098
2099     if (!gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtp))
2100       goto invalid_packet;
2101
2102     pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtp);
2103     if (idx == 0) {
2104       gint i;
2105
2106       /* only keep info for first buffer */
2107       pinfo->ssrc = gst_rtp_buffer_get_ssrc (&rtp);
2108       pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
2109       pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
2110       pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2111       pinfo->marker = gst_rtp_buffer_get_marker (&rtp);
2112       /* copy available csrc */
2113       pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
2114       for (i = 0; i < pinfo->csrc_count; i++)
2115         pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
2116
2117       /* RTP header extensions */
2118       pinfo->header_ext = gst_rtp_buffer_get_extension_bytes (&rtp,
2119           &pinfo->header_ext_bit_pattern);
2120     }
2121
2122     if (pinfo->ntp64_ext_id != 0 && pinfo->send && !pinfo->have_ntp64_ext) {
2123       guint8 *data;
2124       guint size;
2125
2126       /* Remember here that there is a 64-bit NTP header extension on this buffer
2127        * or any of the other buffers in the buffer list.
2128        * Later we update this after making the buffer(list) writable.
2129        */
2130       if ((gst_rtp_buffer_get_extension_onebyte_header (&rtp,
2131                   pinfo->ntp64_ext_id, 0, (gpointer *) & data, &size)
2132               && size == 8)
2133           || (gst_rtp_buffer_get_extension_twobytes_header (&rtp, NULL,
2134                   pinfo->ntp64_ext_id, 0, (gpointer *) & data, &size)
2135               && size == 8)) {
2136         pinfo->have_ntp64_ext = TRUE;
2137       }
2138     }
2139
2140     gst_rtp_buffer_unmap (&rtp);
2141   }
2142
2143   if (idx == 0) {
2144     /* for netbuffer we can store the IP address to check for collisions */
2145     meta = gst_buffer_get_net_address_meta (*buffer);
2146     if (pinfo->address)
2147       g_object_unref (pinfo->address);
2148     if (meta) {
2149       pinfo->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
2150     } else {
2151       pinfo->address = NULL;
2152     }
2153   }
2154   return TRUE;
2155
2156   /* ERRORS */
2157 invalid_packet:
2158   {
2159     GST_DEBUG ("invalid RTP packet received");
2160     return FALSE;
2161   }
2162 }
2163
2164 /* update the RTPPacketInfo structure with the current time and other bits
2165  * about the current buffer we are handling.
2166  * This function is typically called when a validated packet is received.
2167  * This function should be called with the RTP_SESSION_LOCK
2168  */
2169 static gboolean
2170 update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
2171     gboolean send, gboolean rtp, gboolean is_list, gpointer data,
2172     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
2173 {
2174   gboolean res;
2175
2176   pinfo->send = send;
2177   pinfo->rtp = rtp;
2178   pinfo->is_list = is_list;
2179   pinfo->data = data;
2180   pinfo->current_time = current_time;
2181   pinfo->running_time = running_time;
2182   pinfo->ntpnstime = ntpnstime;
2183   pinfo->header_len = sess->header_len;
2184   pinfo->bytes = 0;
2185   pinfo->payload_len = 0;
2186   pinfo->packets = 0;
2187   pinfo->marker = FALSE;
2188   pinfo->ntp64_ext_id = send ? sess->send_ntp64_ext_id : 0;
2189   pinfo->have_ntp64_ext = FALSE;
2190
2191   if (is_list) {
2192     GstBufferList *list = GST_BUFFER_LIST_CAST (data);
2193     res =
2194         gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
2195         pinfo);
2196     pinfo->arrival_time = GST_CLOCK_TIME_NONE;
2197   } else {
2198     GstBuffer *buffer = GST_BUFFER_CAST (data);
2199     res = update_packet (&buffer, 0, pinfo);
2200     pinfo->arrival_time = GST_BUFFER_DTS (buffer);
2201   }
2202
2203   return res;
2204 }
2205
2206 static void
2207 clean_packet_info (RTPPacketInfo * pinfo)
2208 {
2209   if (pinfo->address)
2210     g_object_unref (pinfo->address);
2211   if (pinfo->data) {
2212     gst_mini_object_unref (pinfo->data);
2213     pinfo->data = NULL;
2214   }
2215   if (pinfo->header_ext)
2216     g_bytes_unref (pinfo->header_ext);
2217 }
2218
2219 static gboolean
2220 source_update_active (RTPSession * sess, RTPSource * source,
2221     gboolean prevactive)
2222 {
2223   gboolean active = RTP_SOURCE_IS_ACTIVE (source);
2224   guint32 ssrc = source->ssrc;
2225
2226   if (prevactive == active)
2227     return FALSE;
2228
2229   if (active) {
2230     sess->stats.active_sources++;
2231     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
2232         sess->stats.active_sources);
2233   } else {
2234     sess->stats.active_sources--;
2235     GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
2236         sess->stats.active_sources);
2237   }
2238   return TRUE;
2239 }
2240
2241 static void
2242 process_twcc_packet (RTPSession * sess, RTPPacketInfo * pinfo)
2243 {
2244   if (rtp_twcc_manager_recv_packet (sess->twcc, pinfo)) {
2245     RTP_SESSION_UNLOCK (sess);
2246
2247     /* TODO: find a better rational for this number, and possibly tune it based
2248        on factors like framerate / bandwidth etc */
2249     if (!rtp_session_send_rtcp (sess, 100 * GST_MSECOND)) {
2250       GST_INFO ("Could not schedule TWCC straight away");
2251     }
2252     RTP_SESSION_LOCK (sess);
2253   }
2254 }
2255
2256 static gboolean
2257 source_update_sender (RTPSession * sess, RTPSource * source,
2258     gboolean prevsender)
2259 {
2260   gboolean sender = RTP_SOURCE_IS_SENDER (source);
2261   guint32 ssrc = source->ssrc;
2262
2263   if (prevsender == sender)
2264     return FALSE;
2265
2266   if (sender) {
2267     sess->stats.sender_sources++;
2268     if (source->internal)
2269       sess->stats.internal_sender_sources++;
2270     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
2271         sess->stats.sender_sources);
2272   } else {
2273     sess->stats.sender_sources--;
2274     if (source->internal)
2275       sess->stats.internal_sender_sources--;
2276     GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
2277         sess->stats.sender_sources);
2278   }
2279   return TRUE;
2280 }
2281
2282 /**
2283  * rtp_session_process_rtp:
2284  * @sess: and #RTPSession
2285  * @buffer: an RTP buffer
2286  * @current_time: the current system time
2287  * @running_time: the running_time of @buffer
2288  *
2289  * Process an RTP buffer in the session manager. This function takes ownership
2290  * of @buffer.
2291  *
2292  * Returns: a #GstFlowReturn.
2293  */
2294 GstFlowReturn
2295 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
2296     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
2297 {
2298   GstFlowReturn result;
2299   guint32 ssrc;
2300   RTPSource *source;
2301   gboolean created;
2302   gboolean prevsender, prevactive;
2303   RTPPacketInfo pinfo = { 0, };
2304   guint64 oldrate;
2305
2306   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2307   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2308
2309   RTP_SESSION_LOCK (sess);
2310
2311   /* update pinfo stats */
2312   if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer,
2313           current_time, running_time, ntpnstime)) {
2314     GST_DEBUG ("invalid RTP packet received");
2315     RTP_SESSION_UNLOCK (sess);
2316     return rtp_session_process_rtcp (sess, buffer, current_time, running_time,
2317         ntpnstime);
2318   }
2319
2320   ssrc = pinfo.ssrc;
2321
2322   source = obtain_source (sess, ssrc, &created, &pinfo, TRUE);
2323   if (!source)
2324     goto collision;
2325
2326   prevsender = RTP_SOURCE_IS_SENDER (source);
2327   prevactive = RTP_SOURCE_IS_ACTIVE (source);
2328   oldrate = source->bitrate;
2329
2330   if (created)
2331     on_new_ssrc (sess, source);
2332
2333   /* let source process the packet */
2334   result = rtp_source_process_rtp (source, &pinfo);
2335   process_twcc_packet (sess, &pinfo);
2336
2337   /* source became active */
2338   if (source_update_active (sess, source, prevactive))
2339     on_ssrc_validated (sess, source);
2340
2341   source_update_sender (sess, source, prevsender);
2342
2343   if (oldrate != source->bitrate)
2344     sess->recalc_bandwidth = TRUE;
2345
2346
2347   if (source->validated) {
2348     gboolean created;
2349     gint i;
2350
2351     /* for validated sources, we add the CSRCs as well */
2352     for (i = 0; i < pinfo.csrc_count; i++) {
2353       guint32 csrc;
2354       RTPSource *csrc_src;
2355
2356       csrc = pinfo.csrcs[i];
2357
2358       /* get source */
2359       csrc_src = obtain_source (sess, csrc, &created, &pinfo, TRUE);
2360       if (!csrc_src)
2361         continue;
2362
2363       if (created) {
2364         GST_DEBUG ("created new CSRC: %08x", csrc);
2365         rtp_source_set_as_csrc (csrc_src);
2366         source_update_active (sess, csrc_src, FALSE);
2367         on_new_ssrc (sess, csrc_src);
2368       }
2369       g_object_unref (csrc_src);
2370     }
2371   }
2372   g_object_unref (source);
2373
2374   RTP_SESSION_UNLOCK (sess);
2375
2376   clean_packet_info (&pinfo);
2377
2378   return result;
2379
2380   /* ERRORS */
2381 collision:
2382   {
2383     RTP_SESSION_UNLOCK (sess);
2384     clean_packet_info (&pinfo);
2385     GST_DEBUG ("ignoring packet because its collisioning");
2386     return GST_FLOW_OK;
2387   }
2388 }
2389
2390 static void
2391 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
2392     GstRTCPPacket * packet, RTPPacketInfo * pinfo)
2393 {
2394   guint count, i;
2395
2396   count = gst_rtcp_packet_get_rb_count (packet);
2397   for (i = 0; i < count; i++) {
2398     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
2399     guint8 fractionlost;
2400     gint32 packetslost;
2401     RTPSource *src;
2402
2403     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
2404         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2405
2406     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
2407
2408     /* find our own source */
2409     src = find_source (sess, ssrc);
2410     if (src == NULL)
2411       continue;
2412
2413     if (src->internal && RTP_SOURCE_IS_ACTIVE (src)) {
2414       /* only deal with report blocks for our session, we update the stats of
2415        * the sender of the RTCP message. We could also compare our stats against
2416        * the other sender to see if we are better or worse. */
2417       /* FIXME, need to keep track who the RB block is from */
2418       rtp_source_process_rb (source, ssrc, pinfo->ntpnstime, fractionlost,
2419           packetslost, exthighestseq, jitter, lsr, dlsr);
2420     }
2421   }
2422   on_ssrc_active (sess, source);
2423 }
2424
2425 /* A Sender report contains statistics about how the sender is doing. This
2426  * includes timing informataion such as the relation between RTP and NTP
2427  * timestamps and the number of packets/bytes it sent to us.
2428  *
2429  * In this report is also included a set of report blocks related to how this
2430  * sender is receiving data (in case we (or somebody else) is also sending stuff
2431  * to it). This info includes the packet loss, jitter and seqnum. It also
2432  * contains information to calculate the round trip time (LSR/DLSR).
2433  */
2434 static void
2435 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
2436     RTPPacketInfo * pinfo, gboolean * do_sync)
2437 {
2438   guint32 senderssrc, rtptime, packet_count, octet_count;
2439   guint64 ntptime;
2440   RTPSource *source;
2441   gboolean created, prevsender;
2442
2443   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
2444       &packet_count, &octet_count);
2445
2446   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
2447       senderssrc, GST_TIME_ARGS (pinfo->current_time));
2448
2449   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2450   if (!source)
2451     return;
2452
2453   /* skip non-bye packets for sources that are marked BYE */
2454   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2455     goto out;
2456
2457   /* don't try to do lip-sync for sources that sent a BYE */
2458   if (RTP_SOURCE_IS_MARKED_BYE (source))
2459     *do_sync = FALSE;
2460   else
2461     *do_sync = TRUE;
2462
2463   prevsender = RTP_SOURCE_IS_SENDER (source);
2464
2465   /* first update the source */
2466   rtp_source_process_sr (source, pinfo->current_time, ntptime, rtptime,
2467       packet_count, octet_count);
2468
2469   source_update_sender (sess, source, prevsender);
2470
2471   if (created)
2472     on_new_ssrc (sess, source);
2473
2474   rtp_session_process_rb (sess, source, packet, pinfo);
2475
2476 out:
2477   g_object_unref (source);
2478 }
2479
2480 /* A receiver report contains statistics about how a receiver is doing. It
2481  * includes stuff like packet loss, jitter and the seqnum it received last. It
2482  * also contains info to calculate the round trip time.
2483  *
2484  * We are only interested in how the sender of this report is doing wrt to us.
2485  */
2486 static void
2487 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
2488     RTPPacketInfo * pinfo)
2489 {
2490   guint32 senderssrc;
2491   RTPSource *source;
2492   gboolean created;
2493
2494   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
2495
2496   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
2497
2498   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2499   if (!source)
2500     return;
2501
2502   /* skip non-bye packets for sources that are marked BYE */
2503   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2504     goto out;
2505
2506   if (created)
2507     on_new_ssrc (sess, source);
2508
2509   rtp_session_process_rb (sess, source, packet, pinfo);
2510
2511 out:
2512   g_object_unref (source);
2513 }
2514
2515 /* Get SDES items and store them in the SSRC */
2516 static void
2517 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
2518     RTPPacketInfo * pinfo)
2519 {
2520   guint items, i, j;
2521   gboolean more_items, more_entries;
2522
2523   items = gst_rtcp_packet_sdes_get_item_count (packet);
2524   GST_DEBUG ("got SDES packet with %d items", items);
2525
2526   more_items = gst_rtcp_packet_sdes_first_item (packet);
2527   i = 0;
2528   while (more_items) {
2529     guint32 ssrc;
2530     gboolean changed, created, prevactive;
2531     RTPSource *source;
2532     GstStructure *sdes;
2533
2534     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
2535
2536     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
2537
2538     changed = FALSE;
2539
2540     /* find src, no probation when dealing with RTCP */
2541     source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
2542     if (!source)
2543       return;
2544
2545     /* skip non-bye packets for sources that are marked BYE */
2546     if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2547       goto next;
2548
2549     sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
2550
2551     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
2552     j = 0;
2553     while (more_entries) {
2554       GstRTCPSDESType type;
2555       guint8 len;
2556       guint8 *data;
2557       gchar *name;
2558       gchar *value;
2559
2560       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
2561
2562       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
2563           data);
2564
2565       if (type == GST_RTCP_SDES_PRIV) {
2566         name = g_strndup ((const gchar *) &data[1], data[0]);
2567         len -= data[0] + 1;
2568         data += data[0] + 1;
2569       } else {
2570         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
2571       }
2572
2573       value = g_strndup ((const gchar *) data, len);
2574
2575       if (g_utf8_validate (value, -1, NULL)) {
2576         gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
2577       } else {
2578         GST_WARNING ("ignore SDES field %s with non-utf8 data %s", name, value);
2579       }
2580
2581       g_free (name);
2582       g_free (value);
2583
2584       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
2585       j++;
2586     }
2587
2588     /* takes ownership of sdes */
2589     changed = rtp_source_set_sdes_struct (source, sdes);
2590
2591     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2592     source->validated = TRUE;
2593
2594     if (created)
2595       on_new_ssrc (sess, source);
2596
2597     /* source became active */
2598     if (source_update_active (sess, source, prevactive))
2599       on_ssrc_validated (sess, source);
2600
2601     if (changed)
2602       on_ssrc_sdes (sess, source);
2603
2604   next:
2605     g_object_unref (source);
2606
2607     more_items = gst_rtcp_packet_sdes_next_item (packet);
2608     i++;
2609   }
2610 }
2611
2612 /* BYE is sent when a client leaves the session
2613  */
2614 static void
2615 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
2616     RTPPacketInfo * pinfo)
2617 {
2618   guint count, i;
2619   gchar *reason;
2620   gboolean reconsider = FALSE;
2621
2622   reason = gst_rtcp_packet_bye_get_reason (packet);
2623   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
2624
2625   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
2626   for (i = 0; i < count; i++) {
2627     guint32 ssrc;
2628     RTPSource *source;
2629     gboolean prevactive, prevsender;
2630     guint pmembers, members;
2631
2632     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
2633     GST_DEBUG ("SSRC: %08x", ssrc);
2634
2635     /* find src and mark bye, no probation when dealing with RTCP */
2636     source = find_source (sess, ssrc);
2637     if (!source || source->internal) {
2638       GST_DEBUG ("Ignoring suspicious BYE packet (reason: %s)",
2639           !source ? "can't find source" : "has internal source SSRC");
2640       break;
2641     }
2642
2643     /* store time for when we need to time out this source */
2644     source->bye_time = pinfo->current_time;
2645
2646     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2647     prevsender = RTP_SOURCE_IS_SENDER (source);
2648
2649     /* mark the source BYE */
2650     rtp_source_mark_bye (source, reason);
2651
2652     pmembers = sess->stats.active_sources;
2653
2654     source_update_active (sess, source, prevactive);
2655     source_update_sender (sess, source, prevsender);
2656
2657     members = sess->stats.active_sources;
2658
2659     if (!sess->scheduled_bye && members < pmembers) {
2660       /* some members went away since the previous timeout estimate.
2661        * Perform reverse reconsideration but only when we are not scheduling a
2662        * BYE ourselves. */
2663       if (sess->next_rtcp_check_time != GST_CLOCK_TIME_NONE &&
2664           pinfo->current_time < sess->next_rtcp_check_time) {
2665         GstClockTime time_remaining;
2666
2667         /* Scale our next RTCP check time according to the change of numbers
2668          * of members. But only if a) this is the first RTCP, or b) this is not
2669          * a feedback session, or c) this is a feedback session but we schedule
2670          * for every RTCP interval (aka no t-rr-interval set).
2671          *
2672          * FIXME: a) and b) are not great as we will possibly go below Tmin
2673          * for non-feedback profiles and in case of a) below
2674          * Tmin/t-rr-interval in any case.
2675          */
2676         if (sess->last_rtcp_send_time == GST_CLOCK_TIME_NONE ||
2677             !(sess->rtp_profile == GST_RTP_PROFILE_AVPF
2678                 || sess->rtp_profile == GST_RTP_PROFILE_SAVPF) ||
2679             sess->next_rtcp_check_time - sess->last_rtcp_send_time ==
2680             sess->last_rtcp_interval) {
2681           time_remaining = sess->next_rtcp_check_time - pinfo->current_time;
2682           sess->next_rtcp_check_time =
2683               gst_util_uint64_scale (time_remaining, members, pmembers);
2684           sess->next_rtcp_check_time += pinfo->current_time;
2685         }
2686         sess->last_rtcp_interval =
2687             gst_util_uint64_scale (sess->last_rtcp_interval, members, pmembers);
2688
2689         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
2690             GST_TIME_ARGS (sess->next_rtcp_check_time));
2691
2692         /* mark pending reconsider. We only want to signal the reconsideration
2693          * once after we handled all the source in the bye packet */
2694         reconsider = TRUE;
2695       }
2696     }
2697
2698     on_bye_ssrc (sess, source);
2699   }
2700   if (reconsider) {
2701     RTP_SESSION_UNLOCK (sess);
2702     /* notify app of reconsideration */
2703     if (sess->callbacks.reconsider)
2704       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2705     RTP_SESSION_LOCK (sess);
2706   }
2707
2708   g_free (reason);
2709 }
2710
2711 static void
2712 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
2713     RTPPacketInfo * pinfo)
2714 {
2715   GST_DEBUG ("received APP");
2716
2717   if (g_signal_has_handler_pending (sess,
2718           rtp_session_signals[SIGNAL_ON_APP_RTCP], 0, TRUE)) {
2719     GstBuffer *data_buffer = NULL;
2720     guint16 data_length;
2721     gchar name[5];
2722
2723     data_length = gst_rtcp_packet_app_get_data_length (packet) * 4;
2724     if (data_length > 0) {
2725       guint8 *data = gst_rtcp_packet_app_get_data (packet);
2726       data_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
2727           GST_BUFFER_COPY_MEMORY, data - packet->rtcp->map.data, data_length);
2728       GST_BUFFER_PTS (data_buffer) = pinfo->running_time;
2729     }
2730
2731     memcpy (name, gst_rtcp_packet_app_get_name (packet), 4);
2732     name[4] = '\0';
2733
2734     RTP_SESSION_UNLOCK (sess);
2735     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_APP_RTCP], 0,
2736         gst_rtcp_packet_app_get_subtype (packet),
2737         gst_rtcp_packet_app_get_ssrc (packet), name, data_buffer);
2738     RTP_SESSION_LOCK (sess);
2739
2740     if (data_buffer)
2741       gst_buffer_unref (data_buffer);
2742   }
2743 }
2744
2745 static gboolean
2746 rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
2747     guint32 media_ssrc, gboolean fir, GstClockTime current_time)
2748 {
2749   guint32 round_trip = 0;
2750
2751   rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
2752       &round_trip);
2753
2754   if (src->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
2755     GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
2756         GST_SECOND, 65536);
2757
2758     /* Sanity check to avoid always ignoring PLI/FIR if we receive RTCP
2759      * packets with erroneous values resulting in crazy high RTT. */
2760     if (round_trip_in_ns > 5 * GST_SECOND)
2761       round_trip_in_ns = GST_SECOND / 2;
2762
2763     if (current_time - src->last_keyframe_request < 2 * round_trip_in_ns) {
2764       GST_DEBUG ("Ignoring %s request from %X because one was send without one "
2765           "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
2766           fir ? "FIR" : "PLI", rtp_source_get_ssrc (src),
2767           GST_TIME_ARGS (current_time - src->last_keyframe_request),
2768           GST_TIME_ARGS (round_trip_in_ns));
2769       return FALSE;
2770     }
2771   }
2772
2773   src->last_keyframe_request = current_time;
2774
2775   GST_LOG ("received %s request from %X about %X %p(%p)", fir ? "FIR" : "PLI",
2776       rtp_source_get_ssrc (src), media_ssrc, sess->callbacks.process_rtp,
2777       sess->callbacks.request_key_unit);
2778
2779   RTP_SESSION_UNLOCK (sess);
2780   sess->callbacks.request_key_unit (sess, media_ssrc, fir,
2781       sess->request_key_unit_user_data);
2782   RTP_SESSION_LOCK (sess);
2783
2784   return TRUE;
2785 }
2786
2787 static void
2788 rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
2789     guint32 media_ssrc, GstClockTime current_time)
2790 {
2791   RTPSource *src;
2792
2793   if (!sess->callbacks.request_key_unit)
2794     return;
2795
2796   src = find_source (sess, sender_ssrc);
2797   if (src == NULL) {
2798     /* try to find a src with media_ssrc instead */
2799     src = find_source (sess, media_ssrc);
2800     if (src == NULL)
2801       return;
2802   }
2803
2804   rtp_session_request_local_key_unit (sess, src, media_ssrc, FALSE,
2805       current_time);
2806 }
2807
2808 static void
2809 rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
2810     guint32 media_ssrc, guint8 * fci_data, guint fci_length,
2811     GstClockTime current_time)
2812 {
2813   RTPSource *src;
2814   guint32 ssrc;
2815   guint position = 0;
2816   gboolean our_request = FALSE;
2817
2818   if (!sess->callbacks.request_key_unit)
2819     return;
2820
2821   if (fci_length < 8)
2822     return;
2823
2824   src = find_source (sess, sender_ssrc);
2825
2826   /* Hack because Google fails to set the sender_ssrc correctly */
2827   if (!src && sender_ssrc == 1) {
2828     GHashTableIter iter;
2829
2830     /* we can't find the source if there are multiple */
2831     if (sess->stats.sender_sources > sess->stats.internal_sender_sources + 1)
2832       return;
2833
2834     g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
2835     while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
2836       if (!src->internal && rtp_source_is_sender (src))
2837         break;
2838       src = NULL;
2839     }
2840   }
2841   if (!src)
2842     return;
2843
2844   for (position = 0; position < fci_length; position += 8) {
2845     guint8 *data = fci_data + position;
2846     RTPSource *own;
2847
2848     ssrc = GST_READ_UINT32_BE (data);
2849
2850     own = find_source (sess, ssrc);
2851     if (own == NULL)
2852       continue;
2853
2854     if (own->internal) {
2855       our_request = TRUE;
2856       break;
2857     }
2858   }
2859   if (!our_request)
2860     return;
2861
2862   rtp_session_request_local_key_unit (sess, src, media_ssrc, TRUE,
2863       current_time);
2864 }
2865
2866 static void
2867 rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
2868     guint32 media_ssrc, guint8 * fci_data, guint fci_length,
2869     GstClockTime current_time)
2870 {
2871   sess->stats.nacks_received++;
2872
2873   if (!sess->callbacks.notify_nack)
2874     return;
2875
2876   while (fci_length > 0) {
2877     guint16 seqnum, blp;
2878
2879     seqnum = GST_READ_UINT16_BE (fci_data);
2880     blp = GST_READ_UINT16_BE (fci_data + 2);
2881
2882     GST_DEBUG ("NACK #%u, blp %04x, SSRC 0x%08x", seqnum, blp, media_ssrc);
2883
2884     RTP_SESSION_UNLOCK (sess);
2885     sess->callbacks.notify_nack (sess, seqnum, blp, media_ssrc,
2886         sess->notify_nack_user_data);
2887     RTP_SESSION_LOCK (sess);
2888
2889     fci_data += 4;
2890     fci_length -= 4;
2891   }
2892 }
2893
2894 static void
2895 rtp_session_process_twcc (RTPSession * sess, guint32 sender_ssrc,
2896     guint32 media_ssrc, guint8 * fci_data, guint fci_length)
2897 {
2898   GArray *twcc_packets;
2899   GstStructure *twcc_packets_s;
2900   GstStructure *twcc_stats_s;
2901
2902   twcc_packets = rtp_twcc_manager_parse_fci (sess->twcc,
2903       fci_data, fci_length * sizeof (guint32));
2904   if (twcc_packets == NULL)
2905     return;
2906
2907   twcc_packets_s = rtp_twcc_stats_get_packets_structure (twcc_packets);
2908   twcc_stats_s =
2909       rtp_twcc_stats_process_packets (sess->twcc_stats, twcc_packets);
2910
2911   GST_DEBUG_OBJECT (sess, "Parsed TWCC: %" GST_PTR_FORMAT, twcc_packets_s);
2912   GST_INFO_OBJECT (sess, "Current TWCC stats %" GST_PTR_FORMAT, twcc_stats_s);
2913
2914   g_array_unref (twcc_packets);
2915
2916   RTP_SESSION_UNLOCK (sess);
2917   if (sess->callbacks.notify_twcc)
2918     sess->callbacks.notify_twcc (sess, twcc_packets_s, twcc_stats_s,
2919         sess->notify_twcc_user_data);
2920   RTP_SESSION_LOCK (sess);
2921 }
2922
2923 static void
2924 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
2925     RTPPacketInfo * pinfo, GstClockTime current_time)
2926 {
2927   GstRTCPType type;
2928   GstRTCPFBType fbtype;
2929   guint32 sender_ssrc, media_ssrc;
2930   guint8 *fci_data;
2931   guint fci_length;
2932   RTPSource *src;
2933
2934   /* The feedback packet must include both sender SSRC and media SSRC */
2935   if (packet->length < 2)
2936     return;
2937
2938   type = gst_rtcp_packet_get_type (packet);
2939   fbtype = gst_rtcp_packet_fb_get_type (packet);
2940   sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
2941   media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
2942
2943   src = find_source (sess, media_ssrc);
2944
2945   /* skip non-bye packets for sources that are marked BYE */
2946   if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src))
2947     return;
2948
2949   if (src)
2950     g_object_ref (src);
2951
2952   fci_data = gst_rtcp_packet_fb_get_fci (packet);
2953   fci_length = gst_rtcp_packet_fb_get_fci_length (packet) * sizeof (guint32);
2954
2955   GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
2956       "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
2957
2958   if (g_signal_has_handler_pending (sess,
2959           rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
2960     GstBuffer *fci_buffer = NULL;
2961
2962     if (fci_length > 0) {
2963       fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
2964           GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data,
2965           fci_length);
2966       GST_BUFFER_PTS (fci_buffer) = pinfo->running_time;
2967     }
2968
2969     RTP_SESSION_UNLOCK (sess);
2970     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
2971         type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
2972     RTP_SESSION_LOCK (sess);
2973
2974     if (fci_buffer)
2975       gst_buffer_unref (fci_buffer);
2976   }
2977
2978   if (src && sess->rtcp_feedback_retention_window != GST_CLOCK_TIME_NONE) {
2979     rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
2980   }
2981
2982   if ((src && src->internal) ||
2983       /* PSFB FIR puts the media ssrc inside the FCI */
2984       (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR) ||
2985       /* TWCC is for all sources, so a single media-ssrc is not enough */
2986       (type == GST_RTCP_TYPE_RTPFB && fbtype == GST_RTCP_RTPFB_TYPE_TWCC)) {
2987     switch (type) {
2988       case GST_RTCP_TYPE_PSFB:
2989         switch (fbtype) {
2990           case GST_RTCP_PSFB_TYPE_PLI:
2991             if (src)
2992               src->stats.recv_pli_count++;
2993             rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
2994                 current_time);
2995             break;
2996           case GST_RTCP_PSFB_TYPE_FIR:
2997             if (src)
2998               src->stats.recv_fir_count++;
2999             rtp_session_process_fir (sess, sender_ssrc, media_ssrc, fci_data,
3000                 fci_length, current_time);
3001             break;
3002           default:
3003             break;
3004         }
3005         break;
3006       case GST_RTCP_TYPE_RTPFB:
3007         switch (fbtype) {
3008           case GST_RTCP_RTPFB_TYPE_NACK:
3009             if (src)
3010               src->stats.recv_nack_count++;
3011             rtp_session_process_nack (sess, sender_ssrc, media_ssrc,
3012                 fci_data, fci_length, current_time);
3013             break;
3014           case GST_RTCP_RTPFB_TYPE_TWCC:
3015             rtp_session_process_twcc (sess, sender_ssrc, media_ssrc,
3016                 fci_data, fci_length);
3017             break;
3018           default:
3019             break;
3020         }
3021       default:
3022         break;
3023     }
3024   }
3025
3026   if (src)
3027     g_object_unref (src);
3028 }
3029
3030 /**
3031  * rtp_session_process_rtcp:
3032  * @sess: and #RTPSession
3033  * @buffer: an RTCP buffer
3034  * @current_time: the current system time
3035  * @ntpnstime: the current NTP time in nanoseconds
3036  *
3037  * Process an RTCP buffer in the session manager. This function takes ownership
3038  * of @buffer.
3039  *
3040  * Returns: a #GstFlowReturn.
3041  */
3042 GstFlowReturn
3043 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
3044     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
3045 {
3046   GstRTCPPacket packet;
3047   gboolean more, is_bye = FALSE, do_sync = FALSE;
3048   RTPPacketInfo pinfo = { 0, };
3049   GstFlowReturn result = GST_FLOW_OK;
3050   GstRTCPBuffer rtcp = { NULL, };
3051
3052   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3053   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3054
3055   if (!gst_rtcp_buffer_validate_reduced (buffer))
3056     goto invalid_packet;
3057
3058   GST_DEBUG ("received RTCP packet");
3059
3060   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP], 0,
3061       buffer);
3062
3063   RTP_SESSION_LOCK (sess);
3064   /* update pinfo stats */
3065   update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
3066       running_time, ntpnstime);
3067
3068   /* start processing the compound packet */
3069   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
3070   more = gst_rtcp_buffer_get_first_packet (&rtcp, &packet);
3071   while (more) {
3072     GstRTCPType type;
3073
3074     type = gst_rtcp_packet_get_type (&packet);
3075
3076     switch (type) {
3077       case GST_RTCP_TYPE_SR:
3078         rtp_session_process_sr (sess, &packet, &pinfo, &do_sync);
3079         break;
3080       case GST_RTCP_TYPE_RR:
3081         rtp_session_process_rr (sess, &packet, &pinfo);
3082         break;
3083       case GST_RTCP_TYPE_SDES:
3084         rtp_session_process_sdes (sess, &packet, &pinfo);
3085         break;
3086       case GST_RTCP_TYPE_BYE:
3087         is_bye = TRUE;
3088         /* don't try to attempt lip-sync anymore for streams with a BYE */
3089         do_sync = FALSE;
3090         rtp_session_process_bye (sess, &packet, &pinfo);
3091         break;
3092       case GST_RTCP_TYPE_APP:
3093         rtp_session_process_app (sess, &packet, &pinfo);
3094         break;
3095       case GST_RTCP_TYPE_RTPFB:
3096       case GST_RTCP_TYPE_PSFB:
3097         rtp_session_process_feedback (sess, &packet, &pinfo, current_time);
3098         break;
3099       case GST_RTCP_TYPE_XR:
3100         /* FIXME: This block is added to downgrade warning level.
3101          * Once the parser is implemented, it should be replaced with
3102          * a proper process function. */
3103         GST_DEBUG ("got RTCP XR packet, but ignored");
3104         break;
3105       default:
3106         GST_WARNING ("got unknown RTCP packet type: %d", type);
3107         break;
3108     }
3109     more = gst_rtcp_packet_move_to_next (&packet);
3110   }
3111
3112   gst_rtcp_buffer_unmap (&rtcp);
3113
3114   /* if we are scheduling a BYE, we only want to count bye packets, else we
3115    * count everything */
3116   if (sess->scheduled_bye && is_bye) {
3117     sess->bye_stats.bye_members++;
3118     UPDATE_AVG (sess->bye_stats.avg_rtcp_packet_size, pinfo.bytes);
3119   }
3120
3121   /* keep track of average packet size */
3122   UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
3123
3124   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
3125       sess->stats.avg_rtcp_packet_size, pinfo.bytes);
3126   RTP_SESSION_UNLOCK (sess);
3127
3128   pinfo.data = NULL;
3129   clean_packet_info (&pinfo);
3130
3131   /* notify caller of sr packets in the callback */
3132   if (do_sync && sess->callbacks.sync_rtcp) {
3133     result = sess->callbacks.sync_rtcp (sess, buffer,
3134         sess->sync_rtcp_user_data);
3135   } else
3136     gst_buffer_unref (buffer);
3137
3138   return result;
3139
3140   /* ERRORS */
3141 invalid_packet:
3142   {
3143     GST_DEBUG ("invalid RTCP packet received");
3144     gst_buffer_unref (buffer);
3145     return GST_FLOW_OK;
3146   }
3147 }
3148
3149 static guint8
3150 _get_extmap_id_for_attribute (const GstStructure * s, const gchar * ext_name)
3151 {
3152   guint i;
3153   guint8 extmap_id = 0;
3154   guint n_fields = gst_structure_n_fields (s);
3155
3156   for (i = 0; i < n_fields; i++) {
3157     const gchar *field_name = gst_structure_nth_field_name (s, i);
3158     if (g_str_has_prefix (field_name, "extmap-")) {
3159       const gchar *str = gst_structure_get_string (s, field_name);
3160       if (str && g_strcmp0 (str, ext_name) == 0) {
3161         gint64 id = g_ascii_strtoll (field_name + 7, NULL, 10);
3162         if (id > 0 && id < 15) {
3163           extmap_id = id;
3164           break;
3165         }
3166       }
3167     }
3168   }
3169   return extmap_id;
3170 }
3171
3172 /**
3173  * rtp_session_update_send_caps:
3174  * @sess: an #RTPSession
3175  * @caps: a #GstCaps
3176  *
3177  * Update the caps of the sender in the rtp session.
3178  */
3179 void
3180 rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
3181 {
3182   GstStructure *s;
3183   guint ssrc;
3184
3185   g_return_if_fail (RTP_IS_SESSION (sess));
3186   g_return_if_fail (GST_IS_CAPS (caps));
3187
3188   GST_LOG ("received caps %" GST_PTR_FORMAT, caps);
3189
3190   s = gst_caps_get_structure (caps, 0);
3191
3192   if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
3193     RTPSource *source;
3194     gboolean created;
3195
3196     RTP_SESSION_LOCK (sess);
3197     source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
3198     sess->suggested_ssrc = ssrc;
3199     sess->internal_ssrc_set = TRUE;
3200     sess->internal_ssrc_from_caps_or_property = TRUE;
3201     if (source) {
3202       rtp_source_update_caps (source, caps);
3203
3204       if (created)
3205         on_new_sender_ssrc (sess, source);
3206
3207       g_object_unref (source);
3208     }
3209
3210     if (gst_structure_get_uint (s, "rtx-ssrc", &ssrc)) {
3211       source =
3212           obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
3213       if (source) {
3214         rtp_source_update_caps (source, caps);
3215
3216         if (created)
3217           on_new_sender_ssrc (sess, source);
3218
3219         g_object_unref (source);
3220       }
3221     }
3222     RTP_SESSION_UNLOCK (sess);
3223   } else {
3224     sess->internal_ssrc_from_caps_or_property = FALSE;
3225   }
3226
3227   sess->send_ntp64_ext_id =
3228       _get_extmap_id_for_attribute (s,
3229       GST_RTP_HDREXT_BASE GST_RTP_HDREXT_NTP_64);
3230
3231   rtp_twcc_manager_parse_send_ext_id (sess->twcc, s);
3232 }
3233
3234 static void
3235 update_ntp64_header_ext_data (RTPPacketInfo * pinfo, GstBuffer * buffer)
3236 {
3237   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
3238
3239   if (gst_rtp_buffer_map (buffer, GST_MAP_READWRITE, &rtp)) {
3240     guint16 bits;
3241     guint8 *data;
3242     guint wordlen;
3243
3244     if (gst_rtp_buffer_get_extension_data (&rtp, &bits, (gpointer *) & data,
3245             &wordlen)) {
3246       gsize len = wordlen * 4;
3247
3248       /* One-byte header */
3249       if (bits == 0xBEDE) {
3250         /* One-byte header extension */
3251         while (TRUE) {
3252           guint8 ext_id, ext_len;
3253
3254           if (len < 1)
3255             break;
3256
3257           ext_id = GST_READ_UINT8 (data) >> 4;
3258           ext_len = (GST_READ_UINT8 (data) & 0xF) + 1;
3259           data += 1;
3260           len -= 1;
3261           if (ext_id == 0) {
3262             /* Skip padding */
3263             continue;
3264           } else if (ext_id == 15) {
3265             /* Stop parsing */
3266             break;
3267           }
3268
3269           /* extension doesn't fit into the header */
3270           if (ext_len > len)
3271             break;
3272
3273           if (ext_id == pinfo->ntp64_ext_id && ext_len == 8) {
3274             if (pinfo->ntpnstime != GST_CLOCK_TIME_NONE) {
3275               guint64 ntptime = gst_util_uint64_scale (pinfo->ntpnstime,
3276                   G_GUINT64_CONSTANT (1) << 32,
3277                   GST_SECOND);
3278
3279               GST_WRITE_UINT64_BE (data, ntptime);
3280             } else {
3281               /* Replace extension with padding */
3282               memset (data - 1, 0, 1 + ext_len);
3283             }
3284           }
3285
3286           /* skip to the next extension */
3287           data += ext_len;
3288           len -= ext_len;
3289         }
3290       } else if ((bits >> 4) == 0x100) {
3291         /* Two-byte header extension */
3292
3293         while (TRUE) {
3294           guint8 ext_id, ext_len;
3295
3296           if (len < 1)
3297             break;
3298
3299           ext_id = GST_READ_UINT8 (data);
3300           data += 1;
3301           len -= 1;
3302           if (ext_id == 0) {
3303             /* Skip padding */
3304             continue;
3305           }
3306
3307           ext_len = GST_READ_UINT8 (data);
3308           data += 1;
3309           len -= 1;
3310
3311           /* extension doesn't fit into the header */
3312           if (ext_len > len)
3313             break;
3314
3315           if (ext_id == pinfo->ntp64_ext_id && ext_len == 8) {
3316             if (pinfo->ntpnstime != GST_CLOCK_TIME_NONE) {
3317               guint64 ntptime = gst_util_uint64_scale (pinfo->ntpnstime,
3318                   G_GUINT64_CONSTANT (1) << 32,
3319                   GST_SECOND);
3320
3321               GST_WRITE_UINT64_BE (data, ntptime);
3322             } else {
3323               /* Replace extension with padding */
3324               memset (data - 2, 0, 2 + ext_len);
3325             }
3326           }
3327
3328           /* skip to the next extension */
3329           data += ext_len;
3330           len -= ext_len;
3331         }
3332       }
3333     }
3334     gst_rtp_buffer_unmap (&rtp);
3335   }
3336 }
3337
3338 static void
3339 update_ntp64_header_ext (RTPPacketInfo * pinfo)
3340 {
3341   /* Early return if we don't know the header extension id or the packets
3342    * don't contain the header extension */
3343   if (pinfo->ntp64_ext_id == 0 || !pinfo->have_ntp64_ext)
3344     return;
3345
3346   /* If no NTP time is known then the header extension will be replaced with
3347    * padding, otherwise it will be updated */
3348   GST_TRACE
3349       ("Updating NTP-64 header extension for SSRC %08x packet with RTP time %u and running time %"
3350       GST_TIME_FORMAT " to %" GST_TIME_FORMAT, pinfo->ssrc, pinfo->rtptime,
3351       GST_TIME_ARGS (pinfo->running_time), GST_TIME_ARGS (pinfo->ntpnstime));
3352
3353   if (GST_IS_BUFFER_LIST (pinfo->data)) {
3354     GstBufferList *list;
3355     guint i = 0;
3356
3357     pinfo->data = gst_buffer_list_make_writable (pinfo->data);
3358
3359     list = GST_BUFFER_LIST (pinfo->data);
3360
3361     for (i = 0; i < gst_buffer_list_length (list); i++) {
3362       GstBuffer *buffer = gst_buffer_list_get_writable (list, i);
3363
3364       update_ntp64_header_ext_data (pinfo, buffer);
3365     }
3366   } else {
3367     pinfo->data = gst_buffer_make_writable (pinfo->data);
3368     update_ntp64_header_ext_data (pinfo, pinfo->data);
3369   }
3370 }
3371
3372 /**
3373  * rtp_session_send_rtp:
3374  * @sess: an #RTPSession
3375  * @data: pointer to either an RTP buffer or a list of RTP buffers
3376  * @is_list: TRUE when @data is a buffer list
3377  * @current_time: the current system time
3378  * @running_time: the running time of @data
3379  *
3380  * Send the RTP data (a buffer or buffer list) in the session manager. This
3381  * function takes ownership of @data.
3382  *
3383  * Returns: a #GstFlowReturn.
3384  */
3385 GstFlowReturn
3386 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
3387     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
3388 {
3389   GstFlowReturn result;
3390   RTPSource *source;
3391   gboolean prevsender;
3392   guint64 oldrate;
3393   RTPPacketInfo pinfo = { 0, };
3394   gboolean created;
3395
3396   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3397   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
3398
3399   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
3400
3401   RTP_SESSION_LOCK (sess);
3402   if (!update_packet_info (sess, &pinfo, TRUE, TRUE, is_list, data,
3403           current_time, running_time, ntpnstime))
3404     goto invalid_packet;
3405
3406   /* Update any 64-bit NTP header extensions with the actual NTP time here */
3407   update_ntp64_header_ext (&pinfo);
3408   rtp_twcc_manager_send_packet (sess->twcc, &pinfo);
3409
3410   source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
3411   if (created)
3412     on_new_sender_ssrc (sess, source);
3413
3414   if (!source->internal) {
3415     GSocketAddress *from;
3416
3417     if (source->rtp_from)
3418       from = source->rtp_from;
3419     else
3420       from = source->rtcp_from;
3421     if (from) {
3422       if (rtp_session_find_conflicting_address (sess, from, current_time)) {
3423         /* Its a known conflict, its probably a loop, not a collision
3424          * lets just drop the incoming packet
3425          */
3426         GST_LOG ("Our packets are being looped back to us, ignoring collision");
3427       } else {
3428         GST_DEBUG ("Collision for SSRC %x, change our sender ssrc", pinfo.ssrc);
3429
3430         rtp_session_have_conflict (sess, source, from, current_time);
3431       }
3432     } else {
3433       GST_LOG ("Ignoring collision on sent SSRC %x because remote source"
3434           " doesn't have an address", pinfo.ssrc);
3435     }
3436
3437     /* the the sending source is not internal, we have to drop the packet,
3438        or else we will end up receving it ourselves! */
3439     goto collision;
3440   }
3441
3442   prevsender = RTP_SOURCE_IS_SENDER (source);
3443   oldrate = source->bitrate;
3444
3445   /* we use our own source to send */
3446   result = rtp_source_send_rtp (source, &pinfo);
3447
3448   source_update_sender (sess, source, prevsender);
3449
3450   if (oldrate != source->bitrate)
3451     sess->recalc_bandwidth = TRUE;
3452   RTP_SESSION_UNLOCK (sess);
3453
3454   g_object_unref (source);
3455   clean_packet_info (&pinfo);
3456
3457   return result;
3458
3459 invalid_packet:
3460   {
3461     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
3462     RTP_SESSION_UNLOCK (sess);
3463     GST_DEBUG ("invalid RTP packet received");
3464     return GST_FLOW_OK;
3465   }
3466 collision:
3467   {
3468     g_object_unref (source);
3469     clean_packet_info (&pinfo);
3470     RTP_SESSION_UNLOCK (sess);
3471     GST_WARNING ("non-internal source with same ssrc %08x, drop packet",
3472         pinfo.ssrc);
3473     return GST_FLOW_OK;
3474   }
3475 }
3476
3477 static void
3478 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
3479 {
3480   *bandwidth += source->bitrate;
3481 }
3482
3483 /* must be called with session lock */
3484 static GstClockTime
3485 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
3486     gboolean first)
3487 {
3488   GstClockTime result;
3489   RTPSessionStats *stats;
3490
3491   /* recalculate bandwidth when it changed */
3492   if (sess->recalc_bandwidth) {
3493     gdouble bandwidth;
3494
3495     if (sess->bandwidth > 0)
3496       bandwidth = sess->bandwidth;
3497     else {
3498       /* If it is <= 0, then try to estimate the actual bandwidth */
3499       bandwidth = 0;
3500
3501       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3502           (GHFunc) add_bitrates, &bandwidth);
3503     }
3504     if (bandwidth < RTP_STATS_BANDWIDTH)
3505       bandwidth = RTP_STATS_BANDWIDTH;
3506
3507     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
3508         sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
3509
3510     sess->recalc_bandwidth = FALSE;
3511   }
3512
3513   if (sess->scheduled_bye) {
3514     stats = &sess->bye_stats;
3515     result = rtp_stats_calculate_bye_interval (stats);
3516   } else {
3517     session_update_ptp (sess);
3518
3519     stats = &sess->stats;
3520     result = rtp_stats_calculate_rtcp_interval (stats,
3521         stats->internal_sender_sources > 0, sess->rtp_profile,
3522         sess->is_doing_ptp, first);
3523   }
3524
3525   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
3526       GST_TIME_ARGS (result), first);
3527
3528   if (!deterministic && result != GST_CLOCK_TIME_NONE)
3529     result = rtp_stats_add_rtcp_jitter (stats, result);
3530
3531   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
3532
3533   return result;
3534 }
3535
3536 static void
3537 source_mark_bye (const gchar * key, RTPSource * source, const gchar * reason)
3538 {
3539   if (source->internal)
3540     rtp_source_mark_bye (source, reason);
3541 }
3542
3543 /**
3544  * rtp_session_mark_all_bye:
3545  * @sess: an #RTPSession
3546  * @reason: a reason
3547  *
3548  * Mark all internal sources of the session as BYE with @reason.
3549  */
3550 void
3551 rtp_session_mark_all_bye (RTPSession * sess, const gchar * reason)
3552 {
3553   g_return_if_fail (RTP_IS_SESSION (sess));
3554
3555   RTP_SESSION_LOCK (sess);
3556   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3557       (GHFunc) source_mark_bye, (gpointer) reason);
3558   RTP_SESSION_UNLOCK (sess);
3559 }
3560
3561 /* Stop the current @sess and schedule a BYE message for the other members.
3562  * One must have the session lock to call this function
3563  */
3564 static GstFlowReturn
3565 rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time)
3566 {
3567   GstFlowReturn result = GST_FLOW_OK;
3568   GstClockTime interval;
3569
3570   /* nothing to do it we already scheduled bye */
3571   if (sess->scheduled_bye)
3572     goto done;
3573
3574   /* we schedule BYE now */
3575   sess->scheduled_bye = TRUE;
3576   /* at least one member wants to send a BYE */
3577   memcpy (&sess->bye_stats, &sess->stats, sizeof (RTPSessionStats));
3578   INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100);
3579   sess->bye_stats.bye_members = 1;
3580   sess->first_rtcp = TRUE;
3581
3582   /* reschedule transmission */
3583   sess->last_rtcp_send_time = current_time;
3584   sess->last_rtcp_check_time = current_time;
3585   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3586
3587   if (interval != GST_CLOCK_TIME_NONE)
3588     sess->next_rtcp_check_time = current_time + interval;
3589   else
3590     sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
3591   sess->last_rtcp_interval = interval;
3592
3593   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
3594       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
3595
3596   RTP_SESSION_UNLOCK (sess);
3597   /* notify app of reconsideration */
3598   if (sess->callbacks.reconsider)
3599     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
3600   RTP_SESSION_LOCK (sess);
3601 done:
3602
3603   return result;
3604 }
3605
3606 /**
3607  * rtp_session_schedule_bye:
3608  * @sess: an #RTPSession
3609  * @current_time: the current system time
3610  *
3611  * Schedule a BYE message for all sources marked as BYE in @sess.
3612  *
3613  * Returns: a #GstFlowReturn.
3614  */
3615 GstFlowReturn
3616 rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
3617 {
3618   GstFlowReturn result;
3619
3620   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3621
3622   RTP_SESSION_LOCK (sess);
3623   result = rtp_session_schedule_bye_locked (sess, current_time);
3624   RTP_SESSION_UNLOCK (sess);
3625
3626   return result;
3627 }
3628
3629 /**
3630  * rtp_session_next_timeout:
3631  * @sess: an #RTPSession
3632  * @current_time: the current system time
3633  *
3634  * Get the next time we should perform session maintenance tasks.
3635  *
3636  * Returns: a time when rtp_session_on_timeout() should be called with the
3637  * current system time.
3638  */
3639 GstClockTime
3640 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
3641 {
3642   GstClockTime result, interval = 0;
3643
3644   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
3645
3646   RTP_SESSION_LOCK (sess);
3647
3648   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
3649     GST_DEBUG ("have early rtcp time");
3650     result = sess->next_early_rtcp_time;
3651     goto early_exit;
3652   }
3653
3654   result = sess->next_rtcp_check_time;
3655
3656   GST_DEBUG ("current time: %" GST_TIME_FORMAT
3657       ", next time: %" GST_TIME_FORMAT,
3658       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
3659
3660   if (result == GST_CLOCK_TIME_NONE || result < current_time) {
3661     GST_DEBUG ("take current time as base");
3662     /* our previous check time expired, start counting from the current time
3663      * again. */
3664     result = current_time;
3665   }
3666
3667   if (sess->scheduled_bye) {
3668     if (sess->bye_stats.active_sources >= 50) {
3669       GST_DEBUG ("reconsider BYE, more than 50 sources");
3670       /* reconsider BYE if members >= 50 */
3671       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3672       sess->last_rtcp_interval = interval;
3673     }
3674   } else {
3675     if (sess->first_rtcp) {
3676       GST_DEBUG ("first RTCP packet");
3677       /* we are called for the first time */
3678       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3679       sess->last_rtcp_interval = interval;
3680     } else if (sess->next_rtcp_check_time < current_time) {
3681       GST_DEBUG ("old check time expired, getting new timeout");
3682       /* get a new timeout when we need to */
3683       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
3684       sess->last_rtcp_interval = interval;
3685
3686       if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF
3687               || sess->rtp_profile == GST_RTP_PROFILE_SAVPF)
3688           && interval != GST_CLOCK_TIME_NONE) {
3689         /* Apply the rules from RFC 4585 section 3.5.3 */
3690         if (sess->stats.min_interval != 0) {
3691           GstClockTime T_rr_current_interval = g_random_double_range (0.5,
3692               1.5) * sess->stats.min_interval * GST_SECOND;
3693
3694           if (T_rr_current_interval > interval) {
3695             GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT
3696                 " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval),
3697                 GST_TIME_ARGS (interval));
3698             interval = T_rr_current_interval;
3699           }
3700         }
3701       }
3702     }
3703   }
3704
3705   if (interval != GST_CLOCK_TIME_NONE)
3706     result += interval;
3707   else
3708     result = GST_CLOCK_TIME_NONE;
3709
3710   sess->next_rtcp_check_time = result;
3711
3712 early_exit:
3713
3714   GST_DEBUG ("current time: %" GST_TIME_FORMAT
3715       ", next time: %" GST_TIME_FORMAT,
3716       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
3717   RTP_SESSION_UNLOCK (sess);
3718
3719   return result;
3720 }
3721
3722 typedef struct
3723 {
3724   RTPSource *source;
3725   gboolean is_bye;
3726   GstBuffer *buffer;
3727 } ReportOutput;
3728
3729 typedef struct
3730 {
3731   GstRTCPBuffer rtcpbuf;
3732   RTPSession *sess;
3733   RTPSource *source;
3734   guint num_to_report;
3735   gboolean have_fir;
3736   gboolean have_pli;
3737   gboolean have_nack;
3738   GstBuffer *rtcp;
3739   GstClockTime current_time;
3740   guint64 ntpnstime;
3741   GstClockTime running_time;
3742   GstClockTime interval;
3743   GstRTCPPacket packet;
3744   gboolean has_sdes;
3745   gboolean is_early;
3746   gboolean may_suppress;
3747   GQueue output;
3748   guint nacked_seqnums;
3749 } ReportData;
3750
3751 static void
3752 session_start_rtcp (RTPSession * sess, ReportData * data)
3753 {
3754   GstRTCPPacket *packet = &data->packet;
3755   RTPSource *own = data->source;
3756   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3757
3758   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
3759   data->has_sdes = FALSE;
3760
3761   gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
3762
3763   if (data->is_early && sess->reduced_size_rtcp)
3764     return;
3765
3766   if (RTP_SOURCE_IS_SENDER (own)) {
3767     guint64 ntptime;
3768     guint32 rtptime;
3769     guint32 packet_count, octet_count;
3770
3771     /* we are a sender, create SR */
3772     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
3773     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SR, packet);
3774
3775     /* get latest stats */
3776     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
3777         &ntptime, &rtptime, &packet_count, &octet_count);
3778     /* store stats */
3779     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
3780         packet_count, octet_count);
3781
3782     /* fill in sender report info */
3783     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
3784         sess->timestamp_sender_reports ? ntptime : 0,
3785         sess->timestamp_sender_reports ? rtptime : 0,
3786         packet_count, octet_count);
3787   } else {
3788     /* we are only receiver, create RR */
3789     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
3790     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RR, packet);
3791     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
3792   }
3793 }
3794
3795 /* construct a Sender or Receiver Report */
3796 static void
3797 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
3798 {
3799   RTPSession *sess = data->sess;
3800   GstRTCPPacket *packet = &data->packet;
3801   guint8 fractionlost;
3802   gint32 packetslost;
3803   guint32 exthighestseq, jitter;
3804   guint32 lsr, dlsr;
3805
3806   /* don't report for sources in future generations */
3807   if (((gint16) (source->generation - sess->generation)) > 0) {
3808     GST_DEBUG ("source %08x generation %u > %u", source->ssrc,
3809         source->generation, sess->generation);
3810     return;
3811   }
3812
3813   if (g_hash_table_contains (source->reported_in_sr_of,
3814           GUINT_TO_POINTER (data->source->ssrc))) {
3815     GST_DEBUG ("source %08x already reported in this generation", source->ssrc);
3816     return;
3817   }
3818
3819   if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
3820     GST_DEBUG ("max RB count reached");
3821     return;
3822   }
3823
3824   /* only report about remote sources */
3825   if (source->internal)
3826     goto reported;
3827
3828   if (!RTP_SOURCE_IS_SENDER (source)) {
3829     GST_DEBUG ("source %08x not sender", source->ssrc);
3830     goto reported;
3831   }
3832
3833   if (source->disable_rtcp) {
3834     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
3835     goto reported;
3836   }
3837
3838   GST_DEBUG ("create RB for SSRC %08x", source->ssrc);
3839
3840   /* get new stats */
3841   rtp_source_get_new_rb (source, data->current_time, &fractionlost,
3842       &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
3843
3844   /* store last generated RR packet */
3845   source->last_rr.is_valid = TRUE;
3846   source->last_rr.ssrc = data->source->ssrc;
3847   source->last_rr.fractionlost = fractionlost;
3848   source->last_rr.packetslost = packetslost;
3849   source->last_rr.exthighestseq = exthighestseq;
3850   source->last_rr.jitter = jitter;
3851   source->last_rr.lsr = lsr;
3852   source->last_rr.dlsr = dlsr;
3853
3854   /* packet is not yet filled, add report block for this source. */
3855   gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
3856       exthighestseq, jitter, lsr, dlsr);
3857
3858 reported:
3859   g_hash_table_add (source->reported_in_sr_of,
3860       GUINT_TO_POINTER (data->source->ssrc));
3861 }
3862
3863 /* construct FIR */
3864 static void
3865 session_add_fir (const gchar * key, RTPSource * source, ReportData * data)
3866 {
3867   GstRTCPPacket *packet = &data->packet;
3868   guint16 len;
3869   guint8 *fci_data;
3870
3871   if (!source->send_fir)
3872     return;
3873
3874   len = gst_rtcp_packet_fb_get_fci_length (packet);
3875   if (!gst_rtcp_packet_fb_set_fci_length (packet, len + 2))
3876     /* exit because the packet is full, will put next request in a
3877      * further packet */
3878     return;
3879
3880   fci_data = gst_rtcp_packet_fb_get_fci (packet) + (len * 4);
3881
3882   GST_WRITE_UINT32_BE (fci_data, source->ssrc);
3883   fci_data += 4;
3884   fci_data[0] = source->current_send_fir_seqnum;
3885   fci_data[1] = fci_data[2] = fci_data[3] = 0;
3886
3887   source->send_fir = FALSE;
3888   source->stats.sent_fir_count++;
3889 }
3890
3891 static void
3892 session_fir (RTPSession * sess, ReportData * data)
3893 {
3894   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3895   GstRTCPPacket *packet = &data->packet;
3896
3897   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3898     return;
3899
3900   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_FIR);
3901   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3902   gst_rtcp_packet_fb_set_media_ssrc (packet, 0);
3903
3904   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3905       (GHFunc) session_add_fir, data);
3906
3907   if (gst_rtcp_packet_fb_get_fci_length (packet) == 0)
3908     gst_rtcp_packet_remove (packet);
3909   else
3910     data->may_suppress = FALSE;
3911 }
3912
3913 static gboolean
3914 has_pli_compare_func (gconstpointer a, gconstpointer ignored)
3915 {
3916   GstRTCPPacket packet;
3917   GstRTCPBuffer rtcp = { NULL, };
3918   gboolean ret = FALSE;
3919
3920   gst_rtcp_buffer_map ((GstBuffer *) a, GST_MAP_READ, &rtcp);
3921
3922   if (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
3923     if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
3924         gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
3925       ret = TRUE;
3926   }
3927
3928   gst_rtcp_buffer_unmap (&rtcp);
3929
3930   return ret;
3931 }
3932
3933 /* construct PLI */
3934 static void
3935 session_pli (const gchar * key, RTPSource * source, ReportData * data)
3936 {
3937   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3938   GstRTCPPacket *packet = &data->packet;
3939
3940   if (!source->send_pli)
3941     return;
3942
3943   if (rtp_source_has_retained (source, has_pli_compare_func, NULL))
3944     return;
3945
3946   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3947     /* exit because the packet is full, will put next request in a
3948      * further packet */
3949     return;
3950
3951   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_PLI);
3952   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3953   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
3954
3955   source->send_pli = FALSE;
3956   data->may_suppress = FALSE;
3957
3958   source->stats.sent_pli_count++;
3959 }
3960
3961 /* construct NACK */
3962 static void
3963 session_nack (const gchar * key, RTPSource * source, ReportData * data)
3964 {
3965   RTPSession *sess = data->sess;
3966   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3967   GstRTCPPacket *packet = &data->packet;
3968   guint16 *nacks;
3969   GstClockTime *nack_deadlines;
3970   guint n_nacks, i = 0;
3971   guint nacked_seqnums = 0;
3972   guint16 n_fb_nacks = 0;
3973   guint8 *fci_data;
3974
3975   if (!source->send_nack)
3976     return;
3977
3978   nacks = rtp_source_get_nacks (source, &n_nacks);
3979   nack_deadlines = rtp_source_get_nack_deadlines (source, NULL);
3980   GST_DEBUG ("%u NACKs current time %" GST_TIME_FORMAT, n_nacks,
3981       GST_TIME_ARGS (data->current_time));
3982
3983   /* cleanup expired nacks */
3984   for (i = 0; i < n_nacks; i++) {
3985     GST_DEBUG ("#%u deadline %" GST_TIME_FORMAT, nacks[i],
3986         GST_TIME_ARGS (nack_deadlines[i]));
3987     if (nack_deadlines[i] >= data->current_time)
3988       break;
3989   }
3990
3991   if (data->is_early) {
3992     /* don't remove them all if this is an early RTCP packet. It may happen
3993      * that the NACKs are late due to high RTT, not sending NACKs at all would
3994      * keep the RTX RTT stats high and maintain a dropping state. */
3995     i = MIN (n_nacks - 1, i);
3996   }
3997
3998   if (i) {
3999     GST_WARNING ("Removing %u expired NACKS", i);
4000     rtp_source_clear_nacks (source, i);
4001     n_nacks -= i;
4002     if (n_nacks == 0)
4003       return;
4004   }
4005
4006   /* allow overriding NACK to packet conversion */
4007   if (g_signal_has_handler_pending (sess,
4008           rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0, TRUE)) {
4009     /* this is needed as it will actually resize the buffer */
4010     gst_rtcp_buffer_unmap (rtcp);
4011
4012     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0,
4013         data->source->ssrc, source->ssrc, source->nacks, data->rtcp,
4014         &nacked_seqnums);
4015
4016     /* and now remap for the remaining work */
4017     gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
4018
4019     if (nacked_seqnums > 0)
4020       goto done;
4021   }
4022
4023   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet))
4024     /* exit because the packet is full, will put next request in a
4025      * further packet */
4026     return;
4027
4028   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_RTPFB_TYPE_NACK);
4029   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
4030   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
4031
4032   if (!gst_rtcp_packet_fb_set_fci_length (packet, 1)) {
4033     gst_rtcp_packet_remove (packet);
4034     GST_WARNING ("no nacks fit in the packet");
4035     return;
4036   }
4037
4038   fci_data = gst_rtcp_packet_fb_get_fci (packet);
4039   for (i = 0; i < n_nacks; i = nacked_seqnums) {
4040     guint16 seqnum = nacks[i];
4041     guint16 blp = 0;
4042     guint j;
4043
4044     if (!gst_rtcp_packet_fb_set_fci_length (packet, n_fb_nacks + 1))
4045       break;
4046
4047     n_fb_nacks++;
4048     nacked_seqnums++;
4049
4050     for (j = i + 1; j < n_nacks; j++) {
4051       gint diff;
4052
4053       diff = gst_rtp_buffer_compare_seqnum (seqnum, nacks[j]);
4054       GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, nacks[j], diff);
4055       if (diff > 16)
4056         break;
4057
4058       blp |= 1 << (diff - 1);
4059       nacked_seqnums++;
4060     }
4061
4062     GST_WRITE_UINT32_BE (fci_data, seqnum << 16 | blp);
4063     fci_data += 4;
4064   }
4065
4066   GST_DEBUG ("Sent %u seqnums into %u FB NACKs", nacked_seqnums, n_fb_nacks);
4067   source->stats.sent_nack_count += n_fb_nacks;
4068
4069 done:
4070   data->nacked_seqnums += nacked_seqnums;
4071   rtp_source_clear_nacks (source, nacked_seqnums);
4072   data->may_suppress = FALSE;
4073 }
4074
4075 /* perform cleanup of sources that timed out */
4076 static void
4077 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
4078 {
4079   gboolean remove = FALSE;
4080   gboolean byetimeout = FALSE;
4081   gboolean sendertimeout = FALSE;
4082   gboolean is_sender, is_active;
4083   RTPSession *sess = data->sess;
4084   GstClockTime interval, binterval;
4085   GstClockTime btime;
4086
4087   GST_DEBUG ("look at %08x, generation %u", source->ssrc, source->generation);
4088
4089   /* check for outdated collisions */
4090   if (source->internal) {
4091     GST_DEBUG ("Timing out collisions for %x", source->ssrc);
4092     rtp_source_timeout (source, data->current_time, data->running_time,
4093         sess->rtcp_feedback_retention_window);
4094   }
4095
4096   /* nothing else to do when without RTCP */
4097   if (data->interval == GST_CLOCK_TIME_NONE)
4098     return;
4099
4100   is_sender = RTP_SOURCE_IS_SENDER (source);
4101   is_active = RTP_SOURCE_IS_ACTIVE (source);
4102
4103   /* our own rtcp interval may have been forced low by secondary configuration,
4104    * while sender side may still operate with higher interval,
4105    * so do not just take our interval to decide on timing out sender,
4106    * but take (if data->interval <= 5 * GST_SECOND):
4107    *   interval = CLAMP (sender_interval, data->interval, 5 * GST_SECOND)
4108    * where sender_interval is difference between last 2 received RTCP reports
4109    */
4110   if (data->interval >= 5 * GST_SECOND || source->internal) {
4111     binterval = data->interval;
4112   } else {
4113     GST_LOG ("prev_rtcp %" GST_TIME_FORMAT ", last_rtcp %" GST_TIME_FORMAT,
4114         GST_TIME_ARGS (source->stats.prev_rtcptime),
4115         GST_TIME_ARGS (source->stats.last_rtcptime));
4116     /* if not received enough yet, fallback to larger default */
4117     if (source->stats.last_rtcptime > source->stats.prev_rtcptime)
4118       binterval = source->stats.last_rtcptime - source->stats.prev_rtcptime;
4119     else
4120       binterval = 5 * GST_SECOND;
4121     binterval = CLAMP (binterval, data->interval, 5 * GST_SECOND);
4122   }
4123   GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
4124       GST_TIME_ARGS (binterval));
4125
4126   if (!source->internal && source->marked_bye) {
4127     /* if we received a BYE from the source, remove the source after some
4128      * time. */
4129     if (data->current_time > source->bye_time &&
4130         data->current_time - source->bye_time > sess->stats.bye_timeout) {
4131       GST_DEBUG ("removing BYE source %08x", source->ssrc);
4132       remove = TRUE;
4133       byetimeout = TRUE;
4134     }
4135   }
4136
4137   if (source->internal && source->sent_bye) {
4138     GST_DEBUG ("removing internal source that has sent BYE %08x", source->ssrc);
4139     remove = TRUE;
4140   }
4141
4142   /* sources that were inactive for more than 5 times the deterministic reporting
4143    * interval get timed out. the min timeout is 5 seconds. */
4144   /* mind old time that might pre-date last time going to PLAYING */
4145   btime = MAX (source->last_activity, sess->start_time);
4146   if (data->current_time > btime) {
4147     interval = MAX (binterval * 5, 5 * GST_SECOND);
4148     if (data->current_time - btime > interval) {
4149       GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
4150           source->ssrc, GST_TIME_ARGS (btime));
4151       if (source->internal) {
4152         /* this is an internal source that is not using our suggested ssrc.
4153          * since there must be another source using this ssrc, we can remove
4154          * this one instead of making it a receiver forever */
4155         if (source->ssrc != sess->suggested_ssrc) {
4156           rtp_source_mark_bye (source, "timed out");
4157           /* do not schedule bye here, since we are inside the RTCP timeout
4158            * processing and scheduling bye will interfere with SR/RR sending */
4159         }
4160       } else {
4161         remove = TRUE;
4162       }
4163     }
4164   }
4165
4166   /* senders that did not send for a long time become a receiver, this also
4167    * holds for our own sources. */
4168   if (is_sender) {
4169     /* mind old time that might pre-date last time going to PLAYING */
4170     btime = MAX (source->last_rtp_activity, sess->start_time);
4171     if (data->current_time > btime) {
4172       interval = MAX (binterval * 2, 5 * GST_SECOND);
4173       if (data->current_time - btime > interval) {
4174         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
4175             GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
4176         sendertimeout = TRUE;
4177       }
4178     }
4179   }
4180
4181   if (remove) {
4182     sess->total_sources--;
4183     if (is_sender) {
4184       sess->stats.sender_sources--;
4185       if (source->internal)
4186         sess->stats.internal_sender_sources--;
4187     }
4188     if (is_active)
4189       sess->stats.active_sources--;
4190
4191     if (source->internal)
4192       sess->stats.internal_sources--;
4193
4194     if (byetimeout)
4195       on_bye_timeout (sess, source);
4196     else
4197       on_timeout (sess, source);
4198   } else {
4199     if (sendertimeout) {
4200       source->is_sender = FALSE;
4201       sess->stats.sender_sources--;
4202       if (source->internal)
4203         sess->stats.internal_sender_sources--;
4204
4205       on_sender_timeout (sess, source);
4206     }
4207     /* count how many source to report in this generation */
4208     if (((gint16) (source->generation - sess->generation)) <= 0)
4209       data->num_to_report++;
4210   }
4211   source->closing = remove;
4212 }
4213
4214 static void
4215 session_sdes (RTPSession * sess, ReportData * data)
4216 {
4217   GstRTCPPacket *packet = &data->packet;
4218   const GstStructure *sdes;
4219   gint i, n_fields;
4220   GstRTCPBuffer *rtcp = &data->rtcpbuf;
4221
4222   /* add SDES packet */
4223   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SDES, packet);
4224
4225   gst_rtcp_packet_sdes_add_item (packet, data->source->ssrc);
4226
4227   sdes = rtp_source_get_sdes_struct (data->source);
4228
4229   /* add all fields in the structure, the order is not important. */
4230   n_fields = gst_structure_n_fields (sdes);
4231   for (i = 0; i < n_fields; ++i) {
4232     const gchar *field;
4233     const gchar *value;
4234     GstRTCPSDESType type;
4235
4236     field = gst_structure_nth_field_name (sdes, i);
4237     if (field == NULL)
4238       continue;
4239     value = gst_structure_get_string (sdes, field);
4240     if (value == NULL)
4241       continue;
4242     type = gst_rtcp_sdes_name_to_type (field);
4243
4244     /* Early packets are minimal and only include the CNAME */
4245     if (data->is_early && type != GST_RTCP_SDES_CNAME)
4246       continue;
4247
4248     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
4249       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
4250           (const guint8 *) value);
4251     } else if (type == GST_RTCP_SDES_PRIV) {
4252       gsize prefix_len;
4253       gsize value_len;
4254       gsize data_len;
4255       guint8 data[256];
4256
4257       /* don't accept entries that are too big */
4258       prefix_len = strlen (field);
4259       if (prefix_len > 255)
4260         continue;
4261       value_len = strlen (value);
4262       if (value_len > 255)
4263         continue;
4264       data_len = 1 + prefix_len + value_len;
4265       if (data_len > 255)
4266         continue;
4267
4268       data[0] = prefix_len;
4269       memcpy (&data[1], field, prefix_len);
4270       memcpy (&data[1 + prefix_len], value, value_len);
4271
4272       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
4273     }
4274   }
4275
4276   data->has_sdes = TRUE;
4277 }
4278
4279 /* schedule a BYE packet */
4280 static void
4281 make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data)
4282 {
4283   GstRTCPPacket *packet = &data->packet;
4284   GstRTCPBuffer *rtcp = &data->rtcpbuf;
4285
4286   /* add SDES */
4287   session_sdes (sess, data);
4288   /* add a BYE packet */
4289   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_BYE, packet);
4290   gst_rtcp_packet_bye_add_ssrc (packet, source->ssrc);
4291   if (source->bye_reason)
4292     gst_rtcp_packet_bye_set_reason (packet, source->bye_reason);
4293
4294   /* we have a BYE packet now */
4295   source->sent_bye = TRUE;
4296 }
4297
4298 static gboolean
4299 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
4300 {
4301   GstClockTime new_send_time;
4302   GstClockTime interval;
4303   RTPSessionStats *stats;
4304
4305   if (sess->scheduled_bye)
4306     stats = &sess->bye_stats;
4307   else
4308     stats = &sess->stats;
4309
4310   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
4311     data->is_early = TRUE;
4312   else
4313     data->is_early = FALSE;
4314
4315   if (data->is_early && sess->next_early_rtcp_time <= current_time) {
4316     GST_DEBUG ("early feedback %" GST_TIME_FORMAT " <= now %"
4317         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time),
4318         GST_TIME_ARGS (current_time));
4319   } else if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
4320       sess->next_rtcp_check_time > current_time) {
4321     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
4322         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
4323         GST_TIME_ARGS (current_time));
4324     return FALSE;
4325   }
4326
4327   /* take interval and add jitter */
4328   interval = data->interval;
4329   if (interval != GST_CLOCK_TIME_NONE)
4330     interval = rtp_stats_add_rtcp_jitter (stats, interval);
4331
4332   if (sess->last_rtcp_check_time != GST_CLOCK_TIME_NONE) {
4333     /* perform forward reconsideration */
4334     if (interval != GST_CLOCK_TIME_NONE) {
4335       GstClockTime elapsed;
4336
4337       /* get elapsed time since we last reported */
4338       elapsed = current_time - sess->last_rtcp_check_time;
4339
4340       GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
4341           GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
4342       new_send_time = interval + sess->last_rtcp_check_time;
4343     } else {
4344       new_send_time = sess->last_rtcp_check_time;
4345     }
4346   } else {
4347     /* If this is the first RTCP packet, we can reconsider anything based
4348      * on the last RTCP send time because there was none.
4349      */
4350     g_warn_if_fail (!data->is_early);
4351     data->is_early = FALSE;
4352     new_send_time = current_time;
4353   }
4354
4355   if (!data->is_early) {
4356     /* check if reconsideration */
4357     if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
4358       GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
4359           GST_TIME_ARGS (new_send_time));
4360       /* store new check time */
4361       sess->next_rtcp_check_time = new_send_time;
4362       sess->last_rtcp_interval = interval;
4363       return FALSE;
4364     }
4365
4366     sess->last_rtcp_interval = interval;
4367     if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF
4368             || sess->rtp_profile == GST_RTP_PROFILE_SAVPF)
4369         && interval != GST_CLOCK_TIME_NONE) {
4370       /* Apply the rules from RFC 4585 section 3.5.3 */
4371       if (stats->min_interval != 0 && !sess->first_rtcp) {
4372         GstClockTime T_rr_current_interval =
4373             g_random_double_range (0.5, 1.5) * stats->min_interval * GST_SECOND;
4374
4375         if (T_rr_current_interval > interval) {
4376           GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT
4377               " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval),
4378               GST_TIME_ARGS (interval));
4379           interval = T_rr_current_interval;
4380         }
4381       }
4382     }
4383     sess->next_rtcp_check_time = current_time + interval;
4384   }
4385
4386
4387   GST_DEBUG ("can send RTCP now, next %" GST_TIME_FORMAT,
4388       GST_TIME_ARGS (sess->next_rtcp_check_time));
4389
4390   return TRUE;
4391 }
4392
4393 static void
4394 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
4395 {
4396   g_hash_table_insert (hash_table, key, g_object_ref (source));
4397 }
4398
4399 static gboolean
4400 remove_closing_sources (const gchar * key, RTPSource * source,
4401     ReportData * data)
4402 {
4403   if (source->closing)
4404     return TRUE;
4405
4406   if (source->send_fir)
4407     data->have_fir = TRUE;
4408   if (source->send_pli)
4409     data->have_pli = TRUE;
4410   if (source->send_nack)
4411     data->have_nack = TRUE;
4412
4413   return FALSE;
4414 }
4415
4416 static void
4417 generate_twcc (const gchar * key, RTPSource * source, ReportData * data)
4418 {
4419   RTPSession *sess = data->sess;
4420   GstBuffer *buf;
4421
4422   /* only generate RTCP for active internal sources */
4423   if (!source->internal || source->sent_bye)
4424     return;
4425
4426   /* ignore other sources when we do the timeout after a scheduled BYE */
4427   if (sess->scheduled_bye && !source->marked_bye)
4428     return;
4429
4430   /* skip if RTCP is disabled */
4431   if (source->disable_rtcp) {
4432     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
4433     return;
4434   }
4435
4436   GST_DEBUG ("generating TWCC feedback for source %08x", source->ssrc);
4437
4438   while ((buf = rtp_twcc_manager_get_feedback (sess->twcc, source->ssrc))) {
4439     ReportOutput *output = g_slice_new (ReportOutput);
4440     output->source = g_object_ref (source);
4441     output->is_bye = FALSE;
4442     output->buffer = buf;
4443     /* queue the RTCP packet to push later */
4444     g_queue_push_tail (&data->output, output);
4445   }
4446 }
4447
4448
4449 static void
4450 generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
4451 {
4452   RTPSession *sess = data->sess;
4453   gboolean is_bye = FALSE;
4454   ReportOutput *output;
4455
4456   /* only generate RTCP for active internal sources */
4457   if (!source->internal || source->sent_bye)
4458     return;
4459
4460   /* ignore other sources when we do the timeout after a scheduled BYE */
4461   if (sess->scheduled_bye && !source->marked_bye)
4462     return;
4463
4464   /* skip if RTCP is disabled */
4465   if (source->disable_rtcp) {
4466     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
4467     return;
4468   }
4469
4470   data->source = source;
4471
4472   /* open packet */
4473   session_start_rtcp (sess, data);
4474
4475   if (source->marked_bye) {
4476     /* send BYE */
4477     make_source_bye (sess, source, data);
4478     is_bye = TRUE;
4479   } else if (!data->is_early) {
4480     /* loop over all known sources and add report blocks. If we are early, we
4481      * just make a minimal RTCP packet and skip this step */
4482     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4483         (GHFunc) session_report_blocks, data);
4484   }
4485   if (!data->has_sdes && (!data->is_early || !sess->reduced_size_rtcp))
4486     session_sdes (sess, data);
4487
4488   if (data->have_fir)
4489     session_fir (sess, data);
4490
4491   if (data->have_pli)
4492     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4493         (GHFunc) session_pli, data);
4494
4495   if (data->have_nack)
4496     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4497         (GHFunc) session_nack, data);
4498
4499   gst_rtcp_buffer_unmap (&data->rtcpbuf);
4500
4501   output = g_slice_new (ReportOutput);
4502   output->source = g_object_ref (source);
4503   output->is_bye = is_bye;
4504   output->buffer = data->rtcp;
4505   /* queue the RTCP packet to push later */
4506   g_queue_push_tail (&data->output, output);
4507 }
4508
4509 static void
4510 update_generation (const gchar * key, RTPSource * source, ReportData * data)
4511 {
4512   RTPSession *sess = data->sess;
4513
4514   if (g_hash_table_size (source->reported_in_sr_of) >=
4515       sess->stats.internal_sources) {
4516     /* source is reported, move to next generation */
4517     source->generation = sess->generation + 1;
4518     g_hash_table_remove_all (source->reported_in_sr_of);
4519
4520     GST_LOG ("reported source %x, new generation: %d", source->ssrc,
4521         source->generation);
4522
4523     /* if we reported all sources in this generation, move to next */
4524     if (--data->num_to_report == 0) {
4525       sess->generation++;
4526       GST_DEBUG ("all reported, generation now %u", sess->generation);
4527     }
4528   }
4529 }
4530
4531 static void
4532 schedule_remaining_nacks (const gchar * key, RTPSource * source,
4533     ReportData * data)
4534 {
4535   RTPSession *sess = data->sess;
4536   GstClockTime *nack_deadlines;
4537   GstClockTime deadline;
4538   guint n_nacks;
4539
4540   if (!source->send_nack)
4541     return;
4542
4543   /* the scheduling is entirely based on available bandwidth, just take the
4544    * biggest seqnum, which will have the largest deadline to request early
4545    * RTCP. */
4546   nack_deadlines = rtp_source_get_nack_deadlines (source, &n_nacks);
4547   deadline = nack_deadlines[n_nacks - 1];
4548   RTP_SESSION_UNLOCK (sess);
4549   rtp_session_send_rtcp_with_deadline (sess, deadline);
4550   RTP_SESSION_LOCK (sess);
4551 }
4552
4553 static gboolean
4554 rtp_session_are_all_sources_bye (RTPSession * sess)
4555 {
4556   GHashTableIter iter;
4557   RTPSource *src;
4558
4559   RTP_SESSION_LOCK (sess);
4560   g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
4561   while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
4562     if (src->internal && !src->sent_bye) {
4563       RTP_SESSION_UNLOCK (sess);
4564       return FALSE;
4565     }
4566   }
4567   RTP_SESSION_UNLOCK (sess);
4568
4569   return TRUE;
4570 }
4571
4572 /**
4573  * rtp_session_on_timeout:
4574  * @sess: an #RTPSession
4575  * @current_time: the current system time
4576  * @ntpnstime: the current NTP time in nanoseconds
4577  * @running_time: the current running_time of the pipeline
4578  *
4579  * Perform maintenance actions after the timeout obtained with
4580  * rtp_session_next_timeout() expired.
4581  *
4582  * This function will perform timeouts of receivers and senders, send a BYE
4583  * packet or generate RTCP packets with current session stats.
4584  *
4585  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
4586  * times, for each packet that should be processed.
4587  *
4588  * Returns: a #GstFlowReturn.
4589  */
4590 GstFlowReturn
4591 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
4592     guint64 ntpnstime, GstClockTime running_time)
4593 {
4594   GstFlowReturn result = GST_FLOW_OK;
4595   ReportData data = { GST_RTCP_BUFFER_INIT };
4596   GHashTable *table_copy;
4597   ReportOutput *output;
4598   gboolean all_empty = FALSE;
4599
4600   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
4601
4602   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT
4603       ", running-time %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4604       GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (running_time));
4605
4606   data.sess = sess;
4607   data.current_time = current_time;
4608   data.ntpnstime = ntpnstime;
4609   data.running_time = running_time;
4610   data.num_to_report = 0;
4611   data.may_suppress = FALSE;
4612   data.nacked_seqnums = 0;
4613   g_queue_init (&data.output);
4614
4615   RTP_SESSION_LOCK (sess);
4616   /* get a new interval, we need this for various cleanups etc */
4617   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
4618
4619   GST_DEBUG ("interval %" GST_TIME_FORMAT, GST_TIME_ARGS (data.interval));
4620
4621   /* we need an internal source now */
4622   if (sess->stats.internal_sources == 0) {
4623     RTPSource *source;
4624     gboolean created;
4625
4626     source = obtain_internal_source (sess, sess->suggested_ssrc, &created,
4627         current_time);
4628     sess->internal_ssrc_set = TRUE;
4629
4630     if (created)
4631       on_new_sender_ssrc (sess, source);
4632
4633     g_object_unref (source);
4634   }
4635
4636   sess->conflicting_addresses =
4637       timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
4638
4639   /* Make a local copy of the hashtable. We need to do this because the
4640    * cleanup stage below releases the session lock. */
4641   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
4642       (GDestroyNotify) g_object_unref);
4643   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4644       (GHFunc) clone_ssrcs_hashtable, table_copy);
4645
4646   /* Clean up the session, mark the source for removing, this might release the
4647    * session lock. */
4648   g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
4649   g_hash_table_destroy (table_copy);
4650
4651   /* Now remove the marked sources */
4652   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
4653       (GHRFunc) remove_closing_sources, &data);
4654
4655   /* update point-to-point status */
4656   session_update_ptp (sess);
4657
4658   /* see if we need to generate SR or RR packets */
4659   if (!is_rtcp_time (sess, current_time, &data))
4660     goto done;
4661
4662   /* check if all the buffers are empty after generation */
4663   all_empty = TRUE;
4664
4665   GST_DEBUG
4666       ("doing RTCP generation %u for %u sources, early %d, may suppress %d",
4667       sess->generation, data.num_to_report, data.is_early, data.may_suppress);
4668
4669   /* generate RTCP for all internal sources */
4670   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4671       (GHFunc) generate_rtcp, &data);
4672
4673   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4674       (GHFunc) generate_twcc, &data);
4675
4676   /* update the generation for all the sources that have been reported */
4677   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4678       (GHFunc) update_generation, &data);
4679
4680   /* we keep track of the last report time in order to timeout inactive
4681    * receivers or senders */
4682   if (!data.is_early) {
4683     GST_DEBUG ("Time since last regular RTCP: %" GST_TIME_FORMAT " - %"
4684         GST_TIME_FORMAT " = %" GST_TIME_FORMAT,
4685         GST_TIME_ARGS (data.current_time),
4686         GST_TIME_ARGS (sess->last_rtcp_send_time),
4687         GST_TIME_ARGS (data.current_time - sess->last_rtcp_send_time));
4688     sess->last_rtcp_send_time = data.current_time;
4689   }
4690
4691   GST_DEBUG ("Time since last RTCP: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT
4692       " = %" GST_TIME_FORMAT, GST_TIME_ARGS (data.current_time),
4693       GST_TIME_ARGS (sess->last_rtcp_check_time),
4694       GST_TIME_ARGS (data.current_time - sess->last_rtcp_check_time));
4695   sess->last_rtcp_check_time = data.current_time;
4696   sess->first_rtcp = FALSE;
4697   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
4698   sess->scheduled_bye = FALSE;
4699
4700 done:
4701   RTP_SESSION_UNLOCK (sess);
4702
4703   /* notify about updated statistics */
4704   g_object_notify (G_OBJECT (sess), "stats");
4705
4706   /* push out the RTCP packets */
4707   while ((output = g_queue_pop_head (&data.output))) {
4708     gboolean do_not_suppress, empty_buffer;
4709     GstBuffer *buffer = output->buffer;
4710     RTPSource *source = output->source;
4711
4712     /* Give the user a change to add its own packet */
4713     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
4714         buffer, data.is_early, &do_not_suppress);
4715
4716     empty_buffer = gst_buffer_get_size (buffer) == 0;
4717
4718     if (!empty_buffer)
4719       all_empty = FALSE;
4720
4721     if (sess->callbacks.send_rtcp &&
4722         !empty_buffer && (do_not_suppress || !data.may_suppress)) {
4723       guint packet_size;
4724
4725       packet_size = gst_buffer_get_size (buffer) + sess->header_len;
4726
4727       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
4728       GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
4729           sess->stats.avg_rtcp_packet_size, packet_size);
4730       result =
4731           sess->callbacks.send_rtcp (sess, source, buffer,
4732           rtp_session_are_all_sources_bye (sess), sess->send_rtcp_user_data);
4733
4734       RTP_SESSION_LOCK (sess);
4735       sess->stats.nacks_sent += data.nacked_seqnums;
4736       on_sender_ssrc_active (sess, source);
4737       RTP_SESSION_UNLOCK (sess);
4738     } else {
4739       GST_DEBUG ("freeing packet callback: %p"
4740           " empty_buffer: %d, "
4741           " do_not_suppress: %d may_suppress: %d", sess->callbacks.send_rtcp,
4742           empty_buffer, do_not_suppress, data.may_suppress);
4743       if (!empty_buffer) {
4744         RTP_SESSION_LOCK (sess);
4745         sess->stats.nacks_dropped += data.nacked_seqnums;
4746         RTP_SESSION_UNLOCK (sess);
4747       }
4748       gst_buffer_unref (buffer);
4749     }
4750     g_object_unref (source);
4751     g_slice_free (ReportOutput, output);
4752   }
4753
4754   if (all_empty)
4755     GST_ERROR ("generated empty RTCP messages for all the sources");
4756
4757   /* schedule remaining nacks */
4758   RTP_SESSION_LOCK (sess);
4759   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4760       (GHFunc) schedule_remaining_nacks, &data);
4761   RTP_SESSION_UNLOCK (sess);
4762
4763   return result;
4764 }
4765
4766 /**
4767  * rtp_session_request_early_rtcp:
4768  * @sess: an #RTPSession
4769  * @current_time: the current system time
4770  * @max_delay: maximum delay
4771  *
4772  * Request transmission of early RTCP
4773  *
4774  * Returns: %TRUE if the related RTCP can be scheduled.
4775  */
4776 gboolean
4777 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
4778     GstClockTime max_delay)
4779 {
4780   GstClockTime T_dither_max, T_rr, offset = 0;
4781   gboolean ret;
4782   gboolean allow_early;
4783
4784   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
4785
4786   RTP_SESSION_LOCK (sess);
4787
4788   /* We assume a feedback profile if something is requesting RTCP
4789    * to be sent */
4790   sess->rtp_profile = GST_RTP_PROFILE_AVPF;
4791
4792   /* Check if already requested */
4793   /*  RFC 4585 section 3.5.2 step 2 */
4794   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
4795     GST_LOG_OBJECT (sess, "already have next early rtcp time");
4796     ret = (current_time + max_delay > sess->next_early_rtcp_time);
4797     goto end;
4798   }
4799
4800   if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) {
4801     GST_LOG_OBJECT (sess, "no next RTCP check time");
4802     ret = FALSE;
4803     goto end;
4804   }
4805
4806   /* RFC 4585 section 3.5.3 step 1
4807    * If no regular RTCP packet has been sent before, then a regular
4808    * RTCP packet has to be scheduled first and FB messages might be
4809    * included there
4810    */
4811   if (!GST_CLOCK_TIME_IS_VALID (sess->last_rtcp_send_time)) {
4812     GST_LOG_OBJECT (sess, "no RTCP sent yet");
4813
4814     if (current_time + max_delay > sess->next_rtcp_check_time) {
4815       GST_LOG_OBJECT (sess,
4816           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
4817           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4818           GST_TIME_ARGS (max_delay),
4819           GST_TIME_ARGS (sess->next_rtcp_check_time));
4820       ret = TRUE;
4821     } else {
4822       GST_LOG_OBJECT (sess,
4823           "can't allow early feedback, next scheduled time is too late %"
4824           GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
4825           GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
4826           GST_TIME_ARGS (sess->next_rtcp_check_time));
4827       ret = FALSE;
4828     }
4829     goto end;
4830   }
4831
4832   T_rr = sess->last_rtcp_interval;
4833
4834   /*  RFC 4585 section 3.5.2 step 2b */
4835   /* If the total sources is <=2, then there is only us and one peer */
4836   /* When there is one auxiliary stream the session can still do point
4837    * to point.
4838    */
4839   if (sess->is_doing_ptp) {
4840     T_dither_max = 0;
4841   } else {
4842     /* Divide by 2 because l = 0.5 */
4843     T_dither_max = T_rr;
4844     T_dither_max /= 2;
4845   }
4846
4847   /*  RFC 4585 section 3.5.2 step 3 */
4848   if (current_time + T_dither_max > sess->next_rtcp_check_time) {
4849     GST_LOG_OBJECT (sess,
4850         "don't send because of dither, next scheduled time is too soon %"
4851         GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT,
4852         GST_TIME_ARGS (current_time), GST_TIME_ARGS (T_dither_max),
4853         GST_TIME_ARGS (sess->next_rtcp_check_time));
4854     ret = T_dither_max <= max_delay;
4855     goto end;
4856   }
4857
4858   /*  RFC 4585 section 3.5.2 step 4a and
4859    *  RFC 4585 section 3.5.2 step 6 */
4860   allow_early = FALSE;
4861   if (sess->last_rtcp_check_time == sess->last_rtcp_send_time) {
4862     /* Last time we sent a full RTCP packet, we can now immediately
4863      * send an early one as allow_early was reset to TRUE */
4864     allow_early = TRUE;
4865   } else if (sess->last_rtcp_check_time + T_rr <= current_time + max_delay) {
4866     /* Last packet we sent was an early RTCP packet and more than
4867      * T_rr has passed since then, meaning we would have suppressed
4868      * a regular RTCP packet already and reset allow_early to TRUE */
4869     allow_early = TRUE;
4870
4871     /* We have to offset a bit as T_rr has not passed yet, but will before
4872      * max_delay */
4873     if (sess->last_rtcp_check_time + T_rr > current_time)
4874       offset = (sess->last_rtcp_check_time + T_rr) - current_time;
4875   } else {
4876     GST_DEBUG_OBJECT (sess,
4877         "can't allow early RTCP yet: last regular %" GST_TIME_FORMAT ", %"
4878         GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT " + %"
4879         GST_TIME_FORMAT, GST_TIME_ARGS (sess->last_rtcp_send_time),
4880         GST_TIME_ARGS (sess->last_rtcp_check_time), GST_TIME_ARGS (T_rr),
4881         GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay));
4882   }
4883
4884   if (!allow_early) {
4885     /* Ignore the request a scheduled packet will be in time anyway */
4886     if (current_time + max_delay > sess->next_rtcp_check_time) {
4887       GST_LOG_OBJECT (sess,
4888           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
4889           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4890           GST_TIME_ARGS (max_delay),
4891           GST_TIME_ARGS (sess->next_rtcp_check_time));
4892       ret = TRUE;
4893     } else {
4894       GST_LOG_OBJECT (sess,
4895           "can't allow early feedback and next scheduled time is too late %"
4896           GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
4897           GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
4898           GST_TIME_ARGS (sess->next_rtcp_check_time));
4899       ret = FALSE;
4900     }
4901     goto end;
4902   }
4903
4904   /*  RFC 4585 section 3.5.2 step 4b */
4905   if (T_dither_max) {
4906     /* Schedule an early transmission later */
4907     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
4908         current_time + offset;
4909   } else {
4910     /* If no dithering, schedule it for NOW */
4911     sess->next_early_rtcp_time = current_time + offset;
4912   }
4913
4914   GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT
4915       ", next regular RTCP time %" GST_TIME_FORMAT,
4916       GST_TIME_ARGS (sess->next_early_rtcp_time),
4917       GST_TIME_ARGS (sess->next_rtcp_check_time));
4918   RTP_SESSION_UNLOCK (sess);
4919
4920   /* notify app of need to send packet early
4921    * and therefore of timeout change */
4922   if (sess->callbacks.reconsider)
4923     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
4924
4925   return TRUE;
4926
4927 end:
4928
4929   RTP_SESSION_UNLOCK (sess);
4930
4931   return ret;
4932 }
4933
4934 static gboolean
4935 rtp_session_send_rtcp_internal (RTPSession * sess, GstClockTime now,
4936     GstClockTime max_delay)
4937 {
4938   /* notify the application that we intend to send early RTCP */
4939   if (sess->callbacks.notify_early_rtcp)
4940     sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data);
4941
4942   return rtp_session_request_early_rtcp (sess, now, max_delay);
4943 }
4944
4945 static gboolean
4946 rtp_session_send_rtcp_with_deadline (RTPSession * sess, GstClockTime deadline)
4947 {
4948   GstClockTime now, max_delay;
4949
4950   if (!sess->callbacks.send_rtcp)
4951     return FALSE;
4952
4953   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
4954
4955   if (deadline < now)
4956     return FALSE;
4957
4958   max_delay = deadline - now;
4959
4960   return rtp_session_send_rtcp_internal (sess, now, max_delay);
4961 }
4962
4963 static gboolean
4964 rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
4965 {
4966   GstClockTime now;
4967
4968   if (!sess->callbacks.send_rtcp)
4969     return FALSE;
4970
4971   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
4972
4973   return rtp_session_send_rtcp_internal (sess, now, max_delay);
4974 }
4975
4976 gboolean
4977 rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
4978     gboolean fir, gint count)
4979 {
4980   RTPSource *src;
4981
4982   RTP_SESSION_LOCK (sess);
4983   src = find_source (sess, ssrc);
4984   if (src == NULL)
4985     goto no_source;
4986
4987   if (fir) {
4988     src->send_pli = FALSE;
4989     src->send_fir = TRUE;
4990
4991     if (count == -1 || count != src->last_fir_count)
4992       src->current_send_fir_seqnum++;
4993     src->last_fir_count = count;
4994   } else if (!src->send_fir) {
4995     src->send_pli = TRUE;
4996   }
4997   RTP_SESSION_UNLOCK (sess);
4998
4999   if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
5000     GST_DEBUG ("FIR/PLI not sent early, sending with next regular RTCP");
5001   }
5002
5003   return TRUE;
5004
5005   /* ERRORS */
5006 no_source:
5007   {
5008     RTP_SESSION_UNLOCK (sess);
5009     return FALSE;
5010   }
5011 }
5012
5013 /**
5014  * rtp_session_request_nack:
5015  * @sess: a #RTPSession
5016  * @ssrc: the SSRC
5017  * @seqnum: the missing seqnum
5018  * @max_delay: max delay to request NACK
5019  *
5020  * Request scheduling of a NACK feedback packet for @seqnum in @ssrc.
5021  *
5022  * Returns: %TRUE if the NACK feedback could be scheduled
5023  */
5024 gboolean
5025 rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
5026     GstClockTime max_delay)
5027 {
5028   RTPSource *source;
5029   GstClockTime now;
5030
5031   if (!sess->callbacks.send_rtcp)
5032     return FALSE;
5033
5034   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
5035
5036   RTP_SESSION_LOCK (sess);
5037   source = find_source (sess, ssrc);
5038   if (source == NULL)
5039     goto no_source;
5040
5041   GST_DEBUG ("request NACK for SSRC %08x, #%u, deadline %" GST_TIME_FORMAT,
5042       ssrc, seqnum, GST_TIME_ARGS (now + max_delay));
5043   rtp_source_register_nack (source, seqnum, now + max_delay);
5044   RTP_SESSION_UNLOCK (sess);
5045
5046   if (!rtp_session_send_rtcp_internal (sess, now, max_delay)) {
5047     GST_DEBUG ("NACK not sent early, sending with next regular RTCP");
5048   }
5049
5050   return TRUE;
5051
5052   /* ERRORS */
5053 no_source:
5054   {
5055     RTP_SESSION_UNLOCK (sess);
5056     return FALSE;
5057   }
5058 }
5059
5060 /**
5061  * rtp_session_update_recv_caps_structure:
5062  * @sess: an #RTPSession
5063  * @s: a #GstStructure from a #GstCaps
5064  *
5065  * Update the caps of the receiver in the rtp session.
5066  */
5067 void
5068 rtp_session_update_recv_caps_structure (RTPSession * sess,
5069     const GstStructure * s)
5070 {
5071   rtp_twcc_manager_parse_recv_ext_id (sess->twcc, s);
5072 }