Fix double semicolons
[platform/upstream/gstreamer.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
21  * with newer GLib versions (>= 2.31.0) */
22 #define GLIB_DISABLE_DEPRECATION_WARNINGS
23
24 #include <string.h>
25
26 #include <gst/rtp/gstrtpbuffer.h>
27 #include <gst/rtp/gstrtcpbuffer.h>
28
29 #include <gst/glib-compat-private.h>
30
31 #include "rtpsession.h"
32
33 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
34 #define GST_CAT_DEFAULT rtp_session_debug
35
36 /* signals and args */
37 enum
38 {
39   SIGNAL_GET_SOURCE_BY_SSRC,
40   SIGNAL_ON_NEW_SSRC,
41   SIGNAL_ON_SSRC_COLLISION,
42   SIGNAL_ON_SSRC_VALIDATED,
43   SIGNAL_ON_SSRC_ACTIVE,
44   SIGNAL_ON_SSRC_SDES,
45   SIGNAL_ON_BYE_SSRC,
46   SIGNAL_ON_BYE_TIMEOUT,
47   SIGNAL_ON_TIMEOUT,
48   SIGNAL_ON_SENDER_TIMEOUT,
49   SIGNAL_ON_SENDING_RTCP,
50   SIGNAL_ON_FEEDBACK_RTCP,
51   SIGNAL_SEND_RTCP,
52   SIGNAL_SEND_RTCP_FULL,
53   SIGNAL_ON_RECEIVING_RTCP,
54   LAST_SIGNAL
55 };
56
57 #define DEFAULT_INTERNAL_SOURCE      NULL
58 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
59 #define DEFAULT_RTCP_FRACTION        (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH)
60 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
61 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
62 #define DEFAULT_RTCP_MTU             1400
63 #define DEFAULT_SDES                 NULL
64 #define DEFAULT_NUM_SOURCES          0
65 #define DEFAULT_NUM_ACTIVE_SOURCES   0
66 #define DEFAULT_SOURCES              NULL
67 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
68 #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
69 #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
70 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
71
72 enum
73 {
74   PROP_0,
75   PROP_INTERNAL_SSRC,
76   PROP_INTERNAL_SOURCE,
77   PROP_BANDWIDTH,
78   PROP_RTCP_FRACTION,
79   PROP_RTCP_RR_BANDWIDTH,
80   PROP_RTCP_RS_BANDWIDTH,
81   PROP_RTCP_MTU,
82   PROP_SDES,
83   PROP_NUM_SOURCES,
84   PROP_NUM_ACTIVE_SOURCES,
85   PROP_SOURCES,
86   PROP_FAVOR_NEW,
87   PROP_RTCP_MIN_INTERVAL,
88   PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
89   PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
90   PROP_PROBATION,
91   PROP_STATS,
92   PROP_LAST
93 };
94
95 /* update average packet size */
96 #define INIT_AVG(avg, val) \
97    (avg) = (val);
98 #define UPDATE_AVG(avg, val)            \
99   if ((avg) == 0)                       \
100    (avg) = (val);                       \
101   else                                  \
102    (avg) = ((val) + (15 * (avg))) >> 4;
103
104
105 /* GObject vmethods */
106 static void rtp_session_finalize (GObject * object);
107 static void rtp_session_set_property (GObject * object, guint prop_id,
108     const GValue * value, GParamSpec * pspec);
109 static void rtp_session_get_property (GObject * object, guint prop_id,
110     GValue * value, GParamSpec * pspec);
111
112 static gboolean rtp_session_send_rtcp (RTPSession * sess,
113     GstClockTime max_delay);
114
115 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
116
117 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
118
119 static guint32 rtp_session_create_new_ssrc (RTPSession * sess);
120 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
121     gboolean * created, RTPPacketInfo * pinfo, gboolean rtp);
122 static RTPSource *obtain_internal_source (RTPSession * sess,
123     guint32 ssrc, gboolean * created, GstClockTime current_time);
124 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
125     GstClockTime current_time);
126 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
127     gboolean deterministic, gboolean first);
128
129 static gboolean
130 accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
131     const GValue * handler_return, gpointer data)
132 {
133   if (g_value_get_boolean (handler_return))
134     g_value_set_boolean (return_accu, TRUE);
135
136   return TRUE;
137 }
138
139 static void
140 rtp_session_class_init (RTPSessionClass * klass)
141 {
142   GObjectClass *gobject_class;
143
144   gobject_class = (GObjectClass *) klass;
145
146   gobject_class->finalize = rtp_session_finalize;
147   gobject_class->set_property = rtp_session_set_property;
148   gobject_class->get_property = rtp_session_get_property;
149
150   /**
151    * RTPSession::get-source-by-ssrc:
152    * @session: the object which received the signal
153    * @ssrc: the SSRC of the RTPSource
154    *
155    * Request the #RTPSource object with SSRC @ssrc in @session.
156    */
157   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
158       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
159       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
160           get_source_by_ssrc), NULL, NULL, g_cclosure_marshal_generic,
161       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
162
163   /**
164    * RTPSession::on-new-ssrc:
165    * @session: the object which received the signal
166    * @src: the new RTPSource
167    *
168    * Notify of a new SSRC that entered @session.
169    */
170   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
171       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
172       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
173       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
174       RTP_TYPE_SOURCE);
175   /**
176    * RTPSession::on-ssrc-collision:
177    * @session: the object which received the signal
178    * @src: the #RTPSource that caused a collision
179    *
180    * Notify when we have an SSRC collision
181    */
182   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
183       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
184       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
185       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
186       RTP_TYPE_SOURCE);
187   /**
188    * RTPSession::on-ssrc-validated:
189    * @session: the object which received the signal
190    * @src: the new validated RTPSource
191    *
192    * Notify of a new SSRC that became validated.
193    */
194   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
195       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
196       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
197       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
198       RTP_TYPE_SOURCE);
199   /**
200    * RTPSession::on-ssrc-active:
201    * @session: the object which received the signal
202    * @src: the active RTPSource
203    *
204    * Notify of a SSRC that is active, i.e., sending RTCP.
205    */
206   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
207       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
208       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
209       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
210       RTP_TYPE_SOURCE);
211   /**
212    * RTPSession::on-ssrc-sdes:
213    * @session: the object which received the signal
214    * @src: the RTPSource
215    *
216    * Notify that a new SDES was received for SSRC.
217    */
218   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
219       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
220       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
221       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
222       RTP_TYPE_SOURCE);
223   /**
224    * RTPSession::on-bye-ssrc:
225    * @session: the object which received the signal
226    * @src: the RTPSource that went away
227    *
228    * Notify of an SSRC that became inactive because of a BYE packet.
229    */
230   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
231       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
232       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
233       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
234       RTP_TYPE_SOURCE);
235   /**
236    * RTPSession::on-bye-timeout:
237    * @session: the object which received the signal
238    * @src: the RTPSource that timed out
239    *
240    * Notify of an SSRC that has timed out because of BYE
241    */
242   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
243       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
244       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
245       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
246       RTP_TYPE_SOURCE);
247   /**
248    * RTPSession::on-timeout:
249    * @session: the object which received the signal
250    * @src: the RTPSource that timed out
251    *
252    * Notify of an SSRC that has timed out
253    */
254   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
255       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
256       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
257       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
258       RTP_TYPE_SOURCE);
259   /**
260    * RTPSession::on-sender-timeout:
261    * @session: the object which received the signal
262    * @src: the RTPSource that timed out
263    *
264    * Notify of an SSRC that was a sender but timed out and became a receiver.
265    */
266   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
267       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
268       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
269       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
270       RTP_TYPE_SOURCE);
271
272   /**
273    * RTPSession::on-sending-rtcp
274    * @session: the object which received the signal
275    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
276    * @early: %TRUE if the packet is early, %FALSE if it is regular
277    *
278    * This signal is emitted before sending an RTCP packet, it can be used
279    * to add extra RTCP Packets.
280    *
281    * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
282    * if suppressing it is acceptable
283    */
284   rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
285       g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
286       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
287       accumulate_trues, NULL, g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 2,
288       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE, G_TYPE_BOOLEAN);
289
290   /**
291    * RTPSession::on-feedback-rtcp:
292    * @session: the object which received the signal
293    * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
294    *  %GST_RTCP_TYPE_RTPFB
295    * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
296    * @sender_ssrc: The SSRC of the sender
297    * @media_ssrc: The SSRC of the media this refers to
298    * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
299    * there was no FCI
300    *
301    * Notify that a RTCP feedback packet has been received
302    */
303   rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
304       g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
305       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
306       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 5, G_TYPE_UINT,
307       G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, GST_TYPE_BUFFER);
308
309   /**
310    * RTPSession::send-rtcp:
311    * @session: the object which received the signal
312    * @max_delay: The maximum delay after which the feedback will not be useful
313    *  anymore
314    *
315    * Requests that the #RTPSession initiate a new RTCP packet as soon as
316    * possible within the requested delay.
317    */
318   rtp_session_signals[SIGNAL_SEND_RTCP] =
319       g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
320       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
321       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
322       g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT64);
323
324   /**
325    * RTPSession::send-rtcp-full:
326    * @session: the object which received the signal
327    * @max_delay: The maximum delay after which the feedback will not be useful
328    *  anymore
329    *
330    * Requests that the #RTPSession initiate a new RTCP packet as soon as
331    * possible within the requested delay.
332    *
333    * Returns: TRUE if the new RTCP packet could be scheduled within the
334    * requested delay, FALSE otherwise.
335    *
336    * Since: 1.6
337    */
338   rtp_session_signals[SIGNAL_SEND_RTCP_FULL] =
339       g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass),
340       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
341       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
342       g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
343
344   /**
345    * RTPSession::on-receiving-rtcp
346    * @session: the object which received the signal
347    * @buffer: the #GstBuffer containing the RTCP packet that was received
348    *
349    * This signal is emitted when receiving an RTCP packet before it is handled
350    * by the session. It can be used to extract custom information from RTCP packets.
351    *
352    * Since: 1.6
353    */
354   rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] =
355       g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass),
356       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp),
357       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1,
358       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
359
360   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
361       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
362           "The internal SSRC used for the session (deprecated)",
363           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
364
365   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
366       g_param_spec_object ("internal-source", "Internal Source",
367           "The internal source element of the session (deprecated)",
368           RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
369
370   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
371       g_param_spec_double ("bandwidth", "Bandwidth",
372           "The bandwidth of the session (0 for auto-discover)",
373           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
374           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375
376   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
377       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
378           "The fraction of the bandwidth used for RTCP (or as a real fraction of the RTP bandwidth if < 1)",
379           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
380           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
381
382   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
383       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
384           "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
385           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
386           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
387
388   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
389       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
390           "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
391           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
392           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
393
394   g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
395       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
396           "The maximum size of the RTCP packets",
397           16, G_MAXINT16, DEFAULT_RTCP_MTU,
398           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399
400   g_object_class_install_property (gobject_class, PROP_SDES,
401       g_param_spec_boxed ("sdes", "SDES",
402           "The SDES items of this session",
403           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404
405   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
406       g_param_spec_uint ("num-sources", "Num Sources",
407           "The number of sources in the session", 0, G_MAXUINT,
408           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
409
410   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
411       g_param_spec_uint ("num-active-sources", "Num Active Sources",
412           "The number of active sources in the session", 0, G_MAXUINT,
413           DEFAULT_NUM_ACTIVE_SOURCES,
414           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
415   /**
416    * RTPSource::sources
417    *
418    * Get a GValue Array of all sources in the session.
419    *
420    * <example>
421    * <title>Getting the #RTPSources of a session
422    * <programlisting>
423    * {
424    *   GValueArray *arr;
425    *   GValue *val;
426    *   guint i;
427    *
428    *   g_object_get (sess, "sources", &arr, NULL);
429    *
430    *   for (i = 0; i < arr->n_values; i++) {
431    *     RTPSource *source;
432    *
433    *     val = g_value_array_get_nth (arr, i);
434    *     source = g_value_get_object (val);
435    *   }
436    *   g_value_array_free (arr);
437    * }
438    * </programlisting>
439    * </example>
440    */
441   g_object_class_install_property (gobject_class, PROP_SOURCES,
442       g_param_spec_boxed ("sources", "Sources",
443           "An array of all known sources in the session",
444           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
445
446   g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
447       g_param_spec_boolean ("favor-new", "Favor new sources",
448           "Resolve SSRC conflict in favor of new sources", FALSE,
449           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
450
451   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
452       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
453           "Minimum interval between Regular RTCP packet (in ns)",
454           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
455           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
456
457   g_object_class_install_property (gobject_class,
458       PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
459       g_param_spec_uint64 ("rtcp-feedback-retention-window",
460           "RTCP Feedback retention window",
461           "Duration during which RTCP Feedback packets are retained (in ns)",
462           0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
463           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
464
465   g_object_class_install_property (gobject_class,
466       PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
467       g_param_spec_uint ("rtcp-immediate-feedback-threshold",
468           "RTCP Immediate Feedback threshold",
469           "The maximum number of members of a RTP session for which immediate"
470           " feedback is used (DEPRECATED: has no effect and is not needed)",
471           0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
472           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
473
474   g_object_class_install_property (gobject_class, PROP_PROBATION,
475       g_param_spec_uint ("probation", "Number of probations",
476           "Consecutive packet sequence numbers to accept the source",
477           0, G_MAXUINT, DEFAULT_PROBATION,
478           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
479
480   /**
481    * RTPSession::stats:
482    *
483    * Various session statistics. This property returns a GstStructure
484    * with name application/x-rtp-session-stats with the following fields:
485    *
486    *  "rtx-drop-count"  G_TYPE_UINT   The number of retransmission events
487    *      dropped (due to bandwidth constraints)
488    *  "sent-nack-count" G_TYPE_UINT   Number of NACKs sent
489    *  "recv-nack-count" G_TYPE_UINT   Number of NACKs received
490    *
491    * Since: 1.4
492    */
493   g_object_class_install_property (gobject_class, PROP_STATS,
494       g_param_spec_boxed ("stats", "Statistics",
495           "Various statistics", GST_TYPE_STRUCTURE,
496           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
497
498   klass->get_source_by_ssrc =
499       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
500   klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
501
502   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
503 }
504
505 static void
506 rtp_session_init (RTPSession * sess)
507 {
508   gint i;
509   gchar *str;
510
511   g_mutex_init (&sess->lock);
512   sess->key = g_random_int ();
513   sess->mask_idx = 0;
514   sess->mask = 0;
515
516   /* TODO: We currently only use the first hash table but this is the
517    * beginning of an implementation for RFC2762
518    for (i = 0; i < 32; i++) {
519    */
520   for (i = 0; i < 1; i++) {
521     sess->ssrcs[i] =
522         g_hash_table_new_full (NULL, NULL, NULL,
523         (GDestroyNotify) g_object_unref);
524   }
525
526   rtp_stats_init_defaults (&sess->stats);
527   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
528   rtp_stats_set_min_interval (&sess->stats,
529       (gdouble) DEFAULT_RTCP_MIN_INTERVAL / GST_SECOND);
530
531   sess->recalc_bandwidth = TRUE;
532   sess->bandwidth = DEFAULT_BANDWIDTH;
533   sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
534   sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
535   sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
536
537   /* default UDP header length */
538   sess->header_len = 28;
539   sess->mtu = DEFAULT_RTCP_MTU;
540
541   sess->probation = DEFAULT_PROBATION;
542
543   /* some default SDES entries */
544   sess->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
545
546   /* we do not want to leak details like the username or hostname here */
547   str = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
548   gst_structure_set (sess->sdes, "cname", G_TYPE_STRING, str, NULL);
549   g_free (str);
550
551 #if 0
552   /* we do not want to leak the user's real name here */
553   str = g_strdup_printf ("Anon%u", g_random_int ());
554   gst_structure_set (sdes, "name", G_TYPE_STRING, str, NULL);
555   g_free (str);
556 #endif
557
558   gst_structure_set (sess->sdes, "tool", G_TYPE_STRING, "GStreamer", NULL);
559
560   /* this is the SSRC we suggest */
561   sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
562
563   sess->first_rtcp = TRUE;
564   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
565   sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
566
567   sess->allow_early = TRUE;
568   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
569   sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
570   sess->rtcp_immediate_feedback_threshold =
571       DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
572
573   sess->last_keyframe_request = GST_CLOCK_TIME_NONE;
574
575   sess->is_doing_ptp = TRUE;
576 }
577
578 static void
579 rtp_session_finalize (GObject * object)
580 {
581   RTPSession *sess;
582   gint i;
583
584   sess = RTP_SESSION_CAST (object);
585
586   gst_structure_free (sess->sdes);
587
588   g_list_free_full (sess->conflicting_addresses,
589       (GDestroyNotify) rtp_conflicting_address_free);
590
591   /* TODO: Change this again when implementing RFC 2762
592    * for (i = 0; i < 32; i++)
593    */
594   for (i = 0; i < 1; i++)
595     g_hash_table_destroy (sess->ssrcs[i]);
596
597   g_mutex_clear (&sess->lock);
598
599   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
600 }
601
602 static void
603 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
604 {
605   GValue value = { 0 };
606
607   g_value_init (&value, RTP_TYPE_SOURCE);
608   g_value_take_object (&value, source);
609   /* copies the value */
610   g_value_array_append (arr, &value);
611 }
612
613 static GValueArray *
614 rtp_session_create_sources (RTPSession * sess)
615 {
616   GValueArray *res;
617   guint size;
618
619   RTP_SESSION_LOCK (sess);
620   /* get number of elements in the table */
621   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
622   /* create the result value array */
623   res = g_value_array_new (size);
624
625   /* and copy all values into the array */
626   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
627   RTP_SESSION_UNLOCK (sess);
628
629   return res;
630 }
631
632 static GstStructure *
633 rtp_session_create_stats (RTPSession * sess)
634 {
635   GstStructure *s;
636
637   s = gst_structure_new ("application/x-rtp-session-stats",
638       "rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped,
639       "sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent,
640       "recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL);
641
642   return s;
643 }
644
645 static void
646 rtp_session_set_property (GObject * object, guint prop_id,
647     const GValue * value, GParamSpec * pspec)
648 {
649   RTPSession *sess;
650
651   sess = RTP_SESSION (object);
652
653   switch (prop_id) {
654     case PROP_INTERNAL_SSRC:
655       RTP_SESSION_LOCK (sess);
656       sess->suggested_ssrc = g_value_get_uint (value);
657       RTP_SESSION_UNLOCK (sess);
658       if (sess->callbacks.reconfigure)
659         sess->callbacks.reconfigure (sess, sess->reconfigure_user_data);
660       break;
661     case PROP_BANDWIDTH:
662       RTP_SESSION_LOCK (sess);
663       sess->bandwidth = g_value_get_double (value);
664       sess->recalc_bandwidth = TRUE;
665       RTP_SESSION_UNLOCK (sess);
666       break;
667     case PROP_RTCP_FRACTION:
668       RTP_SESSION_LOCK (sess);
669       sess->rtcp_bandwidth = g_value_get_double (value);
670       sess->recalc_bandwidth = TRUE;
671       RTP_SESSION_UNLOCK (sess);
672       break;
673     case PROP_RTCP_RR_BANDWIDTH:
674       RTP_SESSION_LOCK (sess);
675       sess->rtcp_rr_bandwidth = g_value_get_int (value);
676       sess->recalc_bandwidth = TRUE;
677       RTP_SESSION_UNLOCK (sess);
678       break;
679     case PROP_RTCP_RS_BANDWIDTH:
680       RTP_SESSION_LOCK (sess);
681       sess->rtcp_rs_bandwidth = g_value_get_int (value);
682       sess->recalc_bandwidth = TRUE;
683       RTP_SESSION_UNLOCK (sess);
684       break;
685     case PROP_RTCP_MTU:
686       sess->mtu = g_value_get_uint (value);
687       break;
688     case PROP_SDES:
689       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
690       break;
691     case PROP_FAVOR_NEW:
692       sess->favor_new = g_value_get_boolean (value);
693       break;
694     case PROP_RTCP_MIN_INTERVAL:
695       rtp_stats_set_min_interval (&sess->stats,
696           (gdouble) g_value_get_uint64 (value) / GST_SECOND);
697       /* trigger reconsideration */
698       RTP_SESSION_LOCK (sess);
699       sess->next_rtcp_check_time = 0;
700       RTP_SESSION_UNLOCK (sess);
701       if (sess->callbacks.reconsider)
702         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
703       break;
704     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
705       sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
706       break;
707     case PROP_PROBATION:
708       sess->probation = g_value_get_uint (value);
709       break;
710     default:
711       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
712       break;
713   }
714 }
715
716 static void
717 rtp_session_get_property (GObject * object, guint prop_id,
718     GValue * value, GParamSpec * pspec)
719 {
720   RTPSession *sess;
721
722   sess = RTP_SESSION (object);
723
724   switch (prop_id) {
725     case PROP_INTERNAL_SSRC:
726       g_value_set_uint (value, rtp_session_suggest_ssrc (sess));
727       break;
728     case PROP_INTERNAL_SOURCE:
729       /* FIXME, return a random source */
730       g_value_set_object (value, NULL);
731       break;
732     case PROP_BANDWIDTH:
733       g_value_set_double (value, sess->bandwidth);
734       break;
735     case PROP_RTCP_FRACTION:
736       g_value_set_double (value, sess->rtcp_bandwidth);
737       break;
738     case PROP_RTCP_RR_BANDWIDTH:
739       g_value_set_int (value, sess->rtcp_rr_bandwidth);
740       break;
741     case PROP_RTCP_RS_BANDWIDTH:
742       g_value_set_int (value, sess->rtcp_rs_bandwidth);
743       break;
744     case PROP_RTCP_MTU:
745       g_value_set_uint (value, sess->mtu);
746       break;
747     case PROP_SDES:
748       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
749       break;
750     case PROP_NUM_SOURCES:
751       g_value_set_uint (value, rtp_session_get_num_sources (sess));
752       break;
753     case PROP_NUM_ACTIVE_SOURCES:
754       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
755       break;
756     case PROP_SOURCES:
757       g_value_take_boxed (value, rtp_session_create_sources (sess));
758       break;
759     case PROP_FAVOR_NEW:
760       g_value_set_boolean (value, sess->favor_new);
761       break;
762     case PROP_RTCP_MIN_INTERVAL:
763       g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
764       break;
765     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
766       g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
767       break;
768     case PROP_PROBATION:
769       g_value_set_uint (value, sess->probation);
770       break;
771     case PROP_STATS:
772       g_value_take_boxed (value, rtp_session_create_stats (sess));
773       break;
774     default:
775       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
776       break;
777   }
778 }
779
780 static void
781 on_new_ssrc (RTPSession * sess, RTPSource * source)
782 {
783   g_object_ref (source);
784   RTP_SESSION_UNLOCK (sess);
785   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
786   RTP_SESSION_LOCK (sess);
787   g_object_unref (source);
788 }
789
790 static void
791 on_ssrc_collision (RTPSession * sess, RTPSource * source)
792 {
793   g_object_ref (source);
794   RTP_SESSION_UNLOCK (sess);
795   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
796       source);
797   RTP_SESSION_LOCK (sess);
798   g_object_unref (source);
799 }
800
801 static void
802 on_ssrc_validated (RTPSession * sess, RTPSource * source)
803 {
804   g_object_ref (source);
805   RTP_SESSION_UNLOCK (sess);
806   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
807       source);
808   RTP_SESSION_LOCK (sess);
809   g_object_unref (source);
810 }
811
812 static void
813 on_ssrc_active (RTPSession * sess, RTPSource * source)
814 {
815   g_object_ref (source);
816   RTP_SESSION_UNLOCK (sess);
817   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
818   RTP_SESSION_LOCK (sess);
819   g_object_unref (source);
820 }
821
822 static void
823 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
824 {
825   g_object_ref (source);
826   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
827   RTP_SESSION_UNLOCK (sess);
828   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
829   RTP_SESSION_LOCK (sess);
830   g_object_unref (source);
831 }
832
833 static void
834 on_bye_ssrc (RTPSession * sess, RTPSource * source)
835 {
836   g_object_ref (source);
837   RTP_SESSION_UNLOCK (sess);
838   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
839   RTP_SESSION_LOCK (sess);
840   g_object_unref (source);
841 }
842
843 static void
844 on_bye_timeout (RTPSession * sess, RTPSource * source)
845 {
846   g_object_ref (source);
847   RTP_SESSION_UNLOCK (sess);
848   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
849   RTP_SESSION_LOCK (sess);
850   g_object_unref (source);
851 }
852
853 static void
854 on_timeout (RTPSession * sess, RTPSource * source)
855 {
856   g_object_ref (source);
857   RTP_SESSION_UNLOCK (sess);
858   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
859   RTP_SESSION_LOCK (sess);
860   g_object_unref (source);
861 }
862
863 static void
864 on_sender_timeout (RTPSession * sess, RTPSource * source)
865 {
866   g_object_ref (source);
867   RTP_SESSION_UNLOCK (sess);
868   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
869       source);
870   RTP_SESSION_LOCK (sess);
871   g_object_unref (source);
872 }
873
874 /**
875  * rtp_session_new:
876  *
877  * Create a new session object.
878  *
879  * Returns: a new #RTPSession. g_object_unref() after usage.
880  */
881 RTPSession *
882 rtp_session_new (void)
883 {
884   RTPSession *sess;
885
886   sess = g_object_new (RTP_TYPE_SESSION, NULL);
887
888   return sess;
889 }
890
891 /**
892  * rtp_session_set_callbacks:
893  * @sess: an #RTPSession
894  * @callbacks: callbacks to configure
895  * @user_data: user data passed in the callbacks
896  *
897  * Configure a set of callbacks to be notified of actions.
898  */
899 void
900 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
901     gpointer user_data)
902 {
903   g_return_if_fail (RTP_IS_SESSION (sess));
904
905   if (callbacks->process_rtp) {
906     sess->callbacks.process_rtp = callbacks->process_rtp;
907     sess->process_rtp_user_data = user_data;
908   }
909   if (callbacks->send_rtp) {
910     sess->callbacks.send_rtp = callbacks->send_rtp;
911     sess->send_rtp_user_data = user_data;
912   }
913   if (callbacks->send_rtcp) {
914     sess->callbacks.send_rtcp = callbacks->send_rtcp;
915     sess->send_rtcp_user_data = user_data;
916   }
917   if (callbacks->sync_rtcp) {
918     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
919     sess->sync_rtcp_user_data = user_data;
920   }
921   if (callbacks->clock_rate) {
922     sess->callbacks.clock_rate = callbacks->clock_rate;
923     sess->clock_rate_user_data = user_data;
924   }
925   if (callbacks->reconsider) {
926     sess->callbacks.reconsider = callbacks->reconsider;
927     sess->reconsider_user_data = user_data;
928   }
929   if (callbacks->request_key_unit) {
930     sess->callbacks.request_key_unit = callbacks->request_key_unit;
931     sess->request_key_unit_user_data = user_data;
932   }
933   if (callbacks->request_time) {
934     sess->callbacks.request_time = callbacks->request_time;
935     sess->request_time_user_data = user_data;
936   }
937   if (callbacks->notify_nack) {
938     sess->callbacks.notify_nack = callbacks->notify_nack;
939     sess->notify_nack_user_data = user_data;
940   }
941   if (callbacks->reconfigure) {
942     sess->callbacks.reconfigure = callbacks->reconfigure;
943     sess->reconfigure_user_data = user_data;
944   }
945 }
946
947 /**
948  * rtp_session_set_process_rtp_callback:
949  * @sess: an #RTPSession
950  * @callback: callback to set
951  * @user_data: user data passed in the callback
952  *
953  * Configure only the process_rtp callback to be notified of the process_rtp action.
954  */
955 void
956 rtp_session_set_process_rtp_callback (RTPSession * sess,
957     RTPSessionProcessRTP callback, gpointer user_data)
958 {
959   g_return_if_fail (RTP_IS_SESSION (sess));
960
961   sess->callbacks.process_rtp = callback;
962   sess->process_rtp_user_data = user_data;
963 }
964
965 /**
966  * rtp_session_set_send_rtp_callback:
967  * @sess: an #RTPSession
968  * @callback: callback to set
969  * @user_data: user data passed in the callback
970  *
971  * Configure only the send_rtp callback to be notified of the send_rtp action.
972  */
973 void
974 rtp_session_set_send_rtp_callback (RTPSession * sess,
975     RTPSessionSendRTP callback, gpointer user_data)
976 {
977   g_return_if_fail (RTP_IS_SESSION (sess));
978
979   sess->callbacks.send_rtp = callback;
980   sess->send_rtp_user_data = user_data;
981 }
982
983 /**
984  * rtp_session_set_send_rtcp_callback:
985  * @sess: an #RTPSession
986  * @callback: callback to set
987  * @user_data: user data passed in the callback
988  *
989  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
990  */
991 void
992 rtp_session_set_send_rtcp_callback (RTPSession * sess,
993     RTPSessionSendRTCP callback, gpointer user_data)
994 {
995   g_return_if_fail (RTP_IS_SESSION (sess));
996
997   sess->callbacks.send_rtcp = callback;
998   sess->send_rtcp_user_data = user_data;
999 }
1000
1001 /**
1002  * rtp_session_set_sync_rtcp_callback:
1003  * @sess: an #RTPSession
1004  * @callback: callback to set
1005  * @user_data: user data passed in the callback
1006  *
1007  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
1008  */
1009 void
1010 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
1011     RTPSessionSyncRTCP callback, gpointer user_data)
1012 {
1013   g_return_if_fail (RTP_IS_SESSION (sess));
1014
1015   sess->callbacks.sync_rtcp = callback;
1016   sess->sync_rtcp_user_data = user_data;
1017 }
1018
1019 /**
1020  * rtp_session_set_clock_rate_callback:
1021  * @sess: an #RTPSession
1022  * @callback: callback to set
1023  * @user_data: user data passed in the callback
1024  *
1025  * Configure only the clock_rate callback to be notified of the clock_rate action.
1026  */
1027 void
1028 rtp_session_set_clock_rate_callback (RTPSession * sess,
1029     RTPSessionClockRate callback, gpointer user_data)
1030 {
1031   g_return_if_fail (RTP_IS_SESSION (sess));
1032
1033   sess->callbacks.clock_rate = callback;
1034   sess->clock_rate_user_data = user_data;
1035 }
1036
1037 /**
1038  * rtp_session_set_reconsider_callback:
1039  * @sess: an #RTPSession
1040  * @callback: callback to set
1041  * @user_data: user data passed in the callback
1042  *
1043  * Configure only the reconsider callback to be notified of the reconsider action.
1044  */
1045 void
1046 rtp_session_set_reconsider_callback (RTPSession * sess,
1047     RTPSessionReconsider callback, gpointer user_data)
1048 {
1049   g_return_if_fail (RTP_IS_SESSION (sess));
1050
1051   sess->callbacks.reconsider = callback;
1052   sess->reconsider_user_data = user_data;
1053 }
1054
1055 /**
1056  * rtp_session_set_request_time_callback:
1057  * @sess: an #RTPSession
1058  * @callback: callback to set
1059  * @user_data: user data passed in the callback
1060  *
1061  * Configure only the request_time callback
1062  */
1063 void
1064 rtp_session_set_request_time_callback (RTPSession * sess,
1065     RTPSessionRequestTime callback, gpointer user_data)
1066 {
1067   g_return_if_fail (RTP_IS_SESSION (sess));
1068
1069   sess->callbacks.request_time = callback;
1070   sess->request_time_user_data = user_data;
1071 }
1072
1073 /**
1074  * rtp_session_set_bandwidth:
1075  * @sess: an #RTPSession
1076  * @bandwidth: the bandwidth allocated
1077  *
1078  * Set the session bandwidth in bytes per second.
1079  */
1080 void
1081 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
1082 {
1083   g_return_if_fail (RTP_IS_SESSION (sess));
1084
1085   RTP_SESSION_LOCK (sess);
1086   sess->stats.bandwidth = bandwidth;
1087   RTP_SESSION_UNLOCK (sess);
1088 }
1089
1090 /**
1091  * rtp_session_get_bandwidth:
1092  * @sess: an #RTPSession
1093  *
1094  * Get the session bandwidth.
1095  *
1096  * Returns: the session bandwidth.
1097  */
1098 gdouble
1099 rtp_session_get_bandwidth (RTPSession * sess)
1100 {
1101   gdouble result;
1102
1103   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1104
1105   RTP_SESSION_LOCK (sess);
1106   result = sess->stats.bandwidth;
1107   RTP_SESSION_UNLOCK (sess);
1108
1109   return result;
1110 }
1111
1112 /**
1113  * rtp_session_set_rtcp_fraction:
1114  * @sess: an #RTPSession
1115  * @bandwidth: the RTCP bandwidth
1116  *
1117  * Set the bandwidth in bytes per second that should be used for RTCP
1118  * messages.
1119  */
1120 void
1121 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
1122 {
1123   g_return_if_fail (RTP_IS_SESSION (sess));
1124
1125   RTP_SESSION_LOCK (sess);
1126   sess->stats.rtcp_bandwidth = bandwidth;
1127   RTP_SESSION_UNLOCK (sess);
1128 }
1129
1130 /**
1131  * rtp_session_get_rtcp_fraction:
1132  * @sess: an #RTPSession
1133  *
1134  * Get the session bandwidth used for RTCP.
1135  *
1136  * Returns: The bandwidth used for RTCP messages.
1137  */
1138 gdouble
1139 rtp_session_get_rtcp_fraction (RTPSession * sess)
1140 {
1141   gdouble result;
1142
1143   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
1144
1145   RTP_SESSION_LOCK (sess);
1146   result = sess->stats.rtcp_bandwidth;
1147   RTP_SESSION_UNLOCK (sess);
1148
1149   return result;
1150 }
1151
1152 /**
1153  * rtp_session_get_sdes_struct:
1154  * @sess: an #RTSPSession
1155  *
1156  * Get the SDES data as a #GstStructure
1157  *
1158  * Returns: a GstStructure with SDES items for @sess. This function returns a
1159  * copy of the SDES structure, use gst_structure_free() after usage.
1160  */
1161 GstStructure *
1162 rtp_session_get_sdes_struct (RTPSession * sess)
1163 {
1164   GstStructure *result = NULL;
1165
1166   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1167
1168   RTP_SESSION_LOCK (sess);
1169   if (sess->sdes)
1170     result = gst_structure_copy (sess->sdes);
1171   RTP_SESSION_UNLOCK (sess);
1172
1173   return result;
1174 }
1175
1176 /**
1177  * rtp_session_set_sdes_struct:
1178  * @sess: an #RTSPSession
1179  * @sdes: a #GstStructure
1180  *
1181  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
1182  */
1183 void
1184 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
1185 {
1186   g_return_if_fail (sdes);
1187   g_return_if_fail (RTP_IS_SESSION (sess));
1188
1189   RTP_SESSION_LOCK (sess);
1190   if (sess->sdes)
1191     gst_structure_free (sess->sdes);
1192   sess->sdes = gst_structure_copy (sdes);
1193   RTP_SESSION_UNLOCK (sess);
1194 }
1195
1196 static GstFlowReturn
1197 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
1198 {
1199   GstFlowReturn result = GST_FLOW_OK;
1200
1201   if (source->internal) {
1202     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
1203
1204     RTP_SESSION_UNLOCK (session);
1205
1206     if (session->callbacks.send_rtp)
1207       result =
1208           session->callbacks.send_rtp (session, source, data,
1209           session->send_rtp_user_data);
1210     else {
1211       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1212     }
1213   } else {
1214     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
1215     RTP_SESSION_UNLOCK (session);
1216
1217     if (session->callbacks.process_rtp)
1218       result =
1219           session->callbacks.process_rtp (session, source,
1220           GST_BUFFER_CAST (data), session->process_rtp_user_data);
1221     else
1222       gst_buffer_unref (GST_BUFFER_CAST (data));
1223   }
1224   RTP_SESSION_LOCK (session);
1225
1226   return result;
1227 }
1228
1229 static gint
1230 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
1231 {
1232   gint result;
1233
1234   RTP_SESSION_UNLOCK (session);
1235
1236   if (session->callbacks.clock_rate)
1237     result =
1238         session->callbacks.clock_rate (session, pt,
1239         session->clock_rate_user_data);
1240   else
1241     result = -1;
1242
1243   RTP_SESSION_LOCK (session);
1244
1245   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
1246
1247   return result;
1248 }
1249
1250 static RTPSourceCallbacks callbacks = {
1251   (RTPSourcePushRTP) source_push_rtp,
1252   (RTPSourceClockRate) source_clock_rate,
1253 };
1254
1255
1256 /**
1257  * rtp_session_find_conflicting_address:
1258  * @session: The session the packet came in
1259  * @address: address to check for
1260  * @time: The time when the packet that is possibly in conflict arrived
1261  *
1262  * Checks if an address which has a conflict is already known. If it is
1263  * a known conflict, remember the time
1264  *
1265  * Returns: TRUE if it was a known conflict, FALSE otherwise
1266  */
1267 static gboolean
1268 rtp_session_find_conflicting_address (RTPSession * session,
1269     GSocketAddress * address, GstClockTime time)
1270 {
1271   return find_conflicting_address (session->conflicting_addresses, address,
1272       time);
1273 }
1274
1275 /**
1276  * rtp_session_add_conflicting_address:
1277  * @session: The session the packet came in
1278  * @address: address to remember
1279  * @time: The time when the packet that is in conflict arrived
1280  *
1281  * Adds a new conflict address
1282  */
1283 static void
1284 rtp_session_add_conflicting_address (RTPSession * sess,
1285     GSocketAddress * address, GstClockTime time)
1286 {
1287   sess->conflicting_addresses =
1288       add_conflicting_address (sess->conflicting_addresses, address, time);
1289 }
1290
1291
1292 static gboolean
1293 check_collision (RTPSession * sess, RTPSource * source,
1294     RTPPacketInfo * pinfo, gboolean rtp)
1295 {
1296   guint32 ssrc;
1297
1298   /* If we have no pinfo address, we can't do collision checking */
1299   if (!pinfo->address)
1300     return FALSE;
1301
1302   ssrc = rtp_source_get_ssrc (source);
1303
1304   if (!source->internal) {
1305     GSocketAddress *from;
1306
1307     /* This is not our local source, but lets check if two remote
1308      * source collide */
1309     if (rtp) {
1310       from = source->rtp_from;
1311     } else {
1312       from = source->rtcp_from;
1313     }
1314
1315     if (from) {
1316       if (__g_socket_address_equal (from, pinfo->address)) {
1317         /* Address is the same */
1318         return FALSE;
1319       } else {
1320         GST_LOG ("we have a third-party collision or loop ssrc:%x", ssrc);
1321         if (sess->favor_new) {
1322           if (rtp_source_find_conflicting_address (source,
1323                   pinfo->address, pinfo->current_time)) {
1324             gchar *buf1;
1325
1326             buf1 = __g_socket_address_to_string (pinfo->address);
1327             GST_LOG ("Known conflict on %x for %s, dropping packet", ssrc,
1328                 buf1);
1329             g_free (buf1);
1330
1331             return TRUE;
1332           } else {
1333             gchar *buf1, *buf2;
1334
1335             /* Current address is not a known conflict, lets assume this is
1336              * a new source. Save old address in possible conflict list
1337              */
1338             rtp_source_add_conflicting_address (source, from,
1339                 pinfo->current_time);
1340
1341             buf1 = __g_socket_address_to_string (from);
1342             buf2 = __g_socket_address_to_string (pinfo->address);
1343
1344             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1345                 " saving old as known conflict", ssrc, buf1, buf2);
1346
1347             if (rtp)
1348               rtp_source_set_rtp_from (source, pinfo->address);
1349             else
1350               rtp_source_set_rtcp_from (source, pinfo->address);
1351
1352             g_free (buf1);
1353             g_free (buf2);
1354
1355             return FALSE;
1356           }
1357         } else {
1358           /* Don't need to save old addresses, we ignore new sources */
1359           return TRUE;
1360         }
1361       }
1362     } else {
1363       /* We don't already have a from address for RTP, just set it */
1364       if (rtp)
1365         rtp_source_set_rtp_from (source, pinfo->address);
1366       else
1367         rtp_source_set_rtcp_from (source, pinfo->address);
1368       return FALSE;
1369     }
1370
1371     /* FIXME: Log 3rd party collision somehow
1372      * Maybe should be done in upper layer, only the SDES can tell us
1373      * if its a collision or a loop
1374      */
1375   } else {
1376     /* This is sending with our ssrc, is it an address we already know */
1377     if (rtp_session_find_conflicting_address (sess, pinfo->address,
1378             pinfo->current_time)) {
1379       /* Its a known conflict, its probably a loop, not a collision
1380        * lets just drop the incoming packet
1381        */
1382       GST_DEBUG ("Our packets are being looped back to us, dropping");
1383     } else {
1384       /* Its a new collision, lets change our SSRC */
1385       rtp_session_add_conflicting_address (sess, pinfo->address,
1386           pinfo->current_time);
1387
1388       GST_DEBUG ("Collision for SSRC %x", ssrc);
1389       /* mark the source BYE */
1390       rtp_source_mark_bye (source, "SSRC Collision");
1391       /* if we were suggesting this SSRC, change to something else */
1392       if (sess->suggested_ssrc == ssrc)
1393         sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
1394
1395       on_ssrc_collision (sess, source);
1396
1397       rtp_session_schedule_bye_locked (sess, pinfo->current_time);
1398     }
1399   }
1400
1401   return TRUE;
1402 }
1403
1404 typedef struct
1405 {
1406   gboolean is_doing_ptp;
1407   GSocketAddress *new_addr;
1408 } CompareAddrData;
1409
1410 /* check if the two given ip addr are the same (do not care about the port) */
1411 static gboolean
1412 ip_addr_equal (GSocketAddress * a, GSocketAddress * b)
1413 {
1414   return
1415       g_inet_address_equal (g_inet_socket_address_get_address
1416       (G_INET_SOCKET_ADDRESS (a)),
1417       g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (b)));
1418 }
1419
1420 static void
1421 compare_rtp_source_addr (const gchar * key, RTPSource * source,
1422     CompareAddrData * data)
1423 {
1424   /* only compare ip addr of remote sources which are also not closing */
1425   if (!source->internal && !source->closing && source->rtp_from) {
1426     /* look for the first rtp source */
1427     if (!data->new_addr)
1428       data->new_addr = source->rtp_from;
1429     /* compare current ip addr with the first one */
1430     else
1431       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtp_from);
1432   }
1433 }
1434
1435 static void
1436 compare_rtcp_source_addr (const gchar * key, RTPSource * source,
1437     CompareAddrData * data)
1438 {
1439   /* only compare ip addr of remote sources which are also not closing */
1440   if (!source->internal && !source->closing && source->rtcp_from) {
1441     /* look for the first rtcp source */
1442     if (!data->new_addr)
1443       data->new_addr = source->rtcp_from;
1444     else
1445       /* compare current ip addr with the first one */
1446       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtcp_from);
1447   }
1448 }
1449
1450 /* loop over our non-internal source to know if the session
1451  * is doing point-to-point */
1452 static void
1453 session_update_ptp (RTPSession * sess)
1454 {
1455   /* to know if the session is doing point to point, the ip addr
1456    * of each non-internal (=remotes) source have to be compared
1457    * to each other.
1458    */
1459   gboolean is_doing_rtp_ptp;
1460   gboolean is_doing_rtcp_ptp;
1461   CompareAddrData data;
1462
1463   /* compare the first remote source's ip addr that receive rtp packets
1464    * with other remote rtp source.
1465    * it's enough because the session just needs to know if they are all
1466    * equals or not
1467    */
1468   data.is_doing_ptp = TRUE;
1469   data.new_addr = NULL;
1470   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1471       (GHFunc) compare_rtp_source_addr, (gpointer) & data);
1472   is_doing_rtp_ptp = data.is_doing_ptp;
1473
1474   /* same but about rtcp */
1475   data.is_doing_ptp = TRUE;
1476   data.new_addr = NULL;
1477   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1478       (GHFunc) compare_rtcp_source_addr, (gpointer) & data);
1479   is_doing_rtcp_ptp = data.is_doing_ptp;
1480
1481   /* the session is doing point-to-point if all rtp remote have the same
1482    * ip addr and if all rtcp remote sources have the same ip addr */
1483   sess->is_doing_ptp = is_doing_rtp_ptp && is_doing_rtcp_ptp;
1484
1485   GST_DEBUG ("doing point-to-point: %d", sess->is_doing_ptp);
1486 }
1487
1488 static void
1489 add_source (RTPSession * sess, RTPSource * src)
1490 {
1491   g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1492       GINT_TO_POINTER (src->ssrc), src);
1493   /* report the new source ASAP */
1494   src->generation = sess->generation;
1495   /* we have one more source now */
1496   sess->total_sources++;
1497   if (RTP_SOURCE_IS_ACTIVE (src))
1498     sess->stats.active_sources++;
1499   if (src->internal) {
1500     sess->stats.internal_sources++;
1501     if (sess->suggested_ssrc != src->ssrc)
1502       sess->suggested_ssrc = src->ssrc;
1503   }
1504
1505   /* update point-to-point status */
1506   if (!src->internal)
1507     session_update_ptp (sess);
1508 }
1509
1510 static RTPSource *
1511 find_source (RTPSession * sess, guint32 ssrc)
1512 {
1513   return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1514       GINT_TO_POINTER (ssrc));
1515 }
1516
1517 /* must be called with the session lock, the returned source needs to be
1518  * unreffed after usage. */
1519 static RTPSource *
1520 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1521     RTPPacketInfo * pinfo, gboolean rtp)
1522 {
1523   RTPSource *source;
1524
1525   source = find_source (sess, ssrc);
1526   if (source == NULL) {
1527     /* make new Source in probation and insert */
1528     source = rtp_source_new (ssrc);
1529
1530     GST_DEBUG ("creating new source %08x %p", ssrc, source);
1531
1532     /* for RTP packets we need to set the source in probation. Receiving RTCP
1533      * packets of an SSRC, on the other hand, is a strong indication that we
1534      * are dealing with a valid source. */
1535     if (rtp)
1536       g_object_set (source, "probation", sess->probation, NULL);
1537     else
1538       g_object_set (source, "probation", 0, NULL);
1539
1540     /* store from address, if any */
1541     if (pinfo->address) {
1542       if (rtp)
1543         rtp_source_set_rtp_from (source, pinfo->address);
1544       else
1545         rtp_source_set_rtcp_from (source, pinfo->address);
1546     }
1547
1548     /* configure a callback on the source */
1549     rtp_source_set_callbacks (source, &callbacks, sess);
1550
1551     add_source (sess, source);
1552     *created = TRUE;
1553   } else {
1554     *created = FALSE;
1555     /* check for collision, this updates the address when not previously set */
1556     if (check_collision (sess, source, pinfo, rtp)) {
1557       return NULL;
1558     }
1559     /* Receiving RTCP packets of an SSRC is a strong indication that we
1560      * are dealing with a valid source. */
1561     if (!rtp)
1562       g_object_set (source, "probation", 0, NULL);
1563   }
1564   /* update last activity */
1565   source->last_activity = pinfo->current_time;
1566   if (rtp)
1567     source->last_rtp_activity = pinfo->current_time;
1568   g_object_ref (source);
1569
1570   return source;
1571 }
1572
1573 /* must be called with the session lock, the returned source needs to be
1574  * unreffed after usage. */
1575 static RTPSource *
1576 obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1577     GstClockTime current_time)
1578 {
1579   RTPSource *source;
1580
1581   source = find_source (sess, ssrc);
1582   if (source == NULL) {
1583     /* make new internal Source and insert */
1584     source = rtp_source_new (ssrc);
1585
1586     GST_DEBUG ("creating new internal source %08x %p", ssrc, source);
1587
1588     source->validated = TRUE;
1589     source->internal = TRUE;
1590     source->probation = FALSE;
1591     rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes));
1592     rtp_source_set_callbacks (source, &callbacks, sess);
1593
1594     add_source (sess, source);
1595     *created = TRUE;
1596   } else {
1597     *created = FALSE;
1598   }
1599   /* update last activity */
1600   if (current_time != GST_CLOCK_TIME_NONE) {
1601     source->last_activity = current_time;
1602     source->last_rtp_activity = current_time;
1603   }
1604   g_object_ref (source);
1605
1606   return source;
1607 }
1608
1609 /**
1610  * rtp_session_suggest_ssrc:
1611  * @sess: a #RTPSession
1612  *
1613  * Suggest an unused SSRC in @sess.
1614  *
1615  * Returns: a free unused SSRC
1616  */
1617 guint32
1618 rtp_session_suggest_ssrc (RTPSession * sess)
1619 {
1620   guint32 result;
1621
1622   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1623
1624   RTP_SESSION_LOCK (sess);
1625   result = sess->suggested_ssrc;
1626   RTP_SESSION_UNLOCK (sess);
1627
1628   return result;
1629 }
1630
1631 /**
1632  * rtp_session_add_source:
1633  * @sess: a #RTPSession
1634  * @src: #RTPSource to add
1635  *
1636  * Add @src to @session.
1637  *
1638  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1639  * existed in the session.
1640  */
1641 gboolean
1642 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1643 {
1644   gboolean result = FALSE;
1645   RTPSource *find;
1646
1647   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1648   g_return_val_if_fail (src != NULL, FALSE);
1649
1650   RTP_SESSION_LOCK (sess);
1651   find = find_source (sess, src->ssrc);
1652   if (find == NULL) {
1653     add_source (sess, src);
1654     result = TRUE;
1655   }
1656   RTP_SESSION_UNLOCK (sess);
1657
1658   return result;
1659 }
1660
1661 /**
1662  * rtp_session_get_num_sources:
1663  * @sess: an #RTPSession
1664  *
1665  * Get the number of sources in @sess.
1666  *
1667  * Returns: The number of sources in @sess.
1668  */
1669 guint
1670 rtp_session_get_num_sources (RTPSession * sess)
1671 {
1672   guint result;
1673
1674   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1675
1676   RTP_SESSION_LOCK (sess);
1677   result = sess->total_sources;
1678   RTP_SESSION_UNLOCK (sess);
1679
1680   return result;
1681 }
1682
1683 /**
1684  * rtp_session_get_num_active_sources:
1685  * @sess: an #RTPSession
1686  *
1687  * Get the number of active sources in @sess. A source is considered active when
1688  * it has been validated and has not yet received a BYE RTCP message.
1689  *
1690  * Returns: The number of active sources in @sess.
1691  */
1692 guint
1693 rtp_session_get_num_active_sources (RTPSession * sess)
1694 {
1695   guint result;
1696
1697   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1698
1699   RTP_SESSION_LOCK (sess);
1700   result = sess->stats.active_sources;
1701   RTP_SESSION_UNLOCK (sess);
1702
1703   return result;
1704 }
1705
1706 /**
1707  * rtp_session_get_source_by_ssrc:
1708  * @sess: an #RTPSession
1709  * @ssrc: an SSRC
1710  *
1711  * Find the source with @ssrc in @sess.
1712  *
1713  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
1714  * g_object_unref() after usage.
1715  */
1716 RTPSource *
1717 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
1718 {
1719   RTPSource *result;
1720
1721   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1722
1723   RTP_SESSION_LOCK (sess);
1724   result = find_source (sess, ssrc);
1725   if (result != NULL)
1726     g_object_ref (result);
1727   RTP_SESSION_UNLOCK (sess);
1728
1729   return result;
1730 }
1731
1732 /* should be called with the SESSION lock */
1733 static guint32
1734 rtp_session_create_new_ssrc (RTPSession * sess)
1735 {
1736   guint32 ssrc;
1737
1738   while (TRUE) {
1739     ssrc = g_random_int ();
1740
1741     /* see if it exists in the session, we're done if it doesn't */
1742     if (find_source (sess, ssrc) == NULL)
1743       break;
1744   }
1745   return ssrc;
1746 }
1747
1748
1749 /**
1750  * rtp_session_create_source:
1751  * @sess: an #RTPSession
1752  *
1753  * Create an #RTPSource for use in @sess. This function will create a source
1754  * with an ssrc that is currently not used by any participants in the session.
1755  *
1756  * Returns: an #RTPSource.
1757  */
1758 RTPSource *
1759 rtp_session_create_source (RTPSession * sess)
1760 {
1761   guint32 ssrc;
1762   RTPSource *source;
1763
1764   RTP_SESSION_LOCK (sess);
1765   ssrc = rtp_session_create_new_ssrc (sess);
1766   source = rtp_source_new (ssrc);
1767   rtp_source_set_callbacks (source, &callbacks, sess);
1768   /* we need an additional ref for the source in the hashtable */
1769   g_object_ref (source);
1770   add_source (sess, source);
1771   RTP_SESSION_UNLOCK (sess);
1772
1773   return source;
1774 }
1775
1776 static gboolean
1777 update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
1778 {
1779   GstNetAddressMeta *meta;
1780
1781   /* get packet size including header overhead */
1782   pinfo->bytes += gst_buffer_get_size (*buffer) + pinfo->header_len;
1783   pinfo->packets++;
1784
1785   if (pinfo->rtp) {
1786     GstRTPBuffer rtp = { NULL };
1787
1788     if (!gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtp))
1789       goto invalid_packet;
1790
1791     pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtp);
1792     if (idx == 0) {
1793       gint i;
1794
1795       /* only keep info for first buffer */
1796       pinfo->ssrc = gst_rtp_buffer_get_ssrc (&rtp);
1797       pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
1798       pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
1799       pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
1800       /* copy available csrc */
1801       pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
1802       for (i = 0; i < pinfo->csrc_count; i++)
1803         pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
1804     }
1805     gst_rtp_buffer_unmap (&rtp);
1806   }
1807
1808   if (idx == 0) {
1809     /* for netbuffer we can store the IP address to check for collisions */
1810     meta = gst_buffer_get_net_address_meta (*buffer);
1811     if (pinfo->address)
1812       g_object_unref (pinfo->address);
1813     if (meta) {
1814       pinfo->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
1815     } else {
1816       pinfo->address = NULL;
1817     }
1818   }
1819   return TRUE;
1820
1821   /* ERRORS */
1822 invalid_packet:
1823   {
1824     GST_DEBUG ("invalid RTP packet received");
1825     return FALSE;
1826   }
1827 }
1828
1829 /* update the RTPPacketInfo structure with the current time and other bits
1830  * about the current buffer we are handling.
1831  * This function is typically called when a validated packet is received.
1832  * This function should be called with the SESSION_LOCK
1833  */
1834 static gboolean
1835 update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
1836     gboolean send, gboolean rtp, gboolean is_list, gpointer data,
1837     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
1838 {
1839   gboolean res;
1840
1841   pinfo->send = send;
1842   pinfo->rtp = rtp;
1843   pinfo->is_list = is_list;
1844   pinfo->data = data;
1845   pinfo->current_time = current_time;
1846   pinfo->running_time = running_time;
1847   pinfo->ntpnstime = ntpnstime;
1848   pinfo->header_len = sess->header_len;
1849   pinfo->bytes = 0;
1850   pinfo->payload_len = 0;
1851   pinfo->packets = 0;
1852
1853   if (is_list) {
1854     GstBufferList *list = GST_BUFFER_LIST_CAST (data);
1855     res =
1856         gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
1857         pinfo);
1858   } else {
1859     GstBuffer *buffer = GST_BUFFER_CAST (data);
1860     res = update_packet (&buffer, 0, pinfo);
1861   }
1862   return res;
1863 }
1864
1865 static void
1866 clean_packet_info (RTPPacketInfo * pinfo)
1867 {
1868   if (pinfo->address)
1869     g_object_unref (pinfo->address);
1870   if (pinfo->data) {
1871     gst_mini_object_unref (pinfo->data);
1872     pinfo->data = NULL;
1873   }
1874 }
1875
1876 static gboolean
1877 source_update_active (RTPSession * sess, RTPSource * source,
1878     gboolean prevactive)
1879 {
1880   gboolean active = RTP_SOURCE_IS_ACTIVE (source);
1881   guint32 ssrc = source->ssrc;
1882
1883   if (prevactive == active)
1884     return FALSE;
1885
1886   if (active) {
1887     sess->stats.active_sources++;
1888     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
1889         sess->stats.active_sources);
1890   } else {
1891     sess->stats.active_sources--;
1892     GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1893         sess->stats.active_sources);
1894   }
1895   return TRUE;
1896 }
1897
1898 static gboolean
1899 source_update_sender (RTPSession * sess, RTPSource * source,
1900     gboolean prevsender)
1901 {
1902   gboolean sender = RTP_SOURCE_IS_SENDER (source);
1903   guint32 ssrc = source->ssrc;
1904
1905   if (prevsender == sender)
1906     return FALSE;
1907
1908   if (sender) {
1909     sess->stats.sender_sources++;
1910     if (source->internal)
1911       sess->stats.internal_sender_sources++;
1912     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
1913         sess->stats.sender_sources);
1914   } else {
1915     sess->stats.sender_sources--;
1916     if (source->internal)
1917       sess->stats.internal_sender_sources--;
1918     GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1919         sess->stats.sender_sources);
1920   }
1921   return TRUE;
1922 }
1923
1924 /**
1925  * rtp_session_process_rtp:
1926  * @sess: and #RTPSession
1927  * @buffer: an RTP buffer
1928  * @current_time: the current system time
1929  * @running_time: the running_time of @buffer
1930  *
1931  * Process an RTP buffer in the session manager. This function takes ownership
1932  * of @buffer.
1933  *
1934  * Returns: a #GstFlowReturn.
1935  */
1936 GstFlowReturn
1937 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
1938     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
1939 {
1940   GstFlowReturn result;
1941   guint32 ssrc;
1942   RTPSource *source;
1943   gboolean created;
1944   gboolean prevsender, prevactive;
1945   RTPPacketInfo pinfo = { 0, };
1946   guint64 oldrate;
1947
1948   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1949   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1950
1951   RTP_SESSION_LOCK (sess);
1952
1953   /* update pinfo stats */
1954   if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer,
1955           current_time, running_time, ntpnstime)) {
1956     GST_DEBUG ("invalid RTP packet received");
1957     RTP_SESSION_UNLOCK (sess);
1958     return rtp_session_process_rtcp (sess, buffer, current_time, ntpnstime);
1959   }
1960
1961   ssrc = pinfo.ssrc;
1962
1963   source = obtain_source (sess, ssrc, &created, &pinfo, TRUE);
1964   if (!source)
1965     goto collision;
1966
1967   prevsender = RTP_SOURCE_IS_SENDER (source);
1968   prevactive = RTP_SOURCE_IS_ACTIVE (source);
1969   oldrate = source->bitrate;
1970
1971   /* let source process the packet */
1972   result = rtp_source_process_rtp (source, &pinfo);
1973
1974   /* source became active */
1975   if (source_update_active (sess, source, prevactive))
1976     on_ssrc_validated (sess, source);
1977
1978   source_update_sender (sess, source, prevsender);
1979
1980   if (oldrate != source->bitrate)
1981     sess->recalc_bandwidth = TRUE;
1982
1983   if (created)
1984     on_new_ssrc (sess, source);
1985
1986   if (source->validated) {
1987     gboolean created;
1988     gint i;
1989
1990     /* for validated sources, we add the CSRCs as well */
1991     for (i = 0; i < pinfo.csrc_count; i++) {
1992       guint32 csrc;
1993       RTPSource *csrc_src;
1994
1995       csrc = pinfo.csrcs[i];
1996
1997       /* get source */
1998       csrc_src = obtain_source (sess, csrc, &created, &pinfo, TRUE);
1999       if (!csrc_src)
2000         continue;
2001
2002       if (created) {
2003         GST_DEBUG ("created new CSRC: %08x", csrc);
2004         rtp_source_set_as_csrc (csrc_src);
2005         source_update_active (sess, csrc_src, FALSE);
2006         on_new_ssrc (sess, csrc_src);
2007       }
2008       g_object_unref (csrc_src);
2009     }
2010   }
2011   g_object_unref (source);
2012
2013   RTP_SESSION_UNLOCK (sess);
2014
2015   clean_packet_info (&pinfo);
2016
2017   return result;
2018
2019   /* ERRORS */
2020 collision:
2021   {
2022     RTP_SESSION_UNLOCK (sess);
2023     clean_packet_info (&pinfo);
2024     GST_DEBUG ("ignoring packet because its collisioning");
2025     return GST_FLOW_OK;
2026   }
2027 }
2028
2029 static void
2030 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
2031     GstRTCPPacket * packet, RTPPacketInfo * pinfo)
2032 {
2033   guint count, i;
2034
2035   count = gst_rtcp_packet_get_rb_count (packet);
2036   for (i = 0; i < count; i++) {
2037     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
2038     guint8 fractionlost;
2039     gint32 packetslost;
2040     RTPSource *src;
2041
2042     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
2043         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2044
2045     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
2046
2047     /* find our own source */
2048     src = find_source (sess, ssrc);
2049     if (src == NULL)
2050       continue;
2051
2052     if (src->internal && RTP_SOURCE_IS_ACTIVE (src)) {
2053       /* only deal with report blocks for our session, we update the stats of
2054        * the sender of the RTCP message. We could also compare our stats against
2055        * the other sender to see if we are better or worse. */
2056       /* FIXME, need to keep track who the RB block is from */
2057       rtp_source_process_rb (source, pinfo->ntpnstime, fractionlost,
2058           packetslost, exthighestseq, jitter, lsr, dlsr);
2059     }
2060   }
2061   on_ssrc_active (sess, source);
2062 }
2063
2064 /* A Sender report contains statistics about how the sender is doing. This
2065  * includes timing informataion such as the relation between RTP and NTP
2066  * timestamps and the number of packets/bytes it sent to us.
2067  *
2068  * In this report is also included a set of report blocks related to how this
2069  * sender is receiving data (in case we (or somebody else) is also sending stuff
2070  * to it). This info includes the packet loss, jitter and seqnum. It also
2071  * contains information to calculate the round trip time (LSR/DLSR).
2072  */
2073 static void
2074 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
2075     RTPPacketInfo * pinfo, gboolean * do_sync)
2076 {
2077   guint32 senderssrc, rtptime, packet_count, octet_count;
2078   guint64 ntptime;
2079   RTPSource *source;
2080   gboolean created, prevsender;
2081
2082   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
2083       &packet_count, &octet_count);
2084
2085   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
2086       senderssrc, GST_TIME_ARGS (pinfo->current_time));
2087
2088   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2089   if (!source)
2090     return;
2091
2092   /* skip non-bye packets for sources that are marked BYE */
2093   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2094     goto out;
2095
2096   /* don't try to do lip-sync for sources that sent a BYE */
2097   if (RTP_SOURCE_IS_MARKED_BYE (source))
2098     *do_sync = FALSE;
2099   else
2100     *do_sync = TRUE;
2101
2102   prevsender = RTP_SOURCE_IS_SENDER (source);
2103
2104   /* first update the source */
2105   rtp_source_process_sr (source, pinfo->current_time, ntptime, rtptime,
2106       packet_count, octet_count);
2107
2108   source_update_sender (sess, source, prevsender);
2109
2110   if (created)
2111     on_new_ssrc (sess, source);
2112
2113   rtp_session_process_rb (sess, source, packet, pinfo);
2114
2115 out:
2116   g_object_unref (source);
2117 }
2118
2119 /* A receiver report contains statistics about how a receiver is doing. It
2120  * includes stuff like packet loss, jitter and the seqnum it received last. It
2121  * also contains info to calculate the round trip time.
2122  *
2123  * We are only interested in how the sender of this report is doing wrt to us.
2124  */
2125 static void
2126 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
2127     RTPPacketInfo * pinfo)
2128 {
2129   guint32 senderssrc;
2130   RTPSource *source;
2131   gboolean created;
2132
2133   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
2134
2135   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
2136
2137   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2138   if (!source)
2139     return;
2140
2141   /* skip non-bye packets for sources that are marked BYE */
2142   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2143     goto out;
2144
2145   if (created)
2146     on_new_ssrc (sess, source);
2147
2148   rtp_session_process_rb (sess, source, packet, pinfo);
2149
2150 out:
2151   g_object_unref (source);
2152 }
2153
2154 /* Get SDES items and store them in the SSRC */
2155 static void
2156 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
2157     RTPPacketInfo * pinfo)
2158 {
2159   guint items, i, j;
2160   gboolean more_items, more_entries;
2161
2162   items = gst_rtcp_packet_sdes_get_item_count (packet);
2163   GST_DEBUG ("got SDES packet with %d items", items);
2164
2165   more_items = gst_rtcp_packet_sdes_first_item (packet);
2166   i = 0;
2167   while (more_items) {
2168     guint32 ssrc;
2169     gboolean changed, created, prevactive;
2170     RTPSource *source;
2171     GstStructure *sdes;
2172
2173     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
2174
2175     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
2176
2177     changed = FALSE;
2178
2179     /* find src, no probation when dealing with RTCP */
2180     source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
2181     if (!source)
2182       return;
2183
2184     /* skip non-bye packets for sources that are marked BYE */
2185     if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2186       goto next;
2187
2188     sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
2189
2190     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
2191     j = 0;
2192     while (more_entries) {
2193       GstRTCPSDESType type;
2194       guint8 len;
2195       guint8 *data;
2196       gchar *name;
2197       gchar *value;
2198
2199       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
2200
2201       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
2202           data);
2203
2204       if (type == GST_RTCP_SDES_PRIV) {
2205         name = g_strndup ((const gchar *) &data[1], data[0]);
2206         len -= data[0] + 1;
2207         data += data[0] + 1;
2208       } else {
2209         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
2210       }
2211
2212       value = g_strndup ((const gchar *) data, len);
2213
2214       gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
2215
2216       g_free (name);
2217       g_free (value);
2218
2219       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
2220       j++;
2221     }
2222
2223     /* takes ownership of sdes */
2224     changed = rtp_source_set_sdes_struct (source, sdes);
2225
2226     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2227     source->validated = TRUE;
2228
2229     if (created)
2230       on_new_ssrc (sess, source);
2231
2232     /* source became active */
2233     if (source_update_active (sess, source, prevactive))
2234       on_ssrc_validated (sess, source);
2235
2236     if (changed)
2237       on_ssrc_sdes (sess, source);
2238
2239   next:
2240     g_object_unref (source);
2241
2242     more_items = gst_rtcp_packet_sdes_next_item (packet);
2243     i++;
2244   }
2245 }
2246
2247 /* BYE is sent when a client leaves the session
2248  */
2249 static void
2250 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
2251     RTPPacketInfo * pinfo)
2252 {
2253   guint count, i;
2254   gchar *reason;
2255   gboolean reconsider = FALSE;
2256
2257   reason = gst_rtcp_packet_bye_get_reason (packet);
2258   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
2259
2260   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
2261   for (i = 0; i < count; i++) {
2262     guint32 ssrc;
2263     RTPSource *source;
2264     gboolean created, prevactive, prevsender;
2265     guint pmembers, members;
2266
2267     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
2268     GST_DEBUG ("SSRC: %08x", ssrc);
2269
2270     /* find src and mark bye, no probation when dealing with RTCP */
2271     source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
2272     if (!source)
2273       return;
2274
2275     if (source->internal) {
2276       /* our own source, something weird with this packet */
2277       g_object_unref (source);
2278       continue;
2279     }
2280
2281     /* store time for when we need to time out this source */
2282     source->bye_time = pinfo->current_time;
2283
2284     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2285     prevsender = RTP_SOURCE_IS_SENDER (source);
2286
2287     /* mark the source BYE */
2288     rtp_source_mark_bye (source, reason);
2289
2290     pmembers = sess->stats.active_sources;
2291
2292     source_update_active (sess, source, prevactive);
2293     source_update_sender (sess, source, prevsender);
2294
2295     members = sess->stats.active_sources;
2296
2297     if (!sess->scheduled_bye && members < pmembers) {
2298       /* some members went away since the previous timeout estimate.
2299        * Perform reverse reconsideration but only when we are not scheduling a
2300        * BYE ourselves. */
2301       if (sess->next_rtcp_check_time != GST_CLOCK_TIME_NONE &&
2302           pinfo->current_time < sess->next_rtcp_check_time) {
2303         GstClockTime time_remaining;
2304
2305         time_remaining = sess->next_rtcp_check_time - pinfo->current_time;
2306         sess->next_rtcp_check_time =
2307             gst_util_uint64_scale (time_remaining, members, pmembers);
2308
2309         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
2310             GST_TIME_ARGS (sess->next_rtcp_check_time));
2311
2312         sess->next_rtcp_check_time += pinfo->current_time;
2313
2314         /* mark pending reconsider. We only want to signal the reconsideration
2315          * once after we handled all the source in the bye packet */
2316         reconsider = TRUE;
2317       }
2318     }
2319
2320     if (created)
2321       on_new_ssrc (sess, source);
2322
2323     on_bye_ssrc (sess, source);
2324
2325     g_object_unref (source);
2326   }
2327   if (reconsider) {
2328     RTP_SESSION_UNLOCK (sess);
2329     /* notify app of reconsideration */
2330     if (sess->callbacks.reconsider)
2331       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2332     RTP_SESSION_LOCK (sess);
2333   }
2334   g_free (reason);
2335 }
2336
2337 static void
2338 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
2339     RTPPacketInfo * pinfo)
2340 {
2341   GST_DEBUG ("received APP");
2342 }
2343
2344 static gboolean
2345 rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
2346     gboolean fir, GstClockTime current_time)
2347 {
2348   guint32 round_trip = 0;
2349
2350   rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, &round_trip);
2351
2352   if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
2353     GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
2354         GST_SECOND, 65536);
2355
2356     if (current_time - sess->last_keyframe_request < 2 * round_trip_in_ns) {
2357       GST_DEBUG ("Ignoring %s request because one was send without one "
2358           "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
2359           fir ? "FIR" : "PLI",
2360           GST_TIME_ARGS (current_time - sess->last_keyframe_request),
2361           GST_TIME_ARGS (round_trip_in_ns));
2362       return FALSE;
2363     }
2364   }
2365
2366   sess->last_keyframe_request = current_time;
2367
2368   GST_LOG ("received %s request from %X %p(%p)", fir ? "FIR" : "PLI",
2369       rtp_source_get_ssrc (src), sess->callbacks.process_rtp,
2370       sess->callbacks.request_key_unit);
2371
2372   RTP_SESSION_UNLOCK (sess);
2373   sess->callbacks.request_key_unit (sess, fir,
2374       sess->request_key_unit_user_data);
2375   RTP_SESSION_LOCK (sess);
2376
2377   return TRUE;
2378 }
2379
2380 static void
2381 rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
2382     guint32 media_ssrc, GstClockTime current_time)
2383 {
2384   RTPSource *src;
2385
2386   if (!sess->callbacks.request_key_unit)
2387     return;
2388
2389   src = find_source (sess, sender_ssrc);
2390   if (src == NULL)
2391     return;
2392
2393   rtp_session_request_local_key_unit (sess, src, FALSE, current_time);
2394
2395   src->stats.recv_pli_count++;
2396 }
2397
2398 static void
2399 rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
2400     guint8 * fci_data, guint fci_length, GstClockTime current_time)
2401 {
2402   RTPSource *src;
2403   guint32 ssrc;
2404   guint position = 0;
2405   gboolean our_request = FALSE;
2406
2407   if (!sess->callbacks.request_key_unit)
2408     return;
2409
2410   if (fci_length < 8)
2411     return;
2412
2413   src = find_source (sess, sender_ssrc);
2414
2415   /* Hack because Google fails to set the sender_ssrc correctly */
2416   if (!src && sender_ssrc == 1) {
2417     GHashTableIter iter;
2418
2419     /* we can't find the source if there are multiple */
2420     if (sess->stats.sender_sources > sess->stats.internal_sender_sources + 1)
2421       return;
2422
2423     g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
2424     while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
2425       if (!src->internal && rtp_source_is_sender (src))
2426         break;
2427       src = NULL;
2428     }
2429   }
2430   if (!src)
2431     return;
2432
2433   for (position = 0; position < fci_length; position += 8) {
2434     guint8 *data = fci_data + position;
2435     RTPSource *own;
2436
2437     ssrc = GST_READ_UINT32_BE (data);
2438
2439     own = find_source (sess, ssrc);
2440     if (own == NULL)
2441       continue;
2442
2443     if (own->internal) {
2444       our_request = TRUE;
2445       break;
2446     }
2447   }
2448   if (!our_request)
2449     return;
2450
2451   rtp_session_request_local_key_unit (sess, src, TRUE, current_time);
2452   src->stats.recv_fir_count++;
2453 }
2454
2455 static void
2456 rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
2457     guint32 media_ssrc, guint8 * fci_data, guint fci_length,
2458     GstClockTime current_time)
2459 {
2460   sess->stats.nacks_received++;
2461
2462   if (!sess->callbacks.notify_nack)
2463     return;
2464
2465   while (fci_length > 0) {
2466     guint16 seqnum, blp;
2467
2468     seqnum = GST_READ_UINT16_BE (fci_data);
2469     blp = GST_READ_UINT16_BE (fci_data + 2);
2470
2471     GST_DEBUG ("NACK #%u, blp %04x, SSRC 0x%08x", seqnum, blp, media_ssrc);
2472
2473     RTP_SESSION_UNLOCK (sess);
2474     sess->callbacks.notify_nack (sess, seqnum, blp, media_ssrc,
2475         sess->notify_nack_user_data);
2476     RTP_SESSION_LOCK (sess);
2477
2478     fci_data += 4;
2479     fci_length -= 4;
2480   }
2481 }
2482
2483 static void
2484 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
2485     RTPPacketInfo * pinfo, GstClockTime current_time)
2486 {
2487   GstRTCPType type = gst_rtcp_packet_get_type (packet);
2488   GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
2489   guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
2490   guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
2491   guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet);
2492   guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
2493   RTPSource *src;
2494
2495   src = find_source (sess, media_ssrc);
2496
2497   /* skip non-bye packets for sources that are marked BYE */
2498   if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src))
2499     return;
2500
2501   GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
2502       "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
2503
2504   if (g_signal_has_handler_pending (sess,
2505           rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
2506     GstBuffer *fci_buffer = NULL;
2507
2508     if (fci_length > 0) {
2509       fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
2510           GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data,
2511           fci_length);
2512       GST_BUFFER_TIMESTAMP (fci_buffer) = pinfo->running_time;
2513     }
2514
2515     RTP_SESSION_UNLOCK (sess);
2516     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
2517         type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
2518     RTP_SESSION_LOCK (sess);
2519
2520     if (fci_buffer)
2521       gst_buffer_unref (fci_buffer);
2522   }
2523
2524   if (src && sess->rtcp_feedback_retention_window) {
2525     rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
2526   }
2527
2528   if ((src && src->internal) ||
2529       /* PSFB FIR puts the media ssrc inside the FCI */
2530       (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
2531     switch (type) {
2532       case GST_RTCP_TYPE_PSFB:
2533         switch (fbtype) {
2534           case GST_RTCP_PSFB_TYPE_PLI:
2535             rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
2536                 current_time);
2537             break;
2538           case GST_RTCP_PSFB_TYPE_FIR:
2539             rtp_session_process_fir (sess, sender_ssrc, fci_data, fci_length,
2540                 current_time);
2541             break;
2542           default:
2543             break;
2544         }
2545         break;
2546       case GST_RTCP_TYPE_RTPFB:
2547         switch (fbtype) {
2548           case GST_RTCP_RTPFB_TYPE_NACK:
2549             rtp_session_process_nack (sess, sender_ssrc, media_ssrc,
2550                 fci_data, fci_length, current_time);
2551             break;
2552           default:
2553             break;
2554         }
2555       default:
2556         break;
2557     }
2558   }
2559 }
2560
2561 /**
2562  * rtp_session_process_rtcp:
2563  * @sess: and #RTPSession
2564  * @buffer: an RTCP buffer
2565  * @current_time: the current system time
2566  * @ntpnstime: the current NTP time in nanoseconds
2567  *
2568  * Process an RTCP buffer in the session manager. This function takes ownership
2569  * of @buffer.
2570  *
2571  * Returns: a #GstFlowReturn.
2572  */
2573 GstFlowReturn
2574 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
2575     GstClockTime current_time, guint64 ntpnstime)
2576 {
2577   GstRTCPPacket packet;
2578   gboolean more, is_bye = FALSE, do_sync = FALSE;
2579   RTPPacketInfo pinfo = { 0, };
2580   GstFlowReturn result = GST_FLOW_OK;
2581   GstRTCPBuffer rtcp = { NULL, };
2582
2583   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2584   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2585
2586   if (!gst_rtcp_buffer_validate (buffer))
2587     goto invalid_packet;
2588
2589   GST_DEBUG ("received RTCP packet");
2590
2591   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP], 0,
2592       buffer);
2593
2594   RTP_SESSION_LOCK (sess);
2595   /* update pinfo stats */
2596   update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
2597       -1, ntpnstime);
2598
2599   /* start processing the compound packet */
2600   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2601   more = gst_rtcp_buffer_get_first_packet (&rtcp, &packet);
2602   while (more) {
2603     GstRTCPType type;
2604
2605     type = gst_rtcp_packet_get_type (&packet);
2606
2607     switch (type) {
2608       case GST_RTCP_TYPE_SR:
2609         rtp_session_process_sr (sess, &packet, &pinfo, &do_sync);
2610         break;
2611       case GST_RTCP_TYPE_RR:
2612         rtp_session_process_rr (sess, &packet, &pinfo);
2613         break;
2614       case GST_RTCP_TYPE_SDES:
2615         rtp_session_process_sdes (sess, &packet, &pinfo);
2616         break;
2617       case GST_RTCP_TYPE_BYE:
2618         is_bye = TRUE;
2619         /* don't try to attempt lip-sync anymore for streams with a BYE */
2620         do_sync = FALSE;
2621         rtp_session_process_bye (sess, &packet, &pinfo);
2622         break;
2623       case GST_RTCP_TYPE_APP:
2624         rtp_session_process_app (sess, &packet, &pinfo);
2625         break;
2626       case GST_RTCP_TYPE_RTPFB:
2627       case GST_RTCP_TYPE_PSFB:
2628         rtp_session_process_feedback (sess, &packet, &pinfo, current_time);
2629         break;
2630       default:
2631         GST_WARNING ("got unknown RTCP packet");
2632         break;
2633     }
2634     more = gst_rtcp_packet_move_to_next (&packet);
2635   }
2636
2637   gst_rtcp_buffer_unmap (&rtcp);
2638
2639   /* if we are scheduling a BYE, we only want to count bye packets, else we
2640    * count everything */
2641   if (sess->scheduled_bye && is_bye) {
2642     sess->bye_stats.bye_members++;
2643     UPDATE_AVG (sess->bye_stats.avg_rtcp_packet_size, pinfo.bytes);
2644   }
2645
2646   /* keep track of average packet size */
2647   UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
2648
2649   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
2650       sess->stats.avg_rtcp_packet_size, pinfo.bytes);
2651   RTP_SESSION_UNLOCK (sess);
2652
2653   pinfo.data = NULL;
2654   clean_packet_info (&pinfo);
2655
2656   /* notify caller of sr packets in the callback */
2657   if (do_sync && sess->callbacks.sync_rtcp) {
2658     result = sess->callbacks.sync_rtcp (sess, buffer,
2659         sess->sync_rtcp_user_data);
2660   } else
2661     gst_buffer_unref (buffer);
2662
2663   return result;
2664
2665   /* ERRORS */
2666 invalid_packet:
2667   {
2668     GST_DEBUG ("invalid RTCP packet received");
2669     gst_buffer_unref (buffer);
2670     return GST_FLOW_OK;
2671   }
2672 }
2673
2674 /**
2675  * rtp_session_update_send_caps:
2676  * @sess: an #RTPSession
2677  * @caps: a #GstCaps
2678  *
2679  * Update the caps of the sender in the rtp session.
2680  */
2681 void
2682 rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
2683 {
2684   GstStructure *s;
2685   guint ssrc;
2686
2687   g_return_if_fail (RTP_IS_SESSION (sess));
2688   g_return_if_fail (GST_IS_CAPS (caps));
2689
2690   GST_LOG ("received caps %" GST_PTR_FORMAT, caps);
2691
2692   s = gst_caps_get_structure (caps, 0);
2693
2694   if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
2695     RTPSource *source;
2696     gboolean created;
2697
2698     RTP_SESSION_LOCK (sess);
2699     source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
2700     if (source) {
2701       rtp_source_update_caps (source, caps);
2702       g_object_unref (source);
2703     }
2704     RTP_SESSION_UNLOCK (sess);
2705   }
2706 }
2707
2708 /**
2709  * rtp_session_send_rtp:
2710  * @sess: an #RTPSession
2711  * @data: pointer to either an RTP buffer or a list of RTP buffers
2712  * @is_list: TRUE when @data is a buffer list
2713  * @current_time: the current system time
2714  * @running_time: the running time of @data
2715  *
2716  * Send the RTP buffer in the session manager. This function takes ownership of
2717  * @buffer.
2718  *
2719  * Returns: a #GstFlowReturn.
2720  */
2721 GstFlowReturn
2722 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
2723     GstClockTime current_time, GstClockTime running_time)
2724 {
2725   GstFlowReturn result;
2726   RTPSource *source;
2727   gboolean prevsender;
2728   guint64 oldrate;
2729   RTPPacketInfo pinfo = { 0, };
2730   gboolean created;
2731
2732   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2733   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
2734
2735   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
2736
2737   RTP_SESSION_LOCK (sess);
2738   if (!update_packet_info (sess, &pinfo, TRUE, TRUE, is_list, data,
2739           current_time, running_time, -1))
2740     goto invalid_packet;
2741
2742   source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
2743
2744   prevsender = RTP_SOURCE_IS_SENDER (source);
2745   oldrate = source->bitrate;
2746
2747   /* we use our own source to send */
2748   result = rtp_source_send_rtp (source, &pinfo);
2749
2750   source_update_sender (sess, source, prevsender);
2751
2752   if (oldrate != source->bitrate)
2753     sess->recalc_bandwidth = TRUE;
2754   RTP_SESSION_UNLOCK (sess);
2755
2756   g_object_unref (source);
2757   clean_packet_info (&pinfo);
2758
2759   return result;
2760
2761 invalid_packet:
2762   {
2763     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
2764     RTP_SESSION_UNLOCK (sess);
2765     GST_DEBUG ("invalid RTP packet received");
2766     return GST_FLOW_OK;
2767   }
2768 }
2769
2770 static void
2771 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
2772 {
2773   *bandwidth += source->bitrate;
2774 }
2775
2776 /* must be called with session lock */
2777 static GstClockTime
2778 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
2779     gboolean first)
2780 {
2781   GstClockTime result;
2782   RTPSessionStats *stats;
2783
2784   /* recalculate bandwidth when it changed */
2785   if (sess->recalc_bandwidth) {
2786     gdouble bandwidth;
2787
2788     if (sess->bandwidth > 0)
2789       bandwidth = sess->bandwidth;
2790     else {
2791       /* If it is <= 0, then try to estimate the actual bandwidth */
2792       bandwidth = 0;
2793
2794       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2795           (GHFunc) add_bitrates, &bandwidth);
2796       bandwidth /= 8.0;
2797     }
2798     if (bandwidth < 8000)
2799       bandwidth = RTP_STATS_BANDWIDTH;
2800
2801     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
2802         sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
2803
2804     sess->recalc_bandwidth = FALSE;
2805   }
2806
2807   if (sess->scheduled_bye) {
2808     stats = &sess->bye_stats;
2809     result = rtp_stats_calculate_bye_interval (stats);
2810   } else {
2811     stats = &sess->stats;
2812     result = rtp_stats_calculate_rtcp_interval (stats,
2813         stats->internal_sender_sources > 0, first);
2814   }
2815
2816   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
2817       GST_TIME_ARGS (result), first);
2818
2819   if (!deterministic && result != GST_CLOCK_TIME_NONE)
2820     result = rtp_stats_add_rtcp_jitter (stats, result);
2821
2822   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
2823
2824   return result;
2825 }
2826
2827 static void
2828 source_mark_bye (const gchar * key, RTPSource * source, const gchar * reason)
2829 {
2830   if (source->internal)
2831     rtp_source_mark_bye (source, reason);
2832 }
2833
2834 /**
2835  * rtp_session_mark_all_bye:
2836  * @sess: an #RTPSession
2837  * @reason: a reason
2838  *
2839  * Mark all internal sources of the session as BYE with @reason.
2840  */
2841 void
2842 rtp_session_mark_all_bye (RTPSession * sess, const gchar * reason)
2843 {
2844   g_return_if_fail (RTP_IS_SESSION (sess));
2845
2846   RTP_SESSION_LOCK (sess);
2847   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2848       (GHFunc) source_mark_bye, (gpointer) reason);
2849   RTP_SESSION_UNLOCK (sess);
2850 }
2851
2852 /* Stop the current @sess and schedule a BYE message for the other members.
2853  * One must have the session lock to call this function
2854  */
2855 static GstFlowReturn
2856 rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time)
2857 {
2858   GstFlowReturn result = GST_FLOW_OK;
2859   GstClockTime interval;
2860
2861   /* nothing to do it we already scheduled bye */
2862   if (sess->scheduled_bye)
2863     goto done;
2864
2865   /* we schedule BYE now */
2866   sess->scheduled_bye = TRUE;
2867   /* at least one member wants to send a BYE */
2868   memcpy (&sess->bye_stats, &sess->stats, sizeof (RTPSessionStats));
2869   INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100);
2870   sess->bye_stats.bye_members = 1;
2871   sess->first_rtcp = TRUE;
2872   sess->allow_early = TRUE;
2873
2874   /* reschedule transmission */
2875   sess->last_rtcp_send_time = current_time;
2876   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2877
2878   if (interval != GST_CLOCK_TIME_NONE)
2879     sess->next_rtcp_check_time = current_time + interval;
2880   else
2881     sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
2882
2883   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
2884       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
2885
2886   RTP_SESSION_UNLOCK (sess);
2887   /* notify app of reconsideration */
2888   if (sess->callbacks.reconsider)
2889     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2890   RTP_SESSION_LOCK (sess);
2891 done:
2892
2893   return result;
2894 }
2895
2896 /**
2897  * rtp_session_schedule_bye:
2898  * @sess: an #RTPSession
2899  * @current_time: the current system time
2900  *
2901  * Schedule a BYE message for all sources marked as BYE in @sess.
2902  *
2903  * Returns: a #GstFlowReturn.
2904  */
2905 GstFlowReturn
2906 rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
2907 {
2908   GstFlowReturn result;
2909
2910   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2911
2912   RTP_SESSION_LOCK (sess);
2913   result = rtp_session_schedule_bye_locked (sess, current_time);
2914   RTP_SESSION_UNLOCK (sess);
2915
2916   return result;
2917 }
2918
2919 /**
2920  * rtp_session_next_timeout:
2921  * @sess: an #RTPSession
2922  * @current_time: the current system time
2923  *
2924  * Get the next time we should perform session maintenance tasks.
2925  *
2926  * Returns: a time when rtp_session_on_timeout() should be called with the
2927  * current system time.
2928  */
2929 GstClockTime
2930 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
2931 {
2932   GstClockTime result, interval = 0;
2933
2934   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
2935
2936   RTP_SESSION_LOCK (sess);
2937
2938   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
2939     GST_DEBUG ("have early rtcp time");
2940     result = sess->next_early_rtcp_time;
2941     goto early_exit;
2942   }
2943
2944   result = sess->next_rtcp_check_time;
2945
2946   GST_DEBUG ("current time: %" GST_TIME_FORMAT
2947       ", next time: %" GST_TIME_FORMAT,
2948       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2949
2950   if (result == GST_CLOCK_TIME_NONE || result < current_time) {
2951     GST_DEBUG ("take current time as base");
2952     /* our previous check time expired, start counting from the current time
2953      * again. */
2954     result = current_time;
2955   }
2956
2957   if (sess->scheduled_bye) {
2958     if (sess->bye_stats.active_sources >= 50) {
2959       GST_DEBUG ("reconsider BYE, more than 50 sources");
2960       /* reconsider BYE if members >= 50 */
2961       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2962     }
2963   } else {
2964     if (sess->first_rtcp) {
2965       GST_DEBUG ("first RTCP packet");
2966       /* we are called for the first time */
2967       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2968     } else if (sess->next_rtcp_check_time < current_time) {
2969       GST_DEBUG ("old check time expired, getting new timeout");
2970       /* get a new timeout when we need to */
2971       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
2972     }
2973   }
2974
2975   if (interval != GST_CLOCK_TIME_NONE)
2976     result += interval;
2977   else
2978     result = GST_CLOCK_TIME_NONE;
2979
2980   sess->next_rtcp_check_time = result;
2981
2982 early_exit:
2983
2984   GST_DEBUG ("current time: %" GST_TIME_FORMAT
2985       ", next time: %" GST_TIME_FORMAT,
2986       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2987   RTP_SESSION_UNLOCK (sess);
2988
2989   return result;
2990 }
2991
2992 typedef struct
2993 {
2994   RTPSource *source;
2995   gboolean is_bye;
2996   GstBuffer *buffer;
2997 } ReportOutput;
2998
2999 typedef struct
3000 {
3001   GstRTCPBuffer rtcpbuf;
3002   RTPSession *sess;
3003   RTPSource *source;
3004   guint num_to_report;
3005   gboolean have_fir;
3006   gboolean have_pli;
3007   gboolean have_nack;
3008   GstBuffer *rtcp;
3009   GstClockTime current_time;
3010   guint64 ntpnstime;
3011   GstClockTime running_time;
3012   GstClockTime interval;
3013   GstRTCPPacket packet;
3014   gboolean has_sdes;
3015   gboolean is_early;
3016   gboolean may_suppress;
3017   GQueue output;
3018   guint nacked_seqnums;
3019 } ReportData;
3020
3021 static void
3022 session_start_rtcp (RTPSession * sess, ReportData * data)
3023 {
3024   GstRTCPPacket *packet = &data->packet;
3025   RTPSource *own = data->source;
3026   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3027
3028   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
3029   data->has_sdes = FALSE;
3030
3031   gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
3032
3033   if (RTP_SOURCE_IS_SENDER (own)) {
3034     guint64 ntptime;
3035     guint32 rtptime;
3036     guint32 packet_count, octet_count;
3037
3038     /* we are a sender, create SR */
3039     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
3040     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SR, packet);
3041
3042     /* get latest stats */
3043     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
3044         &ntptime, &rtptime, &packet_count, &octet_count);
3045     /* store stats */
3046     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
3047         packet_count, octet_count);
3048
3049     /* fill in sender report info */
3050     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
3051         ntptime, rtptime, packet_count, octet_count);
3052   } else {
3053     /* we are only receiver, create RR */
3054     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
3055     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RR, packet);
3056     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
3057   }
3058 }
3059
3060 /* construct a Sender or Receiver Report */
3061 static void
3062 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
3063 {
3064   RTPSession *sess = data->sess;
3065   GstRTCPPacket *packet = &data->packet;
3066   guint8 fractionlost;
3067   gint32 packetslost;
3068   guint32 exthighestseq, jitter;
3069   guint32 lsr, dlsr;
3070
3071   /* don't report for sources in future generations */
3072   if (((gint16) (source->generation - sess->generation)) > 0) {
3073     GST_DEBUG ("source %08x generation %u > %u", source->ssrc,
3074         source->generation, sess->generation);
3075     return;
3076   }
3077
3078   if (g_hash_table_contains (source->reported_in_sr_of,
3079           GUINT_TO_POINTER (data->source->ssrc))) {
3080     GST_DEBUG ("source %08x already reported in this generation", source->ssrc);
3081     return;
3082   }
3083
3084   if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
3085     GST_DEBUG ("max RB count reached");
3086     return;
3087   }
3088
3089   /* only report about other sender */
3090   if (source == data->source)
3091     goto reported;
3092
3093   if (!RTP_SOURCE_IS_SENDER (source)) {
3094     GST_DEBUG ("source %08x not sender", source->ssrc);
3095     goto reported;
3096   }
3097
3098   GST_DEBUG ("create RB for SSRC %08x", source->ssrc);
3099
3100   /* get new stats */
3101   rtp_source_get_new_rb (source, data->current_time, &fractionlost,
3102       &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
3103
3104   /* store last generated RR packet */
3105   source->last_rr.is_valid = TRUE;
3106   source->last_rr.fractionlost = fractionlost;
3107   source->last_rr.packetslost = packetslost;
3108   source->last_rr.exthighestseq = exthighestseq;
3109   source->last_rr.jitter = jitter;
3110   source->last_rr.lsr = lsr;
3111   source->last_rr.dlsr = dlsr;
3112
3113   /* packet is not yet filled, add report block for this source. */
3114   gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
3115       exthighestseq, jitter, lsr, dlsr);
3116
3117 reported:
3118   g_hash_table_add (source->reported_in_sr_of,
3119       GUINT_TO_POINTER (data->source->ssrc));
3120 }
3121
3122 /* construct FIR */
3123 static void
3124 session_add_fir (const gchar * key, RTPSource * source, ReportData * data)
3125 {
3126   GstRTCPPacket *packet = &data->packet;
3127   guint16 len;
3128   guint8 *fci_data;
3129
3130   if (!source->send_fir)
3131     return;
3132
3133   len = gst_rtcp_packet_fb_get_fci_length (packet);
3134   if (!gst_rtcp_packet_fb_set_fci_length (packet, len + 2))
3135     /* exit because the packet is full, will put next request in a
3136      * further packet */
3137     return;
3138
3139   fci_data = gst_rtcp_packet_fb_get_fci (packet) + (len * 4);
3140
3141   GST_WRITE_UINT32_BE (fci_data, source->ssrc);
3142   fci_data += 4;
3143   fci_data[0] = source->current_send_fir_seqnum;
3144   fci_data[1] = fci_data[2] = fci_data[3] = 0;
3145
3146   source->send_fir = FALSE;
3147   source->stats.sent_fir_count++;
3148 }
3149
3150 static void
3151 session_fir (RTPSession * sess, ReportData * data)
3152 {
3153   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3154   GstRTCPPacket *packet = &data->packet;
3155
3156   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3157     return;
3158
3159   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_FIR);
3160   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3161   gst_rtcp_packet_fb_set_media_ssrc (packet, 0);
3162
3163   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3164       (GHFunc) session_add_fir, data);
3165
3166   if (gst_rtcp_packet_fb_get_fci_length (packet) == 0)
3167     gst_rtcp_packet_remove (packet);
3168   else
3169     data->may_suppress = FALSE;
3170 }
3171
3172 static gboolean
3173 has_pli_compare_func (gconstpointer a, gconstpointer ignored)
3174 {
3175   GstRTCPPacket packet;
3176   GstRTCPBuffer rtcp = { NULL, };
3177   gboolean ret = FALSE;
3178
3179   gst_rtcp_buffer_map ((GstBuffer *) a, GST_MAP_READ, &rtcp);
3180
3181   if (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
3182     if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
3183         gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
3184       ret = TRUE;
3185   }
3186
3187   gst_rtcp_buffer_unmap (&rtcp);
3188
3189   return ret;
3190 }
3191
3192 /* construct PLI */
3193 static void
3194 session_pli (const gchar * key, RTPSource * source, ReportData * data)
3195 {
3196   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3197   GstRTCPPacket *packet = &data->packet;
3198
3199   if (!source->send_pli)
3200     return;
3201
3202   if (rtp_source_has_retained (source, has_pli_compare_func, NULL))
3203     return;
3204
3205   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3206     /* exit because the packet is full, will put next request in a
3207      * further packet */
3208     return;
3209
3210   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_PLI);
3211   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3212   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
3213
3214   source->send_pli = FALSE;
3215   data->may_suppress = FALSE;
3216
3217   source->stats.sent_pli_count++;
3218 }
3219
3220 /* construct NACK */
3221 static void
3222 session_nack (const gchar * key, RTPSource * source, ReportData * data)
3223 {
3224   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3225   GstRTCPPacket *packet = &data->packet;
3226   guint32 *nacks;
3227   guint n_nacks, i;
3228   guint8 *fci_data;
3229
3230   if (!source->send_nack)
3231     return;
3232
3233   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet))
3234     /* exit because the packet is full, will put next request in a
3235      * further packet */
3236     return;
3237
3238   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_RTPFB_TYPE_NACK);
3239   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3240   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
3241
3242   nacks = rtp_source_get_nacks (source, &n_nacks);
3243   GST_DEBUG ("%u NACKs", n_nacks);
3244   if (!gst_rtcp_packet_fb_set_fci_length (packet, n_nacks))
3245     return;
3246
3247   fci_data = gst_rtcp_packet_fb_get_fci (packet);
3248   for (i = 0; i < n_nacks; i++) {
3249     GST_WRITE_UINT32_BE (fci_data, nacks[i]);
3250     fci_data += 4;
3251     data->nacked_seqnums++;
3252   }
3253
3254   rtp_source_clear_nacks (source);
3255   data->may_suppress = FALSE;
3256 }
3257
3258 /* perform cleanup of sources that timed out */
3259 static void
3260 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
3261 {
3262   gboolean remove = FALSE;
3263   gboolean byetimeout = FALSE;
3264   gboolean sendertimeout = FALSE;
3265   gboolean is_sender, is_active;
3266   RTPSession *sess = data->sess;
3267   GstClockTime interval, binterval;
3268   GstClockTime btime;
3269
3270   GST_DEBUG ("look at %08x, generation %u", source->ssrc, source->generation);
3271
3272   /* check for outdated collisions */
3273   if (source->internal) {
3274     GST_DEBUG ("Timing out collisions for %x", source->ssrc);
3275     rtp_source_timeout (source, data->current_time,
3276         data->running_time - sess->rtcp_feedback_retention_window);
3277   }
3278
3279   /* nothing else to do when without RTCP */
3280   if (data->interval == GST_CLOCK_TIME_NONE)
3281     return;
3282
3283   is_sender = RTP_SOURCE_IS_SENDER (source);
3284   is_active = RTP_SOURCE_IS_ACTIVE (source);
3285
3286   /* our own rtcp interval may have been forced low by secondary configuration,
3287    * while sender side may still operate with higher interval,
3288    * so do not just take our interval to decide on timing out sender,
3289    * but take (if data->interval <= 5 * GST_SECOND):
3290    *   interval = CLAMP (sender_interval, data->interval, 5 * GST_SECOND)
3291    * where sender_interval is difference between last 2 received RTCP reports
3292    */
3293   if (data->interval >= 5 * GST_SECOND || source->internal) {
3294     binterval = data->interval;
3295   } else {
3296     GST_LOG ("prev_rtcp %" GST_TIME_FORMAT ", last_rtcp %" GST_TIME_FORMAT,
3297         GST_TIME_ARGS (source->stats.prev_rtcptime),
3298         GST_TIME_ARGS (source->stats.last_rtcptime));
3299     /* if not received enough yet, fallback to larger default */
3300     if (source->stats.last_rtcptime > source->stats.prev_rtcptime)
3301       binterval = source->stats.last_rtcptime - source->stats.prev_rtcptime;
3302     else
3303       binterval = 5 * GST_SECOND;
3304     binterval = CLAMP (binterval, data->interval, 5 * GST_SECOND);
3305   }
3306   GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
3307       GST_TIME_ARGS (binterval));
3308
3309   if (!source->internal && source->marked_bye) {
3310     /* if we received a BYE from the source, remove the source after some
3311      * time. */
3312     if (data->current_time > source->bye_time &&
3313         data->current_time - source->bye_time > sess->stats.bye_timeout) {
3314       GST_DEBUG ("removing BYE source %08x", source->ssrc);
3315       remove = TRUE;
3316       byetimeout = TRUE;
3317     }
3318   }
3319
3320   if (source->internal && source->sent_bye) {
3321     GST_DEBUG ("removing internal source that has sent BYE %08x", source->ssrc);
3322     remove = TRUE;
3323   }
3324
3325   /* sources that were inactive for more than 5 times the deterministic reporting
3326    * interval get timed out. the min timeout is 5 seconds. */
3327   /* mind old time that might pre-date last time going to PLAYING */
3328   btime = MAX (source->last_activity, sess->start_time);
3329   if (data->current_time > btime) {
3330     interval = MAX (binterval * 5, 5 * GST_SECOND);
3331     if (data->current_time - btime > interval) {
3332       GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
3333           source->ssrc, GST_TIME_ARGS (btime));
3334       if (source->internal) {
3335         /* this is an internal source that is not using our suggested ssrc.
3336          * since there must be another source using this ssrc, we can remove
3337          * this one instead of making it a receiver forever */
3338         if (source->ssrc != sess->suggested_ssrc) {
3339           rtp_source_mark_bye (source, "timed out");
3340           /* do not schedule bye here, since we are inside the RTCP timeout
3341            * processing and scheduling bye will interfere with SR/RR sending */
3342         }
3343       } else {
3344         remove = TRUE;
3345       }
3346     }
3347   }
3348
3349   /* senders that did not send for a long time become a receiver, this also
3350    * holds for our own sources. */
3351   if (is_sender) {
3352     /* mind old time that might pre-date last time going to PLAYING */
3353     btime = MAX (source->last_rtp_activity, sess->start_time);
3354     if (data->current_time > btime) {
3355       interval = MAX (binterval * 2, 5 * GST_SECOND);
3356       if (data->current_time - btime > interval) {
3357         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
3358             GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
3359         sendertimeout = TRUE;
3360       }
3361     }
3362   }
3363
3364   if (remove) {
3365     sess->total_sources--;
3366     if (is_sender) {
3367       sess->stats.sender_sources--;
3368       if (source->internal)
3369         sess->stats.internal_sender_sources--;
3370     }
3371     if (is_active)
3372       sess->stats.active_sources--;
3373
3374     if (source->internal)
3375       sess->stats.internal_sources--;
3376
3377     if (byetimeout)
3378       on_bye_timeout (sess, source);
3379     else
3380       on_timeout (sess, source);
3381   } else {
3382     if (sendertimeout) {
3383       source->is_sender = FALSE;
3384       sess->stats.sender_sources--;
3385       if (source->internal)
3386         sess->stats.internal_sender_sources--;
3387
3388       on_sender_timeout (sess, source);
3389     }
3390     /* count how many source to report in this generation */
3391     if (((gint16) (source->generation - sess->generation)) <= 0)
3392       data->num_to_report++;
3393   }
3394   source->closing = remove;
3395 }
3396
3397 static void
3398 session_sdes (RTPSession * sess, ReportData * data)
3399 {
3400   GstRTCPPacket *packet = &data->packet;
3401   const GstStructure *sdes;
3402   gint i, n_fields;
3403   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3404
3405   /* add SDES packet */
3406   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SDES, packet);
3407
3408   gst_rtcp_packet_sdes_add_item (packet, data->source->ssrc);
3409
3410   sdes = rtp_source_get_sdes_struct (data->source);
3411
3412   /* add all fields in the structure, the order is not important. */
3413   n_fields = gst_structure_n_fields (sdes);
3414   for (i = 0; i < n_fields; ++i) {
3415     const gchar *field;
3416     const gchar *value;
3417     GstRTCPSDESType type;
3418
3419     field = gst_structure_nth_field_name (sdes, i);
3420     if (field == NULL)
3421       continue;
3422     value = gst_structure_get_string (sdes, field);
3423     if (value == NULL)
3424       continue;
3425     type = gst_rtcp_sdes_name_to_type (field);
3426
3427     /* Early packets are minimal and only include the CNAME */
3428     if (data->is_early && type != GST_RTCP_SDES_CNAME)
3429       continue;
3430
3431     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
3432       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
3433           (const guint8 *) value);
3434     } else if (type == GST_RTCP_SDES_PRIV) {
3435       gsize prefix_len;
3436       gsize value_len;
3437       gsize data_len;
3438       guint8 data[256];
3439
3440       /* don't accept entries that are too big */
3441       prefix_len = strlen (field);
3442       if (prefix_len > 255)
3443         continue;
3444       value_len = strlen (value);
3445       if (value_len > 255)
3446         continue;
3447       data_len = 1 + prefix_len + value_len;
3448       if (data_len > 255)
3449         continue;
3450
3451       data[0] = prefix_len;
3452       memcpy (&data[1], field, prefix_len);
3453       memcpy (&data[1 + prefix_len], value, value_len);
3454
3455       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
3456     }
3457   }
3458
3459   data->has_sdes = TRUE;
3460 }
3461
3462 /* schedule a BYE packet */
3463 static void
3464 make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data)
3465 {
3466   GstRTCPPacket *packet = &data->packet;
3467   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3468
3469   /* add SDES */
3470   session_sdes (sess, data);
3471   /* add a BYE packet */
3472   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_BYE, packet);
3473   gst_rtcp_packet_bye_add_ssrc (packet, source->ssrc);
3474   if (source->bye_reason)
3475     gst_rtcp_packet_bye_set_reason (packet, source->bye_reason);
3476
3477   /* we have a BYE packet now */
3478   source->sent_bye = TRUE;
3479 }
3480
3481 static gboolean
3482 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
3483 {
3484   GstClockTime new_send_time;
3485   GstClockTime interval;
3486   RTPSessionStats *stats;
3487
3488   if (sess->scheduled_bye)
3489     stats = &sess->bye_stats;
3490   else
3491     stats = &sess->stats;
3492
3493   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
3494     data->is_early = TRUE;
3495   else
3496     data->is_early = FALSE;
3497
3498   if (data->is_early && sess->next_early_rtcp_time < current_time) {
3499     GST_DEBUG ("early feedback %" GST_TIME_FORMAT " < now %"
3500         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time),
3501         GST_TIME_ARGS (current_time));
3502     goto early;
3503   }
3504
3505   /* no need to check yet */
3506   if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
3507       sess->next_rtcp_check_time > current_time) {
3508     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
3509         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
3510         GST_TIME_ARGS (current_time));
3511     return FALSE;
3512   }
3513
3514 early:
3515
3516   /* take interval and add jitter */
3517   interval = data->interval;
3518   if (interval != GST_CLOCK_TIME_NONE)
3519     interval = rtp_stats_add_rtcp_jitter (stats, interval);
3520
3521   if (sess->last_rtcp_send_time != GST_CLOCK_TIME_NONE) {
3522     /* perform forward reconsideration */
3523     if (interval != GST_CLOCK_TIME_NONE) {
3524       GstClockTime elapsed;
3525
3526       /* get elapsed time since we last reported */
3527       elapsed = current_time - sess->last_rtcp_send_time;
3528
3529       GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
3530           GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
3531       new_send_time = interval + sess->last_rtcp_send_time;
3532     } else {
3533       new_send_time = sess->last_rtcp_send_time;
3534     }
3535   } else {
3536     /* If this is the first RTCP packet, we can reconsider anything based
3537      * on the last RTCP send time because there was none.
3538      */
3539     g_warn_if_fail (!data->is_early);
3540     data->is_early = FALSE;
3541     new_send_time = current_time;
3542   }
3543
3544   if (!data->is_early) {
3545     /* check if reconsideration */
3546     if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
3547       GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
3548           GST_TIME_ARGS (new_send_time));
3549       /* store new check time */
3550       sess->next_rtcp_check_time = new_send_time;
3551       return FALSE;
3552     }
3553     sess->next_rtcp_check_time = current_time + interval;
3554   } else if (interval != GST_CLOCK_TIME_NONE) {
3555     /* Apply the rules from RFC 4585 section 3.5.3 */
3556     if (stats->min_interval != 0 && !sess->first_rtcp) {
3557       GstClockTime T_rr_current_interval =
3558           g_random_double_range (0.5, 1.5) * stats->min_interval;
3559
3560       /* This will caused the RTCP to be suppressed if no FB packets are added */
3561       if (sess->last_rtcp_send_time + T_rr_current_interval > new_send_time) {
3562         GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT
3563             " last: %" GST_TIME_FORMAT
3564             " + T_rr_current_interval: %" GST_TIME_FORMAT
3565             " >  new_send_time: %" GST_TIME_FORMAT,
3566             GST_TIME_ARGS (stats->min_interval),
3567             GST_TIME_ARGS (sess->last_rtcp_send_time),
3568             GST_TIME_ARGS (T_rr_current_interval),
3569             GST_TIME_ARGS (new_send_time));
3570         data->may_suppress = TRUE;
3571       }
3572     }
3573   }
3574
3575   GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
3576       GST_TIME_ARGS (new_send_time));
3577
3578   return TRUE;
3579 }
3580
3581 static void
3582 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
3583 {
3584   g_hash_table_insert (hash_table, key, g_object_ref (source));
3585 }
3586
3587 static gboolean
3588 remove_closing_sources (const gchar * key, RTPSource * source,
3589     ReportData * data)
3590 {
3591   if (source->closing)
3592     return TRUE;
3593
3594   if (source->send_fir)
3595     data->have_fir = TRUE;
3596   if (source->send_pli)
3597     data->have_pli = TRUE;
3598   if (source->send_nack)
3599     data->have_nack = TRUE;
3600
3601   return FALSE;
3602 }
3603
3604 static void
3605 generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
3606 {
3607   RTPSession *sess = data->sess;
3608   gboolean is_bye = FALSE;
3609   ReportOutput *output;
3610
3611   /* only generate RTCP for active internal sources */
3612   if (!source->internal || source->sent_bye)
3613     return;
3614
3615   /* ignore other sources when we do the timeout after a scheduled BYE */
3616   if (sess->scheduled_bye && !source->marked_bye)
3617     return;
3618
3619   data->source = source;
3620
3621   /* open packet */
3622   session_start_rtcp (sess, data);
3623
3624   if (source->marked_bye) {
3625     /* send BYE */
3626     make_source_bye (sess, source, data);
3627     is_bye = TRUE;
3628   } else if (!data->is_early) {
3629     /* loop over all known sources and add report blocks. If we are early, we
3630      * just make a minimal RTCP packet and skip this step */
3631     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3632         (GHFunc) session_report_blocks, data);
3633   }
3634   if (!data->has_sdes)
3635     session_sdes (sess, data);
3636
3637   if (data->have_fir)
3638     session_fir (sess, data);
3639
3640   if (data->have_pli)
3641     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3642         (GHFunc) session_pli, data);
3643
3644   if (data->have_nack)
3645     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3646         (GHFunc) session_nack, data);
3647
3648   gst_rtcp_buffer_unmap (&data->rtcpbuf);
3649
3650   output = g_slice_new (ReportOutput);
3651   output->source = g_object_ref (source);
3652   output->is_bye = is_bye;
3653   output->buffer = data->rtcp;
3654   /* queue the RTCP packet to push later */
3655   g_queue_push_tail (&data->output, output);
3656 }
3657
3658 static void
3659 update_generation (const gchar * key, RTPSource * source, ReportData * data)
3660 {
3661   RTPSession *sess = data->sess;
3662
3663   if (g_hash_table_size (source->reported_in_sr_of) >=
3664       sess->stats.internal_sources) {
3665     /* source is reported, move to next generation */
3666     source->generation = sess->generation + 1;
3667     g_hash_table_remove_all (source->reported_in_sr_of);
3668
3669     GST_LOG ("reported source %x, new generation: %d", source->ssrc,
3670         source->generation);
3671
3672     /* if we reported all sources in this generation, move to next */
3673     if (--data->num_to_report == 0) {
3674       sess->generation++;
3675       GST_DEBUG ("all reported, generation now %u", sess->generation);
3676     }
3677   }
3678 }
3679
3680 /**
3681  * rtp_session_on_timeout:
3682  * @sess: an #RTPSession
3683  * @current_time: the current system time
3684  * @ntpnstime: the current NTP time in nanoseconds
3685  * @running_time: the current running_time of the pipeline
3686  *
3687  * Perform maintenance actions after the timeout obtained with
3688  * rtp_session_next_timeout() expired.
3689  *
3690  * This function will perform timeouts of receivers and senders, send a BYE
3691  * packet or generate RTCP packets with current session stats.
3692  *
3693  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
3694  * times, for each packet that should be processed.
3695  *
3696  * Returns: a #GstFlowReturn.
3697  */
3698 GstFlowReturn
3699 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
3700     guint64 ntpnstime, GstClockTime running_time)
3701 {
3702   GstFlowReturn result = GST_FLOW_OK;
3703   ReportData data = { GST_RTCP_BUFFER_INIT };
3704   GHashTable *table_copy;
3705   ReportOutput *output;
3706
3707   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3708
3709   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT
3710       ", running-time %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
3711       GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (running_time));
3712
3713   data.sess = sess;
3714   data.current_time = current_time;
3715   data.ntpnstime = ntpnstime;
3716   data.running_time = running_time;
3717   data.num_to_report = 0;
3718   data.may_suppress = FALSE;
3719   data.nacked_seqnums = 0;
3720   g_queue_init (&data.output);
3721
3722   RTP_SESSION_LOCK (sess);
3723   /* get a new interval, we need this for various cleanups etc */
3724   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
3725
3726   GST_DEBUG ("interval %" GST_TIME_FORMAT, GST_TIME_ARGS (data.interval));
3727
3728   /* we need an internal source now */
3729   if (sess->stats.internal_sources == 0) {
3730     RTPSource *source;
3731     gboolean created;
3732
3733     source = obtain_internal_source (sess, sess->suggested_ssrc, &created,
3734         current_time);
3735     g_object_unref (source);
3736   }
3737
3738   sess->conflicting_addresses =
3739       timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
3740
3741   /* Make a local copy of the hashtable. We need to do this because the
3742    * cleanup stage below releases the session lock. */
3743   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
3744       (GDestroyNotify) g_object_unref);
3745   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3746       (GHFunc) clone_ssrcs_hashtable, table_copy);
3747
3748   /* Clean up the session, mark the source for removing, this might release the
3749    * session lock. */
3750   g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
3751   g_hash_table_destroy (table_copy);
3752
3753   /* Now remove the marked sources */
3754   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
3755       (GHRFunc) remove_closing_sources, &data);
3756
3757   /* update point-to-point status */
3758   session_update_ptp (sess);
3759
3760   /* see if we need to generate SR or RR packets */
3761   if (!is_rtcp_time (sess, current_time, &data))
3762     goto done;
3763
3764   GST_DEBUG ("doing RTCP generation %u for %u sources, early %d",
3765       sess->generation, data.num_to_report, data.is_early);
3766
3767   /* generate RTCP for all internal sources */
3768   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3769       (GHFunc) generate_rtcp, &data);
3770
3771   /* update the generation for all the sources that have been reported */
3772   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3773       (GHFunc) update_generation, &data);
3774
3775   /* we keep track of the last report time in order to timeout inactive
3776    * receivers or senders */
3777   if (!data.is_early && !data.may_suppress)
3778     sess->last_rtcp_send_time = data.current_time;
3779   sess->first_rtcp = FALSE;
3780   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
3781   sess->scheduled_bye = FALSE;
3782
3783   /*  RFC 4585 section 3.5.2 step 6 */
3784   if (!data.is_early) {
3785     sess->allow_early = TRUE;
3786   }
3787
3788 done:
3789   RTP_SESSION_UNLOCK (sess);
3790
3791   /* push out the RTCP packets */
3792   while ((output = g_queue_pop_head (&data.output))) {
3793     gboolean do_not_suppress;
3794     GstBuffer *buffer = output->buffer;
3795     RTPSource *source = output->source;
3796
3797     /* Give the user a change to add its own packet */
3798     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
3799         buffer, data.is_early, &do_not_suppress);
3800
3801     if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) {
3802       guint packet_size;
3803
3804       packet_size = gst_buffer_get_size (buffer) + sess->header_len;
3805
3806       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
3807       GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
3808           sess->stats.avg_rtcp_packet_size, packet_size);
3809       result =
3810           sess->callbacks.send_rtcp (sess, source, buffer, output->is_bye,
3811           sess->send_rtcp_user_data);
3812       sess->stats.nacks_sent += data.nacked_seqnums;
3813     } else {
3814       GST_DEBUG ("freeing packet callback: %p"
3815           " do_not_suppress: %d may_suppress: %d",
3816           sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress);
3817       sess->stats.nacks_dropped += data.nacked_seqnums;
3818       gst_buffer_unref (buffer);
3819     }
3820     g_object_unref (source);
3821     g_slice_free (ReportOutput, output);
3822   }
3823   return result;
3824 }
3825
3826 /**
3827  * rtp_session_request_early_rtcp:
3828  * @sess: an #RTPSession
3829  * @current_time: the current system time
3830  * @max_delay: maximum delay
3831  *
3832  * Request transmission of early RTCP
3833  *
3834  * Returns: %TRUE if the related RTCP can be scheduled.
3835  */
3836 gboolean
3837 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
3838     GstClockTime max_delay)
3839 {
3840   GstClockTime T_dither_max, T_rr;
3841   gboolean ret;
3842
3843   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
3844
3845   RTP_SESSION_LOCK (sess);
3846
3847   /* Check if already requested */
3848   /*  RFC 4585 section 3.5.2 step 2 */
3849   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
3850     GST_LOG_OBJECT (sess, "already have next early rtcp time");
3851     ret = TRUE;
3852     goto end;
3853   }
3854
3855   if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) {
3856     GST_LOG_OBJECT (sess, "no next RTCP check time");
3857     ret = FALSE;
3858     goto end;
3859   }
3860
3861   /* RFC 4585 section 3.5.3 step 1
3862    * If no regular RTCP packet has been sent before, then a regular
3863    * RTCP packet has to be scheduled first and FB messages might be
3864    * included there
3865    */
3866   if (!GST_CLOCK_TIME_IS_VALID (sess->last_rtcp_send_time)) {
3867     GST_LOG_OBJECT (sess, "no RTCP sent yet");
3868
3869     if (current_time + max_delay > sess->next_rtcp_check_time) {
3870       GST_LOG_OBJECT (sess,
3871           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
3872           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
3873           GST_TIME_ARGS (max_delay),
3874           GST_TIME_ARGS (sess->next_rtcp_check_time));
3875       ret = TRUE;
3876     } else {
3877       ret = FALSE;
3878     }
3879     goto end;
3880   }
3881
3882   T_rr = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
3883
3884   /*  RFC 4585 section 3.5.2 step 2b */
3885   /* If the total sources is <=2, then there is only us and one peer */
3886   /* When there is one auxiliary stream the session can still do point
3887    * to point.
3888    */
3889   if (sess->is_doing_ptp) {
3890     T_dither_max = 0;
3891   } else {
3892     /* Divide by 2 because l = 0.5 */
3893     T_dither_max = T_rr;
3894     T_dither_max /= 2;
3895   }
3896
3897   /*  RFC 4585 section 3.5.2 step 3 */
3898   if (current_time + T_dither_max > sess->next_rtcp_check_time) {
3899     GST_LOG_OBJECT (sess, "don't send because of dither");
3900     ret = FALSE;
3901     goto end;
3902   }
3903
3904   /*  RFC 4585 section 3.5.2 step 4a */
3905   if (sess->allow_early == FALSE) {
3906     /* Ignore the request a scheduled packet will be in time anyway */
3907     if (current_time + max_delay > sess->next_rtcp_check_time) {
3908       GST_LOG_OBJECT (sess,
3909           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
3910           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
3911           GST_TIME_ARGS (max_delay),
3912           GST_TIME_ARGS (sess->next_rtcp_check_time));
3913       ret = TRUE;
3914     } else {
3915       GST_LOG_OBJECT (sess, "can't allow early feedback");
3916       ret = FALSE;
3917     }
3918     goto end;
3919   }
3920
3921   /*  RFC 4585 section 3.5.2 step 4b */
3922   if (T_dither_max) {
3923     /* Schedule an early transmission later */
3924     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
3925         current_time;
3926   } else {
3927     /* If no dithering, schedule it for NOW */
3928     sess->next_early_rtcp_time = current_time;
3929   }
3930
3931   /*  RFC 4585 section 3.5.2 step 6 */
3932   sess->allow_early = FALSE;
3933   /* Delay next regular RTCP packet to not exceed the short-term
3934    * RTCP bandwidth when using early feedback as compared to
3935    * without */
3936   sess->next_rtcp_check_time = sess->last_rtcp_send_time + 2 * T_rr;
3937   sess->last_rtcp_send_time += T_rr;
3938
3939   GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT,
3940       GST_TIME_ARGS (sess->next_early_rtcp_time));
3941   RTP_SESSION_UNLOCK (sess);
3942
3943   /* notify app of need to send packet early
3944    * and therefore of timeout change */
3945   if (sess->callbacks.reconsider)
3946     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
3947
3948   return TRUE;
3949
3950 end:
3951
3952   RTP_SESSION_UNLOCK (sess);
3953
3954   return ret;
3955 }
3956
3957 static gboolean
3958 rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
3959 {
3960   GstClockTime now;
3961
3962   if (!sess->callbacks.send_rtcp)
3963     return FALSE;
3964
3965   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
3966
3967   return rtp_session_request_early_rtcp (sess, now, max_delay);
3968 }
3969
3970 gboolean
3971 rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
3972     gboolean fir, gint count)
3973 {
3974   RTPSource *src;
3975
3976   if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
3977     GST_DEBUG ("FIR/PLI not sent");
3978     return FALSE;
3979   }
3980
3981   RTP_SESSION_LOCK (sess);
3982   src = find_source (sess, ssrc);
3983   if (src == NULL)
3984     goto no_source;
3985
3986   if (fir) {
3987     src->send_pli = FALSE;
3988     src->send_fir = TRUE;
3989
3990     if (count == -1 || count != src->last_fir_count)
3991       src->current_send_fir_seqnum++;
3992     src->last_fir_count = count;
3993   } else if (!src->send_fir) {
3994     src->send_pli = TRUE;
3995   }
3996   RTP_SESSION_UNLOCK (sess);
3997
3998   return TRUE;
3999
4000   /* ERRORS */
4001 no_source:
4002   {
4003     RTP_SESSION_UNLOCK (sess);
4004     return FALSE;
4005   }
4006 }
4007
4008 /**
4009  * rtp_session_request_nack:
4010  * @sess: a #RTPSession
4011  * @ssrc: the SSRC
4012  * @seqnum: the missing seqnum
4013  * @max_delay: max delay to request NACK
4014  *
4015  * Request scheduling of a NACK feedback packet for @seqnum in @ssrc.
4016  *
4017  * Returns: %TRUE if the NACK feedback could be scheduled
4018  */
4019 gboolean
4020 rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
4021     GstClockTime max_delay)
4022 {
4023   RTPSource *source;
4024
4025   if (!rtp_session_send_rtcp (sess, max_delay)) {
4026     GST_DEBUG ("NACK not sent");
4027     return FALSE;
4028   }
4029
4030   RTP_SESSION_LOCK (sess);
4031   source = find_source (sess, ssrc);
4032   if (source == NULL)
4033     goto no_source;
4034
4035   GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum);
4036   rtp_source_register_nack (source, seqnum);
4037   RTP_SESSION_UNLOCK (sess);
4038
4039   return TRUE;
4040
4041   /* ERRORS */
4042 no_source:
4043   {
4044     RTP_SESSION_UNLOCK (sess);
4045     return FALSE;
4046   }
4047 }