rtpsession: Send as many nack seqnum as possible
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
21  * with newer GLib versions (>= 2.31.0) */
22 #define GLIB_DISABLE_DEPRECATION_WARNINGS
23
24 #include <string.h>
25
26 #include <gst/rtp/gstrtpbuffer.h>
27 #include <gst/rtp/gstrtcpbuffer.h>
28
29 #include <gst/glib-compat-private.h>
30
31 #include "rtpsession.h"
32
33 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
34 #define GST_CAT_DEFAULT rtp_session_debug
35
36 /* signals and args */
37 enum
38 {
39   SIGNAL_GET_SOURCE_BY_SSRC,
40   SIGNAL_ON_NEW_SSRC,
41   SIGNAL_ON_SSRC_COLLISION,
42   SIGNAL_ON_SSRC_VALIDATED,
43   SIGNAL_ON_SSRC_ACTIVE,
44   SIGNAL_ON_SSRC_SDES,
45   SIGNAL_ON_BYE_SSRC,
46   SIGNAL_ON_BYE_TIMEOUT,
47   SIGNAL_ON_TIMEOUT,
48   SIGNAL_ON_SENDER_TIMEOUT,
49   SIGNAL_ON_SENDING_RTCP,
50   SIGNAL_ON_APP_RTCP,
51   SIGNAL_ON_FEEDBACK_RTCP,
52   SIGNAL_SEND_RTCP,
53   SIGNAL_SEND_RTCP_FULL,
54   SIGNAL_ON_RECEIVING_RTCP,
55   SIGNAL_ON_NEW_SENDER_SSRC,
56   SIGNAL_ON_SENDER_SSRC_ACTIVE,
57   LAST_SIGNAL
58 };
59
60 #define DEFAULT_INTERNAL_SOURCE      NULL
61 #define DEFAULT_BANDWIDTH            0.0
62 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_FRACTION
63 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
64 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
65 #define DEFAULT_RTCP_MTU             1400
66 #define DEFAULT_SDES                 NULL
67 #define DEFAULT_NUM_SOURCES          0
68 #define DEFAULT_NUM_ACTIVE_SOURCES   0
69 #define DEFAULT_SOURCES              NULL
70 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
71 #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
72 #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
73 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
74 #define DEFAULT_MAX_DROPOUT_TIME     60000
75 #define DEFAULT_MAX_MISORDER_TIME    2000
76 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
77 #define DEFAULT_RTCP_REDUCED_SIZE    FALSE
78
79 enum
80 {
81   PROP_0,
82   PROP_INTERNAL_SSRC,
83   PROP_INTERNAL_SOURCE,
84   PROP_BANDWIDTH,
85   PROP_RTCP_FRACTION,
86   PROP_RTCP_RR_BANDWIDTH,
87   PROP_RTCP_RS_BANDWIDTH,
88   PROP_RTCP_MTU,
89   PROP_SDES,
90   PROP_NUM_SOURCES,
91   PROP_NUM_ACTIVE_SOURCES,
92   PROP_SOURCES,
93   PROP_FAVOR_NEW,
94   PROP_RTCP_MIN_INTERVAL,
95   PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
96   PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
97   PROP_PROBATION,
98   PROP_MAX_DROPOUT_TIME,
99   PROP_MAX_MISORDER_TIME,
100   PROP_STATS,
101   PROP_RTP_PROFILE,
102   PROP_RTCP_REDUCED_SIZE
103 };
104
105 /* update average packet size */
106 #define INIT_AVG(avg, val) \
107    (avg) = (val);
108 #define UPDATE_AVG(avg, val)            \
109   if ((avg) == 0)                       \
110    (avg) = (val);                       \
111   else                                  \
112    (avg) = ((val) + (15 * (avg))) >> 4;
113
114
115 /* GObject vmethods */
116 static void rtp_session_finalize (GObject * object);
117 static void rtp_session_set_property (GObject * object, guint prop_id,
118     const GValue * value, GParamSpec * pspec);
119 static void rtp_session_get_property (GObject * object, guint prop_id,
120     GValue * value, GParamSpec * pspec);
121
122 static gboolean rtp_session_send_rtcp (RTPSession * sess,
123     GstClockTime max_delay);
124 static gboolean rtp_session_send_rtcp_with_deadline (RTPSession * sess,
125     GstClockTime deadline);
126
127 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
128
129 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
130
131 static guint32 rtp_session_create_new_ssrc (RTPSession * sess);
132 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
133     gboolean * created, RTPPacketInfo * pinfo, gboolean rtp);
134 static RTPSource *obtain_internal_source (RTPSession * sess,
135     guint32 ssrc, gboolean * created, GstClockTime current_time);
136 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
137     GstClockTime current_time);
138 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
139     gboolean deterministic, gboolean first);
140
141 static gboolean
142 accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
143     const GValue * handler_return, gpointer data)
144 {
145   if (g_value_get_boolean (handler_return))
146     g_value_set_boolean (return_accu, TRUE);
147
148   return TRUE;
149 }
150
151 static void
152 rtp_session_class_init (RTPSessionClass * klass)
153 {
154   GObjectClass *gobject_class;
155
156   gobject_class = (GObjectClass *) klass;
157
158   gobject_class->finalize = rtp_session_finalize;
159   gobject_class->set_property = rtp_session_set_property;
160   gobject_class->get_property = rtp_session_get_property;
161
162   /**
163    * RTPSession::get-source-by-ssrc:
164    * @session: the object which received the signal
165    * @ssrc: the SSRC of the RTPSource
166    *
167    * Request the #RTPSource object with SSRC @ssrc in @session.
168    */
169   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
170       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
171       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
172           get_source_by_ssrc), NULL, NULL, g_cclosure_marshal_generic,
173       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
174
175   /**
176    * RTPSession::on-new-ssrc:
177    * @session: the object which received the signal
178    * @src: the new RTPSource
179    *
180    * Notify of a new SSRC that entered @session.
181    */
182   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
183       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
184       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
185       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
186       RTP_TYPE_SOURCE);
187   /**
188    * RTPSession::on-ssrc-collision:
189    * @session: the object which received the signal
190    * @src: the #RTPSource that caused a collision
191    *
192    * Notify when we have an SSRC collision
193    */
194   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
195       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
196       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
197       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
198       RTP_TYPE_SOURCE);
199   /**
200    * RTPSession::on-ssrc-validated:
201    * @session: the object which received the signal
202    * @src: the new validated RTPSource
203    *
204    * Notify of a new SSRC that became validated.
205    */
206   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
207       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
208       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
209       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
210       RTP_TYPE_SOURCE);
211   /**
212    * RTPSession::on-ssrc-active:
213    * @session: the object which received the signal
214    * @src: the active RTPSource
215    *
216    * Notify of a SSRC that is active, i.e., sending RTCP.
217    */
218   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
219       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
220       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
221       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
222       RTP_TYPE_SOURCE);
223   /**
224    * RTPSession::on-ssrc-sdes:
225    * @session: the object which received the signal
226    * @src: the RTPSource
227    *
228    * Notify that a new SDES was received for SSRC.
229    */
230   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
231       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
232       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
233       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
234       RTP_TYPE_SOURCE);
235   /**
236    * RTPSession::on-bye-ssrc:
237    * @session: the object which received the signal
238    * @src: the RTPSource that went away
239    *
240    * Notify of an SSRC that became inactive because of a BYE packet.
241    */
242   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
243       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
244       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
245       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
246       RTP_TYPE_SOURCE);
247   /**
248    * RTPSession::on-bye-timeout:
249    * @session: the object which received the signal
250    * @src: the RTPSource that timed out
251    *
252    * Notify of an SSRC that has timed out because of BYE
253    */
254   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
255       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
256       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
257       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
258       RTP_TYPE_SOURCE);
259   /**
260    * RTPSession::on-timeout:
261    * @session: the object which received the signal
262    * @src: the RTPSource that timed out
263    *
264    * Notify of an SSRC that has timed out
265    */
266   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
267       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
268       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
269       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
270       RTP_TYPE_SOURCE);
271   /**
272    * RTPSession::on-sender-timeout:
273    * @session: the object which received the signal
274    * @src: the RTPSource that timed out
275    *
276    * Notify of an SSRC that was a sender but timed out and became a receiver.
277    */
278   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
279       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
280       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
281       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
282       RTP_TYPE_SOURCE);
283
284   /**
285    * RTPSession::on-sending-rtcp
286    * @session: the object which received the signal
287    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
288    * @early: %TRUE if the packet is early, %FALSE if it is regular
289    *
290    * This signal is emitted before sending an RTCP packet, it can be used
291    * to add extra RTCP Packets.
292    *
293    * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
294    * if suppressing it is acceptable
295    */
296   rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
297       g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
298       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
299       accumulate_trues, NULL, g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 2,
300       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE, G_TYPE_BOOLEAN);
301
302   /**
303    * RTPSession::on-app-rtcp:
304    * @session: the object which received the signal
305    * @subtype: The subtype of the packet
306    * @ssrc: The SSRC/CSRC of the packet
307    * @name: The name of the packet
308    * @data: a #GstBuffer with the application-dependant data or %NULL if
309    * there was no data
310    *
311    * Notify that a RTCP APP packet has been received
312    */
313   rtp_session_signals[SIGNAL_ON_APP_RTCP] =
314       g_signal_new ("on-app-rtcp", G_TYPE_FROM_CLASS (klass),
315       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_app_rtcp),
316       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 4,
317       G_TYPE_UINT, G_TYPE_UINT, G_TYPE_STRING, GST_TYPE_BUFFER);
318
319   /**
320    * RTPSession::on-feedback-rtcp:
321    * @session: the object which received the signal
322    * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
323    *  %GST_RTCP_TYPE_RTPFB
324    * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
325    * @sender_ssrc: The SSRC of the sender
326    * @media_ssrc: The SSRC of the media this refers to
327    * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
328    * there was no FCI
329    *
330    * Notify that a RTCP feedback packet has been received
331    */
332   rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
333       g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
334       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
335       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 5, G_TYPE_UINT,
336       G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, GST_TYPE_BUFFER);
337
338   /**
339    * RTPSession::send-rtcp:
340    * @session: the object which received the signal
341    * @max_delay: The maximum delay after which the feedback will not be useful
342    *  anymore
343    *
344    * Requests that the #RTPSession initiate a new RTCP packet as soon as
345    * possible within the requested delay.
346    *
347    * This sets feedback to %TRUE if not already done before.
348    */
349   rtp_session_signals[SIGNAL_SEND_RTCP] =
350       g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
351       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
352       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
353       g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT64);
354
355   /**
356    * RTPSession::send-rtcp-full:
357    * @session: the object which received the signal
358    * @max_delay: The maximum delay after which the feedback will not be useful
359    *  anymore
360    *
361    * Requests that the #RTPSession initiate a new RTCP packet as soon as
362    * possible within the requested delay.
363    *
364    * This sets feedback to %TRUE if not already done before.
365    *
366    * Returns: TRUE if the new RTCP packet could be scheduled within the
367    * requested delay, FALSE otherwise.
368    *
369    * Since: 1.6
370    */
371   rtp_session_signals[SIGNAL_SEND_RTCP_FULL] =
372       g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass),
373       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
374       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
375       g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
376
377   /**
378    * RTPSession::on-receiving-rtcp
379    * @session: the object which received the signal
380    * @buffer: the #GstBuffer containing the RTCP packet that was received
381    *
382    * This signal is emitted when receiving an RTCP packet before it is handled
383    * by the session. It can be used to extract custom information from RTCP packets.
384    *
385    * Since: 1.6
386    */
387   rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] =
388       g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass),
389       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp),
390       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1,
391       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
392
393   /**
394    * RTPSession::on-new-sender-ssrc:
395    * @session: the object which received the signal
396    * @src: the new sender RTPSource
397    *
398    * Notify of a new sender SSRC that entered @session.
399    *
400    * Since: 1.8
401    */
402   rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
403       g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
404       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_sender_ssrc),
405       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
406       RTP_TYPE_SOURCE);
407
408   /**
409    * RTPSession::on-sender-ssrc-active:
410    * @session: the object which received the signal
411    * @src: the active sender RTPSource
412    *
413    * Notify of a sender SSRC that is active, i.e., sending RTCP.
414    *
415    * Since: 1.8
416    */
417   rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
418       g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
419       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass,
420           on_sender_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
421       G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
422
423   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
424       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
425           "The internal SSRC used for the session (deprecated)",
426           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
427
428   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
429       g_param_spec_object ("internal-source", "Internal Source",
430           "The internal source element of the session (deprecated)",
431           RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
432
433   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
434       g_param_spec_double ("bandwidth", "Bandwidth",
435           "The bandwidth of the session in bits per second (0 for auto-discover)",
436           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
437           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
438
439   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
440       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
441           "The fraction of the bandwidth used for RTCP in bits per second (or as a real fraction of the RTP bandwidth if < 1)",
442           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
443           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
444
445   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
446       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
447           "The RTCP bandwidth used for receivers in bits per second (-1 = default)",
448           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
449           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
450
451   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
452       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
453           "The RTCP bandwidth used for senders in bits per second (-1 = default)",
454           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
455           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
456
457   g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
458       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
459           "The maximum size of the RTCP packets",
460           16, G_MAXINT16, DEFAULT_RTCP_MTU,
461           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
462
463   g_object_class_install_property (gobject_class, PROP_SDES,
464       g_param_spec_boxed ("sdes", "SDES",
465           "The SDES items of this session",
466           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
467
468   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
469       g_param_spec_uint ("num-sources", "Num Sources",
470           "The number of sources in the session", 0, G_MAXUINT,
471           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
472
473   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
474       g_param_spec_uint ("num-active-sources", "Num Active Sources",
475           "The number of active sources in the session", 0, G_MAXUINT,
476           DEFAULT_NUM_ACTIVE_SOURCES,
477           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
478   /**
479    * RTPSource::sources
480    *
481    * Get a GValue Array of all sources in the session.
482    *
483    * <example>
484    * <title>Getting the #RTPSources of a session
485    * <programlisting>
486    * {
487    *   GValueArray *arr;
488    *   GValue *val;
489    *   guint i;
490    *
491    *   g_object_get (sess, "sources", &arr, NULL);
492    *
493    *   for (i = 0; i < arr->n_values; i++) {
494    *     RTPSource *source;
495    *
496    *     val = g_value_array_get_nth (arr, i);
497    *     source = g_value_get_object (val);
498    *   }
499    *   g_value_array_free (arr);
500    * }
501    * </programlisting>
502    * </example>
503    */
504   g_object_class_install_property (gobject_class, PROP_SOURCES,
505       g_param_spec_boxed ("sources", "Sources",
506           "An array of all known sources in the session",
507           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
508
509   g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
510       g_param_spec_boolean ("favor-new", "Favor new sources",
511           "Resolve SSRC conflict in favor of new sources", FALSE,
512           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
513
514   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
515       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
516           "Minimum interval between Regular RTCP packet (in ns)",
517           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
518           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
519
520   g_object_class_install_property (gobject_class,
521       PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
522       g_param_spec_uint64 ("rtcp-feedback-retention-window",
523           "RTCP Feedback retention window",
524           "Duration during which RTCP Feedback packets are retained (in ns)",
525           0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
526           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
527
528   g_object_class_install_property (gobject_class,
529       PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
530       g_param_spec_uint ("rtcp-immediate-feedback-threshold",
531           "RTCP Immediate Feedback threshold",
532           "The maximum number of members of a RTP session for which immediate"
533           " feedback is used (DEPRECATED: has no effect and is not needed)",
534           0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
535           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
536
537   g_object_class_install_property (gobject_class, PROP_PROBATION,
538       g_param_spec_uint ("probation", "Number of probations",
539           "Consecutive packet sequence numbers to accept the source",
540           0, G_MAXUINT, DEFAULT_PROBATION,
541           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
542
543   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
544       g_param_spec_uint ("max-dropout-time", "Max dropout time",
545           "The maximum time (milliseconds) of missing packets tolerated.",
546           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
547           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
548
549   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
550       g_param_spec_uint ("max-misorder-time", "Max misorder time",
551           "The maximum time (milliseconds) of misordered packets tolerated.",
552           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
553           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
554
555   /**
556    * RTPSession::stats:
557    *
558    * Various session statistics. This property returns a GstStructure
559    * with name application/x-rtp-session-stats with the following fields:
560    *
561    *  "rtx-drop-count"  G_TYPE_UINT   The number of retransmission events
562    *      dropped (due to bandwidth constraints)
563    *  "sent-nack-count" G_TYPE_UINT   Number of NACKs sent
564    *  "recv-nack-count" G_TYPE_UINT   Number of NACKs received
565    *  "source-stats"    G_TYPE_BOXED  GValueArray of #RTPSource::stats for all
566    *      RTP sources (Since 1.8)
567    *
568    * Since: 1.4
569    */
570   g_object_class_install_property (gobject_class, PROP_STATS,
571       g_param_spec_boxed ("stats", "Statistics",
572           "Various statistics", GST_TYPE_STRUCTURE,
573           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
574
575   g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
576       g_param_spec_enum ("rtp-profile", "RTP Profile",
577           "RTP profile to use for this session", GST_TYPE_RTP_PROFILE,
578           DEFAULT_RTP_PROFILE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
579
580   g_object_class_install_property (gobject_class, PROP_RTCP_REDUCED_SIZE,
581       g_param_spec_boolean ("rtcp-reduced-size", "RTCP Reduced Size",
582           "Use Reduced Size RTCP for feedback packets",
583           DEFAULT_RTCP_REDUCED_SIZE,
584           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
585
586   klass->get_source_by_ssrc =
587       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
588   klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
589
590   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
591 }
592
593 static void
594 rtp_session_init (RTPSession * sess)
595 {
596   gint i;
597   gchar *str;
598
599   g_mutex_init (&sess->lock);
600   sess->key = g_random_int ();
601   sess->mask_idx = 0;
602   sess->mask = 0;
603
604   /* TODO: We currently only use the first hash table but this is the
605    * beginning of an implementation for RFC2762
606    for (i = 0; i < 32; i++) {
607    */
608   for (i = 0; i < 1; i++) {
609     sess->ssrcs[i] =
610         g_hash_table_new_full (NULL, NULL, NULL,
611         (GDestroyNotify) g_object_unref);
612   }
613
614   rtp_stats_init_defaults (&sess->stats);
615   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
616   rtp_stats_set_min_interval (&sess->stats,
617       (gdouble) DEFAULT_RTCP_MIN_INTERVAL / GST_SECOND);
618
619   sess->recalc_bandwidth = TRUE;
620   sess->bandwidth = DEFAULT_BANDWIDTH;
621   sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
622   sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
623   sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
624
625   /* default UDP header length */
626   sess->header_len = 28;
627   sess->mtu = DEFAULT_RTCP_MTU;
628
629   sess->probation = DEFAULT_PROBATION;
630   sess->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
631   sess->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
632
633   /* some default SDES entries */
634   sess->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
635
636   /* we do not want to leak details like the username or hostname here */
637   str = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
638   gst_structure_set (sess->sdes, "cname", G_TYPE_STRING, str, NULL);
639   g_free (str);
640
641 #if 0
642   /* we do not want to leak the user's real name here */
643   str = g_strdup_printf ("Anon%u", g_random_int ());
644   gst_structure_set (sdes, "name", G_TYPE_STRING, str, NULL);
645   g_free (str);
646 #endif
647
648   gst_structure_set (sess->sdes, "tool", G_TYPE_STRING, "GStreamer", NULL);
649
650   /* this is the SSRC we suggest */
651   sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
652   sess->internal_ssrc_set = FALSE;
653
654   sess->first_rtcp = TRUE;
655   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
656   sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE;
657   sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
658   sess->last_rtcp_interval = GST_CLOCK_TIME_NONE;
659
660   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
661   sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
662   sess->rtcp_immediate_feedback_threshold =
663       DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
664   sess->rtp_profile = DEFAULT_RTP_PROFILE;
665   sess->reduced_size_rtcp = DEFAULT_RTCP_REDUCED_SIZE;
666
667   sess->is_doing_ptp = TRUE;
668 }
669
670 static void
671 rtp_session_finalize (GObject * object)
672 {
673   RTPSession *sess;
674   gint i;
675
676   sess = RTP_SESSION_CAST (object);
677
678   gst_structure_free (sess->sdes);
679
680   g_list_free_full (sess->conflicting_addresses,
681       (GDestroyNotify) rtp_conflicting_address_free);
682
683   /* TODO: Change this again when implementing RFC 2762
684    * for (i = 0; i < 32; i++)
685    */
686   for (i = 0; i < 1; i++)
687     g_hash_table_destroy (sess->ssrcs[i]);
688
689   g_mutex_clear (&sess->lock);
690
691   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
692 }
693
694 static void
695 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
696 {
697   GValue value = { 0 };
698
699   g_value_init (&value, RTP_TYPE_SOURCE);
700   g_value_take_object (&value, source);
701   /* copies the value */
702   g_value_array_append (arr, &value);
703 }
704
705 static GValueArray *
706 rtp_session_create_sources (RTPSession * sess)
707 {
708   GValueArray *res;
709   guint size;
710
711   RTP_SESSION_LOCK (sess);
712   /* get number of elements in the table */
713   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
714   /* create the result value array */
715   res = g_value_array_new (size);
716
717   /* and copy all values into the array */
718   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
719   RTP_SESSION_UNLOCK (sess);
720
721   return res;
722 }
723
724 static void
725 create_source_stats (gpointer key, RTPSource * source, GValueArray * arr)
726 {
727   GValue *value;
728   GstStructure *s;
729
730   g_object_get (source, "stats", &s, NULL);
731
732   g_value_array_append (arr, NULL);
733   value = g_value_array_get_nth (arr, arr->n_values - 1);
734   g_value_init (value, GST_TYPE_STRUCTURE);
735   g_value_take_boxed (value, s);
736 }
737
738 static GstStructure *
739 rtp_session_create_stats (RTPSession * sess)
740 {
741   GstStructure *s;
742   GValueArray *source_stats;
743   GValue source_stats_v = G_VALUE_INIT;
744   guint size;
745
746   RTP_SESSION_LOCK (sess);
747   s = gst_structure_new ("application/x-rtp-session-stats",
748       "rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped,
749       "sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent,
750       "recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL);
751
752   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
753   source_stats = g_value_array_new (size);
754   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
755       (GHFunc) create_source_stats, source_stats);
756   RTP_SESSION_UNLOCK (sess);
757
758   g_value_init (&source_stats_v, G_TYPE_VALUE_ARRAY);
759   g_value_take_boxed (&source_stats_v, source_stats);
760   gst_structure_take_value (s, "source-stats", &source_stats_v);
761
762   return s;
763 }
764
765 static void
766 rtp_session_set_property (GObject * object, guint prop_id,
767     const GValue * value, GParamSpec * pspec)
768 {
769   RTPSession *sess;
770
771   sess = RTP_SESSION (object);
772
773   switch (prop_id) {
774     case PROP_INTERNAL_SSRC:
775       RTP_SESSION_LOCK (sess);
776       sess->suggested_ssrc = g_value_get_uint (value);
777       sess->internal_ssrc_set = TRUE;
778       sess->internal_ssrc_from_caps_or_property = TRUE;
779       RTP_SESSION_UNLOCK (sess);
780       if (sess->callbacks.reconfigure)
781         sess->callbacks.reconfigure (sess, sess->reconfigure_user_data);
782       break;
783     case PROP_BANDWIDTH:
784       RTP_SESSION_LOCK (sess);
785       sess->bandwidth = g_value_get_double (value);
786       sess->recalc_bandwidth = TRUE;
787       RTP_SESSION_UNLOCK (sess);
788       break;
789     case PROP_RTCP_FRACTION:
790       RTP_SESSION_LOCK (sess);
791       sess->rtcp_bandwidth = g_value_get_double (value);
792       sess->recalc_bandwidth = TRUE;
793       RTP_SESSION_UNLOCK (sess);
794       break;
795     case PROP_RTCP_RR_BANDWIDTH:
796       RTP_SESSION_LOCK (sess);
797       sess->rtcp_rr_bandwidth = g_value_get_int (value);
798       sess->recalc_bandwidth = TRUE;
799       RTP_SESSION_UNLOCK (sess);
800       break;
801     case PROP_RTCP_RS_BANDWIDTH:
802       RTP_SESSION_LOCK (sess);
803       sess->rtcp_rs_bandwidth = g_value_get_int (value);
804       sess->recalc_bandwidth = TRUE;
805       RTP_SESSION_UNLOCK (sess);
806       break;
807     case PROP_RTCP_MTU:
808       sess->mtu = g_value_get_uint (value);
809       break;
810     case PROP_SDES:
811       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
812       break;
813     case PROP_FAVOR_NEW:
814       sess->favor_new = g_value_get_boolean (value);
815       break;
816     case PROP_RTCP_MIN_INTERVAL:
817       rtp_stats_set_min_interval (&sess->stats,
818           (gdouble) g_value_get_uint64 (value) / GST_SECOND);
819       /* trigger reconsideration */
820       RTP_SESSION_LOCK (sess);
821       sess->next_rtcp_check_time = 0;
822       RTP_SESSION_UNLOCK (sess);
823       if (sess->callbacks.reconsider)
824         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
825       break;
826     case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
827       sess->rtcp_feedback_retention_window = g_value_get_uint64 (value);
828       break;
829     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
830       sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
831       break;
832     case PROP_PROBATION:
833       sess->probation = g_value_get_uint (value);
834       break;
835     case PROP_MAX_DROPOUT_TIME:
836       sess->max_dropout_time = g_value_get_uint (value);
837       break;
838     case PROP_MAX_MISORDER_TIME:
839       sess->max_misorder_time = g_value_get_uint (value);
840       break;
841     case PROP_RTP_PROFILE:
842       sess->rtp_profile = g_value_get_enum (value);
843       /* trigger reconsideration */
844       RTP_SESSION_LOCK (sess);
845       sess->next_rtcp_check_time = 0;
846       RTP_SESSION_UNLOCK (sess);
847       if (sess->callbacks.reconsider)
848         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
849       break;
850     case PROP_RTCP_REDUCED_SIZE:
851       sess->reduced_size_rtcp = g_value_get_boolean (value);
852       break;
853     default:
854       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
855       break;
856   }
857 }
858
859 static void
860 rtp_session_get_property (GObject * object, guint prop_id,
861     GValue * value, GParamSpec * pspec)
862 {
863   RTPSession *sess;
864
865   sess = RTP_SESSION (object);
866
867   switch (prop_id) {
868     case PROP_INTERNAL_SSRC:
869       g_value_set_uint (value, rtp_session_suggest_ssrc (sess, NULL));
870       break;
871     case PROP_INTERNAL_SOURCE:
872       /* FIXME, return a random source */
873       g_value_set_object (value, NULL);
874       break;
875     case PROP_BANDWIDTH:
876       g_value_set_double (value, sess->bandwidth);
877       break;
878     case PROP_RTCP_FRACTION:
879       g_value_set_double (value, sess->rtcp_bandwidth);
880       break;
881     case PROP_RTCP_RR_BANDWIDTH:
882       g_value_set_int (value, sess->rtcp_rr_bandwidth);
883       break;
884     case PROP_RTCP_RS_BANDWIDTH:
885       g_value_set_int (value, sess->rtcp_rs_bandwidth);
886       break;
887     case PROP_RTCP_MTU:
888       g_value_set_uint (value, sess->mtu);
889       break;
890     case PROP_SDES:
891       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
892       break;
893     case PROP_NUM_SOURCES:
894       g_value_set_uint (value, rtp_session_get_num_sources (sess));
895       break;
896     case PROP_NUM_ACTIVE_SOURCES:
897       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
898       break;
899     case PROP_SOURCES:
900       g_value_take_boxed (value, rtp_session_create_sources (sess));
901       break;
902     case PROP_FAVOR_NEW:
903       g_value_set_boolean (value, sess->favor_new);
904       break;
905     case PROP_RTCP_MIN_INTERVAL:
906       g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
907       break;
908     case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
909       g_value_set_uint64 (value, sess->rtcp_feedback_retention_window);
910       break;
911     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
912       g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
913       break;
914     case PROP_PROBATION:
915       g_value_set_uint (value, sess->probation);
916       break;
917     case PROP_MAX_DROPOUT_TIME:
918       g_value_set_uint (value, sess->max_dropout_time);
919       break;
920     case PROP_MAX_MISORDER_TIME:
921       g_value_set_uint (value, sess->max_misorder_time);
922       break;
923     case PROP_STATS:
924       g_value_take_boxed (value, rtp_session_create_stats (sess));
925       break;
926     case PROP_RTP_PROFILE:
927       g_value_set_enum (value, sess->rtp_profile);
928       break;
929     case PROP_RTCP_REDUCED_SIZE:
930       g_value_set_boolean (value, sess->reduced_size_rtcp);
931       break;
932     default:
933       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
934       break;
935   }
936 }
937
938 static void
939 on_new_ssrc (RTPSession * sess, RTPSource * source)
940 {
941   g_object_ref (source);
942   RTP_SESSION_UNLOCK (sess);
943   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
944   RTP_SESSION_LOCK (sess);
945   g_object_unref (source);
946 }
947
948 static void
949 on_ssrc_collision (RTPSession * sess, RTPSource * source)
950 {
951   g_object_ref (source);
952   RTP_SESSION_UNLOCK (sess);
953   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
954       source);
955   RTP_SESSION_LOCK (sess);
956   g_object_unref (source);
957 }
958
959 static void
960 on_ssrc_validated (RTPSession * sess, RTPSource * source)
961 {
962   g_object_ref (source);
963   RTP_SESSION_UNLOCK (sess);
964   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
965       source);
966   RTP_SESSION_LOCK (sess);
967   g_object_unref (source);
968 }
969
970 static void
971 on_ssrc_active (RTPSession * sess, RTPSource * source)
972 {
973   g_object_ref (source);
974   RTP_SESSION_UNLOCK (sess);
975   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
976   RTP_SESSION_LOCK (sess);
977   g_object_unref (source);
978 }
979
980 static void
981 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
982 {
983   g_object_ref (source);
984   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
985   RTP_SESSION_UNLOCK (sess);
986   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
987   RTP_SESSION_LOCK (sess);
988   g_object_unref (source);
989 }
990
991 static void
992 on_bye_ssrc (RTPSession * sess, RTPSource * source)
993 {
994   g_object_ref (source);
995   RTP_SESSION_UNLOCK (sess);
996   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
997   RTP_SESSION_LOCK (sess);
998   g_object_unref (source);
999 }
1000
1001 static void
1002 on_bye_timeout (RTPSession * sess, RTPSource * source)
1003 {
1004   g_object_ref (source);
1005   RTP_SESSION_UNLOCK (sess);
1006   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
1007   RTP_SESSION_LOCK (sess);
1008   g_object_unref (source);
1009 }
1010
1011 static void
1012 on_timeout (RTPSession * sess, RTPSource * source)
1013 {
1014   g_object_ref (source);
1015   RTP_SESSION_UNLOCK (sess);
1016   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
1017   RTP_SESSION_LOCK (sess);
1018   g_object_unref (source);
1019 }
1020
1021 static void
1022 on_sender_timeout (RTPSession * sess, RTPSource * source)
1023 {
1024   g_object_ref (source);
1025   RTP_SESSION_UNLOCK (sess);
1026   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
1027       source);
1028   RTP_SESSION_LOCK (sess);
1029   g_object_unref (source);
1030 }
1031
1032 static void
1033 on_new_sender_ssrc (RTPSession * sess, RTPSource * source)
1034 {
1035   g_object_ref (source);
1036   RTP_SESSION_UNLOCK (sess);
1037   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
1038       source);
1039   RTP_SESSION_LOCK (sess);
1040   g_object_unref (source);
1041 }
1042
1043 static void
1044 on_sender_ssrc_active (RTPSession * sess, RTPSource * source)
1045 {
1046   g_object_ref (source);
1047   RTP_SESSION_UNLOCK (sess);
1048   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE], 0,
1049       source);
1050   RTP_SESSION_LOCK (sess);
1051   g_object_unref (source);
1052 }
1053
1054 /**
1055  * rtp_session_new:
1056  *
1057  * Create a new session object.
1058  *
1059  * Returns: a new #RTPSession. g_object_unref() after usage.
1060  */
1061 RTPSession *
1062 rtp_session_new (void)
1063 {
1064   RTPSession *sess;
1065
1066   sess = g_object_new (RTP_TYPE_SESSION, NULL);
1067
1068   return sess;
1069 }
1070
1071 /**
1072  * rtp_session_reset:
1073  * @sess: an #RTPSession
1074  *
1075  * Reset the sources of @sess.
1076  */
1077 void
1078 rtp_session_reset (RTPSession * sess)
1079 {
1080   g_return_if_fail (RTP_IS_SESSION (sess));
1081
1082   /* remove all sources */
1083   g_hash_table_remove_all (sess->ssrcs[sess->mask_idx]);
1084   sess->total_sources = 0;
1085   sess->stats.sender_sources = 0;
1086   sess->stats.internal_sender_sources = 0;
1087   sess->stats.internal_sources = 0;
1088   sess->stats.active_sources = 0;
1089
1090   sess->generation = 0;
1091   sess->first_rtcp = TRUE;
1092   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
1093   sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE;
1094   sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
1095   sess->last_rtcp_interval = GST_CLOCK_TIME_NONE;
1096   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
1097   sess->scheduled_bye = FALSE;
1098
1099   /* reset session stats */
1100   sess->stats.bye_members = 0;
1101   sess->stats.nacks_dropped = 0;
1102   sess->stats.nacks_sent = 0;
1103   sess->stats.nacks_received = 0;
1104
1105   sess->is_doing_ptp = TRUE;
1106
1107   g_list_free_full (sess->conflicting_addresses,
1108       (GDestroyNotify) rtp_conflicting_address_free);
1109   sess->conflicting_addresses = NULL;
1110 }
1111
1112 /**
1113  * rtp_session_set_callbacks:
1114  * @sess: an #RTPSession
1115  * @callbacks: callbacks to configure
1116  * @user_data: user data passed in the callbacks
1117  *
1118  * Configure a set of callbacks to be notified of actions.
1119  */
1120 void
1121 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
1122     gpointer user_data)
1123 {
1124   g_return_if_fail (RTP_IS_SESSION (sess));
1125
1126   if (callbacks->process_rtp) {
1127     sess->callbacks.process_rtp = callbacks->process_rtp;
1128     sess->process_rtp_user_data = user_data;
1129   }
1130   if (callbacks->send_rtp) {
1131     sess->callbacks.send_rtp = callbacks->send_rtp;
1132     sess->send_rtp_user_data = user_data;
1133   }
1134   if (callbacks->send_rtcp) {
1135     sess->callbacks.send_rtcp = callbacks->send_rtcp;
1136     sess->send_rtcp_user_data = user_data;
1137   }
1138   if (callbacks->sync_rtcp) {
1139     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
1140     sess->sync_rtcp_user_data = user_data;
1141   }
1142   if (callbacks->clock_rate) {
1143     sess->callbacks.clock_rate = callbacks->clock_rate;
1144     sess->clock_rate_user_data = user_data;
1145   }
1146   if (callbacks->reconsider) {
1147     sess->callbacks.reconsider = callbacks->reconsider;
1148     sess->reconsider_user_data = user_data;
1149   }
1150   if (callbacks->request_key_unit) {
1151     sess->callbacks.request_key_unit = callbacks->request_key_unit;
1152     sess->request_key_unit_user_data = user_data;
1153   }
1154   if (callbacks->request_time) {
1155     sess->callbacks.request_time = callbacks->request_time;
1156     sess->request_time_user_data = user_data;
1157   }
1158   if (callbacks->notify_nack) {
1159     sess->callbacks.notify_nack = callbacks->notify_nack;
1160     sess->notify_nack_user_data = user_data;
1161   }
1162   if (callbacks->reconfigure) {
1163     sess->callbacks.reconfigure = callbacks->reconfigure;
1164     sess->reconfigure_user_data = user_data;
1165   }
1166   if (callbacks->notify_early_rtcp) {
1167     sess->callbacks.notify_early_rtcp = callbacks->notify_early_rtcp;
1168     sess->notify_early_rtcp_user_data = user_data;
1169   }
1170 }
1171
1172 /**
1173  * rtp_session_set_process_rtp_callback:
1174  * @sess: an #RTPSession
1175  * @callback: callback to set
1176  * @user_data: user data passed in the callback
1177  *
1178  * Configure only the process_rtp callback to be notified of the process_rtp action.
1179  */
1180 void
1181 rtp_session_set_process_rtp_callback (RTPSession * sess,
1182     RTPSessionProcessRTP callback, gpointer user_data)
1183 {
1184   g_return_if_fail (RTP_IS_SESSION (sess));
1185
1186   sess->callbacks.process_rtp = callback;
1187   sess->process_rtp_user_data = user_data;
1188 }
1189
1190 /**
1191  * rtp_session_set_send_rtp_callback:
1192  * @sess: an #RTPSession
1193  * @callback: callback to set
1194  * @user_data: user data passed in the callback
1195  *
1196  * Configure only the send_rtp callback to be notified of the send_rtp action.
1197  */
1198 void
1199 rtp_session_set_send_rtp_callback (RTPSession * sess,
1200     RTPSessionSendRTP callback, gpointer user_data)
1201 {
1202   g_return_if_fail (RTP_IS_SESSION (sess));
1203
1204   sess->callbacks.send_rtp = callback;
1205   sess->send_rtp_user_data = user_data;
1206 }
1207
1208 /**
1209  * rtp_session_set_send_rtcp_callback:
1210  * @sess: an #RTPSession
1211  * @callback: callback to set
1212  * @user_data: user data passed in the callback
1213  *
1214  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
1215  */
1216 void
1217 rtp_session_set_send_rtcp_callback (RTPSession * sess,
1218     RTPSessionSendRTCP callback, gpointer user_data)
1219 {
1220   g_return_if_fail (RTP_IS_SESSION (sess));
1221
1222   sess->callbacks.send_rtcp = callback;
1223   sess->send_rtcp_user_data = user_data;
1224 }
1225
1226 /**
1227  * rtp_session_set_sync_rtcp_callback:
1228  * @sess: an #RTPSession
1229  * @callback: callback to set
1230  * @user_data: user data passed in the callback
1231  *
1232  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
1233  */
1234 void
1235 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
1236     RTPSessionSyncRTCP callback, gpointer user_data)
1237 {
1238   g_return_if_fail (RTP_IS_SESSION (sess));
1239
1240   sess->callbacks.sync_rtcp = callback;
1241   sess->sync_rtcp_user_data = user_data;
1242 }
1243
1244 /**
1245  * rtp_session_set_clock_rate_callback:
1246  * @sess: an #RTPSession
1247  * @callback: callback to set
1248  * @user_data: user data passed in the callback
1249  *
1250  * Configure only the clock_rate callback to be notified of the clock_rate action.
1251  */
1252 void
1253 rtp_session_set_clock_rate_callback (RTPSession * sess,
1254     RTPSessionClockRate callback, gpointer user_data)
1255 {
1256   g_return_if_fail (RTP_IS_SESSION (sess));
1257
1258   sess->callbacks.clock_rate = callback;
1259   sess->clock_rate_user_data = user_data;
1260 }
1261
1262 /**
1263  * rtp_session_set_reconsider_callback:
1264  * @sess: an #RTPSession
1265  * @callback: callback to set
1266  * @user_data: user data passed in the callback
1267  *
1268  * Configure only the reconsider callback to be notified of the reconsider action.
1269  */
1270 void
1271 rtp_session_set_reconsider_callback (RTPSession * sess,
1272     RTPSessionReconsider callback, gpointer user_data)
1273 {
1274   g_return_if_fail (RTP_IS_SESSION (sess));
1275
1276   sess->callbacks.reconsider = callback;
1277   sess->reconsider_user_data = user_data;
1278 }
1279
1280 /**
1281  * rtp_session_set_request_time_callback:
1282  * @sess: an #RTPSession
1283  * @callback: callback to set
1284  * @user_data: user data passed in the callback
1285  *
1286  * Configure only the request_time callback
1287  */
1288 void
1289 rtp_session_set_request_time_callback (RTPSession * sess,
1290     RTPSessionRequestTime callback, gpointer user_data)
1291 {
1292   g_return_if_fail (RTP_IS_SESSION (sess));
1293
1294   sess->callbacks.request_time = callback;
1295   sess->request_time_user_data = user_data;
1296 }
1297
1298 /**
1299  * rtp_session_set_bandwidth:
1300  * @sess: an #RTPSession
1301  * @bandwidth: the bandwidth allocated
1302  *
1303  * Set the session bandwidth in bits per second.
1304  */
1305 void
1306 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
1307 {
1308   g_return_if_fail (RTP_IS_SESSION (sess));
1309
1310   RTP_SESSION_LOCK (sess);
1311   sess->stats.bandwidth = bandwidth;
1312   RTP_SESSION_UNLOCK (sess);
1313 }
1314
1315 /**
1316  * rtp_session_get_bandwidth:
1317  * @sess: an #RTPSession
1318  *
1319  * Get the session bandwidth.
1320  *
1321  * Returns: the session bandwidth.
1322  */
1323 gdouble
1324 rtp_session_get_bandwidth (RTPSession * sess)
1325 {
1326   gdouble result;
1327
1328   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1329
1330   RTP_SESSION_LOCK (sess);
1331   result = sess->stats.bandwidth;
1332   RTP_SESSION_UNLOCK (sess);
1333
1334   return result;
1335 }
1336
1337 /**
1338  * rtp_session_set_rtcp_fraction:
1339  * @sess: an #RTPSession
1340  * @bandwidth: the RTCP bandwidth
1341  *
1342  * Set the bandwidth in bits per second that should be used for RTCP
1343  * messages.
1344  */
1345 void
1346 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
1347 {
1348   g_return_if_fail (RTP_IS_SESSION (sess));
1349
1350   RTP_SESSION_LOCK (sess);
1351   sess->stats.rtcp_bandwidth = bandwidth;
1352   RTP_SESSION_UNLOCK (sess);
1353 }
1354
1355 /**
1356  * rtp_session_get_rtcp_fraction:
1357  * @sess: an #RTPSession
1358  *
1359  * Get the session bandwidth used for RTCP.
1360  *
1361  * Returns: The bandwidth used for RTCP messages.
1362  */
1363 gdouble
1364 rtp_session_get_rtcp_fraction (RTPSession * sess)
1365 {
1366   gdouble result;
1367
1368   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
1369
1370   RTP_SESSION_LOCK (sess);
1371   result = sess->stats.rtcp_bandwidth;
1372   RTP_SESSION_UNLOCK (sess);
1373
1374   return result;
1375 }
1376
1377 /**
1378  * rtp_session_get_sdes_struct:
1379  * @sess: an #RTSPSession
1380  *
1381  * Get the SDES data as a #GstStructure
1382  *
1383  * Returns: a GstStructure with SDES items for @sess. This function returns a
1384  * copy of the SDES structure, use gst_structure_free() after usage.
1385  */
1386 GstStructure *
1387 rtp_session_get_sdes_struct (RTPSession * sess)
1388 {
1389   GstStructure *result = NULL;
1390
1391   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1392
1393   RTP_SESSION_LOCK (sess);
1394   if (sess->sdes)
1395     result = gst_structure_copy (sess->sdes);
1396   RTP_SESSION_UNLOCK (sess);
1397
1398   return result;
1399 }
1400
1401 static void
1402 source_set_sdes (const gchar * key, RTPSource * source, GstStructure * sdes)
1403 {
1404   rtp_source_set_sdes_struct (source, gst_structure_copy (sdes));
1405 }
1406
1407 /**
1408  * rtp_session_set_sdes_struct:
1409  * @sess: an #RTSPSession
1410  * @sdes: a #GstStructure
1411  *
1412  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
1413  */
1414 void
1415 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
1416 {
1417   g_return_if_fail (sdes);
1418   g_return_if_fail (RTP_IS_SESSION (sess));
1419
1420   RTP_SESSION_LOCK (sess);
1421   if (sess->sdes)
1422     gst_structure_free (sess->sdes);
1423   sess->sdes = gst_structure_copy (sdes);
1424
1425   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1426       (GHFunc) source_set_sdes, sess->sdes);
1427   RTP_SESSION_UNLOCK (sess);
1428 }
1429
1430 static GstFlowReturn
1431 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
1432 {
1433   GstFlowReturn result = GST_FLOW_OK;
1434
1435   if (source->internal) {
1436     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
1437
1438     RTP_SESSION_UNLOCK (session);
1439
1440     if (session->callbacks.send_rtp)
1441       result =
1442           session->callbacks.send_rtp (session, source, data,
1443           session->send_rtp_user_data);
1444     else {
1445       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1446     }
1447   } else {
1448     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
1449     RTP_SESSION_UNLOCK (session);
1450
1451     if (session->callbacks.process_rtp)
1452       result =
1453           session->callbacks.process_rtp (session, source,
1454           GST_BUFFER_CAST (data), session->process_rtp_user_data);
1455     else
1456       gst_buffer_unref (GST_BUFFER_CAST (data));
1457   }
1458   RTP_SESSION_LOCK (session);
1459
1460   return result;
1461 }
1462
1463 static gint
1464 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
1465 {
1466   gint result;
1467
1468   RTP_SESSION_UNLOCK (session);
1469
1470   if (session->callbacks.clock_rate)
1471     result =
1472         session->callbacks.clock_rate (session, pt,
1473         session->clock_rate_user_data);
1474   else
1475     result = -1;
1476
1477   RTP_SESSION_LOCK (session);
1478
1479   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
1480
1481   return result;
1482 }
1483
1484 static RTPSourceCallbacks callbacks = {
1485   (RTPSourcePushRTP) source_push_rtp,
1486   (RTPSourceClockRate) source_clock_rate,
1487 };
1488
1489
1490 /**
1491  * rtp_session_find_conflicting_address:
1492  * @session: The session the packet came in
1493  * @address: address to check for
1494  * @time: The time when the packet that is possibly in conflict arrived
1495  *
1496  * Checks if an address which has a conflict is already known. If it is
1497  * a known conflict, remember the time
1498  *
1499  * Returns: TRUE if it was a known conflict, FALSE otherwise
1500  */
1501 static gboolean
1502 rtp_session_find_conflicting_address (RTPSession * session,
1503     GSocketAddress * address, GstClockTime time)
1504 {
1505   return find_conflicting_address (session->conflicting_addresses, address,
1506       time);
1507 }
1508
1509 /**
1510  * rtp_session_add_conflicting_address:
1511  * @session: The session the packet came in
1512  * @address: address to remember
1513  * @time: The time when the packet that is in conflict arrived
1514  *
1515  * Adds a new conflict address
1516  */
1517 static void
1518 rtp_session_add_conflicting_address (RTPSession * sess,
1519     GSocketAddress * address, GstClockTime time)
1520 {
1521   sess->conflicting_addresses =
1522       add_conflicting_address (sess->conflicting_addresses, address, time);
1523 }
1524
1525
1526 static gboolean
1527 check_collision (RTPSession * sess, RTPSource * source,
1528     RTPPacketInfo * pinfo, gboolean rtp)
1529 {
1530   guint32 ssrc;
1531
1532   /* If we have no pinfo address, we can't do collision checking */
1533   if (!pinfo->address)
1534     return FALSE;
1535
1536   ssrc = rtp_source_get_ssrc (source);
1537
1538   if (!source->internal) {
1539     GSocketAddress *from;
1540
1541     /* This is not our local source, but lets check if two remote
1542      * source collide */
1543     if (rtp) {
1544       from = source->rtp_from;
1545     } else {
1546       from = source->rtcp_from;
1547     }
1548
1549     if (from) {
1550       if (__g_socket_address_equal (from, pinfo->address)) {
1551         /* Address is the same */
1552         return FALSE;
1553       } else {
1554         GST_LOG ("we have a third-party collision or loop ssrc:%x", ssrc);
1555         if (sess->favor_new) {
1556           if (rtp_source_find_conflicting_address (source,
1557                   pinfo->address, pinfo->current_time)) {
1558             gchar *buf1;
1559
1560             buf1 = __g_socket_address_to_string (pinfo->address);
1561             GST_LOG ("Known conflict on %x for %s, dropping packet", ssrc,
1562                 buf1);
1563             g_free (buf1);
1564
1565             return TRUE;
1566           } else {
1567             gchar *buf1, *buf2;
1568
1569             /* Current address is not a known conflict, lets assume this is
1570              * a new source. Save old address in possible conflict list
1571              */
1572             rtp_source_add_conflicting_address (source, from,
1573                 pinfo->current_time);
1574
1575             buf1 = __g_socket_address_to_string (from);
1576             buf2 = __g_socket_address_to_string (pinfo->address);
1577
1578             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1579                 " saving old as known conflict", ssrc, buf1, buf2);
1580
1581             if (rtp)
1582               rtp_source_set_rtp_from (source, pinfo->address);
1583             else
1584               rtp_source_set_rtcp_from (source, pinfo->address);
1585
1586             g_free (buf1);
1587             g_free (buf2);
1588
1589             return FALSE;
1590           }
1591         } else {
1592           /* Don't need to save old addresses, we ignore new sources */
1593           return TRUE;
1594         }
1595       }
1596     } else {
1597       /* We don't already have a from address for RTP, just set it */
1598       if (rtp)
1599         rtp_source_set_rtp_from (source, pinfo->address);
1600       else
1601         rtp_source_set_rtcp_from (source, pinfo->address);
1602       return FALSE;
1603     }
1604
1605     /* FIXME: Log 3rd party collision somehow
1606      * Maybe should be done in upper layer, only the SDES can tell us
1607      * if its a collision or a loop
1608      */
1609   } else {
1610     /* This is sending with our ssrc, is it an address we already know */
1611     if (rtp_session_find_conflicting_address (sess, pinfo->address,
1612             pinfo->current_time)) {
1613       /* Its a known conflict, its probably a loop, not a collision
1614        * lets just drop the incoming packet
1615        */
1616       GST_DEBUG ("Our packets are being looped back to us, dropping");
1617     } else {
1618       /* Its a new collision, lets change our SSRC */
1619       rtp_session_add_conflicting_address (sess, pinfo->address,
1620           pinfo->current_time);
1621
1622       GST_DEBUG ("Collision for SSRC %x", ssrc);
1623       /* mark the source BYE */
1624       rtp_source_mark_bye (source, "SSRC Collision");
1625       /* if we were suggesting this SSRC, change to something else */
1626       if (sess->suggested_ssrc == ssrc) {
1627         sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
1628         sess->internal_ssrc_set = TRUE;
1629       }
1630
1631       on_ssrc_collision (sess, source);
1632
1633       rtp_session_schedule_bye_locked (sess, pinfo->current_time);
1634     }
1635   }
1636
1637   return TRUE;
1638 }
1639
1640 typedef struct
1641 {
1642   gboolean is_doing_ptp;
1643   GSocketAddress *new_addr;
1644 } CompareAddrData;
1645
1646 /* check if the two given ip addr are the same (do not care about the port) */
1647 static gboolean
1648 ip_addr_equal (GSocketAddress * a, GSocketAddress * b)
1649 {
1650   return
1651       g_inet_address_equal (g_inet_socket_address_get_address
1652       (G_INET_SOCKET_ADDRESS (a)),
1653       g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (b)));
1654 }
1655
1656 static void
1657 compare_rtp_source_addr (const gchar * key, RTPSource * source,
1658     CompareAddrData * data)
1659 {
1660   /* only compare ip addr of remote sources which are also not closing */
1661   if (!source->internal && !source->closing && source->rtp_from) {
1662     /* look for the first rtp source */
1663     if (!data->new_addr)
1664       data->new_addr = source->rtp_from;
1665     /* compare current ip addr with the first one */
1666     else
1667       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtp_from);
1668   }
1669 }
1670
1671 static void
1672 compare_rtcp_source_addr (const gchar * key, RTPSource * source,
1673     CompareAddrData * data)
1674 {
1675   /* only compare ip addr of remote sources which are also not closing */
1676   if (!source->internal && !source->closing && source->rtcp_from) {
1677     /* look for the first rtcp source */
1678     if (!data->new_addr)
1679       data->new_addr = source->rtcp_from;
1680     else
1681       /* compare current ip addr with the first one */
1682       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtcp_from);
1683   }
1684 }
1685
1686 /* loop over our non-internal source to know if the session
1687  * is doing point-to-point */
1688 static void
1689 session_update_ptp (RTPSession * sess)
1690 {
1691   /* to know if the session is doing point to point, the ip addr
1692    * of each non-internal (=remotes) source have to be compared
1693    * to each other.
1694    */
1695   gboolean is_doing_rtp_ptp;
1696   gboolean is_doing_rtcp_ptp;
1697   CompareAddrData data;
1698
1699   /* compare the first remote source's ip addr that receive rtp packets
1700    * with other remote rtp source.
1701    * it's enough because the session just needs to know if they are all
1702    * equals or not
1703    */
1704   data.is_doing_ptp = TRUE;
1705   data.new_addr = NULL;
1706   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1707       (GHFunc) compare_rtp_source_addr, (gpointer) & data);
1708   is_doing_rtp_ptp = data.is_doing_ptp;
1709
1710   /* same but about rtcp */
1711   data.is_doing_ptp = TRUE;
1712   data.new_addr = NULL;
1713   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1714       (GHFunc) compare_rtcp_source_addr, (gpointer) & data);
1715   is_doing_rtcp_ptp = data.is_doing_ptp;
1716
1717   /* the session is doing point-to-point if all rtp remote have the same
1718    * ip addr and if all rtcp remote sources have the same ip addr */
1719   sess->is_doing_ptp = is_doing_rtp_ptp && is_doing_rtcp_ptp;
1720
1721   GST_DEBUG ("doing point-to-point: %d", sess->is_doing_ptp);
1722 }
1723
1724 static void
1725 add_source (RTPSession * sess, RTPSource * src)
1726 {
1727   g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1728       GINT_TO_POINTER (src->ssrc), src);
1729   /* report the new source ASAP */
1730   src->generation = sess->generation;
1731   /* we have one more source now */
1732   sess->total_sources++;
1733   if (RTP_SOURCE_IS_ACTIVE (src))
1734     sess->stats.active_sources++;
1735   if (src->internal) {
1736     sess->stats.internal_sources++;
1737     if (!sess->internal_ssrc_from_caps_or_property
1738         && sess->suggested_ssrc != src->ssrc) {
1739       sess->suggested_ssrc = src->ssrc;
1740       sess->internal_ssrc_set = TRUE;
1741     }
1742   }
1743
1744   /* update point-to-point status */
1745   if (!src->internal)
1746     session_update_ptp (sess);
1747 }
1748
1749 static RTPSource *
1750 find_source (RTPSession * sess, guint32 ssrc)
1751 {
1752   return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1753       GINT_TO_POINTER (ssrc));
1754 }
1755
1756 /* must be called with the session lock, the returned source needs to be
1757  * unreffed after usage. */
1758 static RTPSource *
1759 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1760     RTPPacketInfo * pinfo, gboolean rtp)
1761 {
1762   RTPSource *source;
1763
1764   source = find_source (sess, ssrc);
1765   if (source == NULL) {
1766     /* make new Source in probation and insert */
1767     source = rtp_source_new (ssrc);
1768
1769     GST_DEBUG ("creating new source %08x %p", ssrc, source);
1770
1771     /* for RTP packets we need to set the source in probation. Receiving RTCP
1772      * packets of an SSRC, on the other hand, is a strong indication that we
1773      * are dealing with a valid source. */
1774     g_object_set (source, "probation", rtp ? sess->probation : 0,
1775         "max-dropout-time", sess->max_dropout_time, "max-misorder-time",
1776         sess->max_misorder_time, NULL);
1777
1778     /* store from address, if any */
1779     if (pinfo->address) {
1780       if (rtp)
1781         rtp_source_set_rtp_from (source, pinfo->address);
1782       else
1783         rtp_source_set_rtcp_from (source, pinfo->address);
1784     }
1785
1786     /* configure a callback on the source */
1787     rtp_source_set_callbacks (source, &callbacks, sess);
1788
1789     add_source (sess, source);
1790     *created = TRUE;
1791   } else {
1792     *created = FALSE;
1793     /* check for collision, this updates the address when not previously set */
1794     if (check_collision (sess, source, pinfo, rtp)) {
1795       return NULL;
1796     }
1797     /* Receiving RTCP packets of an SSRC is a strong indication that we
1798      * are dealing with a valid source. */
1799     if (!rtp)
1800       g_object_set (source, "probation", 0, NULL);
1801   }
1802   /* update last activity */
1803   source->last_activity = pinfo->current_time;
1804   if (rtp)
1805     source->last_rtp_activity = pinfo->current_time;
1806   g_object_ref (source);
1807
1808   return source;
1809 }
1810
1811 /* must be called with the session lock, the returned source needs to be
1812  * unreffed after usage. */
1813 static RTPSource *
1814 obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1815     GstClockTime current_time)
1816 {
1817   RTPSource *source;
1818
1819   source = find_source (sess, ssrc);
1820   if (source == NULL) {
1821     /* make new internal Source and insert */
1822     source = rtp_source_new (ssrc);
1823
1824     GST_DEBUG ("creating new internal source %08x %p", ssrc, source);
1825
1826     source->validated = TRUE;
1827     source->internal = TRUE;
1828     source->probation = FALSE;
1829     rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes));
1830     rtp_source_set_callbacks (source, &callbacks, sess);
1831
1832     add_source (sess, source);
1833     *created = TRUE;
1834   } else {
1835     *created = FALSE;
1836   }
1837   /* update last activity */
1838   if (current_time != GST_CLOCK_TIME_NONE) {
1839     source->last_activity = current_time;
1840     source->last_rtp_activity = current_time;
1841   }
1842   g_object_ref (source);
1843
1844   return source;
1845 }
1846
1847 /**
1848  * rtp_session_suggest_ssrc:
1849  * @sess: a #RTPSession
1850  * @is_random: if the suggested ssrc is random
1851  *
1852  * Suggest an unused SSRC in @sess.
1853  *
1854  * Returns: a free unused SSRC
1855  */
1856 guint32
1857 rtp_session_suggest_ssrc (RTPSession * sess, gboolean * is_random)
1858 {
1859   guint32 result;
1860
1861   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1862
1863   RTP_SESSION_LOCK (sess);
1864   result = sess->suggested_ssrc;
1865   if (is_random)
1866     *is_random = !sess->internal_ssrc_set;
1867   RTP_SESSION_UNLOCK (sess);
1868
1869   return result;
1870 }
1871
1872 /**
1873  * rtp_session_add_source:
1874  * @sess: a #RTPSession
1875  * @src: #RTPSource to add
1876  *
1877  * Add @src to @session.
1878  *
1879  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1880  * existed in the session.
1881  */
1882 gboolean
1883 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1884 {
1885   gboolean result = FALSE;
1886   RTPSource *find;
1887
1888   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1889   g_return_val_if_fail (src != NULL, FALSE);
1890
1891   RTP_SESSION_LOCK (sess);
1892   find = find_source (sess, src->ssrc);
1893   if (find == NULL) {
1894     add_source (sess, src);
1895     result = TRUE;
1896   }
1897   RTP_SESSION_UNLOCK (sess);
1898
1899   return result;
1900 }
1901
1902 /**
1903  * rtp_session_get_num_sources:
1904  * @sess: an #RTPSession
1905  *
1906  * Get the number of sources in @sess.
1907  *
1908  * Returns: The number of sources in @sess.
1909  */
1910 guint
1911 rtp_session_get_num_sources (RTPSession * sess)
1912 {
1913   guint result;
1914
1915   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1916
1917   RTP_SESSION_LOCK (sess);
1918   result = sess->total_sources;
1919   RTP_SESSION_UNLOCK (sess);
1920
1921   return result;
1922 }
1923
1924 /**
1925  * rtp_session_get_num_active_sources:
1926  * @sess: an #RTPSession
1927  *
1928  * Get the number of active sources in @sess. A source is considered active when
1929  * it has been validated and has not yet received a BYE RTCP message.
1930  *
1931  * Returns: The number of active sources in @sess.
1932  */
1933 guint
1934 rtp_session_get_num_active_sources (RTPSession * sess)
1935 {
1936   guint result;
1937
1938   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1939
1940   RTP_SESSION_LOCK (sess);
1941   result = sess->stats.active_sources;
1942   RTP_SESSION_UNLOCK (sess);
1943
1944   return result;
1945 }
1946
1947 /**
1948  * rtp_session_get_source_by_ssrc:
1949  * @sess: an #RTPSession
1950  * @ssrc: an SSRC
1951  *
1952  * Find the source with @ssrc in @sess.
1953  *
1954  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
1955  * g_object_unref() after usage.
1956  */
1957 RTPSource *
1958 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
1959 {
1960   RTPSource *result;
1961
1962   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1963
1964   RTP_SESSION_LOCK (sess);
1965   result = find_source (sess, ssrc);
1966   if (result != NULL)
1967     g_object_ref (result);
1968   RTP_SESSION_UNLOCK (sess);
1969
1970   return result;
1971 }
1972
1973 /* should be called with the SESSION lock */
1974 static guint32
1975 rtp_session_create_new_ssrc (RTPSession * sess)
1976 {
1977   guint32 ssrc;
1978
1979   while (TRUE) {
1980     ssrc = g_random_int ();
1981
1982     /* see if it exists in the session, we're done if it doesn't */
1983     if (find_source (sess, ssrc) == NULL)
1984       break;
1985   }
1986   return ssrc;
1987 }
1988
1989 static gboolean
1990 update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
1991 {
1992   GstNetAddressMeta *meta;
1993
1994   /* get packet size including header overhead */
1995   pinfo->bytes += gst_buffer_get_size (*buffer) + pinfo->header_len;
1996   pinfo->packets++;
1997
1998   if (pinfo->rtp) {
1999     GstRTPBuffer rtp = { NULL };
2000
2001     if (!gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtp))
2002       goto invalid_packet;
2003
2004     pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtp);
2005     if (idx == 0) {
2006       gint i;
2007
2008       /* only keep info for first buffer */
2009       pinfo->ssrc = gst_rtp_buffer_get_ssrc (&rtp);
2010       pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
2011       pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
2012       pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2013       /* copy available csrc */
2014       pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
2015       for (i = 0; i < pinfo->csrc_count; i++)
2016         pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
2017     }
2018     gst_rtp_buffer_unmap (&rtp);
2019   }
2020
2021   if (idx == 0) {
2022     /* for netbuffer we can store the IP address to check for collisions */
2023     meta = gst_buffer_get_net_address_meta (*buffer);
2024     if (pinfo->address)
2025       g_object_unref (pinfo->address);
2026     if (meta) {
2027       pinfo->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
2028     } else {
2029       pinfo->address = NULL;
2030     }
2031   }
2032   return TRUE;
2033
2034   /* ERRORS */
2035 invalid_packet:
2036   {
2037     GST_DEBUG ("invalid RTP packet received");
2038     return FALSE;
2039   }
2040 }
2041
2042 /* update the RTPPacketInfo structure with the current time and other bits
2043  * about the current buffer we are handling.
2044  * This function is typically called when a validated packet is received.
2045  * This function should be called with the RTP_SESSION_LOCK
2046  */
2047 static gboolean
2048 update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
2049     gboolean send, gboolean rtp, gboolean is_list, gpointer data,
2050     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
2051 {
2052   gboolean res;
2053
2054   pinfo->send = send;
2055   pinfo->rtp = rtp;
2056   pinfo->is_list = is_list;
2057   pinfo->data = data;
2058   pinfo->current_time = current_time;
2059   pinfo->running_time = running_time;
2060   pinfo->ntpnstime = ntpnstime;
2061   pinfo->header_len = sess->header_len;
2062   pinfo->bytes = 0;
2063   pinfo->payload_len = 0;
2064   pinfo->packets = 0;
2065
2066   if (is_list) {
2067     GstBufferList *list = GST_BUFFER_LIST_CAST (data);
2068     res =
2069         gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
2070         pinfo);
2071   } else {
2072     GstBuffer *buffer = GST_BUFFER_CAST (data);
2073     res = update_packet (&buffer, 0, pinfo);
2074   }
2075   return res;
2076 }
2077
2078 static void
2079 clean_packet_info (RTPPacketInfo * pinfo)
2080 {
2081   if (pinfo->address)
2082     g_object_unref (pinfo->address);
2083   if (pinfo->data) {
2084     gst_mini_object_unref (pinfo->data);
2085     pinfo->data = NULL;
2086   }
2087 }
2088
2089 static gboolean
2090 source_update_active (RTPSession * sess, RTPSource * source,
2091     gboolean prevactive)
2092 {
2093   gboolean active = RTP_SOURCE_IS_ACTIVE (source);
2094   guint32 ssrc = source->ssrc;
2095
2096   if (prevactive == active)
2097     return FALSE;
2098
2099   if (active) {
2100     sess->stats.active_sources++;
2101     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
2102         sess->stats.active_sources);
2103   } else {
2104     sess->stats.active_sources--;
2105     GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
2106         sess->stats.active_sources);
2107   }
2108   return TRUE;
2109 }
2110
2111 static gboolean
2112 source_update_sender (RTPSession * sess, RTPSource * source,
2113     gboolean prevsender)
2114 {
2115   gboolean sender = RTP_SOURCE_IS_SENDER (source);
2116   guint32 ssrc = source->ssrc;
2117
2118   if (prevsender == sender)
2119     return FALSE;
2120
2121   if (sender) {
2122     sess->stats.sender_sources++;
2123     if (source->internal)
2124       sess->stats.internal_sender_sources++;
2125     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
2126         sess->stats.sender_sources);
2127   } else {
2128     sess->stats.sender_sources--;
2129     if (source->internal)
2130       sess->stats.internal_sender_sources--;
2131     GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
2132         sess->stats.sender_sources);
2133   }
2134   return TRUE;
2135 }
2136
2137 /**
2138  * rtp_session_process_rtp:
2139  * @sess: and #RTPSession
2140  * @buffer: an RTP buffer
2141  * @current_time: the current system time
2142  * @running_time: the running_time of @buffer
2143  *
2144  * Process an RTP buffer in the session manager. This function takes ownership
2145  * of @buffer.
2146  *
2147  * Returns: a #GstFlowReturn.
2148  */
2149 GstFlowReturn
2150 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
2151     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
2152 {
2153   GstFlowReturn result;
2154   guint32 ssrc;
2155   RTPSource *source;
2156   gboolean created;
2157   gboolean prevsender, prevactive;
2158   RTPPacketInfo pinfo = { 0, };
2159   guint64 oldrate;
2160
2161   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2162   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2163
2164   RTP_SESSION_LOCK (sess);
2165
2166   /* update pinfo stats */
2167   if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer,
2168           current_time, running_time, ntpnstime)) {
2169     GST_DEBUG ("invalid RTP packet received");
2170     RTP_SESSION_UNLOCK (sess);
2171     return rtp_session_process_rtcp (sess, buffer, current_time, running_time,
2172         ntpnstime);
2173   }
2174
2175   ssrc = pinfo.ssrc;
2176
2177   source = obtain_source (sess, ssrc, &created, &pinfo, TRUE);
2178   if (!source)
2179     goto collision;
2180
2181   prevsender = RTP_SOURCE_IS_SENDER (source);
2182   prevactive = RTP_SOURCE_IS_ACTIVE (source);
2183   oldrate = source->bitrate;
2184
2185   /* let source process the packet */
2186   result = rtp_source_process_rtp (source, &pinfo);
2187
2188   /* source became active */
2189   if (source_update_active (sess, source, prevactive))
2190     on_ssrc_validated (sess, source);
2191
2192   source_update_sender (sess, source, prevsender);
2193
2194   if (oldrate != source->bitrate)
2195     sess->recalc_bandwidth = TRUE;
2196
2197   if (created)
2198     on_new_ssrc (sess, source);
2199
2200   if (source->validated) {
2201     gboolean created;
2202     gint i;
2203
2204     /* for validated sources, we add the CSRCs as well */
2205     for (i = 0; i < pinfo.csrc_count; i++) {
2206       guint32 csrc;
2207       RTPSource *csrc_src;
2208
2209       csrc = pinfo.csrcs[i];
2210
2211       /* get source */
2212       csrc_src = obtain_source (sess, csrc, &created, &pinfo, TRUE);
2213       if (!csrc_src)
2214         continue;
2215
2216       if (created) {
2217         GST_DEBUG ("created new CSRC: %08x", csrc);
2218         rtp_source_set_as_csrc (csrc_src);
2219         source_update_active (sess, csrc_src, FALSE);
2220         on_new_ssrc (sess, csrc_src);
2221       }
2222       g_object_unref (csrc_src);
2223     }
2224   }
2225   g_object_unref (source);
2226
2227   RTP_SESSION_UNLOCK (sess);
2228
2229   clean_packet_info (&pinfo);
2230
2231   return result;
2232
2233   /* ERRORS */
2234 collision:
2235   {
2236     RTP_SESSION_UNLOCK (sess);
2237     clean_packet_info (&pinfo);
2238     GST_DEBUG ("ignoring packet because its collisioning");
2239     return GST_FLOW_OK;
2240   }
2241 }
2242
2243 static void
2244 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
2245     GstRTCPPacket * packet, RTPPacketInfo * pinfo)
2246 {
2247   guint count, i;
2248
2249   count = gst_rtcp_packet_get_rb_count (packet);
2250   for (i = 0; i < count; i++) {
2251     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
2252     guint8 fractionlost;
2253     gint32 packetslost;
2254     RTPSource *src;
2255
2256     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
2257         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2258
2259     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
2260
2261     /* find our own source */
2262     src = find_source (sess, ssrc);
2263     if (src == NULL)
2264       continue;
2265
2266     if (src->internal && RTP_SOURCE_IS_ACTIVE (src)) {
2267       /* only deal with report blocks for our session, we update the stats of
2268        * the sender of the RTCP message. We could also compare our stats against
2269        * the other sender to see if we are better or worse. */
2270       /* FIXME, need to keep track who the RB block is from */
2271       rtp_source_process_rb (source, pinfo->ntpnstime, fractionlost,
2272           packetslost, exthighestseq, jitter, lsr, dlsr);
2273     }
2274   }
2275   on_ssrc_active (sess, source);
2276 }
2277
2278 /* A Sender report contains statistics about how the sender is doing. This
2279  * includes timing informataion such as the relation between RTP and NTP
2280  * timestamps and the number of packets/bytes it sent to us.
2281  *
2282  * In this report is also included a set of report blocks related to how this
2283  * sender is receiving data (in case we (or somebody else) is also sending stuff
2284  * to it). This info includes the packet loss, jitter and seqnum. It also
2285  * contains information to calculate the round trip time (LSR/DLSR).
2286  */
2287 static void
2288 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
2289     RTPPacketInfo * pinfo, gboolean * do_sync)
2290 {
2291   guint32 senderssrc, rtptime, packet_count, octet_count;
2292   guint64 ntptime;
2293   RTPSource *source;
2294   gboolean created, prevsender;
2295
2296   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
2297       &packet_count, &octet_count);
2298
2299   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
2300       senderssrc, GST_TIME_ARGS (pinfo->current_time));
2301
2302   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2303   if (!source)
2304     return;
2305
2306   /* skip non-bye packets for sources that are marked BYE */
2307   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2308     goto out;
2309
2310   /* don't try to do lip-sync for sources that sent a BYE */
2311   if (RTP_SOURCE_IS_MARKED_BYE (source))
2312     *do_sync = FALSE;
2313   else
2314     *do_sync = TRUE;
2315
2316   prevsender = RTP_SOURCE_IS_SENDER (source);
2317
2318   /* first update the source */
2319   rtp_source_process_sr (source, pinfo->current_time, ntptime, rtptime,
2320       packet_count, octet_count);
2321
2322   source_update_sender (sess, source, prevsender);
2323
2324   if (created)
2325     on_new_ssrc (sess, source);
2326
2327   rtp_session_process_rb (sess, source, packet, pinfo);
2328
2329 out:
2330   g_object_unref (source);
2331 }
2332
2333 /* A receiver report contains statistics about how a receiver is doing. It
2334  * includes stuff like packet loss, jitter and the seqnum it received last. It
2335  * also contains info to calculate the round trip time.
2336  *
2337  * We are only interested in how the sender of this report is doing wrt to us.
2338  */
2339 static void
2340 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
2341     RTPPacketInfo * pinfo)
2342 {
2343   guint32 senderssrc;
2344   RTPSource *source;
2345   gboolean created;
2346
2347   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
2348
2349   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
2350
2351   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2352   if (!source)
2353     return;
2354
2355   /* skip non-bye packets for sources that are marked BYE */
2356   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2357     goto out;
2358
2359   if (created)
2360     on_new_ssrc (sess, source);
2361
2362   rtp_session_process_rb (sess, source, packet, pinfo);
2363
2364 out:
2365   g_object_unref (source);
2366 }
2367
2368 /* Get SDES items and store them in the SSRC */
2369 static void
2370 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
2371     RTPPacketInfo * pinfo)
2372 {
2373   guint items, i, j;
2374   gboolean more_items, more_entries;
2375
2376   items = gst_rtcp_packet_sdes_get_item_count (packet);
2377   GST_DEBUG ("got SDES packet with %d items", items);
2378
2379   more_items = gst_rtcp_packet_sdes_first_item (packet);
2380   i = 0;
2381   while (more_items) {
2382     guint32 ssrc;
2383     gboolean changed, created, prevactive;
2384     RTPSource *source;
2385     GstStructure *sdes;
2386
2387     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
2388
2389     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
2390
2391     changed = FALSE;
2392
2393     /* find src, no probation when dealing with RTCP */
2394     source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
2395     if (!source)
2396       return;
2397
2398     /* skip non-bye packets for sources that are marked BYE */
2399     if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2400       goto next;
2401
2402     sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
2403
2404     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
2405     j = 0;
2406     while (more_entries) {
2407       GstRTCPSDESType type;
2408       guint8 len;
2409       guint8 *data;
2410       gchar *name;
2411       gchar *value;
2412
2413       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
2414
2415       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
2416           data);
2417
2418       if (type == GST_RTCP_SDES_PRIV) {
2419         name = g_strndup ((const gchar *) &data[1], data[0]);
2420         len -= data[0] + 1;
2421         data += data[0] + 1;
2422       } else {
2423         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
2424       }
2425
2426       value = g_strndup ((const gchar *) data, len);
2427
2428       if (g_utf8_validate (value, -1, NULL)) {
2429         gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
2430       } else {
2431         GST_WARNING ("ignore SDES field %s with non-utf8 data %s", name, value);
2432       }
2433
2434       g_free (name);
2435       g_free (value);
2436
2437       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
2438       j++;
2439     }
2440
2441     /* takes ownership of sdes */
2442     changed = rtp_source_set_sdes_struct (source, sdes);
2443
2444     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2445     source->validated = TRUE;
2446
2447     if (created)
2448       on_new_ssrc (sess, source);
2449
2450     /* source became active */
2451     if (source_update_active (sess, source, prevactive))
2452       on_ssrc_validated (sess, source);
2453
2454     if (changed)
2455       on_ssrc_sdes (sess, source);
2456
2457   next:
2458     g_object_unref (source);
2459
2460     more_items = gst_rtcp_packet_sdes_next_item (packet);
2461     i++;
2462   }
2463 }
2464
2465 /* BYE is sent when a client leaves the session
2466  */
2467 static void
2468 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
2469     RTPPacketInfo * pinfo)
2470 {
2471   guint count, i;
2472   gchar *reason;
2473   gboolean reconsider = FALSE;
2474
2475   reason = gst_rtcp_packet_bye_get_reason (packet);
2476   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
2477
2478   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
2479   for (i = 0; i < count; i++) {
2480     guint32 ssrc;
2481     RTPSource *source;
2482     gboolean prevactive, prevsender;
2483     guint pmembers, members;
2484
2485     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
2486     GST_DEBUG ("SSRC: %08x", ssrc);
2487
2488     /* find src and mark bye, no probation when dealing with RTCP */
2489     source = find_source (sess, ssrc);
2490     if (!source || source->internal) {
2491       GST_DEBUG ("Ignoring suspicious BYE packet (reason: %s)",
2492           !source ? "can't find source" : "has internal source SSRC");
2493       break;
2494     }
2495
2496     /* store time for when we need to time out this source */
2497     source->bye_time = pinfo->current_time;
2498
2499     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2500     prevsender = RTP_SOURCE_IS_SENDER (source);
2501
2502     /* mark the source BYE */
2503     rtp_source_mark_bye (source, reason);
2504
2505     pmembers = sess->stats.active_sources;
2506
2507     source_update_active (sess, source, prevactive);
2508     source_update_sender (sess, source, prevsender);
2509
2510     members = sess->stats.active_sources;
2511
2512     if (!sess->scheduled_bye && members < pmembers) {
2513       /* some members went away since the previous timeout estimate.
2514        * Perform reverse reconsideration but only when we are not scheduling a
2515        * BYE ourselves. */
2516       if (sess->next_rtcp_check_time != GST_CLOCK_TIME_NONE &&
2517           pinfo->current_time < sess->next_rtcp_check_time) {
2518         GstClockTime time_remaining;
2519
2520         /* Scale our next RTCP check time according to the change of numbers
2521          * of members. But only if a) this is the first RTCP, or b) this is not
2522          * a feedback session, or c) this is a feedback session but we schedule
2523          * for every RTCP interval (aka no t-rr-interval set).
2524          *
2525          * FIXME: a) and b) are not great as we will possibly go below Tmin
2526          * for non-feedback profiles and in case of a) below
2527          * Tmin/t-rr-interval in any case.
2528          */
2529         if (sess->last_rtcp_send_time == GST_CLOCK_TIME_NONE ||
2530             !(sess->rtp_profile == GST_RTP_PROFILE_AVPF
2531                 || sess->rtp_profile == GST_RTP_PROFILE_SAVPF) ||
2532             sess->next_rtcp_check_time - sess->last_rtcp_send_time ==
2533             sess->last_rtcp_interval) {
2534           time_remaining = sess->next_rtcp_check_time - pinfo->current_time;
2535           sess->next_rtcp_check_time =
2536               gst_util_uint64_scale (time_remaining, members, pmembers);
2537           sess->next_rtcp_check_time += pinfo->current_time;
2538         }
2539         sess->last_rtcp_interval =
2540             gst_util_uint64_scale (sess->last_rtcp_interval, members, pmembers);
2541
2542         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
2543             GST_TIME_ARGS (sess->next_rtcp_check_time));
2544
2545         /* mark pending reconsider. We only want to signal the reconsideration
2546          * once after we handled all the source in the bye packet */
2547         reconsider = TRUE;
2548       }
2549     }
2550
2551     on_bye_ssrc (sess, source);
2552   }
2553   if (reconsider) {
2554     RTP_SESSION_UNLOCK (sess);
2555     /* notify app of reconsideration */
2556     if (sess->callbacks.reconsider)
2557       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2558     RTP_SESSION_LOCK (sess);
2559   }
2560
2561   g_free (reason);
2562 }
2563
2564 static void
2565 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
2566     RTPPacketInfo * pinfo)
2567 {
2568   GST_DEBUG ("received APP");
2569
2570   if (g_signal_has_handler_pending (sess,
2571           rtp_session_signals[SIGNAL_ON_APP_RTCP], 0, TRUE)) {
2572     GstBuffer *data_buffer = NULL;
2573     guint16 data_length;
2574     gchar name[5];
2575
2576     data_length = gst_rtcp_packet_app_get_data_length (packet) * 4;
2577     if (data_length > 0) {
2578       guint8 *data = gst_rtcp_packet_app_get_data (packet);
2579       data_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
2580           GST_BUFFER_COPY_MEMORY, data - packet->rtcp->map.data, data_length);
2581       GST_BUFFER_PTS (data_buffer) = pinfo->running_time;
2582     }
2583
2584     memcpy (name, gst_rtcp_packet_app_get_name (packet), 4);
2585     name[4] = '\0';
2586
2587     RTP_SESSION_UNLOCK (sess);
2588     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_APP_RTCP], 0,
2589         gst_rtcp_packet_app_get_subtype (packet),
2590         gst_rtcp_packet_app_get_ssrc (packet), name, data_buffer);
2591     RTP_SESSION_LOCK (sess);
2592
2593     if (data_buffer)
2594       gst_buffer_unref (data_buffer);
2595   }
2596 }
2597
2598 static gboolean
2599 rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
2600     guint32 media_ssrc, gboolean fir, GstClockTime current_time)
2601 {
2602   guint32 round_trip = 0;
2603
2604   rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, &round_trip);
2605
2606   if (src->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
2607     GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
2608         GST_SECOND, 65536);
2609
2610     /* Sanity check to avoid always ignoring PLI/FIR if we receive RTCP
2611      * packets with erroneous values resulting in crazy high RTT. */
2612     if (round_trip_in_ns > 5 * GST_SECOND)
2613       round_trip_in_ns = GST_SECOND / 2;
2614
2615     if (current_time - src->last_keyframe_request < 2 * round_trip_in_ns) {
2616       GST_DEBUG ("Ignoring %s request from %X because one was send without one "
2617           "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
2618           fir ? "FIR" : "PLI", rtp_source_get_ssrc (src),
2619           GST_TIME_ARGS (current_time - src->last_keyframe_request),
2620           GST_TIME_ARGS (round_trip_in_ns));
2621       return FALSE;
2622     }
2623   }
2624
2625   src->last_keyframe_request = current_time;
2626
2627   GST_LOG ("received %s request from %X about %X %p(%p)", fir ? "FIR" : "PLI",
2628       rtp_source_get_ssrc (src), media_ssrc, sess->callbacks.process_rtp,
2629       sess->callbacks.request_key_unit);
2630
2631   RTP_SESSION_UNLOCK (sess);
2632   sess->callbacks.request_key_unit (sess, media_ssrc, fir,
2633       sess->request_key_unit_user_data);
2634   RTP_SESSION_LOCK (sess);
2635
2636   return TRUE;
2637 }
2638
2639 static void
2640 rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
2641     guint32 media_ssrc, GstClockTime current_time)
2642 {
2643   RTPSource *src;
2644
2645   if (!sess->callbacks.request_key_unit)
2646     return;
2647
2648   src = find_source (sess, sender_ssrc);
2649   if (src == NULL) {
2650     /* try to find a src with media_ssrc instead */
2651     src = find_source (sess, media_ssrc);
2652     if (src == NULL)
2653       return;
2654   }
2655
2656   rtp_session_request_local_key_unit (sess, src, media_ssrc, FALSE,
2657       current_time);
2658 }
2659
2660 static void
2661 rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
2662     guint32 media_ssrc, guint8 * fci_data, guint fci_length,
2663     GstClockTime current_time)
2664 {
2665   RTPSource *src;
2666   guint32 ssrc;
2667   guint position = 0;
2668   gboolean our_request = FALSE;
2669
2670   if (!sess->callbacks.request_key_unit)
2671     return;
2672
2673   if (fci_length < 8)
2674     return;
2675
2676   src = find_source (sess, sender_ssrc);
2677
2678   /* Hack because Google fails to set the sender_ssrc correctly */
2679   if (!src && sender_ssrc == 1) {
2680     GHashTableIter iter;
2681
2682     /* we can't find the source if there are multiple */
2683     if (sess->stats.sender_sources > sess->stats.internal_sender_sources + 1)
2684       return;
2685
2686     g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
2687     while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
2688       if (!src->internal && rtp_source_is_sender (src))
2689         break;
2690       src = NULL;
2691     }
2692   }
2693   if (!src)
2694     return;
2695
2696   for (position = 0; position < fci_length; position += 8) {
2697     guint8 *data = fci_data + position;
2698     RTPSource *own;
2699
2700     ssrc = GST_READ_UINT32_BE (data);
2701
2702     own = find_source (sess, ssrc);
2703     if (own == NULL)
2704       continue;
2705
2706     if (own->internal) {
2707       our_request = TRUE;
2708       break;
2709     }
2710   }
2711   if (!our_request)
2712     return;
2713
2714   rtp_session_request_local_key_unit (sess, src, media_ssrc, TRUE,
2715       current_time);
2716 }
2717
2718 static void
2719 rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
2720     guint32 media_ssrc, guint8 * fci_data, guint fci_length,
2721     GstClockTime current_time)
2722 {
2723   sess->stats.nacks_received++;
2724
2725   if (!sess->callbacks.notify_nack)
2726     return;
2727
2728   while (fci_length > 0) {
2729     guint16 seqnum, blp;
2730
2731     seqnum = GST_READ_UINT16_BE (fci_data);
2732     blp = GST_READ_UINT16_BE (fci_data + 2);
2733
2734     GST_DEBUG ("NACK #%u, blp %04x, SSRC 0x%08x", seqnum, blp, media_ssrc);
2735
2736     RTP_SESSION_UNLOCK (sess);
2737     sess->callbacks.notify_nack (sess, seqnum, blp, media_ssrc,
2738         sess->notify_nack_user_data);
2739     RTP_SESSION_LOCK (sess);
2740
2741     fci_data += 4;
2742     fci_length -= 4;
2743   }
2744 }
2745
2746 static void
2747 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
2748     RTPPacketInfo * pinfo, GstClockTime current_time)
2749 {
2750   GstRTCPType type;
2751   GstRTCPFBType fbtype;
2752   guint32 sender_ssrc, media_ssrc;
2753   guint8 *fci_data;
2754   guint fci_length;
2755   RTPSource *src;
2756
2757   /* The feedback packet must include both sender SSRC and media SSRC */
2758   if (packet->length < 2)
2759     return;
2760
2761   type = gst_rtcp_packet_get_type (packet);
2762   fbtype = gst_rtcp_packet_fb_get_type (packet);
2763   sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
2764   media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
2765
2766   src = find_source (sess, media_ssrc);
2767
2768   /* skip non-bye packets for sources that are marked BYE */
2769   if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src))
2770     return;
2771
2772   if (src)
2773     g_object_ref (src);
2774
2775   fci_data = gst_rtcp_packet_fb_get_fci (packet);
2776   fci_length = gst_rtcp_packet_fb_get_fci_length (packet) * sizeof (guint32);
2777
2778   GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
2779       "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
2780
2781   if (g_signal_has_handler_pending (sess,
2782           rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
2783     GstBuffer *fci_buffer = NULL;
2784
2785     if (fci_length > 0) {
2786       fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
2787           GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data,
2788           fci_length);
2789       GST_BUFFER_PTS (fci_buffer) = pinfo->running_time;
2790     }
2791
2792     RTP_SESSION_UNLOCK (sess);
2793     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
2794         type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
2795     RTP_SESSION_LOCK (sess);
2796
2797     if (fci_buffer)
2798       gst_buffer_unref (fci_buffer);
2799   }
2800
2801   if (src && sess->rtcp_feedback_retention_window != GST_CLOCK_TIME_NONE) {
2802     rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
2803   }
2804
2805   if ((src && src->internal) ||
2806       /* PSFB FIR puts the media ssrc inside the FCI */
2807       (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
2808     switch (type) {
2809       case GST_RTCP_TYPE_PSFB:
2810         switch (fbtype) {
2811           case GST_RTCP_PSFB_TYPE_PLI:
2812             if (src)
2813               src->stats.recv_pli_count++;
2814             rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
2815                 current_time);
2816             break;
2817           case GST_RTCP_PSFB_TYPE_FIR:
2818             if (src)
2819               src->stats.recv_fir_count++;
2820             rtp_session_process_fir (sess, sender_ssrc, media_ssrc, fci_data,
2821                 fci_length, current_time);
2822             break;
2823           default:
2824             break;
2825         }
2826         break;
2827       case GST_RTCP_TYPE_RTPFB:
2828         switch (fbtype) {
2829           case GST_RTCP_RTPFB_TYPE_NACK:
2830             if (src)
2831               src->stats.recv_nack_count++;
2832             rtp_session_process_nack (sess, sender_ssrc, media_ssrc,
2833                 fci_data, fci_length, current_time);
2834             break;
2835           default:
2836             break;
2837         }
2838       default:
2839         break;
2840     }
2841   }
2842
2843   if (src)
2844     g_object_unref (src);
2845 }
2846
2847 /**
2848  * rtp_session_process_rtcp:
2849  * @sess: and #RTPSession
2850  * @buffer: an RTCP buffer
2851  * @current_time: the current system time
2852  * @ntpnstime: the current NTP time in nanoseconds
2853  *
2854  * Process an RTCP buffer in the session manager. This function takes ownership
2855  * of @buffer.
2856  *
2857  * Returns: a #GstFlowReturn.
2858  */
2859 GstFlowReturn
2860 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
2861     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
2862 {
2863   GstRTCPPacket packet;
2864   gboolean more, is_bye = FALSE, do_sync = FALSE;
2865   RTPPacketInfo pinfo = { 0, };
2866   GstFlowReturn result = GST_FLOW_OK;
2867   GstRTCPBuffer rtcp = { NULL, };
2868
2869   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2870   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2871
2872   if (!gst_rtcp_buffer_validate_reduced (buffer))
2873     goto invalid_packet;
2874
2875   GST_DEBUG ("received RTCP packet");
2876
2877   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP], 0,
2878       buffer);
2879
2880   RTP_SESSION_LOCK (sess);
2881   /* update pinfo stats */
2882   update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
2883       running_time, ntpnstime);
2884
2885   /* start processing the compound packet */
2886   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2887   more = gst_rtcp_buffer_get_first_packet (&rtcp, &packet);
2888   while (more) {
2889     GstRTCPType type;
2890
2891     type = gst_rtcp_packet_get_type (&packet);
2892
2893     switch (type) {
2894       case GST_RTCP_TYPE_SR:
2895         rtp_session_process_sr (sess, &packet, &pinfo, &do_sync);
2896         break;
2897       case GST_RTCP_TYPE_RR:
2898         rtp_session_process_rr (sess, &packet, &pinfo);
2899         break;
2900       case GST_RTCP_TYPE_SDES:
2901         rtp_session_process_sdes (sess, &packet, &pinfo);
2902         break;
2903       case GST_RTCP_TYPE_BYE:
2904         is_bye = TRUE;
2905         /* don't try to attempt lip-sync anymore for streams with a BYE */
2906         do_sync = FALSE;
2907         rtp_session_process_bye (sess, &packet, &pinfo);
2908         break;
2909       case GST_RTCP_TYPE_APP:
2910         rtp_session_process_app (sess, &packet, &pinfo);
2911         break;
2912       case GST_RTCP_TYPE_RTPFB:
2913       case GST_RTCP_TYPE_PSFB:
2914         rtp_session_process_feedback (sess, &packet, &pinfo, current_time);
2915         break;
2916       case GST_RTCP_TYPE_XR:
2917         /* FIXME: This block is added to downgrade warning level.
2918          * Once the parser is implemented, it should be replaced with
2919          * a proper process function. */
2920         GST_DEBUG ("got RTCP XR packet, but ignored");
2921         break;
2922       default:
2923         GST_WARNING ("got unknown RTCP packet type: %d", type);
2924         break;
2925     }
2926     more = gst_rtcp_packet_move_to_next (&packet);
2927   }
2928
2929   gst_rtcp_buffer_unmap (&rtcp);
2930
2931   /* if we are scheduling a BYE, we only want to count bye packets, else we
2932    * count everything */
2933   if (sess->scheduled_bye && is_bye) {
2934     sess->bye_stats.bye_members++;
2935     UPDATE_AVG (sess->bye_stats.avg_rtcp_packet_size, pinfo.bytes);
2936   }
2937
2938   /* keep track of average packet size */
2939   UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
2940
2941   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
2942       sess->stats.avg_rtcp_packet_size, pinfo.bytes);
2943   RTP_SESSION_UNLOCK (sess);
2944
2945   pinfo.data = NULL;
2946   clean_packet_info (&pinfo);
2947
2948   /* notify caller of sr packets in the callback */
2949   if (do_sync && sess->callbacks.sync_rtcp) {
2950     result = sess->callbacks.sync_rtcp (sess, buffer,
2951         sess->sync_rtcp_user_data);
2952   } else
2953     gst_buffer_unref (buffer);
2954
2955   return result;
2956
2957   /* ERRORS */
2958 invalid_packet:
2959   {
2960     GST_DEBUG ("invalid RTCP packet received");
2961     gst_buffer_unref (buffer);
2962     return GST_FLOW_OK;
2963   }
2964 }
2965
2966 /**
2967  * rtp_session_update_send_caps:
2968  * @sess: an #RTPSession
2969  * @caps: a #GstCaps
2970  *
2971  * Update the caps of the sender in the rtp session.
2972  */
2973 void
2974 rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
2975 {
2976   GstStructure *s;
2977   guint ssrc;
2978
2979   g_return_if_fail (RTP_IS_SESSION (sess));
2980   g_return_if_fail (GST_IS_CAPS (caps));
2981
2982   GST_LOG ("received caps %" GST_PTR_FORMAT, caps);
2983
2984   s = gst_caps_get_structure (caps, 0);
2985
2986   if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
2987     RTPSource *source;
2988     gboolean created;
2989
2990     RTP_SESSION_LOCK (sess);
2991     source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
2992     sess->suggested_ssrc = ssrc;
2993     sess->internal_ssrc_set = TRUE;
2994     sess->internal_ssrc_from_caps_or_property = TRUE;
2995     if (source) {
2996       rtp_source_update_caps (source, caps);
2997
2998       if (created)
2999         on_new_sender_ssrc (sess, source);
3000
3001       g_object_unref (source);
3002     }
3003
3004     if (gst_structure_get_uint (s, "rtx-ssrc", &ssrc)) {
3005       source =
3006           obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
3007       if (source) {
3008         rtp_source_update_caps (source, caps);
3009
3010         if (created)
3011           on_new_sender_ssrc (sess, source);
3012
3013         g_object_unref (source);
3014       }
3015     }
3016     RTP_SESSION_UNLOCK (sess);
3017   } else {
3018     sess->internal_ssrc_from_caps_or_property = FALSE;
3019   }
3020 }
3021
3022 /**
3023  * rtp_session_send_rtp:
3024  * @sess: an #RTPSession
3025  * @data: pointer to either an RTP buffer or a list of RTP buffers
3026  * @is_list: TRUE when @data is a buffer list
3027  * @current_time: the current system time
3028  * @running_time: the running time of @data
3029  *
3030  * Send the RTP data (a buffer or buffer list) in the session manager. This
3031  * function takes ownership of @data.
3032  *
3033  * Returns: a #GstFlowReturn.
3034  */
3035 GstFlowReturn
3036 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
3037     GstClockTime current_time, GstClockTime running_time)
3038 {
3039   GstFlowReturn result;
3040   RTPSource *source;
3041   gboolean prevsender;
3042   guint64 oldrate;
3043   RTPPacketInfo pinfo = { 0, };
3044   gboolean created;
3045
3046   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3047   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
3048
3049   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
3050
3051   RTP_SESSION_LOCK (sess);
3052   if (!update_packet_info (sess, &pinfo, TRUE, TRUE, is_list, data,
3053           current_time, running_time, -1))
3054     goto invalid_packet;
3055
3056   source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
3057   if (created)
3058     on_new_sender_ssrc (sess, source);
3059
3060   if (!source->internal)
3061     /* FIXME: Send GstRTPCollision upstream  */
3062     goto collision;
3063
3064   prevsender = RTP_SOURCE_IS_SENDER (source);
3065   oldrate = source->bitrate;
3066
3067   /* we use our own source to send */
3068   result = rtp_source_send_rtp (source, &pinfo);
3069
3070   source_update_sender (sess, source, prevsender);
3071
3072   if (oldrate != source->bitrate)
3073     sess->recalc_bandwidth = TRUE;
3074   RTP_SESSION_UNLOCK (sess);
3075
3076   g_object_unref (source);
3077   clean_packet_info (&pinfo);
3078
3079   return result;
3080
3081 invalid_packet:
3082   {
3083     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
3084     RTP_SESSION_UNLOCK (sess);
3085     GST_DEBUG ("invalid RTP packet received");
3086     return GST_FLOW_OK;
3087   }
3088 collision:
3089   {
3090     g_object_unref (source);
3091     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
3092     RTP_SESSION_UNLOCK (sess);
3093     GST_WARNING ("non-internal source with same ssrc %08x, drop packet",
3094         pinfo.ssrc);
3095     return GST_FLOW_OK;
3096   }
3097 }
3098
3099 static void
3100 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
3101 {
3102   *bandwidth += source->bitrate;
3103 }
3104
3105 /* must be called with session lock */
3106 static GstClockTime
3107 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
3108     gboolean first)
3109 {
3110   GstClockTime result;
3111   RTPSessionStats *stats;
3112
3113   /* recalculate bandwidth when it changed */
3114   if (sess->recalc_bandwidth) {
3115     gdouble bandwidth;
3116
3117     if (sess->bandwidth > 0)
3118       bandwidth = sess->bandwidth;
3119     else {
3120       /* If it is <= 0, then try to estimate the actual bandwidth */
3121       bandwidth = 0;
3122
3123       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3124           (GHFunc) add_bitrates, &bandwidth);
3125     }
3126     if (bandwidth < RTP_STATS_BANDWIDTH)
3127       bandwidth = RTP_STATS_BANDWIDTH;
3128
3129     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
3130         sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
3131
3132     sess->recalc_bandwidth = FALSE;
3133   }
3134
3135   if (sess->scheduled_bye) {
3136     stats = &sess->bye_stats;
3137     result = rtp_stats_calculate_bye_interval (stats);
3138   } else {
3139     session_update_ptp (sess);
3140
3141     stats = &sess->stats;
3142     result = rtp_stats_calculate_rtcp_interval (stats,
3143         stats->internal_sender_sources > 0, sess->rtp_profile,
3144         sess->is_doing_ptp, first);
3145   }
3146
3147   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
3148       GST_TIME_ARGS (result), first);
3149
3150   if (!deterministic && result != GST_CLOCK_TIME_NONE)
3151     result = rtp_stats_add_rtcp_jitter (stats, result);
3152
3153   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
3154
3155   return result;
3156 }
3157
3158 static void
3159 source_mark_bye (const gchar * key, RTPSource * source, const gchar * reason)
3160 {
3161   if (source->internal)
3162     rtp_source_mark_bye (source, reason);
3163 }
3164
3165 /**
3166  * rtp_session_mark_all_bye:
3167  * @sess: an #RTPSession
3168  * @reason: a reason
3169  *
3170  * Mark all internal sources of the session as BYE with @reason.
3171  */
3172 void
3173 rtp_session_mark_all_bye (RTPSession * sess, const gchar * reason)
3174 {
3175   g_return_if_fail (RTP_IS_SESSION (sess));
3176
3177   RTP_SESSION_LOCK (sess);
3178   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3179       (GHFunc) source_mark_bye, (gpointer) reason);
3180   RTP_SESSION_UNLOCK (sess);
3181 }
3182
3183 /* Stop the current @sess and schedule a BYE message for the other members.
3184  * One must have the session lock to call this function
3185  */
3186 static GstFlowReturn
3187 rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time)
3188 {
3189   GstFlowReturn result = GST_FLOW_OK;
3190   GstClockTime interval;
3191
3192   /* nothing to do it we already scheduled bye */
3193   if (sess->scheduled_bye)
3194     goto done;
3195
3196   /* we schedule BYE now */
3197   sess->scheduled_bye = TRUE;
3198   /* at least one member wants to send a BYE */
3199   memcpy (&sess->bye_stats, &sess->stats, sizeof (RTPSessionStats));
3200   INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100);
3201   sess->bye_stats.bye_members = 1;
3202   sess->first_rtcp = TRUE;
3203
3204   /* reschedule transmission */
3205   sess->last_rtcp_send_time = current_time;
3206   sess->last_rtcp_check_time = current_time;
3207   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3208
3209   if (interval != GST_CLOCK_TIME_NONE)
3210     sess->next_rtcp_check_time = current_time + interval;
3211   else
3212     sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
3213   sess->last_rtcp_interval = interval;
3214
3215   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
3216       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
3217
3218   RTP_SESSION_UNLOCK (sess);
3219   /* notify app of reconsideration */
3220   if (sess->callbacks.reconsider)
3221     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
3222   RTP_SESSION_LOCK (sess);
3223 done:
3224
3225   return result;
3226 }
3227
3228 /**
3229  * rtp_session_schedule_bye:
3230  * @sess: an #RTPSession
3231  * @current_time: the current system time
3232  *
3233  * Schedule a BYE message for all sources marked as BYE in @sess.
3234  *
3235  * Returns: a #GstFlowReturn.
3236  */
3237 GstFlowReturn
3238 rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
3239 {
3240   GstFlowReturn result;
3241
3242   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3243
3244   RTP_SESSION_LOCK (sess);
3245   result = rtp_session_schedule_bye_locked (sess, current_time);
3246   RTP_SESSION_UNLOCK (sess);
3247
3248   return result;
3249 }
3250
3251 /**
3252  * rtp_session_next_timeout:
3253  * @sess: an #RTPSession
3254  * @current_time: the current system time
3255  *
3256  * Get the next time we should perform session maintenance tasks.
3257  *
3258  * Returns: a time when rtp_session_on_timeout() should be called with the
3259  * current system time.
3260  */
3261 GstClockTime
3262 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
3263 {
3264   GstClockTime result, interval = 0;
3265
3266   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
3267
3268   RTP_SESSION_LOCK (sess);
3269
3270   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
3271     GST_DEBUG ("have early rtcp time");
3272     result = sess->next_early_rtcp_time;
3273     goto early_exit;
3274   }
3275
3276   result = sess->next_rtcp_check_time;
3277
3278   GST_DEBUG ("current time: %" GST_TIME_FORMAT
3279       ", next time: %" GST_TIME_FORMAT,
3280       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
3281
3282   if (result == GST_CLOCK_TIME_NONE || result < current_time) {
3283     GST_DEBUG ("take current time as base");
3284     /* our previous check time expired, start counting from the current time
3285      * again. */
3286     result = current_time;
3287   }
3288
3289   if (sess->scheduled_bye) {
3290     if (sess->bye_stats.active_sources >= 50) {
3291       GST_DEBUG ("reconsider BYE, more than 50 sources");
3292       /* reconsider BYE if members >= 50 */
3293       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3294       sess->last_rtcp_interval = interval;
3295     }
3296   } else {
3297     if (sess->first_rtcp) {
3298       GST_DEBUG ("first RTCP packet");
3299       /* we are called for the first time */
3300       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
3301       sess->last_rtcp_interval = interval;
3302     } else if (sess->next_rtcp_check_time < current_time) {
3303       GST_DEBUG ("old check time expired, getting new timeout");
3304       /* get a new timeout when we need to */
3305       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
3306       sess->last_rtcp_interval = interval;
3307
3308       if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF
3309               || sess->rtp_profile == GST_RTP_PROFILE_SAVPF)
3310           && interval != GST_CLOCK_TIME_NONE) {
3311         /* Apply the rules from RFC 4585 section 3.5.3 */
3312         if (sess->stats.min_interval != 0) {
3313           GstClockTime T_rr_current_interval = g_random_double_range (0.5,
3314               1.5) * sess->stats.min_interval * GST_SECOND;
3315
3316           if (T_rr_current_interval > interval) {
3317             GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT
3318                 " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval),
3319                 GST_TIME_ARGS (interval));
3320             interval = T_rr_current_interval;
3321           }
3322         }
3323       }
3324     }
3325   }
3326
3327   if (interval != GST_CLOCK_TIME_NONE)
3328     result += interval;
3329   else
3330     result = GST_CLOCK_TIME_NONE;
3331
3332   sess->next_rtcp_check_time = result;
3333
3334 early_exit:
3335
3336   GST_DEBUG ("current time: %" GST_TIME_FORMAT
3337       ", next time: %" GST_TIME_FORMAT,
3338       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
3339   RTP_SESSION_UNLOCK (sess);
3340
3341   return result;
3342 }
3343
3344 typedef struct
3345 {
3346   RTPSource *source;
3347   gboolean is_bye;
3348   GstBuffer *buffer;
3349 } ReportOutput;
3350
3351 typedef struct
3352 {
3353   GstRTCPBuffer rtcpbuf;
3354   RTPSession *sess;
3355   RTPSource *source;
3356   guint num_to_report;
3357   gboolean have_fir;
3358   gboolean have_pli;
3359   gboolean have_nack;
3360   GstBuffer *rtcp;
3361   GstClockTime current_time;
3362   guint64 ntpnstime;
3363   GstClockTime running_time;
3364   GstClockTime interval;
3365   GstRTCPPacket packet;
3366   gboolean has_sdes;
3367   gboolean is_early;
3368   gboolean may_suppress;
3369   GQueue output;
3370   guint nacked_seqnums;
3371 } ReportData;
3372
3373 static void
3374 session_start_rtcp (RTPSession * sess, ReportData * data)
3375 {
3376   GstRTCPPacket *packet = &data->packet;
3377   RTPSource *own = data->source;
3378   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3379
3380   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
3381   data->has_sdes = FALSE;
3382
3383   gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
3384
3385   if (data->is_early && sess->reduced_size_rtcp)
3386     return;
3387
3388   if (RTP_SOURCE_IS_SENDER (own)) {
3389     guint64 ntptime;
3390     guint32 rtptime;
3391     guint32 packet_count, octet_count;
3392
3393     /* we are a sender, create SR */
3394     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
3395     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SR, packet);
3396
3397     /* get latest stats */
3398     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
3399         &ntptime, &rtptime, &packet_count, &octet_count);
3400     /* store stats */
3401     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
3402         packet_count, octet_count);
3403
3404     /* fill in sender report info */
3405     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
3406         ntptime, rtptime, packet_count, octet_count);
3407   } else {
3408     /* we are only receiver, create RR */
3409     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
3410     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RR, packet);
3411     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
3412   }
3413 }
3414
3415 /* construct a Sender or Receiver Report */
3416 static void
3417 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
3418 {
3419   RTPSession *sess = data->sess;
3420   GstRTCPPacket *packet = &data->packet;
3421   guint8 fractionlost;
3422   gint32 packetslost;
3423   guint32 exthighestseq, jitter;
3424   guint32 lsr, dlsr;
3425
3426   /* don't report for sources in future generations */
3427   if (((gint16) (source->generation - sess->generation)) > 0) {
3428     GST_DEBUG ("source %08x generation %u > %u", source->ssrc,
3429         source->generation, sess->generation);
3430     return;
3431   }
3432
3433   if (g_hash_table_contains (source->reported_in_sr_of,
3434           GUINT_TO_POINTER (data->source->ssrc))) {
3435     GST_DEBUG ("source %08x already reported in this generation", source->ssrc);
3436     return;
3437   }
3438
3439   if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
3440     GST_DEBUG ("max RB count reached");
3441     return;
3442   }
3443
3444   /* only report about remote sources */
3445   if (source->internal)
3446     goto reported;
3447
3448   if (!RTP_SOURCE_IS_SENDER (source)) {
3449     GST_DEBUG ("source %08x not sender", source->ssrc);
3450     goto reported;
3451   }
3452
3453   if (source->disable_rtcp) {
3454     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
3455     goto reported;
3456   }
3457
3458   GST_DEBUG ("create RB for SSRC %08x", source->ssrc);
3459
3460   /* get new stats */
3461   rtp_source_get_new_rb (source, data->current_time, &fractionlost,
3462       &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
3463
3464   /* store last generated RR packet */
3465   source->last_rr.is_valid = TRUE;
3466   source->last_rr.fractionlost = fractionlost;
3467   source->last_rr.packetslost = packetslost;
3468   source->last_rr.exthighestseq = exthighestseq;
3469   source->last_rr.jitter = jitter;
3470   source->last_rr.lsr = lsr;
3471   source->last_rr.dlsr = dlsr;
3472
3473   /* packet is not yet filled, add report block for this source. */
3474   gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
3475       exthighestseq, jitter, lsr, dlsr);
3476
3477 reported:
3478   g_hash_table_add (source->reported_in_sr_of,
3479       GUINT_TO_POINTER (data->source->ssrc));
3480 }
3481
3482 /* construct FIR */
3483 static void
3484 session_add_fir (const gchar * key, RTPSource * source, ReportData * data)
3485 {
3486   GstRTCPPacket *packet = &data->packet;
3487   guint16 len;
3488   guint8 *fci_data;
3489
3490   if (!source->send_fir)
3491     return;
3492
3493   len = gst_rtcp_packet_fb_get_fci_length (packet);
3494   if (!gst_rtcp_packet_fb_set_fci_length (packet, len + 2))
3495     /* exit because the packet is full, will put next request in a
3496      * further packet */
3497     return;
3498
3499   fci_data = gst_rtcp_packet_fb_get_fci (packet) + (len * 4);
3500
3501   GST_WRITE_UINT32_BE (fci_data, source->ssrc);
3502   fci_data += 4;
3503   fci_data[0] = source->current_send_fir_seqnum;
3504   fci_data[1] = fci_data[2] = fci_data[3] = 0;
3505
3506   source->send_fir = FALSE;
3507   source->stats.sent_fir_count++;
3508 }
3509
3510 static void
3511 session_fir (RTPSession * sess, ReportData * data)
3512 {
3513   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3514   GstRTCPPacket *packet = &data->packet;
3515
3516   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3517     return;
3518
3519   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_FIR);
3520   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3521   gst_rtcp_packet_fb_set_media_ssrc (packet, 0);
3522
3523   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3524       (GHFunc) session_add_fir, data);
3525
3526   if (gst_rtcp_packet_fb_get_fci_length (packet) == 0)
3527     gst_rtcp_packet_remove (packet);
3528   else
3529     data->may_suppress = FALSE;
3530 }
3531
3532 static gboolean
3533 has_pli_compare_func (gconstpointer a, gconstpointer ignored)
3534 {
3535   GstRTCPPacket packet;
3536   GstRTCPBuffer rtcp = { NULL, };
3537   gboolean ret = FALSE;
3538
3539   gst_rtcp_buffer_map ((GstBuffer *) a, GST_MAP_READ, &rtcp);
3540
3541   if (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
3542     if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
3543         gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
3544       ret = TRUE;
3545   }
3546
3547   gst_rtcp_buffer_unmap (&rtcp);
3548
3549   return ret;
3550 }
3551
3552 /* construct PLI */
3553 static void
3554 session_pli (const gchar * key, RTPSource * source, ReportData * data)
3555 {
3556   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3557   GstRTCPPacket *packet = &data->packet;
3558
3559   if (!source->send_pli)
3560     return;
3561
3562   if (rtp_source_has_retained (source, has_pli_compare_func, NULL))
3563     return;
3564
3565   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3566     /* exit because the packet is full, will put next request in a
3567      * further packet */
3568     return;
3569
3570   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_PLI);
3571   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3572   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
3573
3574   source->send_pli = FALSE;
3575   data->may_suppress = FALSE;
3576
3577   source->stats.sent_pli_count++;
3578 }
3579
3580 /* construct NACK */
3581 static void
3582 session_nack (const gchar * key, RTPSource * source, ReportData * data)
3583 {
3584   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3585   GstRTCPPacket *packet = &data->packet;
3586   guint16 *nacks;
3587   GstClockTime *nack_deadlines;
3588   guint n_nacks, i = 0;
3589   guint nacked_seqnums = 0;
3590   guint16 n_fb_nacks = 0;
3591   guint8 *fci_data;
3592
3593   if (!source->send_nack)
3594     return;
3595
3596   nacks = rtp_source_get_nacks (source, &n_nacks);
3597   nack_deadlines = rtp_source_get_nack_deadlines (source, NULL);
3598   GST_DEBUG ("%u NACKs current time %" GST_TIME_FORMAT, n_nacks,
3599       GST_TIME_ARGS (data->current_time));
3600
3601   /* cleanup expired nacks */
3602   for (i = 0; i < n_nacks; i++) {
3603     GST_DEBUG ("#%u deadline %" GST_TIME_FORMAT, nacks[i],
3604         GST_TIME_ARGS (nack_deadlines[i]));
3605     if (nack_deadlines[i] >= data->current_time)
3606       break;
3607   }
3608   if (i) {
3609     GST_WARNING ("Removing %u expired NACKS", i);
3610     rtp_source_clear_nacks (source, i);
3611     n_nacks -= i;
3612     if (n_nacks == 0)
3613       return;
3614   }
3615
3616   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet))
3617     /* exit because the packet is full, will put next request in a
3618      * further packet */
3619     return;
3620
3621   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_RTPFB_TYPE_NACK);
3622   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3623   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
3624
3625   if (!gst_rtcp_packet_fb_set_fci_length (packet, 1)) {
3626     gst_rtcp_packet_remove (packet);
3627     GST_WARNING ("no nacks fit in the packet");
3628     return;
3629   }
3630
3631   fci_data = gst_rtcp_packet_fb_get_fci (packet);
3632   for (i = 0; i < n_nacks; i = nacked_seqnums) {
3633     guint16 seqnum = nacks[i];
3634     guint16 blp = 0;
3635     guint j;
3636
3637     if (!gst_rtcp_packet_fb_set_fci_length (packet, n_fb_nacks + 1))
3638       break;
3639
3640     n_fb_nacks++;
3641     nacked_seqnums++;
3642
3643     for (j = i + 1; j < n_nacks; j++) {
3644       gint diff;
3645
3646       diff = gst_rtp_buffer_compare_seqnum (seqnum, nacks[j]);
3647       GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, nacks[j], diff);
3648       if (diff > 16)
3649         break;
3650
3651       blp |= 1 << (diff - 1);
3652       nacked_seqnums++;
3653     }
3654
3655     GST_WRITE_UINT32_BE (fci_data, seqnum << 16 | blp);
3656     fci_data += 4;
3657   }
3658
3659   data->nacked_seqnums += nacked_seqnums;
3660   rtp_source_clear_nacks (source, nacked_seqnums);
3661   data->may_suppress = FALSE;
3662   source->stats.sent_nack_count += n_fb_nacks;
3663
3664   GST_DEBUG ("Sent %u seqnums into %u FB NACKs", nacked_seqnums, n_fb_nacks);
3665 }
3666
3667 /* perform cleanup of sources that timed out */
3668 static void
3669 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
3670 {
3671   gboolean remove = FALSE;
3672   gboolean byetimeout = FALSE;
3673   gboolean sendertimeout = FALSE;
3674   gboolean is_sender, is_active;
3675   RTPSession *sess = data->sess;
3676   GstClockTime interval, binterval;
3677   GstClockTime btime;
3678
3679   GST_DEBUG ("look at %08x, generation %u", source->ssrc, source->generation);
3680
3681   /* check for outdated collisions */
3682   if (source->internal) {
3683     GST_DEBUG ("Timing out collisions for %x", source->ssrc);
3684     rtp_source_timeout (source, data->current_time, data->running_time,
3685         sess->rtcp_feedback_retention_window);
3686   }
3687
3688   /* nothing else to do when without RTCP */
3689   if (data->interval == GST_CLOCK_TIME_NONE)
3690     return;
3691
3692   is_sender = RTP_SOURCE_IS_SENDER (source);
3693   is_active = RTP_SOURCE_IS_ACTIVE (source);
3694
3695   /* our own rtcp interval may have been forced low by secondary configuration,
3696    * while sender side may still operate with higher interval,
3697    * so do not just take our interval to decide on timing out sender,
3698    * but take (if data->interval <= 5 * GST_SECOND):
3699    *   interval = CLAMP (sender_interval, data->interval, 5 * GST_SECOND)
3700    * where sender_interval is difference between last 2 received RTCP reports
3701    */
3702   if (data->interval >= 5 * GST_SECOND || source->internal) {
3703     binterval = data->interval;
3704   } else {
3705     GST_LOG ("prev_rtcp %" GST_TIME_FORMAT ", last_rtcp %" GST_TIME_FORMAT,
3706         GST_TIME_ARGS (source->stats.prev_rtcptime),
3707         GST_TIME_ARGS (source->stats.last_rtcptime));
3708     /* if not received enough yet, fallback to larger default */
3709     if (source->stats.last_rtcptime > source->stats.prev_rtcptime)
3710       binterval = source->stats.last_rtcptime - source->stats.prev_rtcptime;
3711     else
3712       binterval = 5 * GST_SECOND;
3713     binterval = CLAMP (binterval, data->interval, 5 * GST_SECOND);
3714   }
3715   GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
3716       GST_TIME_ARGS (binterval));
3717
3718   if (!source->internal && source->marked_bye) {
3719     /* if we received a BYE from the source, remove the source after some
3720      * time. */
3721     if (data->current_time > source->bye_time &&
3722         data->current_time - source->bye_time > sess->stats.bye_timeout) {
3723       GST_DEBUG ("removing BYE source %08x", source->ssrc);
3724       remove = TRUE;
3725       byetimeout = TRUE;
3726     }
3727   }
3728
3729   if (source->internal && source->sent_bye) {
3730     GST_DEBUG ("removing internal source that has sent BYE %08x", source->ssrc);
3731     remove = TRUE;
3732   }
3733
3734   /* sources that were inactive for more than 5 times the deterministic reporting
3735    * interval get timed out. the min timeout is 5 seconds. */
3736   /* mind old time that might pre-date last time going to PLAYING */
3737   btime = MAX (source->last_activity, sess->start_time);
3738   if (data->current_time > btime) {
3739     interval = MAX (binterval * 5, 5 * GST_SECOND);
3740     if (data->current_time - btime > interval) {
3741       GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
3742           source->ssrc, GST_TIME_ARGS (btime));
3743       if (source->internal) {
3744         /* this is an internal source that is not using our suggested ssrc.
3745          * since there must be another source using this ssrc, we can remove
3746          * this one instead of making it a receiver forever */
3747         if (source->ssrc != sess->suggested_ssrc) {
3748           rtp_source_mark_bye (source, "timed out");
3749           /* do not schedule bye here, since we are inside the RTCP timeout
3750            * processing and scheduling bye will interfere with SR/RR sending */
3751         }
3752       } else {
3753         remove = TRUE;
3754       }
3755     }
3756   }
3757
3758   /* senders that did not send for a long time become a receiver, this also
3759    * holds for our own sources. */
3760   if (is_sender) {
3761     /* mind old time that might pre-date last time going to PLAYING */
3762     btime = MAX (source->last_rtp_activity, sess->start_time);
3763     if (data->current_time > btime) {
3764       interval = MAX (binterval * 2, 5 * GST_SECOND);
3765       if (data->current_time - btime > interval) {
3766         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
3767             GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
3768         sendertimeout = TRUE;
3769       }
3770     }
3771   }
3772
3773   if (remove) {
3774     sess->total_sources--;
3775     if (is_sender) {
3776       sess->stats.sender_sources--;
3777       if (source->internal)
3778         sess->stats.internal_sender_sources--;
3779     }
3780     if (is_active)
3781       sess->stats.active_sources--;
3782
3783     if (source->internal)
3784       sess->stats.internal_sources--;
3785
3786     if (byetimeout)
3787       on_bye_timeout (sess, source);
3788     else
3789       on_timeout (sess, source);
3790   } else {
3791     if (sendertimeout) {
3792       source->is_sender = FALSE;
3793       sess->stats.sender_sources--;
3794       if (source->internal)
3795         sess->stats.internal_sender_sources--;
3796
3797       on_sender_timeout (sess, source);
3798     }
3799     /* count how many source to report in this generation */
3800     if (((gint16) (source->generation - sess->generation)) <= 0)
3801       data->num_to_report++;
3802   }
3803   source->closing = remove;
3804 }
3805
3806 static void
3807 session_sdes (RTPSession * sess, ReportData * data)
3808 {
3809   GstRTCPPacket *packet = &data->packet;
3810   const GstStructure *sdes;
3811   gint i, n_fields;
3812   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3813
3814   /* add SDES packet */
3815   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SDES, packet);
3816
3817   gst_rtcp_packet_sdes_add_item (packet, data->source->ssrc);
3818
3819   sdes = rtp_source_get_sdes_struct (data->source);
3820
3821   /* add all fields in the structure, the order is not important. */
3822   n_fields = gst_structure_n_fields (sdes);
3823   for (i = 0; i < n_fields; ++i) {
3824     const gchar *field;
3825     const gchar *value;
3826     GstRTCPSDESType type;
3827
3828     field = gst_structure_nth_field_name (sdes, i);
3829     if (field == NULL)
3830       continue;
3831     value = gst_structure_get_string (sdes, field);
3832     if (value == NULL)
3833       continue;
3834     type = gst_rtcp_sdes_name_to_type (field);
3835
3836     /* Early packets are minimal and only include the CNAME */
3837     if (data->is_early && type != GST_RTCP_SDES_CNAME)
3838       continue;
3839
3840     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
3841       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
3842           (const guint8 *) value);
3843     } else if (type == GST_RTCP_SDES_PRIV) {
3844       gsize prefix_len;
3845       gsize value_len;
3846       gsize data_len;
3847       guint8 data[256];
3848
3849       /* don't accept entries that are too big */
3850       prefix_len = strlen (field);
3851       if (prefix_len > 255)
3852         continue;
3853       value_len = strlen (value);
3854       if (value_len > 255)
3855         continue;
3856       data_len = 1 + prefix_len + value_len;
3857       if (data_len > 255)
3858         continue;
3859
3860       data[0] = prefix_len;
3861       memcpy (&data[1], field, prefix_len);
3862       memcpy (&data[1 + prefix_len], value, value_len);
3863
3864       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
3865     }
3866   }
3867
3868   data->has_sdes = TRUE;
3869 }
3870
3871 /* schedule a BYE packet */
3872 static void
3873 make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data)
3874 {
3875   GstRTCPPacket *packet = &data->packet;
3876   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3877
3878   /* add SDES */
3879   session_sdes (sess, data);
3880   /* add a BYE packet */
3881   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_BYE, packet);
3882   gst_rtcp_packet_bye_add_ssrc (packet, source->ssrc);
3883   if (source->bye_reason)
3884     gst_rtcp_packet_bye_set_reason (packet, source->bye_reason);
3885
3886   /* we have a BYE packet now */
3887   source->sent_bye = TRUE;
3888 }
3889
3890 static gboolean
3891 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
3892 {
3893   GstClockTime new_send_time;
3894   GstClockTime interval;
3895   RTPSessionStats *stats;
3896
3897   if (sess->scheduled_bye)
3898     stats = &sess->bye_stats;
3899   else
3900     stats = &sess->stats;
3901
3902   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
3903     data->is_early = TRUE;
3904   else
3905     data->is_early = FALSE;
3906
3907   if (data->is_early && sess->next_early_rtcp_time <= current_time) {
3908     GST_DEBUG ("early feedback %" GST_TIME_FORMAT " <= now %"
3909         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time),
3910         GST_TIME_ARGS (current_time));
3911   } else if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
3912       sess->next_rtcp_check_time > current_time) {
3913     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
3914         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
3915         GST_TIME_ARGS (current_time));
3916     return FALSE;
3917   }
3918
3919   /* take interval and add jitter */
3920   interval = data->interval;
3921   if (interval != GST_CLOCK_TIME_NONE)
3922     interval = rtp_stats_add_rtcp_jitter (stats, interval);
3923
3924   if (sess->last_rtcp_check_time != GST_CLOCK_TIME_NONE) {
3925     /* perform forward reconsideration */
3926     if (interval != GST_CLOCK_TIME_NONE) {
3927       GstClockTime elapsed;
3928
3929       /* get elapsed time since we last reported */
3930       elapsed = current_time - sess->last_rtcp_check_time;
3931
3932       GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
3933           GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
3934       new_send_time = interval + sess->last_rtcp_check_time;
3935     } else {
3936       new_send_time = sess->last_rtcp_check_time;
3937     }
3938   } else {
3939     /* If this is the first RTCP packet, we can reconsider anything based
3940      * on the last RTCP send time because there was none.
3941      */
3942     g_warn_if_fail (!data->is_early);
3943     data->is_early = FALSE;
3944     new_send_time = current_time;
3945   }
3946
3947   if (!data->is_early) {
3948     /* check if reconsideration */
3949     if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
3950       GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
3951           GST_TIME_ARGS (new_send_time));
3952       /* store new check time */
3953       sess->next_rtcp_check_time = new_send_time;
3954       sess->last_rtcp_interval = interval;
3955       return FALSE;
3956     }
3957
3958     sess->last_rtcp_interval = interval;
3959     if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF
3960             || sess->rtp_profile == GST_RTP_PROFILE_SAVPF)
3961         && interval != GST_CLOCK_TIME_NONE) {
3962       /* Apply the rules from RFC 4585 section 3.5.3 */
3963       if (stats->min_interval != 0 && !sess->first_rtcp) {
3964         GstClockTime T_rr_current_interval =
3965             g_random_double_range (0.5, 1.5) * stats->min_interval * GST_SECOND;
3966
3967         if (T_rr_current_interval > interval) {
3968           GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT
3969               " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval),
3970               GST_TIME_ARGS (interval));
3971           interval = T_rr_current_interval;
3972         }
3973       }
3974     }
3975     sess->next_rtcp_check_time = current_time + interval;
3976   }
3977
3978
3979   GST_DEBUG ("can send RTCP now, next %" GST_TIME_FORMAT,
3980       GST_TIME_ARGS (sess->next_rtcp_check_time));
3981
3982   return TRUE;
3983 }
3984
3985 static void
3986 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
3987 {
3988   g_hash_table_insert (hash_table, key, g_object_ref (source));
3989 }
3990
3991 static gboolean
3992 remove_closing_sources (const gchar * key, RTPSource * source,
3993     ReportData * data)
3994 {
3995   if (source->closing)
3996     return TRUE;
3997
3998   if (source->send_fir)
3999     data->have_fir = TRUE;
4000   if (source->send_pli)
4001     data->have_pli = TRUE;
4002   if (source->send_nack)
4003     data->have_nack = TRUE;
4004
4005   return FALSE;
4006 }
4007
4008 static void
4009 generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
4010 {
4011   RTPSession *sess = data->sess;
4012   gboolean is_bye = FALSE;
4013   ReportOutput *output;
4014
4015   /* only generate RTCP for active internal sources */
4016   if (!source->internal || source->sent_bye)
4017     return;
4018
4019   /* ignore other sources when we do the timeout after a scheduled BYE */
4020   if (sess->scheduled_bye && !source->marked_bye)
4021     return;
4022
4023   /* skip if RTCP is disabled */
4024   if (source->disable_rtcp) {
4025     GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
4026     return;
4027   }
4028
4029   data->source = source;
4030
4031   /* open packet */
4032   session_start_rtcp (sess, data);
4033
4034   if (source->marked_bye) {
4035     /* send BYE */
4036     make_source_bye (sess, source, data);
4037     is_bye = TRUE;
4038   } else if (!data->is_early) {
4039     /* loop over all known sources and add report blocks. If we are early, we
4040      * just make a minimal RTCP packet and skip this step */
4041     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4042         (GHFunc) session_report_blocks, data);
4043   }
4044   if (!data->has_sdes && (!data->is_early || !sess->reduced_size_rtcp))
4045     session_sdes (sess, data);
4046
4047   if (data->have_fir)
4048     session_fir (sess, data);
4049
4050   if (data->have_pli)
4051     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4052         (GHFunc) session_pli, data);
4053
4054   if (data->have_nack)
4055     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4056         (GHFunc) session_nack, data);
4057
4058   gst_rtcp_buffer_unmap (&data->rtcpbuf);
4059
4060   output = g_slice_new (ReportOutput);
4061   output->source = g_object_ref (source);
4062   output->is_bye = is_bye;
4063   output->buffer = data->rtcp;
4064   /* queue the RTCP packet to push later */
4065   g_queue_push_tail (&data->output, output);
4066 }
4067
4068 static void
4069 update_generation (const gchar * key, RTPSource * source, ReportData * data)
4070 {
4071   RTPSession *sess = data->sess;
4072
4073   if (g_hash_table_size (source->reported_in_sr_of) >=
4074       sess->stats.internal_sources) {
4075     /* source is reported, move to next generation */
4076     source->generation = sess->generation + 1;
4077     g_hash_table_remove_all (source->reported_in_sr_of);
4078
4079     GST_LOG ("reported source %x, new generation: %d", source->ssrc,
4080         source->generation);
4081
4082     /* if we reported all sources in this generation, move to next */
4083     if (--data->num_to_report == 0) {
4084       sess->generation++;
4085       GST_DEBUG ("all reported, generation now %u", sess->generation);
4086     }
4087   }
4088 }
4089
4090 static void
4091 schedule_remaining_nacks (const gchar * key, RTPSource * source,
4092     ReportData * data)
4093 {
4094   RTPSession *sess = data->sess;
4095   GstClockTime *nack_deadlines;
4096   GstClockTime deadline;
4097   guint n_nacks;
4098
4099   if (!source->send_nack)
4100     return;
4101
4102   /* the scheduling is entirely based on available bandwidth, just take the
4103    * biggest seqnum, which will have the largest deadline to request early
4104    * RTCP. */
4105   nack_deadlines = rtp_source_get_nack_deadlines (source, &n_nacks);
4106   deadline = nack_deadlines[n_nacks - 1];
4107   RTP_SESSION_UNLOCK (sess);
4108   rtp_session_send_rtcp_with_deadline (sess, deadline);
4109   RTP_SESSION_LOCK (sess);
4110 }
4111
4112 static gboolean
4113 rtp_session_are_all_sources_bye (RTPSession * sess)
4114 {
4115   GHashTableIter iter;
4116   RTPSource *src;
4117
4118   RTP_SESSION_LOCK (sess);
4119   g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
4120   while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
4121     if (src->internal && !src->sent_bye) {
4122       RTP_SESSION_UNLOCK (sess);
4123       return FALSE;
4124     }
4125   }
4126   RTP_SESSION_UNLOCK (sess);
4127
4128   return TRUE;
4129 }
4130
4131 /**
4132  * rtp_session_on_timeout:
4133  * @sess: an #RTPSession
4134  * @current_time: the current system time
4135  * @ntpnstime: the current NTP time in nanoseconds
4136  * @running_time: the current running_time of the pipeline
4137  *
4138  * Perform maintenance actions after the timeout obtained with
4139  * rtp_session_next_timeout() expired.
4140  *
4141  * This function will perform timeouts of receivers and senders, send a BYE
4142  * packet or generate RTCP packets with current session stats.
4143  *
4144  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
4145  * times, for each packet that should be processed.
4146  *
4147  * Returns: a #GstFlowReturn.
4148  */
4149 GstFlowReturn
4150 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
4151     guint64 ntpnstime, GstClockTime running_time)
4152 {
4153   GstFlowReturn result = GST_FLOW_OK;
4154   ReportData data = { GST_RTCP_BUFFER_INIT };
4155   GHashTable *table_copy;
4156   ReportOutput *output;
4157   gboolean all_empty = FALSE;
4158
4159   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
4160
4161   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT
4162       ", running-time %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4163       GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (running_time));
4164
4165   data.sess = sess;
4166   data.current_time = current_time;
4167   data.ntpnstime = ntpnstime;
4168   data.running_time = running_time;
4169   data.num_to_report = 0;
4170   data.may_suppress = FALSE;
4171   data.nacked_seqnums = 0;
4172   g_queue_init (&data.output);
4173
4174   RTP_SESSION_LOCK (sess);
4175   /* get a new interval, we need this for various cleanups etc */
4176   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
4177
4178   GST_DEBUG ("interval %" GST_TIME_FORMAT, GST_TIME_ARGS (data.interval));
4179
4180   /* we need an internal source now */
4181   if (sess->stats.internal_sources == 0) {
4182     RTPSource *source;
4183     gboolean created;
4184
4185     source = obtain_internal_source (sess, sess->suggested_ssrc, &created,
4186         current_time);
4187     sess->internal_ssrc_set = TRUE;
4188
4189     if (created)
4190       on_new_sender_ssrc (sess, source);
4191
4192     g_object_unref (source);
4193   }
4194
4195   sess->conflicting_addresses =
4196       timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
4197
4198   /* Make a local copy of the hashtable. We need to do this because the
4199    * cleanup stage below releases the session lock. */
4200   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
4201       (GDestroyNotify) g_object_unref);
4202   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4203       (GHFunc) clone_ssrcs_hashtable, table_copy);
4204
4205   /* Clean up the session, mark the source for removing, this might release the
4206    * session lock. */
4207   g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
4208   g_hash_table_destroy (table_copy);
4209
4210   /* Now remove the marked sources */
4211   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
4212       (GHRFunc) remove_closing_sources, &data);
4213
4214   /* update point-to-point status */
4215   session_update_ptp (sess);
4216
4217   /* see if we need to generate SR or RR packets */
4218   if (!is_rtcp_time (sess, current_time, &data))
4219     goto done;
4220
4221   /* check if all the buffers are empty afer generation */
4222   all_empty = TRUE;
4223
4224   GST_DEBUG
4225       ("doing RTCP generation %u for %u sources, early %d, may suppress %d",
4226       sess->generation, data.num_to_report, data.is_early, data.may_suppress);
4227
4228   /* generate RTCP for all internal sources */
4229   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4230       (GHFunc) generate_rtcp, &data);
4231
4232   /* update the generation for all the sources that have been reported */
4233   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4234       (GHFunc) update_generation, &data);
4235
4236   /* we keep track of the last report time in order to timeout inactive
4237    * receivers or senders */
4238   if (!data.is_early) {
4239     GST_DEBUG ("Time since last regular RTCP: %" GST_TIME_FORMAT " - %"
4240         GST_TIME_FORMAT " = %" GST_TIME_FORMAT,
4241         GST_TIME_ARGS (data.current_time),
4242         GST_TIME_ARGS (sess->last_rtcp_send_time),
4243         GST_TIME_ARGS (data.current_time - sess->last_rtcp_send_time));
4244     sess->last_rtcp_send_time = data.current_time;
4245   }
4246
4247   GST_DEBUG ("Time since last RTCP: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT
4248       " = %" GST_TIME_FORMAT, GST_TIME_ARGS (data.current_time),
4249       GST_TIME_ARGS (sess->last_rtcp_check_time),
4250       GST_TIME_ARGS (data.current_time - sess->last_rtcp_check_time));
4251   sess->last_rtcp_check_time = data.current_time;
4252   sess->first_rtcp = FALSE;
4253   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
4254   sess->scheduled_bye = FALSE;
4255
4256 done:
4257   RTP_SESSION_UNLOCK (sess);
4258
4259   /* notify about updated statistics */
4260   g_object_notify (G_OBJECT (sess), "stats");
4261
4262   /* push out the RTCP packets */
4263   while ((output = g_queue_pop_head (&data.output))) {
4264     gboolean do_not_suppress, empty_buffer;
4265     GstBuffer *buffer = output->buffer;
4266     RTPSource *source = output->source;
4267
4268     /* Give the user a change to add its own packet */
4269     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
4270         buffer, data.is_early, &do_not_suppress);
4271
4272     empty_buffer = gst_buffer_get_size (buffer) == 0;
4273
4274     if (!empty_buffer)
4275       all_empty = FALSE;
4276
4277     if (sess->callbacks.send_rtcp &&
4278         !empty_buffer && (do_not_suppress || !data.may_suppress)) {
4279       guint packet_size;
4280
4281       packet_size = gst_buffer_get_size (buffer) + sess->header_len;
4282
4283       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
4284       GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
4285           sess->stats.avg_rtcp_packet_size, packet_size);
4286       result =
4287           sess->callbacks.send_rtcp (sess, source, buffer,
4288           rtp_session_are_all_sources_bye (sess), sess->send_rtcp_user_data);
4289
4290       RTP_SESSION_LOCK (sess);
4291       sess->stats.nacks_sent += data.nacked_seqnums;
4292       on_sender_ssrc_active (sess, source);
4293       RTP_SESSION_UNLOCK (sess);
4294     } else {
4295       GST_DEBUG ("freeing packet callback: %p"
4296           " empty_buffer: %d, "
4297           " do_not_suppress: %d may_suppress: %d", sess->callbacks.send_rtcp,
4298           empty_buffer, do_not_suppress, data.may_suppress);
4299       if (!empty_buffer) {
4300         RTP_SESSION_LOCK (sess);
4301         sess->stats.nacks_dropped += data.nacked_seqnums;
4302         RTP_SESSION_UNLOCK (sess);
4303       }
4304       gst_buffer_unref (buffer);
4305     }
4306     g_object_unref (source);
4307     g_slice_free (ReportOutput, output);
4308   }
4309
4310   if (all_empty)
4311     GST_ERROR ("generated empty RTCP messages for all the sources");
4312
4313   /* schedule remaining nacks */
4314   RTP_SESSION_LOCK (sess);
4315   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
4316       (GHFunc) schedule_remaining_nacks, &data);
4317   RTP_SESSION_UNLOCK (sess);
4318
4319   return result;
4320 }
4321
4322 /**
4323  * rtp_session_request_early_rtcp:
4324  * @sess: an #RTPSession
4325  * @current_time: the current system time
4326  * @max_delay: maximum delay
4327  *
4328  * Request transmission of early RTCP
4329  *
4330  * Returns: %TRUE if the related RTCP can be scheduled.
4331  */
4332 gboolean
4333 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
4334     GstClockTime max_delay)
4335 {
4336   GstClockTime T_dither_max, T_rr, offset = 0;
4337   gboolean ret;
4338   gboolean allow_early;
4339
4340   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
4341
4342   RTP_SESSION_LOCK (sess);
4343
4344   /* We assume a feedback profile if something is requesting RTCP
4345    * to be sent */
4346   sess->rtp_profile = GST_RTP_PROFILE_AVPF;
4347
4348   /* Check if already requested */
4349   /*  RFC 4585 section 3.5.2 step 2 */
4350   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
4351     GST_LOG_OBJECT (sess, "already have next early rtcp time");
4352     ret = (current_time + max_delay > sess->next_early_rtcp_time);
4353     goto end;
4354   }
4355
4356   if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) {
4357     GST_LOG_OBJECT (sess, "no next RTCP check time");
4358     ret = FALSE;
4359     goto end;
4360   }
4361
4362   /* RFC 4585 section 3.5.3 step 1
4363    * If no regular RTCP packet has been sent before, then a regular
4364    * RTCP packet has to be scheduled first and FB messages might be
4365    * included there
4366    */
4367   if (!GST_CLOCK_TIME_IS_VALID (sess->last_rtcp_send_time)) {
4368     GST_LOG_OBJECT (sess, "no RTCP sent yet");
4369
4370     if (current_time + max_delay > sess->next_rtcp_check_time) {
4371       GST_LOG_OBJECT (sess,
4372           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
4373           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4374           GST_TIME_ARGS (max_delay),
4375           GST_TIME_ARGS (sess->next_rtcp_check_time));
4376       ret = TRUE;
4377     } else {
4378       GST_LOG_OBJECT (sess,
4379           "can't allow early feedback, next scheduled time is too late %"
4380           GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
4381           GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
4382           GST_TIME_ARGS (sess->next_rtcp_check_time));
4383       ret = FALSE;
4384     }
4385     goto end;
4386   }
4387
4388   T_rr = sess->last_rtcp_interval;
4389
4390   /*  RFC 4585 section 3.5.2 step 2b */
4391   /* If the total sources is <=2, then there is only us and one peer */
4392   /* When there is one auxiliary stream the session can still do point
4393    * to point.
4394    */
4395   if (sess->is_doing_ptp) {
4396     T_dither_max = 0;
4397   } else {
4398     /* Divide by 2 because l = 0.5 */
4399     T_dither_max = T_rr;
4400     T_dither_max /= 2;
4401   }
4402
4403   /*  RFC 4585 section 3.5.2 step 3 */
4404   if (current_time + T_dither_max > sess->next_rtcp_check_time) {
4405     GST_LOG_OBJECT (sess,
4406         "don't send because of dither, next scheduled time is too soon %"
4407         GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT,
4408         GST_TIME_ARGS (current_time), GST_TIME_ARGS (T_dither_max),
4409         GST_TIME_ARGS (sess->next_rtcp_check_time));
4410     ret = T_dither_max <= max_delay;
4411     goto end;
4412   }
4413
4414   /*  RFC 4585 section 3.5.2 step 4a and
4415    *  RFC 4585 section 3.5.2 step 6 */
4416   allow_early = FALSE;
4417   if (sess->last_rtcp_check_time == sess->last_rtcp_send_time) {
4418     /* Last time we sent a full RTCP packet, we can now immediately
4419      * send an early one as allow_early was reset to TRUE */
4420     allow_early = TRUE;
4421   } else if (sess->last_rtcp_check_time + T_rr <= current_time + max_delay) {
4422     /* Last packet we sent was an early RTCP packet and more than
4423      * T_rr has passed since then, meaning we would have suppressed
4424      * a regular RTCP packet already and reset allow_early to TRUE */
4425     allow_early = TRUE;
4426
4427     /* We have to offset a bit as T_rr has not passed yet, but will before
4428      * max_delay */
4429     if (sess->last_rtcp_check_time + T_rr > current_time)
4430       offset = (sess->last_rtcp_check_time + T_rr) - current_time;
4431   } else {
4432     GST_DEBUG_OBJECT (sess,
4433         "can't allow early RTCP yet: last regular %" GST_TIME_FORMAT ", %"
4434         GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT " + %"
4435         GST_TIME_FORMAT, GST_TIME_ARGS (sess->last_rtcp_send_time),
4436         GST_TIME_ARGS (sess->last_rtcp_check_time), GST_TIME_ARGS (T_rr),
4437         GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay));
4438   }
4439
4440   if (!allow_early) {
4441     /* Ignore the request a scheduled packet will be in time anyway */
4442     if (current_time + max_delay > sess->next_rtcp_check_time) {
4443       GST_LOG_OBJECT (sess,
4444           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
4445           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
4446           GST_TIME_ARGS (max_delay),
4447           GST_TIME_ARGS (sess->next_rtcp_check_time));
4448       ret = TRUE;
4449     } else {
4450       GST_LOG_OBJECT (sess,
4451           "can't allow early feedback and next scheduled time is too late %"
4452           GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
4453           GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
4454           GST_TIME_ARGS (sess->next_rtcp_check_time));
4455       ret = FALSE;
4456     }
4457     goto end;
4458   }
4459
4460   /*  RFC 4585 section 3.5.2 step 4b */
4461   if (T_dither_max) {
4462     /* Schedule an early transmission later */
4463     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
4464         current_time + offset;
4465   } else {
4466     /* If no dithering, schedule it for NOW */
4467     sess->next_early_rtcp_time = current_time + offset;
4468   }
4469
4470   GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT
4471       ", next regular RTCP time %" GST_TIME_FORMAT,
4472       GST_TIME_ARGS (sess->next_early_rtcp_time),
4473       GST_TIME_ARGS (sess->next_rtcp_check_time));
4474   RTP_SESSION_UNLOCK (sess);
4475
4476   /* notify app of need to send packet early
4477    * and therefore of timeout change */
4478   if (sess->callbacks.reconsider)
4479     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
4480
4481   return TRUE;
4482
4483 end:
4484
4485   RTP_SESSION_UNLOCK (sess);
4486
4487   return ret;
4488 }
4489
4490 static gboolean
4491 rtp_session_send_rtcp_internal (RTPSession * sess, GstClockTime now,
4492     GstClockTime max_delay)
4493 {
4494   /* notify the application that we intend to send early RTCP */
4495   if (sess->callbacks.notify_early_rtcp)
4496     sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data);
4497
4498   return rtp_session_request_early_rtcp (sess, now, max_delay);
4499 }
4500
4501 static gboolean
4502 rtp_session_send_rtcp_with_deadline (RTPSession * sess, GstClockTime deadline)
4503 {
4504   GstClockTime now, max_delay;
4505
4506   if (!sess->callbacks.send_rtcp)
4507     return FALSE;
4508
4509   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
4510
4511   if (deadline < now)
4512     return FALSE;
4513
4514   max_delay = deadline - now;
4515
4516   return rtp_session_send_rtcp_internal (sess, now, max_delay);
4517 }
4518
4519 static gboolean
4520 rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
4521 {
4522   GstClockTime now;
4523
4524   if (!sess->callbacks.send_rtcp)
4525     return FALSE;
4526
4527   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
4528
4529   return rtp_session_send_rtcp_internal (sess, now, max_delay);
4530 }
4531
4532 gboolean
4533 rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
4534     gboolean fir, gint count)
4535 {
4536   RTPSource *src;
4537
4538   RTP_SESSION_LOCK (sess);
4539   src = find_source (sess, ssrc);
4540   if (src == NULL)
4541     goto no_source;
4542
4543   if (fir) {
4544     src->send_pli = FALSE;
4545     src->send_fir = TRUE;
4546
4547     if (count == -1 || count != src->last_fir_count)
4548       src->current_send_fir_seqnum++;
4549     src->last_fir_count = count;
4550   } else if (!src->send_fir) {
4551     src->send_pli = TRUE;
4552   }
4553   RTP_SESSION_UNLOCK (sess);
4554
4555   if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
4556     GST_DEBUG ("FIR/PLI not sent early, sending with next regular RTCP");
4557   }
4558
4559   return TRUE;
4560
4561   /* ERRORS */
4562 no_source:
4563   {
4564     RTP_SESSION_UNLOCK (sess);
4565     return FALSE;
4566   }
4567 }
4568
4569 /**
4570  * rtp_session_request_nack:
4571  * @sess: a #RTPSession
4572  * @ssrc: the SSRC
4573  * @seqnum: the missing seqnum
4574  * @max_delay: max delay to request NACK
4575  *
4576  * Request scheduling of a NACK feedback packet for @seqnum in @ssrc.
4577  *
4578  * Returns: %TRUE if the NACK feedback could be scheduled
4579  */
4580 gboolean
4581 rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
4582     GstClockTime max_delay)
4583 {
4584   RTPSource *source;
4585   GstClockTime now;
4586
4587   if (!sess->callbacks.send_rtcp)
4588     return FALSE;
4589
4590   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
4591
4592   RTP_SESSION_LOCK (sess);
4593   source = find_source (sess, ssrc);
4594   if (source == NULL)
4595     goto no_source;
4596
4597   GST_DEBUG ("request NACK for SSRC %08x, #%u, deadline %" GST_TIME_FORMAT,
4598       ssrc, seqnum, GST_TIME_ARGS (now + max_delay));
4599   rtp_source_register_nack (source, seqnum, now + max_delay);
4600   RTP_SESSION_UNLOCK (sess);
4601
4602   if (!rtp_session_send_rtcp_internal (sess, now, max_delay)) {
4603     GST_DEBUG ("NACK not sent early, sending with next regular RTCP");
4604   }
4605
4606   return TRUE;
4607
4608   /* ERRORS */
4609 no_source:
4610   {
4611     RTP_SESSION_UNLOCK (sess);
4612     return FALSE;
4613   }
4614 }