rtpjitterbuffer: Improved expected-timer handling when gap > 0
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *  Copyright 2015 Kurento (http://kurento.org/)
9  *   @author: Miguel ParĂ­s <mparisdiaz@gmail.com>
10  *  Copyright 2016 Pexip AS
11  *   @author: Havard Graff <havard@pexip.com>
12  *   @author: Stian Selnes <stian@pexip.com>
13  *
14  * This library is free software; you can redistribute it and/or
15  * modify it under the terms of the GNU Library General Public
16  * License as published by the Free Software Foundation; either
17  * version 2 of the License, or (at your option) any later version.
18  *
19  * This library is distributed in the hope that it will be useful,
20  * but WITHOUT ANY WARRANTY; without even the implied warranty of
21  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22  * Library General Public License for more details.
23  *
24  * You should have received a copy of the GNU Library General Public
25  * License along with this library; if not, write to the
26  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
27  * Boston, MA 02110-1301, USA.
28  *
29  */
30
31 /**
32  * SECTION:element-rtpjitterbuffer
33  *
34  * This element reorders and removes duplicate RTP packets as they are received
35  * from a network source.
36  *
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  *
42  * The rtpjitterbuffer will wait for missing packets up to a configurable time
43  * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
44  * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
45  * property is set, lost packets will result in a custom serialized downstream
46  * event of name GstRTPPacketLost. The lost packet events are usually used by a
47  * depayloader or other element to create concealment data or some other logic
48  * to gracefully handle the missing packets.
49  *
50  * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
51  * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
52  * buffer.
53  *
54  * The jitterbuffer can also be configured to send early retransmission events
55  * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
56  * this mode, the jitterbuffer tries to estimate when a packet should arrive and
57  * sends a custom upstream event named GstRTPRetransmissionRequest when the
58  * packet is considered late. The initial expected packet arrival time is
59  * calculated as follows:
60  *
61  * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
62  *     T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
63  *     calculated from the DTS (or PTS is no DTS) of two consecutive RTP
64  *     packets with different rtptime.
65  *
66  * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
67  *     seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
68  *     previously scheduled timeout is overwritten.
69  *
70  * - If seqnum N arrived, all seqnum older than
71  *     N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
72  *     immediately. This is to request fast feedback for abonormally reorder
73  *     packets before any of the previous timeouts is triggered.
74  *
75  * A late packet triggers the GstRTPRetransmissionRequest custom upstream
76  * event. After the initial timeout expires and the retransmission event is
77  * sent, the timeout is scheduled for
78  * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
79  * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
80  * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
81  * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
82  * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
83  * retransmission requests are sent and the regular logic is performed to
84  * schedule a lost packet as discussed above.
85  *
86  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
87  * to the pipeline.
88  *
89  * This element will automatically be used inside rtpbin.
90  *
91  * <refsect2>
92  * <title>Example pipelines</title>
93  * |[
94  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
95  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
96  * inserted into the pipeline to smooth out network jitter and to reorder the
97  * out-of-order RTP packets.
98  * </refsect2>
99  */
100
101 #ifdef HAVE_CONFIG_H
102 #include "config.h"
103 #endif
104
105 #include <stdlib.h>
106 #include <stdio.h>
107 #include <string.h>
108 #include <gst/rtp/gstrtpbuffer.h>
109 #include <gst/net/net.h>
110
111 #include "gstrtpjitterbuffer.h"
112 #include "rtpjitterbuffer.h"
113 #include "rtpstats.h"
114
115 #include <gst/glib-compat-private.h>
116
117 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
118 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
119
120 /* RTPJitterBuffer signals and args */
121 enum
122 {
123   SIGNAL_REQUEST_PT_MAP,
124   SIGNAL_CLEAR_PT_MAP,
125   SIGNAL_HANDLE_SYNC,
126   SIGNAL_ON_NPT_STOP,
127   SIGNAL_SET_ACTIVE,
128   LAST_SIGNAL
129 };
130
131 #define DEFAULT_LATENCY_MS          200
132 #define DEFAULT_DROP_ON_LATENCY     FALSE
133 #define DEFAULT_TS_OFFSET           0
134 #define DEFAULT_DO_LOST             FALSE
135 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
136 #define DEFAULT_PERCENT             0
137 #define DEFAULT_DO_RETRANSMISSION   FALSE
138 #define DEFAULT_RTX_NEXT_SEQNUM     TRUE
139 #define DEFAULT_RTX_DELAY           -1
140 #define DEFAULT_RTX_MIN_DELAY       0
141 #define DEFAULT_RTX_DELAY_REORDER   3
142 #define DEFAULT_RTX_RETRY_TIMEOUT   -1
143 #define DEFAULT_RTX_MIN_RETRY_TIMEOUT   -1
144 #define DEFAULT_RTX_RETRY_PERIOD    -1
145 #define DEFAULT_RTX_MAX_RETRIES    -1
146 #define DEFAULT_RTX_STATS_TIMEOUT   1000
147 #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
148 #define DEFAULT_MAX_DROPOUT_TIME    60000
149 #define DEFAULT_MAX_MISORDER_TIME   2000
150 #define DEFAULT_RFC7273_SYNC        FALSE
151
152 #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
153 #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
154
155 enum
156 {
157   PROP_0,
158   PROP_LATENCY,
159   PROP_DROP_ON_LATENCY,
160   PROP_TS_OFFSET,
161   PROP_DO_LOST,
162   PROP_MODE,
163   PROP_PERCENT,
164   PROP_DO_RETRANSMISSION,
165   PROP_RTX_NEXT_SEQNUM,
166   PROP_RTX_DELAY,
167   PROP_RTX_MIN_DELAY,
168   PROP_RTX_DELAY_REORDER,
169   PROP_RTX_RETRY_TIMEOUT,
170   PROP_RTX_MIN_RETRY_TIMEOUT,
171   PROP_RTX_RETRY_PERIOD,
172   PROP_RTX_MAX_RETRIES,
173   PROP_RTX_STATS_TIMEOUT,
174   PROP_STATS,
175   PROP_MAX_RTCP_RTP_TIME_DIFF,
176   PROP_MAX_DROPOUT_TIME,
177   PROP_MAX_MISORDER_TIME,
178   PROP_RFC7273_SYNC
179 };
180
181 #define JBUF_LOCK(priv)   G_STMT_START {                        \
182     GST_TRACE("Locking from thread %p", g_thread_self());       \
183     (g_mutex_lock (&(priv)->jbuf_lock));                        \
184     GST_TRACE("Locked from thread %p", g_thread_self());        \
185   } G_STMT_END
186
187 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
188   JBUF_LOCK (priv);                                   \
189   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
190     goto label;                                       \
191 } G_STMT_END
192 #define JBUF_UNLOCK(priv) G_STMT_START {                        \
193     GST_TRACE ("Unlocking from thread %p", g_thread_self ());   \
194     (g_mutex_unlock (&(priv)->jbuf_lock));                      \
195 } G_STMT_END
196
197 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
198   GST_DEBUG ("waiting timer");                            \
199   (priv)->waiting_timer = TRUE;                           \
200   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
201   (priv)->waiting_timer = FALSE;                          \
202   GST_DEBUG ("waiting timer done");                       \
203 } G_STMT_END
204 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {            \
205   if (G_UNLIKELY ((priv)->waiting_timer)) {               \
206     GST_DEBUG ("signal timer");                           \
207     g_cond_signal (&(priv)->jbuf_timer);                  \
208   }                                                       \
209 } G_STMT_END
210
211 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
212   GST_DEBUG ("waiting event");                           \
213   (priv)->waiting_event = TRUE;                          \
214   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
215   (priv)->waiting_event = FALSE;                         \
216   GST_DEBUG ("waiting event done");                      \
217   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
218     goto label;                                          \
219 } G_STMT_END
220 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {           \
221   if (G_UNLIKELY ((priv)->waiting_event)) {              \
222     GST_DEBUG ("signal event");                          \
223     g_cond_signal (&(priv)->jbuf_event);                 \
224   }                                                      \
225 } G_STMT_END
226
227 #define JBUF_WAIT_QUERY(priv,label) G_STMT_START {       \
228   GST_DEBUG ("waiting query");                           \
229   (priv)->waiting_query = TRUE;                          \
230   g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \
231   (priv)->waiting_query = FALSE;                         \
232   GST_DEBUG ("waiting query done");                      \
233   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
234     goto label;                                          \
235 } G_STMT_END
236 #define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START {       \
237   (priv)->last_query = res;                              \
238   if (G_UNLIKELY ((priv)->waiting_query)) {              \
239     GST_DEBUG ("signal query");                          \
240     g_cond_signal (&(priv)->jbuf_query);                 \
241   }                                                      \
242 } G_STMT_END
243
244 #define GST_BUFFER_IS_RETRANSMISSION(buffer) \
245   GST_BUFFER_FLAG_IS_SET (buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION)
246
247 typedef struct TimerQueue
248 {
249   GQueue *timers;
250   GHashTable *hashtable;
251 } TimerQueue;
252
253 struct _GstRtpJitterBufferPrivate
254 {
255   GstPad *sinkpad, *srcpad;
256   GstPad *rtcpsinkpad;
257
258   RTPJitterBuffer *jbuf;
259   GMutex jbuf_lock;
260   gboolean waiting_timer;
261   GCond jbuf_timer;
262   gboolean waiting_event;
263   GCond jbuf_event;
264   gboolean waiting_query;
265   GCond jbuf_query;
266   gboolean last_query;
267   gboolean discont;
268   gboolean ts_discont;
269   gboolean active;
270   guint64 out_offset;
271
272   gboolean timer_running;
273   GThread *timer_thread;
274
275   /* properties */
276   guint latency_ms;
277   guint64 latency_ns;
278   gboolean drop_on_latency;
279   gint64 ts_offset;
280   gboolean do_lost;
281   gboolean do_retransmission;
282   gboolean rtx_next_seqnum;
283   gint rtx_delay;
284   guint rtx_min_delay;
285   gint rtx_delay_reorder;
286   gint rtx_retry_timeout;
287   gint rtx_min_retry_timeout;
288   gint rtx_retry_period;
289   gint rtx_max_retries;
290   guint rtx_stats_timeout;
291   gint max_rtcp_rtp_time_diff;
292   guint32 max_dropout_time;
293   guint32 max_misorder_time;
294
295   /* the last seqnum we pushed out */
296   guint32 last_popped_seqnum;
297   /* the next expected seqnum we push */
298   guint32 next_seqnum;
299   /* seqnum-base, if known */
300   guint32 seqnum_base;
301   /* last output time */
302   GstClockTime last_out_time;
303   /* last valid input timestamp and rtptime pair */
304   GstClockTime ips_dts;
305   guint64 ips_rtptime;
306   GstClockTime packet_spacing;
307
308   GQueue gap_packets;
309
310   /* the next expected seqnum we receive */
311   GstClockTime last_in_dts;
312   guint32 next_in_seqnum;
313
314   GArray *timers;
315   TimerQueue *rtx_stats_timers;
316
317   /* start and stop ranges */
318   GstClockTime npt_start;
319   GstClockTime npt_stop;
320   guint64 ext_timestamp;
321   guint64 last_elapsed;
322   guint64 estimated_eos;
323   GstClockID eos_id;
324
325   /* state */
326   gboolean eos;
327   guint last_percent;
328
329   /* clock rate and rtp timestamp offset */
330   gint last_pt;
331   gint32 clock_rate;
332   gint64 clock_base;
333   gint64 prev_ts_offset;
334
335   /* when we are shutting down */
336   GstFlowReturn srcresult;
337   gboolean blocked;
338
339   /* for sync */
340   GstSegment segment;
341   GstClockID clock_id;
342   GstClockTime timer_timeout;
343   guint16 timer_seqnum;
344   /* the latency of the upstream peer, we have to take this into account when
345    * synchronizing the buffers. */
346   GstClockTime peer_latency;
347   guint64 ext_rtptime;
348   GstBuffer *last_sr;
349
350   /* some accounting */
351   guint64 num_pushed;
352   guint64 num_lost;
353   guint64 num_late;
354   guint64 num_duplicates;
355   guint64 num_rtx_requests;
356   guint64 num_rtx_success;
357   guint64 num_rtx_failed;
358   gdouble avg_rtx_num;
359   guint64 avg_rtx_rtt;
360   RTPPacketRateCtx packet_rate_ctx;
361
362   /* for the jitter */
363   GstClockTime last_dts;
364   guint64 last_rtptime;
365   GstClockTime avg_jitter;
366 };
367
368 typedef enum
369 {
370   TIMER_TYPE_EXPECTED,
371   TIMER_TYPE_LOST,
372   TIMER_TYPE_DEADLINE,
373   TIMER_TYPE_EOS
374 } TimerType;
375
376 typedef struct
377 {
378   guint idx;
379   guint16 seqnum;
380   guint num;
381   TimerType type;
382   GstClockTime timeout;
383   GstClockTime duration;
384   GstClockTime rtx_base;
385   GstClockTime rtx_delay;
386   GstClockTime rtx_retry;
387   GstClockTime rtx_last;
388   guint num_rtx_retry;
389   guint num_rtx_received;
390 } TimerData;
391
392 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
393   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
394                                 GstRtpJitterBufferPrivate))
395
396 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
397 GST_STATIC_PAD_TEMPLATE ("sink",
398     GST_PAD_SINK,
399     GST_PAD_ALWAYS,
400     GST_STATIC_CAPS ("application/x-rtp"
401         /* "clock-rate = (int) [ 1, 2147483647 ], "
402          * "payload = (int) , "
403          * "encoding-name = (string) "
404          */ )
405     );
406
407 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
408 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
409     GST_PAD_SINK,
410     GST_PAD_REQUEST,
411     GST_STATIC_CAPS ("application/x-rtcp")
412     );
413
414 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
415 GST_STATIC_PAD_TEMPLATE ("src",
416     GST_PAD_SRC,
417     GST_PAD_ALWAYS,
418     GST_STATIC_CAPS ("application/x-rtp"
419         /* "payload = (int) , "
420          * "clock-rate = (int) , "
421          * "encoding-name = (string) "
422          */ )
423     );
424
425 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
426
427 #define gst_rtp_jitter_buffer_parent_class parent_class
428 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
429
430 /* object overrides */
431 static void gst_rtp_jitter_buffer_set_property (GObject * object,
432     guint prop_id, const GValue * value, GParamSpec * pspec);
433 static void gst_rtp_jitter_buffer_get_property (GObject * object,
434     guint prop_id, GValue * value, GParamSpec * pspec);
435 static void gst_rtp_jitter_buffer_finalize (GObject * object);
436
437 /* element overrides */
438 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
439     * element, GstStateChange transition);
440 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
441     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
442 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
443     GstPad * pad);
444 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
445 static gboolean gst_rtp_jitter_buffer_set_clock (GstElement * element,
446     GstClock * clock);
447
448 /* pad overrides */
449 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
450 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
451     GstObject * parent);
452
453 /* sinkpad overrides */
454 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
455     GstObject * parent, GstEvent * event);
456 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
457     GstObject * parent, GstBuffer * buffer);
458
459 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
460     GstObject * parent, GstEvent * event);
461 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
462     GstObject * parent, GstBuffer * buffer);
463
464 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
465     GstObject * parent, GstQuery * query);
466
467 /* srcpad overrides */
468 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
469     GstObject * parent, GstEvent * event);
470 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
471     GstObject * parent, GstPadMode mode, gboolean active);
472 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
473 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
474     GstObject * parent, GstQuery * query);
475
476 static void
477 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
478 static GstClockTime
479 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
480     gboolean active, guint64 base_time);
481 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
482
483 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
484 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
485
486 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
487
488 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
489     jitterbuffer);
490
491 static void update_rtx_stats (GstRtpJitterBuffer * jitterbuffer,
492     TimerData * timer, GstClockTime dts, gboolean success);
493
494 static TimerQueue *timer_queue_new (void);
495 static void timer_queue_free (TimerQueue * queue);
496
497 static void
498 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
499 {
500   GObjectClass *gobject_class;
501   GstElementClass *gstelement_class;
502
503   gobject_class = (GObjectClass *) klass;
504   gstelement_class = (GstElementClass *) klass;
505
506   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
507
508   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
509
510   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
511   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
512
513   /**
514    * GstRtpJitterBuffer:latency:
515    *
516    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
517    * for at most this time.
518    */
519   g_object_class_install_property (gobject_class, PROP_LATENCY,
520       g_param_spec_uint ("latency", "Buffer latency in ms",
521           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
522           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
523   /**
524    * GstRtpJitterBuffer:drop-on-latency:
525    *
526    * Drop oldest buffers when the queue is completely filled.
527    */
528   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
529       g_param_spec_boolean ("drop-on-latency",
530           "Drop buffers when maximum latency is reached",
531           "Tells the jitterbuffer to never exceed the given latency in size",
532           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
533   /**
534    * GstRtpJitterBuffer:ts-offset:
535    *
536    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
537    * This is mainly used to ensure interstream synchronisation.
538    */
539   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
540       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
541           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
542           G_MAXINT64, DEFAULT_TS_OFFSET,
543           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
544
545   /**
546    * GstRtpJitterBuffer:do-lost:
547    *
548    * Send out a GstRTPPacketLost event downstream when a packet is considered
549    * lost.
550    */
551   g_object_class_install_property (gobject_class, PROP_DO_LOST,
552       g_param_spec_boolean ("do-lost", "Do Lost",
553           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
554           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
555
556   /**
557    * GstRtpJitterBuffer:mode:
558    *
559    * Control the buffering and timestamping mode used by the jitterbuffer.
560    */
561   g_object_class_install_property (gobject_class, PROP_MODE,
562       g_param_spec_enum ("mode", "Mode",
563           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
564           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
565   /**
566    * GstRtpJitterBuffer:percent:
567    *
568    * The percent of the jitterbuffer that is filled.
569    */
570   g_object_class_install_property (gobject_class, PROP_PERCENT,
571       g_param_spec_int ("percent", "percent",
572           "The buffer filled percent", 0, 100,
573           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
574   /**
575    * GstRtpJitterBuffer:do-retransmission:
576    *
577    * Send out a GstRTPRetransmission event upstream when a packet is considered
578    * late and should be retransmitted.
579    *
580    * Since: 1.2
581    */
582   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
583       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
584           "Send retransmission events upstream when a packet is late",
585           DEFAULT_DO_RETRANSMISSION,
586           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
587
588   /**
589    * GstRtpJitterBuffer:rtx-next-seqnum
590    *
591    * Estimate when the next packet should arrive and schedule a retransmission
592    * request for it.
593    * This is, when packet N arrives, a GstRTPRetransmission event is schedule
594    * for packet N+1. So it will be requested if it does not arrive at the expected time.
595    * The expected time is calculated using the dts of N and the packet spacing.
596    *
597    * Since: 1.6
598    */
599   g_object_class_install_property (gobject_class, PROP_RTX_NEXT_SEQNUM,
600       g_param_spec_boolean ("rtx-next-seqnum", "RTX next seqnum",
601           "Estimate when the next packet should arrive and schedule a "
602           "retransmission request for it.",
603           DEFAULT_RTX_NEXT_SEQNUM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
604
605   /**
606    * GstRtpJitterBuffer:rtx-delay:
607    *
608    * When a packet did not arrive at the expected time, wait this extra amount
609    * of time before sending a retransmission event.
610    *
611    * When -1 is used, the max jitter will be used as extra delay.
612    *
613    * Since: 1.2
614    */
615   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
616       g_param_spec_int ("rtx-delay", "RTX Delay",
617           "Extra time in ms to wait before sending retransmission "
618           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
619           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
620
621   /**
622    * GstRtpJitterBuffer:rtx-min-delay:
623    *
624    * When a packet did not arrive at the expected time, wait at least this extra amount
625    * of time before sending a retransmission event.
626    *
627    * Since: 1.6
628    */
629   g_object_class_install_property (gobject_class, PROP_RTX_MIN_DELAY,
630       g_param_spec_uint ("rtx-min-delay", "Minimum RTX Delay",
631           "Minimum time in ms to wait before sending retransmission "
632           "event", 0, G_MAXUINT, DEFAULT_RTX_MIN_DELAY,
633           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
634   /**
635    * GstRtpJitterBuffer:rtx-delay-reorder:
636    *
637    * Assume that a retransmission event should be sent when we see
638    * this much packet reordering.
639    *
640    * When -1 is used, the value will be estimated based on observed packet
641    * reordering. When 0 is used packet reordering alone will not cause a
642    * retransmission event (Since 1.10).
643    *
644    * Since: 1.2
645    */
646   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
647       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
648           "Sending retransmission event when this much reordering "
649           "(0 disable, -1 automatic)",
650           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
651           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
652   /**
653    * GstRtpJitterBuffer::rtx-retry-timeout:
654    *
655    * When no packet has been received after sending a retransmission event
656    * for this time, retry sending a retransmission event.
657    *
658    * When -1 is used, the value will be estimated based on observed round
659    * trip time.
660    *
661    * Since: 1.2
662    */
663   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
664       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
665           "Retry sending a transmission event after this timeout in "
666           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
667           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
668   /**
669    * GstRtpJitterBuffer::rtx-min-retry-timeout:
670    *
671    * The minimum amount of time between retry timeouts. When
672    * GstRtpJitterBuffer::rtx-retry-timeout is -1, this value ensures a
673    * minimum interval between retry timeouts.
674    *
675    * When -1 is used, the value will be estimated based on the
676    * packet spacing.
677    *
678    * Since: 1.6
679    */
680   g_object_class_install_property (gobject_class, PROP_RTX_MIN_RETRY_TIMEOUT,
681       g_param_spec_int ("rtx-min-retry-timeout", "RTX Min Retry Timeout",
682           "Minimum timeout between sending a transmission event in "
683           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_MIN_RETRY_TIMEOUT,
684           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
685   /**
686    * GstRtpJitterBuffer:rtx-retry-period:
687    *
688    * The amount of time to try to get a retransmission.
689    *
690    * When -1 is used, the value will be estimated based on the jitterbuffer
691    * latency and the observed round trip time.
692    *
693    * Since: 1.2
694    */
695   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
696       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
697           "Try to get a retransmission for this many ms "
698           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
699           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
700   /**
701    * GstRtpJitterBuffer:rtx-max-retries:
702    *
703    * The maximum number of retries to request a retransmission.
704    *
705    * This implies that as maximum (rtx-max-retries + 1) retransmissions will be requested.
706    * When -1 is used, the number of retransmission request will not be limited.
707    *
708    * Since: 1.6
709    */
710   g_object_class_install_property (gobject_class, PROP_RTX_MAX_RETRIES,
711       g_param_spec_int ("rtx-max-retries", "RTX Max Retries",
712           "The maximum number of retries to request a retransmission. "
713           "(-1 not limited)", -1, G_MAXINT, DEFAULT_RTX_MAX_RETRIES,
714           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
715   /**
716    * GstRtpJitterBuffer::rtx-stats-timeout:
717    *
718    * The time to wait for a retransmitted packet after it has been
719    * considered lost in order to collect RTX statistics.
720    *
721    * Since: 1.10
722    */
723   g_object_class_install_property (gobject_class, PROP_RTX_STATS_TIMEOUT,
724       g_param_spec_uint ("rtx-stats-timeout", "RTX Statistics Timeout",
725           "The time to wait for a retransmitted packet after it has been "
726           "considered lost in order to collect statistics (ms)",
727           0, G_MAXUINT, DEFAULT_RTX_STATS_TIMEOUT,
728           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
729
730   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
731       g_param_spec_uint ("max-dropout-time", "Max dropout time",
732           "The maximum time (milliseconds) of missing packets tolerated.",
733           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
734           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
735
736   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
737       g_param_spec_uint ("max-misorder-time", "Max misorder time",
738           "The maximum time (milliseconds) of misordered packets tolerated.",
739           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
740           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
741   /**
742    * GstRtpJitterBuffer:stats:
743    *
744    * Various jitterbuffer statistics. This property returns a GstStructure
745    * with name application/x-rtp-jitterbuffer-stats with the following fields:
746    *
747    * <itemizedlist>
748    * <listitem>
749    *   <para>
750    *   #guint64
751    *   <classname>&quot;num-pushed&quot;</classname>:
752    *   the number of packets pushed out.
753    *   </para>
754    * </listitem>
755    * <listitem>
756    *   <para>
757    *   #guint64
758    *   <classname>&quot;num-lost&quot;</classname>:
759    *   the number of packets considered lost.
760    *   </para>
761    * </listitem>
762    * <listitem>
763    *   <para>
764    *   #guint64
765    *   <classname>&quot;num-late&quot;</classname>:
766    *   the number of packets arriving too late.
767    *   </para>
768    * </listitem>
769    * <listitem>
770    *   <para>
771    *   #guint64
772    *   <classname>&quot;num-duplicates&quot;</classname>:
773    *   the number of duplicate packets.
774    *   </para>
775    * </listitem>
776    * <listitem>
777    *   <para>
778    *   #guint64
779    *   <classname>&quot;rtx-count&quot;</classname>:
780    *   the number of retransmissions requested.
781    *   </para>
782    * </listitem>
783    * <listitem>
784    *   <para>
785    *   #guint64
786    *   <classname>&quot;rtx-success-count&quot;</classname>:
787    *   the number of successful retransmissions.
788    *   </para>
789    * </listitem>
790    * <listitem>
791    *   <para>
792    *   #gdouble
793    *   <classname>&quot;rtx-per-packet&quot;</classname>:
794    *   average number of RTX per packet.
795    *   </para>
796    * </listitem>
797    * <listitem>
798    *   <para>
799    *   #guint64
800    *   <classname>&quot;rtx-rtt&quot;</classname>:
801    *   average round trip time per RTX.
802    *   </para>
803    * </listitem>
804    * </itemizedlist>
805    *
806    * Since: 1.4
807    */
808   g_object_class_install_property (gobject_class, PROP_STATS,
809       g_param_spec_boxed ("stats", "Statistics",
810           "Various statistics", GST_TYPE_STRUCTURE,
811           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
812
813   /**
814    * GstRtpJitterBuffer:max-rtcp-rtp-time-diff
815    *
816    * The maximum amount of time in ms that the RTP time in the RTCP SRs
817    * is allowed to be ahead of the last RTP packet we received. Use
818    * -1 to disable ignoring of RTCP packets.
819    *
820    * Since: 1.8
821    */
822   g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF,
823       g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff",
824           "Maximum amount of time in ms that the RTP time in RTCP SRs "
825           "is allowed to be ahead (-1 disabled)", -1, G_MAXINT,
826           DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
827           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
828
829   g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
830       g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
831           "Synchronize received streams to the RFC7273 clock "
832           "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
833           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
834
835   /**
836    * GstRtpJitterBuffer::request-pt-map:
837    * @buffer: the object which received the signal
838    * @pt: the pt
839    *
840    * Request the payload type as #GstCaps for @pt.
841    */
842   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
843       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
844       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
845           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
846       GST_TYPE_CAPS, 1, G_TYPE_UINT);
847   /**
848    * GstRtpJitterBuffer::handle-sync:
849    * @buffer: the object which received the signal
850    * @struct: a GstStructure containing sync values.
851    *
852    * Be notified of new sync values.
853    */
854   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
855       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
856       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
857           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
858       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
859
860   /**
861    * GstRtpJitterBuffer::on-npt-stop:
862    * @buffer: the object which received the signal
863    *
864    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
865    * the npt-stop position.
866    */
867   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
868       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
869       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
870           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
871       G_TYPE_NONE, 0, G_TYPE_NONE);
872
873   /**
874    * GstRtpJitterBuffer::clear-pt-map:
875    * @buffer: the object which received the signal
876    *
877    * Invalidate the clock-rate as obtained with the
878    * #GstRtpJitterBuffer::request-pt-map signal.
879    */
880   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
881       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
882       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
883       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
884       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
885
886   /**
887    * GstRtpJitterBuffer::set-active:
888    * @buffer: the object which received the signal
889    *
890    * Start pushing out packets with the given base time. This signal is only
891    * useful in buffering mode.
892    *
893    * Returns: the time of the last pushed packet.
894    */
895   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
896       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
897       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
898       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
899       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
900       G_TYPE_UINT64);
901
902   gstelement_class->change_state =
903       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
904   gstelement_class->request_new_pad =
905       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
906   gstelement_class->release_pad =
907       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
908   gstelement_class->provide_clock =
909       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
910   gstelement_class->set_clock =
911       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_clock);
912
913   gst_element_class_add_static_pad_template (gstelement_class,
914       &gst_rtp_jitter_buffer_src_template);
915   gst_element_class_add_static_pad_template (gstelement_class,
916       &gst_rtp_jitter_buffer_sink_template);
917   gst_element_class_add_static_pad_template (gstelement_class,
918       &gst_rtp_jitter_buffer_sink_rtcp_template);
919
920   gst_element_class_set_static_metadata (gstelement_class,
921       "RTP packet jitter-buffer", "Filter/Network/RTP",
922       "A buffer that deals with network jitter and other transmission faults",
923       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
924       "Wim Taymans <wim.taymans@gmail.com>");
925
926   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
927   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
928
929   GST_DEBUG_CATEGORY_INIT
930       (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
931 }
932
933 static void
934 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
935 {
936   GstRtpJitterBufferPrivate *priv;
937
938   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
939   jitterbuffer->priv = priv;
940
941   priv->latency_ms = DEFAULT_LATENCY_MS;
942   priv->latency_ns = priv->latency_ms * GST_MSECOND;
943   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
944   priv->do_lost = DEFAULT_DO_LOST;
945   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
946   priv->rtx_next_seqnum = DEFAULT_RTX_NEXT_SEQNUM;
947   priv->rtx_delay = DEFAULT_RTX_DELAY;
948   priv->rtx_min_delay = DEFAULT_RTX_MIN_DELAY;
949   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
950   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
951   priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT;
952   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
953   priv->rtx_max_retries = DEFAULT_RTX_MAX_RETRIES;
954   priv->rtx_stats_timeout = DEFAULT_RTX_STATS_TIMEOUT;
955   priv->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
956   priv->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
957   priv->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
958
959   priv->last_dts = -1;
960   priv->last_rtptime = -1;
961   priv->avg_jitter = 0;
962   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
963   priv->rtx_stats_timers = timer_queue_new ();
964   priv->jbuf = rtp_jitter_buffer_new ();
965   g_mutex_init (&priv->jbuf_lock);
966   g_cond_init (&priv->jbuf_timer);
967   g_cond_init (&priv->jbuf_event);
968   g_cond_init (&priv->jbuf_query);
969   g_queue_init (&priv->gap_packets);
970   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
971
972   /* reset skew detection initialy */
973   rtp_jitter_buffer_reset_skew (priv->jbuf);
974   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
975   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
976   priv->active = TRUE;
977
978   priv->srcpad =
979       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
980       "src");
981
982   gst_pad_set_activatemode_function (priv->srcpad,
983       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
984   gst_pad_set_query_function (priv->srcpad,
985       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
986   gst_pad_set_event_function (priv->srcpad,
987       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
988
989   priv->sinkpad =
990       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
991       "sink");
992
993   gst_pad_set_chain_function (priv->sinkpad,
994       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
995   gst_pad_set_event_function (priv->sinkpad,
996       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
997   gst_pad_set_query_function (priv->sinkpad,
998       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
999
1000   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
1001   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
1002
1003   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
1004 }
1005
1006 #define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
1007
1008 #define ITEM_TYPE_BUFFER        0
1009 #define ITEM_TYPE_LOST          1
1010 #define ITEM_TYPE_EVENT         2
1011 #define ITEM_TYPE_QUERY         3
1012
1013 static RTPJitterBufferItem *
1014 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
1015     guint seqnum, guint count, guint rtptime)
1016 {
1017   RTPJitterBufferItem *item;
1018
1019   item = g_slice_new (RTPJitterBufferItem);
1020   item->data = data;
1021   item->next = NULL;
1022   item->prev = NULL;
1023   item->type = type;
1024   item->dts = dts;
1025   item->pts = pts;
1026   item->seqnum = seqnum;
1027   item->count = count;
1028   item->rtptime = rtptime;
1029
1030   return item;
1031 }
1032
1033 static void
1034 free_item (RTPJitterBufferItem * item)
1035 {
1036   g_return_if_fail (item != NULL);
1037
1038   if (item->data && item->type != ITEM_TYPE_QUERY)
1039     gst_mini_object_unref (item->data);
1040   g_slice_free (RTPJitterBufferItem, item);
1041 }
1042
1043 static void
1044 free_item_and_retain_events (RTPJitterBufferItem * item, gpointer user_data)
1045 {
1046   GList **l = user_data;
1047
1048   if (item->data && item->type == ITEM_TYPE_EVENT
1049       && GST_EVENT_IS_STICKY (item->data)) {
1050     *l = g_list_prepend (*l, item->data);
1051   } else if (item->data && item->type != ITEM_TYPE_QUERY) {
1052     gst_mini_object_unref (item->data);
1053   }
1054   g_slice_free (RTPJitterBufferItem, item);
1055 }
1056
1057 static void
1058 gst_rtp_jitter_buffer_finalize (GObject * object)
1059 {
1060   GstRtpJitterBuffer *jitterbuffer;
1061   GstRtpJitterBufferPrivate *priv;
1062
1063   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
1064   priv = jitterbuffer->priv;
1065
1066   g_array_free (priv->timers, TRUE);
1067   timer_queue_free (priv->rtx_stats_timers);
1068   g_mutex_clear (&priv->jbuf_lock);
1069   g_cond_clear (&priv->jbuf_timer);
1070   g_cond_clear (&priv->jbuf_event);
1071   g_cond_clear (&priv->jbuf_query);
1072
1073   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1074   g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
1075   g_queue_clear (&priv->gap_packets);
1076   g_object_unref (priv->jbuf);
1077
1078   G_OBJECT_CLASS (parent_class)->finalize (object);
1079 }
1080
1081 static GstIterator *
1082 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
1083 {
1084   GstRtpJitterBuffer *jitterbuffer;
1085   GstPad *otherpad = NULL;
1086   GstIterator *it = NULL;
1087   GValue val = { 0, };
1088
1089   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1090
1091   if (pad == jitterbuffer->priv->sinkpad) {
1092     otherpad = jitterbuffer->priv->srcpad;
1093   } else if (pad == jitterbuffer->priv->srcpad) {
1094     otherpad = jitterbuffer->priv->sinkpad;
1095   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
1096     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
1097   }
1098
1099   if (it == NULL) {
1100     g_value_init (&val, GST_TYPE_PAD);
1101     g_value_set_object (&val, otherpad);
1102     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
1103     g_value_unset (&val);
1104   }
1105
1106   return it;
1107 }
1108
1109 static GstPad *
1110 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
1111 {
1112   GstRtpJitterBufferPrivate *priv;
1113
1114   priv = jitterbuffer->priv;
1115
1116   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
1117
1118   priv->rtcpsinkpad =
1119       gst_pad_new_from_static_template
1120       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
1121   gst_pad_set_chain_function (priv->rtcpsinkpad,
1122       gst_rtp_jitter_buffer_chain_rtcp);
1123   gst_pad_set_event_function (priv->rtcpsinkpad,
1124       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
1125   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
1126       gst_rtp_jitter_buffer_iterate_internal_links);
1127   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
1128   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1129
1130   return priv->rtcpsinkpad;
1131 }
1132
1133 static void
1134 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
1135 {
1136   GstRtpJitterBufferPrivate *priv;
1137
1138   priv = jitterbuffer->priv;
1139
1140   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
1141
1142   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
1143
1144   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1145   priv->rtcpsinkpad = NULL;
1146 }
1147
1148 static GstPad *
1149 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
1150     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
1151 {
1152   GstRtpJitterBuffer *jitterbuffer;
1153   GstElementClass *klass;
1154   GstPad *result;
1155   GstRtpJitterBufferPrivate *priv;
1156
1157   g_return_val_if_fail (templ != NULL, NULL);
1158   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
1159
1160   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1161   priv = jitterbuffer->priv;
1162   klass = GST_ELEMENT_GET_CLASS (element);
1163
1164   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
1165
1166   /* figure out the template */
1167   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
1168     if (priv->rtcpsinkpad != NULL)
1169       goto exists;
1170
1171     result = create_rtcp_sink (jitterbuffer);
1172   } else
1173     goto wrong_template;
1174
1175   return result;
1176
1177   /* ERRORS */
1178 wrong_template:
1179   {
1180     g_warning ("rtpjitterbuffer: this is not our template");
1181     return NULL;
1182   }
1183 exists:
1184   {
1185     g_warning ("rtpjitterbuffer: pad already requested");
1186     return NULL;
1187   }
1188 }
1189
1190 static void
1191 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
1192 {
1193   GstRtpJitterBuffer *jitterbuffer;
1194   GstRtpJitterBufferPrivate *priv;
1195
1196   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
1197   g_return_if_fail (GST_IS_PAD (pad));
1198
1199   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1200   priv = jitterbuffer->priv;
1201
1202   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1203
1204   if (priv->rtcpsinkpad == pad) {
1205     remove_rtcp_sink (jitterbuffer);
1206   } else
1207     goto wrong_pad;
1208
1209   return;
1210
1211   /* ERRORS */
1212 wrong_pad:
1213   {
1214     g_warning ("gstjitterbuffer: asked to release an unknown pad");
1215     return;
1216   }
1217 }
1218
1219 static GstClock *
1220 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
1221 {
1222   return gst_system_clock_obtain ();
1223 }
1224
1225 static gboolean
1226 gst_rtp_jitter_buffer_set_clock (GstElement * element, GstClock * clock)
1227 {
1228   GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1229
1230   rtp_jitter_buffer_set_pipeline_clock (jitterbuffer->priv->jbuf, clock);
1231
1232   return GST_ELEMENT_CLASS (parent_class)->set_clock (element, clock);
1233 }
1234
1235 static void
1236 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
1237 {
1238   GstRtpJitterBufferPrivate *priv;
1239
1240   priv = jitterbuffer->priv;
1241
1242   /* this will trigger a new pt-map request signal, FIXME, do something better. */
1243
1244   JBUF_LOCK (priv);
1245   priv->clock_rate = -1;
1246   /* do not clear current content, but refresh state for new arrival */
1247   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
1248   rtp_jitter_buffer_reset_skew (priv->jbuf);
1249   JBUF_UNLOCK (priv);
1250 }
1251
1252 static GstClockTime
1253 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
1254     guint64 offset)
1255 {
1256   GstRtpJitterBufferPrivate *priv;
1257   GstClockTime last_out;
1258   RTPJitterBufferItem *item;
1259
1260   priv = jbuf->priv;
1261
1262   JBUF_LOCK (priv);
1263   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
1264       active, GST_TIME_ARGS (offset));
1265
1266   if (active != priv->active) {
1267     /* add the amount of time spent in paused to the output offset. All
1268      * outgoing buffers will have this offset applied to their timestamps in
1269      * order to make them arrive in time in the sink. */
1270     priv->out_offset = offset;
1271     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
1272         GST_TIME_ARGS (priv->out_offset));
1273     priv->active = active;
1274     JBUF_SIGNAL_EVENT (priv);
1275   }
1276   if (!active) {
1277     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
1278   }
1279   if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
1280     /* head buffer timestamp and offset gives our output time */
1281     last_out = item->dts + priv->ts_offset;
1282   } else {
1283     /* use last known time when the buffer is empty */
1284     last_out = priv->last_out_time;
1285   }
1286   JBUF_UNLOCK (priv);
1287
1288   return last_out;
1289 }
1290
1291 static GstCaps *
1292 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
1293 {
1294   GstRtpJitterBuffer *jitterbuffer;
1295   GstRtpJitterBufferPrivate *priv;
1296   GstPad *other;
1297   GstCaps *caps;
1298   GstCaps *templ;
1299
1300   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1301   priv = jitterbuffer->priv;
1302
1303   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
1304
1305   caps = gst_pad_peer_query_caps (other, filter);
1306
1307   templ = gst_pad_get_pad_template_caps (pad);
1308   if (caps == NULL) {
1309     GST_DEBUG_OBJECT (jitterbuffer, "use template");
1310     caps = templ;
1311   } else {
1312     GstCaps *intersect;
1313
1314     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1315
1316     intersect = gst_caps_intersect (caps, templ);
1317     gst_caps_unref (caps);
1318     gst_caps_unref (templ);
1319
1320     caps = intersect;
1321   }
1322   gst_object_unref (jitterbuffer);
1323
1324   return caps;
1325 }
1326
1327 /*
1328  * Must be called with JBUF_LOCK held
1329  */
1330
1331 static gboolean
1332 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1333     GstCaps * caps, gint pt)
1334 {
1335   GstRtpJitterBufferPrivate *priv;
1336   GstStructure *caps_struct;
1337   guint val;
1338   gint payload = -1;
1339   GstClockTime tval;
1340   const gchar *ts_refclk, *mediaclk;
1341
1342   priv = jitterbuffer->priv;
1343
1344   /* first parse the caps */
1345   caps_struct = gst_caps_get_structure (caps, 0);
1346
1347   GST_DEBUG_OBJECT (jitterbuffer, "got caps %" GST_PTR_FORMAT, caps);
1348
1349   if (gst_structure_get_int (caps_struct, "payload", &payload) && pt != -1
1350       && payload != pt) {
1351     GST_ERROR_OBJECT (jitterbuffer,
1352         "Got caps with wrong payload type (got %d, expected %d)", payload, pt);
1353     return FALSE;
1354   }
1355
1356   if (payload != -1) {
1357     GST_DEBUG_OBJECT (jitterbuffer, "Got payload type %d", payload);
1358     priv->last_pt = payload;
1359   }
1360
1361   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1362    * measure the amount of data in the buffer */
1363   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1364     goto error;
1365
1366   if (priv->clock_rate <= 0)
1367     goto wrong_rate;
1368
1369   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1370
1371   rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1372
1373   gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
1374
1375   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1376    * can use this to track the amount of time elapsed on the sender. */
1377   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1378     priv->clock_base = val;
1379   else
1380     priv->clock_base = -1;
1381
1382   priv->ext_timestamp = priv->clock_base;
1383
1384   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1385       priv->clock_base);
1386
1387   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1388     /* first expected seqnum, only update when we didn't have a previous base. */
1389     if (priv->next_in_seqnum == -1)
1390       priv->next_in_seqnum = val;
1391     if (priv->next_seqnum == -1) {
1392       priv->next_seqnum = val;
1393       JBUF_SIGNAL_EVENT (priv);
1394     }
1395     priv->seqnum_base = val;
1396   } else {
1397     priv->seqnum_base = -1;
1398   }
1399
1400   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1401
1402   /* the start and stop times. The seqnum-base corresponds to the start time. We
1403    * will keep track of the seqnums on the output and when we reach the one
1404    * corresponding to npt-stop, we emit the npt-stop-reached signal */
1405   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1406     priv->npt_start = tval;
1407   else
1408     priv->npt_start = 0;
1409
1410   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1411     priv->npt_stop = tval;
1412   else
1413     priv->npt_stop = -1;
1414
1415   GST_DEBUG_OBJECT (jitterbuffer,
1416       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1417       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1418
1419   if ((ts_refclk = gst_structure_get_string (caps_struct, "a-ts-refclk"))) {
1420     GstClock *clock = NULL;
1421     guint64 clock_offset = -1;
1422
1423     GST_DEBUG_OBJECT (jitterbuffer, "Have timestamp reference clock %s",
1424         ts_refclk);
1425
1426     if (g_str_has_prefix (ts_refclk, "ntp=")) {
1427       if (g_str_has_prefix (ts_refclk, "ntp=/traceable/")) {
1428         GST_FIXME_OBJECT (jitterbuffer, "Can't handle traceable NTP clocks");
1429       } else {
1430         const gchar *host, *portstr;
1431         gchar *hostname;
1432         guint port;
1433
1434         host = ts_refclk + sizeof ("ntp=") - 1;
1435         if (host[0] == '[') {
1436           /* IPv6 */
1437           portstr = strchr (host, ']');
1438           if (portstr && portstr[1] == ':')
1439             portstr = portstr + 1;
1440           else
1441             portstr = NULL;
1442         } else {
1443           portstr = strrchr (host, ':');
1444         }
1445
1446
1447         if (!portstr || sscanf (portstr, ":%u", &port) != 1)
1448           port = 123;
1449
1450         if (portstr)
1451           hostname = g_strndup (host, (portstr - host));
1452         else
1453           hostname = g_strdup (host);
1454
1455         clock = gst_ntp_clock_new (NULL, hostname, port, 0);
1456         g_free (hostname);
1457       }
1458     } else if (g_str_has_prefix (ts_refclk, "ptp=IEEE1588-2008:")) {
1459       const gchar *domainstr =
1460           ts_refclk + sizeof ("ptp=IEEE1588-2008:XX-XX-XX-XX-XX-XX-XX-XX") - 1;
1461       guint domain;
1462
1463       if (domainstr[0] != ':' || sscanf (domainstr, ":%u", &domain) != 1)
1464         domain = 0;
1465
1466       clock = gst_ptp_clock_new (NULL, domain);
1467     } else {
1468       GST_FIXME_OBJECT (jitterbuffer, "Unsupported timestamp reference clock");
1469     }
1470
1471     if ((mediaclk = gst_structure_get_string (caps_struct, "a-mediaclk"))) {
1472       GST_DEBUG_OBJECT (jitterbuffer, "Got media clock %s", mediaclk);
1473
1474       if (!g_str_has_prefix (mediaclk, "direct=")
1475           || sscanf (mediaclk, "direct=%" G_GUINT64_FORMAT, &clock_offset) != 1)
1476         GST_FIXME_OBJECT (jitterbuffer, "Unsupported media clock");
1477       if (strstr (mediaclk, "rate=") != NULL) {
1478         GST_FIXME_OBJECT (jitterbuffer, "Rate property not supported");
1479         clock_offset = -1;
1480       }
1481     }
1482
1483     rtp_jitter_buffer_set_media_clock (priv->jbuf, clock, clock_offset);
1484   } else {
1485     rtp_jitter_buffer_set_media_clock (priv->jbuf, NULL, -1);
1486   }
1487
1488   return TRUE;
1489
1490   /* ERRORS */
1491 error:
1492   {
1493     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1494     return FALSE;
1495   }
1496 wrong_rate:
1497   {
1498     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1499     return FALSE;
1500   }
1501 }
1502
1503 static void
1504 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1505 {
1506   GstRtpJitterBufferPrivate *priv;
1507
1508   priv = jitterbuffer->priv;
1509
1510   JBUF_LOCK (priv);
1511   /* mark ourselves as flushing */
1512   priv->srcresult = GST_FLOW_FLUSHING;
1513   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1514   /* this unblocks any waiting pops on the src pad task */
1515   JBUF_SIGNAL_EVENT (priv);
1516   JBUF_SIGNAL_QUERY (priv, FALSE);
1517   JBUF_UNLOCK (priv);
1518 }
1519
1520 static void
1521 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1522 {
1523   GstRtpJitterBufferPrivate *priv;
1524
1525   priv = jitterbuffer->priv;
1526
1527   JBUF_LOCK (priv);
1528   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1529   /* Mark as non flushing */
1530   priv->srcresult = GST_FLOW_OK;
1531   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1532   priv->last_popped_seqnum = -1;
1533   priv->last_out_time = -1;
1534   priv->next_seqnum = -1;
1535   priv->seqnum_base = -1;
1536   priv->ips_rtptime = -1;
1537   priv->ips_dts = GST_CLOCK_TIME_NONE;
1538   priv->packet_spacing = 0;
1539   priv->next_in_seqnum = -1;
1540   priv->clock_rate = -1;
1541   priv->last_pt = -1;
1542   priv->eos = FALSE;
1543   priv->estimated_eos = -1;
1544   priv->last_elapsed = 0;
1545   priv->ext_timestamp = -1;
1546   priv->avg_jitter = 0;
1547   priv->last_dts = -1;
1548   priv->last_rtptime = -1;
1549   priv->last_in_dts = 0;
1550   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1551   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1552   rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
1553   rtp_jitter_buffer_reset_skew (priv->jbuf);
1554   remove_all_timers (jitterbuffer);
1555   g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
1556   g_queue_clear (&priv->gap_packets);
1557   JBUF_UNLOCK (priv);
1558 }
1559
1560 static gboolean
1561 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1562     GstPadMode mode, gboolean active)
1563 {
1564   gboolean result;
1565   GstRtpJitterBuffer *jitterbuffer = NULL;
1566
1567   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1568
1569   switch (mode) {
1570     case GST_PAD_MODE_PUSH:
1571       if (active) {
1572         /* allow data processing */
1573         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1574
1575         /* start pushing out buffers */
1576         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1577         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1578             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1579       } else {
1580         /* make sure all data processing stops ASAP */
1581         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1582
1583         /* NOTE this will hardlock if the state change is called from the src pad
1584          * task thread because we will _join() the thread. */
1585         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1586         result = gst_pad_stop_task (pad);
1587       }
1588       break;
1589     default:
1590       result = FALSE;
1591       break;
1592   }
1593   return result;
1594 }
1595
1596 static GstStateChangeReturn
1597 gst_rtp_jitter_buffer_change_state (GstElement * element,
1598     GstStateChange transition)
1599 {
1600   GstRtpJitterBuffer *jitterbuffer;
1601   GstRtpJitterBufferPrivate *priv;
1602   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1603
1604   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1605   priv = jitterbuffer->priv;
1606
1607   switch (transition) {
1608     case GST_STATE_CHANGE_NULL_TO_READY:
1609       break;
1610     case GST_STATE_CHANGE_READY_TO_PAUSED:
1611       JBUF_LOCK (priv);
1612       /* reset negotiated values */
1613       priv->clock_rate = -1;
1614       priv->clock_base = -1;
1615       priv->peer_latency = 0;
1616       priv->last_pt = -1;
1617       /* block until we go to PLAYING */
1618       priv->blocked = TRUE;
1619       priv->timer_running = TRUE;
1620       priv->timer_thread =
1621           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1622       JBUF_UNLOCK (priv);
1623       break;
1624     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1625       JBUF_LOCK (priv);
1626       /* unblock to allow streaming in PLAYING */
1627       priv->blocked = FALSE;
1628       JBUF_SIGNAL_EVENT (priv);
1629       JBUF_SIGNAL_TIMER (priv);
1630       JBUF_UNLOCK (priv);
1631       break;
1632     default:
1633       break;
1634   }
1635
1636   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1637
1638   switch (transition) {
1639     case GST_STATE_CHANGE_READY_TO_PAUSED:
1640       /* we are a live element because we sync to the clock, which we can only
1641        * do in the PLAYING state */
1642       if (ret != GST_STATE_CHANGE_FAILURE)
1643         ret = GST_STATE_CHANGE_NO_PREROLL;
1644       break;
1645     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1646       JBUF_LOCK (priv);
1647       /* block to stop streaming when PAUSED */
1648       priv->blocked = TRUE;
1649       unschedule_current_timer (jitterbuffer);
1650       JBUF_UNLOCK (priv);
1651       if (ret != GST_STATE_CHANGE_FAILURE)
1652         ret = GST_STATE_CHANGE_NO_PREROLL;
1653       break;
1654     case GST_STATE_CHANGE_PAUSED_TO_READY:
1655       JBUF_LOCK (priv);
1656       gst_buffer_replace (&priv->last_sr, NULL);
1657       priv->timer_running = FALSE;
1658       unschedule_current_timer (jitterbuffer);
1659       JBUF_SIGNAL_TIMER (priv);
1660       JBUF_SIGNAL_QUERY (priv, FALSE);
1661       JBUF_UNLOCK (priv);
1662       g_thread_join (priv->timer_thread);
1663       priv->timer_thread = NULL;
1664       break;
1665     case GST_STATE_CHANGE_READY_TO_NULL:
1666       break;
1667     default:
1668       break;
1669   }
1670
1671   return ret;
1672 }
1673
1674 static gboolean
1675 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1676     GstEvent * event)
1677 {
1678   gboolean ret = TRUE;
1679   GstRtpJitterBuffer *jitterbuffer;
1680   GstRtpJitterBufferPrivate *priv;
1681
1682   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1683   priv = jitterbuffer->priv;
1684
1685   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1686
1687   switch (GST_EVENT_TYPE (event)) {
1688     case GST_EVENT_LATENCY:
1689     {
1690       GstClockTime latency;
1691
1692       gst_event_parse_latency (event, &latency);
1693
1694       GST_DEBUG_OBJECT (jitterbuffer,
1695           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1696
1697       JBUF_LOCK (priv);
1698       /* adjust the overall buffer delay to the total pipeline latency in
1699        * buffering mode because if downstream consumes too fast (because of
1700        * large latency or queues, we would start rebuffering again. */
1701       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1702           RTP_JITTER_BUFFER_MODE_BUFFER) {
1703         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1704       }
1705       JBUF_UNLOCK (priv);
1706
1707       ret = gst_pad_push_event (priv->sinkpad, event);
1708       break;
1709     }
1710     default:
1711       ret = gst_pad_push_event (priv->sinkpad, event);
1712       break;
1713   }
1714
1715   return ret;
1716 }
1717
1718 /* handles and stores the event in the jitterbuffer, must be called with
1719  * LOCK */
1720 static gboolean
1721 queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
1722 {
1723   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1724   RTPJitterBufferItem *item;
1725   gboolean head;
1726
1727   switch (GST_EVENT_TYPE (event)) {
1728     case GST_EVENT_CAPS:
1729     {
1730       GstCaps *caps;
1731
1732       gst_event_parse_caps (event, &caps);
1733       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, -1);
1734       break;
1735     }
1736     case GST_EVENT_SEGMENT:
1737     {
1738       GstSegment segment;
1739       gst_event_copy_segment (event, &segment);
1740
1741       /* we need time for now */
1742       if (segment.format != GST_FORMAT_TIME) {
1743         GST_DEBUG_OBJECT (jitterbuffer, "ignoring non-TIME newsegment");
1744         gst_event_unref (event);
1745
1746         gst_segment_init (&segment, GST_FORMAT_TIME);
1747         event = gst_event_new_segment (&segment);
1748       }
1749
1750       priv->segment = segment;
1751       break;
1752     }
1753     case GST_EVENT_EOS:
1754       priv->eos = TRUE;
1755       rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
1756       break;
1757     default:
1758       break;
1759   }
1760
1761
1762   GST_DEBUG_OBJECT (jitterbuffer, "adding event");
1763   item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
1764   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1);
1765   if (head)
1766     JBUF_SIGNAL_EVENT (priv);
1767
1768   return TRUE;
1769 }
1770
1771 static gboolean
1772 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1773     GstEvent * event)
1774 {
1775   gboolean ret = TRUE;
1776   GstRtpJitterBuffer *jitterbuffer;
1777   GstRtpJitterBufferPrivate *priv;
1778
1779   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1780   priv = jitterbuffer->priv;
1781
1782   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1783
1784   switch (GST_EVENT_TYPE (event)) {
1785     case GST_EVENT_FLUSH_START:
1786       ret = gst_pad_push_event (priv->srcpad, event);
1787       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1788       /* wait for the loop to go into PAUSED */
1789       gst_pad_pause_task (priv->srcpad);
1790       break;
1791     case GST_EVENT_FLUSH_STOP:
1792       ret = gst_pad_push_event (priv->srcpad, event);
1793       ret =
1794           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1795           GST_PAD_MODE_PUSH, TRUE);
1796       break;
1797     default:
1798       if (GST_EVENT_IS_SERIALIZED (event)) {
1799         /* serialized events go in the queue */
1800         JBUF_LOCK (priv);
1801         if (priv->srcresult != GST_FLOW_OK) {
1802           /* Errors in sticky event pushing are no problem and ignored here
1803            * as they will cause more meaningful errors during data flow.
1804            * For EOS events, that are not followed by data flow, we still
1805            * return FALSE here though.
1806            */
1807           if (!GST_EVENT_IS_STICKY (event) ||
1808               GST_EVENT_TYPE (event) == GST_EVENT_EOS)
1809             goto out_flow_error;
1810         }
1811         /* refuse more events on EOS */
1812         if (priv->eos)
1813           goto out_eos;
1814         ret = queue_event (jitterbuffer, event);
1815         JBUF_UNLOCK (priv);
1816       } else {
1817         /* non-serialized events are forwarded downstream immediately */
1818         ret = gst_pad_push_event (priv->srcpad, event);
1819       }
1820       break;
1821   }
1822   return ret;
1823
1824   /* ERRORS */
1825 out_flow_error:
1826   {
1827     GST_DEBUG_OBJECT (jitterbuffer,
1828         "refusing event, we have a downstream flow error: %s",
1829         gst_flow_get_name (priv->srcresult));
1830     JBUF_UNLOCK (priv);
1831     gst_event_unref (event);
1832     return FALSE;
1833   }
1834 out_eos:
1835   {
1836     GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
1837     JBUF_UNLOCK (priv);
1838     gst_event_unref (event);
1839     return FALSE;
1840   }
1841 }
1842
1843 static gboolean
1844 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1845     GstEvent * event)
1846 {
1847   gboolean ret = TRUE;
1848   GstRtpJitterBuffer *jitterbuffer;
1849
1850   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1851
1852   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1853
1854   switch (GST_EVENT_TYPE (event)) {
1855     case GST_EVENT_FLUSH_START:
1856       gst_event_unref (event);
1857       break;
1858     case GST_EVENT_FLUSH_STOP:
1859       gst_event_unref (event);
1860       break;
1861     default:
1862       ret = gst_pad_event_default (pad, parent, event);
1863       break;
1864   }
1865
1866   return ret;
1867 }
1868
1869 /*
1870  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1871  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1872  * GST_FLOW_FLUSHING when the element is shutting down. On success
1873  * GST_FLOW_OK is returned.
1874  */
1875 static GstFlowReturn
1876 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1877     guint8 pt)
1878 {
1879   GValue ret = { 0 };
1880   GValue args[2] = { {0}, {0} };
1881   GstCaps *caps;
1882   gboolean res;
1883
1884   g_value_init (&args[0], GST_TYPE_ELEMENT);
1885   g_value_set_object (&args[0], jitterbuffer);
1886   g_value_init (&args[1], G_TYPE_UINT);
1887   g_value_set_uint (&args[1], pt);
1888
1889   g_value_init (&ret, GST_TYPE_CAPS);
1890   g_value_set_boxed (&ret, NULL);
1891
1892   JBUF_UNLOCK (jitterbuffer->priv);
1893   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1894       &ret);
1895   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1896
1897   g_value_unset (&args[0]);
1898   g_value_unset (&args[1]);
1899   caps = (GstCaps *) g_value_dup_boxed (&ret);
1900   g_value_unset (&ret);
1901   if (!caps)
1902     goto no_caps;
1903
1904   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt);
1905   gst_caps_unref (caps);
1906
1907   if (G_UNLIKELY (!res))
1908     goto parse_failed;
1909
1910   return GST_FLOW_OK;
1911
1912   /* ERRORS */
1913 no_caps:
1914   {
1915     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1916     return GST_FLOW_ERROR;
1917   }
1918 out_flushing:
1919   {
1920     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1921     return GST_FLOW_FLUSHING;
1922   }
1923 parse_failed:
1924   {
1925     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1926     return GST_FLOW_ERROR;
1927   }
1928 }
1929
1930 /* call with jbuf lock held */
1931 static GstMessage *
1932 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1933 {
1934   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1935   GstMessage *message = NULL;
1936
1937   if (percent == -1)
1938     return NULL;
1939
1940   /* Post a buffering message */
1941   if (priv->last_percent != percent) {
1942     priv->last_percent = percent;
1943     message =
1944         gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1945     gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1946   }
1947
1948   return message;
1949 }
1950
1951 static GstClockTime
1952 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1953 {
1954   GstRtpJitterBufferPrivate *priv;
1955
1956   priv = jitterbuffer->priv;
1957
1958   if (timestamp == -1)
1959     return -1;
1960
1961   /* apply the timestamp offset, this is used for inter stream sync */
1962   timestamp += priv->ts_offset;
1963   /* add the offset, this is used when buffering */
1964   timestamp += priv->out_offset;
1965
1966   return timestamp;
1967 }
1968
1969 static TimerQueue *
1970 timer_queue_new (void)
1971 {
1972   TimerQueue *queue;
1973
1974   queue = g_slice_new (TimerQueue);
1975   queue->timers = g_queue_new ();
1976   queue->hashtable = g_hash_table_new (NULL, NULL);
1977
1978   return queue;
1979 }
1980
1981 static void
1982 timer_queue_free (TimerQueue * queue)
1983 {
1984   if (!queue)
1985     return;
1986
1987   g_hash_table_destroy (queue->hashtable);
1988   g_queue_free_full (queue->timers, g_free);
1989   g_slice_free (TimerQueue, queue);
1990 }
1991
1992 static void
1993 timer_queue_append (TimerQueue * queue, const TimerData * timer,
1994     GstClockTime timeout, gboolean lost)
1995 {
1996   TimerData *copy;
1997
1998   copy = g_memdup (timer, sizeof (*timer));
1999   copy->timeout = timeout;
2000   copy->type = lost ? TIMER_TYPE_LOST : TIMER_TYPE_EXPECTED;
2001   copy->idx = -1;
2002
2003   GST_LOG ("Append rtx-stats timer #%d, %" GST_TIME_FORMAT,
2004       copy->seqnum, GST_TIME_ARGS (copy->timeout));
2005   g_queue_push_tail (queue->timers, copy);
2006   g_hash_table_insert (queue->hashtable, GINT_TO_POINTER (copy->seqnum), copy);
2007 }
2008
2009 static void
2010 timer_queue_clear_until (TimerQueue * queue, GstClockTime timeout)
2011 {
2012   TimerData *test;
2013
2014   test = g_queue_peek_head (queue->timers);
2015   while (test && test->timeout < timeout) {
2016     GST_LOG ("Pop rtx-stats timer #%d, %" GST_TIME_FORMAT " < %"
2017         GST_TIME_FORMAT, test->seqnum, GST_TIME_ARGS (test->timeout),
2018         GST_TIME_ARGS (timeout));
2019     g_hash_table_remove (queue->hashtable, GINT_TO_POINTER (test->seqnum));
2020     g_free (g_queue_pop_head (queue->timers));
2021     test = g_queue_peek_head (queue->timers);
2022   }
2023 }
2024
2025 static TimerData *
2026 timer_queue_find (TimerQueue * queue, guint16 seqnum)
2027 {
2028   return g_hash_table_lookup (queue->hashtable, GINT_TO_POINTER (seqnum));
2029 }
2030
2031 static TimerData *
2032 find_timer (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
2033 {
2034   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2035   TimerData *timer = NULL;
2036   gint i, len;
2037
2038   len = priv->timers->len;
2039   for (i = 0; i < len; i++) {
2040     TimerData *test = &g_array_index (priv->timers, TimerData, i);
2041     if (test->seqnum == seqnum) {
2042       timer = test;
2043       break;
2044     }
2045   }
2046   return timer;
2047 }
2048
2049 static void
2050 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
2051 {
2052   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2053
2054   if (priv->clock_id) {
2055     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
2056     gst_clock_id_unschedule (priv->clock_id);
2057     priv->clock_id = NULL;
2058   }
2059 }
2060
2061 static GstClockTime
2062 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2063 {
2064   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2065   GstClockTime test_timeout;
2066
2067   if ((test_timeout = timer->timeout) == -1)
2068     return -1;
2069
2070   if (timer->type != TIMER_TYPE_EXPECTED) {
2071     /* add our latency and offset to get output times. */
2072     test_timeout = apply_offset (jitterbuffer, test_timeout);
2073     test_timeout += priv->latency_ns;
2074   }
2075   return test_timeout;
2076 }
2077
2078 static void
2079 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2080 {
2081   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2082
2083   if (priv->clock_id) {
2084     GstClockTime timeout = get_timeout (jitterbuffer, timer);
2085
2086     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
2087         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
2088
2089     if (timeout == -1 || timeout < priv->timer_timeout)
2090       unschedule_current_timer (jitterbuffer);
2091   }
2092 }
2093
2094 static TimerData *
2095 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
2096     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
2097     GstClockTime duration)
2098 {
2099   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2100   TimerData *timer;
2101   gint len;
2102
2103   GST_DEBUG_OBJECT (jitterbuffer,
2104       "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %"
2105       GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout),
2106       GST_TIME_ARGS (delay));
2107
2108   len = priv->timers->len;
2109   g_array_set_size (priv->timers, len + 1);
2110   timer = &g_array_index (priv->timers, TimerData, len);
2111   timer->idx = len;
2112   timer->type = type;
2113   timer->seqnum = seqnum;
2114   timer->num = num;
2115   timer->timeout = timeout + delay;
2116   timer->duration = duration;
2117   if (type == TIMER_TYPE_EXPECTED) {
2118     timer->rtx_base = timeout;
2119     timer->rtx_delay = delay;
2120     timer->rtx_retry = 0;
2121   }
2122   timer->rtx_last = GST_CLOCK_TIME_NONE;
2123   timer->num_rtx_retry = 0;
2124   timer->num_rtx_received = 0;
2125   recalculate_timer (jitterbuffer, timer);
2126   JBUF_SIGNAL_TIMER (priv);
2127
2128   return timer;
2129 }
2130
2131 static void
2132 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2133     guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
2134 {
2135   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2136   gboolean seqchange, timechange;
2137   guint16 oldseq;
2138
2139   seqchange = timer->seqnum != seqnum;
2140   timechange = timer->timeout != timeout;
2141
2142   if (!seqchange && !timechange)
2143     return;
2144
2145   oldseq = timer->seqnum;
2146
2147   GST_DEBUG_OBJECT (jitterbuffer,
2148       "replace timer %d for seqnum %d->%d timeout %" GST_TIME_FORMAT
2149       "->%" GST_TIME_FORMAT, timer->type, oldseq, seqnum,
2150       GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (timeout + delay));
2151
2152   timer->timeout = timeout + delay;
2153   timer->seqnum = seqnum;
2154   if (reset) {
2155     GST_DEBUG_OBJECT (jitterbuffer, "reset rtx delay %" GST_TIME_FORMAT
2156         "->%" GST_TIME_FORMAT, GST_TIME_ARGS (timer->rtx_delay),
2157         GST_TIME_ARGS (delay));
2158     timer->rtx_base = timeout;
2159     timer->rtx_delay = delay;
2160     timer->rtx_retry = 0;
2161   }
2162   if (seqchange) {
2163     timer->num_rtx_retry = 0;
2164     timer->num_rtx_received = 0;
2165   }
2166
2167   if (priv->clock_id) {
2168     /* we changed the seqnum and there is a timer currently waiting with this
2169      * seqnum, unschedule it */
2170     if (seqchange && priv->timer_seqnum == oldseq)
2171       unschedule_current_timer (jitterbuffer);
2172     /* we changed the time, check if it is earlier than what we are waiting
2173      * for and unschedule if so */
2174     else if (timechange)
2175       recalculate_timer (jitterbuffer, timer);
2176   }
2177 }
2178
2179 static TimerData *
2180 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
2181     guint16 seqnum, GstClockTime timeout)
2182 {
2183   TimerData *timer;
2184
2185   /* find the seqnum timer */
2186   timer = find_timer (jitterbuffer, seqnum);
2187   if (timer == NULL) {
2188     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
2189   } else {
2190     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
2191   }
2192   return timer;
2193 }
2194
2195 static void
2196 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2197 {
2198   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2199   guint idx;
2200
2201   if (timer->idx == -1)
2202     return;
2203
2204   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
2205     unschedule_current_timer (jitterbuffer);
2206
2207   idx = timer->idx;
2208   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
2209   g_array_remove_index_fast (priv->timers, idx);
2210   timer->idx = idx;
2211 }
2212
2213 static void
2214 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
2215 {
2216   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2217   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
2218   g_array_set_size (priv->timers, 0);
2219   unschedule_current_timer (jitterbuffer);
2220 }
2221
2222 /* get the extra delay to wait before sending RTX */
2223 static GstClockTime
2224 get_rtx_delay (GstRtpJitterBufferPrivate * priv)
2225 {
2226   GstClockTime delay;
2227
2228   if (priv->rtx_delay == -1) {
2229     if (priv->avg_jitter == 0 && priv->packet_spacing == 0) {
2230       delay = DEFAULT_AUTO_RTX_DELAY;
2231     } else {
2232       /* jitter is in nanoseconds, maximum of 2x jitter and half the
2233        * packet spacing is a good margin */
2234       delay = MAX (priv->avg_jitter * 2, priv->packet_spacing / 2);
2235     }
2236   } else {
2237     delay = priv->rtx_delay * GST_MSECOND;
2238   }
2239   if (priv->rtx_min_delay > 0)
2240     delay = MAX (delay, priv->rtx_min_delay * GST_MSECOND);
2241
2242   return delay;
2243 }
2244
2245 /* Check if packet with seqnum is already considered definitely lost by being
2246  * part of a "lost timer" for multiple packets */
2247 static gboolean
2248 already_lost (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
2249 {
2250   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2251   gint i, len;
2252
2253   len = priv->timers->len;
2254   for (i = 0; i < len; i++) {
2255     TimerData *test = &g_array_index (priv->timers, TimerData, i);
2256     gint gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
2257
2258     if (test->num > 1 && test->type == TIMER_TYPE_LOST && gap >= 0 &&
2259         gap < test->num) {
2260       GST_DEBUG ("seqnum #%d already considered definitely lost (#%d->#%d)",
2261           seqnum, test->seqnum, (test->seqnum + test->num - 1) & 0xffff);
2262       return TRUE;
2263     }
2264   }
2265
2266   return FALSE;
2267 }
2268
2269 /* we just received a packet with seqnum and dts.
2270  *
2271  * First check for old seqnum that we are still expecting. If the gap with the
2272  * current seqnum is too big, unschedule the timeouts.
2273  *
2274  * If we have a valid packet spacing estimate we can set a timer for when we
2275  * should receive the next packet.
2276  * If we don't have a valid estimate, we remove any timer we might have
2277  * had for this packet.
2278  */
2279 static void
2280 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
2281     GstClockTime dts, gboolean do_next_seqnum, gboolean is_rtx,
2282     TimerData * timer)
2283 {
2284   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2285
2286   /* go through all timers and unschedule the ones with a large gap */
2287   if (priv->do_retransmission && priv->rtx_delay_reorder > 0) {
2288     gint i, len;
2289     len = priv->timers->len;
2290     for (i = 0; i < len; i++) {
2291       TimerData *test = &g_array_index (priv->timers, TimerData, i);
2292       gint gap;
2293
2294       gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
2295
2296       GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d",
2297           test->type, test->seqnum, seqnum, gap);
2298
2299       if (gap > priv->rtx_delay_reorder) {
2300         /* max gap, we exceeded the max reorder distance and we don't expect the
2301          * missing packet to be this reordered */
2302         if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
2303           reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
2304       }
2305     }
2306   }
2307
2308   do_next_seqnum = do_next_seqnum && priv->packet_spacing > 0
2309       && priv->do_retransmission && priv->rtx_next_seqnum;
2310
2311   if (timer && timer->type != TIMER_TYPE_DEADLINE) {
2312     if (timer->num_rtx_retry > 0) {
2313       if (is_rtx) {
2314         update_rtx_stats (jitterbuffer, timer, dts, TRUE);
2315         /* don't try to estimate the next seqnum because this is a retransmitted
2316          * packet and it probably did not arrive with the expected packet
2317          * spacing. */
2318         do_next_seqnum = FALSE;
2319       }
2320
2321       if (!is_rtx || timer->num_rtx_retry > 1) {
2322         /* Store timer in order to record stats when/if the retransmitted
2323          * packet arrives. We should also store timer information if we've
2324          * requested retransmission more than once since we may receive
2325          * several retransmitted packets. For accuracy we should update the
2326          * stats also when the redundant retransmitted packets arrives. */
2327         timer_queue_append (priv->rtx_stats_timers, timer,
2328             dts + priv->rtx_stats_timeout * GST_MSECOND, FALSE);
2329       }
2330     }
2331   }
2332
2333   if (do_next_seqnum && dts != GST_CLOCK_TIME_NONE) {
2334     GstClockTime expected, delay;
2335
2336     /* calculate expected arrival time of the next seqnum */
2337     expected = dts + priv->packet_spacing;
2338
2339     delay = get_rtx_delay (priv);
2340
2341     /* and update/install timer for next seqnum */
2342     GST_DEBUG_OBJECT (jitterbuffer, "Add RTX timer #%d, expected %"
2343         GST_TIME_FORMAT ", delay %" GST_TIME_FORMAT ", packet-spacing %"
2344         GST_TIME_FORMAT ", jitter %" GST_TIME_FORMAT, priv->next_in_seqnum,
2345         GST_TIME_ARGS (expected), GST_TIME_ARGS (delay),
2346         GST_TIME_ARGS (priv->packet_spacing), GST_TIME_ARGS (priv->avg_jitter));
2347
2348     if (timer) {
2349       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
2350           delay, TRUE);
2351     } else {
2352       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
2353           expected, delay, priv->packet_spacing);
2354     }
2355   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
2356     /* if we had a timer, remove it, we don't know when to expect the next
2357      * packet. */
2358     remove_timer (jitterbuffer, timer);
2359   }
2360 }
2361
2362 static void
2363 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
2364     GstClockTime dts)
2365 {
2366   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2367
2368   /* we need consecutive seqnums with a different
2369    * rtptime to estimate the packet spacing. */
2370   if (priv->ips_rtptime != rtptime) {
2371     /* rtptime changed, check dts diff */
2372     if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
2373       GstClockTime new_packet_spacing = dts - priv->ips_dts;
2374       GstClockTime old_packet_spacing = priv->packet_spacing;
2375
2376       /* Biased towards bigger packet spacings to prevent
2377        * too many unneeded retransmission requests for next
2378        * packets that just arrive a little later than we would
2379        * expect */
2380       if (old_packet_spacing > new_packet_spacing)
2381         priv->packet_spacing =
2382             (new_packet_spacing + 3 * old_packet_spacing) / 4;
2383       else if (old_packet_spacing > 0)
2384         priv->packet_spacing =
2385             (3 * new_packet_spacing + old_packet_spacing) / 4;
2386       else
2387         priv->packet_spacing = new_packet_spacing;
2388
2389       GST_DEBUG_OBJECT (jitterbuffer,
2390           "new packet spacing %" GST_TIME_FORMAT
2391           " old packet spacing %" GST_TIME_FORMAT
2392           " combined to %" GST_TIME_FORMAT,
2393           GST_TIME_ARGS (new_packet_spacing),
2394           GST_TIME_ARGS (old_packet_spacing),
2395           GST_TIME_ARGS (priv->packet_spacing));
2396     }
2397     priv->ips_rtptime = rtptime;
2398     priv->ips_dts = dts;
2399   }
2400 }
2401
2402 static void
2403 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
2404     guint16 seqnum, GstClockTime dts, gint gap)
2405 {
2406   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2407   GstClockTime total_duration, duration, expected_dts, delay;
2408   TimerType type;
2409
2410   GST_DEBUG_OBJECT (jitterbuffer,
2411       "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
2412       GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
2413
2414   if (dts == GST_CLOCK_TIME_NONE) {
2415     GST_WARNING_OBJECT (jitterbuffer, "Have no DTS");
2416     return;
2417   }
2418
2419   /* the total duration spanned by the missing packets */
2420   if (dts >= priv->last_in_dts)
2421     total_duration = dts - priv->last_in_dts;
2422   else
2423     total_duration = 0;
2424
2425   /* interpolate between the current time and the last time based on
2426    * number of packets we are missing, this is the estimated duration
2427    * for the missing packet based on equidistant packet spacing. */
2428   duration = total_duration / (gap + 1);
2429
2430   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
2431       GST_TIME_ARGS (duration));
2432
2433   if (total_duration > priv->latency_ns) {
2434     GstClockTime gap_time;
2435     guint lost_packets;
2436
2437     if (duration > 0) {
2438       GstClockTime gap_dur = gap * duration;
2439       if (gap_dur > priv->latency_ns)
2440         gap_time = gap_dur - priv->latency_ns;
2441       else
2442         gap_time = 0;
2443       lost_packets = gap_time / duration;
2444     } else {
2445       gap_time = total_duration - priv->latency_ns;
2446       lost_packets = gap;
2447     }
2448
2449     /* too many lost packets, some of the missing packets are already
2450      * too late and we can generate lost packet events for them. */
2451     GST_DEBUG_OBJECT (jitterbuffer,
2452         "lost packets (%d, #%d->#%d) duration too large %" GST_TIME_FORMAT
2453         " > %" GST_TIME_FORMAT ", consider %u lost (%" GST_TIME_FORMAT ")",
2454         gap, expected, seqnum - 1, GST_TIME_ARGS (total_duration),
2455         GST_TIME_ARGS (priv->latency_ns), lost_packets,
2456         GST_TIME_ARGS (gap_time));
2457
2458     /* this timer will fire immediately and the lost event will be pushed from
2459      * the timer thread */
2460     if (lost_packets > 0) {
2461       add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
2462           priv->last_in_dts + duration, 0, gap_time);
2463       expected += lost_packets;
2464       priv->last_in_dts += gap_time;
2465     }
2466   }
2467
2468   expected_dts = priv->last_in_dts + duration;
2469   delay = 0;
2470
2471   if (priv->do_retransmission) {
2472     TimerData *timer = find_timer (jitterbuffer, expected);
2473
2474     type = TIMER_TYPE_EXPECTED;
2475     delay = get_rtx_delay (priv);
2476
2477     /* if we had a timer for the first missing packet, update it. */
2478     if (timer && timer->type == TIMER_TYPE_EXPECTED) {
2479       GstClockTime timeout = timer->timeout;
2480
2481       timer->duration = duration;
2482       if (timeout > (expected_dts + delay) && timer->num_rtx_retry == 0) {
2483         reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_dts,
2484             delay, TRUE);
2485       }
2486       expected++;
2487       expected_dts += duration;
2488     }
2489   } else {
2490     type = TIMER_TYPE_LOST;
2491   }
2492
2493   while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
2494     add_timer (jitterbuffer, type, expected, 0, expected_dts, delay, duration);
2495     expected_dts += duration;
2496     expected++;
2497   }
2498 }
2499
2500 static void
2501 calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts,
2502     guint rtptime)
2503 {
2504   gint32 rtpdiff;
2505   GstClockTimeDiff dtsdiff, rtpdiffns, diff;
2506   GstRtpJitterBufferPrivate *priv;
2507
2508   priv = jitterbuffer->priv;
2509
2510   if (G_UNLIKELY (dts == GST_CLOCK_TIME_NONE) || priv->clock_rate <= 0)
2511     goto no_time;
2512
2513   if (priv->last_dts != -1)
2514     dtsdiff = dts - priv->last_dts;
2515   else
2516     dtsdiff = 0;
2517
2518   if (priv->last_rtptime != -1)
2519     rtpdiff = rtptime - (guint32) priv->last_rtptime;
2520   else
2521     rtpdiff = 0;
2522
2523   priv->last_dts = dts;
2524   priv->last_rtptime = rtptime;
2525
2526   if (rtpdiff > 0)
2527     rtpdiffns =
2528         gst_util_uint64_scale_int (rtpdiff, GST_SECOND, priv->clock_rate);
2529   else
2530     rtpdiffns =
2531         -gst_util_uint64_scale_int (-rtpdiff, GST_SECOND, priv->clock_rate);
2532
2533   diff = ABS (dtsdiff - rtpdiffns);
2534
2535   /* jitter is stored in nanoseconds */
2536   priv->avg_jitter = (diff + (15 * priv->avg_jitter)) >> 4;
2537
2538   GST_LOG_OBJECT (jitterbuffer,
2539       "dtsdiff %" GST_TIME_FORMAT " rtptime %" GST_TIME_FORMAT
2540       ", clock-rate %d, diff %" GST_TIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
2541       GST_TIME_ARGS (dtsdiff), GST_TIME_ARGS (rtpdiffns), priv->clock_rate,
2542       GST_TIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
2543
2544   return;
2545
2546   /* ERRORS */
2547 no_time:
2548   {
2549     GST_DEBUG_OBJECT (jitterbuffer,
2550         "no dts or no clock-rate, can't calculate jitter");
2551     return;
2552   }
2553 }
2554
2555 static gint
2556 compare_buffer_seqnum (GstBuffer * a, GstBuffer * b, gpointer user_data)
2557 {
2558   GstRTPBuffer rtp_a = GST_RTP_BUFFER_INIT;
2559   GstRTPBuffer rtp_b = GST_RTP_BUFFER_INIT;
2560   guint seq_a, seq_b;
2561
2562   gst_rtp_buffer_map (a, GST_MAP_READ, &rtp_a);
2563   seq_a = gst_rtp_buffer_get_seq (&rtp_a);
2564   gst_rtp_buffer_unmap (&rtp_a);
2565
2566   gst_rtp_buffer_map (b, GST_MAP_READ, &rtp_b);
2567   seq_b = gst_rtp_buffer_get_seq (&rtp_b);
2568   gst_rtp_buffer_unmap (&rtp_b);
2569
2570   return gst_rtp_buffer_compare_seqnum (seq_b, seq_a);
2571 }
2572
2573 static gboolean
2574 handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, gboolean future,
2575     GstBuffer * buffer, guint8 pt, guint16 seqnum, gint gap, guint max_dropout,
2576     guint max_misorder)
2577 {
2578   GstRtpJitterBufferPrivate *priv;
2579   guint gap_packets_length;
2580   gboolean reset = FALSE;
2581
2582   priv = jitterbuffer->priv;
2583
2584   if ((gap_packets_length = g_queue_get_length (&priv->gap_packets)) > 0) {
2585     GList *l;
2586     guint32 prev_gap_seq = -1;
2587     gboolean all_consecutive = TRUE;
2588
2589     g_queue_insert_sorted (&priv->gap_packets, buffer,
2590         (GCompareDataFunc) compare_buffer_seqnum, NULL);
2591
2592     for (l = priv->gap_packets.head; l; l = l->next) {
2593       GstBuffer *gap_buffer = l->data;
2594       GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
2595       guint32 gap_seq;
2596
2597       gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
2598
2599       all_consecutive = (gst_rtp_buffer_get_payload_type (&gap_rtp) == pt);
2600
2601       gap_seq = gst_rtp_buffer_get_seq (&gap_rtp);
2602       if (prev_gap_seq == -1)
2603         prev_gap_seq = gap_seq;
2604       else if (gst_rtp_buffer_compare_seqnum (gap_seq, prev_gap_seq) != -1)
2605         all_consecutive = FALSE;
2606       else
2607         prev_gap_seq = gap_seq;
2608
2609       gst_rtp_buffer_unmap (&gap_rtp);
2610       if (!all_consecutive)
2611         break;
2612     }
2613
2614     if (all_consecutive && gap_packets_length > 3) {
2615       GST_DEBUG_OBJECT (jitterbuffer,
2616           "buffer too %s %d < %d, got 5 consecutive ones - reset",
2617           (future ? "new" : "old"), gap,
2618           (future ? max_dropout : -max_misorder));
2619       reset = TRUE;
2620     } else if (!all_consecutive) {
2621       g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
2622       g_queue_clear (&priv->gap_packets);
2623       GST_DEBUG_OBJECT (jitterbuffer,
2624           "buffer too %s %d < %d, got no 5 consecutive ones - dropping",
2625           (future ? "new" : "old"), gap,
2626           (future ? max_dropout : -max_misorder));
2627       buffer = NULL;
2628     } else {
2629       GST_DEBUG_OBJECT (jitterbuffer,
2630           "buffer too %s %d < %d, got %u consecutive ones - waiting",
2631           (future ? "new" : "old"), gap,
2632           (future ? max_dropout : -max_misorder), gap_packets_length + 1);
2633       buffer = NULL;
2634     }
2635   } else {
2636     GST_DEBUG_OBJECT (jitterbuffer,
2637         "buffer too %s %d < %d, first one - waiting", (future ? "new" : "old"),
2638         gap, -max_misorder);
2639     g_queue_push_tail (&priv->gap_packets, buffer);
2640     buffer = NULL;
2641   }
2642
2643   return reset;
2644 }
2645
2646 static GstClockTime
2647 get_current_running_time (GstRtpJitterBuffer * jitterbuffer)
2648 {
2649   GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (jitterbuffer));
2650   GstClockTime running_time = GST_CLOCK_TIME_NONE;
2651
2652   if (clock) {
2653     GstClockTime base_time =
2654         gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer));
2655     GstClockTime clock_time = gst_clock_get_time (clock);
2656
2657     if (clock_time > base_time)
2658       running_time = clock_time - base_time;
2659     else
2660       running_time = 0;
2661
2662     gst_object_unref (clock);
2663   }
2664
2665   return running_time;
2666 }
2667
2668 static GstFlowReturn
2669 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
2670     GstBuffer * buffer)
2671 {
2672   GstRtpJitterBuffer *jitterbuffer;
2673   GstRtpJitterBufferPrivate *priv;
2674   guint16 seqnum;
2675   guint32 expected, rtptime;
2676   GstFlowReturn ret = GST_FLOW_OK;
2677   GstClockTime dts, pts;
2678   guint64 latency_ts;
2679   gboolean head;
2680   gint percent = -1;
2681   guint8 pt;
2682   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2683   gboolean do_next_seqnum = FALSE;
2684   RTPJitterBufferItem *item;
2685   GstMessage *msg = NULL;
2686   gboolean estimated_dts = FALSE;
2687   guint32 packet_rate, max_dropout, max_misorder;
2688   TimerData *timer = NULL;
2689
2690   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
2691
2692   priv = jitterbuffer->priv;
2693
2694   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
2695     goto invalid_buffer;
2696
2697   pt = gst_rtp_buffer_get_payload_type (&rtp);
2698   seqnum = gst_rtp_buffer_get_seq (&rtp);
2699   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2700   gst_rtp_buffer_unmap (&rtp);
2701
2702   /* make sure we have PTS and DTS set */
2703   pts = GST_BUFFER_PTS (buffer);
2704   dts = GST_BUFFER_DTS (buffer);
2705   if (dts == -1)
2706     dts = pts;
2707   else if (pts == -1)
2708     pts = dts;
2709
2710   if (dts == -1) {
2711     /* If we have no DTS here, i.e. no capture time, get one from the
2712      * clock now to have something to calculate with in the future. */
2713     dts = get_current_running_time (jitterbuffer);
2714     pts = dts;
2715
2716     /* Remember that we estimated the DTS if we are running already
2717      * and this is not our first packet (or first packet after a reset).
2718      * If it's the first packet, we somehow must generate a timestamp for
2719      * everything, otherwise we can't calculate any times
2720      */
2721     estimated_dts = (priv->next_in_seqnum != -1);
2722   } else {
2723     /* take the DTS of the buffer. This is the time when the packet was
2724      * received and is used to calculate jitter and clock skew. We will adjust
2725      * this DTS with the smoothed value after processing it in the
2726      * jitterbuffer and assign it as the PTS. */
2727     /* bring to running time */
2728     dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
2729   }
2730
2731   GST_DEBUG_OBJECT (jitterbuffer,
2732       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d, rtx %d",
2733       seqnum, GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer),
2734       GST_BUFFER_IS_RETRANSMISSION (buffer));
2735
2736   JBUF_LOCK_CHECK (priv, out_flushing);
2737
2738   if (G_UNLIKELY (priv->last_pt != pt)) {
2739     GstCaps *caps;
2740
2741     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
2742         pt);
2743
2744     priv->last_pt = pt;
2745     /* reset clock-rate so that we get a new one */
2746     priv->clock_rate = -1;
2747
2748     /* Try to get the clock-rate from the caps first if we can. If there are no
2749      * caps we must fire the signal to get the clock-rate. */
2750     if ((caps = gst_pad_get_current_caps (pad))) {
2751       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt);
2752       gst_caps_unref (caps);
2753     }
2754   }
2755
2756   if (G_UNLIKELY (priv->clock_rate == -1)) {
2757     /* no clock rate given on the caps, try to get one with the signal */
2758     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
2759             pt) == GST_FLOW_FLUSHING)
2760       goto out_flushing;
2761
2762     if (G_UNLIKELY (priv->clock_rate == -1))
2763       goto no_clock_rate;
2764
2765     gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
2766   }
2767
2768   /* don't accept more data on EOS */
2769   if (G_UNLIKELY (priv->eos))
2770     goto have_eos;
2771
2772   if (!GST_BUFFER_IS_RETRANSMISSION (buffer))
2773     calculate_jitter (jitterbuffer, dts, rtptime);
2774
2775   if (priv->seqnum_base != -1) {
2776     gint gap;
2777
2778     gap = gst_rtp_buffer_compare_seqnum (priv->seqnum_base, seqnum);
2779
2780     if (gap < 0) {
2781       GST_DEBUG_OBJECT (jitterbuffer,
2782           "packet seqnum #%d before seqnum-base #%d", seqnum,
2783           priv->seqnum_base);
2784       gst_buffer_unref (buffer);
2785       ret = GST_FLOW_OK;
2786       goto finished;
2787     } else if (gap > 16384) {
2788       /* From now on don't compare against the seqnum base anymore as
2789        * at some point in the future we will wrap around and also that
2790        * much reordering is very unlikely */
2791       priv->seqnum_base = -1;
2792     }
2793   }
2794
2795   expected = priv->next_in_seqnum;
2796
2797   packet_rate =
2798       gst_rtp_packet_rate_ctx_update (&priv->packet_rate_ctx, seqnum, rtptime);
2799   max_dropout =
2800       gst_rtp_packet_rate_ctx_get_max_dropout (&priv->packet_rate_ctx,
2801       priv->max_dropout_time);
2802   max_misorder =
2803       gst_rtp_packet_rate_ctx_get_max_misorder (&priv->packet_rate_ctx,
2804       priv->max_misorder_time);
2805   GST_TRACE_OBJECT (jitterbuffer,
2806       "packet_rate: %d, max_dropout: %d, max_misorder: %d", packet_rate,
2807       max_dropout, max_misorder);
2808
2809   /* now check against our expected seqnum */
2810   if (G_LIKELY (expected != -1)) {
2811     gint gap;
2812
2813     /* now calculate gap */
2814     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
2815
2816     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
2817         expected, seqnum, gap);
2818
2819     if (G_LIKELY (gap == 0)) {
2820       /* packet is expected */
2821       calculate_packet_spacing (jitterbuffer, rtptime, dts);
2822       do_next_seqnum = TRUE;
2823     } else {
2824       gboolean reset = FALSE;
2825
2826       if (gap < 0) {
2827         /* we received an old packet */
2828         if (G_UNLIKELY (gap != -1 && gap < -max_misorder)) {
2829           reset =
2830               handle_big_gap_buffer (jitterbuffer, FALSE, buffer, pt, seqnum,
2831               gap, max_dropout, max_misorder);
2832           buffer = NULL;
2833         } else {
2834           GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
2835         }
2836       } else {
2837         /* new packet, we are missing some packets */
2838         if (G_UNLIKELY (priv->timers->len >= max_dropout)) {
2839           /* If we have timers for more than RTP_MAX_DROPOUT packets
2840            * pending this means that we have a huge gap overall. We can
2841            * reset the jitterbuffer at this point because there's
2842            * just too much data missing to be able to do anything
2843            * sensible with the past data. Just try again from the
2844            * next packet */
2845           GST_WARNING_OBJECT (jitterbuffer,
2846               "%d pending timers > %d - resetting", priv->timers->len,
2847               max_dropout);
2848           reset = TRUE;
2849           gst_buffer_unref (buffer);
2850           buffer = NULL;
2851         } else if (G_UNLIKELY (gap >= max_dropout)) {
2852           reset =
2853               handle_big_gap_buffer (jitterbuffer, TRUE, buffer, pt, seqnum,
2854               gap, max_dropout, max_misorder);
2855           buffer = NULL;
2856         } else {
2857           GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
2858           /* fill in the gap with EXPECTED timers */
2859           calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
2860
2861           do_next_seqnum = TRUE;
2862         }
2863       }
2864       if (G_UNLIKELY (reset)) {
2865         GList *events = NULL, *l;
2866         GList *buffers;
2867
2868         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2869         rtp_jitter_buffer_flush (priv->jbuf,
2870             (GFunc) free_item_and_retain_events, &events);
2871         rtp_jitter_buffer_reset_skew (priv->jbuf);
2872         remove_all_timers (jitterbuffer);
2873         priv->discont = TRUE;
2874         priv->last_popped_seqnum = -1;
2875
2876         if (priv->gap_packets.head) {
2877           GstBuffer *gap_buffer = priv->gap_packets.head->data;
2878           GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
2879
2880           gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
2881           priv->next_seqnum = gst_rtp_buffer_get_seq (&gap_rtp);
2882           gst_rtp_buffer_unmap (&gap_rtp);
2883         } else {
2884           priv->next_seqnum = seqnum;
2885         }
2886
2887         priv->last_in_dts = -1;
2888         priv->next_in_seqnum = -1;
2889
2890         /* Insert all sticky events again in order, otherwise we would
2891          * potentially loose STREAM_START, CAPS or SEGMENT events
2892          */
2893         events = g_list_reverse (events);
2894         for (l = events; l; l = l->next) {
2895           RTPJitterBufferItem *item;
2896
2897           item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
2898           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1);
2899         }
2900         g_list_free (events);
2901
2902         JBUF_SIGNAL_EVENT (priv);
2903
2904         /* reset spacing estimation when gap */
2905         priv->ips_rtptime = -1;
2906         priv->ips_dts = GST_CLOCK_TIME_NONE;
2907
2908         buffers = g_list_copy (priv->gap_packets.head);
2909         g_queue_clear (&priv->gap_packets);
2910
2911         priv->ips_rtptime = -1;
2912         priv->ips_dts = GST_CLOCK_TIME_NONE;
2913         JBUF_UNLOCK (jitterbuffer->priv);
2914
2915         for (l = buffers; l; l = l->next) {
2916           ret = gst_rtp_jitter_buffer_chain (pad, parent, l->data);
2917           l->data = NULL;
2918           if (ret != GST_FLOW_OK) {
2919             l = l->next;
2920             break;
2921           }
2922         }
2923         for (; l; l = l->next)
2924           gst_buffer_unref (l->data);
2925         g_list_free (buffers);
2926
2927         return ret;
2928       }
2929       /* reset spacing estimation when gap */
2930       priv->ips_rtptime = -1;
2931       priv->ips_dts = GST_CLOCK_TIME_NONE;
2932     }
2933   } else {
2934     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2935
2936     /* we don't know what the next_in_seqnum should be, wait for the last
2937      * possible moment to push this buffer, maybe we get an earlier seqnum
2938      * while we wait */
2939     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
2940     do_next_seqnum = TRUE;
2941     /* take rtptime and dts to calculate packet spacing */
2942     priv->ips_rtptime = rtptime;
2943     priv->ips_dts = dts;
2944   }
2945
2946   /* We had no huge gap, let's drop all the gap packets */
2947   if (buffer != NULL) {
2948     GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets");
2949     g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
2950     g_queue_clear (&priv->gap_packets);
2951   } else {
2952     GST_DEBUG_OBJECT (jitterbuffer,
2953         "Had big gap, waiting for more consecutive packets");
2954     JBUF_UNLOCK (jitterbuffer->priv);
2955     return GST_FLOW_OK;
2956   }
2957
2958   if (do_next_seqnum) {
2959     priv->last_in_dts = dts;
2960     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
2961   }
2962
2963   timer = find_timer (jitterbuffer, seqnum);
2964   if (GST_BUFFER_IS_RETRANSMISSION (buffer)) {
2965     if (!timer)
2966       timer = timer_queue_find (priv->rtx_stats_timers, seqnum);
2967     if (timer)
2968       timer->num_rtx_received++;
2969   }
2970
2971   /* let's check if this buffer is too late, we can only accept packets with
2972    * bigger seqnum than the one we last pushed. */
2973   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
2974     gint gap;
2975
2976     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
2977
2978     /* priv->last_popped_seqnum >= seqnum, we're too late. */
2979     if (G_UNLIKELY (gap <= 0)) {
2980       if (priv->do_retransmission) {
2981         if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer) {
2982           update_rtx_stats (jitterbuffer, timer, dts, FALSE);
2983           /* Only count the retranmitted packet too late if it has been
2984            * considered lost. If the original packet arrived before the
2985            * retransmitted we just count it as a duplicate. */
2986           if (timer->type != TIMER_TYPE_LOST)
2987             goto rtx_duplicate;
2988         }
2989       }
2990       goto too_late;
2991     }
2992   }
2993
2994   if (already_lost (jitterbuffer, seqnum))
2995     goto already_lost;
2996
2997   /* let's drop oldest packet if the queue is already full and drop-on-latency
2998    * is set. We can only do this when there actually is a latency. When no
2999    * latency is set, we just pump it in the queue and let the other end push it
3000    * out as fast as possible. */
3001   if (priv->latency_ms && priv->drop_on_latency) {
3002     latency_ts =
3003         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
3004
3005     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
3006       RTPJitterBufferItem *old_item;
3007
3008       old_item = rtp_jitter_buffer_peek (priv->jbuf);
3009
3010       if (IS_DROPABLE (old_item)) {
3011         old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
3012         GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
3013             old_item);
3014         priv->next_seqnum = (old_item->seqnum + 1) & 0xffff;
3015         free_item (old_item);
3016       }
3017       /* we might have removed some head buffers, signal the pushing thread to
3018        * see if it can push now */
3019       JBUF_SIGNAL_EVENT (priv);
3020     }
3021   }
3022
3023   /* If we estimated the DTS, don't consider it in the clock skew calculations
3024    * later. The code above always sets dts to pts or the other way around if
3025    * any of those is valid in the buffer, so we know that if we estimated the
3026    * dts that both are unknown */
3027   if (estimated_dts)
3028     item =
3029         alloc_item (buffer, ITEM_TYPE_BUFFER, GST_CLOCK_TIME_NONE,
3030         GST_CLOCK_TIME_NONE, seqnum, 1, rtptime);
3031   else
3032     item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
3033
3034   /* now insert the packet into the queue in sorted order. This function returns
3035    * FALSE if a packet with the same seqnum was already in the queue, meaning we
3036    * have a duplicate. */
3037   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
3038               &head, &percent,
3039               gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer))))) {
3040     if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer)
3041       update_rtx_stats (jitterbuffer, timer, dts, FALSE);
3042     goto duplicate;
3043   }
3044
3045   /* update timers */
3046   update_timers (jitterbuffer, seqnum, dts, do_next_seqnum,
3047       GST_BUFFER_IS_RETRANSMISSION (buffer), timer);
3048
3049   /* we had an unhandled SR, handle it now */
3050   if (priv->last_sr)
3051     do_handle_sync (jitterbuffer);
3052
3053   if (G_UNLIKELY (head)) {
3054     /* signal addition of new buffer when the _loop is waiting. */
3055     if (G_LIKELY (priv->active))
3056       JBUF_SIGNAL_EVENT (priv);
3057
3058     /* let's unschedule and unblock any waiting buffers. We only want to do this
3059      * when the head buffer changed */
3060     if (G_UNLIKELY (priv->clock_id)) {
3061       GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
3062       unschedule_current_timer (jitterbuffer);
3063     }
3064   }
3065
3066   GST_DEBUG_OBJECT (jitterbuffer,
3067       "Pushed packet #%d, now %d packets, head: %d, " "percent %d", seqnum,
3068       rtp_jitter_buffer_num_packets (priv->jbuf), head, percent);
3069
3070   msg = check_buffering_percent (jitterbuffer, percent);
3071
3072 finished:
3073   JBUF_UNLOCK (priv);
3074
3075   if (msg)
3076     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
3077
3078   return ret;
3079
3080   /* ERRORS */
3081 invalid_buffer:
3082   {
3083     /* this is not fatal but should be filtered earlier */
3084     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3085         ("Received invalid RTP payload, dropping"));
3086     gst_buffer_unref (buffer);
3087     return GST_FLOW_OK;
3088   }
3089 no_clock_rate:
3090   {
3091     GST_WARNING_OBJECT (jitterbuffer,
3092         "No clock-rate in caps!, dropping buffer");
3093     gst_buffer_unref (buffer);
3094     goto finished;
3095   }
3096 out_flushing:
3097   {
3098     ret = priv->srcresult;
3099     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
3100     gst_buffer_unref (buffer);
3101     goto finished;
3102   }
3103 have_eos:
3104   {
3105     ret = GST_FLOW_EOS;
3106     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
3107     gst_buffer_unref (buffer);
3108     goto finished;
3109   }
3110 too_late:
3111   {
3112     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
3113         " popped, dropping", seqnum, priv->last_popped_seqnum);
3114     priv->num_late++;
3115     gst_buffer_unref (buffer);
3116     goto finished;
3117   }
3118 already_lost:
3119   {
3120     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as it was already "
3121         "considered lost", seqnum);
3122     priv->num_late++;
3123     gst_buffer_unref (buffer);
3124     goto finished;
3125   }
3126 duplicate:
3127   {
3128     GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
3129         seqnum);
3130     priv->num_duplicates++;
3131     free_item (item);
3132     goto finished;
3133   }
3134 rtx_duplicate:
3135   {
3136     GST_DEBUG_OBJECT (jitterbuffer,
3137         "Duplicate RTX packet #%d detected, dropping", seqnum);
3138     priv->num_duplicates++;
3139     gst_buffer_unref (buffer);
3140     goto finished;
3141   }
3142 }
3143
3144 static GstClockTime
3145 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
3146 {
3147   guint64 ext_time, elapsed;
3148   guint32 rtp_time;
3149   GstRtpJitterBufferPrivate *priv;
3150
3151   priv = jitterbuffer->priv;
3152   rtp_time = item->rtptime;
3153
3154   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
3155       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
3156
3157   ext_time = priv->ext_timestamp;
3158   ext_time = gst_rtp_buffer_ext_timestamp (&ext_time, rtp_time);
3159   if (ext_time < priv->ext_timestamp) {
3160     ext_time = priv->ext_timestamp;
3161   } else {
3162     priv->ext_timestamp = ext_time;
3163   }
3164
3165   if (ext_time > priv->clock_base)
3166     elapsed = ext_time - priv->clock_base;
3167   else
3168     elapsed = 0;
3169
3170   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
3171   return elapsed;
3172 }
3173
3174 static void
3175 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
3176     RTPJitterBufferItem * item)
3177 {
3178   guint64 total, elapsed, left, estimated;
3179   GstClockTime out_time;
3180   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3181
3182   if (priv->npt_stop == -1 || priv->ext_timestamp == -1
3183       || priv->clock_base == -1 || priv->clock_rate <= 0)
3184     return;
3185
3186   /* compute the elapsed time */
3187   elapsed = compute_elapsed (jitterbuffer, item);
3188
3189   /* do nothing if elapsed time doesn't increment */
3190   if (priv->last_elapsed && elapsed <= priv->last_elapsed)
3191     return;
3192
3193   priv->last_elapsed = elapsed;
3194
3195   /* this is the total time we need to play */
3196   total = priv->npt_stop - priv->npt_start;
3197   GST_LOG_OBJECT (jitterbuffer, "total %" GST_TIME_FORMAT,
3198       GST_TIME_ARGS (total));
3199
3200   /* this is how much time there is left */
3201   if (total > elapsed)
3202     left = total - elapsed;
3203   else
3204     left = 0;
3205
3206   /* if we have less time left that the size of the buffer, we will not
3207    * be able to keep it filled, disabled buffering then */
3208   if (left < rtp_jitter_buffer_get_delay (priv->jbuf)) {
3209     GST_DEBUG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT
3210         ", disable buffering close to EOS", GST_TIME_ARGS (left));
3211     rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
3212   }
3213
3214   /* this is the current time as running-time */
3215   out_time = item->dts;
3216
3217   if (elapsed > 0)
3218     estimated = gst_util_uint64_scale (out_time, total, elapsed);
3219   else {
3220     /* if there is almost nothing left,
3221      * we may never advance enough to end up in the above case */
3222     if (total < GST_SECOND)
3223       estimated = GST_SECOND;
3224     else
3225       estimated = -1;
3226   }
3227   GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
3228       GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
3229
3230   if (estimated != -1 && priv->estimated_eos != estimated) {
3231     set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
3232     priv->estimated_eos = estimated;
3233   }
3234 }
3235
3236 /* take a buffer from the queue and push it */
3237 static GstFlowReturn
3238 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
3239 {
3240   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3241   GstFlowReturn result = GST_FLOW_OK;
3242   RTPJitterBufferItem *item;
3243   GstBuffer *outbuf = NULL;
3244   GstEvent *outevent = NULL;
3245   GstQuery *outquery = NULL;
3246   GstClockTime dts, pts;
3247   gint percent = -1;
3248   gboolean do_push = TRUE;
3249   guint type;
3250   GstMessage *msg;
3251
3252   /* when we get here we are ready to pop and push the buffer */
3253   item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
3254   type = item->type;
3255
3256   switch (type) {
3257     case ITEM_TYPE_BUFFER:
3258
3259       /* we need to make writable to change the flags and timestamps */
3260       outbuf = gst_buffer_make_writable (item->data);
3261
3262       if (G_UNLIKELY (priv->discont)) {
3263         /* set DISCONT flag when we missed a packet. We pushed the buffer writable
3264          * into the jitterbuffer so we can modify now. */
3265         GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
3266         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
3267         priv->discont = FALSE;
3268       }
3269       if (G_UNLIKELY (priv->ts_discont)) {
3270         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
3271         priv->ts_discont = FALSE;
3272       }
3273
3274       dts =
3275           gst_segment_position_from_running_time (&priv->segment,
3276           GST_FORMAT_TIME, item->dts);
3277       pts =
3278           gst_segment_position_from_running_time (&priv->segment,
3279           GST_FORMAT_TIME, item->pts);
3280
3281       /* apply timestamp with offset to buffer now */
3282       GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
3283       GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
3284
3285       /* update the elapsed time when we need to check against the npt stop time. */
3286       update_estimated_eos (jitterbuffer, item);
3287
3288       priv->last_out_time = GST_BUFFER_PTS (outbuf);
3289       break;
3290     case ITEM_TYPE_LOST:
3291       priv->discont = TRUE;
3292       if (!priv->do_lost)
3293         do_push = FALSE;
3294       /* FALLTHROUGH */
3295     case ITEM_TYPE_EVENT:
3296       outevent = item->data;
3297       break;
3298     case ITEM_TYPE_QUERY:
3299       outquery = item->data;
3300       break;
3301   }
3302
3303   /* now we are ready to push the buffer. Save the seqnum and release the lock
3304    * so the other end can push stuff in the queue again. */
3305   if (seqnum != -1) {
3306     priv->last_popped_seqnum = seqnum;
3307     priv->next_seqnum = (seqnum + item->count) & 0xffff;
3308   }
3309   msg = check_buffering_percent (jitterbuffer, percent);
3310   JBUF_UNLOCK (priv);
3311
3312   item->data = NULL;
3313   free_item (item);
3314
3315   if (msg)
3316     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
3317
3318   switch (type) {
3319     case ITEM_TYPE_BUFFER:
3320       /* push buffer */
3321       GST_DEBUG_OBJECT (jitterbuffer,
3322           "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
3323           seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
3324           GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
3325       priv->num_pushed++;
3326       result = gst_pad_push (priv->srcpad, outbuf);
3327
3328       JBUF_LOCK_CHECK (priv, out_flushing);
3329       break;
3330     case ITEM_TYPE_LOST:
3331     case ITEM_TYPE_EVENT:
3332       /* We got not enough consecutive packets with a huge gap, we can
3333        * as well just drop them here now on EOS */
3334       if (outevent && GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
3335         GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets on EOS");
3336         g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
3337         g_queue_clear (&priv->gap_packets);
3338       }
3339
3340       GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
3341           ", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);
3342
3343       if (do_push)
3344         gst_pad_push_event (priv->srcpad, outevent);
3345       else if (outevent)
3346         gst_event_unref (outevent);
3347
3348       result = GST_FLOW_OK;
3349
3350       JBUF_LOCK_CHECK (priv, out_flushing);
3351       break;
3352     case ITEM_TYPE_QUERY:
3353     {
3354       gboolean res;
3355
3356       res = gst_pad_peer_query (priv->srcpad, outquery);
3357
3358       JBUF_LOCK_CHECK (priv, out_flushing);
3359       result = GST_FLOW_OK;
3360       GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
3361       JBUF_SIGNAL_QUERY (priv, res);
3362       break;
3363     }
3364   }
3365   return result;
3366
3367   /* ERRORS */
3368 out_flushing:
3369   {
3370     return priv->srcresult;
3371   }
3372 }
3373
3374 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
3375
3376 /* Peek a buffer and compare the seqnum to the expected seqnum.
3377  * If all is fine, the buffer is pushed.
3378  * If something is wrong, we wait for some event
3379  */
3380 static GstFlowReturn
3381 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
3382 {
3383   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3384   GstFlowReturn result;
3385   RTPJitterBufferItem *item;
3386   guint seqnum;
3387   guint32 next_seqnum;
3388
3389   /* only push buffers when PLAYING and active and not buffering */
3390   if (priv->blocked || !priv->active ||
3391       rtp_jitter_buffer_is_buffering (priv->jbuf)) {
3392     return GST_FLOW_WAIT;
3393   }
3394
3395   /* peek a buffer, we're just looking at the sequence number.
3396    * If all is fine, we'll pop and push it. If the sequence number is wrong we
3397    * wait for a timeout or something to change.
3398    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
3399   item = rtp_jitter_buffer_peek (priv->jbuf);
3400   if (item == NULL) {
3401     goto wait;
3402   }
3403
3404   /* get the seqnum and the next expected seqnum */
3405   seqnum = item->seqnum;
3406   if (seqnum == -1) {
3407     return pop_and_push_next (jitterbuffer, seqnum);
3408   }
3409
3410   next_seqnum = priv->next_seqnum;
3411
3412   /* get the gap between this and the previous packet. If we don't know the
3413    * previous packet seqnum assume no gap. */
3414   if (G_UNLIKELY (next_seqnum == -1)) {
3415     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
3416     /* we don't know what the next_seqnum should be, the chain function should
3417      * have scheduled a DEADLINE timer that will increment next_seqnum when it
3418      * fires, so wait for that */
3419     result = GST_FLOW_WAIT;
3420   } else {
3421     gint gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
3422
3423     if (G_LIKELY (gap == 0)) {
3424       /* no missing packet, pop and push */
3425       result = pop_and_push_next (jitterbuffer, seqnum);
3426     } else if (G_UNLIKELY (gap < 0)) {
3427       /* if we have a packet that we already pushed or considered dropped, pop it
3428        * off and get the next packet */
3429       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
3430           seqnum, next_seqnum);
3431       item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
3432       free_item (item);
3433       result = GST_FLOW_OK;
3434     } else {
3435       /* the chain function has scheduled timers to request retransmission or
3436        * when to consider the packet lost, wait for that */
3437       GST_DEBUG_OBJECT (jitterbuffer,
3438           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
3439           next_seqnum, seqnum, gap);
3440       result = GST_FLOW_WAIT;
3441     }
3442   }
3443
3444   return result;
3445
3446 wait:
3447   {
3448     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
3449     if (priv->eos) {
3450       return GST_FLOW_EOS;
3451     } else {
3452       return GST_FLOW_WAIT;
3453     }
3454   }
3455 }
3456
3457 static GstClockTime
3458 get_rtx_retry_timeout (GstRtpJitterBufferPrivate * priv)
3459 {
3460   GstClockTime rtx_retry_timeout;
3461   GstClockTime rtx_min_retry_timeout;
3462
3463   if (priv->rtx_retry_timeout == -1) {
3464     if (priv->avg_rtx_rtt == 0)
3465       rtx_retry_timeout = DEFAULT_AUTO_RTX_TIMEOUT;
3466     else
3467       /* we want to ask for a retransmission after we waited for a
3468        * complete RTT and the additional jitter */
3469       rtx_retry_timeout = priv->avg_rtx_rtt + priv->avg_jitter * 2;
3470   } else {
3471     rtx_retry_timeout = priv->rtx_retry_timeout * GST_MSECOND;
3472   }
3473   /* make sure we don't retry too often. On very low latency networks,
3474    * the RTT and jitter can be very low. */
3475   if (priv->rtx_min_retry_timeout == -1) {
3476     rtx_min_retry_timeout = priv->packet_spacing;
3477   } else {
3478     rtx_min_retry_timeout = priv->rtx_min_retry_timeout * GST_MSECOND;
3479   }
3480   rtx_retry_timeout = MAX (rtx_retry_timeout, rtx_min_retry_timeout);
3481
3482   return rtx_retry_timeout;
3483 }
3484
3485 static GstClockTime
3486 get_rtx_retry_period (GstRtpJitterBufferPrivate * priv,
3487     GstClockTime rtx_retry_timeout)
3488 {
3489   GstClockTime rtx_retry_period;
3490
3491   if (priv->rtx_retry_period == -1) {
3492     /* we retry up to the configured jitterbuffer size but leaving some
3493      * room for the retransmission to arrive in time */
3494     if (rtx_retry_timeout > priv->latency_ns) {
3495       rtx_retry_period = 0;
3496     } else {
3497       rtx_retry_period = priv->latency_ns - rtx_retry_timeout;
3498     }
3499   } else {
3500     rtx_retry_period = priv->rtx_retry_period * GST_MSECOND;
3501   }
3502   return rtx_retry_period;
3503 }
3504
3505 static void
3506 update_rtx_stats (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3507     GstClockTime dts, gboolean success)
3508 {
3509   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3510   GstClockTime delay;
3511
3512   if (success) {
3513     /* we scheduled a retry for this packet and now we have it */
3514     priv->num_rtx_success++;
3515     /* all the previous retry attempts failed */
3516     priv->num_rtx_failed += timer->num_rtx_retry - 1;
3517   } else {
3518     /* All retries failed or was too late */
3519     priv->num_rtx_failed += timer->num_rtx_retry;
3520   }
3521
3522   /* number of retries before (hopefully) receiving the packet */
3523   if (priv->avg_rtx_num == 0.0)
3524     priv->avg_rtx_num = timer->num_rtx_retry;
3525   else
3526     priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
3527
3528   /* Calculate the delay between retransmission request and receiving this
3529    * packet. We have a valid delay if and only if this packet is a response to
3530    * our last request. If not we don't know if this is a response to an
3531    * earlier request and delay could be way off. For RTT is more important
3532    * with correct values than to update for every packet. */
3533   if (timer->num_rtx_retry == timer->num_rtx_received &&
3534       dts != GST_CLOCK_TIME_NONE && dts > timer->rtx_last) {
3535     delay = dts - timer->rtx_last;
3536     if (priv->avg_rtx_rtt == 0)
3537       priv->avg_rtx_rtt = delay;
3538     else
3539       priv->avg_rtx_rtt = (delay + 7 * priv->avg_rtx_rtt) / 8;
3540   } else {
3541     delay = 0;
3542   }
3543
3544   GST_LOG_OBJECT (jitterbuffer,
3545       "RTX #%d, result %d, success %" G_GUINT64_FORMAT ", failed %"
3546       G_GUINT64_FORMAT ", requests %" G_GUINT64_FORMAT ", dups %"
3547       G_GUINT64_FORMAT ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %"
3548       GST_TIME_FORMAT, timer->seqnum, success, priv->num_rtx_success,
3549       priv->num_rtx_failed, priv->num_rtx_requests, priv->num_duplicates,
3550       priv->avg_rtx_num, GST_TIME_ARGS (delay),
3551       GST_TIME_ARGS (priv->avg_rtx_rtt));
3552 }
3553
3554 /* the timeout for when we expected a packet expired */
3555 static gboolean
3556 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3557     GstClockTime now)
3558 {
3559   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3560   GstEvent *event;
3561   guint delay, delay_ms, avg_rtx_rtt_ms;
3562   guint rtx_retry_timeout_ms, rtx_retry_period_ms;
3563   GstClockTime rtx_retry_period;
3564   GstClockTime rtx_retry_timeout;
3565   GstClock *clock;
3566
3567   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %"
3568       GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
3569
3570   rtx_retry_timeout = get_rtx_retry_timeout (priv);
3571   rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout);
3572
3573   delay = timer->rtx_delay + timer->rtx_retry;
3574
3575   delay_ms = GST_TIME_AS_MSECONDS (delay);
3576   rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout);
3577   rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period);
3578   avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt);
3579
3580   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
3581       gst_structure_new ("GstRTPRetransmissionRequest",
3582           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
3583           "running-time", G_TYPE_UINT64, timer->rtx_base,
3584           "delay", G_TYPE_UINT, delay_ms,
3585           "retry", G_TYPE_UINT, timer->num_rtx_retry,
3586           "frequency", G_TYPE_UINT, rtx_retry_timeout_ms,
3587           "period", G_TYPE_UINT, rtx_retry_period_ms,
3588           "deadline", G_TYPE_UINT, priv->latency_ms,
3589           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
3590           "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
3591   GST_DEBUG_OBJECT (jitterbuffer, "Request RTX: %" GST_PTR_FORMAT, event);
3592
3593   priv->num_rtx_requests++;
3594   timer->num_rtx_retry++;
3595
3596   GST_OBJECT_LOCK (jitterbuffer);
3597   if ((clock = GST_ELEMENT_CLOCK (jitterbuffer))) {
3598     timer->rtx_last = gst_clock_get_time (clock);
3599     timer->rtx_last -= GST_ELEMENT_CAST (jitterbuffer)->base_time;
3600   } else {
3601     timer->rtx_last = now;
3602   }
3603   GST_OBJECT_UNLOCK (jitterbuffer);
3604
3605   /* calculate the timeout for the next retransmission attempt */
3606   timer->rtx_retry += rtx_retry_timeout;
3607   GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
3608       GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u",
3609       GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
3610       GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
3611   if ((priv->rtx_max_retries != -1
3612           && timer->num_rtx_retry >= priv->rtx_max_retries)
3613       || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)) {
3614     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
3615     /* too many retransmission request, we now convert the timer
3616      * to a lost timer, leave the num_rtx_retry as it is for stats */
3617     timer->type = TIMER_TYPE_LOST;
3618     timer->rtx_delay = 0;
3619     timer->rtx_retry = 0;
3620   }
3621   reschedule_timer (jitterbuffer, timer, timer->seqnum,
3622       timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
3623
3624   JBUF_UNLOCK (priv);
3625   gst_pad_push_event (priv->sinkpad, event);
3626   JBUF_LOCK (priv);
3627
3628   return FALSE;
3629 }
3630
3631 /* a packet is lost */
3632 static gboolean
3633 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3634     GstClockTime now)
3635 {
3636   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3637   guint seqnum, lost_packets, num_rtx_retry, next_in_seqnum;
3638   gboolean head;
3639   GstEvent *event = NULL;
3640   RTPJitterBufferItem *item;
3641
3642   seqnum = timer->seqnum;
3643   lost_packets = MAX (timer->num, 1);
3644   num_rtx_retry = timer->num_rtx_retry;
3645
3646   /* we had a gap and thus we lost some packets. Create an event for this.  */
3647   if (lost_packets > 1)
3648     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
3649         seqnum + lost_packets - 1);
3650   else
3651     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
3652
3653   priv->num_lost += lost_packets;
3654   priv->num_rtx_failed += num_rtx_retry;
3655
3656   next_in_seqnum = (seqnum + lost_packets) & 0xffff;
3657
3658   /* we now only accept seqnum bigger than this */
3659   if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0)
3660     priv->next_in_seqnum = next_in_seqnum;
3661
3662   /* Avoid creating events if we don't need it. Note that we still need to create
3663    * the lost *ITEM* since it will be used to notify the outgoing thread of
3664    * lost items (so that we can set discont flags and such) */
3665   if (priv->do_lost) {
3666     GstClockTime duration, timestamp;
3667     /* create paket lost event */
3668     timestamp = apply_offset (jitterbuffer, timer->timeout);
3669     duration = timer->duration;
3670     if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
3671       duration = priv->packet_spacing;
3672     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
3673         gst_structure_new ("GstRTPPacketLost",
3674             "seqnum", G_TYPE_UINT, (guint) seqnum,
3675             "timestamp", G_TYPE_UINT64, timestamp,
3676             "duration", G_TYPE_UINT64, duration,
3677             "retry", G_TYPE_UINT, num_rtx_retry, NULL));
3678   }
3679   item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
3680   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1);
3681
3682   if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) {
3683     /* Store info to update stats if the packet arrives too late */
3684     timer_queue_append (priv->rtx_stats_timers, timer,
3685         now + priv->rtx_stats_timeout * GST_MSECOND, TRUE);
3686   }
3687   remove_timer (jitterbuffer, timer);
3688
3689   if (head)
3690     JBUF_SIGNAL_EVENT (priv);
3691
3692   return TRUE;
3693 }
3694
3695 static gboolean
3696 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3697     GstClockTime now)
3698 {
3699   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3700
3701   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
3702   remove_timer (jitterbuffer, timer);
3703   if (!priv->eos) {
3704     /* there was no EOS in the buffer, put one in there now */
3705     queue_event (jitterbuffer, gst_event_new_eos ());
3706   }
3707   JBUF_SIGNAL_EVENT (priv);
3708
3709   return TRUE;
3710 }
3711
3712 static gboolean
3713 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3714     GstClockTime now)
3715 {
3716   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3717
3718   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
3719
3720   /* timer seqnum might have been obsoleted by caps seqnum-base,
3721    * only mess with current ongoing seqnum if still unknown */
3722   if (priv->next_seqnum == -1)
3723     priv->next_seqnum = timer->seqnum;
3724   remove_timer (jitterbuffer, timer);
3725   JBUF_SIGNAL_EVENT (priv);
3726
3727   return TRUE;
3728 }
3729
3730 static gboolean
3731 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3732     GstClockTime now)
3733 {
3734   gboolean removed = FALSE;
3735
3736   switch (timer->type) {
3737     case TIMER_TYPE_EXPECTED:
3738       removed = do_expected_timeout (jitterbuffer, timer, now);
3739       break;
3740     case TIMER_TYPE_LOST:
3741       removed = do_lost_timeout (jitterbuffer, timer, now);
3742       break;
3743     case TIMER_TYPE_DEADLINE:
3744       removed = do_deadline_timeout (jitterbuffer, timer, now);
3745       break;
3746     case TIMER_TYPE_EOS:
3747       removed = do_eos_timeout (jitterbuffer, timer, now);
3748       break;
3749   }
3750   return removed;
3751 }
3752
3753 /* called when we need to wait for the next timeout.
3754  *
3755  * We loop over the array of recorded timeouts and wait for the earliest one.
3756  * When it timed out, do the logic associated with the timer.
3757  *
3758  * If there are no timers, we wait on a gcond until something new happens.
3759  */
3760 static void
3761 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
3762 {
3763   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3764   GstClockTime now = 0;
3765
3766   JBUF_LOCK (priv);
3767   while (priv->timer_running) {
3768     TimerData *timer = NULL;
3769     GstClockTime timer_timeout = -1;
3770     gint i, len;
3771
3772     /* If we have a clock, update "now" now with the very
3773      * latest running time we have. If timers are unscheduled below we
3774      * otherwise wouldn't update now (it's only updated when timers
3775      * expire), and also for the very first loop iteration now would
3776      * otherwise always be 0
3777      */
3778     GST_OBJECT_LOCK (jitterbuffer);
3779     if (GST_ELEMENT_CLOCK (jitterbuffer)) {
3780       now =
3781           gst_clock_get_time (GST_ELEMENT_CLOCK (jitterbuffer)) -
3782           GST_ELEMENT_CAST (jitterbuffer)->base_time;
3783     }
3784     GST_OBJECT_UNLOCK (jitterbuffer);
3785
3786     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
3787         GST_TIME_ARGS (now));
3788
3789     /* Clear expired rtx-stats timers */
3790     if (priv->do_retransmission)
3791       timer_queue_clear_until (priv->rtx_stats_timers, now);
3792
3793     /* Iterate "normal" timers */
3794     len = priv->timers->len;
3795     for (i = 0; i < len;) {
3796       TimerData *test = &g_array_index (priv->timers, TimerData, i);
3797       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
3798       gboolean save_best = FALSE;
3799
3800       GST_DEBUG_OBJECT (jitterbuffer,
3801           "%d, %d, %d, %" GST_TIME_FORMAT " diff:%" GST_STIME_FORMAT, i,
3802           test->type, test->seqnum, GST_TIME_ARGS (test_timeout),
3803           GST_STIME_ARGS ((gint64) (test_timeout - now)));
3804
3805       /* Weed out anything too late */
3806       if (test->type == TIMER_TYPE_LOST &&
3807           (test_timeout == -1 || test_timeout <= now)) {
3808         GST_DEBUG_OBJECT (jitterbuffer, "Weeding out late entry");
3809         do_lost_timeout (jitterbuffer, test, now);
3810         if (!priv->timer_running)
3811           break;
3812         /* We don't move the iterator forward since we just removed the current entry,
3813          * but we update the termination condition */
3814         len = priv->timers->len;
3815       } else {
3816         /* find the smallest timeout */
3817         if (timer == NULL) {
3818           save_best = TRUE;
3819         } else if (timer_timeout == -1) {
3820           /* we already have an immediate timeout, the new timer must be an
3821            * immediate timer with smaller seqnum to become the best */
3822           if (test_timeout == -1
3823               && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3824                       timer->seqnum) > 0))
3825             save_best = TRUE;
3826         } else if (test_timeout == -1) {
3827           /* first immediate timer */
3828           save_best = TRUE;
3829         } else if (test_timeout < timer_timeout) {
3830           /* earlier timer */
3831           save_best = TRUE;
3832         } else if (test_timeout == timer_timeout
3833             && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3834                     timer->seqnum) > 0)) {
3835           /* same timer, smaller seqnum */
3836           save_best = TRUE;
3837         }
3838
3839         if (save_best) {
3840           GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
3841           timer = test;
3842           timer_timeout = test_timeout;
3843         }
3844         i++;
3845       }
3846     }
3847     if (timer && !priv->blocked) {
3848       GstClock *clock;
3849       GstClockTime sync_time;
3850       GstClockID id;
3851       GstClockReturn ret;
3852       GstClockTimeDiff clock_jitter;
3853
3854       if (timer_timeout == -1 || timer_timeout <= now) {
3855         /* We have normally removed all lost timers in the loop above */
3856         g_assert (timer->type != TIMER_TYPE_LOST);
3857
3858         do_timeout (jitterbuffer, timer, now);
3859         /* check here, do_timeout could have released the lock */
3860         if (!priv->timer_running)
3861           break;
3862         continue;
3863       }
3864
3865       GST_OBJECT_LOCK (jitterbuffer);
3866       clock = GST_ELEMENT_CLOCK (jitterbuffer);
3867       if (!clock) {
3868         GST_OBJECT_UNLOCK (jitterbuffer);
3869         /* let's just push if there is no clock */
3870         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
3871         now = timer_timeout;
3872         continue;
3873       }
3874
3875       /* prepare for sync against clock */
3876       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
3877       /* add latency of peer to get input time */
3878       sync_time += priv->peer_latency;
3879
3880       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
3881           " with sync time %" GST_TIME_FORMAT,
3882           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
3883
3884       /* create an entry for the clock */
3885       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
3886       priv->timer_timeout = timer_timeout;
3887       priv->timer_seqnum = timer->seqnum;
3888       GST_OBJECT_UNLOCK (jitterbuffer);
3889
3890       /* release the lock so that the other end can push stuff or unlock */
3891       JBUF_UNLOCK (priv);
3892
3893       ret = gst_clock_id_wait (id, &clock_jitter);
3894
3895       JBUF_LOCK (priv);
3896       if (!priv->timer_running) {
3897         gst_clock_id_unref (id);
3898         priv->clock_id = NULL;
3899         break;
3900       }
3901
3902       if (ret != GST_CLOCK_UNSCHEDULED) {
3903         now = timer_timeout + MAX (clock_jitter, 0);
3904         GST_DEBUG_OBJECT (jitterbuffer,
3905             "sync done, %d, #%d, %" GST_STIME_FORMAT, ret, priv->timer_seqnum,
3906             GST_STIME_ARGS (clock_jitter));
3907       } else {
3908         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
3909       }
3910       /* and free the entry */
3911       gst_clock_id_unref (id);
3912       priv->clock_id = NULL;
3913     } else {
3914       /* no timers, wait for activity */
3915       JBUF_WAIT_TIMER (priv);
3916     }
3917   }
3918   JBUF_UNLOCK (priv);
3919
3920   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
3921   return;
3922 }
3923
3924 /*
3925  * This funcion implements the main pushing loop on the source pad.
3926  *
3927  * It first tries to push as many buffers as possible. If there is a seqnum
3928  * mismatch, we wait for the next timeouts.
3929  */
3930 static void
3931 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
3932 {
3933   GstRtpJitterBufferPrivate *priv;
3934   GstFlowReturn result = GST_FLOW_OK;
3935
3936   priv = jitterbuffer->priv;
3937
3938   JBUF_LOCK_CHECK (priv, flushing);
3939   do {
3940     result = handle_next_buffer (jitterbuffer);
3941     if (G_LIKELY (result == GST_FLOW_WAIT)) {
3942       /* now wait for the next event */
3943       JBUF_WAIT_EVENT (priv, flushing);
3944       result = GST_FLOW_OK;
3945     }
3946   } while (result == GST_FLOW_OK);
3947   /* store result for upstream */
3948   priv->srcresult = result;
3949   /* if we get here we need to pause */
3950   goto pause;
3951
3952   /* ERRORS */
3953 flushing:
3954   {
3955     result = priv->srcresult;
3956     goto pause;
3957   }
3958 pause:
3959   {
3960     GstEvent *event;
3961
3962     JBUF_SIGNAL_QUERY (priv, FALSE);
3963     JBUF_UNLOCK (priv);
3964
3965     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
3966         gst_flow_get_name (result));
3967     gst_pad_pause_task (priv->srcpad);
3968     if (result == GST_FLOW_EOS) {
3969       event = gst_event_new_eos ();
3970       gst_pad_push_event (priv->srcpad, event);
3971     }
3972     return;
3973   }
3974 }
3975
3976 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
3977  * some sanity checks and then emit the handle-sync signal with the parameters.
3978  * This function must be called with the LOCK */
3979 static void
3980 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
3981 {
3982   GstRtpJitterBufferPrivate *priv;
3983   guint64 base_rtptime, base_time;
3984   guint32 clock_rate;
3985   guint64 last_rtptime;
3986   guint64 clock_base;
3987   guint64 ext_rtptime, diff;
3988   gboolean valid = TRUE, keep = FALSE;
3989
3990   priv = jitterbuffer->priv;
3991
3992   /* get the last values from the jitterbuffer */
3993   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
3994       &clock_rate, &last_rtptime);
3995
3996   clock_base = priv->clock_base;
3997   ext_rtptime = priv->ext_rtptime;
3998
3999   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
4000       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
4001       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
4002       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
4003
4004   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
4005     /* we keep this SR packet for later. When we get a valid RTP packet the
4006      * above values will be set and we can try to use the SR packet */
4007     GST_DEBUG_OBJECT (jitterbuffer, "keeping for later, no RTP values");
4008     keep = TRUE;
4009   } else {
4010     /* we can't accept anything that happened before we did the last resync */
4011     if (base_rtptime > ext_rtptime) {
4012       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
4013       valid = FALSE;
4014     } else {
4015       /* the SR RTP timestamp must be something close to what we last observed
4016        * in the jitterbuffer */
4017       if (ext_rtptime > last_rtptime) {
4018         /* check how far ahead it is to our RTP timestamps */
4019         diff = ext_rtptime - last_rtptime;
4020         /* if bigger than 1 second, we drop it */
4021         if (jitterbuffer->priv->max_rtcp_rtp_time_diff != -1 &&
4022             diff >
4023             gst_util_uint64_scale (jitterbuffer->priv->max_rtcp_rtp_time_diff,
4024                 clock_rate, 1000)) {
4025           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
4026           /* should drop this, but some RTSP servers end up with bogus
4027            * way too ahead RTCP packet when repeated PAUSE/PLAY,
4028            * so still trigger rptbin sync but invalidate RTCP data
4029            * (sync might use other methods) */
4030           ext_rtptime = -1;
4031         }
4032         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
4033             G_GUINT64_FORMAT, last_rtptime, diff);
4034       }
4035     }
4036   }
4037
4038   if (keep) {
4039     GST_DEBUG_OBJECT (jitterbuffer, "keeping RTCP packet for later");
4040   } else if (valid) {
4041     GstStructure *s;
4042
4043     s = gst_structure_new ("application/x-rtp-sync",
4044         "base-rtptime", G_TYPE_UINT64, base_rtptime,
4045         "base-time", G_TYPE_UINT64, base_time,
4046         "clock-rate", G_TYPE_UINT, clock_rate,
4047         "clock-base", G_TYPE_UINT64, clock_base,
4048         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
4049         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
4050
4051     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
4052     gst_buffer_replace (&priv->last_sr, NULL);
4053     JBUF_UNLOCK (priv);
4054     g_signal_emit (jitterbuffer,
4055         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
4056     JBUF_LOCK (priv);
4057     gst_structure_free (s);
4058   } else {
4059     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
4060     gst_buffer_replace (&priv->last_sr, NULL);
4061   }
4062 }
4063
4064 static GstFlowReturn
4065 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
4066     GstBuffer * buffer)
4067 {
4068   GstRtpJitterBuffer *jitterbuffer;
4069   GstRtpJitterBufferPrivate *priv;
4070   GstFlowReturn ret = GST_FLOW_OK;
4071   guint32 ssrc;
4072   GstRTCPPacket packet;
4073   guint64 ext_rtptime;
4074   guint32 rtptime;
4075   GstRTCPBuffer rtcp = { NULL, };
4076
4077   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4078
4079   if (G_UNLIKELY (!gst_rtcp_buffer_validate_reduced (buffer)))
4080     goto invalid_buffer;
4081
4082   priv = jitterbuffer->priv;
4083
4084   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
4085
4086   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
4087     goto empty_buffer;
4088
4089   /* first packet must be SR or RR or else the validate would have failed */
4090   switch (gst_rtcp_packet_get_type (&packet)) {
4091     case GST_RTCP_TYPE_SR:
4092       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
4093           NULL, NULL);
4094       break;
4095     default:
4096       goto ignore_buffer;
4097   }
4098   gst_rtcp_buffer_unmap (&rtcp);
4099
4100   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
4101
4102   JBUF_LOCK (priv);
4103   /* convert the RTP timestamp to our extended timestamp, using the same offset
4104    * we used in the jitterbuffer */
4105   ext_rtptime = priv->jbuf->ext_rtptime;
4106   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
4107
4108   priv->ext_rtptime = ext_rtptime;
4109   gst_buffer_replace (&priv->last_sr, buffer);
4110
4111   do_handle_sync (jitterbuffer);
4112   JBUF_UNLOCK (priv);
4113
4114 done:
4115   gst_buffer_unref (buffer);
4116
4117   return ret;
4118
4119 invalid_buffer:
4120   {
4121     /* this is not fatal but should be filtered earlier */
4122     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
4123         ("Received invalid RTCP payload, dropping"));
4124     ret = GST_FLOW_OK;
4125     goto done;
4126   }
4127 empty_buffer:
4128   {
4129     /* this is not fatal but should be filtered earlier */
4130     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
4131         ("Received empty RTCP payload, dropping"));
4132     gst_rtcp_buffer_unmap (&rtcp);
4133     ret = GST_FLOW_OK;
4134     goto done;
4135   }
4136 ignore_buffer:
4137   {
4138     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
4139     gst_rtcp_buffer_unmap (&rtcp);
4140     ret = GST_FLOW_OK;
4141     goto done;
4142   }
4143 }
4144
4145 static gboolean
4146 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
4147     GstQuery * query)
4148 {
4149   gboolean res = FALSE;
4150   GstRtpJitterBuffer *jitterbuffer;
4151   GstRtpJitterBufferPrivate *priv;
4152
4153   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4154   priv = jitterbuffer->priv;
4155
4156   switch (GST_QUERY_TYPE (query)) {
4157     case GST_QUERY_CAPS:
4158     {
4159       GstCaps *filter, *caps;
4160
4161       gst_query_parse_caps (query, &filter);
4162       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
4163       gst_query_set_caps_result (query, caps);
4164       gst_caps_unref (caps);
4165       res = TRUE;
4166       break;
4167     }
4168     default:
4169       if (GST_QUERY_IS_SERIALIZED (query)) {
4170         RTPJitterBufferItem *item;
4171         gboolean head;
4172
4173         JBUF_LOCK_CHECK (priv, out_flushing);
4174         if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
4175             RTP_JITTER_BUFFER_MODE_BUFFER) {
4176           GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
4177           item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
4178           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1);
4179           if (head)
4180             JBUF_SIGNAL_EVENT (priv);
4181           JBUF_WAIT_QUERY (priv, out_flushing);
4182           res = priv->last_query;
4183         } else {
4184           GST_DEBUG_OBJECT (jitterbuffer, "refusing query, we are buffering");
4185           res = FALSE;
4186         }
4187         JBUF_UNLOCK (priv);
4188       } else {
4189         res = gst_pad_query_default (pad, parent, query);
4190       }
4191       break;
4192   }
4193   return res;
4194   /* ERRORS */
4195 out_flushing:
4196   {
4197     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
4198     JBUF_UNLOCK (priv);
4199     return FALSE;
4200   }
4201
4202 }
4203
4204 static gboolean
4205 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
4206     GstQuery * query)
4207 {
4208   GstRtpJitterBuffer *jitterbuffer;
4209   GstRtpJitterBufferPrivate *priv;
4210   gboolean res = FALSE;
4211
4212   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4213   priv = jitterbuffer->priv;
4214
4215   switch (GST_QUERY_TYPE (query)) {
4216     case GST_QUERY_LATENCY:
4217     {
4218       /* We need to send the query upstream and add the returned latency to our
4219        * own */
4220       GstClockTime min_latency, max_latency;
4221       gboolean us_live;
4222       GstClockTime our_latency;
4223
4224       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
4225         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
4226
4227         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
4228             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
4229             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
4230
4231         /* store this so that we can safely sync on the peer buffers. */
4232         JBUF_LOCK (priv);
4233         priv->peer_latency = min_latency;
4234         our_latency = priv->latency_ns;
4235         JBUF_UNLOCK (priv);
4236
4237         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
4238             GST_TIME_ARGS (our_latency));
4239
4240         /* we add some latency but can buffer an infinite amount of time */
4241         min_latency += our_latency;
4242         max_latency = -1;
4243
4244         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
4245             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
4246             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
4247
4248         gst_query_set_latency (query, TRUE, min_latency, max_latency);
4249       }
4250       break;
4251     }
4252     case GST_QUERY_POSITION:
4253     {
4254       GstClockTime start, last_out;
4255       GstFormat fmt;
4256
4257       gst_query_parse_position (query, &fmt, NULL);
4258       if (fmt != GST_FORMAT_TIME) {
4259         res = gst_pad_query_default (pad, parent, query);
4260         break;
4261       }
4262
4263       JBUF_LOCK (priv);
4264       start = priv->npt_start;
4265       last_out = priv->last_out_time;
4266       JBUF_UNLOCK (priv);
4267
4268       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
4269           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
4270           GST_TIME_ARGS (last_out));
4271
4272       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
4273         /* bring 0-based outgoing time to stream time */
4274         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
4275         res = TRUE;
4276       } else {
4277         res = gst_pad_query_default (pad, parent, query);
4278       }
4279       break;
4280     }
4281     case GST_QUERY_CAPS:
4282     {
4283       GstCaps *filter, *caps;
4284
4285       gst_query_parse_caps (query, &filter);
4286       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
4287       gst_query_set_caps_result (query, caps);
4288       gst_caps_unref (caps);
4289       res = TRUE;
4290       break;
4291     }
4292     default:
4293       res = gst_pad_query_default (pad, parent, query);
4294       break;
4295   }
4296
4297   return res;
4298 }
4299
4300 static void
4301 gst_rtp_jitter_buffer_set_property (GObject * object,
4302     guint prop_id, const GValue * value, GParamSpec * pspec)
4303 {
4304   GstRtpJitterBuffer *jitterbuffer;
4305   GstRtpJitterBufferPrivate *priv;
4306
4307   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
4308   priv = jitterbuffer->priv;
4309
4310   switch (prop_id) {
4311     case PROP_LATENCY:
4312     {
4313       guint new_latency, old_latency;
4314
4315       new_latency = g_value_get_uint (value);
4316
4317       JBUF_LOCK (priv);
4318       old_latency = priv->latency_ms;
4319       priv->latency_ms = new_latency;
4320       priv->latency_ns = priv->latency_ms * GST_MSECOND;
4321       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
4322       JBUF_UNLOCK (priv);
4323
4324       /* post message if latency changed, this will inform the parent pipeline
4325        * that a latency reconfiguration is possible/needed. */
4326       if (new_latency != old_latency) {
4327         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
4328             GST_TIME_ARGS (new_latency * GST_MSECOND));
4329
4330         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
4331             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
4332       }
4333       break;
4334     }
4335     case PROP_DROP_ON_LATENCY:
4336       JBUF_LOCK (priv);
4337       priv->drop_on_latency = g_value_get_boolean (value);
4338       JBUF_UNLOCK (priv);
4339       break;
4340     case PROP_TS_OFFSET:
4341       JBUF_LOCK (priv);
4342       priv->ts_offset = g_value_get_int64 (value);
4343       priv->ts_discont = TRUE;
4344       JBUF_UNLOCK (priv);
4345       break;
4346     case PROP_DO_LOST:
4347       JBUF_LOCK (priv);
4348       priv->do_lost = g_value_get_boolean (value);
4349       JBUF_UNLOCK (priv);
4350       break;
4351     case PROP_MODE:
4352       JBUF_LOCK (priv);
4353       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
4354       JBUF_UNLOCK (priv);
4355       break;
4356     case PROP_DO_RETRANSMISSION:
4357       JBUF_LOCK (priv);
4358       priv->do_retransmission = g_value_get_boolean (value);
4359       JBUF_UNLOCK (priv);
4360       break;
4361     case PROP_RTX_NEXT_SEQNUM:
4362       JBUF_LOCK (priv);
4363       priv->rtx_next_seqnum = g_value_get_boolean (value);
4364       JBUF_UNLOCK (priv);
4365       break;
4366     case PROP_RTX_DELAY:
4367       JBUF_LOCK (priv);
4368       priv->rtx_delay = g_value_get_int (value);
4369       JBUF_UNLOCK (priv);
4370       break;
4371     case PROP_RTX_MIN_DELAY:
4372       JBUF_LOCK (priv);
4373       priv->rtx_min_delay = g_value_get_uint (value);
4374       JBUF_UNLOCK (priv);
4375       break;
4376     case PROP_RTX_DELAY_REORDER:
4377       JBUF_LOCK (priv);
4378       priv->rtx_delay_reorder = g_value_get_int (value);
4379       JBUF_UNLOCK (priv);
4380       break;
4381     case PROP_RTX_RETRY_TIMEOUT:
4382       JBUF_LOCK (priv);
4383       priv->rtx_retry_timeout = g_value_get_int (value);
4384       JBUF_UNLOCK (priv);
4385       break;
4386     case PROP_RTX_MIN_RETRY_TIMEOUT:
4387       JBUF_LOCK (priv);
4388       priv->rtx_min_retry_timeout = g_value_get_int (value);
4389       JBUF_UNLOCK (priv);
4390       break;
4391     case PROP_RTX_RETRY_PERIOD:
4392       JBUF_LOCK (priv);
4393       priv->rtx_retry_period = g_value_get_int (value);
4394       JBUF_UNLOCK (priv);
4395       break;
4396     case PROP_RTX_MAX_RETRIES:
4397       JBUF_LOCK (priv);
4398       priv->rtx_max_retries = g_value_get_int (value);
4399       JBUF_UNLOCK (priv);
4400       break;
4401     case PROP_RTX_STATS_TIMEOUT:
4402       JBUF_LOCK (priv);
4403       priv->rtx_stats_timeout = g_value_get_uint (value);
4404       JBUF_UNLOCK (priv);
4405       break;
4406     case PROP_MAX_RTCP_RTP_TIME_DIFF:
4407       JBUF_LOCK (priv);
4408       priv->max_rtcp_rtp_time_diff = g_value_get_int (value);
4409       JBUF_UNLOCK (priv);
4410       break;
4411     case PROP_MAX_DROPOUT_TIME:
4412       JBUF_LOCK (priv);
4413       priv->max_dropout_time = g_value_get_uint (value);
4414       JBUF_UNLOCK (priv);
4415       break;
4416     case PROP_MAX_MISORDER_TIME:
4417       JBUF_LOCK (priv);
4418       priv->max_misorder_time = g_value_get_uint (value);
4419       JBUF_UNLOCK (priv);
4420       break;
4421     case PROP_RFC7273_SYNC:
4422       JBUF_LOCK (priv);
4423       rtp_jitter_buffer_set_rfc7273_sync (priv->jbuf,
4424           g_value_get_boolean (value));
4425       JBUF_UNLOCK (priv);
4426       break;
4427     default:
4428       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4429       break;
4430   }
4431 }
4432
4433 static void
4434 gst_rtp_jitter_buffer_get_property (GObject * object,
4435     guint prop_id, GValue * value, GParamSpec * pspec)
4436 {
4437   GstRtpJitterBuffer *jitterbuffer;
4438   GstRtpJitterBufferPrivate *priv;
4439
4440   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
4441   priv = jitterbuffer->priv;
4442
4443   switch (prop_id) {
4444     case PROP_LATENCY:
4445       JBUF_LOCK (priv);
4446       g_value_set_uint (value, priv->latency_ms);
4447       JBUF_UNLOCK (priv);
4448       break;
4449     case PROP_DROP_ON_LATENCY:
4450       JBUF_LOCK (priv);
4451       g_value_set_boolean (value, priv->drop_on_latency);
4452       JBUF_UNLOCK (priv);
4453       break;
4454     case PROP_TS_OFFSET:
4455       JBUF_LOCK (priv);
4456       g_value_set_int64 (value, priv->ts_offset);
4457       JBUF_UNLOCK (priv);
4458       break;
4459     case PROP_DO_LOST:
4460       JBUF_LOCK (priv);
4461       g_value_set_boolean (value, priv->do_lost);
4462       JBUF_UNLOCK (priv);
4463       break;
4464     case PROP_MODE:
4465       JBUF_LOCK (priv);
4466       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
4467       JBUF_UNLOCK (priv);
4468       break;
4469     case PROP_PERCENT:
4470     {
4471       gint percent;
4472
4473       JBUF_LOCK (priv);
4474       if (priv->srcresult != GST_FLOW_OK)
4475         percent = 100;
4476       else
4477         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
4478
4479       g_value_set_int (value, percent);
4480       JBUF_UNLOCK (priv);
4481       break;
4482     }
4483     case PROP_DO_RETRANSMISSION:
4484       JBUF_LOCK (priv);
4485       g_value_set_boolean (value, priv->do_retransmission);
4486       JBUF_UNLOCK (priv);
4487       break;
4488     case PROP_RTX_NEXT_SEQNUM:
4489       JBUF_LOCK (priv);
4490       g_value_set_boolean (value, priv->rtx_next_seqnum);
4491       JBUF_UNLOCK (priv);
4492       break;
4493     case PROP_RTX_DELAY:
4494       JBUF_LOCK (priv);
4495       g_value_set_int (value, priv->rtx_delay);
4496       JBUF_UNLOCK (priv);
4497       break;
4498     case PROP_RTX_MIN_DELAY:
4499       JBUF_LOCK (priv);
4500       g_value_set_uint (value, priv->rtx_min_delay);
4501       JBUF_UNLOCK (priv);
4502       break;
4503     case PROP_RTX_DELAY_REORDER:
4504       JBUF_LOCK (priv);
4505       g_value_set_int (value, priv->rtx_delay_reorder);
4506       JBUF_UNLOCK (priv);
4507       break;
4508     case PROP_RTX_RETRY_TIMEOUT:
4509       JBUF_LOCK (priv);
4510       g_value_set_int (value, priv->rtx_retry_timeout);
4511       JBUF_UNLOCK (priv);
4512       break;
4513     case PROP_RTX_MIN_RETRY_TIMEOUT:
4514       JBUF_LOCK (priv);
4515       g_value_set_int (value, priv->rtx_min_retry_timeout);
4516       JBUF_UNLOCK (priv);
4517       break;
4518     case PROP_RTX_RETRY_PERIOD:
4519       JBUF_LOCK (priv);
4520       g_value_set_int (value, priv->rtx_retry_period);
4521       JBUF_UNLOCK (priv);
4522       break;
4523     case PROP_RTX_MAX_RETRIES:
4524       JBUF_LOCK (priv);
4525       g_value_set_int (value, priv->rtx_max_retries);
4526       JBUF_UNLOCK (priv);
4527       break;
4528     case PROP_RTX_STATS_TIMEOUT:
4529       JBUF_LOCK (priv);
4530       g_value_set_uint (value, priv->rtx_stats_timeout);
4531       JBUF_UNLOCK (priv);
4532       break;
4533     case PROP_STATS:
4534       g_value_take_boxed (value,
4535           gst_rtp_jitter_buffer_create_stats (jitterbuffer));
4536       break;
4537     case PROP_MAX_RTCP_RTP_TIME_DIFF:
4538       JBUF_LOCK (priv);
4539       g_value_set_int (value, priv->max_rtcp_rtp_time_diff);
4540       JBUF_UNLOCK (priv);
4541       break;
4542     case PROP_MAX_DROPOUT_TIME:
4543       JBUF_LOCK (priv);
4544       g_value_set_uint (value, priv->max_dropout_time);
4545       JBUF_UNLOCK (priv);
4546       break;
4547     case PROP_MAX_MISORDER_TIME:
4548       JBUF_LOCK (priv);
4549       g_value_set_uint (value, priv->max_misorder_time);
4550       JBUF_UNLOCK (priv);
4551       break;
4552     case PROP_RFC7273_SYNC:
4553       JBUF_LOCK (priv);
4554       g_value_set_boolean (value,
4555           rtp_jitter_buffer_get_rfc7273_sync (priv->jbuf));
4556       JBUF_UNLOCK (priv);
4557       break;
4558     default:
4559       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4560       break;
4561   }
4562 }
4563
4564 static GstStructure *
4565 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
4566 {
4567   GstRtpJitterBufferPrivate *priv = jbuf->priv;
4568   GstStructure *s;
4569
4570   JBUF_LOCK (priv);
4571   s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
4572       "num-pushed", G_TYPE_UINT64, priv->num_pushed,
4573       "num-lost", G_TYPE_UINT64, priv->num_lost,
4574       "num-late", G_TYPE_UINT64, priv->num_late,
4575       "num-duplicates", G_TYPE_UINT64, priv->num_duplicates,
4576       "avg-jitter", G_TYPE_UINT64, priv->avg_jitter,
4577       "rtx-count", G_TYPE_UINT64, priv->num_rtx_requests,
4578       "rtx-success-count", G_TYPE_UINT64, priv->num_rtx_success,
4579       "rtx-per-packet", G_TYPE_DOUBLE, priv->avg_rtx_num,
4580       "rtx-rtt", G_TYPE_UINT64, priv->avg_rtx_rtt, NULL);
4581   JBUF_UNLOCK (priv);
4582
4583   return s;
4584 }