2c49fb1eea95ae05a6ccdcc44230b9e57cf0485b
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
21  * with newer GLib versions (>= 2.31.0) */
22 #define GLIB_DISABLE_DEPRECATION_WARNINGS
23
24 #include <string.h>
25
26 #include <gst/rtp/gstrtpbuffer.h>
27 #include <gst/rtp/gstrtcpbuffer.h>
28
29 #include <gst/glib-compat-private.h>
30
31 #include "rtpsession.h"
32
33 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
34 #define GST_CAT_DEFAULT rtp_session_debug
35
36 /* signals and args */
37 enum
38 {
39   SIGNAL_GET_SOURCE_BY_SSRC,
40   SIGNAL_ON_NEW_SSRC,
41   SIGNAL_ON_SSRC_COLLISION,
42   SIGNAL_ON_SSRC_VALIDATED,
43   SIGNAL_ON_SSRC_ACTIVE,
44   SIGNAL_ON_SSRC_SDES,
45   SIGNAL_ON_BYE_SSRC,
46   SIGNAL_ON_BYE_TIMEOUT,
47   SIGNAL_ON_TIMEOUT,
48   SIGNAL_ON_SENDER_TIMEOUT,
49   SIGNAL_ON_SENDING_RTCP,
50   SIGNAL_ON_FEEDBACK_RTCP,
51   SIGNAL_SEND_RTCP,
52   SIGNAL_SEND_RTCP_FULL,
53   LAST_SIGNAL
54 };
55
56 #define DEFAULT_INTERNAL_SOURCE      NULL
57 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
58 #define DEFAULT_RTCP_FRACTION        (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH)
59 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
60 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
61 #define DEFAULT_RTCP_MTU             1400
62 #define DEFAULT_SDES                 NULL
63 #define DEFAULT_NUM_SOURCES          0
64 #define DEFAULT_NUM_ACTIVE_SOURCES   0
65 #define DEFAULT_SOURCES              NULL
66 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
67 #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
68 #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
69 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
70
71 enum
72 {
73   PROP_0,
74   PROP_INTERNAL_SSRC,
75   PROP_INTERNAL_SOURCE,
76   PROP_BANDWIDTH,
77   PROP_RTCP_FRACTION,
78   PROP_RTCP_RR_BANDWIDTH,
79   PROP_RTCP_RS_BANDWIDTH,
80   PROP_RTCP_MTU,
81   PROP_SDES,
82   PROP_NUM_SOURCES,
83   PROP_NUM_ACTIVE_SOURCES,
84   PROP_SOURCES,
85   PROP_FAVOR_NEW,
86   PROP_RTCP_MIN_INTERVAL,
87   PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
88   PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
89   PROP_PROBATION,
90   PROP_STATS,
91   PROP_LAST
92 };
93
94 /* update average packet size */
95 #define INIT_AVG(avg, val) \
96    (avg) = (val);
97 #define UPDATE_AVG(avg, val)            \
98   if ((avg) == 0)                       \
99    (avg) = (val);                       \
100   else                                  \
101    (avg) = ((val) + (15 * (avg))) >> 4;
102
103
104 /* GObject vmethods */
105 static void rtp_session_finalize (GObject * object);
106 static void rtp_session_set_property (GObject * object, guint prop_id,
107     const GValue * value, GParamSpec * pspec);
108 static void rtp_session_get_property (GObject * object, guint prop_id,
109     GValue * value, GParamSpec * pspec);
110
111 static gboolean rtp_session_send_rtcp (RTPSession * sess,
112     GstClockTime max_delay);
113
114 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
115
116 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
117
118 static guint32 rtp_session_create_new_ssrc (RTPSession * sess);
119 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
120     gboolean * created, RTPPacketInfo * pinfo, gboolean rtp);
121 static RTPSource *obtain_internal_source (RTPSession * sess,
122     guint32 ssrc, gboolean * created, GstClockTime current_time);
123 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
124     GstClockTime current_time);
125 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
126     gboolean deterministic, gboolean first);
127
128 static gboolean
129 accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
130     const GValue * handler_return, gpointer data)
131 {
132   if (g_value_get_boolean (handler_return))
133     g_value_set_boolean (return_accu, TRUE);
134
135   return TRUE;
136 }
137
138 static void
139 rtp_session_class_init (RTPSessionClass * klass)
140 {
141   GObjectClass *gobject_class;
142
143   gobject_class = (GObjectClass *) klass;
144
145   gobject_class->finalize = rtp_session_finalize;
146   gobject_class->set_property = rtp_session_set_property;
147   gobject_class->get_property = rtp_session_get_property;
148
149   /**
150    * RTPSession::get-source-by-ssrc:
151    * @session: the object which received the signal
152    * @ssrc: the SSRC of the RTPSource
153    *
154    * Request the #RTPSource object with SSRC @ssrc in @session.
155    */
156   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
157       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
158       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
159           get_source_by_ssrc), NULL, NULL, g_cclosure_marshal_generic,
160       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
161
162   /**
163    * RTPSession::on-new-ssrc:
164    * @session: the object which received the signal
165    * @src: the new RTPSource
166    *
167    * Notify of a new SSRC that entered @session.
168    */
169   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
170       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
171       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
172       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
173       RTP_TYPE_SOURCE);
174   /**
175    * RTPSession::on-ssrc-collision:
176    * @session: the object which received the signal
177    * @src: the #RTPSource that caused a collision
178    *
179    * Notify when we have an SSRC collision
180    */
181   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
182       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
183       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
184       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
185       RTP_TYPE_SOURCE);
186   /**
187    * RTPSession::on-ssrc-validated:
188    * @session: the object which received the signal
189    * @src: the new validated RTPSource
190    *
191    * Notify of a new SSRC that became validated.
192    */
193   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
194       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
195       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
196       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
197       RTP_TYPE_SOURCE);
198   /**
199    * RTPSession::on-ssrc-active:
200    * @session: the object which received the signal
201    * @src: the active RTPSource
202    *
203    * Notify of a SSRC that is active, i.e., sending RTCP.
204    */
205   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
206       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
207       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
208       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
209       RTP_TYPE_SOURCE);
210   /**
211    * RTPSession::on-ssrc-sdes:
212    * @session: the object which received the signal
213    * @src: the RTPSource
214    *
215    * Notify that a new SDES was received for SSRC.
216    */
217   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
218       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
219       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
220       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
221       RTP_TYPE_SOURCE);
222   /**
223    * RTPSession::on-bye-ssrc:
224    * @session: the object which received the signal
225    * @src: the RTPSource that went away
226    *
227    * Notify of an SSRC that became inactive because of a BYE packet.
228    */
229   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
230       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
231       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
232       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
233       RTP_TYPE_SOURCE);
234   /**
235    * RTPSession::on-bye-timeout:
236    * @session: the object which received the signal
237    * @src: the RTPSource that timed out
238    *
239    * Notify of an SSRC that has timed out because of BYE
240    */
241   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
242       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
243       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
244       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
245       RTP_TYPE_SOURCE);
246   /**
247    * RTPSession::on-timeout:
248    * @session: the object which received the signal
249    * @src: the RTPSource that timed out
250    *
251    * Notify of an SSRC that has timed out
252    */
253   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
254       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
255       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
256       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
257       RTP_TYPE_SOURCE);
258   /**
259    * RTPSession::on-sender-timeout:
260    * @session: the object which received the signal
261    * @src: the RTPSource that timed out
262    *
263    * Notify of an SSRC that was a sender but timed out and became a receiver.
264    */
265   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
266       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
267       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
268       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
269       RTP_TYPE_SOURCE);
270
271   /**
272    * RTPSession::on-sending-rtcp
273    * @session: the object which received the signal
274    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
275    * @early: %TRUE if the packet is early, %FALSE if it is regular
276    *
277    * This signal is emitted before sending an RTCP packet, it can be used
278    * to add extra RTCP Packets.
279    *
280    * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
281    * if suppressing it is acceptable
282    */
283   rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
284       g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
285       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
286       accumulate_trues, NULL, g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 2,
287       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE, G_TYPE_BOOLEAN);
288
289   /**
290    * RTPSession::on-feedback-rtcp:
291    * @session: the object which received the signal
292    * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
293    *  %GST_RTCP_TYPE_RTPFB
294    * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
295    * @sender_ssrc: The SSRC of the sender
296    * @media_ssrc: The SSRC of the media this refers to
297    * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
298    * there was no FCI
299    *
300    * Notify that a RTCP feedback packet has been received
301    */
302   rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
303       g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
304       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
305       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 5, G_TYPE_UINT,
306       G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, GST_TYPE_BUFFER);
307
308   /**
309    * RTPSession::send-rtcp:
310    * @session: the object which received the signal
311    * @max_delay: The maximum delay after which the feedback will not be useful
312    *  anymore
313    *
314    * Requests that the #RTPSession initiate a new RTCP packet as soon as
315    * possible within the requested delay.
316    */
317   rtp_session_signals[SIGNAL_SEND_RTCP] =
318       g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
319       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
320       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
321       g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT64);
322
323   /**
324    * RTPSession::send-rtcp-full:
325    * @session: the object which received the signal
326    * @max_delay: The maximum delay after which the feedback will not be useful
327    *  anymore
328    *
329    * Requests that the #RTPSession initiate a new RTCP packet as soon as
330    * possible within the requested delay.
331    *
332    * Returns: TRUE if the new RTCP packet could be scheduled within the
333    * requested delay, FALSE otherwise.
334    *
335    * Since: 1.6
336    */
337   rtp_session_signals[SIGNAL_SEND_RTCP_FULL] =
338       g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass),
339       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
340       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
341       g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
342
343   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
344       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
345           "The internal SSRC used for the session (deprecated)",
346           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
347
348   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
349       g_param_spec_object ("internal-source", "Internal Source",
350           "The internal source element of the session (deprecated)",
351           RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
352
353   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
354       g_param_spec_double ("bandwidth", "Bandwidth",
355           "The bandwidth of the session (0 for auto-discover)",
356           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
357           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
358
359   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
360       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
361           "The fraction of the bandwidth used for RTCP (or as a real fraction of the RTP bandwidth if < 1)",
362           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
363           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
364
365   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
366       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
367           "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
368           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
369           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
370
371   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
372       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
373           "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
374           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
375           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
376
377   g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
378       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
379           "The maximum size of the RTCP packets",
380           16, G_MAXINT16, DEFAULT_RTCP_MTU,
381           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
382
383   g_object_class_install_property (gobject_class, PROP_SDES,
384       g_param_spec_boxed ("sdes", "SDES",
385           "The SDES items of this session",
386           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
387
388   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
389       g_param_spec_uint ("num-sources", "Num Sources",
390           "The number of sources in the session", 0, G_MAXUINT,
391           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
392
393   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
394       g_param_spec_uint ("num-active-sources", "Num Active Sources",
395           "The number of active sources in the session", 0, G_MAXUINT,
396           DEFAULT_NUM_ACTIVE_SOURCES,
397           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
398   /**
399    * RTPSource::sources
400    *
401    * Get a GValue Array of all sources in the session.
402    *
403    * <example>
404    * <title>Getting the #RTPSources of a session
405    * <programlisting>
406    * {
407    *   GValueArray *arr;
408    *   GValue *val;
409    *   guint i;
410    *
411    *   g_object_get (sess, "sources", &arr, NULL);
412    *
413    *   for (i = 0; i < arr->n_values; i++) {
414    *     RTPSource *source;
415    *
416    *     val = g_value_array_get_nth (arr, i);
417    *     source = g_value_get_object (val);
418    *   }
419    *   g_value_array_free (arr);
420    * }
421    * </programlisting>
422    * </example>
423    */
424   g_object_class_install_property (gobject_class, PROP_SOURCES,
425       g_param_spec_boxed ("sources", "Sources",
426           "An array of all known sources in the session",
427           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
428
429   g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
430       g_param_spec_boolean ("favor-new", "Favor new sources",
431           "Resolve SSRC conflict in favor of new sources", FALSE,
432           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
433
434   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
435       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
436           "Minimum interval between Regular RTCP packet (in ns)",
437           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
438           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
439
440   g_object_class_install_property (gobject_class,
441       PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
442       g_param_spec_uint64 ("rtcp-feedback-retention-window",
443           "RTCP Feedback retention window",
444           "Duration during which RTCP Feedback packets are retained (in ns)",
445           0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
446           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
447
448   g_object_class_install_property (gobject_class,
449       PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
450       g_param_spec_uint ("rtcp-immediate-feedback-threshold",
451           "RTCP Immediate Feedback threshold",
452           "The maximum number of members of a RTP session for which immediate"
453           " feedback is used",
454           0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
455           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
456
457   g_object_class_install_property (gobject_class, PROP_PROBATION,
458       g_param_spec_uint ("probation", "Number of probations",
459           "Consecutive packet sequence numbers to accept the source",
460           0, G_MAXUINT, DEFAULT_PROBATION,
461           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
462
463   /**
464    * RTPSession::stats:
465    *
466    * Various session statistics. This property returns a GstStructure
467    * with name application/x-rtp-session-stats with the following fields:
468    *
469    *  "rtx-drop-count"  G_TYPE_UINT   The number of retransmission events
470    *      dropped (due to bandwidth constraints)
471    *  "sent-nack-count" G_TYPE_UINT   Number of NACKs sent
472    *  "recv-nack-count" G_TYPE_UINT   Number of NACKs received
473    *
474    * Since: 1.4
475    */
476   g_object_class_install_property (gobject_class, PROP_STATS,
477       g_param_spec_boxed ("stats", "Statistics",
478           "Various statistics", GST_TYPE_STRUCTURE,
479           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
480
481   klass->get_source_by_ssrc =
482       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
483   klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
484
485   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
486 }
487
488 static void
489 rtp_session_init (RTPSession * sess)
490 {
491   gint i;
492   gchar *str;
493
494   g_mutex_init (&sess->lock);
495   sess->key = g_random_int ();
496   sess->mask_idx = 0;
497   sess->mask = 0;
498
499   for (i = 0; i < 32; i++) {
500     sess->ssrcs[i] =
501         g_hash_table_new_full (NULL, NULL, NULL,
502         (GDestroyNotify) g_object_unref);
503   }
504
505   rtp_stats_init_defaults (&sess->stats);
506   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
507   rtp_stats_set_min_interval (&sess->stats,
508       (gdouble) DEFAULT_RTCP_MIN_INTERVAL / GST_SECOND);
509
510   sess->recalc_bandwidth = TRUE;
511   sess->bandwidth = DEFAULT_BANDWIDTH;
512   sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
513   sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
514   sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
515
516   /* default UDP header length */
517   sess->header_len = 28;
518   sess->mtu = DEFAULT_RTCP_MTU;
519
520   sess->probation = DEFAULT_PROBATION;
521
522   /* some default SDES entries */
523   sess->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
524
525   /* we do not want to leak details like the username or hostname here */
526   str = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
527   gst_structure_set (sess->sdes, "cname", G_TYPE_STRING, str, NULL);
528   g_free (str);
529
530 #if 0
531   /* we do not want to leak the user's real name here */
532   str = g_strdup_printf ("Anon%u", g_random_int ());
533   gst_structure_set (sdes, "name", G_TYPE_STRING, str, NULL);
534   g_free (str);
535 #endif
536
537   gst_structure_set (sess->sdes, "tool", G_TYPE_STRING, "GStreamer", NULL);
538
539   /* this is the SSRC we suggest */
540   sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
541
542   sess->first_rtcp = TRUE;
543   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
544
545   sess->allow_early = TRUE;
546   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
547   sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
548   sess->rtcp_immediate_feedback_threshold =
549       DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
550
551   sess->last_keyframe_request = GST_CLOCK_TIME_NONE;
552
553   sess->is_doing_ptp = TRUE;
554 }
555
556 static void
557 rtp_session_finalize (GObject * object)
558 {
559   RTPSession *sess;
560   gint i;
561
562   sess = RTP_SESSION_CAST (object);
563
564   gst_structure_free (sess->sdes);
565
566   g_list_free_full (sess->conflicting_addresses,
567       (GDestroyNotify) rtp_conflicting_address_free);
568
569   for (i = 0; i < 32; i++)
570     g_hash_table_destroy (sess->ssrcs[i]);
571
572   g_mutex_clear (&sess->lock);
573
574   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
575 }
576
577 static void
578 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
579 {
580   GValue value = { 0 };
581
582   g_value_init (&value, RTP_TYPE_SOURCE);
583   g_value_take_object (&value, source);
584   /* copies the value */
585   g_value_array_append (arr, &value);
586 }
587
588 static GValueArray *
589 rtp_session_create_sources (RTPSession * sess)
590 {
591   GValueArray *res;
592   guint size;
593
594   RTP_SESSION_LOCK (sess);
595   /* get number of elements in the table */
596   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
597   /* create the result value array */
598   res = g_value_array_new (size);
599
600   /* and copy all values into the array */
601   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
602   RTP_SESSION_UNLOCK (sess);
603
604   return res;
605 }
606
607 static GstStructure *
608 rtp_session_create_stats (RTPSession * sess)
609 {
610   GstStructure *s;
611
612   s = gst_structure_new ("application/x-rtp-session-stats",
613       "rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped,
614       "sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent,
615       "recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL);
616
617   return s;
618 }
619
620 static void
621 rtp_session_set_property (GObject * object, guint prop_id,
622     const GValue * value, GParamSpec * pspec)
623 {
624   RTPSession *sess;
625
626   sess = RTP_SESSION (object);
627
628   switch (prop_id) {
629     case PROP_INTERNAL_SSRC:
630       RTP_SESSION_LOCK (sess);
631       sess->suggested_ssrc = g_value_get_uint (value);
632       RTP_SESSION_UNLOCK (sess);
633       if (sess->callbacks.reconfigure)
634         sess->callbacks.reconfigure (sess, sess->reconfigure_user_data);
635       break;
636     case PROP_BANDWIDTH:
637       RTP_SESSION_LOCK (sess);
638       sess->bandwidth = g_value_get_double (value);
639       sess->recalc_bandwidth = TRUE;
640       RTP_SESSION_UNLOCK (sess);
641       break;
642     case PROP_RTCP_FRACTION:
643       RTP_SESSION_LOCK (sess);
644       sess->rtcp_bandwidth = g_value_get_double (value);
645       sess->recalc_bandwidth = TRUE;
646       RTP_SESSION_UNLOCK (sess);
647       break;
648     case PROP_RTCP_RR_BANDWIDTH:
649       RTP_SESSION_LOCK (sess);
650       sess->rtcp_rr_bandwidth = g_value_get_int (value);
651       sess->recalc_bandwidth = TRUE;
652       RTP_SESSION_UNLOCK (sess);
653       break;
654     case PROP_RTCP_RS_BANDWIDTH:
655       RTP_SESSION_LOCK (sess);
656       sess->rtcp_rs_bandwidth = g_value_get_int (value);
657       sess->recalc_bandwidth = TRUE;
658       RTP_SESSION_UNLOCK (sess);
659       break;
660     case PROP_RTCP_MTU:
661       sess->mtu = g_value_get_uint (value);
662       break;
663     case PROP_SDES:
664       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
665       break;
666     case PROP_FAVOR_NEW:
667       sess->favor_new = g_value_get_boolean (value);
668       break;
669     case PROP_RTCP_MIN_INTERVAL:
670       rtp_stats_set_min_interval (&sess->stats,
671           (gdouble) g_value_get_uint64 (value) / GST_SECOND);
672       /* trigger reconsideration */
673       RTP_SESSION_LOCK (sess);
674       sess->next_rtcp_check_time = 0;
675       RTP_SESSION_UNLOCK (sess);
676       if (sess->callbacks.reconsider)
677         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
678       break;
679     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
680       sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
681       break;
682     case PROP_PROBATION:
683       sess->probation = g_value_get_uint (value);
684       break;
685     default:
686       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
687       break;
688   }
689 }
690
691 static void
692 rtp_session_get_property (GObject * object, guint prop_id,
693     GValue * value, GParamSpec * pspec)
694 {
695   RTPSession *sess;
696
697   sess = RTP_SESSION (object);
698
699   switch (prop_id) {
700     case PROP_INTERNAL_SSRC:
701       g_value_set_uint (value, rtp_session_suggest_ssrc (sess));
702       break;
703     case PROP_INTERNAL_SOURCE:
704       /* FIXME, return a random source */
705       g_value_set_object (value, NULL);
706       break;
707     case PROP_BANDWIDTH:
708       g_value_set_double (value, sess->bandwidth);
709       break;
710     case PROP_RTCP_FRACTION:
711       g_value_set_double (value, sess->rtcp_bandwidth);
712       break;
713     case PROP_RTCP_RR_BANDWIDTH:
714       g_value_set_int (value, sess->rtcp_rr_bandwidth);
715       break;
716     case PROP_RTCP_RS_BANDWIDTH:
717       g_value_set_int (value, sess->rtcp_rs_bandwidth);
718       break;
719     case PROP_RTCP_MTU:
720       g_value_set_uint (value, sess->mtu);
721       break;
722     case PROP_SDES:
723       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
724       break;
725     case PROP_NUM_SOURCES:
726       g_value_set_uint (value, rtp_session_get_num_sources (sess));
727       break;
728     case PROP_NUM_ACTIVE_SOURCES:
729       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
730       break;
731     case PROP_SOURCES:
732       g_value_take_boxed (value, rtp_session_create_sources (sess));
733       break;
734     case PROP_FAVOR_NEW:
735       g_value_set_boolean (value, sess->favor_new);
736       break;
737     case PROP_RTCP_MIN_INTERVAL:
738       g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
739       break;
740     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
741       g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
742       break;
743     case PROP_PROBATION:
744       g_value_set_uint (value, sess->probation);
745       break;
746     case PROP_STATS:
747       g_value_take_boxed (value, rtp_session_create_stats (sess));
748       break;
749     default:
750       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
751       break;
752   }
753 }
754
755 static void
756 on_new_ssrc (RTPSession * sess, RTPSource * source)
757 {
758   g_object_ref (source);
759   RTP_SESSION_UNLOCK (sess);
760   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
761   RTP_SESSION_LOCK (sess);
762   g_object_unref (source);
763 }
764
765 static void
766 on_ssrc_collision (RTPSession * sess, RTPSource * source)
767 {
768   g_object_ref (source);
769   RTP_SESSION_UNLOCK (sess);
770   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
771       source);
772   RTP_SESSION_LOCK (sess);
773   g_object_unref (source);
774 }
775
776 static void
777 on_ssrc_validated (RTPSession * sess, RTPSource * source)
778 {
779   g_object_ref (source);
780   RTP_SESSION_UNLOCK (sess);
781   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
782       source);
783   RTP_SESSION_LOCK (sess);
784   g_object_unref (source);
785 }
786
787 static void
788 on_ssrc_active (RTPSession * sess, RTPSource * source)
789 {
790   g_object_ref (source);
791   RTP_SESSION_UNLOCK (sess);
792   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
793   RTP_SESSION_LOCK (sess);
794   g_object_unref (source);
795 }
796
797 static void
798 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
799 {
800   g_object_ref (source);
801   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
802   RTP_SESSION_UNLOCK (sess);
803   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
804   RTP_SESSION_LOCK (sess);
805   g_object_unref (source);
806 }
807
808 static void
809 on_bye_ssrc (RTPSession * sess, RTPSource * source)
810 {
811   g_object_ref (source);
812   RTP_SESSION_UNLOCK (sess);
813   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
814   RTP_SESSION_LOCK (sess);
815   g_object_unref (source);
816 }
817
818 static void
819 on_bye_timeout (RTPSession * sess, RTPSource * source)
820 {
821   g_object_ref (source);
822   RTP_SESSION_UNLOCK (sess);
823   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
824   RTP_SESSION_LOCK (sess);
825   g_object_unref (source);
826 }
827
828 static void
829 on_timeout (RTPSession * sess, RTPSource * source)
830 {
831   g_object_ref (source);
832   RTP_SESSION_UNLOCK (sess);
833   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
834   RTP_SESSION_LOCK (sess);
835   g_object_unref (source);
836 }
837
838 static void
839 on_sender_timeout (RTPSession * sess, RTPSource * source)
840 {
841   g_object_ref (source);
842   RTP_SESSION_UNLOCK (sess);
843   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
844       source);
845   RTP_SESSION_LOCK (sess);
846   g_object_unref (source);
847 }
848
849 /**
850  * rtp_session_new:
851  *
852  * Create a new session object.
853  *
854  * Returns: a new #RTPSession. g_object_unref() after usage.
855  */
856 RTPSession *
857 rtp_session_new (void)
858 {
859   RTPSession *sess;
860
861   sess = g_object_new (RTP_TYPE_SESSION, NULL);
862
863   return sess;
864 }
865
866 /**
867  * rtp_session_set_callbacks:
868  * @sess: an #RTPSession
869  * @callbacks: callbacks to configure
870  * @user_data: user data passed in the callbacks
871  *
872  * Configure a set of callbacks to be notified of actions.
873  */
874 void
875 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
876     gpointer user_data)
877 {
878   g_return_if_fail (RTP_IS_SESSION (sess));
879
880   if (callbacks->process_rtp) {
881     sess->callbacks.process_rtp = callbacks->process_rtp;
882     sess->process_rtp_user_data = user_data;
883   }
884   if (callbacks->send_rtp) {
885     sess->callbacks.send_rtp = callbacks->send_rtp;
886     sess->send_rtp_user_data = user_data;
887   }
888   if (callbacks->send_rtcp) {
889     sess->callbacks.send_rtcp = callbacks->send_rtcp;
890     sess->send_rtcp_user_data = user_data;
891   }
892   if (callbacks->sync_rtcp) {
893     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
894     sess->sync_rtcp_user_data = user_data;
895   }
896   if (callbacks->clock_rate) {
897     sess->callbacks.clock_rate = callbacks->clock_rate;
898     sess->clock_rate_user_data = user_data;
899   }
900   if (callbacks->reconsider) {
901     sess->callbacks.reconsider = callbacks->reconsider;
902     sess->reconsider_user_data = user_data;
903   }
904   if (callbacks->request_key_unit) {
905     sess->callbacks.request_key_unit = callbacks->request_key_unit;
906     sess->request_key_unit_user_data = user_data;
907   }
908   if (callbacks->request_time) {
909     sess->callbacks.request_time = callbacks->request_time;
910     sess->request_time_user_data = user_data;
911   }
912   if (callbacks->notify_nack) {
913     sess->callbacks.notify_nack = callbacks->notify_nack;
914     sess->notify_nack_user_data = user_data;
915   }
916   if (callbacks->reconfigure) {
917     sess->callbacks.reconfigure = callbacks->reconfigure;
918     sess->reconfigure_user_data = user_data;
919   }
920 }
921
922 /**
923  * rtp_session_set_process_rtp_callback:
924  * @sess: an #RTPSession
925  * @callback: callback to set
926  * @user_data: user data passed in the callback
927  *
928  * Configure only the process_rtp callback to be notified of the process_rtp action.
929  */
930 void
931 rtp_session_set_process_rtp_callback (RTPSession * sess,
932     RTPSessionProcessRTP callback, gpointer user_data)
933 {
934   g_return_if_fail (RTP_IS_SESSION (sess));
935
936   sess->callbacks.process_rtp = callback;
937   sess->process_rtp_user_data = user_data;
938 }
939
940 /**
941  * rtp_session_set_send_rtp_callback:
942  * @sess: an #RTPSession
943  * @callback: callback to set
944  * @user_data: user data passed in the callback
945  *
946  * Configure only the send_rtp callback to be notified of the send_rtp action.
947  */
948 void
949 rtp_session_set_send_rtp_callback (RTPSession * sess,
950     RTPSessionSendRTP callback, gpointer user_data)
951 {
952   g_return_if_fail (RTP_IS_SESSION (sess));
953
954   sess->callbacks.send_rtp = callback;
955   sess->send_rtp_user_data = user_data;
956 }
957
958 /**
959  * rtp_session_set_send_rtcp_callback:
960  * @sess: an #RTPSession
961  * @callback: callback to set
962  * @user_data: user data passed in the callback
963  *
964  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
965  */
966 void
967 rtp_session_set_send_rtcp_callback (RTPSession * sess,
968     RTPSessionSendRTCP callback, gpointer user_data)
969 {
970   g_return_if_fail (RTP_IS_SESSION (sess));
971
972   sess->callbacks.send_rtcp = callback;
973   sess->send_rtcp_user_data = user_data;
974 }
975
976 /**
977  * rtp_session_set_sync_rtcp_callback:
978  * @sess: an #RTPSession
979  * @callback: callback to set
980  * @user_data: user data passed in the callback
981  *
982  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
983  */
984 void
985 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
986     RTPSessionSyncRTCP callback, gpointer user_data)
987 {
988   g_return_if_fail (RTP_IS_SESSION (sess));
989
990   sess->callbacks.sync_rtcp = callback;
991   sess->sync_rtcp_user_data = user_data;
992 }
993
994 /**
995  * rtp_session_set_clock_rate_callback:
996  * @sess: an #RTPSession
997  * @callback: callback to set
998  * @user_data: user data passed in the callback
999  *
1000  * Configure only the clock_rate callback to be notified of the clock_rate action.
1001  */
1002 void
1003 rtp_session_set_clock_rate_callback (RTPSession * sess,
1004     RTPSessionClockRate callback, gpointer user_data)
1005 {
1006   g_return_if_fail (RTP_IS_SESSION (sess));
1007
1008   sess->callbacks.clock_rate = callback;
1009   sess->clock_rate_user_data = user_data;
1010 }
1011
1012 /**
1013  * rtp_session_set_reconsider_callback:
1014  * @sess: an #RTPSession
1015  * @callback: callback to set
1016  * @user_data: user data passed in the callback
1017  *
1018  * Configure only the reconsider callback to be notified of the reconsider action.
1019  */
1020 void
1021 rtp_session_set_reconsider_callback (RTPSession * sess,
1022     RTPSessionReconsider callback, gpointer user_data)
1023 {
1024   g_return_if_fail (RTP_IS_SESSION (sess));
1025
1026   sess->callbacks.reconsider = callback;
1027   sess->reconsider_user_data = user_data;
1028 }
1029
1030 /**
1031  * rtp_session_set_request_time_callback:
1032  * @sess: an #RTPSession
1033  * @callback: callback to set
1034  * @user_data: user data passed in the callback
1035  *
1036  * Configure only the request_time callback
1037  */
1038 void
1039 rtp_session_set_request_time_callback (RTPSession * sess,
1040     RTPSessionRequestTime callback, gpointer user_data)
1041 {
1042   g_return_if_fail (RTP_IS_SESSION (sess));
1043
1044   sess->callbacks.request_time = callback;
1045   sess->request_time_user_data = user_data;
1046 }
1047
1048 /**
1049  * rtp_session_set_bandwidth:
1050  * @sess: an #RTPSession
1051  * @bandwidth: the bandwidth allocated
1052  *
1053  * Set the session bandwidth in bytes per second.
1054  */
1055 void
1056 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
1057 {
1058   g_return_if_fail (RTP_IS_SESSION (sess));
1059
1060   RTP_SESSION_LOCK (sess);
1061   sess->stats.bandwidth = bandwidth;
1062   RTP_SESSION_UNLOCK (sess);
1063 }
1064
1065 /**
1066  * rtp_session_get_bandwidth:
1067  * @sess: an #RTPSession
1068  *
1069  * Get the session bandwidth.
1070  *
1071  * Returns: the session bandwidth.
1072  */
1073 gdouble
1074 rtp_session_get_bandwidth (RTPSession * sess)
1075 {
1076   gdouble result;
1077
1078   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1079
1080   RTP_SESSION_LOCK (sess);
1081   result = sess->stats.bandwidth;
1082   RTP_SESSION_UNLOCK (sess);
1083
1084   return result;
1085 }
1086
1087 /**
1088  * rtp_session_set_rtcp_fraction:
1089  * @sess: an #RTPSession
1090  * @bandwidth: the RTCP bandwidth
1091  *
1092  * Set the bandwidth in bytes per second that should be used for RTCP
1093  * messages.
1094  */
1095 void
1096 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
1097 {
1098   g_return_if_fail (RTP_IS_SESSION (sess));
1099
1100   RTP_SESSION_LOCK (sess);
1101   sess->stats.rtcp_bandwidth = bandwidth;
1102   RTP_SESSION_UNLOCK (sess);
1103 }
1104
1105 /**
1106  * rtp_session_get_rtcp_fraction:
1107  * @sess: an #RTPSession
1108  *
1109  * Get the session bandwidth used for RTCP.
1110  *
1111  * Returns: The bandwidth used for RTCP messages.
1112  */
1113 gdouble
1114 rtp_session_get_rtcp_fraction (RTPSession * sess)
1115 {
1116   gdouble result;
1117
1118   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
1119
1120   RTP_SESSION_LOCK (sess);
1121   result = sess->stats.rtcp_bandwidth;
1122   RTP_SESSION_UNLOCK (sess);
1123
1124   return result;
1125 }
1126
1127 /**
1128  * rtp_session_get_sdes_struct:
1129  * @sess: an #RTSPSession
1130  *
1131  * Get the SDES data as a #GstStructure
1132  *
1133  * Returns: a GstStructure with SDES items for @sess. This function returns a
1134  * copy of the SDES structure, use gst_structure_free() after usage.
1135  */
1136 GstStructure *
1137 rtp_session_get_sdes_struct (RTPSession * sess)
1138 {
1139   GstStructure *result = NULL;
1140
1141   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1142
1143   RTP_SESSION_LOCK (sess);
1144   if (sess->sdes)
1145     result = gst_structure_copy (sess->sdes);
1146   RTP_SESSION_UNLOCK (sess);
1147
1148   return result;
1149 }
1150
1151 /**
1152  * rtp_session_set_sdes_struct:
1153  * @sess: an #RTSPSession
1154  * @sdes: a #GstStructure
1155  *
1156  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
1157  */
1158 void
1159 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
1160 {
1161   g_return_if_fail (sdes);
1162   g_return_if_fail (RTP_IS_SESSION (sess));
1163
1164   RTP_SESSION_LOCK (sess);
1165   if (sess->sdes)
1166     gst_structure_free (sess->sdes);
1167   sess->sdes = gst_structure_copy (sdes);
1168   RTP_SESSION_UNLOCK (sess);
1169 }
1170
1171 static GstFlowReturn
1172 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
1173 {
1174   GstFlowReturn result = GST_FLOW_OK;
1175
1176   if (source->internal) {
1177     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
1178
1179     RTP_SESSION_UNLOCK (session);
1180
1181     if (session->callbacks.send_rtp)
1182       result =
1183           session->callbacks.send_rtp (session, source, data,
1184           session->send_rtp_user_data);
1185     else {
1186       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1187     }
1188   } else {
1189     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
1190     RTP_SESSION_UNLOCK (session);
1191
1192     if (session->callbacks.process_rtp)
1193       result =
1194           session->callbacks.process_rtp (session, source,
1195           GST_BUFFER_CAST (data), session->process_rtp_user_data);
1196     else
1197       gst_buffer_unref (GST_BUFFER_CAST (data));
1198   }
1199   RTP_SESSION_LOCK (session);
1200
1201   return result;
1202 }
1203
1204 static gint
1205 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
1206 {
1207   gint result;
1208
1209   RTP_SESSION_UNLOCK (session);
1210
1211   if (session->callbacks.clock_rate)
1212     result =
1213         session->callbacks.clock_rate (session, pt,
1214         session->clock_rate_user_data);
1215   else
1216     result = -1;
1217
1218   RTP_SESSION_LOCK (session);
1219
1220   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
1221
1222   return result;
1223 }
1224
1225 static RTPSourceCallbacks callbacks = {
1226   (RTPSourcePushRTP) source_push_rtp,
1227   (RTPSourceClockRate) source_clock_rate,
1228 };
1229
1230
1231 /**
1232  * rtp_session_find_conflicting_address:
1233  * @session: The session the packet came in
1234  * @address: address to check for
1235  * @time: The time when the packet that is possibly in conflict arrived
1236  *
1237  * Checks if an address which has a conflict is already known. If it is
1238  * a known conflict, remember the time
1239  *
1240  * Returns: TRUE if it was a known conflict, FALSE otherwise
1241  */
1242 static gboolean
1243 rtp_session_find_conflicting_address (RTPSession * session,
1244     GSocketAddress * address, GstClockTime time)
1245 {
1246   return find_conflicting_address (session->conflicting_addresses, address,
1247       time);
1248 }
1249
1250 /**
1251  * rtp_session_add_conflicting_address:
1252  * @session: The session the packet came in
1253  * @address: address to remember
1254  * @time: The time when the packet that is in conflict arrived
1255  *
1256  * Adds a new conflict address
1257  */
1258 static void
1259 rtp_session_add_conflicting_address (RTPSession * sess,
1260     GSocketAddress * address, GstClockTime time)
1261 {
1262   sess->conflicting_addresses =
1263       add_conflicting_address (sess->conflicting_addresses, address, time);
1264 }
1265
1266
1267 static gboolean
1268 check_collision (RTPSession * sess, RTPSource * source,
1269     RTPPacketInfo * pinfo, gboolean rtp)
1270 {
1271   guint32 ssrc;
1272
1273   /* If we have no pinfo address, we can't do collision checking */
1274   if (!pinfo->address)
1275     return FALSE;
1276
1277   ssrc = rtp_source_get_ssrc (source);
1278
1279   if (!source->internal) {
1280     GSocketAddress *from;
1281
1282     /* This is not our local source, but lets check if two remote
1283      * source collide */
1284     if (rtp) {
1285       from = source->rtp_from;
1286     } else {
1287       from = source->rtcp_from;
1288     }
1289
1290     if (from) {
1291       if (__g_socket_address_equal (from, pinfo->address)) {
1292         /* Address is the same */
1293         return FALSE;
1294       } else {
1295         GST_LOG ("we have a third-party collision or loop ssrc:%x", ssrc);
1296         if (sess->favor_new) {
1297           if (rtp_source_find_conflicting_address (source,
1298                   pinfo->address, pinfo->current_time)) {
1299             gchar *buf1;
1300
1301             buf1 = __g_socket_address_to_string (pinfo->address);
1302             GST_LOG ("Known conflict on %x for %s, dropping packet", ssrc,
1303                 buf1);
1304             g_free (buf1);
1305
1306             return TRUE;
1307           } else {
1308             gchar *buf1, *buf2;
1309
1310             /* Current address is not a known conflict, lets assume this is
1311              * a new source. Save old address in possible conflict list
1312              */
1313             rtp_source_add_conflicting_address (source, from,
1314                 pinfo->current_time);
1315
1316             buf1 = __g_socket_address_to_string (from);
1317             buf2 = __g_socket_address_to_string (pinfo->address);
1318
1319             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1320                 " saving old as known conflict", ssrc, buf1, buf2);
1321
1322             if (rtp)
1323               rtp_source_set_rtp_from (source, pinfo->address);
1324             else
1325               rtp_source_set_rtcp_from (source, pinfo->address);
1326
1327             g_free (buf1);
1328             g_free (buf2);
1329
1330             return FALSE;
1331           }
1332         } else {
1333           /* Don't need to save old addresses, we ignore new sources */
1334           return TRUE;
1335         }
1336       }
1337     } else {
1338       /* We don't already have a from address for RTP, just set it */
1339       if (rtp)
1340         rtp_source_set_rtp_from (source, pinfo->address);
1341       else
1342         rtp_source_set_rtcp_from (source, pinfo->address);
1343       return FALSE;
1344     }
1345
1346     /* FIXME: Log 3rd party collision somehow
1347      * Maybe should be done in upper layer, only the SDES can tell us
1348      * if its a collision or a loop
1349      */
1350   } else {
1351     /* This is sending with our ssrc, is it an address we already know */
1352     if (rtp_session_find_conflicting_address (sess, pinfo->address,
1353             pinfo->current_time)) {
1354       /* Its a known conflict, its probably a loop, not a collision
1355        * lets just drop the incoming packet
1356        */
1357       GST_DEBUG ("Our packets are being looped back to us, dropping");
1358     } else {
1359       /* Its a new collision, lets change our SSRC */
1360       rtp_session_add_conflicting_address (sess, pinfo->address,
1361           pinfo->current_time);
1362
1363       GST_DEBUG ("Collision for SSRC %x", ssrc);
1364       /* mark the source BYE */
1365       rtp_source_mark_bye (source, "SSRC Collision");
1366       /* if we were suggesting this SSRC, change to something else */
1367       if (sess->suggested_ssrc == ssrc)
1368         sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
1369
1370       on_ssrc_collision (sess, source);
1371
1372       rtp_session_schedule_bye_locked (sess, pinfo->current_time);
1373     }
1374   }
1375
1376   return TRUE;
1377 }
1378
1379 typedef struct
1380 {
1381   gboolean is_doing_ptp;
1382   GSocketAddress *new_addr;
1383 } CompareAddrData;
1384
1385 /* check if the two given ip addr are the same (do not care about the port) */
1386 static gboolean
1387 ip_addr_equal (GSocketAddress * a, GSocketAddress * b)
1388 {
1389   return
1390       g_inet_address_equal (g_inet_socket_address_get_address
1391       (G_INET_SOCKET_ADDRESS (a)),
1392       g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (b)));
1393 }
1394
1395 static void
1396 compare_rtp_source_addr (const gchar * key, RTPSource * source,
1397     CompareAddrData * data)
1398 {
1399   /* only compare ip addr of remote sources which are also not closing */
1400   if (!source->internal && !source->closing && source->rtp_from) {
1401     /* look for the first rtp source */
1402     if (!data->new_addr)
1403       data->new_addr = source->rtp_from;
1404     /* compare current ip addr with the first one */
1405     else
1406       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtp_from);
1407   }
1408 }
1409
1410 static void
1411 compare_rtcp_source_addr (const gchar * key, RTPSource * source,
1412     CompareAddrData * data)
1413 {
1414   /* only compare ip addr of remote sources which are also not closing */
1415   if (!source->internal && !source->closing && source->rtcp_from) {
1416     /* look for the first rtcp source */
1417     if (!data->new_addr)
1418       data->new_addr = source->rtcp_from;
1419     else
1420       /* compare current ip addr with the first one */
1421       data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtcp_from);
1422   }
1423 }
1424
1425 /* loop over our non-internal source to know if the session
1426  * is doing point-to-point */
1427 static void
1428 session_update_ptp (RTPSession * sess)
1429 {
1430   /* to know if the session is doing point to point, the ip addr
1431    * of each non-internal (=remotes) source have to be compared
1432    * to each other.
1433    */
1434   gboolean is_doing_rtp_ptp;
1435   gboolean is_doing_rtcp_ptp;
1436   CompareAddrData data;
1437
1438   /* compare the first remote source's ip addr that receive rtp packets
1439    * with other remote rtp source.
1440    * it's enough because the session just needs to know if they are all
1441    * equals or not
1442    */
1443   data.is_doing_ptp = TRUE;
1444   data.new_addr = NULL;
1445   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1446       (GHFunc) compare_rtp_source_addr, (gpointer) & data);
1447   is_doing_rtp_ptp = data.is_doing_ptp;
1448
1449   /* same but about rtcp */
1450   data.is_doing_ptp = TRUE;
1451   data.new_addr = NULL;
1452   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
1453       (GHFunc) compare_rtcp_source_addr, (gpointer) & data);
1454   is_doing_rtcp_ptp = data.is_doing_ptp;
1455
1456   /* the session is doing point-to-point if all rtp remote have the same
1457    * ip addr and if all rtcp remote sources have the same ip addr */
1458   sess->is_doing_ptp = is_doing_rtp_ptp && is_doing_rtcp_ptp;
1459
1460   GST_DEBUG ("doing point-to-point: %d", sess->is_doing_ptp);
1461 }
1462
1463 static void
1464 add_source (RTPSession * sess, RTPSource * src)
1465 {
1466   g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1467       GINT_TO_POINTER (src->ssrc), src);
1468   /* report the new source ASAP */
1469   src->generation = sess->generation;
1470   /* we have one more source now */
1471   sess->total_sources++;
1472   if (RTP_SOURCE_IS_ACTIVE (src))
1473     sess->stats.active_sources++;
1474   if (src->internal) {
1475     sess->stats.internal_sources++;
1476     if (sess->suggested_ssrc != src->ssrc)
1477       sess->suggested_ssrc = src->ssrc;
1478   }
1479
1480   /* update point-to-point status */
1481   if (!src->internal)
1482     session_update_ptp (sess);
1483 }
1484
1485 static RTPSource *
1486 find_source (RTPSession * sess, guint32 ssrc)
1487 {
1488   return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1489       GINT_TO_POINTER (ssrc));
1490 }
1491
1492 /* must be called with the session lock, the returned source needs to be
1493  * unreffed after usage. */
1494 static RTPSource *
1495 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1496     RTPPacketInfo * pinfo, gboolean rtp)
1497 {
1498   RTPSource *source;
1499
1500   source = find_source (sess, ssrc);
1501   if (source == NULL) {
1502     /* make new Source in probation and insert */
1503     source = rtp_source_new (ssrc);
1504
1505     GST_DEBUG ("creating new source %08x %p", ssrc, source);
1506
1507     /* for RTP packets we need to set the source in probation. Receiving RTCP
1508      * packets of an SSRC, on the other hand, is a strong indication that we
1509      * are dealing with a valid source. */
1510     if (rtp)
1511       g_object_set (source, "probation", sess->probation, NULL);
1512     else
1513       g_object_set (source, "probation", 0, NULL);
1514
1515     /* store from address, if any */
1516     if (pinfo->address) {
1517       if (rtp)
1518         rtp_source_set_rtp_from (source, pinfo->address);
1519       else
1520         rtp_source_set_rtcp_from (source, pinfo->address);
1521     }
1522
1523     /* configure a callback on the source */
1524     rtp_source_set_callbacks (source, &callbacks, sess);
1525
1526     add_source (sess, source);
1527     *created = TRUE;
1528   } else {
1529     *created = FALSE;
1530     /* check for collision, this updates the address when not previously set */
1531     if (check_collision (sess, source, pinfo, rtp)) {
1532       return NULL;
1533     }
1534     /* Receiving RTCP packets of an SSRC is a strong indication that we
1535      * are dealing with a valid source. */
1536     if (!rtp)
1537       g_object_set (source, "probation", 0, NULL);
1538   }
1539   /* update last activity */
1540   source->last_activity = pinfo->current_time;
1541   if (rtp)
1542     source->last_rtp_activity = pinfo->current_time;
1543   g_object_ref (source);
1544
1545   return source;
1546 }
1547
1548 /* must be called with the session lock, the returned source needs to be
1549  * unreffed after usage. */
1550 static RTPSource *
1551 obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1552     GstClockTime current_time)
1553 {
1554   RTPSource *source;
1555
1556   source = find_source (sess, ssrc);
1557   if (source == NULL) {
1558     /* make new internal Source and insert */
1559     source = rtp_source_new (ssrc);
1560
1561     GST_DEBUG ("creating new internal source %08x %p", ssrc, source);
1562
1563     source->validated = TRUE;
1564     source->internal = TRUE;
1565     source->probation = FALSE;
1566     rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes));
1567     rtp_source_set_callbacks (source, &callbacks, sess);
1568
1569     add_source (sess, source);
1570     *created = TRUE;
1571   } else {
1572     *created = FALSE;
1573   }
1574   /* update last activity */
1575   if (current_time != GST_CLOCK_TIME_NONE) {
1576     source->last_activity = current_time;
1577     source->last_rtp_activity = current_time;
1578   }
1579   g_object_ref (source);
1580
1581   return source;
1582 }
1583
1584 /**
1585  * rtp_session_suggest_ssrc:
1586  * @sess: a #RTPSession
1587  *
1588  * Suggest an unused SSRC in @sess.
1589  *
1590  * Returns: a free unused SSRC
1591  */
1592 guint32
1593 rtp_session_suggest_ssrc (RTPSession * sess)
1594 {
1595   guint32 result;
1596
1597   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1598
1599   RTP_SESSION_LOCK (sess);
1600   result = sess->suggested_ssrc;
1601   RTP_SESSION_UNLOCK (sess);
1602
1603   return result;
1604 }
1605
1606 /**
1607  * rtp_session_add_source:
1608  * @sess: a #RTPSession
1609  * @src: #RTPSource to add
1610  *
1611  * Add @src to @session.
1612  *
1613  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1614  * existed in the session.
1615  */
1616 gboolean
1617 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1618 {
1619   gboolean result = FALSE;
1620   RTPSource *find;
1621
1622   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1623   g_return_val_if_fail (src != NULL, FALSE);
1624
1625   RTP_SESSION_LOCK (sess);
1626   find = find_source (sess, src->ssrc);
1627   if (find == NULL) {
1628     add_source (sess, src);
1629     result = TRUE;
1630   }
1631   RTP_SESSION_UNLOCK (sess);
1632
1633   return result;
1634 }
1635
1636 /**
1637  * rtp_session_get_num_sources:
1638  * @sess: an #RTPSession
1639  *
1640  * Get the number of sources in @sess.
1641  *
1642  * Returns: The number of sources in @sess.
1643  */
1644 guint
1645 rtp_session_get_num_sources (RTPSession * sess)
1646 {
1647   guint result;
1648
1649   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1650
1651   RTP_SESSION_LOCK (sess);
1652   result = sess->total_sources;
1653   RTP_SESSION_UNLOCK (sess);
1654
1655   return result;
1656 }
1657
1658 /**
1659  * rtp_session_get_num_active_sources:
1660  * @sess: an #RTPSession
1661  *
1662  * Get the number of active sources in @sess. A source is considered active when
1663  * it has been validated and has not yet received a BYE RTCP message.
1664  *
1665  * Returns: The number of active sources in @sess.
1666  */
1667 guint
1668 rtp_session_get_num_active_sources (RTPSession * sess)
1669 {
1670   guint result;
1671
1672   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1673
1674   RTP_SESSION_LOCK (sess);
1675   result = sess->stats.active_sources;
1676   RTP_SESSION_UNLOCK (sess);
1677
1678   return result;
1679 }
1680
1681 /**
1682  * rtp_session_get_source_by_ssrc:
1683  * @sess: an #RTPSession
1684  * @ssrc: an SSRC
1685  *
1686  * Find the source with @ssrc in @sess.
1687  *
1688  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
1689  * g_object_unref() after usage.
1690  */
1691 RTPSource *
1692 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
1693 {
1694   RTPSource *result;
1695
1696   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1697
1698   RTP_SESSION_LOCK (sess);
1699   result = find_source (sess, ssrc);
1700   if (result != NULL)
1701     g_object_ref (result);
1702   RTP_SESSION_UNLOCK (sess);
1703
1704   return result;
1705 }
1706
1707 /* should be called with the SESSION lock */
1708 static guint32
1709 rtp_session_create_new_ssrc (RTPSession * sess)
1710 {
1711   guint32 ssrc;
1712
1713   while (TRUE) {
1714     ssrc = g_random_int ();
1715
1716     /* see if it exists in the session, we're done if it doesn't */
1717     if (find_source (sess, ssrc) == NULL)
1718       break;
1719   }
1720   return ssrc;
1721 }
1722
1723
1724 /**
1725  * rtp_session_create_source:
1726  * @sess: an #RTPSession
1727  *
1728  * Create an #RTPSource for use in @sess. This function will create a source
1729  * with an ssrc that is currently not used by any participants in the session.
1730  *
1731  * Returns: an #RTPSource.
1732  */
1733 RTPSource *
1734 rtp_session_create_source (RTPSession * sess)
1735 {
1736   guint32 ssrc;
1737   RTPSource *source;
1738
1739   RTP_SESSION_LOCK (sess);
1740   ssrc = rtp_session_create_new_ssrc (sess);
1741   source = rtp_source_new (ssrc);
1742   rtp_source_set_callbacks (source, &callbacks, sess);
1743   /* we need an additional ref for the source in the hashtable */
1744   g_object_ref (source);
1745   add_source (sess, source);
1746   RTP_SESSION_UNLOCK (sess);
1747
1748   return source;
1749 }
1750
1751 static gboolean
1752 update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
1753 {
1754   GstNetAddressMeta *meta;
1755
1756   /* get packet size including header overhead */
1757   pinfo->bytes += gst_buffer_get_size (*buffer) + pinfo->header_len;
1758   pinfo->packets++;
1759
1760   if (pinfo->rtp) {
1761     GstRTPBuffer rtp = { NULL };
1762
1763     if (!gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtp))
1764       goto invalid_packet;
1765
1766     pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtp);
1767     if (idx == 0) {
1768       gint i;
1769
1770       /* only keep info for first buffer */
1771       pinfo->ssrc = gst_rtp_buffer_get_ssrc (&rtp);
1772       pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
1773       pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
1774       pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
1775       /* copy available csrc */
1776       pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
1777       for (i = 0; i < pinfo->csrc_count; i++)
1778         pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
1779     }
1780     gst_rtp_buffer_unmap (&rtp);
1781   }
1782
1783   if (idx == 0) {
1784     /* for netbuffer we can store the IP address to check for collisions */
1785     meta = gst_buffer_get_net_address_meta (*buffer);
1786     if (pinfo->address)
1787       g_object_unref (pinfo->address);
1788     if (meta) {
1789       pinfo->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
1790     } else {
1791       pinfo->address = NULL;
1792     }
1793   }
1794   return TRUE;
1795
1796   /* ERRORS */
1797 invalid_packet:
1798   {
1799     GST_DEBUG ("invalid RTP packet received");
1800     return FALSE;
1801   }
1802 }
1803
1804 /* update the RTPPacketInfo structure with the current time and other bits
1805  * about the current buffer we are handling.
1806  * This function is typically called when a validated packet is received.
1807  * This function should be called with the SESSION_LOCK
1808  */
1809 static gboolean
1810 update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
1811     gboolean send, gboolean rtp, gboolean is_list, gpointer data,
1812     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
1813 {
1814   gboolean res;
1815
1816   pinfo->send = send;
1817   pinfo->rtp = rtp;
1818   pinfo->is_list = is_list;
1819   pinfo->data = data;
1820   pinfo->current_time = current_time;
1821   pinfo->running_time = running_time;
1822   pinfo->ntpnstime = ntpnstime;
1823   pinfo->header_len = sess->header_len;
1824   pinfo->bytes = 0;
1825   pinfo->payload_len = 0;
1826   pinfo->packets = 0;
1827
1828   if (is_list) {
1829     GstBufferList *list = GST_BUFFER_LIST_CAST (data);
1830     res =
1831         gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
1832         pinfo);
1833   } else {
1834     GstBuffer *buffer = GST_BUFFER_CAST (data);
1835     res = update_packet (&buffer, 0, pinfo);
1836   }
1837   return res;
1838 }
1839
1840 static void
1841 clean_packet_info (RTPPacketInfo * pinfo)
1842 {
1843   if (pinfo->address)
1844     g_object_unref (pinfo->address);
1845   if (pinfo->data) {
1846     gst_mini_object_unref (pinfo->data);
1847     pinfo->data = NULL;
1848   }
1849 }
1850
1851 static gboolean
1852 source_update_active (RTPSession * sess, RTPSource * source,
1853     gboolean prevactive)
1854 {
1855   gboolean active = RTP_SOURCE_IS_ACTIVE (source);
1856   guint32 ssrc = source->ssrc;
1857
1858   if (prevactive == active)
1859     return FALSE;
1860
1861   if (active) {
1862     sess->stats.active_sources++;
1863     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
1864         sess->stats.active_sources);
1865   } else {
1866     sess->stats.active_sources--;
1867     GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1868         sess->stats.active_sources);
1869   }
1870   return TRUE;
1871 }
1872
1873 static gboolean
1874 source_update_sender (RTPSession * sess, RTPSource * source,
1875     gboolean prevsender)
1876 {
1877   gboolean sender = RTP_SOURCE_IS_SENDER (source);
1878   guint32 ssrc = source->ssrc;
1879
1880   if (prevsender == sender)
1881     return FALSE;
1882
1883   if (sender) {
1884     sess->stats.sender_sources++;
1885     if (source->internal)
1886       sess->stats.internal_sender_sources++;
1887     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
1888         sess->stats.sender_sources);
1889   } else {
1890     sess->stats.sender_sources--;
1891     if (source->internal)
1892       sess->stats.internal_sender_sources--;
1893     GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1894         sess->stats.sender_sources);
1895   }
1896   return TRUE;
1897 }
1898
1899 /**
1900  * rtp_session_process_rtp:
1901  * @sess: and #RTPSession
1902  * @buffer: an RTP buffer
1903  * @current_time: the current system time
1904  * @running_time: the running_time of @buffer
1905  *
1906  * Process an RTP buffer in the session manager. This function takes ownership
1907  * of @buffer.
1908  *
1909  * Returns: a #GstFlowReturn.
1910  */
1911 GstFlowReturn
1912 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
1913     GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
1914 {
1915   GstFlowReturn result;
1916   guint32 ssrc;
1917   RTPSource *source;
1918   gboolean created;
1919   gboolean prevsender, prevactive;
1920   RTPPacketInfo pinfo = { 0, };
1921   guint64 oldrate;
1922
1923   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1924   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1925
1926   RTP_SESSION_LOCK (sess);
1927
1928   /* update pinfo stats */
1929   if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer,
1930           current_time, running_time, ntpnstime)) {
1931     GST_DEBUG ("invalid RTP packet received");
1932     RTP_SESSION_UNLOCK (sess);
1933     return rtp_session_process_rtcp (sess, buffer, current_time, ntpnstime);
1934   }
1935
1936   ssrc = pinfo.ssrc;
1937
1938   source = obtain_source (sess, ssrc, &created, &pinfo, TRUE);
1939   if (!source)
1940     goto collision;
1941
1942   prevsender = RTP_SOURCE_IS_SENDER (source);
1943   prevactive = RTP_SOURCE_IS_ACTIVE (source);
1944   oldrate = source->bitrate;
1945
1946   /* let source process the packet */
1947   result = rtp_source_process_rtp (source, &pinfo);
1948
1949   /* source became active */
1950   if (source_update_active (sess, source, prevactive))
1951     on_ssrc_validated (sess, source);
1952
1953   source_update_sender (sess, source, prevsender);
1954
1955   if (oldrate != source->bitrate)
1956     sess->recalc_bandwidth = TRUE;
1957
1958   if (created)
1959     on_new_ssrc (sess, source);
1960
1961   if (source->validated) {
1962     gboolean created;
1963     gint i;
1964
1965     /* for validated sources, we add the CSRCs as well */
1966     for (i = 0; i < pinfo.csrc_count; i++) {
1967       guint32 csrc;
1968       RTPSource *csrc_src;
1969
1970       csrc = pinfo.csrcs[i];
1971
1972       /* get source */
1973       csrc_src = obtain_source (sess, csrc, &created, &pinfo, TRUE);
1974       if (!csrc_src)
1975         continue;
1976
1977       if (created) {
1978         GST_DEBUG ("created new CSRC: %08x", csrc);
1979         rtp_source_set_as_csrc (csrc_src);
1980         source_update_active (sess, csrc_src, FALSE);
1981         on_new_ssrc (sess, csrc_src);
1982       }
1983       g_object_unref (csrc_src);
1984     }
1985   }
1986   g_object_unref (source);
1987
1988   RTP_SESSION_UNLOCK (sess);
1989
1990   clean_packet_info (&pinfo);
1991
1992   return result;
1993
1994   /* ERRORS */
1995 collision:
1996   {
1997     RTP_SESSION_UNLOCK (sess);
1998     clean_packet_info (&pinfo);
1999     GST_DEBUG ("ignoring packet because its collisioning");
2000     return GST_FLOW_OK;
2001   }
2002 }
2003
2004 static void
2005 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
2006     GstRTCPPacket * packet, RTPPacketInfo * pinfo)
2007 {
2008   guint count, i;
2009
2010   count = gst_rtcp_packet_get_rb_count (packet);
2011   for (i = 0; i < count; i++) {
2012     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
2013     guint8 fractionlost;
2014     gint32 packetslost;
2015     RTPSource *src;
2016
2017     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
2018         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2019
2020     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
2021
2022     /* find our own source */
2023     src = find_source (sess, ssrc);
2024     if (src == NULL)
2025       continue;
2026
2027     if (src->internal && RTP_SOURCE_IS_ACTIVE (src)) {
2028       /* only deal with report blocks for our session, we update the stats of
2029        * the sender of the RTCP message. We could also compare our stats against
2030        * the other sender to see if we are better or worse. */
2031       /* FIXME, need to keep track who the RB block is from */
2032       rtp_source_process_rb (source, pinfo->ntpnstime, fractionlost,
2033           packetslost, exthighestseq, jitter, lsr, dlsr);
2034     }
2035   }
2036   on_ssrc_active (sess, source);
2037 }
2038
2039 /* A Sender report contains statistics about how the sender is doing. This
2040  * includes timing informataion such as the relation between RTP and NTP
2041  * timestamps and the number of packets/bytes it sent to us.
2042  *
2043  * In this report is also included a set of report blocks related to how this
2044  * sender is receiving data (in case we (or somebody else) is also sending stuff
2045  * to it). This info includes the packet loss, jitter and seqnum. It also
2046  * contains information to calculate the round trip time (LSR/DLSR).
2047  */
2048 static void
2049 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
2050     RTPPacketInfo * pinfo, gboolean * do_sync)
2051 {
2052   guint32 senderssrc, rtptime, packet_count, octet_count;
2053   guint64 ntptime;
2054   RTPSource *source;
2055   gboolean created, prevsender;
2056
2057   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
2058       &packet_count, &octet_count);
2059
2060   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
2061       senderssrc, GST_TIME_ARGS (pinfo->current_time));
2062
2063   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2064   if (!source)
2065     return;
2066
2067   /* skip non-bye packets for sources that are marked BYE */
2068   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2069     goto out;
2070
2071   /* don't try to do lip-sync for sources that sent a BYE */
2072   if (RTP_SOURCE_IS_MARKED_BYE (source))
2073     *do_sync = FALSE;
2074   else
2075     *do_sync = TRUE;
2076
2077   prevsender = RTP_SOURCE_IS_SENDER (source);
2078
2079   /* first update the source */
2080   rtp_source_process_sr (source, pinfo->current_time, ntptime, rtptime,
2081       packet_count, octet_count);
2082
2083   source_update_sender (sess, source, prevsender);
2084
2085   if (created)
2086     on_new_ssrc (sess, source);
2087
2088   rtp_session_process_rb (sess, source, packet, pinfo);
2089
2090 out:
2091   g_object_unref (source);
2092 }
2093
2094 /* A receiver report contains statistics about how a receiver is doing. It
2095  * includes stuff like packet loss, jitter and the seqnum it received last. It
2096  * also contains info to calculate the round trip time.
2097  *
2098  * We are only interested in how the sender of this report is doing wrt to us.
2099  */
2100 static void
2101 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
2102     RTPPacketInfo * pinfo)
2103 {
2104   guint32 senderssrc;
2105   RTPSource *source;
2106   gboolean created;
2107
2108   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
2109
2110   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
2111
2112   source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
2113   if (!source)
2114     return;
2115
2116   /* skip non-bye packets for sources that are marked BYE */
2117   if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2118     goto out;
2119
2120   if (created)
2121     on_new_ssrc (sess, source);
2122
2123   rtp_session_process_rb (sess, source, packet, pinfo);
2124
2125 out:
2126   g_object_unref (source);
2127 }
2128
2129 /* Get SDES items and store them in the SSRC */
2130 static void
2131 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
2132     RTPPacketInfo * pinfo)
2133 {
2134   guint items, i, j;
2135   gboolean more_items, more_entries;
2136
2137   items = gst_rtcp_packet_sdes_get_item_count (packet);
2138   GST_DEBUG ("got SDES packet with %d items", items);
2139
2140   more_items = gst_rtcp_packet_sdes_first_item (packet);
2141   i = 0;
2142   while (more_items) {
2143     guint32 ssrc;
2144     gboolean changed, created, prevactive;
2145     RTPSource *source;
2146     GstStructure *sdes;
2147
2148     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
2149
2150     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
2151
2152     changed = FALSE;
2153
2154     /* find src, no probation when dealing with RTCP */
2155     source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
2156     if (!source)
2157       return;
2158
2159     /* skip non-bye packets for sources that are marked BYE */
2160     if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
2161       goto next;
2162
2163     sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
2164
2165     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
2166     j = 0;
2167     while (more_entries) {
2168       GstRTCPSDESType type;
2169       guint8 len;
2170       guint8 *data;
2171       gchar *name;
2172       gchar *value;
2173
2174       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
2175
2176       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
2177           data);
2178
2179       if (type == GST_RTCP_SDES_PRIV) {
2180         name = g_strndup ((const gchar *) &data[1], data[0]);
2181         len -= data[0] + 1;
2182         data += data[0] + 1;
2183       } else {
2184         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
2185       }
2186
2187       value = g_strndup ((const gchar *) data, len);
2188
2189       gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
2190
2191       g_free (name);
2192       g_free (value);
2193
2194       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
2195       j++;
2196     }
2197
2198     /* takes ownership of sdes */
2199     changed = rtp_source_set_sdes_struct (source, sdes);
2200
2201     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2202     source->validated = TRUE;
2203
2204     if (created)
2205       on_new_ssrc (sess, source);
2206
2207     /* source became active */
2208     if (source_update_active (sess, source, prevactive))
2209       on_ssrc_validated (sess, source);
2210
2211     if (changed)
2212       on_ssrc_sdes (sess, source);
2213
2214   next:
2215     g_object_unref (source);
2216
2217     more_items = gst_rtcp_packet_sdes_next_item (packet);
2218     i++;
2219   }
2220 }
2221
2222 /* BYE is sent when a client leaves the session
2223  */
2224 static void
2225 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
2226     RTPPacketInfo * pinfo)
2227 {
2228   guint count, i;
2229   gchar *reason;
2230   gboolean reconsider = FALSE;
2231
2232   reason = gst_rtcp_packet_bye_get_reason (packet);
2233   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
2234
2235   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
2236   for (i = 0; i < count; i++) {
2237     guint32 ssrc;
2238     RTPSource *source;
2239     gboolean created, prevactive, prevsender;
2240     guint pmembers, members;
2241
2242     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
2243     GST_DEBUG ("SSRC: %08x", ssrc);
2244
2245     /* find src and mark bye, no probation when dealing with RTCP */
2246     source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
2247     if (!source)
2248       return;
2249
2250     if (source->internal) {
2251       /* our own source, something weird with this packet */
2252       g_object_unref (source);
2253       continue;
2254     }
2255
2256     /* store time for when we need to time out this source */
2257     source->bye_time = pinfo->current_time;
2258
2259     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2260     prevsender = RTP_SOURCE_IS_SENDER (source);
2261
2262     /* mark the source BYE */
2263     rtp_source_mark_bye (source, reason);
2264
2265     pmembers = sess->stats.active_sources;
2266
2267     source_update_active (sess, source, prevactive);
2268     source_update_sender (sess, source, prevsender);
2269
2270     members = sess->stats.active_sources;
2271
2272     if (!sess->scheduled_bye && members < pmembers) {
2273       /* some members went away since the previous timeout estimate.
2274        * Perform reverse reconsideration but only when we are not scheduling a
2275        * BYE ourselves. */
2276       if (sess->next_rtcp_check_time != GST_CLOCK_TIME_NONE &&
2277           pinfo->current_time < sess->next_rtcp_check_time) {
2278         GstClockTime time_remaining;
2279
2280         time_remaining = sess->next_rtcp_check_time - pinfo->current_time;
2281         sess->next_rtcp_check_time =
2282             gst_util_uint64_scale (time_remaining, members, pmembers);
2283
2284         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
2285             GST_TIME_ARGS (sess->next_rtcp_check_time));
2286
2287         sess->next_rtcp_check_time += pinfo->current_time;
2288
2289         /* mark pending reconsider. We only want to signal the reconsideration
2290          * once after we handled all the source in the bye packet */
2291         reconsider = TRUE;
2292       }
2293     }
2294
2295     if (created)
2296       on_new_ssrc (sess, source);
2297
2298     on_bye_ssrc (sess, source);
2299
2300     g_object_unref (source);
2301   }
2302   if (reconsider) {
2303     RTP_SESSION_UNLOCK (sess);
2304     /* notify app of reconsideration */
2305     if (sess->callbacks.reconsider)
2306       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2307     RTP_SESSION_LOCK (sess);
2308   }
2309   g_free (reason);
2310 }
2311
2312 static void
2313 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
2314     RTPPacketInfo * pinfo)
2315 {
2316   GST_DEBUG ("received APP");
2317 }
2318
2319 static gboolean
2320 rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
2321     gboolean fir, GstClockTime current_time)
2322 {
2323   guint32 round_trip = 0;
2324
2325   rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, &round_trip);
2326
2327   if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
2328     GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
2329         GST_SECOND, 65536);
2330
2331     if (current_time - sess->last_keyframe_request < 2 * round_trip_in_ns) {
2332       GST_DEBUG ("Ignoring %s request because one was send without one "
2333           "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
2334           fir ? "FIR" : "PLI",
2335           GST_TIME_ARGS (current_time - sess->last_keyframe_request),
2336           GST_TIME_ARGS (round_trip_in_ns));;
2337       return FALSE;
2338     }
2339   }
2340
2341   sess->last_keyframe_request = current_time;
2342
2343   GST_LOG ("received %s request from %X %p(%p)", fir ? "FIR" : "PLI",
2344       rtp_source_get_ssrc (src), sess->callbacks.process_rtp,
2345       sess->callbacks.request_key_unit);
2346
2347   RTP_SESSION_UNLOCK (sess);
2348   sess->callbacks.request_key_unit (sess, fir,
2349       sess->request_key_unit_user_data);
2350   RTP_SESSION_LOCK (sess);
2351
2352   return TRUE;
2353 }
2354
2355 static void
2356 rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
2357     guint32 media_ssrc, GstClockTime current_time)
2358 {
2359   RTPSource *src;
2360
2361   if (!sess->callbacks.request_key_unit)
2362     return;
2363
2364   src = find_source (sess, sender_ssrc);
2365   if (src == NULL)
2366     return;
2367
2368   rtp_session_request_local_key_unit (sess, src, FALSE, current_time);
2369 }
2370
2371 static void
2372 rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
2373     guint8 * fci_data, guint fci_length, GstClockTime current_time)
2374 {
2375   RTPSource *src;
2376   guint32 ssrc;
2377   guint position = 0;
2378   gboolean our_request = FALSE;
2379
2380   if (!sess->callbacks.request_key_unit)
2381     return;
2382
2383   if (fci_length < 8)
2384     return;
2385
2386   src = find_source (sess, sender_ssrc);
2387
2388   /* Hack because Google fails to set the sender_ssrc correctly */
2389   if (!src && sender_ssrc == 1) {
2390     GHashTableIter iter;
2391
2392     /* we can't find the source if there are multiple */
2393     if (sess->stats.sender_sources > sess->stats.internal_sender_sources + 1)
2394       return;
2395
2396     g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
2397     while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
2398       if (!src->internal && rtp_source_is_sender (src))
2399         break;
2400       src = NULL;
2401     }
2402   }
2403   if (!src)
2404     return;
2405
2406   for (position = 0; position < fci_length; position += 8) {
2407     guint8 *data = fci_data + position;
2408     RTPSource *own;
2409
2410     ssrc = GST_READ_UINT32_BE (data);
2411
2412     own = find_source (sess, ssrc);
2413     if (own == NULL)
2414       continue;
2415
2416     if (own->internal) {
2417       our_request = TRUE;
2418       break;
2419     }
2420   }
2421   if (!our_request)
2422     return;
2423
2424   rtp_session_request_local_key_unit (sess, src, TRUE, current_time);
2425 }
2426
2427 static void
2428 rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
2429     guint32 media_ssrc, guint8 * fci_data, guint fci_length,
2430     GstClockTime current_time)
2431 {
2432   sess->stats.nacks_received++;
2433
2434   if (!sess->callbacks.notify_nack)
2435     return;
2436
2437   while (fci_length > 0) {
2438     guint16 seqnum, blp;
2439
2440     seqnum = GST_READ_UINT16_BE (fci_data);
2441     blp = GST_READ_UINT16_BE (fci_data + 2);
2442
2443     GST_DEBUG ("NACK #%u, blp %04x, SSRC 0x%08x", seqnum, blp, media_ssrc);
2444
2445     RTP_SESSION_UNLOCK (sess);
2446     sess->callbacks.notify_nack (sess, seqnum, blp, media_ssrc,
2447         sess->notify_nack_user_data);
2448     RTP_SESSION_LOCK (sess);
2449
2450     fci_data += 4;
2451     fci_length -= 4;
2452   }
2453 }
2454
2455 static void
2456 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
2457     RTPPacketInfo * pinfo, GstClockTime current_time)
2458 {
2459   GstRTCPType type = gst_rtcp_packet_get_type (packet);
2460   GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
2461   guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
2462   guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
2463   guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet);
2464   guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
2465   RTPSource *src;
2466
2467   src = find_source (sess, media_ssrc);
2468
2469   /* skip non-bye packets for sources that are marked BYE */
2470   if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src))
2471     return;
2472
2473   GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
2474       "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
2475
2476   if (g_signal_has_handler_pending (sess,
2477           rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
2478     GstBuffer *fci_buffer = NULL;
2479
2480     if (fci_length > 0) {
2481       fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
2482           GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data,
2483           fci_length);
2484       GST_BUFFER_TIMESTAMP (fci_buffer) = pinfo->running_time;
2485     }
2486
2487     RTP_SESSION_UNLOCK (sess);
2488     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
2489         type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
2490     RTP_SESSION_LOCK (sess);
2491
2492     if (fci_buffer)
2493       gst_buffer_unref (fci_buffer);
2494   }
2495
2496   if (src && sess->rtcp_feedback_retention_window) {
2497     rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
2498   }
2499
2500   if ((src && src->internal) ||
2501       /* PSFB FIR puts the media ssrc inside the FCI */
2502       (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
2503     switch (type) {
2504       case GST_RTCP_TYPE_PSFB:
2505         switch (fbtype) {
2506           case GST_RTCP_PSFB_TYPE_PLI:
2507             rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
2508                 current_time);
2509             break;
2510           case GST_RTCP_PSFB_TYPE_FIR:
2511             rtp_session_process_fir (sess, sender_ssrc, fci_data, fci_length,
2512                 current_time);
2513             break;
2514           default:
2515             break;
2516         }
2517         break;
2518       case GST_RTCP_TYPE_RTPFB:
2519         switch (fbtype) {
2520           case GST_RTCP_RTPFB_TYPE_NACK:
2521             rtp_session_process_nack (sess, sender_ssrc, media_ssrc,
2522                 fci_data, fci_length, current_time);
2523             break;
2524           default:
2525             break;
2526         }
2527       default:
2528         break;
2529     }
2530   }
2531 }
2532
2533 /**
2534  * rtp_session_process_rtcp:
2535  * @sess: and #RTPSession
2536  * @buffer: an RTCP buffer
2537  * @current_time: the current system time
2538  * @ntpnstime: the current NTP time in nanoseconds
2539  *
2540  * Process an RTCP buffer in the session manager. This function takes ownership
2541  * of @buffer.
2542  *
2543  * Returns: a #GstFlowReturn.
2544  */
2545 GstFlowReturn
2546 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
2547     GstClockTime current_time, guint64 ntpnstime)
2548 {
2549   GstRTCPPacket packet;
2550   gboolean more, is_bye = FALSE, do_sync = FALSE;
2551   RTPPacketInfo pinfo = { 0, };
2552   GstFlowReturn result = GST_FLOW_OK;
2553   GstRTCPBuffer rtcp = { NULL, };
2554
2555   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2556   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2557
2558   if (!gst_rtcp_buffer_validate (buffer))
2559     goto invalid_packet;
2560
2561   GST_DEBUG ("received RTCP packet");
2562
2563   RTP_SESSION_LOCK (sess);
2564   /* update pinfo stats */
2565   update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
2566       -1, ntpnstime);
2567
2568   /* start processing the compound packet */
2569   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2570   more = gst_rtcp_buffer_get_first_packet (&rtcp, &packet);
2571   while (more) {
2572     GstRTCPType type;
2573
2574     type = gst_rtcp_packet_get_type (&packet);
2575
2576     switch (type) {
2577       case GST_RTCP_TYPE_SR:
2578         rtp_session_process_sr (sess, &packet, &pinfo, &do_sync);
2579         break;
2580       case GST_RTCP_TYPE_RR:
2581         rtp_session_process_rr (sess, &packet, &pinfo);
2582         break;
2583       case GST_RTCP_TYPE_SDES:
2584         rtp_session_process_sdes (sess, &packet, &pinfo);
2585         break;
2586       case GST_RTCP_TYPE_BYE:
2587         is_bye = TRUE;
2588         /* don't try to attempt lip-sync anymore for streams with a BYE */
2589         do_sync = FALSE;
2590         rtp_session_process_bye (sess, &packet, &pinfo);
2591         break;
2592       case GST_RTCP_TYPE_APP:
2593         rtp_session_process_app (sess, &packet, &pinfo);
2594         break;
2595       case GST_RTCP_TYPE_RTPFB:
2596       case GST_RTCP_TYPE_PSFB:
2597         rtp_session_process_feedback (sess, &packet, &pinfo, current_time);
2598         break;
2599       default:
2600         GST_WARNING ("got unknown RTCP packet");
2601         break;
2602     }
2603     more = gst_rtcp_packet_move_to_next (&packet);
2604   }
2605
2606   gst_rtcp_buffer_unmap (&rtcp);
2607
2608   /* if we are scheduling a BYE, we only want to count bye packets, else we
2609    * count everything */
2610   if (sess->scheduled_bye && is_bye) {
2611     sess->bye_stats.bye_members++;
2612     UPDATE_AVG (sess->bye_stats.avg_rtcp_packet_size, pinfo.bytes);
2613   }
2614
2615   /* keep track of average packet size */
2616   UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
2617
2618   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
2619       sess->stats.avg_rtcp_packet_size, pinfo.bytes);
2620   RTP_SESSION_UNLOCK (sess);
2621
2622   pinfo.data = NULL;
2623   clean_packet_info (&pinfo);
2624
2625   /* notify caller of sr packets in the callback */
2626   if (do_sync && sess->callbacks.sync_rtcp) {
2627     result = sess->callbacks.sync_rtcp (sess, buffer,
2628         sess->sync_rtcp_user_data);
2629   } else
2630     gst_buffer_unref (buffer);
2631
2632   return result;
2633
2634   /* ERRORS */
2635 invalid_packet:
2636   {
2637     GST_DEBUG ("invalid RTCP packet received");
2638     gst_buffer_unref (buffer);
2639     return GST_FLOW_OK;
2640   }
2641 }
2642
2643 /**
2644  * rtp_session_update_send_caps:
2645  * @sess: an #RTPSession
2646  * @caps: a #GstCaps
2647  *
2648  * Update the caps of the sender in the rtp session.
2649  */
2650 void
2651 rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
2652 {
2653   GstStructure *s;
2654   guint ssrc;
2655
2656   g_return_if_fail (RTP_IS_SESSION (sess));
2657   g_return_if_fail (GST_IS_CAPS (caps));
2658
2659   GST_LOG ("received caps %" GST_PTR_FORMAT, caps);
2660
2661   s = gst_caps_get_structure (caps, 0);
2662
2663   if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
2664     RTPSource *source;
2665     gboolean created;
2666
2667     RTP_SESSION_LOCK (sess);
2668     source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
2669     if (source) {
2670       rtp_source_update_caps (source, caps);
2671       g_object_unref (source);
2672     }
2673     RTP_SESSION_UNLOCK (sess);
2674   }
2675 }
2676
2677 /**
2678  * rtp_session_send_rtp:
2679  * @sess: an #RTPSession
2680  * @data: pointer to either an RTP buffer or a list of RTP buffers
2681  * @is_list: TRUE when @data is a buffer list
2682  * @current_time: the current system time
2683  * @running_time: the running time of @data
2684  *
2685  * Send the RTP buffer in the session manager. This function takes ownership of
2686  * @buffer.
2687  *
2688  * Returns: a #GstFlowReturn.
2689  */
2690 GstFlowReturn
2691 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
2692     GstClockTime current_time, GstClockTime running_time)
2693 {
2694   GstFlowReturn result;
2695   RTPSource *source;
2696   gboolean prevsender;
2697   guint64 oldrate;
2698   RTPPacketInfo pinfo = { 0, };
2699   gboolean created;
2700
2701   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2702   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
2703
2704   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
2705
2706   RTP_SESSION_LOCK (sess);
2707   if (!update_packet_info (sess, &pinfo, TRUE, TRUE, is_list, data,
2708           current_time, running_time, -1))
2709     goto invalid_packet;
2710
2711   source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
2712
2713   prevsender = RTP_SOURCE_IS_SENDER (source);
2714   oldrate = source->bitrate;
2715
2716   /* we use our own source to send */
2717   result = rtp_source_send_rtp (source, &pinfo);
2718
2719   source_update_sender (sess, source, prevsender);
2720
2721   if (oldrate != source->bitrate)
2722     sess->recalc_bandwidth = TRUE;
2723   RTP_SESSION_UNLOCK (sess);
2724
2725   g_object_unref (source);
2726   clean_packet_info (&pinfo);
2727
2728   return result;
2729
2730 invalid_packet:
2731   {
2732     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
2733     RTP_SESSION_UNLOCK (sess);
2734     GST_DEBUG ("invalid RTP packet received");
2735     return GST_FLOW_OK;
2736   }
2737 }
2738
2739 static void
2740 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
2741 {
2742   *bandwidth += source->bitrate;
2743 }
2744
2745 /* must be called with session lock */
2746 static GstClockTime
2747 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
2748     gboolean first)
2749 {
2750   GstClockTime result;
2751   RTPSessionStats *stats;
2752
2753   /* recalculate bandwidth when it changed */
2754   if (sess->recalc_bandwidth) {
2755     gdouble bandwidth;
2756
2757     if (sess->bandwidth > 0)
2758       bandwidth = sess->bandwidth;
2759     else {
2760       /* If it is <= 0, then try to estimate the actual bandwidth */
2761       bandwidth = 0;
2762
2763       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2764           (GHFunc) add_bitrates, &bandwidth);
2765       bandwidth /= 8.0;
2766     }
2767     if (bandwidth < 8000)
2768       bandwidth = RTP_STATS_BANDWIDTH;
2769
2770     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
2771         sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
2772
2773     sess->recalc_bandwidth = FALSE;
2774   }
2775
2776   if (sess->scheduled_bye) {
2777     stats = &sess->bye_stats;
2778     result = rtp_stats_calculate_bye_interval (stats);
2779   } else {
2780     stats = &sess->stats;
2781     result = rtp_stats_calculate_rtcp_interval (stats,
2782         stats->internal_sender_sources > 0, first);
2783   }
2784
2785   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
2786       GST_TIME_ARGS (result), first);
2787
2788   if (!deterministic && result != GST_CLOCK_TIME_NONE)
2789     result = rtp_stats_add_rtcp_jitter (stats, result);
2790
2791   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
2792
2793   return result;
2794 }
2795
2796 static void
2797 source_mark_bye (const gchar * key, RTPSource * source, const gchar * reason)
2798 {
2799   if (source->internal)
2800     rtp_source_mark_bye (source, reason);
2801 }
2802
2803 /**
2804  * rtp_session_mark_all_bye:
2805  * @sess: an #RTPSession
2806  * @reason: a reason
2807  *
2808  * Mark all internal sources of the session as BYE with @reason.
2809  */
2810 void
2811 rtp_session_mark_all_bye (RTPSession * sess, const gchar * reason)
2812 {
2813   g_return_if_fail (RTP_IS_SESSION (sess));
2814
2815   RTP_SESSION_LOCK (sess);
2816   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2817       (GHFunc) source_mark_bye, (gpointer) reason);
2818   RTP_SESSION_UNLOCK (sess);
2819 }
2820
2821 /* Stop the current @sess and schedule a BYE message for the other members.
2822  * One must have the session lock to call this function
2823  */
2824 static GstFlowReturn
2825 rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time)
2826 {
2827   GstFlowReturn result = GST_FLOW_OK;
2828   GstClockTime interval;
2829
2830   /* nothing to do it we already scheduled bye */
2831   if (sess->scheduled_bye)
2832     goto done;
2833
2834   /* we schedule BYE now */
2835   sess->scheduled_bye = TRUE;
2836   /* at least one member wants to send a BYE */
2837   memcpy (&sess->bye_stats, &sess->stats, sizeof (RTPSessionStats));
2838   INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100);
2839   sess->bye_stats.bye_members = 1;
2840   sess->first_rtcp = TRUE;
2841   sess->allow_early = TRUE;
2842
2843   /* reschedule transmission */
2844   sess->last_rtcp_send_time = current_time;
2845   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2846
2847   if (interval != GST_CLOCK_TIME_NONE)
2848     sess->next_rtcp_check_time = current_time + interval;
2849   else
2850     sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
2851
2852   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
2853       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
2854
2855   RTP_SESSION_UNLOCK (sess);
2856   /* notify app of reconsideration */
2857   if (sess->callbacks.reconsider)
2858     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2859   RTP_SESSION_LOCK (sess);
2860 done:
2861
2862   return result;
2863 }
2864
2865 /**
2866  * rtp_session_schedule_bye:
2867  * @sess: an #RTPSession
2868  * @current_time: the current system time
2869  *
2870  * Schedule a BYE message for all sources marked as BYE in @sess.
2871  *
2872  * Returns: a #GstFlowReturn.
2873  */
2874 GstFlowReturn
2875 rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
2876 {
2877   GstFlowReturn result;
2878
2879   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2880
2881   RTP_SESSION_LOCK (sess);
2882   result = rtp_session_schedule_bye_locked (sess, current_time);
2883   RTP_SESSION_UNLOCK (sess);
2884
2885   return result;
2886 }
2887
2888 /**
2889  * rtp_session_next_timeout:
2890  * @sess: an #RTPSession
2891  * @current_time: the current system time
2892  *
2893  * Get the next time we should perform session maintenance tasks.
2894  *
2895  * Returns: a time when rtp_session_on_timeout() should be called with the
2896  * current system time.
2897  */
2898 GstClockTime
2899 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
2900 {
2901   GstClockTime result, interval = 0;
2902
2903   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
2904
2905   RTP_SESSION_LOCK (sess);
2906
2907   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
2908     GST_DEBUG ("have early rtcp time");
2909     result = sess->next_early_rtcp_time;
2910     goto early_exit;
2911   }
2912
2913   result = sess->next_rtcp_check_time;
2914
2915   GST_DEBUG ("current time: %" GST_TIME_FORMAT
2916       ", next time: %" GST_TIME_FORMAT,
2917       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2918
2919   if (result == GST_CLOCK_TIME_NONE || result < current_time) {
2920     GST_DEBUG ("take current time as base");
2921     /* our previous check time expired, start counting from the current time
2922      * again. */
2923     result = current_time;
2924   }
2925
2926   if (sess->scheduled_bye) {
2927     if (sess->bye_stats.active_sources >= 50) {
2928       GST_DEBUG ("reconsider BYE, more than 50 sources");
2929       /* reconsider BYE if members >= 50 */
2930       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2931     }
2932   } else {
2933     if (sess->first_rtcp) {
2934       GST_DEBUG ("first RTCP packet");
2935       /* we are called for the first time */
2936       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2937     } else if (sess->next_rtcp_check_time < current_time) {
2938       GST_DEBUG ("old check time expired, getting new timeout");
2939       /* get a new timeout when we need to */
2940       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
2941     }
2942   }
2943
2944   if (interval != GST_CLOCK_TIME_NONE)
2945     result += interval;
2946   else
2947     result = GST_CLOCK_TIME_NONE;
2948
2949   sess->next_rtcp_check_time = result;
2950
2951 early_exit:
2952
2953   GST_DEBUG ("current time: %" GST_TIME_FORMAT
2954       ", next time: %" GST_TIME_FORMAT,
2955       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2956   RTP_SESSION_UNLOCK (sess);
2957
2958   return result;
2959 }
2960
2961 typedef struct
2962 {
2963   RTPSource *source;
2964   gboolean is_bye;
2965   GstBuffer *buffer;
2966 } ReportOutput;
2967
2968 typedef struct
2969 {
2970   GstRTCPBuffer rtcpbuf;
2971   RTPSession *sess;
2972   RTPSource *source;
2973   guint num_to_report;
2974   gboolean have_fir;
2975   gboolean have_pli;
2976   gboolean have_nack;
2977   GstBuffer *rtcp;
2978   GstClockTime current_time;
2979   guint64 ntpnstime;
2980   GstClockTime running_time;
2981   GstClockTime interval;
2982   GstRTCPPacket packet;
2983   gboolean has_sdes;
2984   gboolean is_early;
2985   gboolean may_suppress;
2986   GQueue output;
2987   guint nacked_seqnums;
2988 } ReportData;
2989
2990 static void
2991 session_start_rtcp (RTPSession * sess, ReportData * data)
2992 {
2993   GstRTCPPacket *packet = &data->packet;
2994   RTPSource *own = data->source;
2995   GstRTCPBuffer *rtcp = &data->rtcpbuf;
2996
2997   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
2998   data->has_sdes = FALSE;
2999
3000   gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
3001
3002   if (RTP_SOURCE_IS_SENDER (own)) {
3003     guint64 ntptime;
3004     guint32 rtptime;
3005     guint32 packet_count, octet_count;
3006
3007     /* we are a sender, create SR */
3008     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
3009     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SR, packet);
3010
3011     /* get latest stats */
3012     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
3013         &ntptime, &rtptime, &packet_count, &octet_count);
3014     /* store stats */
3015     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
3016         packet_count, octet_count);
3017
3018     /* fill in sender report info */
3019     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
3020         ntptime, rtptime, packet_count, octet_count);
3021   } else {
3022     /* we are only receiver, create RR */
3023     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
3024     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RR, packet);
3025     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
3026   }
3027 }
3028
3029 /* construct a Sender or Receiver Report */
3030 static void
3031 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
3032 {
3033   RTPSession *sess = data->sess;
3034   GstRTCPPacket *packet = &data->packet;
3035   guint8 fractionlost;
3036   gint32 packetslost;
3037   guint32 exthighestseq, jitter;
3038   guint32 lsr, dlsr;
3039
3040   /* don't report for sources in future generations */
3041   if (((gint16) (source->generation - sess->generation)) > 0) {
3042     GST_DEBUG ("source %08x generation %u > %u", source->ssrc,
3043         source->generation, sess->generation);
3044     return;
3045   }
3046
3047   if (g_hash_table_contains (source->reported_in_sr_of,
3048           GUINT_TO_POINTER (data->source->ssrc))) {
3049     GST_DEBUG ("source %08x already reported in this generation", source->ssrc);
3050     return;
3051   }
3052
3053   if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
3054     GST_DEBUG ("max RB count reached");
3055     return;
3056   }
3057
3058   /* only report about other sender */
3059   if (source == data->source)
3060     goto reported;
3061
3062   if (!RTP_SOURCE_IS_SENDER (source)) {
3063     GST_DEBUG ("source %08x not sender", source->ssrc);
3064     goto reported;
3065   }
3066
3067   GST_DEBUG ("create RB for SSRC %08x", source->ssrc);
3068
3069   /* get new stats */
3070   rtp_source_get_new_rb (source, data->current_time, &fractionlost,
3071       &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
3072
3073   /* store last generated RR packet */
3074   source->last_rr.is_valid = TRUE;
3075   source->last_rr.fractionlost = fractionlost;
3076   source->last_rr.packetslost = packetslost;
3077   source->last_rr.exthighestseq = exthighestseq;
3078   source->last_rr.jitter = jitter;
3079   source->last_rr.lsr = lsr;
3080   source->last_rr.dlsr = dlsr;
3081
3082   /* packet is not yet filled, add report block for this source. */
3083   gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
3084       exthighestseq, jitter, lsr, dlsr);
3085
3086 reported:
3087   g_hash_table_add (source->reported_in_sr_of,
3088       GUINT_TO_POINTER (data->source->ssrc));
3089 }
3090
3091 /* construct FIR */
3092 static void
3093 session_add_fir (const gchar * key, RTPSource * source, ReportData * data)
3094 {
3095   GstRTCPPacket *packet = &data->packet;
3096   guint16 len;
3097   guint8 *fci_data;
3098
3099   if (!source->send_fir)
3100     return;
3101
3102   len = gst_rtcp_packet_fb_get_fci_length (packet);
3103   if (!gst_rtcp_packet_fb_set_fci_length (packet, len + 2))
3104     /* exit because the packet is full, will put next request in a
3105      * further packet */
3106     return;
3107
3108   fci_data = gst_rtcp_packet_fb_get_fci (packet) + (len * 4);
3109
3110   GST_WRITE_UINT32_BE (fci_data, source->ssrc);
3111   fci_data += 4;
3112   fci_data[0] = source->current_send_fir_seqnum;
3113   fci_data[1] = fci_data[2] = fci_data[3] = 0;
3114
3115   source->send_fir = FALSE;
3116 }
3117
3118 static void
3119 session_fir (RTPSession * sess, ReportData * data)
3120 {
3121   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3122   GstRTCPPacket *packet = &data->packet;
3123
3124   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3125     return;
3126
3127   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_FIR);
3128   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3129   gst_rtcp_packet_fb_set_media_ssrc (packet, 0);
3130
3131   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3132       (GHFunc) session_add_fir, data);
3133
3134   if (gst_rtcp_packet_fb_get_fci_length (packet) == 0)
3135     gst_rtcp_packet_remove (packet);
3136   else
3137     data->may_suppress = FALSE;
3138 }
3139
3140 static gboolean
3141 has_pli_compare_func (gconstpointer a, gconstpointer ignored)
3142 {
3143   GstRTCPPacket packet;
3144   GstRTCPBuffer rtcp = { NULL, };
3145   gboolean ret = FALSE;
3146
3147   gst_rtcp_buffer_map ((GstBuffer *) a, GST_MAP_READ, &rtcp);
3148
3149   if (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
3150     if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
3151         gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
3152       ret = TRUE;
3153   }
3154
3155   gst_rtcp_buffer_unmap (&rtcp);
3156
3157   return ret;
3158 }
3159
3160 /* construct PLI */
3161 static void
3162 session_pli (const gchar * key, RTPSource * source, ReportData * data)
3163 {
3164   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3165   GstRTCPPacket *packet = &data->packet;
3166
3167   if (!source->send_pli)
3168     return;
3169
3170   if (rtp_source_has_retained (source, has_pli_compare_func, NULL))
3171     return;
3172
3173   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
3174     /* exit because the packet is full, will put next request in a
3175      * further packet */
3176     return;
3177
3178   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_PLI);
3179   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3180   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
3181
3182   source->send_pli = FALSE;
3183   data->may_suppress = FALSE;
3184 }
3185
3186 /* construct NACK */
3187 static void
3188 session_nack (const gchar * key, RTPSource * source, ReportData * data)
3189 {
3190   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3191   GstRTCPPacket *packet = &data->packet;
3192   guint32 *nacks;
3193   guint n_nacks, i;
3194   guint8 *fci_data;
3195
3196   if (!source->send_nack)
3197     return;
3198
3199   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet))
3200     /* exit because the packet is full, will put next request in a
3201      * further packet */
3202     return;
3203
3204   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_RTPFB_TYPE_NACK);
3205   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
3206   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
3207
3208   nacks = rtp_source_get_nacks (source, &n_nacks);
3209   GST_DEBUG ("%u NACKs", n_nacks);
3210   if (!gst_rtcp_packet_fb_set_fci_length (packet, n_nacks))
3211     return;
3212
3213   fci_data = gst_rtcp_packet_fb_get_fci (packet);
3214   for (i = 0; i < n_nacks; i++) {
3215     GST_WRITE_UINT32_BE (fci_data, nacks[i]);
3216     fci_data += 4;
3217     data->nacked_seqnums++;
3218   }
3219
3220   rtp_source_clear_nacks (source);
3221   data->may_suppress = FALSE;
3222 }
3223
3224 /* perform cleanup of sources that timed out */
3225 static void
3226 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
3227 {
3228   gboolean remove = FALSE;
3229   gboolean byetimeout = FALSE;
3230   gboolean sendertimeout = FALSE;
3231   gboolean is_sender, is_active;
3232   RTPSession *sess = data->sess;
3233   GstClockTime interval, binterval;
3234   GstClockTime btime;
3235
3236   GST_DEBUG ("look at %08x, generation %u", source->ssrc, source->generation);
3237
3238   /* check for outdated collisions */
3239   if (source->internal) {
3240     GST_DEBUG ("Timing out collisions for %x", source->ssrc);
3241     rtp_source_timeout (source, data->current_time,
3242         data->running_time - sess->rtcp_feedback_retention_window);
3243   }
3244
3245   /* nothing else to do when without RTCP */
3246   if (data->interval == GST_CLOCK_TIME_NONE)
3247     return;
3248
3249   is_sender = RTP_SOURCE_IS_SENDER (source);
3250   is_active = RTP_SOURCE_IS_ACTIVE (source);
3251
3252   /* our own rtcp interval may have been forced low by secondary configuration,
3253    * while sender side may still operate with higher interval,
3254    * so do not just take our interval to decide on timing out sender,
3255    * but take (if data->interval <= 5 * GST_SECOND):
3256    *   interval = CLAMP (sender_interval, data->interval, 5 * GST_SECOND)
3257    * where sender_interval is difference between last 2 received RTCP reports
3258    */
3259   if (data->interval >= 5 * GST_SECOND || source->internal) {
3260     binterval = data->interval;
3261   } else {
3262     GST_LOG ("prev_rtcp %" GST_TIME_FORMAT ", last_rtcp %" GST_TIME_FORMAT,
3263         GST_TIME_ARGS (source->stats.prev_rtcptime),
3264         GST_TIME_ARGS (source->stats.last_rtcptime));
3265     /* if not received enough yet, fallback to larger default */
3266     if (source->stats.last_rtcptime > source->stats.prev_rtcptime)
3267       binterval = source->stats.last_rtcptime - source->stats.prev_rtcptime;
3268     else
3269       binterval = 5 * GST_SECOND;
3270     binterval = CLAMP (binterval, data->interval, 5 * GST_SECOND);
3271   }
3272   GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
3273       GST_TIME_ARGS (binterval));
3274
3275   if (!source->internal && source->marked_bye) {
3276     /* if we received a BYE from the source, remove the source after some
3277      * time. */
3278     if (data->current_time > source->bye_time &&
3279         data->current_time - source->bye_time > sess->stats.bye_timeout) {
3280       GST_DEBUG ("removing BYE source %08x", source->ssrc);
3281       remove = TRUE;
3282       byetimeout = TRUE;
3283     }
3284   }
3285
3286   if (source->internal && source->sent_bye) {
3287     GST_DEBUG ("removing internal source that has sent BYE %08x", source->ssrc);
3288     remove = TRUE;
3289   }
3290
3291   /* sources that were inactive for more than 5 times the deterministic reporting
3292    * interval get timed out. the min timeout is 5 seconds. */
3293   /* mind old time that might pre-date last time going to PLAYING */
3294   btime = MAX (source->last_activity, sess->start_time);
3295   if (data->current_time > btime) {
3296     interval = MAX (binterval * 5, 5 * GST_SECOND);
3297     if (data->current_time - btime > interval) {
3298       GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
3299           source->ssrc, GST_TIME_ARGS (btime));
3300       if (source->internal) {
3301         /* this is an internal source that is not using our suggested ssrc.
3302          * since there must be another source using this ssrc, we can remove
3303          * this one instead of making it a receiver forever */
3304         if (source->ssrc != sess->suggested_ssrc) {
3305           rtp_source_mark_bye (source, "timed out");
3306           /* do not schedule bye here, since we are inside the RTCP timeout
3307            * processing and scheduling bye will interfere with SR/RR sending */
3308         }
3309       } else {
3310         remove = TRUE;
3311       }
3312     }
3313   }
3314
3315   /* senders that did not send for a long time become a receiver, this also
3316    * holds for our own sources. */
3317   if (is_sender) {
3318     /* mind old time that might pre-date last time going to PLAYING */
3319     btime = MAX (source->last_rtp_activity, sess->start_time);
3320     if (data->current_time > btime) {
3321       interval = MAX (binterval * 2, 5 * GST_SECOND);
3322       if (data->current_time - btime > interval) {
3323         GST_DEBUG ("sender source %08x timed out and became receiver, last %"
3324             GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
3325         sendertimeout = TRUE;
3326       }
3327     }
3328   }
3329
3330   if (remove) {
3331     sess->total_sources--;
3332     if (is_sender) {
3333       sess->stats.sender_sources--;
3334       if (source->internal)
3335         sess->stats.internal_sender_sources--;
3336     }
3337     if (is_active)
3338       sess->stats.active_sources--;
3339
3340     if (source->internal)
3341       sess->stats.internal_sources--;
3342
3343     if (byetimeout)
3344       on_bye_timeout (sess, source);
3345     else
3346       on_timeout (sess, source);
3347   } else {
3348     if (sendertimeout) {
3349       source->is_sender = FALSE;
3350       sess->stats.sender_sources--;
3351       if (source->internal)
3352         sess->stats.internal_sender_sources--;
3353
3354       on_sender_timeout (sess, source);
3355     }
3356     /* count how many source to report in this generation */
3357     if (((gint16) (source->generation - sess->generation)) <= 0)
3358       data->num_to_report++;
3359   }
3360   source->closing = remove;
3361 }
3362
3363 static void
3364 session_sdes (RTPSession * sess, ReportData * data)
3365 {
3366   GstRTCPPacket *packet = &data->packet;
3367   const GstStructure *sdes;
3368   gint i, n_fields;
3369   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3370
3371   /* add SDES packet */
3372   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SDES, packet);
3373
3374   gst_rtcp_packet_sdes_add_item (packet, data->source->ssrc);
3375
3376   sdes = rtp_source_get_sdes_struct (data->source);
3377
3378   /* add all fields in the structure, the order is not important. */
3379   n_fields = gst_structure_n_fields (sdes);
3380   for (i = 0; i < n_fields; ++i) {
3381     const gchar *field;
3382     const gchar *value;
3383     GstRTCPSDESType type;
3384
3385     field = gst_structure_nth_field_name (sdes, i);
3386     if (field == NULL)
3387       continue;
3388     value = gst_structure_get_string (sdes, field);
3389     if (value == NULL)
3390       continue;
3391     type = gst_rtcp_sdes_name_to_type (field);
3392
3393     /* Early packets are minimal and only include the CNAME */
3394     if (data->is_early && type != GST_RTCP_SDES_CNAME)
3395       continue;
3396
3397     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
3398       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
3399           (const guint8 *) value);
3400     } else if (type == GST_RTCP_SDES_PRIV) {
3401       gsize prefix_len;
3402       gsize value_len;
3403       gsize data_len;
3404       guint8 data[256];
3405
3406       /* don't accept entries that are too big */
3407       prefix_len = strlen (field);
3408       if (prefix_len > 255)
3409         continue;
3410       value_len = strlen (value);
3411       if (value_len > 255)
3412         continue;
3413       data_len = 1 + prefix_len + value_len;
3414       if (data_len > 255)
3415         continue;
3416
3417       data[0] = prefix_len;
3418       memcpy (&data[1], field, prefix_len);
3419       memcpy (&data[1 + prefix_len], value, value_len);
3420
3421       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
3422     }
3423   }
3424
3425   data->has_sdes = TRUE;
3426 }
3427
3428 /* schedule a BYE packet */
3429 static void
3430 make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data)
3431 {
3432   GstRTCPPacket *packet = &data->packet;
3433   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3434
3435   /* add SDES */
3436   session_sdes (sess, data);
3437   /* add a BYE packet */
3438   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_BYE, packet);
3439   gst_rtcp_packet_bye_add_ssrc (packet, source->ssrc);
3440   if (source->bye_reason)
3441     gst_rtcp_packet_bye_set_reason (packet, source->bye_reason);
3442
3443   /* we have a BYE packet now */
3444   source->sent_bye = TRUE;
3445 }
3446
3447 static gboolean
3448 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
3449 {
3450   GstClockTime new_send_time, elapsed;
3451   GstClockTime interval;
3452   RTPSessionStats *stats;
3453
3454   if (sess->scheduled_bye)
3455     stats = &sess->bye_stats;
3456   else
3457     stats = &sess->stats;
3458
3459   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
3460     data->is_early = TRUE;
3461   else
3462     data->is_early = FALSE;
3463
3464   if (data->is_early && sess->next_early_rtcp_time < current_time) {
3465     GST_DEBUG ("early feedback %" GST_TIME_FORMAT " < now %"
3466         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time),
3467         GST_TIME_ARGS (current_time));
3468     goto early;
3469   }
3470
3471   /* no need to check yet */
3472   if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
3473       sess->next_rtcp_check_time > current_time) {
3474     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
3475         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
3476         GST_TIME_ARGS (current_time));
3477     return FALSE;
3478   }
3479
3480 early:
3481   /* get elapsed time since we last reported */
3482   elapsed = current_time - sess->last_rtcp_send_time;
3483
3484   /* take interval and add jitter */
3485   interval = data->interval;
3486   if (interval != GST_CLOCK_TIME_NONE)
3487     interval = rtp_stats_add_rtcp_jitter (stats, interval);
3488
3489   /* perform forward reconsideration */
3490   if (interval != GST_CLOCK_TIME_NONE) {
3491     GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
3492         GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
3493     new_send_time = interval + sess->last_rtcp_send_time;
3494   } else {
3495     new_send_time = sess->last_rtcp_send_time;
3496   }
3497
3498   if (!data->is_early) {
3499     /* check if reconsideration */
3500     if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
3501       GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
3502           GST_TIME_ARGS (new_send_time));
3503       /* store new check time */
3504       sess->next_rtcp_check_time = new_send_time;
3505       return FALSE;
3506     }
3507     sess->next_rtcp_check_time = current_time + interval;
3508   } else if (interval != GST_CLOCK_TIME_NONE) {
3509     /* Apply the rules from RFC 4585 section 3.5.3 */
3510     if (stats->min_interval != 0 && !sess->first_rtcp) {
3511       GstClockTime T_rr_current_interval =
3512           g_random_double_range (0.5, 1.5) * stats->min_interval;
3513
3514       /* This will caused the RTCP to be suppressed if no FB packets are added */
3515       if (sess->last_rtcp_send_time + T_rr_current_interval > new_send_time) {
3516         GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT
3517             " last: %" GST_TIME_FORMAT
3518             " + T_rr_current_interval: %" GST_TIME_FORMAT
3519             " >  new_send_time: %" GST_TIME_FORMAT,
3520             GST_TIME_ARGS (stats->min_interval),
3521             GST_TIME_ARGS (sess->last_rtcp_send_time),
3522             GST_TIME_ARGS (T_rr_current_interval),
3523             GST_TIME_ARGS (new_send_time));
3524         data->may_suppress = TRUE;
3525       }
3526     }
3527   }
3528
3529   GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
3530       GST_TIME_ARGS (new_send_time));
3531
3532   return TRUE;
3533 }
3534
3535 static void
3536 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
3537 {
3538   g_hash_table_insert (hash_table, key, g_object_ref (source));
3539 }
3540
3541 static gboolean
3542 remove_closing_sources (const gchar * key, RTPSource * source,
3543     ReportData * data)
3544 {
3545   if (source->closing)
3546     return TRUE;
3547
3548   if (source->send_fir)
3549     data->have_fir = TRUE;
3550   if (source->send_pli)
3551     data->have_pli = TRUE;
3552   if (source->send_nack)
3553     data->have_nack = TRUE;
3554
3555   return FALSE;
3556 }
3557
3558 static void
3559 generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
3560 {
3561   RTPSession *sess = data->sess;
3562   gboolean is_bye = FALSE;
3563   ReportOutput *output;
3564
3565   /* only generate RTCP for active internal sources */
3566   if (!source->internal || source->sent_bye)
3567     return;
3568
3569   /* ignore other sources when we do the timeout after a scheduled BYE */
3570   if (sess->scheduled_bye && !source->marked_bye)
3571     return;
3572
3573   data->source = source;
3574
3575   /* open packet */
3576   session_start_rtcp (sess, data);
3577
3578   if (source->marked_bye) {
3579     /* send BYE */
3580     make_source_bye (sess, source, data);
3581     is_bye = TRUE;
3582   } else if (!data->is_early) {
3583     /* loop over all known sources and add report blocks. If we are early, we
3584      * just make a minimal RTCP packet and skip this step */
3585     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3586         (GHFunc) session_report_blocks, data);
3587   }
3588   if (!data->has_sdes)
3589     session_sdes (sess, data);
3590
3591   if (data->have_fir)
3592     session_fir (sess, data);
3593
3594   if (data->have_pli)
3595     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3596         (GHFunc) session_pli, data);
3597
3598   if (data->have_nack)
3599     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3600         (GHFunc) session_nack, data);
3601
3602   gst_rtcp_buffer_unmap (&data->rtcpbuf);
3603
3604   output = g_slice_new (ReportOutput);
3605   output->source = g_object_ref (source);
3606   output->is_bye = is_bye;
3607   output->buffer = data->rtcp;
3608   /* queue the RTCP packet to push later */
3609   g_queue_push_tail (&data->output, output);
3610 }
3611
3612 static void
3613 update_generation (const gchar * key, RTPSource * source, ReportData * data)
3614 {
3615   RTPSession *sess = data->sess;
3616
3617   if (g_hash_table_size (source->reported_in_sr_of) >=
3618       sess->stats.internal_sources) {
3619     /* source is reported, move to next generation */
3620     source->generation = sess->generation + 1;
3621     g_hash_table_remove_all (source->reported_in_sr_of);
3622
3623     GST_LOG ("reported source %x, new generation: %d", source->ssrc,
3624         source->generation);
3625
3626     /* if we reported all sources in this generation, move to next */
3627     if (--data->num_to_report == 0) {
3628       sess->generation++;
3629       GST_DEBUG ("all reported, generation now %u", sess->generation);
3630     }
3631   }
3632 }
3633
3634 /**
3635  * rtp_session_on_timeout:
3636  * @sess: an #RTPSession
3637  * @current_time: the current system time
3638  * @ntpnstime: the current NTP time in nanoseconds
3639  * @running_time: the current running_time of the pipeline
3640  *
3641  * Perform maintenance actions after the timeout obtained with
3642  * rtp_session_next_timeout() expired.
3643  *
3644  * This function will perform timeouts of receivers and senders, send a BYE
3645  * packet or generate RTCP packets with current session stats.
3646  *
3647  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
3648  * times, for each packet that should be processed.
3649  *
3650  * Returns: a #GstFlowReturn.
3651  */
3652 GstFlowReturn
3653 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
3654     guint64 ntpnstime, GstClockTime running_time)
3655 {
3656   GstFlowReturn result = GST_FLOW_OK;
3657   ReportData data = { GST_RTCP_BUFFER_INIT };
3658   GHashTable *table_copy;
3659   ReportOutput *output;
3660
3661   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3662
3663   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT
3664       ", running-time %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
3665       GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (running_time));
3666
3667   data.sess = sess;
3668   data.current_time = current_time;
3669   data.ntpnstime = ntpnstime;
3670   data.running_time = running_time;
3671   data.num_to_report = 0;
3672   data.may_suppress = FALSE;
3673   data.nacked_seqnums = 0;
3674   g_queue_init (&data.output);
3675
3676   RTP_SESSION_LOCK (sess);
3677   /* get a new interval, we need this for various cleanups etc */
3678   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
3679
3680   GST_DEBUG ("interval %" GST_TIME_FORMAT, GST_TIME_ARGS (data.interval));
3681
3682   /* we need an internal source now */
3683   if (sess->stats.internal_sources == 0) {
3684     RTPSource *source;
3685     gboolean created;
3686
3687     source = obtain_internal_source (sess, sess->suggested_ssrc, &created,
3688         current_time);
3689     g_object_unref (source);
3690   }
3691
3692   sess->conflicting_addresses =
3693       timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
3694
3695   /* Make a local copy of the hashtable. We need to do this because the
3696    * cleanup stage below releases the session lock. */
3697   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
3698       (GDestroyNotify) g_object_unref);
3699   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3700       (GHFunc) clone_ssrcs_hashtable, table_copy);
3701
3702   /* Clean up the session, mark the source for removing, this might release the
3703    * session lock. */
3704   g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
3705   g_hash_table_destroy (table_copy);
3706
3707   /* Now remove the marked sources */
3708   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
3709       (GHRFunc) remove_closing_sources, &data);
3710
3711   /* update point-to-point status */
3712   session_update_ptp (sess);
3713
3714   /* see if we need to generate SR or RR packets */
3715   if (!is_rtcp_time (sess, current_time, &data))
3716     goto done;
3717
3718   GST_DEBUG ("doing RTCP generation %u for %u sources, early %d",
3719       sess->generation, data.num_to_report, data.is_early);
3720
3721   /* generate RTCP for all internal sources */
3722   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3723       (GHFunc) generate_rtcp, &data);
3724
3725   /* update the generation for all the sources that have been reported */
3726   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3727       (GHFunc) update_generation, &data);
3728
3729   /* we keep track of the last report time in order to timeout inactive
3730    * receivers or senders */
3731   if (!data.is_early && !data.may_suppress)
3732     sess->last_rtcp_send_time = data.current_time;
3733   sess->first_rtcp = FALSE;
3734   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
3735   sess->scheduled_bye = FALSE;
3736
3737   /*  RFC 4585 section 3.5.2 step 6 */
3738   if (!data.is_early) {
3739     sess->allow_early = TRUE;
3740   }
3741
3742 done:
3743   RTP_SESSION_UNLOCK (sess);
3744
3745   /* push out the RTCP packets */
3746   while ((output = g_queue_pop_head (&data.output))) {
3747     gboolean do_not_suppress;
3748     GstBuffer *buffer = output->buffer;
3749     RTPSource *source = output->source;
3750
3751     /* Give the user a change to add its own packet */
3752     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
3753         buffer, data.is_early, &do_not_suppress);
3754
3755     if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) {
3756       guint packet_size;
3757
3758       packet_size = gst_buffer_get_size (buffer) + sess->header_len;
3759
3760       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
3761       GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
3762           sess->stats.avg_rtcp_packet_size, packet_size);
3763       result =
3764           sess->callbacks.send_rtcp (sess, source, buffer, output->is_bye,
3765           sess->send_rtcp_user_data);
3766       sess->stats.nacks_sent += data.nacked_seqnums;
3767     } else {
3768       GST_DEBUG ("freeing packet callback: %p"
3769           " do_not_suppress: %d may_suppress: %d",
3770           sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress);
3771       sess->stats.nacks_dropped += data.nacked_seqnums;
3772       gst_buffer_unref (buffer);
3773     }
3774     g_object_unref (source);
3775     g_slice_free (ReportOutput, output);
3776   }
3777   return result;
3778 }
3779
3780 /**
3781  * rtp_session_request_early_rtcp:
3782  * @sess: an #RTPSession
3783  * @current_time: the current system time
3784  * @max_delay: maximum delay
3785  *
3786  * Request transmission of early RTCP
3787  *
3788  * Returns: %TRUE if the related RTCP can be scheduled.
3789  */
3790 gboolean
3791 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
3792     GstClockTime max_delay)
3793 {
3794   GstClockTime T_dither_max, T_rr;
3795   gboolean ret;
3796
3797   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
3798
3799   RTP_SESSION_LOCK (sess);
3800
3801   /* Check if already requested */
3802   /*  RFC 4585 section 3.5.2 step 2 */
3803   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
3804     GST_LOG_OBJECT (sess, "already have next early rtcp time");
3805     ret = TRUE;
3806     goto end;
3807   }
3808
3809   if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) {
3810     GST_LOG_OBJECT (sess, "no next RTCP check time");
3811     ret = FALSE;
3812     goto end;
3813   }
3814
3815   T_rr = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
3816
3817   /*  RFC 4585 section 3.5.2 step 2b */
3818   /* If the total sources is <=2, then there is only us and one peer */
3819   /* When there is one auxiliary stream the session can still do point
3820    * to point.
3821    */
3822   if (sess->is_doing_ptp) {
3823     T_dither_max = 0;
3824   } else {
3825     /* Divide by 2 because l = 0.5 */
3826     T_dither_max = T_rr;
3827     T_dither_max /= 2;
3828   }
3829
3830   /*  RFC 4585 section 3.5.2 step 3 */
3831   if (current_time + T_dither_max > sess->next_rtcp_check_time) {
3832     GST_LOG_OBJECT (sess, "don't send because of dither");
3833     ret = FALSE;
3834     goto end;
3835   }
3836
3837   /*  RFC 4585 section 3.5.2 step 4a */
3838   if (sess->allow_early == FALSE) {
3839     /* Ignore the request a scheduled packet will be in time anyway */
3840     if (current_time + max_delay > sess->next_rtcp_check_time) {
3841       GST_LOG_OBJECT (sess,
3842           "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
3843           " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
3844           GST_TIME_ARGS (max_delay),
3845           GST_TIME_ARGS (sess->next_rtcp_check_time));
3846       ret = TRUE;
3847     } else {
3848       GST_LOG_OBJECT (sess, "can't allow early feedback");
3849       ret = FALSE;
3850     }
3851     goto end;
3852   }
3853
3854   /*  RFC 4585 section 3.5.2 step 4b */
3855   if (T_dither_max) {
3856     /* Schedule an early transmission later */
3857     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
3858         current_time;
3859   } else {
3860     /* If no dithering, schedule it for NOW */
3861     sess->next_early_rtcp_time = current_time;
3862   }
3863
3864   /*  RFC 4585 section 3.5.2 step 6 */
3865   sess->allow_early = FALSE;
3866   /* Delay next regular RTCP packet to not exceed the short-term
3867    * RTCP bandwidth when using early feedback as compared to
3868    * without */
3869   sess->next_rtcp_check_time = sess->last_rtcp_send_time + 2 * T_rr;
3870   sess->last_rtcp_send_time += T_rr;
3871
3872   GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT,
3873       GST_TIME_ARGS (sess->next_early_rtcp_time));
3874   RTP_SESSION_UNLOCK (sess);
3875
3876   /* notify app of need to send packet early
3877    * and therefore of timeout change */
3878   if (sess->callbacks.reconsider)
3879     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
3880
3881   return TRUE;
3882
3883 end:
3884
3885   RTP_SESSION_UNLOCK (sess);
3886
3887   return ret;
3888 }
3889
3890 static gboolean
3891 rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
3892 {
3893   GstClockTime now;
3894
3895   if (!sess->callbacks.send_rtcp)
3896     return FALSE;
3897
3898   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
3899
3900   return rtp_session_request_early_rtcp (sess, now, max_delay);
3901 }
3902
3903 gboolean
3904 rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
3905     gboolean fir, gint count)
3906 {
3907   RTPSource *src;
3908
3909   if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
3910     GST_DEBUG ("FIR/PLI not sent");
3911     return FALSE;
3912   }
3913
3914   RTP_SESSION_LOCK (sess);
3915   src = find_source (sess, ssrc);
3916   if (src == NULL)
3917     goto no_source;
3918
3919   if (fir) {
3920     src->send_pli = FALSE;
3921     src->send_fir = TRUE;
3922
3923     if (count == -1 || count != src->last_fir_count)
3924       src->current_send_fir_seqnum++;
3925     src->last_fir_count = count;
3926   } else if (!src->send_fir) {
3927     src->send_pli = TRUE;
3928   }
3929   RTP_SESSION_UNLOCK (sess);
3930
3931   return TRUE;
3932
3933   /* ERRORS */
3934 no_source:
3935   {
3936     RTP_SESSION_UNLOCK (sess);
3937     return FALSE;
3938   }
3939 }
3940
3941 /**
3942  * rtp_session_request_nack:
3943  * @sess: a #RTPSession
3944  * @ssrc: the SSRC
3945  * @seqnum: the missing seqnum
3946  * @max_delay: max delay to request NACK
3947  *
3948  * Request scheduling of a NACK feedback packet for @seqnum in @ssrc.
3949  *
3950  * Returns: %TRUE if the NACK feedback could be scheduled
3951  */
3952 gboolean
3953 rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
3954     GstClockTime max_delay)
3955 {
3956   RTPSource *source;
3957
3958   if (!rtp_session_send_rtcp (sess, max_delay)) {
3959     GST_DEBUG ("NACK not sent");
3960     return FALSE;
3961   }
3962
3963   RTP_SESSION_LOCK (sess);
3964   source = find_source (sess, ssrc);
3965   if (source == NULL)
3966     goto no_source;
3967
3968   GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum);
3969   rtp_source_register_nack (source, seqnum);
3970   RTP_SESSION_UNLOCK (sess);
3971
3972   return TRUE;
3973
3974   /* ERRORS */
3975 no_source:
3976   {
3977     RTP_SESSION_UNLOCK (sess);
3978     return FALSE;
3979   }
3980 }