rtpjitterbuffer: Don't warn for duplicate packets
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / rtpjitterbuffer.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 #include <string.h>
20 #include <stdlib.h>
21
22 #include <gst/rtp/gstrtpbuffer.h>
23 #include <gst/rtp/gstrtcpbuffer.h>
24
25 #include "rtpjitterbuffer.h"
26
27 GST_DEBUG_CATEGORY_STATIC (rtp_jitter_buffer_debug);
28 #define GST_CAT_DEFAULT rtp_jitter_buffer_debug
29
30 #define MAX_WINDOW      RTP_JITTER_BUFFER_MAX_WINDOW
31 #define MAX_TIME        (2 * GST_SECOND)
32
33 /* signals and args */
34 enum
35 {
36   LAST_SIGNAL
37 };
38
39 enum
40 {
41   PROP_0
42 };
43
44 /* GObject vmethods */
45 static void rtp_jitter_buffer_finalize (GObject * object);
46
47 GType
48 rtp_jitter_buffer_mode_get_type (void)
49 {
50   static GType jitter_buffer_mode_type = 0;
51   static const GEnumValue jitter_buffer_modes[] = {
52     {RTP_JITTER_BUFFER_MODE_NONE, "Only use RTP timestamps", "none"},
53     {RTP_JITTER_BUFFER_MODE_SLAVE, "Slave receiver to sender clock", "slave"},
54     {RTP_JITTER_BUFFER_MODE_BUFFER, "Do low/high watermark buffering",
55         "buffer"},
56     {RTP_JITTER_BUFFER_MODE_SYNCED, "Synchronized sender and receiver clocks",
57         "synced"},
58     {0, NULL, NULL},
59   };
60
61   if (!jitter_buffer_mode_type) {
62     jitter_buffer_mode_type =
63         g_enum_register_static ("RTPJitterBufferMode", jitter_buffer_modes);
64   }
65   return jitter_buffer_mode_type;
66 }
67
68 /* static guint rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 }; */
69
70 G_DEFINE_TYPE (RTPJitterBuffer, rtp_jitter_buffer, G_TYPE_OBJECT);
71
72 static void
73 rtp_jitter_buffer_class_init (RTPJitterBufferClass * klass)
74 {
75   GObjectClass *gobject_class;
76
77   gobject_class = (GObjectClass *) klass;
78
79   gobject_class->finalize = rtp_jitter_buffer_finalize;
80
81   GST_DEBUG_CATEGORY_INIT (rtp_jitter_buffer_debug, "rtpjitterbuffer", 0,
82       "RTP Jitter Buffer");
83 }
84
85 static void
86 rtp_jitter_buffer_init (RTPJitterBuffer * jbuf)
87 {
88   g_mutex_init (&jbuf->clock_lock);
89
90   jbuf->packets = g_queue_new ();
91   jbuf->mode = RTP_JITTER_BUFFER_MODE_SLAVE;
92
93   rtp_jitter_buffer_reset_skew (jbuf);
94 }
95
96 static void
97 rtp_jitter_buffer_finalize (GObject * object)
98 {
99   RTPJitterBuffer *jbuf;
100
101   jbuf = RTP_JITTER_BUFFER_CAST (object);
102
103   if (jbuf->media_clock_synced_id)
104     g_signal_handler_disconnect (jbuf->media_clock,
105         jbuf->media_clock_synced_id);
106   if (jbuf->media_clock)
107     gst_object_unref (jbuf->media_clock);
108
109   if (jbuf->pipeline_clock)
110     gst_object_unref (jbuf->pipeline_clock);
111
112   g_queue_free (jbuf->packets);
113
114   g_mutex_clear (&jbuf->clock_lock);
115
116   G_OBJECT_CLASS (rtp_jitter_buffer_parent_class)->finalize (object);
117 }
118
119 /**
120  * rtp_jitter_buffer_new:
121  *
122  * Create an #RTPJitterBuffer.
123  *
124  * Returns: a new #RTPJitterBuffer. Use g_object_unref() after usage.
125  */
126 RTPJitterBuffer *
127 rtp_jitter_buffer_new (void)
128 {
129   RTPJitterBuffer *jbuf;
130
131   jbuf = g_object_new (RTP_TYPE_JITTER_BUFFER, NULL);
132
133   return jbuf;
134 }
135
136 /**
137  * rtp_jitter_buffer_get_mode:
138  * @jbuf: an #RTPJitterBuffer
139  *
140  * Get the current jitterbuffer mode.
141  *
142  * Returns: the current jitterbuffer mode.
143  */
144 RTPJitterBufferMode
145 rtp_jitter_buffer_get_mode (RTPJitterBuffer * jbuf)
146 {
147   return jbuf->mode;
148 }
149
150 /**
151  * rtp_jitter_buffer_set_mode:
152  * @jbuf: an #RTPJitterBuffer
153  * @mode: a #RTPJitterBufferMode
154  *
155  * Set the buffering and clock slaving algorithm used in the @jbuf.
156  */
157 void
158 rtp_jitter_buffer_set_mode (RTPJitterBuffer * jbuf, RTPJitterBufferMode mode)
159 {
160   jbuf->mode = mode;
161 }
162
163 GstClockTime
164 rtp_jitter_buffer_get_delay (RTPJitterBuffer * jbuf)
165 {
166   return jbuf->delay;
167 }
168
169 void
170 rtp_jitter_buffer_set_delay (RTPJitterBuffer * jbuf, GstClockTime delay)
171 {
172   jbuf->delay = delay;
173   jbuf->low_level = (delay * 15) / 100;
174   /* the high level is at 90% in order to release packets before we fill up the
175    * buffer up to the latency */
176   jbuf->high_level = (delay * 90) / 100;
177
178   GST_DEBUG ("delay %" GST_TIME_FORMAT ", min %" GST_TIME_FORMAT ", max %"
179       GST_TIME_FORMAT, GST_TIME_ARGS (jbuf->delay),
180       GST_TIME_ARGS (jbuf->low_level), GST_TIME_ARGS (jbuf->high_level));
181 }
182
183 /**
184  * rtp_jitter_buffer_set_clock_rate:
185  * @jbuf: an #RTPJitterBuffer
186  * @clock_rate: the new clock rate
187  *
188  * Set the clock rate in the jitterbuffer.
189  */
190 void
191 rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer * jbuf, guint32 clock_rate)
192 {
193   if (jbuf->clock_rate != clock_rate) {
194     GST_DEBUG ("Clock rate changed from %" G_GUINT32_FORMAT " to %"
195         G_GUINT32_FORMAT, jbuf->clock_rate, clock_rate);
196     jbuf->clock_rate = clock_rate;
197     rtp_jitter_buffer_reset_skew (jbuf);
198   }
199 }
200
201 /**
202  * rtp_jitter_buffer_get_clock_rate:
203  * @jbuf: an #RTPJitterBuffer
204  *
205  * Get the currently configure clock rate in @jbuf.
206  *
207  * Returns: the current clock-rate
208  */
209 guint32
210 rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer * jbuf)
211 {
212   return jbuf->clock_rate;
213 }
214
215 static void
216 media_clock_synced_cb (GstClock * clock, gboolean synced,
217     RTPJitterBuffer * jbuf)
218 {
219   GstClockTime internal, external;
220
221   g_mutex_lock (&jbuf->clock_lock);
222   if (jbuf->pipeline_clock) {
223     internal = gst_clock_get_internal_time (jbuf->media_clock);
224     external = gst_clock_get_time (jbuf->pipeline_clock);
225
226     gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1);
227   }
228   g_mutex_unlock (&jbuf->clock_lock);
229 }
230
231 /**
232  * rtp_jitter_buffer_set_media_clock:
233  * @jbuf: an #RTPJitterBuffer
234  * @clock: (transfer full): media #GstClock
235  * @clock_offset: RTP time at clock epoch or -1
236  *
237  * Sets the media clock for the media and the clock offset
238  *
239  */
240 void
241 rtp_jitter_buffer_set_media_clock (RTPJitterBuffer * jbuf, GstClock * clock,
242     guint64 clock_offset)
243 {
244   g_mutex_lock (&jbuf->clock_lock);
245   if (jbuf->media_clock) {
246     if (jbuf->media_clock_synced_id)
247       g_signal_handler_disconnect (jbuf->media_clock,
248           jbuf->media_clock_synced_id);
249     jbuf->media_clock_synced_id = 0;
250     gst_object_unref (jbuf->media_clock);
251   }
252   jbuf->media_clock = clock;
253   jbuf->media_clock_offset = clock_offset;
254
255   if (jbuf->pipeline_clock && jbuf->media_clock &&
256       jbuf->pipeline_clock != jbuf->media_clock) {
257     jbuf->media_clock_synced_id =
258         g_signal_connect (jbuf->media_clock, "synced",
259         G_CALLBACK (media_clock_synced_cb), jbuf);
260     if (gst_clock_is_synced (jbuf->media_clock)) {
261       GstClockTime internal, external;
262
263       internal = gst_clock_get_internal_time (jbuf->media_clock);
264       external = gst_clock_get_time (jbuf->pipeline_clock);
265
266       gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1);
267     }
268
269     gst_clock_set_master (jbuf->media_clock, jbuf->pipeline_clock);
270   }
271   g_mutex_unlock (&jbuf->clock_lock);
272 }
273
274 /**
275  * rtp_jitter_buffer_set_pipeline_clock:
276  * @jbuf: an #RTPJitterBuffer
277  * @clock: pipeline #GstClock
278  *
279  * Sets the pipeline clock
280  *
281  */
282 void
283 rtp_jitter_buffer_set_pipeline_clock (RTPJitterBuffer * jbuf, GstClock * clock)
284 {
285   g_mutex_lock (&jbuf->clock_lock);
286   if (jbuf->pipeline_clock)
287     gst_object_unref (jbuf->pipeline_clock);
288   jbuf->pipeline_clock = clock ? gst_object_ref (clock) : NULL;
289
290   if (jbuf->pipeline_clock && jbuf->media_clock &&
291       jbuf->pipeline_clock != jbuf->media_clock) {
292     if (gst_clock_is_synced (jbuf->media_clock)) {
293       GstClockTime internal, external;
294
295       internal = gst_clock_get_internal_time (jbuf->media_clock);
296       external = gst_clock_get_time (jbuf->pipeline_clock);
297
298       gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1);
299     }
300
301     gst_clock_set_master (jbuf->media_clock, jbuf->pipeline_clock);
302   }
303   g_mutex_unlock (&jbuf->clock_lock);
304 }
305
306 gboolean
307 rtp_jitter_buffer_get_rfc7273_sync (RTPJitterBuffer * jbuf)
308 {
309   return jbuf->rfc7273_sync;
310 }
311
312 void
313 rtp_jitter_buffer_set_rfc7273_sync (RTPJitterBuffer * jbuf,
314     gboolean rfc7273_sync)
315 {
316   jbuf->rfc7273_sync = rfc7273_sync;
317 }
318
319 /**
320  * rtp_jitter_buffer_reset_skew:
321  * @jbuf: an #RTPJitterBuffer
322  *
323  * Reset the skew calculations in @jbuf.
324  */
325 void
326 rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf)
327 {
328   jbuf->base_time = -1;
329   jbuf->base_rtptime = -1;
330   jbuf->base_extrtp = -1;
331   jbuf->media_clock_base_time = -1;
332   jbuf->ext_rtptime = -1;
333   jbuf->last_rtptime = -1;
334   jbuf->window_pos = 0;
335   jbuf->window_filling = TRUE;
336   jbuf->window_min = 0;
337   jbuf->skew = 0;
338   jbuf->prev_send_diff = -1;
339   jbuf->prev_out_time = -1;
340   jbuf->need_resync = TRUE;
341
342   GST_DEBUG ("reset skew correction");
343 }
344
345 /**
346  * rtp_jitter_buffer_disable_buffering:
347  * @jbuf: an #RTPJitterBuffer
348  * @disabled: the new state
349  *
350  * Enable or disable buffering on @jbuf.
351  */
352 void
353 rtp_jitter_buffer_disable_buffering (RTPJitterBuffer * jbuf, gboolean disabled)
354 {
355   jbuf->buffering_disabled = disabled;
356 }
357
358 static void
359 rtp_jitter_buffer_resync (RTPJitterBuffer * jbuf, GstClockTime time,
360     GstClockTime gstrtptime, guint64 ext_rtptime, gboolean reset_skew)
361 {
362   jbuf->base_time = time;
363   jbuf->media_clock_base_time = -1;
364   jbuf->base_rtptime = gstrtptime;
365   jbuf->base_extrtp = ext_rtptime;
366   jbuf->prev_out_time = -1;
367   jbuf->prev_send_diff = -1;
368   if (reset_skew) {
369     jbuf->window_filling = TRUE;
370     jbuf->window_pos = 0;
371     jbuf->window_min = 0;
372     jbuf->window_size = 0;
373     jbuf->skew = 0;
374   }
375   jbuf->need_resync = FALSE;
376 }
377
378 static guint64
379 get_buffer_level (RTPJitterBuffer * jbuf)
380 {
381   RTPJitterBufferItem *high_buf = NULL, *low_buf = NULL;
382   guint64 level;
383
384   /* first buffer with timestamp */
385   high_buf = (RTPJitterBufferItem *) g_queue_peek_tail_link (jbuf->packets);
386   while (high_buf) {
387     if (high_buf->dts != -1 || high_buf->pts != -1)
388       break;
389
390     high_buf = (RTPJitterBufferItem *) g_list_previous (high_buf);
391   }
392
393   low_buf = (RTPJitterBufferItem *) g_queue_peek_head_link (jbuf->packets);
394   while (low_buf) {
395     if (low_buf->dts != -1 || low_buf->pts != -1)
396       break;
397
398     low_buf = (RTPJitterBufferItem *) g_list_next (low_buf);
399   }
400
401   if (!high_buf || !low_buf || high_buf == low_buf) {
402     level = 0;
403   } else {
404     guint64 high_ts, low_ts;
405
406     high_ts = high_buf->dts != -1 ? high_buf->dts : high_buf->pts;
407     low_ts = low_buf->dts != -1 ? low_buf->dts : low_buf->pts;
408
409     if (high_ts > low_ts)
410       level = high_ts - low_ts;
411     else
412       level = 0;
413
414     GST_LOG_OBJECT (jbuf,
415         "low %" GST_TIME_FORMAT " high %" GST_TIME_FORMAT " level %"
416         G_GUINT64_FORMAT, GST_TIME_ARGS (low_ts), GST_TIME_ARGS (high_ts),
417         level);
418   }
419   return level;
420 }
421
422 static void
423 update_buffer_level (RTPJitterBuffer * jbuf, gint * percent)
424 {
425   gboolean post = FALSE;
426   guint64 level;
427
428   level = get_buffer_level (jbuf);
429   GST_DEBUG ("buffer level %" GST_TIME_FORMAT, GST_TIME_ARGS (level));
430
431   if (jbuf->buffering_disabled) {
432     GST_DEBUG ("buffering is disabled");
433     level = jbuf->high_level;
434   }
435
436   if (jbuf->buffering) {
437     post = TRUE;
438     if (level >= jbuf->high_level) {
439       GST_DEBUG ("buffering finished");
440       jbuf->buffering = FALSE;
441     }
442   } else {
443     if (level < jbuf->low_level) {
444       GST_DEBUG ("buffering started");
445       jbuf->buffering = TRUE;
446       post = TRUE;
447     }
448   }
449   if (post) {
450     gint perc;
451
452     if (jbuf->buffering && (jbuf->high_level != 0)) {
453       perc = (level * 100 / jbuf->high_level);
454       perc = MIN (perc, 100);
455     } else {
456       perc = 100;
457     }
458
459     if (percent)
460       *percent = perc;
461
462     GST_DEBUG ("buffering %d", perc);
463   }
464 }
465
466 /* For the clock skew we use a windowed low point averaging algorithm as can be
467  * found in Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation
468  * over Network Delays":
469  * http://www.grame.fr/Ressources/pub/TR-050601.pdf
470  * http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546
471  *
472  * The idea is that the jitter is composed of:
473  *
474  *  J = N + n
475  *
476  *   N   : a constant network delay.
477  *   n   : random added noise. The noise is concentrated around 0
478  *
479  * In the receiver we can track the elapsed time at the sender with:
480  *
481  *  send_diff(i) = (Tsi - Ts0);
482  *
483  *   Tsi : The time at the sender at packet i
484  *   Ts0 : The time at the sender at the first packet
485  *
486  * This is the difference between the RTP timestamp in the first received packet
487  * and the current packet.
488  *
489  * At the receiver we have to deal with the jitter introduced by the network.
490  *
491  *  recv_diff(i) = (Tri - Tr0)
492  *
493  *   Tri : The time at the receiver at packet i
494  *   Tr0 : The time at the receiver at the first packet
495  *
496  * Both of these values contain a jitter Ji, a jitter for packet i, so we can
497  * write:
498  *
499  *  recv_diff(i) = (Cri + D + ni) - (Cr0 + D + n0))
500  *
501  *    Cri    : The time of the clock at the receiver for packet i
502  *    D + ni : The jitter when receiving packet i
503  *
504  * We see that the network delay is irrelevant here as we can elliminate D:
505  *
506  *  recv_diff(i) = (Cri + ni) - (Cr0 + n0))
507  *
508  * The drift is now expressed as:
509  *
510  *  Drift(i) = recv_diff(i) - send_diff(i);
511  *
512  * We now keep the W latest values of Drift and find the minimum (this is the
513  * one with the lowest network jitter and thus the one which is least affected
514  * by it). We average this lowest value to smooth out the resulting network skew.
515  *
516  * Both the window and the weighting used for averaging influence the accuracy
517  * of the drift estimation. Finding the correct parameters turns out to be a
518  * compromise between accuracy and inertia.
519  *
520  * We use a 2 second window or up to 512 data points, which is statistically big
521  * enough to catch spikes (FIXME, detect spikes).
522  * We also use a rather large weighting factor (125) to smoothly adapt. During
523  * startup, when filling the window, we use a parabolic weighting factor, the
524  * more the window is filled, the faster we move to the detected possible skew.
525  *
526  * Returns: @time adjusted with the clock skew.
527  */
528 static GstClockTime
529 calculate_skew (RTPJitterBuffer * jbuf, guint64 ext_rtptime,
530     GstClockTime gstrtptime, GstClockTime time)
531 {
532   guint64 send_diff, recv_diff;
533   gint64 delta;
534   gint64 old;
535   gint pos, i;
536   GstClockTime out_time;
537   guint64 slope;
538
539   /* elapsed time at sender */
540   send_diff = gstrtptime - jbuf->base_rtptime;
541
542   /* we don't have an arrival timestamp so we can't do skew detection. we
543    * should still apply a timestamp based on RTP timestamp and base_time */
544   if (time == -1 || jbuf->base_time == -1)
545     goto no_skew;
546
547   /* elapsed time at receiver, includes the jitter */
548   recv_diff = time - jbuf->base_time;
549
550   /* measure the diff */
551   delta = ((gint64) recv_diff) - ((gint64) send_diff);
552
553   /* measure the slope, this gives a rought estimate between the sender speed
554    * and the receiver speed. This should be approximately 8, higher values
555    * indicate a burst (especially when the connection starts) */
556   if (recv_diff > 0)
557     slope = (send_diff * 8) / recv_diff;
558   else
559     slope = 8;
560
561   GST_DEBUG ("time %" GST_TIME_FORMAT ", base %" GST_TIME_FORMAT ", recv_diff %"
562       GST_TIME_FORMAT ", slope %" G_GUINT64_FORMAT, GST_TIME_ARGS (time),
563       GST_TIME_ARGS (jbuf->base_time), GST_TIME_ARGS (recv_diff), slope);
564
565   /* if the difference between the sender timeline and the receiver timeline
566    * changed too quickly we have to resync because the server likely restarted
567    * its timestamps. */
568   if (ABS (delta - jbuf->skew) > GST_SECOND) {
569     GST_WARNING ("delta - skew: %" GST_TIME_FORMAT " too big, reset skew",
570         GST_TIME_ARGS (ABS (delta - jbuf->skew)));
571     rtp_jitter_buffer_resync (jbuf, time, gstrtptime, ext_rtptime, TRUE);
572     send_diff = 0;
573     delta = 0;
574   }
575
576   pos = jbuf->window_pos;
577
578   if (G_UNLIKELY (jbuf->window_filling)) {
579     /* we are filling the window */
580     GST_DEBUG ("filling %d, delta %" G_GINT64_FORMAT, pos, delta);
581     jbuf->window[pos++] = delta;
582     /* calc the min delta we observed */
583     if (G_UNLIKELY (pos == 1 || delta < jbuf->window_min))
584       jbuf->window_min = delta;
585
586     if (G_UNLIKELY (send_diff >= MAX_TIME || pos >= MAX_WINDOW)) {
587       jbuf->window_size = pos;
588
589       /* window filled */
590       GST_DEBUG ("min %" G_GINT64_FORMAT, jbuf->window_min);
591
592       /* the skew is now the min */
593       jbuf->skew = jbuf->window_min;
594       jbuf->window_filling = FALSE;
595     } else {
596       gint perc_time, perc_window, perc;
597
598       /* figure out how much we filled the window, this depends on the amount of
599        * time we have or the max number of points we keep. */
600       perc_time = send_diff * 100 / MAX_TIME;
601       perc_window = pos * 100 / MAX_WINDOW;
602       perc = MAX (perc_time, perc_window);
603
604       /* make a parabolic function, the closer we get to the MAX, the more value
605        * we give to the scaling factor of the new value */
606       perc = perc * perc;
607
608       /* quickly go to the min value when we are filling up, slowly when we are
609        * just starting because we're not sure it's a good value yet. */
610       jbuf->skew =
611           (perc * jbuf->window_min + ((10000 - perc) * jbuf->skew)) / 10000;
612       jbuf->window_size = pos + 1;
613     }
614   } else {
615     /* pick old value and store new value. We keep the previous value in order
616      * to quickly check if the min of the window changed */
617     old = jbuf->window[pos];
618     jbuf->window[pos++] = delta;
619
620     if (G_UNLIKELY (delta <= jbuf->window_min)) {
621       /* if the new value we inserted is smaller or equal to the current min,
622        * it becomes the new min */
623       jbuf->window_min = delta;
624     } else if (G_UNLIKELY (old == jbuf->window_min)) {
625       gint64 min = G_MAXINT64;
626
627       /* if we removed the old min, we have to find a new min */
628       for (i = 0; i < jbuf->window_size; i++) {
629         /* we found another value equal to the old min, we can stop searching now */
630         if (jbuf->window[i] == old) {
631           min = old;
632           break;
633         }
634         if (jbuf->window[i] < min)
635           min = jbuf->window[i];
636       }
637       jbuf->window_min = min;
638     }
639     /* average the min values */
640     jbuf->skew = (jbuf->window_min + (124 * jbuf->skew)) / 125;
641     GST_DEBUG ("delta %" G_GINT64_FORMAT ", new min: %" G_GINT64_FORMAT,
642         delta, jbuf->window_min);
643   }
644   /* wrap around in the window */
645   if (G_UNLIKELY (pos >= jbuf->window_size))
646     pos = 0;
647   jbuf->window_pos = pos;
648
649 no_skew:
650   /* the output time is defined as the base timestamp plus the RTP time
651    * adjusted for the clock skew .*/
652   if (jbuf->base_time != -1) {
653     out_time = jbuf->base_time + send_diff;
654     /* skew can be negative and we don't want to make invalid timestamps */
655     if (jbuf->skew < 0 && out_time < -jbuf->skew) {
656       out_time = 0;
657     } else {
658       out_time += jbuf->skew;
659     }
660   } else
661     out_time = -1;
662
663   GST_DEBUG ("skew %" G_GINT64_FORMAT ", out %" GST_TIME_FORMAT,
664       jbuf->skew, GST_TIME_ARGS (out_time));
665
666   return out_time;
667 }
668
669 static void
670 queue_do_insert (RTPJitterBuffer * jbuf, GList * list, GList * item)
671 {
672   GQueue *queue = jbuf->packets;
673
674   /* It's more likely that the packet was inserted at the tail of the queue */
675   if (G_LIKELY (list)) {
676     item->prev = list;
677     item->next = list->next;
678     list->next = item;
679   } else {
680     item->prev = NULL;
681     item->next = queue->head;
682     queue->head = item;
683   }
684   if (item->next)
685     item->next->prev = item;
686   else
687     queue->tail = item;
688   queue->length++;
689 }
690
691 /**
692  * rtp_jitter_buffer_insert:
693  * @jbuf: an #RTPJitterBuffer
694  * @item: an #RTPJitterBufferItem to insert
695  * @head: TRUE when the head element changed.
696  * @percent: the buffering percent after insertion
697  * @base_time: base time of the pipeline
698  *
699  * Inserts @item into the packet queue of @jbuf. The sequence number of the
700  * packet will be used to sort the packets. This function takes ownerhip of
701  * @buf when the function returns %TRUE.
702  *
703  * When @head is %TRUE, the new packet was added at the head of the queue and
704  * will be available with the next call to rtp_jitter_buffer_pop() and
705  * rtp_jitter_buffer_peek().
706  *
707  * Returns: %FALSE if a packet with the same number already existed.
708  */
709 gboolean
710 rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item,
711     gboolean * head, gint * percent, GstClockTime base_time)
712 {
713   GList *list, *event = NULL;
714   guint32 rtptime;
715   guint64 ext_rtptime;
716   guint16 seqnum;
717   GstClockTime gstrtptime, dts;
718   GstClock *media_clock, *pipeline_clock;
719   guint64 media_clock_offset;
720   gboolean rfc7273_mode;
721
722   g_return_val_if_fail (jbuf != NULL, FALSE);
723   g_return_val_if_fail (item != NULL, FALSE);
724
725   list = jbuf->packets->tail;
726
727   /* no seqnum, simply append then */
728   if (item->seqnum == -1)
729     goto append;
730
731   seqnum = item->seqnum;
732
733   /* loop the list to skip strictly larger seqnum buffers */
734   for (; list; list = g_list_previous (list)) {
735     guint16 qseq;
736     gint gap;
737     RTPJitterBufferItem *qitem = (RTPJitterBufferItem *) list;
738
739     if (qitem->seqnum == -1) {
740       /* keep a pointer to the first consecutive event if not already
741        * set. we will insert the packet after the event if we can't find
742        * a packet with lower sequence number before the event. */
743       if (event == NULL)
744         event = list;
745       continue;
746     }
747
748     qseq = qitem->seqnum;
749
750     /* compare the new seqnum to the one in the buffer */
751     gap = gst_rtp_buffer_compare_seqnum (seqnum, qseq);
752
753     /* we hit a packet with the same seqnum, notify a duplicate */
754     if (G_UNLIKELY (gap == 0))
755       goto duplicate;
756
757     /* seqnum > qseq, we can stop looking */
758     if (G_LIKELY (gap < 0))
759       break;
760
761     /* if we've found a packet with greater sequence number, cleanup the
762      * event pointer as the packet will be inserted before the event */
763     event = NULL;
764   }
765
766   /* if event is set it means that packets before the event had smaller
767    * sequence number, so we will insert our packet after the event */
768   if (event)
769     list = event;
770
771   dts = item->dts;
772   if (item->rtptime == -1)
773     goto append;
774
775   rtptime = item->rtptime;
776
777   /* rtp time jumps are checked for during skew calculation, but bypassed
778    * in other mode, so mind those here and reset jb if needed.
779    * Only reset if valid input time, which is likely for UDP input
780    * where we expect this might happen due to async thread effects
781    * (in seek and state change cycles), but not so much for TCP input */
782   if (GST_CLOCK_TIME_IS_VALID (dts) &&
783       jbuf->mode != RTP_JITTER_BUFFER_MODE_SLAVE &&
784       jbuf->base_time != -1 && jbuf->last_rtptime != -1) {
785     GstClockTime ext_rtptime = jbuf->ext_rtptime;
786
787     ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
788     if (ext_rtptime > jbuf->last_rtptime + 3 * jbuf->clock_rate ||
789         ext_rtptime + 3 * jbuf->clock_rate < jbuf->last_rtptime) {
790       /* reset even if we don't have valid incoming time;
791        * still better than producing possibly very bogus output timestamp */
792       GST_WARNING ("rtp delta too big, reset skew");
793       rtp_jitter_buffer_reset_skew (jbuf);
794     }
795   }
796
797   /* Return the last time if we got the same RTP timestamp again */
798   ext_rtptime = gst_rtp_buffer_ext_timestamp (&jbuf->ext_rtptime, rtptime);
799   if (jbuf->last_rtptime != -1 && ext_rtptime == jbuf->last_rtptime) {
800     item->pts = jbuf->prev_out_time;
801     goto append;
802   }
803
804   /* keep track of the last extended rtptime */
805   jbuf->last_rtptime = ext_rtptime;
806
807   g_mutex_lock (&jbuf->clock_lock);
808   media_clock = jbuf->media_clock ? gst_object_ref (jbuf->media_clock) : NULL;
809   pipeline_clock =
810       jbuf->pipeline_clock ? gst_object_ref (jbuf->pipeline_clock) : NULL;
811   media_clock_offset = jbuf->media_clock_offset;
812   g_mutex_unlock (&jbuf->clock_lock);
813
814   gstrtptime =
815       gst_util_uint64_scale_int (ext_rtptime, GST_SECOND, jbuf->clock_rate);
816
817   if (G_LIKELY (jbuf->base_rtptime != -1)) {
818     /* check elapsed time in RTP units */
819     if (gstrtptime < jbuf->base_rtptime) {
820       /* elapsed time at sender, timestamps can go backwards and thus be
821        * smaller than our base time, schedule to take a new base time in
822        * that case. */
823       GST_WARNING ("backward timestamps at server, schedule resync");
824       jbuf->need_resync = TRUE;
825     }
826   }
827
828   switch (jbuf->mode) {
829     case RTP_JITTER_BUFFER_MODE_NONE:
830     case RTP_JITTER_BUFFER_MODE_BUFFER:
831       /* send 0 as the first timestamp and -1 for the other ones. This will
832        * interpolate them from the RTP timestamps with a 0 origin. In buffering
833        * mode we will adjust the outgoing timestamps according to the amount of
834        * time we spent buffering. */
835       if (jbuf->base_time == -1)
836         dts = 0;
837       else
838         dts = -1;
839       break;
840     case RTP_JITTER_BUFFER_MODE_SYNCED:
841       /* synchronized clocks, take first timestamp as base, use RTP timestamps
842        * to interpolate */
843       if (jbuf->base_time != -1 && !jbuf->need_resync)
844         dts = -1;
845       break;
846     case RTP_JITTER_BUFFER_MODE_SLAVE:
847     default:
848       break;
849   }
850
851   /* need resync, lock on to time and gstrtptime if we can, otherwise we
852    * do with the previous values */
853   if (G_UNLIKELY (jbuf->need_resync && dts != -1)) {
854     GST_INFO ("resync to time %" GST_TIME_FORMAT ", rtptime %"
855         GST_TIME_FORMAT, GST_TIME_ARGS (dts), GST_TIME_ARGS (gstrtptime));
856     rtp_jitter_buffer_resync (jbuf, dts, gstrtptime, ext_rtptime, FALSE);
857   }
858
859   GST_DEBUG ("extrtp %" G_GUINT64_FORMAT ", gstrtp %" GST_TIME_FORMAT ", base %"
860       GST_TIME_FORMAT ", send_diff %" GST_TIME_FORMAT, ext_rtptime,
861       GST_TIME_ARGS (gstrtptime), GST_TIME_ARGS (jbuf->base_rtptime),
862       GST_TIME_ARGS (gstrtptime - jbuf->base_rtptime));
863
864   rfc7273_mode = media_clock && pipeline_clock
865       && gst_clock_is_synced (media_clock);
866
867   if (rfc7273_mode && jbuf->mode == RTP_JITTER_BUFFER_MODE_SLAVE
868       && (media_clock_offset == -1 || !jbuf->rfc7273_sync)) {
869     GstClockTime internal, external;
870     GstClockTime rate_num, rate_denom;
871     GstClockTime nsrtptimediff, rtpntptime, rtpsystime;
872
873     gst_clock_get_calibration (media_clock, &internal, &external, &rate_num,
874         &rate_denom);
875
876     /* Slave to the RFC7273 media clock instead of trying to estimate it
877      * based on receive times and RTP timestamps */
878
879     if (jbuf->media_clock_base_time == -1) {
880       if (jbuf->base_time != -1) {
881         jbuf->media_clock_base_time =
882             gst_clock_unadjust_with_calibration (media_clock,
883             jbuf->base_time + base_time, internal, external, rate_num,
884             rate_denom);
885       } else {
886         if (dts != -1)
887           jbuf->media_clock_base_time =
888               gst_clock_unadjust_with_calibration (media_clock, dts + base_time,
889               internal, external, rate_num, rate_denom);
890         else
891           jbuf->media_clock_base_time =
892               gst_clock_get_internal_time (media_clock);
893         jbuf->base_rtptime = gstrtptime;
894       }
895     }
896
897     if (gstrtptime > jbuf->base_rtptime)
898       nsrtptimediff = gstrtptime - jbuf->base_rtptime;
899     else
900       nsrtptimediff = 0;
901
902     rtpntptime = nsrtptimediff + jbuf->media_clock_base_time;
903
904     rtpsystime =
905         gst_clock_adjust_with_calibration (media_clock, rtpntptime, internal,
906         external, rate_num, rate_denom);
907
908     if (rtpsystime > base_time)
909       item->pts = rtpsystime - base_time;
910     else
911       item->pts = 0;
912
913     GST_DEBUG ("RFC7273 clock time %" GST_TIME_FORMAT ", out %" GST_TIME_FORMAT,
914         GST_TIME_ARGS (rtpsystime), GST_TIME_ARGS (item->pts));
915   } else if (rfc7273_mode && (jbuf->mode == RTP_JITTER_BUFFER_MODE_SLAVE
916           || jbuf->mode == RTP_JITTER_BUFFER_MODE_SYNCED)
917       && media_clock_offset != -1 && jbuf->rfc7273_sync) {
918     GstClockTime ntptime, rtptime_tmp;
919     GstClockTime ntprtptime, rtpsystime;
920     GstClockTime internal, external;
921     GstClockTime rate_num, rate_denom;
922
923     /* Don't do any of the dts related adjustments further down */
924     dts = -1;
925
926     /* Calculate the actual clock time on the sender side based on the
927      * RFC7273 clock and convert it to our pipeline clock
928      */
929
930     gst_clock_get_calibration (media_clock, &internal, &external, &rate_num,
931         &rate_denom);
932
933     ntptime = gst_clock_get_internal_time (media_clock);
934
935     ntprtptime = gst_util_uint64_scale (ntptime, jbuf->clock_rate, GST_SECOND);
936     ntprtptime += media_clock_offset;
937     ntprtptime &= 0xffffffff;
938
939     rtptime_tmp = rtptime;
940     /* Check for wraparounds, we assume that the diff between current RTP
941      * timestamp and current media clock time can't be bigger than
942      * 2**31 clock units */
943     if (ntprtptime > rtptime_tmp && ntprtptime - rtptime_tmp >= 0x80000000)
944       rtptime_tmp += G_GUINT64_CONSTANT (0x100000000);
945     else if (rtptime_tmp > ntprtptime && rtptime_tmp - ntprtptime >= 0x80000000)
946       ntprtptime += G_GUINT64_CONSTANT (0x100000000);
947
948     if (ntprtptime > rtptime_tmp)
949       ntptime -=
950           gst_util_uint64_scale (ntprtptime - rtptime_tmp, jbuf->clock_rate,
951           GST_SECOND);
952     else
953       ntptime +=
954           gst_util_uint64_scale (rtptime_tmp - ntprtptime, jbuf->clock_rate,
955           GST_SECOND);
956
957     rtpsystime =
958         gst_clock_adjust_with_calibration (media_clock, ntptime, internal,
959         external, rate_num, rate_denom);
960     /* All this assumes that the pipeline has enough additional
961      * latency to cover for the network delay */
962     if (rtpsystime > base_time)
963       item->pts = rtpsystime - base_time;
964     else
965       item->pts = 0;
966
967     GST_DEBUG ("RFC7273 clock time %" GST_TIME_FORMAT ", out %" GST_TIME_FORMAT,
968         GST_TIME_ARGS (rtpsystime), GST_TIME_ARGS (item->pts));
969   } else {
970     /* If we used the RFC7273 clock before and not anymore,
971      * we need to resync it later again */
972     jbuf->media_clock_base_time = -1;
973
974     /* do skew calculation by measuring the difference between rtptime and the
975      * receive dts, this function will return the skew corrected rtptime. */
976     item->pts = calculate_skew (jbuf, ext_rtptime, gstrtptime, dts);
977   }
978
979   /* check if timestamps are not going backwards, we can only check this if we
980    * have a previous out time and a previous send_diff */
981   if (G_LIKELY (item->pts != -1 && jbuf->prev_out_time != -1
982           && jbuf->prev_send_diff != -1)) {
983     /* now check for backwards timestamps */
984     if (G_UNLIKELY (
985             /* if the server timestamps went up and the out_time backwards */
986             (gstrtptime - jbuf->base_rtptime > jbuf->prev_send_diff
987                 && item->pts < jbuf->prev_out_time) ||
988             /* if the server timestamps went backwards and the out_time forwards */
989             (gstrtptime - jbuf->base_rtptime < jbuf->prev_send_diff
990                 && item->pts > jbuf->prev_out_time) ||
991             /* if the server timestamps did not change */
992             gstrtptime - jbuf->base_rtptime == jbuf->prev_send_diff)) {
993       GST_DEBUG ("backwards timestamps, using previous time");
994       item->pts = jbuf->prev_out_time;
995     }
996   }
997   if (dts != -1 && item->pts + jbuf->delay < dts) {
998     /* if we are going to produce a timestamp that is later than the input
999      * timestamp, we need to reset the jitterbuffer. Likely the server paused
1000      * temporarily */
1001     GST_DEBUG ("out %" GST_TIME_FORMAT " + %" G_GUINT64_FORMAT " < time %"
1002         GST_TIME_FORMAT ", reset jitterbuffer", GST_TIME_ARGS (item->pts),
1003         jbuf->delay, GST_TIME_ARGS (dts));
1004     rtp_jitter_buffer_resync (jbuf, dts, gstrtptime, ext_rtptime, TRUE);
1005     item->pts = dts;
1006   }
1007
1008   jbuf->prev_out_time = item->pts;
1009   jbuf->prev_send_diff = gstrtptime - jbuf->base_rtptime;
1010
1011   if (media_clock)
1012     gst_object_unref (media_clock);
1013   if (pipeline_clock)
1014     gst_object_unref (pipeline_clock);
1015
1016 append:
1017   queue_do_insert (jbuf, list, (GList *) item);
1018
1019   /* buffering mode, update buffer stats */
1020   if (jbuf->mode == RTP_JITTER_BUFFER_MODE_BUFFER)
1021     update_buffer_level (jbuf, percent);
1022   else if (percent)
1023     *percent = -1;
1024
1025   /* head was changed when we did not find a previous packet, we set the return
1026    * flag when requested. */
1027   if (G_LIKELY (head))
1028     *head = (list == NULL);
1029
1030   return TRUE;
1031
1032   /* ERRORS */
1033 duplicate:
1034   {
1035     GST_DEBUG ("duplicate packet %d found", (gint) seqnum);
1036     return FALSE;
1037   }
1038 }
1039
1040 /**
1041  * rtp_jitter_buffer_pop:
1042  * @jbuf: an #RTPJitterBuffer
1043  * @percent: the buffering percent
1044  *
1045  * Pops the oldest buffer from the packet queue of @jbuf. The popped buffer will
1046  * have its timestamp adjusted with the incomming running_time and the detected
1047  * clock skew.
1048  *
1049  * Returns: a #GstBuffer or %NULL when there was no packet in the queue.
1050  */
1051 RTPJitterBufferItem *
1052 rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf, gint * percent)
1053 {
1054   GList *item = NULL;
1055   GQueue *queue;
1056
1057   g_return_val_if_fail (jbuf != NULL, NULL);
1058
1059   queue = jbuf->packets;
1060
1061   item = queue->head;
1062   if (item) {
1063     queue->head = item->next;
1064     if (queue->head)
1065       queue->head->prev = NULL;
1066     else
1067       queue->tail = NULL;
1068     queue->length--;
1069   }
1070
1071   /* buffering mode, update buffer stats */
1072   if (jbuf->mode == RTP_JITTER_BUFFER_MODE_BUFFER)
1073     update_buffer_level (jbuf, percent);
1074   else if (percent)
1075     *percent = -1;
1076
1077   return (RTPJitterBufferItem *) item;
1078 }
1079
1080 /**
1081  * rtp_jitter_buffer_peek:
1082  * @jbuf: an #RTPJitterBuffer
1083  *
1084  * Peek the oldest buffer from the packet queue of @jbuf.
1085  *
1086  * See rtp_jitter_buffer_insert() to check when an older packet was
1087  * added.
1088  *
1089  * Returns: a #GstBuffer or %NULL when there was no packet in the queue.
1090  */
1091 RTPJitterBufferItem *
1092 rtp_jitter_buffer_peek (RTPJitterBuffer * jbuf)
1093 {
1094   g_return_val_if_fail (jbuf != NULL, NULL);
1095
1096   return (RTPJitterBufferItem *) jbuf->packets->head;
1097 }
1098
1099 /**
1100  * rtp_jitter_buffer_flush:
1101  * @jbuf: an #RTPJitterBuffer
1102  * @free_func: function to free each item
1103  * @user_data: user data passed to @free_func
1104  *
1105  * Flush all packets from the jitterbuffer.
1106  */
1107 void
1108 rtp_jitter_buffer_flush (RTPJitterBuffer * jbuf, GFunc free_func,
1109     gpointer user_data)
1110 {
1111   GList *item;
1112
1113   g_return_if_fail (jbuf != NULL);
1114   g_return_if_fail (free_func != NULL);
1115
1116   while ((item = g_queue_pop_head_link (jbuf->packets)))
1117     free_func ((RTPJitterBufferItem *) item, user_data);
1118 }
1119
1120 /**
1121  * rtp_jitter_buffer_is_buffering:
1122  * @jbuf: an #RTPJitterBuffer
1123  *
1124  * Check if @jbuf is buffering currently. Users of the jitterbuffer should not
1125  * pop packets while in buffering mode.
1126  *
1127  * Returns: the buffering state of @jbuf
1128  */
1129 gboolean
1130 rtp_jitter_buffer_is_buffering (RTPJitterBuffer * jbuf)
1131 {
1132   return jbuf->buffering && !jbuf->buffering_disabled;
1133 }
1134
1135 /**
1136  * rtp_jitter_buffer_set_buffering:
1137  * @jbuf: an #RTPJitterBuffer
1138  * @buffering: the new buffering state
1139  *
1140  * Forces @jbuf to go into the buffering state.
1141  */
1142 void
1143 rtp_jitter_buffer_set_buffering (RTPJitterBuffer * jbuf, gboolean buffering)
1144 {
1145   jbuf->buffering = buffering;
1146 }
1147
1148 /**
1149  * rtp_jitter_buffer_get_percent:
1150  * @jbuf: an #RTPJitterBuffer
1151  *
1152  * Get the buffering percent of the jitterbuffer.
1153  *
1154  * Returns: the buffering percent
1155  */
1156 gint
1157 rtp_jitter_buffer_get_percent (RTPJitterBuffer * jbuf)
1158 {
1159   gint percent;
1160   guint64 level;
1161
1162   if (G_UNLIKELY (jbuf->high_level == 0))
1163     return 100;
1164
1165   if (G_UNLIKELY (jbuf->buffering_disabled))
1166     return 100;
1167
1168   level = get_buffer_level (jbuf);
1169   percent = (level * 100 / jbuf->high_level);
1170   percent = MIN (percent, 100);
1171
1172   return percent;
1173 }
1174
1175 /**
1176  * rtp_jitter_buffer_num_packets:
1177  * @jbuf: an #RTPJitterBuffer
1178  *
1179  * Get the number of packets currently in "jbuf.
1180  *
1181  * Returns: The number of packets in @jbuf.
1182  */
1183 guint
1184 rtp_jitter_buffer_num_packets (RTPJitterBuffer * jbuf)
1185 {
1186   g_return_val_if_fail (jbuf != NULL, 0);
1187
1188   return jbuf->packets->length;
1189 }
1190
1191 /**
1192  * rtp_jitter_buffer_get_ts_diff:
1193  * @jbuf: an #RTPJitterBuffer
1194  *
1195  * Get the difference between the timestamps of first and last packet in the
1196  * jitterbuffer.
1197  *
1198  * Returns: The difference expressed in the timestamp units of the packets.
1199  */
1200 guint32
1201 rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer * jbuf)
1202 {
1203   guint64 high_ts, low_ts;
1204   RTPJitterBufferItem *high_buf, *low_buf;
1205   guint32 result;
1206
1207   g_return_val_if_fail (jbuf != NULL, 0);
1208
1209   high_buf = (RTPJitterBufferItem *) g_queue_peek_tail_link (jbuf->packets);
1210   low_buf = (RTPJitterBufferItem *) g_queue_peek_head_link (jbuf->packets);
1211
1212   if (!high_buf || !low_buf || high_buf == low_buf)
1213     return 0;
1214
1215   high_ts = high_buf->rtptime;
1216   low_ts = low_buf->rtptime;
1217
1218   /* it needs to work if ts wraps */
1219   if (high_ts >= low_ts) {
1220     result = (guint32) (high_ts - low_ts);
1221   } else {
1222     result = (guint32) (high_ts + G_MAXUINT32 + 1 - low_ts);
1223   }
1224   return result;
1225 }
1226
1227 /**
1228  * rtp_jitter_buffer_get_sync:
1229  * @jbuf: an #RTPJitterBuffer
1230  * @rtptime: result RTP time
1231  * @timestamp: result GStreamer timestamp
1232  * @clock_rate: clock-rate of @rtptime
1233  * @last_rtptime: last seen rtptime.
1234  *
1235  * Calculates the relation between the RTP timestamp and the GStreamer timestamp
1236  * used for constructing timestamps.
1237  *
1238  * For extended RTP timestamp @rtptime with a clock-rate of @clock_rate,
1239  * the GStreamer timestamp is currently @timestamp.
1240  *
1241  * The last seen extended RTP timestamp with clock-rate @clock-rate is returned in
1242  * @last_rtptime.
1243  */
1244 void
1245 rtp_jitter_buffer_get_sync (RTPJitterBuffer * jbuf, guint64 * rtptime,
1246     guint64 * timestamp, guint32 * clock_rate, guint64 * last_rtptime)
1247 {
1248   if (rtptime)
1249     *rtptime = jbuf->base_extrtp;
1250   if (timestamp)
1251     *timestamp = jbuf->base_time + jbuf->skew;
1252   if (clock_rate)
1253     *clock_rate = jbuf->clock_rate;
1254   if (last_rtptime)
1255     *last_rtptime = jbuf->last_rtptime;
1256 }