Update for g_type_class_add_private() deprecation in recent GLib
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *  Copyright 2015 Kurento (http://kurento.org/)
9  *   @author: Miguel ParĂ­s <mparisdiaz@gmail.com>
10  *  Copyright 2016 Pexip AS
11  *   @author: Havard Graff <havard@pexip.com>
12  *   @author: Stian Selnes <stian@pexip.com>
13  *
14  * This library is free software; you can redistribute it and/or
15  * modify it under the terms of the GNU Library General Public
16  * License as published by the Free Software Foundation; either
17  * version 2 of the License, or (at your option) any later version.
18  *
19  * This library is distributed in the hope that it will be useful,
20  * but WITHOUT ANY WARRANTY; without even the implied warranty of
21  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22  * Library General Public License for more details.
23  *
24  * You should have received a copy of the GNU Library General Public
25  * License along with this library; if not, write to the
26  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
27  * Boston, MA 02110-1301, USA.
28  *
29  */
30
31 /**
32  * SECTION:element-rtpjitterbuffer
33  *
34  * This element reorders and removes duplicate RTP packets as they are received
35  * from a network source.
36  *
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  *
42  * The rtpjitterbuffer will wait for missing packets up to a configurable time
43  * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
44  * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
45  * property is set, lost packets will result in a custom serialized downstream
46  * event of name GstRTPPacketLost. The lost packet events are usually used by a
47  * depayloader or other element to create concealment data or some other logic
48  * to gracefully handle the missing packets.
49  *
50  * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
51  * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
52  * buffer.
53  *
54  * The jitterbuffer can also be configured to send early retransmission events
55  * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
56  * this mode, the jitterbuffer tries to estimate when a packet should arrive and
57  * sends a custom upstream event named GstRTPRetransmissionRequest when the
58  * packet is considered late. The initial expected packet arrival time is
59  * calculated as follows:
60  *
61  * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
62  *     T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
63  *     calculated from the DTS (or PTS is no DTS) of two consecutive RTP
64  *     packets with different rtptime.
65  *
66  * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
67  *     seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
68  *     previously scheduled timeout is overwritten.
69  *
70  * - If seqnum N arrived, all seqnum older than
71  *     N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
72  *     immediately. This is to request fast feedback for abonormally reorder
73  *     packets before any of the previous timeouts is triggered.
74  *
75  * A late packet triggers the GstRTPRetransmissionRequest custom upstream
76  * event. After the initial timeout expires and the retransmission event is
77  * sent, the timeout is scheduled for
78  * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
79  * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
80  * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
81  * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
82  * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
83  * retransmission requests are sent and the regular logic is performed to
84  * schedule a lost packet as discussed above.
85  *
86  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
87  * to the pipeline.
88  *
89  * This element will automatically be used inside rtpbin.
90  *
91  * <refsect2>
92  * <title>Example pipelines</title>
93  * |[
94  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
95  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
96  * inserted into the pipeline to smooth out network jitter and to reorder the
97  * out-of-order RTP packets.
98  * </refsect2>
99  */
100
101 #ifdef HAVE_CONFIG_H
102 #include "config.h"
103 #endif
104
105 #include <stdlib.h>
106 #include <stdio.h>
107 #include <string.h>
108 #include <gst/rtp/gstrtpbuffer.h>
109 #include <gst/net/net.h>
110
111 #include "gstrtpjitterbuffer.h"
112 #include "rtpjitterbuffer.h"
113 #include "rtpstats.h"
114
115 #include <gst/glib-compat-private.h>
116
117 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
118 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
119
120 /* RTPJitterBuffer signals and args */
121 enum
122 {
123   SIGNAL_REQUEST_PT_MAP,
124   SIGNAL_CLEAR_PT_MAP,
125   SIGNAL_HANDLE_SYNC,
126   SIGNAL_ON_NPT_STOP,
127   SIGNAL_SET_ACTIVE,
128   LAST_SIGNAL
129 };
130
131 #define DEFAULT_LATENCY_MS          200
132 #define DEFAULT_DROP_ON_LATENCY     FALSE
133 #define DEFAULT_TS_OFFSET           0
134 #define DEFAULT_MAX_TS_OFFSET_ADJUSTMENT 0
135 #define DEFAULT_DO_LOST             FALSE
136 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
137 #define DEFAULT_PERCENT             0
138 #define DEFAULT_DO_RETRANSMISSION   FALSE
139 #define DEFAULT_RTX_NEXT_SEQNUM     TRUE
140 #define DEFAULT_RTX_DELAY           -1
141 #define DEFAULT_RTX_MIN_DELAY       0
142 #define DEFAULT_RTX_DELAY_REORDER   3
143 #define DEFAULT_RTX_RETRY_TIMEOUT   -1
144 #define DEFAULT_RTX_MIN_RETRY_TIMEOUT   -1
145 #define DEFAULT_RTX_RETRY_PERIOD    -1
146 #define DEFAULT_RTX_MAX_RETRIES    -1
147 #define DEFAULT_RTX_DEADLINE       -1
148 #define DEFAULT_RTX_STATS_TIMEOUT   1000
149 #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
150 #define DEFAULT_MAX_DROPOUT_TIME    60000
151 #define DEFAULT_MAX_MISORDER_TIME   2000
152 #define DEFAULT_RFC7273_SYNC        FALSE
153 #define DEFAULT_FASTSTART_MIN_PACKETS 0
154
155 #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
156 #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
157
158 enum
159 {
160   PROP_0,
161   PROP_LATENCY,
162   PROP_DROP_ON_LATENCY,
163   PROP_TS_OFFSET,
164   PROP_MAX_TS_OFFSET_ADJUSTMENT,
165   PROP_DO_LOST,
166   PROP_MODE,
167   PROP_PERCENT,
168   PROP_DO_RETRANSMISSION,
169   PROP_RTX_NEXT_SEQNUM,
170   PROP_RTX_DELAY,
171   PROP_RTX_MIN_DELAY,
172   PROP_RTX_DELAY_REORDER,
173   PROP_RTX_RETRY_TIMEOUT,
174   PROP_RTX_MIN_RETRY_TIMEOUT,
175   PROP_RTX_RETRY_PERIOD,
176   PROP_RTX_MAX_RETRIES,
177   PROP_RTX_DEADLINE,
178   PROP_RTX_STATS_TIMEOUT,
179   PROP_STATS,
180   PROP_MAX_RTCP_RTP_TIME_DIFF,
181   PROP_MAX_DROPOUT_TIME,
182   PROP_MAX_MISORDER_TIME,
183   PROP_RFC7273_SYNC,
184   PROP_FASTSTART_MIN_PACKETS
185 };
186
187 #define JBUF_LOCK(priv)   G_STMT_START {                        \
188     GST_TRACE("Locking from thread %p", g_thread_self());       \
189     (g_mutex_lock (&(priv)->jbuf_lock));                        \
190     GST_TRACE("Locked from thread %p", g_thread_self());        \
191   } G_STMT_END
192
193 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
194   JBUF_LOCK (priv);                                   \
195   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
196     goto label;                                       \
197 } G_STMT_END
198 #define JBUF_UNLOCK(priv) G_STMT_START {                        \
199     GST_TRACE ("Unlocking from thread %p", g_thread_self ());   \
200     (g_mutex_unlock (&(priv)->jbuf_lock));                      \
201 } G_STMT_END
202
203 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
204   GST_DEBUG ("waiting timer");                            \
205   (priv)->waiting_timer = TRUE;                           \
206   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
207   (priv)->waiting_timer = FALSE;                          \
208   GST_DEBUG ("waiting timer done");                       \
209 } G_STMT_END
210 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {            \
211   if (G_UNLIKELY ((priv)->waiting_timer)) {               \
212     GST_DEBUG ("signal timer");                           \
213     g_cond_signal (&(priv)->jbuf_timer);                  \
214   }                                                       \
215 } G_STMT_END
216
217 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
218   GST_DEBUG ("waiting event");                           \
219   (priv)->waiting_event = TRUE;                          \
220   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
221   (priv)->waiting_event = FALSE;                         \
222   GST_DEBUG ("waiting event done");                      \
223   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
224     goto label;                                          \
225 } G_STMT_END
226 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {           \
227   if (G_UNLIKELY ((priv)->waiting_event)) {              \
228     GST_DEBUG ("signal event");                          \
229     g_cond_signal (&(priv)->jbuf_event);                 \
230   }                                                      \
231 } G_STMT_END
232
233 #define JBUF_WAIT_QUERY(priv,label) G_STMT_START {       \
234   GST_DEBUG ("waiting query");                           \
235   (priv)->waiting_query = TRUE;                          \
236   g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \
237   (priv)->waiting_query = FALSE;                         \
238   GST_DEBUG ("waiting query done");                      \
239   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
240     goto label;                                          \
241 } G_STMT_END
242 #define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START {       \
243   (priv)->last_query = res;                              \
244   if (G_UNLIKELY ((priv)->waiting_query)) {              \
245     GST_DEBUG ("signal query");                          \
246     g_cond_signal (&(priv)->jbuf_query);                 \
247   }                                                      \
248 } G_STMT_END
249
250 #define GST_BUFFER_IS_RETRANSMISSION(buffer) \
251   GST_BUFFER_FLAG_IS_SET (buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION)
252
253 typedef struct TimerQueue
254 {
255   GQueue *timers;
256   GHashTable *hashtable;
257 } TimerQueue;
258
259 struct _GstRtpJitterBufferPrivate
260 {
261   GstPad *sinkpad, *srcpad;
262   GstPad *rtcpsinkpad;
263
264   RTPJitterBuffer *jbuf;
265   GMutex jbuf_lock;
266   gboolean waiting_timer;
267   GCond jbuf_timer;
268   gboolean waiting_event;
269   GCond jbuf_event;
270   gboolean waiting_query;
271   GCond jbuf_query;
272   gboolean last_query;
273   gboolean discont;
274   gboolean ts_discont;
275   gboolean active;
276   guint64 out_offset;
277
278   gboolean timer_running;
279   GThread *timer_thread;
280
281   /* properties */
282   guint latency_ms;
283   guint64 latency_ns;
284   gboolean drop_on_latency;
285   gint64 ts_offset;
286   guint64 max_ts_offset_adjustment;
287   gboolean do_lost;
288   gboolean do_retransmission;
289   gboolean rtx_next_seqnum;
290   gint rtx_delay;
291   guint rtx_min_delay;
292   gint rtx_delay_reorder;
293   gint rtx_retry_timeout;
294   gint rtx_min_retry_timeout;
295   gint rtx_retry_period;
296   gint rtx_max_retries;
297   guint rtx_stats_timeout;
298   gint rtx_deadline_ms;
299   gint max_rtcp_rtp_time_diff;
300   guint32 max_dropout_time;
301   guint32 max_misorder_time;
302   guint faststart_min_packets;
303
304   /* the last seqnum we pushed out */
305   guint32 last_popped_seqnum;
306   /* the next expected seqnum we push */
307   guint32 next_seqnum;
308   /* seqnum-base, if known */
309   guint32 seqnum_base;
310   /* last output time */
311   GstClockTime last_out_time;
312   /* last valid input timestamp and rtptime pair */
313   GstClockTime ips_pts;
314   guint64 ips_rtptime;
315   GstClockTime packet_spacing;
316   gint equidistant;
317
318   GQueue gap_packets;
319
320   /* the next expected seqnum we receive */
321   GstClockTime last_in_pts;
322   guint32 next_in_seqnum;
323
324   GArray *timers;
325   TimerQueue *rtx_stats_timers;
326
327   /* start and stop ranges */
328   GstClockTime npt_start;
329   GstClockTime npt_stop;
330   guint64 ext_timestamp;
331   guint64 last_elapsed;
332   guint64 estimated_eos;
333   GstClockID eos_id;
334
335   /* state */
336   gboolean eos;
337   guint last_percent;
338
339   /* clock rate and rtp timestamp offset */
340   gint last_pt;
341   gint32 clock_rate;
342   gint64 clock_base;
343   gint64 ts_offset_remainder;
344
345   /* when we are shutting down */
346   GstFlowReturn srcresult;
347   gboolean blocked;
348
349   /* for sync */
350   GstSegment segment;
351   GstClockID clock_id;
352   GstClockTime timer_timeout;
353   guint16 timer_seqnum;
354   /* the latency of the upstream peer, we have to take this into account when
355    * synchronizing the buffers. */
356   GstClockTime peer_latency;
357   guint64 ext_rtptime;
358   GstBuffer *last_sr;
359
360   /* some accounting */
361   guint64 num_pushed;
362   guint64 num_lost;
363   guint64 num_late;
364   guint64 num_duplicates;
365   guint64 num_rtx_requests;
366   guint64 num_rtx_success;
367   guint64 num_rtx_failed;
368   gdouble avg_rtx_num;
369   guint64 avg_rtx_rtt;
370   RTPPacketRateCtx packet_rate_ctx;
371
372   /* for the jitter */
373   GstClockTime last_dts;
374   GstClockTime last_pts;
375   guint64 last_rtptime;
376   GstClockTime avg_jitter;
377 };
378
379 typedef enum
380 {
381   TIMER_TYPE_EXPECTED,
382   TIMER_TYPE_LOST,
383   TIMER_TYPE_DEADLINE,
384   TIMER_TYPE_EOS
385 } TimerType;
386
387 typedef struct
388 {
389   guint idx;
390   guint16 seqnum;
391   guint num;
392   TimerType type;
393   GstClockTime timeout;
394   GstClockTime duration;
395   GstClockTime rtx_base;
396   GstClockTime rtx_delay;
397   GstClockTime rtx_retry;
398   GstClockTime rtx_last;
399   guint num_rtx_retry;
400   guint num_rtx_received;
401 } TimerData;
402
403 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
404 GST_STATIC_PAD_TEMPLATE ("sink",
405     GST_PAD_SINK,
406     GST_PAD_ALWAYS,
407     GST_STATIC_CAPS ("application/x-rtp"
408         /* "clock-rate = (int) [ 1, 2147483647 ], "
409          * "payload = (int) , "
410          * "encoding-name = (string) "
411          */ )
412     );
413
414 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
415 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
416     GST_PAD_SINK,
417     GST_PAD_REQUEST,
418     GST_STATIC_CAPS ("application/x-rtcp")
419     );
420
421 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
422 GST_STATIC_PAD_TEMPLATE ("src",
423     GST_PAD_SRC,
424     GST_PAD_ALWAYS,
425     GST_STATIC_CAPS ("application/x-rtp"
426         /* "payload = (int) , "
427          * "clock-rate = (int) , "
428          * "encoding-name = (string) "
429          */ )
430     );
431
432 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
433
434 #define gst_rtp_jitter_buffer_parent_class parent_class
435 G_DEFINE_TYPE_WITH_PRIVATE (GstRtpJitterBuffer, gst_rtp_jitter_buffer,
436     GST_TYPE_ELEMENT);
437
438 /* object overrides */
439 static void gst_rtp_jitter_buffer_set_property (GObject * object,
440     guint prop_id, const GValue * value, GParamSpec * pspec);
441 static void gst_rtp_jitter_buffer_get_property (GObject * object,
442     guint prop_id, GValue * value, GParamSpec * pspec);
443 static void gst_rtp_jitter_buffer_finalize (GObject * object);
444
445 /* element overrides */
446 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
447     * element, GstStateChange transition);
448 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
449     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
450 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
451     GstPad * pad);
452 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
453 static gboolean gst_rtp_jitter_buffer_set_clock (GstElement * element,
454     GstClock * clock);
455
456 /* pad overrides */
457 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
458 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
459     GstObject * parent);
460
461 /* sinkpad overrides */
462 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
463     GstObject * parent, GstEvent * event);
464 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
465     GstObject * parent, GstBuffer * buffer);
466 static GstFlowReturn gst_rtp_jitter_buffer_chain_list (GstPad * pad,
467     GstObject * parent, GstBufferList * buffer_list);
468
469 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
470     GstObject * parent, GstEvent * event);
471 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
472     GstObject * parent, GstBuffer * buffer);
473
474 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
475     GstObject * parent, GstQuery * query);
476
477 /* srcpad overrides */
478 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
479     GstObject * parent, GstEvent * event);
480 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
481     GstObject * parent, GstPadMode mode, gboolean active);
482 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
483 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
484     GstObject * parent, GstQuery * query);
485
486 static void
487 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
488 static GstClockTime
489 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
490     gboolean active, guint64 base_time);
491 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
492
493 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
494 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
495
496 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
497
498 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
499     jitterbuffer);
500
501 static void update_rtx_stats (GstRtpJitterBuffer * jitterbuffer,
502     TimerData * timer, GstClockTime dts, gboolean success);
503
504 static TimerQueue *timer_queue_new (void);
505 static void timer_queue_free (TimerQueue * queue);
506
507 static void
508 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
509 {
510   GObjectClass *gobject_class;
511   GstElementClass *gstelement_class;
512
513   gobject_class = (GObjectClass *) klass;
514   gstelement_class = (GstElementClass *) klass;
515
516   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
517
518   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
519   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
520
521   /**
522    * GstRtpJitterBuffer:latency:
523    *
524    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
525    * for at most this time.
526    */
527   g_object_class_install_property (gobject_class, PROP_LATENCY,
528       g_param_spec_uint ("latency", "Buffer latency in ms",
529           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
530           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
531   /**
532    * GstRtpJitterBuffer:drop-on-latency:
533    *
534    * Drop oldest buffers when the queue is completely filled.
535    */
536   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
537       g_param_spec_boolean ("drop-on-latency",
538           "Drop buffers when maximum latency is reached",
539           "Tells the jitterbuffer to never exceed the given latency in size",
540           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
541   /**
542    * GstRtpJitterBuffer:ts-offset:
543    *
544    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
545    * This is mainly used to ensure interstream synchronisation.
546    */
547   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
548       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
549           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
550           G_MAXINT64, DEFAULT_TS_OFFSET,
551           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
552
553   /**
554    * GstRtpJitterBuffer:max-ts-offset-adjustment:
555    *
556    * The maximum number of nanoseconds per frame that time offset may be
557    * adjusted with. This is used to avoid sudden large changes to time stamps.
558    */
559   g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET_ADJUSTMENT,
560       g_param_spec_uint64 ("max-ts-offset-adjustment",
561           "Max Timestamp Offset Adjustment",
562           "The maximum number of nanoseconds per frame that time stamp "
563           "offsets may be adjusted (0 = no limit).", 0, G_MAXUINT64,
564           DEFAULT_MAX_TS_OFFSET_ADJUSTMENT, G_PARAM_READWRITE |
565           G_PARAM_STATIC_STRINGS));
566
567   /**
568    * GstRtpJitterBuffer:do-lost:
569    *
570    * Send out a GstRTPPacketLost event downstream when a packet is considered
571    * lost.
572    */
573   g_object_class_install_property (gobject_class, PROP_DO_LOST,
574       g_param_spec_boolean ("do-lost", "Do Lost",
575           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
576           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
577
578   /**
579    * GstRtpJitterBuffer:mode:
580    *
581    * Control the buffering and timestamping mode used by the jitterbuffer.
582    */
583   g_object_class_install_property (gobject_class, PROP_MODE,
584       g_param_spec_enum ("mode", "Mode",
585           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
586           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
587   /**
588    * GstRtpJitterBuffer:percent:
589    *
590    * The percent of the jitterbuffer that is filled.
591    */
592   g_object_class_install_property (gobject_class, PROP_PERCENT,
593       g_param_spec_int ("percent", "percent",
594           "The buffer filled percent", 0, 100,
595           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
596   /**
597    * GstRtpJitterBuffer:do-retransmission:
598    *
599    * Send out a GstRTPRetransmission event upstream when a packet is considered
600    * late and should be retransmitted.
601    *
602    * Since: 1.2
603    */
604   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
605       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
606           "Send retransmission events upstream when a packet is late",
607           DEFAULT_DO_RETRANSMISSION,
608           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
609
610   /**
611    * GstRtpJitterBuffer:rtx-next-seqnum
612    *
613    * Estimate when the next packet should arrive and schedule a retransmission
614    * request for it.
615    * This is, when packet N arrives, a GstRTPRetransmission event is schedule
616    * for packet N+1. So it will be requested if it does not arrive at the expected time.
617    * The expected time is calculated using the dts of N and the packet spacing.
618    *
619    * Since: 1.6
620    */
621   g_object_class_install_property (gobject_class, PROP_RTX_NEXT_SEQNUM,
622       g_param_spec_boolean ("rtx-next-seqnum", "RTX next seqnum",
623           "Estimate when the next packet should arrive and schedule a "
624           "retransmission request for it.",
625           DEFAULT_RTX_NEXT_SEQNUM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
626
627   /**
628    * GstRtpJitterBuffer:rtx-delay:
629    *
630    * When a packet did not arrive at the expected time, wait this extra amount
631    * of time before sending a retransmission event.
632    *
633    * When -1 is used, the max jitter will be used as extra delay.
634    *
635    * Since: 1.2
636    */
637   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
638       g_param_spec_int ("rtx-delay", "RTX Delay",
639           "Extra time in ms to wait before sending retransmission "
640           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
641           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
642
643   /**
644    * GstRtpJitterBuffer:rtx-min-delay:
645    *
646    * When a packet did not arrive at the expected time, wait at least this extra amount
647    * of time before sending a retransmission event.
648    *
649    * Since: 1.6
650    */
651   g_object_class_install_property (gobject_class, PROP_RTX_MIN_DELAY,
652       g_param_spec_uint ("rtx-min-delay", "Minimum RTX Delay",
653           "Minimum time in ms to wait before sending retransmission "
654           "event", 0, G_MAXUINT, DEFAULT_RTX_MIN_DELAY,
655           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
656   /**
657    * GstRtpJitterBuffer:rtx-delay-reorder:
658    *
659    * Assume that a retransmission event should be sent when we see
660    * this much packet reordering.
661    *
662    * When -1 is used, the value will be estimated based on observed packet
663    * reordering. When 0 is used packet reordering alone will not cause a
664    * retransmission event (Since 1.10).
665    *
666    * Since: 1.2
667    */
668   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
669       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
670           "Sending retransmission event when this much reordering "
671           "(0 disable, -1 automatic)",
672           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
673           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
674   /**
675    * GstRtpJitterBuffer::rtx-retry-timeout:
676    *
677    * When no packet has been received after sending a retransmission event
678    * for this time, retry sending a retransmission event.
679    *
680    * When -1 is used, the value will be estimated based on observed round
681    * trip time.
682    *
683    * Since: 1.2
684    */
685   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
686       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
687           "Retry sending a transmission event after this timeout in "
688           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
689           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
690   /**
691    * GstRtpJitterBuffer::rtx-min-retry-timeout:
692    *
693    * The minimum amount of time between retry timeouts. When
694    * GstRtpJitterBuffer::rtx-retry-timeout is -1, this value ensures a
695    * minimum interval between retry timeouts.
696    *
697    * When -1 is used, the value will be estimated based on the
698    * packet spacing.
699    *
700    * Since: 1.6
701    */
702   g_object_class_install_property (gobject_class, PROP_RTX_MIN_RETRY_TIMEOUT,
703       g_param_spec_int ("rtx-min-retry-timeout", "RTX Min Retry Timeout",
704           "Minimum timeout between sending a transmission event in "
705           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_MIN_RETRY_TIMEOUT,
706           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
707   /**
708    * GstRtpJitterBuffer:rtx-retry-period:
709    *
710    * The amount of time to try to get a retransmission.
711    *
712    * When -1 is used, the value will be estimated based on the jitterbuffer
713    * latency and the observed round trip time.
714    *
715    * Since: 1.2
716    */
717   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
718       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
719           "Try to get a retransmission for this many ms "
720           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
721           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
722   /**
723    * GstRtpJitterBuffer:rtx-max-retries:
724    *
725    * The maximum number of retries to request a retransmission.
726    *
727    * This implies that as maximum (rtx-max-retries + 1) retransmissions will be requested.
728    * When -1 is used, the number of retransmission request will not be limited.
729    *
730    * Since: 1.6
731    */
732   g_object_class_install_property (gobject_class, PROP_RTX_MAX_RETRIES,
733       g_param_spec_int ("rtx-max-retries", "RTX Max Retries",
734           "The maximum number of retries to request a retransmission. "
735           "(-1 not limited)", -1, G_MAXINT, DEFAULT_RTX_MAX_RETRIES,
736           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
737   /**
738    * GstRtpJitterBuffer:rtx-deadline:
739    *
740    * The deadline for a valid RTX request in ms.
741    *
742    * How long the RTX RTCP will be valid for.
743    * When -1 is used, the size of the jitterbuffer will be used.
744    *
745    * Since: 1.10
746    */
747   g_object_class_install_property (gobject_class, PROP_RTX_DEADLINE,
748       g_param_spec_int ("rtx-deadline", "RTX Deadline (ms)",
749           "The deadline for a valid RTX request in milliseconds. "
750           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DEADLINE,
751           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
752 /**
753    * GstRtpJitterBuffer::rtx-stats-timeout:
754    *
755    * The time to wait for a retransmitted packet after it has been
756    * considered lost in order to collect RTX statistics.
757    *
758    * Since: 1.10
759    */
760   g_object_class_install_property (gobject_class, PROP_RTX_STATS_TIMEOUT,
761       g_param_spec_uint ("rtx-stats-timeout", "RTX Statistics Timeout",
762           "The time to wait for a retransmitted packet after it has been "
763           "considered lost in order to collect statistics (ms)",
764           0, G_MAXUINT, DEFAULT_RTX_STATS_TIMEOUT,
765           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
766
767   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
768       g_param_spec_uint ("max-dropout-time", "Max dropout time",
769           "The maximum time (milliseconds) of missing packets tolerated.",
770           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
771           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
772
773   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
774       g_param_spec_uint ("max-misorder-time", "Max misorder time",
775           "The maximum time (milliseconds) of misordered packets tolerated.",
776           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
777           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
778   /**
779    * GstRtpJitterBuffer:stats:
780    *
781    * Various jitterbuffer statistics. This property returns a GstStructure
782    * with name application/x-rtp-jitterbuffer-stats with the following fields:
783    *
784    * <itemizedlist>
785    * <listitem>
786    *   <para>
787    *   #guint64
788    *   <classname>&quot;num-pushed&quot;</classname>:
789    *   the number of packets pushed out.
790    *   </para>
791    * </listitem>
792    * <listitem>
793    *   <para>
794    *   #guint64
795    *   <classname>&quot;num-lost&quot;</classname>:
796    *   the number of packets considered lost.
797    *   </para>
798    * </listitem>
799    * <listitem>
800    *   <para>
801    *   #guint64
802    *   <classname>&quot;num-late&quot;</classname>:
803    *   the number of packets arriving too late.
804    *   </para>
805    * </listitem>
806    * <listitem>
807    *   <para>
808    *   #guint64
809    *   <classname>&quot;num-duplicates&quot;</classname>:
810    *   the number of duplicate packets.
811    *   </para>
812    * </listitem>
813    * <listitem>
814    *   <para>
815    *   #guint64
816    *   <classname>&quot;rtx-count&quot;</classname>:
817    *   the number of retransmissions requested.
818    *   </para>
819    * </listitem>
820    * <listitem>
821    *   <para>
822    *   #guint64
823    *   <classname>&quot;rtx-success-count&quot;</classname>:
824    *   the number of successful retransmissions.
825    *   </para>
826    * </listitem>
827    * <listitem>
828    *   <para>
829    *   #gdouble
830    *   <classname>&quot;rtx-per-packet&quot;</classname>:
831    *   average number of RTX per packet.
832    *   </para>
833    * </listitem>
834    * <listitem>
835    *   <para>
836    *   #guint64
837    *   <classname>&quot;rtx-rtt&quot;</classname>:
838    *   average round trip time per RTX.
839    *   </para>
840    * </listitem>
841    * </itemizedlist>
842    *
843    * Since: 1.4
844    */
845   g_object_class_install_property (gobject_class, PROP_STATS,
846       g_param_spec_boxed ("stats", "Statistics",
847           "Various statistics", GST_TYPE_STRUCTURE,
848           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
849
850   /**
851    * GstRtpJitterBuffer:max-rtcp-rtp-time-diff
852    *
853    * The maximum amount of time in ms that the RTP time in the RTCP SRs
854    * is allowed to be ahead of the last RTP packet we received. Use
855    * -1 to disable ignoring of RTCP packets.
856    *
857    * Since: 1.8
858    */
859   g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF,
860       g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff",
861           "Maximum amount of time in ms that the RTP time in RTCP SRs "
862           "is allowed to be ahead (-1 disabled)", -1, G_MAXINT,
863           DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
864           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
865
866   g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
867       g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
868           "Synchronize received streams to the RFC7273 clock "
869           "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
870           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
871
872   /**
873    * GstRtpJitterBuffer:faststart-min-packets
874    *
875    * The number of consecutive packets needed to start (set to 0 to
876    * disable faststart. The jitterbuffer will by default start after the
877    * latency has elapsed)
878    *
879    * Since: 1.14
880    */
881   g_object_class_install_property (gobject_class, PROP_FASTSTART_MIN_PACKETS,
882       g_param_spec_uint ("faststart-min-packets", "Faststart minimum packets",
883           "The number of consecutive packets needed to start (set to 0 to "
884           "disable faststart. The jitterbuffer will by default start after "
885           "the latency has elapsed)",
886           0, G_MAXUINT, DEFAULT_FASTSTART_MIN_PACKETS,
887           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
888
889   /**
890    * GstRtpJitterBuffer::request-pt-map:
891    * @buffer: the object which received the signal
892    * @pt: the pt
893    *
894    * Request the payload type as #GstCaps for @pt.
895    */
896   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
897       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
898       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
899           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
900       GST_TYPE_CAPS, 1, G_TYPE_UINT);
901   /**
902    * GstRtpJitterBuffer::handle-sync:
903    * @buffer: the object which received the signal
904    * @struct: a GstStructure containing sync values.
905    *
906    * Be notified of new sync values.
907    */
908   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
909       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
910       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
911           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
912       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
913
914   /**
915    * GstRtpJitterBuffer::on-npt-stop:
916    * @buffer: the object which received the signal
917    *
918    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
919    * the npt-stop position.
920    */
921   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
922       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
923       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
924           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
925       G_TYPE_NONE, 0, G_TYPE_NONE);
926
927   /**
928    * GstRtpJitterBuffer::clear-pt-map:
929    * @buffer: the object which received the signal
930    *
931    * Invalidate the clock-rate as obtained with the
932    * #GstRtpJitterBuffer::request-pt-map signal.
933    */
934   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
935       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
936       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
937       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
938       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
939
940   /**
941    * GstRtpJitterBuffer::set-active:
942    * @buffer: the object which received the signal
943    *
944    * Start pushing out packets with the given base time. This signal is only
945    * useful in buffering mode.
946    *
947    * Returns: the time of the last pushed packet.
948    */
949   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
950       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
951       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
952       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
953       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
954       G_TYPE_UINT64);
955
956   gstelement_class->change_state =
957       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
958   gstelement_class->request_new_pad =
959       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
960   gstelement_class->release_pad =
961       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
962   gstelement_class->provide_clock =
963       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
964   gstelement_class->set_clock =
965       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_clock);
966
967   gst_element_class_add_static_pad_template (gstelement_class,
968       &gst_rtp_jitter_buffer_src_template);
969   gst_element_class_add_static_pad_template (gstelement_class,
970       &gst_rtp_jitter_buffer_sink_template);
971   gst_element_class_add_static_pad_template (gstelement_class,
972       &gst_rtp_jitter_buffer_sink_rtcp_template);
973
974   gst_element_class_set_static_metadata (gstelement_class,
975       "RTP packet jitter-buffer", "Filter/Network/RTP",
976       "A buffer that deals with network jitter and other transmission faults",
977       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
978       "Wim Taymans <wim.taymans@gmail.com>");
979
980   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
981   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
982
983   GST_DEBUG_CATEGORY_INIT
984       (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
985 }
986
987 static void
988 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
989 {
990   GstRtpJitterBufferPrivate *priv;
991
992   priv = gst_rtp_jitter_buffer_get_instance_private (jitterbuffer);
993   jitterbuffer->priv = priv;
994
995   priv->latency_ms = DEFAULT_LATENCY_MS;
996   priv->latency_ns = priv->latency_ms * GST_MSECOND;
997   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
998   priv->ts_offset = DEFAULT_TS_OFFSET;
999   priv->max_ts_offset_adjustment = DEFAULT_MAX_TS_OFFSET_ADJUSTMENT;
1000   priv->do_lost = DEFAULT_DO_LOST;
1001   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
1002   priv->rtx_next_seqnum = DEFAULT_RTX_NEXT_SEQNUM;
1003   priv->rtx_delay = DEFAULT_RTX_DELAY;
1004   priv->rtx_min_delay = DEFAULT_RTX_MIN_DELAY;
1005   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
1006   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
1007   priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT;
1008   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
1009   priv->rtx_max_retries = DEFAULT_RTX_MAX_RETRIES;
1010   priv->rtx_deadline_ms = DEFAULT_RTX_DEADLINE;
1011   priv->rtx_stats_timeout = DEFAULT_RTX_STATS_TIMEOUT;
1012   priv->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
1013   priv->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
1014   priv->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
1015   priv->faststart_min_packets = DEFAULT_FASTSTART_MIN_PACKETS;
1016
1017   priv->ts_offset_remainder = 0;
1018   priv->last_dts = -1;
1019   priv->last_pts = -1;
1020   priv->last_rtptime = -1;
1021   priv->avg_jitter = 0;
1022   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
1023   priv->rtx_stats_timers = timer_queue_new ();
1024   priv->jbuf = rtp_jitter_buffer_new ();
1025   g_mutex_init (&priv->jbuf_lock);
1026   g_cond_init (&priv->jbuf_timer);
1027   g_cond_init (&priv->jbuf_event);
1028   g_cond_init (&priv->jbuf_query);
1029   g_queue_init (&priv->gap_packets);
1030   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1031
1032   /* reset skew detection initialy */
1033   rtp_jitter_buffer_reset_skew (priv->jbuf);
1034   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
1035   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1036   priv->active = TRUE;
1037
1038   priv->srcpad =
1039       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
1040       "src");
1041
1042   gst_pad_set_activatemode_function (priv->srcpad,
1043       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
1044   gst_pad_set_query_function (priv->srcpad,
1045       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
1046   gst_pad_set_event_function (priv->srcpad,
1047       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
1048
1049   priv->sinkpad =
1050       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
1051       "sink");
1052
1053   gst_pad_set_chain_function (priv->sinkpad,
1054       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
1055   gst_pad_set_chain_list_function (priv->sinkpad,
1056       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain_list));
1057   gst_pad_set_event_function (priv->sinkpad,
1058       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
1059   gst_pad_set_query_function (priv->sinkpad,
1060       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
1061
1062   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
1063   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
1064
1065   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
1066 }
1067
1068 #define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
1069
1070 #define ITEM_TYPE_BUFFER        0
1071 #define ITEM_TYPE_LOST          1
1072 #define ITEM_TYPE_EVENT         2
1073 #define ITEM_TYPE_QUERY         3
1074
1075 static RTPJitterBufferItem *
1076 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
1077     guint seqnum, guint count, guint rtptime)
1078 {
1079   RTPJitterBufferItem *item;
1080
1081   item = g_slice_new (RTPJitterBufferItem);
1082   item->data = data;
1083   item->next = NULL;
1084   item->prev = NULL;
1085   item->type = type;
1086   item->dts = dts;
1087   item->pts = pts;
1088   item->seqnum = seqnum;
1089   item->count = count;
1090   item->rtptime = rtptime;
1091
1092   return item;
1093 }
1094
1095 static void
1096 free_item (RTPJitterBufferItem * item)
1097 {
1098   g_return_if_fail (item != NULL);
1099
1100   if (item->data && item->type != ITEM_TYPE_QUERY)
1101     gst_mini_object_unref (item->data);
1102   g_slice_free (RTPJitterBufferItem, item);
1103 }
1104
1105 static void
1106 free_item_and_retain_events (RTPJitterBufferItem * item, gpointer user_data)
1107 {
1108   GList **l = user_data;
1109
1110   if (item->data && item->type == ITEM_TYPE_EVENT
1111       && GST_EVENT_IS_STICKY (item->data)) {
1112     *l = g_list_prepend (*l, item->data);
1113   } else if (item->data && item->type != ITEM_TYPE_QUERY) {
1114     gst_mini_object_unref (item->data);
1115   }
1116   g_slice_free (RTPJitterBufferItem, item);
1117 }
1118
1119 static void
1120 gst_rtp_jitter_buffer_finalize (GObject * object)
1121 {
1122   GstRtpJitterBuffer *jitterbuffer;
1123   GstRtpJitterBufferPrivate *priv;
1124
1125   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
1126   priv = jitterbuffer->priv;
1127
1128   g_array_free (priv->timers, TRUE);
1129   timer_queue_free (priv->rtx_stats_timers);
1130   g_mutex_clear (&priv->jbuf_lock);
1131   g_cond_clear (&priv->jbuf_timer);
1132   g_cond_clear (&priv->jbuf_event);
1133   g_cond_clear (&priv->jbuf_query);
1134
1135   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1136   g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
1137   g_queue_clear (&priv->gap_packets);
1138   g_object_unref (priv->jbuf);
1139
1140   G_OBJECT_CLASS (parent_class)->finalize (object);
1141 }
1142
1143 static GstIterator *
1144 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
1145 {
1146   GstRtpJitterBuffer *jitterbuffer;
1147   GstPad *otherpad = NULL;
1148   GstIterator *it = NULL;
1149   GValue val = { 0, };
1150
1151   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1152
1153   if (pad == jitterbuffer->priv->sinkpad) {
1154     otherpad = jitterbuffer->priv->srcpad;
1155   } else if (pad == jitterbuffer->priv->srcpad) {
1156     otherpad = jitterbuffer->priv->sinkpad;
1157   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
1158     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
1159   }
1160
1161   if (it == NULL) {
1162     g_value_init (&val, GST_TYPE_PAD);
1163     g_value_set_object (&val, otherpad);
1164     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
1165     g_value_unset (&val);
1166   }
1167
1168   return it;
1169 }
1170
1171 static GstPad *
1172 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
1173 {
1174   GstRtpJitterBufferPrivate *priv;
1175
1176   priv = jitterbuffer->priv;
1177
1178   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
1179
1180   priv->rtcpsinkpad =
1181       gst_pad_new_from_static_template
1182       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
1183   gst_pad_set_chain_function (priv->rtcpsinkpad,
1184       gst_rtp_jitter_buffer_chain_rtcp);
1185   gst_pad_set_event_function (priv->rtcpsinkpad,
1186       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
1187   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
1188       gst_rtp_jitter_buffer_iterate_internal_links);
1189   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
1190   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1191
1192   return priv->rtcpsinkpad;
1193 }
1194
1195 static void
1196 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
1197 {
1198   GstRtpJitterBufferPrivate *priv;
1199
1200   priv = jitterbuffer->priv;
1201
1202   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
1203
1204   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
1205
1206   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1207   priv->rtcpsinkpad = NULL;
1208 }
1209
1210 static GstPad *
1211 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
1212     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
1213 {
1214   GstRtpJitterBuffer *jitterbuffer;
1215   GstElementClass *klass;
1216   GstPad *result;
1217   GstRtpJitterBufferPrivate *priv;
1218
1219   g_return_val_if_fail (templ != NULL, NULL);
1220   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
1221
1222   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1223   priv = jitterbuffer->priv;
1224   klass = GST_ELEMENT_GET_CLASS (element);
1225
1226   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
1227
1228   /* figure out the template */
1229   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
1230     if (priv->rtcpsinkpad != NULL)
1231       goto exists;
1232
1233     result = create_rtcp_sink (jitterbuffer);
1234   } else
1235     goto wrong_template;
1236
1237   return result;
1238
1239   /* ERRORS */
1240 wrong_template:
1241   {
1242     g_warning ("rtpjitterbuffer: this is not our template");
1243     return NULL;
1244   }
1245 exists:
1246   {
1247     g_warning ("rtpjitterbuffer: pad already requested");
1248     return NULL;
1249   }
1250 }
1251
1252 static void
1253 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
1254 {
1255   GstRtpJitterBuffer *jitterbuffer;
1256   GstRtpJitterBufferPrivate *priv;
1257
1258   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
1259   g_return_if_fail (GST_IS_PAD (pad));
1260
1261   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1262   priv = jitterbuffer->priv;
1263
1264   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1265
1266   if (priv->rtcpsinkpad == pad) {
1267     remove_rtcp_sink (jitterbuffer);
1268   } else
1269     goto wrong_pad;
1270
1271   return;
1272
1273   /* ERRORS */
1274 wrong_pad:
1275   {
1276     g_warning ("gstjitterbuffer: asked to release an unknown pad");
1277     return;
1278   }
1279 }
1280
1281 static GstClock *
1282 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
1283 {
1284   return gst_system_clock_obtain ();
1285 }
1286
1287 static gboolean
1288 gst_rtp_jitter_buffer_set_clock (GstElement * element, GstClock * clock)
1289 {
1290   GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1291
1292   rtp_jitter_buffer_set_pipeline_clock (jitterbuffer->priv->jbuf, clock);
1293
1294   return GST_ELEMENT_CLASS (parent_class)->set_clock (element, clock);
1295 }
1296
1297 static void
1298 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
1299 {
1300   GstRtpJitterBufferPrivate *priv;
1301
1302   priv = jitterbuffer->priv;
1303
1304   /* this will trigger a new pt-map request signal, FIXME, do something better. */
1305
1306   JBUF_LOCK (priv);
1307   priv->clock_rate = -1;
1308   /* do not clear current content, but refresh state for new arrival */
1309   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
1310   rtp_jitter_buffer_reset_skew (priv->jbuf);
1311   JBUF_UNLOCK (priv);
1312 }
1313
1314 static GstClockTime
1315 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
1316     guint64 offset)
1317 {
1318   GstRtpJitterBufferPrivate *priv;
1319   GstClockTime last_out;
1320   RTPJitterBufferItem *item;
1321
1322   priv = jbuf->priv;
1323
1324   JBUF_LOCK (priv);
1325   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
1326       active, GST_TIME_ARGS (offset));
1327
1328   if (active != priv->active) {
1329     /* add the amount of time spent in paused to the output offset. All
1330      * outgoing buffers will have this offset applied to their timestamps in
1331      * order to make them arrive in time in the sink. */
1332     priv->out_offset = offset;
1333     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
1334         GST_TIME_ARGS (priv->out_offset));
1335     priv->active = active;
1336     JBUF_SIGNAL_EVENT (priv);
1337   }
1338   if (!active) {
1339     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
1340   }
1341   if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
1342     /* head buffer timestamp and offset gives our output time */
1343     last_out = item->pts + priv->ts_offset;
1344   } else {
1345     /* use last known time when the buffer is empty */
1346     last_out = priv->last_out_time;
1347   }
1348   JBUF_UNLOCK (priv);
1349
1350   return last_out;
1351 }
1352
1353 static GstCaps *
1354 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
1355 {
1356   GstRtpJitterBuffer *jitterbuffer;
1357   GstRtpJitterBufferPrivate *priv;
1358   GstPad *other;
1359   GstCaps *caps;
1360   GstCaps *templ;
1361
1362   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1363   priv = jitterbuffer->priv;
1364
1365   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
1366
1367   caps = gst_pad_peer_query_caps (other, filter);
1368
1369   templ = gst_pad_get_pad_template_caps (pad);
1370   if (caps == NULL) {
1371     GST_DEBUG_OBJECT (jitterbuffer, "use template");
1372     caps = templ;
1373   } else {
1374     GstCaps *intersect;
1375
1376     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1377
1378     intersect = gst_caps_intersect (caps, templ);
1379     gst_caps_unref (caps);
1380     gst_caps_unref (templ);
1381
1382     caps = intersect;
1383   }
1384   gst_object_unref (jitterbuffer);
1385
1386   return caps;
1387 }
1388
1389 /*
1390  * Must be called with JBUF_LOCK held
1391  */
1392
1393 static gboolean
1394 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1395     GstCaps * caps, gint pt)
1396 {
1397   GstRtpJitterBufferPrivate *priv;
1398   GstStructure *caps_struct;
1399   guint val;
1400   gint payload = -1;
1401   GstClockTime tval;
1402   const gchar *ts_refclk, *mediaclk;
1403
1404   priv = jitterbuffer->priv;
1405
1406   /* first parse the caps */
1407   caps_struct = gst_caps_get_structure (caps, 0);
1408
1409   GST_DEBUG_OBJECT (jitterbuffer, "got caps %" GST_PTR_FORMAT, caps);
1410
1411   if (gst_structure_get_int (caps_struct, "payload", &payload) && pt != -1
1412       && payload != pt) {
1413     GST_ERROR_OBJECT (jitterbuffer,
1414         "Got caps with wrong payload type (got %d, expected %d)", pt, payload);
1415     return FALSE;
1416   }
1417
1418   if (payload != -1) {
1419     GST_DEBUG_OBJECT (jitterbuffer, "Got payload type %d", payload);
1420     priv->last_pt = payload;
1421   }
1422
1423   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1424    * measure the amount of data in the buffer */
1425   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1426     goto error;
1427
1428   if (priv->clock_rate <= 0)
1429     goto wrong_rate;
1430
1431   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1432
1433   rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1434
1435   gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
1436
1437   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1438    * can use this to track the amount of time elapsed on the sender. */
1439   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1440     priv->clock_base = val;
1441   else
1442     priv->clock_base = -1;
1443
1444   priv->ext_timestamp = priv->clock_base;
1445
1446   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1447       priv->clock_base);
1448
1449   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1450     /* first expected seqnum, only update when we didn't have a previous base. */
1451     if (priv->next_in_seqnum == -1)
1452       priv->next_in_seqnum = val;
1453     if (priv->next_seqnum == -1) {
1454       priv->next_seqnum = val;
1455       JBUF_SIGNAL_EVENT (priv);
1456     }
1457     priv->seqnum_base = val;
1458   } else {
1459     priv->seqnum_base = -1;
1460   }
1461
1462   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1463
1464   /* the start and stop times. The seqnum-base corresponds to the start time. We
1465    * will keep track of the seqnums on the output and when we reach the one
1466    * corresponding to npt-stop, we emit the npt-stop-reached signal */
1467   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1468     priv->npt_start = tval;
1469   else
1470     priv->npt_start = 0;
1471
1472   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1473     priv->npt_stop = tval;
1474   else
1475     priv->npt_stop = -1;
1476
1477   GST_DEBUG_OBJECT (jitterbuffer,
1478       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1479       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1480
1481   if ((ts_refclk = gst_structure_get_string (caps_struct, "a-ts-refclk"))) {
1482     GstClock *clock = NULL;
1483     guint64 clock_offset = -1;
1484
1485     GST_DEBUG_OBJECT (jitterbuffer, "Have timestamp reference clock %s",
1486         ts_refclk);
1487
1488     if (g_str_has_prefix (ts_refclk, "ntp=")) {
1489       if (g_str_has_prefix (ts_refclk, "ntp=/traceable/")) {
1490         GST_FIXME_OBJECT (jitterbuffer, "Can't handle traceable NTP clocks");
1491       } else {
1492         const gchar *host, *portstr;
1493         gchar *hostname;
1494         guint port;
1495
1496         host = ts_refclk + sizeof ("ntp=") - 1;
1497         if (host[0] == '[') {
1498           /* IPv6 */
1499           portstr = strchr (host, ']');
1500           if (portstr && portstr[1] == ':')
1501             portstr = portstr + 1;
1502           else
1503             portstr = NULL;
1504         } else {
1505           portstr = strrchr (host, ':');
1506         }
1507
1508
1509         if (!portstr || sscanf (portstr, ":%u", &port) != 1)
1510           port = 123;
1511
1512         if (portstr)
1513           hostname = g_strndup (host, (portstr - host));
1514         else
1515           hostname = g_strdup (host);
1516
1517         clock = gst_ntp_clock_new (NULL, hostname, port, 0);
1518         g_free (hostname);
1519       }
1520     } else if (g_str_has_prefix (ts_refclk, "ptp=IEEE1588-2008:")) {
1521       const gchar *domainstr =
1522           ts_refclk + sizeof ("ptp=IEEE1588-2008:XX-XX-XX-XX-XX-XX-XX-XX") - 1;
1523       guint domain;
1524
1525       if (domainstr[0] != ':' || sscanf (domainstr, ":%u", &domain) != 1)
1526         domain = 0;
1527
1528       clock = gst_ptp_clock_new (NULL, domain);
1529     } else {
1530       GST_FIXME_OBJECT (jitterbuffer, "Unsupported timestamp reference clock");
1531     }
1532
1533     if ((mediaclk = gst_structure_get_string (caps_struct, "a-mediaclk"))) {
1534       GST_DEBUG_OBJECT (jitterbuffer, "Got media clock %s", mediaclk);
1535
1536       if (!g_str_has_prefix (mediaclk, "direct=")
1537           || sscanf (mediaclk, "direct=%" G_GUINT64_FORMAT, &clock_offset) != 1)
1538         GST_FIXME_OBJECT (jitterbuffer, "Unsupported media clock");
1539       if (strstr (mediaclk, "rate=") != NULL) {
1540         GST_FIXME_OBJECT (jitterbuffer, "Rate property not supported");
1541         clock_offset = -1;
1542       }
1543     }
1544
1545     rtp_jitter_buffer_set_media_clock (priv->jbuf, clock, clock_offset);
1546   } else {
1547     rtp_jitter_buffer_set_media_clock (priv->jbuf, NULL, -1);
1548   }
1549
1550   return TRUE;
1551
1552   /* ERRORS */
1553 error:
1554   {
1555     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1556     return FALSE;
1557   }
1558 wrong_rate:
1559   {
1560     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1561     return FALSE;
1562   }
1563 }
1564
1565 static void
1566 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1567 {
1568   GstRtpJitterBufferPrivate *priv;
1569
1570   priv = jitterbuffer->priv;
1571
1572   JBUF_LOCK (priv);
1573   /* mark ourselves as flushing */
1574   priv->srcresult = GST_FLOW_FLUSHING;
1575   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1576   /* this unblocks any waiting pops on the src pad task */
1577   JBUF_SIGNAL_EVENT (priv);
1578   JBUF_SIGNAL_QUERY (priv, FALSE);
1579   JBUF_UNLOCK (priv);
1580 }
1581
1582 static void
1583 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1584 {
1585   GstRtpJitterBufferPrivate *priv;
1586
1587   priv = jitterbuffer->priv;
1588
1589   JBUF_LOCK (priv);
1590   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1591   /* Mark as non flushing */
1592   priv->srcresult = GST_FLOW_OK;
1593   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1594   priv->last_popped_seqnum = -1;
1595   priv->last_out_time = GST_CLOCK_TIME_NONE;
1596   priv->next_seqnum = -1;
1597   priv->seqnum_base = -1;
1598   priv->ips_rtptime = -1;
1599   priv->ips_pts = GST_CLOCK_TIME_NONE;
1600   priv->packet_spacing = 0;
1601   priv->next_in_seqnum = -1;
1602   priv->clock_rate = -1;
1603   priv->last_pt = -1;
1604   priv->eos = FALSE;
1605   priv->estimated_eos = -1;
1606   priv->last_elapsed = 0;
1607   priv->ext_timestamp = -1;
1608   priv->avg_jitter = 0;
1609   priv->last_dts = -1;
1610   priv->last_rtptime = -1;
1611   priv->last_in_pts = 0;
1612   priv->equidistant = 0;
1613   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1614   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1615   rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
1616   rtp_jitter_buffer_reset_skew (priv->jbuf);
1617   remove_all_timers (jitterbuffer);
1618   g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
1619   g_queue_clear (&priv->gap_packets);
1620   JBUF_UNLOCK (priv);
1621 }
1622
1623 static gboolean
1624 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1625     GstPadMode mode, gboolean active)
1626 {
1627   gboolean result;
1628   GstRtpJitterBuffer *jitterbuffer = NULL;
1629
1630   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1631
1632   switch (mode) {
1633     case GST_PAD_MODE_PUSH:
1634       if (active) {
1635         /* allow data processing */
1636         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1637
1638         /* start pushing out buffers */
1639         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1640         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1641             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1642       } else {
1643         /* make sure all data processing stops ASAP */
1644         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1645
1646         /* NOTE this will hardlock if the state change is called from the src pad
1647          * task thread because we will _join() the thread. */
1648         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1649         result = gst_pad_stop_task (pad);
1650       }
1651       break;
1652     default:
1653       result = FALSE;
1654       break;
1655   }
1656   return result;
1657 }
1658
1659 static GstStateChangeReturn
1660 gst_rtp_jitter_buffer_change_state (GstElement * element,
1661     GstStateChange transition)
1662 {
1663   GstRtpJitterBuffer *jitterbuffer;
1664   GstRtpJitterBufferPrivate *priv;
1665   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1666
1667   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1668   priv = jitterbuffer->priv;
1669
1670   switch (transition) {
1671     case GST_STATE_CHANGE_NULL_TO_READY:
1672       break;
1673     case GST_STATE_CHANGE_READY_TO_PAUSED:
1674       JBUF_LOCK (priv);
1675       /* reset negotiated values */
1676       priv->clock_rate = -1;
1677       priv->clock_base = -1;
1678       priv->peer_latency = 0;
1679       priv->last_pt = -1;
1680       /* block until we go to PLAYING */
1681       priv->blocked = TRUE;
1682       priv->timer_running = TRUE;
1683       priv->timer_thread =
1684           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1685       JBUF_UNLOCK (priv);
1686       break;
1687     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1688       JBUF_LOCK (priv);
1689       /* unblock to allow streaming in PLAYING */
1690       priv->blocked = FALSE;
1691       JBUF_SIGNAL_EVENT (priv);
1692       JBUF_SIGNAL_TIMER (priv);
1693       JBUF_UNLOCK (priv);
1694       break;
1695     default:
1696       break;
1697   }
1698
1699   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1700
1701   switch (transition) {
1702     case GST_STATE_CHANGE_READY_TO_PAUSED:
1703       /* we are a live element because we sync to the clock, which we can only
1704        * do in the PLAYING state */
1705       if (ret != GST_STATE_CHANGE_FAILURE)
1706         ret = GST_STATE_CHANGE_NO_PREROLL;
1707       break;
1708     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1709       JBUF_LOCK (priv);
1710       /* block to stop streaming when PAUSED */
1711       priv->blocked = TRUE;
1712       unschedule_current_timer (jitterbuffer);
1713       JBUF_UNLOCK (priv);
1714       if (ret != GST_STATE_CHANGE_FAILURE)
1715         ret = GST_STATE_CHANGE_NO_PREROLL;
1716       break;
1717     case GST_STATE_CHANGE_PAUSED_TO_READY:
1718       JBUF_LOCK (priv);
1719       gst_buffer_replace (&priv->last_sr, NULL);
1720       priv->timer_running = FALSE;
1721       unschedule_current_timer (jitterbuffer);
1722       JBUF_SIGNAL_TIMER (priv);
1723       JBUF_SIGNAL_QUERY (priv, FALSE);
1724       JBUF_UNLOCK (priv);
1725       g_thread_join (priv->timer_thread);
1726       priv->timer_thread = NULL;
1727       break;
1728     case GST_STATE_CHANGE_READY_TO_NULL:
1729       break;
1730     default:
1731       break;
1732   }
1733
1734   return ret;
1735 }
1736
1737 static gboolean
1738 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1739     GstEvent * event)
1740 {
1741   gboolean ret = TRUE;
1742   GstRtpJitterBuffer *jitterbuffer;
1743   GstRtpJitterBufferPrivate *priv;
1744
1745   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1746   priv = jitterbuffer->priv;
1747
1748   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1749
1750   switch (GST_EVENT_TYPE (event)) {
1751     case GST_EVENT_LATENCY:
1752     {
1753       GstClockTime latency;
1754
1755       gst_event_parse_latency (event, &latency);
1756
1757       GST_DEBUG_OBJECT (jitterbuffer,
1758           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1759
1760       JBUF_LOCK (priv);
1761       /* adjust the overall buffer delay to the total pipeline latency in
1762        * buffering mode because if downstream consumes too fast (because of
1763        * large latency or queues, we would start rebuffering again. */
1764       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1765           RTP_JITTER_BUFFER_MODE_BUFFER) {
1766         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1767       }
1768       JBUF_UNLOCK (priv);
1769
1770       ret = gst_pad_push_event (priv->sinkpad, event);
1771       break;
1772     }
1773     default:
1774       ret = gst_pad_push_event (priv->sinkpad, event);
1775       break;
1776   }
1777
1778   return ret;
1779 }
1780
1781 /* handles and stores the event in the jitterbuffer, must be called with
1782  * LOCK */
1783 static gboolean
1784 queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
1785 {
1786   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1787   RTPJitterBufferItem *item;
1788   gboolean head;
1789
1790   switch (GST_EVENT_TYPE (event)) {
1791     case GST_EVENT_CAPS:
1792     {
1793       GstCaps *caps;
1794
1795       gst_event_parse_caps (event, &caps);
1796       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, -1);
1797       break;
1798     }
1799     case GST_EVENT_SEGMENT:
1800     {
1801       GstSegment segment;
1802       gst_event_copy_segment (event, &segment);
1803
1804       /* we need time for now */
1805       if (segment.format != GST_FORMAT_TIME) {
1806         GST_DEBUG_OBJECT (jitterbuffer, "ignoring non-TIME newsegment");
1807         gst_event_unref (event);
1808
1809         gst_segment_init (&segment, GST_FORMAT_TIME);
1810         event = gst_event_new_segment (&segment);
1811       }
1812
1813       priv->segment = segment;
1814       break;
1815     }
1816     case GST_EVENT_EOS:
1817       priv->eos = TRUE;
1818       rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
1819       break;
1820     default:
1821       break;
1822   }
1823
1824
1825   GST_DEBUG_OBJECT (jitterbuffer, "adding event");
1826   item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
1827   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
1828   if (head)
1829     JBUF_SIGNAL_EVENT (priv);
1830
1831   return TRUE;
1832 }
1833
1834 static gboolean
1835 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1836     GstEvent * event)
1837 {
1838   gboolean ret = TRUE;
1839   GstRtpJitterBuffer *jitterbuffer;
1840   GstRtpJitterBufferPrivate *priv;
1841
1842   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1843   priv = jitterbuffer->priv;
1844
1845   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1846
1847   switch (GST_EVENT_TYPE (event)) {
1848     case GST_EVENT_FLUSH_START:
1849       ret = gst_pad_push_event (priv->srcpad, event);
1850       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1851       /* wait for the loop to go into PAUSED */
1852       gst_pad_pause_task (priv->srcpad);
1853       break;
1854     case GST_EVENT_FLUSH_STOP:
1855       ret = gst_pad_push_event (priv->srcpad, event);
1856       ret =
1857           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1858           GST_PAD_MODE_PUSH, TRUE);
1859       break;
1860     default:
1861       if (GST_EVENT_IS_SERIALIZED (event)) {
1862         /* serialized events go in the queue */
1863         JBUF_LOCK (priv);
1864         if (priv->srcresult != GST_FLOW_OK) {
1865           /* Errors in sticky event pushing are no problem and ignored here
1866            * as they will cause more meaningful errors during data flow.
1867            * For EOS events, that are not followed by data flow, we still
1868            * return FALSE here though.
1869            */
1870           if (!GST_EVENT_IS_STICKY (event) ||
1871               GST_EVENT_TYPE (event) == GST_EVENT_EOS)
1872             goto out_flow_error;
1873         }
1874         /* refuse more events on EOS */
1875         if (priv->eos)
1876           goto out_eos;
1877         ret = queue_event (jitterbuffer, event);
1878         JBUF_UNLOCK (priv);
1879       } else {
1880         /* non-serialized events are forwarded downstream immediately */
1881         ret = gst_pad_push_event (priv->srcpad, event);
1882       }
1883       break;
1884   }
1885   return ret;
1886
1887   /* ERRORS */
1888 out_flow_error:
1889   {
1890     GST_DEBUG_OBJECT (jitterbuffer,
1891         "refusing event, we have a downstream flow error: %s",
1892         gst_flow_get_name (priv->srcresult));
1893     JBUF_UNLOCK (priv);
1894     gst_event_unref (event);
1895     return FALSE;
1896   }
1897 out_eos:
1898   {
1899     GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
1900     JBUF_UNLOCK (priv);
1901     gst_event_unref (event);
1902     return FALSE;
1903   }
1904 }
1905
1906 static gboolean
1907 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1908     GstEvent * event)
1909 {
1910   gboolean ret = TRUE;
1911   GstRtpJitterBuffer *jitterbuffer;
1912
1913   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1914
1915   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1916
1917   switch (GST_EVENT_TYPE (event)) {
1918     case GST_EVENT_FLUSH_START:
1919       gst_event_unref (event);
1920       break;
1921     case GST_EVENT_FLUSH_STOP:
1922       gst_event_unref (event);
1923       break;
1924     default:
1925       ret = gst_pad_event_default (pad, parent, event);
1926       break;
1927   }
1928
1929   return ret;
1930 }
1931
1932 /*
1933  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1934  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1935  * GST_FLOW_FLUSHING when the element is shutting down. On success
1936  * GST_FLOW_OK is returned.
1937  */
1938 static GstFlowReturn
1939 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1940     guint8 pt)
1941 {
1942   GValue ret = { 0 };
1943   GValue args[2] = { {0}, {0} };
1944   GstCaps *caps;
1945   gboolean res;
1946
1947   g_value_init (&args[0], GST_TYPE_ELEMENT);
1948   g_value_set_object (&args[0], jitterbuffer);
1949   g_value_init (&args[1], G_TYPE_UINT);
1950   g_value_set_uint (&args[1], pt);
1951
1952   g_value_init (&ret, GST_TYPE_CAPS);
1953   g_value_set_boxed (&ret, NULL);
1954
1955   JBUF_UNLOCK (jitterbuffer->priv);
1956   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1957       &ret);
1958   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1959
1960   g_value_unset (&args[0]);
1961   g_value_unset (&args[1]);
1962   caps = (GstCaps *) g_value_dup_boxed (&ret);
1963   g_value_unset (&ret);
1964   if (!caps)
1965     goto no_caps;
1966
1967   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt);
1968   gst_caps_unref (caps);
1969
1970   if (G_UNLIKELY (!res))
1971     goto parse_failed;
1972
1973   return GST_FLOW_OK;
1974
1975   /* ERRORS */
1976 no_caps:
1977   {
1978     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1979     return GST_FLOW_ERROR;
1980   }
1981 out_flushing:
1982   {
1983     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1984     return GST_FLOW_FLUSHING;
1985   }
1986 parse_failed:
1987   {
1988     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1989     return GST_FLOW_ERROR;
1990   }
1991 }
1992
1993 /* call with jbuf lock held */
1994 static GstMessage *
1995 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1996 {
1997   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1998   GstMessage *message = NULL;
1999
2000   if (percent == -1)
2001     return NULL;
2002
2003   /* Post a buffering message */
2004   if (priv->last_percent != percent) {
2005     priv->last_percent = percent;
2006     message =
2007         gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
2008     gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
2009   }
2010
2011   return message;
2012 }
2013
2014 static void
2015 update_offset (GstRtpJitterBuffer * jitterbuffer)
2016 {
2017   GstRtpJitterBufferPrivate *priv;
2018
2019   priv = jitterbuffer->priv;
2020
2021   if (priv->ts_offset_remainder != 0) {
2022     GST_DEBUG ("adjustment %" G_GUINT64_FORMAT " remain %" G_GINT64_FORMAT
2023         " off %" G_GINT64_FORMAT, priv->max_ts_offset_adjustment,
2024         priv->ts_offset_remainder, priv->ts_offset);
2025     if (ABS (priv->ts_offset_remainder) > priv->max_ts_offset_adjustment) {
2026       if (priv->ts_offset_remainder > 0) {
2027         priv->ts_offset += priv->max_ts_offset_adjustment;
2028         priv->ts_offset_remainder -= priv->max_ts_offset_adjustment;
2029       } else {
2030         priv->ts_offset -= priv->max_ts_offset_adjustment;
2031         priv->ts_offset_remainder += priv->max_ts_offset_adjustment;
2032       }
2033     } else {
2034       priv->ts_offset += priv->ts_offset_remainder;
2035       priv->ts_offset_remainder = 0;
2036     }
2037   }
2038 }
2039
2040 static GstClockTime
2041 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
2042 {
2043   GstRtpJitterBufferPrivate *priv;
2044
2045   priv = jitterbuffer->priv;
2046
2047   if (timestamp == -1)
2048     return -1;
2049
2050   /* apply the timestamp offset, this is used for inter stream sync */
2051   timestamp += priv->ts_offset;
2052   /* add the offset, this is used when buffering */
2053   timestamp += priv->out_offset;
2054
2055   return timestamp;
2056 }
2057
2058 static TimerQueue *
2059 timer_queue_new (void)
2060 {
2061   TimerQueue *queue;
2062
2063   queue = g_slice_new (TimerQueue);
2064   queue->timers = g_queue_new ();
2065   queue->hashtable = g_hash_table_new (NULL, NULL);
2066
2067   return queue;
2068 }
2069
2070 static void
2071 timer_queue_free (TimerQueue * queue)
2072 {
2073   if (!queue)
2074     return;
2075
2076   g_hash_table_destroy (queue->hashtable);
2077   g_queue_free_full (queue->timers, g_free);
2078   g_slice_free (TimerQueue, queue);
2079 }
2080
2081 static void
2082 timer_queue_append (TimerQueue * queue, const TimerData * timer,
2083     GstClockTime timeout, gboolean lost)
2084 {
2085   TimerData *copy;
2086
2087   copy = g_memdup (timer, sizeof (*timer));
2088   copy->timeout = timeout;
2089   copy->type = lost ? TIMER_TYPE_LOST : TIMER_TYPE_EXPECTED;
2090   copy->idx = -1;
2091
2092   GST_LOG ("Append rtx-stats timer #%d, %" GST_TIME_FORMAT,
2093       copy->seqnum, GST_TIME_ARGS (copy->timeout));
2094   g_queue_push_tail (queue->timers, copy);
2095   g_hash_table_insert (queue->hashtable, GINT_TO_POINTER (copy->seqnum), copy);
2096 }
2097
2098 static void
2099 timer_queue_clear_until (TimerQueue * queue, GstClockTime timeout)
2100 {
2101   TimerData *test;
2102
2103   test = g_queue_peek_head (queue->timers);
2104   while (test && test->timeout < timeout) {
2105     GST_LOG ("Pop rtx-stats timer #%d, %" GST_TIME_FORMAT " < %"
2106         GST_TIME_FORMAT, test->seqnum, GST_TIME_ARGS (test->timeout),
2107         GST_TIME_ARGS (timeout));
2108     g_hash_table_remove (queue->hashtable, GINT_TO_POINTER (test->seqnum));
2109     g_free (g_queue_pop_head (queue->timers));
2110     test = g_queue_peek_head (queue->timers);
2111   }
2112 }
2113
2114 static TimerData *
2115 timer_queue_find (TimerQueue * queue, guint16 seqnum)
2116 {
2117   return g_hash_table_lookup (queue->hashtable, GINT_TO_POINTER (seqnum));
2118 }
2119
2120 static TimerData *
2121 find_timer (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
2122 {
2123   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2124   TimerData *timer = NULL;
2125   gint i, len;
2126
2127   len = priv->timers->len;
2128   for (i = 0; i < len; i++) {
2129     TimerData *test = &g_array_index (priv->timers, TimerData, i);
2130     if (test->seqnum == seqnum) {
2131       timer = test;
2132       break;
2133     }
2134   }
2135   return timer;
2136 }
2137
2138 static void
2139 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
2140 {
2141   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2142
2143   if (priv->clock_id) {
2144     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
2145     gst_clock_id_unschedule (priv->clock_id);
2146     priv->clock_id = NULL;
2147   }
2148 }
2149
2150 static GstClockTime
2151 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2152 {
2153   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2154   GstClockTime test_timeout;
2155
2156   if ((test_timeout = timer->timeout) == -1)
2157     return -1;
2158
2159   if (timer->type != TIMER_TYPE_EXPECTED) {
2160     /* add our latency and offset to get output times. */
2161     test_timeout = apply_offset (jitterbuffer, test_timeout);
2162     test_timeout += priv->latency_ns;
2163   }
2164   return test_timeout;
2165 }
2166
2167 static void
2168 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2169 {
2170   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2171
2172   if (priv->clock_id) {
2173     GstClockTime timeout = get_timeout (jitterbuffer, timer);
2174
2175     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
2176         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
2177
2178     if (timeout == -1 || timeout < priv->timer_timeout)
2179       unschedule_current_timer (jitterbuffer);
2180   }
2181 }
2182
2183 static TimerData *
2184 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
2185     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
2186     GstClockTime duration)
2187 {
2188   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2189   TimerData *timer;
2190   gint len;
2191
2192   GST_DEBUG_OBJECT (jitterbuffer,
2193       "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %"
2194       GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout),
2195       GST_TIME_ARGS (delay));
2196
2197   len = priv->timers->len;
2198   g_array_set_size (priv->timers, len + 1);
2199   timer = &g_array_index (priv->timers, TimerData, len);
2200   timer->idx = len;
2201   timer->type = type;
2202   timer->seqnum = seqnum;
2203   timer->num = num;
2204   timer->timeout = timeout + delay;
2205   timer->duration = duration;
2206   if (type == TIMER_TYPE_EXPECTED) {
2207     timer->rtx_base = timeout;
2208     timer->rtx_delay = delay;
2209     timer->rtx_retry = 0;
2210   }
2211   timer->rtx_last = GST_CLOCK_TIME_NONE;
2212   timer->num_rtx_retry = 0;
2213   timer->num_rtx_received = 0;
2214   recalculate_timer (jitterbuffer, timer);
2215   JBUF_SIGNAL_TIMER (priv);
2216
2217   return timer;
2218 }
2219
2220 static void
2221 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2222     guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
2223 {
2224   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2225   gboolean seqchange, timechange;
2226   guint16 oldseq;
2227   GstClockTime new_timeout;
2228
2229   oldseq = timer->seqnum;
2230   new_timeout = timeout + delay;
2231   seqchange = oldseq != seqnum;
2232   timechange = timer->timeout != new_timeout;
2233
2234   if (!seqchange && !timechange) {
2235     GST_DEBUG_OBJECT (jitterbuffer,
2236         "No changes in seqnum (%d) and timeout (%" GST_TIME_FORMAT
2237         "), skipping", oldseq, GST_TIME_ARGS (timer->timeout));
2238     return;
2239   }
2240
2241   GST_DEBUG_OBJECT (jitterbuffer,
2242       "replace timer %d for seqnum %d->%d timeout %" GST_TIME_FORMAT
2243       "->%" GST_TIME_FORMAT, timer->type, oldseq, seqnum,
2244       GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (new_timeout));
2245
2246   timer->timeout = new_timeout;
2247   timer->seqnum = seqnum;
2248   if (reset) {
2249     GST_DEBUG_OBJECT (jitterbuffer, "reset rtx delay %" GST_TIME_FORMAT
2250         "->%" GST_TIME_FORMAT, GST_TIME_ARGS (timer->rtx_delay),
2251         GST_TIME_ARGS (delay));
2252     timer->rtx_base = timeout;
2253     timer->rtx_delay = delay;
2254     timer->rtx_retry = 0;
2255   }
2256   if (seqchange) {
2257     timer->num_rtx_retry = 0;
2258     timer->num_rtx_received = 0;
2259   }
2260
2261   if (priv->clock_id) {
2262     /* we changed the seqnum and there is a timer currently waiting with this
2263      * seqnum, unschedule it */
2264     if (seqchange && priv->timer_seqnum == oldseq)
2265       unschedule_current_timer (jitterbuffer);
2266     /* we changed the time, check if it is earlier than what we are waiting
2267      * for and unschedule if so */
2268     else if (timechange)
2269       recalculate_timer (jitterbuffer, timer);
2270   }
2271 }
2272
2273 static TimerData *
2274 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
2275     guint16 seqnum, GstClockTime timeout)
2276 {
2277   TimerData *timer;
2278
2279   /* find the seqnum timer */
2280   timer = find_timer (jitterbuffer, seqnum);
2281   if (timer == NULL) {
2282     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
2283   } else {
2284     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
2285   }
2286   return timer;
2287 }
2288
2289 static void
2290 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2291 {
2292   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2293   guint idx;
2294
2295   if (timer->idx == -1)
2296     return;
2297
2298   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
2299     unschedule_current_timer (jitterbuffer);
2300
2301   idx = timer->idx;
2302   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
2303   g_array_remove_index_fast (priv->timers, idx);
2304   timer->idx = idx;
2305 }
2306
2307 static void
2308 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
2309 {
2310   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2311   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
2312   g_array_set_size (priv->timers, 0);
2313   unschedule_current_timer (jitterbuffer);
2314 }
2315
2316 /* get the extra delay to wait before sending RTX */
2317 static GstClockTime
2318 get_rtx_delay (GstRtpJitterBufferPrivate * priv)
2319 {
2320   GstClockTime delay;
2321
2322   if (priv->rtx_delay == -1) {
2323     if (priv->avg_jitter == 0 && priv->packet_spacing == 0) {
2324       delay = DEFAULT_AUTO_RTX_DELAY;
2325     } else {
2326       /* jitter is in nanoseconds, maximum of 2x jitter and half the
2327        * packet spacing is a good margin */
2328       delay = MAX (priv->avg_jitter * 2, priv->packet_spacing / 2);
2329     }
2330   } else {
2331     delay = priv->rtx_delay * GST_MSECOND;
2332   }
2333   if (priv->rtx_min_delay > 0)
2334     delay = MAX (delay, priv->rtx_min_delay * GST_MSECOND);
2335
2336   return delay;
2337 }
2338
2339 /* Check if packet with seqnum is already considered definitely lost by being
2340  * part of a "lost timer" for multiple packets */
2341 static gboolean
2342 already_lost (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
2343 {
2344   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2345   gint i, len;
2346
2347   len = priv->timers->len;
2348   for (i = 0; i < len; i++) {
2349     TimerData *test = &g_array_index (priv->timers, TimerData, i);
2350     gint gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
2351
2352     if (test->num > 1 && test->type == TIMER_TYPE_LOST && gap >= 0 &&
2353         gap < test->num) {
2354       GST_DEBUG ("seqnum #%d already considered definitely lost (#%d->#%d)",
2355           seqnum, test->seqnum, (test->seqnum + test->num - 1) & 0xffff);
2356       return TRUE;
2357     }
2358   }
2359
2360   return FALSE;
2361 }
2362
2363 /* we just received a packet with seqnum and dts.
2364  *
2365  * First check for old seqnum that we are still expecting. If the gap with the
2366  * current seqnum is too big, unschedule the timeouts.
2367  *
2368  * If we have a valid packet spacing estimate we can set a timer for when we
2369  * should receive the next packet.
2370  * If we don't have a valid estimate, we remove any timer we might have
2371  * had for this packet.
2372  */
2373 static void
2374 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
2375     GstClockTime dts, GstClockTime pts, gboolean do_next_seqnum,
2376     gboolean is_rtx, TimerData * timer)
2377 {
2378   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2379
2380   /* go through all timers and unschedule the ones with a large gap */
2381   if (priv->do_retransmission && priv->rtx_delay_reorder > 0) {
2382     gint i, len;
2383     len = priv->timers->len;
2384     for (i = 0; i < len; i++) {
2385       TimerData *test = &g_array_index (priv->timers, TimerData, i);
2386       gint gap;
2387
2388       gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
2389
2390       GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d",
2391           test->type, test->seqnum, seqnum, gap);
2392
2393       if (gap > priv->rtx_delay_reorder) {
2394         /* max gap, we exceeded the max reorder distance and we don't expect the
2395          * missing packet to be this reordered */
2396         if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
2397           reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
2398       }
2399     }
2400   }
2401
2402   do_next_seqnum = do_next_seqnum && priv->packet_spacing > 0
2403       && priv->do_retransmission && priv->rtx_next_seqnum;
2404
2405   if (timer && timer->type != TIMER_TYPE_DEADLINE) {
2406     if (timer->num_rtx_retry > 0) {
2407       if (is_rtx) {
2408         update_rtx_stats (jitterbuffer, timer, dts, TRUE);
2409         /* don't try to estimate the next seqnum because this is a retransmitted
2410          * packet and it probably did not arrive with the expected packet
2411          * spacing. */
2412         do_next_seqnum = FALSE;
2413       }
2414
2415       if (!is_rtx || timer->num_rtx_retry > 1) {
2416         /* Store timer in order to record stats when/if the retransmitted
2417          * packet arrives. We should also store timer information if we've
2418          * requested retransmission more than once since we may receive
2419          * several retransmitted packets. For accuracy we should update the
2420          * stats also when the redundant retransmitted packets arrives. */
2421         timer_queue_append (priv->rtx_stats_timers, timer,
2422             pts + priv->rtx_stats_timeout * GST_MSECOND, FALSE);
2423       }
2424     }
2425   }
2426
2427   if (do_next_seqnum && pts != GST_CLOCK_TIME_NONE) {
2428     GstClockTime expected, delay;
2429
2430     /* calculate expected arrival time of the next seqnum */
2431     expected = pts + priv->packet_spacing;
2432
2433     delay = get_rtx_delay (priv);
2434
2435     /* and update/install timer for next seqnum */
2436     GST_DEBUG_OBJECT (jitterbuffer, "Add RTX timer #%d, expected %"
2437         GST_TIME_FORMAT ", delay %" GST_TIME_FORMAT ", packet-spacing %"
2438         GST_TIME_FORMAT ", jitter %" GST_TIME_FORMAT, priv->next_in_seqnum,
2439         GST_TIME_ARGS (expected), GST_TIME_ARGS (delay),
2440         GST_TIME_ARGS (priv->packet_spacing), GST_TIME_ARGS (priv->avg_jitter));
2441
2442     if (timer) {
2443       timer->type = TIMER_TYPE_EXPECTED;
2444       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
2445           delay, TRUE);
2446     } else {
2447       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
2448           expected, delay, priv->packet_spacing);
2449     }
2450   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
2451     /* if we had a timer, remove it, we don't know when to expect the next
2452      * packet. */
2453     remove_timer (jitterbuffer, timer);
2454   }
2455 }
2456
2457 static void
2458 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
2459     GstClockTime pts)
2460 {
2461   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2462
2463   /* we need consecutive seqnums with a different
2464    * rtptime to estimate the packet spacing. */
2465   if (priv->ips_rtptime != rtptime) {
2466     /* rtptime changed, check pts diff */
2467     if (priv->ips_pts != -1 && pts != -1 && pts > priv->ips_pts) {
2468       GstClockTime new_packet_spacing = pts - priv->ips_pts;
2469       GstClockTime old_packet_spacing = priv->packet_spacing;
2470
2471       /* Biased towards bigger packet spacings to prevent
2472        * too many unneeded retransmission requests for next
2473        * packets that just arrive a little later than we would
2474        * expect */
2475       if (old_packet_spacing > new_packet_spacing)
2476         priv->packet_spacing =
2477             (new_packet_spacing + 3 * old_packet_spacing) / 4;
2478       else if (old_packet_spacing > 0)
2479         priv->packet_spacing =
2480             (3 * new_packet_spacing + old_packet_spacing) / 4;
2481       else
2482         priv->packet_spacing = new_packet_spacing;
2483
2484       GST_DEBUG_OBJECT (jitterbuffer,
2485           "new packet spacing %" GST_TIME_FORMAT
2486           " old packet spacing %" GST_TIME_FORMAT
2487           " combined to %" GST_TIME_FORMAT,
2488           GST_TIME_ARGS (new_packet_spacing),
2489           GST_TIME_ARGS (old_packet_spacing),
2490           GST_TIME_ARGS (priv->packet_spacing));
2491     }
2492     priv->ips_rtptime = rtptime;
2493     priv->ips_pts = pts;
2494   }
2495 }
2496
2497 static void
2498 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
2499     guint16 seqnum, GstClockTime pts, gint gap)
2500 {
2501   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2502   GstClockTime duration, expected_pts, delay;
2503   TimerType type;
2504   gboolean equidistant = priv->equidistant > 0;
2505
2506   GST_DEBUG_OBJECT (jitterbuffer,
2507       "pts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
2508       GST_TIME_ARGS (pts), GST_TIME_ARGS (priv->last_in_pts));
2509
2510   if (pts == GST_CLOCK_TIME_NONE) {
2511     GST_WARNING_OBJECT (jitterbuffer, "Have no PTS");
2512     return;
2513   }
2514
2515   if (equidistant) {
2516     GstClockTime total_duration;
2517     /* the total duration spanned by the missing packets */
2518     if (pts >= priv->last_in_pts)
2519       total_duration = pts - priv->last_in_pts;
2520     else
2521       total_duration = 0;
2522
2523     /* interpolate between the current time and the last time based on
2524      * number of packets we are missing, this is the estimated duration
2525      * for the missing packet based on equidistant packet spacing. */
2526     duration = total_duration / (gap + 1);
2527
2528     GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
2529         GST_TIME_ARGS (duration));
2530
2531     if (total_duration > priv->latency_ns) {
2532       GstClockTime gap_time;
2533       guint lost_packets;
2534
2535       if (duration > 0) {
2536         GstClockTime gap_dur = gap * duration;
2537         if (gap_dur > priv->latency_ns)
2538           gap_time = gap_dur - priv->latency_ns;
2539         else
2540           gap_time = 0;
2541         lost_packets = gap_time / duration;
2542       } else {
2543         gap_time = total_duration - priv->latency_ns;
2544         lost_packets = gap;
2545       }
2546
2547       /* too many lost packets, some of the missing packets are already
2548        * too late and we can generate lost packet events for them. */
2549       GST_INFO_OBJECT (jitterbuffer,
2550           "lost packets (%d, #%d->#%d) duration too large %" GST_TIME_FORMAT
2551           " > %" GST_TIME_FORMAT ", consider %u lost (%" GST_TIME_FORMAT ")",
2552           gap, expected, seqnum - 1, GST_TIME_ARGS (total_duration),
2553           GST_TIME_ARGS (priv->latency_ns), lost_packets,
2554           GST_TIME_ARGS (gap_time));
2555
2556       /* this timer will fire immediately and the lost event will be pushed from
2557        * the timer thread */
2558       if (lost_packets > 0) {
2559         add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
2560             priv->last_in_pts + duration, 0, gap_time);
2561         expected += lost_packets;
2562         priv->last_in_pts += gap_time;
2563       }
2564     }
2565
2566     expected_pts = priv->last_in_pts + duration;
2567   } else {
2568     /* If we cannot assume equidistant packet spacing, the only thing we now
2569      * for sure is that the missing packets have expected pts not later than
2570      * the last received pts. */
2571     duration = 0;
2572     expected_pts = pts;
2573   }
2574
2575   delay = 0;
2576
2577   if (priv->do_retransmission) {
2578     TimerData *timer = find_timer (jitterbuffer, expected);
2579
2580     type = TIMER_TYPE_EXPECTED;
2581     delay = get_rtx_delay (priv);
2582
2583     /* if we had a timer for the first missing packet, update it. */
2584     if (timer && timer->type == TIMER_TYPE_EXPECTED) {
2585       GstClockTime timeout = timer->timeout;
2586
2587       timer->duration = duration;
2588       if (timeout > (expected_pts + delay) && timer->num_rtx_retry == 0) {
2589         reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_pts,
2590             delay, TRUE);
2591       }
2592       expected++;
2593       expected_pts += duration;
2594     }
2595   } else {
2596     type = TIMER_TYPE_LOST;
2597   }
2598
2599   while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
2600     add_timer (jitterbuffer, type, expected, 0, expected_pts, delay, duration);
2601     expected_pts += duration;
2602     expected++;
2603   }
2604 }
2605
2606 static void
2607 calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts,
2608     guint32 rtptime)
2609 {
2610   gint32 rtpdiff;
2611   GstClockTimeDiff dtsdiff, rtpdiffns, diff;
2612   GstRtpJitterBufferPrivate *priv;
2613
2614   priv = jitterbuffer->priv;
2615
2616   if (G_UNLIKELY (dts == GST_CLOCK_TIME_NONE) || priv->clock_rate <= 0)
2617     goto no_time;
2618
2619   if (priv->last_dts != -1)
2620     dtsdiff = dts - priv->last_dts;
2621   else
2622     dtsdiff = 0;
2623
2624   if (priv->last_rtptime != -1)
2625     rtpdiff = rtptime - (guint32) priv->last_rtptime;
2626   else
2627     rtpdiff = 0;
2628
2629   /* Guess whether stream currently uses equidistant packet spacing. If we
2630    * often see identical timestamps it means the packets are not
2631    * equidistant. */
2632   if (rtptime == priv->last_rtptime)
2633     priv->equidistant -= 2;
2634   else
2635     priv->equidistant += 1;
2636   priv->equidistant = CLAMP (priv->equidistant, -7, 7);
2637
2638   priv->last_dts = dts;
2639   priv->last_rtptime = rtptime;
2640
2641   if (rtpdiff > 0)
2642     rtpdiffns =
2643         gst_util_uint64_scale_int (rtpdiff, GST_SECOND, priv->clock_rate);
2644   else
2645     rtpdiffns =
2646         -gst_util_uint64_scale_int (-rtpdiff, GST_SECOND, priv->clock_rate);
2647
2648   diff = ABS (dtsdiff - rtpdiffns);
2649
2650   /* jitter is stored in nanoseconds */
2651   priv->avg_jitter = (diff + (15 * priv->avg_jitter)) >> 4;
2652
2653   GST_LOG_OBJECT (jitterbuffer,
2654       "dtsdiff %" GST_TIME_FORMAT " rtptime %" GST_TIME_FORMAT
2655       ", clock-rate %d, diff %" GST_TIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
2656       GST_TIME_ARGS (dtsdiff), GST_TIME_ARGS (rtpdiffns), priv->clock_rate,
2657       GST_TIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
2658
2659   return;
2660
2661   /* ERRORS */
2662 no_time:
2663   {
2664     GST_DEBUG_OBJECT (jitterbuffer,
2665         "no dts or no clock-rate, can't calculate jitter");
2666     return;
2667   }
2668 }
2669
2670 static gint
2671 compare_buffer_seqnum (GstBuffer * a, GstBuffer * b, gpointer user_data)
2672 {
2673   GstRTPBuffer rtp_a = GST_RTP_BUFFER_INIT;
2674   GstRTPBuffer rtp_b = GST_RTP_BUFFER_INIT;
2675   guint seq_a, seq_b;
2676
2677   gst_rtp_buffer_map (a, GST_MAP_READ, &rtp_a);
2678   seq_a = gst_rtp_buffer_get_seq (&rtp_a);
2679   gst_rtp_buffer_unmap (&rtp_a);
2680
2681   gst_rtp_buffer_map (b, GST_MAP_READ, &rtp_b);
2682   seq_b = gst_rtp_buffer_get_seq (&rtp_b);
2683   gst_rtp_buffer_unmap (&rtp_b);
2684
2685   return gst_rtp_buffer_compare_seqnum (seq_b, seq_a);
2686 }
2687
2688 static gboolean
2689 handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, GstBuffer * buffer,
2690     guint8 pt, guint16 seqnum, gint gap, guint max_dropout, guint max_misorder)
2691 {
2692   GstRtpJitterBufferPrivate *priv;
2693   guint gap_packets_length;
2694   gboolean reset = FALSE;
2695   gboolean future = gap > 0;
2696
2697   priv = jitterbuffer->priv;
2698
2699   if ((gap_packets_length = g_queue_get_length (&priv->gap_packets)) > 0) {
2700     GList *l;
2701     guint32 prev_gap_seq = -1;
2702     gboolean all_consecutive = TRUE;
2703
2704     g_queue_insert_sorted (&priv->gap_packets, buffer,
2705         (GCompareDataFunc) compare_buffer_seqnum, NULL);
2706
2707     for (l = priv->gap_packets.head; l; l = l->next) {
2708       GstBuffer *gap_buffer = l->data;
2709       GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
2710       guint32 gap_seq;
2711
2712       gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
2713
2714       all_consecutive = (gst_rtp_buffer_get_payload_type (&gap_rtp) == pt);
2715
2716       gap_seq = gst_rtp_buffer_get_seq (&gap_rtp);
2717       if (prev_gap_seq == -1)
2718         prev_gap_seq = gap_seq;
2719       else if (gst_rtp_buffer_compare_seqnum (gap_seq, prev_gap_seq) != -1)
2720         all_consecutive = FALSE;
2721       else
2722         prev_gap_seq = gap_seq;
2723
2724       gst_rtp_buffer_unmap (&gap_rtp);
2725       if (!all_consecutive)
2726         break;
2727     }
2728
2729     if (all_consecutive && gap_packets_length > 3) {
2730       GST_DEBUG_OBJECT (jitterbuffer,
2731           "buffer too %s %d < %d, got 5 consecutive ones - reset",
2732           (future ? "new" : "old"), gap,
2733           (future ? max_dropout : -max_misorder));
2734       reset = TRUE;
2735     } else if (!all_consecutive) {
2736       g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
2737       g_queue_clear (&priv->gap_packets);
2738       GST_DEBUG_OBJECT (jitterbuffer,
2739           "buffer too %s %d < %d, got no 5 consecutive ones - dropping",
2740           (future ? "new" : "old"), gap,
2741           (future ? max_dropout : -max_misorder));
2742       buffer = NULL;
2743     } else {
2744       GST_DEBUG_OBJECT (jitterbuffer,
2745           "buffer too %s %d < %d, got %u consecutive ones - waiting",
2746           (future ? "new" : "old"), gap,
2747           (future ? max_dropout : -max_misorder), gap_packets_length + 1);
2748       buffer = NULL;
2749     }
2750   } else {
2751     GST_DEBUG_OBJECT (jitterbuffer,
2752         "buffer too %s %d < %d, first one - waiting", (future ? "new" : "old"),
2753         gap, -max_misorder);
2754     g_queue_push_tail (&priv->gap_packets, buffer);
2755     buffer = NULL;
2756   }
2757
2758   return reset;
2759 }
2760
2761 static GstClockTime
2762 get_current_running_time (GstRtpJitterBuffer * jitterbuffer)
2763 {
2764   GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (jitterbuffer));
2765   GstClockTime running_time = GST_CLOCK_TIME_NONE;
2766
2767   if (clock) {
2768     GstClockTime base_time =
2769         gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer));
2770     GstClockTime clock_time = gst_clock_get_time (clock);
2771
2772     if (clock_time > base_time)
2773       running_time = clock_time - base_time;
2774     else
2775       running_time = 0;
2776
2777     gst_object_unref (clock);
2778   }
2779
2780   return running_time;
2781 }
2782
2783 static GstFlowReturn
2784 gst_rtp_jitter_buffer_reset (GstRtpJitterBuffer * jitterbuffer,
2785     GstPad * pad, GstObject * parent, guint16 seqnum)
2786 {
2787   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2788   GstFlowReturn ret = GST_FLOW_OK;
2789   GList *events = NULL, *l;
2790   GList *buffers;
2791   gboolean head;
2792
2793   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2794   rtp_jitter_buffer_flush (priv->jbuf,
2795       (GFunc) free_item_and_retain_events, &events);
2796   rtp_jitter_buffer_reset_skew (priv->jbuf);
2797   remove_all_timers (jitterbuffer);
2798   priv->discont = TRUE;
2799   priv->last_popped_seqnum = -1;
2800
2801   if (priv->gap_packets.head) {
2802     GstBuffer *gap_buffer = priv->gap_packets.head->data;
2803     GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
2804
2805     gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
2806     priv->next_seqnum = gst_rtp_buffer_get_seq (&gap_rtp);
2807     gst_rtp_buffer_unmap (&gap_rtp);
2808   } else {
2809     priv->next_seqnum = seqnum;
2810   }
2811
2812   priv->last_in_pts = -1;
2813   priv->next_in_seqnum = -1;
2814
2815   /* Insert all sticky events again in order, otherwise we would
2816    * potentially loose STREAM_START, CAPS or SEGMENT events
2817    */
2818   events = g_list_reverse (events);
2819   for (l = events; l; l = l->next) {
2820     RTPJitterBufferItem *item;
2821
2822     item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
2823     rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
2824   }
2825   g_list_free (events);
2826
2827   JBUF_SIGNAL_EVENT (priv);
2828
2829   /* reset spacing estimation when gap */
2830   priv->ips_rtptime = -1;
2831   priv->ips_pts = GST_CLOCK_TIME_NONE;
2832
2833   buffers = g_list_copy (priv->gap_packets.head);
2834   g_queue_clear (&priv->gap_packets);
2835
2836   priv->ips_rtptime = -1;
2837   priv->ips_pts = GST_CLOCK_TIME_NONE;
2838   JBUF_UNLOCK (jitterbuffer->priv);
2839
2840   for (l = buffers; l; l = l->next) {
2841     ret = gst_rtp_jitter_buffer_chain (pad, parent, l->data);
2842     l->data = NULL;
2843     if (ret != GST_FLOW_OK) {
2844       l = l->next;
2845       break;
2846     }
2847   }
2848   for (; l; l = l->next)
2849     gst_buffer_unref (l->data);
2850   g_list_free (buffers);
2851
2852   return ret;
2853 }
2854
2855 static gboolean
2856 gst_rtp_jitter_buffer_fast_start (GstRtpJitterBuffer * jitterbuffer)
2857 {
2858   GstRtpJitterBufferPrivate *priv;
2859   RTPJitterBufferItem *item;
2860   TimerData *timer;
2861
2862   priv = jitterbuffer->priv;
2863
2864   if (priv->faststart_min_packets == 0)
2865     return FALSE;
2866
2867   item = rtp_jitter_buffer_peek (priv->jbuf);
2868   if (!item)
2869     return FALSE;
2870
2871   timer = find_timer (jitterbuffer, item->seqnum);
2872   if (!timer || timer->type != TIMER_TYPE_DEADLINE)
2873     return FALSE;
2874
2875   if (rtp_jitter_buffer_can_fast_start (priv->jbuf,
2876           priv->faststart_min_packets)) {
2877     GST_INFO_OBJECT (jitterbuffer, "We found %i consecutive packet, start now",
2878         priv->faststart_min_packets);
2879     timer->timeout = -1;
2880     return TRUE;
2881   }
2882
2883   return FALSE;
2884 }
2885
2886 static GstFlowReturn
2887 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
2888     GstBuffer * buffer)
2889 {
2890   GstRtpJitterBuffer *jitterbuffer;
2891   GstRtpJitterBufferPrivate *priv;
2892   guint16 seqnum;
2893   guint32 expected, rtptime;
2894   GstFlowReturn ret = GST_FLOW_OK;
2895   GstClockTime dts, pts;
2896   guint64 latency_ts;
2897   gboolean head;
2898   gint percent = -1;
2899   guint8 pt;
2900   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2901   gboolean do_next_seqnum = FALSE;
2902   RTPJitterBufferItem *item;
2903   GstMessage *msg = NULL;
2904   gboolean estimated_dts = FALSE;
2905   gint32 packet_rate, max_dropout, max_misorder;
2906   TimerData *timer = NULL;
2907
2908   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
2909
2910   priv = jitterbuffer->priv;
2911
2912   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
2913     goto invalid_buffer;
2914
2915   pt = gst_rtp_buffer_get_payload_type (&rtp);
2916   seqnum = gst_rtp_buffer_get_seq (&rtp);
2917   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2918   gst_rtp_buffer_unmap (&rtp);
2919
2920   /* make sure we have PTS and DTS set */
2921   pts = GST_BUFFER_PTS (buffer);
2922   dts = GST_BUFFER_DTS (buffer);
2923   if (dts == -1)
2924     dts = pts;
2925   else if (pts == -1)
2926     pts = dts;
2927
2928   if (dts == -1) {
2929     /* If we have no DTS here, i.e. no capture time, get one from the
2930      * clock now to have something to calculate with in the future. */
2931     dts = get_current_running_time (jitterbuffer);
2932     pts = dts;
2933
2934     /* Remember that we estimated the DTS if we are running already
2935      * and this is not our first packet (or first packet after a reset).
2936      * If it's the first packet, we somehow must generate a timestamp for
2937      * everything, otherwise we can't calculate any times
2938      */
2939     estimated_dts = (priv->next_in_seqnum != -1);
2940   } else {
2941     /* take the DTS of the buffer. This is the time when the packet was
2942      * received and is used to calculate jitter and clock skew. We will adjust
2943      * this DTS with the smoothed value after processing it in the
2944      * jitterbuffer and assign it as the PTS. */
2945     /* bring to running time */
2946     dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
2947   }
2948
2949   GST_DEBUG_OBJECT (jitterbuffer,
2950       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d, rtx %d",
2951       seqnum, GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer),
2952       GST_BUFFER_IS_RETRANSMISSION (buffer));
2953
2954   JBUF_LOCK_CHECK (priv, out_flushing);
2955
2956   if (G_UNLIKELY (priv->last_pt != pt)) {
2957     GstCaps *caps;
2958
2959     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
2960         pt);
2961
2962     priv->last_pt = pt;
2963     /* reset clock-rate so that we get a new one */
2964     priv->clock_rate = -1;
2965
2966     /* Try to get the clock-rate from the caps first if we can. If there are no
2967      * caps we must fire the signal to get the clock-rate. */
2968     if ((caps = gst_pad_get_current_caps (pad))) {
2969       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt);
2970       gst_caps_unref (caps);
2971     }
2972   }
2973
2974   if (G_UNLIKELY (priv->clock_rate == -1)) {
2975     /* no clock rate given on the caps, try to get one with the signal */
2976     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
2977             pt) == GST_FLOW_FLUSHING)
2978       goto out_flushing;
2979
2980     if (G_UNLIKELY (priv->clock_rate == -1))
2981       goto no_clock_rate;
2982
2983     gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
2984   }
2985
2986   /* don't accept more data on EOS */
2987   if (G_UNLIKELY (priv->eos))
2988     goto have_eos;
2989
2990   if (!GST_BUFFER_IS_RETRANSMISSION (buffer))
2991     calculate_jitter (jitterbuffer, dts, rtptime);
2992
2993   if (priv->seqnum_base != -1) {
2994     gint gap;
2995
2996     gap = gst_rtp_buffer_compare_seqnum (priv->seqnum_base, seqnum);
2997
2998     if (gap < 0) {
2999       GST_DEBUG_OBJECT (jitterbuffer,
3000           "packet seqnum #%d before seqnum-base #%d", seqnum,
3001           priv->seqnum_base);
3002       gst_buffer_unref (buffer);
3003       goto finished;
3004     } else if (gap > 16384) {
3005       /* From now on don't compare against the seqnum base anymore as
3006        * at some point in the future we will wrap around and also that
3007        * much reordering is very unlikely */
3008       priv->seqnum_base = -1;
3009     }
3010   }
3011
3012   expected = priv->next_in_seqnum;
3013
3014   packet_rate =
3015       gst_rtp_packet_rate_ctx_update (&priv->packet_rate_ctx, seqnum, rtptime);
3016   max_dropout =
3017       gst_rtp_packet_rate_ctx_get_max_dropout (&priv->packet_rate_ctx,
3018       priv->max_dropout_time);
3019   max_misorder =
3020       gst_rtp_packet_rate_ctx_get_max_misorder (&priv->packet_rate_ctx,
3021       priv->max_misorder_time);
3022   GST_TRACE_OBJECT (jitterbuffer,
3023       "packet_rate: %d, max_dropout: %d, max_misorder: %d", packet_rate,
3024       max_dropout, max_misorder);
3025
3026   /* now check against our expected seqnum */
3027   if (G_UNLIKELY (expected == -1)) {
3028     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
3029
3030     /* calculate a pts based on rtptime and arrival time (dts) */
3031     pts =
3032         rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
3033         rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)));
3034
3035     /* we don't know what the next_in_seqnum should be, wait for the last
3036      * possible moment to push this buffer, maybe we get an earlier seqnum
3037      * while we wait */
3038     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, pts);
3039
3040     do_next_seqnum = TRUE;
3041     /* take rtptime and pts to calculate packet spacing */
3042     priv->ips_rtptime = rtptime;
3043     priv->ips_pts = pts;
3044
3045   } else {
3046     gint gap;
3047     /* now calculate gap */
3048     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
3049     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
3050         expected, seqnum, gap);
3051
3052     if (G_UNLIKELY (gap > 0 && priv->timers->len >= max_dropout)) {
3053       /* If we have timers for more than RTP_MAX_DROPOUT packets
3054        * pending this means that we have a huge gap overall. We can
3055        * reset the jitterbuffer at this point because there's
3056        * just too much data missing to be able to do anything
3057        * sensible with the past data. Just try again from the
3058        * next packet */
3059       GST_WARNING_OBJECT (jitterbuffer, "%d pending timers > %d - resetting",
3060           priv->timers->len, max_dropout);
3061       gst_buffer_unref (buffer);
3062       return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
3063     }
3064
3065     /* Special handling of large gaps */
3066     if ((gap != -1 && gap < -max_misorder) || (gap >= max_dropout)) {
3067       gboolean reset = handle_big_gap_buffer (jitterbuffer, buffer, pt, seqnum,
3068           gap, max_dropout, max_misorder);
3069       if (reset) {
3070         return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
3071       } else {
3072         GST_DEBUG_OBJECT (jitterbuffer,
3073             "Had big gap, waiting for more consecutive packets");
3074         goto finished;
3075       }
3076     }
3077
3078     /* We had no huge gap, let's drop all the gap packets */
3079     GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets");
3080     g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
3081     g_queue_clear (&priv->gap_packets);
3082
3083     /* calculate a pts based on rtptime and arrival time (dts) */
3084     /* If we estimated the DTS, don't consider it in the clock skew calculations */
3085     pts =
3086         rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
3087         rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)));
3088
3089     if (G_LIKELY (gap == 0)) {
3090       /* packet is expected */
3091       calculate_packet_spacing (jitterbuffer, rtptime, pts);
3092       do_next_seqnum = TRUE;
3093     } else {
3094
3095       /* we have a gap */
3096       if (gap > 0) {
3097         GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
3098         /* fill in the gap with EXPECTED timers */
3099         calculate_expected (jitterbuffer, expected, seqnum, pts, gap);
3100         do_next_seqnum = TRUE;
3101       } else {
3102         GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
3103         do_next_seqnum = FALSE;
3104       }
3105
3106       /* reset spacing estimation when gap */
3107       priv->ips_rtptime = -1;
3108       priv->ips_pts = GST_CLOCK_TIME_NONE;
3109     }
3110   }
3111
3112   if (do_next_seqnum) {
3113     priv->last_in_pts = pts;
3114     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
3115   }
3116
3117   timer = find_timer (jitterbuffer, seqnum);
3118   if (GST_BUFFER_IS_RETRANSMISSION (buffer)) {
3119     if (!timer)
3120       timer = timer_queue_find (priv->rtx_stats_timers, seqnum);
3121     if (timer)
3122       timer->num_rtx_received++;
3123   }
3124
3125   /* let's check if this buffer is too late, we can only accept packets with
3126    * bigger seqnum than the one we last pushed. */
3127   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
3128     gint gap;
3129
3130     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
3131
3132     /* priv->last_popped_seqnum >= seqnum, we're too late. */
3133     if (G_UNLIKELY (gap <= 0)) {
3134       if (priv->do_retransmission) {
3135         if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer) {
3136           update_rtx_stats (jitterbuffer, timer, dts, FALSE);
3137           /* Only count the retranmitted packet too late if it has been
3138            * considered lost. If the original packet arrived before the
3139            * retransmitted we just count it as a duplicate. */
3140           if (timer->type != TIMER_TYPE_LOST)
3141             goto rtx_duplicate;
3142         }
3143       }
3144       goto too_late;
3145     }
3146   }
3147
3148   if (already_lost (jitterbuffer, seqnum))
3149     goto already_lost;
3150
3151   /* let's drop oldest packet if the queue is already full and drop-on-latency
3152    * is set. We can only do this when there actually is a latency. When no
3153    * latency is set, we just pump it in the queue and let the other end push it
3154    * out as fast as possible. */
3155   if (priv->latency_ms && priv->drop_on_latency) {
3156     latency_ts =
3157         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
3158
3159     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
3160       RTPJitterBufferItem *old_item;
3161
3162       old_item = rtp_jitter_buffer_peek (priv->jbuf);
3163
3164       if (IS_DROPABLE (old_item)) {
3165         old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
3166         GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
3167             old_item);
3168         priv->next_seqnum = (old_item->seqnum + old_item->count) & 0xffff;
3169         free_item (old_item);
3170       }
3171       /* we might have removed some head buffers, signal the pushing thread to
3172        * see if it can push now */
3173       JBUF_SIGNAL_EVENT (priv);
3174     }
3175   }
3176
3177   /* If we estimated the DTS, don't consider it in the clock skew calculations
3178    * later. The code above always sets dts to pts or the other way around if
3179    * any of those is valid in the buffer, so we know that if we estimated the
3180    * dts that both are unknown */
3181   if (estimated_dts)
3182     item =
3183         alloc_item (buffer, ITEM_TYPE_BUFFER, GST_CLOCK_TIME_NONE,
3184         pts, seqnum, 1, rtptime);
3185   else
3186     item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
3187
3188   /* now insert the packet into the queue in sorted order. This function returns
3189    * FALSE if a packet with the same seqnum was already in the queue, meaning we
3190    * have a duplicate. */
3191   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item, &head,
3192               &percent))) {
3193     if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer)
3194       update_rtx_stats (jitterbuffer, timer, dts, FALSE);
3195     goto duplicate;
3196   }
3197
3198   /* Trigger fast start if needed */
3199   if (gst_rtp_jitter_buffer_fast_start (jitterbuffer))
3200     head = TRUE;
3201
3202   /* update timers */
3203   update_timers (jitterbuffer, seqnum, dts, pts, do_next_seqnum,
3204       GST_BUFFER_IS_RETRANSMISSION (buffer), timer);
3205
3206   /* we had an unhandled SR, handle it now */
3207   if (priv->last_sr)
3208     do_handle_sync (jitterbuffer);
3209
3210   if (G_UNLIKELY (head)) {
3211     /* signal addition of new buffer when the _loop is waiting. */
3212     if (G_LIKELY (priv->active))
3213       JBUF_SIGNAL_EVENT (priv);
3214
3215     /* let's unschedule and unblock any waiting buffers. We only want to do this
3216      * when the head buffer changed */
3217     if (G_UNLIKELY (priv->clock_id)) {
3218       GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
3219       unschedule_current_timer (jitterbuffer);
3220     }
3221   }
3222
3223   GST_DEBUG_OBJECT (jitterbuffer,
3224       "Pushed packet #%d, now %d packets, head: %d, " "percent %d", seqnum,
3225       rtp_jitter_buffer_num_packets (priv->jbuf), head, percent);
3226
3227   msg = check_buffering_percent (jitterbuffer, percent);
3228
3229 finished:
3230   JBUF_UNLOCK (priv);
3231
3232   if (msg)
3233     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
3234
3235   return ret;
3236
3237   /* ERRORS */
3238 invalid_buffer:
3239   {
3240     /* this is not fatal but should be filtered earlier */
3241     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3242         ("Received invalid RTP payload, dropping"));
3243     gst_buffer_unref (buffer);
3244     return GST_FLOW_OK;
3245   }
3246 no_clock_rate:
3247   {
3248     GST_WARNING_OBJECT (jitterbuffer,
3249         "No clock-rate in caps!, dropping buffer");
3250     gst_buffer_unref (buffer);
3251     goto finished;
3252   }
3253 out_flushing:
3254   {
3255     ret = priv->srcresult;
3256     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
3257     gst_buffer_unref (buffer);
3258     goto finished;
3259   }
3260 have_eos:
3261   {
3262     ret = GST_FLOW_EOS;
3263     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
3264     gst_buffer_unref (buffer);
3265     goto finished;
3266   }
3267 too_late:
3268   {
3269     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
3270         " popped, dropping", seqnum, priv->last_popped_seqnum);
3271     priv->num_late++;
3272     gst_buffer_unref (buffer);
3273     goto finished;
3274   }
3275 already_lost:
3276   {
3277     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as it was already "
3278         "considered lost", seqnum);
3279     priv->num_late++;
3280     gst_buffer_unref (buffer);
3281     goto finished;
3282   }
3283 duplicate:
3284   {
3285     GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
3286         seqnum);
3287     priv->num_duplicates++;
3288     free_item (item);
3289     goto finished;
3290   }
3291 rtx_duplicate:
3292   {
3293     GST_DEBUG_OBJECT (jitterbuffer,
3294         "Duplicate RTX packet #%d detected, dropping", seqnum);
3295     priv->num_duplicates++;
3296     gst_buffer_unref (buffer);
3297     goto finished;
3298   }
3299 }
3300
3301 /* FIXME: hopefully we can do something more efficient here, especially when
3302  * all packets are in order and/or outside of the currently cached range.
3303  * Still worthwhile to have it, avoids taking/releasing object lock and pad
3304  * stream lock for every single buffer in the default chain_list fallback. */
3305 static GstFlowReturn
3306 gst_rtp_jitter_buffer_chain_list (GstPad * pad, GstObject * parent,
3307     GstBufferList * buffer_list)
3308 {
3309   GstFlowReturn flow_ret = GST_FLOW_OK;
3310   guint i, n;
3311
3312   n = gst_buffer_list_length (buffer_list);
3313   for (i = 0; i < n; ++i) {
3314     GstBuffer *buf = gst_buffer_list_get (buffer_list, i);
3315
3316     flow_ret = gst_rtp_jitter_buffer_chain (pad, parent, gst_buffer_ref (buf));
3317
3318     if (flow_ret != GST_FLOW_OK)
3319       break;
3320   }
3321   gst_buffer_list_unref (buffer_list);
3322
3323   return flow_ret;
3324 }
3325
3326 static GstClockTime
3327 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
3328 {
3329   guint64 ext_time, elapsed;
3330   guint32 rtp_time;
3331   GstRtpJitterBufferPrivate *priv;
3332
3333   priv = jitterbuffer->priv;
3334   rtp_time = item->rtptime;
3335
3336   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
3337       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
3338
3339   ext_time = priv->ext_timestamp;
3340   ext_time = gst_rtp_buffer_ext_timestamp (&ext_time, rtp_time);
3341   if (ext_time < priv->ext_timestamp) {
3342     ext_time = priv->ext_timestamp;
3343   } else {
3344     priv->ext_timestamp = ext_time;
3345   }
3346
3347   if (ext_time > priv->clock_base)
3348     elapsed = ext_time - priv->clock_base;
3349   else
3350     elapsed = 0;
3351
3352   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
3353   return elapsed;
3354 }
3355
3356 static void
3357 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
3358     RTPJitterBufferItem * item)
3359 {
3360   guint64 total, elapsed, left, estimated;
3361   GstClockTime out_time;
3362   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3363
3364   if (priv->npt_stop == -1 || priv->ext_timestamp == -1
3365       || priv->clock_base == -1 || priv->clock_rate <= 0)
3366     return;
3367
3368   /* compute the elapsed time */
3369   elapsed = compute_elapsed (jitterbuffer, item);
3370
3371   /* do nothing if elapsed time doesn't increment */
3372   if (priv->last_elapsed && elapsed <= priv->last_elapsed)
3373     return;
3374
3375   priv->last_elapsed = elapsed;
3376
3377   /* this is the total time we need to play */
3378   total = priv->npt_stop - priv->npt_start;
3379   GST_LOG_OBJECT (jitterbuffer, "total %" GST_TIME_FORMAT,
3380       GST_TIME_ARGS (total));
3381
3382   /* this is how much time there is left */
3383   if (total > elapsed)
3384     left = total - elapsed;
3385   else
3386     left = 0;
3387
3388   /* if we have less time left that the size of the buffer, we will not
3389    * be able to keep it filled, disabled buffering then */
3390   if (left < rtp_jitter_buffer_get_delay (priv->jbuf)) {
3391     GST_DEBUG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT
3392         ", disable buffering close to EOS", GST_TIME_ARGS (left));
3393     rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
3394   }
3395
3396   /* this is the current time as running-time */
3397   out_time = item->pts;
3398
3399   if (elapsed > 0)
3400     estimated = gst_util_uint64_scale (out_time, total, elapsed);
3401   else {
3402     /* if there is almost nothing left,
3403      * we may never advance enough to end up in the above case */
3404     if (total < GST_SECOND)
3405       estimated = GST_SECOND;
3406     else
3407       estimated = -1;
3408   }
3409   GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
3410       GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
3411
3412   if (estimated != -1 && priv->estimated_eos != estimated) {
3413     set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
3414     priv->estimated_eos = estimated;
3415   }
3416 }
3417
3418 /* take a buffer from the queue and push it */
3419 static GstFlowReturn
3420 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
3421 {
3422   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3423   GstFlowReturn result = GST_FLOW_OK;
3424   RTPJitterBufferItem *item;
3425   GstBuffer *outbuf = NULL;
3426   GstEvent *outevent = NULL;
3427   GstQuery *outquery = NULL;
3428   GstClockTime dts, pts;
3429   gint percent = -1;
3430   gboolean do_push = TRUE;
3431   guint type;
3432   GstMessage *msg;
3433
3434   /* when we get here we are ready to pop and push the buffer */
3435   item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
3436   type = item->type;
3437
3438   switch (type) {
3439     case ITEM_TYPE_BUFFER:
3440
3441       /* we need to make writable to change the flags and timestamps */
3442       outbuf = gst_buffer_make_writable (item->data);
3443
3444       if (G_UNLIKELY (priv->discont)) {
3445         /* set DISCONT flag when we missed a packet. We pushed the buffer writable
3446          * into the jitterbuffer so we can modify now. */
3447         GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
3448         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
3449         priv->discont = FALSE;
3450       }
3451       if (G_UNLIKELY (priv->ts_discont)) {
3452         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
3453         priv->ts_discont = FALSE;
3454       }
3455
3456       dts =
3457           gst_segment_position_from_running_time (&priv->segment,
3458           GST_FORMAT_TIME, item->dts);
3459       pts =
3460           gst_segment_position_from_running_time (&priv->segment,
3461           GST_FORMAT_TIME, item->pts);
3462
3463       /* if this is a new frame, check if ts_offset needs to be updated */
3464       if (pts != priv->last_pts) {
3465         update_offset (jitterbuffer);
3466       }
3467
3468       /* apply timestamp with offset to buffer now */
3469       GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
3470       GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
3471
3472       /* update the elapsed time when we need to check against the npt stop time. */
3473       update_estimated_eos (jitterbuffer, item);
3474
3475       priv->last_pts = pts;
3476       priv->last_out_time = GST_BUFFER_PTS (outbuf);
3477       break;
3478     case ITEM_TYPE_LOST:
3479       priv->discont = TRUE;
3480       if (!priv->do_lost)
3481         do_push = FALSE;
3482       /* FALLTHROUGH */
3483     case ITEM_TYPE_EVENT:
3484       outevent = item->data;
3485       break;
3486     case ITEM_TYPE_QUERY:
3487       outquery = item->data;
3488       break;
3489   }
3490
3491   /* now we are ready to push the buffer. Save the seqnum and release the lock
3492    * so the other end can push stuff in the queue again. */
3493   if (seqnum != -1) {
3494     priv->last_popped_seqnum = seqnum;
3495     priv->next_seqnum = (seqnum + item->count) & 0xffff;
3496   }
3497   msg = check_buffering_percent (jitterbuffer, percent);
3498   JBUF_UNLOCK (priv);
3499
3500   item->data = NULL;
3501   free_item (item);
3502
3503   if (msg)
3504     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
3505
3506   switch (type) {
3507     case ITEM_TYPE_BUFFER:
3508       /* push buffer */
3509       GST_DEBUG_OBJECT (jitterbuffer,
3510           "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
3511           seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
3512           GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
3513       priv->num_pushed++;
3514       result = gst_pad_push (priv->srcpad, outbuf);
3515
3516       JBUF_LOCK_CHECK (priv, out_flushing);
3517       break;
3518     case ITEM_TYPE_LOST:
3519     case ITEM_TYPE_EVENT:
3520       /* We got not enough consecutive packets with a huge gap, we can
3521        * as well just drop them here now on EOS */
3522       if (outevent && GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
3523         GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets on EOS");
3524         g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
3525         g_queue_clear (&priv->gap_packets);
3526       }
3527
3528       GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
3529           ", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);
3530
3531       if (do_push)
3532         gst_pad_push_event (priv->srcpad, outevent);
3533       else if (outevent)
3534         gst_event_unref (outevent);
3535
3536       result = GST_FLOW_OK;
3537
3538       JBUF_LOCK_CHECK (priv, out_flushing);
3539       break;
3540     case ITEM_TYPE_QUERY:
3541     {
3542       gboolean res;
3543
3544       res = gst_pad_peer_query (priv->srcpad, outquery);
3545
3546       JBUF_LOCK_CHECK (priv, out_flushing);
3547       result = GST_FLOW_OK;
3548       GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
3549       JBUF_SIGNAL_QUERY (priv, res);
3550       break;
3551     }
3552   }
3553   return result;
3554
3555   /* ERRORS */
3556 out_flushing:
3557   {
3558     return priv->srcresult;
3559   }
3560 }
3561
3562 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
3563
3564 /* Peek a buffer and compare the seqnum to the expected seqnum.
3565  * If all is fine, the buffer is pushed.
3566  * If something is wrong, we wait for some event
3567  */
3568 static GstFlowReturn
3569 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
3570 {
3571   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3572   GstFlowReturn result;
3573   RTPJitterBufferItem *item;
3574   guint seqnum;
3575   guint32 next_seqnum;
3576
3577   /* only push buffers when PLAYING and active and not buffering */
3578   if (priv->blocked || !priv->active ||
3579       rtp_jitter_buffer_is_buffering (priv->jbuf)) {
3580     return GST_FLOW_WAIT;
3581   }
3582
3583   /* peek a buffer, we're just looking at the sequence number.
3584    * If all is fine, we'll pop and push it. If the sequence number is wrong we
3585    * wait for a timeout or something to change.
3586    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
3587   item = rtp_jitter_buffer_peek (priv->jbuf);
3588   if (item == NULL) {
3589     goto wait;
3590   }
3591
3592   /* get the seqnum and the next expected seqnum */
3593   seqnum = item->seqnum;
3594   if (seqnum == -1) {
3595     return pop_and_push_next (jitterbuffer, seqnum);
3596   }
3597
3598   next_seqnum = priv->next_seqnum;
3599
3600   /* get the gap between this and the previous packet. If we don't know the
3601    * previous packet seqnum assume no gap. */
3602   if (G_UNLIKELY (next_seqnum == -1)) {
3603     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
3604     /* we don't know what the next_seqnum should be, the chain function should
3605      * have scheduled a DEADLINE timer that will increment next_seqnum when it
3606      * fires, so wait for that */
3607     result = GST_FLOW_WAIT;
3608   } else {
3609     gint gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
3610
3611     if (G_LIKELY (gap == 0)) {
3612       /* no missing packet, pop and push */
3613       result = pop_and_push_next (jitterbuffer, seqnum);
3614     } else if (G_UNLIKELY (gap < 0)) {
3615       /* if we have a packet that we already pushed or considered dropped, pop it
3616        * off and get the next packet */
3617       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
3618           seqnum, next_seqnum);
3619       item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
3620       free_item (item);
3621       result = GST_FLOW_OK;
3622     } else {
3623       /* the chain function has scheduled timers to request retransmission or
3624        * when to consider the packet lost, wait for that */
3625       GST_DEBUG_OBJECT (jitterbuffer,
3626           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
3627           next_seqnum, seqnum, gap);
3628       result = GST_FLOW_WAIT;
3629     }
3630   }
3631
3632   return result;
3633
3634 wait:
3635   {
3636     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
3637     if (priv->eos) {
3638       return GST_FLOW_EOS;
3639     } else {
3640       return GST_FLOW_WAIT;
3641     }
3642   }
3643 }
3644
3645 static GstClockTime
3646 get_rtx_retry_timeout (GstRtpJitterBufferPrivate * priv)
3647 {
3648   GstClockTime rtx_retry_timeout;
3649   GstClockTime rtx_min_retry_timeout;
3650
3651   if (priv->rtx_retry_timeout == -1) {
3652     if (priv->avg_rtx_rtt == 0)
3653       rtx_retry_timeout = DEFAULT_AUTO_RTX_TIMEOUT;
3654     else
3655       /* we want to ask for a retransmission after we waited for a
3656        * complete RTT and the additional jitter */
3657       rtx_retry_timeout = priv->avg_rtx_rtt + priv->avg_jitter * 2;
3658   } else {
3659     rtx_retry_timeout = priv->rtx_retry_timeout * GST_MSECOND;
3660   }
3661   /* make sure we don't retry too often. On very low latency networks,
3662    * the RTT and jitter can be very low. */
3663   if (priv->rtx_min_retry_timeout == -1) {
3664     rtx_min_retry_timeout = priv->packet_spacing;
3665   } else {
3666     rtx_min_retry_timeout = priv->rtx_min_retry_timeout * GST_MSECOND;
3667   }
3668   rtx_retry_timeout = MAX (rtx_retry_timeout, rtx_min_retry_timeout);
3669
3670   return rtx_retry_timeout;
3671 }
3672
3673 static GstClockTime
3674 get_rtx_retry_period (GstRtpJitterBufferPrivate * priv,
3675     GstClockTime rtx_retry_timeout)
3676 {
3677   GstClockTime rtx_retry_period;
3678
3679   if (priv->rtx_retry_period == -1) {
3680     /* we retry up to the configured jitterbuffer size but leaving some
3681      * room for the retransmission to arrive in time */
3682     if (rtx_retry_timeout > priv->latency_ns) {
3683       rtx_retry_period = 0;
3684     } else {
3685       rtx_retry_period = priv->latency_ns - rtx_retry_timeout;
3686     }
3687   } else {
3688     rtx_retry_period = priv->rtx_retry_period * GST_MSECOND;
3689   }
3690   return rtx_retry_period;
3691 }
3692
3693 /*
3694   1. For *larger* rtx-rtt, weigh a new measurement as before (1/8th)
3695   2. For *smaller* rtx-rtt, be a bit more conservative and weigh a bit less (1/16th)
3696   3. For very large measurements (> avg * 2), consider them "outliers"
3697      and count them a lot less (1/48th)
3698 */
3699 static void
3700 update_avg_rtx_rtt (GstRtpJitterBufferPrivate * priv, GstClockTime rtt)
3701 {
3702   gint weight;
3703
3704   if (priv->avg_rtx_rtt == 0) {
3705     priv->avg_rtx_rtt = rtt;
3706     return;
3707   }
3708
3709   if (rtt > 2 * priv->avg_rtx_rtt)
3710     weight = 48;
3711   else if (rtt > priv->avg_rtx_rtt)
3712     weight = 8;
3713   else
3714     weight = 16;
3715
3716   priv->avg_rtx_rtt = (rtt + (weight - 1) * priv->avg_rtx_rtt) / weight;
3717 }
3718
3719 static void
3720 update_rtx_stats (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3721     GstClockTime dts, gboolean success)
3722 {
3723   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3724   GstClockTime delay;
3725
3726   if (success) {
3727     /* we scheduled a retry for this packet and now we have it */
3728     priv->num_rtx_success++;
3729     /* all the previous retry attempts failed */
3730     priv->num_rtx_failed += timer->num_rtx_retry - 1;
3731   } else {
3732     /* All retries failed or was too late */
3733     priv->num_rtx_failed += timer->num_rtx_retry;
3734   }
3735
3736   /* number of retries before (hopefully) receiving the packet */
3737   if (priv->avg_rtx_num == 0.0)
3738     priv->avg_rtx_num = timer->num_rtx_retry;
3739   else
3740     priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
3741
3742   /* Calculate the delay between retransmission request and receiving this
3743    * packet. We have a valid delay if and only if this packet is a response to
3744    * our last request. If not we don't know if this is a response to an
3745    * earlier request and delay could be way off. For RTT is more important
3746    * with correct values than to update for every packet. */
3747   if (timer->num_rtx_retry == timer->num_rtx_received &&
3748       dts != GST_CLOCK_TIME_NONE && dts > timer->rtx_last) {
3749     delay = dts - timer->rtx_last;
3750     update_avg_rtx_rtt (priv, delay);
3751   } else {
3752     delay = 0;
3753   }
3754
3755   GST_LOG_OBJECT (jitterbuffer,
3756       "RTX #%d, result %d, success %" G_GUINT64_FORMAT ", failed %"
3757       G_GUINT64_FORMAT ", requests %" G_GUINT64_FORMAT ", dups %"
3758       G_GUINT64_FORMAT ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %"
3759       GST_TIME_FORMAT, timer->seqnum, success, priv->num_rtx_success,
3760       priv->num_rtx_failed, priv->num_rtx_requests, priv->num_duplicates,
3761       priv->avg_rtx_num, GST_TIME_ARGS (delay),
3762       GST_TIME_ARGS (priv->avg_rtx_rtt));
3763 }
3764
3765 /* the timeout for when we expected a packet expired */
3766 static gboolean
3767 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3768     GstClockTime now)
3769 {
3770   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3771   GstEvent *event;
3772   guint delay, delay_ms, avg_rtx_rtt_ms;
3773   guint rtx_retry_timeout_ms, rtx_retry_period_ms;
3774   guint rtx_deadline_ms;
3775   GstClockTime rtx_retry_period;
3776   GstClockTime rtx_retry_timeout;
3777   GstClock *clock;
3778
3779   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %"
3780       GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
3781
3782   rtx_retry_timeout = get_rtx_retry_timeout (priv);
3783   rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout);
3784
3785   delay = timer->rtx_delay + timer->rtx_retry;
3786
3787   delay_ms = GST_TIME_AS_MSECONDS (delay);
3788   rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout);
3789   rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period);
3790   avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt);
3791   rtx_deadline_ms =
3792       priv->rtx_deadline_ms != -1 ? priv->rtx_deadline_ms : priv->latency_ms;
3793
3794   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
3795       gst_structure_new ("GstRTPRetransmissionRequest",
3796           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
3797           "running-time", G_TYPE_UINT64, timer->rtx_base,
3798           "delay", G_TYPE_UINT, delay_ms,
3799           "retry", G_TYPE_UINT, timer->num_rtx_retry,
3800           "frequency", G_TYPE_UINT, rtx_retry_timeout_ms,
3801           "period", G_TYPE_UINT, rtx_retry_period_ms,
3802           "deadline", G_TYPE_UINT, rtx_deadline_ms,
3803           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
3804           "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
3805   GST_DEBUG_OBJECT (jitterbuffer, "Request RTX: %" GST_PTR_FORMAT, event);
3806
3807   priv->num_rtx_requests++;
3808   timer->num_rtx_retry++;
3809
3810   GST_OBJECT_LOCK (jitterbuffer);
3811   if ((clock = GST_ELEMENT_CLOCK (jitterbuffer))) {
3812     timer->rtx_last = gst_clock_get_time (clock);
3813     timer->rtx_last -= GST_ELEMENT_CAST (jitterbuffer)->base_time;
3814   } else {
3815     timer->rtx_last = now;
3816   }
3817   GST_OBJECT_UNLOCK (jitterbuffer);
3818
3819   /* calculate the timeout for the next retransmission attempt */
3820   timer->rtx_retry += rtx_retry_timeout;
3821   GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
3822       GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u",
3823       GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
3824       GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
3825   if ((priv->rtx_max_retries != -1
3826           && timer->num_rtx_retry >= priv->rtx_max_retries)
3827       || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)
3828       || (timer->rtx_base + rtx_retry_period < now)) {
3829     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
3830     /* too many retransmission request, we now convert the timer
3831      * to a lost timer, leave the num_rtx_retry as it is for stats */
3832     timer->type = TIMER_TYPE_LOST;
3833     timer->rtx_delay = 0;
3834     timer->rtx_retry = 0;
3835   }
3836   reschedule_timer (jitterbuffer, timer, timer->seqnum,
3837       timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
3838
3839   JBUF_UNLOCK (priv);
3840   gst_pad_push_event (priv->sinkpad, event);
3841   JBUF_LOCK (priv);
3842
3843   return FALSE;
3844 }
3845
3846 /* a packet is lost */
3847 static gboolean
3848 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3849     GstClockTime now)
3850 {
3851   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3852   guint seqnum, lost_packets, num_rtx_retry, next_in_seqnum;
3853   gboolean head;
3854   GstEvent *event = NULL;
3855   RTPJitterBufferItem *item;
3856
3857   seqnum = timer->seqnum;
3858   lost_packets = MAX (timer->num, 1);
3859   num_rtx_retry = timer->num_rtx_retry;
3860
3861   /* we had a gap and thus we lost some packets. Create an event for this.  */
3862   if (lost_packets > 1)
3863     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
3864         seqnum + lost_packets - 1);
3865   else
3866     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
3867
3868   priv->num_lost += lost_packets;
3869   priv->num_rtx_failed += num_rtx_retry;
3870
3871   next_in_seqnum = (seqnum + lost_packets) & 0xffff;
3872
3873   /* we now only accept seqnum bigger than this */
3874   if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0) {
3875     priv->next_in_seqnum = next_in_seqnum;
3876     priv->last_in_pts = apply_offset (jitterbuffer, timer->timeout);
3877   }
3878
3879   /* Avoid creating events if we don't need it. Note that we still need to create
3880    * the lost *ITEM* since it will be used to notify the outgoing thread of
3881    * lost items (so that we can set discont flags and such) */
3882   if (priv->do_lost) {
3883     GstClockTime duration, timestamp;
3884     /* create paket lost event */
3885     timestamp = apply_offset (jitterbuffer, timer->timeout);
3886     duration = timer->duration;
3887     if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
3888       duration = priv->packet_spacing;
3889     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
3890         gst_structure_new ("GstRTPPacketLost",
3891             "seqnum", G_TYPE_UINT, (guint) seqnum,
3892             "timestamp", G_TYPE_UINT64, timestamp,
3893             "duration", G_TYPE_UINT64, duration,
3894             "retry", G_TYPE_UINT, num_rtx_retry, NULL));
3895   }
3896   item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
3897   if (!rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL))
3898     /* Duplicate */
3899     free_item (item);
3900
3901   if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) {
3902     /* Store info to update stats if the packet arrives too late */
3903     timer_queue_append (priv->rtx_stats_timers, timer,
3904         now + priv->rtx_stats_timeout * GST_MSECOND, TRUE);
3905   }
3906   remove_timer (jitterbuffer, timer);
3907
3908   if (head)
3909     JBUF_SIGNAL_EVENT (priv);
3910
3911   return TRUE;
3912 }
3913
3914 static gboolean
3915 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3916     GstClockTime now)
3917 {
3918   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3919
3920   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
3921   remove_timer (jitterbuffer, timer);
3922   if (!priv->eos) {
3923     /* there was no EOS in the buffer, put one in there now */
3924     queue_event (jitterbuffer, gst_event_new_eos ());
3925   }
3926   JBUF_SIGNAL_EVENT (priv);
3927
3928   return TRUE;
3929 }
3930
3931 static gboolean
3932 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3933     GstClockTime now)
3934 {
3935   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3936
3937   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
3938
3939   /* timer seqnum might have been obsoleted by caps seqnum-base,
3940    * only mess with current ongoing seqnum if still unknown */
3941   if (priv->next_seqnum == -1)
3942     priv->next_seqnum = timer->seqnum;
3943   remove_timer (jitterbuffer, timer);
3944   JBUF_SIGNAL_EVENT (priv);
3945
3946   return TRUE;
3947 }
3948
3949 static gboolean
3950 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3951     GstClockTime now)
3952 {
3953   gboolean removed = FALSE;
3954
3955   switch (timer->type) {
3956     case TIMER_TYPE_EXPECTED:
3957       removed = do_expected_timeout (jitterbuffer, timer, now);
3958       break;
3959     case TIMER_TYPE_LOST:
3960       removed = do_lost_timeout (jitterbuffer, timer, now);
3961       break;
3962     case TIMER_TYPE_DEADLINE:
3963       removed = do_deadline_timeout (jitterbuffer, timer, now);
3964       break;
3965     case TIMER_TYPE_EOS:
3966       removed = do_eos_timeout (jitterbuffer, timer, now);
3967       break;
3968   }
3969   return removed;
3970 }
3971
3972 /* called when we need to wait for the next timeout.
3973  *
3974  * We loop over the array of recorded timeouts and wait for the earliest one.
3975  * When it timed out, do the logic associated with the timer.
3976  *
3977  * If there are no timers, we wait on a gcond until something new happens.
3978  */
3979 static void
3980 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
3981 {
3982   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3983   GstClockTime now = 0;
3984
3985   JBUF_LOCK (priv);
3986   while (priv->timer_running) {
3987     TimerData *timer = NULL;
3988     GstClockTime timer_timeout = -1;
3989     gint i, len;
3990
3991     /* If we have a clock, update "now" now with the very
3992      * latest running time we have. If timers are unscheduled below we
3993      * otherwise wouldn't update now (it's only updated when timers
3994      * expire), and also for the very first loop iteration now would
3995      * otherwise always be 0
3996      */
3997     GST_OBJECT_LOCK (jitterbuffer);
3998     if (GST_ELEMENT_CLOCK (jitterbuffer)) {
3999       now =
4000           gst_clock_get_time (GST_ELEMENT_CLOCK (jitterbuffer)) -
4001           GST_ELEMENT_CAST (jitterbuffer)->base_time;
4002     }
4003     GST_OBJECT_UNLOCK (jitterbuffer);
4004
4005     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
4006         GST_TIME_ARGS (now));
4007
4008     /* Clear expired rtx-stats timers */
4009     if (priv->do_retransmission)
4010       timer_queue_clear_until (priv->rtx_stats_timers, now);
4011
4012     /* Iterate "normal" timers */
4013     len = priv->timers->len;
4014     for (i = 0; i < len;) {
4015       TimerData *test = &g_array_index (priv->timers, TimerData, i);
4016       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
4017       gboolean save_best = FALSE;
4018
4019       GST_DEBUG_OBJECT (jitterbuffer,
4020           "%d, %d, %d, %" GST_TIME_FORMAT " diff:%" GST_STIME_FORMAT, i,
4021           test->type, test->seqnum, GST_TIME_ARGS (test_timeout),
4022           GST_STIME_ARGS ((gint64) (test_timeout - now)));
4023
4024       /* Weed out anything too late */
4025       if (test->type == TIMER_TYPE_LOST &&
4026           (test_timeout == -1 || test_timeout <= now)) {
4027         GST_DEBUG_OBJECT (jitterbuffer, "Weeding out late entry");
4028         do_lost_timeout (jitterbuffer, test, now);
4029         if (!priv->timer_running)
4030           break;
4031         /* We don't move the iterator forward since we just removed the current entry,
4032          * but we update the termination condition */
4033         len = priv->timers->len;
4034       } else {
4035         /* find the smallest timeout */
4036         if (timer == NULL) {
4037           save_best = TRUE;
4038         } else if (timer_timeout == -1) {
4039           /* we already have an immediate timeout, the new timer must be an
4040            * immediate timer with smaller seqnum to become the best */
4041           if (test_timeout == -1
4042               && (gst_rtp_buffer_compare_seqnum (test->seqnum,
4043                       timer->seqnum) > 0))
4044             save_best = TRUE;
4045         } else if (test_timeout == -1) {
4046           /* first immediate timer */
4047           save_best = TRUE;
4048         } else if (test_timeout < timer_timeout) {
4049           /* earlier timer */
4050           save_best = TRUE;
4051         } else if (test_timeout == timer_timeout
4052             && (gst_rtp_buffer_compare_seqnum (test->seqnum,
4053                     timer->seqnum) > 0)) {
4054           /* same timer, smaller seqnum */
4055           save_best = TRUE;
4056         }
4057
4058         if (save_best) {
4059           GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
4060           timer = test;
4061           timer_timeout = test_timeout;
4062         }
4063         i++;
4064       }
4065     }
4066     if (timer && !priv->blocked) {
4067       GstClock *clock;
4068       GstClockTime sync_time;
4069       GstClockID id;
4070       GstClockReturn ret;
4071       GstClockTimeDiff clock_jitter;
4072
4073       if (timer_timeout == -1 || timer_timeout <= now) {
4074         /* We have normally removed all lost timers in the loop above */
4075         g_assert (timer->type != TIMER_TYPE_LOST);
4076
4077         do_timeout (jitterbuffer, timer, now);
4078         /* check here, do_timeout could have released the lock */
4079         if (!priv->timer_running)
4080           break;
4081         continue;
4082       }
4083
4084       GST_OBJECT_LOCK (jitterbuffer);
4085       clock = GST_ELEMENT_CLOCK (jitterbuffer);
4086       if (!clock) {
4087         GST_OBJECT_UNLOCK (jitterbuffer);
4088         /* let's just push if there is no clock */
4089         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
4090         now = timer_timeout;
4091         continue;
4092       }
4093
4094       /* prepare for sync against clock */
4095       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
4096       /* add latency of peer to get input time */
4097       sync_time += priv->peer_latency;
4098
4099       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
4100           " with sync time %" GST_TIME_FORMAT,
4101           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
4102
4103       /* create an entry for the clock */
4104       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
4105       priv->timer_timeout = timer_timeout;
4106       priv->timer_seqnum = timer->seqnum;
4107       GST_OBJECT_UNLOCK (jitterbuffer);
4108
4109       /* release the lock so that the other end can push stuff or unlock */
4110       JBUF_UNLOCK (priv);
4111
4112       ret = gst_clock_id_wait (id, &clock_jitter);
4113
4114       JBUF_LOCK (priv);
4115       if (!priv->timer_running) {
4116         gst_clock_id_unref (id);
4117         priv->clock_id = NULL;
4118         break;
4119       }
4120
4121       if (ret != GST_CLOCK_UNSCHEDULED) {
4122         now = timer_timeout + MAX (clock_jitter, 0);
4123         GST_DEBUG_OBJECT (jitterbuffer,
4124             "sync done, %d, #%d, %" GST_STIME_FORMAT, ret, priv->timer_seqnum,
4125             GST_STIME_ARGS (clock_jitter));
4126       } else {
4127         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
4128       }
4129       /* and free the entry */
4130       gst_clock_id_unref (id);
4131       priv->clock_id = NULL;
4132     } else {
4133       /* no timers, wait for activity */
4134       JBUF_WAIT_TIMER (priv);
4135     }
4136   }
4137   JBUF_UNLOCK (priv);
4138
4139   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
4140   return;
4141 }
4142
4143 /*
4144  * This funcion implements the main pushing loop on the source pad.
4145  *
4146  * It first tries to push as many buffers as possible. If there is a seqnum
4147  * mismatch, we wait for the next timeouts.
4148  */
4149 static void
4150 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
4151 {
4152   GstRtpJitterBufferPrivate *priv;
4153   GstFlowReturn result = GST_FLOW_OK;
4154
4155   priv = jitterbuffer->priv;
4156
4157   JBUF_LOCK_CHECK (priv, flushing);
4158   do {
4159     result = handle_next_buffer (jitterbuffer);
4160     if (G_LIKELY (result == GST_FLOW_WAIT)) {
4161       /* now wait for the next event */
4162       JBUF_WAIT_EVENT (priv, flushing);
4163       result = GST_FLOW_OK;
4164     }
4165   } while (result == GST_FLOW_OK);
4166   /* store result for upstream */
4167   priv->srcresult = result;
4168   /* if we get here we need to pause */
4169   goto pause;
4170
4171   /* ERRORS */
4172 flushing:
4173   {
4174     result = priv->srcresult;
4175     goto pause;
4176   }
4177 pause:
4178   {
4179     GstEvent *event;
4180
4181     JBUF_SIGNAL_QUERY (priv, FALSE);
4182     JBUF_UNLOCK (priv);
4183
4184     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
4185         gst_flow_get_name (result));
4186     gst_pad_pause_task (priv->srcpad);
4187     if (result == GST_FLOW_EOS) {
4188       event = gst_event_new_eos ();
4189       gst_pad_push_event (priv->srcpad, event);
4190     }
4191     return;
4192   }
4193 }
4194
4195 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
4196  * some sanity checks and then emit the handle-sync signal with the parameters.
4197  * This function must be called with the LOCK */
4198 static void
4199 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
4200 {
4201   GstRtpJitterBufferPrivate *priv;
4202   guint64 base_rtptime, base_time;
4203   guint32 clock_rate;
4204   guint64 last_rtptime;
4205   guint64 clock_base;
4206   guint64 ext_rtptime, diff;
4207   gboolean valid = TRUE, keep = FALSE;
4208
4209   priv = jitterbuffer->priv;
4210
4211   /* get the last values from the jitterbuffer */
4212   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
4213       &clock_rate, &last_rtptime);
4214
4215   clock_base = priv->clock_base;
4216   ext_rtptime = priv->ext_rtptime;
4217
4218   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
4219       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
4220       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
4221       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
4222
4223   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
4224     /* we keep this SR packet for later. When we get a valid RTP packet the
4225      * above values will be set and we can try to use the SR packet */
4226     GST_DEBUG_OBJECT (jitterbuffer, "keeping for later, no RTP values");
4227     keep = TRUE;
4228   } else {
4229     /* we can't accept anything that happened before we did the last resync */
4230     if (base_rtptime > ext_rtptime) {
4231       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
4232       valid = FALSE;
4233     } else {
4234       /* the SR RTP timestamp must be something close to what we last observed
4235        * in the jitterbuffer */
4236       if (ext_rtptime > last_rtptime) {
4237         /* check how far ahead it is to our RTP timestamps */
4238         diff = ext_rtptime - last_rtptime;
4239         /* if bigger than 1 second, we drop it */
4240         if (jitterbuffer->priv->max_rtcp_rtp_time_diff != -1 &&
4241             diff >
4242             gst_util_uint64_scale (jitterbuffer->priv->max_rtcp_rtp_time_diff,
4243                 clock_rate, 1000)) {
4244           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
4245           /* should drop this, but some RTSP servers end up with bogus
4246            * way too ahead RTCP packet when repeated PAUSE/PLAY,
4247            * so still trigger rptbin sync but invalidate RTCP data
4248            * (sync might use other methods) */
4249           ext_rtptime = -1;
4250         }
4251         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
4252             G_GUINT64_FORMAT, last_rtptime, diff);
4253       }
4254     }
4255   }
4256
4257   if (keep) {
4258     GST_DEBUG_OBJECT (jitterbuffer, "keeping RTCP packet for later");
4259   } else if (valid) {
4260     GstStructure *s;
4261
4262     s = gst_structure_new ("application/x-rtp-sync",
4263         "base-rtptime", G_TYPE_UINT64, base_rtptime,
4264         "base-time", G_TYPE_UINT64, base_time,
4265         "clock-rate", G_TYPE_UINT, clock_rate,
4266         "clock-base", G_TYPE_UINT64, clock_base,
4267         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
4268         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
4269
4270     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
4271     gst_buffer_replace (&priv->last_sr, NULL);
4272     JBUF_UNLOCK (priv);
4273     g_signal_emit (jitterbuffer,
4274         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
4275     JBUF_LOCK (priv);
4276     gst_structure_free (s);
4277   } else {
4278     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
4279     gst_buffer_replace (&priv->last_sr, NULL);
4280   }
4281 }
4282
4283 static GstFlowReturn
4284 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
4285     GstBuffer * buffer)
4286 {
4287   GstRtpJitterBuffer *jitterbuffer;
4288   GstRtpJitterBufferPrivate *priv;
4289   GstFlowReturn ret = GST_FLOW_OK;
4290   guint32 ssrc;
4291   GstRTCPPacket packet;
4292   guint64 ext_rtptime;
4293   guint32 rtptime;
4294   GstRTCPBuffer rtcp = { NULL, };
4295
4296   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4297
4298   if (G_UNLIKELY (!gst_rtcp_buffer_validate_reduced (buffer)))
4299     goto invalid_buffer;
4300
4301   priv = jitterbuffer->priv;
4302
4303   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
4304
4305   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
4306     goto empty_buffer;
4307
4308   /* first packet must be SR or RR or else the validate would have failed */
4309   switch (gst_rtcp_packet_get_type (&packet)) {
4310     case GST_RTCP_TYPE_SR:
4311       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
4312           NULL, NULL);
4313       break;
4314     default:
4315       goto ignore_buffer;
4316   }
4317   gst_rtcp_buffer_unmap (&rtcp);
4318
4319   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
4320
4321   JBUF_LOCK (priv);
4322   /* convert the RTP timestamp to our extended timestamp, using the same offset
4323    * we used in the jitterbuffer */
4324   ext_rtptime = priv->jbuf->ext_rtptime;
4325   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
4326
4327   priv->ext_rtptime = ext_rtptime;
4328   gst_buffer_replace (&priv->last_sr, buffer);
4329
4330   do_handle_sync (jitterbuffer);
4331   JBUF_UNLOCK (priv);
4332
4333 done:
4334   gst_buffer_unref (buffer);
4335
4336   return ret;
4337
4338 invalid_buffer:
4339   {
4340     /* this is not fatal but should be filtered earlier */
4341     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
4342         ("Received invalid RTCP payload, dropping"));
4343     ret = GST_FLOW_OK;
4344     goto done;
4345   }
4346 empty_buffer:
4347   {
4348     /* this is not fatal but should be filtered earlier */
4349     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
4350         ("Received empty RTCP payload, dropping"));
4351     gst_rtcp_buffer_unmap (&rtcp);
4352     ret = GST_FLOW_OK;
4353     goto done;
4354   }
4355 ignore_buffer:
4356   {
4357     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
4358     gst_rtcp_buffer_unmap (&rtcp);
4359     ret = GST_FLOW_OK;
4360     goto done;
4361   }
4362 }
4363
4364 static gboolean
4365 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
4366     GstQuery * query)
4367 {
4368   gboolean res = FALSE;
4369   GstRtpJitterBuffer *jitterbuffer;
4370   GstRtpJitterBufferPrivate *priv;
4371
4372   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4373   priv = jitterbuffer->priv;
4374
4375   switch (GST_QUERY_TYPE (query)) {
4376     case GST_QUERY_CAPS:
4377     {
4378       GstCaps *filter, *caps;
4379
4380       gst_query_parse_caps (query, &filter);
4381       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
4382       gst_query_set_caps_result (query, caps);
4383       gst_caps_unref (caps);
4384       res = TRUE;
4385       break;
4386     }
4387     default:
4388       if (GST_QUERY_IS_SERIALIZED (query)) {
4389         RTPJitterBufferItem *item;
4390         gboolean head;
4391
4392         JBUF_LOCK_CHECK (priv, out_flushing);
4393         if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
4394             RTP_JITTER_BUFFER_MODE_BUFFER) {
4395           GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
4396           item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
4397           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
4398           if (head)
4399             JBUF_SIGNAL_EVENT (priv);
4400           JBUF_WAIT_QUERY (priv, out_flushing);
4401           res = priv->last_query;
4402         } else {
4403           GST_DEBUG_OBJECT (jitterbuffer, "refusing query, we are buffering");
4404           res = FALSE;
4405         }
4406         JBUF_UNLOCK (priv);
4407       } else {
4408         res = gst_pad_query_default (pad, parent, query);
4409       }
4410       break;
4411   }
4412   return res;
4413   /* ERRORS */
4414 out_flushing:
4415   {
4416     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
4417     JBUF_UNLOCK (priv);
4418     return FALSE;
4419   }
4420
4421 }
4422
4423 static gboolean
4424 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
4425     GstQuery * query)
4426 {
4427   GstRtpJitterBuffer *jitterbuffer;
4428   GstRtpJitterBufferPrivate *priv;
4429   gboolean res = FALSE;
4430
4431   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4432   priv = jitterbuffer->priv;
4433
4434   switch (GST_QUERY_TYPE (query)) {
4435     case GST_QUERY_LATENCY:
4436     {
4437       /* We need to send the query upstream and add the returned latency to our
4438        * own */
4439       GstClockTime min_latency, max_latency;
4440       gboolean us_live;
4441       GstClockTime our_latency;
4442
4443       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
4444         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
4445
4446         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
4447             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
4448             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
4449
4450         /* store this so that we can safely sync on the peer buffers. */
4451         JBUF_LOCK (priv);
4452         priv->peer_latency = min_latency;
4453         our_latency = priv->latency_ns;
4454         JBUF_UNLOCK (priv);
4455
4456         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
4457             GST_TIME_ARGS (our_latency));
4458
4459         /* we add some latency but can buffer an infinite amount of time */
4460         min_latency += our_latency;
4461         max_latency = -1;
4462
4463         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
4464             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
4465             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
4466
4467         gst_query_set_latency (query, TRUE, min_latency, max_latency);
4468       }
4469       break;
4470     }
4471     case GST_QUERY_POSITION:
4472     {
4473       GstClockTime start, last_out;
4474       GstFormat fmt;
4475
4476       gst_query_parse_position (query, &fmt, NULL);
4477       if (fmt != GST_FORMAT_TIME) {
4478         res = gst_pad_query_default (pad, parent, query);
4479         break;
4480       }
4481
4482       JBUF_LOCK (priv);
4483       start = priv->npt_start;
4484       last_out = priv->last_out_time;
4485       JBUF_UNLOCK (priv);
4486
4487       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
4488           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
4489           GST_TIME_ARGS (last_out));
4490
4491       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
4492         /* bring 0-based outgoing time to stream time */
4493         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
4494         res = TRUE;
4495       } else {
4496         res = gst_pad_query_default (pad, parent, query);
4497       }
4498       break;
4499     }
4500     case GST_QUERY_CAPS:
4501     {
4502       GstCaps *filter, *caps;
4503
4504       gst_query_parse_caps (query, &filter);
4505       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
4506       gst_query_set_caps_result (query, caps);
4507       gst_caps_unref (caps);
4508       res = TRUE;
4509       break;
4510     }
4511     default:
4512       res = gst_pad_query_default (pad, parent, query);
4513       break;
4514   }
4515
4516   return res;
4517 }
4518
4519 static void
4520 gst_rtp_jitter_buffer_set_property (GObject * object,
4521     guint prop_id, const GValue * value, GParamSpec * pspec)
4522 {
4523   GstRtpJitterBuffer *jitterbuffer;
4524   GstRtpJitterBufferPrivate *priv;
4525
4526   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
4527   priv = jitterbuffer->priv;
4528
4529   switch (prop_id) {
4530     case PROP_LATENCY:
4531     {
4532       guint new_latency, old_latency;
4533
4534       new_latency = g_value_get_uint (value);
4535
4536       JBUF_LOCK (priv);
4537       old_latency = priv->latency_ms;
4538       priv->latency_ms = new_latency;
4539       priv->latency_ns = priv->latency_ms * GST_MSECOND;
4540       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
4541       JBUF_UNLOCK (priv);
4542
4543       /* post message if latency changed, this will inform the parent pipeline
4544        * that a latency reconfiguration is possible/needed. */
4545       if (new_latency != old_latency) {
4546         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
4547             GST_TIME_ARGS (new_latency * GST_MSECOND));
4548
4549         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
4550             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
4551       }
4552       break;
4553     }
4554     case PROP_DROP_ON_LATENCY:
4555       JBUF_LOCK (priv);
4556       priv->drop_on_latency = g_value_get_boolean (value);
4557       JBUF_UNLOCK (priv);
4558       break;
4559     case PROP_TS_OFFSET:
4560       JBUF_LOCK (priv);
4561       if (priv->max_ts_offset_adjustment != 0) {
4562         gint64 new_offset = g_value_get_int64 (value);
4563
4564         if (new_offset > priv->ts_offset) {
4565           priv->ts_offset_remainder = new_offset - priv->ts_offset;
4566         } else {
4567           priv->ts_offset_remainder = -(priv->ts_offset - new_offset);
4568         }
4569       } else {
4570         priv->ts_offset = g_value_get_int64 (value);
4571         priv->ts_offset_remainder = 0;
4572       }
4573       priv->ts_discont = TRUE;
4574       JBUF_UNLOCK (priv);
4575       break;
4576     case PROP_MAX_TS_OFFSET_ADJUSTMENT:
4577       JBUF_LOCK (priv);
4578       priv->max_ts_offset_adjustment = g_value_get_uint64 (value);
4579       JBUF_UNLOCK (priv);
4580       break;
4581     case PROP_DO_LOST:
4582       JBUF_LOCK (priv);
4583       priv->do_lost = g_value_get_boolean (value);
4584       JBUF_UNLOCK (priv);
4585       break;
4586     case PROP_MODE:
4587       JBUF_LOCK (priv);
4588       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
4589       JBUF_UNLOCK (priv);
4590       break;
4591     case PROP_DO_RETRANSMISSION:
4592       JBUF_LOCK (priv);
4593       priv->do_retransmission = g_value_get_boolean (value);
4594       JBUF_UNLOCK (priv);
4595       break;
4596     case PROP_RTX_NEXT_SEQNUM:
4597       JBUF_LOCK (priv);
4598       priv->rtx_next_seqnum = g_value_get_boolean (value);
4599       JBUF_UNLOCK (priv);
4600       break;
4601     case PROP_RTX_DELAY:
4602       JBUF_LOCK (priv);
4603       priv->rtx_delay = g_value_get_int (value);
4604       JBUF_UNLOCK (priv);
4605       break;
4606     case PROP_RTX_MIN_DELAY:
4607       JBUF_LOCK (priv);
4608       priv->rtx_min_delay = g_value_get_uint (value);
4609       JBUF_UNLOCK (priv);
4610       break;
4611     case PROP_RTX_DELAY_REORDER:
4612       JBUF_LOCK (priv);
4613       priv->rtx_delay_reorder = g_value_get_int (value);
4614       JBUF_UNLOCK (priv);
4615       break;
4616     case PROP_RTX_RETRY_TIMEOUT:
4617       JBUF_LOCK (priv);
4618       priv->rtx_retry_timeout = g_value_get_int (value);
4619       JBUF_UNLOCK (priv);
4620       break;
4621     case PROP_RTX_MIN_RETRY_TIMEOUT:
4622       JBUF_LOCK (priv);
4623       priv->rtx_min_retry_timeout = g_value_get_int (value);
4624       JBUF_UNLOCK (priv);
4625       break;
4626     case PROP_RTX_RETRY_PERIOD:
4627       JBUF_LOCK (priv);
4628       priv->rtx_retry_period = g_value_get_int (value);
4629       JBUF_UNLOCK (priv);
4630       break;
4631     case PROP_RTX_MAX_RETRIES:
4632       JBUF_LOCK (priv);
4633       priv->rtx_max_retries = g_value_get_int (value);
4634       JBUF_UNLOCK (priv);
4635       break;
4636     case PROP_RTX_DEADLINE:
4637       JBUF_LOCK (priv);
4638       priv->rtx_deadline_ms = g_value_get_int (value);
4639       JBUF_UNLOCK (priv);
4640       break;
4641     case PROP_RTX_STATS_TIMEOUT:
4642       JBUF_LOCK (priv);
4643       priv->rtx_stats_timeout = g_value_get_uint (value);
4644       JBUF_UNLOCK (priv);
4645       break;
4646     case PROP_MAX_RTCP_RTP_TIME_DIFF:
4647       JBUF_LOCK (priv);
4648       priv->max_rtcp_rtp_time_diff = g_value_get_int (value);
4649       JBUF_UNLOCK (priv);
4650       break;
4651     case PROP_MAX_DROPOUT_TIME:
4652       JBUF_LOCK (priv);
4653       priv->max_dropout_time = g_value_get_uint (value);
4654       JBUF_UNLOCK (priv);
4655       break;
4656     case PROP_MAX_MISORDER_TIME:
4657       JBUF_LOCK (priv);
4658       priv->max_misorder_time = g_value_get_uint (value);
4659       JBUF_UNLOCK (priv);
4660       break;
4661     case PROP_RFC7273_SYNC:
4662       JBUF_LOCK (priv);
4663       rtp_jitter_buffer_set_rfc7273_sync (priv->jbuf,
4664           g_value_get_boolean (value));
4665       JBUF_UNLOCK (priv);
4666       break;
4667     case PROP_FASTSTART_MIN_PACKETS:
4668       JBUF_LOCK (priv);
4669       priv->faststart_min_packets = g_value_get_uint (value);
4670       JBUF_UNLOCK (priv);
4671       break;
4672     default:
4673       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4674       break;
4675   }
4676 }
4677
4678 static void
4679 gst_rtp_jitter_buffer_get_property (GObject * object,
4680     guint prop_id, GValue * value, GParamSpec * pspec)
4681 {
4682   GstRtpJitterBuffer *jitterbuffer;
4683   GstRtpJitterBufferPrivate *priv;
4684
4685   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
4686   priv = jitterbuffer->priv;
4687
4688   switch (prop_id) {
4689     case PROP_LATENCY:
4690       JBUF_LOCK (priv);
4691       g_value_set_uint (value, priv->latency_ms);
4692       JBUF_UNLOCK (priv);
4693       break;
4694     case PROP_DROP_ON_LATENCY:
4695       JBUF_LOCK (priv);
4696       g_value_set_boolean (value, priv->drop_on_latency);
4697       JBUF_UNLOCK (priv);
4698       break;
4699     case PROP_TS_OFFSET:
4700       JBUF_LOCK (priv);
4701       g_value_set_int64 (value, priv->ts_offset);
4702       JBUF_UNLOCK (priv);
4703       break;
4704     case PROP_MAX_TS_OFFSET_ADJUSTMENT:
4705       JBUF_LOCK (priv);
4706       g_value_set_uint64 (value, priv->max_ts_offset_adjustment);
4707       JBUF_UNLOCK (priv);
4708       break;
4709     case PROP_DO_LOST:
4710       JBUF_LOCK (priv);
4711       g_value_set_boolean (value, priv->do_lost);
4712       JBUF_UNLOCK (priv);
4713       break;
4714     case PROP_MODE:
4715       JBUF_LOCK (priv);
4716       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
4717       JBUF_UNLOCK (priv);
4718       break;
4719     case PROP_PERCENT:
4720     {
4721       gint percent;
4722
4723       JBUF_LOCK (priv);
4724       if (priv->srcresult != GST_FLOW_OK)
4725         percent = 100;
4726       else
4727         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
4728
4729       g_value_set_int (value, percent);
4730       JBUF_UNLOCK (priv);
4731       break;
4732     }
4733     case PROP_DO_RETRANSMISSION:
4734       JBUF_LOCK (priv);
4735       g_value_set_boolean (value, priv->do_retransmission);
4736       JBUF_UNLOCK (priv);
4737       break;
4738     case PROP_RTX_NEXT_SEQNUM:
4739       JBUF_LOCK (priv);
4740       g_value_set_boolean (value, priv->rtx_next_seqnum);
4741       JBUF_UNLOCK (priv);
4742       break;
4743     case PROP_RTX_DELAY:
4744       JBUF_LOCK (priv);
4745       g_value_set_int (value, priv->rtx_delay);
4746       JBUF_UNLOCK (priv);
4747       break;
4748     case PROP_RTX_MIN_DELAY:
4749       JBUF_LOCK (priv);
4750       g_value_set_uint (value, priv->rtx_min_delay);
4751       JBUF_UNLOCK (priv);
4752       break;
4753     case PROP_RTX_DELAY_REORDER:
4754       JBUF_LOCK (priv);
4755       g_value_set_int (value, priv->rtx_delay_reorder);
4756       JBUF_UNLOCK (priv);
4757       break;
4758     case PROP_RTX_RETRY_TIMEOUT:
4759       JBUF_LOCK (priv);
4760       g_value_set_int (value, priv->rtx_retry_timeout);
4761       JBUF_UNLOCK (priv);
4762       break;
4763     case PROP_RTX_MIN_RETRY_TIMEOUT:
4764       JBUF_LOCK (priv);
4765       g_value_set_int (value, priv->rtx_min_retry_timeout);
4766       JBUF_UNLOCK (priv);
4767       break;
4768     case PROP_RTX_RETRY_PERIOD:
4769       JBUF_LOCK (priv);
4770       g_value_set_int (value, priv->rtx_retry_period);
4771       JBUF_UNLOCK (priv);
4772       break;
4773     case PROP_RTX_MAX_RETRIES:
4774       JBUF_LOCK (priv);
4775       g_value_set_int (value, priv->rtx_max_retries);
4776       JBUF_UNLOCK (priv);
4777       break;
4778     case PROP_RTX_DEADLINE:
4779       JBUF_LOCK (priv);
4780       g_value_set_int (value, priv->rtx_deadline_ms);
4781       JBUF_UNLOCK (priv);
4782       break;
4783     case PROP_RTX_STATS_TIMEOUT:
4784       JBUF_LOCK (priv);
4785       g_value_set_uint (value, priv->rtx_stats_timeout);
4786       JBUF_UNLOCK (priv);
4787       break;
4788     case PROP_STATS:
4789       g_value_take_boxed (value,
4790           gst_rtp_jitter_buffer_create_stats (jitterbuffer));
4791       break;
4792     case PROP_MAX_RTCP_RTP_TIME_DIFF:
4793       JBUF_LOCK (priv);
4794       g_value_set_int (value, priv->max_rtcp_rtp_time_diff);
4795       JBUF_UNLOCK (priv);
4796       break;
4797     case PROP_MAX_DROPOUT_TIME:
4798       JBUF_LOCK (priv);
4799       g_value_set_uint (value, priv->max_dropout_time);
4800       JBUF_UNLOCK (priv);
4801       break;
4802     case PROP_MAX_MISORDER_TIME:
4803       JBUF_LOCK (priv);
4804       g_value_set_uint (value, priv->max_misorder_time);
4805       JBUF_UNLOCK (priv);
4806       break;
4807     case PROP_RFC7273_SYNC:
4808       JBUF_LOCK (priv);
4809       g_value_set_boolean (value,
4810           rtp_jitter_buffer_get_rfc7273_sync (priv->jbuf));
4811       JBUF_UNLOCK (priv);
4812       break;
4813     case PROP_FASTSTART_MIN_PACKETS:
4814       JBUF_LOCK (priv);
4815       g_value_set_uint (value, priv->faststart_min_packets);
4816       JBUF_UNLOCK (priv);
4817       break;
4818     default:
4819       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4820       break;
4821   }
4822 }
4823
4824 static GstStructure *
4825 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
4826 {
4827   GstRtpJitterBufferPrivate *priv = jbuf->priv;
4828   GstStructure *s;
4829
4830   JBUF_LOCK (priv);
4831   s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
4832       "num-pushed", G_TYPE_UINT64, priv->num_pushed,
4833       "num-lost", G_TYPE_UINT64, priv->num_lost,
4834       "num-late", G_TYPE_UINT64, priv->num_late,
4835       "num-duplicates", G_TYPE_UINT64, priv->num_duplicates,
4836       "avg-jitter", G_TYPE_UINT64, priv->avg_jitter,
4837       "rtx-count", G_TYPE_UINT64, priv->num_rtx_requests,
4838       "rtx-success-count", G_TYPE_UINT64, priv->num_rtx_success,
4839       "rtx-per-packet", G_TYPE_DOUBLE, priv->avg_rtx_num,
4840       "rtx-rtt", G_TYPE_UINT64, priv->avg_rtx_rtt, NULL);
4841   JBUF_UNLOCK (priv);
4842
4843   return s;
4844 }