rtpjitterbuffer: Add a faststart-min-packets property
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *  Copyright 2015 Kurento (http://kurento.org/)
9  *   @author: Miguel ParĂ­s <mparisdiaz@gmail.com>
10  *  Copyright 2016 Pexip AS
11  *   @author: Havard Graff <havard@pexip.com>
12  *   @author: Stian Selnes <stian@pexip.com>
13  *
14  * This library is free software; you can redistribute it and/or
15  * modify it under the terms of the GNU Library General Public
16  * License as published by the Free Software Foundation; either
17  * version 2 of the License, or (at your option) any later version.
18  *
19  * This library is distributed in the hope that it will be useful,
20  * but WITHOUT ANY WARRANTY; without even the implied warranty of
21  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22  * Library General Public License for more details.
23  *
24  * You should have received a copy of the GNU Library General Public
25  * License along with this library; if not, write to the
26  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
27  * Boston, MA 02110-1301, USA.
28  *
29  */
30
31 /**
32  * SECTION:element-rtpjitterbuffer
33  *
34  * This element reorders and removes duplicate RTP packets as they are received
35  * from a network source.
36  *
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  *
42  * The rtpjitterbuffer will wait for missing packets up to a configurable time
43  * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
44  * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
45  * property is set, lost packets will result in a custom serialized downstream
46  * event of name GstRTPPacketLost. The lost packet events are usually used by a
47  * depayloader or other element to create concealment data or some other logic
48  * to gracefully handle the missing packets.
49  *
50  * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
51  * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
52  * buffer.
53  *
54  * The jitterbuffer can also be configured to send early retransmission events
55  * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
56  * this mode, the jitterbuffer tries to estimate when a packet should arrive and
57  * sends a custom upstream event named GstRTPRetransmissionRequest when the
58  * packet is considered late. The initial expected packet arrival time is
59  * calculated as follows:
60  *
61  * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
62  *     T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
63  *     calculated from the DTS (or PTS is no DTS) of two consecutive RTP
64  *     packets with different rtptime.
65  *
66  * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
67  *     seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
68  *     previously scheduled timeout is overwritten.
69  *
70  * - If seqnum N arrived, all seqnum older than
71  *     N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
72  *     immediately. This is to request fast feedback for abonormally reorder
73  *     packets before any of the previous timeouts is triggered.
74  *
75  * A late packet triggers the GstRTPRetransmissionRequest custom upstream
76  * event. After the initial timeout expires and the retransmission event is
77  * sent, the timeout is scheduled for
78  * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
79  * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
80  * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
81  * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
82  * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
83  * retransmission requests are sent and the regular logic is performed to
84  * schedule a lost packet as discussed above.
85  *
86  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
87  * to the pipeline.
88  *
89  * This element will automatically be used inside rtpbin.
90  *
91  * <refsect2>
92  * <title>Example pipelines</title>
93  * |[
94  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
95  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
96  * inserted into the pipeline to smooth out network jitter and to reorder the
97  * out-of-order RTP packets.
98  * </refsect2>
99  */
100
101 #ifdef HAVE_CONFIG_H
102 #include "config.h"
103 #endif
104
105 #include <stdlib.h>
106 #include <stdio.h>
107 #include <string.h>
108 #include <gst/rtp/gstrtpbuffer.h>
109 #include <gst/net/net.h>
110
111 #include "gstrtpjitterbuffer.h"
112 #include "rtpjitterbuffer.h"
113 #include "rtpstats.h"
114
115 #include <gst/glib-compat-private.h>
116
117 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
118 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
119
120 /* RTPJitterBuffer signals and args */
121 enum
122 {
123   SIGNAL_REQUEST_PT_MAP,
124   SIGNAL_CLEAR_PT_MAP,
125   SIGNAL_HANDLE_SYNC,
126   SIGNAL_ON_NPT_STOP,
127   SIGNAL_SET_ACTIVE,
128   LAST_SIGNAL
129 };
130
131 #define DEFAULT_LATENCY_MS          200
132 #define DEFAULT_DROP_ON_LATENCY     FALSE
133 #define DEFAULT_TS_OFFSET           0
134 #define DEFAULT_DO_LOST             FALSE
135 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
136 #define DEFAULT_PERCENT             0
137 #define DEFAULT_DO_RETRANSMISSION   FALSE
138 #define DEFAULT_RTX_NEXT_SEQNUM     TRUE
139 #define DEFAULT_RTX_DELAY           -1
140 #define DEFAULT_RTX_MIN_DELAY       0
141 #define DEFAULT_RTX_DELAY_REORDER   3
142 #define DEFAULT_RTX_RETRY_TIMEOUT   -1
143 #define DEFAULT_RTX_MIN_RETRY_TIMEOUT   -1
144 #define DEFAULT_RTX_RETRY_PERIOD    -1
145 #define DEFAULT_RTX_MAX_RETRIES    -1
146 #define DEFAULT_RTX_DEADLINE       -1
147 #define DEFAULT_RTX_STATS_TIMEOUT   1000
148 #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
149 #define DEFAULT_MAX_DROPOUT_TIME    60000
150 #define DEFAULT_MAX_MISORDER_TIME   2000
151 #define DEFAULT_RFC7273_SYNC        FALSE
152 #define DEFAULT_FASTSTART_MIN_PACKETS 0
153
154 #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
155 #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
156
157 enum
158 {
159   PROP_0,
160   PROP_LATENCY,
161   PROP_DROP_ON_LATENCY,
162   PROP_TS_OFFSET,
163   PROP_DO_LOST,
164   PROP_MODE,
165   PROP_PERCENT,
166   PROP_DO_RETRANSMISSION,
167   PROP_RTX_NEXT_SEQNUM,
168   PROP_RTX_DELAY,
169   PROP_RTX_MIN_DELAY,
170   PROP_RTX_DELAY_REORDER,
171   PROP_RTX_RETRY_TIMEOUT,
172   PROP_RTX_MIN_RETRY_TIMEOUT,
173   PROP_RTX_RETRY_PERIOD,
174   PROP_RTX_MAX_RETRIES,
175   PROP_RTX_DEADLINE,
176   PROP_RTX_STATS_TIMEOUT,
177   PROP_STATS,
178   PROP_MAX_RTCP_RTP_TIME_DIFF,
179   PROP_MAX_DROPOUT_TIME,
180   PROP_MAX_MISORDER_TIME,
181   PROP_RFC7273_SYNC,
182   PROP_FASTSTART_MIN_PACKETS
183 };
184
185 #define JBUF_LOCK(priv)   G_STMT_START {                        \
186     GST_TRACE("Locking from thread %p", g_thread_self());       \
187     (g_mutex_lock (&(priv)->jbuf_lock));                        \
188     GST_TRACE("Locked from thread %p", g_thread_self());        \
189   } G_STMT_END
190
191 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
192   JBUF_LOCK (priv);                                   \
193   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
194     goto label;                                       \
195 } G_STMT_END
196 #define JBUF_UNLOCK(priv) G_STMT_START {                        \
197     GST_TRACE ("Unlocking from thread %p", g_thread_self ());   \
198     (g_mutex_unlock (&(priv)->jbuf_lock));                      \
199 } G_STMT_END
200
201 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
202   GST_DEBUG ("waiting timer");                            \
203   (priv)->waiting_timer = TRUE;                           \
204   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
205   (priv)->waiting_timer = FALSE;                          \
206   GST_DEBUG ("waiting timer done");                       \
207 } G_STMT_END
208 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {            \
209   if (G_UNLIKELY ((priv)->waiting_timer)) {               \
210     GST_DEBUG ("signal timer");                           \
211     g_cond_signal (&(priv)->jbuf_timer);                  \
212   }                                                       \
213 } G_STMT_END
214
215 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
216   GST_DEBUG ("waiting event");                           \
217   (priv)->waiting_event = TRUE;                          \
218   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
219   (priv)->waiting_event = FALSE;                         \
220   GST_DEBUG ("waiting event done");                      \
221   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
222     goto label;                                          \
223 } G_STMT_END
224 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {           \
225   if (G_UNLIKELY ((priv)->waiting_event)) {              \
226     GST_DEBUG ("signal event");                          \
227     g_cond_signal (&(priv)->jbuf_event);                 \
228   }                                                      \
229 } G_STMT_END
230
231 #define JBUF_WAIT_QUERY(priv,label) G_STMT_START {       \
232   GST_DEBUG ("waiting query");                           \
233   (priv)->waiting_query = TRUE;                          \
234   g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \
235   (priv)->waiting_query = FALSE;                         \
236   GST_DEBUG ("waiting query done");                      \
237   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
238     goto label;                                          \
239 } G_STMT_END
240 #define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START {       \
241   (priv)->last_query = res;                              \
242   if (G_UNLIKELY ((priv)->waiting_query)) {              \
243     GST_DEBUG ("signal query");                          \
244     g_cond_signal (&(priv)->jbuf_query);                 \
245   }                                                      \
246 } G_STMT_END
247
248 #define GST_BUFFER_IS_RETRANSMISSION(buffer) \
249   GST_BUFFER_FLAG_IS_SET (buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION)
250
251 typedef struct TimerQueue
252 {
253   GQueue *timers;
254   GHashTable *hashtable;
255 } TimerQueue;
256
257 struct _GstRtpJitterBufferPrivate
258 {
259   GstPad *sinkpad, *srcpad;
260   GstPad *rtcpsinkpad;
261
262   RTPJitterBuffer *jbuf;
263   GMutex jbuf_lock;
264   gboolean waiting_timer;
265   GCond jbuf_timer;
266   gboolean waiting_event;
267   GCond jbuf_event;
268   gboolean waiting_query;
269   GCond jbuf_query;
270   gboolean last_query;
271   gboolean discont;
272   gboolean ts_discont;
273   gboolean active;
274   guint64 out_offset;
275
276   gboolean timer_running;
277   GThread *timer_thread;
278
279   /* properties */
280   guint latency_ms;
281   guint64 latency_ns;
282   gboolean drop_on_latency;
283   gint64 ts_offset;
284   gboolean do_lost;
285   gboolean do_retransmission;
286   gboolean rtx_next_seqnum;
287   gint rtx_delay;
288   guint rtx_min_delay;
289   gint rtx_delay_reorder;
290   gint rtx_retry_timeout;
291   gint rtx_min_retry_timeout;
292   gint rtx_retry_period;
293   gint rtx_max_retries;
294   guint rtx_stats_timeout;
295   gint rtx_deadline_ms;
296   gint max_rtcp_rtp_time_diff;
297   guint32 max_dropout_time;
298   guint32 max_misorder_time;
299   guint faststart_min_packets;
300
301   /* the last seqnum we pushed out */
302   guint32 last_popped_seqnum;
303   /* the next expected seqnum we push */
304   guint32 next_seqnum;
305   /* seqnum-base, if known */
306   guint32 seqnum_base;
307   /* last output time */
308   GstClockTime last_out_time;
309   /* last valid input timestamp and rtptime pair */
310   GstClockTime ips_pts;
311   guint64 ips_rtptime;
312   GstClockTime packet_spacing;
313   gint equidistant;
314
315   GQueue gap_packets;
316
317   /* the next expected seqnum we receive */
318   GstClockTime last_in_pts;
319   guint32 next_in_seqnum;
320
321   GArray *timers;
322   TimerQueue *rtx_stats_timers;
323
324   /* start and stop ranges */
325   GstClockTime npt_start;
326   GstClockTime npt_stop;
327   guint64 ext_timestamp;
328   guint64 last_elapsed;
329   guint64 estimated_eos;
330   GstClockID eos_id;
331
332   /* state */
333   gboolean eos;
334   guint last_percent;
335
336   /* clock rate and rtp timestamp offset */
337   gint last_pt;
338   gint32 clock_rate;
339   gint64 clock_base;
340   gint64 prev_ts_offset;
341
342   /* when we are shutting down */
343   GstFlowReturn srcresult;
344   gboolean blocked;
345
346   /* for sync */
347   GstSegment segment;
348   GstClockID clock_id;
349   GstClockTime timer_timeout;
350   guint16 timer_seqnum;
351   /* the latency of the upstream peer, we have to take this into account when
352    * synchronizing the buffers. */
353   GstClockTime peer_latency;
354   guint64 ext_rtptime;
355   GstBuffer *last_sr;
356
357   /* some accounting */
358   guint64 num_pushed;
359   guint64 num_lost;
360   guint64 num_late;
361   guint64 num_duplicates;
362   guint64 num_rtx_requests;
363   guint64 num_rtx_success;
364   guint64 num_rtx_failed;
365   gdouble avg_rtx_num;
366   guint64 avg_rtx_rtt;
367   RTPPacketRateCtx packet_rate_ctx;
368
369   /* for the jitter */
370   GstClockTime last_dts;
371   guint64 last_rtptime;
372   GstClockTime avg_jitter;
373 };
374
375 typedef enum
376 {
377   TIMER_TYPE_EXPECTED,
378   TIMER_TYPE_LOST,
379   TIMER_TYPE_DEADLINE,
380   TIMER_TYPE_EOS
381 } TimerType;
382
383 typedef struct
384 {
385   guint idx;
386   guint16 seqnum;
387   guint num;
388   TimerType type;
389   GstClockTime timeout;
390   GstClockTime duration;
391   GstClockTime rtx_base;
392   GstClockTime rtx_delay;
393   GstClockTime rtx_retry;
394   GstClockTime rtx_last;
395   guint num_rtx_retry;
396   guint num_rtx_received;
397 } TimerData;
398
399 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
400   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
401                                 GstRtpJitterBufferPrivate))
402
403 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
404 GST_STATIC_PAD_TEMPLATE ("sink",
405     GST_PAD_SINK,
406     GST_PAD_ALWAYS,
407     GST_STATIC_CAPS ("application/x-rtp"
408         /* "clock-rate = (int) [ 1, 2147483647 ], "
409          * "payload = (int) , "
410          * "encoding-name = (string) "
411          */ )
412     );
413
414 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
415 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
416     GST_PAD_SINK,
417     GST_PAD_REQUEST,
418     GST_STATIC_CAPS ("application/x-rtcp")
419     );
420
421 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
422 GST_STATIC_PAD_TEMPLATE ("src",
423     GST_PAD_SRC,
424     GST_PAD_ALWAYS,
425     GST_STATIC_CAPS ("application/x-rtp"
426         /* "payload = (int) , "
427          * "clock-rate = (int) , "
428          * "encoding-name = (string) "
429          */ )
430     );
431
432 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
433
434 #define gst_rtp_jitter_buffer_parent_class parent_class
435 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
436
437 /* object overrides */
438 static void gst_rtp_jitter_buffer_set_property (GObject * object,
439     guint prop_id, const GValue * value, GParamSpec * pspec);
440 static void gst_rtp_jitter_buffer_get_property (GObject * object,
441     guint prop_id, GValue * value, GParamSpec * pspec);
442 static void gst_rtp_jitter_buffer_finalize (GObject * object);
443
444 /* element overrides */
445 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
446     * element, GstStateChange transition);
447 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
448     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
449 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
450     GstPad * pad);
451 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
452 static gboolean gst_rtp_jitter_buffer_set_clock (GstElement * element,
453     GstClock * clock);
454
455 /* pad overrides */
456 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
457 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
458     GstObject * parent);
459
460 /* sinkpad overrides */
461 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
462     GstObject * parent, GstEvent * event);
463 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
464     GstObject * parent, GstBuffer * buffer);
465
466 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
467     GstObject * parent, GstEvent * event);
468 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
469     GstObject * parent, GstBuffer * buffer);
470
471 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
472     GstObject * parent, GstQuery * query);
473
474 /* srcpad overrides */
475 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
476     GstObject * parent, GstEvent * event);
477 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
478     GstObject * parent, GstPadMode mode, gboolean active);
479 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
480 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
481     GstObject * parent, GstQuery * query);
482
483 static void
484 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
485 static GstClockTime
486 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
487     gboolean active, guint64 base_time);
488 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
489
490 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
491 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
492
493 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
494
495 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
496     jitterbuffer);
497
498 static void update_rtx_stats (GstRtpJitterBuffer * jitterbuffer,
499     TimerData * timer, GstClockTime dts, gboolean success);
500
501 static TimerQueue *timer_queue_new (void);
502 static void timer_queue_free (TimerQueue * queue);
503
504 static void
505 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
506 {
507   GObjectClass *gobject_class;
508   GstElementClass *gstelement_class;
509
510   gobject_class = (GObjectClass *) klass;
511   gstelement_class = (GstElementClass *) klass;
512
513   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
514
515   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
516
517   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
518   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
519
520   /**
521    * GstRtpJitterBuffer:latency:
522    *
523    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
524    * for at most this time.
525    */
526   g_object_class_install_property (gobject_class, PROP_LATENCY,
527       g_param_spec_uint ("latency", "Buffer latency in ms",
528           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
529           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
530   /**
531    * GstRtpJitterBuffer:drop-on-latency:
532    *
533    * Drop oldest buffers when the queue is completely filled.
534    */
535   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
536       g_param_spec_boolean ("drop-on-latency",
537           "Drop buffers when maximum latency is reached",
538           "Tells the jitterbuffer to never exceed the given latency in size",
539           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
540   /**
541    * GstRtpJitterBuffer:ts-offset:
542    *
543    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
544    * This is mainly used to ensure interstream synchronisation.
545    */
546   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
547       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
548           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
549           G_MAXINT64, DEFAULT_TS_OFFSET,
550           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
551
552   /**
553    * GstRtpJitterBuffer:do-lost:
554    *
555    * Send out a GstRTPPacketLost event downstream when a packet is considered
556    * lost.
557    */
558   g_object_class_install_property (gobject_class, PROP_DO_LOST,
559       g_param_spec_boolean ("do-lost", "Do Lost",
560           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
561           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
562
563   /**
564    * GstRtpJitterBuffer:mode:
565    *
566    * Control the buffering and timestamping mode used by the jitterbuffer.
567    */
568   g_object_class_install_property (gobject_class, PROP_MODE,
569       g_param_spec_enum ("mode", "Mode",
570           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
571           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
572   /**
573    * GstRtpJitterBuffer:percent:
574    *
575    * The percent of the jitterbuffer that is filled.
576    */
577   g_object_class_install_property (gobject_class, PROP_PERCENT,
578       g_param_spec_int ("percent", "percent",
579           "The buffer filled percent", 0, 100,
580           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
581   /**
582    * GstRtpJitterBuffer:do-retransmission:
583    *
584    * Send out a GstRTPRetransmission event upstream when a packet is considered
585    * late and should be retransmitted.
586    *
587    * Since: 1.2
588    */
589   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
590       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
591           "Send retransmission events upstream when a packet is late",
592           DEFAULT_DO_RETRANSMISSION,
593           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
594
595   /**
596    * GstRtpJitterBuffer:rtx-next-seqnum
597    *
598    * Estimate when the next packet should arrive and schedule a retransmission
599    * request for it.
600    * This is, when packet N arrives, a GstRTPRetransmission event is schedule
601    * for packet N+1. So it will be requested if it does not arrive at the expected time.
602    * The expected time is calculated using the dts of N and the packet spacing.
603    *
604    * Since: 1.6
605    */
606   g_object_class_install_property (gobject_class, PROP_RTX_NEXT_SEQNUM,
607       g_param_spec_boolean ("rtx-next-seqnum", "RTX next seqnum",
608           "Estimate when the next packet should arrive and schedule a "
609           "retransmission request for it.",
610           DEFAULT_RTX_NEXT_SEQNUM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
611
612   /**
613    * GstRtpJitterBuffer:rtx-delay:
614    *
615    * When a packet did not arrive at the expected time, wait this extra amount
616    * of time before sending a retransmission event.
617    *
618    * When -1 is used, the max jitter will be used as extra delay.
619    *
620    * Since: 1.2
621    */
622   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
623       g_param_spec_int ("rtx-delay", "RTX Delay",
624           "Extra time in ms to wait before sending retransmission "
625           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
626           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
627
628   /**
629    * GstRtpJitterBuffer:rtx-min-delay:
630    *
631    * When a packet did not arrive at the expected time, wait at least this extra amount
632    * of time before sending a retransmission event.
633    *
634    * Since: 1.6
635    */
636   g_object_class_install_property (gobject_class, PROP_RTX_MIN_DELAY,
637       g_param_spec_uint ("rtx-min-delay", "Minimum RTX Delay",
638           "Minimum time in ms to wait before sending retransmission "
639           "event", 0, G_MAXUINT, DEFAULT_RTX_MIN_DELAY,
640           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
641   /**
642    * GstRtpJitterBuffer:rtx-delay-reorder:
643    *
644    * Assume that a retransmission event should be sent when we see
645    * this much packet reordering.
646    *
647    * When -1 is used, the value will be estimated based on observed packet
648    * reordering. When 0 is used packet reordering alone will not cause a
649    * retransmission event (Since 1.10).
650    *
651    * Since: 1.2
652    */
653   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
654       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
655           "Sending retransmission event when this much reordering "
656           "(0 disable, -1 automatic)",
657           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
658           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
659   /**
660    * GstRtpJitterBuffer::rtx-retry-timeout:
661    *
662    * When no packet has been received after sending a retransmission event
663    * for this time, retry sending a retransmission event.
664    *
665    * When -1 is used, the value will be estimated based on observed round
666    * trip time.
667    *
668    * Since: 1.2
669    */
670   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
671       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
672           "Retry sending a transmission event after this timeout in "
673           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
674           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
675   /**
676    * GstRtpJitterBuffer::rtx-min-retry-timeout:
677    *
678    * The minimum amount of time between retry timeouts. When
679    * GstRtpJitterBuffer::rtx-retry-timeout is -1, this value ensures a
680    * minimum interval between retry timeouts.
681    *
682    * When -1 is used, the value will be estimated based on the
683    * packet spacing.
684    *
685    * Since: 1.6
686    */
687   g_object_class_install_property (gobject_class, PROP_RTX_MIN_RETRY_TIMEOUT,
688       g_param_spec_int ("rtx-min-retry-timeout", "RTX Min Retry Timeout",
689           "Minimum timeout between sending a transmission event in "
690           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_MIN_RETRY_TIMEOUT,
691           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
692   /**
693    * GstRtpJitterBuffer:rtx-retry-period:
694    *
695    * The amount of time to try to get a retransmission.
696    *
697    * When -1 is used, the value will be estimated based on the jitterbuffer
698    * latency and the observed round trip time.
699    *
700    * Since: 1.2
701    */
702   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
703       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
704           "Try to get a retransmission for this many ms "
705           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
706           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
707   /**
708    * GstRtpJitterBuffer:rtx-max-retries:
709    *
710    * The maximum number of retries to request a retransmission.
711    *
712    * This implies that as maximum (rtx-max-retries + 1) retransmissions will be requested.
713    * When -1 is used, the number of retransmission request will not be limited.
714    *
715    * Since: 1.6
716    */
717   g_object_class_install_property (gobject_class, PROP_RTX_MAX_RETRIES,
718       g_param_spec_int ("rtx-max-retries", "RTX Max Retries",
719           "The maximum number of retries to request a retransmission. "
720           "(-1 not limited)", -1, G_MAXINT, DEFAULT_RTX_MAX_RETRIES,
721           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
722   /**
723    * GstRtpJitterBuffer:rtx-deadline:
724    *
725    * The deadline for a valid RTX request in ms.
726    *
727    * How long the RTX RTCP will be valid for.
728    * When -1 is used, the size of the jitterbuffer will be used.
729    *
730    * Since: 1.10
731    */
732   g_object_class_install_property (gobject_class, PROP_RTX_DEADLINE,
733       g_param_spec_int ("rtx-deadline", "RTX Deadline (ms)",
734           "The deadline for a valid RTX request in milliseconds. "
735           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DEADLINE,
736           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
737 /**
738    * GstRtpJitterBuffer::rtx-stats-timeout:
739    *
740    * The time to wait for a retransmitted packet after it has been
741    * considered lost in order to collect RTX statistics.
742    *
743    * Since: 1.10
744    */
745   g_object_class_install_property (gobject_class, PROP_RTX_STATS_TIMEOUT,
746       g_param_spec_uint ("rtx-stats-timeout", "RTX Statistics Timeout",
747           "The time to wait for a retransmitted packet after it has been "
748           "considered lost in order to collect statistics (ms)",
749           0, G_MAXUINT, DEFAULT_RTX_STATS_TIMEOUT,
750           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
751
752   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
753       g_param_spec_uint ("max-dropout-time", "Max dropout time",
754           "The maximum time (milliseconds) of missing packets tolerated.",
755           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
756           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
757
758   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
759       g_param_spec_uint ("max-misorder-time", "Max misorder time",
760           "The maximum time (milliseconds) of misordered packets tolerated.",
761           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
762           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
763   /**
764    * GstRtpJitterBuffer:stats:
765    *
766    * Various jitterbuffer statistics. This property returns a GstStructure
767    * with name application/x-rtp-jitterbuffer-stats with the following fields:
768    *
769    * <itemizedlist>
770    * <listitem>
771    *   <para>
772    *   #guint64
773    *   <classname>&quot;num-pushed&quot;</classname>:
774    *   the number of packets pushed out.
775    *   </para>
776    * </listitem>
777    * <listitem>
778    *   <para>
779    *   #guint64
780    *   <classname>&quot;num-lost&quot;</classname>:
781    *   the number of packets considered lost.
782    *   </para>
783    * </listitem>
784    * <listitem>
785    *   <para>
786    *   #guint64
787    *   <classname>&quot;num-late&quot;</classname>:
788    *   the number of packets arriving too late.
789    *   </para>
790    * </listitem>
791    * <listitem>
792    *   <para>
793    *   #guint64
794    *   <classname>&quot;num-duplicates&quot;</classname>:
795    *   the number of duplicate packets.
796    *   </para>
797    * </listitem>
798    * <listitem>
799    *   <para>
800    *   #guint64
801    *   <classname>&quot;rtx-count&quot;</classname>:
802    *   the number of retransmissions requested.
803    *   </para>
804    * </listitem>
805    * <listitem>
806    *   <para>
807    *   #guint64
808    *   <classname>&quot;rtx-success-count&quot;</classname>:
809    *   the number of successful retransmissions.
810    *   </para>
811    * </listitem>
812    * <listitem>
813    *   <para>
814    *   #gdouble
815    *   <classname>&quot;rtx-per-packet&quot;</classname>:
816    *   average number of RTX per packet.
817    *   </para>
818    * </listitem>
819    * <listitem>
820    *   <para>
821    *   #guint64
822    *   <classname>&quot;rtx-rtt&quot;</classname>:
823    *   average round trip time per RTX.
824    *   </para>
825    * </listitem>
826    * </itemizedlist>
827    *
828    * Since: 1.4
829    */
830   g_object_class_install_property (gobject_class, PROP_STATS,
831       g_param_spec_boxed ("stats", "Statistics",
832           "Various statistics", GST_TYPE_STRUCTURE,
833           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
834
835   /**
836    * GstRtpJitterBuffer:max-rtcp-rtp-time-diff
837    *
838    * The maximum amount of time in ms that the RTP time in the RTCP SRs
839    * is allowed to be ahead of the last RTP packet we received. Use
840    * -1 to disable ignoring of RTCP packets.
841    *
842    * Since: 1.8
843    */
844   g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF,
845       g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff",
846           "Maximum amount of time in ms that the RTP time in RTCP SRs "
847           "is allowed to be ahead (-1 disabled)", -1, G_MAXINT,
848           DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
849           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
850
851   g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
852       g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
853           "Synchronize received streams to the RFC7273 clock "
854           "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
855           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
856
857   /**
858    * GstRtpJitterBuffer:faststart-min-packets
859    *
860    * The number of consecutive packets needed to start (set to 0 to
861    * disable faststart. The jitterbuffer will by default start after the
862    * latency has elapsed)
863    *
864    * Since: 1.14
865    */
866   g_object_class_install_property (gobject_class, PROP_FASTSTART_MIN_PACKETS,
867       g_param_spec_uint ("faststart-min-packets", "Faststart minimum packets",
868           "The number of consecutive packets needed to start (set to 0 to "
869           "disable faststart. The jitterbuffer will by default start after "
870           "the latency has elapsed)",
871           0, G_MAXUINT, DEFAULT_FASTSTART_MIN_PACKETS,
872           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
873
874   /**
875    * GstRtpJitterBuffer::request-pt-map:
876    * @buffer: the object which received the signal
877    * @pt: the pt
878    *
879    * Request the payload type as #GstCaps for @pt.
880    */
881   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
882       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
883       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
884           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
885       GST_TYPE_CAPS, 1, G_TYPE_UINT);
886   /**
887    * GstRtpJitterBuffer::handle-sync:
888    * @buffer: the object which received the signal
889    * @struct: a GstStructure containing sync values.
890    *
891    * Be notified of new sync values.
892    */
893   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
894       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
895       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
896           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
897       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
898
899   /**
900    * GstRtpJitterBuffer::on-npt-stop:
901    * @buffer: the object which received the signal
902    *
903    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
904    * the npt-stop position.
905    */
906   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
907       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
908       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
909           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
910       G_TYPE_NONE, 0, G_TYPE_NONE);
911
912   /**
913    * GstRtpJitterBuffer::clear-pt-map:
914    * @buffer: the object which received the signal
915    *
916    * Invalidate the clock-rate as obtained with the
917    * #GstRtpJitterBuffer::request-pt-map signal.
918    */
919   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
920       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
921       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
922       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
923       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
924
925   /**
926    * GstRtpJitterBuffer::set-active:
927    * @buffer: the object which received the signal
928    *
929    * Start pushing out packets with the given base time. This signal is only
930    * useful in buffering mode.
931    *
932    * Returns: the time of the last pushed packet.
933    */
934   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
935       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
936       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
937       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
938       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
939       G_TYPE_UINT64);
940
941   gstelement_class->change_state =
942       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
943   gstelement_class->request_new_pad =
944       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
945   gstelement_class->release_pad =
946       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
947   gstelement_class->provide_clock =
948       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
949   gstelement_class->set_clock =
950       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_clock);
951
952   gst_element_class_add_static_pad_template (gstelement_class,
953       &gst_rtp_jitter_buffer_src_template);
954   gst_element_class_add_static_pad_template (gstelement_class,
955       &gst_rtp_jitter_buffer_sink_template);
956   gst_element_class_add_static_pad_template (gstelement_class,
957       &gst_rtp_jitter_buffer_sink_rtcp_template);
958
959   gst_element_class_set_static_metadata (gstelement_class,
960       "RTP packet jitter-buffer", "Filter/Network/RTP",
961       "A buffer that deals with network jitter and other transmission faults",
962       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
963       "Wim Taymans <wim.taymans@gmail.com>");
964
965   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
966   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
967
968   GST_DEBUG_CATEGORY_INIT
969       (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
970 }
971
972 static void
973 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
974 {
975   GstRtpJitterBufferPrivate *priv;
976
977   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
978   jitterbuffer->priv = priv;
979
980   priv->latency_ms = DEFAULT_LATENCY_MS;
981   priv->latency_ns = priv->latency_ms * GST_MSECOND;
982   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
983   priv->do_lost = DEFAULT_DO_LOST;
984   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
985   priv->rtx_next_seqnum = DEFAULT_RTX_NEXT_SEQNUM;
986   priv->rtx_delay = DEFAULT_RTX_DELAY;
987   priv->rtx_min_delay = DEFAULT_RTX_MIN_DELAY;
988   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
989   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
990   priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT;
991   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
992   priv->rtx_max_retries = DEFAULT_RTX_MAX_RETRIES;
993   priv->rtx_deadline_ms = DEFAULT_RTX_DEADLINE;
994   priv->rtx_stats_timeout = DEFAULT_RTX_STATS_TIMEOUT;
995   priv->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
996   priv->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
997   priv->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
998   priv->faststart_min_packets = DEFAULT_FASTSTART_MIN_PACKETS;
999
1000   priv->last_dts = -1;
1001   priv->last_rtptime = -1;
1002   priv->avg_jitter = 0;
1003   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
1004   priv->rtx_stats_timers = timer_queue_new ();
1005   priv->jbuf = rtp_jitter_buffer_new ();
1006   g_mutex_init (&priv->jbuf_lock);
1007   g_cond_init (&priv->jbuf_timer);
1008   g_cond_init (&priv->jbuf_event);
1009   g_cond_init (&priv->jbuf_query);
1010   g_queue_init (&priv->gap_packets);
1011   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1012
1013   /* reset skew detection initialy */
1014   rtp_jitter_buffer_reset_skew (priv->jbuf);
1015   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
1016   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1017   priv->active = TRUE;
1018
1019   priv->srcpad =
1020       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
1021       "src");
1022
1023   gst_pad_set_activatemode_function (priv->srcpad,
1024       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
1025   gst_pad_set_query_function (priv->srcpad,
1026       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
1027   gst_pad_set_event_function (priv->srcpad,
1028       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
1029
1030   priv->sinkpad =
1031       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
1032       "sink");
1033
1034   gst_pad_set_chain_function (priv->sinkpad,
1035       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
1036   gst_pad_set_event_function (priv->sinkpad,
1037       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
1038   gst_pad_set_query_function (priv->sinkpad,
1039       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
1040
1041   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
1042   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
1043
1044   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
1045 }
1046
1047 #define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
1048
1049 #define ITEM_TYPE_BUFFER        0
1050 #define ITEM_TYPE_LOST          1
1051 #define ITEM_TYPE_EVENT         2
1052 #define ITEM_TYPE_QUERY         3
1053
1054 static RTPJitterBufferItem *
1055 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
1056     guint seqnum, guint count, guint rtptime)
1057 {
1058   RTPJitterBufferItem *item;
1059
1060   item = g_slice_new (RTPJitterBufferItem);
1061   item->data = data;
1062   item->next = NULL;
1063   item->prev = NULL;
1064   item->type = type;
1065   item->dts = dts;
1066   item->pts = pts;
1067   item->seqnum = seqnum;
1068   item->count = count;
1069   item->rtptime = rtptime;
1070
1071   return item;
1072 }
1073
1074 static void
1075 free_item (RTPJitterBufferItem * item)
1076 {
1077   g_return_if_fail (item != NULL);
1078
1079   if (item->data && item->type != ITEM_TYPE_QUERY)
1080     gst_mini_object_unref (item->data);
1081   g_slice_free (RTPJitterBufferItem, item);
1082 }
1083
1084 static void
1085 free_item_and_retain_events (RTPJitterBufferItem * item, gpointer user_data)
1086 {
1087   GList **l = user_data;
1088
1089   if (item->data && item->type == ITEM_TYPE_EVENT
1090       && GST_EVENT_IS_STICKY (item->data)) {
1091     *l = g_list_prepend (*l, item->data);
1092   } else if (item->data && item->type != ITEM_TYPE_QUERY) {
1093     gst_mini_object_unref (item->data);
1094   }
1095   g_slice_free (RTPJitterBufferItem, item);
1096 }
1097
1098 static void
1099 gst_rtp_jitter_buffer_finalize (GObject * object)
1100 {
1101   GstRtpJitterBuffer *jitterbuffer;
1102   GstRtpJitterBufferPrivate *priv;
1103
1104   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
1105   priv = jitterbuffer->priv;
1106
1107   g_array_free (priv->timers, TRUE);
1108   timer_queue_free (priv->rtx_stats_timers);
1109   g_mutex_clear (&priv->jbuf_lock);
1110   g_cond_clear (&priv->jbuf_timer);
1111   g_cond_clear (&priv->jbuf_event);
1112   g_cond_clear (&priv->jbuf_query);
1113
1114   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1115   g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
1116   g_queue_clear (&priv->gap_packets);
1117   g_object_unref (priv->jbuf);
1118
1119   G_OBJECT_CLASS (parent_class)->finalize (object);
1120 }
1121
1122 static GstIterator *
1123 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
1124 {
1125   GstRtpJitterBuffer *jitterbuffer;
1126   GstPad *otherpad = NULL;
1127   GstIterator *it = NULL;
1128   GValue val = { 0, };
1129
1130   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1131
1132   if (pad == jitterbuffer->priv->sinkpad) {
1133     otherpad = jitterbuffer->priv->srcpad;
1134   } else if (pad == jitterbuffer->priv->srcpad) {
1135     otherpad = jitterbuffer->priv->sinkpad;
1136   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
1137     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
1138   }
1139
1140   if (it == NULL) {
1141     g_value_init (&val, GST_TYPE_PAD);
1142     g_value_set_object (&val, otherpad);
1143     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
1144     g_value_unset (&val);
1145   }
1146
1147   return it;
1148 }
1149
1150 static GstPad *
1151 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
1152 {
1153   GstRtpJitterBufferPrivate *priv;
1154
1155   priv = jitterbuffer->priv;
1156
1157   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
1158
1159   priv->rtcpsinkpad =
1160       gst_pad_new_from_static_template
1161       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
1162   gst_pad_set_chain_function (priv->rtcpsinkpad,
1163       gst_rtp_jitter_buffer_chain_rtcp);
1164   gst_pad_set_event_function (priv->rtcpsinkpad,
1165       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
1166   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
1167       gst_rtp_jitter_buffer_iterate_internal_links);
1168   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
1169   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1170
1171   return priv->rtcpsinkpad;
1172 }
1173
1174 static void
1175 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
1176 {
1177   GstRtpJitterBufferPrivate *priv;
1178
1179   priv = jitterbuffer->priv;
1180
1181   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
1182
1183   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
1184
1185   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1186   priv->rtcpsinkpad = NULL;
1187 }
1188
1189 static GstPad *
1190 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
1191     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
1192 {
1193   GstRtpJitterBuffer *jitterbuffer;
1194   GstElementClass *klass;
1195   GstPad *result;
1196   GstRtpJitterBufferPrivate *priv;
1197
1198   g_return_val_if_fail (templ != NULL, NULL);
1199   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
1200
1201   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1202   priv = jitterbuffer->priv;
1203   klass = GST_ELEMENT_GET_CLASS (element);
1204
1205   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
1206
1207   /* figure out the template */
1208   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
1209     if (priv->rtcpsinkpad != NULL)
1210       goto exists;
1211
1212     result = create_rtcp_sink (jitterbuffer);
1213   } else
1214     goto wrong_template;
1215
1216   return result;
1217
1218   /* ERRORS */
1219 wrong_template:
1220   {
1221     g_warning ("rtpjitterbuffer: this is not our template");
1222     return NULL;
1223   }
1224 exists:
1225   {
1226     g_warning ("rtpjitterbuffer: pad already requested");
1227     return NULL;
1228   }
1229 }
1230
1231 static void
1232 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
1233 {
1234   GstRtpJitterBuffer *jitterbuffer;
1235   GstRtpJitterBufferPrivate *priv;
1236
1237   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
1238   g_return_if_fail (GST_IS_PAD (pad));
1239
1240   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1241   priv = jitterbuffer->priv;
1242
1243   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1244
1245   if (priv->rtcpsinkpad == pad) {
1246     remove_rtcp_sink (jitterbuffer);
1247   } else
1248     goto wrong_pad;
1249
1250   return;
1251
1252   /* ERRORS */
1253 wrong_pad:
1254   {
1255     g_warning ("gstjitterbuffer: asked to release an unknown pad");
1256     return;
1257   }
1258 }
1259
1260 static GstClock *
1261 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
1262 {
1263   return gst_system_clock_obtain ();
1264 }
1265
1266 static gboolean
1267 gst_rtp_jitter_buffer_set_clock (GstElement * element, GstClock * clock)
1268 {
1269   GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1270
1271   rtp_jitter_buffer_set_pipeline_clock (jitterbuffer->priv->jbuf, clock);
1272
1273   return GST_ELEMENT_CLASS (parent_class)->set_clock (element, clock);
1274 }
1275
1276 static void
1277 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
1278 {
1279   GstRtpJitterBufferPrivate *priv;
1280
1281   priv = jitterbuffer->priv;
1282
1283   /* this will trigger a new pt-map request signal, FIXME, do something better. */
1284
1285   JBUF_LOCK (priv);
1286   priv->clock_rate = -1;
1287   /* do not clear current content, but refresh state for new arrival */
1288   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
1289   rtp_jitter_buffer_reset_skew (priv->jbuf);
1290   JBUF_UNLOCK (priv);
1291 }
1292
1293 static GstClockTime
1294 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
1295     guint64 offset)
1296 {
1297   GstRtpJitterBufferPrivate *priv;
1298   GstClockTime last_out;
1299   RTPJitterBufferItem *item;
1300
1301   priv = jbuf->priv;
1302
1303   JBUF_LOCK (priv);
1304   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
1305       active, GST_TIME_ARGS (offset));
1306
1307   if (active != priv->active) {
1308     /* add the amount of time spent in paused to the output offset. All
1309      * outgoing buffers will have this offset applied to their timestamps in
1310      * order to make them arrive in time in the sink. */
1311     priv->out_offset = offset;
1312     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
1313         GST_TIME_ARGS (priv->out_offset));
1314     priv->active = active;
1315     JBUF_SIGNAL_EVENT (priv);
1316   }
1317   if (!active) {
1318     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
1319   }
1320   if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
1321     /* head buffer timestamp and offset gives our output time */
1322     last_out = item->pts + priv->ts_offset;
1323   } else {
1324     /* use last known time when the buffer is empty */
1325     last_out = priv->last_out_time;
1326   }
1327   JBUF_UNLOCK (priv);
1328
1329   return last_out;
1330 }
1331
1332 static GstCaps *
1333 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
1334 {
1335   GstRtpJitterBuffer *jitterbuffer;
1336   GstRtpJitterBufferPrivate *priv;
1337   GstPad *other;
1338   GstCaps *caps;
1339   GstCaps *templ;
1340
1341   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1342   priv = jitterbuffer->priv;
1343
1344   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
1345
1346   caps = gst_pad_peer_query_caps (other, filter);
1347
1348   templ = gst_pad_get_pad_template_caps (pad);
1349   if (caps == NULL) {
1350     GST_DEBUG_OBJECT (jitterbuffer, "use template");
1351     caps = templ;
1352   } else {
1353     GstCaps *intersect;
1354
1355     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1356
1357     intersect = gst_caps_intersect (caps, templ);
1358     gst_caps_unref (caps);
1359     gst_caps_unref (templ);
1360
1361     caps = intersect;
1362   }
1363   gst_object_unref (jitterbuffer);
1364
1365   return caps;
1366 }
1367
1368 /*
1369  * Must be called with JBUF_LOCK held
1370  */
1371
1372 static gboolean
1373 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1374     GstCaps * caps, gint pt)
1375 {
1376   GstRtpJitterBufferPrivate *priv;
1377   GstStructure *caps_struct;
1378   guint val;
1379   gint payload = -1;
1380   GstClockTime tval;
1381   const gchar *ts_refclk, *mediaclk;
1382
1383   priv = jitterbuffer->priv;
1384
1385   /* first parse the caps */
1386   caps_struct = gst_caps_get_structure (caps, 0);
1387
1388   GST_DEBUG_OBJECT (jitterbuffer, "got caps %" GST_PTR_FORMAT, caps);
1389
1390   if (gst_structure_get_int (caps_struct, "payload", &payload) && pt != -1
1391       && payload != pt) {
1392     GST_ERROR_OBJECT (jitterbuffer,
1393         "Got caps with wrong payload type (got %d, expected %d)", payload, pt);
1394     return FALSE;
1395   }
1396
1397   if (payload != -1) {
1398     GST_DEBUG_OBJECT (jitterbuffer, "Got payload type %d", payload);
1399     priv->last_pt = payload;
1400   }
1401
1402   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1403    * measure the amount of data in the buffer */
1404   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1405     goto error;
1406
1407   if (priv->clock_rate <= 0)
1408     goto wrong_rate;
1409
1410   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1411
1412   rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1413
1414   gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
1415
1416   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1417    * can use this to track the amount of time elapsed on the sender. */
1418   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1419     priv->clock_base = val;
1420   else
1421     priv->clock_base = -1;
1422
1423   priv->ext_timestamp = priv->clock_base;
1424
1425   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1426       priv->clock_base);
1427
1428   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1429     /* first expected seqnum, only update when we didn't have a previous base. */
1430     if (priv->next_in_seqnum == -1)
1431       priv->next_in_seqnum = val;
1432     if (priv->next_seqnum == -1) {
1433       priv->next_seqnum = val;
1434       JBUF_SIGNAL_EVENT (priv);
1435     }
1436     priv->seqnum_base = val;
1437   } else {
1438     priv->seqnum_base = -1;
1439   }
1440
1441   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1442
1443   /* the start and stop times. The seqnum-base corresponds to the start time. We
1444    * will keep track of the seqnums on the output and when we reach the one
1445    * corresponding to npt-stop, we emit the npt-stop-reached signal */
1446   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1447     priv->npt_start = tval;
1448   else
1449     priv->npt_start = 0;
1450
1451   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1452     priv->npt_stop = tval;
1453   else
1454     priv->npt_stop = -1;
1455
1456   GST_DEBUG_OBJECT (jitterbuffer,
1457       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1458       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1459
1460   if ((ts_refclk = gst_structure_get_string (caps_struct, "a-ts-refclk"))) {
1461     GstClock *clock = NULL;
1462     guint64 clock_offset = -1;
1463
1464     GST_DEBUG_OBJECT (jitterbuffer, "Have timestamp reference clock %s",
1465         ts_refclk);
1466
1467     if (g_str_has_prefix (ts_refclk, "ntp=")) {
1468       if (g_str_has_prefix (ts_refclk, "ntp=/traceable/")) {
1469         GST_FIXME_OBJECT (jitterbuffer, "Can't handle traceable NTP clocks");
1470       } else {
1471         const gchar *host, *portstr;
1472         gchar *hostname;
1473         guint port;
1474
1475         host = ts_refclk + sizeof ("ntp=") - 1;
1476         if (host[0] == '[') {
1477           /* IPv6 */
1478           portstr = strchr (host, ']');
1479           if (portstr && portstr[1] == ':')
1480             portstr = portstr + 1;
1481           else
1482             portstr = NULL;
1483         } else {
1484           portstr = strrchr (host, ':');
1485         }
1486
1487
1488         if (!portstr || sscanf (portstr, ":%u", &port) != 1)
1489           port = 123;
1490
1491         if (portstr)
1492           hostname = g_strndup (host, (portstr - host));
1493         else
1494           hostname = g_strdup (host);
1495
1496         clock = gst_ntp_clock_new (NULL, hostname, port, 0);
1497         g_free (hostname);
1498       }
1499     } else if (g_str_has_prefix (ts_refclk, "ptp=IEEE1588-2008:")) {
1500       const gchar *domainstr =
1501           ts_refclk + sizeof ("ptp=IEEE1588-2008:XX-XX-XX-XX-XX-XX-XX-XX") - 1;
1502       guint domain;
1503
1504       if (domainstr[0] != ':' || sscanf (domainstr, ":%u", &domain) != 1)
1505         domain = 0;
1506
1507       clock = gst_ptp_clock_new (NULL, domain);
1508     } else {
1509       GST_FIXME_OBJECT (jitterbuffer, "Unsupported timestamp reference clock");
1510     }
1511
1512     if ((mediaclk = gst_structure_get_string (caps_struct, "a-mediaclk"))) {
1513       GST_DEBUG_OBJECT (jitterbuffer, "Got media clock %s", mediaclk);
1514
1515       if (!g_str_has_prefix (mediaclk, "direct=")
1516           || sscanf (mediaclk, "direct=%" G_GUINT64_FORMAT, &clock_offset) != 1)
1517         GST_FIXME_OBJECT (jitterbuffer, "Unsupported media clock");
1518       if (strstr (mediaclk, "rate=") != NULL) {
1519         GST_FIXME_OBJECT (jitterbuffer, "Rate property not supported");
1520         clock_offset = -1;
1521       }
1522     }
1523
1524     rtp_jitter_buffer_set_media_clock (priv->jbuf, clock, clock_offset);
1525   } else {
1526     rtp_jitter_buffer_set_media_clock (priv->jbuf, NULL, -1);
1527   }
1528
1529   return TRUE;
1530
1531   /* ERRORS */
1532 error:
1533   {
1534     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1535     return FALSE;
1536   }
1537 wrong_rate:
1538   {
1539     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1540     return FALSE;
1541   }
1542 }
1543
1544 static void
1545 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1546 {
1547   GstRtpJitterBufferPrivate *priv;
1548
1549   priv = jitterbuffer->priv;
1550
1551   JBUF_LOCK (priv);
1552   /* mark ourselves as flushing */
1553   priv->srcresult = GST_FLOW_FLUSHING;
1554   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1555   /* this unblocks any waiting pops on the src pad task */
1556   JBUF_SIGNAL_EVENT (priv);
1557   JBUF_SIGNAL_QUERY (priv, FALSE);
1558   JBUF_UNLOCK (priv);
1559 }
1560
1561 static void
1562 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1563 {
1564   GstRtpJitterBufferPrivate *priv;
1565
1566   priv = jitterbuffer->priv;
1567
1568   JBUF_LOCK (priv);
1569   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1570   /* Mark as non flushing */
1571   priv->srcresult = GST_FLOW_OK;
1572   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1573   priv->last_popped_seqnum = -1;
1574   priv->last_out_time = -1;
1575   priv->next_seqnum = -1;
1576   priv->seqnum_base = -1;
1577   priv->ips_rtptime = -1;
1578   priv->ips_pts = GST_CLOCK_TIME_NONE;
1579   priv->packet_spacing = 0;
1580   priv->next_in_seqnum = -1;
1581   priv->clock_rate = -1;
1582   priv->last_pt = -1;
1583   priv->eos = FALSE;
1584   priv->estimated_eos = -1;
1585   priv->last_elapsed = 0;
1586   priv->ext_timestamp = -1;
1587   priv->avg_jitter = 0;
1588   priv->last_dts = -1;
1589   priv->last_rtptime = -1;
1590   priv->last_in_pts = 0;
1591   priv->equidistant = 0;
1592   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1593   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1594   rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
1595   rtp_jitter_buffer_reset_skew (priv->jbuf);
1596   remove_all_timers (jitterbuffer);
1597   g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
1598   g_queue_clear (&priv->gap_packets);
1599   JBUF_UNLOCK (priv);
1600 }
1601
1602 static gboolean
1603 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1604     GstPadMode mode, gboolean active)
1605 {
1606   gboolean result;
1607   GstRtpJitterBuffer *jitterbuffer = NULL;
1608
1609   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1610
1611   switch (mode) {
1612     case GST_PAD_MODE_PUSH:
1613       if (active) {
1614         /* allow data processing */
1615         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1616
1617         /* start pushing out buffers */
1618         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1619         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1620             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1621       } else {
1622         /* make sure all data processing stops ASAP */
1623         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1624
1625         /* NOTE this will hardlock if the state change is called from the src pad
1626          * task thread because we will _join() the thread. */
1627         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1628         result = gst_pad_stop_task (pad);
1629       }
1630       break;
1631     default:
1632       result = FALSE;
1633       break;
1634   }
1635   return result;
1636 }
1637
1638 static GstStateChangeReturn
1639 gst_rtp_jitter_buffer_change_state (GstElement * element,
1640     GstStateChange transition)
1641 {
1642   GstRtpJitterBuffer *jitterbuffer;
1643   GstRtpJitterBufferPrivate *priv;
1644   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1645
1646   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1647   priv = jitterbuffer->priv;
1648
1649   switch (transition) {
1650     case GST_STATE_CHANGE_NULL_TO_READY:
1651       break;
1652     case GST_STATE_CHANGE_READY_TO_PAUSED:
1653       JBUF_LOCK (priv);
1654       /* reset negotiated values */
1655       priv->clock_rate = -1;
1656       priv->clock_base = -1;
1657       priv->peer_latency = 0;
1658       priv->last_pt = -1;
1659       /* block until we go to PLAYING */
1660       priv->blocked = TRUE;
1661       priv->timer_running = TRUE;
1662       priv->timer_thread =
1663           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1664       JBUF_UNLOCK (priv);
1665       break;
1666     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1667       JBUF_LOCK (priv);
1668       /* unblock to allow streaming in PLAYING */
1669       priv->blocked = FALSE;
1670       JBUF_SIGNAL_EVENT (priv);
1671       JBUF_SIGNAL_TIMER (priv);
1672       JBUF_UNLOCK (priv);
1673       break;
1674     default:
1675       break;
1676   }
1677
1678   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1679
1680   switch (transition) {
1681     case GST_STATE_CHANGE_READY_TO_PAUSED:
1682       /* we are a live element because we sync to the clock, which we can only
1683        * do in the PLAYING state */
1684       if (ret != GST_STATE_CHANGE_FAILURE)
1685         ret = GST_STATE_CHANGE_NO_PREROLL;
1686       break;
1687     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1688       JBUF_LOCK (priv);
1689       /* block to stop streaming when PAUSED */
1690       priv->blocked = TRUE;
1691       unschedule_current_timer (jitterbuffer);
1692       JBUF_UNLOCK (priv);
1693       if (ret != GST_STATE_CHANGE_FAILURE)
1694         ret = GST_STATE_CHANGE_NO_PREROLL;
1695       break;
1696     case GST_STATE_CHANGE_PAUSED_TO_READY:
1697       JBUF_LOCK (priv);
1698       gst_buffer_replace (&priv->last_sr, NULL);
1699       priv->timer_running = FALSE;
1700       unschedule_current_timer (jitterbuffer);
1701       JBUF_SIGNAL_TIMER (priv);
1702       JBUF_SIGNAL_QUERY (priv, FALSE);
1703       JBUF_UNLOCK (priv);
1704       g_thread_join (priv->timer_thread);
1705       priv->timer_thread = NULL;
1706       break;
1707     case GST_STATE_CHANGE_READY_TO_NULL:
1708       break;
1709     default:
1710       break;
1711   }
1712
1713   return ret;
1714 }
1715
1716 static gboolean
1717 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1718     GstEvent * event)
1719 {
1720   gboolean ret = TRUE;
1721   GstRtpJitterBuffer *jitterbuffer;
1722   GstRtpJitterBufferPrivate *priv;
1723
1724   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1725   priv = jitterbuffer->priv;
1726
1727   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1728
1729   switch (GST_EVENT_TYPE (event)) {
1730     case GST_EVENT_LATENCY:
1731     {
1732       GstClockTime latency;
1733
1734       gst_event_parse_latency (event, &latency);
1735
1736       GST_DEBUG_OBJECT (jitterbuffer,
1737           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1738
1739       JBUF_LOCK (priv);
1740       /* adjust the overall buffer delay to the total pipeline latency in
1741        * buffering mode because if downstream consumes too fast (because of
1742        * large latency or queues, we would start rebuffering again. */
1743       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1744           RTP_JITTER_BUFFER_MODE_BUFFER) {
1745         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1746       }
1747       JBUF_UNLOCK (priv);
1748
1749       ret = gst_pad_push_event (priv->sinkpad, event);
1750       break;
1751     }
1752     default:
1753       ret = gst_pad_push_event (priv->sinkpad, event);
1754       break;
1755   }
1756
1757   return ret;
1758 }
1759
1760 /* handles and stores the event in the jitterbuffer, must be called with
1761  * LOCK */
1762 static gboolean
1763 queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
1764 {
1765   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1766   RTPJitterBufferItem *item;
1767   gboolean head;
1768
1769   switch (GST_EVENT_TYPE (event)) {
1770     case GST_EVENT_CAPS:
1771     {
1772       GstCaps *caps;
1773
1774       gst_event_parse_caps (event, &caps);
1775       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, -1);
1776       break;
1777     }
1778     case GST_EVENT_SEGMENT:
1779     {
1780       GstSegment segment;
1781       gst_event_copy_segment (event, &segment);
1782
1783       /* we need time for now */
1784       if (segment.format != GST_FORMAT_TIME) {
1785         GST_DEBUG_OBJECT (jitterbuffer, "ignoring non-TIME newsegment");
1786         gst_event_unref (event);
1787
1788         gst_segment_init (&segment, GST_FORMAT_TIME);
1789         event = gst_event_new_segment (&segment);
1790       }
1791
1792       priv->segment = segment;
1793       break;
1794     }
1795     case GST_EVENT_EOS:
1796       priv->eos = TRUE;
1797       rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
1798       break;
1799     default:
1800       break;
1801   }
1802
1803
1804   GST_DEBUG_OBJECT (jitterbuffer, "adding event");
1805   item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
1806   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
1807   if (head)
1808     JBUF_SIGNAL_EVENT (priv);
1809
1810   return TRUE;
1811 }
1812
1813 static gboolean
1814 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1815     GstEvent * event)
1816 {
1817   gboolean ret = TRUE;
1818   GstRtpJitterBuffer *jitterbuffer;
1819   GstRtpJitterBufferPrivate *priv;
1820
1821   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1822   priv = jitterbuffer->priv;
1823
1824   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1825
1826   switch (GST_EVENT_TYPE (event)) {
1827     case GST_EVENT_FLUSH_START:
1828       ret = gst_pad_push_event (priv->srcpad, event);
1829       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1830       /* wait for the loop to go into PAUSED */
1831       gst_pad_pause_task (priv->srcpad);
1832       break;
1833     case GST_EVENT_FLUSH_STOP:
1834       ret = gst_pad_push_event (priv->srcpad, event);
1835       ret =
1836           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1837           GST_PAD_MODE_PUSH, TRUE);
1838       break;
1839     default:
1840       if (GST_EVENT_IS_SERIALIZED (event)) {
1841         /* serialized events go in the queue */
1842         JBUF_LOCK (priv);
1843         if (priv->srcresult != GST_FLOW_OK) {
1844           /* Errors in sticky event pushing are no problem and ignored here
1845            * as they will cause more meaningful errors during data flow.
1846            * For EOS events, that are not followed by data flow, we still
1847            * return FALSE here though.
1848            */
1849           if (!GST_EVENT_IS_STICKY (event) ||
1850               GST_EVENT_TYPE (event) == GST_EVENT_EOS)
1851             goto out_flow_error;
1852         }
1853         /* refuse more events on EOS */
1854         if (priv->eos)
1855           goto out_eos;
1856         ret = queue_event (jitterbuffer, event);
1857         JBUF_UNLOCK (priv);
1858       } else {
1859         /* non-serialized events are forwarded downstream immediately */
1860         ret = gst_pad_push_event (priv->srcpad, event);
1861       }
1862       break;
1863   }
1864   return ret;
1865
1866   /* ERRORS */
1867 out_flow_error:
1868   {
1869     GST_DEBUG_OBJECT (jitterbuffer,
1870         "refusing event, we have a downstream flow error: %s",
1871         gst_flow_get_name (priv->srcresult));
1872     JBUF_UNLOCK (priv);
1873     gst_event_unref (event);
1874     return FALSE;
1875   }
1876 out_eos:
1877   {
1878     GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
1879     JBUF_UNLOCK (priv);
1880     gst_event_unref (event);
1881     return FALSE;
1882   }
1883 }
1884
1885 static gboolean
1886 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1887     GstEvent * event)
1888 {
1889   gboolean ret = TRUE;
1890   GstRtpJitterBuffer *jitterbuffer;
1891
1892   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1893
1894   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1895
1896   switch (GST_EVENT_TYPE (event)) {
1897     case GST_EVENT_FLUSH_START:
1898       gst_event_unref (event);
1899       break;
1900     case GST_EVENT_FLUSH_STOP:
1901       gst_event_unref (event);
1902       break;
1903     default:
1904       ret = gst_pad_event_default (pad, parent, event);
1905       break;
1906   }
1907
1908   return ret;
1909 }
1910
1911 /*
1912  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1913  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1914  * GST_FLOW_FLUSHING when the element is shutting down. On success
1915  * GST_FLOW_OK is returned.
1916  */
1917 static GstFlowReturn
1918 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1919     guint8 pt)
1920 {
1921   GValue ret = { 0 };
1922   GValue args[2] = { {0}, {0} };
1923   GstCaps *caps;
1924   gboolean res;
1925
1926   g_value_init (&args[0], GST_TYPE_ELEMENT);
1927   g_value_set_object (&args[0], jitterbuffer);
1928   g_value_init (&args[1], G_TYPE_UINT);
1929   g_value_set_uint (&args[1], pt);
1930
1931   g_value_init (&ret, GST_TYPE_CAPS);
1932   g_value_set_boxed (&ret, NULL);
1933
1934   JBUF_UNLOCK (jitterbuffer->priv);
1935   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1936       &ret);
1937   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1938
1939   g_value_unset (&args[0]);
1940   g_value_unset (&args[1]);
1941   caps = (GstCaps *) g_value_dup_boxed (&ret);
1942   g_value_unset (&ret);
1943   if (!caps)
1944     goto no_caps;
1945
1946   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt);
1947   gst_caps_unref (caps);
1948
1949   if (G_UNLIKELY (!res))
1950     goto parse_failed;
1951
1952   return GST_FLOW_OK;
1953
1954   /* ERRORS */
1955 no_caps:
1956   {
1957     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1958     return GST_FLOW_ERROR;
1959   }
1960 out_flushing:
1961   {
1962     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1963     return GST_FLOW_FLUSHING;
1964   }
1965 parse_failed:
1966   {
1967     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1968     return GST_FLOW_ERROR;
1969   }
1970 }
1971
1972 /* call with jbuf lock held */
1973 static GstMessage *
1974 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1975 {
1976   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1977   GstMessage *message = NULL;
1978
1979   if (percent == -1)
1980     return NULL;
1981
1982   /* Post a buffering message */
1983   if (priv->last_percent != percent) {
1984     priv->last_percent = percent;
1985     message =
1986         gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1987     gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1988   }
1989
1990   return message;
1991 }
1992
1993 static GstClockTime
1994 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1995 {
1996   GstRtpJitterBufferPrivate *priv;
1997
1998   priv = jitterbuffer->priv;
1999
2000   if (timestamp == -1)
2001     return -1;
2002
2003   /* apply the timestamp offset, this is used for inter stream sync */
2004   timestamp += priv->ts_offset;
2005   /* add the offset, this is used when buffering */
2006   timestamp += priv->out_offset;
2007
2008   return timestamp;
2009 }
2010
2011 static TimerQueue *
2012 timer_queue_new (void)
2013 {
2014   TimerQueue *queue;
2015
2016   queue = g_slice_new (TimerQueue);
2017   queue->timers = g_queue_new ();
2018   queue->hashtable = g_hash_table_new (NULL, NULL);
2019
2020   return queue;
2021 }
2022
2023 static void
2024 timer_queue_free (TimerQueue * queue)
2025 {
2026   if (!queue)
2027     return;
2028
2029   g_hash_table_destroy (queue->hashtable);
2030   g_queue_free_full (queue->timers, g_free);
2031   g_slice_free (TimerQueue, queue);
2032 }
2033
2034 static void
2035 timer_queue_append (TimerQueue * queue, const TimerData * timer,
2036     GstClockTime timeout, gboolean lost)
2037 {
2038   TimerData *copy;
2039
2040   copy = g_memdup (timer, sizeof (*timer));
2041   copy->timeout = timeout;
2042   copy->type = lost ? TIMER_TYPE_LOST : TIMER_TYPE_EXPECTED;
2043   copy->idx = -1;
2044
2045   GST_LOG ("Append rtx-stats timer #%d, %" GST_TIME_FORMAT,
2046       copy->seqnum, GST_TIME_ARGS (copy->timeout));
2047   g_queue_push_tail (queue->timers, copy);
2048   g_hash_table_insert (queue->hashtable, GINT_TO_POINTER (copy->seqnum), copy);
2049 }
2050
2051 static void
2052 timer_queue_clear_until (TimerQueue * queue, GstClockTime timeout)
2053 {
2054   TimerData *test;
2055
2056   test = g_queue_peek_head (queue->timers);
2057   while (test && test->timeout < timeout) {
2058     GST_LOG ("Pop rtx-stats timer #%d, %" GST_TIME_FORMAT " < %"
2059         GST_TIME_FORMAT, test->seqnum, GST_TIME_ARGS (test->timeout),
2060         GST_TIME_ARGS (timeout));
2061     g_hash_table_remove (queue->hashtable, GINT_TO_POINTER (test->seqnum));
2062     g_free (g_queue_pop_head (queue->timers));
2063     test = g_queue_peek_head (queue->timers);
2064   }
2065 }
2066
2067 static TimerData *
2068 timer_queue_find (TimerQueue * queue, guint16 seqnum)
2069 {
2070   return g_hash_table_lookup (queue->hashtable, GINT_TO_POINTER (seqnum));
2071 }
2072
2073 static TimerData *
2074 find_timer (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
2075 {
2076   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2077   TimerData *timer = NULL;
2078   gint i, len;
2079
2080   len = priv->timers->len;
2081   for (i = 0; i < len; i++) {
2082     TimerData *test = &g_array_index (priv->timers, TimerData, i);
2083     if (test->seqnum == seqnum) {
2084       timer = test;
2085       break;
2086     }
2087   }
2088   return timer;
2089 }
2090
2091 static void
2092 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
2093 {
2094   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2095
2096   if (priv->clock_id) {
2097     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
2098     gst_clock_id_unschedule (priv->clock_id);
2099     priv->clock_id = NULL;
2100   }
2101 }
2102
2103 static GstClockTime
2104 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2105 {
2106   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2107   GstClockTime test_timeout;
2108
2109   if ((test_timeout = timer->timeout) == -1)
2110     return -1;
2111
2112   if (timer->type != TIMER_TYPE_EXPECTED) {
2113     /* add our latency and offset to get output times. */
2114     test_timeout = apply_offset (jitterbuffer, test_timeout);
2115     test_timeout += priv->latency_ns;
2116   }
2117   return test_timeout;
2118 }
2119
2120 static void
2121 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2122 {
2123   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2124
2125   if (priv->clock_id) {
2126     GstClockTime timeout = get_timeout (jitterbuffer, timer);
2127
2128     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
2129         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
2130
2131     if (timeout == -1 || timeout < priv->timer_timeout)
2132       unschedule_current_timer (jitterbuffer);
2133   }
2134 }
2135
2136 static TimerData *
2137 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
2138     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
2139     GstClockTime duration)
2140 {
2141   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2142   TimerData *timer;
2143   gint len;
2144
2145   GST_DEBUG_OBJECT (jitterbuffer,
2146       "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %"
2147       GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout),
2148       GST_TIME_ARGS (delay));
2149
2150   len = priv->timers->len;
2151   g_array_set_size (priv->timers, len + 1);
2152   timer = &g_array_index (priv->timers, TimerData, len);
2153   timer->idx = len;
2154   timer->type = type;
2155   timer->seqnum = seqnum;
2156   timer->num = num;
2157   timer->timeout = timeout + delay;
2158   timer->duration = duration;
2159   if (type == TIMER_TYPE_EXPECTED) {
2160     timer->rtx_base = timeout;
2161     timer->rtx_delay = delay;
2162     timer->rtx_retry = 0;
2163   }
2164   timer->rtx_last = GST_CLOCK_TIME_NONE;
2165   timer->num_rtx_retry = 0;
2166   timer->num_rtx_received = 0;
2167   recalculate_timer (jitterbuffer, timer);
2168   JBUF_SIGNAL_TIMER (priv);
2169
2170   return timer;
2171 }
2172
2173 static void
2174 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2175     guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
2176 {
2177   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2178   gboolean seqchange, timechange;
2179   guint16 oldseq;
2180   GstClockTime new_timeout;
2181
2182   oldseq = timer->seqnum;
2183   new_timeout = timeout + delay;
2184   seqchange = oldseq != seqnum;
2185   timechange = timer->timeout != new_timeout;
2186
2187   if (!seqchange && !timechange) {
2188     GST_DEBUG_OBJECT (jitterbuffer,
2189         "No changes in seqnum (%d) and timeout (%" GST_TIME_FORMAT
2190         "), skipping", oldseq, GST_TIME_ARGS (timer->timeout));
2191     return;
2192   }
2193
2194   GST_DEBUG_OBJECT (jitterbuffer,
2195       "replace timer %d for seqnum %d->%d timeout %" GST_TIME_FORMAT
2196       "->%" GST_TIME_FORMAT, timer->type, oldseq, seqnum,
2197       GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (new_timeout));
2198
2199   timer->timeout = new_timeout;
2200   timer->seqnum = seqnum;
2201   if (reset) {
2202     GST_DEBUG_OBJECT (jitterbuffer, "reset rtx delay %" GST_TIME_FORMAT
2203         "->%" GST_TIME_FORMAT, GST_TIME_ARGS (timer->rtx_delay),
2204         GST_TIME_ARGS (delay));
2205     timer->rtx_base = timeout;
2206     timer->rtx_delay = delay;
2207     timer->rtx_retry = 0;
2208   }
2209   if (seqchange) {
2210     timer->num_rtx_retry = 0;
2211     timer->num_rtx_received = 0;
2212   }
2213
2214   if (priv->clock_id) {
2215     /* we changed the seqnum and there is a timer currently waiting with this
2216      * seqnum, unschedule it */
2217     if (seqchange && priv->timer_seqnum == oldseq)
2218       unschedule_current_timer (jitterbuffer);
2219     /* we changed the time, check if it is earlier than what we are waiting
2220      * for and unschedule if so */
2221     else if (timechange)
2222       recalculate_timer (jitterbuffer, timer);
2223   }
2224 }
2225
2226 static TimerData *
2227 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
2228     guint16 seqnum, GstClockTime timeout)
2229 {
2230   TimerData *timer;
2231
2232   /* find the seqnum timer */
2233   timer = find_timer (jitterbuffer, seqnum);
2234   if (timer == NULL) {
2235     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
2236   } else {
2237     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
2238   }
2239   return timer;
2240 }
2241
2242 static void
2243 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2244 {
2245   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2246   guint idx;
2247
2248   if (timer->idx == -1)
2249     return;
2250
2251   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
2252     unschedule_current_timer (jitterbuffer);
2253
2254   idx = timer->idx;
2255   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
2256   g_array_remove_index_fast (priv->timers, idx);
2257   timer->idx = idx;
2258 }
2259
2260 static void
2261 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
2262 {
2263   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2264   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
2265   g_array_set_size (priv->timers, 0);
2266   unschedule_current_timer (jitterbuffer);
2267 }
2268
2269 /* get the extra delay to wait before sending RTX */
2270 static GstClockTime
2271 get_rtx_delay (GstRtpJitterBufferPrivate * priv)
2272 {
2273   GstClockTime delay;
2274
2275   if (priv->rtx_delay == -1) {
2276     if (priv->avg_jitter == 0 && priv->packet_spacing == 0) {
2277       delay = DEFAULT_AUTO_RTX_DELAY;
2278     } else {
2279       /* jitter is in nanoseconds, maximum of 2x jitter and half the
2280        * packet spacing is a good margin */
2281       delay = MAX (priv->avg_jitter * 2, priv->packet_spacing / 2);
2282     }
2283   } else {
2284     delay = priv->rtx_delay * GST_MSECOND;
2285   }
2286   if (priv->rtx_min_delay > 0)
2287     delay = MAX (delay, priv->rtx_min_delay * GST_MSECOND);
2288
2289   return delay;
2290 }
2291
2292 /* Check if packet with seqnum is already considered definitely lost by being
2293  * part of a "lost timer" for multiple packets */
2294 static gboolean
2295 already_lost (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
2296 {
2297   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2298   gint i, len;
2299
2300   len = priv->timers->len;
2301   for (i = 0; i < len; i++) {
2302     TimerData *test = &g_array_index (priv->timers, TimerData, i);
2303     gint gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
2304
2305     if (test->num > 1 && test->type == TIMER_TYPE_LOST && gap >= 0 &&
2306         gap < test->num) {
2307       GST_DEBUG ("seqnum #%d already considered definitely lost (#%d->#%d)",
2308           seqnum, test->seqnum, (test->seqnum + test->num - 1) & 0xffff);
2309       return TRUE;
2310     }
2311   }
2312
2313   return FALSE;
2314 }
2315
2316 /* we just received a packet with seqnum and dts.
2317  *
2318  * First check for old seqnum that we are still expecting. If the gap with the
2319  * current seqnum is too big, unschedule the timeouts.
2320  *
2321  * If we have a valid packet spacing estimate we can set a timer for when we
2322  * should receive the next packet.
2323  * If we don't have a valid estimate, we remove any timer we might have
2324  * had for this packet.
2325  */
2326 static void
2327 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
2328     GstClockTime dts, GstClockTime pts, gboolean do_next_seqnum,
2329     gboolean is_rtx, TimerData * timer)
2330 {
2331   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2332
2333   /* go through all timers and unschedule the ones with a large gap */
2334   if (priv->do_retransmission && priv->rtx_delay_reorder > 0) {
2335     gint i, len;
2336     len = priv->timers->len;
2337     for (i = 0; i < len; i++) {
2338       TimerData *test = &g_array_index (priv->timers, TimerData, i);
2339       gint gap;
2340
2341       gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
2342
2343       GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d",
2344           test->type, test->seqnum, seqnum, gap);
2345
2346       if (gap > priv->rtx_delay_reorder) {
2347         /* max gap, we exceeded the max reorder distance and we don't expect the
2348          * missing packet to be this reordered */
2349         if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
2350           reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
2351       }
2352     }
2353   }
2354
2355   do_next_seqnum = do_next_seqnum && priv->packet_spacing > 0
2356       && priv->do_retransmission && priv->rtx_next_seqnum;
2357
2358   if (timer && timer->type != TIMER_TYPE_DEADLINE) {
2359     if (timer->num_rtx_retry > 0) {
2360       if (is_rtx) {
2361         update_rtx_stats (jitterbuffer, timer, dts, TRUE);
2362         /* don't try to estimate the next seqnum because this is a retransmitted
2363          * packet and it probably did not arrive with the expected packet
2364          * spacing. */
2365         do_next_seqnum = FALSE;
2366       }
2367
2368       if (!is_rtx || timer->num_rtx_retry > 1) {
2369         /* Store timer in order to record stats when/if the retransmitted
2370          * packet arrives. We should also store timer information if we've
2371          * requested retransmission more than once since we may receive
2372          * several retransmitted packets. For accuracy we should update the
2373          * stats also when the redundant retransmitted packets arrives. */
2374         timer_queue_append (priv->rtx_stats_timers, timer,
2375             pts + priv->rtx_stats_timeout * GST_MSECOND, FALSE);
2376       }
2377     }
2378   }
2379
2380   if (do_next_seqnum && pts != GST_CLOCK_TIME_NONE) {
2381     GstClockTime expected, delay;
2382
2383     /* calculate expected arrival time of the next seqnum */
2384     expected = pts + priv->packet_spacing;
2385
2386     delay = get_rtx_delay (priv);
2387
2388     /* and update/install timer for next seqnum */
2389     GST_DEBUG_OBJECT (jitterbuffer, "Add RTX timer #%d, expected %"
2390         GST_TIME_FORMAT ", delay %" GST_TIME_FORMAT ", packet-spacing %"
2391         GST_TIME_FORMAT ", jitter %" GST_TIME_FORMAT, priv->next_in_seqnum,
2392         GST_TIME_ARGS (expected), GST_TIME_ARGS (delay),
2393         GST_TIME_ARGS (priv->packet_spacing), GST_TIME_ARGS (priv->avg_jitter));
2394
2395     if (timer) {
2396       timer->type = TIMER_TYPE_EXPECTED;
2397       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
2398           delay, TRUE);
2399     } else {
2400       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
2401           expected, delay, priv->packet_spacing);
2402     }
2403   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
2404     /* if we had a timer, remove it, we don't know when to expect the next
2405      * packet. */
2406     remove_timer (jitterbuffer, timer);
2407   }
2408 }
2409
2410 static void
2411 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
2412     GstClockTime pts)
2413 {
2414   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2415
2416   /* we need consecutive seqnums with a different
2417    * rtptime to estimate the packet spacing. */
2418   if (priv->ips_rtptime != rtptime) {
2419     /* rtptime changed, check pts diff */
2420     if (priv->ips_pts != -1 && pts != -1 && pts > priv->ips_pts) {
2421       GstClockTime new_packet_spacing = pts - priv->ips_pts;
2422       GstClockTime old_packet_spacing = priv->packet_spacing;
2423
2424       /* Biased towards bigger packet spacings to prevent
2425        * too many unneeded retransmission requests for next
2426        * packets that just arrive a little later than we would
2427        * expect */
2428       if (old_packet_spacing > new_packet_spacing)
2429         priv->packet_spacing =
2430             (new_packet_spacing + 3 * old_packet_spacing) / 4;
2431       else if (old_packet_spacing > 0)
2432         priv->packet_spacing =
2433             (3 * new_packet_spacing + old_packet_spacing) / 4;
2434       else
2435         priv->packet_spacing = new_packet_spacing;
2436
2437       GST_DEBUG_OBJECT (jitterbuffer,
2438           "new packet spacing %" GST_TIME_FORMAT
2439           " old packet spacing %" GST_TIME_FORMAT
2440           " combined to %" GST_TIME_FORMAT,
2441           GST_TIME_ARGS (new_packet_spacing),
2442           GST_TIME_ARGS (old_packet_spacing),
2443           GST_TIME_ARGS (priv->packet_spacing));
2444     }
2445     priv->ips_rtptime = rtptime;
2446     priv->ips_pts = pts;
2447   }
2448 }
2449
2450 static void
2451 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
2452     guint16 seqnum, GstClockTime pts, gint gap)
2453 {
2454   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2455   GstClockTime duration, expected_pts, delay;
2456   TimerType type;
2457   gboolean equidistant = priv->equidistant > 0;
2458
2459   GST_DEBUG_OBJECT (jitterbuffer,
2460       "pts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
2461       GST_TIME_ARGS (pts), GST_TIME_ARGS (priv->last_in_pts));
2462
2463   if (pts == GST_CLOCK_TIME_NONE) {
2464     GST_WARNING_OBJECT (jitterbuffer, "Have no PTS");
2465     return;
2466   }
2467
2468   if (equidistant) {
2469     GstClockTime total_duration;
2470     /* the total duration spanned by the missing packets */
2471     if (pts >= priv->last_in_pts)
2472       total_duration = pts - priv->last_in_pts;
2473     else
2474       total_duration = 0;
2475
2476     /* interpolate between the current time and the last time based on
2477      * number of packets we are missing, this is the estimated duration
2478      * for the missing packet based on equidistant packet spacing. */
2479     duration = total_duration / (gap + 1);
2480
2481     GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
2482         GST_TIME_ARGS (duration));
2483
2484     if (total_duration > priv->latency_ns) {
2485       GstClockTime gap_time;
2486       guint lost_packets;
2487
2488       if (duration > 0) {
2489         GstClockTime gap_dur = gap * duration;
2490         if (gap_dur > priv->latency_ns)
2491           gap_time = gap_dur - priv->latency_ns;
2492         else
2493           gap_time = 0;
2494         lost_packets = gap_time / duration;
2495       } else {
2496         gap_time = total_duration - priv->latency_ns;
2497         lost_packets = gap;
2498       }
2499
2500       /* too many lost packets, some of the missing packets are already
2501        * too late and we can generate lost packet events for them. */
2502       GST_INFO_OBJECT (jitterbuffer,
2503           "lost packets (%d, #%d->#%d) duration too large %" GST_TIME_FORMAT
2504           " > %" GST_TIME_FORMAT ", consider %u lost (%" GST_TIME_FORMAT ")",
2505           gap, expected, seqnum - 1, GST_TIME_ARGS (total_duration),
2506           GST_TIME_ARGS (priv->latency_ns), lost_packets,
2507           GST_TIME_ARGS (gap_time));
2508
2509       /* this timer will fire immediately and the lost event will be pushed from
2510        * the timer thread */
2511       if (lost_packets > 0) {
2512         add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
2513             priv->last_in_pts + duration, 0, gap_time);
2514         expected += lost_packets;
2515         priv->last_in_pts += gap_time;
2516       }
2517     }
2518
2519     expected_pts = priv->last_in_pts + duration;
2520   } else {
2521     /* If we cannot assume equidistant packet spacing, the only thing we now
2522      * for sure is that the missing packets have expected pts not later than
2523      * the last received pts. */
2524     duration = 0;
2525     expected_pts = pts;
2526   }
2527
2528   delay = 0;
2529
2530   if (priv->do_retransmission) {
2531     TimerData *timer = find_timer (jitterbuffer, expected);
2532
2533     type = TIMER_TYPE_EXPECTED;
2534     delay = get_rtx_delay (priv);
2535
2536     /* if we had a timer for the first missing packet, update it. */
2537     if (timer && timer->type == TIMER_TYPE_EXPECTED) {
2538       GstClockTime timeout = timer->timeout;
2539
2540       timer->duration = duration;
2541       if (timeout > (expected_pts + delay) && timer->num_rtx_retry == 0) {
2542         reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_pts,
2543             delay, TRUE);
2544       }
2545       expected++;
2546       expected_pts += duration;
2547     }
2548   } else {
2549     type = TIMER_TYPE_LOST;
2550   }
2551
2552   while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
2553     add_timer (jitterbuffer, type, expected, 0, expected_pts, delay, duration);
2554     expected_pts += duration;
2555     expected++;
2556   }
2557 }
2558
2559 static void
2560 calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts,
2561     guint32 rtptime)
2562 {
2563   gint32 rtpdiff;
2564   GstClockTimeDiff dtsdiff, rtpdiffns, diff;
2565   GstRtpJitterBufferPrivate *priv;
2566
2567   priv = jitterbuffer->priv;
2568
2569   if (G_UNLIKELY (dts == GST_CLOCK_TIME_NONE) || priv->clock_rate <= 0)
2570     goto no_time;
2571
2572   if (priv->last_dts != -1)
2573     dtsdiff = dts - priv->last_dts;
2574   else
2575     dtsdiff = 0;
2576
2577   if (priv->last_rtptime != -1)
2578     rtpdiff = rtptime - (guint32) priv->last_rtptime;
2579   else
2580     rtpdiff = 0;
2581
2582   /* Guess whether stream currently uses equidistant packet spacing. If we
2583    * often see identical timestamps it means the packets are not
2584    * equidistant. */
2585   if (rtptime == priv->last_rtptime)
2586     priv->equidistant -= 2;
2587   else
2588     priv->equidistant += 1;
2589   priv->equidistant = CLAMP (priv->equidistant, -7, 7);
2590
2591   priv->last_dts = dts;
2592   priv->last_rtptime = rtptime;
2593
2594   if (rtpdiff > 0)
2595     rtpdiffns =
2596         gst_util_uint64_scale_int (rtpdiff, GST_SECOND, priv->clock_rate);
2597   else
2598     rtpdiffns =
2599         -gst_util_uint64_scale_int (-rtpdiff, GST_SECOND, priv->clock_rate);
2600
2601   diff = ABS (dtsdiff - rtpdiffns);
2602
2603   /* jitter is stored in nanoseconds */
2604   priv->avg_jitter = (diff + (15 * priv->avg_jitter)) >> 4;
2605
2606   GST_LOG_OBJECT (jitterbuffer,
2607       "dtsdiff %" GST_TIME_FORMAT " rtptime %" GST_TIME_FORMAT
2608       ", clock-rate %d, diff %" GST_TIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
2609       GST_TIME_ARGS (dtsdiff), GST_TIME_ARGS (rtpdiffns), priv->clock_rate,
2610       GST_TIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
2611
2612   return;
2613
2614   /* ERRORS */
2615 no_time:
2616   {
2617     GST_DEBUG_OBJECT (jitterbuffer,
2618         "no dts or no clock-rate, can't calculate jitter");
2619     return;
2620   }
2621 }
2622
2623 static gint
2624 compare_buffer_seqnum (GstBuffer * a, GstBuffer * b, gpointer user_data)
2625 {
2626   GstRTPBuffer rtp_a = GST_RTP_BUFFER_INIT;
2627   GstRTPBuffer rtp_b = GST_RTP_BUFFER_INIT;
2628   guint seq_a, seq_b;
2629
2630   gst_rtp_buffer_map (a, GST_MAP_READ, &rtp_a);
2631   seq_a = gst_rtp_buffer_get_seq (&rtp_a);
2632   gst_rtp_buffer_unmap (&rtp_a);
2633
2634   gst_rtp_buffer_map (b, GST_MAP_READ, &rtp_b);
2635   seq_b = gst_rtp_buffer_get_seq (&rtp_b);
2636   gst_rtp_buffer_unmap (&rtp_b);
2637
2638   return gst_rtp_buffer_compare_seqnum (seq_b, seq_a);
2639 }
2640
2641 static gboolean
2642 handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, GstBuffer * buffer,
2643     guint8 pt, guint16 seqnum, gint gap, guint max_dropout, guint max_misorder)
2644 {
2645   GstRtpJitterBufferPrivate *priv;
2646   guint gap_packets_length;
2647   gboolean reset = FALSE;
2648   gboolean future = gap > 0;
2649
2650   priv = jitterbuffer->priv;
2651
2652   if ((gap_packets_length = g_queue_get_length (&priv->gap_packets)) > 0) {
2653     GList *l;
2654     guint32 prev_gap_seq = -1;
2655     gboolean all_consecutive = TRUE;
2656
2657     g_queue_insert_sorted (&priv->gap_packets, buffer,
2658         (GCompareDataFunc) compare_buffer_seqnum, NULL);
2659
2660     for (l = priv->gap_packets.head; l; l = l->next) {
2661       GstBuffer *gap_buffer = l->data;
2662       GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
2663       guint32 gap_seq;
2664
2665       gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
2666
2667       all_consecutive = (gst_rtp_buffer_get_payload_type (&gap_rtp) == pt);
2668
2669       gap_seq = gst_rtp_buffer_get_seq (&gap_rtp);
2670       if (prev_gap_seq == -1)
2671         prev_gap_seq = gap_seq;
2672       else if (gst_rtp_buffer_compare_seqnum (gap_seq, prev_gap_seq) != -1)
2673         all_consecutive = FALSE;
2674       else
2675         prev_gap_seq = gap_seq;
2676
2677       gst_rtp_buffer_unmap (&gap_rtp);
2678       if (!all_consecutive)
2679         break;
2680     }
2681
2682     if (all_consecutive && gap_packets_length > 3) {
2683       GST_DEBUG_OBJECT (jitterbuffer,
2684           "buffer too %s %d < %d, got 5 consecutive ones - reset",
2685           (future ? "new" : "old"), gap,
2686           (future ? max_dropout : -max_misorder));
2687       reset = TRUE;
2688     } else if (!all_consecutive) {
2689       g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
2690       g_queue_clear (&priv->gap_packets);
2691       GST_DEBUG_OBJECT (jitterbuffer,
2692           "buffer too %s %d < %d, got no 5 consecutive ones - dropping",
2693           (future ? "new" : "old"), gap,
2694           (future ? max_dropout : -max_misorder));
2695       buffer = NULL;
2696     } else {
2697       GST_DEBUG_OBJECT (jitterbuffer,
2698           "buffer too %s %d < %d, got %u consecutive ones - waiting",
2699           (future ? "new" : "old"), gap,
2700           (future ? max_dropout : -max_misorder), gap_packets_length + 1);
2701       buffer = NULL;
2702     }
2703   } else {
2704     GST_DEBUG_OBJECT (jitterbuffer,
2705         "buffer too %s %d < %d, first one - waiting", (future ? "new" : "old"),
2706         gap, -max_misorder);
2707     g_queue_push_tail (&priv->gap_packets, buffer);
2708     buffer = NULL;
2709   }
2710
2711   return reset;
2712 }
2713
2714 static GstClockTime
2715 get_current_running_time (GstRtpJitterBuffer * jitterbuffer)
2716 {
2717   GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (jitterbuffer));
2718   GstClockTime running_time = GST_CLOCK_TIME_NONE;
2719
2720   if (clock) {
2721     GstClockTime base_time =
2722         gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer));
2723     GstClockTime clock_time = gst_clock_get_time (clock);
2724
2725     if (clock_time > base_time)
2726       running_time = clock_time - base_time;
2727     else
2728       running_time = 0;
2729
2730     gst_object_unref (clock);
2731   }
2732
2733   return running_time;
2734 }
2735
2736 static GstFlowReturn
2737 gst_rtp_jitter_buffer_reset (GstRtpJitterBuffer * jitterbuffer,
2738     GstPad * pad, GstObject * parent, guint16 seqnum)
2739 {
2740   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2741   GstFlowReturn ret = GST_FLOW_OK;
2742   GList *events = NULL, *l;
2743   GList *buffers;
2744   gboolean head;
2745
2746   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2747   rtp_jitter_buffer_flush (priv->jbuf,
2748       (GFunc) free_item_and_retain_events, &events);
2749   rtp_jitter_buffer_reset_skew (priv->jbuf);
2750   remove_all_timers (jitterbuffer);
2751   priv->discont = TRUE;
2752   priv->last_popped_seqnum = -1;
2753
2754   if (priv->gap_packets.head) {
2755     GstBuffer *gap_buffer = priv->gap_packets.head->data;
2756     GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
2757
2758     gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
2759     priv->next_seqnum = gst_rtp_buffer_get_seq (&gap_rtp);
2760     gst_rtp_buffer_unmap (&gap_rtp);
2761   } else {
2762     priv->next_seqnum = seqnum;
2763   }
2764
2765   priv->last_in_pts = -1;
2766   priv->next_in_seqnum = -1;
2767
2768   /* Insert all sticky events again in order, otherwise we would
2769    * potentially loose STREAM_START, CAPS or SEGMENT events
2770    */
2771   events = g_list_reverse (events);
2772   for (l = events; l; l = l->next) {
2773     RTPJitterBufferItem *item;
2774
2775     item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
2776     rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
2777   }
2778   g_list_free (events);
2779
2780   JBUF_SIGNAL_EVENT (priv);
2781
2782   /* reset spacing estimation when gap */
2783   priv->ips_rtptime = -1;
2784   priv->ips_pts = GST_CLOCK_TIME_NONE;
2785
2786   buffers = g_list_copy (priv->gap_packets.head);
2787   g_queue_clear (&priv->gap_packets);
2788
2789   priv->ips_rtptime = -1;
2790   priv->ips_pts = GST_CLOCK_TIME_NONE;
2791   JBUF_UNLOCK (jitterbuffer->priv);
2792
2793   for (l = buffers; l; l = l->next) {
2794     ret = gst_rtp_jitter_buffer_chain (pad, parent, l->data);
2795     l->data = NULL;
2796     if (ret != GST_FLOW_OK) {
2797       l = l->next;
2798       break;
2799     }
2800   }
2801   for (; l; l = l->next)
2802     gst_buffer_unref (l->data);
2803   g_list_free (buffers);
2804
2805   return ret;
2806 }
2807
2808 static gboolean
2809 gst_rtp_jitter_buffer_fast_start (GstRtpJitterBuffer * jitterbuffer)
2810 {
2811   GstRtpJitterBufferPrivate *priv;
2812   RTPJitterBufferItem *item;
2813   TimerData *timer;
2814
2815   priv = jitterbuffer->priv;
2816
2817   if (priv->faststart_min_packets == 0)
2818     return FALSE;
2819
2820   item = rtp_jitter_buffer_peek (priv->jbuf);
2821   if (!item)
2822     return FALSE;
2823
2824   timer = find_timer (jitterbuffer, item->seqnum);
2825   if (!timer || timer->type != TIMER_TYPE_DEADLINE)
2826     return FALSE;
2827
2828   if (rtp_jitter_buffer_can_fast_start (priv->jbuf,
2829           priv->faststart_min_packets)) {
2830     GST_INFO_OBJECT (jitterbuffer, "We found %i consecutive packet, start now",
2831         priv->faststart_min_packets);
2832     timer->timeout = -1;
2833     return TRUE;
2834   }
2835
2836   return FALSE;
2837 }
2838
2839 static GstFlowReturn
2840 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
2841     GstBuffer * buffer)
2842 {
2843   GstRtpJitterBuffer *jitterbuffer;
2844   GstRtpJitterBufferPrivate *priv;
2845   guint16 seqnum;
2846   guint32 expected, rtptime;
2847   GstFlowReturn ret = GST_FLOW_OK;
2848   GstClockTime dts, pts;
2849   guint64 latency_ts;
2850   gboolean head;
2851   gint percent = -1;
2852   guint8 pt;
2853   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2854   gboolean do_next_seqnum = FALSE;
2855   RTPJitterBufferItem *item;
2856   GstMessage *msg = NULL;
2857   gboolean estimated_dts = FALSE;
2858   gint32 packet_rate, max_dropout, max_misorder;
2859   TimerData *timer = NULL;
2860
2861   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
2862
2863   priv = jitterbuffer->priv;
2864
2865   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
2866     goto invalid_buffer;
2867
2868   pt = gst_rtp_buffer_get_payload_type (&rtp);
2869   seqnum = gst_rtp_buffer_get_seq (&rtp);
2870   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2871   gst_rtp_buffer_unmap (&rtp);
2872
2873   /* make sure we have PTS and DTS set */
2874   pts = GST_BUFFER_PTS (buffer);
2875   dts = GST_BUFFER_DTS (buffer);
2876   if (dts == -1)
2877     dts = pts;
2878   else if (pts == -1)
2879     pts = dts;
2880
2881   if (dts == -1) {
2882     /* If we have no DTS here, i.e. no capture time, get one from the
2883      * clock now to have something to calculate with in the future. */
2884     dts = get_current_running_time (jitterbuffer);
2885     pts = dts;
2886
2887     /* Remember that we estimated the DTS if we are running already
2888      * and this is not our first packet (or first packet after a reset).
2889      * If it's the first packet, we somehow must generate a timestamp for
2890      * everything, otherwise we can't calculate any times
2891      */
2892     estimated_dts = (priv->next_in_seqnum != -1);
2893   } else {
2894     /* take the DTS of the buffer. This is the time when the packet was
2895      * received and is used to calculate jitter and clock skew. We will adjust
2896      * this DTS with the smoothed value after processing it in the
2897      * jitterbuffer and assign it as the PTS. */
2898     /* bring to running time */
2899     dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
2900   }
2901
2902   GST_DEBUG_OBJECT (jitterbuffer,
2903       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d, rtx %d",
2904       seqnum, GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer),
2905       GST_BUFFER_IS_RETRANSMISSION (buffer));
2906
2907   JBUF_LOCK_CHECK (priv, out_flushing);
2908
2909   if (G_UNLIKELY (priv->last_pt != pt)) {
2910     GstCaps *caps;
2911
2912     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
2913         pt);
2914
2915     priv->last_pt = pt;
2916     /* reset clock-rate so that we get a new one */
2917     priv->clock_rate = -1;
2918
2919     /* Try to get the clock-rate from the caps first if we can. If there are no
2920      * caps we must fire the signal to get the clock-rate. */
2921     if ((caps = gst_pad_get_current_caps (pad))) {
2922       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt);
2923       gst_caps_unref (caps);
2924     }
2925   }
2926
2927   if (G_UNLIKELY (priv->clock_rate == -1)) {
2928     /* no clock rate given on the caps, try to get one with the signal */
2929     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
2930             pt) == GST_FLOW_FLUSHING)
2931       goto out_flushing;
2932
2933     if (G_UNLIKELY (priv->clock_rate == -1))
2934       goto no_clock_rate;
2935
2936     gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
2937   }
2938
2939   /* don't accept more data on EOS */
2940   if (G_UNLIKELY (priv->eos))
2941     goto have_eos;
2942
2943   if (!GST_BUFFER_IS_RETRANSMISSION (buffer))
2944     calculate_jitter (jitterbuffer, dts, rtptime);
2945
2946   if (priv->seqnum_base != -1) {
2947     gint gap;
2948
2949     gap = gst_rtp_buffer_compare_seqnum (priv->seqnum_base, seqnum);
2950
2951     if (gap < 0) {
2952       GST_DEBUG_OBJECT (jitterbuffer,
2953           "packet seqnum #%d before seqnum-base #%d", seqnum,
2954           priv->seqnum_base);
2955       gst_buffer_unref (buffer);
2956       goto finished;
2957     } else if (gap > 16384) {
2958       /* From now on don't compare against the seqnum base anymore as
2959        * at some point in the future we will wrap around and also that
2960        * much reordering is very unlikely */
2961       priv->seqnum_base = -1;
2962     }
2963   }
2964
2965   expected = priv->next_in_seqnum;
2966
2967   packet_rate =
2968       gst_rtp_packet_rate_ctx_update (&priv->packet_rate_ctx, seqnum, rtptime);
2969   max_dropout =
2970       gst_rtp_packet_rate_ctx_get_max_dropout (&priv->packet_rate_ctx,
2971       priv->max_dropout_time);
2972   max_misorder =
2973       gst_rtp_packet_rate_ctx_get_max_misorder (&priv->packet_rate_ctx,
2974       priv->max_misorder_time);
2975   GST_TRACE_OBJECT (jitterbuffer,
2976       "packet_rate: %d, max_dropout: %d, max_misorder: %d", packet_rate,
2977       max_dropout, max_misorder);
2978
2979   /* now check against our expected seqnum */
2980   if (G_UNLIKELY (expected == -1)) {
2981     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2982
2983     /* calculate a pts based on rtptime and arrival time (dts) */
2984     pts =
2985         rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
2986         rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)));
2987
2988     /* we don't know what the next_in_seqnum should be, wait for the last
2989      * possible moment to push this buffer, maybe we get an earlier seqnum
2990      * while we wait */
2991     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, pts);
2992
2993     do_next_seqnum = TRUE;
2994     /* take rtptime and pts to calculate packet spacing */
2995     priv->ips_rtptime = rtptime;
2996     priv->ips_pts = pts;
2997
2998   } else {
2999     gint gap;
3000     /* now calculate gap */
3001     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
3002     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
3003         expected, seqnum, gap);
3004
3005     if (G_UNLIKELY (gap > 0 && priv->timers->len >= max_dropout)) {
3006       /* If we have timers for more than RTP_MAX_DROPOUT packets
3007        * pending this means that we have a huge gap overall. We can
3008        * reset the jitterbuffer at this point because there's
3009        * just too much data missing to be able to do anything
3010        * sensible with the past data. Just try again from the
3011        * next packet */
3012       GST_WARNING_OBJECT (jitterbuffer, "%d pending timers > %d - resetting",
3013           priv->timers->len, max_dropout);
3014       gst_buffer_unref (buffer);
3015       return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
3016     }
3017
3018     /* Special handling of large gaps */
3019     if ((gap != -1 && gap < -max_misorder) || (gap >= max_dropout)) {
3020       gboolean reset = handle_big_gap_buffer (jitterbuffer, buffer, pt, seqnum,
3021           gap, max_dropout, max_misorder);
3022       if (reset) {
3023         return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
3024       } else {
3025         GST_DEBUG_OBJECT (jitterbuffer,
3026             "Had big gap, waiting for more consecutive packets");
3027         goto finished;
3028       }
3029     }
3030
3031     /* We had no huge gap, let's drop all the gap packets */
3032     GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets");
3033     g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
3034     g_queue_clear (&priv->gap_packets);
3035
3036     /* calculate a pts based on rtptime and arrival time (dts) */
3037     /* If we estimated the DTS, don't consider it in the clock skew calculations */
3038     pts =
3039         rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
3040         rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)));
3041
3042     if (G_LIKELY (gap == 0)) {
3043       /* packet is expected */
3044       calculate_packet_spacing (jitterbuffer, rtptime, pts);
3045       do_next_seqnum = TRUE;
3046     } else {
3047
3048       /* we have a gap */
3049       if (gap > 0) {
3050         GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
3051         /* fill in the gap with EXPECTED timers */
3052         calculate_expected (jitterbuffer, expected, seqnum, pts, gap);
3053         do_next_seqnum = TRUE;
3054       } else {
3055         GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
3056         do_next_seqnum = FALSE;
3057       }
3058
3059       /* reset spacing estimation when gap */
3060       priv->ips_rtptime = -1;
3061       priv->ips_pts = GST_CLOCK_TIME_NONE;
3062     }
3063   }
3064
3065   if (do_next_seqnum) {
3066     priv->last_in_pts = pts;
3067     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
3068   }
3069
3070   timer = find_timer (jitterbuffer, seqnum);
3071   if (GST_BUFFER_IS_RETRANSMISSION (buffer)) {
3072     if (!timer)
3073       timer = timer_queue_find (priv->rtx_stats_timers, seqnum);
3074     if (timer)
3075       timer->num_rtx_received++;
3076   }
3077
3078   /* let's check if this buffer is too late, we can only accept packets with
3079    * bigger seqnum than the one we last pushed. */
3080   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
3081     gint gap;
3082
3083     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
3084
3085     /* priv->last_popped_seqnum >= seqnum, we're too late. */
3086     if (G_UNLIKELY (gap <= 0)) {
3087       if (priv->do_retransmission) {
3088         if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer) {
3089           update_rtx_stats (jitterbuffer, timer, dts, FALSE);
3090           /* Only count the retranmitted packet too late if it has been
3091            * considered lost. If the original packet arrived before the
3092            * retransmitted we just count it as a duplicate. */
3093           if (timer->type != TIMER_TYPE_LOST)
3094             goto rtx_duplicate;
3095         }
3096       }
3097       goto too_late;
3098     }
3099   }
3100
3101   if (already_lost (jitterbuffer, seqnum))
3102     goto already_lost;
3103
3104   /* let's drop oldest packet if the queue is already full and drop-on-latency
3105    * is set. We can only do this when there actually is a latency. When no
3106    * latency is set, we just pump it in the queue and let the other end push it
3107    * out as fast as possible. */
3108   if (priv->latency_ms && priv->drop_on_latency) {
3109     latency_ts =
3110         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
3111
3112     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
3113       RTPJitterBufferItem *old_item;
3114
3115       old_item = rtp_jitter_buffer_peek (priv->jbuf);
3116
3117       if (IS_DROPABLE (old_item)) {
3118         old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
3119         GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
3120             old_item);
3121         priv->next_seqnum = (old_item->seqnum + old_item->count) & 0xffff;
3122         free_item (old_item);
3123       }
3124       /* we might have removed some head buffers, signal the pushing thread to
3125        * see if it can push now */
3126       JBUF_SIGNAL_EVENT (priv);
3127     }
3128   }
3129
3130   /* If we estimated the DTS, don't consider it in the clock skew calculations
3131    * later. The code above always sets dts to pts or the other way around if
3132    * any of those is valid in the buffer, so we know that if we estimated the
3133    * dts that both are unknown */
3134   if (estimated_dts)
3135     item =
3136         alloc_item (buffer, ITEM_TYPE_BUFFER, GST_CLOCK_TIME_NONE,
3137         pts, seqnum, 1, rtptime);
3138   else
3139     item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
3140
3141   /* now insert the packet into the queue in sorted order. This function returns
3142    * FALSE if a packet with the same seqnum was already in the queue, meaning we
3143    * have a duplicate. */
3144   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item, &head,
3145               &percent))) {
3146     if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer)
3147       update_rtx_stats (jitterbuffer, timer, dts, FALSE);
3148     goto duplicate;
3149   }
3150
3151   /* Trigger fast start if needed */
3152   if (gst_rtp_jitter_buffer_fast_start (jitterbuffer))
3153     head = TRUE;
3154
3155   /* update timers */
3156   update_timers (jitterbuffer, seqnum, dts, pts, do_next_seqnum,
3157       GST_BUFFER_IS_RETRANSMISSION (buffer), timer);
3158
3159   /* we had an unhandled SR, handle it now */
3160   if (priv->last_sr)
3161     do_handle_sync (jitterbuffer);
3162
3163   if (G_UNLIKELY (head)) {
3164     /* signal addition of new buffer when the _loop is waiting. */
3165     if (G_LIKELY (priv->active))
3166       JBUF_SIGNAL_EVENT (priv);
3167
3168     /* let's unschedule and unblock any waiting buffers. We only want to do this
3169      * when the head buffer changed */
3170     if (G_UNLIKELY (priv->clock_id)) {
3171       GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
3172       unschedule_current_timer (jitterbuffer);
3173     }
3174   }
3175
3176   GST_DEBUG_OBJECT (jitterbuffer,
3177       "Pushed packet #%d, now %d packets, head: %d, " "percent %d", seqnum,
3178       rtp_jitter_buffer_num_packets (priv->jbuf), head, percent);
3179
3180   msg = check_buffering_percent (jitterbuffer, percent);
3181
3182 finished:
3183   JBUF_UNLOCK (priv);
3184
3185   if (msg)
3186     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
3187
3188   return ret;
3189
3190   /* ERRORS */
3191 invalid_buffer:
3192   {
3193     /* this is not fatal but should be filtered earlier */
3194     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3195         ("Received invalid RTP payload, dropping"));
3196     gst_buffer_unref (buffer);
3197     return GST_FLOW_OK;
3198   }
3199 no_clock_rate:
3200   {
3201     GST_WARNING_OBJECT (jitterbuffer,
3202         "No clock-rate in caps!, dropping buffer");
3203     gst_buffer_unref (buffer);
3204     goto finished;
3205   }
3206 out_flushing:
3207   {
3208     ret = priv->srcresult;
3209     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
3210     gst_buffer_unref (buffer);
3211     goto finished;
3212   }
3213 have_eos:
3214   {
3215     ret = GST_FLOW_EOS;
3216     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
3217     gst_buffer_unref (buffer);
3218     goto finished;
3219   }
3220 too_late:
3221   {
3222     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
3223         " popped, dropping", seqnum, priv->last_popped_seqnum);
3224     priv->num_late++;
3225     gst_buffer_unref (buffer);
3226     goto finished;
3227   }
3228 already_lost:
3229   {
3230     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as it was already "
3231         "considered lost", seqnum);
3232     priv->num_late++;
3233     gst_buffer_unref (buffer);
3234     goto finished;
3235   }
3236 duplicate:
3237   {
3238     GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
3239         seqnum);
3240     priv->num_duplicates++;
3241     free_item (item);
3242     goto finished;
3243   }
3244 rtx_duplicate:
3245   {
3246     GST_DEBUG_OBJECT (jitterbuffer,
3247         "Duplicate RTX packet #%d detected, dropping", seqnum);
3248     priv->num_duplicates++;
3249     gst_buffer_unref (buffer);
3250     goto finished;
3251   }
3252 }
3253
3254 static GstClockTime
3255 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
3256 {
3257   guint64 ext_time, elapsed;
3258   guint32 rtp_time;
3259   GstRtpJitterBufferPrivate *priv;
3260
3261   priv = jitterbuffer->priv;
3262   rtp_time = item->rtptime;
3263
3264   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
3265       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
3266
3267   ext_time = priv->ext_timestamp;
3268   ext_time = gst_rtp_buffer_ext_timestamp (&ext_time, rtp_time);
3269   if (ext_time < priv->ext_timestamp) {
3270     ext_time = priv->ext_timestamp;
3271   } else {
3272     priv->ext_timestamp = ext_time;
3273   }
3274
3275   if (ext_time > priv->clock_base)
3276     elapsed = ext_time - priv->clock_base;
3277   else
3278     elapsed = 0;
3279
3280   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
3281   return elapsed;
3282 }
3283
3284 static void
3285 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
3286     RTPJitterBufferItem * item)
3287 {
3288   guint64 total, elapsed, left, estimated;
3289   GstClockTime out_time;
3290   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3291
3292   if (priv->npt_stop == -1 || priv->ext_timestamp == -1
3293       || priv->clock_base == -1 || priv->clock_rate <= 0)
3294     return;
3295
3296   /* compute the elapsed time */
3297   elapsed = compute_elapsed (jitterbuffer, item);
3298
3299   /* do nothing if elapsed time doesn't increment */
3300   if (priv->last_elapsed && elapsed <= priv->last_elapsed)
3301     return;
3302
3303   priv->last_elapsed = elapsed;
3304
3305   /* this is the total time we need to play */
3306   total = priv->npt_stop - priv->npt_start;
3307   GST_LOG_OBJECT (jitterbuffer, "total %" GST_TIME_FORMAT,
3308       GST_TIME_ARGS (total));
3309
3310   /* this is how much time there is left */
3311   if (total > elapsed)
3312     left = total - elapsed;
3313   else
3314     left = 0;
3315
3316   /* if we have less time left that the size of the buffer, we will not
3317    * be able to keep it filled, disabled buffering then */
3318   if (left < rtp_jitter_buffer_get_delay (priv->jbuf)) {
3319     GST_DEBUG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT
3320         ", disable buffering close to EOS", GST_TIME_ARGS (left));
3321     rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
3322   }
3323
3324   /* this is the current time as running-time */
3325   out_time = item->pts;
3326
3327   if (elapsed > 0)
3328     estimated = gst_util_uint64_scale (out_time, total, elapsed);
3329   else {
3330     /* if there is almost nothing left,
3331      * we may never advance enough to end up in the above case */
3332     if (total < GST_SECOND)
3333       estimated = GST_SECOND;
3334     else
3335       estimated = -1;
3336   }
3337   GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
3338       GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
3339
3340   if (estimated != -1 && priv->estimated_eos != estimated) {
3341     set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
3342     priv->estimated_eos = estimated;
3343   }
3344 }
3345
3346 /* take a buffer from the queue and push it */
3347 static GstFlowReturn
3348 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
3349 {
3350   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3351   GstFlowReturn result = GST_FLOW_OK;
3352   RTPJitterBufferItem *item;
3353   GstBuffer *outbuf = NULL;
3354   GstEvent *outevent = NULL;
3355   GstQuery *outquery = NULL;
3356   GstClockTime dts, pts;
3357   gint percent = -1;
3358   gboolean do_push = TRUE;
3359   guint type;
3360   GstMessage *msg;
3361
3362   /* when we get here we are ready to pop and push the buffer */
3363   item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
3364   type = item->type;
3365
3366   switch (type) {
3367     case ITEM_TYPE_BUFFER:
3368
3369       /* we need to make writable to change the flags and timestamps */
3370       outbuf = gst_buffer_make_writable (item->data);
3371
3372       if (G_UNLIKELY (priv->discont)) {
3373         /* set DISCONT flag when we missed a packet. We pushed the buffer writable
3374          * into the jitterbuffer so we can modify now. */
3375         GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
3376         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
3377         priv->discont = FALSE;
3378       }
3379       if (G_UNLIKELY (priv->ts_discont)) {
3380         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
3381         priv->ts_discont = FALSE;
3382       }
3383
3384       dts =
3385           gst_segment_position_from_running_time (&priv->segment,
3386           GST_FORMAT_TIME, item->dts);
3387       pts =
3388           gst_segment_position_from_running_time (&priv->segment,
3389           GST_FORMAT_TIME, item->pts);
3390
3391       /* apply timestamp with offset to buffer now */
3392       GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
3393       GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
3394
3395       /* update the elapsed time when we need to check against the npt stop time. */
3396       update_estimated_eos (jitterbuffer, item);
3397
3398       priv->last_out_time = GST_BUFFER_PTS (outbuf);
3399       break;
3400     case ITEM_TYPE_LOST:
3401       priv->discont = TRUE;
3402       if (!priv->do_lost)
3403         do_push = FALSE;
3404       /* FALLTHROUGH */
3405     case ITEM_TYPE_EVENT:
3406       outevent = item->data;
3407       break;
3408     case ITEM_TYPE_QUERY:
3409       outquery = item->data;
3410       break;
3411   }
3412
3413   /* now we are ready to push the buffer. Save the seqnum and release the lock
3414    * so the other end can push stuff in the queue again. */
3415   if (seqnum != -1) {
3416     priv->last_popped_seqnum = seqnum;
3417     priv->next_seqnum = (seqnum + item->count) & 0xffff;
3418   }
3419   msg = check_buffering_percent (jitterbuffer, percent);
3420   JBUF_UNLOCK (priv);
3421
3422   item->data = NULL;
3423   free_item (item);
3424
3425   if (msg)
3426     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
3427
3428   switch (type) {
3429     case ITEM_TYPE_BUFFER:
3430       /* push buffer */
3431       GST_DEBUG_OBJECT (jitterbuffer,
3432           "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
3433           seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
3434           GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
3435       priv->num_pushed++;
3436       result = gst_pad_push (priv->srcpad, outbuf);
3437
3438       JBUF_LOCK_CHECK (priv, out_flushing);
3439       break;
3440     case ITEM_TYPE_LOST:
3441     case ITEM_TYPE_EVENT:
3442       /* We got not enough consecutive packets with a huge gap, we can
3443        * as well just drop them here now on EOS */
3444       if (outevent && GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
3445         GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets on EOS");
3446         g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
3447         g_queue_clear (&priv->gap_packets);
3448       }
3449
3450       GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
3451           ", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);
3452
3453       if (do_push)
3454         gst_pad_push_event (priv->srcpad, outevent);
3455       else if (outevent)
3456         gst_event_unref (outevent);
3457
3458       result = GST_FLOW_OK;
3459
3460       JBUF_LOCK_CHECK (priv, out_flushing);
3461       break;
3462     case ITEM_TYPE_QUERY:
3463     {
3464       gboolean res;
3465
3466       res = gst_pad_peer_query (priv->srcpad, outquery);
3467
3468       JBUF_LOCK_CHECK (priv, out_flushing);
3469       result = GST_FLOW_OK;
3470       GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
3471       JBUF_SIGNAL_QUERY (priv, res);
3472       break;
3473     }
3474   }
3475   return result;
3476
3477   /* ERRORS */
3478 out_flushing:
3479   {
3480     return priv->srcresult;
3481   }
3482 }
3483
3484 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
3485
3486 /* Peek a buffer and compare the seqnum to the expected seqnum.
3487  * If all is fine, the buffer is pushed.
3488  * If something is wrong, we wait for some event
3489  */
3490 static GstFlowReturn
3491 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
3492 {
3493   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3494   GstFlowReturn result;
3495   RTPJitterBufferItem *item;
3496   guint seqnum;
3497   guint32 next_seqnum;
3498
3499   /* only push buffers when PLAYING and active and not buffering */
3500   if (priv->blocked || !priv->active ||
3501       rtp_jitter_buffer_is_buffering (priv->jbuf)) {
3502     return GST_FLOW_WAIT;
3503   }
3504
3505   /* peek a buffer, we're just looking at the sequence number.
3506    * If all is fine, we'll pop and push it. If the sequence number is wrong we
3507    * wait for a timeout or something to change.
3508    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
3509   item = rtp_jitter_buffer_peek (priv->jbuf);
3510   if (item == NULL) {
3511     goto wait;
3512   }
3513
3514   /* get the seqnum and the next expected seqnum */
3515   seqnum = item->seqnum;
3516   if (seqnum == -1) {
3517     return pop_and_push_next (jitterbuffer, seqnum);
3518   }
3519
3520   next_seqnum = priv->next_seqnum;
3521
3522   /* get the gap between this and the previous packet. If we don't know the
3523    * previous packet seqnum assume no gap. */
3524   if (G_UNLIKELY (next_seqnum == -1)) {
3525     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
3526     /* we don't know what the next_seqnum should be, the chain function should
3527      * have scheduled a DEADLINE timer that will increment next_seqnum when it
3528      * fires, so wait for that */
3529     result = GST_FLOW_WAIT;
3530   } else {
3531     gint gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
3532
3533     if (G_LIKELY (gap == 0)) {
3534       /* no missing packet, pop and push */
3535       result = pop_and_push_next (jitterbuffer, seqnum);
3536     } else if (G_UNLIKELY (gap < 0)) {
3537       /* if we have a packet that we already pushed or considered dropped, pop it
3538        * off and get the next packet */
3539       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
3540           seqnum, next_seqnum);
3541       item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
3542       free_item (item);
3543       result = GST_FLOW_OK;
3544     } else {
3545       /* the chain function has scheduled timers to request retransmission or
3546        * when to consider the packet lost, wait for that */
3547       GST_DEBUG_OBJECT (jitterbuffer,
3548           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
3549           next_seqnum, seqnum, gap);
3550       result = GST_FLOW_WAIT;
3551     }
3552   }
3553
3554   return result;
3555
3556 wait:
3557   {
3558     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
3559     if (priv->eos) {
3560       return GST_FLOW_EOS;
3561     } else {
3562       return GST_FLOW_WAIT;
3563     }
3564   }
3565 }
3566
3567 static GstClockTime
3568 get_rtx_retry_timeout (GstRtpJitterBufferPrivate * priv)
3569 {
3570   GstClockTime rtx_retry_timeout;
3571   GstClockTime rtx_min_retry_timeout;
3572
3573   if (priv->rtx_retry_timeout == -1) {
3574     if (priv->avg_rtx_rtt == 0)
3575       rtx_retry_timeout = DEFAULT_AUTO_RTX_TIMEOUT;
3576     else
3577       /* we want to ask for a retransmission after we waited for a
3578        * complete RTT and the additional jitter */
3579       rtx_retry_timeout = priv->avg_rtx_rtt + priv->avg_jitter * 2;
3580   } else {
3581     rtx_retry_timeout = priv->rtx_retry_timeout * GST_MSECOND;
3582   }
3583   /* make sure we don't retry too often. On very low latency networks,
3584    * the RTT and jitter can be very low. */
3585   if (priv->rtx_min_retry_timeout == -1) {
3586     rtx_min_retry_timeout = priv->packet_spacing;
3587   } else {
3588     rtx_min_retry_timeout = priv->rtx_min_retry_timeout * GST_MSECOND;
3589   }
3590   rtx_retry_timeout = MAX (rtx_retry_timeout, rtx_min_retry_timeout);
3591
3592   return rtx_retry_timeout;
3593 }
3594
3595 static GstClockTime
3596 get_rtx_retry_period (GstRtpJitterBufferPrivate * priv,
3597     GstClockTime rtx_retry_timeout)
3598 {
3599   GstClockTime rtx_retry_period;
3600
3601   if (priv->rtx_retry_period == -1) {
3602     /* we retry up to the configured jitterbuffer size but leaving some
3603      * room for the retransmission to arrive in time */
3604     if (rtx_retry_timeout > priv->latency_ns) {
3605       rtx_retry_period = 0;
3606     } else {
3607       rtx_retry_period = priv->latency_ns - rtx_retry_timeout;
3608     }
3609   } else {
3610     rtx_retry_period = priv->rtx_retry_period * GST_MSECOND;
3611   }
3612   return rtx_retry_period;
3613 }
3614
3615 /*
3616   1. For *larger* rtx-rtt, weigh a new measurement as before (1/8th)
3617   2. For *smaller* rtx-rtt, be a bit more conservative and weigh a bit less (1/16th)
3618   3. For very large measurements (> avg * 2), consider them "outliers"
3619      and count them a lot less (1/48th)
3620 */
3621 static void
3622 update_avg_rtx_rtt (GstRtpJitterBufferPrivate * priv, GstClockTime rtt)
3623 {
3624   gint weight;
3625
3626   if (priv->avg_rtx_rtt == 0) {
3627     priv->avg_rtx_rtt = rtt;
3628     return;
3629   }
3630
3631   if (rtt > 2 * priv->avg_rtx_rtt)
3632     weight = 48;
3633   else if (rtt > priv->avg_rtx_rtt)
3634     weight = 8;
3635   else
3636     weight = 16;
3637
3638   priv->avg_rtx_rtt = (rtt + (weight - 1) * priv->avg_rtx_rtt) / weight;
3639 }
3640
3641 static void
3642 update_rtx_stats (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3643     GstClockTime dts, gboolean success)
3644 {
3645   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3646   GstClockTime delay;
3647
3648   if (success) {
3649     /* we scheduled a retry for this packet and now we have it */
3650     priv->num_rtx_success++;
3651     /* all the previous retry attempts failed */
3652     priv->num_rtx_failed += timer->num_rtx_retry - 1;
3653   } else {
3654     /* All retries failed or was too late */
3655     priv->num_rtx_failed += timer->num_rtx_retry;
3656   }
3657
3658   /* number of retries before (hopefully) receiving the packet */
3659   if (priv->avg_rtx_num == 0.0)
3660     priv->avg_rtx_num = timer->num_rtx_retry;
3661   else
3662     priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
3663
3664   /* Calculate the delay between retransmission request and receiving this
3665    * packet. We have a valid delay if and only if this packet is a response to
3666    * our last request. If not we don't know if this is a response to an
3667    * earlier request and delay could be way off. For RTT is more important
3668    * with correct values than to update for every packet. */
3669   if (timer->num_rtx_retry == timer->num_rtx_received &&
3670       dts != GST_CLOCK_TIME_NONE && dts > timer->rtx_last) {
3671     delay = dts - timer->rtx_last;
3672     update_avg_rtx_rtt (priv, delay);
3673   } else {
3674     delay = 0;
3675   }
3676
3677   GST_LOG_OBJECT (jitterbuffer,
3678       "RTX #%d, result %d, success %" G_GUINT64_FORMAT ", failed %"
3679       G_GUINT64_FORMAT ", requests %" G_GUINT64_FORMAT ", dups %"
3680       G_GUINT64_FORMAT ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %"
3681       GST_TIME_FORMAT, timer->seqnum, success, priv->num_rtx_success,
3682       priv->num_rtx_failed, priv->num_rtx_requests, priv->num_duplicates,
3683       priv->avg_rtx_num, GST_TIME_ARGS (delay),
3684       GST_TIME_ARGS (priv->avg_rtx_rtt));
3685 }
3686
3687 /* the timeout for when we expected a packet expired */
3688 static gboolean
3689 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3690     GstClockTime now)
3691 {
3692   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3693   GstEvent *event;
3694   guint delay, delay_ms, avg_rtx_rtt_ms;
3695   guint rtx_retry_timeout_ms, rtx_retry_period_ms;
3696   guint rtx_deadline_ms;
3697   GstClockTime rtx_retry_period;
3698   GstClockTime rtx_retry_timeout;
3699   GstClock *clock;
3700
3701   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %"
3702       GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
3703
3704   rtx_retry_timeout = get_rtx_retry_timeout (priv);
3705   rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout);
3706
3707   delay = timer->rtx_delay + timer->rtx_retry;
3708
3709   delay_ms = GST_TIME_AS_MSECONDS (delay);
3710   rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout);
3711   rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period);
3712   avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt);
3713   rtx_deadline_ms =
3714       priv->rtx_deadline_ms != -1 ? priv->rtx_deadline_ms : priv->latency_ms;
3715
3716   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
3717       gst_structure_new ("GstRTPRetransmissionRequest",
3718           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
3719           "running-time", G_TYPE_UINT64, timer->rtx_base,
3720           "delay", G_TYPE_UINT, delay_ms,
3721           "retry", G_TYPE_UINT, timer->num_rtx_retry,
3722           "frequency", G_TYPE_UINT, rtx_retry_timeout_ms,
3723           "period", G_TYPE_UINT, rtx_retry_period_ms,
3724           "deadline", G_TYPE_UINT, rtx_deadline_ms,
3725           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
3726           "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
3727   GST_DEBUG_OBJECT (jitterbuffer, "Request RTX: %" GST_PTR_FORMAT, event);
3728
3729   priv->num_rtx_requests++;
3730   timer->num_rtx_retry++;
3731
3732   GST_OBJECT_LOCK (jitterbuffer);
3733   if ((clock = GST_ELEMENT_CLOCK (jitterbuffer))) {
3734     timer->rtx_last = gst_clock_get_time (clock);
3735     timer->rtx_last -= GST_ELEMENT_CAST (jitterbuffer)->base_time;
3736   } else {
3737     timer->rtx_last = now;
3738   }
3739   GST_OBJECT_UNLOCK (jitterbuffer);
3740
3741   /* calculate the timeout for the next retransmission attempt */
3742   timer->rtx_retry += rtx_retry_timeout;
3743   GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
3744       GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u",
3745       GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
3746       GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
3747   if ((priv->rtx_max_retries != -1
3748           && timer->num_rtx_retry >= priv->rtx_max_retries)
3749       || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)
3750       || (timer->rtx_base + rtx_retry_period < now)) {
3751     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
3752     /* too many retransmission request, we now convert the timer
3753      * to a lost timer, leave the num_rtx_retry as it is for stats */
3754     timer->type = TIMER_TYPE_LOST;
3755     timer->rtx_delay = 0;
3756     timer->rtx_retry = 0;
3757   }
3758   reschedule_timer (jitterbuffer, timer, timer->seqnum,
3759       timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
3760
3761   JBUF_UNLOCK (priv);
3762   gst_pad_push_event (priv->sinkpad, event);
3763   JBUF_LOCK (priv);
3764
3765   return FALSE;
3766 }
3767
3768 /* a packet is lost */
3769 static gboolean
3770 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3771     GstClockTime now)
3772 {
3773   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3774   guint seqnum, lost_packets, num_rtx_retry, next_in_seqnum;
3775   gboolean head;
3776   GstEvent *event = NULL;
3777   RTPJitterBufferItem *item;
3778
3779   seqnum = timer->seqnum;
3780   lost_packets = MAX (timer->num, 1);
3781   num_rtx_retry = timer->num_rtx_retry;
3782
3783   /* we had a gap and thus we lost some packets. Create an event for this.  */
3784   if (lost_packets > 1)
3785     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
3786         seqnum + lost_packets - 1);
3787   else
3788     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
3789
3790   priv->num_lost += lost_packets;
3791   priv->num_rtx_failed += num_rtx_retry;
3792
3793   next_in_seqnum = (seqnum + lost_packets) & 0xffff;
3794
3795   /* we now only accept seqnum bigger than this */
3796   if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0) {
3797     priv->next_in_seqnum = next_in_seqnum;
3798     priv->last_in_pts = apply_offset (jitterbuffer, timer->timeout);
3799   }
3800
3801   /* Avoid creating events if we don't need it. Note that we still need to create
3802    * the lost *ITEM* since it will be used to notify the outgoing thread of
3803    * lost items (so that we can set discont flags and such) */
3804   if (priv->do_lost) {
3805     GstClockTime duration, timestamp;
3806     /* create paket lost event */
3807     timestamp = apply_offset (jitterbuffer, timer->timeout);
3808     duration = timer->duration;
3809     if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
3810       duration = priv->packet_spacing;
3811     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
3812         gst_structure_new ("GstRTPPacketLost",
3813             "seqnum", G_TYPE_UINT, (guint) seqnum,
3814             "timestamp", G_TYPE_UINT64, timestamp,
3815             "duration", G_TYPE_UINT64, duration,
3816             "retry", G_TYPE_UINT, num_rtx_retry, NULL));
3817   }
3818   item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
3819   if (!rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL))
3820     /* Duplicate */
3821     free_item (item);
3822
3823   if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) {
3824     /* Store info to update stats if the packet arrives too late */
3825     timer_queue_append (priv->rtx_stats_timers, timer,
3826         now + priv->rtx_stats_timeout * GST_MSECOND, TRUE);
3827   }
3828   remove_timer (jitterbuffer, timer);
3829
3830   if (head)
3831     JBUF_SIGNAL_EVENT (priv);
3832
3833   return TRUE;
3834 }
3835
3836 static gboolean
3837 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3838     GstClockTime now)
3839 {
3840   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3841
3842   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
3843   remove_timer (jitterbuffer, timer);
3844   if (!priv->eos) {
3845     /* there was no EOS in the buffer, put one in there now */
3846     queue_event (jitterbuffer, gst_event_new_eos ());
3847   }
3848   JBUF_SIGNAL_EVENT (priv);
3849
3850   return TRUE;
3851 }
3852
3853 static gboolean
3854 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3855     GstClockTime now)
3856 {
3857   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3858
3859   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
3860
3861   /* timer seqnum might have been obsoleted by caps seqnum-base,
3862    * only mess with current ongoing seqnum if still unknown */
3863   if (priv->next_seqnum == -1)
3864     priv->next_seqnum = timer->seqnum;
3865   remove_timer (jitterbuffer, timer);
3866   JBUF_SIGNAL_EVENT (priv);
3867
3868   return TRUE;
3869 }
3870
3871 static gboolean
3872 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3873     GstClockTime now)
3874 {
3875   gboolean removed = FALSE;
3876
3877   switch (timer->type) {
3878     case TIMER_TYPE_EXPECTED:
3879       removed = do_expected_timeout (jitterbuffer, timer, now);
3880       break;
3881     case TIMER_TYPE_LOST:
3882       removed = do_lost_timeout (jitterbuffer, timer, now);
3883       break;
3884     case TIMER_TYPE_DEADLINE:
3885       removed = do_deadline_timeout (jitterbuffer, timer, now);
3886       break;
3887     case TIMER_TYPE_EOS:
3888       removed = do_eos_timeout (jitterbuffer, timer, now);
3889       break;
3890   }
3891   return removed;
3892 }
3893
3894 /* called when we need to wait for the next timeout.
3895  *
3896  * We loop over the array of recorded timeouts and wait for the earliest one.
3897  * When it timed out, do the logic associated with the timer.
3898  *
3899  * If there are no timers, we wait on a gcond until something new happens.
3900  */
3901 static void
3902 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
3903 {
3904   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3905   GstClockTime now = 0;
3906
3907   JBUF_LOCK (priv);
3908   while (priv->timer_running) {
3909     TimerData *timer = NULL;
3910     GstClockTime timer_timeout = -1;
3911     gint i, len;
3912
3913     /* If we have a clock, update "now" now with the very
3914      * latest running time we have. If timers are unscheduled below we
3915      * otherwise wouldn't update now (it's only updated when timers
3916      * expire), and also for the very first loop iteration now would
3917      * otherwise always be 0
3918      */
3919     GST_OBJECT_LOCK (jitterbuffer);
3920     if (GST_ELEMENT_CLOCK (jitterbuffer)) {
3921       now =
3922           gst_clock_get_time (GST_ELEMENT_CLOCK (jitterbuffer)) -
3923           GST_ELEMENT_CAST (jitterbuffer)->base_time;
3924     }
3925     GST_OBJECT_UNLOCK (jitterbuffer);
3926
3927     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
3928         GST_TIME_ARGS (now));
3929
3930     /* Clear expired rtx-stats timers */
3931     if (priv->do_retransmission)
3932       timer_queue_clear_until (priv->rtx_stats_timers, now);
3933
3934     /* Iterate "normal" timers */
3935     len = priv->timers->len;
3936     for (i = 0; i < len;) {
3937       TimerData *test = &g_array_index (priv->timers, TimerData, i);
3938       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
3939       gboolean save_best = FALSE;
3940
3941       GST_DEBUG_OBJECT (jitterbuffer,
3942           "%d, %d, %d, %" GST_TIME_FORMAT " diff:%" GST_STIME_FORMAT, i,
3943           test->type, test->seqnum, GST_TIME_ARGS (test_timeout),
3944           GST_STIME_ARGS ((gint64) (test_timeout - now)));
3945
3946       /* Weed out anything too late */
3947       if (test->type == TIMER_TYPE_LOST &&
3948           (test_timeout == -1 || test_timeout <= now)) {
3949         GST_DEBUG_OBJECT (jitterbuffer, "Weeding out late entry");
3950         do_lost_timeout (jitterbuffer, test, now);
3951         if (!priv->timer_running)
3952           break;
3953         /* We don't move the iterator forward since we just removed the current entry,
3954          * but we update the termination condition */
3955         len = priv->timers->len;
3956       } else {
3957         /* find the smallest timeout */
3958         if (timer == NULL) {
3959           save_best = TRUE;
3960         } else if (timer_timeout == -1) {
3961           /* we already have an immediate timeout, the new timer must be an
3962            * immediate timer with smaller seqnum to become the best */
3963           if (test_timeout == -1
3964               && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3965                       timer->seqnum) > 0))
3966             save_best = TRUE;
3967         } else if (test_timeout == -1) {
3968           /* first immediate timer */
3969           save_best = TRUE;
3970         } else if (test_timeout < timer_timeout) {
3971           /* earlier timer */
3972           save_best = TRUE;
3973         } else if (test_timeout == timer_timeout
3974             && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3975                     timer->seqnum) > 0)) {
3976           /* same timer, smaller seqnum */
3977           save_best = TRUE;
3978         }
3979
3980         if (save_best) {
3981           GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
3982           timer = test;
3983           timer_timeout = test_timeout;
3984         }
3985         i++;
3986       }
3987     }
3988     if (timer && !priv->blocked) {
3989       GstClock *clock;
3990       GstClockTime sync_time;
3991       GstClockID id;
3992       GstClockReturn ret;
3993       GstClockTimeDiff clock_jitter;
3994
3995       if (timer_timeout == -1 || timer_timeout <= now) {
3996         /* We have normally removed all lost timers in the loop above */
3997         g_assert (timer->type != TIMER_TYPE_LOST);
3998
3999         do_timeout (jitterbuffer, timer, now);
4000         /* check here, do_timeout could have released the lock */
4001         if (!priv->timer_running)
4002           break;
4003         continue;
4004       }
4005
4006       GST_OBJECT_LOCK (jitterbuffer);
4007       clock = GST_ELEMENT_CLOCK (jitterbuffer);
4008       if (!clock) {
4009         GST_OBJECT_UNLOCK (jitterbuffer);
4010         /* let's just push if there is no clock */
4011         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
4012         now = timer_timeout;
4013         continue;
4014       }
4015
4016       /* prepare for sync against clock */
4017       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
4018       /* add latency of peer to get input time */
4019       sync_time += priv->peer_latency;
4020
4021       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
4022           " with sync time %" GST_TIME_FORMAT,
4023           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
4024
4025       /* create an entry for the clock */
4026       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
4027       priv->timer_timeout = timer_timeout;
4028       priv->timer_seqnum = timer->seqnum;
4029       GST_OBJECT_UNLOCK (jitterbuffer);
4030
4031       /* release the lock so that the other end can push stuff or unlock */
4032       JBUF_UNLOCK (priv);
4033
4034       ret = gst_clock_id_wait (id, &clock_jitter);
4035
4036       JBUF_LOCK (priv);
4037       if (!priv->timer_running) {
4038         gst_clock_id_unref (id);
4039         priv->clock_id = NULL;
4040         break;
4041       }
4042
4043       if (ret != GST_CLOCK_UNSCHEDULED) {
4044         now = timer_timeout + MAX (clock_jitter, 0);
4045         GST_DEBUG_OBJECT (jitterbuffer,
4046             "sync done, %d, #%d, %" GST_STIME_FORMAT, ret, priv->timer_seqnum,
4047             GST_STIME_ARGS (clock_jitter));
4048       } else {
4049         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
4050       }
4051       /* and free the entry */
4052       gst_clock_id_unref (id);
4053       priv->clock_id = NULL;
4054     } else {
4055       /* no timers, wait for activity */
4056       JBUF_WAIT_TIMER (priv);
4057     }
4058   }
4059   JBUF_UNLOCK (priv);
4060
4061   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
4062   return;
4063 }
4064
4065 /*
4066  * This funcion implements the main pushing loop on the source pad.
4067  *
4068  * It first tries to push as many buffers as possible. If there is a seqnum
4069  * mismatch, we wait for the next timeouts.
4070  */
4071 static void
4072 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
4073 {
4074   GstRtpJitterBufferPrivate *priv;
4075   GstFlowReturn result = GST_FLOW_OK;
4076
4077   priv = jitterbuffer->priv;
4078
4079   JBUF_LOCK_CHECK (priv, flushing);
4080   do {
4081     result = handle_next_buffer (jitterbuffer);
4082     if (G_LIKELY (result == GST_FLOW_WAIT)) {
4083       /* now wait for the next event */
4084       JBUF_WAIT_EVENT (priv, flushing);
4085       result = GST_FLOW_OK;
4086     }
4087   } while (result == GST_FLOW_OK);
4088   /* store result for upstream */
4089   priv->srcresult = result;
4090   /* if we get here we need to pause */
4091   goto pause;
4092
4093   /* ERRORS */
4094 flushing:
4095   {
4096     result = priv->srcresult;
4097     goto pause;
4098   }
4099 pause:
4100   {
4101     GstEvent *event;
4102
4103     JBUF_SIGNAL_QUERY (priv, FALSE);
4104     JBUF_UNLOCK (priv);
4105
4106     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
4107         gst_flow_get_name (result));
4108     gst_pad_pause_task (priv->srcpad);
4109     if (result == GST_FLOW_EOS) {
4110       event = gst_event_new_eos ();
4111       gst_pad_push_event (priv->srcpad, event);
4112     }
4113     return;
4114   }
4115 }
4116
4117 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
4118  * some sanity checks and then emit the handle-sync signal with the parameters.
4119  * This function must be called with the LOCK */
4120 static void
4121 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
4122 {
4123   GstRtpJitterBufferPrivate *priv;
4124   guint64 base_rtptime, base_time;
4125   guint32 clock_rate;
4126   guint64 last_rtptime;
4127   guint64 clock_base;
4128   guint64 ext_rtptime, diff;
4129   gboolean valid = TRUE, keep = FALSE;
4130
4131   priv = jitterbuffer->priv;
4132
4133   /* get the last values from the jitterbuffer */
4134   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
4135       &clock_rate, &last_rtptime);
4136
4137   clock_base = priv->clock_base;
4138   ext_rtptime = priv->ext_rtptime;
4139
4140   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
4141       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
4142       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
4143       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
4144
4145   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
4146     /* we keep this SR packet for later. When we get a valid RTP packet the
4147      * above values will be set and we can try to use the SR packet */
4148     GST_DEBUG_OBJECT (jitterbuffer, "keeping for later, no RTP values");
4149     keep = TRUE;
4150   } else {
4151     /* we can't accept anything that happened before we did the last resync */
4152     if (base_rtptime > ext_rtptime) {
4153       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
4154       valid = FALSE;
4155     } else {
4156       /* the SR RTP timestamp must be something close to what we last observed
4157        * in the jitterbuffer */
4158       if (ext_rtptime > last_rtptime) {
4159         /* check how far ahead it is to our RTP timestamps */
4160         diff = ext_rtptime - last_rtptime;
4161         /* if bigger than 1 second, we drop it */
4162         if (jitterbuffer->priv->max_rtcp_rtp_time_diff != -1 &&
4163             diff >
4164             gst_util_uint64_scale (jitterbuffer->priv->max_rtcp_rtp_time_diff,
4165                 clock_rate, 1000)) {
4166           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
4167           /* should drop this, but some RTSP servers end up with bogus
4168            * way too ahead RTCP packet when repeated PAUSE/PLAY,
4169            * so still trigger rptbin sync but invalidate RTCP data
4170            * (sync might use other methods) */
4171           ext_rtptime = -1;
4172         }
4173         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
4174             G_GUINT64_FORMAT, last_rtptime, diff);
4175       }
4176     }
4177   }
4178
4179   if (keep) {
4180     GST_DEBUG_OBJECT (jitterbuffer, "keeping RTCP packet for later");
4181   } else if (valid) {
4182     GstStructure *s;
4183
4184     s = gst_structure_new ("application/x-rtp-sync",
4185         "base-rtptime", G_TYPE_UINT64, base_rtptime,
4186         "base-time", G_TYPE_UINT64, base_time,
4187         "clock-rate", G_TYPE_UINT, clock_rate,
4188         "clock-base", G_TYPE_UINT64, clock_base,
4189         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
4190         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
4191
4192     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
4193     gst_buffer_replace (&priv->last_sr, NULL);
4194     JBUF_UNLOCK (priv);
4195     g_signal_emit (jitterbuffer,
4196         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
4197     JBUF_LOCK (priv);
4198     gst_structure_free (s);
4199   } else {
4200     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
4201     gst_buffer_replace (&priv->last_sr, NULL);
4202   }
4203 }
4204
4205 static GstFlowReturn
4206 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
4207     GstBuffer * buffer)
4208 {
4209   GstRtpJitterBuffer *jitterbuffer;
4210   GstRtpJitterBufferPrivate *priv;
4211   GstFlowReturn ret = GST_FLOW_OK;
4212   guint32 ssrc;
4213   GstRTCPPacket packet;
4214   guint64 ext_rtptime;
4215   guint32 rtptime;
4216   GstRTCPBuffer rtcp = { NULL, };
4217
4218   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4219
4220   if (G_UNLIKELY (!gst_rtcp_buffer_validate_reduced (buffer)))
4221     goto invalid_buffer;
4222
4223   priv = jitterbuffer->priv;
4224
4225   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
4226
4227   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
4228     goto empty_buffer;
4229
4230   /* first packet must be SR or RR or else the validate would have failed */
4231   switch (gst_rtcp_packet_get_type (&packet)) {
4232     case GST_RTCP_TYPE_SR:
4233       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
4234           NULL, NULL);
4235       break;
4236     default:
4237       goto ignore_buffer;
4238   }
4239   gst_rtcp_buffer_unmap (&rtcp);
4240
4241   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
4242
4243   JBUF_LOCK (priv);
4244   /* convert the RTP timestamp to our extended timestamp, using the same offset
4245    * we used in the jitterbuffer */
4246   ext_rtptime = priv->jbuf->ext_rtptime;
4247   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
4248
4249   priv->ext_rtptime = ext_rtptime;
4250   gst_buffer_replace (&priv->last_sr, buffer);
4251
4252   do_handle_sync (jitterbuffer);
4253   JBUF_UNLOCK (priv);
4254
4255 done:
4256   gst_buffer_unref (buffer);
4257
4258   return ret;
4259
4260 invalid_buffer:
4261   {
4262     /* this is not fatal but should be filtered earlier */
4263     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
4264         ("Received invalid RTCP payload, dropping"));
4265     ret = GST_FLOW_OK;
4266     goto done;
4267   }
4268 empty_buffer:
4269   {
4270     /* this is not fatal but should be filtered earlier */
4271     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
4272         ("Received empty RTCP payload, dropping"));
4273     gst_rtcp_buffer_unmap (&rtcp);
4274     ret = GST_FLOW_OK;
4275     goto done;
4276   }
4277 ignore_buffer:
4278   {
4279     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
4280     gst_rtcp_buffer_unmap (&rtcp);
4281     ret = GST_FLOW_OK;
4282     goto done;
4283   }
4284 }
4285
4286 static gboolean
4287 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
4288     GstQuery * query)
4289 {
4290   gboolean res = FALSE;
4291   GstRtpJitterBuffer *jitterbuffer;
4292   GstRtpJitterBufferPrivate *priv;
4293
4294   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4295   priv = jitterbuffer->priv;
4296
4297   switch (GST_QUERY_TYPE (query)) {
4298     case GST_QUERY_CAPS:
4299     {
4300       GstCaps *filter, *caps;
4301
4302       gst_query_parse_caps (query, &filter);
4303       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
4304       gst_query_set_caps_result (query, caps);
4305       gst_caps_unref (caps);
4306       res = TRUE;
4307       break;
4308     }
4309     default:
4310       if (GST_QUERY_IS_SERIALIZED (query)) {
4311         RTPJitterBufferItem *item;
4312         gboolean head;
4313
4314         JBUF_LOCK_CHECK (priv, out_flushing);
4315         if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
4316             RTP_JITTER_BUFFER_MODE_BUFFER) {
4317           GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
4318           item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
4319           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
4320           if (head)
4321             JBUF_SIGNAL_EVENT (priv);
4322           JBUF_WAIT_QUERY (priv, out_flushing);
4323           res = priv->last_query;
4324         } else {
4325           GST_DEBUG_OBJECT (jitterbuffer, "refusing query, we are buffering");
4326           res = FALSE;
4327         }
4328         JBUF_UNLOCK (priv);
4329       } else {
4330         res = gst_pad_query_default (pad, parent, query);
4331       }
4332       break;
4333   }
4334   return res;
4335   /* ERRORS */
4336 out_flushing:
4337   {
4338     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
4339     JBUF_UNLOCK (priv);
4340     return FALSE;
4341   }
4342
4343 }
4344
4345 static gboolean
4346 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
4347     GstQuery * query)
4348 {
4349   GstRtpJitterBuffer *jitterbuffer;
4350   GstRtpJitterBufferPrivate *priv;
4351   gboolean res = FALSE;
4352
4353   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4354   priv = jitterbuffer->priv;
4355
4356   switch (GST_QUERY_TYPE (query)) {
4357     case GST_QUERY_LATENCY:
4358     {
4359       /* We need to send the query upstream and add the returned latency to our
4360        * own */
4361       GstClockTime min_latency, max_latency;
4362       gboolean us_live;
4363       GstClockTime our_latency;
4364
4365       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
4366         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
4367
4368         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
4369             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
4370             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
4371
4372         /* store this so that we can safely sync on the peer buffers. */
4373         JBUF_LOCK (priv);
4374         priv->peer_latency = min_latency;
4375         our_latency = priv->latency_ns;
4376         JBUF_UNLOCK (priv);
4377
4378         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
4379             GST_TIME_ARGS (our_latency));
4380
4381         /* we add some latency but can buffer an infinite amount of time */
4382         min_latency += our_latency;
4383         max_latency = -1;
4384
4385         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
4386             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
4387             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
4388
4389         gst_query_set_latency (query, TRUE, min_latency, max_latency);
4390       }
4391       break;
4392     }
4393     case GST_QUERY_POSITION:
4394     {
4395       GstClockTime start, last_out;
4396       GstFormat fmt;
4397
4398       gst_query_parse_position (query, &fmt, NULL);
4399       if (fmt != GST_FORMAT_TIME) {
4400         res = gst_pad_query_default (pad, parent, query);
4401         break;
4402       }
4403
4404       JBUF_LOCK (priv);
4405       start = priv->npt_start;
4406       last_out = priv->last_out_time;
4407       JBUF_UNLOCK (priv);
4408
4409       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
4410           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
4411           GST_TIME_ARGS (last_out));
4412
4413       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
4414         /* bring 0-based outgoing time to stream time */
4415         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
4416         res = TRUE;
4417       } else {
4418         res = gst_pad_query_default (pad, parent, query);
4419       }
4420       break;
4421     }
4422     case GST_QUERY_CAPS:
4423     {
4424       GstCaps *filter, *caps;
4425
4426       gst_query_parse_caps (query, &filter);
4427       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
4428       gst_query_set_caps_result (query, caps);
4429       gst_caps_unref (caps);
4430       res = TRUE;
4431       break;
4432     }
4433     default:
4434       res = gst_pad_query_default (pad, parent, query);
4435       break;
4436   }
4437
4438   return res;
4439 }
4440
4441 static void
4442 gst_rtp_jitter_buffer_set_property (GObject * object,
4443     guint prop_id, const GValue * value, GParamSpec * pspec)
4444 {
4445   GstRtpJitterBuffer *jitterbuffer;
4446   GstRtpJitterBufferPrivate *priv;
4447
4448   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
4449   priv = jitterbuffer->priv;
4450
4451   switch (prop_id) {
4452     case PROP_LATENCY:
4453     {
4454       guint new_latency, old_latency;
4455
4456       new_latency = g_value_get_uint (value);
4457
4458       JBUF_LOCK (priv);
4459       old_latency = priv->latency_ms;
4460       priv->latency_ms = new_latency;
4461       priv->latency_ns = priv->latency_ms * GST_MSECOND;
4462       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
4463       JBUF_UNLOCK (priv);
4464
4465       /* post message if latency changed, this will inform the parent pipeline
4466        * that a latency reconfiguration is possible/needed. */
4467       if (new_latency != old_latency) {
4468         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
4469             GST_TIME_ARGS (new_latency * GST_MSECOND));
4470
4471         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
4472             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
4473       }
4474       break;
4475     }
4476     case PROP_DROP_ON_LATENCY:
4477       JBUF_LOCK (priv);
4478       priv->drop_on_latency = g_value_get_boolean (value);
4479       JBUF_UNLOCK (priv);
4480       break;
4481     case PROP_TS_OFFSET:
4482       JBUF_LOCK (priv);
4483       priv->ts_offset = g_value_get_int64 (value);
4484       priv->ts_discont = TRUE;
4485       JBUF_UNLOCK (priv);
4486       break;
4487     case PROP_DO_LOST:
4488       JBUF_LOCK (priv);
4489       priv->do_lost = g_value_get_boolean (value);
4490       JBUF_UNLOCK (priv);
4491       break;
4492     case PROP_MODE:
4493       JBUF_LOCK (priv);
4494       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
4495       JBUF_UNLOCK (priv);
4496       break;
4497     case PROP_DO_RETRANSMISSION:
4498       JBUF_LOCK (priv);
4499       priv->do_retransmission = g_value_get_boolean (value);
4500       JBUF_UNLOCK (priv);
4501       break;
4502     case PROP_RTX_NEXT_SEQNUM:
4503       JBUF_LOCK (priv);
4504       priv->rtx_next_seqnum = g_value_get_boolean (value);
4505       JBUF_UNLOCK (priv);
4506       break;
4507     case PROP_RTX_DELAY:
4508       JBUF_LOCK (priv);
4509       priv->rtx_delay = g_value_get_int (value);
4510       JBUF_UNLOCK (priv);
4511       break;
4512     case PROP_RTX_MIN_DELAY:
4513       JBUF_LOCK (priv);
4514       priv->rtx_min_delay = g_value_get_uint (value);
4515       JBUF_UNLOCK (priv);
4516       break;
4517     case PROP_RTX_DELAY_REORDER:
4518       JBUF_LOCK (priv);
4519       priv->rtx_delay_reorder = g_value_get_int (value);
4520       JBUF_UNLOCK (priv);
4521       break;
4522     case PROP_RTX_RETRY_TIMEOUT:
4523       JBUF_LOCK (priv);
4524       priv->rtx_retry_timeout = g_value_get_int (value);
4525       JBUF_UNLOCK (priv);
4526       break;
4527     case PROP_RTX_MIN_RETRY_TIMEOUT:
4528       JBUF_LOCK (priv);
4529       priv->rtx_min_retry_timeout = g_value_get_int (value);
4530       JBUF_UNLOCK (priv);
4531       break;
4532     case PROP_RTX_RETRY_PERIOD:
4533       JBUF_LOCK (priv);
4534       priv->rtx_retry_period = g_value_get_int (value);
4535       JBUF_UNLOCK (priv);
4536       break;
4537     case PROP_RTX_MAX_RETRIES:
4538       JBUF_LOCK (priv);
4539       priv->rtx_max_retries = g_value_get_int (value);
4540       JBUF_UNLOCK (priv);
4541       break;
4542     case PROP_RTX_DEADLINE:
4543       JBUF_LOCK (priv);
4544       priv->rtx_deadline_ms = g_value_get_int (value);
4545       JBUF_UNLOCK (priv);
4546       break;
4547     case PROP_RTX_STATS_TIMEOUT:
4548       JBUF_LOCK (priv);
4549       priv->rtx_stats_timeout = g_value_get_uint (value);
4550       JBUF_UNLOCK (priv);
4551       break;
4552     case PROP_MAX_RTCP_RTP_TIME_DIFF:
4553       JBUF_LOCK (priv);
4554       priv->max_rtcp_rtp_time_diff = g_value_get_int (value);
4555       JBUF_UNLOCK (priv);
4556       break;
4557     case PROP_MAX_DROPOUT_TIME:
4558       JBUF_LOCK (priv);
4559       priv->max_dropout_time = g_value_get_uint (value);
4560       JBUF_UNLOCK (priv);
4561       break;
4562     case PROP_MAX_MISORDER_TIME:
4563       JBUF_LOCK (priv);
4564       priv->max_misorder_time = g_value_get_uint (value);
4565       JBUF_UNLOCK (priv);
4566       break;
4567     case PROP_RFC7273_SYNC:
4568       JBUF_LOCK (priv);
4569       rtp_jitter_buffer_set_rfc7273_sync (priv->jbuf,
4570           g_value_get_boolean (value));
4571       JBUF_UNLOCK (priv);
4572       break;
4573     case PROP_FASTSTART_MIN_PACKETS:
4574       JBUF_LOCK (priv);
4575       priv->faststart_min_packets = g_value_get_uint (value);
4576       JBUF_UNLOCK (priv);
4577       break;
4578     default:
4579       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4580       break;
4581   }
4582 }
4583
4584 static void
4585 gst_rtp_jitter_buffer_get_property (GObject * object,
4586     guint prop_id, GValue * value, GParamSpec * pspec)
4587 {
4588   GstRtpJitterBuffer *jitterbuffer;
4589   GstRtpJitterBufferPrivate *priv;
4590
4591   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
4592   priv = jitterbuffer->priv;
4593
4594   switch (prop_id) {
4595     case PROP_LATENCY:
4596       JBUF_LOCK (priv);
4597       g_value_set_uint (value, priv->latency_ms);
4598       JBUF_UNLOCK (priv);
4599       break;
4600     case PROP_DROP_ON_LATENCY:
4601       JBUF_LOCK (priv);
4602       g_value_set_boolean (value, priv->drop_on_latency);
4603       JBUF_UNLOCK (priv);
4604       break;
4605     case PROP_TS_OFFSET:
4606       JBUF_LOCK (priv);
4607       g_value_set_int64 (value, priv->ts_offset);
4608       JBUF_UNLOCK (priv);
4609       break;
4610     case PROP_DO_LOST:
4611       JBUF_LOCK (priv);
4612       g_value_set_boolean (value, priv->do_lost);
4613       JBUF_UNLOCK (priv);
4614       break;
4615     case PROP_MODE:
4616       JBUF_LOCK (priv);
4617       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
4618       JBUF_UNLOCK (priv);
4619       break;
4620     case PROP_PERCENT:
4621     {
4622       gint percent;
4623
4624       JBUF_LOCK (priv);
4625       if (priv->srcresult != GST_FLOW_OK)
4626         percent = 100;
4627       else
4628         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
4629
4630       g_value_set_int (value, percent);
4631       JBUF_UNLOCK (priv);
4632       break;
4633     }
4634     case PROP_DO_RETRANSMISSION:
4635       JBUF_LOCK (priv);
4636       g_value_set_boolean (value, priv->do_retransmission);
4637       JBUF_UNLOCK (priv);
4638       break;
4639     case PROP_RTX_NEXT_SEQNUM:
4640       JBUF_LOCK (priv);
4641       g_value_set_boolean (value, priv->rtx_next_seqnum);
4642       JBUF_UNLOCK (priv);
4643       break;
4644     case PROP_RTX_DELAY:
4645       JBUF_LOCK (priv);
4646       g_value_set_int (value, priv->rtx_delay);
4647       JBUF_UNLOCK (priv);
4648       break;
4649     case PROP_RTX_MIN_DELAY:
4650       JBUF_LOCK (priv);
4651       g_value_set_uint (value, priv->rtx_min_delay);
4652       JBUF_UNLOCK (priv);
4653       break;
4654     case PROP_RTX_DELAY_REORDER:
4655       JBUF_LOCK (priv);
4656       g_value_set_int (value, priv->rtx_delay_reorder);
4657       JBUF_UNLOCK (priv);
4658       break;
4659     case PROP_RTX_RETRY_TIMEOUT:
4660       JBUF_LOCK (priv);
4661       g_value_set_int (value, priv->rtx_retry_timeout);
4662       JBUF_UNLOCK (priv);
4663       break;
4664     case PROP_RTX_MIN_RETRY_TIMEOUT:
4665       JBUF_LOCK (priv);
4666       g_value_set_int (value, priv->rtx_min_retry_timeout);
4667       JBUF_UNLOCK (priv);
4668       break;
4669     case PROP_RTX_RETRY_PERIOD:
4670       JBUF_LOCK (priv);
4671       g_value_set_int (value, priv->rtx_retry_period);
4672       JBUF_UNLOCK (priv);
4673       break;
4674     case PROP_RTX_MAX_RETRIES:
4675       JBUF_LOCK (priv);
4676       g_value_set_int (value, priv->rtx_max_retries);
4677       JBUF_UNLOCK (priv);
4678       break;
4679     case PROP_RTX_DEADLINE:
4680       JBUF_LOCK (priv);
4681       g_value_set_int (value, priv->rtx_deadline_ms);
4682       JBUF_UNLOCK (priv);
4683       break;
4684     case PROP_RTX_STATS_TIMEOUT:
4685       JBUF_LOCK (priv);
4686       g_value_set_uint (value, priv->rtx_stats_timeout);
4687       JBUF_UNLOCK (priv);
4688       break;
4689     case PROP_STATS:
4690       g_value_take_boxed (value,
4691           gst_rtp_jitter_buffer_create_stats (jitterbuffer));
4692       break;
4693     case PROP_MAX_RTCP_RTP_TIME_DIFF:
4694       JBUF_LOCK (priv);
4695       g_value_set_int (value, priv->max_rtcp_rtp_time_diff);
4696       JBUF_UNLOCK (priv);
4697       break;
4698     case PROP_MAX_DROPOUT_TIME:
4699       JBUF_LOCK (priv);
4700       g_value_set_uint (value, priv->max_dropout_time);
4701       JBUF_UNLOCK (priv);
4702       break;
4703     case PROP_MAX_MISORDER_TIME:
4704       JBUF_LOCK (priv);
4705       g_value_set_uint (value, priv->max_misorder_time);
4706       JBUF_UNLOCK (priv);
4707       break;
4708     case PROP_RFC7273_SYNC:
4709       JBUF_LOCK (priv);
4710       g_value_set_boolean (value,
4711           rtp_jitter_buffer_get_rfc7273_sync (priv->jbuf));
4712       JBUF_UNLOCK (priv);
4713       break;
4714     case PROP_FASTSTART_MIN_PACKETS:
4715       JBUF_LOCK (priv);
4716       g_value_set_uint (value, priv->faststart_min_packets);
4717       JBUF_UNLOCK (priv);
4718       break;
4719     default:
4720       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4721       break;
4722   }
4723 }
4724
4725 static GstStructure *
4726 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
4727 {
4728   GstRtpJitterBufferPrivate *priv = jbuf->priv;
4729   GstStructure *s;
4730
4731   JBUF_LOCK (priv);
4732   s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
4733       "num-pushed", G_TYPE_UINT64, priv->num_pushed,
4734       "num-lost", G_TYPE_UINT64, priv->num_lost,
4735       "num-late", G_TYPE_UINT64, priv->num_late,
4736       "num-duplicates", G_TYPE_UINT64, priv->num_duplicates,
4737       "avg-jitter", G_TYPE_UINT64, priv->avg_jitter,
4738       "rtx-count", G_TYPE_UINT64, priv->num_rtx_requests,
4739       "rtx-success-count", G_TYPE_UINT64, priv->num_rtx_success,
4740       "rtx-per-packet", G_TYPE_DOUBLE, priv->avg_rtx_num,
4741       "rtx-rtt", G_TYPE_UINT64, priv->avg_rtx_rtt, NULL);
4742   JBUF_UNLOCK (priv);
4743
4744   return s;
4745 }