Merging gst-omx
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
21  * with newer GLib versions (>= 2.31.0) */
22 #define GLIB_DISABLE_DEPRECATION_WARNINGS
23
24 #include <string.h>
25 #include <stdlib.h>
26
27 #include <gst/rtp/gstrtpbuffer.h>
28 #include <gst/rtp/gstrtcpbuffer.h>
29
30 #include <gst/glib-compat-private.h>
31
32 #include "rtpsession.h"
33
34 GST_DEBUG_CATEGORY (rtp_session_debug);
35 #define GST_CAT_DEFAULT rtp_session_debug
36
37 /* signals and args */
38 enum
39 {
40   SIGNAL_GET_SOURCE_BY_SSRC,
41   SIGNAL_ON_NEW_SSRC,
42   SIGNAL_ON_SSRC_COLLISION,
43   SIGNAL_ON_SSRC_VALIDATED,
44   SIGNAL_ON_SSRC_ACTIVE,
45   SIGNAL_ON_SSRC_SDES,
46   SIGNAL_ON_BYE_SSRC,
47   SIGNAL_ON_BYE_TIMEOUT,
48   SIGNAL_ON_TIMEOUT,
49   SIGNAL_ON_SENDER_TIMEOUT,
50   SIGNAL_ON_SENDING_RTCP,
51   SIGNAL_ON_APP_RTCP,
52   SIGNAL_ON_FEEDBACK_RTCP,
53   SIGNAL_SEND_RTCP,
54   SIGNAL_SEND_RTCP_FULL,
55   SIGNAL_ON_RECEIVING_RTCP,
56   SIGNAL_ON_NEW_SENDER_SSRC,
57   SIGNAL_ON_SENDER_SSRC_ACTIVE,
58   SIGNAL_ON_SENDING_NACKS,
59   LAST_SIGNAL
60 };
61
62 #define DEFAULT_INTERNAL_SOURCE      NULL
63 #define DEFAULT_BANDWIDTH            0.0
64 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_FRACTION
65 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
66 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
67 #define DEFAULT_RTCP_MTU             1400
68 #define DEFAULT_SDES                 NULL
69 #define DEFAULT_NUM_SOURCES          0
70 #define DEFAULT_NUM_ACTIVE_SOURCES   0
71 #define DEFAULT_SOURCES              NULL
72 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
73 #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
74 #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
75 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
76 #define DEFAULT_MAX_DROPOUT_TIME     60000
77 #define DEFAULT_MAX_MISORDER_TIME    2000
78 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
79 #define DEFAULT_RTCP_REDUCED_SIZE    FALSE
80 #define DEFAULT_RTCP_DISABLE_SR_TIMESTAMP FALSE
81 #define DEFAULT_TWCC_FEEDBACK_INTERVAL GST_CLOCK_TIME_NONE
82
83 enum
84 {
85   PROP_0,
86   PROP_INTERNAL_SSRC,
87   PROP_INTERNAL_SOURCE,
88   PROP_BANDWIDTH,
89   PROP_RTCP_FRACTION,
90   PROP_RTCP_RR_BANDWIDTH,
91   PROP_RTCP_RS_BANDWIDTH,
92   PROP_RTCP_MTU,
93   PROP_SDES,
94   PROP_NUM_SOURCES,
95   PROP_NUM_ACTIVE_SOURCES,
96   PROP_SOURCES,
97   PROP_FAVOR_NEW,
98   PROP_RTCP_MIN_INTERVAL,
99   PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
100   PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
101   PROP_PROBATION,
102   PROP_MAX_DROPOUT_TIME,
103   PROP_MAX_MISORDER_TIME,
104   PROP_STATS,
105   PROP_RTP_PROFILE,
106   PROP_RTCP_REDUCED_SIZE,
107   PROP_RTCP_DISABLE_SR_TIMESTAMP,
108   PROP_TWCC_FEEDBACK_INTERVAL,
109 };
110
111 /* update average packet size */
112 #define INIT_AVG(avg, val) \
113    (avg) = (val);
114 #define UPDATE_AVG(avg, val)            \
115   if ((avg) == 0)                       \
116    (avg) = (val);                       \
117   else                                  \
118    (avg) = ((val) + (15 * (avg))) >> 4;
119
120 /* GObject vmethods */
121 static void rtp_session_finalize (GObject * object);
122 static void rtp_session_set_property (GObject * object, guint prop_id,
123     const GValue * value, GParamSpec * pspec);
124 static void rtp_session_get_property (GObject * object, guint prop_id,
125     GValue * value, GParamSpec * pspec);
126
127 static gboolean rtp_session_send_rtcp (RTPSession * sess,
128     GstClockTime max_delay);
129 static gboolean rtp_session_send_rtcp_with_deadline (RTPSession * sess,
130     GstClockTime deadline);
131
132 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
133
134 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
135
136 static guint32 rtp_session_create_new_ssrc (RTPSession * sess);
137 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
138     gboolean * created, RTPPacketInfo * pinfo, gboolean rtp);
139 static RTPSource *obtain_internal_source (RTPSession * sess,
140     guint32 ssrc, gboolean * created, GstClockTime current_time);
141 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
142     GstClockTime current_time);
143 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
144     gboolean deterministic, gboolean first);
145
146 static gboolean
147 accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
148     const GValue * handler_return, gpointer data)
149 {
150   if (g_value_get_boolean (handler_return))
151     g_value_set_boolean (return_accu, TRUE);
152
153   return TRUE;
154 }
155
156 static void
157 rtp_session_class_init (RTPSessionClass * klass)
158 {
159   GObjectClass *gobject_class;
160
161   gobject_class = (GObjectClass *) klass;
162
163   gobject_class->finalize = rtp_session_finalize;
164   gobject_class->set_property = rtp_session_set_property;
165   gobject_class->get_property = rtp_session_get_property;
166
167   /**
168    * RTPSession::get-source-by-ssrc:
169    * @session: the object which received the signal
170    * @ssrc: the SSRC of the RTPSource
171    *
172    * Request the #RTPSource object with SSRC @ssrc in @session.
173    */
174   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
175       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
176       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
177           get_source_by_ssrc), NULL, NULL, NULL,
178       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
179
180   /**
181    * RTPSession::on-new-ssrc:
182    * @session: the object which received the signal
183    * @src: the new RTPSource
184    *
185    * Notify of a new SSRC that entered @session.
186    */
187   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
188       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
189       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
190       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
191   /**
192    * RTPSession::on-ssrc-collision:
193    * @session: the object which received the signal
194    * @src: the #RTPSource that caused a collision
195    *
196    * Notify when we have an SSRC collision
197    */
198   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
199       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
200       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
201       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
202   /**
203    * RTPSession::on-ssrc-validated:
204    * @session: the object which received the signal
205    * @src: the new validated RTPSource
206    *
207    * Notify of a new SSRC that became validated.
208    */
209   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
210       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
211       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
212       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
213   /**
214    * RTPSession::on-ssrc-active:
215    * @session: the object which received the signal
216    * @src: the active RTPSource
217    *
218    * Notify of a SSRC that is active, i.e., sending RTCP.
219    */
220   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
221       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
222       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
223       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
224   /**
225    * RTPSession::on-ssrc-sdes:
226    * @session: the object which received the signal
227    * @src: the RTPSource
228    *
229    * Notify that a new SDES was received for SSRC.
230    */
231   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
232       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
233       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
234       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
235   /**
236    * RTPSession::on-bye-ssrc:
237    * @session: the object which received the signal
238    * @src: the RTPSource that went away
239    *
240    * Notify of an SSRC that became inactive because of a BYE packet.
241    */
242   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
243       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
244       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
245       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
246   /**
247    * RTPSession::on-bye-timeout:
248    * @session: the object which received the signal
249    * @src: the RTPSource that timed out
250    *
251    * Notify of an SSRC that has timed out because of BYE
252    */
253   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
254       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
255       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
256       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
257   /**
258    * RTPSession::on-timeout:
259    * @session: the object which received the signal
260    * @src: the RTPSource that timed out
261    *
262    * Notify of an SSRC that has timed out
263    */
264   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
265       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
266       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
267       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
268   /**
269    * RTPSession::on-sender-timeout:
270    * @session: the object which received the signal
271    * @src: the RTPSource that timed out
272    *
273    * Notify of an SSRC that was a sender but timed out and became a receiver.
274    */
275   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
276       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
277       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
278       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
279
280   /**
281    * RTPSession::on-sending-rtcp
282    * @session: the object which received the signal
283    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
284    * @early: %TRUE if the packet is early, %FALSE if it is regular
285    *
286    * This signal is emitted before sending an RTCP packet, it can be used
287    * to add extra RTCP Packets.
288    *
289    * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
290    * if suppressing it is acceptable
291    */
292   rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
293       g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
294       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
295       accumulate_trues, NULL, NULL, G_TYPE_BOOLEAN, 2,
296       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE, G_TYPE_BOOLEAN);
297
298   /**
299    * RTPSession::on-app-rtcp:
300    * @session: the object which received the signal
301    * @subtype: The subtype of the packet
302    * @ssrc: The SSRC/CSRC of the packet
303    * @name: The name of the packet
304    * @data: a #GstBuffer with the application-dependant data or %NULL if
305    * there was no data
306    *
307    * Notify that a RTCP APP packet has been received
308    */
309   rtp_session_signals[SIGNAL_ON_APP_RTCP] =
310       g_signal_new ("on-app-rtcp", G_TYPE_FROM_CLASS (klass),
311       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_app_rtcp),
312       NULL, NULL, NULL, G_TYPE_NONE, 4, G_TYPE_UINT, G_TYPE_UINT,
313       G_TYPE_STRING, GST_TYPE_BUFFER);
314
315   /**
316    * RTPSession::on-feedback-rtcp:
317    * @session: the object which received the signal
318    * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
319    *  %GST_RTCP_TYPE_RTPFB
320    * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
321    * @sender_ssrc: The SSRC of the sender
322    * @media_ssrc: The SSRC of the media this refers to
323    * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
324    * there was no FCI
325    *
326    * Notify that a RTCP feedback packet has been received
327    */
328   rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
329       g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
330       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
331       NULL, NULL, NULL, G_TYPE_NONE, 5, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT,
332       G_TYPE_UINT, GST_TYPE_BUFFER);
333
334   /**
335    * RTPSession::send-rtcp:
336    * @session: the object which received the signal
337    * @max_delay: The maximum delay after which the feedback will not be useful
338    *  anymore
339    *
340    * Requests that the #RTPSession initiate a new RTCP packet as soon as
341    * possible within the requested delay.
342    *
343    * This sets feedback to %TRUE if not already done before.
344    */
345   rtp_session_signals[SIGNAL_SEND_RTCP] =
346       g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
347       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
348       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
349       NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
350
351   /**
352    * RTPSession::send-rtcp-full:
353    * @session: the object which received the signal
354    * @max_delay: The maximum delay after which the feedback will not be useful
355    *  anymore
356    *
357    * Requests that the #RTPSession initiate a new RTCP packet as soon as
358    * possible within the requested delay.
359    *
360    * This sets feedback to %TRUE if not already done before.
361    *
362    * Returns: TRUE if the new RTCP packet could be scheduled within the
363    * requested delay, FALSE otherwise.
364    *
365    * Since: 1.6
366    */
367   rtp_session_signals[SIGNAL_SEND_RTCP_FULL] =
368       g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass),
369       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
370       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
371       NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
372
373   /**
374    * RTPSession::on-receiving-rtcp
375    * @session: the object which received the signal
376    * @buffer: the #GstBuffer containing the RTCP packet that was received
377    *
378    * This signal is emitted when receiving an RTCP packet before it is handled
379    * by the session. It can be used to extract custom information from RTCP packets.
380    *
381    * Since: 1.6
382    */
383   rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] =
384       g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass),
385       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp),
386       NULL, NULL, NULL, G_TYPE_NONE, 1,
387       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
388
389   /**
390    * RTPSession::on-new-sender-ssrc:
391    * @session: the object which received the signal
392    * @src: the new sender RTPSource
393    *
394    * Notify of a new sender SSRC that entered @session.
395    *
396    * Since: 1.8
397    */
398   rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
399       g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
400       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_sender_ssrc),
401       NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
402
403   /**
404    * RTPSession::on-sender-ssrc-active:
405    * @session: the object which received the signal
406    * @src: the active sender RTPSource
407    *
408    * Notify of a sender SSRC that is active, i.e., sending RTCP.
409    *
410    * Since: 1.8
411    */
412   rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
413       g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
414       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass,
415           on_sender_ssrc_active), NULL, NULL, NULL,
416       G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
417
418   /**
419    * RTPSession::on-sending-nack
420    * @session: the object which received the signal
421    * @sender_ssrc: the sender ssrc
422    * @media_ssrc: the media ssrc
423    * @nacks: (element-type guint16): the list of seqnum to be nacked
424    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
425    *
426    * This signal is emitted before NACK packets are added into the RTCP
427    * packet. This signal can be used to override the conversion of the NACK
428    * seqnum array into packets. This can be used if your protocol uses
429    * different type of NACK (e.g. based on RTCP APP).
430    *
431    * The handler should transform the seqnum from @nacks array into packets.
432    * @nacks seqnum must be consumed from the start. The remaining will be
433    * rescheduled for later base on bandwidth. Only one handler will be
434    * signalled.
435    *
436    * A handler may return 0 to signal that generic NACKs should be created
437    * for this set. This can be useful if the signal is used for other purpose
438    * or if the other type of NACK would use more space.
439    *
440    * Returns: the number of NACK seqnum that was consumed from @nacks.
441    *
442    * Since: 1.16
443    */
444   rtp_session_signals[SIGNAL_ON_SENDING_NACKS] =
445       g_signal_new ("on-sending-nacks", G_TYPE_FROM_CLASS (klass),
446       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_nacks),
447       g_signal_accumulator_first_wins, NULL, NULL,
448       G_TYPE_UINT, 4, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_ARRAY,
449       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
450
451   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
452       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
453           "The internal SSRC used for the session (deprecated)",
454           0, G_MAXUINT, 0,
455           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
456           GST_PARAM_DOC_SHOW_DEFAULT));
457
458   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
459       g_param_spec_object ("internal-source", "Internal Source",
460           "The internal source element of the session (deprecated)",
461           RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
462
463   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
464       g_param_spec_double ("bandwidth", "Bandwidth",
465           "The bandwidth of the session in bits per second (0 for auto-discover)",
466           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
467           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
468
469   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
470       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
471           "The fraction of the bandwidth used for RTCP in bits per second (or as a real fraction of the RTP bandwidth if < 1)",
472           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
473           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
474
475   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
476       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
477           "The RTCP bandwidth used for receivers in bits per second (-1 = default)",
478           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
479           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
480
481   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
482       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
483           "The RTCP bandwidth used for senders in bits per second (-1 = default)",
484           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
485           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
486
487   g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
488       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
489           "The maximum size of the RTCP packets",
490           16, G_MAXINT16, DEFAULT_RTCP_MTU,
491           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
492
493   g_object_class_install_property (gobject_class, PROP_SDES,
494       g_param_spec_boxed ("sdes", "SDES",
495           "The SDES items of this session",
496           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
497           | GST_PARAM_DOC_SHOW_DEFAULT));
498
499   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
500       g_param_spec_uint ("num-sources", "Num Sources",
501           "The number of sources in the session", 0, G_MAXUINT,
502           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
503
504   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
505       g_param_spec_uint ("num-active-sources", "Num Active Sources",
506           "The number of active sources in the session", 0, G_MAXUINT,
507           DEFAULT_NUM_ACTIVE_SOURCES,
508           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
509   /**
510    * RTPSource:sources
511    *
512    * Get a GValue Array of all sources in the session.
513    *
514    * ## Getting the #RTPSources of a session
515    *
516    * ``` C
517    * {
518    *   GValueArray *arr;
519    *   GValue *val;
520    *   guint i;
521    *
522    *   g_object_get (sess, "sources", &arr, NULL);
523    *
524    *   for (i = 0; i < arr->n_values; i++) {
525    *     RTPSource *source;
526    *
527    *     val = g_value_array_get_nth (arr, i);
528    *     source = g_value_get_object (val);
529    *   }
530    *   g_value_array_free (arr);
531    * }
532    * ```
533    */
534   g_object_class_install_property (gobject_class, PROP_SOURCES,
535       g_param_spec_boxed ("sources", "Sources",
536           "An array of all known sources in the session",
537           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
538
539   g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
540       g_param_spec_boolean ("favor-new", "Favor new sources",
541           "Resolve SSRC conflict in favor of new sources", FALSE,
542           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
543
544   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
545       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
546           "Minimum interval between Regular RTCP packet (in ns)",
547           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
548           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
549
550   g_object_class_install_property (gobject_class,
551       PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
552       g_param_spec_uint64 ("rtcp-feedback-retention-window",
553           "RTCP Feedback retention window",
554           "Duration during which RTCP Feedback packets are retained (in ns)",
555           0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
556           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
557
558   g_object_class_install_property (gobject_class,
559       PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
560       g_param_spec_uint ("rtcp-immediate-feedback-threshold",
561           "RTCP Immediate Feedback threshold",
562           "The maximum number of members of a RTP session for which immediate"
563           " feedback is used (DEPRECATED: has no effect and is not needed)",
564           0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
565           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
566
567   g_object_class_install_property (gobject_class, PROP_PROBATION,
568       g_param_spec_uint ("probation", "Number of probations",
569           "Consecutive packet sequence numbers to accept the source",
570           0, G_MAXUINT, DEFAULT_PROBATION,
571           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
572
573   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
574       g_param_spec_uint ("max-dropout-time", "Max dropout time",
575           "The maximum time (milliseconds) of missing packets tolerated.",
576           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
577           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
578
579   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
580       g_param_spec_uint ("max-misorder-time", "Max misorder time",
581           "The maximum time (milliseconds) of misordered packets tolerated.",
582           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
583           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
584
585   /**
586    * RTPSession:stats:
587    *
588    * Various session statistics. This property returns a GstStructure
589    * with name application/x-rtp-session-stats with the following fields:
590    *
591    * * "rtx-drop-count"  G_TYPE_UINT   The number of retransmission events
592    *      dropped (due to bandwidth constraints)
593    * *  "sent-nack-count" G_TYPE_UINT   Number of NACKs sent
594    * *  "recv-nack-count" G_TYPE_UINT   Number of NACKs received
595    * *  "source-stats"    G_TYPE_BOXED  GValueArray of #RTPSource:stats for all
596    *      RTP sources (Since 1.8)
597    *
598    * Since: 1.4
599    */
600   g_object_class_install_property (gobject_class, PROP_STATS,
601       g_param_spec_boxed ("stats", "Statistics",
602           "Various statistics", GST_TYPE_STRUCTURE,
603           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
604
605   g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
606       g_param_spec_enum ("rtp-profile", "RTP Profile",
607           "RTP profile to use for this session", GST_TYPE_RTP_PROFILE,
608           DEFAULT_RTP_PROFILE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
609
610   g_object_class_install_property (gobject_class, PROP_RTCP_REDUCED_SIZE,
611       g_param_spec_boolean ("rtcp-reduced-size", "RTCP Reduced Size",
612           "Use Reduced Size RTCP for feedback packets",
613           DEFAULT_RTCP_REDUCED_SIZE,
614           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
615
616   /**
617    * RTPSession:disable-sr-timestamp:
618    *
619    * Whether sender reports should be timestamped.
620    *
621    * Since: 1.16
622    */
623   g_object_class_install_property (gobject_class,
624       PROP_RTCP_DISABLE_SR_TIMESTAMP,
625       g_param_spec_boolean ("disable-sr-timestamp",
626           "Disable Sender Report Timestamp",
627           "Whether sender reports should be timestamped",
628           DEFAULT_RTCP_DISABLE_SR_TIMESTAMP,
629           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
630
631   /**
632    * RTPSession:twcc-feedback-interval:
633    *
634    * The interval to send TWCC reports on.
635    * This overrides the default behavior of sending reports
636    * based on marker-bits.
637    *
638    * Since: 1.20
639    */
640   g_object_class_install_property (gobject_class,
641       PROP_TWCC_FEEDBACK_INTERVAL,
642       g_param_spec_uint64 ("twcc-feedback-interval",
643           "TWCC Feedback Interval",
644           "The interval to send TWCC reports on",
645           0, G_MAXUINT64, DEFAULT_TWCC_FEEDBACK_INTERVAL,
646           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
647
648   klass->get_source_by_ssrc =
649       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
650   klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
651
652   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
653 }
654
655 static void
656 rtp_session_init (RTPSession * sess)
657 {
658   gint i;
659   gchar *str;
660
661   g_mutex_init (&sess->lock);
662   sess->key = g_random_int ();
663   sess->mask_idx = 0;
664   sess->mask = 0;
665
666   /* TODO: We currently only use the first hash table but this is the
667    * beginning of an implementation for RFC2762
668    for (i = 0; i < 32; i++) {
669    */
670   for (i = 0; i < 1; i++) {
671     sess->ssrcs[i] =
672         g_hash_table_new_full (NULL, NULL, NULL,
673         (GDestroyNotify) g_object_unref);
674   }
675
676   rtp_stats_init_defaults (&sess->stats);
677   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
678   rtp_stats_set_min_interval (&sess->stats,
679       (gdouble) DEFAULT_RTCP_MIN_INTERVAL / GST_SECOND);
680
681   sess->recalc_bandwidth = TRUE;
682   sess->bandwidth = DEFAULT_BANDWIDTH;
683   sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
684   sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
685   sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
686
687   /* default UDP header length */
688   sess->header_len = UDP_IP_HEADER_OVERHEAD;
689   sess->mtu = DEFAULT_RTCP_MTU;
690
691   sess->probation = DEFAULT_PROBATION;
692   sess->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
693   sess->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
694
695   /* some default SDES entries */
696   sess->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
697
698   /* we do not want to leak details like the username or hostname here */
699   str = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
700   gst_structure_set (sess->sdes, "cname", G_TYPE_STRING, str, NULL);
701   g_free (str);
702
703 #if 0
704   /* we do not want to leak the user's real name here */
705   str = g_strdup_printf ("Anon%u", g_random_int ());
706   gst_structure_set (sdes, "name", G_TYPE_STRING, str, NULL);
707   g_free (str);
708 #endif
709
710   gst_structure_set (sess->sdes, "tool", G_TYPE_STRING, "GStreamer", NULL);
711
712   /* this is the SSRC we suggest */
713   sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
714   sess->internal_ssrc_set = FALSE;
715
716   sess->first_rtcp = TRUE;
717   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
718   sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE;
719   sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
720   sess->last_rtcp_interval = GST_CLOCK_TIME_NONE;
721
722   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
723   sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
724   sess->rtcp_immediate_feedback_threshold =
725       DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
726   sess->rtp_profile = DEFAULT_RTP_PROFILE;
727   sess->reduced_size_rtcp = DEFAULT_RTCP_REDUCED_SIZE;
728   sess->timestamp_sender_reports = !DEFAULT_RTCP_DISABLE_SR_TIMESTAMP;
729
730   sess->is_doing_ptp = TRUE;
731
732   sess->twcc = rtp_twcc_manager_new (sess->mtu);
733   sess->twcc_stats = rtp_twcc_stats_new ();
734 }
735
736 static void
737 rtp_session_finalize (GObject * object)
738 {
739   RTPSession *sess;
740   gint i;
741
742   sess = RTP_SESSION_CAST (object);
743
744   gst_structure_free (sess->sdes);
745
746   g_list_free_full (sess->conflicting_addresses,
747       (GDestroyNotify) rtp_conflicting_address_free);
748
749   /* TODO: Change this again when implementing RFC 2762
750    * for (i = 0; i < 32; i++)
751    */
752   for (i = 0; i < 1; i++)
753     g_hash_table_destroy (sess->ssrcs[i]);
754
755   g_object_unref (sess->twcc);
756   rtp_twcc_stats_free (sess->twcc_stats);
757
758   g_mutex_clear (&sess->lock);
759
760   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
761 }
762
763 static void
764 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
765 {
766   GValue value = { 0 };
767
768   g_value_init (&value, RTP_TYPE_SOURCE);
769   g_value_take_object (&value, source);
770   /* copies the value */
771   g_value_array_append (arr, &value);
772 }
773
774 static GValueArray *
775 rtp_session_create_sources (RTPSession * sess)
776 {
777   GValueArray *res;
778   guint size;
779
780   RTP_SESSION_LOCK (sess);
781   /* get number of elements in the table */
782   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
783   /* create the result value array */
784   res = g_value_array_new (size);
785
786   /* and copy all values into the array */
787   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
788   RTP_SESSION_UNLOCK (sess);
789
790   return res;
791 }
792
793 static void
794 create_source_stats (gpointer key, RTPSource * source, GValueArray * arr)
795 {
796   GValue *value;
797   GstStructure *s;
798
799   g_object_get (source, "stats", &s, NULL);
800
801   g_value_array_append (arr, NULL);
802   value = g_value_array_get_nth (arr, arr->n_values - 1);
803   g_value_init (value, GST_TYPE_STRUCTURE);
804   g_value_take_boxed (value, s);
805 }
806
807 static GstStructure *
808 rtp_session_create_stats (RTPSession * sess)
809 {
810   GstStructure *s;
811   GValueArray *source_stats;
812   GValue source_stats_v = G_VALUE_INIT;
813   guint size;
814
815   RTP_SESSION_LOCK (sess);
816   s = gst_structure_new ("application/x-rtp-session-stats",
817       "rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped,
818       "sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent,
819       "recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL);
820
821   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
822   source_stats = g_value_array_new (size);
823   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
824       (GHFunc) create_source_stats, source_stats);
825   RTP_SESSION_UNLOCK (sess);
826
827   g_value_init (&source_stats_v, G_TYPE_VALUE_ARRAY);
828   g_value_take_boxed (&source_stats_v, source_stats);
829   gst_structure_take_value (s, "source-stats", &source_stats_v);
830
831   return s;
832 }
833
834 static void
835 rtp_session_set_property (GObject * object, guint prop_id,
836     const GValue * value, GParamSpec * pspec)
837 {
838   RTPSession *sess;
839
840   sess = RTP_SESSION (object);
841
842   switch (prop_id) {
843     case PROP_INTERNAL_SSRC:
844       RTP_SESSION_LOCK (sess);
845       sess->suggested_ssrc = g_value_get_uint (value);
846       sess->internal_ssrc_set = TRUE;
847       sess->internal_ssrc_from_caps_or_property = TRUE;
848       RTP_SESSION_UNLOCK (sess);
849       if (sess->callbacks.reconfigure)
850         sess->callbacks.reconfigure (sess, sess->reconfigure_user_data);
851       break;
852     case PROP_BANDWIDTH:
853       RTP_SESSION_LOCK (sess);
854       sess->bandwidth = g_value_get_double (value);
855       sess->recalc_bandwidth = TRUE;
856       RTP_SESSION_UNLOCK (sess);
857       break;
858     case PROP_RTCP_FRACTION:
859       RTP_SESSION_LOCK (sess);
860       sess->rtcp_bandwidth = g_value_get_double (value);
861       sess->recalc_bandwidth = TRUE;
862       RTP_SESSION_UNLOCK (sess);
863       break;
864     case PROP_RTCP_RR_BANDWIDTH:
865       RTP_SESSION_LOCK (sess);
866       sess->rtcp_rr_bandwidth = g_value_get_int (value);
867       sess->recalc_bandwidth = TRUE;
868       RTP_SESSION_UNLOCK (sess);
869       break;
870     case PROP_RTCP_RS_BANDWIDTH:
871       RTP_SESSION_LOCK (sess);
872       sess->rtcp_rs_bandwidth = g_value_get_int (value);
873       sess->recalc_bandwidth = TRUE;
874       RTP_SESSION_UNLOCK (sess);
875       break;
876     case PROP_RTCP_MTU:
877       sess->mtu = g_value_get_uint (value);
878       rtp_twcc_manager_set_mtu (sess->twcc, sess->mtu);
879       break;
880     case PROP_SDES:
881       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
882       break;
883     case PROP_FAVOR_NEW:
884       sess->favor_new = g_value_get_boolean (value);
885       break;
886     case PROP_RTCP_MIN_INTERVAL:
887       rtp_stats_set_min_interval (&sess->stats,
888           (gdouble) g_value_get_uint64 (value) / GST_SECOND);
889       /* trigger reconsideration */
890       RTP_SESSION_LOCK (sess);
891       sess->next_rtcp_check_time = 0;
892       RTP_SESSION_UNLOCK (sess);
893       if (sess->callbacks.reconsider)
894         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
895       break;
896     case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
897       sess->rtcp_feedback_retention_window = g_value_get_uint64 (value);
898       break;
899     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
900       sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
901       break;
902     case PROP_PROBATION:
903       sess->probation = g_value_get_uint (value);
904       break;
905     case PROP_MAX_DROPOUT_TIME:
906       sess->max_dropout_time = g_value_get_uint (value);
907       break;
908     case PROP_MAX_MISORDER_TIME:
909       sess->max_misorder_time = g_value_get_uint (value);
910       break;
911     case PROP_RTP_PROFILE:
912       sess->rtp_profile = g_value_get_enum (value);
913       /* trigger reconsideration */
914       RTP_SESSION_LOCK (sess);
915       sess->next_rtcp_check_time = 0;
916       RTP_SESSION_UNLOCK (sess);
917       if (sess->callbacks.reconsider)
918         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
919       break;
920     case PROP_RTCP_REDUCED_SIZE:
921       sess->reduced_size_rtcp = g_value_get_boolean (value);
922       break;
923     case PROP_RTCP_DISABLE_SR_TIMESTAMP:
924       sess->timestamp_sender_reports = !g_value_get_boolean (value);
925       break;
926     case PROP_TWCC_FEEDBACK_INTERVAL:
927       rtp_twcc_manager_set_feedback_interval (sess->twcc,
928           g_value_get_uint64 (value));
929       break;
930     default:
931       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
932       break;
933   }
934 }
935
936 static void
937 rtp_session_get_property (GObject * object, guint prop_id,
938     GValue * value, GParamSpec * pspec)
939 {
940   RTPSession *sess;
941
942   sess = RTP_SESSION (object);
943
944   switch (prop_id) {
945     case PROP_INTERNAL_SSRC:
946       g_value_set_uint (value, rtp_session_suggest_ssrc (sess, NULL));
947       break;
948     case PROP_INTERNAL_SOURCE:
949       /* FIXME, return a random source */
950       g_value_set_object (value, NULL);
951       break;
952     case PROP_BANDWIDTH:
953       g_value_set_double (value, sess->bandwidth);
954       break;
955     case PROP_RTCP_FRACTION:
956       g_value_set_double (value, sess->rtcp_bandwidth);
957       break;
958     case PROP_RTCP_RR_BANDWIDTH:
959       g_value_set_int (value, sess->rtcp_rr_bandwidth);
960       break;
961     case PROP_RTCP_RS_BANDWIDTH:
962       g_value_set_int (value, sess->rtcp_rs_bandwidth);
963       break;
964     case PROP_RTCP_MTU:
965       g_value_set_uint (value, sess->mtu);
966       break;
967     case PROP_SDES:
968       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
969       break;
970     case PROP_NUM_SOURCES:
971       g_value_set_uint (value, rtp_session_get_num_sources (sess));
972       break;
973     case PROP_NUM_ACTIVE_SOURCES:
974       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
975       break;
976     case PROP_SOURCES:
977       g_value_take_boxed (value, rtp_session_create_sources (sess));
978       break;
979     case PROP_FAVOR_NEW:
980       g_value_set_boolean (value, sess->favor_new);
981       break;
982     case PROP_RTCP_MIN_INTERVAL:
983       g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
984       break;
985     case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
986       g_value_set_uint64 (value, sess->rtcp_feedback_retention_window);
987       break;
988     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
989       g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
990       break;
991     case PROP_PROBATION:
992       g_value_set_uint (value, sess->probation);
993       break;
994     case PROP_MAX_DROPOUT_TIME:
995       g_value_set_uint (value, sess->max_dropout_time);
996       break;
997     case PROP_MAX_MISORDER_TIME:
998       g_value_set_uint (value, sess->max_misorder_time);
999       break;
1000     case PROP_STATS:
1001       g_value_take_boxed (value, rtp_session_create_stats (sess));
1002       break;
1003     case PROP_RTP_PROFILE:
1004       g_value_set_enum (value, sess->rtp_profile);
1005       break;
1006     case PROP_RTCP_REDUCED_SIZE:
1007       g_value_set_boolean (value, sess->reduced_size_rtcp);
1008       break;
1009     case PROP_RTCP_DISABLE_SR_TIMESTAMP:
1010       g_value_set_boolean (value, !sess->timestamp_sender_reports);
1011       break;
1012     case PROP_TWCC_FEEDBACK_INTERVAL:
1013       g_value_set_uint64 (value,
1014           rtp_twcc_manager_get_feedback_interval (sess->twcc));
1015       break;
1016     default:
1017       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1018       break;
1019   }
1020 }
1021
1022 static void
1023 on_new_ssrc (RTPSession * sess, RTPSource * source)
1024 {
1025   g_object_ref (source);
1026   RTP_SESSION_UNLOCK (sess);
1027   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
1028   RTP_SESSION_LOCK (sess);
1029   g_object_unref (source);
1030 }
1031
1032 static void
1033 on_ssrc_collision (RTPSession * sess, RTPSource * source)
1034 {
1035   g_object_ref (source);
1036   RTP_SESSION_UNLOCK (sess);
1037   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
1038       source);
1039   RTP_SESSION_LOCK (sess);
1040   g_object_unref (source);
1041 }
1042
1043 static void
1044 on_ssrc_validated (RTPSession * sess, RTPSource * source)
1045 {
1046   g_object_ref (source);
1047   RTP_SESSION_UNLOCK (sess);
1048   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
1049       source);
1050   RTP_SESSION_LOCK (sess);
1051   g_object_unref (source);
1052 }
1053
1054 static void
1055 on_ssrc_active (RTPSession * sess, RTPSource * source)
1056 {
1057   g_object_ref (source);
1058   RTP_SESSION_UNLOCK (sess);
1059   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
1060   RTP_SESSION_LOCK (sess);
1061   g_object_unref (source);
1062 }
1063
1064 static void
1065 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
1066 {
1067   g_object_ref (source);
1068   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
1069   RTP_SESSION_UNLOCK (sess);
1070   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
1071   RTP_SESSION_LOCK (sess);
1072   g_object_unref (source);
1073 }
1074
1075 static void
1076 on_bye_ssrc (RTPSession * sess, RTPSource * source)
1077 {
1078   g_object_ref (source);
1079   RTP_SESSION_UNLOCK (sess);
1080   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
1081   RTP_SESSION_LOCK (sess);
1082   g_object_unref (source);
1083 }
1084
1085 static void
1086 on_bye_timeout (RTPSession * sess, RTPSource * source)
1087 {
1088   g_object_ref (source);
1089   RTP_SESSION_UNLOCK (sess);
1090   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
1091   RTP_SESSION_LOCK (sess);
1092   g_object_unref (source);
1093 }
1094
1095 static void
1096 on_timeout (RTPSession * sess, RTPSource * source)
1097 {
1098   g_object_ref (source);
1099   RTP_SESSION_UNLOCK (sess);
1100   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
1101   RTP_SESSION_LOCK (sess);
1102   g_object_unref (source);
1103 }
1104
1105 static void
1106 on_sender_timeout (RTPSession * sess, RTPSource * source)
1107 {
1108   g_object_ref (source);
1109   RTP_SESSION_UNLOCK (sess);
1110   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
1111       source);
1112   RTP_SESSION_LOCK (sess);
1113   g_object_unref (source);
1114 }
1115
1116 static void
1117 on_new_sender_ssrc (RTPSession * sess, RTPSource * source)
1118 {
1119   g_object_ref (source);
1120   RTP_SESSION_UNLOCK (sess);
1121   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
1122       source);
1123   RTP_SESSION_LOCK (sess);
1124   g_object_unref (source);
1125 }
1126
1127 static void
1128 on_sender_ssrc_active (RTPSession * sess, RTPSource * source)
1129 {
1130   g_object_ref (source);
1131   RTP_SESSION_UNLOCK (sess);
1132   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE], 0,
1133       source);
1134   RTP_SESSION_LOCK (sess);
1135   g_object_unref (source);
1136 }
1137
1138 /**
1139  * rtp_session_new:
1140  *
1141  * Create a new session object.
1142  *
1143  * Returns: a new #RTPSession. g_object_unref() after usage.
1144  */
1145 RTPSession *
1146 rtp_session_new (void)
1147 {
1148   RTPSession *sess;
1149
1150   sess = g_object_new (RTP_TYPE_SESSION, NULL);
1151
1152   return sess;
1153 }
1154
1155 /**
1156  * rtp_session_reset:
1157  * @sess: an #RTPSession
1158  *
1159  * Reset the sources of @sess.
1160  */
1161 void
1162 rtp_session_reset (RTPSession * sess)
1163 {
1164   g_return_if_fail (RTP_IS_SESSION (sess));
1165
1166   /* remove all sources */
1167   g_hash_table_remove_all (sess->ssrcs[sess->mask_idx]);
1168   sess->total_sources = 0;
1169   sess->stats.sender_sources = 0;
1170   sess->stats.internal_sender_sources = 0;
1171   sess->stats.internal_sources = 0;
1172   sess->stats.active_sources = 0;
1173
1174   sess->generation = 0;
1175   sess->first_rtcp = TRUE;
1176   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
1177   sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE;
1178   sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
1179   sess->last_rtcp_interval = GST_CLOCK_TIME_NONE;
1180   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
1181   sess->scheduled_bye = FALSE;
1182
1183   /* reset session stats */
1184   sess->stats.bye_members = 0;
1185   sess->stats.nacks_dropped = 0;
1186   sess->stats.nacks_sent = 0;
1187   sess->stats.nacks_received = 0;
1188
1189   sess->is_doing_ptp = TRUE;
1190
1191   g_list_free_full (sess->conflicting_addresses,
1192       (GDestroyNotify) rtp_conflicting_address_free);
1193   sess->conflicting_addresses = NULL;
1194 }
1195
1196 /**
1197  * rtp_session_set_callbacks:
1198  * @sess: an #RTPSession
1199  * @callbacks: callbacks to configure
1200  * @user_data: user data passed in the callbacks
1201  *
1202  * Configure a set of callbacks to be notified of actions.
1203  */
1204 void
1205 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
1206     gpointer user_data)
1207 {
1208   g_return_if_fail (RTP_IS_SESSION (sess));
1209
1210   if (callbacks->process_rtp) {
1211     sess->callbacks.process_rtp = callbacks->process_rtp;
1212     sess->process_rtp_user_data = user_data;
1213   }
1214   if (callbacks->send_rtp) {
1215     sess->callbacks.send_rtp = callbacks->send_rtp;
1216     sess->send_rtp_user_data = user_data;
1217   }
1218   if (callbacks->send_rtcp) {
1219     sess->callbacks.send_rtcp = callbacks->send_rtcp;
1220     sess->send_rtcp_user_data = user_data;
1221   }
1222   if (callbacks->sync_rtcp) {
1223     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
1224     sess->sync_rtcp_user_data = user_data;
1225   }
1226   if (callbacks->clock_rate) {
1227     sess->callbacks.clock_rate = callbacks->clock_rate;
1228     sess->clock_rate_user_data = user_data;
1229   }
1230   if (callbacks->reconsider) {
1231     sess->callbacks.reconsider = callbacks->reconsider;
1232     sess->reconsider_user_data = user_data;
1233   }
1234   if (callbacks->request_key_unit) {
1235     sess->callbacks.request_key_unit = callbacks->request_key_unit;
1236     sess->request_key_unit_user_data = user_data;
1237   }
1238   if (callbacks->request_time) {
1239     sess->callbacks.request_time = callbacks->request_time;
1240     sess->request_time_user_data = user_data;
1241   }
1242   if (callbacks->notify_nack) {
1243     sess->callbacks.notify_nack = callbacks->notify_nack;
1244     sess->notify_nack_user_data = user_data;
1245   }
1246   if (callbacks->notify_twcc) {
1247     sess->callbacks.notify_twcc = callbacks->notify_twcc;
1248     sess->notify_twcc_user_data = user_data;
1249   }
1250   if (callbacks->reconfigure) {
1251     sess->callbacks.reconfigure = callbacks->reconfigure;
1252     sess->reconfigure_user_data = user_data;
1253   }
1254   if (callbacks->notify_early_rtcp) {
1255     sess->callbacks.notify_early_rtcp = callbacks->notify_early_rtcp;
1256     sess->notify_early_rtcp_user_data = user_data;
1257   }
1258 }
1259
1260 /**
1261  * rtp_session_set_process_rtp_callback:
1262  * @sess: an #RTPSession
1263  * @callback: callback to set
1264  * @user_data: user data passed in the callback
1265  *
1266  * Configure only the process_rtp callback to be notified of the process_rtp action.
1267  */
1268 void
1269 rtp_session_set_process_rtp_callback (RTPSession * sess,
1270     RTPSessionProcessRTP callback, gpointer user_data)
1271 {
1272   g_return_if_fail (RTP_IS_SESSION (sess));
1273
1274   sess->callbacks.process_rtp = callback;
1275   sess->process_rtp_user_data = user_data;
1276 }
1277
1278 /**
1279  * rtp_session_set_send_rtp_callback:
1280  * @sess: an #RTPSession
1281  * @callback: callback to set
1282  * @user_data: user data passed in the callback
1283  *
1284  * Configure only the send_rtp callback to be notified of the send_rtp action.
1285  */
1286 void
1287 rtp_session_set_send_rtp_callback (RTPSession * sess,
1288     RTPSessionSendRTP callback, gpointer user_data)
1289 {
1290   g_return_if_fail (RTP_IS_SESSION (sess));
1291
1292   sess->callbacks.send_rtp = callback;
1293   sess->send_rtp_user_data = user_data;
1294 }
1295
1296 /**
1297  * rtp_session_set_send_rtcp_callback:
1298  * @sess: an #RTPSession
1299  * @callback: callback to set
1300  * @user_data: user data passed in the callback
1301  *
1302  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
1303  */
1304 void
1305 rtp_session_set_send_rtcp_callback (RTPSession * sess,
1306     RTPSessionSendRTCP callback, gpointer user_data)
1307 {
1308   g_return_if_fail (RTP_IS_SESSION (sess));
1309
1310   sess->callbacks.send_rtcp = callback;
1311   sess->send_rtcp_user_data = user_data;
1312 }
1313
1314 /**
1315  * rtp_session_set_sync_rtcp_callback:
1316  * @sess: an #RTPSession
1317  * @callback: callback to set
1318  * @user_data: user data passed in the callback
1319  *
1320  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
1321  */
1322 void
1323 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
1324     RTPSessionSyncRTCP callback, gpointer user_data)
1325 {
1326   g_return_if_fail (RTP_IS_SESSION (sess));
1327
1328   sess->callbacks.sync_rtcp = callback;
1329   sess->sync_rtcp_user_data = user_data;
1330 }
1331
1332 /**
1333  * rtp_session_set_clock_rate_callback:
1334  * @sess: an #RTPSession
1335  * @callback: callback to set
1336  * @user_data: user data passed in the callback
1337  *
1338  * Configure only the clock_rate callback to be notified of the clock_rate action.
1339  */
1340 void
1341 rtp_session_set_clock_rate_callback (RTPSession * sess,
1342     RTPSessionClockRate callback, gpointer user_data)
1343 {
1344   g_return_if_fail (RTP_IS_SESSION (sess));
1345
1346   sess->callbacks.clock_rate = callback;
1347   sess->clock_rate_user_data = user_data;
1348 }
1349
1350 /**
1351  * rtp_session_set_reconsider_callback:
1352  * @sess: an #RTPSession
1353  * @callback: callback to set
1354  * @user_data: user data passed in the callback
1355  *
1356  * Configure only the reconsider callback to be notified of the reconsider action.
1357  */
1358 void
1359 rtp_session_set_reconsider_callback (RTPSession * sess,
1360     RTPSessionReconsider callback, gpointer user_data)
1361 {
1362   g_return_if_fail (RTP_IS_SESSION (sess));
1363
1364   sess->callbacks.reconsider = callback;
1365   sess->reconsider_user_data = user_data;
1366 }
1367
1368 /**
1369  * rtp_session_set_request_time_callback:
1370  * @sess: an #RTPSession
1371  * @callback: callback to set
1372  * @user_data: user data passed in the callback
1373  *
1374  * Configure only the request_time callback
1375  */
1376 void
1377 rtp_session_set_request_time_callback (RTPSession * sess,
1378     RTPSessionRequestTime callback, gpointer user_data)
1379 {
1380   g_return_if_fail (RTP_IS_SESSION (sess));
1381
1382   sess->callbacks.request_time = callback;
1383   sess->request_time_user_data = user_data;
1384 }
1385
1386 /**
1387  * rtp_session_set_bandwidth:
1388  * @sess: an #RTPSession
1389  * @bandwidth: the bandwidth allocated
1390  *
1391  * Set the session bandwidth in bits per second.
1392  */
1393 void
1394 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
1395 {
1396   g_return_if_fail (RTP_IS_SESSION (sess));
1397
1398   RTP_SESSION_LOCK (sess);
1399   sess->stats.bandwidth = bandwidth;
1400   RTP_SESSION_UNLOCK (sess);
1401 }
1402
1403 /**
1404  * rtp_session_get_bandwidth:
1405  * @sess: an #RTPSession
1406  *
1407  * Get the session bandwidth.
1408  *
1409  * Returns: the session bandwidth.
1410  */
1411 gdouble
1412 rtp_session_get_bandwidth (RTPSession * sess)
1413 {
1414   gdouble result;
1415
1416   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1417
1418   RTP_SESSION_LOCK (sess);
1419   result = sess->stats.bandwidth;
1420   RTP_SESSION_UNLOCK (sess);
1421
1422   return result;
1423 }
1424
1425 /**
1426  * rtp_session_set_rtcp_fraction:
1427  * @sess: an #RTPSession
1428  * @bandwidth: the RTCP bandwidth
1429  *
1430  * Set the bandwidth in bits per second that should be used for RTCP
1431  * messages.
1432  */
1433 void
1434 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
1435 {
1436   g_return_if_fail (RTP_IS_SESSION (sess));
1437
1438   RTP_SESSION_LOCK (sess);
1439   sess->stats.rtcp_bandwidth = bandwidth;
1440   RTP_SESSION_UNLOCK (sess);
1441 }
1442
1443 /**
1444  * rtp_session_get_rtcp_fraction:
1445  * @sess: an #RTPSession
1446  *
1447  * Get the session bandwidth used for RTCP.
1448  *
1449  * Returns: The bandwidth used for RTCP messages.
1450  */
1451 gdouble
1452 rtp_session_get_rtcp_fraction (RTPSession * sess)
1453 {
1454   gdouble result;
1455
1456   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
1457
1458   RTP_SESSION_LOCK (sess);
1459   result = sess->stats.rtcp_bandwidth;
1460   RTP_SESSION_UNLOCK (sess);
1461
1462   return result;
1463 }
1464
1465 /**
1466  * rtp_session_get_sdes_struct:
1467  * @sess: an #RTSPSession
1468  *
1469  * Get the SDES data as a #GstStructure
1470  *
1471  * Returns: a GstStructure with SDES items for @sess. This function returns a
1472  * copy of the SDES structure, use gst_structure_free() after usage.
1473  */
1474 GstStructure *
1475 rtp_session_get_sdes_struct (RTPSession * sess)
1476 {
1477   GstStructure *result = NULL;
1478
1479   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1480
1481   RTP_SESSION_LOCK (sess);
1482   if (sess->sdes)
1483     result = gst_structure_copy (sess->sdes);
1484   RTP_SESSION_UNLOCK (sess);
1485
1486   return result;
1487 }
1488
1489 static void
1490 source_set_sdes (const gchar * key, RTPSource * source, GstStructure * sdes)
1491 {
1492   rtp_source_set_sdes_struct (source, gst_structure_copy (sdes));
1493 }
1494
1495 /**
1496  * rtp_session_set_sdes_struct:
1497  * @sess: an #RTSPSession
1498  * @sdes: a #GstStructure
1499  *
1500  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
1501  */
1502 void
1503 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
1504 {
1505   g_return_if_fail (sdes);
1506   g_return_if_fail (RTP_IS_SESSION (sess));
1507
1508   RTP_SESSION_LOCK (sess);
1509   if (sess->sdes)
1510     gst_structure_free (sess->sdes);
1511   sess->sdes = gst_structure_copy (sdes);
1512
1513   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1514       (GHFunc) source_set_sdes, sess->sdes);
1515   RTP_SESSION_UNLOCK (sess);
1516 }
1517
1518 static GstFlowReturn
1519 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
1520 {
1521   GstFlowReturn result = GST_FLOW_OK;
1522
1523   if (source->internal) {
1524     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
1525
1526     RTP_SESSION_UNLOCK (session);
1527
1528     if (session->callbacks.send_rtp)
1529       result =
1530           session->callbacks.send_rtp (session, source, data,
1531           session->send_rtp_user_data);
1532     else {
1533       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1534     }
1535   } else {
1536     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
1537     RTP_SESSION_UNLOCK (session);
1538
1539     if (session->callbacks.process_rtp)
1540       result =
1541           session->callbacks.process_rtp (session, source,
1542           GST_BUFFER_CAST (data), session->process_rtp_user_data);
1543     else
1544       gst_buffer_unref (GST_BUFFER_CAST (data));
1545   }
1546   RTP_SESSION_LOCK (session);
1547
1548   return result;
1549 }
1550
1551 static gint
1552 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
1553 {
1554   gint result;
1555
1556   RTP_SESSION_UNLOCK (session);
1557
1558   if (session->callbacks.clock_rate)
1559     result =
1560         session->callbacks.clock_rate (session, pt,
1561         session->clock_rate_user_data);
1562   else
1563     result = -1;
1564
1565   RTP_SESSION_LOCK (session);
1566
1567   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
1568
1569   return result;
1570 }
1571
1572 static RTPSourceCallbacks callbacks = {
1573   (RTPSourcePushRTP) source_push_rtp,
1574   (RTPSourceClockRate) source_clock_rate,
1575 };
1576
1577
1578 /**
1579  * rtp_session_find_conflicting_address:
1580  * @session: The session the packet came in
1581  * @address: address to check for
1582  * @time: The time when the packet that is possibly in conflict arrived
1583  *
1584  * Checks if an address which has a conflict is already known. If it is
1585  * a known conflict, remember the time
1586  *
1587  * Returns: TRUE if it was a known conflict, FALSE otherwise
1588  */
1589 static gboolean
1590 rtp_session_find_conflicting_address (RTPSession * session,
1591     GSocketAddress * address, GstClockTime time)
1592 {
1593   return find_conflicting_address (session->conflicting_addresses, address,
1594       time);
1595 }
1596
1597 /**
1598  * rtp_session_add_conflicting_address:
1599  * @session: The session the packet came in
1600  * @address: address to remember
1601  * @time: The time when the packet that is in conflict arrived
1602  *
1603  * Adds a new conflict address
1604  */
1605 static void
1606 rtp_session_add_conflicting_address (RTPSession * sess,
1607     GSocketAddress * address, GstClockTime time)
1608 {
1609   sess->conflicting_addresses =
1610       add_conflicting_address (sess->conflicting_addresses, address, time);
1611 }
1612
1613 static void
1614 rtp_session_have_conflict (RTPSession * sess, RTPSource * source,
1615     GSocketAddress * address, GstClockTime current_time)
1616 {
1617   guint32 ssrc = rtp_source_get_ssrc (source);
1618
1619   /* Its a new collision, lets change our SSRC */
1620   rtp_session_add_conflicting_address (sess, address, current_time);
1621
1622   /* mark the source BYE */
1623   rtp_source_mark_bye (source, "SSRC Collision");
1624   /* if we were suggesting this SSRC, change to something else */
1625   if (sess->suggested_ssrc == ssrc) {
1626     sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
1627     sess->internal_ssrc_set = TRUE;
1628   }
1629
1630   on_ssrc_collision (sess, source);
1631
1632   rtp_session_schedule_bye_locked (sess, current_time);
1633 }
1634
1635 static gboolean
1636 check_collision (RTPSession * sess, RTPSource * source,
1637     RTPPacketInfo * pinfo, gboolean rtp)
1638 {
1639   guint32 ssrc;
1640
1641   /* If we have no pinfo address, we can't do collision checking */
1642   if (!pinfo->address)
1643     return FALSE;
1644
1645   ssrc = rtp_source_get_ssrc (source);
1646
1647   if (!source->internal) {
1648     GSocketAddress *from;
1649
1650     /* This is not our local source, but lets check if two remote
1651      * source collide */
1652     if (rtp) {
1653       from = source->rtp_from;
1654     } else {
1655       from = source->rtcp_from;
1656     }
1657
1658     if (from) {
1659       if (__g_socket_address_equal (from, pinfo->address)) {
1660         /* Address is the same */
1661         return FALSE;
1662       } else {
1663         GST_LOG ("we have a third-party collision or loop ssrc:%x", ssrc);
1664         if (sess->favor_new) {
1665           if (rtp_source_find_conflicting_address (source,
1666                   pinfo->address, pinfo->current_time)) {
1667             gchar *buf1;
1668
1669             buf1 = __g_socket_address_to_string (pinfo->address);
1670             GST_LOG ("Known conflict on %x for %s, dropping packet", ssrc,
1671                 buf1);
1672             g_free (buf1);
1673
1674             return TRUE;
1675           } else {
1676             gchar *buf1, *buf2;
1677
1678             /* Current address is not a known conflict, lets assume this is
1679              * a new source. Save old address in possible conflict list
1680              */
1681             rtp_source_add_conflicting_address (source, from,
1682                 pinfo->current_time);
1683
1684             buf1 = __g_socket_address_to_string (from);
1685             buf2 = __g_socket_address_to_string (pinfo->address);
1686
1687             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1688                 " saving old as known conflict", ssrc, buf1, buf2);
1689
1690             if (rtp)
1691               rtp_source_set_rtp_from (source, pinfo->address);
1692             else
1693               rtp_source_set_rtcp_from (source, pinfo->address);
1694
1695             g_free (buf1);
1696             g_free (buf2);
1697
1698             return FALSE;
1699           }
1700         } else {
1701           /* Don't need to save old addresses, we ignore new sources */
1702           return TRUE;
1703         }
1704       }
1705     } else {
1706       /* We don't already have a from address for RTP, just set it */
1707       if (rtp)
1708         rtp_source_set_rtp_from (source, pinfo->address);
1709       else
1710         rtp_source_set_rtcp_from (source, pinfo->address);
1711       return FALSE;
1712     }
1713
1714     /* FIXME: Log 3rd party collision somehow
1715      * Maybe should be done in upper layer, only the SDES can tell us
1716      * if its a collision or a loop
1717      */
1718   } else {
1719     /* This is sending with our ssrc, is it an address we already know */
1720     if (rtp_session_find_conflicting_address (sess, pinfo->address,
1721             pinfo->current_time)) {
1722       /* Its a known conflict, its probably a loop, not a collision
1723        * lets just drop the incoming packet
1724        */
1725       GST_DEBUG ("Our packets are being looped back to us, dropping");
1726     } else {
1727       GST_DEBUG ("Collision for SSRC %x from new incoming packet,"
1728           " change our sender ssrc", ssrc);
1729
1730       rtp_session_have_conflict (sess, source, pinfo->address,
1731           pinfo->current_time);
1732     }
1733   }
1734
1735   return TRUE;
1736 }
1737
1738 typedef struct
1739 {
1740   gboolean is_doing_ptp;
1741   GSocketAddress *new_addr;
1742 } CompareAddrData;
1743
1744 /* check if the two given ip addr are the same (do not care about the port) */
1745 static gboolean
1746 ip_addr_equal (GSocketAddress * a, GSocketAddress * b)
1747 {
1748   return
1749       g_inet_address_equal (g_inet_socket_address_get_address
1750       (G_INET_SOCKET_ADDRESS (a)),
1751       g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (b)));
1752 }
1753
1754 static void
1755 compare_rtp_source_addr (const gchar * key, RTPSource * source,
1756     CompareAddrData * data)
1757 {
1758   /* only compare ip addr of remote sources which are also not closing */
1759   if (!source->internal && !source->closing && source->rtp_from) {
1760     /* look for the first rtp source */
1761     if (!data->new_addr)
1762       data->new_addr = source->rtp_from;
1763     /* compare current ip addr with the first one */
1764     else
1765       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtp_from);
1766   }
1767 }
1768
1769 static void
1770 compare_rtcp_source_addr (const gchar * key, RTPSource * source,
1771     CompareAddrData * data)
1772 {
1773   /* only compare ip addr of remote sources which are also not closing */
1774   if (!source->internal && !source->closing && source->rtcp_from) {
1775     /* look for the first rtcp source */
1776     if (!data->new_addr)
1777       data->new_addr = source->rtcp_from;
1778     else
1779       /* compare current ip addr with the first one */
1780       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtcp_from);
1781   }
1782 }
1783
1784 /* loop over our non-internal source to know if the session
1785  * is doing point-to-point */
1786 static void
1787 session_update_ptp (RTPSession * sess)
1788 {
1789   /* to know if the session is doing point to point, the ip addr
1790    * of each non-internal (=remotes) source have to be compared
1791    * to each other.
1792    */
1793   gboolean is_doing_rtp_ptp;
1794   gboolean is_doing_rtcp_ptp;
1795   CompareAddrData data;
1796
1797   /* compare the first remote source's ip addr that receive rtp packets
1798    * with other remote rtp source.
1799    * it's enough because the session just needs to know if they are all
1800    * equals or not
1801    */
1802   data.is_doing_ptp = TRUE;
1803   data.new_addr = NULL;
1804   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1805       (GHFunc) compare_rtp_source_addr, (gpointer) & data);
1806   is_doing_rtp_ptp = data.is_doing_ptp;
1807
1808   /* same but about rtcp */
1809   data.is_doing_ptp = TRUE;
1810   data.new_addr = NULL;
1811   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1812       (GHFunc) compare_rtcp_source_addr, (gpointer) & data);
1813   is_doing_rtcp_ptp = data.is_doing_ptp;
1814
1815   /* the session is doing point-to-point if all rtp remote have the same
1816    * ip addr and if all rtcp remote sources have the same ip addr */
1817   sess->is_doing_ptp = is_doing_rtp_ptp && is_doing_rtcp_ptp;
1818
1819   GST_DEBUG ("doing point-to-point: %d", sess->is_doing_ptp);
1820 }
1821
1822 static void
1823 add_source (RTPSession * sess, RTPSource * src)
1824 {
1825   g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1826       GINT_TO_POINTER (src->ssrc), src);
1827   /* report the new source ASAP */
1828   src->generation = sess->generation;
1829   /* we have one more source now */
1830   sess->total_sources++;
1831   if (RTP_SOURCE_IS_ACTIVE (src))
1832     sess->stats.active_sources++;
1833   if (src->internal) {
1834     sess->stats.internal_sources++;
1835     if (!sess->internal_ssrc_from_caps_or_property
1836         && sess->suggested_ssrc != src->ssrc) {
1837       sess->suggested_ssrc = src->ssrc;
1838       sess->internal_ssrc_set = TRUE;
1839     }
1840   }
1841
1842   /* update point-to-point status */
1843   if (!src->internal)
1844     session_update_ptp (sess);
1845 }
1846
1847 static RTPSource *
1848 find_source (RTPSession * sess, guint32 ssrc)
1849 {
1850   return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1851       GINT_TO_POINTER (ssrc));
1852 }
1853
1854 /* must be called with the session lock, the returned source needs to be
1855  * unreffed after usage. */
1856 static RTPSource *
1857 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1858     RTPPacketInfo * pinfo, gboolean rtp)
1859 {
1860   RTPSource *source;
1861
1862   source = find_source (sess, ssrc);
1863   if (source == NULL) {
1864     /* make new Source in probation and insert */
1865     source = rtp_source_new (ssrc);
1866
1867     GST_DEBUG ("creating new source %08x %p", ssrc, source);
1868
1869     /* for RTP packets we need to set the source in probation. Receiving RTCP
1870      * packets of an SSRC, on the other hand, is a strong indication that we
1871      * are dealing with a valid source. */
1872     g_object_set (source, "probation", rtp ? sess->probation : 0,
1873         "max-dropout-time", sess->max_dropout_time, "max-misorder-time",
1874         sess->max_misorder_time, NULL);
1875
1876     /* store from address, if any */
1877     if (pinfo->address) {
1878       if (rtp)
1879         rtp_source_set_rtp_from (source, pinfo->address);
1880       else
1881         rtp_source_set_rtcp_from (source, pinfo->address);
1882     }
1883
1884     /* configure a callback on the source */
1885     rtp_source_set_callbacks (source, &callbacks, sess);
1886
1887     add_source (sess, source);
1888     *created = TRUE;
1889   } else {
1890     *created = FALSE;
1891     /* check for collision, this updates the address when not previously set */
1892     if (check_collision (sess, source, pinfo, rtp)) {
1893       return NULL;
1894     }
1895     /* Receiving RTCP packets of an SSRC is a strong indication that we
1896      * are dealing with a valid source. */
1897     if (!rtp)
1898       g_object_set (source, "probation", 0, NULL);
1899   }
1900   /* update last activity */
1901   source->last_activity = pinfo->current_time;
1902   if (rtp)
1903     source->last_rtp_activity = pinfo->current_time;
1904   g_object_ref (source);
1905
1906   return source;
1907 }
1908
1909 /* must be called with the session lock, the returned source needs to be
1910  * unreffed after usage. */
1911 static RTPSource *
1912 obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1913     GstClockTime current_time)
1914 {
1915   RTPSource *source;
1916
1917   source = find_source (sess, ssrc);
1918   if (source == NULL) {
1919     /* make new internal Source and insert */
1920     source = rtp_source_new (ssrc);
1921
1922     GST_DEBUG ("creating new internal source %08x %p", ssrc, source);
1923
1924     source->validated = TRUE;
1925     source->internal = TRUE;
1926     source->probation = FALSE;
1927     rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes));
1928     rtp_source_set_callbacks (source, &callbacks, sess);
1929
1930     add_source (sess, source);
1931     *created = TRUE;
1932   } else {
1933     *created = FALSE;
1934   }
1935   /* update last activity */
1936   if (current_time != GST_CLOCK_TIME_NONE) {
1937     source->last_activity = current_time;
1938     source->last_rtp_activity = current_time;
1939   }
1940   g_object_ref (source);
1941
1942   return source;
1943 }
1944
1945 /**
1946  * rtp_session_suggest_ssrc:
1947  * @sess: a #RTPSession
1948  * @is_random: if the suggested ssrc is random
1949  *
1950  * Suggest an unused SSRC in @sess.
1951  *
1952  * Returns: a free unused SSRC
1953  */
1954 guint32
1955 rtp_session_suggest_ssrc (RTPSession * sess, gboolean * is_random)
1956 {
1957   guint32 result;
1958
1959   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1960
1961   RTP_SESSION_LOCK (sess);
1962   result = sess->suggested_ssrc;
1963   if (is_random)
1964     *is_random = !sess->internal_ssrc_set;
1965   RTP_SESSION_UNLOCK (sess);
1966
1967   return result;
1968 }
1969
1970 /**
1971  * rtp_session_add_source:
1972  * @sess: a #RTPSession
1973  * @src: #RTPSource to add
1974  *
1975  * Add @src to @session.
1976  *
1977  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1978  * existed in the session.
1979  */
1980 gboolean
1981 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1982 {
1983   gboolean result = FALSE;
1984   RTPSource *find;
1985
1986   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1987   g_return_val_if_fail (src != NULL, FALSE);
1988
1989   RTP_SESSION_LOCK (sess);
1990   find = find_source (sess, src->ssrc);
1991   if (find == NULL) {
1992     add_source (sess, src);
1993     result = TRUE;
1994   }
1995   RTP_SESSION_UNLOCK (sess);
1996
1997   return result;
1998 }
1999
2000 /**
2001  * rtp_session_get_num_sources:
2002  * @sess: an #RTPSession
2003  *
2004  * Get the number of sources in @sess.
2005  *
2006  * Returns: The number of sources in @sess.
2007  */
2008 guint
2009 rtp_session_get_num_sources (RTPSession * sess)
2010 {
2011   guint result;
2012
2013   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
2014
2015   RTP_SESSION_LOCK (sess);
2016   result = sess->total_sources;
2017   RTP_SESSION_UNLOCK (sess);
2018
2019   return result;
2020 }
2021
2022 /**
2023  * rtp_session_get_num_active_sources:
2024  * @sess: an #RTPSession
2025  *
2026  * Get the number of active sources in @sess. A source is considered active when
2027  * it has been validated and has not yet received a BYE RTCP message.
2028  *
2029  * Returns: The number of active sources in @sess.
2030  */
2031 guint
2032 rtp_session_get_num_active_sources (RTPSession * sess)
2033 {
2034   guint result;
2035
2036   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
2037
2038   RTP_SESSION_LOCK (sess);
2039   result = sess->stats.active_sources;
2040   RTP_SESSION_UNLOCK (sess);
2041
2042   return result;
2043 }
2044
2045 /**
2046  * rtp_session_get_source_by_ssrc:
2047  * @sess: an #RTPSession
2048  * @ssrc: an SSRC
2049  *
2050  * Find the source with @ssrc in @sess.
2051  *
2052  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
2053  * g_object_unref() after usage.
2054  */
2055 RTPSource *
2056 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
2057 {
2058   RTPSource *result;
2059
2060   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
2061
2062   RTP_SESSION_LOCK (sess);
2063   result = find_source (sess, ssrc);
2064   if (result != NULL)
2065     g_object_ref (result);
2066   RTP_SESSION_UNLOCK (sess);
2067
2068   return result;
2069 }
2070
2071 /* should be called with the SESSION lock */
2072 static guint32
2073 rtp_session_create_new_ssrc (RTPSession * sess)
2074 {
2075   guint32 ssrc;
2076
2077   while (TRUE) {
2078     ssrc = g_random_int ();
2079
2080     /* see if it exists in the session, we're done if it doesn't */
2081     if (find_source (sess, ssrc) == NULL)
2082       break;
2083   }
2084   return ssrc;
2085 }
2086
2087 static gboolean
2088 update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
2089 {
2090   GstNetAddressMeta *meta;
2091
2092   /* get packet size including header overhead */
2093   pinfo->bytes += gst_buffer_get_size (*buffer) + pinfo->header_len;
2094   pinfo->packets++;
2095
2096   if (pinfo->rtp) {
2097     GstRTPBuffer rtp = { NULL };
2098
2099     if (!gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtp))
2100       goto invalid_packet;
2101
2102     pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtp);
2103     if (idx == 0) {
2104       gint i;
2105
2106       /* only keep info for first buffer */
2107       pinfo->ssrc = gst_rtp_buffer_get_ssrc (&rtp);
2108       pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
2109       pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
2110       pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2111       pinfo->marker = gst_rtp_buffer_get_marker (&rtp);
2112       /* copy available csrc */
2113       pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
2114       for (i = 0; i < pinfo->csrc_count; i++)
2115         pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
2116
2117       /* RTP header extensions */
2118       pinfo->header_ext = gst_rtp_buffer_get_extension_bytes (&rtp,
2119           &pinfo->header_ext_bit_pattern);
2120     }
2121     gst_rtp_buffer_unmap (&rtp);
2122   }
2123
2124   if (idx == 0) {
2125     /* for netbuffer we can store the IP address to check for collisions */
2126     meta = gst_buffer_get_net_address_meta (*buffer);
2127     if (pinfo->address)
2128       g_object_unref (pinfo->address);
2129     if (meta) {
2130       pinfo->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
2131     } else {
2132       pinfo->address = NULL;
2133     }
2134   }
2135   return TRUE;
2136
2137   /* ERRORS */
2138 invalid_packet:
2139   {
2140     GST_DEBUG ("invalid RTP packet received");
2141     return FALSE;
2142   }
2143 }
2144
2145 /* update the RTPPacketInfo structure with the current time and other bits
2146  * about the current buffer we are handling.
2147  * This function is typically called when a validated packet is received.
2148  * This function should be called with the RTP_SESSION_LOCK
2149  */
2150 static gboolean
2151 update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
2152     gboolean send, gboolean rtp, gboolean is_list, gpointer data,
2153     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
2154 {
2155   gboolean res;
2156
2157   pinfo->send = send;
2158   pinfo->rtp = rtp;
2159   pinfo->is_list = is_list;
2160   pinfo->data = data;
2161   pinfo->current_time = current_time;
2162   pinfo->running_time = running_time;
2163   pinfo->ntpnstime = ntpnstime;
2164   pinfo->header_len = sess->header_len;
2165   pinfo->bytes = 0;
2166   pinfo->payload_len = 0;
2167   pinfo->packets = 0;
2168   pinfo->marker = FALSE;
2169
2170   if (is_list) {
2171     GstBufferList *list = GST_BUFFER_LIST_CAST (data);
2172     res =
2173         gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
2174         pinfo);
2175     pinfo->arrival_time = GST_CLOCK_TIME_NONE;
2176   } else {
2177     GstBuffer *buffer = GST_BUFFER_CAST (data);
2178     res = update_packet (&buffer, 0, pinfo);
2179     pinfo->arrival_time = GST_BUFFER_DTS (buffer);
2180   }
2181
2182   return res;
2183 }
2184
2185 static void
2186 clean_packet_info (RTPPacketInfo * pinfo)
2187 {
2188   if (pinfo->address)
2189     g_object_unref (pinfo->address);
2190   if (pinfo->data) {
2191     gst_mini_object_unref (pinfo->data);
2192     pinfo->data = NULL;
2193   }
2194   if (pinfo->header_ext)
2195     g_bytes_unref (pinfo->header_ext);
2196 }
2197
2198 static gboolean
2199 source_update_active (RTPSession * sess, RTPSource * source,
2200     gboolean prevactive)
2201 {
2202   gboolean active = RTP_SOURCE_IS_ACTIVE (source);
2203   guint32 ssrc = source->ssrc;
2204
2205   if (prevactive == active)
2206     return FALSE;
2207
2208   if (active) {
2209     sess->stats.active_sources++;
2210     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
2211         sess->stats.active_sources);
2212   } else {
2213     sess->stats.active_sources--;
2214     GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
2215         sess->stats.active_sources);
2216   }
2217   return TRUE;
2218 }
2219
2220 static void
2221 process_twcc_packet (RTPSession * sess, RTPPacketInfo * pinfo)
2222 {
2223   if (rtp_twcc_manager_recv_packet (sess->twcc, pinfo)) {
2224     RTP_SESSION_UNLOCK (sess);
2225
2226     /* TODO: find a better rational for this number, and possibly tune it based
2227        on factors like framerate / bandwidth etc */
2228     if (!rtp_session_send_rtcp (sess, 100 * GST_MSECOND)) {
2229       GST_INFO ("Could not schedule TWCC straight away");
2230     }
2231     RTP_SESSION_LOCK (sess);
2232   }
2233 }
2234
2235 static gboolean
2236 source_update_sender (RTPSession * sess, RTPSource * source,
2237     gboolean prevsender)
2238 {
2239   gboolean sender = RTP_SOURCE_IS_SENDER (source);
2240   guint32 ssrc = source->ssrc;
2241
2242   if (prevsender == sender)
2243     return FALSE;
2244
2245   if (sender) {
2246     sess->stats.sender_sources++;
2247     if (source->internal)
2248       sess->stats.internal_sender_sources++;
2249     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
2250         sess->stats.sender_sources);
2251   } else {
2252     sess->stats.sender_sources--;
2253     if (source->internal)
2254       sess->stats.internal_sender_sources--;
2255     GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
2256         sess->stats.sender_sources);
2257   }
2258   return TRUE;
2259 }
2260
2261 /**
2262  * rtp_session_process_rtp:
2263  * @sess: and #RTPSession
2264  * @buffer: an RTP buffer
2265  * @current_time: the current system time
2266  * @running_time: the running_time of @buffer
2267  *
2268  * Process an RTP buffer in the session manager. This function takes ownership
2269  * of @buffer.
2270  *
2271  * Returns: a #GstFlowReturn.
2272  */
2273 GstFlowReturn
2274 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
2275     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
2276 {
2277   GstFlowReturn result;
2278   guint32 ssrc;
2279   RTPSource *source;
2280   gboolean created;
2281   gboolean prevsender, prevactive;
2282   RTPPacketInfo pinfo = { 0, };
2283   guint64 oldrate;
2284
2285   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2286   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2287
2288   RTP_SESSION_LOCK (sess);
2289
2290   /* update pinfo stats */
2291   if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer,
2292           current_time, running_time, ntpnstime)) {
2293     GST_DEBUG ("invalid RTP packet received");
2294     RTP_SESSION_UNLOCK (sess);
2295     return rtp_session_process_rtcp (sess, buffer, current_time, running_time,
2296         ntpnstime);
2297   }
2298
2299   ssrc = pinfo.ssrc;
2300
2301   source = obtain_source (sess, ssrc, &created, &pinfo, TRUE);
2302   if (!source)
2303     goto collision;
2304
2305   prevsender = RTP_SOURCE_IS_SENDER (source);
2306   prevactive = RTP_SOURCE_IS_ACTIVE (source);
2307   oldrate = source->bitrate;
2308
2309   if (created)
2310     on_new_ssrc (sess, source);
2311
2312   /* let source process the packet */
2313   result = rtp_source_process_rtp (source, &pinfo);
2314   process_twcc_packet (sess, &pinfo);
2315
2316   /* source became active */
2317   if (source_update_active (sess, source, prevactive))
2318     on_ssrc_validated (sess, source);
2319
2320   source_update_sender (sess, source, prevsender);
2321
2322   if (oldrate != source->bitrate)
2323     sess->recalc_bandwidth = TRUE;
2324
2325
2326   if (source->validated) {
2327     gboolean created;
2328     gint i;
2329
2330     /* for validated sources, we add the CSRCs as well */
2331     for (i = 0; i < pinfo.csrc_count; i++) {
2332       guint32 csrc;
2333       RTPSource *csrc_src;
2334
2335       csrc = pinfo.csrcs[i];
2336
2337       /* get source */
2338       csrc_src = obtain_source (sess, csrc, &created, &pinfo, TRUE);
2339       if (!csrc_src)
2340         continue;
2341
2342       if (created) {
2343         GST_DEBUG ("created new CSRC: %08x", csrc);
2344         rtp_source_set_as_csrc (csrc_src);
2345         source_update_active (sess, csrc_src, FALSE);
2346         on_new_ssrc (sess, csrc_src);
2347       }
2348       g_object_unref (csrc_src);
2349     }
2350   }
2351   g_object_unref (source);
2352
2353   RTP_SESSION_UNLOCK (sess);
2354
2355   clean_packet_info (&pinfo);
2356
2357   return result;
2358
2359   /* ERRORS */
2360 collision:
2361   {
2362     RTP_SESSION_UNLOCK (sess);
2363     clean_packet_info (&pinfo);
2364     GST_DEBUG ("ignoring packet because its collisioning");
2365     return GST_FLOW_OK;
2366   }
2367 }
2368
2369 static void
2370 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
2371     GstRTCPPacket * packet, RTPPacketInfo * pinfo)
2372 {
2373   guint count, i;
2374
2375   count = gst_rtcp_packet_get_rb_count (packet);
2376   for (i = 0; i < count; i++) {
2377     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
2378     guint8 fractionlost;
2379     gint32 packetslost;
2380     RTPSource *src;
2381
2382     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
2383         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2384
2385     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
2386
2387     /* find our own source */
2388     src = find_source (sess, ssrc);
2389     if (src == NULL)
2390       continue;
2391
2392     if (src->internal && RTP_SOURCE_IS_ACTIVE (src)) {
2393       /* only deal with report blocks for our session, we update the stats of
2394        * the sender of the RTCP message. We could also compare our stats against
2395        * the other sender to see if we are better or worse. */
2396       /* FIXME, need to keep track who the RB block is from */
2397       rtp_source_process_rb (source, ssrc, pinfo->ntpnstime, fractionlost,
2398           packetslost, exthighestseq, jitter, lsr, dlsr);
2399     }
2400   }
2401   on_ssrc_active (sess, source);
2402 }
2403
2404 /* A Sender report contains statistics about how the sender is doing. This
2405  * includes timing informataion such as the relation between RTP and NTP
2406  * timestamps and the number of packets/bytes it sent to us.
2407  *
2408  * In this report is also included a set of report blocks related to how this
2409  * sender is receiving data (in case we (or somebody else) is also sending stuff
2410  * to it). This info includes the packet loss, jitter and seqnum. It also
2411  * contains information to calculate the round trip time (LSR/DLSR).
2412  */
2413 static void
2414 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
2415     RTPPacketInfo * pinfo, gboolean * do_sync)
2416 {
2417   guint32 senderssrc, rtptime, packet_count, octet_count;
2418   guint64 ntptime;
2419   RTPSource *source;
2420   gboolean created, prevsender;
2421
2422   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
2423       &packet_count, &octet_count);
2424
2425   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
2426       senderssrc, GST_TIME_ARGS (pinfo->current_time));
2427
2428   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2429   if (!source)
2430     return;
2431
2432   /* skip non-bye packets for sources that are marked BYE */
2433   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2434     goto out;
2435
2436   /* don't try to do lip-sync for sources that sent a BYE */
2437   if (RTP_SOURCE_IS_MARKED_BYE (source))
2438     *do_sync = FALSE;
2439   else
2440     *do_sync = TRUE;
2441
2442   prevsender = RTP_SOURCE_IS_SENDER (source);
2443
2444   /* first update the source */
2445   rtp_source_process_sr (source, pinfo->current_time, ntptime, rtptime,
2446       packet_count, octet_count);
2447
2448   source_update_sender (sess, source, prevsender);
2449
2450   if (created)
2451     on_new_ssrc (sess, source);
2452
2453   rtp_session_process_rb (sess, source, packet, pinfo);
2454
2455 out:
2456   g_object_unref (source);
2457 }
2458
2459 /* A receiver report contains statistics about how a receiver is doing. It
2460  * includes stuff like packet loss, jitter and the seqnum it received last. It
2461  * also contains info to calculate the round trip time.
2462  *
2463  * We are only interested in how the sender of this report is doing wrt to us.
2464  */
2465 static void
2466 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
2467     RTPPacketInfo * pinfo)
2468 {
2469   guint32 senderssrc;
2470   RTPSource *source;
2471   gboolean created;
2472
2473   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
2474
2475   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
2476
2477   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2478   if (!source)
2479     return;
2480
2481   /* skip non-bye packets for sources that are marked BYE */
2482   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2483     goto out;
2484
2485   if (created)
2486     on_new_ssrc (sess, source);
2487
2488   rtp_session_process_rb (sess, source, packet, pinfo);
2489
2490 out:
2491   g_object_unref (source);
2492 }
2493
2494 /* Get SDES items and store them in the SSRC */
2495 static void
2496 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
2497     RTPPacketInfo * pinfo)
2498 {
2499   guint items, i, j;
2500   gboolean more_items, more_entries;
2501
2502   items = gst_rtcp_packet_sdes_get_item_count (packet);
2503   GST_DEBUG ("got SDES packet with %d items", items);
2504
2505   more_items = gst_rtcp_packet_sdes_first_item (packet);
2506   i = 0;
2507   while (more_items) {
2508     guint32 ssrc;
2509     gboolean changed, created, prevactive;
2510     RTPSource *source;
2511     GstStructure *sdes;
2512
2513     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
2514
2515     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
2516
2517     changed = FALSE;
2518
2519     /* find src, no probation when dealing with RTCP */
2520     source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
2521     if (!source)
2522       return;
2523
2524     /* skip non-bye packets for sources that are marked BYE */
2525     if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2526       goto next;
2527
2528     sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
2529
2530     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
2531     j = 0;
2532     while (more_entries) {
2533       GstRTCPSDESType type;
2534       guint8 len;
2535       guint8 *data;
2536       gchar *name;
2537       gchar *value;
2538
2539       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
2540
2541       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
2542           data);
2543
2544       if (type == GST_RTCP_SDES_PRIV) {
2545         name = g_strndup ((const gchar *) &data[1], data[0]);
2546         len -= data[0] + 1;
2547         data += data[0] + 1;
2548       } else {
2549         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
2550       }
2551
2552       value = g_strndup ((const gchar *) data, len);
2553
2554       if (g_utf8_validate (value, -1, NULL)) {
2555         gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
2556       } else {
2557         GST_WARNING ("ignore SDES field %s with non-utf8 data %s", name, value);
2558       }
2559
2560       g_free (name);
2561       g_free (value);
2562
2563       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
2564       j++;
2565     }
2566
2567     /* takes ownership of sdes */
2568     changed = rtp_source_set_sdes_struct (source, sdes);
2569
2570     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2571     source->validated = TRUE;
2572
2573     if (created)
2574       on_new_ssrc (sess, source);
2575
2576     /* source became active */
2577     if (source_update_active (sess, source, prevactive))
2578       on_ssrc_validated (sess, source);
2579
2580     if (changed)
2581       on_ssrc_sdes (sess, source);
2582
2583   next:
2584     g_object_unref (source);
2585
2586     more_items = gst_rtcp_packet_sdes_next_item (packet);
2587     i++;
2588   }
2589 }
2590
2591 /* BYE is sent when a client leaves the session
2592  */
2593 static void
2594 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
2595     RTPPacketInfo * pinfo)
2596 {
2597   guint count, i;
2598   gchar *reason;
2599   gboolean reconsider = FALSE;
2600
2601   reason = gst_rtcp_packet_bye_get_reason (packet);
2602   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
2603
2604   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
2605   for (i = 0; i < count; i++) {
2606     guint32 ssrc;
2607     RTPSource *source;
2608     gboolean prevactive, prevsender;
2609     guint pmembers, members;
2610
2611     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
2612     GST_DEBUG ("SSRC: %08x", ssrc);
2613
2614     /* find src and mark bye, no probation when dealing with RTCP */
2615     source = find_source (sess, ssrc);
2616     if (!source || source->internal) {
2617       GST_DEBUG ("Ignoring suspicious BYE packet (reason: %s)",
2618           !source ? "can't find source" : "has internal source SSRC");
2619       break;
2620     }
2621
2622     /* store time for when we need to time out this source */
2623     source->bye_time = pinfo->current_time;
2624
2625     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2626     prevsender = RTP_SOURCE_IS_SENDER (source);
2627
2628     /* mark the source BYE */
2629     rtp_source_mark_bye (source, reason);
2630
2631     pmembers = sess->stats.active_sources;
2632
2633     source_update_active (sess, source, prevactive);
2634     source_update_sender (sess, source, prevsender);
2635
2636     members = sess->stats.active_sources;
2637
2638     if (!sess->scheduled_bye && members < pmembers) {
2639       /* some members went away since the previous timeout estimate.
2640        * Perform reverse reconsideration but only when we are not scheduling a
2641        * BYE ourselves. */
2642       if (sess->next_rtcp_check_time != GST_CLOCK_TIME_NONE &&
2643           pinfo->current_time < sess->next_rtcp_check_time) {
2644         GstClockTime time_remaining;
2645
2646         /* Scale our next RTCP check time according to the change of numbers
2647          * of members. But only if a) this is the first RTCP, or b) this is not
2648          * a feedback session, or c) this is a feedback session but we schedule
2649          * for every RTCP interval (aka no t-rr-interval set).
2650          *
2651          * FIXME: a) and b) are not great as we will possibly go below Tmin
2652          * for non-feedback profiles and in case of a) below
2653          * Tmin/t-rr-interval in any case.
2654          */
2655         if (sess->last_rtcp_send_time == GST_CLOCK_TIME_NONE ||
2656             !(sess->rtp_profile == GST_RTP_PROFILE_AVPF
2657                 || sess->rtp_profile == GST_RTP_PROFILE_SAVPF) ||
2658             sess->next_rtcp_check_time - sess->last_rtcp_send_time ==
2659             sess->last_rtcp_interval) {
2660           time_remaining = sess->next_rtcp_check_time - pinfo->current_time;
2661           sess->next_rtcp_check_time =
2662               gst_util_uint64_scale (time_remaining, members, pmembers);
2663           sess->next_rtcp_check_time += pinfo->current_time;
2664         }
2665         sess->last_rtcp_interval =
2666             gst_util_uint64_scale (sess->last_rtcp_interval, members, pmembers);
2667
2668         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
2669             GST_TIME_ARGS (sess->next_rtcp_check_time));
2670
2671         /* mark pending reconsider. We only want to signal the reconsideration
2672          * once after we handled all the source in the bye packet */
2673         reconsider = TRUE;
2674       }
2675     }
2676
2677     on_bye_ssrc (sess, source);
2678   }
2679   if (reconsider) {
2680     RTP_SESSION_UNLOCK (sess);
2681     /* notify app of reconsideration */
2682     if (sess->callbacks.reconsider)
2683       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2684     RTP_SESSION_LOCK (sess);
2685   }
2686
2687   g_free (reason);
2688 }
2689
2690 static void
2691 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
2692     RTPPacketInfo * pinfo)
2693 {
2694   GST_DEBUG ("received APP");
2695
2696   if (g_signal_has_handler_pending (sess,
2697           rtp_session_signals[SIGNAL_ON_APP_RTCP], 0, TRUE)) {
2698     GstBuffer *data_buffer = NULL;
2699     guint16 data_length;
2700     gchar name[5];
2701
2702     data_length = gst_rtcp_packet_app_get_data_length (packet) * 4;
2703     if (data_length > 0) {
2704       guint8 *data = gst_rtcp_packet_app_get_data (packet);
2705       data_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
2706           GST_BUFFER_COPY_MEMORY, data - packet->rtcp->map.data, data_length);
2707       GST_BUFFER_PTS (data_buffer) = pinfo->running_time;
2708     }
2709
2710     memcpy (name, gst_rtcp_packet_app_get_name (packet), 4);
2711     name[4] = '\0';
2712
2713     RTP_SESSION_UNLOCK (sess);
2714     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_APP_RTCP], 0,
2715         gst_rtcp_packet_app_get_subtype (packet),
2716         gst_rtcp_packet_app_get_ssrc (packet), name, data_buffer);
2717     RTP_SESSION_LOCK (sess);
2718
2719     if (data_buffer)
2720       gst_buffer_unref (data_buffer);
2721   }
2722 }
2723
2724 static gboolean
2725 rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
2726     guint32 media_ssrc, gboolean fir, GstClockTime current_time)
2727 {
2728   guint32 round_trip = 0;
2729
2730   rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
2731       &round_trip);
2732
2733   if (src->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
2734     GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
2735         GST_SECOND, 65536);
2736
2737     /* Sanity check to avoid always ignoring PLI/FIR if we receive RTCP
2738      * packets with erroneous values resulting in crazy high RTT. */
2739     if (round_trip_in_ns > 5 * GST_SECOND)
2740       round_trip_in_ns = GST_SECOND / 2;
2741
2742     if (current_time - src->last_keyframe_request < 2 * round_trip_in_ns) {
2743       GST_DEBUG ("Ignoring %s request from %X because one was send without one "
2744           "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
2745           fir ? "FIR" : "PLI", rtp_source_get_ssrc (src),
2746           GST_TIME_ARGS (current_time - src->last_keyframe_request),
2747           GST_TIME_ARGS (round_trip_in_ns));
2748       return FALSE;
2749     }
2750   }
2751
2752   src->last_keyframe_request = current_time;
2753
2754   GST_LOG ("received %s request from %X about %X %p(%p)", fir ? "FIR" : "PLI",
2755       rtp_source_get_ssrc (src), media_ssrc, sess->callbacks.process_rtp,
2756       sess->callbacks.request_key_unit);
2757
2758   RTP_SESSION_UNLOCK (sess);
2759   sess->callbacks.request_key_unit (sess, media_ssrc, fir,
2760       sess->request_key_unit_user_data);
2761   RTP_SESSION_LOCK (sess);
2762
2763   return TRUE;
2764 }
2765
2766 static void
2767 rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
2768     guint32 media_ssrc, GstClockTime current_time)
2769 {
2770   RTPSource *src;
2771
2772   if (!sess->callbacks.request_key_unit)
2773     return;
2774
2775   src = find_source (sess, sender_ssrc);
2776   if (src == NULL) {
2777     /* try to find a src with media_ssrc instead */
2778     src = find_source (sess, media_ssrc);
2779     if (src == NULL)
2780       return;
2781   }
2782
2783   rtp_session_request_local_key_unit (sess, src, media_ssrc, FALSE,
2784       current_time);
2785 }
2786
2787 static void
2788 rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
2789     guint32 media_ssrc, guint8 * fci_data, guint fci_length,
2790     GstClockTime current_time)
2791 {
2792   RTPSource *src;
2793   guint32 ssrc;
2794   guint position = 0;
2795   gboolean our_request = FALSE;
2796
2797   if (!sess->callbacks.request_key_unit)
2798     return;
2799
2800   if (fci_length < 8)
2801     return;
2802
2803   src = find_source (sess, sender_ssrc);
2804
2805   /* Hack because Google fails to set the sender_ssrc correctly */
2806   if (!src && sender_ssrc == 1) {
2807     GHashTableIter iter;
2808
2809     /* we can't find the source if there are multiple */
2810     if (sess->stats.sender_sources > sess->stats.internal_sender_sources + 1)
2811       return;
2812
2813     g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
2814     while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
2815       if (!src->internal && rtp_source_is_sender (src))
2816         break;
2817       src = NULL;
2818     }
2819   }
2820   if (!src)
2821     return;
2822
2823   for (position = 0; position < fci_length; position += 8) {
2824     guint8 *data = fci_data + position;
2825     RTPSource *own;
2826
2827     ssrc = GST_READ_UINT32_BE (data);
2828
2829     own = find_source (sess, ssrc);
2830     if (own == NULL)
2831       continue;
2832
2833     if (own->internal) {
2834       our_request = TRUE;
2835       break;
2836     }
2837   }
2838   if (!our_request)
2839     return;
2840
2841   rtp_session_request_local_key_unit (sess, src, media_ssrc, TRUE,
2842       current_time);
2843 }
2844
2845 static void
2846 rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
2847     guint32 media_ssrc, guint8 * fci_data, guint fci_length,
2848     GstClockTime current_time)
2849 {
2850   sess->stats.nacks_received++;
2851
2852   if (!sess->callbacks.notify_nack)
2853     return;
2854
2855   while (fci_length > 0) {
2856     guint16 seqnum, blp;
2857
2858     seqnum = GST_READ_UINT16_BE (fci_data);
2859     blp = GST_READ_UINT16_BE (fci_data + 2);
2860
2861     GST_DEBUG ("NACK #%u, blp %04x, SSRC 0x%08x", seqnum, blp, media_ssrc);
2862
2863     RTP_SESSION_UNLOCK (sess);
2864     sess->callbacks.notify_nack (sess, seqnum, blp, media_ssrc,
2865         sess->notify_nack_user_data);
2866     RTP_SESSION_LOCK (sess);
2867
2868     fci_data += 4;
2869     fci_length -= 4;
2870   }
2871 }
2872
2873 static void
2874 rtp_session_process_twcc (RTPSession * sess, guint32 sender_ssrc,
2875     guint32 media_ssrc, guint8 * fci_data, guint fci_length)
2876 {
2877   GArray *twcc_packets;
2878   GstStructure *twcc_packets_s;
2879   GstStructure *twcc_stats_s;
2880
2881   twcc_packets = rtp_twcc_manager_parse_fci (sess->twcc,
2882       fci_data, fci_length * sizeof (guint32));
2883   if (twcc_packets == NULL)
2884     return;
2885
2886   twcc_packets_s = rtp_twcc_stats_get_packets_structure (twcc_packets);
2887   twcc_stats_s =
2888       rtp_twcc_stats_process_packets (sess->twcc_stats, twcc_packets);
2889
2890   GST_DEBUG_OBJECT (sess, "Parsed TWCC: %" GST_PTR_FORMAT, twcc_packets_s);
2891   GST_INFO_OBJECT (sess, "Current TWCC stats %" GST_PTR_FORMAT, twcc_stats_s);
2892
2893   g_array_unref (twcc_packets);
2894
2895   RTP_SESSION_UNLOCK (sess);
2896   if (sess->callbacks.notify_twcc)
2897     sess->callbacks.notify_twcc (sess, twcc_packets_s, twcc_stats_s,
2898         sess->notify_twcc_user_data);
2899   RTP_SESSION_LOCK (sess);
2900 }
2901
2902 static void
2903 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
2904     RTPPacketInfo * pinfo, GstClockTime current_time)
2905 {
2906   GstRTCPType type;
2907   GstRTCPFBType fbtype;
2908   guint32 sender_ssrc, media_ssrc;
2909   guint8 *fci_data;
2910   guint fci_length;
2911   RTPSource *src;
2912
2913   /* The feedback packet must include both sender SSRC and media SSRC */
2914   if (packet->length < 2)
2915     return;
2916
2917   type = gst_rtcp_packet_get_type (packet);
2918   fbtype = gst_rtcp_packet_fb_get_type (packet);
2919   sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
2920   media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
2921
2922   src = find_source (sess, media_ssrc);
2923
2924   /* skip non-bye packets for sources that are marked BYE */
2925   if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src))
2926     return;
2927
2928   if (src)
2929     g_object_ref (src);
2930
2931   fci_data = gst_rtcp_packet_fb_get_fci (packet);
2932   fci_length = gst_rtcp_packet_fb_get_fci_length (packet) * sizeof (guint32);
2933
2934   GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
2935       "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
2936
2937   if (g_signal_has_handler_pending (sess,
2938           rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
2939     GstBuffer *fci_buffer = NULL;
2940
2941     if (fci_length > 0) {
2942       fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
2943           GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data,
2944           fci_length);
2945       GST_BUFFER_PTS (fci_buffer) = pinfo->running_time;
2946     }
2947
2948     RTP_SESSION_UNLOCK (sess);
2949     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
2950         type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
2951     RTP_SESSION_LOCK (sess);
2952
2953     if (fci_buffer)
2954       gst_buffer_unref (fci_buffer);
2955   }
2956
2957   if (src && sess->rtcp_feedback_retention_window != GST_CLOCK_TIME_NONE) {
2958     rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
2959   }
2960
2961   if ((src && src->internal) ||
2962       /* PSFB FIR puts the media ssrc inside the FCI */
2963       (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR) ||
2964       /* TWCC is for all sources, so a single media-ssrc is not enough */
2965       (type == GST_RTCP_TYPE_RTPFB && fbtype == GST_RTCP_RTPFB_TYPE_TWCC)) {
2966     switch (type) {
2967       case GST_RTCP_TYPE_PSFB:
2968         switch (fbtype) {
2969           case GST_RTCP_PSFB_TYPE_PLI:
2970             if (src)
2971               src->stats.recv_pli_count++;
2972             rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
2973                 current_time);
2974             break;
2975           case GST_RTCP_PSFB_TYPE_FIR:
2976             if (src)
2977               src->stats.recv_fir_count++;
2978             rtp_session_process_fir (sess, sender_ssrc, media_ssrc, fci_data,
2979                 fci_length, current_time);
2980             break;
2981           default:
2982             break;
2983         }
2984         break;
2985       case GST_RTCP_TYPE_RTPFB:
2986         switch (fbtype) {
2987           case GST_RTCP_RTPFB_TYPE_NACK:
2988             if (src)
2989               src->stats.recv_nack_count++;
2990             rtp_session_process_nack (sess, sender_ssrc, media_ssrc,
2991                 fci_data, fci_length, current_time);
2992             break;
2993           case GST_RTCP_RTPFB_TYPE_TWCC:
2994             rtp_session_process_twcc (sess, sender_ssrc, media_ssrc,
2995                 fci_data, fci_length);
2996             break;
2997           default:
2998             break;
2999         }
3000       default:
3001         break;
3002     }
3003   }
3004
3005   if (src)
3006     g_object_unref (src);
3007 }
3008
3009 /**
3010  * rtp_session_process_rtcp:
3011  * @sess: and #RTPSession
3012  * @buffer: an RTCP buffer
3013  * @current_time: the current system time
3014  * @ntpnstime: the current NTP time in nanoseconds
3015  *
3016  * Process an RTCP buffer in the session manager. This function takes ownership
3017  * of @buffer.
3018  *
3019  * Returns: a #GstFlowReturn.
3020  */
3021 GstFlowReturn
3022 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
3023     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
3024 {
3025   GstRTCPPacket packet;
3026   gboolean more, is_bye = FALSE, do_sync = FALSE;
3027   RTPPacketInfo pinfo = { 0, };
3028   GstFlowReturn result = GST_FLOW_OK;
3029   GstRTCPBuffer rtcp = { NULL, };
3030
3031   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3032   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3033
3034   if (!gst_rtcp_buffer_validate_reduced (buffer))
3035     goto invalid_packet;
3036
3037   GST_DEBUG ("received RTCP packet");
3038
3039   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP], 0,
3040       buffer);
3041
3042   RTP_SESSION_LOCK (sess);
3043   /* update pinfo stats */
3044   update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
3045       running_time, ntpnstime);
3046
3047   /* start processing the compound packet */
3048   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
3049   more = gst_rtcp_buffer_get_first_packet (&rtcp, &packet);
3050   while (more) {
3051     GstRTCPType type;
3052
3053     type = gst_rtcp_packet_get_type (&packet);
3054
3055     switch (type) {
3056       case GST_RTCP_TYPE_SR:
3057         rtp_session_process_sr (sess, &packet, &pinfo, &do_sync);
3058         break;
3059       case GST_RTCP_TYPE_RR:
3060         rtp_session_process_rr (sess, &packet, &pinfo);
3061         break;
3062       case GST_RTCP_TYPE_SDES:
3063         rtp_session_process_sdes (sess, &packet, &pinfo);
3064         break;
3065       case GST_RTCP_TYPE_BYE:
3066         is_bye = TRUE;
3067         /* don't try to attempt lip-sync anymore for streams with a BYE */
3068         do_sync = FALSE;
3069         rtp_session_process_bye (sess, &packet, &pinfo);
3070         break;
3071       case GST_RTCP_TYPE_APP:
3072         rtp_session_process_app (sess, &packet, &pinfo);
3073         break;
3074       case GST_RTCP_TYPE_RTPFB:
3075       case GST_RTCP_TYPE_PSFB:
3076         rtp_session_process_feedback (sess, &packet, &pinfo, current_time);
3077         break;
3078       case GST_RTCP_TYPE_XR:
3079         /* FIXME: This block is added to downgrade warning level.
3080          * Once the parser is implemented, it should be replaced with
3081          * a proper process function. */
3082         GST_DEBUG ("got RTCP XR packet, but ignored");
3083         break;
3084       default:
3085         GST_WARNING ("got unknown RTCP packet type: %d", type);
3086         break;
3087     }
3088     more = gst_rtcp_packet_move_to_next (&packet);
3089   }
3090
3091   gst_rtcp_buffer_unmap (&rtcp);
3092
3093   /* if we are scheduling a BYE, we only want to count bye packets, else we
3094    * count everything */
3095   if (sess->scheduled_bye && is_bye) {
3096     sess->bye_stats.bye_members++;
3097     UPDATE_AVG (sess->bye_stats.avg_rtcp_packet_size, pinfo.bytes);
3098   }
3099
3100   /* keep track of average packet size */
3101   UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
3102
3103   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
3104       sess->stats.avg_rtcp_packet_size, pinfo.bytes);
3105   RTP_SESSION_UNLOCK (sess);
3106
3107   pinfo.data = NULL;
3108   clean_packet_info (&pinfo);
3109
3110   /* notify caller of sr packets in the callback */
3111   if (do_sync && sess->callbacks.sync_rtcp) {
3112     result = sess->callbacks.sync_rtcp (sess, buffer,
3113         sess->sync_rtcp_user_data);
3114   } else
3115     gst_buffer_unref (buffer);
3116
3117   return result;
3118
3119   /* ERRORS */
3120 invalid_packet:
3121   {
3122     GST_DEBUG ("invalid RTCP packet received");
3123     gst_buffer_unref (buffer);
3124     return GST_FLOW_OK;
3125   }
3126 }
3127
3128 /**
3129  * rtp_session_update_send_caps:
3130  * @sess: an #RTPSession
3131  * @caps: a #GstCaps
3132  *
3133  * Update the caps of the sender in the rtp session.
3134  */
3135 void
3136 rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
3137 {
3138   GstStructure *s;
3139   guint ssrc;
3140
3141   g_return_if_fail (RTP_IS_SESSION (sess));
3142   g_return_if_fail (GST_IS_CAPS (caps));
3143
3144   GST_LOG ("received caps %" GST_PTR_FORMAT, caps);
3145
3146   s = gst_caps_get_structure (caps, 0);
3147
3148   if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
3149     RTPSource *source;
3150     gboolean created;
3151
3152     RTP_SESSION_LOCK (sess);
3153     source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
3154     sess->suggested_ssrc = ssrc;
3155     sess->internal_ssrc_set = TRUE;
3156     sess->internal_ssrc_from_caps_or_property = TRUE;
3157     if (source) {
3158       rtp_source_update_caps (source, caps);
3159
3160       if (created)
3161         on_new_sender_ssrc (sess, source);
3162
3163       g_object_unref (source);
3164     }
3165
3166     if (gst_structure_get_uint (s, "rtx-ssrc", &ssrc)) {
3167       source =
3168           obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
3169       if (source) {
3170         rtp_source_update_caps (source, caps);
3171
3172         if (created)
3173           on_new_sender_ssrc (sess, source);
3174
3175         g_object_unref (source);
3176       }
3177     }
3178     RTP_SESSION_UNLOCK (sess);
3179   } else {
3180     sess->internal_ssrc_from_caps_or_property = FALSE;
3181   }
3182
3183   rtp_twcc_manager_parse_send_ext_id (sess->twcc, s);
3184 }
3185
3186 /**
3187  * rtp_session_send_rtp:
3188  * @sess: an #RTPSession
3189  * @data: pointer to either an RTP buffer or a list of RTP buffers
3190  * @is_list: TRUE when @data is a buffer list
3191  * @current_time: the current system time
3192  * @running_time: the running time of @data
3193  *
3194  * Send the RTP data (a buffer or buffer list) in the session manager. This
3195  * function takes ownership of @data.
3196  *
3197  * Returns: a #GstFlowReturn.
3198  */
3199 GstFlowReturn
3200 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
3201     GstClockTime current_time, GstClockTime running_time)
3202 {
3203   GstFlowReturn result;
3204   RTPSource *source;
3205   gboolean prevsender;
3206   guint64 oldrate;
3207   RTPPacketInfo pinfo = { 0, };
3208   gboolean created;
3209
3210   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3211   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
3212
3213   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
3214
3215   RTP_SESSION_LOCK (sess);
3216   if (!update_packet_info (sess, &pinfo, TRUE, TRUE, is_list, data,
3217           current_time, running_time, -1))
3218     goto invalid_packet;
3219
3220   rtp_twcc_manager_send_packet (sess->twcc, &pinfo);
3221
3222   source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
3223   if (created)
3224     on_new_sender_ssrc (sess, source);
3225
3226   if (!source->internal) {
3227     GSocketAddress *from;
3228
3229     if (source->rtp_from)
3230       from = source->rtp_from;
3231     else
3232       from = source->rtcp_from;
3233     if (from) {
3234       if (rtp_session_find_conflicting_address (sess, from, current_time)) {
3235         /* Its a known conflict, its probably a loop, not a collision
3236          * lets just drop the incoming packet
3237          */
3238         GST_LOG ("Our packets are being looped back to us, ignoring collision");
3239       } else {
3240         GST_DEBUG ("Collision for SSRC %x, change our sender ssrc", pinfo.ssrc);
3241
3242         rtp_session_have_conflict (sess, source, from, current_time);
3243       }
3244     } else {
3245       GST_LOG ("Ignoring collision on sent SSRC %x because remote source"
3246           " doesn't have an address", pinfo.ssrc);
3247     }
3248
3249     /* the the sending source is not internal, we have to drop the packet,
3250        or else we will end up receving it ourselves! */
3251     goto collision;
3252   }
3253
3254   prevsender = RTP_SOURCE_IS_SENDER (source);
3255   oldrate = source->bitrate;
3256
3257   /* we use our own source to send */
3258   result = rtp_source_send_rtp (source, &pinfo);
3259
3260   source_update_sender (sess, source, prevsender);
3261
3262   if (oldrate != source->bitrate)
3263     sess->recalc_bandwidth = TRUE;
3264   RTP_SESSION_UNLOCK (sess);
3265
3266   g_object_unref (source);
3267   clean_packet_info (&pinfo);
3268
3269   return result;
3270
3271 invalid_packet:
3272   {
3273     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
3274     RTP_SESSION_UNLOCK (sess);
3275     GST_DEBUG ("invalid RTP packet received");
3276     return GST_FLOW_OK;
3277   }
3278 collision:
3279   {
3280     g_object_unref (source);
3281     clean_packet_info (&pinfo);
3282     RTP_SESSION_UNLOCK (sess);
3283     GST_WARNING ("non-internal source with same ssrc %08x, drop packet",
3284         pinfo.ssrc);
3285     return GST_FLOW_OK;
3286   }
3287 }
3288
3289 static void
3290 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
3291 {
3292   *bandwidth += source->bitrate;
3293 }
3294
3295 /* must be called with session lock */
3296 static GstClockTime
3297 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
3298     gboolean first)
3299 {
3300   GstClockTime result;
3301   RTPSessionStats *stats;
3302
3303   /* recalculate bandwidth when it changed */
3304   if (sess->recalc_bandwidth) {
3305     gdouble bandwidth;
3306
3307     if (sess->bandwidth > 0)
3308       bandwidth = sess->bandwidth;
3309     else {
3310       /* If it is <= 0, then try to estimate the actual bandwidth */
3311       bandwidth = 0;
3312
3313       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3314           (GHFunc) add_bitrates, &bandwidth);
3315     }
3316     if (bandwidth < RTP_STATS_BANDWIDTH)
3317       bandwidth = RTP_STATS_BANDWIDTH;
3318
3319     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
3320         sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
3321
3322     sess->recalc_bandwidth = FALSE;
3323   }
3324
3325   if (sess->scheduled_bye) {
3326     stats = &sess->bye_stats;
3327     result = rtp_stats_calculate_bye_interval (stats);
3328   } else {
3329     session_update_ptp (sess);
3330
3331     stats = &sess->stats;
3332     result = rtp_stats_calculate_rtcp_interval (stats,
3333         stats->internal_sender_sources > 0, sess->rtp_profile,
3334         sess->is_doing_ptp, first);
3335   }
3336
3337   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
3338       GST_TIME_ARGS (result), first);
3339
3340   if (!deterministic && result != GST_CLOCK_TIME_NONE)
3341     result = rtp_stats_add_rtcp_jitter (stats, result);
3342
3343   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
3344
3345   return result;
3346 }
3347
3348 static void
3349 source_mark_bye (const gchar * key, RTPSource * source, const gchar * reason)
3350 {
3351   if (source->internal)
3352     rtp_source_mark_bye (source, reason);
3353 }
3354
3355 /**
3356  * rtp_session_mark_all_bye:
3357  * @sess: an #RTPSession
3358  * @reason: a reason
3359  *
3360  * Mark all internal sources of the session as BYE with @reason.
3361  */
3362 void
3363 rtp_session_mark_all_bye (RTPSession * sess, const gchar * reason)
3364 {
3365   g_return_if_fail (RTP_IS_SESSION (sess));
3366
3367   RTP_SESSION_LOCK (sess);
3368   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3369       (GHFunc) source_mark_bye, (gpointer) reason);
3370   RTP_SESSION_UNLOCK (sess);
3371 }
3372
3373 /* Stop the current @sess and schedule a BYE message for the other members.
3374  * One must have the session lock to call this function
3375  */
3376 static GstFlowReturn
3377 rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time)
3378 {
3379   GstFlowReturn result = GST_FLOW_OK;
3380   GstClockTime interval;
3381
3382   /* nothing to do it we already scheduled bye */
3383   if (sess->scheduled_bye)
3384     goto done;
3385
3386   /* we schedule BYE now */
3387   sess->scheduled_bye = TRUE;
3388   /* at least one member wants to send a BYE */
3389   memcpy (&sess->bye_stats, &sess->stats, sizeof (RTPSessionStats));
3390   INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100);
3391   sess->bye_stats.bye_members = 1;
3392   sess->first_rtcp = TRUE;
3393
3394   /* reschedule transmission */
3395   sess->last_rtcp_send_time = current_time;
3396   sess->last_rtcp_check_time = current_time;
3397   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3398
3399   if (interval != GST_CLOCK_TIME_NONE)
3400     sess->next_rtcp_check_time = current_time + interval;
3401   else
3402     sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
3403   sess->last_rtcp_interval = interval;
3404
3405   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
3406       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
3407
3408   RTP_SESSION_UNLOCK (sess);
3409   /* notify app of reconsideration */
3410   if (sess->callbacks.reconsider)
3411     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
3412   RTP_SESSION_LOCK (sess);
3413 done:
3414
3415   return result;
3416 }
3417
3418 /**
3419  * rtp_session_schedule_bye:
3420  * @sess: an #RTPSession
3421  * @current_time: the current system time
3422  *
3423  * Schedule a BYE message for all sources marked as BYE in @sess.
3424  *
3425  * Returns: a #GstFlowReturn.
3426  */
3427 GstFlowReturn
3428 rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
3429 {
3430   GstFlowReturn result;
3431
3432   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3433
3434   RTP_SESSION_LOCK (sess);
3435   result = rtp_session_schedule_bye_locked (sess, current_time);
3436   RTP_SESSION_UNLOCK (sess);
3437
3438   return result;
3439 }
3440
3441 /**
3442  * rtp_session_next_timeout:
3443  * @sess: an #RTPSession
3444  * @current_time: the current system time
3445  *
3446  * Get the next time we should perform session maintenance tasks.
3447  *
3448  * Returns: a time when rtp_session_on_timeout() should be called with the
3449  * current system time.
3450  */
3451 GstClockTime
3452 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
3453 {
3454   GstClockTime result, interval = 0;
3455
3456   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
3457
3458   RTP_SESSION_LOCK (sess);
3459
3460   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
3461     GST_DEBUG ("have early rtcp time");
3462     result = sess->next_early_rtcp_time;
3463     goto early_exit;
3464   }
3465
3466   result = sess->next_rtcp_check_time;
3467
3468   GST_DEBUG ("current time: %" GST_TIME_FORMAT
3469       ", next time: %" GST_TIME_FORMAT,
3470       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
3471
3472   if (result == GST_CLOCK_TIME_NONE || result < current_time) {
3473     GST_DEBUG ("take current time as base");
3474     /* our previous check time expired, start counting from the current time
3475      * again. */
3476     result = current_time;
3477   }
3478
3479   if (sess->scheduled_bye) {
3480     if (sess->bye_stats.active_sources >= 50) {
3481       GST_DEBUG ("reconsider BYE, more than 50 sources");
3482       /* reconsider BYE if members >= 50 */
3483       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3484       sess->last_rtcp_interval = interval;
3485     }
3486   } else {
3487     if (sess->first_rtcp) {
3488       GST_DEBUG ("first RTCP packet");
3489       /* we are called for the first time */
3490       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3491       sess->last_rtcp_interval = interval;
3492     } else if (sess->next_rtcp_check_time < current_time) {
3493       GST_DEBUG ("old check time expired, getting new timeout");
3494       /* get a new timeout when we need to */
3495       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
3496       sess->last_rtcp_interval = interval;
3497
3498       if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF
3499               || sess->rtp_profile == GST_RTP_PROFILE_SAVPF)
3500           && interval != GST_CLOCK_TIME_NONE) {
3501         /* Apply the rules from RFC 4585 section 3.5.3 */
3502         if (sess->stats.min_interval != 0) {
3503           GstClockTime T_rr_current_interval = g_random_double_range (0.5,
3504               1.5) * sess->stats.min_interval * GST_SECOND;
3505
3506           if (T_rr_current_interval > interval) {
3507             GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT
3508                 " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval),
3509                 GST_TIME_ARGS (interval));
3510             interval = T_rr_current_interval;
3511           }
3512         }
3513       }
3514     }
3515   }
3516
3517   if (interval != GST_CLOCK_TIME_NONE)
3518     result += interval;
3519   else
3520     result = GST_CLOCK_TIME_NONE;
3521
3522   sess->next_rtcp_check_time = result;
3523
3524 early_exit:
3525
3526   GST_DEBUG ("current time: %" GST_TIME_FORMAT
3527       ", next time: %" GST_TIME_FORMAT,
3528       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
3529   RTP_SESSION_UNLOCK (sess);
3530
3531   return result;
3532 }
3533
3534 typedef struct
3535 {
3536   RTPSource *source;
3537   gboolean is_bye;
3538   GstBuffer *buffer;
3539 } ReportOutput;
3540
3541 typedef struct
3542 {
3543   GstRTCPBuffer rtcpbuf;
3544   RTPSession *sess;
3545   RTPSource *source;
3546   guint num_to_report;
3547   gboolean have_fir;
3548   gboolean have_pli;
3549   gboolean have_nack;
3550   GstBuffer *rtcp;
3551   GstClockTime current_time;
3552   guint64 ntpnstime;
3553   GstClockTime running_time;
3554   GstClockTime interval;
3555   GstRTCPPacket packet;
3556   gboolean has_sdes;
3557   gboolean is_early;
3558   gboolean may_suppress;
3559   GQueue output;
3560   guint nacked_seqnums;
3561 } ReportData;
3562
3563 static void
3564 session_start_rtcp (RTPSession * sess, ReportData * data)
3565 {
3566   GstRTCPPacket *packet = &data->packet;
3567   RTPSource *own = data->source;
3568   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3569
3570   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
3571   data->has_sdes = FALSE;
3572
3573   gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
3574
3575   if (data->is_early && sess->reduced_size_rtcp)
3576     return;
3577
3578   if (RTP_SOURCE_IS_SENDER (own)) {
3579     guint64 ntptime;
3580     guint32 rtptime;
3581     guint32 packet_count, octet_count;
3582
3583     /* we are a sender, create SR */
3584     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
3585     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SR, packet);
3586
3587     /* get latest stats */
3588     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
3589         &ntptime, &rtptime, &packet_count, &octet_count);
3590     /* store stats */
3591     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
3592         packet_count, octet_count);
3593
3594     /* fill in sender report info */
3595     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
3596         sess->timestamp_sender_reports ? ntptime : 0,
3597         sess->timestamp_sender_reports ? rtptime : 0,
3598         packet_count, octet_count);
3599   } else {
3600     /* we are only receiver, create RR */
3601     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
3602     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RR, packet);
3603     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
3604   }
3605 }
3606
3607 /* construct a Sender or Receiver Report */
3608 static void
3609 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
3610 {
3611   RTPSession *sess = data->sess;
3612   GstRTCPPacket *packet = &data->packet;
3613   guint8 fractionlost;
3614   gint32 packetslost;
3615   guint32 exthighestseq, jitter;
3616   guint32 lsr, dlsr;
3617
3618   /* don't report for sources in future generations */
3619   if (((gint16) (source->generation - sess->generation)) > 0) {
3620     GST_DEBUG ("source %08x generation %u > %u", source->ssrc,
3621         source->generation, sess->generation);
3622     return;
3623   }
3624
3625   if (g_hash_table_contains (source->reported_in_sr_of,
3626           GUINT_TO_POINTER (data->source->ssrc))) {
3627     GST_DEBUG ("source %08x already reported in this generation", source->ssrc);
3628     return;
3629   }
3630
3631   if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
3632     GST_DEBUG ("max RB count reached");
3633     return;
3634   }
3635
3636   /* only report about remote sources */
3637   if (source->internal)
3638     goto reported;
3639
3640   if (!RTP_SOURCE_IS_SENDER (source)) {
3641     GST_DEBUG ("source %08x not sender", source->ssrc);
3642     goto reported;
3643   }
3644
3645   if (source->disable_rtcp) {
3646     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
3647     goto reported;
3648   }
3649
3650   GST_DEBUG ("create RB for SSRC %08x", source->ssrc);
3651
3652   /* get new stats */
3653   rtp_source_get_new_rb (source, data->current_time, &fractionlost,
3654       &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
3655
3656   /* store last generated RR packet */
3657   source->last_rr.is_valid = TRUE;
3658   source->last_rr.ssrc = data->source->ssrc;
3659   source->last_rr.fractionlost = fractionlost;
3660   source->last_rr.packetslost = packetslost;
3661   source->last_rr.exthighestseq = exthighestseq;
3662   source->last_rr.jitter = jitter;
3663   source->last_rr.lsr = lsr;
3664   source->last_rr.dlsr = dlsr;
3665
3666   /* packet is not yet filled, add report block for this source. */
3667   gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
3668       exthighestseq, jitter, lsr, dlsr);
3669
3670 reported:
3671   g_hash_table_add (source->reported_in_sr_of,
3672       GUINT_TO_POINTER (data->source->ssrc));
3673 }
3674
3675 /* construct FIR */
3676 static void
3677 session_add_fir (const gchar * key, RTPSource * source, ReportData * data)
3678 {
3679   GstRTCPPacket *packet = &data->packet;
3680   guint16 len;
3681   guint8 *fci_data;
3682
3683   if (!source->send_fir)
3684     return;
3685
3686   len = gst_rtcp_packet_fb_get_fci_length (packet);
3687   if (!gst_rtcp_packet_fb_set_fci_length (packet, len + 2))
3688     /* exit because the packet is full, will put next request in a
3689      * further packet */
3690     return;
3691
3692   fci_data = gst_rtcp_packet_fb_get_fci (packet) + (len * 4);
3693
3694   GST_WRITE_UINT32_BE (fci_data, source->ssrc);
3695   fci_data += 4;
3696   fci_data[0] = source->current_send_fir_seqnum;
3697   fci_data[1] = fci_data[2] = fci_data[3] = 0;
3698
3699   source->send_fir = FALSE;
3700   source->stats.sent_fir_count++;
3701 }
3702
3703 static void
3704 session_fir (RTPSession * sess, ReportData * data)
3705 {
3706   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3707   GstRTCPPacket *packet = &data->packet;
3708
3709   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3710     return;
3711
3712   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_FIR);
3713   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3714   gst_rtcp_packet_fb_set_media_ssrc (packet, 0);
3715
3716   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3717       (GHFunc) session_add_fir, data);
3718
3719   if (gst_rtcp_packet_fb_get_fci_length (packet) == 0)
3720     gst_rtcp_packet_remove (packet);
3721   else
3722     data->may_suppress = FALSE;
3723 }
3724
3725 static gboolean
3726 has_pli_compare_func (gconstpointer a, gconstpointer ignored)
3727 {
3728   GstRTCPPacket packet;
3729   GstRTCPBuffer rtcp = { NULL, };
3730   gboolean ret = FALSE;
3731
3732   gst_rtcp_buffer_map ((GstBuffer *) a, GST_MAP_READ, &rtcp);
3733
3734   if (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
3735     if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
3736         gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
3737       ret = TRUE;
3738   }
3739
3740   gst_rtcp_buffer_unmap (&rtcp);
3741
3742   return ret;
3743 }
3744
3745 /* construct PLI */
3746 static void
3747 session_pli (const gchar * key, RTPSource * source, ReportData * data)
3748 {
3749   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3750   GstRTCPPacket *packet = &data->packet;
3751
3752   if (!source->send_pli)
3753     return;
3754
3755   if (rtp_source_has_retained (source, has_pli_compare_func, NULL))
3756     return;
3757
3758   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3759     /* exit because the packet is full, will put next request in a
3760      * further packet */
3761     return;
3762
3763   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_PLI);
3764   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3765   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
3766
3767   source->send_pli = FALSE;
3768   data->may_suppress = FALSE;
3769
3770   source->stats.sent_pli_count++;
3771 }
3772
3773 /* construct NACK */
3774 static void
3775 session_nack (const gchar * key, RTPSource * source, ReportData * data)
3776 {
3777   RTPSession *sess = data->sess;
3778   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3779   GstRTCPPacket *packet = &data->packet;
3780   guint16 *nacks;
3781   GstClockTime *nack_deadlines;
3782   guint n_nacks, i = 0;
3783   guint nacked_seqnums = 0;
3784   guint16 n_fb_nacks = 0;
3785   guint8 *fci_data;
3786
3787   if (!source->send_nack)
3788     return;
3789
3790   nacks = rtp_source_get_nacks (source, &n_nacks);
3791   nack_deadlines = rtp_source_get_nack_deadlines (source, NULL);
3792   GST_DEBUG ("%u NACKs current time %" GST_TIME_FORMAT, n_nacks,
3793       GST_TIME_ARGS (data->current_time));
3794
3795   /* cleanup expired nacks */
3796   for (i = 0; i < n_nacks; i++) {
3797     GST_DEBUG ("#%u deadline %" GST_TIME_FORMAT, nacks[i],
3798         GST_TIME_ARGS (nack_deadlines[i]));
3799     if (nack_deadlines[i] >= data->current_time)
3800       break;
3801   }
3802
3803   if (data->is_early) {
3804     /* don't remove them all if this is an early RTCP packet. It may happen
3805      * that the NACKs are late due to high RTT, not sending NACKs at all would
3806      * keep the RTX RTT stats high and maintain a dropping state. */
3807     i = MIN (n_nacks - 1, i);
3808   }
3809
3810   if (i) {
3811     GST_WARNING ("Removing %u expired NACKS", i);
3812     rtp_source_clear_nacks (source, i);
3813     n_nacks -= i;
3814     if (n_nacks == 0)
3815       return;
3816   }
3817
3818   /* allow overriding NACK to packet conversion */
3819   if (g_signal_has_handler_pending (sess,
3820           rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0, TRUE)) {
3821     /* this is needed as it will actually resize the buffer */
3822     gst_rtcp_buffer_unmap (rtcp);
3823
3824     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0,
3825         data->source->ssrc, source->ssrc, source->nacks, data->rtcp,
3826         &nacked_seqnums);
3827
3828     /* and now remap for the remaining work */
3829     gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
3830
3831     if (nacked_seqnums > 0)
3832       goto done;
3833   }
3834
3835   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet))
3836     /* exit because the packet is full, will put next request in a
3837      * further packet */
3838     return;
3839
3840   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_RTPFB_TYPE_NACK);
3841   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3842   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
3843
3844   if (!gst_rtcp_packet_fb_set_fci_length (packet, 1)) {
3845     gst_rtcp_packet_remove (packet);
3846     GST_WARNING ("no nacks fit in the packet");
3847     return;
3848   }
3849
3850   fci_data = gst_rtcp_packet_fb_get_fci (packet);
3851   for (i = 0; i < n_nacks; i = nacked_seqnums) {
3852     guint16 seqnum = nacks[i];
3853     guint16 blp = 0;
3854     guint j;
3855
3856     if (!gst_rtcp_packet_fb_set_fci_length (packet, n_fb_nacks + 1))
3857       break;
3858
3859     n_fb_nacks++;
3860     nacked_seqnums++;
3861
3862     for (j = i + 1; j < n_nacks; j++) {
3863       gint diff;
3864
3865       diff = gst_rtp_buffer_compare_seqnum (seqnum, nacks[j]);
3866       GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, nacks[j], diff);
3867       if (diff > 16)
3868         break;
3869
3870       blp |= 1 << (diff - 1);
3871       nacked_seqnums++;
3872     }
3873
3874     GST_WRITE_UINT32_BE (fci_data, seqnum << 16 | blp);
3875     fci_data += 4;
3876   }
3877
3878   GST_DEBUG ("Sent %u seqnums into %u FB NACKs", nacked_seqnums, n_fb_nacks);
3879   source->stats.sent_nack_count += n_fb_nacks;
3880
3881 done:
3882   data->nacked_seqnums += nacked_seqnums;
3883   rtp_source_clear_nacks (source, nacked_seqnums);
3884   data->may_suppress = FALSE;
3885 }
3886
3887 /* perform cleanup of sources that timed out */
3888 static void
3889 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
3890 {
3891   gboolean remove = FALSE;
3892   gboolean byetimeout = FALSE;
3893   gboolean sendertimeout = FALSE;
3894   gboolean is_sender, is_active;
3895   RTPSession *sess = data->sess;
3896   GstClockTime interval, binterval;
3897   GstClockTime btime;
3898
3899   GST_DEBUG ("look at %08x, generation %u", source->ssrc, source->generation);
3900
3901   /* check for outdated collisions */
3902   if (source->internal) {
3903     GST_DEBUG ("Timing out collisions for %x", source->ssrc);
3904     rtp_source_timeout (source, data->current_time, data->running_time,
3905         sess->rtcp_feedback_retention_window);
3906   }
3907
3908   /* nothing else to do when without RTCP */
3909   if (data->interval == GST_CLOCK_TIME_NONE)
3910     return;
3911
3912   is_sender = RTP_SOURCE_IS_SENDER (source);
3913   is_active = RTP_SOURCE_IS_ACTIVE (source);
3914
3915   /* our own rtcp interval may have been forced low by secondary configuration,
3916    * while sender side may still operate with higher interval,
3917    * so do not just take our interval to decide on timing out sender,
3918    * but take (if data->interval <= 5 * GST_SECOND):
3919    *   interval = CLAMP (sender_interval, data->interval, 5 * GST_SECOND)
3920    * where sender_interval is difference between last 2 received RTCP reports
3921    */
3922   if (data->interval >= 5 * GST_SECOND || source->internal) {
3923     binterval = data->interval;
3924   } else {
3925     GST_LOG ("prev_rtcp %" GST_TIME_FORMAT ", last_rtcp %" GST_TIME_FORMAT,
3926         GST_TIME_ARGS (source->stats.prev_rtcptime),
3927         GST_TIME_ARGS (source->stats.last_rtcptime));
3928     /* if not received enough yet, fallback to larger default */
3929     if (source->stats.last_rtcptime > source->stats.prev_rtcptime)
3930       binterval = source->stats.last_rtcptime - source->stats.prev_rtcptime;
3931     else
3932       binterval = 5 * GST_SECOND;
3933     binterval = CLAMP (binterval, data->interval, 5 * GST_SECOND);
3934   }
3935   GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
3936       GST_TIME_ARGS (binterval));
3937
3938   if (!source->internal && source->marked_bye) {
3939     /* if we received a BYE from the source, remove the source after some
3940      * time. */
3941     if (data->current_time > source->bye_time &&
3942         data->current_time - source->bye_time > sess->stats.bye_timeout) {
3943       GST_DEBUG ("removing BYE source %08x", source->ssrc);
3944       remove = TRUE;
3945       byetimeout = TRUE;
3946     }
3947   }
3948
3949   if (source->internal && source->sent_bye) {
3950     GST_DEBUG ("removing internal source that has sent BYE %08x", source->ssrc);
3951     remove = TRUE;
3952   }
3953
3954   /* sources that were inactive for more than 5 times the deterministic reporting
3955    * interval get timed out. the min timeout is 5 seconds. */
3956   /* mind old time that might pre-date last time going to PLAYING */
3957   btime = MAX (source->last_activity, sess->start_time);
3958   if (data->current_time > btime) {
3959     interval = MAX (binterval * 5, 5 * GST_SECOND);
3960     if (data->current_time - btime > interval) {
3961       GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
3962           source->ssrc, GST_TIME_ARGS (btime));
3963       if (source->internal) {
3964         /* this is an internal source that is not using our suggested ssrc.
3965          * since there must be another source using this ssrc, we can remove
3966          * this one instead of making it a receiver forever */
3967         if (source->ssrc != sess->suggested_ssrc) {
3968           rtp_source_mark_bye (source, "timed out");
3969           /* do not schedule bye here, since we are inside the RTCP timeout
3970            * processing and scheduling bye will interfere with SR/RR sending */
3971         }
3972       } else {
3973         remove = TRUE;
3974       }
3975     }
3976   }
3977
3978   /* senders that did not send for a long time become a receiver, this also
3979    * holds for our own sources. */
3980   if (is_sender) {
3981     /* mind old time that might pre-date last time going to PLAYING */
3982     btime = MAX (source->last_rtp_activity, sess->start_time);
3983     if (data->current_time > btime) {
3984       interval = MAX (binterval * 2, 5 * GST_SECOND);
3985       if (data->current_time - btime > interval) {
3986         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
3987             GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
3988         sendertimeout = TRUE;
3989       }
3990     }
3991   }
3992
3993   if (remove) {
3994     sess->total_sources--;
3995     if (is_sender) {
3996       sess->stats.sender_sources--;
3997       if (source->internal)
3998         sess->stats.internal_sender_sources--;
3999     }
4000     if (is_active)
4001       sess->stats.active_sources--;
4002
4003     if (source->internal)
4004       sess->stats.internal_sources--;
4005
4006     if (byetimeout)
4007       on_bye_timeout (sess, source);
4008     else
4009       on_timeout (sess, source);
4010   } else {
4011     if (sendertimeout) {
4012       source->is_sender = FALSE;
4013       sess->stats.sender_sources--;
4014       if (source->internal)
4015         sess->stats.internal_sender_sources--;
4016
4017       on_sender_timeout (sess, source);
4018     }
4019     /* count how many source to report in this generation */
4020     if (((gint16) (source->generation - sess->generation)) <= 0)
4021       data->num_to_report++;
4022   }
4023   source->closing = remove;
4024 }
4025
4026 static void
4027 session_sdes (RTPSession * sess, ReportData * data)
4028 {
4029   GstRTCPPacket *packet = &data->packet;
4030   const GstStructure *sdes;
4031   gint i, n_fields;
4032   GstRTCPBuffer *rtcp = &data->rtcpbuf;
4033
4034   /* add SDES packet */
4035   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SDES, packet);
4036
4037   gst_rtcp_packet_sdes_add_item (packet, data->source->ssrc);
4038
4039   sdes = rtp_source_get_sdes_struct (data->source);
4040
4041   /* add all fields in the structure, the order is not important. */
4042   n_fields = gst_structure_n_fields (sdes);
4043   for (i = 0; i < n_fields; ++i) {
4044     const gchar *field;
4045     const gchar *value;
4046     GstRTCPSDESType type;
4047
4048     field = gst_structure_nth_field_name (sdes, i);
4049     if (field == NULL)
4050       continue;
4051     value = gst_structure_get_string (sdes, field);
4052     if (value == NULL)
4053       continue;
4054     type = gst_rtcp_sdes_name_to_type (field);
4055
4056     /* Early packets are minimal and only include the CNAME */
4057     if (data->is_early && type != GST_RTCP_SDES_CNAME)
4058       continue;
4059
4060     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
4061       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
4062           (const guint8 *) value);
4063     } else if (type == GST_RTCP_SDES_PRIV) {
4064       gsize prefix_len;
4065       gsize value_len;
4066       gsize data_len;
4067       guint8 data[256];
4068
4069       /* don't accept entries that are too big */
4070       prefix_len = strlen (field);
4071       if (prefix_len > 255)
4072         continue;
4073       value_len = strlen (value);
4074       if (value_len > 255)
4075         continue;
4076       data_len = 1 + prefix_len + value_len;
4077       if (data_len > 255)
4078         continue;
4079
4080       data[0] = prefix_len;
4081       memcpy (&data[1], field, prefix_len);
4082       memcpy (&data[1 + prefix_len], value, value_len);
4083
4084       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
4085     }
4086   }
4087
4088   data->has_sdes = TRUE;
4089 }
4090
4091 /* schedule a BYE packet */
4092 static void
4093 make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data)
4094 {
4095   GstRTCPPacket *packet = &data->packet;
4096   GstRTCPBuffer *rtcp = &data->rtcpbuf;
4097
4098   /* add SDES */
4099   session_sdes (sess, data);
4100   /* add a BYE packet */
4101   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_BYE, packet);
4102   gst_rtcp_packet_bye_add_ssrc (packet, source->ssrc);
4103   if (source->bye_reason)
4104     gst_rtcp_packet_bye_set_reason (packet, source->bye_reason);
4105
4106   /* we have a BYE packet now */
4107   source->sent_bye = TRUE;
4108 }
4109
4110 static gboolean
4111 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
4112 {
4113   GstClockTime new_send_time;
4114   GstClockTime interval;
4115   RTPSessionStats *stats;
4116
4117   if (sess->scheduled_bye)
4118     stats = &sess->bye_stats;
4119   else
4120     stats = &sess->stats;
4121
4122   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
4123     data->is_early = TRUE;
4124   else
4125     data->is_early = FALSE;
4126
4127   if (data->is_early && sess->next_early_rtcp_time <= current_time) {
4128     GST_DEBUG ("early feedback %" GST_TIME_FORMAT " <= now %"
4129         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time),
4130         GST_TIME_ARGS (current_time));
4131   } else if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
4132       sess->next_rtcp_check_time > current_time) {
4133     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
4134         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
4135         GST_TIME_ARGS (current_time));
4136     return FALSE;
4137   }
4138
4139   /* take interval and add jitter */
4140   interval = data->interval;
4141   if (interval != GST_CLOCK_TIME_NONE)
4142     interval = rtp_stats_add_rtcp_jitter (stats, interval);
4143
4144   if (sess->last_rtcp_check_time != GST_CLOCK_TIME_NONE) {
4145     /* perform forward reconsideration */
4146     if (interval != GST_CLOCK_TIME_NONE) {
4147       GstClockTime elapsed;
4148
4149       /* get elapsed time since we last reported */
4150       elapsed = current_time - sess->last_rtcp_check_time;
4151
4152       GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
4153           GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
4154       new_send_time = interval + sess->last_rtcp_check_time;
4155     } else {
4156       new_send_time = sess->last_rtcp_check_time;
4157     }
4158   } else {
4159     /* If this is the first RTCP packet, we can reconsider anything based
4160      * on the last RTCP send time because there was none.
4161      */
4162     g_warn_if_fail (!data->is_early);
4163     data->is_early = FALSE;
4164     new_send_time = current_time;
4165   }
4166
4167   if (!data->is_early) {
4168     /* check if reconsideration */
4169     if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
4170       GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
4171           GST_TIME_ARGS (new_send_time));
4172       /* store new check time */
4173       sess->next_rtcp_check_time = new_send_time;
4174       sess->last_rtcp_interval = interval;
4175       return FALSE;
4176     }
4177
4178     sess->last_rtcp_interval = interval;
4179     if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF
4180             || sess->rtp_profile == GST_RTP_PROFILE_SAVPF)
4181         && interval != GST_CLOCK_TIME_NONE) {
4182       /* Apply the rules from RFC 4585 section 3.5.3 */
4183       if (stats->min_interval != 0 && !sess->first_rtcp) {
4184         GstClockTime T_rr_current_interval =
4185             g_random_double_range (0.5, 1.5) * stats->min_interval * GST_SECOND;
4186
4187         if (T_rr_current_interval > interval) {
4188           GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT
4189               " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval),
4190               GST_TIME_ARGS (interval));
4191           interval = T_rr_current_interval;
4192         }
4193       }
4194     }
4195     sess->next_rtcp_check_time = current_time + interval;
4196   }
4197
4198
4199   GST_DEBUG ("can send RTCP now, next %" GST_TIME_FORMAT,
4200       GST_TIME_ARGS (sess->next_rtcp_check_time));
4201
4202   return TRUE;
4203 }
4204
4205 static void
4206 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
4207 {
4208   g_hash_table_insert (hash_table, key, g_object_ref (source));
4209 }
4210
4211 static gboolean
4212 remove_closing_sources (const gchar * key, RTPSource * source,
4213     ReportData * data)
4214 {
4215   if (source->closing)
4216     return TRUE;
4217
4218   if (source->send_fir)
4219     data->have_fir = TRUE;
4220   if (source->send_pli)
4221     data->have_pli = TRUE;
4222   if (source->send_nack)
4223     data->have_nack = TRUE;
4224
4225   return FALSE;
4226 }
4227
4228 static void
4229 generate_twcc (const gchar * key, RTPSource * source, ReportData * data)
4230 {
4231   RTPSession *sess = data->sess;
4232   GstBuffer *buf;
4233
4234   /* only generate RTCP for active internal sources */
4235   if (!source->internal || source->sent_bye)
4236     return;
4237
4238   /* ignore other sources when we do the timeout after a scheduled BYE */
4239   if (sess->scheduled_bye && !source->marked_bye)
4240     return;
4241
4242   /* skip if RTCP is disabled */
4243   if (source->disable_rtcp) {
4244     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
4245     return;
4246   }
4247
4248   while ((buf = rtp_twcc_manager_get_feedback (sess->twcc, source->ssrc))) {
4249     ReportOutput *output = g_slice_new (ReportOutput);
4250     output->source = g_object_ref (source);
4251     output->is_bye = FALSE;
4252     output->buffer = buf;
4253     /* queue the RTCP packet to push later */
4254     g_queue_push_tail (&data->output, output);
4255   }
4256 }
4257
4258
4259 static void
4260 generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
4261 {
4262   RTPSession *sess = data->sess;
4263   gboolean is_bye = FALSE;
4264   ReportOutput *output;
4265
4266   /* only generate RTCP for active internal sources */
4267   if (!source->internal || source->sent_bye)
4268     return;
4269
4270   /* ignore other sources when we do the timeout after a scheduled BYE */
4271   if (sess->scheduled_bye && !source->marked_bye)
4272     return;
4273
4274   /* skip if RTCP is disabled */
4275   if (source->disable_rtcp) {
4276     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
4277     return;
4278   }
4279
4280   data->source = source;
4281
4282   /* open packet */
4283   session_start_rtcp (sess, data);
4284
4285   if (source->marked_bye) {
4286     /* send BYE */
4287     make_source_bye (sess, source, data);
4288     is_bye = TRUE;
4289   } else if (!data->is_early) {
4290     /* loop over all known sources and add report blocks. If we are early, we
4291      * just make a minimal RTCP packet and skip this step */
4292     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4293         (GHFunc) session_report_blocks, data);
4294   }
4295   if (!data->has_sdes && (!data->is_early || !sess->reduced_size_rtcp))
4296     session_sdes (sess, data);
4297
4298   if (data->have_fir)
4299     session_fir (sess, data);
4300
4301   if (data->have_pli)
4302     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4303         (GHFunc) session_pli, data);
4304
4305   if (data->have_nack)
4306     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4307         (GHFunc) session_nack, data);
4308
4309   gst_rtcp_buffer_unmap (&data->rtcpbuf);
4310
4311   output = g_slice_new (ReportOutput);
4312   output->source = g_object_ref (source);
4313   output->is_bye = is_bye;
4314   output->buffer = data->rtcp;
4315   /* queue the RTCP packet to push later */
4316   g_queue_push_tail (&data->output, output);
4317 }
4318
4319 static void
4320 update_generation (const gchar * key, RTPSource * source, ReportData * data)
4321 {
4322   RTPSession *sess = data->sess;
4323
4324   if (g_hash_table_size (source->reported_in_sr_of) >=
4325       sess->stats.internal_sources) {
4326     /* source is reported, move to next generation */
4327     source->generation = sess->generation + 1;
4328     g_hash_table_remove_all (source->reported_in_sr_of);
4329
4330     GST_LOG ("reported source %x, new generation: %d", source->ssrc,
4331         source->generation);
4332
4333     /* if we reported all sources in this generation, move to next */
4334     if (--data->num_to_report == 0) {
4335       sess->generation++;
4336       GST_DEBUG ("all reported, generation now %u", sess->generation);
4337     }
4338   }
4339 }
4340
4341 static void
4342 schedule_remaining_nacks (const gchar * key, RTPSource * source,
4343     ReportData * data)
4344 {
4345   RTPSession *sess = data->sess;
4346   GstClockTime *nack_deadlines;
4347   GstClockTime deadline;
4348   guint n_nacks;
4349
4350   if (!source->send_nack)
4351     return;
4352
4353   /* the scheduling is entirely based on available bandwidth, just take the
4354    * biggest seqnum, which will have the largest deadline to request early
4355    * RTCP. */
4356   nack_deadlines = rtp_source_get_nack_deadlines (source, &n_nacks);
4357   deadline = nack_deadlines[n_nacks - 1];
4358   RTP_SESSION_UNLOCK (sess);
4359   rtp_session_send_rtcp_with_deadline (sess, deadline);
4360   RTP_SESSION_LOCK (sess);
4361 }
4362
4363 static gboolean
4364 rtp_session_are_all_sources_bye (RTPSession * sess)
4365 {
4366   GHashTableIter iter;
4367   RTPSource *src;
4368
4369   RTP_SESSION_LOCK (sess);
4370   g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
4371   while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
4372     if (src->internal && !src->sent_bye) {
4373       RTP_SESSION_UNLOCK (sess);
4374       return FALSE;
4375     }
4376   }
4377   RTP_SESSION_UNLOCK (sess);
4378
4379   return TRUE;
4380 }
4381
4382 /**
4383  * rtp_session_on_timeout:
4384  * @sess: an #RTPSession
4385  * @current_time: the current system time
4386  * @ntpnstime: the current NTP time in nanoseconds
4387  * @running_time: the current running_time of the pipeline
4388  *
4389  * Perform maintenance actions after the timeout obtained with
4390  * rtp_session_next_timeout() expired.
4391  *
4392  * This function will perform timeouts of receivers and senders, send a BYE
4393  * packet or generate RTCP packets with current session stats.
4394  *
4395  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
4396  * times, for each packet that should be processed.
4397  *
4398  * Returns: a #GstFlowReturn.
4399  */
4400 GstFlowReturn
4401 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
4402     guint64 ntpnstime, GstClockTime running_time)
4403 {
4404   GstFlowReturn result = GST_FLOW_OK;
4405   ReportData data = { GST_RTCP_BUFFER_INIT };
4406   GHashTable *table_copy;
4407   ReportOutput *output;
4408   gboolean all_empty = FALSE;
4409
4410   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
4411
4412   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT
4413       ", running-time %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4414       GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (running_time));
4415
4416   data.sess = sess;
4417   data.current_time = current_time;
4418   data.ntpnstime = ntpnstime;
4419   data.running_time = running_time;
4420   data.num_to_report = 0;
4421   data.may_suppress = FALSE;
4422   data.nacked_seqnums = 0;
4423   g_queue_init (&data.output);
4424
4425   RTP_SESSION_LOCK (sess);
4426   /* get a new interval, we need this for various cleanups etc */
4427   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
4428
4429   GST_DEBUG ("interval %" GST_TIME_FORMAT, GST_TIME_ARGS (data.interval));
4430
4431   /* we need an internal source now */
4432   if (sess->stats.internal_sources == 0) {
4433     RTPSource *source;
4434     gboolean created;
4435
4436     source = obtain_internal_source (sess, sess->suggested_ssrc, &created,
4437         current_time);
4438     sess->internal_ssrc_set = TRUE;
4439
4440     if (created)
4441       on_new_sender_ssrc (sess, source);
4442
4443     g_object_unref (source);
4444   }
4445
4446   sess->conflicting_addresses =
4447       timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
4448
4449   /* Make a local copy of the hashtable. We need to do this because the
4450    * cleanup stage below releases the session lock. */
4451   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
4452       (GDestroyNotify) g_object_unref);
4453   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4454       (GHFunc) clone_ssrcs_hashtable, table_copy);
4455
4456   /* Clean up the session, mark the source for removing, this might release the
4457    * session lock. */
4458   g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
4459   g_hash_table_destroy (table_copy);
4460
4461   /* Now remove the marked sources */
4462   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
4463       (GHRFunc) remove_closing_sources, &data);
4464
4465   /* update point-to-point status */
4466   session_update_ptp (sess);
4467
4468   /* see if we need to generate SR or RR packets */
4469   if (!is_rtcp_time (sess, current_time, &data))
4470     goto done;
4471
4472   /* check if all the buffers are empty after generation */
4473   all_empty = TRUE;
4474
4475   GST_DEBUG
4476       ("doing RTCP generation %u for %u sources, early %d, may suppress %d",
4477       sess->generation, data.num_to_report, data.is_early, data.may_suppress);
4478
4479   /* generate RTCP for all internal sources */
4480   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4481       (GHFunc) generate_rtcp, &data);
4482
4483   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4484       (GHFunc) generate_twcc, &data);
4485
4486   /* update the generation for all the sources that have been reported */
4487   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4488       (GHFunc) update_generation, &data);
4489
4490   /* we keep track of the last report time in order to timeout inactive
4491    * receivers or senders */
4492   if (!data.is_early) {
4493     GST_DEBUG ("Time since last regular RTCP: %" GST_TIME_FORMAT " - %"
4494         GST_TIME_FORMAT " = %" GST_TIME_FORMAT,
4495         GST_TIME_ARGS (data.current_time),
4496         GST_TIME_ARGS (sess->last_rtcp_send_time),
4497         GST_TIME_ARGS (data.current_time - sess->last_rtcp_send_time));
4498     sess->last_rtcp_send_time = data.current_time;
4499   }
4500
4501   GST_DEBUG ("Time since last RTCP: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT
4502       " = %" GST_TIME_FORMAT, GST_TIME_ARGS (data.current_time),
4503       GST_TIME_ARGS (sess->last_rtcp_check_time),
4504       GST_TIME_ARGS (data.current_time - sess->last_rtcp_check_time));
4505   sess->last_rtcp_check_time = data.current_time;
4506   sess->first_rtcp = FALSE;
4507   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
4508   sess->scheduled_bye = FALSE;
4509
4510 done:
4511   RTP_SESSION_UNLOCK (sess);
4512
4513   /* notify about updated statistics */
4514   g_object_notify (G_OBJECT (sess), "stats");
4515
4516   /* push out the RTCP packets */
4517   while ((output = g_queue_pop_head (&data.output))) {
4518     gboolean do_not_suppress, empty_buffer;
4519     GstBuffer *buffer = output->buffer;
4520     RTPSource *source = output->source;
4521
4522     /* Give the user a change to add its own packet */
4523     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
4524         buffer, data.is_early, &do_not_suppress);
4525
4526     empty_buffer = gst_buffer_get_size (buffer) == 0;
4527
4528     if (!empty_buffer)
4529       all_empty = FALSE;
4530
4531     if (sess->callbacks.send_rtcp &&
4532         !empty_buffer && (do_not_suppress || !data.may_suppress)) {
4533       guint packet_size;
4534
4535       packet_size = gst_buffer_get_size (buffer) + sess->header_len;
4536
4537       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
4538       GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
4539           sess->stats.avg_rtcp_packet_size, packet_size);
4540       result =
4541           sess->callbacks.send_rtcp (sess, source, buffer,
4542           rtp_session_are_all_sources_bye (sess), sess->send_rtcp_user_data);
4543
4544       RTP_SESSION_LOCK (sess);
4545       sess->stats.nacks_sent += data.nacked_seqnums;
4546       on_sender_ssrc_active (sess, source);
4547       RTP_SESSION_UNLOCK (sess);
4548     } else {
4549       GST_DEBUG ("freeing packet callback: %p"
4550           " empty_buffer: %d, "
4551           " do_not_suppress: %d may_suppress: %d", sess->callbacks.send_rtcp,
4552           empty_buffer, do_not_suppress, data.may_suppress);
4553       if (!empty_buffer) {
4554         RTP_SESSION_LOCK (sess);
4555         sess->stats.nacks_dropped += data.nacked_seqnums;
4556         RTP_SESSION_UNLOCK (sess);
4557       }
4558       gst_buffer_unref (buffer);
4559     }
4560     g_object_unref (source);
4561     g_slice_free (ReportOutput, output);
4562   }
4563
4564   if (all_empty)
4565     GST_ERROR ("generated empty RTCP messages for all the sources");
4566
4567   /* schedule remaining nacks */
4568   RTP_SESSION_LOCK (sess);
4569   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4570       (GHFunc) schedule_remaining_nacks, &data);
4571   RTP_SESSION_UNLOCK (sess);
4572
4573   return result;
4574 }
4575
4576 /**
4577  * rtp_session_request_early_rtcp:
4578  * @sess: an #RTPSession
4579  * @current_time: the current system time
4580  * @max_delay: maximum delay
4581  *
4582  * Request transmission of early RTCP
4583  *
4584  * Returns: %TRUE if the related RTCP can be scheduled.
4585  */
4586 gboolean
4587 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
4588     GstClockTime max_delay)
4589 {
4590   GstClockTime T_dither_max, T_rr, offset = 0;
4591   gboolean ret;
4592   gboolean allow_early;
4593
4594   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
4595
4596   RTP_SESSION_LOCK (sess);
4597
4598   /* We assume a feedback profile if something is requesting RTCP
4599    * to be sent */
4600   sess->rtp_profile = GST_RTP_PROFILE_AVPF;
4601
4602   /* Check if already requested */
4603   /*  RFC 4585 section 3.5.2 step 2 */
4604   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
4605     GST_LOG_OBJECT (sess, "already have next early rtcp time");
4606     ret = (current_time + max_delay > sess->next_early_rtcp_time);
4607     goto end;
4608   }
4609
4610   if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) {
4611     GST_LOG_OBJECT (sess, "no next RTCP check time");
4612     ret = FALSE;
4613     goto end;
4614   }
4615
4616   /* RFC 4585 section 3.5.3 step 1
4617    * If no regular RTCP packet has been sent before, then a regular
4618    * RTCP packet has to be scheduled first and FB messages might be
4619    * included there
4620    */
4621   if (!GST_CLOCK_TIME_IS_VALID (sess->last_rtcp_send_time)) {
4622     GST_LOG_OBJECT (sess, "no RTCP sent yet");
4623
4624     if (current_time + max_delay > sess->next_rtcp_check_time) {
4625       GST_LOG_OBJECT (sess,
4626           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
4627           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4628           GST_TIME_ARGS (max_delay),
4629           GST_TIME_ARGS (sess->next_rtcp_check_time));
4630       ret = TRUE;
4631     } else {
4632       GST_LOG_OBJECT (sess,
4633           "can't allow early feedback, next scheduled time is too late %"
4634           GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
4635           GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
4636           GST_TIME_ARGS (sess->next_rtcp_check_time));
4637       ret = FALSE;
4638     }
4639     goto end;
4640   }
4641
4642   T_rr = sess->last_rtcp_interval;
4643
4644   /*  RFC 4585 section 3.5.2 step 2b */
4645   /* If the total sources is <=2, then there is only us and one peer */
4646   /* When there is one auxiliary stream the session can still do point
4647    * to point.
4648    */
4649   if (sess->is_doing_ptp) {
4650     T_dither_max = 0;
4651   } else {
4652     /* Divide by 2 because l = 0.5 */
4653     T_dither_max = T_rr;
4654     T_dither_max /= 2;
4655   }
4656
4657   /*  RFC 4585 section 3.5.2 step 3 */
4658   if (current_time + T_dither_max > sess->next_rtcp_check_time) {
4659     GST_LOG_OBJECT (sess,
4660         "don't send because of dither, next scheduled time is too soon %"
4661         GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT,
4662         GST_TIME_ARGS (current_time), GST_TIME_ARGS (T_dither_max),
4663         GST_TIME_ARGS (sess->next_rtcp_check_time));
4664     ret = T_dither_max <= max_delay;
4665     goto end;
4666   }
4667
4668   /*  RFC 4585 section 3.5.2 step 4a and
4669    *  RFC 4585 section 3.5.2 step 6 */
4670   allow_early = FALSE;
4671   if (sess->last_rtcp_check_time == sess->last_rtcp_send_time) {
4672     /* Last time we sent a full RTCP packet, we can now immediately
4673      * send an early one as allow_early was reset to TRUE */
4674     allow_early = TRUE;
4675   } else if (sess->last_rtcp_check_time + T_rr <= current_time + max_delay) {
4676     /* Last packet we sent was an early RTCP packet and more than
4677      * T_rr has passed since then, meaning we would have suppressed
4678      * a regular RTCP packet already and reset allow_early to TRUE */
4679     allow_early = TRUE;
4680
4681     /* We have to offset a bit as T_rr has not passed yet, but will before
4682      * max_delay */
4683     if (sess->last_rtcp_check_time + T_rr > current_time)
4684       offset = (sess->last_rtcp_check_time + T_rr) - current_time;
4685   } else {
4686     GST_DEBUG_OBJECT (sess,
4687         "can't allow early RTCP yet: last regular %" GST_TIME_FORMAT ", %"
4688         GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT " + %"
4689         GST_TIME_FORMAT, GST_TIME_ARGS (sess->last_rtcp_send_time),
4690         GST_TIME_ARGS (sess->last_rtcp_check_time), GST_TIME_ARGS (T_rr),
4691         GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay));
4692   }
4693
4694   if (!allow_early) {
4695     /* Ignore the request a scheduled packet will be in time anyway */
4696     if (current_time + max_delay > sess->next_rtcp_check_time) {
4697       GST_LOG_OBJECT (sess,
4698           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
4699           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4700           GST_TIME_ARGS (max_delay),
4701           GST_TIME_ARGS (sess->next_rtcp_check_time));
4702       ret = TRUE;
4703     } else {
4704       GST_LOG_OBJECT (sess,
4705           "can't allow early feedback and next scheduled time is too late %"
4706           GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
4707           GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
4708           GST_TIME_ARGS (sess->next_rtcp_check_time));
4709       ret = FALSE;
4710     }
4711     goto end;
4712   }
4713
4714   /*  RFC 4585 section 3.5.2 step 4b */
4715   if (T_dither_max) {
4716     /* Schedule an early transmission later */
4717     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
4718         current_time + offset;
4719   } else {
4720     /* If no dithering, schedule it for NOW */
4721     sess->next_early_rtcp_time = current_time + offset;
4722   }
4723
4724   GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT
4725       ", next regular RTCP time %" GST_TIME_FORMAT,
4726       GST_TIME_ARGS (sess->next_early_rtcp_time),
4727       GST_TIME_ARGS (sess->next_rtcp_check_time));
4728   RTP_SESSION_UNLOCK (sess);
4729
4730   /* notify app of need to send packet early
4731    * and therefore of timeout change */
4732   if (sess->callbacks.reconsider)
4733     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
4734
4735   return TRUE;
4736
4737 end:
4738
4739   RTP_SESSION_UNLOCK (sess);
4740
4741   return ret;
4742 }
4743
4744 static gboolean
4745 rtp_session_send_rtcp_internal (RTPSession * sess, GstClockTime now,
4746     GstClockTime max_delay)
4747 {
4748   /* notify the application that we intend to send early RTCP */
4749   if (sess->callbacks.notify_early_rtcp)
4750     sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data);
4751
4752   return rtp_session_request_early_rtcp (sess, now, max_delay);
4753 }
4754
4755 static gboolean
4756 rtp_session_send_rtcp_with_deadline (RTPSession * sess, GstClockTime deadline)
4757 {
4758   GstClockTime now, max_delay;
4759
4760   if (!sess->callbacks.send_rtcp)
4761     return FALSE;
4762
4763   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
4764
4765   if (deadline < now)
4766     return FALSE;
4767
4768   max_delay = deadline - now;
4769
4770   return rtp_session_send_rtcp_internal (sess, now, max_delay);
4771 }
4772
4773 static gboolean
4774 rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
4775 {
4776   GstClockTime now;
4777
4778   if (!sess->callbacks.send_rtcp)
4779     return FALSE;
4780
4781   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
4782
4783   return rtp_session_send_rtcp_internal (sess, now, max_delay);
4784 }
4785
4786 gboolean
4787 rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
4788     gboolean fir, gint count)
4789 {
4790   RTPSource *src;
4791
4792   RTP_SESSION_LOCK (sess);
4793   src = find_source (sess, ssrc);
4794   if (src == NULL)
4795     goto no_source;
4796
4797   if (fir) {
4798     src->send_pli = FALSE;
4799     src->send_fir = TRUE;
4800
4801     if (count == -1 || count != src->last_fir_count)
4802       src->current_send_fir_seqnum++;
4803     src->last_fir_count = count;
4804   } else if (!src->send_fir) {
4805     src->send_pli = TRUE;
4806   }
4807   RTP_SESSION_UNLOCK (sess);
4808
4809   if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
4810     GST_DEBUG ("FIR/PLI not sent early, sending with next regular RTCP");
4811   }
4812
4813   return TRUE;
4814
4815   /* ERRORS */
4816 no_source:
4817   {
4818     RTP_SESSION_UNLOCK (sess);
4819     return FALSE;
4820   }
4821 }
4822
4823 /**
4824  * rtp_session_request_nack:
4825  * @sess: a #RTPSession
4826  * @ssrc: the SSRC
4827  * @seqnum: the missing seqnum
4828  * @max_delay: max delay to request NACK
4829  *
4830  * Request scheduling of a NACK feedback packet for @seqnum in @ssrc.
4831  *
4832  * Returns: %TRUE if the NACK feedback could be scheduled
4833  */
4834 gboolean
4835 rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
4836     GstClockTime max_delay)
4837 {
4838   RTPSource *source;
4839   GstClockTime now;
4840
4841   if (!sess->callbacks.send_rtcp)
4842     return FALSE;
4843
4844   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
4845
4846   RTP_SESSION_LOCK (sess);
4847   source = find_source (sess, ssrc);
4848   if (source == NULL)
4849     goto no_source;
4850
4851   GST_DEBUG ("request NACK for SSRC %08x, #%u, deadline %" GST_TIME_FORMAT,
4852       ssrc, seqnum, GST_TIME_ARGS (now + max_delay));
4853   rtp_source_register_nack (source, seqnum, now + max_delay);
4854   RTP_SESSION_UNLOCK (sess);
4855
4856   if (!rtp_session_send_rtcp_internal (sess, now, max_delay)) {
4857     GST_DEBUG ("NACK not sent early, sending with next regular RTCP");
4858   }
4859
4860   return TRUE;
4861
4862   /* ERRORS */
4863 no_source:
4864   {
4865     RTP_SESSION_UNLOCK (sess);
4866     return FALSE;
4867   }
4868 }
4869
4870 /**
4871  * rtp_session_update_recv_caps_structure:
4872  * @sess: an #RTPSession
4873  * @s: a #GstStructure from a #GstCaps
4874  *
4875  * Update the caps of the receiver in the rtp session.
4876  */
4877 void
4878 rtp_session_update_recv_caps_structure (RTPSession * sess,
4879     const GstStructure * s)
4880 {
4881   rtp_twcc_manager_parse_recv_ext_id (sess->twcc, s);
4882 }