rtpsession: Handle first RTCP packet and early feedback correctly
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
21  * with newer GLib versions (>= 2.31.0) */
22 #define GLIB_DISABLE_DEPRECATION_WARNINGS
23
24 #include <string.h>
25
26 #include <gst/rtp/gstrtpbuffer.h>
27 #include <gst/rtp/gstrtcpbuffer.h>
28
29 #include <gst/glib-compat-private.h>
30
31 #include "rtpsession.h"
32
33 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
34 #define GST_CAT_DEFAULT rtp_session_debug
35
36 /* signals and args */
37 enum
38 {
39   SIGNAL_GET_SOURCE_BY_SSRC,
40   SIGNAL_ON_NEW_SSRC,
41   SIGNAL_ON_SSRC_COLLISION,
42   SIGNAL_ON_SSRC_VALIDATED,
43   SIGNAL_ON_SSRC_ACTIVE,
44   SIGNAL_ON_SSRC_SDES,
45   SIGNAL_ON_BYE_SSRC,
46   SIGNAL_ON_BYE_TIMEOUT,
47   SIGNAL_ON_TIMEOUT,
48   SIGNAL_ON_SENDER_TIMEOUT,
49   SIGNAL_ON_SENDING_RTCP,
50   SIGNAL_ON_FEEDBACK_RTCP,
51   SIGNAL_SEND_RTCP,
52   SIGNAL_SEND_RTCP_FULL,
53   SIGNAL_ON_RECEIVING_RTCP,
54   LAST_SIGNAL
55 };
56
57 #define DEFAULT_INTERNAL_SOURCE      NULL
58 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
59 #define DEFAULT_RTCP_FRACTION        (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH)
60 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
61 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
62 #define DEFAULT_RTCP_MTU             1400
63 #define DEFAULT_SDES                 NULL
64 #define DEFAULT_NUM_SOURCES          0
65 #define DEFAULT_NUM_ACTIVE_SOURCES   0
66 #define DEFAULT_SOURCES              NULL
67 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
68 #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
69 #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
70 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
71
72 enum
73 {
74   PROP_0,
75   PROP_INTERNAL_SSRC,
76   PROP_INTERNAL_SOURCE,
77   PROP_BANDWIDTH,
78   PROP_RTCP_FRACTION,
79   PROP_RTCP_RR_BANDWIDTH,
80   PROP_RTCP_RS_BANDWIDTH,
81   PROP_RTCP_MTU,
82   PROP_SDES,
83   PROP_NUM_SOURCES,
84   PROP_NUM_ACTIVE_SOURCES,
85   PROP_SOURCES,
86   PROP_FAVOR_NEW,
87   PROP_RTCP_MIN_INTERVAL,
88   PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
89   PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
90   PROP_PROBATION,
91   PROP_STATS,
92   PROP_LAST
93 };
94
95 /* update average packet size */
96 #define INIT_AVG(avg, val) \
97    (avg) = (val);
98 #define UPDATE_AVG(avg, val)            \
99   if ((avg) == 0)                       \
100    (avg) = (val);                       \
101   else                                  \
102    (avg) = ((val) + (15 * (avg))) >> 4;
103
104
105 /* GObject vmethods */
106 static void rtp_session_finalize (GObject * object);
107 static void rtp_session_set_property (GObject * object, guint prop_id,
108     const GValue * value, GParamSpec * pspec);
109 static void rtp_session_get_property (GObject * object, guint prop_id,
110     GValue * value, GParamSpec * pspec);
111
112 static gboolean rtp_session_send_rtcp (RTPSession * sess,
113     GstClockTime max_delay);
114
115 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
116
117 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
118
119 static guint32 rtp_session_create_new_ssrc (RTPSession * sess);
120 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
121     gboolean * created, RTPPacketInfo * pinfo, gboolean rtp);
122 static RTPSource *obtain_internal_source (RTPSession * sess,
123     guint32 ssrc, gboolean * created, GstClockTime current_time);
124 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
125     GstClockTime current_time);
126 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
127     gboolean deterministic, gboolean first);
128
129 static gboolean
130 accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
131     const GValue * handler_return, gpointer data)
132 {
133   if (g_value_get_boolean (handler_return))
134     g_value_set_boolean (return_accu, TRUE);
135
136   return TRUE;
137 }
138
139 static void
140 rtp_session_class_init (RTPSessionClass * klass)
141 {
142   GObjectClass *gobject_class;
143
144   gobject_class = (GObjectClass *) klass;
145
146   gobject_class->finalize = rtp_session_finalize;
147   gobject_class->set_property = rtp_session_set_property;
148   gobject_class->get_property = rtp_session_get_property;
149
150   /**
151    * RTPSession::get-source-by-ssrc:
152    * @session: the object which received the signal
153    * @ssrc: the SSRC of the RTPSource
154    *
155    * Request the #RTPSource object with SSRC @ssrc in @session.
156    */
157   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
158       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
159       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
160           get_source_by_ssrc), NULL, NULL, g_cclosure_marshal_generic,
161       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
162
163   /**
164    * RTPSession::on-new-ssrc:
165    * @session: the object which received the signal
166    * @src: the new RTPSource
167    *
168    * Notify of a new SSRC that entered @session.
169    */
170   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
171       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
172       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
173       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
174       RTP_TYPE_SOURCE);
175   /**
176    * RTPSession::on-ssrc-collision:
177    * @session: the object which received the signal
178    * @src: the #RTPSource that caused a collision
179    *
180    * Notify when we have an SSRC collision
181    */
182   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
183       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
184       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
185       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
186       RTP_TYPE_SOURCE);
187   /**
188    * RTPSession::on-ssrc-validated:
189    * @session: the object which received the signal
190    * @src: the new validated RTPSource
191    *
192    * Notify of a new SSRC that became validated.
193    */
194   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
195       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
196       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
197       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
198       RTP_TYPE_SOURCE);
199   /**
200    * RTPSession::on-ssrc-active:
201    * @session: the object which received the signal
202    * @src: the active RTPSource
203    *
204    * Notify of a SSRC that is active, i.e., sending RTCP.
205    */
206   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
207       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
208       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
209       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
210       RTP_TYPE_SOURCE);
211   /**
212    * RTPSession::on-ssrc-sdes:
213    * @session: the object which received the signal
214    * @src: the RTPSource
215    *
216    * Notify that a new SDES was received for SSRC.
217    */
218   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
219       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
220       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
221       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
222       RTP_TYPE_SOURCE);
223   /**
224    * RTPSession::on-bye-ssrc:
225    * @session: the object which received the signal
226    * @src: the RTPSource that went away
227    *
228    * Notify of an SSRC that became inactive because of a BYE packet.
229    */
230   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
231       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
232       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
233       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
234       RTP_TYPE_SOURCE);
235   /**
236    * RTPSession::on-bye-timeout:
237    * @session: the object which received the signal
238    * @src: the RTPSource that timed out
239    *
240    * Notify of an SSRC that has timed out because of BYE
241    */
242   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
243       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
244       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
245       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
246       RTP_TYPE_SOURCE);
247   /**
248    * RTPSession::on-timeout:
249    * @session: the object which received the signal
250    * @src: the RTPSource that timed out
251    *
252    * Notify of an SSRC that has timed out
253    */
254   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
255       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
256       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
257       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
258       RTP_TYPE_SOURCE);
259   /**
260    * RTPSession::on-sender-timeout:
261    * @session: the object which received the signal
262    * @src: the RTPSource that timed out
263    *
264    * Notify of an SSRC that was a sender but timed out and became a receiver.
265    */
266   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
267       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
268       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
269       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
270       RTP_TYPE_SOURCE);
271
272   /**
273    * RTPSession::on-sending-rtcp
274    * @session: the object which received the signal
275    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
276    * @early: %TRUE if the packet is early, %FALSE if it is regular
277    *
278    * This signal is emitted before sending an RTCP packet, it can be used
279    * to add extra RTCP Packets.
280    *
281    * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
282    * if suppressing it is acceptable
283    */
284   rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
285       g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
286       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
287       accumulate_trues, NULL, g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 2,
288       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE, G_TYPE_BOOLEAN);
289
290   /**
291    * RTPSession::on-feedback-rtcp:
292    * @session: the object which received the signal
293    * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
294    *  %GST_RTCP_TYPE_RTPFB
295    * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
296    * @sender_ssrc: The SSRC of the sender
297    * @media_ssrc: The SSRC of the media this refers to
298    * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
299    * there was no FCI
300    *
301    * Notify that a RTCP feedback packet has been received
302    */
303   rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
304       g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
305       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
306       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 5, G_TYPE_UINT,
307       G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, GST_TYPE_BUFFER);
308
309   /**
310    * RTPSession::send-rtcp:
311    * @session: the object which received the signal
312    * @max_delay: The maximum delay after which the feedback will not be useful
313    *  anymore
314    *
315    * Requests that the #RTPSession initiate a new RTCP packet as soon as
316    * possible within the requested delay.
317    */
318   rtp_session_signals[SIGNAL_SEND_RTCP] =
319       g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
320       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
321       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
322       g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT64);
323
324   /**
325    * RTPSession::send-rtcp-full:
326    * @session: the object which received the signal
327    * @max_delay: The maximum delay after which the feedback will not be useful
328    *  anymore
329    *
330    * Requests that the #RTPSession initiate a new RTCP packet as soon as
331    * possible within the requested delay.
332    *
333    * Returns: TRUE if the new RTCP packet could be scheduled within the
334    * requested delay, FALSE otherwise.
335    *
336    * Since: 1.6
337    */
338   rtp_session_signals[SIGNAL_SEND_RTCP_FULL] =
339       g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass),
340       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
341       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
342       g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
343
344   /**
345    * RTPSession::on-receiving-rtcp
346    * @session: the object which received the signal
347    * @buffer: the #GstBuffer containing the RTCP packet that was received
348    *
349    * This signal is emitted when receiving an RTCP packet before it is handled
350    * by the session. It can be used to extract custom information from RTCP packets.
351    *
352    * Since: 1.6
353    */
354   rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] =
355       g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass),
356       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp),
357       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1,
358       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
359
360   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
361       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
362           "The internal SSRC used for the session (deprecated)",
363           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
364
365   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
366       g_param_spec_object ("internal-source", "Internal Source",
367           "The internal source element of the session (deprecated)",
368           RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
369
370   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
371       g_param_spec_double ("bandwidth", "Bandwidth",
372           "The bandwidth of the session (0 for auto-discover)",
373           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
374           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375
376   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
377       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
378           "The fraction of the bandwidth used for RTCP (or as a real fraction of the RTP bandwidth if < 1)",
379           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
380           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
381
382   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
383       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
384           "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
385           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
386           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
387
388   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
389       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
390           "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
391           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
392           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
393
394   g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
395       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
396           "The maximum size of the RTCP packets",
397           16, G_MAXINT16, DEFAULT_RTCP_MTU,
398           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399
400   g_object_class_install_property (gobject_class, PROP_SDES,
401       g_param_spec_boxed ("sdes", "SDES",
402           "The SDES items of this session",
403           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404
405   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
406       g_param_spec_uint ("num-sources", "Num Sources",
407           "The number of sources in the session", 0, G_MAXUINT,
408           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
409
410   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
411       g_param_spec_uint ("num-active-sources", "Num Active Sources",
412           "The number of active sources in the session", 0, G_MAXUINT,
413           DEFAULT_NUM_ACTIVE_SOURCES,
414           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
415   /**
416    * RTPSource::sources
417    *
418    * Get a GValue Array of all sources in the session.
419    *
420    * <example>
421    * <title>Getting the #RTPSources of a session
422    * <programlisting>
423    * {
424    *   GValueArray *arr;
425    *   GValue *val;
426    *   guint i;
427    *
428    *   g_object_get (sess, "sources", &arr, NULL);
429    *
430    *   for (i = 0; i < arr->n_values; i++) {
431    *     RTPSource *source;
432    *
433    *     val = g_value_array_get_nth (arr, i);
434    *     source = g_value_get_object (val);
435    *   }
436    *   g_value_array_free (arr);
437    * }
438    * </programlisting>
439    * </example>
440    */
441   g_object_class_install_property (gobject_class, PROP_SOURCES,
442       g_param_spec_boxed ("sources", "Sources",
443           "An array of all known sources in the session",
444           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
445
446   g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
447       g_param_spec_boolean ("favor-new", "Favor new sources",
448           "Resolve SSRC conflict in favor of new sources", FALSE,
449           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
450
451   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
452       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
453           "Minimum interval between Regular RTCP packet (in ns)",
454           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
455           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
456
457   g_object_class_install_property (gobject_class,
458       PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
459       g_param_spec_uint64 ("rtcp-feedback-retention-window",
460           "RTCP Feedback retention window",
461           "Duration during which RTCP Feedback packets are retained (in ns)",
462           0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
463           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
464
465   g_object_class_install_property (gobject_class,
466       PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
467       g_param_spec_uint ("rtcp-immediate-feedback-threshold",
468           "RTCP Immediate Feedback threshold",
469           "The maximum number of members of a RTP session for which immediate"
470           " feedback is used (DEPRECATED: has no effect and is not needed)",
471           0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
472           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
473
474   g_object_class_install_property (gobject_class, PROP_PROBATION,
475       g_param_spec_uint ("probation", "Number of probations",
476           "Consecutive packet sequence numbers to accept the source",
477           0, G_MAXUINT, DEFAULT_PROBATION,
478           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
479
480   /**
481    * RTPSession::stats:
482    *
483    * Various session statistics. This property returns a GstStructure
484    * with name application/x-rtp-session-stats with the following fields:
485    *
486    *  "rtx-drop-count"  G_TYPE_UINT   The number of retransmission events
487    *      dropped (due to bandwidth constraints)
488    *  "sent-nack-count" G_TYPE_UINT   Number of NACKs sent
489    *  "recv-nack-count" G_TYPE_UINT   Number of NACKs received
490    *
491    * Since: 1.4
492    */
493   g_object_class_install_property (gobject_class, PROP_STATS,
494       g_param_spec_boxed ("stats", "Statistics",
495           "Various statistics", GST_TYPE_STRUCTURE,
496           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
497
498   klass->get_source_by_ssrc =
499       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
500   klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
501
502   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
503 }
504
505 static void
506 rtp_session_init (RTPSession * sess)
507 {
508   gint i;
509   gchar *str;
510
511   g_mutex_init (&sess->lock);
512   sess->key = g_random_int ();
513   sess->mask_idx = 0;
514   sess->mask = 0;
515
516   for (i = 0; i < 32; i++) {
517     sess->ssrcs[i] =
518         g_hash_table_new_full (NULL, NULL, NULL,
519         (GDestroyNotify) g_object_unref);
520   }
521
522   rtp_stats_init_defaults (&sess->stats);
523   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
524   rtp_stats_set_min_interval (&sess->stats,
525       (gdouble) DEFAULT_RTCP_MIN_INTERVAL / GST_SECOND);
526
527   sess->recalc_bandwidth = TRUE;
528   sess->bandwidth = DEFAULT_BANDWIDTH;
529   sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
530   sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
531   sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
532
533   /* default UDP header length */
534   sess->header_len = 28;
535   sess->mtu = DEFAULT_RTCP_MTU;
536
537   sess->probation = DEFAULT_PROBATION;
538
539   /* some default SDES entries */
540   sess->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
541
542   /* we do not want to leak details like the username or hostname here */
543   str = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
544   gst_structure_set (sess->sdes, "cname", G_TYPE_STRING, str, NULL);
545   g_free (str);
546
547 #if 0
548   /* we do not want to leak the user's real name here */
549   str = g_strdup_printf ("Anon%u", g_random_int ());
550   gst_structure_set (sdes, "name", G_TYPE_STRING, str, NULL);
551   g_free (str);
552 #endif
553
554   gst_structure_set (sess->sdes, "tool", G_TYPE_STRING, "GStreamer", NULL);
555
556   /* this is the SSRC we suggest */
557   sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
558
559   sess->first_rtcp = TRUE;
560   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
561   sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
562
563   sess->allow_early = TRUE;
564   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
565   sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
566   sess->rtcp_immediate_feedback_threshold =
567       DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
568
569   sess->last_keyframe_request = GST_CLOCK_TIME_NONE;
570
571   sess->is_doing_ptp = TRUE;
572 }
573
574 static void
575 rtp_session_finalize (GObject * object)
576 {
577   RTPSession *sess;
578   gint i;
579
580   sess = RTP_SESSION_CAST (object);
581
582   gst_structure_free (sess->sdes);
583
584   g_list_free_full (sess->conflicting_addresses,
585       (GDestroyNotify) rtp_conflicting_address_free);
586
587   for (i = 0; i < 32; i++)
588     g_hash_table_destroy (sess->ssrcs[i]);
589
590   g_mutex_clear (&sess->lock);
591
592   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
593 }
594
595 static void
596 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
597 {
598   GValue value = { 0 };
599
600   g_value_init (&value, RTP_TYPE_SOURCE);
601   g_value_take_object (&value, source);
602   /* copies the value */
603   g_value_array_append (arr, &value);
604 }
605
606 static GValueArray *
607 rtp_session_create_sources (RTPSession * sess)
608 {
609   GValueArray *res;
610   guint size;
611
612   RTP_SESSION_LOCK (sess);
613   /* get number of elements in the table */
614   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
615   /* create the result value array */
616   res = g_value_array_new (size);
617
618   /* and copy all values into the array */
619   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
620   RTP_SESSION_UNLOCK (sess);
621
622   return res;
623 }
624
625 static GstStructure *
626 rtp_session_create_stats (RTPSession * sess)
627 {
628   GstStructure *s;
629
630   s = gst_structure_new ("application/x-rtp-session-stats",
631       "rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped,
632       "sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent,
633       "recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL);
634
635   return s;
636 }
637
638 static void
639 rtp_session_set_property (GObject * object, guint prop_id,
640     const GValue * value, GParamSpec * pspec)
641 {
642   RTPSession *sess;
643
644   sess = RTP_SESSION (object);
645
646   switch (prop_id) {
647     case PROP_INTERNAL_SSRC:
648       RTP_SESSION_LOCK (sess);
649       sess->suggested_ssrc = g_value_get_uint (value);
650       RTP_SESSION_UNLOCK (sess);
651       if (sess->callbacks.reconfigure)
652         sess->callbacks.reconfigure (sess, sess->reconfigure_user_data);
653       break;
654     case PROP_BANDWIDTH:
655       RTP_SESSION_LOCK (sess);
656       sess->bandwidth = g_value_get_double (value);
657       sess->recalc_bandwidth = TRUE;
658       RTP_SESSION_UNLOCK (sess);
659       break;
660     case PROP_RTCP_FRACTION:
661       RTP_SESSION_LOCK (sess);
662       sess->rtcp_bandwidth = g_value_get_double (value);
663       sess->recalc_bandwidth = TRUE;
664       RTP_SESSION_UNLOCK (sess);
665       break;
666     case PROP_RTCP_RR_BANDWIDTH:
667       RTP_SESSION_LOCK (sess);
668       sess->rtcp_rr_bandwidth = g_value_get_int (value);
669       sess->recalc_bandwidth = TRUE;
670       RTP_SESSION_UNLOCK (sess);
671       break;
672     case PROP_RTCP_RS_BANDWIDTH:
673       RTP_SESSION_LOCK (sess);
674       sess->rtcp_rs_bandwidth = g_value_get_int (value);
675       sess->recalc_bandwidth = TRUE;
676       RTP_SESSION_UNLOCK (sess);
677       break;
678     case PROP_RTCP_MTU:
679       sess->mtu = g_value_get_uint (value);
680       break;
681     case PROP_SDES:
682       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
683       break;
684     case PROP_FAVOR_NEW:
685       sess->favor_new = g_value_get_boolean (value);
686       break;
687     case PROP_RTCP_MIN_INTERVAL:
688       rtp_stats_set_min_interval (&sess->stats,
689           (gdouble) g_value_get_uint64 (value) / GST_SECOND);
690       /* trigger reconsideration */
691       RTP_SESSION_LOCK (sess);
692       sess->next_rtcp_check_time = 0;
693       RTP_SESSION_UNLOCK (sess);
694       if (sess->callbacks.reconsider)
695         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
696       break;
697     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
698       sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
699       break;
700     case PROP_PROBATION:
701       sess->probation = g_value_get_uint (value);
702       break;
703     default:
704       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
705       break;
706   }
707 }
708
709 static void
710 rtp_session_get_property (GObject * object, guint prop_id,
711     GValue * value, GParamSpec * pspec)
712 {
713   RTPSession *sess;
714
715   sess = RTP_SESSION (object);
716
717   switch (prop_id) {
718     case PROP_INTERNAL_SSRC:
719       g_value_set_uint (value, rtp_session_suggest_ssrc (sess));
720       break;
721     case PROP_INTERNAL_SOURCE:
722       /* FIXME, return a random source */
723       g_value_set_object (value, NULL);
724       break;
725     case PROP_BANDWIDTH:
726       g_value_set_double (value, sess->bandwidth);
727       break;
728     case PROP_RTCP_FRACTION:
729       g_value_set_double (value, sess->rtcp_bandwidth);
730       break;
731     case PROP_RTCP_RR_BANDWIDTH:
732       g_value_set_int (value, sess->rtcp_rr_bandwidth);
733       break;
734     case PROP_RTCP_RS_BANDWIDTH:
735       g_value_set_int (value, sess->rtcp_rs_bandwidth);
736       break;
737     case PROP_RTCP_MTU:
738       g_value_set_uint (value, sess->mtu);
739       break;
740     case PROP_SDES:
741       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
742       break;
743     case PROP_NUM_SOURCES:
744       g_value_set_uint (value, rtp_session_get_num_sources (sess));
745       break;
746     case PROP_NUM_ACTIVE_SOURCES:
747       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
748       break;
749     case PROP_SOURCES:
750       g_value_take_boxed (value, rtp_session_create_sources (sess));
751       break;
752     case PROP_FAVOR_NEW:
753       g_value_set_boolean (value, sess->favor_new);
754       break;
755     case PROP_RTCP_MIN_INTERVAL:
756       g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
757       break;
758     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
759       g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
760       break;
761     case PROP_PROBATION:
762       g_value_set_uint (value, sess->probation);
763       break;
764     case PROP_STATS:
765       g_value_take_boxed (value, rtp_session_create_stats (sess));
766       break;
767     default:
768       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
769       break;
770   }
771 }
772
773 static void
774 on_new_ssrc (RTPSession * sess, RTPSource * source)
775 {
776   g_object_ref (source);
777   RTP_SESSION_UNLOCK (sess);
778   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
779   RTP_SESSION_LOCK (sess);
780   g_object_unref (source);
781 }
782
783 static void
784 on_ssrc_collision (RTPSession * sess, RTPSource * source)
785 {
786   g_object_ref (source);
787   RTP_SESSION_UNLOCK (sess);
788   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
789       source);
790   RTP_SESSION_LOCK (sess);
791   g_object_unref (source);
792 }
793
794 static void
795 on_ssrc_validated (RTPSession * sess, RTPSource * source)
796 {
797   g_object_ref (source);
798   RTP_SESSION_UNLOCK (sess);
799   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
800       source);
801   RTP_SESSION_LOCK (sess);
802   g_object_unref (source);
803 }
804
805 static void
806 on_ssrc_active (RTPSession * sess, RTPSource * source)
807 {
808   g_object_ref (source);
809   RTP_SESSION_UNLOCK (sess);
810   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
811   RTP_SESSION_LOCK (sess);
812   g_object_unref (source);
813 }
814
815 static void
816 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
817 {
818   g_object_ref (source);
819   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
820   RTP_SESSION_UNLOCK (sess);
821   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
822   RTP_SESSION_LOCK (sess);
823   g_object_unref (source);
824 }
825
826 static void
827 on_bye_ssrc (RTPSession * sess, RTPSource * source)
828 {
829   g_object_ref (source);
830   RTP_SESSION_UNLOCK (sess);
831   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
832   RTP_SESSION_LOCK (sess);
833   g_object_unref (source);
834 }
835
836 static void
837 on_bye_timeout (RTPSession * sess, RTPSource * source)
838 {
839   g_object_ref (source);
840   RTP_SESSION_UNLOCK (sess);
841   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
842   RTP_SESSION_LOCK (sess);
843   g_object_unref (source);
844 }
845
846 static void
847 on_timeout (RTPSession * sess, RTPSource * source)
848 {
849   g_object_ref (source);
850   RTP_SESSION_UNLOCK (sess);
851   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
852   RTP_SESSION_LOCK (sess);
853   g_object_unref (source);
854 }
855
856 static void
857 on_sender_timeout (RTPSession * sess, RTPSource * source)
858 {
859   g_object_ref (source);
860   RTP_SESSION_UNLOCK (sess);
861   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
862       source);
863   RTP_SESSION_LOCK (sess);
864   g_object_unref (source);
865 }
866
867 /**
868  * rtp_session_new:
869  *
870  * Create a new session object.
871  *
872  * Returns: a new #RTPSession. g_object_unref() after usage.
873  */
874 RTPSession *
875 rtp_session_new (void)
876 {
877   RTPSession *sess;
878
879   sess = g_object_new (RTP_TYPE_SESSION, NULL);
880
881   return sess;
882 }
883
884 /**
885  * rtp_session_set_callbacks:
886  * @sess: an #RTPSession
887  * @callbacks: callbacks to configure
888  * @user_data: user data passed in the callbacks
889  *
890  * Configure a set of callbacks to be notified of actions.
891  */
892 void
893 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
894     gpointer user_data)
895 {
896   g_return_if_fail (RTP_IS_SESSION (sess));
897
898   if (callbacks->process_rtp) {
899     sess->callbacks.process_rtp = callbacks->process_rtp;
900     sess->process_rtp_user_data = user_data;
901   }
902   if (callbacks->send_rtp) {
903     sess->callbacks.send_rtp = callbacks->send_rtp;
904     sess->send_rtp_user_data = user_data;
905   }
906   if (callbacks->send_rtcp) {
907     sess->callbacks.send_rtcp = callbacks->send_rtcp;
908     sess->send_rtcp_user_data = user_data;
909   }
910   if (callbacks->sync_rtcp) {
911     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
912     sess->sync_rtcp_user_data = user_data;
913   }
914   if (callbacks->clock_rate) {
915     sess->callbacks.clock_rate = callbacks->clock_rate;
916     sess->clock_rate_user_data = user_data;
917   }
918   if (callbacks->reconsider) {
919     sess->callbacks.reconsider = callbacks->reconsider;
920     sess->reconsider_user_data = user_data;
921   }
922   if (callbacks->request_key_unit) {
923     sess->callbacks.request_key_unit = callbacks->request_key_unit;
924     sess->request_key_unit_user_data = user_data;
925   }
926   if (callbacks->request_time) {
927     sess->callbacks.request_time = callbacks->request_time;
928     sess->request_time_user_data = user_data;
929   }
930   if (callbacks->notify_nack) {
931     sess->callbacks.notify_nack = callbacks->notify_nack;
932     sess->notify_nack_user_data = user_data;
933   }
934   if (callbacks->reconfigure) {
935     sess->callbacks.reconfigure = callbacks->reconfigure;
936     sess->reconfigure_user_data = user_data;
937   }
938 }
939
940 /**
941  * rtp_session_set_process_rtp_callback:
942  * @sess: an #RTPSession
943  * @callback: callback to set
944  * @user_data: user data passed in the callback
945  *
946  * Configure only the process_rtp callback to be notified of the process_rtp action.
947  */
948 void
949 rtp_session_set_process_rtp_callback (RTPSession * sess,
950     RTPSessionProcessRTP callback, gpointer user_data)
951 {
952   g_return_if_fail (RTP_IS_SESSION (sess));
953
954   sess->callbacks.process_rtp = callback;
955   sess->process_rtp_user_data = user_data;
956 }
957
958 /**
959  * rtp_session_set_send_rtp_callback:
960  * @sess: an #RTPSession
961  * @callback: callback to set
962  * @user_data: user data passed in the callback
963  *
964  * Configure only the send_rtp callback to be notified of the send_rtp action.
965  */
966 void
967 rtp_session_set_send_rtp_callback (RTPSession * sess,
968     RTPSessionSendRTP callback, gpointer user_data)
969 {
970   g_return_if_fail (RTP_IS_SESSION (sess));
971
972   sess->callbacks.send_rtp = callback;
973   sess->send_rtp_user_data = user_data;
974 }
975
976 /**
977  * rtp_session_set_send_rtcp_callback:
978  * @sess: an #RTPSession
979  * @callback: callback to set
980  * @user_data: user data passed in the callback
981  *
982  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
983  */
984 void
985 rtp_session_set_send_rtcp_callback (RTPSession * sess,
986     RTPSessionSendRTCP callback, gpointer user_data)
987 {
988   g_return_if_fail (RTP_IS_SESSION (sess));
989
990   sess->callbacks.send_rtcp = callback;
991   sess->send_rtcp_user_data = user_data;
992 }
993
994 /**
995  * rtp_session_set_sync_rtcp_callback:
996  * @sess: an #RTPSession
997  * @callback: callback to set
998  * @user_data: user data passed in the callback
999  *
1000  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
1001  */
1002 void
1003 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
1004     RTPSessionSyncRTCP callback, gpointer user_data)
1005 {
1006   g_return_if_fail (RTP_IS_SESSION (sess));
1007
1008   sess->callbacks.sync_rtcp = callback;
1009   sess->sync_rtcp_user_data = user_data;
1010 }
1011
1012 /**
1013  * rtp_session_set_clock_rate_callback:
1014  * @sess: an #RTPSession
1015  * @callback: callback to set
1016  * @user_data: user data passed in the callback
1017  *
1018  * Configure only the clock_rate callback to be notified of the clock_rate action.
1019  */
1020 void
1021 rtp_session_set_clock_rate_callback (RTPSession * sess,
1022     RTPSessionClockRate callback, gpointer user_data)
1023 {
1024   g_return_if_fail (RTP_IS_SESSION (sess));
1025
1026   sess->callbacks.clock_rate = callback;
1027   sess->clock_rate_user_data = user_data;
1028 }
1029
1030 /**
1031  * rtp_session_set_reconsider_callback:
1032  * @sess: an #RTPSession
1033  * @callback: callback to set
1034  * @user_data: user data passed in the callback
1035  *
1036  * Configure only the reconsider callback to be notified of the reconsider action.
1037  */
1038 void
1039 rtp_session_set_reconsider_callback (RTPSession * sess,
1040     RTPSessionReconsider callback, gpointer user_data)
1041 {
1042   g_return_if_fail (RTP_IS_SESSION (sess));
1043
1044   sess->callbacks.reconsider = callback;
1045   sess->reconsider_user_data = user_data;
1046 }
1047
1048 /**
1049  * rtp_session_set_request_time_callback:
1050  * @sess: an #RTPSession
1051  * @callback: callback to set
1052  * @user_data: user data passed in the callback
1053  *
1054  * Configure only the request_time callback
1055  */
1056 void
1057 rtp_session_set_request_time_callback (RTPSession * sess,
1058     RTPSessionRequestTime callback, gpointer user_data)
1059 {
1060   g_return_if_fail (RTP_IS_SESSION (sess));
1061
1062   sess->callbacks.request_time = callback;
1063   sess->request_time_user_data = user_data;
1064 }
1065
1066 /**
1067  * rtp_session_set_bandwidth:
1068  * @sess: an #RTPSession
1069  * @bandwidth: the bandwidth allocated
1070  *
1071  * Set the session bandwidth in bytes per second.
1072  */
1073 void
1074 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
1075 {
1076   g_return_if_fail (RTP_IS_SESSION (sess));
1077
1078   RTP_SESSION_LOCK (sess);
1079   sess->stats.bandwidth = bandwidth;
1080   RTP_SESSION_UNLOCK (sess);
1081 }
1082
1083 /**
1084  * rtp_session_get_bandwidth:
1085  * @sess: an #RTPSession
1086  *
1087  * Get the session bandwidth.
1088  *
1089  * Returns: the session bandwidth.
1090  */
1091 gdouble
1092 rtp_session_get_bandwidth (RTPSession * sess)
1093 {
1094   gdouble result;
1095
1096   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1097
1098   RTP_SESSION_LOCK (sess);
1099   result = sess->stats.bandwidth;
1100   RTP_SESSION_UNLOCK (sess);
1101
1102   return result;
1103 }
1104
1105 /**
1106  * rtp_session_set_rtcp_fraction:
1107  * @sess: an #RTPSession
1108  * @bandwidth: the RTCP bandwidth
1109  *
1110  * Set the bandwidth in bytes per second that should be used for RTCP
1111  * messages.
1112  */
1113 void
1114 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
1115 {
1116   g_return_if_fail (RTP_IS_SESSION (sess));
1117
1118   RTP_SESSION_LOCK (sess);
1119   sess->stats.rtcp_bandwidth = bandwidth;
1120   RTP_SESSION_UNLOCK (sess);
1121 }
1122
1123 /**
1124  * rtp_session_get_rtcp_fraction:
1125  * @sess: an #RTPSession
1126  *
1127  * Get the session bandwidth used for RTCP.
1128  *
1129  * Returns: The bandwidth used for RTCP messages.
1130  */
1131 gdouble
1132 rtp_session_get_rtcp_fraction (RTPSession * sess)
1133 {
1134   gdouble result;
1135
1136   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
1137
1138   RTP_SESSION_LOCK (sess);
1139   result = sess->stats.rtcp_bandwidth;
1140   RTP_SESSION_UNLOCK (sess);
1141
1142   return result;
1143 }
1144
1145 /**
1146  * rtp_session_get_sdes_struct:
1147  * @sess: an #RTSPSession
1148  *
1149  * Get the SDES data as a #GstStructure
1150  *
1151  * Returns: a GstStructure with SDES items for @sess. This function returns a
1152  * copy of the SDES structure, use gst_structure_free() after usage.
1153  */
1154 GstStructure *
1155 rtp_session_get_sdes_struct (RTPSession * sess)
1156 {
1157   GstStructure *result = NULL;
1158
1159   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1160
1161   RTP_SESSION_LOCK (sess);
1162   if (sess->sdes)
1163     result = gst_structure_copy (sess->sdes);
1164   RTP_SESSION_UNLOCK (sess);
1165
1166   return result;
1167 }
1168
1169 /**
1170  * rtp_session_set_sdes_struct:
1171  * @sess: an #RTSPSession
1172  * @sdes: a #GstStructure
1173  *
1174  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
1175  */
1176 void
1177 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
1178 {
1179   g_return_if_fail (sdes);
1180   g_return_if_fail (RTP_IS_SESSION (sess));
1181
1182   RTP_SESSION_LOCK (sess);
1183   if (sess->sdes)
1184     gst_structure_free (sess->sdes);
1185   sess->sdes = gst_structure_copy (sdes);
1186   RTP_SESSION_UNLOCK (sess);
1187 }
1188
1189 static GstFlowReturn
1190 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
1191 {
1192   GstFlowReturn result = GST_FLOW_OK;
1193
1194   if (source->internal) {
1195     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
1196
1197     RTP_SESSION_UNLOCK (session);
1198
1199     if (session->callbacks.send_rtp)
1200       result =
1201           session->callbacks.send_rtp (session, source, data,
1202           session->send_rtp_user_data);
1203     else {
1204       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1205     }
1206   } else {
1207     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
1208     RTP_SESSION_UNLOCK (session);
1209
1210     if (session->callbacks.process_rtp)
1211       result =
1212           session->callbacks.process_rtp (session, source,
1213           GST_BUFFER_CAST (data), session->process_rtp_user_data);
1214     else
1215       gst_buffer_unref (GST_BUFFER_CAST (data));
1216   }
1217   RTP_SESSION_LOCK (session);
1218
1219   return result;
1220 }
1221
1222 static gint
1223 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
1224 {
1225   gint result;
1226
1227   RTP_SESSION_UNLOCK (session);
1228
1229   if (session->callbacks.clock_rate)
1230     result =
1231         session->callbacks.clock_rate (session, pt,
1232         session->clock_rate_user_data);
1233   else
1234     result = -1;
1235
1236   RTP_SESSION_LOCK (session);
1237
1238   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
1239
1240   return result;
1241 }
1242
1243 static RTPSourceCallbacks callbacks = {
1244   (RTPSourcePushRTP) source_push_rtp,
1245   (RTPSourceClockRate) source_clock_rate,
1246 };
1247
1248
1249 /**
1250  * rtp_session_find_conflicting_address:
1251  * @session: The session the packet came in
1252  * @address: address to check for
1253  * @time: The time when the packet that is possibly in conflict arrived
1254  *
1255  * Checks if an address which has a conflict is already known. If it is
1256  * a known conflict, remember the time
1257  *
1258  * Returns: TRUE if it was a known conflict, FALSE otherwise
1259  */
1260 static gboolean
1261 rtp_session_find_conflicting_address (RTPSession * session,
1262     GSocketAddress * address, GstClockTime time)
1263 {
1264   return find_conflicting_address (session->conflicting_addresses, address,
1265       time);
1266 }
1267
1268 /**
1269  * rtp_session_add_conflicting_address:
1270  * @session: The session the packet came in
1271  * @address: address to remember
1272  * @time: The time when the packet that is in conflict arrived
1273  *
1274  * Adds a new conflict address
1275  */
1276 static void
1277 rtp_session_add_conflicting_address (RTPSession * sess,
1278     GSocketAddress * address, GstClockTime time)
1279 {
1280   sess->conflicting_addresses =
1281       add_conflicting_address (sess->conflicting_addresses, address, time);
1282 }
1283
1284
1285 static gboolean
1286 check_collision (RTPSession * sess, RTPSource * source,
1287     RTPPacketInfo * pinfo, gboolean rtp)
1288 {
1289   guint32 ssrc;
1290
1291   /* If we have no pinfo address, we can't do collision checking */
1292   if (!pinfo->address)
1293     return FALSE;
1294
1295   ssrc = rtp_source_get_ssrc (source);
1296
1297   if (!source->internal) {
1298     GSocketAddress *from;
1299
1300     /* This is not our local source, but lets check if two remote
1301      * source collide */
1302     if (rtp) {
1303       from = source->rtp_from;
1304     } else {
1305       from = source->rtcp_from;
1306     }
1307
1308     if (from) {
1309       if (__g_socket_address_equal (from, pinfo->address)) {
1310         /* Address is the same */
1311         return FALSE;
1312       } else {
1313         GST_LOG ("we have a third-party collision or loop ssrc:%x", ssrc);
1314         if (sess->favor_new) {
1315           if (rtp_source_find_conflicting_address (source,
1316                   pinfo->address, pinfo->current_time)) {
1317             gchar *buf1;
1318
1319             buf1 = __g_socket_address_to_string (pinfo->address);
1320             GST_LOG ("Known conflict on %x for %s, dropping packet", ssrc,
1321                 buf1);
1322             g_free (buf1);
1323
1324             return TRUE;
1325           } else {
1326             gchar *buf1, *buf2;
1327
1328             /* Current address is not a known conflict, lets assume this is
1329              * a new source. Save old address in possible conflict list
1330              */
1331             rtp_source_add_conflicting_address (source, from,
1332                 pinfo->current_time);
1333
1334             buf1 = __g_socket_address_to_string (from);
1335             buf2 = __g_socket_address_to_string (pinfo->address);
1336
1337             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1338                 " saving old as known conflict", ssrc, buf1, buf2);
1339
1340             if (rtp)
1341               rtp_source_set_rtp_from (source, pinfo->address);
1342             else
1343               rtp_source_set_rtcp_from (source, pinfo->address);
1344
1345             g_free (buf1);
1346             g_free (buf2);
1347
1348             return FALSE;
1349           }
1350         } else {
1351           /* Don't need to save old addresses, we ignore new sources */
1352           return TRUE;
1353         }
1354       }
1355     } else {
1356       /* We don't already have a from address for RTP, just set it */
1357       if (rtp)
1358         rtp_source_set_rtp_from (source, pinfo->address);
1359       else
1360         rtp_source_set_rtcp_from (source, pinfo->address);
1361       return FALSE;
1362     }
1363
1364     /* FIXME: Log 3rd party collision somehow
1365      * Maybe should be done in upper layer, only the SDES can tell us
1366      * if its a collision or a loop
1367      */
1368   } else {
1369     /* This is sending with our ssrc, is it an address we already know */
1370     if (rtp_session_find_conflicting_address (sess, pinfo->address,
1371             pinfo->current_time)) {
1372       /* Its a known conflict, its probably a loop, not a collision
1373        * lets just drop the incoming packet
1374        */
1375       GST_DEBUG ("Our packets are being looped back to us, dropping");
1376     } else {
1377       /* Its a new collision, lets change our SSRC */
1378       rtp_session_add_conflicting_address (sess, pinfo->address,
1379           pinfo->current_time);
1380
1381       GST_DEBUG ("Collision for SSRC %x", ssrc);
1382       /* mark the source BYE */
1383       rtp_source_mark_bye (source, "SSRC Collision");
1384       /* if we were suggesting this SSRC, change to something else */
1385       if (sess->suggested_ssrc == ssrc)
1386         sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
1387
1388       on_ssrc_collision (sess, source);
1389
1390       rtp_session_schedule_bye_locked (sess, pinfo->current_time);
1391     }
1392   }
1393
1394   return TRUE;
1395 }
1396
1397 typedef struct
1398 {
1399   gboolean is_doing_ptp;
1400   GSocketAddress *new_addr;
1401 } CompareAddrData;
1402
1403 /* check if the two given ip addr are the same (do not care about the port) */
1404 static gboolean
1405 ip_addr_equal (GSocketAddress * a, GSocketAddress * b)
1406 {
1407   return
1408       g_inet_address_equal (g_inet_socket_address_get_address
1409       (G_INET_SOCKET_ADDRESS (a)),
1410       g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (b)));
1411 }
1412
1413 static void
1414 compare_rtp_source_addr (const gchar * key, RTPSource * source,
1415     CompareAddrData * data)
1416 {
1417   /* only compare ip addr of remote sources which are also not closing */
1418   if (!source->internal && !source->closing && source->rtp_from) {
1419     /* look for the first rtp source */
1420     if (!data->new_addr)
1421       data->new_addr = source->rtp_from;
1422     /* compare current ip addr with the first one */
1423     else
1424       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtp_from);
1425   }
1426 }
1427
1428 static void
1429 compare_rtcp_source_addr (const gchar * key, RTPSource * source,
1430     CompareAddrData * data)
1431 {
1432   /* only compare ip addr of remote sources which are also not closing */
1433   if (!source->internal && !source->closing && source->rtcp_from) {
1434     /* look for the first rtcp source */
1435     if (!data->new_addr)
1436       data->new_addr = source->rtcp_from;
1437     else
1438       /* compare current ip addr with the first one */
1439       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtcp_from);
1440   }
1441 }
1442
1443 /* loop over our non-internal source to know if the session
1444  * is doing point-to-point */
1445 static void
1446 session_update_ptp (RTPSession * sess)
1447 {
1448   /* to know if the session is doing point to point, the ip addr
1449    * of each non-internal (=remotes) source have to be compared
1450    * to each other.
1451    */
1452   gboolean is_doing_rtp_ptp;
1453   gboolean is_doing_rtcp_ptp;
1454   CompareAddrData data;
1455
1456   /* compare the first remote source's ip addr that receive rtp packets
1457    * with other remote rtp source.
1458    * it's enough because the session just needs to know if they are all
1459    * equals or not
1460    */
1461   data.is_doing_ptp = TRUE;
1462   data.new_addr = NULL;
1463   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1464       (GHFunc) compare_rtp_source_addr, (gpointer) & data);
1465   is_doing_rtp_ptp = data.is_doing_ptp;
1466
1467   /* same but about rtcp */
1468   data.is_doing_ptp = TRUE;
1469   data.new_addr = NULL;
1470   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1471       (GHFunc) compare_rtcp_source_addr, (gpointer) & data);
1472   is_doing_rtcp_ptp = data.is_doing_ptp;
1473
1474   /* the session is doing point-to-point if all rtp remote have the same
1475    * ip addr and if all rtcp remote sources have the same ip addr */
1476   sess->is_doing_ptp = is_doing_rtp_ptp && is_doing_rtcp_ptp;
1477
1478   GST_DEBUG ("doing point-to-point: %d", sess->is_doing_ptp);
1479 }
1480
1481 static void
1482 add_source (RTPSession * sess, RTPSource * src)
1483 {
1484   g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1485       GINT_TO_POINTER (src->ssrc), src);
1486   /* report the new source ASAP */
1487   src->generation = sess->generation;
1488   /* we have one more source now */
1489   sess->total_sources++;
1490   if (RTP_SOURCE_IS_ACTIVE (src))
1491     sess->stats.active_sources++;
1492   if (src->internal) {
1493     sess->stats.internal_sources++;
1494     if (sess->suggested_ssrc != src->ssrc)
1495       sess->suggested_ssrc = src->ssrc;
1496   }
1497
1498   /* update point-to-point status */
1499   if (!src->internal)
1500     session_update_ptp (sess);
1501 }
1502
1503 static RTPSource *
1504 find_source (RTPSession * sess, guint32 ssrc)
1505 {
1506   return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1507       GINT_TO_POINTER (ssrc));
1508 }
1509
1510 /* must be called with the session lock, the returned source needs to be
1511  * unreffed after usage. */
1512 static RTPSource *
1513 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1514     RTPPacketInfo * pinfo, gboolean rtp)
1515 {
1516   RTPSource *source;
1517
1518   source = find_source (sess, ssrc);
1519   if (source == NULL) {
1520     /* make new Source in probation and insert */
1521     source = rtp_source_new (ssrc);
1522
1523     GST_DEBUG ("creating new source %08x %p", ssrc, source);
1524
1525     /* for RTP packets we need to set the source in probation. Receiving RTCP
1526      * packets of an SSRC, on the other hand, is a strong indication that we
1527      * are dealing with a valid source. */
1528     if (rtp)
1529       g_object_set (source, "probation", sess->probation, NULL);
1530     else
1531       g_object_set (source, "probation", 0, NULL);
1532
1533     /* store from address, if any */
1534     if (pinfo->address) {
1535       if (rtp)
1536         rtp_source_set_rtp_from (source, pinfo->address);
1537       else
1538         rtp_source_set_rtcp_from (source, pinfo->address);
1539     }
1540
1541     /* configure a callback on the source */
1542     rtp_source_set_callbacks (source, &callbacks, sess);
1543
1544     add_source (sess, source);
1545     *created = TRUE;
1546   } else {
1547     *created = FALSE;
1548     /* check for collision, this updates the address when not previously set */
1549     if (check_collision (sess, source, pinfo, rtp)) {
1550       return NULL;
1551     }
1552     /* Receiving RTCP packets of an SSRC is a strong indication that we
1553      * are dealing with a valid source. */
1554     if (!rtp)
1555       g_object_set (source, "probation", 0, NULL);
1556   }
1557   /* update last activity */
1558   source->last_activity = pinfo->current_time;
1559   if (rtp)
1560     source->last_rtp_activity = pinfo->current_time;
1561   g_object_ref (source);
1562
1563   return source;
1564 }
1565
1566 /* must be called with the session lock, the returned source needs to be
1567  * unreffed after usage. */
1568 static RTPSource *
1569 obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1570     GstClockTime current_time)
1571 {
1572   RTPSource *source;
1573
1574   source = find_source (sess, ssrc);
1575   if (source == NULL) {
1576     /* make new internal Source and insert */
1577     source = rtp_source_new (ssrc);
1578
1579     GST_DEBUG ("creating new internal source %08x %p", ssrc, source);
1580
1581     source->validated = TRUE;
1582     source->internal = TRUE;
1583     source->probation = FALSE;
1584     rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes));
1585     rtp_source_set_callbacks (source, &callbacks, sess);
1586
1587     add_source (sess, source);
1588     *created = TRUE;
1589   } else {
1590     *created = FALSE;
1591   }
1592   /* update last activity */
1593   if (current_time != GST_CLOCK_TIME_NONE) {
1594     source->last_activity = current_time;
1595     source->last_rtp_activity = current_time;
1596   }
1597   g_object_ref (source);
1598
1599   return source;
1600 }
1601
1602 /**
1603  * rtp_session_suggest_ssrc:
1604  * @sess: a #RTPSession
1605  *
1606  * Suggest an unused SSRC in @sess.
1607  *
1608  * Returns: a free unused SSRC
1609  */
1610 guint32
1611 rtp_session_suggest_ssrc (RTPSession * sess)
1612 {
1613   guint32 result;
1614
1615   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1616
1617   RTP_SESSION_LOCK (sess);
1618   result = sess->suggested_ssrc;
1619   RTP_SESSION_UNLOCK (sess);
1620
1621   return result;
1622 }
1623
1624 /**
1625  * rtp_session_add_source:
1626  * @sess: a #RTPSession
1627  * @src: #RTPSource to add
1628  *
1629  * Add @src to @session.
1630  *
1631  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1632  * existed in the session.
1633  */
1634 gboolean
1635 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1636 {
1637   gboolean result = FALSE;
1638   RTPSource *find;
1639
1640   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1641   g_return_val_if_fail (src != NULL, FALSE);
1642
1643   RTP_SESSION_LOCK (sess);
1644   find = find_source (sess, src->ssrc);
1645   if (find == NULL) {
1646     add_source (sess, src);
1647     result = TRUE;
1648   }
1649   RTP_SESSION_UNLOCK (sess);
1650
1651   return result;
1652 }
1653
1654 /**
1655  * rtp_session_get_num_sources:
1656  * @sess: an #RTPSession
1657  *
1658  * Get the number of sources in @sess.
1659  *
1660  * Returns: The number of sources in @sess.
1661  */
1662 guint
1663 rtp_session_get_num_sources (RTPSession * sess)
1664 {
1665   guint result;
1666
1667   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1668
1669   RTP_SESSION_LOCK (sess);
1670   result = sess->total_sources;
1671   RTP_SESSION_UNLOCK (sess);
1672
1673   return result;
1674 }
1675
1676 /**
1677  * rtp_session_get_num_active_sources:
1678  * @sess: an #RTPSession
1679  *
1680  * Get the number of active sources in @sess. A source is considered active when
1681  * it has been validated and has not yet received a BYE RTCP message.
1682  *
1683  * Returns: The number of active sources in @sess.
1684  */
1685 guint
1686 rtp_session_get_num_active_sources (RTPSession * sess)
1687 {
1688   guint result;
1689
1690   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1691
1692   RTP_SESSION_LOCK (sess);
1693   result = sess->stats.active_sources;
1694   RTP_SESSION_UNLOCK (sess);
1695
1696   return result;
1697 }
1698
1699 /**
1700  * rtp_session_get_source_by_ssrc:
1701  * @sess: an #RTPSession
1702  * @ssrc: an SSRC
1703  *
1704  * Find the source with @ssrc in @sess.
1705  *
1706  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
1707  * g_object_unref() after usage.
1708  */
1709 RTPSource *
1710 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
1711 {
1712   RTPSource *result;
1713
1714   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1715
1716   RTP_SESSION_LOCK (sess);
1717   result = find_source (sess, ssrc);
1718   if (result != NULL)
1719     g_object_ref (result);
1720   RTP_SESSION_UNLOCK (sess);
1721
1722   return result;
1723 }
1724
1725 /* should be called with the SESSION lock */
1726 static guint32
1727 rtp_session_create_new_ssrc (RTPSession * sess)
1728 {
1729   guint32 ssrc;
1730
1731   while (TRUE) {
1732     ssrc = g_random_int ();
1733
1734     /* see if it exists in the session, we're done if it doesn't */
1735     if (find_source (sess, ssrc) == NULL)
1736       break;
1737   }
1738   return ssrc;
1739 }
1740
1741
1742 /**
1743  * rtp_session_create_source:
1744  * @sess: an #RTPSession
1745  *
1746  * Create an #RTPSource for use in @sess. This function will create a source
1747  * with an ssrc that is currently not used by any participants in the session.
1748  *
1749  * Returns: an #RTPSource.
1750  */
1751 RTPSource *
1752 rtp_session_create_source (RTPSession * sess)
1753 {
1754   guint32 ssrc;
1755   RTPSource *source;
1756
1757   RTP_SESSION_LOCK (sess);
1758   ssrc = rtp_session_create_new_ssrc (sess);
1759   source = rtp_source_new (ssrc);
1760   rtp_source_set_callbacks (source, &callbacks, sess);
1761   /* we need an additional ref for the source in the hashtable */
1762   g_object_ref (source);
1763   add_source (sess, source);
1764   RTP_SESSION_UNLOCK (sess);
1765
1766   return source;
1767 }
1768
1769 static gboolean
1770 update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
1771 {
1772   GstNetAddressMeta *meta;
1773
1774   /* get packet size including header overhead */
1775   pinfo->bytes += gst_buffer_get_size (*buffer) + pinfo->header_len;
1776   pinfo->packets++;
1777
1778   if (pinfo->rtp) {
1779     GstRTPBuffer rtp = { NULL };
1780
1781     if (!gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtp))
1782       goto invalid_packet;
1783
1784     pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtp);
1785     if (idx == 0) {
1786       gint i;
1787
1788       /* only keep info for first buffer */
1789       pinfo->ssrc = gst_rtp_buffer_get_ssrc (&rtp);
1790       pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
1791       pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
1792       pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
1793       /* copy available csrc */
1794       pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
1795       for (i = 0; i < pinfo->csrc_count; i++)
1796         pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
1797     }
1798     gst_rtp_buffer_unmap (&rtp);
1799   }
1800
1801   if (idx == 0) {
1802     /* for netbuffer we can store the IP address to check for collisions */
1803     meta = gst_buffer_get_net_address_meta (*buffer);
1804     if (pinfo->address)
1805       g_object_unref (pinfo->address);
1806     if (meta) {
1807       pinfo->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
1808     } else {
1809       pinfo->address = NULL;
1810     }
1811   }
1812   return TRUE;
1813
1814   /* ERRORS */
1815 invalid_packet:
1816   {
1817     GST_DEBUG ("invalid RTP packet received");
1818     return FALSE;
1819   }
1820 }
1821
1822 /* update the RTPPacketInfo structure with the current time and other bits
1823  * about the current buffer we are handling.
1824  * This function is typically called when a validated packet is received.
1825  * This function should be called with the SESSION_LOCK
1826  */
1827 static gboolean
1828 update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
1829     gboolean send, gboolean rtp, gboolean is_list, gpointer data,
1830     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
1831 {
1832   gboolean res;
1833
1834   pinfo->send = send;
1835   pinfo->rtp = rtp;
1836   pinfo->is_list = is_list;
1837   pinfo->data = data;
1838   pinfo->current_time = current_time;
1839   pinfo->running_time = running_time;
1840   pinfo->ntpnstime = ntpnstime;
1841   pinfo->header_len = sess->header_len;
1842   pinfo->bytes = 0;
1843   pinfo->payload_len = 0;
1844   pinfo->packets = 0;
1845
1846   if (is_list) {
1847     GstBufferList *list = GST_BUFFER_LIST_CAST (data);
1848     res =
1849         gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
1850         pinfo);
1851   } else {
1852     GstBuffer *buffer = GST_BUFFER_CAST (data);
1853     res = update_packet (&buffer, 0, pinfo);
1854   }
1855   return res;
1856 }
1857
1858 static void
1859 clean_packet_info (RTPPacketInfo * pinfo)
1860 {
1861   if (pinfo->address)
1862     g_object_unref (pinfo->address);
1863   if (pinfo->data) {
1864     gst_mini_object_unref (pinfo->data);
1865     pinfo->data = NULL;
1866   }
1867 }
1868
1869 static gboolean
1870 source_update_active (RTPSession * sess, RTPSource * source,
1871     gboolean prevactive)
1872 {
1873   gboolean active = RTP_SOURCE_IS_ACTIVE (source);
1874   guint32 ssrc = source->ssrc;
1875
1876   if (prevactive == active)
1877     return FALSE;
1878
1879   if (active) {
1880     sess->stats.active_sources++;
1881     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
1882         sess->stats.active_sources);
1883   } else {
1884     sess->stats.active_sources--;
1885     GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1886         sess->stats.active_sources);
1887   }
1888   return TRUE;
1889 }
1890
1891 static gboolean
1892 source_update_sender (RTPSession * sess, RTPSource * source,
1893     gboolean prevsender)
1894 {
1895   gboolean sender = RTP_SOURCE_IS_SENDER (source);
1896   guint32 ssrc = source->ssrc;
1897
1898   if (prevsender == sender)
1899     return FALSE;
1900
1901   if (sender) {
1902     sess->stats.sender_sources++;
1903     if (source->internal)
1904       sess->stats.internal_sender_sources++;
1905     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
1906         sess->stats.sender_sources);
1907   } else {
1908     sess->stats.sender_sources--;
1909     if (source->internal)
1910       sess->stats.internal_sender_sources--;
1911     GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1912         sess->stats.sender_sources);
1913   }
1914   return TRUE;
1915 }
1916
1917 /**
1918  * rtp_session_process_rtp:
1919  * @sess: and #RTPSession
1920  * @buffer: an RTP buffer
1921  * @current_time: the current system time
1922  * @running_time: the running_time of @buffer
1923  *
1924  * Process an RTP buffer in the session manager. This function takes ownership
1925  * of @buffer.
1926  *
1927  * Returns: a #GstFlowReturn.
1928  */
1929 GstFlowReturn
1930 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
1931     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
1932 {
1933   GstFlowReturn result;
1934   guint32 ssrc;
1935   RTPSource *source;
1936   gboolean created;
1937   gboolean prevsender, prevactive;
1938   RTPPacketInfo pinfo = { 0, };
1939   guint64 oldrate;
1940
1941   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1942   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1943
1944   RTP_SESSION_LOCK (sess);
1945
1946   /* update pinfo stats */
1947   if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer,
1948           current_time, running_time, ntpnstime)) {
1949     GST_DEBUG ("invalid RTP packet received");
1950     RTP_SESSION_UNLOCK (sess);
1951     return rtp_session_process_rtcp (sess, buffer, current_time, ntpnstime);
1952   }
1953
1954   ssrc = pinfo.ssrc;
1955
1956   source = obtain_source (sess, ssrc, &created, &pinfo, TRUE);
1957   if (!source)
1958     goto collision;
1959
1960   prevsender = RTP_SOURCE_IS_SENDER (source);
1961   prevactive = RTP_SOURCE_IS_ACTIVE (source);
1962   oldrate = source->bitrate;
1963
1964   /* let source process the packet */
1965   result = rtp_source_process_rtp (source, &pinfo);
1966
1967   /* source became active */
1968   if (source_update_active (sess, source, prevactive))
1969     on_ssrc_validated (sess, source);
1970
1971   source_update_sender (sess, source, prevsender);
1972
1973   if (oldrate != source->bitrate)
1974     sess->recalc_bandwidth = TRUE;
1975
1976   if (created)
1977     on_new_ssrc (sess, source);
1978
1979   if (source->validated) {
1980     gboolean created;
1981     gint i;
1982
1983     /* for validated sources, we add the CSRCs as well */
1984     for (i = 0; i < pinfo.csrc_count; i++) {
1985       guint32 csrc;
1986       RTPSource *csrc_src;
1987
1988       csrc = pinfo.csrcs[i];
1989
1990       /* get source */
1991       csrc_src = obtain_source (sess, csrc, &created, &pinfo, TRUE);
1992       if (!csrc_src)
1993         continue;
1994
1995       if (created) {
1996         GST_DEBUG ("created new CSRC: %08x", csrc);
1997         rtp_source_set_as_csrc (csrc_src);
1998         source_update_active (sess, csrc_src, FALSE);
1999         on_new_ssrc (sess, csrc_src);
2000       }
2001       g_object_unref (csrc_src);
2002     }
2003   }
2004   g_object_unref (source);
2005
2006   RTP_SESSION_UNLOCK (sess);
2007
2008   clean_packet_info (&pinfo);
2009
2010   return result;
2011
2012   /* ERRORS */
2013 collision:
2014   {
2015     RTP_SESSION_UNLOCK (sess);
2016     clean_packet_info (&pinfo);
2017     GST_DEBUG ("ignoring packet because its collisioning");
2018     return GST_FLOW_OK;
2019   }
2020 }
2021
2022 static void
2023 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
2024     GstRTCPPacket * packet, RTPPacketInfo * pinfo)
2025 {
2026   guint count, i;
2027
2028   count = gst_rtcp_packet_get_rb_count (packet);
2029   for (i = 0; i < count; i++) {
2030     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
2031     guint8 fractionlost;
2032     gint32 packetslost;
2033     RTPSource *src;
2034
2035     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
2036         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2037
2038     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
2039
2040     /* find our own source */
2041     src = find_source (sess, ssrc);
2042     if (src == NULL)
2043       continue;
2044
2045     if (src->internal && RTP_SOURCE_IS_ACTIVE (src)) {
2046       /* only deal with report blocks for our session, we update the stats of
2047        * the sender of the RTCP message. We could also compare our stats against
2048        * the other sender to see if we are better or worse. */
2049       /* FIXME, need to keep track who the RB block is from */
2050       rtp_source_process_rb (source, pinfo->ntpnstime, fractionlost,
2051           packetslost, exthighestseq, jitter, lsr, dlsr);
2052     }
2053   }
2054   on_ssrc_active (sess, source);
2055 }
2056
2057 /* A Sender report contains statistics about how the sender is doing. This
2058  * includes timing informataion such as the relation between RTP and NTP
2059  * timestamps and the number of packets/bytes it sent to us.
2060  *
2061  * In this report is also included a set of report blocks related to how this
2062  * sender is receiving data (in case we (or somebody else) is also sending stuff
2063  * to it). This info includes the packet loss, jitter and seqnum. It also
2064  * contains information to calculate the round trip time (LSR/DLSR).
2065  */
2066 static void
2067 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
2068     RTPPacketInfo * pinfo, gboolean * do_sync)
2069 {
2070   guint32 senderssrc, rtptime, packet_count, octet_count;
2071   guint64 ntptime;
2072   RTPSource *source;
2073   gboolean created, prevsender;
2074
2075   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
2076       &packet_count, &octet_count);
2077
2078   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
2079       senderssrc, GST_TIME_ARGS (pinfo->current_time));
2080
2081   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2082   if (!source)
2083     return;
2084
2085   /* skip non-bye packets for sources that are marked BYE */
2086   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2087     goto out;
2088
2089   /* don't try to do lip-sync for sources that sent a BYE */
2090   if (RTP_SOURCE_IS_MARKED_BYE (source))
2091     *do_sync = FALSE;
2092   else
2093     *do_sync = TRUE;
2094
2095   prevsender = RTP_SOURCE_IS_SENDER (source);
2096
2097   /* first update the source */
2098   rtp_source_process_sr (source, pinfo->current_time, ntptime, rtptime,
2099       packet_count, octet_count);
2100
2101   source_update_sender (sess, source, prevsender);
2102
2103   if (created)
2104     on_new_ssrc (sess, source);
2105
2106   rtp_session_process_rb (sess, source, packet, pinfo);
2107
2108 out:
2109   g_object_unref (source);
2110 }
2111
2112 /* A receiver report contains statistics about how a receiver is doing. It
2113  * includes stuff like packet loss, jitter and the seqnum it received last. It
2114  * also contains info to calculate the round trip time.
2115  *
2116  * We are only interested in how the sender of this report is doing wrt to us.
2117  */
2118 static void
2119 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
2120     RTPPacketInfo * pinfo)
2121 {
2122   guint32 senderssrc;
2123   RTPSource *source;
2124   gboolean created;
2125
2126   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
2127
2128   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
2129
2130   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2131   if (!source)
2132     return;
2133
2134   /* skip non-bye packets for sources that are marked BYE */
2135   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2136     goto out;
2137
2138   if (created)
2139     on_new_ssrc (sess, source);
2140
2141   rtp_session_process_rb (sess, source, packet, pinfo);
2142
2143 out:
2144   g_object_unref (source);
2145 }
2146
2147 /* Get SDES items and store them in the SSRC */
2148 static void
2149 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
2150     RTPPacketInfo * pinfo)
2151 {
2152   guint items, i, j;
2153   gboolean more_items, more_entries;
2154
2155   items = gst_rtcp_packet_sdes_get_item_count (packet);
2156   GST_DEBUG ("got SDES packet with %d items", items);
2157
2158   more_items = gst_rtcp_packet_sdes_first_item (packet);
2159   i = 0;
2160   while (more_items) {
2161     guint32 ssrc;
2162     gboolean changed, created, prevactive;
2163     RTPSource *source;
2164     GstStructure *sdes;
2165
2166     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
2167
2168     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
2169
2170     changed = FALSE;
2171
2172     /* find src, no probation when dealing with RTCP */
2173     source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
2174     if (!source)
2175       return;
2176
2177     /* skip non-bye packets for sources that are marked BYE */
2178     if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2179       goto next;
2180
2181     sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
2182
2183     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
2184     j = 0;
2185     while (more_entries) {
2186       GstRTCPSDESType type;
2187       guint8 len;
2188       guint8 *data;
2189       gchar *name;
2190       gchar *value;
2191
2192       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
2193
2194       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
2195           data);
2196
2197       if (type == GST_RTCP_SDES_PRIV) {
2198         name = g_strndup ((const gchar *) &data[1], data[0]);
2199         len -= data[0] + 1;
2200         data += data[0] + 1;
2201       } else {
2202         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
2203       }
2204
2205       value = g_strndup ((const gchar *) data, len);
2206
2207       gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
2208
2209       g_free (name);
2210       g_free (value);
2211
2212       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
2213       j++;
2214     }
2215
2216     /* takes ownership of sdes */
2217     changed = rtp_source_set_sdes_struct (source, sdes);
2218
2219     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2220     source->validated = TRUE;
2221
2222     if (created)
2223       on_new_ssrc (sess, source);
2224
2225     /* source became active */
2226     if (source_update_active (sess, source, prevactive))
2227       on_ssrc_validated (sess, source);
2228
2229     if (changed)
2230       on_ssrc_sdes (sess, source);
2231
2232   next:
2233     g_object_unref (source);
2234
2235     more_items = gst_rtcp_packet_sdes_next_item (packet);
2236     i++;
2237   }
2238 }
2239
2240 /* BYE is sent when a client leaves the session
2241  */
2242 static void
2243 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
2244     RTPPacketInfo * pinfo)
2245 {
2246   guint count, i;
2247   gchar *reason;
2248   gboolean reconsider = FALSE;
2249
2250   reason = gst_rtcp_packet_bye_get_reason (packet);
2251   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
2252
2253   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
2254   for (i = 0; i < count; i++) {
2255     guint32 ssrc;
2256     RTPSource *source;
2257     gboolean created, prevactive, prevsender;
2258     guint pmembers, members;
2259
2260     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
2261     GST_DEBUG ("SSRC: %08x", ssrc);
2262
2263     /* find src and mark bye, no probation when dealing with RTCP */
2264     source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
2265     if (!source)
2266       return;
2267
2268     if (source->internal) {
2269       /* our own source, something weird with this packet */
2270       g_object_unref (source);
2271       continue;
2272     }
2273
2274     /* store time for when we need to time out this source */
2275     source->bye_time = pinfo->current_time;
2276
2277     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2278     prevsender = RTP_SOURCE_IS_SENDER (source);
2279
2280     /* mark the source BYE */
2281     rtp_source_mark_bye (source, reason);
2282
2283     pmembers = sess->stats.active_sources;
2284
2285     source_update_active (sess, source, prevactive);
2286     source_update_sender (sess, source, prevsender);
2287
2288     members = sess->stats.active_sources;
2289
2290     if (!sess->scheduled_bye && members < pmembers) {
2291       /* some members went away since the previous timeout estimate.
2292        * Perform reverse reconsideration but only when we are not scheduling a
2293        * BYE ourselves. */
2294       if (sess->next_rtcp_check_time != GST_CLOCK_TIME_NONE &&
2295           pinfo->current_time < sess->next_rtcp_check_time) {
2296         GstClockTime time_remaining;
2297
2298         time_remaining = sess->next_rtcp_check_time - pinfo->current_time;
2299         sess->next_rtcp_check_time =
2300             gst_util_uint64_scale (time_remaining, members, pmembers);
2301
2302         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
2303             GST_TIME_ARGS (sess->next_rtcp_check_time));
2304
2305         sess->next_rtcp_check_time += pinfo->current_time;
2306
2307         /* mark pending reconsider. We only want to signal the reconsideration
2308          * once after we handled all the source in the bye packet */
2309         reconsider = TRUE;
2310       }
2311     }
2312
2313     if (created)
2314       on_new_ssrc (sess, source);
2315
2316     on_bye_ssrc (sess, source);
2317
2318     g_object_unref (source);
2319   }
2320   if (reconsider) {
2321     RTP_SESSION_UNLOCK (sess);
2322     /* notify app of reconsideration */
2323     if (sess->callbacks.reconsider)
2324       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2325     RTP_SESSION_LOCK (sess);
2326   }
2327   g_free (reason);
2328 }
2329
2330 static void
2331 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
2332     RTPPacketInfo * pinfo)
2333 {
2334   GST_DEBUG ("received APP");
2335 }
2336
2337 static gboolean
2338 rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
2339     gboolean fir, GstClockTime current_time)
2340 {
2341   guint32 round_trip = 0;
2342
2343   rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, &round_trip);
2344
2345   if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
2346     GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
2347         GST_SECOND, 65536);
2348
2349     if (current_time - sess->last_keyframe_request < 2 * round_trip_in_ns) {
2350       GST_DEBUG ("Ignoring %s request because one was send without one "
2351           "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
2352           fir ? "FIR" : "PLI",
2353           GST_TIME_ARGS (current_time - sess->last_keyframe_request),
2354           GST_TIME_ARGS (round_trip_in_ns));;
2355       return FALSE;
2356     }
2357   }
2358
2359   sess->last_keyframe_request = current_time;
2360
2361   GST_LOG ("received %s request from %X %p(%p)", fir ? "FIR" : "PLI",
2362       rtp_source_get_ssrc (src), sess->callbacks.process_rtp,
2363       sess->callbacks.request_key_unit);
2364
2365   RTP_SESSION_UNLOCK (sess);
2366   sess->callbacks.request_key_unit (sess, fir,
2367       sess->request_key_unit_user_data);
2368   RTP_SESSION_LOCK (sess);
2369
2370   return TRUE;
2371 }
2372
2373 static void
2374 rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
2375     guint32 media_ssrc, GstClockTime current_time)
2376 {
2377   RTPSource *src;
2378
2379   if (!sess->callbacks.request_key_unit)
2380     return;
2381
2382   src = find_source (sess, sender_ssrc);
2383   if (src == NULL)
2384     return;
2385
2386   rtp_session_request_local_key_unit (sess, src, FALSE, current_time);
2387 }
2388
2389 static void
2390 rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
2391     guint8 * fci_data, guint fci_length, GstClockTime current_time)
2392 {
2393   RTPSource *src;
2394   guint32 ssrc;
2395   guint position = 0;
2396   gboolean our_request = FALSE;
2397
2398   if (!sess->callbacks.request_key_unit)
2399     return;
2400
2401   if (fci_length < 8)
2402     return;
2403
2404   src = find_source (sess, sender_ssrc);
2405
2406   /* Hack because Google fails to set the sender_ssrc correctly */
2407   if (!src && sender_ssrc == 1) {
2408     GHashTableIter iter;
2409
2410     /* we can't find the source if there are multiple */
2411     if (sess->stats.sender_sources > sess->stats.internal_sender_sources + 1)
2412       return;
2413
2414     g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
2415     while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
2416       if (!src->internal && rtp_source_is_sender (src))
2417         break;
2418       src = NULL;
2419     }
2420   }
2421   if (!src)
2422     return;
2423
2424   for (position = 0; position < fci_length; position += 8) {
2425     guint8 *data = fci_data + position;
2426     RTPSource *own;
2427
2428     ssrc = GST_READ_UINT32_BE (data);
2429
2430     own = find_source (sess, ssrc);
2431     if (own == NULL)
2432       continue;
2433
2434     if (own->internal) {
2435       our_request = TRUE;
2436       break;
2437     }
2438   }
2439   if (!our_request)
2440     return;
2441
2442   rtp_session_request_local_key_unit (sess, src, TRUE, current_time);
2443 }
2444
2445 static void
2446 rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
2447     guint32 media_ssrc, guint8 * fci_data, guint fci_length,
2448     GstClockTime current_time)
2449 {
2450   sess->stats.nacks_received++;
2451
2452   if (!sess->callbacks.notify_nack)
2453     return;
2454
2455   while (fci_length > 0) {
2456     guint16 seqnum, blp;
2457
2458     seqnum = GST_READ_UINT16_BE (fci_data);
2459     blp = GST_READ_UINT16_BE (fci_data + 2);
2460
2461     GST_DEBUG ("NACK #%u, blp %04x, SSRC 0x%08x", seqnum, blp, media_ssrc);
2462
2463     RTP_SESSION_UNLOCK (sess);
2464     sess->callbacks.notify_nack (sess, seqnum, blp, media_ssrc,
2465         sess->notify_nack_user_data);
2466     RTP_SESSION_LOCK (sess);
2467
2468     fci_data += 4;
2469     fci_length -= 4;
2470   }
2471 }
2472
2473 static void
2474 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
2475     RTPPacketInfo * pinfo, GstClockTime current_time)
2476 {
2477   GstRTCPType type = gst_rtcp_packet_get_type (packet);
2478   GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
2479   guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
2480   guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
2481   guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet);
2482   guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
2483   RTPSource *src;
2484
2485   src = find_source (sess, media_ssrc);
2486
2487   /* skip non-bye packets for sources that are marked BYE */
2488   if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src))
2489     return;
2490
2491   GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
2492       "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
2493
2494   if (g_signal_has_handler_pending (sess,
2495           rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
2496     GstBuffer *fci_buffer = NULL;
2497
2498     if (fci_length > 0) {
2499       fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
2500           GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data,
2501           fci_length);
2502       GST_BUFFER_TIMESTAMP (fci_buffer) = pinfo->running_time;
2503     }
2504
2505     RTP_SESSION_UNLOCK (sess);
2506     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
2507         type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
2508     RTP_SESSION_LOCK (sess);
2509
2510     if (fci_buffer)
2511       gst_buffer_unref (fci_buffer);
2512   }
2513
2514   if (src && sess->rtcp_feedback_retention_window) {
2515     rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
2516   }
2517
2518   if ((src && src->internal) ||
2519       /* PSFB FIR puts the media ssrc inside the FCI */
2520       (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
2521     switch (type) {
2522       case GST_RTCP_TYPE_PSFB:
2523         switch (fbtype) {
2524           case GST_RTCP_PSFB_TYPE_PLI:
2525             rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
2526                 current_time);
2527             break;
2528           case GST_RTCP_PSFB_TYPE_FIR:
2529             rtp_session_process_fir (sess, sender_ssrc, fci_data, fci_length,
2530                 current_time);
2531             break;
2532           default:
2533             break;
2534         }
2535         break;
2536       case GST_RTCP_TYPE_RTPFB:
2537         switch (fbtype) {
2538           case GST_RTCP_RTPFB_TYPE_NACK:
2539             rtp_session_process_nack (sess, sender_ssrc, media_ssrc,
2540                 fci_data, fci_length, current_time);
2541             break;
2542           default:
2543             break;
2544         }
2545       default:
2546         break;
2547     }
2548   }
2549 }
2550
2551 /**
2552  * rtp_session_process_rtcp:
2553  * @sess: and #RTPSession
2554  * @buffer: an RTCP buffer
2555  * @current_time: the current system time
2556  * @ntpnstime: the current NTP time in nanoseconds
2557  *
2558  * Process an RTCP buffer in the session manager. This function takes ownership
2559  * of @buffer.
2560  *
2561  * Returns: a #GstFlowReturn.
2562  */
2563 GstFlowReturn
2564 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
2565     GstClockTime current_time, guint64 ntpnstime)
2566 {
2567   GstRTCPPacket packet;
2568   gboolean more, is_bye = FALSE, do_sync = FALSE;
2569   RTPPacketInfo pinfo = { 0, };
2570   GstFlowReturn result = GST_FLOW_OK;
2571   GstRTCPBuffer rtcp = { NULL, };
2572
2573   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2574   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2575
2576   if (!gst_rtcp_buffer_validate (buffer))
2577     goto invalid_packet;
2578
2579   GST_DEBUG ("received RTCP packet");
2580
2581   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP], 0,
2582       buffer);
2583
2584   RTP_SESSION_LOCK (sess);
2585   /* update pinfo stats */
2586   update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
2587       -1, ntpnstime);
2588
2589   /* start processing the compound packet */
2590   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2591   more = gst_rtcp_buffer_get_first_packet (&rtcp, &packet);
2592   while (more) {
2593     GstRTCPType type;
2594
2595     type = gst_rtcp_packet_get_type (&packet);
2596
2597     switch (type) {
2598       case GST_RTCP_TYPE_SR:
2599         rtp_session_process_sr (sess, &packet, &pinfo, &do_sync);
2600         break;
2601       case GST_RTCP_TYPE_RR:
2602         rtp_session_process_rr (sess, &packet, &pinfo);
2603         break;
2604       case GST_RTCP_TYPE_SDES:
2605         rtp_session_process_sdes (sess, &packet, &pinfo);
2606         break;
2607       case GST_RTCP_TYPE_BYE:
2608         is_bye = TRUE;
2609         /* don't try to attempt lip-sync anymore for streams with a BYE */
2610         do_sync = FALSE;
2611         rtp_session_process_bye (sess, &packet, &pinfo);
2612         break;
2613       case GST_RTCP_TYPE_APP:
2614         rtp_session_process_app (sess, &packet, &pinfo);
2615         break;
2616       case GST_RTCP_TYPE_RTPFB:
2617       case GST_RTCP_TYPE_PSFB:
2618         rtp_session_process_feedback (sess, &packet, &pinfo, current_time);
2619         break;
2620       default:
2621         GST_WARNING ("got unknown RTCP packet");
2622         break;
2623     }
2624     more = gst_rtcp_packet_move_to_next (&packet);
2625   }
2626
2627   gst_rtcp_buffer_unmap (&rtcp);
2628
2629   /* if we are scheduling a BYE, we only want to count bye packets, else we
2630    * count everything */
2631   if (sess->scheduled_bye && is_bye) {
2632     sess->bye_stats.bye_members++;
2633     UPDATE_AVG (sess->bye_stats.avg_rtcp_packet_size, pinfo.bytes);
2634   }
2635
2636   /* keep track of average packet size */
2637   UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
2638
2639   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
2640       sess->stats.avg_rtcp_packet_size, pinfo.bytes);
2641   RTP_SESSION_UNLOCK (sess);
2642
2643   pinfo.data = NULL;
2644   clean_packet_info (&pinfo);
2645
2646   /* notify caller of sr packets in the callback */
2647   if (do_sync && sess->callbacks.sync_rtcp) {
2648     result = sess->callbacks.sync_rtcp (sess, buffer,
2649         sess->sync_rtcp_user_data);
2650   } else
2651     gst_buffer_unref (buffer);
2652
2653   return result;
2654
2655   /* ERRORS */
2656 invalid_packet:
2657   {
2658     GST_DEBUG ("invalid RTCP packet received");
2659     gst_buffer_unref (buffer);
2660     return GST_FLOW_OK;
2661   }
2662 }
2663
2664 /**
2665  * rtp_session_update_send_caps:
2666  * @sess: an #RTPSession
2667  * @caps: a #GstCaps
2668  *
2669  * Update the caps of the sender in the rtp session.
2670  */
2671 void
2672 rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
2673 {
2674   GstStructure *s;
2675   guint ssrc;
2676
2677   g_return_if_fail (RTP_IS_SESSION (sess));
2678   g_return_if_fail (GST_IS_CAPS (caps));
2679
2680   GST_LOG ("received caps %" GST_PTR_FORMAT, caps);
2681
2682   s = gst_caps_get_structure (caps, 0);
2683
2684   if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
2685     RTPSource *source;
2686     gboolean created;
2687
2688     RTP_SESSION_LOCK (sess);
2689     source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
2690     if (source) {
2691       rtp_source_update_caps (source, caps);
2692       g_object_unref (source);
2693     }
2694     RTP_SESSION_UNLOCK (sess);
2695   }
2696 }
2697
2698 /**
2699  * rtp_session_send_rtp:
2700  * @sess: an #RTPSession
2701  * @data: pointer to either an RTP buffer or a list of RTP buffers
2702  * @is_list: TRUE when @data is a buffer list
2703  * @current_time: the current system time
2704  * @running_time: the running time of @data
2705  *
2706  * Send the RTP buffer in the session manager. This function takes ownership of
2707  * @buffer.
2708  *
2709  * Returns: a #GstFlowReturn.
2710  */
2711 GstFlowReturn
2712 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
2713     GstClockTime current_time, GstClockTime running_time)
2714 {
2715   GstFlowReturn result;
2716   RTPSource *source;
2717   gboolean prevsender;
2718   guint64 oldrate;
2719   RTPPacketInfo pinfo = { 0, };
2720   gboolean created;
2721
2722   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2723   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
2724
2725   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
2726
2727   RTP_SESSION_LOCK (sess);
2728   if (!update_packet_info (sess, &pinfo, TRUE, TRUE, is_list, data,
2729           current_time, running_time, -1))
2730     goto invalid_packet;
2731
2732   source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
2733
2734   prevsender = RTP_SOURCE_IS_SENDER (source);
2735   oldrate = source->bitrate;
2736
2737   /* we use our own source to send */
2738   result = rtp_source_send_rtp (source, &pinfo);
2739
2740   source_update_sender (sess, source, prevsender);
2741
2742   if (oldrate != source->bitrate)
2743     sess->recalc_bandwidth = TRUE;
2744   RTP_SESSION_UNLOCK (sess);
2745
2746   g_object_unref (source);
2747   clean_packet_info (&pinfo);
2748
2749   return result;
2750
2751 invalid_packet:
2752   {
2753     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
2754     RTP_SESSION_UNLOCK (sess);
2755     GST_DEBUG ("invalid RTP packet received");
2756     return GST_FLOW_OK;
2757   }
2758 }
2759
2760 static void
2761 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
2762 {
2763   *bandwidth += source->bitrate;
2764 }
2765
2766 /* must be called with session lock */
2767 static GstClockTime
2768 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
2769     gboolean first)
2770 {
2771   GstClockTime result;
2772   RTPSessionStats *stats;
2773
2774   /* recalculate bandwidth when it changed */
2775   if (sess->recalc_bandwidth) {
2776     gdouble bandwidth;
2777
2778     if (sess->bandwidth > 0)
2779       bandwidth = sess->bandwidth;
2780     else {
2781       /* If it is <= 0, then try to estimate the actual bandwidth */
2782       bandwidth = 0;
2783
2784       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2785           (GHFunc) add_bitrates, &bandwidth);
2786       bandwidth /= 8.0;
2787     }
2788     if (bandwidth < 8000)
2789       bandwidth = RTP_STATS_BANDWIDTH;
2790
2791     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
2792         sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
2793
2794     sess->recalc_bandwidth = FALSE;
2795   }
2796
2797   if (sess->scheduled_bye) {
2798     stats = &sess->bye_stats;
2799     result = rtp_stats_calculate_bye_interval (stats);
2800   } else {
2801     stats = &sess->stats;
2802     result = rtp_stats_calculate_rtcp_interval (stats,
2803         stats->internal_sender_sources > 0, first);
2804   }
2805
2806   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
2807       GST_TIME_ARGS (result), first);
2808
2809   if (!deterministic && result != GST_CLOCK_TIME_NONE)
2810     result = rtp_stats_add_rtcp_jitter (stats, result);
2811
2812   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
2813
2814   return result;
2815 }
2816
2817 static void
2818 source_mark_bye (const gchar * key, RTPSource * source, const gchar * reason)
2819 {
2820   if (source->internal)
2821     rtp_source_mark_bye (source, reason);
2822 }
2823
2824 /**
2825  * rtp_session_mark_all_bye:
2826  * @sess: an #RTPSession
2827  * @reason: a reason
2828  *
2829  * Mark all internal sources of the session as BYE with @reason.
2830  */
2831 void
2832 rtp_session_mark_all_bye (RTPSession * sess, const gchar * reason)
2833 {
2834   g_return_if_fail (RTP_IS_SESSION (sess));
2835
2836   RTP_SESSION_LOCK (sess);
2837   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2838       (GHFunc) source_mark_bye, (gpointer) reason);
2839   RTP_SESSION_UNLOCK (sess);
2840 }
2841
2842 /* Stop the current @sess and schedule a BYE message for the other members.
2843  * One must have the session lock to call this function
2844  */
2845 static GstFlowReturn
2846 rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time)
2847 {
2848   GstFlowReturn result = GST_FLOW_OK;
2849   GstClockTime interval;
2850
2851   /* nothing to do it we already scheduled bye */
2852   if (sess->scheduled_bye)
2853     goto done;
2854
2855   /* we schedule BYE now */
2856   sess->scheduled_bye = TRUE;
2857   /* at least one member wants to send a BYE */
2858   memcpy (&sess->bye_stats, &sess->stats, sizeof (RTPSessionStats));
2859   INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100);
2860   sess->bye_stats.bye_members = 1;
2861   sess->first_rtcp = TRUE;
2862   sess->allow_early = TRUE;
2863
2864   /* reschedule transmission */
2865   sess->last_rtcp_send_time = current_time;
2866   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2867
2868   if (interval != GST_CLOCK_TIME_NONE)
2869     sess->next_rtcp_check_time = current_time + interval;
2870   else
2871     sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
2872
2873   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
2874       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
2875
2876   RTP_SESSION_UNLOCK (sess);
2877   /* notify app of reconsideration */
2878   if (sess->callbacks.reconsider)
2879     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2880   RTP_SESSION_LOCK (sess);
2881 done:
2882
2883   return result;
2884 }
2885
2886 /**
2887  * rtp_session_schedule_bye:
2888  * @sess: an #RTPSession
2889  * @current_time: the current system time
2890  *
2891  * Schedule a BYE message for all sources marked as BYE in @sess.
2892  *
2893  * Returns: a #GstFlowReturn.
2894  */
2895 GstFlowReturn
2896 rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
2897 {
2898   GstFlowReturn result;
2899
2900   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2901
2902   RTP_SESSION_LOCK (sess);
2903   result = rtp_session_schedule_bye_locked (sess, current_time);
2904   RTP_SESSION_UNLOCK (sess);
2905
2906   return result;
2907 }
2908
2909 /**
2910  * rtp_session_next_timeout:
2911  * @sess: an #RTPSession
2912  * @current_time: the current system time
2913  *
2914  * Get the next time we should perform session maintenance tasks.
2915  *
2916  * Returns: a time when rtp_session_on_timeout() should be called with the
2917  * current system time.
2918  */
2919 GstClockTime
2920 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
2921 {
2922   GstClockTime result, interval = 0;
2923
2924   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
2925
2926   RTP_SESSION_LOCK (sess);
2927
2928   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
2929     GST_DEBUG ("have early rtcp time");
2930     result = sess->next_early_rtcp_time;
2931     goto early_exit;
2932   }
2933
2934   result = sess->next_rtcp_check_time;
2935
2936   GST_DEBUG ("current time: %" GST_TIME_FORMAT
2937       ", next time: %" GST_TIME_FORMAT,
2938       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2939
2940   if (result == GST_CLOCK_TIME_NONE || result < current_time) {
2941     GST_DEBUG ("take current time as base");
2942     /* our previous check time expired, start counting from the current time
2943      * again. */
2944     result = current_time;
2945   }
2946
2947   if (sess->scheduled_bye) {
2948     if (sess->bye_stats.active_sources >= 50) {
2949       GST_DEBUG ("reconsider BYE, more than 50 sources");
2950       /* reconsider BYE if members >= 50 */
2951       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2952     }
2953   } else {
2954     if (sess->first_rtcp) {
2955       GST_DEBUG ("first RTCP packet");
2956       /* we are called for the first time */
2957       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2958     } else if (sess->next_rtcp_check_time < current_time) {
2959       GST_DEBUG ("old check time expired, getting new timeout");
2960       /* get a new timeout when we need to */
2961       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
2962     }
2963   }
2964
2965   if (interval != GST_CLOCK_TIME_NONE)
2966     result += interval;
2967   else
2968     result = GST_CLOCK_TIME_NONE;
2969
2970   sess->next_rtcp_check_time = result;
2971
2972 early_exit:
2973
2974   GST_DEBUG ("current time: %" GST_TIME_FORMAT
2975       ", next time: %" GST_TIME_FORMAT,
2976       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2977   RTP_SESSION_UNLOCK (sess);
2978
2979   return result;
2980 }
2981
2982 typedef struct
2983 {
2984   RTPSource *source;
2985   gboolean is_bye;
2986   GstBuffer *buffer;
2987 } ReportOutput;
2988
2989 typedef struct
2990 {
2991   GstRTCPBuffer rtcpbuf;
2992   RTPSession *sess;
2993   RTPSource *source;
2994   guint num_to_report;
2995   gboolean have_fir;
2996   gboolean have_pli;
2997   gboolean have_nack;
2998   GstBuffer *rtcp;
2999   GstClockTime current_time;
3000   guint64 ntpnstime;
3001   GstClockTime running_time;
3002   GstClockTime interval;
3003   GstRTCPPacket packet;
3004   gboolean has_sdes;
3005   gboolean is_early;
3006   gboolean may_suppress;
3007   GQueue output;
3008   guint nacked_seqnums;
3009 } ReportData;
3010
3011 static void
3012 session_start_rtcp (RTPSession * sess, ReportData * data)
3013 {
3014   GstRTCPPacket *packet = &data->packet;
3015   RTPSource *own = data->source;
3016   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3017
3018   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
3019   data->has_sdes = FALSE;
3020
3021   gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
3022
3023   if (RTP_SOURCE_IS_SENDER (own)) {
3024     guint64 ntptime;
3025     guint32 rtptime;
3026     guint32 packet_count, octet_count;
3027
3028     /* we are a sender, create SR */
3029     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
3030     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SR, packet);
3031
3032     /* get latest stats */
3033     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
3034         &ntptime, &rtptime, &packet_count, &octet_count);
3035     /* store stats */
3036     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
3037         packet_count, octet_count);
3038
3039     /* fill in sender report info */
3040     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
3041         ntptime, rtptime, packet_count, octet_count);
3042   } else {
3043     /* we are only receiver, create RR */
3044     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
3045     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RR, packet);
3046     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
3047   }
3048 }
3049
3050 /* construct a Sender or Receiver Report */
3051 static void
3052 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
3053 {
3054   RTPSession *sess = data->sess;
3055   GstRTCPPacket *packet = &data->packet;
3056   guint8 fractionlost;
3057   gint32 packetslost;
3058   guint32 exthighestseq, jitter;
3059   guint32 lsr, dlsr;
3060
3061   /* don't report for sources in future generations */
3062   if (((gint16) (source->generation - sess->generation)) > 0) {
3063     GST_DEBUG ("source %08x generation %u > %u", source->ssrc,
3064         source->generation, sess->generation);
3065     return;
3066   }
3067
3068   if (g_hash_table_contains (source->reported_in_sr_of,
3069           GUINT_TO_POINTER (data->source->ssrc))) {
3070     GST_DEBUG ("source %08x already reported in this generation", source->ssrc);
3071     return;
3072   }
3073
3074   if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
3075     GST_DEBUG ("max RB count reached");
3076     return;
3077   }
3078
3079   /* only report about other sender */
3080   if (source == data->source)
3081     goto reported;
3082
3083   if (!RTP_SOURCE_IS_SENDER (source)) {
3084     GST_DEBUG ("source %08x not sender", source->ssrc);
3085     goto reported;
3086   }
3087
3088   GST_DEBUG ("create RB for SSRC %08x", source->ssrc);
3089
3090   /* get new stats */
3091   rtp_source_get_new_rb (source, data->current_time, &fractionlost,
3092       &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
3093
3094   /* store last generated RR packet */
3095   source->last_rr.is_valid = TRUE;
3096   source->last_rr.fractionlost = fractionlost;
3097   source->last_rr.packetslost = packetslost;
3098   source->last_rr.exthighestseq = exthighestseq;
3099   source->last_rr.jitter = jitter;
3100   source->last_rr.lsr = lsr;
3101   source->last_rr.dlsr = dlsr;
3102
3103   /* packet is not yet filled, add report block for this source. */
3104   gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
3105       exthighestseq, jitter, lsr, dlsr);
3106
3107 reported:
3108   g_hash_table_add (source->reported_in_sr_of,
3109       GUINT_TO_POINTER (data->source->ssrc));
3110 }
3111
3112 /* construct FIR */
3113 static void
3114 session_add_fir (const gchar * key, RTPSource * source, ReportData * data)
3115 {
3116   GstRTCPPacket *packet = &data->packet;
3117   guint16 len;
3118   guint8 *fci_data;
3119
3120   if (!source->send_fir)
3121     return;
3122
3123   len = gst_rtcp_packet_fb_get_fci_length (packet);
3124   if (!gst_rtcp_packet_fb_set_fci_length (packet, len + 2))
3125     /* exit because the packet is full, will put next request in a
3126      * further packet */
3127     return;
3128
3129   fci_data = gst_rtcp_packet_fb_get_fci (packet) + (len * 4);
3130
3131   GST_WRITE_UINT32_BE (fci_data, source->ssrc);
3132   fci_data += 4;
3133   fci_data[0] = source->current_send_fir_seqnum;
3134   fci_data[1] = fci_data[2] = fci_data[3] = 0;
3135
3136   source->send_fir = FALSE;
3137 }
3138
3139 static void
3140 session_fir (RTPSession * sess, ReportData * data)
3141 {
3142   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3143   GstRTCPPacket *packet = &data->packet;
3144
3145   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3146     return;
3147
3148   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_FIR);
3149   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3150   gst_rtcp_packet_fb_set_media_ssrc (packet, 0);
3151
3152   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3153       (GHFunc) session_add_fir, data);
3154
3155   if (gst_rtcp_packet_fb_get_fci_length (packet) == 0)
3156     gst_rtcp_packet_remove (packet);
3157   else
3158     data->may_suppress = FALSE;
3159 }
3160
3161 static gboolean
3162 has_pli_compare_func (gconstpointer a, gconstpointer ignored)
3163 {
3164   GstRTCPPacket packet;
3165   GstRTCPBuffer rtcp = { NULL, };
3166   gboolean ret = FALSE;
3167
3168   gst_rtcp_buffer_map ((GstBuffer *) a, GST_MAP_READ, &rtcp);
3169
3170   if (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
3171     if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
3172         gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
3173       ret = TRUE;
3174   }
3175
3176   gst_rtcp_buffer_unmap (&rtcp);
3177
3178   return ret;
3179 }
3180
3181 /* construct PLI */
3182 static void
3183 session_pli (const gchar * key, RTPSource * source, ReportData * data)
3184 {
3185   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3186   GstRTCPPacket *packet = &data->packet;
3187
3188   if (!source->send_pli)
3189     return;
3190
3191   if (rtp_source_has_retained (source, has_pli_compare_func, NULL))
3192     return;
3193
3194   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3195     /* exit because the packet is full, will put next request in a
3196      * further packet */
3197     return;
3198
3199   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_PLI);
3200   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3201   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
3202
3203   source->send_pli = FALSE;
3204   data->may_suppress = FALSE;
3205 }
3206
3207 /* construct NACK */
3208 static void
3209 session_nack (const gchar * key, RTPSource * source, ReportData * data)
3210 {
3211   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3212   GstRTCPPacket *packet = &data->packet;
3213   guint32 *nacks;
3214   guint n_nacks, i;
3215   guint8 *fci_data;
3216
3217   if (!source->send_nack)
3218     return;
3219
3220   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet))
3221     /* exit because the packet is full, will put next request in a
3222      * further packet */
3223     return;
3224
3225   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_RTPFB_TYPE_NACK);
3226   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3227   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
3228
3229   nacks = rtp_source_get_nacks (source, &n_nacks);
3230   GST_DEBUG ("%u NACKs", n_nacks);
3231   if (!gst_rtcp_packet_fb_set_fci_length (packet, n_nacks))
3232     return;
3233
3234   fci_data = gst_rtcp_packet_fb_get_fci (packet);
3235   for (i = 0; i < n_nacks; i++) {
3236     GST_WRITE_UINT32_BE (fci_data, nacks[i]);
3237     fci_data += 4;
3238     data->nacked_seqnums++;
3239   }
3240
3241   rtp_source_clear_nacks (source);
3242   data->may_suppress = FALSE;
3243 }
3244
3245 /* perform cleanup of sources that timed out */
3246 static void
3247 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
3248 {
3249   gboolean remove = FALSE;
3250   gboolean byetimeout = FALSE;
3251   gboolean sendertimeout = FALSE;
3252   gboolean is_sender, is_active;
3253   RTPSession *sess = data->sess;
3254   GstClockTime interval, binterval;
3255   GstClockTime btime;
3256
3257   GST_DEBUG ("look at %08x, generation %u", source->ssrc, source->generation);
3258
3259   /* check for outdated collisions */
3260   if (source->internal) {
3261     GST_DEBUG ("Timing out collisions for %x", source->ssrc);
3262     rtp_source_timeout (source, data->current_time,
3263         data->running_time - sess->rtcp_feedback_retention_window);
3264   }
3265
3266   /* nothing else to do when without RTCP */
3267   if (data->interval == GST_CLOCK_TIME_NONE)
3268     return;
3269
3270   is_sender = RTP_SOURCE_IS_SENDER (source);
3271   is_active = RTP_SOURCE_IS_ACTIVE (source);
3272
3273   /* our own rtcp interval may have been forced low by secondary configuration,
3274    * while sender side may still operate with higher interval,
3275    * so do not just take our interval to decide on timing out sender,
3276    * but take (if data->interval <= 5 * GST_SECOND):
3277    *   interval = CLAMP (sender_interval, data->interval, 5 * GST_SECOND)
3278    * where sender_interval is difference between last 2 received RTCP reports
3279    */
3280   if (data->interval >= 5 * GST_SECOND || source->internal) {
3281     binterval = data->interval;
3282   } else {
3283     GST_LOG ("prev_rtcp %" GST_TIME_FORMAT ", last_rtcp %" GST_TIME_FORMAT,
3284         GST_TIME_ARGS (source->stats.prev_rtcptime),
3285         GST_TIME_ARGS (source->stats.last_rtcptime));
3286     /* if not received enough yet, fallback to larger default */
3287     if (source->stats.last_rtcptime > source->stats.prev_rtcptime)
3288       binterval = source->stats.last_rtcptime - source->stats.prev_rtcptime;
3289     else
3290       binterval = 5 * GST_SECOND;
3291     binterval = CLAMP (binterval, data->interval, 5 * GST_SECOND);
3292   }
3293   GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
3294       GST_TIME_ARGS (binterval));
3295
3296   if (!source->internal && source->marked_bye) {
3297     /* if we received a BYE from the source, remove the source after some
3298      * time. */
3299     if (data->current_time > source->bye_time &&
3300         data->current_time - source->bye_time > sess->stats.bye_timeout) {
3301       GST_DEBUG ("removing BYE source %08x", source->ssrc);
3302       remove = TRUE;
3303       byetimeout = TRUE;
3304     }
3305   }
3306
3307   if (source->internal && source->sent_bye) {
3308     GST_DEBUG ("removing internal source that has sent BYE %08x", source->ssrc);
3309     remove = TRUE;
3310   }
3311
3312   /* sources that were inactive for more than 5 times the deterministic reporting
3313    * interval get timed out. the min timeout is 5 seconds. */
3314   /* mind old time that might pre-date last time going to PLAYING */
3315   btime = MAX (source->last_activity, sess->start_time);
3316   if (data->current_time > btime) {
3317     interval = MAX (binterval * 5, 5 * GST_SECOND);
3318     if (data->current_time - btime > interval) {
3319       GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
3320           source->ssrc, GST_TIME_ARGS (btime));
3321       if (source->internal) {
3322         /* this is an internal source that is not using our suggested ssrc.
3323          * since there must be another source using this ssrc, we can remove
3324          * this one instead of making it a receiver forever */
3325         if (source->ssrc != sess->suggested_ssrc) {
3326           rtp_source_mark_bye (source, "timed out");
3327           /* do not schedule bye here, since we are inside the RTCP timeout
3328            * processing and scheduling bye will interfere with SR/RR sending */
3329         }
3330       } else {
3331         remove = TRUE;
3332       }
3333     }
3334   }
3335
3336   /* senders that did not send for a long time become a receiver, this also
3337    * holds for our own sources. */
3338   if (is_sender) {
3339     /* mind old time that might pre-date last time going to PLAYING */
3340     btime = MAX (source->last_rtp_activity, sess->start_time);
3341     if (data->current_time > btime) {
3342       interval = MAX (binterval * 2, 5 * GST_SECOND);
3343       if (data->current_time - btime > interval) {
3344         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
3345             GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
3346         sendertimeout = TRUE;
3347       }
3348     }
3349   }
3350
3351   if (remove) {
3352     sess->total_sources--;
3353     if (is_sender) {
3354       sess->stats.sender_sources--;
3355       if (source->internal)
3356         sess->stats.internal_sender_sources--;
3357     }
3358     if (is_active)
3359       sess->stats.active_sources--;
3360
3361     if (source->internal)
3362       sess->stats.internal_sources--;
3363
3364     if (byetimeout)
3365       on_bye_timeout (sess, source);
3366     else
3367       on_timeout (sess, source);
3368   } else {
3369     if (sendertimeout) {
3370       source->is_sender = FALSE;
3371       sess->stats.sender_sources--;
3372       if (source->internal)
3373         sess->stats.internal_sender_sources--;
3374
3375       on_sender_timeout (sess, source);
3376     }
3377     /* count how many source to report in this generation */
3378     if (((gint16) (source->generation - sess->generation)) <= 0)
3379       data->num_to_report++;
3380   }
3381   source->closing = remove;
3382 }
3383
3384 static void
3385 session_sdes (RTPSession * sess, ReportData * data)
3386 {
3387   GstRTCPPacket *packet = &data->packet;
3388   const GstStructure *sdes;
3389   gint i, n_fields;
3390   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3391
3392   /* add SDES packet */
3393   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SDES, packet);
3394
3395   gst_rtcp_packet_sdes_add_item (packet, data->source->ssrc);
3396
3397   sdes = rtp_source_get_sdes_struct (data->source);
3398
3399   /* add all fields in the structure, the order is not important. */
3400   n_fields = gst_structure_n_fields (sdes);
3401   for (i = 0; i < n_fields; ++i) {
3402     const gchar *field;
3403     const gchar *value;
3404     GstRTCPSDESType type;
3405
3406     field = gst_structure_nth_field_name (sdes, i);
3407     if (field == NULL)
3408       continue;
3409     value = gst_structure_get_string (sdes, field);
3410     if (value == NULL)
3411       continue;
3412     type = gst_rtcp_sdes_name_to_type (field);
3413
3414     /* Early packets are minimal and only include the CNAME */
3415     if (data->is_early && type != GST_RTCP_SDES_CNAME)
3416       continue;
3417
3418     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
3419       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
3420           (const guint8 *) value);
3421     } else if (type == GST_RTCP_SDES_PRIV) {
3422       gsize prefix_len;
3423       gsize value_len;
3424       gsize data_len;
3425       guint8 data[256];
3426
3427       /* don't accept entries that are too big */
3428       prefix_len = strlen (field);
3429       if (prefix_len > 255)
3430         continue;
3431       value_len = strlen (value);
3432       if (value_len > 255)
3433         continue;
3434       data_len = 1 + prefix_len + value_len;
3435       if (data_len > 255)
3436         continue;
3437
3438       data[0] = prefix_len;
3439       memcpy (&data[1], field, prefix_len);
3440       memcpy (&data[1 + prefix_len], value, value_len);
3441
3442       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
3443     }
3444   }
3445
3446   data->has_sdes = TRUE;
3447 }
3448
3449 /* schedule a BYE packet */
3450 static void
3451 make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data)
3452 {
3453   GstRTCPPacket *packet = &data->packet;
3454   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3455
3456   /* add SDES */
3457   session_sdes (sess, data);
3458   /* add a BYE packet */
3459   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_BYE, packet);
3460   gst_rtcp_packet_bye_add_ssrc (packet, source->ssrc);
3461   if (source->bye_reason)
3462     gst_rtcp_packet_bye_set_reason (packet, source->bye_reason);
3463
3464   /* we have a BYE packet now */
3465   source->sent_bye = TRUE;
3466 }
3467
3468 static gboolean
3469 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
3470 {
3471   GstClockTime new_send_time;
3472   GstClockTime interval;
3473   RTPSessionStats *stats;
3474
3475   if (sess->scheduled_bye)
3476     stats = &sess->bye_stats;
3477   else
3478     stats = &sess->stats;
3479
3480   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
3481     data->is_early = TRUE;
3482   else
3483     data->is_early = FALSE;
3484
3485   if (data->is_early && sess->next_early_rtcp_time < current_time) {
3486     GST_DEBUG ("early feedback %" GST_TIME_FORMAT " < now %"
3487         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time),
3488         GST_TIME_ARGS (current_time));
3489     goto early;
3490   }
3491
3492   /* no need to check yet */
3493   if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
3494       sess->next_rtcp_check_time > current_time) {
3495     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
3496         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
3497         GST_TIME_ARGS (current_time));
3498     return FALSE;
3499   }
3500
3501 early:
3502
3503   /* take interval and add jitter */
3504   interval = data->interval;
3505   if (interval != GST_CLOCK_TIME_NONE)
3506     interval = rtp_stats_add_rtcp_jitter (stats, interval);
3507
3508   if (sess->last_rtcp_send_time != GST_CLOCK_TIME_NONE) {
3509     /* perform forward reconsideration */
3510     if (interval != GST_CLOCK_TIME_NONE) {
3511       GstClockTime elapsed;
3512
3513       /* get elapsed time since we last reported */
3514       elapsed = current_time - sess->last_rtcp_send_time;
3515
3516       GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
3517           GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
3518       new_send_time = interval + sess->last_rtcp_send_time;
3519     } else {
3520       new_send_time = sess->last_rtcp_send_time;
3521     }
3522   } else {
3523     /* If this is the first RTCP packet, we can reconsider anything based
3524      * on the last RTCP send time because there was none.
3525      */
3526     g_warn_if_fail (!data->is_early);
3527     data->is_early = FALSE;
3528     new_send_time = current_time;
3529   }
3530
3531   if (!data->is_early) {
3532     /* check if reconsideration */
3533     if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
3534       GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
3535           GST_TIME_ARGS (new_send_time));
3536       /* store new check time */
3537       sess->next_rtcp_check_time = new_send_time;
3538       return FALSE;
3539     }
3540     sess->next_rtcp_check_time = current_time + interval;
3541   } else if (interval != GST_CLOCK_TIME_NONE) {
3542     /* Apply the rules from RFC 4585 section 3.5.3 */
3543     if (stats->min_interval != 0 && !sess->first_rtcp) {
3544       GstClockTime T_rr_current_interval =
3545           g_random_double_range (0.5, 1.5) * stats->min_interval;
3546
3547       /* This will caused the RTCP to be suppressed if no FB packets are added */
3548       if (sess->last_rtcp_send_time + T_rr_current_interval > new_send_time) {
3549         GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT
3550             " last: %" GST_TIME_FORMAT
3551             " + T_rr_current_interval: %" GST_TIME_FORMAT
3552             " >  new_send_time: %" GST_TIME_FORMAT,
3553             GST_TIME_ARGS (stats->min_interval),
3554             GST_TIME_ARGS (sess->last_rtcp_send_time),
3555             GST_TIME_ARGS (T_rr_current_interval),
3556             GST_TIME_ARGS (new_send_time));
3557         data->may_suppress = TRUE;
3558       }
3559     }
3560   }
3561
3562   GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
3563       GST_TIME_ARGS (new_send_time));
3564
3565   return TRUE;
3566 }
3567
3568 static void
3569 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
3570 {
3571   g_hash_table_insert (hash_table, key, g_object_ref (source));
3572 }
3573
3574 static gboolean
3575 remove_closing_sources (const gchar * key, RTPSource * source,
3576     ReportData * data)
3577 {
3578   if (source->closing)
3579     return TRUE;
3580
3581   if (source->send_fir)
3582     data->have_fir = TRUE;
3583   if (source->send_pli)
3584     data->have_pli = TRUE;
3585   if (source->send_nack)
3586     data->have_nack = TRUE;
3587
3588   return FALSE;
3589 }
3590
3591 static void
3592 generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
3593 {
3594   RTPSession *sess = data->sess;
3595   gboolean is_bye = FALSE;
3596   ReportOutput *output;
3597
3598   /* only generate RTCP for active internal sources */
3599   if (!source->internal || source->sent_bye)
3600     return;
3601
3602   /* ignore other sources when we do the timeout after a scheduled BYE */
3603   if (sess->scheduled_bye && !source->marked_bye)
3604     return;
3605
3606   data->source = source;
3607
3608   /* open packet */
3609   session_start_rtcp (sess, data);
3610
3611   if (source->marked_bye) {
3612     /* send BYE */
3613     make_source_bye (sess, source, data);
3614     is_bye = TRUE;
3615   } else if (!data->is_early) {
3616     /* loop over all known sources and add report blocks. If we are early, we
3617      * just make a minimal RTCP packet and skip this step */
3618     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3619         (GHFunc) session_report_blocks, data);
3620   }
3621   if (!data->has_sdes)
3622     session_sdes (sess, data);
3623
3624   if (data->have_fir)
3625     session_fir (sess, data);
3626
3627   if (data->have_pli)
3628     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3629         (GHFunc) session_pli, data);
3630
3631   if (data->have_nack)
3632     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3633         (GHFunc) session_nack, data);
3634
3635   gst_rtcp_buffer_unmap (&data->rtcpbuf);
3636
3637   output = g_slice_new (ReportOutput);
3638   output->source = g_object_ref (source);
3639   output->is_bye = is_bye;
3640   output->buffer = data->rtcp;
3641   /* queue the RTCP packet to push later */
3642   g_queue_push_tail (&data->output, output);
3643 }
3644
3645 static void
3646 update_generation (const gchar * key, RTPSource * source, ReportData * data)
3647 {
3648   RTPSession *sess = data->sess;
3649
3650   if (g_hash_table_size (source->reported_in_sr_of) >=
3651       sess->stats.internal_sources) {
3652     /* source is reported, move to next generation */
3653     source->generation = sess->generation + 1;
3654     g_hash_table_remove_all (source->reported_in_sr_of);
3655
3656     GST_LOG ("reported source %x, new generation: %d", source->ssrc,
3657         source->generation);
3658
3659     /* if we reported all sources in this generation, move to next */
3660     if (--data->num_to_report == 0) {
3661       sess->generation++;
3662       GST_DEBUG ("all reported, generation now %u", sess->generation);
3663     }
3664   }
3665 }
3666
3667 /**
3668  * rtp_session_on_timeout:
3669  * @sess: an #RTPSession
3670  * @current_time: the current system time
3671  * @ntpnstime: the current NTP time in nanoseconds
3672  * @running_time: the current running_time of the pipeline
3673  *
3674  * Perform maintenance actions after the timeout obtained with
3675  * rtp_session_next_timeout() expired.
3676  *
3677  * This function will perform timeouts of receivers and senders, send a BYE
3678  * packet or generate RTCP packets with current session stats.
3679  *
3680  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
3681  * times, for each packet that should be processed.
3682  *
3683  * Returns: a #GstFlowReturn.
3684  */
3685 GstFlowReturn
3686 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
3687     guint64 ntpnstime, GstClockTime running_time)
3688 {
3689   GstFlowReturn result = GST_FLOW_OK;
3690   ReportData data = { GST_RTCP_BUFFER_INIT };
3691   GHashTable *table_copy;
3692   ReportOutput *output;
3693
3694   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3695
3696   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT
3697       ", running-time %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
3698       GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (running_time));
3699
3700   data.sess = sess;
3701   data.current_time = current_time;
3702   data.ntpnstime = ntpnstime;
3703   data.running_time = running_time;
3704   data.num_to_report = 0;
3705   data.may_suppress = FALSE;
3706   data.nacked_seqnums = 0;
3707   g_queue_init (&data.output);
3708
3709   RTP_SESSION_LOCK (sess);
3710   /* get a new interval, we need this for various cleanups etc */
3711   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
3712
3713   GST_DEBUG ("interval %" GST_TIME_FORMAT, GST_TIME_ARGS (data.interval));
3714
3715   /* we need an internal source now */
3716   if (sess->stats.internal_sources == 0) {
3717     RTPSource *source;
3718     gboolean created;
3719
3720     source = obtain_internal_source (sess, sess->suggested_ssrc, &created,
3721         current_time);
3722     g_object_unref (source);
3723   }
3724
3725   sess->conflicting_addresses =
3726       timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
3727
3728   /* Make a local copy of the hashtable. We need to do this because the
3729    * cleanup stage below releases the session lock. */
3730   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
3731       (GDestroyNotify) g_object_unref);
3732   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3733       (GHFunc) clone_ssrcs_hashtable, table_copy);
3734
3735   /* Clean up the session, mark the source for removing, this might release the
3736    * session lock. */
3737   g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
3738   g_hash_table_destroy (table_copy);
3739
3740   /* Now remove the marked sources */
3741   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
3742       (GHRFunc) remove_closing_sources, &data);
3743
3744   /* update point-to-point status */
3745   session_update_ptp (sess);
3746
3747   /* see if we need to generate SR or RR packets */
3748   if (!is_rtcp_time (sess, current_time, &data))
3749     goto done;
3750
3751   GST_DEBUG ("doing RTCP generation %u for %u sources, early %d",
3752       sess->generation, data.num_to_report, data.is_early);
3753
3754   /* generate RTCP for all internal sources */
3755   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3756       (GHFunc) generate_rtcp, &data);
3757
3758   /* update the generation for all the sources that have been reported */
3759   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3760       (GHFunc) update_generation, &data);
3761
3762   /* we keep track of the last report time in order to timeout inactive
3763    * receivers or senders */
3764   if (!data.is_early && !data.may_suppress)
3765     sess->last_rtcp_send_time = data.current_time;
3766   sess->first_rtcp = FALSE;
3767   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
3768   sess->scheduled_bye = FALSE;
3769
3770   /*  RFC 4585 section 3.5.2 step 6 */
3771   if (!data.is_early) {
3772     sess->allow_early = TRUE;
3773   }
3774
3775 done:
3776   RTP_SESSION_UNLOCK (sess);
3777
3778   /* push out the RTCP packets */
3779   while ((output = g_queue_pop_head (&data.output))) {
3780     gboolean do_not_suppress;
3781     GstBuffer *buffer = output->buffer;
3782     RTPSource *source = output->source;
3783
3784     /* Give the user a change to add its own packet */
3785     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
3786         buffer, data.is_early, &do_not_suppress);
3787
3788     if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) {
3789       guint packet_size;
3790
3791       packet_size = gst_buffer_get_size (buffer) + sess->header_len;
3792
3793       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
3794       GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
3795           sess->stats.avg_rtcp_packet_size, packet_size);
3796       result =
3797           sess->callbacks.send_rtcp (sess, source, buffer, output->is_bye,
3798           sess->send_rtcp_user_data);
3799       sess->stats.nacks_sent += data.nacked_seqnums;
3800     } else {
3801       GST_DEBUG ("freeing packet callback: %p"
3802           " do_not_suppress: %d may_suppress: %d",
3803           sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress);
3804       sess->stats.nacks_dropped += data.nacked_seqnums;
3805       gst_buffer_unref (buffer);
3806     }
3807     g_object_unref (source);
3808     g_slice_free (ReportOutput, output);
3809   }
3810   return result;
3811 }
3812
3813 /**
3814  * rtp_session_request_early_rtcp:
3815  * @sess: an #RTPSession
3816  * @current_time: the current system time
3817  * @max_delay: maximum delay
3818  *
3819  * Request transmission of early RTCP
3820  *
3821  * Returns: %TRUE if the related RTCP can be scheduled.
3822  */
3823 gboolean
3824 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
3825     GstClockTime max_delay)
3826 {
3827   GstClockTime T_dither_max, T_rr;
3828   gboolean ret;
3829
3830   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
3831
3832   RTP_SESSION_LOCK (sess);
3833
3834   /* Check if already requested */
3835   /*  RFC 4585 section 3.5.2 step 2 */
3836   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
3837     GST_LOG_OBJECT (sess, "already have next early rtcp time");
3838     ret = TRUE;
3839     goto end;
3840   }
3841
3842   if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) {
3843     GST_LOG_OBJECT (sess, "no next RTCP check time");
3844     ret = FALSE;
3845     goto end;
3846   }
3847
3848   /* RFC 4585 section 3.5.3 step 1
3849    * If no regular RTCP packet has been sent before, then a regular
3850    * RTCP packet has to be scheduled first and FB messages might be
3851    * included there
3852    */
3853   if (!GST_CLOCK_TIME_IS_VALID (sess->last_rtcp_send_time)) {
3854     GST_LOG_OBJECT (sess, "no RTCP sent yet");
3855
3856     if (current_time + max_delay > sess->next_rtcp_check_time) {
3857       GST_LOG_OBJECT (sess,
3858           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
3859           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
3860           GST_TIME_ARGS (max_delay),
3861           GST_TIME_ARGS (sess->next_rtcp_check_time));
3862       ret = TRUE;
3863     } else {
3864       ret = FALSE;
3865     }
3866     goto end;
3867   }
3868
3869   T_rr = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
3870
3871   /*  RFC 4585 section 3.5.2 step 2b */
3872   /* If the total sources is <=2, then there is only us and one peer */
3873   /* When there is one auxiliary stream the session can still do point
3874    * to point.
3875    */
3876   if (sess->is_doing_ptp) {
3877     T_dither_max = 0;
3878   } else {
3879     /* Divide by 2 because l = 0.5 */
3880     T_dither_max = T_rr;
3881     T_dither_max /= 2;
3882   }
3883
3884   /*  RFC 4585 section 3.5.2 step 3 */
3885   if (current_time + T_dither_max > sess->next_rtcp_check_time) {
3886     GST_LOG_OBJECT (sess, "don't send because of dither");
3887     ret = FALSE;
3888     goto end;
3889   }
3890
3891   /*  RFC 4585 section 3.5.2 step 4a */
3892   if (sess->allow_early == FALSE) {
3893     /* Ignore the request a scheduled packet will be in time anyway */
3894     if (current_time + max_delay > sess->next_rtcp_check_time) {
3895       GST_LOG_OBJECT (sess,
3896           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
3897           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
3898           GST_TIME_ARGS (max_delay),
3899           GST_TIME_ARGS (sess->next_rtcp_check_time));
3900       ret = TRUE;
3901     } else {
3902       GST_LOG_OBJECT (sess, "can't allow early feedback");
3903       ret = FALSE;
3904     }
3905     goto end;
3906   }
3907
3908   /*  RFC 4585 section 3.5.2 step 4b */
3909   if (T_dither_max) {
3910     /* Schedule an early transmission later */
3911     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
3912         current_time;
3913   } else {
3914     /* If no dithering, schedule it for NOW */
3915     sess->next_early_rtcp_time = current_time;
3916   }
3917
3918   /*  RFC 4585 section 3.5.2 step 6 */
3919   sess->allow_early = FALSE;
3920   /* Delay next regular RTCP packet to not exceed the short-term
3921    * RTCP bandwidth when using early feedback as compared to
3922    * without */
3923   sess->next_rtcp_check_time = sess->last_rtcp_send_time + 2 * T_rr;
3924   sess->last_rtcp_send_time += T_rr;
3925
3926   GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT,
3927       GST_TIME_ARGS (sess->next_early_rtcp_time));
3928   RTP_SESSION_UNLOCK (sess);
3929
3930   /* notify app of need to send packet early
3931    * and therefore of timeout change */
3932   if (sess->callbacks.reconsider)
3933     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
3934
3935   return TRUE;
3936
3937 end:
3938
3939   RTP_SESSION_UNLOCK (sess);
3940
3941   return ret;
3942 }
3943
3944 static gboolean
3945 rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
3946 {
3947   GstClockTime now;
3948
3949   if (!sess->callbacks.send_rtcp)
3950     return FALSE;
3951
3952   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
3953
3954   return rtp_session_request_early_rtcp (sess, now, max_delay);
3955 }
3956
3957 gboolean
3958 rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
3959     gboolean fir, gint count)
3960 {
3961   RTPSource *src;
3962
3963   if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
3964     GST_DEBUG ("FIR/PLI not sent");
3965     return FALSE;
3966   }
3967
3968   RTP_SESSION_LOCK (sess);
3969   src = find_source (sess, ssrc);
3970   if (src == NULL)
3971     goto no_source;
3972
3973   if (fir) {
3974     src->send_pli = FALSE;
3975     src->send_fir = TRUE;
3976
3977     if (count == -1 || count != src->last_fir_count)
3978       src->current_send_fir_seqnum++;
3979     src->last_fir_count = count;
3980   } else if (!src->send_fir) {
3981     src->send_pli = TRUE;
3982   }
3983   RTP_SESSION_UNLOCK (sess);
3984
3985   return TRUE;
3986
3987   /* ERRORS */
3988 no_source:
3989   {
3990     RTP_SESSION_UNLOCK (sess);
3991     return FALSE;
3992   }
3993 }
3994
3995 /**
3996  * rtp_session_request_nack:
3997  * @sess: a #RTPSession
3998  * @ssrc: the SSRC
3999  * @seqnum: the missing seqnum
4000  * @max_delay: max delay to request NACK
4001  *
4002  * Request scheduling of a NACK feedback packet for @seqnum in @ssrc.
4003  *
4004  * Returns: %TRUE if the NACK feedback could be scheduled
4005  */
4006 gboolean
4007 rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
4008     GstClockTime max_delay)
4009 {
4010   RTPSource *source;
4011
4012   if (!rtp_session_send_rtcp (sess, max_delay)) {
4013     GST_DEBUG ("NACK not sent");
4014     return FALSE;
4015   }
4016
4017   RTP_SESSION_LOCK (sess);
4018   source = find_source (sess, ssrc);
4019   if (source == NULL)
4020     goto no_source;
4021
4022   GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum);
4023   rtp_source_register_nack (source, seqnum);
4024   RTP_SESSION_UNLOCK (sess);
4025
4026   return TRUE;
4027
4028   /* ERRORS */
4029 no_source:
4030   {
4031     RTP_SESSION_UNLOCK (sess);
4032     return FALSE;
4033   }
4034 }