session: add FIR and PLI like other RTCP packets
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / rtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
21  * with newer GLib versions (>= 2.31.0) */
22 #define GLIB_DISABLE_DEPRECATION_WARNINGS
23
24 #include <string.h>
25
26 #include <gst/rtp/gstrtpbuffer.h>
27 #include <gst/rtp/gstrtcpbuffer.h>
28
29 #include <gst/glib-compat-private.h>
30
31 #include "gstrtpbin-marshal.h"
32 #include "rtpsession.h"
33
34 GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
35 #define GST_CAT_DEFAULT rtp_session_debug
36
37 /* signals and args */
38 enum
39 {
40   SIGNAL_GET_SOURCE_BY_SSRC,
41   SIGNAL_ON_NEW_SSRC,
42   SIGNAL_ON_SSRC_COLLISION,
43   SIGNAL_ON_SSRC_VALIDATED,
44   SIGNAL_ON_SSRC_ACTIVE,
45   SIGNAL_ON_SSRC_SDES,
46   SIGNAL_ON_BYE_SSRC,
47   SIGNAL_ON_BYE_TIMEOUT,
48   SIGNAL_ON_TIMEOUT,
49   SIGNAL_ON_SENDER_TIMEOUT,
50   SIGNAL_ON_SENDING_RTCP,
51   SIGNAL_ON_FEEDBACK_RTCP,
52   SIGNAL_SEND_RTCP,
53   LAST_SIGNAL
54 };
55
56 #define DEFAULT_INTERNAL_SOURCE      NULL
57 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
58 #define DEFAULT_RTCP_FRACTION        (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH)
59 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
60 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
61 #define DEFAULT_RTCP_MTU             1400
62 #define DEFAULT_SDES                 NULL
63 #define DEFAULT_NUM_SOURCES          0
64 #define DEFAULT_NUM_ACTIVE_SOURCES   0
65 #define DEFAULT_SOURCES              NULL
66 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
67 #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
68 #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
69 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
70
71 enum
72 {
73   PROP_0,
74   PROP_INTERNAL_SSRC,
75   PROP_INTERNAL_SOURCE,
76   PROP_BANDWIDTH,
77   PROP_RTCP_FRACTION,
78   PROP_RTCP_RR_BANDWIDTH,
79   PROP_RTCP_RS_BANDWIDTH,
80   PROP_RTCP_MTU,
81   PROP_SDES,
82   PROP_NUM_SOURCES,
83   PROP_NUM_ACTIVE_SOURCES,
84   PROP_SOURCES,
85   PROP_FAVOR_NEW,
86   PROP_RTCP_MIN_INTERVAL,
87   PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
88   PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
89   PROP_PROBATION,
90   PROP_LAST
91 };
92
93 /* update average packet size */
94 #define INIT_AVG(avg, val) \
95    (avg) = (val);
96 #define UPDATE_AVG(avg, val)            \
97   if ((avg) == 0)                       \
98    (avg) = (val);                       \
99   else                                  \
100    (avg) = ((val) + (15 * (avg))) >> 4;
101
102
103 /* The number RTCP intervals after which to timeout entries in the
104  * collision table
105  */
106 #define RTCP_INTERVAL_COLLISION_TIMEOUT 10
107
108 /* GObject vmethods */
109 static void rtp_session_finalize (GObject * object);
110 static void rtp_session_set_property (GObject * object, guint prop_id,
111     const GValue * value, GParamSpec * pspec);
112 static void rtp_session_get_property (GObject * object, guint prop_id,
113     GValue * value, GParamSpec * pspec);
114
115 static void rtp_session_send_rtcp (RTPSession * sess,
116     GstClockTimeDiff max_delay);
117
118
119 static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
120
121 G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
122
123 static guint32 rtp_session_create_new_ssrc (RTPSession * sess);
124 static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
125     gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
126 static RTPSource *obtain_internal_source (RTPSession * sess,
127     guint32 ssrc, gboolean * created);
128 static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
129     GstClockTime current_time);
130 static GstClockTime calculate_rtcp_interval (RTPSession * sess,
131     gboolean deterministic, gboolean first);
132
133 static gboolean
134 accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
135     const GValue * handler_return, gpointer data)
136 {
137   if (g_value_get_boolean (handler_return))
138     g_value_set_boolean (return_accu, TRUE);
139
140   return TRUE;
141 }
142
143 static void
144 rtp_session_class_init (RTPSessionClass * klass)
145 {
146   GObjectClass *gobject_class;
147
148   gobject_class = (GObjectClass *) klass;
149
150   gobject_class->finalize = rtp_session_finalize;
151   gobject_class->set_property = rtp_session_set_property;
152   gobject_class->get_property = rtp_session_get_property;
153
154   /**
155    * RTPSession::get-source-by-ssrc:
156    * @session: the object which received the signal
157    * @ssrc: the SSRC of the RTPSource
158    *
159    * Request the #RTPSource object with SSRC @ssrc in @session.
160    */
161   rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
162       g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
163       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
164           get_source_by_ssrc), NULL, NULL, gst_rtp_bin_marshal_OBJECT__UINT,
165       RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
166
167   /**
168    * RTPSession::on-new-ssrc:
169    * @session: the object which received the signal
170    * @src: the new RTPSource
171    *
172    * Notify of a new SSRC that entered @session.
173    */
174   rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
175       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
176       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
177       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
178       RTP_TYPE_SOURCE);
179   /**
180    * RTPSession::on-ssrc-collision:
181    * @session: the object which received the signal
182    * @src: the #RTPSource that caused a collision
183    *
184    * Notify when we have an SSRC collision
185    */
186   rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
187       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
188       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
189       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
190       RTP_TYPE_SOURCE);
191   /**
192    * RTPSession::on-ssrc-validated:
193    * @session: the object which received the signal
194    * @src: the new validated RTPSource
195    *
196    * Notify of a new SSRC that became validated.
197    */
198   rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
199       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
200       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
201       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
202       RTP_TYPE_SOURCE);
203   /**
204    * RTPSession::on-ssrc-active:
205    * @session: the object which received the signal
206    * @src: the active RTPSource
207    *
208    * Notify of a SSRC that is active, i.e., sending RTCP.
209    */
210   rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
211       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
212       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
213       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
214       RTP_TYPE_SOURCE);
215   /**
216    * RTPSession::on-ssrc-sdes:
217    * @session: the object which received the signal
218    * @src: the RTPSource
219    *
220    * Notify that a new SDES was received for SSRC.
221    */
222   rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
223       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
224       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
225       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
226       RTP_TYPE_SOURCE);
227   /**
228    * RTPSession::on-bye-ssrc:
229    * @session: the object which received the signal
230    * @src: the RTPSource that went away
231    *
232    * Notify of an SSRC that became inactive because of a BYE packet.
233    */
234   rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
235       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
236       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
237       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
238       RTP_TYPE_SOURCE);
239   /**
240    * RTPSession::on-bye-timeout:
241    * @session: the object which received the signal
242    * @src: the RTPSource that timed out
243    *
244    * Notify of an SSRC that has timed out because of BYE
245    */
246   rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
247       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
248       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
249       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
250       RTP_TYPE_SOURCE);
251   /**
252    * RTPSession::on-timeout:
253    * @session: the object which received the signal
254    * @src: the RTPSource that timed out
255    *
256    * Notify of an SSRC that has timed out
257    */
258   rtp_session_signals[SIGNAL_ON_TIMEOUT] =
259       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
260       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
261       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
262       RTP_TYPE_SOURCE);
263   /**
264    * RTPSession::on-sender-timeout:
265    * @session: the object which received the signal
266    * @src: the RTPSource that timed out
267    *
268    * Notify of an SSRC that was a sender but timed out and became a receiver.
269    */
270   rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
271       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
272       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
273       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
274       RTP_TYPE_SOURCE);
275
276   /**
277    * RTPSession::on-sending-rtcp
278    * @session: the object which received the signal
279    * @buffer: the #GstBuffer containing the RTCP packet about to be sent
280    * @early: %TRUE if the packet is early, %FALSE if it is regular
281    *
282    * This signal is emitted before sending an RTCP packet, it can be used
283    * to add extra RTCP Packets.
284    *
285    * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
286    * if suppressing it is acceptable
287    */
288   rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
289       g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
290       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
291       accumulate_trues, NULL, gst_rtp_bin_marshal_BOOLEAN__BOXED_BOOLEAN,
292       G_TYPE_BOOLEAN, 2, GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE,
293       G_TYPE_BOOLEAN);
294
295   /**
296    * RTPSession::on-feedback-rtcp:
297    * @session: the object which received the signal
298    * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
299    *  %GST_RTCP_TYPE_RTPFB
300    * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
301    * @sender_ssrc: The SSRC of the sender
302    * @media_ssrc: The SSRC of the media this refers to
303    * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
304    * there was no FCI
305    *
306    * Notify that a RTCP feedback packet has been received
307    */
308   rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
309       g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
310       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
311       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT_UINT_UINT_BOXED,
312       G_TYPE_NONE, 5, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT,
313       GST_TYPE_BUFFER);
314
315   /**
316    * RTPSession::send-rtcp:
317    * @session: the object which received the signal
318    * @max_delay: The maximum delay after which the feedback will not be useful
319    *  anymore
320    *
321    * Requests that the #RTPSession initiate a new RTCP packet as soon as
322    * possible within the requested delay.
323    */
324   rtp_session_signals[SIGNAL_SEND_RTCP] =
325       g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
326       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
327       G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
328       gst_rtp_bin_marshal_VOID__UINT64, G_TYPE_NONE, 1, G_TYPE_UINT64);
329
330   g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
331       g_param_spec_uint ("internal-ssrc", "Internal SSRC",
332           "The internal SSRC used for the session (deprecated)",
333           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
334
335   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
336       g_param_spec_object ("internal-source", "Internal Source",
337           "The internal source element of the session (deprecated)",
338           RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
339
340   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
341       g_param_spec_double ("bandwidth", "Bandwidth",
342           "The bandwidth of the session (0 for auto-discover)",
343           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
344           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
345
346   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
347       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
348           "The fraction of the bandwidth used for RTCP (or as a real fraction of the RTP bandwidth if < 1)",
349           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
350           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
351
352   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
353       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
354           "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
355           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
356           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
357
358   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
359       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
360           "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
361           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
362           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363
364   g_object_class_install_property (gobject_class, PROP_RTCP_MTU,
365       g_param_spec_uint ("rtcp-mtu", "RTCP MTU",
366           "The maximum size of the RTCP packets",
367           16, G_MAXINT16, DEFAULT_RTCP_MTU,
368           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
369
370   g_object_class_install_property (gobject_class, PROP_SDES,
371       g_param_spec_boxed ("sdes", "SDES",
372           "The SDES items of this session",
373           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
374
375   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
376       g_param_spec_uint ("num-sources", "Num Sources",
377           "The number of sources in the session", 0, G_MAXUINT,
378           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
379
380   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
381       g_param_spec_uint ("num-active-sources", "Num Active Sources",
382           "The number of active sources in the session", 0, G_MAXUINT,
383           DEFAULT_NUM_ACTIVE_SOURCES,
384           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
385   /**
386    * RTPSource::sources
387    *
388    * Get a GValue Array of all sources in the session.
389    *
390    * <example>
391    * <title>Getting the #RTPSources of a session
392    * <programlisting>
393    * {
394    *   GValueArray *arr;
395    *   GValue *val;
396    *   guint i;
397    *
398    *   g_object_get (sess, "sources", &arr, NULL);
399    *
400    *   for (i = 0; i < arr->n_values; i++) {
401    *     RTPSource *source;
402    *
403    *     val = g_value_array_get_nth (arr, i);
404    *     source = g_value_get_object (val);
405    *   }
406    *   g_value_array_free (arr);
407    * }
408    * </programlisting>
409    * </example>
410    */
411   g_object_class_install_property (gobject_class, PROP_SOURCES,
412       g_param_spec_boxed ("sources", "Sources",
413           "An array of all known sources in the session",
414           G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
415
416   g_object_class_install_property (gobject_class, PROP_FAVOR_NEW,
417       g_param_spec_boolean ("favor-new", "Favor new sources",
418           "Resolve SSRC conflict in favor of new sources", FALSE,
419           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
420
421   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
422       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
423           "Minimum interval between Regular RTCP packet (in ns)",
424           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
425           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
426
427   g_object_class_install_property (gobject_class,
428       PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
429       g_param_spec_uint64 ("rtcp-feedback-retention-window",
430           "RTCP Feedback retention window",
431           "Duration during which RTCP Feedback packets are retained (in ns)",
432           0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
433           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
434
435   g_object_class_install_property (gobject_class,
436       PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
437       g_param_spec_uint ("rtcp-immediate-feedback-threshold",
438           "RTCP Immediate Feedback threshold",
439           "The maximum number of members of a RTP session for which immediate"
440           " feedback is used",
441           0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
442           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
443
444   g_object_class_install_property (gobject_class, PROP_PROBATION,
445       g_param_spec_uint ("probation", "Number of probations",
446           "Consecutive packet sequence numbers to accept the source",
447           0, G_MAXUINT, DEFAULT_PROBATION,
448           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
449
450   klass->get_source_by_ssrc =
451       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
452   klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
453
454   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
455 }
456
457 static void
458 rtp_session_init (RTPSession * sess)
459 {
460   gint i;
461   gchar *str;
462
463   g_mutex_init (&sess->lock);
464   sess->key = g_random_int ();
465   sess->mask_idx = 0;
466   sess->mask = 0;
467
468   for (i = 0; i < 32; i++) {
469     sess->ssrcs[i] =
470         g_hash_table_new_full (NULL, NULL, NULL,
471         (GDestroyNotify) g_object_unref);
472   }
473
474   rtp_stats_init_defaults (&sess->stats);
475   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
476   rtp_stats_set_min_interval (&sess->stats,
477       (gdouble) DEFAULT_RTCP_MIN_INTERVAL / GST_SECOND);
478
479   sess->recalc_bandwidth = TRUE;
480   sess->bandwidth = DEFAULT_BANDWIDTH;
481   sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION;
482   sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
483   sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
484
485   /* default UDP header length */
486   sess->header_len = 28;
487   sess->mtu = DEFAULT_RTCP_MTU;
488
489   sess->probation = DEFAULT_PROBATION;
490
491   /* some default SDES entries */
492   sess->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
493
494   /* we do not want to leak details like the username or hostname here */
495   str = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
496   gst_structure_set (sess->sdes, "cname", G_TYPE_STRING, str, NULL);
497   g_free (str);
498
499 #if 0
500   /* we do not want to leak the user's real name here */
501   str = g_strdup_printf ("Anon%u", g_random_int ());
502   gst_structure_set (sdes, "name", G_TYPE_STRING, str, NULL);
503   g_free (str);
504 #endif
505
506   gst_structure_set (sess->sdes, "tool", G_TYPE_STRING, "GStreamer", NULL);
507
508   /* this is the SSRC we suggest */
509   sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
510
511   sess->first_rtcp = TRUE;
512   sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
513
514   sess->allow_early = TRUE;
515   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
516   sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
517   sess->rtcp_immediate_feedback_threshold =
518       DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
519
520   sess->last_keyframe_request = GST_CLOCK_TIME_NONE;
521 }
522
523 static void
524 rtp_session_finalize (GObject * object)
525 {
526   RTPSession *sess;
527   gint i;
528
529   sess = RTP_SESSION_CAST (object);
530
531   gst_structure_free (sess->sdes);
532
533   for (i = 0; i < 32; i++)
534     g_hash_table_destroy (sess->ssrcs[i]);
535
536   g_mutex_clear (&sess->lock);
537
538   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
539 }
540
541 static void
542 copy_source (gpointer key, RTPSource * source, GValueArray * arr)
543 {
544   GValue value = { 0 };
545
546   g_value_init (&value, RTP_TYPE_SOURCE);
547   g_value_take_object (&value, source);
548   /* copies the value */
549   g_value_array_append (arr, &value);
550 }
551
552 static GValueArray *
553 rtp_session_create_sources (RTPSession * sess)
554 {
555   GValueArray *res;
556   guint size;
557
558   RTP_SESSION_LOCK (sess);
559   /* get number of elements in the table */
560   size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
561   /* create the result value array */
562   res = g_value_array_new (size);
563
564   /* and copy all values into the array */
565   g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) copy_source, res);
566   RTP_SESSION_UNLOCK (sess);
567
568   return res;
569 }
570
571 static void
572 rtp_session_set_property (GObject * object, guint prop_id,
573     const GValue * value, GParamSpec * pspec)
574 {
575   RTPSession *sess;
576
577   sess = RTP_SESSION (object);
578
579   switch (prop_id) {
580     case PROP_INTERNAL_SSRC:
581       break;
582     case PROP_BANDWIDTH:
583       RTP_SESSION_LOCK (sess);
584       sess->bandwidth = g_value_get_double (value);
585       sess->recalc_bandwidth = TRUE;
586       RTP_SESSION_UNLOCK (sess);
587       break;
588     case PROP_RTCP_FRACTION:
589       RTP_SESSION_LOCK (sess);
590       sess->rtcp_bandwidth = g_value_get_double (value);
591       sess->recalc_bandwidth = TRUE;
592       RTP_SESSION_UNLOCK (sess);
593       break;
594     case PROP_RTCP_RR_BANDWIDTH:
595       RTP_SESSION_LOCK (sess);
596       sess->rtcp_rr_bandwidth = g_value_get_int (value);
597       sess->recalc_bandwidth = TRUE;
598       RTP_SESSION_UNLOCK (sess);
599       break;
600     case PROP_RTCP_RS_BANDWIDTH:
601       RTP_SESSION_LOCK (sess);
602       sess->rtcp_rs_bandwidth = g_value_get_int (value);
603       sess->recalc_bandwidth = TRUE;
604       RTP_SESSION_UNLOCK (sess);
605       break;
606     case PROP_RTCP_MTU:
607       sess->mtu = g_value_get_uint (value);
608       break;
609     case PROP_SDES:
610       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
611       break;
612     case PROP_FAVOR_NEW:
613       sess->favor_new = g_value_get_boolean (value);
614       break;
615     case PROP_RTCP_MIN_INTERVAL:
616       rtp_stats_set_min_interval (&sess->stats,
617           (gdouble) g_value_get_uint64 (value) / GST_SECOND);
618       /* trigger reconsideration */
619       RTP_SESSION_LOCK (sess);
620       sess->next_rtcp_check_time = 0;
621       RTP_SESSION_UNLOCK (sess);
622       if (sess->callbacks.reconsider)
623         sess->callbacks.reconsider (sess, sess->reconsider_user_data);
624       break;
625     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
626       sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
627       break;
628     case PROP_PROBATION:
629       sess->probation = g_value_get_uint (value);
630       break;
631     default:
632       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
633       break;
634   }
635 }
636
637 static void
638 rtp_session_get_property (GObject * object, guint prop_id,
639     GValue * value, GParamSpec * pspec)
640 {
641   RTPSession *sess;
642
643   sess = RTP_SESSION (object);
644
645   switch (prop_id) {
646     case PROP_INTERNAL_SSRC:
647       g_value_set_uint (value, rtp_session_suggest_ssrc (sess));
648       break;
649     case PROP_INTERNAL_SOURCE:
650       /* FIXME, return a random source */
651       g_value_set_object (value, NULL);
652       break;
653     case PROP_BANDWIDTH:
654       g_value_set_double (value, sess->bandwidth);
655       break;
656     case PROP_RTCP_FRACTION:
657       g_value_set_double (value, sess->rtcp_bandwidth);
658       break;
659     case PROP_RTCP_RR_BANDWIDTH:
660       g_value_set_int (value, sess->rtcp_rr_bandwidth);
661       break;
662     case PROP_RTCP_RS_BANDWIDTH:
663       g_value_set_int (value, sess->rtcp_rs_bandwidth);
664       break;
665     case PROP_RTCP_MTU:
666       g_value_set_uint (value, sess->mtu);
667       break;
668     case PROP_SDES:
669       g_value_take_boxed (value, rtp_session_get_sdes_struct (sess));
670       break;
671     case PROP_NUM_SOURCES:
672       g_value_set_uint (value, rtp_session_get_num_sources (sess));
673       break;
674     case PROP_NUM_ACTIVE_SOURCES:
675       g_value_set_uint (value, rtp_session_get_num_active_sources (sess));
676       break;
677     case PROP_SOURCES:
678       g_value_take_boxed (value, rtp_session_create_sources (sess));
679       break;
680     case PROP_FAVOR_NEW:
681       g_value_set_boolean (value, sess->favor_new);
682       break;
683     case PROP_RTCP_MIN_INTERVAL:
684       g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
685       break;
686     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
687       g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
688       break;
689     case PROP_PROBATION:
690       g_value_set_uint (value, sess->probation);
691       break;
692     default:
693       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
694       break;
695   }
696 }
697
698 static void
699 on_new_ssrc (RTPSession * sess, RTPSource * source)
700 {
701   g_object_ref (source);
702   RTP_SESSION_UNLOCK (sess);
703   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
704   RTP_SESSION_LOCK (sess);
705   g_object_unref (source);
706 }
707
708 static void
709 on_ssrc_collision (RTPSession * sess, RTPSource * source)
710 {
711   g_object_ref (source);
712   RTP_SESSION_UNLOCK (sess);
713   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
714       source);
715   RTP_SESSION_LOCK (sess);
716   g_object_unref (source);
717 }
718
719 static void
720 on_ssrc_validated (RTPSession * sess, RTPSource * source)
721 {
722   g_object_ref (source);
723   RTP_SESSION_UNLOCK (sess);
724   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
725       source);
726   RTP_SESSION_LOCK (sess);
727   g_object_unref (source);
728 }
729
730 static void
731 on_ssrc_active (RTPSession * sess, RTPSource * source)
732 {
733   g_object_ref (source);
734   RTP_SESSION_UNLOCK (sess);
735   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0, source);
736   RTP_SESSION_LOCK (sess);
737   g_object_unref (source);
738 }
739
740 static void
741 on_ssrc_sdes (RTPSession * sess, RTPSource * source)
742 {
743   g_object_ref (source);
744   GST_DEBUG ("SDES changed for SSRC %08x", source->ssrc);
745   RTP_SESSION_UNLOCK (sess);
746   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0, source);
747   RTP_SESSION_LOCK (sess);
748   g_object_unref (source);
749 }
750
751 static void
752 on_bye_ssrc (RTPSession * sess, RTPSource * source)
753 {
754   g_object_ref (source);
755   RTP_SESSION_UNLOCK (sess);
756   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
757   RTP_SESSION_LOCK (sess);
758   g_object_unref (source);
759 }
760
761 static void
762 on_bye_timeout (RTPSession * sess, RTPSource * source)
763 {
764   g_object_ref (source);
765   RTP_SESSION_UNLOCK (sess);
766   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
767   RTP_SESSION_LOCK (sess);
768   g_object_unref (source);
769 }
770
771 static void
772 on_timeout (RTPSession * sess, RTPSource * source)
773 {
774   g_object_ref (source);
775   RTP_SESSION_UNLOCK (sess);
776   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
777   RTP_SESSION_LOCK (sess);
778   g_object_unref (source);
779 }
780
781 static void
782 on_sender_timeout (RTPSession * sess, RTPSource * source)
783 {
784   g_object_ref (source);
785   RTP_SESSION_UNLOCK (sess);
786   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
787       source);
788   RTP_SESSION_LOCK (sess);
789   g_object_unref (source);
790 }
791
792 /**
793  * rtp_session_new:
794  *
795  * Create a new session object.
796  *
797  * Returns: a new #RTPSession. g_object_unref() after usage.
798  */
799 RTPSession *
800 rtp_session_new (void)
801 {
802   RTPSession *sess;
803
804   sess = g_object_new (RTP_TYPE_SESSION, NULL);
805
806   return sess;
807 }
808
809 /**
810  * rtp_session_set_callbacks:
811  * @sess: an #RTPSession
812  * @callbacks: callbacks to configure
813  * @user_data: user data passed in the callbacks
814  *
815  * Configure a set of callbacks to be notified of actions.
816  */
817 void
818 rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
819     gpointer user_data)
820 {
821   g_return_if_fail (RTP_IS_SESSION (sess));
822
823   if (callbacks->process_rtp) {
824     sess->callbacks.process_rtp = callbacks->process_rtp;
825     sess->process_rtp_user_data = user_data;
826   }
827   if (callbacks->send_rtp) {
828     sess->callbacks.send_rtp = callbacks->send_rtp;
829     sess->send_rtp_user_data = user_data;
830   }
831   if (callbacks->send_rtcp) {
832     sess->callbacks.send_rtcp = callbacks->send_rtcp;
833     sess->send_rtcp_user_data = user_data;
834   }
835   if (callbacks->sync_rtcp) {
836     sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
837     sess->sync_rtcp_user_data = user_data;
838   }
839   if (callbacks->clock_rate) {
840     sess->callbacks.clock_rate = callbacks->clock_rate;
841     sess->clock_rate_user_data = user_data;
842   }
843   if (callbacks->reconsider) {
844     sess->callbacks.reconsider = callbacks->reconsider;
845     sess->reconsider_user_data = user_data;
846   }
847   if (callbacks->request_key_unit) {
848     sess->callbacks.request_key_unit = callbacks->request_key_unit;
849     sess->request_key_unit_user_data = user_data;
850   }
851   if (callbacks->request_time) {
852     sess->callbacks.request_time = callbacks->request_time;
853     sess->request_time_user_data = user_data;
854   }
855 }
856
857 /**
858  * rtp_session_set_process_rtp_callback:
859  * @sess: an #RTPSession
860  * @callback: callback to set
861  * @user_data: user data passed in the callback
862  *
863  * Configure only the process_rtp callback to be notified of the process_rtp action.
864  */
865 void
866 rtp_session_set_process_rtp_callback (RTPSession * sess,
867     RTPSessionProcessRTP callback, gpointer user_data)
868 {
869   g_return_if_fail (RTP_IS_SESSION (sess));
870
871   sess->callbacks.process_rtp = callback;
872   sess->process_rtp_user_data = user_data;
873 }
874
875 /**
876  * rtp_session_set_send_rtp_callback:
877  * @sess: an #RTPSession
878  * @callback: callback to set
879  * @user_data: user data passed in the callback
880  *
881  * Configure only the send_rtp callback to be notified of the send_rtp action.
882  */
883 void
884 rtp_session_set_send_rtp_callback (RTPSession * sess,
885     RTPSessionSendRTP callback, gpointer user_data)
886 {
887   g_return_if_fail (RTP_IS_SESSION (sess));
888
889   sess->callbacks.send_rtp = callback;
890   sess->send_rtp_user_data = user_data;
891 }
892
893 /**
894  * rtp_session_set_send_rtcp_callback:
895  * @sess: an #RTPSession
896  * @callback: callback to set
897  * @user_data: user data passed in the callback
898  *
899  * Configure only the send_rtcp callback to be notified of the send_rtcp action.
900  */
901 void
902 rtp_session_set_send_rtcp_callback (RTPSession * sess,
903     RTPSessionSendRTCP callback, gpointer user_data)
904 {
905   g_return_if_fail (RTP_IS_SESSION (sess));
906
907   sess->callbacks.send_rtcp = callback;
908   sess->send_rtcp_user_data = user_data;
909 }
910
911 /**
912  * rtp_session_set_sync_rtcp_callback:
913  * @sess: an #RTPSession
914  * @callback: callback to set
915  * @user_data: user data passed in the callback
916  *
917  * Configure only the sync_rtcp callback to be notified of the sync_rtcp action.
918  */
919 void
920 rtp_session_set_sync_rtcp_callback (RTPSession * sess,
921     RTPSessionSyncRTCP callback, gpointer user_data)
922 {
923   g_return_if_fail (RTP_IS_SESSION (sess));
924
925   sess->callbacks.sync_rtcp = callback;
926   sess->sync_rtcp_user_data = user_data;
927 }
928
929 /**
930  * rtp_session_set_clock_rate_callback:
931  * @sess: an #RTPSession
932  * @callback: callback to set
933  * @user_data: user data passed in the callback
934  *
935  * Configure only the clock_rate callback to be notified of the clock_rate action.
936  */
937 void
938 rtp_session_set_clock_rate_callback (RTPSession * sess,
939     RTPSessionClockRate callback, gpointer user_data)
940 {
941   g_return_if_fail (RTP_IS_SESSION (sess));
942
943   sess->callbacks.clock_rate = callback;
944   sess->clock_rate_user_data = user_data;
945 }
946
947 /**
948  * rtp_session_set_reconsider_callback:
949  * @sess: an #RTPSession
950  * @callback: callback to set
951  * @user_data: user data passed in the callback
952  *
953  * Configure only the reconsider callback to be notified of the reconsider action.
954  */
955 void
956 rtp_session_set_reconsider_callback (RTPSession * sess,
957     RTPSessionReconsider callback, gpointer user_data)
958 {
959   g_return_if_fail (RTP_IS_SESSION (sess));
960
961   sess->callbacks.reconsider = callback;
962   sess->reconsider_user_data = user_data;
963 }
964
965 /**
966  * rtp_session_set_request_time_callback:
967  * @sess: an #RTPSession
968  * @callback: callback to set
969  * @user_data: user data passed in the callback
970  *
971  * Configure only the request_time callback
972  */
973 void
974 rtp_session_set_request_time_callback (RTPSession * sess,
975     RTPSessionRequestTime callback, gpointer user_data)
976 {
977   g_return_if_fail (RTP_IS_SESSION (sess));
978
979   sess->callbacks.request_time = callback;
980   sess->request_time_user_data = user_data;
981 }
982
983 /**
984  * rtp_session_set_bandwidth:
985  * @sess: an #RTPSession
986  * @bandwidth: the bandwidth allocated
987  *
988  * Set the session bandwidth in bytes per second.
989  */
990 void
991 rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
992 {
993   g_return_if_fail (RTP_IS_SESSION (sess));
994
995   RTP_SESSION_LOCK (sess);
996   sess->stats.bandwidth = bandwidth;
997   RTP_SESSION_UNLOCK (sess);
998 }
999
1000 /**
1001  * rtp_session_get_bandwidth:
1002  * @sess: an #RTPSession
1003  *
1004  * Get the session bandwidth.
1005  *
1006  * Returns: the session bandwidth.
1007  */
1008 gdouble
1009 rtp_session_get_bandwidth (RTPSession * sess)
1010 {
1011   gdouble result;
1012
1013   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1014
1015   RTP_SESSION_LOCK (sess);
1016   result = sess->stats.bandwidth;
1017   RTP_SESSION_UNLOCK (sess);
1018
1019   return result;
1020 }
1021
1022 /**
1023  * rtp_session_set_rtcp_fraction:
1024  * @sess: an #RTPSession
1025  * @bandwidth: the RTCP bandwidth
1026  *
1027  * Set the bandwidth in bytes per second that should be used for RTCP
1028  * messages.
1029  */
1030 void
1031 rtp_session_set_rtcp_fraction (RTPSession * sess, gdouble bandwidth)
1032 {
1033   g_return_if_fail (RTP_IS_SESSION (sess));
1034
1035   RTP_SESSION_LOCK (sess);
1036   sess->stats.rtcp_bandwidth = bandwidth;
1037   RTP_SESSION_UNLOCK (sess);
1038 }
1039
1040 /**
1041  * rtp_session_get_rtcp_fraction:
1042  * @sess: an #RTPSession
1043  *
1044  * Get the session bandwidth used for RTCP.
1045  *
1046  * Returns: The bandwidth used for RTCP messages.
1047  */
1048 gdouble
1049 rtp_session_get_rtcp_fraction (RTPSession * sess)
1050 {
1051   gdouble result;
1052
1053   g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0);
1054
1055   RTP_SESSION_LOCK (sess);
1056   result = sess->stats.rtcp_bandwidth;
1057   RTP_SESSION_UNLOCK (sess);
1058
1059   return result;
1060 }
1061
1062 /**
1063  * rtp_session_get_sdes_struct:
1064  * @sess: an #RTSPSession
1065  *
1066  * Get the SDES data as a #GstStructure
1067  *
1068  * Returns: a GstStructure with SDES items for @sess. This function returns a
1069  * copy of the SDES structure, use gst_structure_free() after usage.
1070  */
1071 GstStructure *
1072 rtp_session_get_sdes_struct (RTPSession * sess)
1073 {
1074   GstStructure *result = NULL;
1075
1076   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1077
1078   RTP_SESSION_LOCK (sess);
1079   if (sess->sdes)
1080     result = gst_structure_copy (sess->sdes);
1081   RTP_SESSION_UNLOCK (sess);
1082
1083   return result;
1084 }
1085
1086 /**
1087  * rtp_session_set_sdes_struct:
1088  * @sess: an #RTSPSession
1089  * @sdes: a #GstStructure
1090  *
1091  * Set the SDES data as a #GstStructure. This function makes a copy of @sdes.
1092  */
1093 void
1094 rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes)
1095 {
1096   g_return_if_fail (sdes);
1097   g_return_if_fail (RTP_IS_SESSION (sess));
1098
1099   RTP_SESSION_LOCK (sess);
1100   if (sess->sdes)
1101     gst_structure_free (sess->sdes);
1102   sess->sdes = gst_structure_copy (sdes);
1103   RTP_SESSION_UNLOCK (sess);
1104 }
1105
1106 static GstFlowReturn
1107 source_push_rtp (RTPSource * source, gpointer data, RTPSession * session)
1108 {
1109   GstFlowReturn result = GST_FLOW_OK;
1110
1111   if (source->internal) {
1112     GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
1113
1114     RTP_SESSION_UNLOCK (session);
1115
1116     if (session->callbacks.send_rtp)
1117       result =
1118           session->callbacks.send_rtp (session, source, data,
1119           session->send_rtp_user_data);
1120     else {
1121       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1122     }
1123   } else {
1124     GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc);
1125     RTP_SESSION_UNLOCK (session);
1126
1127     if (session->callbacks.process_rtp)
1128       result =
1129           session->callbacks.process_rtp (session, source,
1130           GST_BUFFER_CAST (data), session->process_rtp_user_data);
1131     else
1132       gst_buffer_unref (GST_BUFFER_CAST (data));
1133   }
1134   RTP_SESSION_LOCK (session);
1135
1136   return result;
1137 }
1138
1139 static gint
1140 source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session)
1141 {
1142   gint result;
1143
1144   RTP_SESSION_UNLOCK (session);
1145
1146   if (session->callbacks.clock_rate)
1147     result =
1148         session->callbacks.clock_rate (session, pt,
1149         session->clock_rate_user_data);
1150   else
1151     result = -1;
1152
1153   RTP_SESSION_LOCK (session);
1154
1155   GST_DEBUG ("got clock-rate %d for pt %d", result, pt);
1156
1157   return result;
1158 }
1159
1160 static RTPSourceCallbacks callbacks = {
1161   (RTPSourcePushRTP) source_push_rtp,
1162   (RTPSourceClockRate) source_clock_rate,
1163 };
1164
1165 static gboolean
1166 check_collision (RTPSession * sess, RTPSource * source,
1167     RTPArrivalStats * arrival, gboolean rtp)
1168 {
1169   guint32 ssrc;
1170
1171   /* If we have no arrival address, we can't do collision checking */
1172   if (!arrival->address)
1173     return FALSE;
1174
1175   ssrc = rtp_source_get_ssrc (source);
1176
1177   if (!source->internal) {
1178     GSocketAddress *from;
1179
1180     /* This is not our local source, but lets check if two remote
1181      * source collide */
1182     if (rtp) {
1183       from = source->rtp_from;
1184     } else {
1185       from = source->rtcp_from;
1186     }
1187
1188     if (from) {
1189       if (__g_socket_address_equal (from, arrival->address)) {
1190         /* Address is the same */
1191         return FALSE;
1192       } else {
1193         GST_LOG ("we have a third-party collision or loop ssrc:%x", ssrc);
1194         if (sess->favor_new) {
1195           if (rtp_source_find_conflicting_address (source,
1196                   arrival->address, arrival->current_time)) {
1197             gchar *buf1;
1198
1199             buf1 = __g_socket_address_to_string (arrival->address);
1200             GST_LOG ("Known conflict on %x for %s, dropping packet", ssrc,
1201                 buf1);
1202             g_free (buf1);
1203
1204             return TRUE;
1205           } else {
1206             gchar *buf1, *buf2;
1207
1208             /* Current address is not a known conflict, lets assume this is
1209              * a new source. Save old address in possible conflict list
1210              */
1211             rtp_source_add_conflicting_address (source, from,
1212                 arrival->current_time);
1213
1214             buf1 = __g_socket_address_to_string (from);
1215             buf2 = __g_socket_address_to_string (arrival->address);
1216
1217             GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
1218                 " saving old as known conflict", ssrc, buf1, buf2);
1219
1220             if (rtp)
1221               rtp_source_set_rtp_from (source, arrival->address);
1222             else
1223               rtp_source_set_rtcp_from (source, arrival->address);
1224
1225             g_free (buf1);
1226             g_free (buf2);
1227
1228             return FALSE;
1229           }
1230         } else {
1231           /* Don't need to save old addresses, we ignore new sources */
1232           return TRUE;
1233         }
1234       }
1235     } else {
1236       /* We don't already have a from address for RTP, just set it */
1237       if (rtp)
1238         rtp_source_set_rtp_from (source, arrival->address);
1239       else
1240         rtp_source_set_rtcp_from (source, arrival->address);
1241       return FALSE;
1242     }
1243
1244     /* FIXME: Log 3rd party collision somehow
1245      * Maybe should be done in upper layer, only the SDES can tell us
1246      * if its a collision or a loop
1247      */
1248   } else {
1249     /* This is sending with our ssrc, is it an address we already know */
1250     if (rtp_source_find_conflicting_address (source, arrival->address,
1251             arrival->current_time)) {
1252       /* Its a known conflict, its probably a loop, not a collision
1253        * lets just drop the incoming packet
1254        */
1255       GST_DEBUG ("Our packets are being looped back to us, dropping");
1256     } else {
1257       /* Its a new collision, lets change our SSRC */
1258       rtp_source_add_conflicting_address (source, arrival->address,
1259           arrival->current_time);
1260
1261       GST_DEBUG ("Collision for SSRC %x", ssrc);
1262       /* mark the source BYE */
1263       rtp_source_mark_bye (source, "SSRC Collision");
1264       /* if we were suggesting this SSRC, change to something else */
1265       if (sess->suggested_ssrc == ssrc)
1266         sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
1267
1268       on_ssrc_collision (sess, source);
1269
1270       rtp_session_schedule_bye_locked (sess, arrival->current_time);
1271     }
1272   }
1273
1274   return TRUE;
1275 }
1276
1277 static RTPSource *
1278 find_source (RTPSession * sess, guint32 ssrc)
1279 {
1280   return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
1281       GINT_TO_POINTER (ssrc));
1282 }
1283
1284 static void
1285 add_source (RTPSession * sess, RTPSource * src)
1286 {
1287   g_hash_table_insert (sess->ssrcs[sess->mask_idx],
1288       GINT_TO_POINTER (src->ssrc), src);
1289   /* report the new source ASAP */
1290   src->generation = sess->generation;
1291   /* we have one more source now */
1292   sess->total_sources++;
1293   if (RTP_SOURCE_IS_ACTIVE (src))
1294     sess->stats.active_sources++;
1295   if (src->internal) {
1296     sess->stats.internal_sources++;
1297     if (sess->suggested_ssrc != src->ssrc)
1298       sess->suggested_ssrc = src->ssrc;
1299   }
1300 }
1301
1302 /* must be called with the session lock, the returned source needs to be
1303  * unreffed after usage. */
1304 static RTPSource *
1305 obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
1306     RTPArrivalStats * arrival, gboolean rtp)
1307 {
1308   RTPSource *source;
1309
1310   source = find_source (sess, ssrc);
1311   if (source == NULL) {
1312     /* make new Source in probation and insert */
1313     source = rtp_source_new (ssrc);
1314
1315     GST_DEBUG ("creating new source %08x %p", ssrc, source);
1316
1317     /* for RTP packets we need to set the source in probation. Receiving RTCP
1318      * packets of an SSRC, on the other hand, is a strong indication that we
1319      * are dealing with a valid source. */
1320     if (rtp)
1321       g_object_set (source, "probation", sess->probation, NULL);
1322     else
1323       g_object_set (source, "probation", 0, NULL);
1324
1325     /* store from address, if any */
1326     if (arrival->address) {
1327       if (rtp)
1328         rtp_source_set_rtp_from (source, arrival->address);
1329       else
1330         rtp_source_set_rtcp_from (source, arrival->address);
1331     }
1332
1333     /* configure a callback on the source */
1334     rtp_source_set_callbacks (source, &callbacks, sess);
1335
1336     add_source (sess, source);
1337     *created = TRUE;
1338   } else {
1339     *created = FALSE;
1340     /* check for collision, this updates the address when not previously set */
1341     if (check_collision (sess, source, arrival, rtp)) {
1342       return NULL;
1343     }
1344     /* Receiving RTCP packets of an SSRC is a strong indication that we
1345      * are dealing with a valid source. */
1346     if (!rtp)
1347       g_object_set (source, "probation", 0, NULL);
1348   }
1349   /* update last activity */
1350   source->last_activity = arrival->current_time;
1351   if (rtp)
1352     source->last_rtp_activity = arrival->current_time;
1353   g_object_ref (source);
1354
1355   return source;
1356 }
1357
1358 /* must be called with the session lock, the returned source needs to be
1359  * unreffed after usage. */
1360 static RTPSource *
1361 obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created)
1362 {
1363   RTPSource *source;
1364
1365   source = find_source (sess, ssrc);
1366   if (source == NULL) {
1367     /* make new internal Source and insert */
1368     source = rtp_source_new (ssrc);
1369
1370     GST_DEBUG ("creating new internal source %08x %p", ssrc, source);
1371
1372     source->validated = TRUE;
1373     source->internal = TRUE;
1374     rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes));
1375     rtp_source_set_callbacks (source, &callbacks, sess);
1376
1377     add_source (sess, source);
1378     *created = TRUE;
1379   } else {
1380     *created = FALSE;
1381   }
1382   g_object_ref (source);
1383
1384   return source;
1385 }
1386
1387 /**
1388  * rtp_session_suggest_ssrc:
1389  * @sess: a #RTPSession
1390  *
1391  * Suggest an unused SSRC in @sess.
1392  *
1393  * Returns: a free unused SSRC
1394  */
1395 guint32
1396 rtp_session_suggest_ssrc (RTPSession * sess)
1397 {
1398   guint32 result;
1399
1400   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1401
1402   RTP_SESSION_LOCK (sess);
1403   result = sess->suggested_ssrc;
1404   RTP_SESSION_UNLOCK (sess);
1405
1406   return result;
1407 }
1408
1409 /**
1410  * rtp_session_add_source:
1411  * @sess: a #RTPSession
1412  * @src: #RTPSource to add
1413  *
1414  * Add @src to @session.
1415  *
1416  * Returns: %TRUE on success, %FALSE if a source with the same SSRC already
1417  * existed in the session.
1418  */
1419 gboolean
1420 rtp_session_add_source (RTPSession * sess, RTPSource * src)
1421 {
1422   gboolean result = FALSE;
1423   RTPSource *find;
1424
1425   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1426   g_return_val_if_fail (src != NULL, FALSE);
1427
1428   RTP_SESSION_LOCK (sess);
1429   find = find_source (sess, src->ssrc);
1430   if (find == NULL) {
1431     add_source (sess, src);
1432     result = TRUE;
1433   }
1434   RTP_SESSION_UNLOCK (sess);
1435
1436   return result;
1437 }
1438
1439 /**
1440  * rtp_session_get_num_sources:
1441  * @sess: an #RTPSession
1442  *
1443  * Get the number of sources in @sess.
1444  *
1445  * Returns: The number of sources in @sess.
1446  */
1447 guint
1448 rtp_session_get_num_sources (RTPSession * sess)
1449 {
1450   guint result;
1451
1452   g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE);
1453
1454   RTP_SESSION_LOCK (sess);
1455   result = sess->total_sources;
1456   RTP_SESSION_UNLOCK (sess);
1457
1458   return result;
1459 }
1460
1461 /**
1462  * rtp_session_get_num_active_sources:
1463  * @sess: an #RTPSession
1464  *
1465  * Get the number of active sources in @sess. A source is considered active when
1466  * it has been validated and has not yet received a BYE RTCP message.
1467  *
1468  * Returns: The number of active sources in @sess.
1469  */
1470 guint
1471 rtp_session_get_num_active_sources (RTPSession * sess)
1472 {
1473   guint result;
1474
1475   g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
1476
1477   RTP_SESSION_LOCK (sess);
1478   result = sess->stats.active_sources;
1479   RTP_SESSION_UNLOCK (sess);
1480
1481   return result;
1482 }
1483
1484 /**
1485  * rtp_session_get_source_by_ssrc:
1486  * @sess: an #RTPSession
1487  * @ssrc: an SSRC
1488  *
1489  * Find the source with @ssrc in @sess.
1490  *
1491  * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found.
1492  * g_object_unref() after usage.
1493  */
1494 RTPSource *
1495 rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc)
1496 {
1497   RTPSource *result;
1498
1499   g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
1500
1501   RTP_SESSION_LOCK (sess);
1502   result = find_source (sess, ssrc);
1503   if (result)
1504     g_object_ref (result);
1505   RTP_SESSION_UNLOCK (sess);
1506
1507   return result;
1508 }
1509
1510 /* should be called with the SESSION lock */
1511 static guint32
1512 rtp_session_create_new_ssrc (RTPSession * sess)
1513 {
1514   guint32 ssrc;
1515
1516   while (TRUE) {
1517     ssrc = g_random_int ();
1518
1519     /* see if it exists in the session, we're done if it doesn't */
1520     if (find_source (sess, ssrc) == NULL)
1521       break;
1522   }
1523   return ssrc;
1524 }
1525
1526
1527 /**
1528  * rtp_session_create_source:
1529  * @sess: an #RTPSession
1530  *
1531  * Create an #RTPSource for use in @sess. This function will create a source
1532  * with an ssrc that is currently not used by any participants in the session.
1533  *
1534  * Returns: an #RTPSource.
1535  */
1536 RTPSource *
1537 rtp_session_create_source (RTPSession * sess)
1538 {
1539   guint32 ssrc;
1540   RTPSource *source;
1541
1542   RTP_SESSION_LOCK (sess);
1543   ssrc = rtp_session_create_new_ssrc (sess);
1544   source = rtp_source_new (ssrc);
1545   rtp_source_set_callbacks (source, &callbacks, sess);
1546   /* we need an additional ref for the source in the hashtable */
1547   g_object_ref (source);
1548   add_source (sess, source);
1549   RTP_SESSION_UNLOCK (sess);
1550
1551   return source;
1552 }
1553
1554 /* update the RTPArrivalStats structure with the current time and other bits
1555  * about the current buffer we are handling.
1556  * This function is typically called when a validated packet is received.
1557  * This function should be called with the SESSION_LOCK
1558  */
1559 static void
1560 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
1561     gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
1562     GstClockTime running_time, guint64 ntpnstime)
1563 {
1564   GstNetAddressMeta *meta;
1565   GstRTPBuffer rtpb = { NULL };
1566
1567   /* get time of arrival */
1568   arrival->current_time = current_time;
1569   arrival->running_time = running_time;
1570   arrival->ntpnstime = ntpnstime;
1571
1572   /* get packet size including header overhead */
1573   arrival->bytes = gst_buffer_get_size (buffer) + sess->header_len;
1574
1575   if (rtp) {
1576     gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpb);
1577     arrival->payload_len = gst_rtp_buffer_get_payload_len (&rtpb);
1578     gst_rtp_buffer_unmap (&rtpb);
1579   } else {
1580     arrival->payload_len = 0;
1581   }
1582
1583   /* for netbuffer we can store the IP address to check for collisions */
1584   meta = gst_buffer_get_net_address_meta (buffer);
1585   if (arrival->address)
1586     g_object_unref (arrival->address);
1587   if (meta) {
1588     arrival->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
1589   } else {
1590     arrival->address = NULL;
1591   }
1592 }
1593
1594 static void
1595 clean_arrival_stats (RTPArrivalStats * arrival)
1596 {
1597   if (arrival->address)
1598     g_object_unref (arrival->address);
1599 }
1600
1601 static gboolean
1602 source_update_active (RTPSession * sess, RTPSource * source,
1603     gboolean prevactive)
1604 {
1605   gboolean active = RTP_SOURCE_IS_ACTIVE (source);
1606   guint32 ssrc = source->ssrc;
1607
1608   if (prevactive == active)
1609     return FALSE;
1610
1611   if (active) {
1612     sess->stats.active_sources++;
1613     GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
1614         sess->stats.active_sources);
1615   } else {
1616     sess->stats.active_sources--;
1617     GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
1618         sess->stats.active_sources);
1619   }
1620   return TRUE;
1621 }
1622
1623 static gboolean
1624 source_update_sender (RTPSession * sess, RTPSource * source,
1625     gboolean prevsender)
1626 {
1627   gboolean sender = RTP_SOURCE_IS_SENDER (source);
1628   guint32 ssrc = source->ssrc;
1629
1630   if (prevsender == sender)
1631     return FALSE;
1632
1633   if (sender) {
1634     sess->stats.sender_sources++;
1635     if (source->internal)
1636       sess->stats.internal_sender_sources++;
1637     GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
1638         sess->stats.sender_sources);
1639   } else {
1640     sess->stats.sender_sources--;
1641     if (source->internal)
1642       sess->stats.internal_sender_sources--;
1643     GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
1644         sess->stats.sender_sources);
1645   }
1646   return TRUE;
1647 }
1648
1649 /**
1650  * rtp_session_process_rtp:
1651  * @sess: and #RTPSession
1652  * @buffer: an RTP buffer
1653  * @current_time: the current system time
1654  * @running_time: the running_time of @buffer
1655  *
1656  * Process an RTP buffer in the session manager. This function takes ownership
1657  * of @buffer.
1658  *
1659  * Returns: a #GstFlowReturn.
1660  */
1661 GstFlowReturn
1662 rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
1663     GstClockTime current_time, GstClockTime running_time)
1664 {
1665   GstFlowReturn result;
1666   guint32 ssrc;
1667   RTPSource *source;
1668   gboolean created;
1669   gboolean prevsender, prevactive;
1670   RTPArrivalStats arrival = { NULL, };
1671   guint32 csrcs[16];
1672   guint8 i, count;
1673   guint64 oldrate;
1674   GstRTPBuffer rtp = { NULL };
1675
1676   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
1677   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1678
1679   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
1680     goto invalid_packet;
1681
1682   /* get SSRC to look up in session database */
1683   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
1684   /* copy available csrc for later */
1685   count = gst_rtp_buffer_get_csrc_count (&rtp);
1686   /* make sure to not overflow our array. An RTP buffer can maximally contain
1687    * 16 CSRCs */
1688   count = MIN (count, 16);
1689
1690   for (i = 0; i < count; i++)
1691     csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
1692
1693   gst_rtp_buffer_unmap (&rtp);
1694
1695   RTP_SESSION_LOCK (sess);
1696
1697   /* update arrival stats */
1698   update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
1699       running_time, -1);
1700
1701   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
1702   if (!source)
1703     goto collision;
1704
1705   prevsender = RTP_SOURCE_IS_SENDER (source);
1706   prevactive = RTP_SOURCE_IS_ACTIVE (source);
1707   oldrate = source->bitrate;
1708
1709   /* let source process the packet */
1710   result = rtp_source_process_rtp (source, buffer, &arrival);
1711
1712   /* source became active */
1713   if (source_update_active (sess, source, prevactive))
1714     on_ssrc_validated (sess, source);
1715
1716   source_update_sender (sess, source, prevsender);
1717
1718   if (oldrate != source->bitrate)
1719     sess->recalc_bandwidth = TRUE;
1720
1721   if (created)
1722     on_new_ssrc (sess, source);
1723
1724   if (source->validated) {
1725     gboolean created;
1726
1727     /* for validated sources, we add the CSRCs as well */
1728     for (i = 0; i < count; i++) {
1729       guint32 csrc;
1730       RTPSource *csrc_src;
1731
1732       csrc = csrcs[i];
1733
1734       /* get source */
1735       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
1736       if (!csrc_src)
1737         continue;
1738
1739       if (created) {
1740         GST_DEBUG ("created new CSRC: %08x", csrc);
1741         rtp_source_set_as_csrc (csrc_src);
1742         source_update_active (sess, csrc_src, FALSE);
1743         on_new_ssrc (sess, csrc_src);
1744       }
1745       g_object_unref (csrc_src);
1746     }
1747   }
1748   g_object_unref (source);
1749
1750   RTP_SESSION_UNLOCK (sess);
1751
1752   clean_arrival_stats (&arrival);
1753
1754   return result;
1755
1756   /* ERRORS */
1757 invalid_packet:
1758   {
1759     gst_buffer_unref (buffer);
1760     GST_DEBUG ("invalid RTP packet received");
1761     return GST_FLOW_OK;
1762   }
1763 collision:
1764   {
1765     RTP_SESSION_UNLOCK (sess);
1766     gst_buffer_unref (buffer);
1767     clean_arrival_stats (&arrival);
1768     GST_DEBUG ("ignoring packet because its collisioning");
1769     return GST_FLOW_OK;
1770   }
1771 }
1772
1773 static void
1774 rtp_session_process_rb (RTPSession * sess, RTPSource * source,
1775     GstRTCPPacket * packet, RTPArrivalStats * arrival)
1776 {
1777   guint count, i;
1778
1779   count = gst_rtcp_packet_get_rb_count (packet);
1780   for (i = 0; i < count; i++) {
1781     guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1782     guint8 fractionlost;
1783     gint32 packetslost;
1784     RTPSource *src;
1785
1786     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
1787         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1788
1789     GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
1790
1791     /* find our own source */
1792     src = find_source (sess, ssrc);
1793     if (src == NULL)
1794       continue;
1795
1796     if (src->internal && RTP_SOURCE_IS_ACTIVE (src)) {
1797       /* only deal with report blocks for our session, we update the stats of
1798        * the sender of the RTCP message. We could also compare our stats against
1799        * the other sender to see if we are better or worse. */
1800       /* FIXME, need to keep track who the RB block is from */
1801       rtp_source_process_rb (source, arrival->ntpnstime, fractionlost,
1802           packetslost, exthighestseq, jitter, lsr, dlsr);
1803     }
1804   }
1805   on_ssrc_active (sess, source);
1806 }
1807
1808 /* A Sender report contains statistics about how the sender is doing. This
1809  * includes timing informataion such as the relation between RTP and NTP
1810  * timestamps and the number of packets/bytes it sent to us.
1811  *
1812  * In this report is also included a set of report blocks related to how this
1813  * sender is receiving data (in case we (or somebody else) is also sending stuff
1814  * to it). This info includes the packet loss, jitter and seqnum. It also
1815  * contains information to calculate the round trip time (LSR/DLSR).
1816  */
1817 static void
1818 rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
1819     RTPArrivalStats * arrival, gboolean * do_sync)
1820 {
1821   guint32 senderssrc, rtptime, packet_count, octet_count;
1822   guint64 ntptime;
1823   RTPSource *source;
1824   gboolean created, prevsender;
1825
1826   gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime,
1827       &packet_count, &octet_count);
1828
1829   GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
1830       senderssrc, GST_TIME_ARGS (arrival->current_time));
1831
1832   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1833   if (!source)
1834     return;
1835
1836   /* don't try to do lip-sync for sources that sent a BYE */
1837   if (RTP_SOURCE_IS_MARKED_BYE (source))
1838     *do_sync = FALSE;
1839   else
1840     *do_sync = TRUE;
1841
1842   prevsender = RTP_SOURCE_IS_SENDER (source);
1843
1844   /* first update the source */
1845   rtp_source_process_sr (source, arrival->current_time, ntptime, rtptime,
1846       packet_count, octet_count);
1847
1848   source_update_sender (sess, source, prevsender);
1849
1850   if (created)
1851     on_new_ssrc (sess, source);
1852
1853   rtp_session_process_rb (sess, source, packet, arrival);
1854   g_object_unref (source);
1855 }
1856
1857 /* A receiver report contains statistics about how a receiver is doing. It
1858  * includes stuff like packet loss, jitter and the seqnum it received last. It
1859  * also contains info to calculate the round trip time.
1860  *
1861  * We are only interested in how the sender of this report is doing wrt to us.
1862  */
1863 static void
1864 rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
1865     RTPArrivalStats * arrival)
1866 {
1867   guint32 senderssrc;
1868   RTPSource *source;
1869   gboolean created;
1870
1871   senderssrc = gst_rtcp_packet_rr_get_ssrc (packet);
1872
1873   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
1874
1875   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
1876   if (!source)
1877     return;
1878
1879   if (created)
1880     on_new_ssrc (sess, source);
1881
1882   rtp_session_process_rb (sess, source, packet, arrival);
1883   g_object_unref (source);
1884 }
1885
1886 /* Get SDES items and store them in the SSRC */
1887 static void
1888 rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
1889     RTPArrivalStats * arrival)
1890 {
1891   guint items, i, j;
1892   gboolean more_items, more_entries;
1893
1894   items = gst_rtcp_packet_sdes_get_item_count (packet);
1895   GST_DEBUG ("got SDES packet with %d items", items);
1896
1897   more_items = gst_rtcp_packet_sdes_first_item (packet);
1898   i = 0;
1899   while (more_items) {
1900     guint32 ssrc;
1901     gboolean changed, created, prevactive;
1902     RTPSource *source;
1903     GstStructure *sdes;
1904
1905     ssrc = gst_rtcp_packet_sdes_get_ssrc (packet);
1906
1907     GST_DEBUG ("item %d, SSRC %08x", i, ssrc);
1908
1909     changed = FALSE;
1910
1911     /* find src, no probation when dealing with RTCP */
1912     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1913     if (!source)
1914       return;
1915
1916     sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
1917
1918     more_entries = gst_rtcp_packet_sdes_first_entry (packet);
1919     j = 0;
1920     while (more_entries) {
1921       GstRTCPSDESType type;
1922       guint8 len;
1923       guint8 *data;
1924       gchar *name;
1925       gchar *value;
1926
1927       gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data);
1928
1929       GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len,
1930           data);
1931
1932       if (type == GST_RTCP_SDES_PRIV) {
1933         name = g_strndup ((const gchar *) &data[1], data[0]);
1934         len -= data[0] + 1;
1935         data += data[0] + 1;
1936       } else {
1937         name = g_strdup (gst_rtcp_sdes_type_to_name (type));
1938       }
1939
1940       value = g_strndup ((const gchar *) data, len);
1941
1942       gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
1943
1944       g_free (name);
1945       g_free (value);
1946
1947       more_entries = gst_rtcp_packet_sdes_next_entry (packet);
1948       j++;
1949     }
1950
1951     /* takes ownership of sdes */
1952     changed = rtp_source_set_sdes_struct (source, sdes);
1953
1954     prevactive = RTP_SOURCE_IS_ACTIVE (source);
1955     source->validated = TRUE;
1956
1957     if (created)
1958       on_new_ssrc (sess, source);
1959
1960     /* source became active */
1961     if (source_update_active (sess, source, prevactive))
1962       on_ssrc_validated (sess, source);
1963
1964     if (changed)
1965       on_ssrc_sdes (sess, source);
1966
1967     g_object_unref (source);
1968
1969     more_items = gst_rtcp_packet_sdes_next_item (packet);
1970     i++;
1971   }
1972 }
1973
1974 /* BYE is sent when a client leaves the session
1975  */
1976 static void
1977 rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
1978     RTPArrivalStats * arrival)
1979 {
1980   guint count, i;
1981   gchar *reason;
1982   gboolean reconsider = FALSE;
1983
1984   reason = gst_rtcp_packet_bye_get_reason (packet);
1985   GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason));
1986
1987   count = gst_rtcp_packet_bye_get_ssrc_count (packet);
1988   for (i = 0; i < count; i++) {
1989     guint32 ssrc;
1990     RTPSource *source;
1991     gboolean created, prevactive, prevsender;
1992     guint pmembers, members;
1993
1994     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
1995     GST_DEBUG ("SSRC: %08x", ssrc);
1996
1997     /* find src and mark bye, no probation when dealing with RTCP */
1998     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
1999     if (!source)
2000       return;
2001
2002     if (source->internal) {
2003       /* our own source, something weird with this packet */
2004       g_object_unref (source);
2005       continue;
2006     }
2007
2008     /* store time for when we need to time out this source */
2009     source->bye_time = arrival->current_time;
2010
2011     prevactive = RTP_SOURCE_IS_ACTIVE (source);
2012     prevsender = RTP_SOURCE_IS_SENDER (source);
2013
2014     /* mark the source BYE */
2015     rtp_source_mark_bye (source, reason);
2016
2017     pmembers = sess->stats.active_sources;
2018
2019     source_update_active (sess, source, prevactive);
2020     source_update_sender (sess, source, prevsender);
2021
2022     members = sess->stats.active_sources;
2023
2024     if (!sess->scheduled_bye && members < pmembers) {
2025       /* some members went away since the previous timeout estimate.
2026        * Perform reverse reconsideration but only when we are not scheduling a
2027        * BYE ourselves. */
2028       if (sess->next_rtcp_check_time != GST_CLOCK_TIME_NONE &&
2029           arrival->current_time < sess->next_rtcp_check_time) {
2030         GstClockTime time_remaining;
2031
2032         time_remaining = sess->next_rtcp_check_time - arrival->current_time;
2033         sess->next_rtcp_check_time =
2034             gst_util_uint64_scale (time_remaining, members, pmembers);
2035
2036         GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
2037             GST_TIME_ARGS (sess->next_rtcp_check_time));
2038
2039         sess->next_rtcp_check_time += arrival->current_time;
2040
2041         /* mark pending reconsider. We only want to signal the reconsideration
2042          * once after we handled all the source in the bye packet */
2043         reconsider = TRUE;
2044       }
2045     }
2046
2047     if (created)
2048       on_new_ssrc (sess, source);
2049
2050     on_bye_ssrc (sess, source);
2051
2052     g_object_unref (source);
2053   }
2054   if (reconsider) {
2055     RTP_SESSION_UNLOCK (sess);
2056     /* notify app of reconsideration */
2057     if (sess->callbacks.reconsider)
2058       sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2059     RTP_SESSION_LOCK (sess);
2060   }
2061   g_free (reason);
2062 }
2063
2064 static void
2065 rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
2066     RTPArrivalStats * arrival)
2067 {
2068   GST_DEBUG ("received APP");
2069 }
2070
2071 static gboolean
2072 rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
2073     gboolean fir, GstClockTime current_time)
2074 {
2075   guint32 round_trip = 0;
2076
2077   rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, &round_trip);
2078
2079   if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
2080     GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
2081         GST_SECOND, 65536);
2082
2083     if (current_time - sess->last_keyframe_request < 2 * round_trip_in_ns) {
2084       GST_DEBUG ("Ignoring %s request because one was send without one "
2085           "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
2086           fir ? "FIR" : "PLI",
2087           GST_TIME_ARGS (current_time - sess->last_keyframe_request),
2088           GST_TIME_ARGS (round_trip_in_ns));;
2089       return FALSE;
2090     }
2091   }
2092
2093   sess->last_keyframe_request = current_time;
2094
2095   GST_LOG ("received %s request from %X %p(%p)", fir ? "FIR" : "PLI",
2096       rtp_source_get_ssrc (src), sess->callbacks.process_rtp,
2097       sess->callbacks.request_key_unit);
2098
2099   RTP_SESSION_UNLOCK (sess);
2100   sess->callbacks.request_key_unit (sess, fir,
2101       sess->request_key_unit_user_data);
2102   RTP_SESSION_LOCK (sess);
2103
2104   return TRUE;
2105 }
2106
2107 static void
2108 rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
2109     guint32 media_ssrc, GstClockTime current_time)
2110 {
2111   RTPSource *src;
2112
2113   if (!sess->callbacks.request_key_unit)
2114     return;
2115
2116   src = find_source (sess, sender_ssrc);
2117   if (!src)
2118     return;
2119
2120   rtp_session_request_local_key_unit (sess, src, FALSE, current_time);
2121 }
2122
2123 static void
2124 rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
2125     guint8 * fci_data, guint fci_length, GstClockTime current_time)
2126 {
2127   RTPSource *src;
2128   guint32 ssrc;
2129   guint position = 0;
2130   gboolean our_request = FALSE;
2131
2132   if (!sess->callbacks.request_key_unit)
2133     return;
2134
2135   if (fci_length < 8)
2136     return;
2137
2138   src = find_source (sess, sender_ssrc);
2139
2140   /* Hack because Google fails to set the sender_ssrc correctly */
2141   if (!src && sender_ssrc == 1) {
2142     GHashTableIter iter;
2143
2144     /* we can't find the source if there are multiple */
2145     if (sess->stats.sender_sources > sess->stats.internal_sender_sources + 1)
2146       return;
2147
2148     g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
2149     while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
2150       if (!src->internal && rtp_source_is_sender (src))
2151         break;
2152       src = NULL;
2153     }
2154   }
2155   if (!src)
2156     return;
2157
2158   for (position = 0; position < fci_length; position += 8) {
2159     guint8 *data = fci_data + position;
2160     RTPSource *own;
2161
2162     ssrc = GST_READ_UINT32_BE (data);
2163
2164     own = find_source (sess, ssrc);
2165     if (own->internal) {
2166       our_request = TRUE;
2167       break;
2168     }
2169   }
2170   if (!our_request)
2171     return;
2172
2173   rtp_session_request_local_key_unit (sess, src, TRUE, current_time);
2174 }
2175
2176 static void
2177 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
2178     RTPArrivalStats * arrival, GstClockTime current_time)
2179 {
2180   GstRTCPType type = gst_rtcp_packet_get_type (packet);
2181   GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
2182   guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
2183   guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
2184   guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet);
2185   guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
2186   RTPSource *src;
2187
2188   GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
2189       "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
2190
2191   if (g_signal_has_handler_pending (sess,
2192           rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
2193     GstBuffer *fci_buffer = NULL;
2194
2195     if (fci_length > 0) {
2196       fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
2197           GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data,
2198           fci_length);
2199       GST_BUFFER_TIMESTAMP (fci_buffer) = arrival->running_time;
2200     }
2201
2202     RTP_SESSION_UNLOCK (sess);
2203     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
2204         type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
2205     RTP_SESSION_LOCK (sess);
2206
2207     if (fci_buffer)
2208       gst_buffer_unref (fci_buffer);
2209   }
2210
2211   src = find_source (sess, media_ssrc);
2212   if (!src)
2213     return;
2214
2215   if (sess->rtcp_feedback_retention_window) {
2216     rtp_source_retain_rtcp_packet (src, packet, arrival->running_time);
2217   }
2218
2219   if (src->internal ||
2220       /* PSFB FIR puts the media ssrc inside the FCI */
2221       (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
2222     switch (type) {
2223       case GST_RTCP_TYPE_PSFB:
2224         switch (fbtype) {
2225           case GST_RTCP_PSFB_TYPE_PLI:
2226             rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
2227                 current_time);
2228             break;
2229           case GST_RTCP_PSFB_TYPE_FIR:
2230             rtp_session_process_fir (sess, sender_ssrc, fci_data, fci_length,
2231                 current_time);
2232             break;
2233           default:
2234             break;
2235         }
2236         break;
2237       case GST_RTCP_TYPE_RTPFB:
2238       default:
2239         break;
2240     }
2241   }
2242 }
2243
2244 /**
2245  * rtp_session_process_rtcp:
2246  * @sess: and #RTPSession
2247  * @buffer: an RTCP buffer
2248  * @current_time: the current system time
2249  * @ntpnstime: the current NTP time in nanoseconds
2250  *
2251  * Process an RTCP buffer in the session manager. This function takes ownership
2252  * of @buffer.
2253  *
2254  * Returns: a #GstFlowReturn.
2255  */
2256 GstFlowReturn
2257 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
2258     GstClockTime current_time, guint64 ntpnstime)
2259 {
2260   GstRTCPPacket packet;
2261   gboolean more, is_bye = FALSE, do_sync = FALSE;
2262   RTPArrivalStats arrival = { NULL, };
2263   GstFlowReturn result = GST_FLOW_OK;
2264   GstRTCPBuffer rtcp = { NULL, };
2265
2266   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2267   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2268
2269   if (!gst_rtcp_buffer_validate (buffer))
2270     goto invalid_packet;
2271
2272   GST_DEBUG ("received RTCP packet");
2273
2274   RTP_SESSION_LOCK (sess);
2275   /* update arrival stats */
2276   update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1,
2277       ntpnstime);
2278
2279   /* start processing the compound packet */
2280   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2281   more = gst_rtcp_buffer_get_first_packet (&rtcp, &packet);
2282   while (more) {
2283     GstRTCPType type;
2284
2285     type = gst_rtcp_packet_get_type (&packet);
2286
2287     /* when we are leaving the session, we should ignore all non-BYE messages */
2288     if (sess->scheduled_bye && type != GST_RTCP_TYPE_BYE) {
2289       GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
2290       goto next;
2291     }
2292
2293     switch (type) {
2294       case GST_RTCP_TYPE_SR:
2295         rtp_session_process_sr (sess, &packet, &arrival, &do_sync);
2296         break;
2297       case GST_RTCP_TYPE_RR:
2298         rtp_session_process_rr (sess, &packet, &arrival);
2299         break;
2300       case GST_RTCP_TYPE_SDES:
2301         rtp_session_process_sdes (sess, &packet, &arrival);
2302         break;
2303       case GST_RTCP_TYPE_BYE:
2304         is_bye = TRUE;
2305         /* don't try to attempt lip-sync anymore for streams with a BYE */
2306         do_sync = FALSE;
2307         rtp_session_process_bye (sess, &packet, &arrival);
2308         break;
2309       case GST_RTCP_TYPE_APP:
2310         rtp_session_process_app (sess, &packet, &arrival);
2311         break;
2312       case GST_RTCP_TYPE_RTPFB:
2313       case GST_RTCP_TYPE_PSFB:
2314         rtp_session_process_feedback (sess, &packet, &arrival, current_time);
2315         break;
2316       default:
2317         GST_WARNING ("got unknown RTCP packet");
2318         break;
2319     }
2320   next:
2321     more = gst_rtcp_packet_move_to_next (&packet);
2322   }
2323
2324   gst_rtcp_buffer_unmap (&rtcp);
2325
2326   /* if we are scheduling a BYE, we only want to count bye packets, else we
2327    * count everything */
2328   if (sess->scheduled_bye) {
2329     if (is_bye) {
2330       sess->stats.bye_members++;
2331       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
2332     }
2333   } else {
2334     /* keep track of average packet size */
2335     UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
2336   }
2337   GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
2338       sess->stats.avg_rtcp_packet_size, arrival.bytes);
2339   RTP_SESSION_UNLOCK (sess);
2340
2341   clean_arrival_stats (&arrival);
2342
2343   /* notify caller of sr packets in the callback */
2344   if (do_sync && sess->callbacks.sync_rtcp) {
2345     result = sess->callbacks.sync_rtcp (sess, buffer,
2346         sess->sync_rtcp_user_data);
2347   } else
2348     gst_buffer_unref (buffer);
2349
2350   return result;
2351
2352   /* ERRORS */
2353 invalid_packet:
2354   {
2355     GST_DEBUG ("invalid RTCP packet received");
2356     gst_buffer_unref (buffer);
2357     return GST_FLOW_OK;
2358   }
2359 }
2360
2361 /**
2362  * rtp_session_update_send_caps:
2363  * @sess: an #RTPSession
2364  * @caps: a #GstCaps
2365  *
2366  * Update the caps of the sender in the rtp session.
2367  */
2368 void
2369 rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
2370 {
2371   GstStructure *s;
2372   guint ssrc;
2373
2374   g_return_if_fail (RTP_IS_SESSION (sess));
2375   g_return_if_fail (GST_IS_CAPS (caps));
2376
2377   GST_LOG ("received caps %" GST_PTR_FORMAT, caps);
2378
2379   s = gst_caps_get_structure (caps, 0);
2380
2381   if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
2382     RTPSource *source;
2383     gboolean created;
2384
2385     RTP_SESSION_LOCK (sess);
2386     source = obtain_internal_source (sess, ssrc, &created);
2387     if (source) {
2388       rtp_source_update_caps (source, caps);
2389       g_object_unref (source);
2390     }
2391     RTP_SESSION_UNLOCK (sess);
2392   }
2393 }
2394
2395 /**
2396  * rtp_session_send_rtp:
2397  * @sess: an #RTPSession
2398  * @data: pointer to either an RTP buffer or a list of RTP buffers
2399  * @is_list: TRUE when @data is a buffer list
2400  * @current_time: the current system time
2401  * @running_time: the running time of @data
2402  *
2403  * Send the RTP buffer in the session manager. This function takes ownership of
2404  * @buffer.
2405  *
2406  * Returns: a #GstFlowReturn.
2407  */
2408 GstFlowReturn
2409 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
2410     GstClockTime current_time, GstClockTime running_time)
2411 {
2412   GstFlowReturn result;
2413   RTPSource *source;
2414   gboolean prevsender;
2415   guint64 oldrate;
2416   GstBuffer *buffer;
2417   GstRTPBuffer rtp = { NULL };
2418   guint32 ssrc;
2419   gboolean created;
2420
2421   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2422   g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
2423
2424   GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
2425
2426   if (is_list) {
2427     GstBufferList *list = GST_BUFFER_LIST_CAST (data);
2428
2429     buffer = gst_buffer_list_get (list, 0);
2430     if (!buffer)
2431       goto no_buffer;
2432   } else {
2433     buffer = GST_BUFFER_CAST (data);
2434   }
2435
2436   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
2437     goto invalid_packet;
2438
2439   /* get SSRC and look up in session database */
2440   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
2441
2442   gst_rtp_buffer_unmap (&rtp);
2443
2444   RTP_SESSION_LOCK (sess);
2445   source = obtain_internal_source (sess, ssrc, &created);
2446
2447   /* update last activity */
2448   source->last_rtp_activity = current_time;
2449
2450   prevsender = RTP_SOURCE_IS_SENDER (source);
2451   oldrate = source->bitrate;
2452
2453   /* we use our own source to send */
2454   result = rtp_source_send_rtp (source, data, is_list, running_time);
2455
2456   source_update_sender (sess, source, prevsender);
2457
2458   if (oldrate != source->bitrate)
2459     sess->recalc_bandwidth = TRUE;
2460   RTP_SESSION_UNLOCK (sess);
2461
2462   g_object_unref (source);
2463
2464   return result;
2465
2466 invalid_packet:
2467   {
2468     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
2469     GST_DEBUG ("invalid RTP packet received");
2470     return GST_FLOW_OK;
2471   }
2472 no_buffer:
2473   {
2474     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
2475     GST_DEBUG ("no buffer in list");
2476     return GST_FLOW_OK;
2477   }
2478 }
2479
2480 static void
2481 add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth)
2482 {
2483   *bandwidth += source->bitrate;
2484 }
2485
2486 /* must be called with session lock */
2487 static GstClockTime
2488 calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
2489     gboolean first)
2490 {
2491   GstClockTime result;
2492
2493   /* recalculate bandwidth when it changed */
2494   if (sess->recalc_bandwidth) {
2495     gdouble bandwidth;
2496
2497     if (sess->bandwidth > 0)
2498       bandwidth = sess->bandwidth;
2499     else {
2500       /* If it is <= 0, then try to estimate the actual bandwidth */
2501       bandwidth = 0;
2502
2503       g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2504           (GHFunc) add_bitrates, &bandwidth);
2505       bandwidth /= 8.0;
2506     }
2507     if (bandwidth < 8000)
2508       bandwidth = RTP_STATS_BANDWIDTH;
2509
2510     rtp_stats_set_bandwidths (&sess->stats, bandwidth,
2511         sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth);
2512
2513     sess->recalc_bandwidth = FALSE;
2514   }
2515
2516   if (sess->scheduled_bye) {
2517     result = rtp_stats_calculate_bye_interval (&sess->stats);
2518   } else {
2519     result = rtp_stats_calculate_rtcp_interval (&sess->stats,
2520         sess->stats.internal_sender_sources > 0, first);
2521   }
2522
2523   GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
2524       GST_TIME_ARGS (result), first);
2525
2526   if (!deterministic && result != GST_CLOCK_TIME_NONE)
2527     result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
2528
2529   GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
2530
2531   return result;
2532 }
2533
2534 static void
2535 source_mark_bye (const gchar * key, RTPSource * source, const gchar * reason)
2536 {
2537   if (source->internal)
2538     rtp_source_mark_bye (source, reason);
2539 }
2540
2541 /**
2542  * rtp_session_mark_all_bye:
2543  * @sess: an #RTPSession
2544  * @reason: a reason
2545  *
2546  * Mark all internal sources of the session as BYE with @reason.
2547  */
2548 void
2549 rtp_session_mark_all_bye (RTPSession * sess, const gchar * reason)
2550 {
2551   g_return_if_fail (RTP_IS_SESSION (sess));
2552
2553   RTP_SESSION_LOCK (sess);
2554   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2555       (GHFunc) source_mark_bye, (gpointer) reason);
2556   RTP_SESSION_UNLOCK (sess);
2557 }
2558
2559 /* Stop the current @sess and schedule a BYE message for the other members.
2560  * One must have the session lock to call this function
2561  */
2562 static GstFlowReturn
2563 rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time)
2564 {
2565   GstFlowReturn result = GST_FLOW_OK;
2566   GstClockTime interval;
2567
2568   /* nothing to do it we already scheduled bye */
2569   if (sess->scheduled_bye)
2570     goto done;
2571
2572   /* we schedule BYE now */
2573   sess->scheduled_bye = TRUE;
2574   /* at least one member wants to send a BYE */
2575   INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
2576   sess->stats.bye_members = 1;
2577   sess->first_rtcp = TRUE;
2578   sess->allow_early = TRUE;
2579
2580   /* reschedule transmission */
2581   sess->last_rtcp_send_time = current_time;
2582   interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2583
2584   if (interval != GST_CLOCK_TIME_NONE)
2585     sess->next_rtcp_check_time = current_time + interval;
2586   else
2587     sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
2588
2589   GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
2590       GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
2591
2592   RTP_SESSION_UNLOCK (sess);
2593   /* notify app of reconsideration */
2594   if (sess->callbacks.reconsider)
2595     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
2596   RTP_SESSION_LOCK (sess);
2597 done:
2598
2599   return result;
2600 }
2601
2602 /**
2603  * rtp_session_schedule_bye:
2604  * @sess: an #RTPSession
2605  * @current_time: the current system time
2606  *
2607  * Schedule a BYE message for all sources marked as BYE in @sess.
2608  *
2609  * Returns: a #GstFlowReturn.
2610  */
2611 GstFlowReturn
2612 rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
2613 {
2614   GstFlowReturn result = GST_FLOW_OK;
2615
2616   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
2617
2618   RTP_SESSION_LOCK (sess);
2619   result = rtp_session_schedule_bye_locked (sess, current_time);
2620   RTP_SESSION_UNLOCK (sess);
2621
2622   return result;
2623 }
2624
2625 /**
2626  * rtp_session_next_timeout:
2627  * @sess: an #RTPSession
2628  * @current_time: the current system time
2629  *
2630  * Get the next time we should perform session maintenance tasks.
2631  *
2632  * Returns: a time when rtp_session_on_timeout() should be called with the
2633  * current system time.
2634  */
2635 GstClockTime
2636 rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
2637 {
2638   GstClockTime result, interval = 0;
2639
2640   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE);
2641
2642   RTP_SESSION_LOCK (sess);
2643
2644   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
2645     result = sess->next_early_rtcp_time;
2646     goto early_exit;
2647   }
2648
2649   result = sess->next_rtcp_check_time;
2650
2651   GST_DEBUG ("current time: %" GST_TIME_FORMAT
2652       ", next time: %" GST_TIME_FORMAT,
2653       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2654
2655   if (result == GST_CLOCK_TIME_NONE || result < current_time) {
2656     GST_DEBUG ("take current time as base");
2657     /* our previous check time expired, start counting from the current time
2658      * again. */
2659     result = current_time;
2660   }
2661
2662   if (sess->scheduled_bye) {
2663     if (sess->stats.active_sources >= 50) {
2664       GST_DEBUG ("reconsider BYE, more than 50 sources");
2665       /* reconsider BYE if members >= 50 */
2666       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2667     }
2668   } else {
2669     if (sess->first_rtcp) {
2670       GST_DEBUG ("first RTCP packet");
2671       /* we are called for the first time */
2672       interval = calculate_rtcp_interval (sess, FALSE, TRUE);
2673     } else if (sess->next_rtcp_check_time < current_time) {
2674       GST_DEBUG ("old check time expired, getting new timeout");
2675       /* get a new timeout when we need to */
2676       interval = calculate_rtcp_interval (sess, FALSE, FALSE);
2677     }
2678   }
2679
2680   if (interval != GST_CLOCK_TIME_NONE)
2681     result += interval;
2682   else
2683     result = GST_CLOCK_TIME_NONE;
2684
2685   sess->next_rtcp_check_time = result;
2686
2687 early_exit:
2688
2689   GST_DEBUG ("current time: %" GST_TIME_FORMAT
2690       ", next time: %" GST_TIME_FORMAT,
2691       GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
2692   RTP_SESSION_UNLOCK (sess);
2693
2694   return result;
2695 }
2696
2697 typedef struct
2698 {
2699   RTPSource *source;
2700   gboolean is_bye;
2701   GstBuffer *buffer;
2702 } ReportOutput;
2703
2704 typedef struct
2705 {
2706   GstRTCPBuffer rtcpbuf;
2707   RTPSession *sess;
2708   RTPSource *source;
2709   guint num_to_report;
2710   gboolean have_fir;
2711   gboolean have_pli;
2712   GstBuffer *rtcp;
2713   GstClockTime current_time;
2714   guint64 ntpnstime;
2715   GstClockTime running_time;
2716   GstClockTime interval;
2717   GstRTCPPacket packet;
2718   gboolean has_sdes;
2719   gboolean is_early;
2720   gboolean may_suppress;
2721   GQueue output;
2722 } ReportData;
2723
2724 static void
2725 session_start_rtcp (RTPSession * sess, ReportData * data)
2726 {
2727   GstRTCPPacket *packet = &data->packet;
2728   RTPSource *own = data->source;
2729   GstRTCPBuffer *rtcp = &data->rtcpbuf;
2730
2731   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
2732   data->has_sdes = FALSE;
2733
2734   gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
2735
2736   if (RTP_SOURCE_IS_SENDER (own)) {
2737     guint64 ntptime;
2738     guint32 rtptime;
2739     guint32 packet_count, octet_count;
2740
2741     /* we are a sender, create SR */
2742     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
2743     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SR, packet);
2744
2745     /* get latest stats */
2746     rtp_source_get_new_sr (own, data->ntpnstime, data->running_time,
2747         &ntptime, &rtptime, &packet_count, &octet_count);
2748     /* store stats */
2749     rtp_source_process_sr (own, data->current_time, ntptime, rtptime,
2750         packet_count, octet_count);
2751
2752     /* fill in sender report info */
2753     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
2754         ntptime, rtptime, packet_count, octet_count);
2755   } else {
2756     /* we are only receiver, create RR */
2757     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
2758     gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RR, packet);
2759     gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
2760   }
2761 }
2762
2763 /* construct a Sender or Receiver Report */
2764 static void
2765 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
2766 {
2767   RTPSession *sess = data->sess;
2768   GstRTCPPacket *packet = &data->packet;
2769   guint8 fractionlost;
2770   gint32 packetslost;
2771   guint32 exthighestseq, jitter;
2772   guint32 lsr, dlsr;
2773
2774   /* don't report for sources in future generations */
2775   if (((gint16) (source->generation - sess->generation)) > 0) {
2776     GST_DEBUG ("source %08x generation %u > %u", source->ssrc,
2777         source->generation, sess->generation);
2778     return;
2779   }
2780
2781   /* only report about other sender */
2782   if (source == data->source)
2783     goto reported;
2784
2785   if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
2786     GST_DEBUG ("max RB count reached");
2787     return;
2788   }
2789
2790   if (!RTP_SOURCE_IS_SENDER (source)) {
2791     GST_DEBUG ("source %08x not sender", source->ssrc);
2792     goto reported;
2793   }
2794
2795   GST_DEBUG ("create RB for SSRC %08x", source->ssrc);
2796
2797   /* get new stats */
2798   rtp_source_get_new_rb (source, data->current_time, &fractionlost,
2799       &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
2800
2801   /* store last generated RR packet */
2802   source->last_rr.is_valid = TRUE;
2803   source->last_rr.fractionlost = fractionlost;
2804   source->last_rr.packetslost = packetslost;
2805   source->last_rr.exthighestseq = exthighestseq;
2806   source->last_rr.jitter = jitter;
2807   source->last_rr.lsr = lsr;
2808   source->last_rr.dlsr = dlsr;
2809
2810   /* packet is not yet filled, add report block for this source. */
2811   gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
2812       exthighestseq, jitter, lsr, dlsr);
2813
2814 reported:
2815   /* source is reported, move to next generation */
2816   source->generation = sess->generation + 1;
2817
2818   /* if we reported all sources in this generation, move to next */
2819   if (--data->num_to_report == 0) {
2820     sess->generation++;
2821     GST_DEBUG ("all reported, generation now %u", sess->generation);
2822   }
2823 }
2824
2825 /* construct FIR */
2826 static void
2827 session_add_fir (const gchar * key, RTPSource * source, ReportData * data)
2828 {
2829   GstRTCPPacket *packet = &data->packet;
2830   guint16 len;
2831   guint8 *fci_data;
2832
2833   if (!source->send_fir)
2834     return;
2835
2836   len = gst_rtcp_packet_fb_get_fci_length (packet);
2837   if (!gst_rtcp_packet_fb_set_fci_length (packet, len + 2))
2838     /* exit because the packet is full, will put next request in a
2839      * further packet */
2840     return;
2841
2842   fci_data = gst_rtcp_packet_fb_get_fci (packet) + (len * 4);
2843
2844   GST_WRITE_UINT32_BE (fci_data, source->ssrc);
2845   fci_data += 4;
2846   fci_data[0] = source->current_send_fir_seqnum;
2847   fci_data[1] = fci_data[2] = fci_data[3] = 0;
2848
2849   source->send_fir = FALSE;
2850 }
2851
2852 static void
2853 session_fir (RTPSession * sess, ReportData * data)
2854 {
2855   GstRTCPBuffer *rtcp = &data->rtcpbuf;
2856   GstRTCPPacket *packet = &data->packet;
2857
2858   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
2859     return;
2860
2861   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_FIR);
2862   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
2863   gst_rtcp_packet_fb_set_media_ssrc (packet, 0);
2864
2865   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
2866       (GHFunc) session_add_fir, data);
2867
2868   if (gst_rtcp_packet_fb_get_fci_length (packet) == 0)
2869     gst_rtcp_packet_remove (packet);
2870   else
2871     data->may_suppress = FALSE;
2872 }
2873
2874 static gboolean
2875 has_pli_compare_func (gconstpointer a, gconstpointer ignored)
2876 {
2877   GstRTCPPacket packet;
2878   GstRTCPBuffer rtcp = { NULL, };
2879   gboolean ret = FALSE;
2880
2881   gst_rtcp_buffer_map ((GstBuffer *) a, GST_MAP_READ, &rtcp);
2882
2883   if (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
2884     if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
2885         gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
2886       ret = TRUE;
2887   }
2888
2889   gst_rtcp_buffer_unmap (&rtcp);
2890
2891   return ret;
2892 }
2893
2894 /* construct PLI */
2895 static void
2896 session_pli (const gchar * key, RTPSource * source, ReportData * data)
2897 {
2898   GstRTCPBuffer *rtcp = &data->rtcpbuf;
2899   GstRTCPPacket *packet = &data->packet;
2900
2901   if (!source->send_pli)
2902     return;
2903
2904   if (rtp_source_has_retained (source, has_pli_compare_func, NULL))
2905     return;
2906
2907   if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
2908     /* exit because the packet is full, will put next request in a
2909      * further packet */
2910     return;
2911
2912   gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_PLI);
2913   gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
2914   gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
2915
2916   source->send_pli = FALSE;
2917   data->may_suppress = FALSE;
2918 }
2919
2920 /* perform cleanup of sources that timed out */
2921 static void
2922 session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
2923 {
2924   gboolean remove = FALSE;
2925   gboolean byetimeout = FALSE;
2926   gboolean sendertimeout = FALSE;
2927   gboolean is_sender, is_active;
2928   RTPSession *sess = data->sess;
2929   GstClockTime interval, binterval;
2930   GstClockTime btime;
2931
2932   GST_DEBUG ("look at %08x, generation %u", source->ssrc, source->generation);
2933
2934   /* check for outdated collisions */
2935   if (source->internal) {
2936     GST_DEBUG ("Timing out collisions for %x", source->ssrc);
2937     rtp_source_timeout (source, data->current_time,
2938         /* "a relatively long time" -- RFC 3550 section 8.2 */
2939         RTP_STATS_MIN_INTERVAL * GST_SECOND * 10,
2940         data->running_time - sess->rtcp_feedback_retention_window);
2941   }
2942
2943   /* nothing else to do when without RTCP */
2944   if (data->interval == GST_CLOCK_TIME_NONE)
2945     return;
2946
2947   is_sender = RTP_SOURCE_IS_SENDER (source);
2948   is_active = RTP_SOURCE_IS_ACTIVE (source);
2949
2950   /* our own rtcp interval may have been forced low by secondary configuration,
2951    * while sender side may still operate with higher interval,
2952    * so do not just take our interval to decide on timing out sender,
2953    * but take (if data->interval <= 5 * GST_SECOND):
2954    *   interval = CLAMP (sender_interval, data->interval, 5 * GST_SECOND)
2955    * where sender_interval is difference between last 2 received RTCP reports
2956    */
2957   if (data->interval >= 5 * GST_SECOND || source->internal) {
2958     binterval = data->interval;
2959   } else {
2960     GST_LOG ("prev_rtcp %" GST_TIME_FORMAT ", last_rtcp %" GST_TIME_FORMAT,
2961         GST_TIME_ARGS (source->stats.prev_rtcptime),
2962         GST_TIME_ARGS (source->stats.last_rtcptime));
2963     /* if not received enough yet, fallback to larger default */
2964     if (source->stats.last_rtcptime > source->stats.prev_rtcptime)
2965       binterval = source->stats.last_rtcptime - source->stats.prev_rtcptime;
2966     else
2967       binterval = 5 * GST_SECOND;
2968     binterval = CLAMP (binterval, data->interval, 5 * GST_SECOND);
2969   }
2970   GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
2971       GST_TIME_ARGS (binterval));
2972
2973   if (!source->internal) {
2974     if (source->marked_bye) {
2975       /* if we received a BYE from the source, remove the source after some
2976        * time. */
2977       if (data->current_time > source->bye_time &&
2978           data->current_time - source->bye_time > sess->stats.bye_timeout) {
2979         GST_DEBUG ("removing BYE source %08x", source->ssrc);
2980         remove = TRUE;
2981         byetimeout = TRUE;
2982       }
2983     }
2984     /* sources that were inactive for more than 5 times the deterministic reporting
2985      * interval get timed out. the min timeout is 5 seconds. */
2986     /* mind old time that might pre-date last time going to PLAYING */
2987     btime = MAX (source->last_activity, sess->start_time);
2988     if (data->current_time > btime) {
2989       interval = MAX (binterval * 5, 5 * GST_SECOND);
2990       if (data->current_time - btime > interval) {
2991         GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
2992             source->ssrc, GST_TIME_ARGS (btime));
2993         remove = TRUE;
2994       }
2995     }
2996   }
2997
2998   /* senders that did not send for a long time become a receiver, this also
2999    * holds for our own sources. */
3000   if (is_sender) {
3001     /* mind old time that might pre-date last time going to PLAYING */
3002     btime = MAX (source->last_rtp_activity, sess->start_time);
3003     if (data->current_time > btime) {
3004       interval = MAX (binterval * 2, 5 * GST_SECOND);
3005       if (data->current_time - btime > interval) {
3006         if (source->internal && source->sent_bye) {
3007           /* an internal source is BYE and stopped sending RTP, remove */
3008           GST_DEBUG ("internal BYE source %08x timed out, last %"
3009               GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
3010           remove = TRUE;
3011         } else {
3012           GST_DEBUG ("sender source %08x timed out and became receiver, last %"
3013               GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
3014           sendertimeout = TRUE;
3015         }
3016       }
3017     }
3018   }
3019
3020   if (remove) {
3021     sess->total_sources--;
3022     if (is_sender) {
3023       sess->stats.sender_sources--;
3024       if (source->internal)
3025         sess->stats.internal_sender_sources--;
3026     }
3027     if (is_active)
3028       sess->stats.active_sources--;
3029
3030     if (source->internal)
3031       sess->stats.internal_sources--;
3032
3033     if (byetimeout)
3034       on_bye_timeout (sess, source);
3035     else
3036       on_timeout (sess, source);
3037   } else {
3038     if (sendertimeout) {
3039       source->is_sender = FALSE;
3040       sess->stats.sender_sources--;
3041       if (source->internal)
3042         sess->stats.internal_sender_sources--;
3043
3044       on_sender_timeout (sess, source);
3045     }
3046     /* count how many source to report in this generation */
3047     if (((gint16) (source->generation - sess->generation)) <= 0)
3048       data->num_to_report++;
3049   }
3050   source->closing = remove;
3051 }
3052
3053 static void
3054 session_sdes (RTPSession * sess, ReportData * data)
3055 {
3056   GstRTCPPacket *packet = &data->packet;
3057   const GstStructure *sdes;
3058   gint i, n_fields;
3059   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3060
3061   /* add SDES packet */
3062   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SDES, packet);
3063
3064   gst_rtcp_packet_sdes_add_item (packet, data->source->ssrc);
3065
3066   sdes = rtp_source_get_sdes_struct (data->source);
3067
3068   /* add all fields in the structure, the order is not important. */
3069   n_fields = gst_structure_n_fields (sdes);
3070   for (i = 0; i < n_fields; ++i) {
3071     const gchar *field;
3072     const gchar *value;
3073     GstRTCPSDESType type;
3074
3075     field = gst_structure_nth_field_name (sdes, i);
3076     if (field == NULL)
3077       continue;
3078     value = gst_structure_get_string (sdes, field);
3079     if (value == NULL)
3080       continue;
3081     type = gst_rtcp_sdes_name_to_type (field);
3082
3083     /* Early packets are minimal and only include the CNAME */
3084     if (data->is_early && type != GST_RTCP_SDES_CNAME)
3085       continue;
3086
3087     if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
3088       gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
3089           (const guint8 *) value);
3090     } else if (type == GST_RTCP_SDES_PRIV) {
3091       gsize prefix_len;
3092       gsize value_len;
3093       gsize data_len;
3094       guint8 data[256];
3095
3096       /* don't accept entries that are too big */
3097       prefix_len = strlen (field);
3098       if (prefix_len > 255)
3099         continue;
3100       value_len = strlen (value);
3101       if (value_len > 255)
3102         continue;
3103       data_len = 1 + prefix_len + value_len;
3104       if (data_len > 255)
3105         continue;
3106
3107       data[0] = prefix_len;
3108       memcpy (&data[1], field, prefix_len);
3109       memcpy (&data[1 + prefix_len], value, value_len);
3110
3111       gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data);
3112     }
3113   }
3114
3115   data->has_sdes = TRUE;
3116 }
3117
3118 /* schedule a BYE packet */
3119 static void
3120 make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data)
3121 {
3122   GstRTCPPacket *packet = &data->packet;
3123   GstRTCPBuffer *rtcp = &data->rtcpbuf;
3124
3125   /* add SDES */
3126   session_sdes (sess, data);
3127   /* add a BYE packet */
3128   gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_BYE, packet);
3129   gst_rtcp_packet_bye_add_ssrc (packet, source->ssrc);
3130   if (source->bye_reason)
3131     gst_rtcp_packet_bye_set_reason (packet, source->bye_reason);
3132
3133   /* we have a BYE packet now */
3134   source->sent_bye = TRUE;
3135 }
3136
3137 static gboolean
3138 is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
3139 {
3140   GstClockTime new_send_time, elapsed;
3141
3142   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
3143     data->is_early = TRUE;
3144   else
3145     data->is_early = FALSE;
3146
3147   if (data->is_early && sess->next_early_rtcp_time < current_time)
3148     goto early;
3149
3150   /* no need to check yet */
3151   if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
3152       sess->next_rtcp_check_time > current_time) {
3153     GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
3154         GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
3155         GST_TIME_ARGS (current_time));
3156     return FALSE;
3157   }
3158
3159   /* get elapsed time since we last reported */
3160   elapsed = current_time - sess->last_rtcp_send_time;
3161
3162   new_send_time = data->interval;
3163   /* perform forward reconsideration */
3164   if (new_send_time != GST_CLOCK_TIME_NONE) {
3165     new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, new_send_time);
3166
3167     GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
3168         GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time),
3169         GST_TIME_ARGS (elapsed));
3170
3171     new_send_time += sess->last_rtcp_send_time;
3172   }
3173
3174   /* check if reconsideration */
3175   if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
3176     GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
3177         GST_TIME_ARGS (new_send_time));
3178     /* store new check time */
3179     sess->next_rtcp_check_time = new_send_time;
3180     return FALSE;
3181   }
3182
3183 early:
3184
3185   new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
3186
3187   GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
3188       GST_TIME_ARGS (new_send_time));
3189
3190   sess->next_rtcp_check_time = new_send_time;
3191   if (new_send_time != GST_CLOCK_TIME_NONE) {
3192     sess->next_rtcp_check_time += current_time;
3193
3194     /* Apply the rules from RFC 4585 section 3.5.3 */
3195     if (sess->stats.min_interval != 0 && !sess->first_rtcp) {
3196       GstClockTimeDiff T_rr_current_interval =
3197           g_random_double_range (0.5, 1.5) * sess->stats.min_interval;
3198
3199       /* This will caused the RTCP to be suppressed if no FB packets are added */
3200       if (sess->last_rtcp_send_time + T_rr_current_interval >
3201           sess->next_rtcp_check_time) {
3202         GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT
3203             " last: %" GST_TIME_FORMAT
3204             " + T_rr_current_interval: %" GST_TIME_FORMAT
3205             " >  sess->next_rtcp_check_time: %" GST_TIME_FORMAT,
3206             GST_TIME_ARGS (sess->stats.min_interval),
3207             GST_TIME_ARGS (sess->last_rtcp_send_time),
3208             GST_TIME_ARGS (T_rr_current_interval),
3209             GST_TIME_ARGS (sess->next_rtcp_check_time));
3210         data->may_suppress = TRUE;
3211       }
3212     }
3213   }
3214
3215   return TRUE;
3216 }
3217
3218 static void
3219 clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table)
3220 {
3221   g_hash_table_insert (hash_table, key, g_object_ref (source));
3222 }
3223
3224 static gboolean
3225 remove_closing_sources (const gchar * key, RTPSource * source,
3226     ReportData * data)
3227 {
3228   if (source->closing)
3229     return TRUE;
3230
3231   if (source->send_fir)
3232     data->have_fir = TRUE;
3233   if (source->send_pli)
3234     data->have_pli = TRUE;
3235
3236   return FALSE;
3237 }
3238
3239 static void
3240 generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
3241 {
3242   RTPSession *sess = data->sess;
3243   gboolean is_bye = FALSE;
3244   ReportOutput *output;
3245
3246   /* only generate RTCP for active internal sources */
3247   if (!source->internal || source->sent_bye)
3248     return;
3249
3250   data->source = source;
3251
3252   /* open packet */
3253   session_start_rtcp (sess, data);
3254
3255   if (source->marked_bye) {
3256     /* send BYE */
3257     make_source_bye (sess, source, data);
3258     is_bye = TRUE;
3259   } else if (!data->is_early) {
3260     /* loop over all known sources and add report blocks. If we are early, we
3261      * just make a minimal RTCP packet and skip this step */
3262     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3263         (GHFunc) session_report_blocks, data);
3264   }
3265   if (!data->has_sdes)
3266     session_sdes (sess, data);
3267
3268   if (data->have_fir)
3269     session_fir (sess, data);
3270
3271   if (data->have_pli)
3272     g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3273         (GHFunc) session_pli, data);
3274
3275   gst_rtcp_buffer_unmap (&data->rtcpbuf);
3276
3277   output = g_slice_new (ReportOutput);
3278   output->source = g_object_ref (source);
3279   output->is_bye = is_bye;
3280   output->buffer = data->rtcp;
3281   /* queue the RTCP packet to push later */
3282   g_queue_push_tail (&data->output, output);
3283 }
3284
3285 /**
3286  * rtp_session_on_timeout:
3287  * @sess: an #RTPSession
3288  * @current_time: the current system time
3289  * @ntpnstime: the current NTP time in nanoseconds
3290  * @running_time: the current running_time of the pipeline
3291  *
3292  * Perform maintenance actions after the timeout obtained with
3293  * rtp_session_next_timeout() expired.
3294  *
3295  * This function will perform timeouts of receivers and senders, send a BYE
3296  * packet or generate RTCP packets with current session stats.
3297  *
3298  * This function can call the #RTPSessionSendRTCP callback, possibly multiple
3299  * times, for each packet that should be processed.
3300  *
3301  * Returns: a #GstFlowReturn.
3302  */
3303 GstFlowReturn
3304 rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
3305     guint64 ntpnstime, GstClockTime running_time)
3306 {
3307   GstFlowReturn result = GST_FLOW_OK;
3308   ReportData data = { GST_RTCP_BUFFER_INIT };
3309   GHashTable *table_copy;
3310   ReportOutput *output;
3311
3312   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
3313
3314   GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT
3315       ", running-time %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
3316       GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (running_time));
3317
3318   data.sess = sess;
3319   data.current_time = current_time;
3320   data.ntpnstime = ntpnstime;
3321   data.running_time = running_time;
3322   data.num_to_report = 0;
3323   data.may_suppress = FALSE;
3324   g_queue_init (&data.output);
3325
3326   RTP_SESSION_LOCK (sess);
3327   /* get a new interval, we need this for various cleanups etc */
3328   data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
3329
3330   /* we need an internal source now */
3331   if (sess->stats.internal_sources == 0) {
3332     RTPSource *source;
3333     gboolean created;
3334
3335     source = obtain_internal_source (sess, sess->suggested_ssrc, &created);
3336     g_object_unref (source);
3337   }
3338
3339   /* Make a local copy of the hashtable. We need to do this because the
3340    * cleanup stage below releases the session lock. */
3341   table_copy = g_hash_table_new_full (NULL, NULL, NULL,
3342       (GDestroyNotify) g_object_unref);
3343   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3344       (GHFunc) clone_ssrcs_hashtable, table_copy);
3345
3346   /* Clean up the session, mark the source for removing, this might release the
3347    * session lock. */
3348   g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data);
3349   g_hash_table_destroy (table_copy);
3350
3351   /* Now remove the marked sources */
3352   g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
3353       (GHRFunc) remove_closing_sources, NULL);
3354
3355   /* see if we need to generate SR or RR packets */
3356   if (!is_rtcp_time (sess, current_time, &data))
3357     goto done;
3358
3359   GST_DEBUG ("doing RTCP generation %u for %u sources", sess->generation,
3360       data.num_to_report);
3361
3362   /* generate RTCP for all internal sources */
3363   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
3364       (GHFunc) generate_rtcp, &data);
3365
3366   /* we keep track of the last report time in order to timeout inactive
3367    * receivers or senders */
3368   if (!data.is_early && !data.may_suppress)
3369     sess->last_rtcp_send_time = data.current_time;
3370   sess->first_rtcp = FALSE;
3371   sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
3372
3373 done:
3374   RTP_SESSION_UNLOCK (sess);
3375
3376   /* push out the RTCP packets */
3377   while ((output = g_queue_pop_head (&data.output))) {
3378     gboolean do_not_suppress;
3379     GstBuffer *buffer = output->buffer;
3380     RTPSource *source = output->source;
3381
3382     /* Give the user a change to add its own packet */
3383     g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
3384         buffer, data.is_early, &do_not_suppress);
3385
3386     if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) {
3387       guint packet_size;
3388
3389       packet_size = gst_buffer_get_size (buffer) + sess->header_len;
3390
3391       UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
3392       GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
3393           sess->stats.avg_rtcp_packet_size, packet_size);
3394       result =
3395           sess->callbacks.send_rtcp (sess, source, buffer, output->is_bye,
3396           sess->send_rtcp_user_data);
3397     } else {
3398       GST_DEBUG ("freeing packet callback: %p"
3399           " do_not_suppress: %d may_suppress: %d",
3400           sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress);
3401       gst_buffer_unref (buffer);
3402     }
3403     g_object_unref (source);
3404     g_slice_free (ReportOutput, output);
3405   }
3406   return result;
3407 }
3408
3409 /**
3410  * rtp_session_request_early_rtcp:
3411  * @sess: an #RTPSession
3412  * @current_time: the current system time
3413  * @max_delay: maximum delay
3414  *
3415  * Request transmission of early RTCP
3416  */
3417 void
3418 rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
3419     GstClockTimeDiff max_delay)
3420 {
3421   GstClockTime T_dither_max;
3422
3423   /* Implements the algorithm described in RFC 4585 section 3.5.2 */
3424
3425   RTP_SESSION_LOCK (sess);
3426
3427   /* Check if already requested */
3428   /*  RFC 4585 section 3.5.2 step 2 */
3429   if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
3430     goto dont_send;
3431
3432   if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time))
3433     goto dont_send;
3434
3435   /* Ignore the request a scheduled packet will be in time anyway */
3436   if (current_time + max_delay > sess->next_rtcp_check_time)
3437     goto dont_send;
3438
3439   /*  RFC 4585 section 3.5.2 step 2b */
3440   /* If the total sources is <=2, then there is only us and one peer */
3441   if (sess->total_sources <= 2) {
3442     T_dither_max = 0;
3443   } else {
3444     /* Divide by 2 because l = 0.5 */
3445     T_dither_max = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
3446     T_dither_max /= 2;
3447   }
3448
3449   /*  RFC 4585 section 3.5.2 step 3 */
3450   if (current_time + T_dither_max > sess->next_rtcp_check_time)
3451     goto dont_send;
3452
3453   /*  RFC 4585 section 3.5.2 step 4
3454    * Don't send if allow_early is FALSE, but not if we are in
3455    * immediate mode, meaning we are part of a group of at most the
3456    * application-specific threshold.
3457    */
3458   if (sess->total_sources > sess->rtcp_immediate_feedback_threshold &&
3459       sess->allow_early == FALSE)
3460     goto dont_send;
3461
3462   if (T_dither_max) {
3463     /* Schedule an early transmission later */
3464     sess->next_early_rtcp_time = g_random_double () * T_dither_max +
3465         current_time;
3466   } else {
3467     /* If no dithering, schedule it for NOW */
3468     sess->next_early_rtcp_time = current_time;
3469   }
3470
3471   RTP_SESSION_UNLOCK (sess);
3472
3473   /* notify app of need to send packet early
3474    * and therefore of timeout change */
3475   if (sess->callbacks.reconsider)
3476     sess->callbacks.reconsider (sess, sess->reconsider_user_data);
3477
3478   return;
3479
3480 dont_send:
3481
3482   RTP_SESSION_UNLOCK (sess);
3483 }
3484
3485 gboolean
3486 rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc, GstClockTime now,
3487     gboolean fir, gint count)
3488 {
3489   RTPSource *src = find_source (sess, ssrc);
3490
3491   if (!src)
3492     return FALSE;
3493
3494   if (fir) {
3495     src->send_pli = FALSE;
3496     src->send_fir = TRUE;
3497
3498     if (count == -1 || count != src->last_fir_count)
3499       src->current_send_fir_seqnum++;
3500     src->last_fir_count = count;
3501   } else if (!src->send_fir) {
3502     src->send_pli = TRUE;
3503   }
3504
3505   rtp_session_request_early_rtcp (sess, now, 200 * GST_MSECOND);
3506
3507   return TRUE;
3508 }
3509
3510 static void
3511 rtp_session_send_rtcp (RTPSession * sess, GstClockTimeDiff max_delay)
3512 {
3513   GstClockTime now;
3514
3515   if (!sess->callbacks.send_rtcp)
3516     return;
3517
3518   now = sess->callbacks.request_time (sess, sess->request_time_user_data);
3519
3520   rtp_session_request_early_rtcp (sess, now, max_delay);
3521 }