rtpjitterbuffer: Limit size to 2^15 packets
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *  Copyright 2015 Kurento (http://kurento.org/)
9  *   @author: Miguel ParĂ­s <mparisdiaz@gmail.com>
10  *  Copyright 2016 Pexip AS
11  *   @author: Havard Graff <havard@pexip.com>
12  *   @author: Stian Selnes <stian@pexip.com>
13  *
14  * This library is free software; you can redistribute it and/or
15  * modify it under the terms of the GNU Library General Public
16  * License as published by the Free Software Foundation; either
17  * version 2 of the License, or (at your option) any later version.
18  *
19  * This library is distributed in the hope that it will be useful,
20  * but WITHOUT ANY WARRANTY; without even the implied warranty of
21  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22  * Library General Public License for more details.
23  *
24  * You should have received a copy of the GNU Library General Public
25  * License along with this library; if not, write to the
26  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
27  * Boston, MA 02110-1301, USA.
28  *
29  */
30
31 /**
32  * SECTION:element-rtpjitterbuffer
33  *
34  * This element reorders and removes duplicate RTP packets as they are received
35  * from a network source.
36  *
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  *
42  * The rtpjitterbuffer will wait for missing packets up to a configurable time
43  * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
44  * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
45  * property is set, lost packets will result in a custom serialized downstream
46  * event of name GstRTPPacketLost. The lost packet events are usually used by a
47  * depayloader or other element to create concealment data or some other logic
48  * to gracefully handle the missing packets.
49  *
50  * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
51  * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
52  * buffer.
53  *
54  * The jitterbuffer can also be configured to send early retransmission events
55  * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
56  * this mode, the jitterbuffer tries to estimate when a packet should arrive and
57  * sends a custom upstream event named GstRTPRetransmissionRequest when the
58  * packet is considered late. The initial expected packet arrival time is
59  * calculated as follows:
60  *
61  * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
62  *     T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
63  *     calculated from the DTS (or PTS is no DTS) of two consecutive RTP
64  *     packets with different rtptime.
65  *
66  * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
67  *     seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
68  *     previously scheduled timeout is overwritten.
69  *
70  * - If seqnum N arrived, all seqnum older than
71  *     N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
72  *     immediately. This is to request fast feedback for abonormally reorder
73  *     packets before any of the previous timeouts is triggered.
74  *
75  * A late packet triggers the GstRTPRetransmissionRequest custom upstream
76  * event. After the initial timeout expires and the retransmission event is
77  * sent, the timeout is scheduled for
78  * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
79  * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
80  * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
81  * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
82  * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
83  * retransmission requests are sent and the regular logic is performed to
84  * schedule a lost packet as discussed above.
85  *
86  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
87  * to the pipeline.
88  *
89  * This element will automatically be used inside rtpbin.
90  *
91  * <refsect2>
92  * <title>Example pipelines</title>
93  * |[
94  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
95  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
96  * inserted into the pipeline to smooth out network jitter and to reorder the
97  * out-of-order RTP packets.
98  * </refsect2>
99  */
100
101 #ifdef HAVE_CONFIG_H
102 #include "config.h"
103 #endif
104
105 #include <stdlib.h>
106 #include <stdio.h>
107 #include <string.h>
108 #include <gst/rtp/gstrtpbuffer.h>
109 #include <gst/net/net.h>
110
111 #include "gstrtpjitterbuffer.h"
112 #include "rtpjitterbuffer.h"
113 #include "rtpstats.h"
114
115 #include <gst/glib-compat-private.h>
116
117 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
118 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
119
120 /* RTPJitterBuffer signals and args */
121 enum
122 {
123   SIGNAL_REQUEST_PT_MAP,
124   SIGNAL_CLEAR_PT_MAP,
125   SIGNAL_HANDLE_SYNC,
126   SIGNAL_ON_NPT_STOP,
127   SIGNAL_SET_ACTIVE,
128   LAST_SIGNAL
129 };
130
131 #define DEFAULT_LATENCY_MS          200
132 #define DEFAULT_DROP_ON_LATENCY     FALSE
133 #define DEFAULT_TS_OFFSET           0
134 #define DEFAULT_MAX_TS_OFFSET_ADJUSTMENT 0
135 #define DEFAULT_DO_LOST             FALSE
136 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
137 #define DEFAULT_PERCENT             0
138 #define DEFAULT_DO_RETRANSMISSION   FALSE
139 #define DEFAULT_RTX_NEXT_SEQNUM     TRUE
140 #define DEFAULT_RTX_DELAY           -1
141 #define DEFAULT_RTX_MIN_DELAY       0
142 #define DEFAULT_RTX_DELAY_REORDER   3
143 #define DEFAULT_RTX_RETRY_TIMEOUT   -1
144 #define DEFAULT_RTX_MIN_RETRY_TIMEOUT   -1
145 #define DEFAULT_RTX_RETRY_PERIOD    -1
146 #define DEFAULT_RTX_MAX_RETRIES    -1
147 #define DEFAULT_RTX_DEADLINE       -1
148 #define DEFAULT_RTX_STATS_TIMEOUT   1000
149 #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
150 #define DEFAULT_MAX_DROPOUT_TIME    60000
151 #define DEFAULT_MAX_MISORDER_TIME   2000
152 #define DEFAULT_RFC7273_SYNC        FALSE
153 #define DEFAULT_FASTSTART_MIN_PACKETS 0
154
155 #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
156 #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
157
158 enum
159 {
160   PROP_0,
161   PROP_LATENCY,
162   PROP_DROP_ON_LATENCY,
163   PROP_TS_OFFSET,
164   PROP_MAX_TS_OFFSET_ADJUSTMENT,
165   PROP_DO_LOST,
166   PROP_MODE,
167   PROP_PERCENT,
168   PROP_DO_RETRANSMISSION,
169   PROP_RTX_NEXT_SEQNUM,
170   PROP_RTX_DELAY,
171   PROP_RTX_MIN_DELAY,
172   PROP_RTX_DELAY_REORDER,
173   PROP_RTX_RETRY_TIMEOUT,
174   PROP_RTX_MIN_RETRY_TIMEOUT,
175   PROP_RTX_RETRY_PERIOD,
176   PROP_RTX_MAX_RETRIES,
177   PROP_RTX_DEADLINE,
178   PROP_RTX_STATS_TIMEOUT,
179   PROP_STATS,
180   PROP_MAX_RTCP_RTP_TIME_DIFF,
181   PROP_MAX_DROPOUT_TIME,
182   PROP_MAX_MISORDER_TIME,
183   PROP_RFC7273_SYNC,
184   PROP_FASTSTART_MIN_PACKETS
185 };
186
187 #define JBUF_LOCK(priv)   G_STMT_START {                        \
188     GST_TRACE("Locking from thread %p", g_thread_self());       \
189     (g_mutex_lock (&(priv)->jbuf_lock));                        \
190     GST_TRACE("Locked from thread %p", g_thread_self());        \
191   } G_STMT_END
192
193 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
194   JBUF_LOCK (priv);                                   \
195   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
196     goto label;                                       \
197 } G_STMT_END
198 #define JBUF_UNLOCK(priv) G_STMT_START {                        \
199     GST_TRACE ("Unlocking from thread %p", g_thread_self ());   \
200     (g_mutex_unlock (&(priv)->jbuf_lock));                      \
201 } G_STMT_END
202
203 #define JBUF_WAIT_QUEUE(priv)   G_STMT_START {            \
204   GST_DEBUG ("waiting queue");                            \
205   (priv)->waiting_queue++;                                \
206   g_cond_wait (&(priv)->jbuf_queue, &(priv)->jbuf_lock);  \
207   (priv)->waiting_queue--;                                \
208   GST_DEBUG ("waiting queue done");                       \
209 } G_STMT_END
210 #define JBUF_SIGNAL_QUEUE(priv) G_STMT_START {            \
211   if (G_UNLIKELY ((priv)->waiting_queue)) {               \
212     GST_DEBUG ("signal queue, %d waiters", (priv)->waiting_queue); \
213     g_cond_signal (&(priv)->jbuf_queue);                  \
214   }                                                       \
215 } G_STMT_END
216
217 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
218   GST_DEBUG ("waiting timer");                            \
219   (priv)->waiting_timer++;                                \
220   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
221   (priv)->waiting_timer--;                                \
222   GST_DEBUG ("waiting timer done");                       \
223 } G_STMT_END
224 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {            \
225   if (G_UNLIKELY ((priv)->waiting_timer)) {               \
226     GST_DEBUG ("signal timer, %d waiters", (priv)->waiting_timer); \
227     g_cond_signal (&(priv)->jbuf_timer);                  \
228   }                                                       \
229 } G_STMT_END
230
231 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
232   GST_DEBUG ("waiting event");                           \
233   (priv)->waiting_event = TRUE;                          \
234   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
235   (priv)->waiting_event = FALSE;                         \
236   GST_DEBUG ("waiting event done");                      \
237   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
238     goto label;                                          \
239 } G_STMT_END
240 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {           \
241   if (G_UNLIKELY ((priv)->waiting_event)) {              \
242     GST_DEBUG ("signal event");                          \
243     g_cond_signal (&(priv)->jbuf_event);                 \
244   }                                                      \
245 } G_STMT_END
246
247 #define JBUF_WAIT_QUERY(priv,label) G_STMT_START {       \
248   GST_DEBUG ("waiting query");                           \
249   (priv)->waiting_query = TRUE;                          \
250   g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \
251   (priv)->waiting_query = FALSE;                         \
252   GST_DEBUG ("waiting query done");                      \
253   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
254     goto label;                                          \
255 } G_STMT_END
256 #define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START {       \
257   (priv)->last_query = res;                              \
258   if (G_UNLIKELY ((priv)->waiting_query)) {              \
259     GST_DEBUG ("signal query");                          \
260     g_cond_signal (&(priv)->jbuf_query);                 \
261   }                                                      \
262 } G_STMT_END
263
264 #define GST_BUFFER_IS_RETRANSMISSION(buffer) \
265   GST_BUFFER_FLAG_IS_SET (buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION)
266
267 typedef struct TimerQueue
268 {
269   GQueue *timers;
270   GHashTable *hashtable;
271 } TimerQueue;
272
273 struct _GstRtpJitterBufferPrivate
274 {
275   GstPad *sinkpad, *srcpad;
276   GstPad *rtcpsinkpad;
277
278   RTPJitterBuffer *jbuf;
279   GMutex jbuf_lock;
280   gboolean waiting_queue;
281   GCond jbuf_queue;
282   gboolean waiting_timer;
283   GCond jbuf_timer;
284   gboolean waiting_event;
285   GCond jbuf_event;
286   gboolean waiting_query;
287   GCond jbuf_query;
288   gboolean last_query;
289   gboolean discont;
290   gboolean ts_discont;
291   gboolean active;
292   guint64 out_offset;
293   guint32 segment_seqnum;
294
295   gboolean timer_running;
296   GThread *timer_thread;
297
298   /* properties */
299   guint latency_ms;
300   guint64 latency_ns;
301   gboolean drop_on_latency;
302   gint64 ts_offset;
303   guint64 max_ts_offset_adjustment;
304   gboolean do_lost;
305   gboolean do_retransmission;
306   gboolean rtx_next_seqnum;
307   gint rtx_delay;
308   guint rtx_min_delay;
309   gint rtx_delay_reorder;
310   gint rtx_retry_timeout;
311   gint rtx_min_retry_timeout;
312   gint rtx_retry_period;
313   gint rtx_max_retries;
314   guint rtx_stats_timeout;
315   gint rtx_deadline_ms;
316   gint max_rtcp_rtp_time_diff;
317   guint32 max_dropout_time;
318   guint32 max_misorder_time;
319   guint faststart_min_packets;
320
321   /* the last seqnum we pushed out */
322   guint32 last_popped_seqnum;
323   /* the next expected seqnum we push */
324   guint32 next_seqnum;
325   /* seqnum-base, if known */
326   guint32 seqnum_base;
327   /* last output time */
328   GstClockTime last_out_time;
329   /* last valid input timestamp and rtptime pair */
330   GstClockTime ips_pts;
331   guint64 ips_rtptime;
332   GstClockTime packet_spacing;
333   gint equidistant;
334
335   GQueue gap_packets;
336
337   /* the next expected seqnum we receive */
338   GstClockTime last_in_pts;
339   guint32 next_in_seqnum;
340
341   GArray *timers;
342   TimerQueue *rtx_stats_timers;
343
344   /* start and stop ranges */
345   GstClockTime npt_start;
346   GstClockTime npt_stop;
347   guint64 ext_timestamp;
348   guint64 last_elapsed;
349   guint64 estimated_eos;
350   GstClockID eos_id;
351
352   /* state */
353   gboolean eos;
354   guint last_percent;
355
356   /* clock rate and rtp timestamp offset */
357   gint last_pt;
358   gint32 clock_rate;
359   gint64 clock_base;
360   gint64 ts_offset_remainder;
361
362   /* when we are shutting down */
363   GstFlowReturn srcresult;
364   gboolean blocked;
365
366   /* for sync */
367   GstSegment segment;
368   GstClockID clock_id;
369   GstClockTime timer_timeout;
370   guint16 timer_seqnum;
371   /* the latency of the upstream peer, we have to take this into account when
372    * synchronizing the buffers. */
373   GstClockTime peer_latency;
374   guint64 ext_rtptime;
375   GstBuffer *last_sr;
376
377   /* some accounting */
378   guint64 num_pushed;
379   guint64 num_lost;
380   guint64 num_late;
381   guint64 num_duplicates;
382   guint64 num_rtx_requests;
383   guint64 num_rtx_success;
384   guint64 num_rtx_failed;
385   gdouble avg_rtx_num;
386   guint64 avg_rtx_rtt;
387   RTPPacketRateCtx packet_rate_ctx;
388
389   /* for the jitter */
390   GstClockTime last_dts;
391   GstClockTime last_pts;
392   guint64 last_rtptime;
393   GstClockTime avg_jitter;
394 };
395
396 typedef enum
397 {
398   TIMER_TYPE_EXPECTED,
399   TIMER_TYPE_LOST,
400   TIMER_TYPE_DEADLINE,
401   TIMER_TYPE_EOS
402 } TimerType;
403
404 typedef struct
405 {
406   guint idx;
407   guint16 seqnum;
408   guint num;
409   TimerType type;
410   GstClockTime timeout;
411   GstClockTime duration;
412   GstClockTime rtx_base;
413   GstClockTime rtx_delay;
414   GstClockTime rtx_retry;
415   GstClockTime rtx_last;
416   guint num_rtx_retry;
417   guint num_rtx_received;
418 } TimerData;
419
420 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
421 GST_STATIC_PAD_TEMPLATE ("sink",
422     GST_PAD_SINK,
423     GST_PAD_ALWAYS,
424     GST_STATIC_CAPS ("application/x-rtp"
425         /* "clock-rate = (int) [ 1, 2147483647 ], "
426          * "payload = (int) , "
427          * "encoding-name = (string) "
428          */ )
429     );
430
431 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
432 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
433     GST_PAD_SINK,
434     GST_PAD_REQUEST,
435     GST_STATIC_CAPS ("application/x-rtcp")
436     );
437
438 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
439 GST_STATIC_PAD_TEMPLATE ("src",
440     GST_PAD_SRC,
441     GST_PAD_ALWAYS,
442     GST_STATIC_CAPS ("application/x-rtp"
443         /* "payload = (int) , "
444          * "clock-rate = (int) , "
445          * "encoding-name = (string) "
446          */ )
447     );
448
449 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
450
451 #define gst_rtp_jitter_buffer_parent_class parent_class
452 G_DEFINE_TYPE_WITH_PRIVATE (GstRtpJitterBuffer, gst_rtp_jitter_buffer,
453     GST_TYPE_ELEMENT);
454
455 /* object overrides */
456 static void gst_rtp_jitter_buffer_set_property (GObject * object,
457     guint prop_id, const GValue * value, GParamSpec * pspec);
458 static void gst_rtp_jitter_buffer_get_property (GObject * object,
459     guint prop_id, GValue * value, GParamSpec * pspec);
460 static void gst_rtp_jitter_buffer_finalize (GObject * object);
461
462 /* element overrides */
463 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
464     * element, GstStateChange transition);
465 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
466     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
467 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
468     GstPad * pad);
469 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
470 static gboolean gst_rtp_jitter_buffer_set_clock (GstElement * element,
471     GstClock * clock);
472
473 /* pad overrides */
474 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
475 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
476     GstObject * parent);
477
478 /* sinkpad overrides */
479 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
480     GstObject * parent, GstEvent * event);
481 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
482     GstObject * parent, GstBuffer * buffer);
483 static GstFlowReturn gst_rtp_jitter_buffer_chain_list (GstPad * pad,
484     GstObject * parent, GstBufferList * buffer_list);
485
486 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
487     GstObject * parent, GstEvent * event);
488 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
489     GstObject * parent, GstBuffer * buffer);
490
491 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
492     GstObject * parent, GstQuery * query);
493
494 /* srcpad overrides */
495 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
496     GstObject * parent, GstEvent * event);
497 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
498     GstObject * parent, GstPadMode mode, gboolean active);
499 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
500 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
501     GstObject * parent, GstQuery * query);
502
503 static void
504 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
505 static GstClockTime
506 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
507     gboolean active, guint64 base_time);
508 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
509
510 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
511 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
512
513 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
514
515 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
516     jitterbuffer);
517
518 static void update_rtx_stats (GstRtpJitterBuffer * jitterbuffer,
519     TimerData * timer, GstClockTime dts, gboolean success);
520
521 static TimerQueue *timer_queue_new (void);
522 static void timer_queue_free (TimerQueue * queue);
523
524 static void
525 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
526 {
527   GObjectClass *gobject_class;
528   GstElementClass *gstelement_class;
529
530   gobject_class = (GObjectClass *) klass;
531   gstelement_class = (GstElementClass *) klass;
532
533   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
534
535   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
536   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
537
538   /**
539    * GstRtpJitterBuffer:latency:
540    *
541    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
542    * for at most this time.
543    */
544   g_object_class_install_property (gobject_class, PROP_LATENCY,
545       g_param_spec_uint ("latency", "Buffer latency in ms",
546           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
547           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
548   /**
549    * GstRtpJitterBuffer:drop-on-latency:
550    *
551    * Drop oldest buffers when the queue is completely filled.
552    */
553   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
554       g_param_spec_boolean ("drop-on-latency",
555           "Drop buffers when maximum latency is reached",
556           "Tells the jitterbuffer to never exceed the given latency in size",
557           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
558   /**
559    * GstRtpJitterBuffer:ts-offset:
560    *
561    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
562    * This is mainly used to ensure interstream synchronisation.
563    */
564   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
565       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
566           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
567           G_MAXINT64, DEFAULT_TS_OFFSET,
568           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
569
570   /**
571    * GstRtpJitterBuffer:max-ts-offset-adjustment:
572    *
573    * The maximum number of nanoseconds per frame that time offset may be
574    * adjusted with. This is used to avoid sudden large changes to time stamps.
575    */
576   g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET_ADJUSTMENT,
577       g_param_spec_uint64 ("max-ts-offset-adjustment",
578           "Max Timestamp Offset Adjustment",
579           "The maximum number of nanoseconds per frame that time stamp "
580           "offsets may be adjusted (0 = no limit).", 0, G_MAXUINT64,
581           DEFAULT_MAX_TS_OFFSET_ADJUSTMENT, G_PARAM_READWRITE |
582           G_PARAM_STATIC_STRINGS));
583
584   /**
585    * GstRtpJitterBuffer:do-lost:
586    *
587    * Send out a GstRTPPacketLost event downstream when a packet is considered
588    * lost.
589    */
590   g_object_class_install_property (gobject_class, PROP_DO_LOST,
591       g_param_spec_boolean ("do-lost", "Do Lost",
592           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
593           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
594
595   /**
596    * GstRtpJitterBuffer:mode:
597    *
598    * Control the buffering and timestamping mode used by the jitterbuffer.
599    */
600   g_object_class_install_property (gobject_class, PROP_MODE,
601       g_param_spec_enum ("mode", "Mode",
602           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
603           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
604   /**
605    * GstRtpJitterBuffer:percent:
606    *
607    * The percent of the jitterbuffer that is filled.
608    */
609   g_object_class_install_property (gobject_class, PROP_PERCENT,
610       g_param_spec_int ("percent", "percent",
611           "The buffer filled percent", 0, 100,
612           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
613   /**
614    * GstRtpJitterBuffer:do-retransmission:
615    *
616    * Send out a GstRTPRetransmission event upstream when a packet is considered
617    * late and should be retransmitted.
618    *
619    * Since: 1.2
620    */
621   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
622       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
623           "Send retransmission events upstream when a packet is late",
624           DEFAULT_DO_RETRANSMISSION,
625           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
626
627   /**
628    * GstRtpJitterBuffer:rtx-next-seqnum
629    *
630    * Estimate when the next packet should arrive and schedule a retransmission
631    * request for it.
632    * This is, when packet N arrives, a GstRTPRetransmission event is schedule
633    * for packet N+1. So it will be requested if it does not arrive at the expected time.
634    * The expected time is calculated using the dts of N and the packet spacing.
635    *
636    * Since: 1.6
637    */
638   g_object_class_install_property (gobject_class, PROP_RTX_NEXT_SEQNUM,
639       g_param_spec_boolean ("rtx-next-seqnum", "RTX next seqnum",
640           "Estimate when the next packet should arrive and schedule a "
641           "retransmission request for it.",
642           DEFAULT_RTX_NEXT_SEQNUM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
643
644   /**
645    * GstRtpJitterBuffer:rtx-delay:
646    *
647    * When a packet did not arrive at the expected time, wait this extra amount
648    * of time before sending a retransmission event.
649    *
650    * When -1 is used, the max jitter will be used as extra delay.
651    *
652    * Since: 1.2
653    */
654   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
655       g_param_spec_int ("rtx-delay", "RTX Delay",
656           "Extra time in ms to wait before sending retransmission "
657           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
658           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
659
660   /**
661    * GstRtpJitterBuffer:rtx-min-delay:
662    *
663    * When a packet did not arrive at the expected time, wait at least this extra amount
664    * of time before sending a retransmission event.
665    *
666    * Since: 1.6
667    */
668   g_object_class_install_property (gobject_class, PROP_RTX_MIN_DELAY,
669       g_param_spec_uint ("rtx-min-delay", "Minimum RTX Delay",
670           "Minimum time in ms to wait before sending retransmission "
671           "event", 0, G_MAXUINT, DEFAULT_RTX_MIN_DELAY,
672           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
673   /**
674    * GstRtpJitterBuffer:rtx-delay-reorder:
675    *
676    * Assume that a retransmission event should be sent when we see
677    * this much packet reordering.
678    *
679    * When -1 is used, the value will be estimated based on observed packet
680    * reordering. When 0 is used packet reordering alone will not cause a
681    * retransmission event (Since 1.10).
682    *
683    * Since: 1.2
684    */
685   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
686       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
687           "Sending retransmission event when this much reordering "
688           "(0 disable)",
689           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
690           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
691   /**
692    * GstRtpJitterBuffer::rtx-retry-timeout:
693    *
694    * When no packet has been received after sending a retransmission event
695    * for this time, retry sending a retransmission event.
696    *
697    * When -1 is used, the value will be estimated based on observed round
698    * trip time.
699    *
700    * Since: 1.2
701    */
702   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
703       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
704           "Retry sending a transmission event after this timeout in "
705           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
706           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
707   /**
708    * GstRtpJitterBuffer::rtx-min-retry-timeout:
709    *
710    * The minimum amount of time between retry timeouts. When
711    * GstRtpJitterBuffer::rtx-retry-timeout is -1, this value ensures a
712    * minimum interval between retry timeouts.
713    *
714    * When -1 is used, the value will be estimated based on the
715    * packet spacing.
716    *
717    * Since: 1.6
718    */
719   g_object_class_install_property (gobject_class, PROP_RTX_MIN_RETRY_TIMEOUT,
720       g_param_spec_int ("rtx-min-retry-timeout", "RTX Min Retry Timeout",
721           "Minimum timeout between sending a transmission event in "
722           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_MIN_RETRY_TIMEOUT,
723           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
724   /**
725    * GstRtpJitterBuffer:rtx-retry-period:
726    *
727    * The amount of time to try to get a retransmission.
728    *
729    * When -1 is used, the value will be estimated based on the jitterbuffer
730    * latency and the observed round trip time.
731    *
732    * Since: 1.2
733    */
734   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
735       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
736           "Try to get a retransmission for this many ms "
737           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
738           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
739   /**
740    * GstRtpJitterBuffer:rtx-max-retries:
741    *
742    * The maximum number of retries to request a retransmission.
743    *
744    * This implies that as maximum (rtx-max-retries + 1) retransmissions will be requested.
745    * When -1 is used, the number of retransmission request will not be limited.
746    *
747    * Since: 1.6
748    */
749   g_object_class_install_property (gobject_class, PROP_RTX_MAX_RETRIES,
750       g_param_spec_int ("rtx-max-retries", "RTX Max Retries",
751           "The maximum number of retries to request a retransmission. "
752           "(-1 not limited)", -1, G_MAXINT, DEFAULT_RTX_MAX_RETRIES,
753           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
754   /**
755    * GstRtpJitterBuffer:rtx-deadline:
756    *
757    * The deadline for a valid RTX request in ms.
758    *
759    * How long the RTX RTCP will be valid for.
760    * When -1 is used, the size of the jitterbuffer will be used.
761    *
762    * Since: 1.10
763    */
764   g_object_class_install_property (gobject_class, PROP_RTX_DEADLINE,
765       g_param_spec_int ("rtx-deadline", "RTX Deadline (ms)",
766           "The deadline for a valid RTX request in milliseconds. "
767           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DEADLINE,
768           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
769 /**
770    * GstRtpJitterBuffer::rtx-stats-timeout:
771    *
772    * The time to wait for a retransmitted packet after it has been
773    * considered lost in order to collect RTX statistics.
774    *
775    * Since: 1.10
776    */
777   g_object_class_install_property (gobject_class, PROP_RTX_STATS_TIMEOUT,
778       g_param_spec_uint ("rtx-stats-timeout", "RTX Statistics Timeout",
779           "The time to wait for a retransmitted packet after it has been "
780           "considered lost in order to collect statistics (ms)",
781           0, G_MAXUINT, DEFAULT_RTX_STATS_TIMEOUT,
782           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
783
784   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
785       g_param_spec_uint ("max-dropout-time", "Max dropout time",
786           "The maximum time (milliseconds) of missing packets tolerated.",
787           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
788           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
789
790   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
791       g_param_spec_uint ("max-misorder-time", "Max misorder time",
792           "The maximum time (milliseconds) of misordered packets tolerated.",
793           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
794           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
795   /**
796    * GstRtpJitterBuffer:stats:
797    *
798    * Various jitterbuffer statistics. This property returns a GstStructure
799    * with name application/x-rtp-jitterbuffer-stats with the following fields:
800    *
801    * <itemizedlist>
802    * <listitem>
803    *   <para>
804    *   #guint64
805    *   <classname>&quot;num-pushed&quot;</classname>:
806    *   the number of packets pushed out.
807    *   </para>
808    * </listitem>
809    * <listitem>
810    *   <para>
811    *   #guint64
812    *   <classname>&quot;num-lost&quot;</classname>:
813    *   the number of packets considered lost.
814    *   </para>
815    * </listitem>
816    * <listitem>
817    *   <para>
818    *   #guint64
819    *   <classname>&quot;num-late&quot;</classname>:
820    *   the number of packets arriving too late.
821    *   </para>
822    * </listitem>
823    * <listitem>
824    *   <para>
825    *   #guint64
826    *   <classname>&quot;num-duplicates&quot;</classname>:
827    *   the number of duplicate packets.
828    *   </para>
829    * </listitem>
830    * <listitem>
831    *   <para>
832    *   #guint64
833    *   <classname>&quot;rtx-count&quot;</classname>:
834    *   the number of retransmissions requested.
835    *   </para>
836    * </listitem>
837    * <listitem>
838    *   <para>
839    *   #guint64
840    *   <classname>&quot;rtx-success-count&quot;</classname>:
841    *   the number of successful retransmissions.
842    *   </para>
843    * </listitem>
844    * <listitem>
845    *   <para>
846    *   #gdouble
847    *   <classname>&quot;rtx-per-packet&quot;</classname>:
848    *   average number of RTX per packet.
849    *   </para>
850    * </listitem>
851    * <listitem>
852    *   <para>
853    *   #guint64
854    *   <classname>&quot;rtx-rtt&quot;</classname>:
855    *   average round trip time per RTX.
856    *   </para>
857    * </listitem>
858    * </itemizedlist>
859    *
860    * Since: 1.4
861    */
862   g_object_class_install_property (gobject_class, PROP_STATS,
863       g_param_spec_boxed ("stats", "Statistics",
864           "Various statistics", GST_TYPE_STRUCTURE,
865           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
866
867   /**
868    * GstRtpJitterBuffer:max-rtcp-rtp-time-diff
869    *
870    * The maximum amount of time in ms that the RTP time in the RTCP SRs
871    * is allowed to be ahead of the last RTP packet we received. Use
872    * -1 to disable ignoring of RTCP packets.
873    *
874    * Since: 1.8
875    */
876   g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF,
877       g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff",
878           "Maximum amount of time in ms that the RTP time in RTCP SRs "
879           "is allowed to be ahead (-1 disabled)", -1, G_MAXINT,
880           DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
881           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
882
883   g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
884       g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
885           "Synchronize received streams to the RFC7273 clock "
886           "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
887           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
888
889   /**
890    * GstRtpJitterBuffer:faststart-min-packets
891    *
892    * The number of consecutive packets needed to start (set to 0 to
893    * disable faststart. The jitterbuffer will by default start after the
894    * latency has elapsed)
895    *
896    * Since: 1.14
897    */
898   g_object_class_install_property (gobject_class, PROP_FASTSTART_MIN_PACKETS,
899       g_param_spec_uint ("faststart-min-packets", "Faststart minimum packets",
900           "The number of consecutive packets needed to start (set to 0 to "
901           "disable faststart. The jitterbuffer will by default start after "
902           "the latency has elapsed)",
903           0, G_MAXUINT, DEFAULT_FASTSTART_MIN_PACKETS,
904           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
905
906   /**
907    * GstRtpJitterBuffer::request-pt-map:
908    * @buffer: the object which received the signal
909    * @pt: the pt
910    *
911    * Request the payload type as #GstCaps for @pt.
912    */
913   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
914       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
915       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
916           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
917       GST_TYPE_CAPS, 1, G_TYPE_UINT);
918   /**
919    * GstRtpJitterBuffer::handle-sync:
920    * @buffer: the object which received the signal
921    * @struct: a GstStructure containing sync values.
922    *
923    * Be notified of new sync values.
924    */
925   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
926       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
927       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
928           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
929       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
930
931   /**
932    * GstRtpJitterBuffer::on-npt-stop:
933    * @buffer: the object which received the signal
934    *
935    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
936    * the npt-stop position.
937    */
938   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
939       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
940       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
941           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
942       G_TYPE_NONE, 0, G_TYPE_NONE);
943
944   /**
945    * GstRtpJitterBuffer::clear-pt-map:
946    * @buffer: the object which received the signal
947    *
948    * Invalidate the clock-rate as obtained with the
949    * #GstRtpJitterBuffer::request-pt-map signal.
950    */
951   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
952       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
953       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
954       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
955       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
956
957   /**
958    * GstRtpJitterBuffer::set-active:
959    * @buffer: the object which received the signal
960    *
961    * Start pushing out packets with the given base time. This signal is only
962    * useful in buffering mode.
963    *
964    * Returns: the time of the last pushed packet.
965    */
966   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
967       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
968       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
969       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
970       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
971       G_TYPE_UINT64);
972
973   gstelement_class->change_state =
974       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
975   gstelement_class->request_new_pad =
976       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
977   gstelement_class->release_pad =
978       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
979   gstelement_class->provide_clock =
980       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
981   gstelement_class->set_clock =
982       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_clock);
983
984   gst_element_class_add_static_pad_template (gstelement_class,
985       &gst_rtp_jitter_buffer_src_template);
986   gst_element_class_add_static_pad_template (gstelement_class,
987       &gst_rtp_jitter_buffer_sink_template);
988   gst_element_class_add_static_pad_template (gstelement_class,
989       &gst_rtp_jitter_buffer_sink_rtcp_template);
990
991   gst_element_class_set_static_metadata (gstelement_class,
992       "RTP packet jitter-buffer", "Filter/Network/RTP",
993       "A buffer that deals with network jitter and other transmission faults",
994       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
995       "Wim Taymans <wim.taymans@gmail.com>");
996
997   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
998   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
999
1000   GST_DEBUG_CATEGORY_INIT
1001       (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
1002 }
1003
1004 static void
1005 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
1006 {
1007   GstRtpJitterBufferPrivate *priv;
1008
1009   priv = gst_rtp_jitter_buffer_get_instance_private (jitterbuffer);
1010   jitterbuffer->priv = priv;
1011
1012   priv->latency_ms = DEFAULT_LATENCY_MS;
1013   priv->latency_ns = priv->latency_ms * GST_MSECOND;
1014   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
1015   priv->ts_offset = DEFAULT_TS_OFFSET;
1016   priv->max_ts_offset_adjustment = DEFAULT_MAX_TS_OFFSET_ADJUSTMENT;
1017   priv->do_lost = DEFAULT_DO_LOST;
1018   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
1019   priv->rtx_next_seqnum = DEFAULT_RTX_NEXT_SEQNUM;
1020   priv->rtx_delay = DEFAULT_RTX_DELAY;
1021   priv->rtx_min_delay = DEFAULT_RTX_MIN_DELAY;
1022   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
1023   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
1024   priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT;
1025   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
1026   priv->rtx_max_retries = DEFAULT_RTX_MAX_RETRIES;
1027   priv->rtx_deadline_ms = DEFAULT_RTX_DEADLINE;
1028   priv->rtx_stats_timeout = DEFAULT_RTX_STATS_TIMEOUT;
1029   priv->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
1030   priv->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
1031   priv->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
1032   priv->faststart_min_packets = DEFAULT_FASTSTART_MIN_PACKETS;
1033
1034   priv->ts_offset_remainder = 0;
1035   priv->last_dts = -1;
1036   priv->last_pts = -1;
1037   priv->last_rtptime = -1;
1038   priv->avg_jitter = 0;
1039   priv->segment_seqnum = GST_SEQNUM_INVALID;
1040   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
1041   priv->rtx_stats_timers = timer_queue_new ();
1042   priv->jbuf = rtp_jitter_buffer_new ();
1043   g_mutex_init (&priv->jbuf_lock);
1044   g_cond_init (&priv->jbuf_queue);
1045   g_cond_init (&priv->jbuf_timer);
1046   g_cond_init (&priv->jbuf_event);
1047   g_cond_init (&priv->jbuf_query);
1048   g_queue_init (&priv->gap_packets);
1049   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1050
1051   /* reset skew detection initialy */
1052   rtp_jitter_buffer_reset_skew (priv->jbuf);
1053   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
1054   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1055   priv->active = TRUE;
1056
1057   priv->srcpad =
1058       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
1059       "src");
1060
1061   gst_pad_set_activatemode_function (priv->srcpad,
1062       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
1063   gst_pad_set_query_function (priv->srcpad,
1064       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
1065   gst_pad_set_event_function (priv->srcpad,
1066       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
1067
1068   priv->sinkpad =
1069       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
1070       "sink");
1071
1072   gst_pad_set_chain_function (priv->sinkpad,
1073       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
1074   gst_pad_set_chain_list_function (priv->sinkpad,
1075       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain_list));
1076   gst_pad_set_event_function (priv->sinkpad,
1077       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
1078   gst_pad_set_query_function (priv->sinkpad,
1079       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
1080
1081   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
1082   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
1083
1084   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
1085 }
1086
1087 #define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
1088
1089 #define ITEM_TYPE_BUFFER        0
1090 #define ITEM_TYPE_LOST          1
1091 #define ITEM_TYPE_EVENT         2
1092 #define ITEM_TYPE_QUERY         3
1093
1094 static RTPJitterBufferItem *
1095 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
1096     guint seqnum, guint count, guint rtptime)
1097 {
1098   RTPJitterBufferItem *item;
1099
1100   item = g_slice_new (RTPJitterBufferItem);
1101   item->data = data;
1102   item->next = NULL;
1103   item->prev = NULL;
1104   item->type = type;
1105   item->dts = dts;
1106   item->pts = pts;
1107   item->seqnum = seqnum;
1108   item->count = count;
1109   item->rtptime = rtptime;
1110
1111   return item;
1112 }
1113
1114 static void
1115 free_item (RTPJitterBufferItem * item)
1116 {
1117   g_return_if_fail (item != NULL);
1118
1119   if (item->data && item->type != ITEM_TYPE_QUERY)
1120     gst_mini_object_unref (item->data);
1121   g_slice_free (RTPJitterBufferItem, item);
1122 }
1123
1124 static void
1125 free_item_and_retain_events (RTPJitterBufferItem * item, gpointer user_data)
1126 {
1127   GList **l = user_data;
1128
1129   if (item->data && item->type == ITEM_TYPE_EVENT
1130       && GST_EVENT_IS_STICKY (item->data)) {
1131     *l = g_list_prepend (*l, item->data);
1132   } else if (item->data && item->type != ITEM_TYPE_QUERY) {
1133     gst_mini_object_unref (item->data);
1134   }
1135   g_slice_free (RTPJitterBufferItem, item);
1136 }
1137
1138 static void
1139 gst_rtp_jitter_buffer_finalize (GObject * object)
1140 {
1141   GstRtpJitterBuffer *jitterbuffer;
1142   GstRtpJitterBufferPrivate *priv;
1143
1144   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
1145   priv = jitterbuffer->priv;
1146
1147   g_array_free (priv->timers, TRUE);
1148   timer_queue_free (priv->rtx_stats_timers);
1149   g_mutex_clear (&priv->jbuf_lock);
1150   g_cond_clear (&priv->jbuf_queue);
1151   g_cond_clear (&priv->jbuf_timer);
1152   g_cond_clear (&priv->jbuf_event);
1153   g_cond_clear (&priv->jbuf_query);
1154
1155   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1156   g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
1157   g_queue_clear (&priv->gap_packets);
1158   g_object_unref (priv->jbuf);
1159
1160   G_OBJECT_CLASS (parent_class)->finalize (object);
1161 }
1162
1163 static GstIterator *
1164 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
1165 {
1166   GstRtpJitterBuffer *jitterbuffer;
1167   GstPad *otherpad = NULL;
1168   GstIterator *it = NULL;
1169   GValue val = { 0, };
1170
1171   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1172
1173   if (pad == jitterbuffer->priv->sinkpad) {
1174     otherpad = jitterbuffer->priv->srcpad;
1175   } else if (pad == jitterbuffer->priv->srcpad) {
1176     otherpad = jitterbuffer->priv->sinkpad;
1177   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
1178     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
1179   }
1180
1181   if (it == NULL) {
1182     g_value_init (&val, GST_TYPE_PAD);
1183     g_value_set_object (&val, otherpad);
1184     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
1185     g_value_unset (&val);
1186   }
1187
1188   return it;
1189 }
1190
1191 static GstPad *
1192 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
1193 {
1194   GstRtpJitterBufferPrivate *priv;
1195
1196   priv = jitterbuffer->priv;
1197
1198   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
1199
1200   priv->rtcpsinkpad =
1201       gst_pad_new_from_static_template
1202       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
1203   gst_pad_set_chain_function (priv->rtcpsinkpad,
1204       gst_rtp_jitter_buffer_chain_rtcp);
1205   gst_pad_set_event_function (priv->rtcpsinkpad,
1206       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
1207   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
1208       gst_rtp_jitter_buffer_iterate_internal_links);
1209   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
1210   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1211
1212   return priv->rtcpsinkpad;
1213 }
1214
1215 static void
1216 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
1217 {
1218   GstRtpJitterBufferPrivate *priv;
1219
1220   priv = jitterbuffer->priv;
1221
1222   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
1223
1224   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
1225
1226   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1227   priv->rtcpsinkpad = NULL;
1228 }
1229
1230 static GstPad *
1231 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
1232     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
1233 {
1234   GstRtpJitterBuffer *jitterbuffer;
1235   GstElementClass *klass;
1236   GstPad *result;
1237   GstRtpJitterBufferPrivate *priv;
1238
1239   g_return_val_if_fail (templ != NULL, NULL);
1240   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
1241
1242   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1243   priv = jitterbuffer->priv;
1244   klass = GST_ELEMENT_GET_CLASS (element);
1245
1246   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
1247
1248   /* figure out the template */
1249   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
1250     if (priv->rtcpsinkpad != NULL)
1251       goto exists;
1252
1253     result = create_rtcp_sink (jitterbuffer);
1254   } else
1255     goto wrong_template;
1256
1257   return result;
1258
1259   /* ERRORS */
1260 wrong_template:
1261   {
1262     g_warning ("rtpjitterbuffer: this is not our template");
1263     return NULL;
1264   }
1265 exists:
1266   {
1267     g_warning ("rtpjitterbuffer: pad already requested");
1268     return NULL;
1269   }
1270 }
1271
1272 static void
1273 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
1274 {
1275   GstRtpJitterBuffer *jitterbuffer;
1276   GstRtpJitterBufferPrivate *priv;
1277
1278   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
1279   g_return_if_fail (GST_IS_PAD (pad));
1280
1281   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1282   priv = jitterbuffer->priv;
1283
1284   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1285
1286   if (priv->rtcpsinkpad == pad) {
1287     remove_rtcp_sink (jitterbuffer);
1288   } else
1289     goto wrong_pad;
1290
1291   return;
1292
1293   /* ERRORS */
1294 wrong_pad:
1295   {
1296     g_warning ("gstjitterbuffer: asked to release an unknown pad");
1297     return;
1298   }
1299 }
1300
1301 static GstClock *
1302 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
1303 {
1304   return gst_system_clock_obtain ();
1305 }
1306
1307 static gboolean
1308 gst_rtp_jitter_buffer_set_clock (GstElement * element, GstClock * clock)
1309 {
1310   GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1311
1312   rtp_jitter_buffer_set_pipeline_clock (jitterbuffer->priv->jbuf, clock);
1313
1314   return GST_ELEMENT_CLASS (parent_class)->set_clock (element, clock);
1315 }
1316
1317 static void
1318 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
1319 {
1320   GstRtpJitterBufferPrivate *priv;
1321
1322   priv = jitterbuffer->priv;
1323
1324   /* this will trigger a new pt-map request signal, FIXME, do something better. */
1325
1326   JBUF_LOCK (priv);
1327   priv->clock_rate = -1;
1328   /* do not clear current content, but refresh state for new arrival */
1329   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
1330   rtp_jitter_buffer_reset_skew (priv->jbuf);
1331   JBUF_UNLOCK (priv);
1332 }
1333
1334 static GstClockTime
1335 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
1336     guint64 offset)
1337 {
1338   GstRtpJitterBufferPrivate *priv;
1339   GstClockTime last_out;
1340   RTPJitterBufferItem *item;
1341
1342   priv = jbuf->priv;
1343
1344   JBUF_LOCK (priv);
1345   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
1346       active, GST_TIME_ARGS (offset));
1347
1348   if (active != priv->active) {
1349     /* add the amount of time spent in paused to the output offset. All
1350      * outgoing buffers will have this offset applied to their timestamps in
1351      * order to make them arrive in time in the sink. */
1352     priv->out_offset = offset;
1353     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
1354         GST_TIME_ARGS (priv->out_offset));
1355     priv->active = active;
1356     JBUF_SIGNAL_EVENT (priv);
1357   }
1358   if (!active) {
1359     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
1360   }
1361   if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
1362     /* head buffer timestamp and offset gives our output time */
1363     last_out = item->pts + priv->ts_offset;
1364   } else {
1365     /* use last known time when the buffer is empty */
1366     last_out = priv->last_out_time;
1367   }
1368   JBUF_UNLOCK (priv);
1369
1370   return last_out;
1371 }
1372
1373 static GstCaps *
1374 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
1375 {
1376   GstRtpJitterBuffer *jitterbuffer;
1377   GstRtpJitterBufferPrivate *priv;
1378   GstPad *other;
1379   GstCaps *caps;
1380   GstCaps *templ;
1381
1382   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1383   priv = jitterbuffer->priv;
1384
1385   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
1386
1387   caps = gst_pad_peer_query_caps (other, filter);
1388
1389   templ = gst_pad_get_pad_template_caps (pad);
1390   if (caps == NULL) {
1391     GST_DEBUG_OBJECT (jitterbuffer, "use template");
1392     caps = templ;
1393   } else {
1394     GstCaps *intersect;
1395
1396     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1397
1398     intersect = gst_caps_intersect (caps, templ);
1399     gst_caps_unref (caps);
1400     gst_caps_unref (templ);
1401
1402     caps = intersect;
1403   }
1404   gst_object_unref (jitterbuffer);
1405
1406   return caps;
1407 }
1408
1409 /*
1410  * Must be called with JBUF_LOCK held
1411  */
1412
1413 static gboolean
1414 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1415     GstCaps * caps, gint pt)
1416 {
1417   GstRtpJitterBufferPrivate *priv;
1418   GstStructure *caps_struct;
1419   guint val;
1420   gint payload = -1;
1421   GstClockTime tval;
1422   const gchar *ts_refclk, *mediaclk;
1423
1424   priv = jitterbuffer->priv;
1425
1426   /* first parse the caps */
1427   caps_struct = gst_caps_get_structure (caps, 0);
1428
1429   GST_DEBUG_OBJECT (jitterbuffer, "got caps %" GST_PTR_FORMAT, caps);
1430
1431   if (gst_structure_get_int (caps_struct, "payload", &payload) && pt != -1
1432       && payload != pt) {
1433     GST_ERROR_OBJECT (jitterbuffer,
1434         "Got caps with wrong payload type (got %d, expected %d)", pt, payload);
1435     return FALSE;
1436   }
1437
1438   if (payload != -1) {
1439     GST_DEBUG_OBJECT (jitterbuffer, "Got payload type %d", payload);
1440     priv->last_pt = payload;
1441   }
1442
1443   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1444    * measure the amount of data in the buffer */
1445   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1446     goto error;
1447
1448   if (priv->clock_rate <= 0)
1449     goto wrong_rate;
1450
1451   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1452
1453   rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1454
1455   gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
1456
1457   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1458    * can use this to track the amount of time elapsed on the sender. */
1459   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1460     priv->clock_base = val;
1461   else
1462     priv->clock_base = -1;
1463
1464   priv->ext_timestamp = priv->clock_base;
1465
1466   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1467       priv->clock_base);
1468
1469   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1470     /* first expected seqnum, only update when we didn't have a previous base. */
1471     if (priv->next_in_seqnum == -1)
1472       priv->next_in_seqnum = val;
1473     if (priv->next_seqnum == -1) {
1474       priv->next_seqnum = val;
1475       JBUF_SIGNAL_EVENT (priv);
1476     }
1477     priv->seqnum_base = val;
1478   } else {
1479     priv->seqnum_base = -1;
1480   }
1481
1482   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1483
1484   /* the start and stop times. The seqnum-base corresponds to the start time. We
1485    * will keep track of the seqnums on the output and when we reach the one
1486    * corresponding to npt-stop, we emit the npt-stop-reached signal */
1487   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1488     priv->npt_start = tval;
1489   else
1490     priv->npt_start = 0;
1491
1492   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1493     priv->npt_stop = tval;
1494   else
1495     priv->npt_stop = -1;
1496
1497   GST_DEBUG_OBJECT (jitterbuffer,
1498       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1499       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1500
1501   if ((ts_refclk = gst_structure_get_string (caps_struct, "a-ts-refclk"))) {
1502     GstClock *clock = NULL;
1503     guint64 clock_offset = -1;
1504
1505     GST_DEBUG_OBJECT (jitterbuffer, "Have timestamp reference clock %s",
1506         ts_refclk);
1507
1508     if (g_str_has_prefix (ts_refclk, "ntp=")) {
1509       if (g_str_has_prefix (ts_refclk, "ntp=/traceable/")) {
1510         GST_FIXME_OBJECT (jitterbuffer, "Can't handle traceable NTP clocks");
1511       } else {
1512         const gchar *host, *portstr;
1513         gchar *hostname;
1514         guint port;
1515
1516         host = ts_refclk + sizeof ("ntp=") - 1;
1517         if (host[0] == '[') {
1518           /* IPv6 */
1519           portstr = strchr (host, ']');
1520           if (portstr && portstr[1] == ':')
1521             portstr = portstr + 1;
1522           else
1523             portstr = NULL;
1524         } else {
1525           portstr = strrchr (host, ':');
1526         }
1527
1528
1529         if (!portstr || sscanf (portstr, ":%u", &port) != 1)
1530           port = 123;
1531
1532         if (portstr)
1533           hostname = g_strndup (host, (portstr - host));
1534         else
1535           hostname = g_strdup (host);
1536
1537         clock = gst_ntp_clock_new (NULL, hostname, port, 0);
1538         g_free (hostname);
1539       }
1540     } else if (g_str_has_prefix (ts_refclk, "ptp=IEEE1588-2008:")) {
1541       const gchar *domainstr =
1542           ts_refclk + sizeof ("ptp=IEEE1588-2008:XX-XX-XX-XX-XX-XX-XX-XX") - 1;
1543       guint domain;
1544
1545       if (domainstr[0] != ':' || sscanf (domainstr, ":%u", &domain) != 1)
1546         domain = 0;
1547
1548       clock = gst_ptp_clock_new (NULL, domain);
1549     } else {
1550       GST_FIXME_OBJECT (jitterbuffer, "Unsupported timestamp reference clock");
1551     }
1552
1553     if ((mediaclk = gst_structure_get_string (caps_struct, "a-mediaclk"))) {
1554       GST_DEBUG_OBJECT (jitterbuffer, "Got media clock %s", mediaclk);
1555
1556       if (!g_str_has_prefix (mediaclk, "direct=")
1557           || sscanf (mediaclk, "direct=%" G_GUINT64_FORMAT, &clock_offset) != 1)
1558         GST_FIXME_OBJECT (jitterbuffer, "Unsupported media clock");
1559       if (strstr (mediaclk, "rate=") != NULL) {
1560         GST_FIXME_OBJECT (jitterbuffer, "Rate property not supported");
1561         clock_offset = -1;
1562       }
1563     }
1564
1565     rtp_jitter_buffer_set_media_clock (priv->jbuf, clock, clock_offset);
1566   } else {
1567     rtp_jitter_buffer_set_media_clock (priv->jbuf, NULL, -1);
1568   }
1569
1570   return TRUE;
1571
1572   /* ERRORS */
1573 error:
1574   {
1575     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1576     return FALSE;
1577   }
1578 wrong_rate:
1579   {
1580     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1581     return FALSE;
1582   }
1583 }
1584
1585 static void
1586 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1587 {
1588   GstRtpJitterBufferPrivate *priv;
1589
1590   priv = jitterbuffer->priv;
1591
1592   JBUF_LOCK (priv);
1593   /* mark ourselves as flushing */
1594   priv->srcresult = GST_FLOW_FLUSHING;
1595   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1596   /* this unblocks any waiting pops on the src pad task */
1597   JBUF_SIGNAL_EVENT (priv);
1598   JBUF_SIGNAL_QUERY (priv, FALSE);
1599   JBUF_SIGNAL_QUEUE (priv);
1600   JBUF_UNLOCK (priv);
1601 }
1602
1603 static void
1604 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1605 {
1606   GstRtpJitterBufferPrivate *priv;
1607
1608   priv = jitterbuffer->priv;
1609
1610   JBUF_LOCK (priv);
1611   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1612   /* Mark as non flushing */
1613   priv->srcresult = GST_FLOW_OK;
1614   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1615   priv->last_popped_seqnum = -1;
1616   priv->last_out_time = GST_CLOCK_TIME_NONE;
1617   priv->next_seqnum = -1;
1618   priv->seqnum_base = -1;
1619   priv->ips_rtptime = -1;
1620   priv->ips_pts = GST_CLOCK_TIME_NONE;
1621   priv->packet_spacing = 0;
1622   priv->next_in_seqnum = -1;
1623   priv->clock_rate = -1;
1624   priv->last_pt = -1;
1625   priv->eos = FALSE;
1626   priv->estimated_eos = -1;
1627   priv->last_elapsed = 0;
1628   priv->ext_timestamp = -1;
1629   priv->avg_jitter = 0;
1630   priv->last_dts = -1;
1631   priv->last_rtptime = -1;
1632   priv->last_in_pts = 0;
1633   priv->equidistant = 0;
1634   priv->segment_seqnum = GST_SEQNUM_INVALID;
1635   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1636   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1637   rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
1638   rtp_jitter_buffer_reset_skew (priv->jbuf);
1639   remove_all_timers (jitterbuffer);
1640   g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
1641   g_queue_clear (&priv->gap_packets);
1642   JBUF_UNLOCK (priv);
1643 }
1644
1645 static gboolean
1646 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1647     GstPadMode mode, gboolean active)
1648 {
1649   gboolean result;
1650   GstRtpJitterBuffer *jitterbuffer = NULL;
1651
1652   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1653
1654   switch (mode) {
1655     case GST_PAD_MODE_PUSH:
1656       if (active) {
1657         /* allow data processing */
1658         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1659
1660         /* start pushing out buffers */
1661         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1662         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1663             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1664       } else {
1665         /* make sure all data processing stops ASAP */
1666         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1667
1668         /* NOTE this will hardlock if the state change is called from the src pad
1669          * task thread because we will _join() the thread. */
1670         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1671         result = gst_pad_stop_task (pad);
1672       }
1673       break;
1674     default:
1675       result = FALSE;
1676       break;
1677   }
1678   return result;
1679 }
1680
1681 static GstStateChangeReturn
1682 gst_rtp_jitter_buffer_change_state (GstElement * element,
1683     GstStateChange transition)
1684 {
1685   GstRtpJitterBuffer *jitterbuffer;
1686   GstRtpJitterBufferPrivate *priv;
1687   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1688
1689   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1690   priv = jitterbuffer->priv;
1691
1692   switch (transition) {
1693     case GST_STATE_CHANGE_NULL_TO_READY:
1694       break;
1695     case GST_STATE_CHANGE_READY_TO_PAUSED:
1696       JBUF_LOCK (priv);
1697       /* reset negotiated values */
1698       priv->clock_rate = -1;
1699       priv->clock_base = -1;
1700       priv->peer_latency = 0;
1701       priv->last_pt = -1;
1702       /* block until we go to PLAYING */
1703       priv->blocked = TRUE;
1704       priv->timer_running = TRUE;
1705       priv->srcresult = GST_FLOW_OK;
1706       priv->timer_thread =
1707           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1708       JBUF_UNLOCK (priv);
1709       break;
1710     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1711       JBUF_LOCK (priv);
1712       /* unblock to allow streaming in PLAYING */
1713       priv->blocked = FALSE;
1714       JBUF_SIGNAL_EVENT (priv);
1715       JBUF_SIGNAL_TIMER (priv);
1716       JBUF_UNLOCK (priv);
1717       break;
1718     default:
1719       break;
1720   }
1721
1722   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1723
1724   switch (transition) {
1725     case GST_STATE_CHANGE_READY_TO_PAUSED:
1726       /* we are a live element because we sync to the clock, which we can only
1727        * do in the PLAYING state */
1728       if (ret != GST_STATE_CHANGE_FAILURE)
1729         ret = GST_STATE_CHANGE_NO_PREROLL;
1730       break;
1731     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1732       JBUF_LOCK (priv);
1733       /* block to stop streaming when PAUSED */
1734       priv->blocked = TRUE;
1735       unschedule_current_timer (jitterbuffer);
1736       JBUF_UNLOCK (priv);
1737       if (ret != GST_STATE_CHANGE_FAILURE)
1738         ret = GST_STATE_CHANGE_NO_PREROLL;
1739       break;
1740     case GST_STATE_CHANGE_PAUSED_TO_READY:
1741       JBUF_LOCK (priv);
1742       gst_buffer_replace (&priv->last_sr, NULL);
1743       priv->timer_running = FALSE;
1744       priv->srcresult = GST_FLOW_FLUSHING;
1745       unschedule_current_timer (jitterbuffer);
1746       JBUF_SIGNAL_TIMER (priv);
1747       JBUF_SIGNAL_QUERY (priv, FALSE);
1748       JBUF_SIGNAL_QUEUE (priv);
1749       JBUF_UNLOCK (priv);
1750       g_thread_join (priv->timer_thread);
1751       priv->timer_thread = NULL;
1752       break;
1753     case GST_STATE_CHANGE_READY_TO_NULL:
1754       break;
1755     default:
1756       break;
1757   }
1758
1759   return ret;
1760 }
1761
1762 static gboolean
1763 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1764     GstEvent * event)
1765 {
1766   gboolean ret = TRUE;
1767   GstRtpJitterBuffer *jitterbuffer;
1768   GstRtpJitterBufferPrivate *priv;
1769
1770   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1771   priv = jitterbuffer->priv;
1772
1773   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1774
1775   switch (GST_EVENT_TYPE (event)) {
1776     case GST_EVENT_LATENCY:
1777     {
1778       GstClockTime latency;
1779
1780       gst_event_parse_latency (event, &latency);
1781
1782       GST_DEBUG_OBJECT (jitterbuffer,
1783           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1784
1785       JBUF_LOCK (priv);
1786       /* adjust the overall buffer delay to the total pipeline latency in
1787        * buffering mode because if downstream consumes too fast (because of
1788        * large latency or queues, we would start rebuffering again. */
1789       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1790           RTP_JITTER_BUFFER_MODE_BUFFER) {
1791         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1792       }
1793       JBUF_UNLOCK (priv);
1794
1795       ret = gst_pad_push_event (priv->sinkpad, event);
1796       break;
1797     }
1798     default:
1799       ret = gst_pad_push_event (priv->sinkpad, event);
1800       break;
1801   }
1802
1803   return ret;
1804 }
1805
1806 /* handles and stores the event in the jitterbuffer, must be called with
1807  * LOCK */
1808 static gboolean
1809 queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
1810 {
1811   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1812   RTPJitterBufferItem *item;
1813   gboolean head;
1814
1815   switch (GST_EVENT_TYPE (event)) {
1816     case GST_EVENT_CAPS:
1817     {
1818       GstCaps *caps;
1819
1820       gst_event_parse_caps (event, &caps);
1821       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, -1);
1822       break;
1823     }
1824     case GST_EVENT_SEGMENT:
1825     {
1826       GstSegment segment;
1827       gst_event_copy_segment (event, &segment);
1828
1829       priv->segment_seqnum = gst_event_get_seqnum (event);
1830
1831       /* we need time for now */
1832       if (segment.format != GST_FORMAT_TIME) {
1833         GST_DEBUG_OBJECT (jitterbuffer, "ignoring non-TIME newsegment");
1834         gst_event_unref (event);
1835
1836         gst_segment_init (&segment, GST_FORMAT_TIME);
1837         event = gst_event_new_segment (&segment);
1838         gst_event_set_seqnum (event, priv->segment_seqnum);
1839       }
1840
1841       priv->segment = segment;
1842       break;
1843     }
1844     case GST_EVENT_EOS:
1845       priv->eos = TRUE;
1846       rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
1847       break;
1848     default:
1849       break;
1850   }
1851
1852
1853   GST_DEBUG_OBJECT (jitterbuffer, "adding event");
1854   item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
1855   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
1856   if (head || priv->eos)
1857     JBUF_SIGNAL_EVENT (priv);
1858
1859   return TRUE;
1860 }
1861
1862 static gboolean
1863 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1864     GstEvent * event)
1865 {
1866   gboolean ret = TRUE;
1867   GstRtpJitterBuffer *jitterbuffer;
1868   GstRtpJitterBufferPrivate *priv;
1869
1870   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1871   priv = jitterbuffer->priv;
1872
1873   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1874
1875   switch (GST_EVENT_TYPE (event)) {
1876     case GST_EVENT_FLUSH_START:
1877       ret = gst_pad_push_event (priv->srcpad, event);
1878       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1879       /* wait for the loop to go into PAUSED */
1880       gst_pad_pause_task (priv->srcpad);
1881       break;
1882     case GST_EVENT_FLUSH_STOP:
1883       ret = gst_pad_push_event (priv->srcpad, event);
1884       ret =
1885           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1886           GST_PAD_MODE_PUSH, TRUE);
1887       break;
1888     default:
1889       if (GST_EVENT_IS_SERIALIZED (event)) {
1890         /* serialized events go in the queue */
1891         JBUF_LOCK (priv);
1892         if (priv->srcresult != GST_FLOW_OK) {
1893           /* Errors in sticky event pushing are no problem and ignored here
1894            * as they will cause more meaningful errors during data flow.
1895            * For EOS events, that are not followed by data flow, we still
1896            * return FALSE here though.
1897            */
1898           if (!GST_EVENT_IS_STICKY (event) ||
1899               GST_EVENT_TYPE (event) == GST_EVENT_EOS)
1900             goto out_flow_error;
1901         }
1902         /* refuse more events on EOS */
1903         if (priv->eos)
1904           goto out_eos;
1905         ret = queue_event (jitterbuffer, event);
1906         JBUF_UNLOCK (priv);
1907       } else {
1908         /* non-serialized events are forwarded downstream immediately */
1909         ret = gst_pad_push_event (priv->srcpad, event);
1910       }
1911       break;
1912   }
1913   return ret;
1914
1915   /* ERRORS */
1916 out_flow_error:
1917   {
1918     GST_DEBUG_OBJECT (jitterbuffer,
1919         "refusing event, we have a downstream flow error: %s",
1920         gst_flow_get_name (priv->srcresult));
1921     JBUF_UNLOCK (priv);
1922     gst_event_unref (event);
1923     return FALSE;
1924   }
1925 out_eos:
1926   {
1927     GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
1928     JBUF_UNLOCK (priv);
1929     gst_event_unref (event);
1930     return FALSE;
1931   }
1932 }
1933
1934 static gboolean
1935 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1936     GstEvent * event)
1937 {
1938   gboolean ret = TRUE;
1939   GstRtpJitterBuffer *jitterbuffer;
1940
1941   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1942
1943   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1944
1945   switch (GST_EVENT_TYPE (event)) {
1946     case GST_EVENT_FLUSH_START:
1947       gst_event_unref (event);
1948       break;
1949     case GST_EVENT_FLUSH_STOP:
1950       gst_event_unref (event);
1951       break;
1952     default:
1953       ret = gst_pad_event_default (pad, parent, event);
1954       break;
1955   }
1956
1957   return ret;
1958 }
1959
1960 /*
1961  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1962  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1963  * GST_FLOW_FLUSHING when the element is shutting down. On success
1964  * GST_FLOW_OK is returned.
1965  */
1966 static GstFlowReturn
1967 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1968     guint8 pt)
1969 {
1970   GValue ret = { 0 };
1971   GValue args[2] = { {0}, {0} };
1972   GstCaps *caps;
1973   gboolean res;
1974
1975   g_value_init (&args[0], GST_TYPE_ELEMENT);
1976   g_value_set_object (&args[0], jitterbuffer);
1977   g_value_init (&args[1], G_TYPE_UINT);
1978   g_value_set_uint (&args[1], pt);
1979
1980   g_value_init (&ret, GST_TYPE_CAPS);
1981   g_value_set_boxed (&ret, NULL);
1982
1983   JBUF_UNLOCK (jitterbuffer->priv);
1984   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1985       &ret);
1986   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1987
1988   g_value_unset (&args[0]);
1989   g_value_unset (&args[1]);
1990   caps = (GstCaps *) g_value_dup_boxed (&ret);
1991   g_value_unset (&ret);
1992   if (!caps)
1993     goto no_caps;
1994
1995   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt);
1996   gst_caps_unref (caps);
1997
1998   if (G_UNLIKELY (!res))
1999     goto parse_failed;
2000
2001   return GST_FLOW_OK;
2002
2003   /* ERRORS */
2004 no_caps:
2005   {
2006     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
2007     return GST_FLOW_ERROR;
2008   }
2009 out_flushing:
2010   {
2011     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
2012     return GST_FLOW_FLUSHING;
2013   }
2014 parse_failed:
2015   {
2016     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
2017     return GST_FLOW_ERROR;
2018   }
2019 }
2020
2021 /* call with jbuf lock held */
2022 static GstMessage *
2023 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
2024 {
2025   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2026   GstMessage *message = NULL;
2027
2028   if (percent == -1)
2029     return NULL;
2030
2031   /* Post a buffering message */
2032   if (priv->last_percent != percent) {
2033     priv->last_percent = percent;
2034     message =
2035         gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
2036     gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
2037   }
2038
2039   return message;
2040 }
2041
2042 static void
2043 update_offset (GstRtpJitterBuffer * jitterbuffer)
2044 {
2045   GstRtpJitterBufferPrivate *priv;
2046
2047   priv = jitterbuffer->priv;
2048
2049   if (priv->ts_offset_remainder != 0) {
2050     GST_DEBUG ("adjustment %" G_GUINT64_FORMAT " remain %" G_GINT64_FORMAT
2051         " off %" G_GINT64_FORMAT, priv->max_ts_offset_adjustment,
2052         priv->ts_offset_remainder, priv->ts_offset);
2053     if (ABS (priv->ts_offset_remainder) > priv->max_ts_offset_adjustment) {
2054       if (priv->ts_offset_remainder > 0) {
2055         priv->ts_offset += priv->max_ts_offset_adjustment;
2056         priv->ts_offset_remainder -= priv->max_ts_offset_adjustment;
2057       } else {
2058         priv->ts_offset -= priv->max_ts_offset_adjustment;
2059         priv->ts_offset_remainder += priv->max_ts_offset_adjustment;
2060       }
2061     } else {
2062       priv->ts_offset += priv->ts_offset_remainder;
2063       priv->ts_offset_remainder = 0;
2064     }
2065   }
2066 }
2067
2068 static GstClockTime
2069 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
2070 {
2071   GstRtpJitterBufferPrivate *priv;
2072
2073   priv = jitterbuffer->priv;
2074
2075   if (timestamp == -1)
2076     return -1;
2077
2078   /* apply the timestamp offset, this is used for inter stream sync */
2079   timestamp += priv->ts_offset;
2080   /* add the offset, this is used when buffering */
2081   timestamp += priv->out_offset;
2082
2083   return timestamp;
2084 }
2085
2086 static TimerQueue *
2087 timer_queue_new (void)
2088 {
2089   TimerQueue *queue;
2090
2091   queue = g_slice_new (TimerQueue);
2092   queue->timers = g_queue_new ();
2093   queue->hashtable = g_hash_table_new (NULL, NULL);
2094
2095   return queue;
2096 }
2097
2098 static void
2099 timer_queue_free (TimerQueue * queue)
2100 {
2101   if (!queue)
2102     return;
2103
2104   g_hash_table_destroy (queue->hashtable);
2105   g_queue_free_full (queue->timers, g_free);
2106   g_slice_free (TimerQueue, queue);
2107 }
2108
2109 static void
2110 timer_queue_append (TimerQueue * queue, const TimerData * timer,
2111     GstClockTime timeout, gboolean lost)
2112 {
2113   TimerData *copy;
2114
2115   copy = g_memdup (timer, sizeof (*timer));
2116   copy->timeout = timeout;
2117   copy->type = lost ? TIMER_TYPE_LOST : TIMER_TYPE_EXPECTED;
2118   copy->idx = -1;
2119
2120   GST_LOG ("Append rtx-stats timer #%d, %" GST_TIME_FORMAT,
2121       copy->seqnum, GST_TIME_ARGS (copy->timeout));
2122   g_queue_push_tail (queue->timers, copy);
2123   g_hash_table_insert (queue->hashtable, GINT_TO_POINTER (copy->seqnum), copy);
2124 }
2125
2126 static void
2127 timer_queue_clear_until (TimerQueue * queue, GstClockTime timeout)
2128 {
2129   TimerData *test;
2130
2131   test = g_queue_peek_head (queue->timers);
2132   while (test && test->timeout < timeout) {
2133     GST_LOG ("Pop rtx-stats timer #%d, %" GST_TIME_FORMAT " < %"
2134         GST_TIME_FORMAT, test->seqnum, GST_TIME_ARGS (test->timeout),
2135         GST_TIME_ARGS (timeout));
2136     g_hash_table_remove (queue->hashtable, GINT_TO_POINTER (test->seqnum));
2137     g_free (g_queue_pop_head (queue->timers));
2138     test = g_queue_peek_head (queue->timers);
2139   }
2140 }
2141
2142 static TimerData *
2143 timer_queue_find (TimerQueue * queue, guint16 seqnum)
2144 {
2145   return g_hash_table_lookup (queue->hashtable, GINT_TO_POINTER (seqnum));
2146 }
2147
2148 static TimerData *
2149 find_timer (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
2150 {
2151   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2152   TimerData *timer = NULL;
2153   gint i, len;
2154
2155   len = priv->timers->len;
2156   for (i = 0; i < len; i++) {
2157     TimerData *test = &g_array_index (priv->timers, TimerData, i);
2158     if (test->seqnum == seqnum) {
2159       timer = test;
2160       break;
2161     }
2162   }
2163   return timer;
2164 }
2165
2166 static void
2167 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
2168 {
2169   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2170
2171   if (priv->clock_id) {
2172     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
2173     gst_clock_id_unschedule (priv->clock_id);
2174     priv->clock_id = NULL;
2175   }
2176 }
2177
2178 static GstClockTime
2179 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2180 {
2181   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2182   GstClockTime test_timeout;
2183
2184   if ((test_timeout = timer->timeout) == -1)
2185     return -1;
2186
2187   if (timer->type != TIMER_TYPE_EXPECTED) {
2188     /* add our latency and offset to get output times. */
2189     test_timeout = apply_offset (jitterbuffer, test_timeout);
2190     test_timeout += priv->latency_ns;
2191   }
2192   return test_timeout;
2193 }
2194
2195 static void
2196 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2197 {
2198   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2199
2200   if (priv->clock_id) {
2201     GstClockTime timeout = get_timeout (jitterbuffer, timer);
2202
2203     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
2204         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
2205
2206     if (timeout == -1 || timeout < priv->timer_timeout)
2207       unschedule_current_timer (jitterbuffer);
2208   }
2209 }
2210
2211 static TimerData *
2212 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
2213     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
2214     GstClockTime duration)
2215 {
2216   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2217   TimerData *timer;
2218   gint len;
2219
2220   GST_DEBUG_OBJECT (jitterbuffer,
2221       "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %"
2222       GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout),
2223       GST_TIME_ARGS (delay));
2224
2225   len = priv->timers->len;
2226   g_array_set_size (priv->timers, len + 1);
2227   timer = &g_array_index (priv->timers, TimerData, len);
2228   timer->idx = len;
2229   timer->type = type;
2230   timer->seqnum = seqnum;
2231   timer->num = num;
2232   timer->timeout = timeout + delay;
2233   timer->duration = duration;
2234   if (type == TIMER_TYPE_EXPECTED) {
2235     timer->rtx_base = timeout;
2236     timer->rtx_delay = delay;
2237     timer->rtx_retry = 0;
2238   }
2239   timer->rtx_last = GST_CLOCK_TIME_NONE;
2240   timer->num_rtx_retry = 0;
2241   timer->num_rtx_received = 0;
2242   recalculate_timer (jitterbuffer, timer);
2243   JBUF_SIGNAL_TIMER (priv);
2244
2245   return timer;
2246 }
2247
2248 static void
2249 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2250     guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
2251 {
2252   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2253   gboolean seqchange, timechange;
2254   guint16 oldseq;
2255   GstClockTime new_timeout;
2256
2257   oldseq = timer->seqnum;
2258   new_timeout = timeout + delay;
2259   seqchange = oldseq != seqnum;
2260   timechange = timer->timeout != new_timeout;
2261
2262   if (!seqchange && !timechange) {
2263     GST_DEBUG_OBJECT (jitterbuffer,
2264         "No changes in seqnum (%d) and timeout (%" GST_TIME_FORMAT
2265         "), skipping", oldseq, GST_TIME_ARGS (timer->timeout));
2266     return;
2267   }
2268
2269   GST_DEBUG_OBJECT (jitterbuffer,
2270       "replace timer %d for seqnum %d->%d timeout %" GST_TIME_FORMAT
2271       "->%" GST_TIME_FORMAT, timer->type, oldseq, seqnum,
2272       GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (new_timeout));
2273
2274   timer->timeout = new_timeout;
2275   timer->seqnum = seqnum;
2276   if (reset) {
2277     GST_DEBUG_OBJECT (jitterbuffer, "reset rtx delay %" GST_TIME_FORMAT
2278         "->%" GST_TIME_FORMAT, GST_TIME_ARGS (timer->rtx_delay),
2279         GST_TIME_ARGS (delay));
2280     timer->rtx_base = timeout;
2281     timer->rtx_delay = delay;
2282     timer->rtx_retry = 0;
2283   }
2284   if (seqchange) {
2285     timer->num_rtx_retry = 0;
2286     timer->num_rtx_received = 0;
2287   }
2288
2289   if (priv->clock_id) {
2290     /* we changed the seqnum and there is a timer currently waiting with this
2291      * seqnum, unschedule it */
2292     if (seqchange && priv->timer_seqnum == oldseq)
2293       unschedule_current_timer (jitterbuffer);
2294     /* we changed the time, check if it is earlier than what we are waiting
2295      * for and unschedule if so */
2296     else if (timechange)
2297       recalculate_timer (jitterbuffer, timer);
2298   }
2299 }
2300
2301 static TimerData *
2302 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
2303     guint16 seqnum, GstClockTime timeout)
2304 {
2305   TimerData *timer;
2306
2307   /* find the seqnum timer */
2308   timer = find_timer (jitterbuffer, seqnum);
2309   if (timer == NULL) {
2310     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
2311   } else {
2312     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
2313   }
2314   return timer;
2315 }
2316
2317 static void
2318 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2319 {
2320   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2321   guint idx;
2322
2323   if (timer->idx == -1)
2324     return;
2325
2326   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
2327     unschedule_current_timer (jitterbuffer);
2328
2329   idx = timer->idx;
2330   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
2331   g_array_remove_index_fast (priv->timers, idx);
2332   timer->idx = idx;
2333
2334   JBUF_SIGNAL_TIMER (priv);
2335 }
2336
2337 static void
2338 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
2339 {
2340   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2341   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
2342   g_array_set_size (priv->timers, 0);
2343   unschedule_current_timer (jitterbuffer);
2344   JBUF_SIGNAL_TIMER (priv);
2345 }
2346
2347 /* get the extra delay to wait before sending RTX */
2348 static GstClockTime
2349 get_rtx_delay (GstRtpJitterBufferPrivate * priv)
2350 {
2351   GstClockTime delay;
2352
2353   if (priv->rtx_delay == -1) {
2354     if (priv->avg_jitter == 0 && priv->packet_spacing == 0) {
2355       delay = DEFAULT_AUTO_RTX_DELAY;
2356     } else {
2357       /* jitter is in nanoseconds, maximum of 2x jitter and half the
2358        * packet spacing is a good margin */
2359       delay = MAX (priv->avg_jitter * 2, priv->packet_spacing / 2);
2360     }
2361   } else {
2362     delay = priv->rtx_delay * GST_MSECOND;
2363   }
2364   if (priv->rtx_min_delay > 0)
2365     delay = MAX (delay, priv->rtx_min_delay * GST_MSECOND);
2366
2367   return delay;
2368 }
2369
2370 /* Check if packet with seqnum is already considered definitely lost by being
2371  * part of a "lost timer" for multiple packets */
2372 static gboolean
2373 already_lost (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
2374 {
2375   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2376   gint i, len;
2377
2378   len = priv->timers->len;
2379   for (i = 0; i < len; i++) {
2380     TimerData *test = &g_array_index (priv->timers, TimerData, i);
2381     gint gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
2382
2383     if (test->num > 1 && test->type == TIMER_TYPE_LOST && gap >= 0 &&
2384         gap < test->num) {
2385       GST_DEBUG ("seqnum #%d already considered definitely lost (#%d->#%d)",
2386           seqnum, test->seqnum, (test->seqnum + test->num - 1) & 0xffff);
2387       return TRUE;
2388     }
2389   }
2390
2391   return FALSE;
2392 }
2393
2394 /* we just received a packet with seqnum and dts.
2395  *
2396  * First check for old seqnum that we are still expecting. If the gap with the
2397  * current seqnum is too big, unschedule the timeouts.
2398  *
2399  * If we have a valid packet spacing estimate we can set a timer for when we
2400  * should receive the next packet.
2401  * If we don't have a valid estimate, we remove any timer we might have
2402  * had for this packet.
2403  */
2404 static void
2405 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
2406     GstClockTime dts, GstClockTime pts, gboolean do_next_seqnum,
2407     gboolean is_rtx, TimerData * timer)
2408 {
2409   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2410
2411   /* go through all timers and unschedule the ones with a large gap */
2412   if (priv->do_retransmission && priv->rtx_delay_reorder > 0) {
2413     gint i, len;
2414     len = priv->timers->len;
2415     for (i = 0; i < len; i++) {
2416       TimerData *test = &g_array_index (priv->timers, TimerData, i);
2417       gint gap;
2418
2419       gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
2420
2421       GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d",
2422           test->type, test->seqnum, seqnum, gap);
2423
2424       if (gap > priv->rtx_delay_reorder) {
2425         /* max gap, we exceeded the max reorder distance and we don't expect the
2426          * missing packet to be this reordered */
2427         if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
2428           reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
2429       }
2430     }
2431   }
2432
2433   do_next_seqnum = do_next_seqnum && priv->packet_spacing > 0
2434       && priv->do_retransmission && priv->rtx_next_seqnum;
2435
2436   if (timer && timer->type != TIMER_TYPE_DEADLINE) {
2437     if (timer->num_rtx_retry > 0) {
2438       if (is_rtx) {
2439         update_rtx_stats (jitterbuffer, timer, dts, TRUE);
2440         /* don't try to estimate the next seqnum because this is a retransmitted
2441          * packet and it probably did not arrive with the expected packet
2442          * spacing. */
2443         do_next_seqnum = FALSE;
2444       }
2445
2446       if (!is_rtx || timer->num_rtx_retry > 1) {
2447         /* Store timer in order to record stats when/if the retransmitted
2448          * packet arrives. We should also store timer information if we've
2449          * requested retransmission more than once since we may receive
2450          * several retransmitted packets. For accuracy we should update the
2451          * stats also when the redundant retransmitted packets arrives. */
2452         timer_queue_append (priv->rtx_stats_timers, timer,
2453             pts + priv->rtx_stats_timeout * GST_MSECOND, FALSE);
2454       }
2455     }
2456   }
2457
2458   if (do_next_seqnum && pts != GST_CLOCK_TIME_NONE) {
2459     GstClockTime expected, delay;
2460
2461     /* calculate expected arrival time of the next seqnum */
2462     expected = pts + priv->packet_spacing;
2463
2464     delay = get_rtx_delay (priv);
2465
2466     /* and update/install timer for next seqnum */
2467     GST_DEBUG_OBJECT (jitterbuffer, "Add RTX timer #%d, expected %"
2468         GST_TIME_FORMAT ", delay %" GST_TIME_FORMAT ", packet-spacing %"
2469         GST_TIME_FORMAT ", jitter %" GST_TIME_FORMAT, priv->next_in_seqnum,
2470         GST_TIME_ARGS (expected), GST_TIME_ARGS (delay),
2471         GST_TIME_ARGS (priv->packet_spacing), GST_TIME_ARGS (priv->avg_jitter));
2472
2473     if (timer) {
2474       timer->type = TIMER_TYPE_EXPECTED;
2475       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
2476           delay, TRUE);
2477     } else {
2478       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
2479           expected, delay, priv->packet_spacing);
2480     }
2481   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
2482     /* if we had a timer, remove it, we don't know when to expect the next
2483      * packet. */
2484     remove_timer (jitterbuffer, timer);
2485   }
2486 }
2487
2488 static void
2489 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
2490     GstClockTime pts)
2491 {
2492   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2493
2494   /* we need consecutive seqnums with a different
2495    * rtptime to estimate the packet spacing. */
2496   if (priv->ips_rtptime != rtptime) {
2497     /* rtptime changed, check pts diff */
2498     if (priv->ips_pts != -1 && pts != -1 && pts > priv->ips_pts) {
2499       GstClockTime new_packet_spacing = pts - priv->ips_pts;
2500       GstClockTime old_packet_spacing = priv->packet_spacing;
2501
2502       /* Biased towards bigger packet spacings to prevent
2503        * too many unneeded retransmission requests for next
2504        * packets that just arrive a little later than we would
2505        * expect */
2506       if (old_packet_spacing > new_packet_spacing)
2507         priv->packet_spacing =
2508             (new_packet_spacing + 3 * old_packet_spacing) / 4;
2509       else if (old_packet_spacing > 0)
2510         priv->packet_spacing =
2511             (3 * new_packet_spacing + old_packet_spacing) / 4;
2512       else
2513         priv->packet_spacing = new_packet_spacing;
2514
2515       GST_DEBUG_OBJECT (jitterbuffer,
2516           "new packet spacing %" GST_TIME_FORMAT
2517           " old packet spacing %" GST_TIME_FORMAT
2518           " combined to %" GST_TIME_FORMAT,
2519           GST_TIME_ARGS (new_packet_spacing),
2520           GST_TIME_ARGS (old_packet_spacing),
2521           GST_TIME_ARGS (priv->packet_spacing));
2522     }
2523     priv->ips_rtptime = rtptime;
2524     priv->ips_pts = pts;
2525   }
2526 }
2527
2528 static void
2529 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
2530     guint16 seqnum, GstClockTime pts, gint gap)
2531 {
2532   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2533   GstClockTime duration, expected_pts, delay;
2534   TimerType type;
2535   gboolean equidistant = priv->equidistant > 0;
2536
2537   GST_DEBUG_OBJECT (jitterbuffer,
2538       "pts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
2539       GST_TIME_ARGS (pts), GST_TIME_ARGS (priv->last_in_pts));
2540
2541   if (pts == GST_CLOCK_TIME_NONE) {
2542     GST_WARNING_OBJECT (jitterbuffer, "Have no PTS");
2543     return;
2544   }
2545
2546   if (equidistant) {
2547     GstClockTime total_duration;
2548     /* the total duration spanned by the missing packets */
2549     if (pts >= priv->last_in_pts)
2550       total_duration = pts - priv->last_in_pts;
2551     else
2552       total_duration = 0;
2553
2554     /* interpolate between the current time and the last time based on
2555      * number of packets we are missing, this is the estimated duration
2556      * for the missing packet based on equidistant packet spacing. */
2557     duration = total_duration / (gap + 1);
2558
2559     GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
2560         GST_TIME_ARGS (duration));
2561
2562     if (total_duration > priv->latency_ns) {
2563       GstClockTime gap_time;
2564       guint lost_packets;
2565
2566       if (duration > 0) {
2567         GstClockTime gap_dur = gap * duration;
2568         if (gap_dur > priv->latency_ns)
2569           gap_time = gap_dur - priv->latency_ns;
2570         else
2571           gap_time = 0;
2572         lost_packets = gap_time / duration;
2573       } else {
2574         gap_time = total_duration - priv->latency_ns;
2575         lost_packets = gap;
2576       }
2577
2578       /* too many lost packets, some of the missing packets are already
2579        * too late and we can generate lost packet events for them. */
2580       GST_INFO_OBJECT (jitterbuffer,
2581           "lost packets (%d, #%d->#%d) duration too large %" GST_TIME_FORMAT
2582           " > %" GST_TIME_FORMAT ", consider %u lost (%" GST_TIME_FORMAT ")",
2583           gap, expected, seqnum - 1, GST_TIME_ARGS (total_duration),
2584           GST_TIME_ARGS (priv->latency_ns), lost_packets,
2585           GST_TIME_ARGS (gap_time));
2586
2587       /* this timer will fire immediately and the lost event will be pushed from
2588        * the timer thread */
2589       if (lost_packets > 0) {
2590         add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
2591             priv->last_in_pts + duration, 0, gap_time);
2592         expected += lost_packets;
2593         priv->last_in_pts += gap_time;
2594       }
2595     }
2596
2597     expected_pts = priv->last_in_pts + duration;
2598   } else {
2599     /* If we cannot assume equidistant packet spacing, the only thing we now
2600      * for sure is that the missing packets have expected pts not later than
2601      * the last received pts. */
2602     duration = 0;
2603     expected_pts = pts;
2604   }
2605
2606   delay = 0;
2607
2608   if (priv->do_retransmission) {
2609     TimerData *timer = find_timer (jitterbuffer, expected);
2610
2611     type = TIMER_TYPE_EXPECTED;
2612     delay = get_rtx_delay (priv);
2613
2614     /* if we had a timer for the first missing packet, update it. */
2615     if (timer && timer->type == TIMER_TYPE_EXPECTED) {
2616       GstClockTime timeout = timer->timeout;
2617
2618       timer->duration = duration;
2619       if (timeout > (expected_pts + delay) && timer->num_rtx_retry == 0) {
2620         reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_pts,
2621             delay, TRUE);
2622       }
2623       expected++;
2624       expected_pts += duration;
2625     }
2626   } else {
2627     type = TIMER_TYPE_LOST;
2628   }
2629
2630   while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
2631     add_timer (jitterbuffer, type, expected, 0, expected_pts, delay, duration);
2632     expected_pts += duration;
2633     expected++;
2634   }
2635 }
2636
2637 static void
2638 calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts,
2639     guint32 rtptime)
2640 {
2641   gint32 rtpdiff;
2642   GstClockTimeDiff dtsdiff, rtpdiffns, diff;
2643   GstRtpJitterBufferPrivate *priv;
2644
2645   priv = jitterbuffer->priv;
2646
2647   if (G_UNLIKELY (dts == GST_CLOCK_TIME_NONE) || priv->clock_rate <= 0)
2648     goto no_time;
2649
2650   if (priv->last_dts != -1)
2651     dtsdiff = dts - priv->last_dts;
2652   else
2653     dtsdiff = 0;
2654
2655   if (priv->last_rtptime != -1)
2656     rtpdiff = rtptime - (guint32) priv->last_rtptime;
2657   else
2658     rtpdiff = 0;
2659
2660   /* Guess whether stream currently uses equidistant packet spacing. If we
2661    * often see identical timestamps it means the packets are not
2662    * equidistant. */
2663   if (rtptime == priv->last_rtptime)
2664     priv->equidistant -= 2;
2665   else
2666     priv->equidistant += 1;
2667   priv->equidistant = CLAMP (priv->equidistant, -7, 7);
2668
2669   priv->last_dts = dts;
2670   priv->last_rtptime = rtptime;
2671
2672   if (rtpdiff > 0)
2673     rtpdiffns =
2674         gst_util_uint64_scale_int (rtpdiff, GST_SECOND, priv->clock_rate);
2675   else
2676     rtpdiffns =
2677         -gst_util_uint64_scale_int (-rtpdiff, GST_SECOND, priv->clock_rate);
2678
2679   diff = ABS (dtsdiff - rtpdiffns);
2680
2681   /* jitter is stored in nanoseconds */
2682   priv->avg_jitter = (diff + (15 * priv->avg_jitter)) >> 4;
2683
2684   GST_LOG_OBJECT (jitterbuffer,
2685       "dtsdiff %" GST_TIME_FORMAT " rtptime %" GST_TIME_FORMAT
2686       ", clock-rate %d, diff %" GST_TIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
2687       GST_TIME_ARGS (dtsdiff), GST_TIME_ARGS (rtpdiffns), priv->clock_rate,
2688       GST_TIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
2689
2690   return;
2691
2692   /* ERRORS */
2693 no_time:
2694   {
2695     GST_DEBUG_OBJECT (jitterbuffer,
2696         "no dts or no clock-rate, can't calculate jitter");
2697     return;
2698   }
2699 }
2700
2701 static gint
2702 compare_buffer_seqnum (GstBuffer * a, GstBuffer * b, gpointer user_data)
2703 {
2704   GstRTPBuffer rtp_a = GST_RTP_BUFFER_INIT;
2705   GstRTPBuffer rtp_b = GST_RTP_BUFFER_INIT;
2706   guint seq_a, seq_b;
2707
2708   gst_rtp_buffer_map (a, GST_MAP_READ, &rtp_a);
2709   seq_a = gst_rtp_buffer_get_seq (&rtp_a);
2710   gst_rtp_buffer_unmap (&rtp_a);
2711
2712   gst_rtp_buffer_map (b, GST_MAP_READ, &rtp_b);
2713   seq_b = gst_rtp_buffer_get_seq (&rtp_b);
2714   gst_rtp_buffer_unmap (&rtp_b);
2715
2716   return gst_rtp_buffer_compare_seqnum (seq_b, seq_a);
2717 }
2718
2719 static gboolean
2720 handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, GstBuffer * buffer,
2721     guint8 pt, guint16 seqnum, gint gap, guint max_dropout, guint max_misorder)
2722 {
2723   GstRtpJitterBufferPrivate *priv;
2724   guint gap_packets_length;
2725   gboolean reset = FALSE;
2726   gboolean future = gap > 0;
2727
2728   priv = jitterbuffer->priv;
2729
2730   if ((gap_packets_length = g_queue_get_length (&priv->gap_packets)) > 0) {
2731     GList *l;
2732     guint32 prev_gap_seq = -1;
2733     gboolean all_consecutive = TRUE;
2734
2735     g_queue_insert_sorted (&priv->gap_packets, buffer,
2736         (GCompareDataFunc) compare_buffer_seqnum, NULL);
2737
2738     for (l = priv->gap_packets.head; l; l = l->next) {
2739       GstBuffer *gap_buffer = l->data;
2740       GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
2741       guint32 gap_seq;
2742
2743       gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
2744
2745       all_consecutive = (gst_rtp_buffer_get_payload_type (&gap_rtp) == pt);
2746
2747       gap_seq = gst_rtp_buffer_get_seq (&gap_rtp);
2748       if (prev_gap_seq == -1)
2749         prev_gap_seq = gap_seq;
2750       else if (gst_rtp_buffer_compare_seqnum (gap_seq, prev_gap_seq) != -1)
2751         all_consecutive = FALSE;
2752       else
2753         prev_gap_seq = gap_seq;
2754
2755       gst_rtp_buffer_unmap (&gap_rtp);
2756       if (!all_consecutive)
2757         break;
2758     }
2759
2760     if (all_consecutive && gap_packets_length > 3) {
2761       GST_DEBUG_OBJECT (jitterbuffer,
2762           "buffer too %s %d < %d, got 5 consecutive ones - reset",
2763           (future ? "new" : "old"), gap,
2764           (future ? max_dropout : -max_misorder));
2765       reset = TRUE;
2766     } else if (!all_consecutive) {
2767       g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
2768       g_queue_clear (&priv->gap_packets);
2769       GST_DEBUG_OBJECT (jitterbuffer,
2770           "buffer too %s %d < %d, got no 5 consecutive ones - dropping",
2771           (future ? "new" : "old"), gap,
2772           (future ? max_dropout : -max_misorder));
2773       buffer = NULL;
2774     } else {
2775       GST_DEBUG_OBJECT (jitterbuffer,
2776           "buffer too %s %d < %d, got %u consecutive ones - waiting",
2777           (future ? "new" : "old"), gap,
2778           (future ? max_dropout : -max_misorder), gap_packets_length + 1);
2779       buffer = NULL;
2780     }
2781   } else {
2782     GST_DEBUG_OBJECT (jitterbuffer,
2783         "buffer too %s %d < %d, first one - waiting", (future ? "new" : "old"),
2784         gap, -max_misorder);
2785     g_queue_push_tail (&priv->gap_packets, buffer);
2786     buffer = NULL;
2787   }
2788
2789   return reset;
2790 }
2791
2792 static GstClockTime
2793 get_current_running_time (GstRtpJitterBuffer * jitterbuffer)
2794 {
2795   GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (jitterbuffer));
2796   GstClockTime running_time = GST_CLOCK_TIME_NONE;
2797
2798   if (clock) {
2799     GstClockTime base_time =
2800         gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer));
2801     GstClockTime clock_time = gst_clock_get_time (clock);
2802
2803     if (clock_time > base_time)
2804       running_time = clock_time - base_time;
2805     else
2806       running_time = 0;
2807
2808     gst_object_unref (clock);
2809   }
2810
2811   return running_time;
2812 }
2813
2814 static GstFlowReturn
2815 gst_rtp_jitter_buffer_reset (GstRtpJitterBuffer * jitterbuffer,
2816     GstPad * pad, GstObject * parent, guint16 seqnum)
2817 {
2818   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2819   GstFlowReturn ret = GST_FLOW_OK;
2820   GList *events = NULL, *l;
2821   GList *buffers;
2822   gboolean head;
2823
2824   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2825   rtp_jitter_buffer_flush (priv->jbuf,
2826       (GFunc) free_item_and_retain_events, &events);
2827   rtp_jitter_buffer_reset_skew (priv->jbuf);
2828   remove_all_timers (jitterbuffer);
2829   priv->discont = TRUE;
2830   priv->last_popped_seqnum = -1;
2831
2832   if (priv->gap_packets.head) {
2833     GstBuffer *gap_buffer = priv->gap_packets.head->data;
2834     GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
2835
2836     gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
2837     priv->next_seqnum = gst_rtp_buffer_get_seq (&gap_rtp);
2838     gst_rtp_buffer_unmap (&gap_rtp);
2839   } else {
2840     priv->next_seqnum = seqnum;
2841   }
2842
2843   priv->last_in_pts = -1;
2844   priv->next_in_seqnum = -1;
2845
2846   /* Insert all sticky events again in order, otherwise we would
2847    * potentially loose STREAM_START, CAPS or SEGMENT events
2848    */
2849   events = g_list_reverse (events);
2850   for (l = events; l; l = l->next) {
2851     RTPJitterBufferItem *item;
2852
2853     item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
2854     rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
2855   }
2856   g_list_free (events);
2857
2858   JBUF_SIGNAL_EVENT (priv);
2859
2860   /* reset spacing estimation when gap */
2861   priv->ips_rtptime = -1;
2862   priv->ips_pts = GST_CLOCK_TIME_NONE;
2863
2864   buffers = g_list_copy (priv->gap_packets.head);
2865   g_queue_clear (&priv->gap_packets);
2866
2867   priv->ips_rtptime = -1;
2868   priv->ips_pts = GST_CLOCK_TIME_NONE;
2869   JBUF_UNLOCK (jitterbuffer->priv);
2870
2871   for (l = buffers; l; l = l->next) {
2872     ret = gst_rtp_jitter_buffer_chain (pad, parent, l->data);
2873     l->data = NULL;
2874     if (ret != GST_FLOW_OK) {
2875       l = l->next;
2876       break;
2877     }
2878   }
2879   for (; l; l = l->next)
2880     gst_buffer_unref (l->data);
2881   g_list_free (buffers);
2882
2883   return ret;
2884 }
2885
2886 static gboolean
2887 gst_rtp_jitter_buffer_fast_start (GstRtpJitterBuffer * jitterbuffer)
2888 {
2889   GstRtpJitterBufferPrivate *priv;
2890   RTPJitterBufferItem *item;
2891   TimerData *timer;
2892
2893   priv = jitterbuffer->priv;
2894
2895   if (priv->faststart_min_packets == 0)
2896     return FALSE;
2897
2898   item = rtp_jitter_buffer_peek (priv->jbuf);
2899   if (!item)
2900     return FALSE;
2901
2902   timer = find_timer (jitterbuffer, item->seqnum);
2903   if (!timer || timer->type != TIMER_TYPE_DEADLINE)
2904     return FALSE;
2905
2906   if (rtp_jitter_buffer_can_fast_start (priv->jbuf,
2907           priv->faststart_min_packets)) {
2908     GST_INFO_OBJECT (jitterbuffer, "We found %i consecutive packet, start now",
2909         priv->faststart_min_packets);
2910     timer->timeout = -1;
2911     return TRUE;
2912   }
2913
2914   return FALSE;
2915 }
2916
2917 static GstFlowReturn
2918 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
2919     GstBuffer * buffer)
2920 {
2921   GstRtpJitterBuffer *jitterbuffer;
2922   GstRtpJitterBufferPrivate *priv;
2923   guint16 seqnum;
2924   guint32 expected, rtptime;
2925   GstFlowReturn ret = GST_FLOW_OK;
2926   GstClockTime dts, pts;
2927   guint64 latency_ts;
2928   gboolean head;
2929   gint percent = -1;
2930   guint8 pt;
2931   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2932   gboolean do_next_seqnum = FALSE;
2933   RTPJitterBufferItem *item;
2934   GstMessage *msg = NULL;
2935   gboolean estimated_dts = FALSE;
2936   gint32 packet_rate, max_dropout, max_misorder;
2937   TimerData *timer = NULL;
2938
2939   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
2940
2941   priv = jitterbuffer->priv;
2942
2943   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
2944     goto invalid_buffer;
2945
2946   pt = gst_rtp_buffer_get_payload_type (&rtp);
2947   seqnum = gst_rtp_buffer_get_seq (&rtp);
2948   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2949   gst_rtp_buffer_unmap (&rtp);
2950
2951   /* make sure we have PTS and DTS set */
2952   pts = GST_BUFFER_PTS (buffer);
2953   dts = GST_BUFFER_DTS (buffer);
2954   if (dts == -1)
2955     dts = pts;
2956   else if (pts == -1)
2957     pts = dts;
2958
2959   if (dts == -1) {
2960     /* If we have no DTS here, i.e. no capture time, get one from the
2961      * clock now to have something to calculate with in the future. */
2962     dts = get_current_running_time (jitterbuffer);
2963     pts = dts;
2964
2965     /* Remember that we estimated the DTS if we are running already
2966      * and this is not our first packet (or first packet after a reset).
2967      * If it's the first packet, we somehow must generate a timestamp for
2968      * everything, otherwise we can't calculate any times
2969      */
2970     estimated_dts = (priv->next_in_seqnum != -1);
2971   } else {
2972     /* take the DTS of the buffer. This is the time when the packet was
2973      * received and is used to calculate jitter and clock skew. We will adjust
2974      * this DTS with the smoothed value after processing it in the
2975      * jitterbuffer and assign it as the PTS. */
2976     /* bring to running time */
2977     dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
2978   }
2979
2980   GST_DEBUG_OBJECT (jitterbuffer,
2981       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d, rtx %d",
2982       seqnum, GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer),
2983       GST_BUFFER_IS_RETRANSMISSION (buffer));
2984
2985   JBUF_LOCK_CHECK (priv, out_flushing);
2986
2987   if (G_UNLIKELY (priv->last_pt != pt)) {
2988     GstCaps *caps;
2989
2990     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
2991         pt);
2992
2993     priv->last_pt = pt;
2994     /* reset clock-rate so that we get a new one */
2995     priv->clock_rate = -1;
2996
2997     /* Try to get the clock-rate from the caps first if we can. If there are no
2998      * caps we must fire the signal to get the clock-rate. */
2999     if ((caps = gst_pad_get_current_caps (pad))) {
3000       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt);
3001       gst_caps_unref (caps);
3002     }
3003   }
3004
3005   if (G_UNLIKELY (priv->clock_rate == -1)) {
3006     /* no clock rate given on the caps, try to get one with the signal */
3007     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
3008             pt) == GST_FLOW_FLUSHING)
3009       goto out_flushing;
3010
3011     if (G_UNLIKELY (priv->clock_rate == -1))
3012       goto no_clock_rate;
3013
3014     gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
3015   }
3016
3017   /* don't accept more data on EOS */
3018   if (G_UNLIKELY (priv->eos))
3019     goto have_eos;
3020
3021   if (!GST_BUFFER_IS_RETRANSMISSION (buffer))
3022     calculate_jitter (jitterbuffer, dts, rtptime);
3023
3024   if (priv->seqnum_base != -1) {
3025     gint gap;
3026
3027     gap = gst_rtp_buffer_compare_seqnum (priv->seqnum_base, seqnum);
3028
3029     if (gap < 0) {
3030       GST_DEBUG_OBJECT (jitterbuffer,
3031           "packet seqnum #%d before seqnum-base #%d", seqnum,
3032           priv->seqnum_base);
3033       gst_buffer_unref (buffer);
3034       goto finished;
3035     } else if (gap > 16384) {
3036       /* From now on don't compare against the seqnum base anymore as
3037        * at some point in the future we will wrap around and also that
3038        * much reordering is very unlikely */
3039       priv->seqnum_base = -1;
3040     }
3041   }
3042
3043   expected = priv->next_in_seqnum;
3044
3045   packet_rate =
3046       gst_rtp_packet_rate_ctx_update (&priv->packet_rate_ctx, seqnum, rtptime);
3047   max_dropout =
3048       gst_rtp_packet_rate_ctx_get_max_dropout (&priv->packet_rate_ctx,
3049       priv->max_dropout_time);
3050   max_misorder =
3051       gst_rtp_packet_rate_ctx_get_max_misorder (&priv->packet_rate_ctx,
3052       priv->max_misorder_time);
3053   GST_TRACE_OBJECT (jitterbuffer,
3054       "packet_rate: %d, max_dropout: %d, max_misorder: %d", packet_rate,
3055       max_dropout, max_misorder);
3056
3057   /* now check against our expected seqnum */
3058   if (G_UNLIKELY (expected == -1)) {
3059     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
3060
3061     /* calculate a pts based on rtptime and arrival time (dts) */
3062     pts =
3063         rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
3064         rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)));
3065
3066     /* we don't know what the next_in_seqnum should be, wait for the last
3067      * possible moment to push this buffer, maybe we get an earlier seqnum
3068      * while we wait */
3069     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, pts);
3070
3071     do_next_seqnum = TRUE;
3072     /* take rtptime and pts to calculate packet spacing */
3073     priv->ips_rtptime = rtptime;
3074     priv->ips_pts = pts;
3075
3076   } else {
3077     gint gap;
3078     /* now calculate gap */
3079     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
3080     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
3081         expected, seqnum, gap);
3082
3083     if (G_UNLIKELY (gap > 0 && priv->timers->len >= max_dropout)) {
3084       /* If we have timers for more than RTP_MAX_DROPOUT packets
3085        * pending this means that we have a huge gap overall. We can
3086        * reset the jitterbuffer at this point because there's
3087        * just too much data missing to be able to do anything
3088        * sensible with the past data. Just try again from the
3089        * next packet */
3090       GST_WARNING_OBJECT (jitterbuffer, "%d pending timers > %d - resetting",
3091           priv->timers->len, max_dropout);
3092       gst_buffer_unref (buffer);
3093       return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
3094     }
3095
3096     /* Special handling of large gaps */
3097     if ((gap != -1 && gap < -max_misorder) || (gap >= max_dropout)) {
3098       gboolean reset = handle_big_gap_buffer (jitterbuffer, buffer, pt, seqnum,
3099           gap, max_dropout, max_misorder);
3100       if (reset) {
3101         return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
3102       } else {
3103         GST_DEBUG_OBJECT (jitterbuffer,
3104             "Had big gap, waiting for more consecutive packets");
3105         goto finished;
3106       }
3107     }
3108
3109     /* We had no huge gap, let's drop all the gap packets */
3110     GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets");
3111     g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
3112     g_queue_clear (&priv->gap_packets);
3113
3114     /* calculate a pts based on rtptime and arrival time (dts) */
3115     /* If we estimated the DTS, don't consider it in the clock skew calculations */
3116     pts =
3117         rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
3118         rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)));
3119
3120     if (G_LIKELY (gap == 0)) {
3121       /* packet is expected */
3122       calculate_packet_spacing (jitterbuffer, rtptime, pts);
3123       do_next_seqnum = TRUE;
3124     } else {
3125
3126       /* we have a gap */
3127       if (gap > 0) {
3128         GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
3129         /* fill in the gap with EXPECTED timers */
3130         calculate_expected (jitterbuffer, expected, seqnum, pts, gap);
3131         do_next_seqnum = TRUE;
3132       } else {
3133         GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
3134         do_next_seqnum = FALSE;
3135       }
3136
3137       /* reset spacing estimation when gap */
3138       priv->ips_rtptime = -1;
3139       priv->ips_pts = GST_CLOCK_TIME_NONE;
3140     }
3141   }
3142
3143   if (do_next_seqnum) {
3144     priv->last_in_pts = pts;
3145     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
3146   }
3147
3148   timer = find_timer (jitterbuffer, seqnum);
3149   if (GST_BUFFER_IS_RETRANSMISSION (buffer)) {
3150     if (!timer)
3151       timer = timer_queue_find (priv->rtx_stats_timers, seqnum);
3152     if (timer)
3153       timer->num_rtx_received++;
3154   }
3155
3156   /* At 2^15, we would detect a seqnum rollover too early, therefore
3157    * limit the queue size. But let's not limit it to a number that is
3158    * too small to avoid emptying it needlessly if there is a spurious huge
3159    * sequence number, let's allow at least 10k packets in any case. */
3160   while (rtp_jitter_buffer_get_seqnum_diff (priv->jbuf) >= 32765 &&
3161       rtp_jitter_buffer_num_packets (priv->jbuf) > 10000 &&
3162       priv->srcresult == GST_FLOW_OK)
3163     JBUF_WAIT_QUEUE (priv);
3164   if (priv->srcresult != GST_FLOW_OK)
3165     goto out_flushing;
3166
3167   /* let's check if this buffer is too late, we can only accept packets with
3168    * bigger seqnum than the one we last pushed. */
3169   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
3170     gint gap;
3171
3172     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
3173
3174     /* priv->last_popped_seqnum >= seqnum, we're too late. */
3175     if (G_UNLIKELY (gap <= 0)) {
3176       if (priv->do_retransmission) {
3177         if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer) {
3178           update_rtx_stats (jitterbuffer, timer, dts, FALSE);
3179           /* Only count the retranmitted packet too late if it has been
3180            * considered lost. If the original packet arrived before the
3181            * retransmitted we just count it as a duplicate. */
3182           if (timer->type != TIMER_TYPE_LOST)
3183             goto rtx_duplicate;
3184         }
3185       }
3186       goto too_late;
3187     }
3188   }
3189
3190   if (already_lost (jitterbuffer, seqnum))
3191     goto already_lost;
3192
3193   /* let's drop oldest packet if the queue is already full and drop-on-latency
3194    * is set. We can only do this when there actually is a latency. When no
3195    * latency is set, we just pump it in the queue and let the other end push it
3196    * out as fast as possible. */
3197   if (priv->latency_ms && priv->drop_on_latency) {
3198     latency_ts =
3199         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
3200
3201     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
3202       RTPJitterBufferItem *old_item;
3203
3204       old_item = rtp_jitter_buffer_peek (priv->jbuf);
3205
3206       if (IS_DROPABLE (old_item)) {
3207         old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
3208         GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
3209             old_item);
3210         priv->next_seqnum = (old_item->seqnum + old_item->count) & 0xffff;
3211         free_item (old_item);
3212       }
3213       /* we might have removed some head buffers, signal the pushing thread to
3214        * see if it can push now */
3215       JBUF_SIGNAL_EVENT (priv);
3216     }
3217   }
3218
3219   /* If we estimated the DTS, don't consider it in the clock skew calculations
3220    * later. The code above always sets dts to pts or the other way around if
3221    * any of those is valid in the buffer, so we know that if we estimated the
3222    * dts that both are unknown */
3223   if (estimated_dts)
3224     item =
3225         alloc_item (buffer, ITEM_TYPE_BUFFER, GST_CLOCK_TIME_NONE,
3226         pts, seqnum, 1, rtptime);
3227   else
3228     item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
3229
3230   /* now insert the packet into the queue in sorted order. This function returns
3231    * FALSE if a packet with the same seqnum was already in the queue, meaning we
3232    * have a duplicate. */
3233   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item, &head,
3234               &percent))) {
3235     if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer)
3236       update_rtx_stats (jitterbuffer, timer, dts, FALSE);
3237     goto duplicate;
3238   }
3239
3240   /* Trigger fast start if needed */
3241   if (gst_rtp_jitter_buffer_fast_start (jitterbuffer))
3242     head = TRUE;
3243
3244   /* update timers */
3245   update_timers (jitterbuffer, seqnum, dts, pts, do_next_seqnum,
3246       GST_BUFFER_IS_RETRANSMISSION (buffer), timer);
3247
3248   /* we had an unhandled SR, handle it now */
3249   if (priv->last_sr)
3250     do_handle_sync (jitterbuffer);
3251
3252   if (G_UNLIKELY (head)) {
3253     /* signal addition of new buffer when the _loop is waiting. */
3254     if (G_LIKELY (priv->active))
3255       JBUF_SIGNAL_EVENT (priv);
3256
3257     /* let's unschedule and unblock any waiting buffers. We only want to do this
3258      * when the head buffer changed */
3259     if (G_UNLIKELY (priv->clock_id)) {
3260       GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
3261       unschedule_current_timer (jitterbuffer);
3262     }
3263   }
3264
3265   GST_DEBUG_OBJECT (jitterbuffer,
3266       "Pushed packet #%d, now %d packets, head: %d, " "percent %d", seqnum,
3267       rtp_jitter_buffer_num_packets (priv->jbuf), head, percent);
3268
3269   msg = check_buffering_percent (jitterbuffer, percent);
3270
3271 finished:
3272   JBUF_UNLOCK (priv);
3273
3274   if (msg)
3275     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
3276
3277   return ret;
3278
3279   /* ERRORS */
3280 invalid_buffer:
3281   {
3282     /* this is not fatal but should be filtered earlier */
3283     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3284         ("Received invalid RTP payload, dropping"));
3285     gst_buffer_unref (buffer);
3286     return GST_FLOW_OK;
3287   }
3288 no_clock_rate:
3289   {
3290     GST_WARNING_OBJECT (jitterbuffer,
3291         "No clock-rate in caps!, dropping buffer");
3292     gst_buffer_unref (buffer);
3293     goto finished;
3294   }
3295 out_flushing:
3296   {
3297     ret = priv->srcresult;
3298     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
3299     gst_buffer_unref (buffer);
3300     goto finished;
3301   }
3302 have_eos:
3303   {
3304     ret = GST_FLOW_EOS;
3305     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
3306     gst_buffer_unref (buffer);
3307     goto finished;
3308   }
3309 too_late:
3310   {
3311     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
3312         " popped, dropping", seqnum, priv->last_popped_seqnum);
3313     priv->num_late++;
3314     gst_buffer_unref (buffer);
3315     goto finished;
3316   }
3317 already_lost:
3318   {
3319     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as it was already "
3320         "considered lost", seqnum);
3321     priv->num_late++;
3322     gst_buffer_unref (buffer);
3323     goto finished;
3324   }
3325 duplicate:
3326   {
3327     GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
3328         seqnum);
3329     priv->num_duplicates++;
3330     free_item (item);
3331     goto finished;
3332   }
3333 rtx_duplicate:
3334   {
3335     GST_DEBUG_OBJECT (jitterbuffer,
3336         "Duplicate RTX packet #%d detected, dropping", seqnum);
3337     priv->num_duplicates++;
3338     gst_buffer_unref (buffer);
3339     goto finished;
3340   }
3341 }
3342
3343 /* FIXME: hopefully we can do something more efficient here, especially when
3344  * all packets are in order and/or outside of the currently cached range.
3345  * Still worthwhile to have it, avoids taking/releasing object lock and pad
3346  * stream lock for every single buffer in the default chain_list fallback. */
3347 static GstFlowReturn
3348 gst_rtp_jitter_buffer_chain_list (GstPad * pad, GstObject * parent,
3349     GstBufferList * buffer_list)
3350 {
3351   GstFlowReturn flow_ret = GST_FLOW_OK;
3352   guint i, n;
3353
3354   n = gst_buffer_list_length (buffer_list);
3355   for (i = 0; i < n; ++i) {
3356     GstBuffer *buf = gst_buffer_list_get (buffer_list, i);
3357
3358     flow_ret = gst_rtp_jitter_buffer_chain (pad, parent, gst_buffer_ref (buf));
3359
3360     if (flow_ret != GST_FLOW_OK)
3361       break;
3362   }
3363   gst_buffer_list_unref (buffer_list);
3364
3365   return flow_ret;
3366 }
3367
3368 static GstClockTime
3369 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
3370 {
3371   guint64 ext_time, elapsed;
3372   guint32 rtp_time;
3373   GstRtpJitterBufferPrivate *priv;
3374
3375   priv = jitterbuffer->priv;
3376   rtp_time = item->rtptime;
3377
3378   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
3379       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
3380
3381   ext_time = priv->ext_timestamp;
3382   ext_time = gst_rtp_buffer_ext_timestamp (&ext_time, rtp_time);
3383   if (ext_time < priv->ext_timestamp) {
3384     ext_time = priv->ext_timestamp;
3385   } else {
3386     priv->ext_timestamp = ext_time;
3387   }
3388
3389   if (ext_time > priv->clock_base)
3390     elapsed = ext_time - priv->clock_base;
3391   else
3392     elapsed = 0;
3393
3394   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
3395   return elapsed;
3396 }
3397
3398 static void
3399 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
3400     RTPJitterBufferItem * item)
3401 {
3402   guint64 total, elapsed, left, estimated;
3403   GstClockTime out_time;
3404   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3405
3406   if (priv->npt_stop == -1 || priv->ext_timestamp == -1
3407       || priv->clock_base == -1 || priv->clock_rate <= 0)
3408     return;
3409
3410   /* compute the elapsed time */
3411   elapsed = compute_elapsed (jitterbuffer, item);
3412
3413   /* do nothing if elapsed time doesn't increment */
3414   if (priv->last_elapsed && elapsed <= priv->last_elapsed)
3415     return;
3416
3417   priv->last_elapsed = elapsed;
3418
3419   /* this is the total time we need to play */
3420   total = priv->npt_stop - priv->npt_start;
3421   GST_LOG_OBJECT (jitterbuffer, "total %" GST_TIME_FORMAT,
3422       GST_TIME_ARGS (total));
3423
3424   /* this is how much time there is left */
3425   if (total > elapsed)
3426     left = total - elapsed;
3427   else
3428     left = 0;
3429
3430   /* if we have less time left that the size of the buffer, we will not
3431    * be able to keep it filled, disabled buffering then */
3432   if (left < rtp_jitter_buffer_get_delay (priv->jbuf)) {
3433     GST_DEBUG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT
3434         ", disable buffering close to EOS", GST_TIME_ARGS (left));
3435     rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
3436   }
3437
3438   /* this is the current time as running-time */
3439   out_time = item->pts;
3440
3441   if (elapsed > 0)
3442     estimated = gst_util_uint64_scale (out_time, total, elapsed);
3443   else {
3444     /* if there is almost nothing left,
3445      * we may never advance enough to end up in the above case */
3446     if (total < GST_SECOND)
3447       estimated = GST_SECOND;
3448     else
3449       estimated = -1;
3450   }
3451   GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
3452       GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
3453
3454   if (estimated != -1 && priv->estimated_eos != estimated) {
3455     set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
3456     priv->estimated_eos = estimated;
3457   }
3458 }
3459
3460 /* take a buffer from the queue and push it */
3461 static GstFlowReturn
3462 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
3463 {
3464   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3465   GstFlowReturn result = GST_FLOW_OK;
3466   RTPJitterBufferItem *item;
3467   GstBuffer *outbuf = NULL;
3468   GstEvent *outevent = NULL;
3469   GstQuery *outquery = NULL;
3470   GstClockTime dts, pts;
3471   gint percent = -1;
3472   gboolean do_push = TRUE;
3473   guint type;
3474   GstMessage *msg;
3475
3476   /* when we get here we are ready to pop and push the buffer */
3477   item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
3478   type = item->type;
3479
3480   switch (type) {
3481     case ITEM_TYPE_BUFFER:
3482
3483       /* we need to make writable to change the flags and timestamps */
3484       outbuf = gst_buffer_make_writable (item->data);
3485
3486       if (G_UNLIKELY (priv->discont)) {
3487         /* set DISCONT flag when we missed a packet. We pushed the buffer writable
3488          * into the jitterbuffer so we can modify now. */
3489         GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
3490         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
3491         priv->discont = FALSE;
3492       }
3493       if (G_UNLIKELY (priv->ts_discont)) {
3494         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
3495         priv->ts_discont = FALSE;
3496       }
3497
3498       dts =
3499           gst_segment_position_from_running_time (&priv->segment,
3500           GST_FORMAT_TIME, item->dts);
3501       pts =
3502           gst_segment_position_from_running_time (&priv->segment,
3503           GST_FORMAT_TIME, item->pts);
3504
3505       /* if this is a new frame, check if ts_offset needs to be updated */
3506       if (pts != priv->last_pts) {
3507         update_offset (jitterbuffer);
3508       }
3509
3510       /* apply timestamp with offset to buffer now */
3511       GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
3512       GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
3513
3514       /* update the elapsed time when we need to check against the npt stop time. */
3515       update_estimated_eos (jitterbuffer, item);
3516
3517       priv->last_pts = pts;
3518       priv->last_out_time = GST_BUFFER_PTS (outbuf);
3519       break;
3520     case ITEM_TYPE_LOST:
3521       priv->discont = TRUE;
3522       if (!priv->do_lost)
3523         do_push = FALSE;
3524       /* FALLTHROUGH */
3525     case ITEM_TYPE_EVENT:
3526       outevent = item->data;
3527       break;
3528     case ITEM_TYPE_QUERY:
3529       outquery = item->data;
3530       break;
3531   }
3532
3533   /* now we are ready to push the buffer. Save the seqnum and release the lock
3534    * so the other end can push stuff in the queue again. */
3535   if (seqnum != -1) {
3536     priv->last_popped_seqnum = seqnum;
3537     priv->next_seqnum = (seqnum + item->count) & 0xffff;
3538   }
3539   msg = check_buffering_percent (jitterbuffer, percent);
3540
3541   if (type == ITEM_TYPE_EVENT && outevent &&
3542       GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
3543     g_assert (priv->eos);
3544     while (priv->timers->len > 0) {
3545       /* Stopping timers */
3546       unschedule_current_timer (jitterbuffer);
3547       JBUF_WAIT_TIMER (priv);
3548     }
3549   }
3550
3551   JBUF_UNLOCK (priv);
3552
3553   item->data = NULL;
3554   free_item (item);
3555
3556   if (msg)
3557     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
3558
3559   switch (type) {
3560     case ITEM_TYPE_BUFFER:
3561       /* push buffer */
3562       GST_DEBUG_OBJECT (jitterbuffer,
3563           "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
3564           seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
3565           GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
3566       priv->num_pushed++;
3567       result = gst_pad_push (priv->srcpad, outbuf);
3568
3569       JBUF_LOCK_CHECK (priv, out_flushing);
3570       break;
3571     case ITEM_TYPE_LOST:
3572     case ITEM_TYPE_EVENT:
3573       /* We got not enough consecutive packets with a huge gap, we can
3574        * as well just drop them here now on EOS */
3575       if (outevent && GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
3576         GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets on EOS");
3577         g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
3578         g_queue_clear (&priv->gap_packets);
3579       }
3580
3581       GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
3582           ", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);
3583
3584       if (do_push)
3585         gst_pad_push_event (priv->srcpad, outevent);
3586       else if (outevent)
3587         gst_event_unref (outevent);
3588
3589       result = GST_FLOW_OK;
3590
3591       JBUF_LOCK_CHECK (priv, out_flushing);
3592       break;
3593     case ITEM_TYPE_QUERY:
3594     {
3595       gboolean res;
3596
3597       res = gst_pad_peer_query (priv->srcpad, outquery);
3598
3599       JBUF_LOCK_CHECK (priv, out_flushing);
3600       result = GST_FLOW_OK;
3601       GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
3602       JBUF_SIGNAL_QUERY (priv, res);
3603       break;
3604     }
3605   }
3606   return result;
3607
3608   /* ERRORS */
3609 out_flushing:
3610   {
3611     return priv->srcresult;
3612   }
3613 }
3614
3615 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
3616
3617 /* Peek a buffer and compare the seqnum to the expected seqnum.
3618  * If all is fine, the buffer is pushed.
3619  * If something is wrong, we wait for some event
3620  */
3621 static GstFlowReturn
3622 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
3623 {
3624   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3625   GstFlowReturn result;
3626   RTPJitterBufferItem *item;
3627   guint seqnum;
3628   guint32 next_seqnum;
3629
3630   /* only push buffers when PLAYING and active and not buffering */
3631   if (priv->blocked || !priv->active ||
3632       rtp_jitter_buffer_is_buffering (priv->jbuf)) {
3633     return GST_FLOW_WAIT;
3634   }
3635
3636   /* peek a buffer, we're just looking at the sequence number.
3637    * If all is fine, we'll pop and push it. If the sequence number is wrong we
3638    * wait for a timeout or something to change.
3639    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
3640   item = rtp_jitter_buffer_peek (priv->jbuf);
3641   if (item == NULL) {
3642     goto wait;
3643   }
3644
3645   /* get the seqnum and the next expected seqnum */
3646   seqnum = item->seqnum;
3647   if (seqnum == -1) {
3648     return pop_and_push_next (jitterbuffer, seqnum);
3649   }
3650
3651   next_seqnum = priv->next_seqnum;
3652
3653   /* get the gap between this and the previous packet. If we don't know the
3654    * previous packet seqnum assume no gap. */
3655   if (G_UNLIKELY (next_seqnum == -1)) {
3656     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
3657     /* we don't know what the next_seqnum should be, the chain function should
3658      * have scheduled a DEADLINE timer that will increment next_seqnum when it
3659      * fires, so wait for that */
3660     result = GST_FLOW_WAIT;
3661   } else {
3662     gint gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
3663
3664     if (G_LIKELY (gap == 0)) {
3665       /* no missing packet, pop and push */
3666       result = pop_and_push_next (jitterbuffer, seqnum);
3667     } else if (G_UNLIKELY (gap < 0)) {
3668       /* if we have a packet that we already pushed or considered dropped, pop it
3669        * off and get the next packet */
3670       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
3671           seqnum, next_seqnum);
3672       item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
3673       free_item (item);
3674       result = GST_FLOW_OK;
3675     } else {
3676       /* the chain function has scheduled timers to request retransmission or
3677        * when to consider the packet lost, wait for that */
3678       GST_DEBUG_OBJECT (jitterbuffer,
3679           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
3680           next_seqnum, seqnum, gap);
3681       /* if we have reached EOS, just keep processing */
3682       if (priv->eos) {
3683         result = pop_and_push_next (jitterbuffer, seqnum);
3684         result = GST_FLOW_OK;
3685       } else {
3686         result = GST_FLOW_WAIT;
3687       }
3688     }
3689   }
3690
3691   return result;
3692
3693 wait:
3694   {
3695     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
3696     if (priv->eos) {
3697       return GST_FLOW_EOS;
3698     } else {
3699       return GST_FLOW_WAIT;
3700     }
3701   }
3702 }
3703
3704 static GstClockTime
3705 get_rtx_retry_timeout (GstRtpJitterBufferPrivate * priv)
3706 {
3707   GstClockTime rtx_retry_timeout;
3708   GstClockTime rtx_min_retry_timeout;
3709
3710   if (priv->rtx_retry_timeout == -1) {
3711     if (priv->avg_rtx_rtt == 0)
3712       rtx_retry_timeout = DEFAULT_AUTO_RTX_TIMEOUT;
3713     else
3714       /* we want to ask for a retransmission after we waited for a
3715        * complete RTT and the additional jitter */
3716       rtx_retry_timeout = priv->avg_rtx_rtt + priv->avg_jitter * 2;
3717   } else {
3718     rtx_retry_timeout = priv->rtx_retry_timeout * GST_MSECOND;
3719   }
3720   /* make sure we don't retry too often. On very low latency networks,
3721    * the RTT and jitter can be very low. */
3722   if (priv->rtx_min_retry_timeout == -1) {
3723     rtx_min_retry_timeout = priv->packet_spacing;
3724   } else {
3725     rtx_min_retry_timeout = priv->rtx_min_retry_timeout * GST_MSECOND;
3726   }
3727   rtx_retry_timeout = MAX (rtx_retry_timeout, rtx_min_retry_timeout);
3728
3729   return rtx_retry_timeout;
3730 }
3731
3732 static GstClockTime
3733 get_rtx_retry_period (GstRtpJitterBufferPrivate * priv,
3734     GstClockTime rtx_retry_timeout)
3735 {
3736   GstClockTime rtx_retry_period;
3737
3738   if (priv->rtx_retry_period == -1) {
3739     /* we retry up to the configured jitterbuffer size but leaving some
3740      * room for the retransmission to arrive in time */
3741     if (rtx_retry_timeout > priv->latency_ns) {
3742       rtx_retry_period = 0;
3743     } else {
3744       rtx_retry_period = priv->latency_ns - rtx_retry_timeout;
3745     }
3746   } else {
3747     rtx_retry_period = priv->rtx_retry_period * GST_MSECOND;
3748   }
3749   return rtx_retry_period;
3750 }
3751
3752 /*
3753   1. For *larger* rtx-rtt, weigh a new measurement as before (1/8th)
3754   2. For *smaller* rtx-rtt, be a bit more conservative and weigh a bit less (1/16th)
3755   3. For very large measurements (> avg * 2), consider them "outliers"
3756      and count them a lot less (1/48th)
3757 */
3758 static void
3759 update_avg_rtx_rtt (GstRtpJitterBufferPrivate * priv, GstClockTime rtt)
3760 {
3761   gint weight;
3762
3763   if (priv->avg_rtx_rtt == 0) {
3764     priv->avg_rtx_rtt = rtt;
3765     return;
3766   }
3767
3768   if (rtt > 2 * priv->avg_rtx_rtt)
3769     weight = 48;
3770   else if (rtt > priv->avg_rtx_rtt)
3771     weight = 8;
3772   else
3773     weight = 16;
3774
3775   priv->avg_rtx_rtt = (rtt + (weight - 1) * priv->avg_rtx_rtt) / weight;
3776 }
3777
3778 static void
3779 update_rtx_stats (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3780     GstClockTime dts, gboolean success)
3781 {
3782   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3783   GstClockTime delay;
3784
3785   if (success) {
3786     /* we scheduled a retry for this packet and now we have it */
3787     priv->num_rtx_success++;
3788     /* all the previous retry attempts failed */
3789     priv->num_rtx_failed += timer->num_rtx_retry - 1;
3790   } else {
3791     /* All retries failed or was too late */
3792     priv->num_rtx_failed += timer->num_rtx_retry;
3793   }
3794
3795   /* number of retries before (hopefully) receiving the packet */
3796   if (priv->avg_rtx_num == 0.0)
3797     priv->avg_rtx_num = timer->num_rtx_retry;
3798   else
3799     priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
3800
3801   /* Calculate the delay between retransmission request and receiving this
3802    * packet. We have a valid delay if and only if this packet is a response to
3803    * our last request. If not we don't know if this is a response to an
3804    * earlier request and delay could be way off. For RTT is more important
3805    * with correct values than to update for every packet. */
3806   if (timer->num_rtx_retry == timer->num_rtx_received &&
3807       dts != GST_CLOCK_TIME_NONE && dts > timer->rtx_last) {
3808     delay = dts - timer->rtx_last;
3809     update_avg_rtx_rtt (priv, delay);
3810   } else {
3811     delay = 0;
3812   }
3813
3814   GST_LOG_OBJECT (jitterbuffer,
3815       "RTX #%d, result %d, success %" G_GUINT64_FORMAT ", failed %"
3816       G_GUINT64_FORMAT ", requests %" G_GUINT64_FORMAT ", dups %"
3817       G_GUINT64_FORMAT ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %"
3818       GST_TIME_FORMAT, timer->seqnum, success, priv->num_rtx_success,
3819       priv->num_rtx_failed, priv->num_rtx_requests, priv->num_duplicates,
3820       priv->avg_rtx_num, GST_TIME_ARGS (delay),
3821       GST_TIME_ARGS (priv->avg_rtx_rtt));
3822 }
3823
3824 /* the timeout for when we expected a packet expired */
3825 static gboolean
3826 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3827     GstClockTime now)
3828 {
3829   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3830   GstEvent *event;
3831   guint delay, delay_ms, avg_rtx_rtt_ms;
3832   guint rtx_retry_timeout_ms, rtx_retry_period_ms;
3833   guint rtx_deadline_ms;
3834   GstClockTime rtx_retry_period;
3835   GstClockTime rtx_retry_timeout;
3836   GstClock *clock;
3837
3838   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %"
3839       GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
3840
3841   rtx_retry_timeout = get_rtx_retry_timeout (priv);
3842   rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout);
3843
3844   delay = timer->rtx_delay + timer->rtx_retry;
3845
3846   delay_ms = GST_TIME_AS_MSECONDS (delay);
3847   rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout);
3848   rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period);
3849   avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt);
3850   rtx_deadline_ms =
3851       priv->rtx_deadline_ms != -1 ? priv->rtx_deadline_ms : priv->latency_ms;
3852
3853   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
3854       gst_structure_new ("GstRTPRetransmissionRequest",
3855           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
3856           "running-time", G_TYPE_UINT64, timer->rtx_base,
3857           "delay", G_TYPE_UINT, delay_ms,
3858           "retry", G_TYPE_UINT, timer->num_rtx_retry,
3859           "frequency", G_TYPE_UINT, rtx_retry_timeout_ms,
3860           "period", G_TYPE_UINT, rtx_retry_period_ms,
3861           "deadline", G_TYPE_UINT, rtx_deadline_ms,
3862           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
3863           "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
3864   GST_DEBUG_OBJECT (jitterbuffer, "Request RTX: %" GST_PTR_FORMAT, event);
3865
3866   priv->num_rtx_requests++;
3867   timer->num_rtx_retry++;
3868
3869   GST_OBJECT_LOCK (jitterbuffer);
3870   if ((clock = GST_ELEMENT_CLOCK (jitterbuffer))) {
3871     timer->rtx_last = gst_clock_get_time (clock);
3872     timer->rtx_last -= GST_ELEMENT_CAST (jitterbuffer)->base_time;
3873   } else {
3874     timer->rtx_last = now;
3875   }
3876   GST_OBJECT_UNLOCK (jitterbuffer);
3877
3878   /* calculate the timeout for the next retransmission attempt */
3879   timer->rtx_retry += rtx_retry_timeout;
3880   GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
3881       GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u",
3882       GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
3883       GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
3884   if ((priv->rtx_max_retries != -1
3885           && timer->num_rtx_retry >= priv->rtx_max_retries)
3886       || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)
3887       || (timer->rtx_base + rtx_retry_period < now)) {
3888     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
3889     /* too many retransmission request, we now convert the timer
3890      * to a lost timer, leave the num_rtx_retry as it is for stats */
3891     timer->type = TIMER_TYPE_LOST;
3892     timer->rtx_delay = 0;
3893     timer->rtx_retry = 0;
3894   }
3895   reschedule_timer (jitterbuffer, timer, timer->seqnum,
3896       timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
3897
3898   JBUF_UNLOCK (priv);
3899   gst_pad_push_event (priv->sinkpad, event);
3900   JBUF_LOCK (priv);
3901
3902   return FALSE;
3903 }
3904
3905 /* a packet is lost */
3906 static gboolean
3907 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3908     GstClockTime now)
3909 {
3910   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3911   guint seqnum, lost_packets, num_rtx_retry, next_in_seqnum;
3912   gboolean head;
3913   GstEvent *event = NULL;
3914   RTPJitterBufferItem *item;
3915
3916   seqnum = timer->seqnum;
3917   lost_packets = MAX (timer->num, 1);
3918   num_rtx_retry = timer->num_rtx_retry;
3919
3920   /* we had a gap and thus we lost some packets. Create an event for this.  */
3921   if (lost_packets > 1)
3922     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
3923         seqnum + lost_packets - 1);
3924   else
3925     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
3926
3927   priv->num_lost += lost_packets;
3928   priv->num_rtx_failed += num_rtx_retry;
3929
3930   next_in_seqnum = (seqnum + lost_packets) & 0xffff;
3931
3932   /* we now only accept seqnum bigger than this */
3933   if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0) {
3934     priv->next_in_seqnum = next_in_seqnum;
3935     priv->last_in_pts = apply_offset (jitterbuffer, timer->timeout);
3936   }
3937
3938   /* Avoid creating events if we don't need it. Note that we still need to create
3939    * the lost *ITEM* since it will be used to notify the outgoing thread of
3940    * lost items (so that we can set discont flags and such) */
3941   if (priv->do_lost) {
3942     GstClockTime duration, timestamp;
3943     /* create paket lost event */
3944     timestamp = apply_offset (jitterbuffer, timer->timeout);
3945     duration = timer->duration;
3946     if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
3947       duration = priv->packet_spacing;
3948     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
3949         gst_structure_new ("GstRTPPacketLost",
3950             "seqnum", G_TYPE_UINT, (guint) seqnum,
3951             "timestamp", G_TYPE_UINT64, timestamp,
3952             "duration", G_TYPE_UINT64, duration,
3953             "retry", G_TYPE_UINT, num_rtx_retry, NULL));
3954   }
3955   item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
3956   if (!rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL))
3957     /* Duplicate */
3958     free_item (item);
3959
3960   if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) {
3961     /* Store info to update stats if the packet arrives too late */
3962     timer_queue_append (priv->rtx_stats_timers, timer,
3963         now + priv->rtx_stats_timeout * GST_MSECOND, TRUE);
3964   }
3965   remove_timer (jitterbuffer, timer);
3966
3967   if (head)
3968     JBUF_SIGNAL_EVENT (priv);
3969
3970   return TRUE;
3971 }
3972
3973 static gboolean
3974 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3975     GstClockTime now)
3976 {
3977   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3978
3979   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
3980   remove_timer (jitterbuffer, timer);
3981   if (!priv->eos) {
3982     GstEvent *event;
3983
3984     /* there was no EOS in the buffer, put one in there now */
3985     event = gst_event_new_eos ();
3986     if (priv->segment_seqnum != GST_SEQNUM_INVALID)
3987       gst_event_set_seqnum (event, priv->segment_seqnum);
3988     queue_event (jitterbuffer, event);
3989   }
3990   JBUF_SIGNAL_EVENT (priv);
3991
3992   return TRUE;
3993 }
3994
3995 static gboolean
3996 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3997     GstClockTime now)
3998 {
3999   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
4000
4001   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
4002
4003   /* timer seqnum might have been obsoleted by caps seqnum-base,
4004    * only mess with current ongoing seqnum if still unknown */
4005   if (priv->next_seqnum == -1)
4006     priv->next_seqnum = timer->seqnum;
4007   remove_timer (jitterbuffer, timer);
4008   JBUF_SIGNAL_EVENT (priv);
4009
4010   return TRUE;
4011 }
4012
4013 static gboolean
4014 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
4015     GstClockTime now)
4016 {
4017   gboolean removed = FALSE;
4018
4019   switch (timer->type) {
4020     case TIMER_TYPE_EXPECTED:
4021       removed = do_expected_timeout (jitterbuffer, timer, now);
4022       break;
4023     case TIMER_TYPE_LOST:
4024       removed = do_lost_timeout (jitterbuffer, timer, now);
4025       break;
4026     case TIMER_TYPE_DEADLINE:
4027       removed = do_deadline_timeout (jitterbuffer, timer, now);
4028       break;
4029     case TIMER_TYPE_EOS:
4030       removed = do_eos_timeout (jitterbuffer, timer, now);
4031       break;
4032   }
4033   return removed;
4034 }
4035
4036 /* called when we need to wait for the next timeout.
4037  *
4038  * We loop over the array of recorded timeouts and wait for the earliest one.
4039  * When it timed out, do the logic associated with the timer.
4040  *
4041  * If there are no timers, we wait on a gcond until something new happens.
4042  */
4043 static void
4044 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
4045 {
4046   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
4047   GstClockTime now = 0;
4048
4049   JBUF_LOCK (priv);
4050   while (priv->timer_running) {
4051     TimerData *timer = NULL;
4052     GstClockTime timer_timeout = -1;
4053     gint i, len;
4054
4055     /* If we have a clock, update "now" now with the very
4056      * latest running time we have. If timers are unscheduled below we
4057      * otherwise wouldn't update now (it's only updated when timers
4058      * expire), and also for the very first loop iteration now would
4059      * otherwise always be 0
4060      */
4061     GST_OBJECT_LOCK (jitterbuffer);
4062     if (priv->eos) {
4063       now = GST_CLOCK_TIME_NONE;
4064     } else if (GST_ELEMENT_CLOCK (jitterbuffer)) {
4065       now =
4066           gst_clock_get_time (GST_ELEMENT_CLOCK (jitterbuffer)) -
4067           GST_ELEMENT_CAST (jitterbuffer)->base_time;
4068     }
4069     GST_OBJECT_UNLOCK (jitterbuffer);
4070
4071     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
4072         GST_TIME_ARGS (now));
4073
4074     /* Clear expired rtx-stats timers */
4075     if (priv->do_retransmission)
4076       timer_queue_clear_until (priv->rtx_stats_timers, now);
4077
4078     /* Iterate "normal" timers */
4079     len = priv->timers->len;
4080     for (i = 0; i < len;) {
4081       TimerData *test = &g_array_index (priv->timers, TimerData, i);
4082       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
4083       gboolean save_best = FALSE;
4084
4085       GST_DEBUG_OBJECT (jitterbuffer,
4086           "%d, %d, %d, %" GST_TIME_FORMAT " diff:%" GST_STIME_FORMAT, i,
4087           test->type, test->seqnum, GST_TIME_ARGS (test_timeout),
4088           GST_STIME_ARGS ((gint64) (test_timeout - now)));
4089
4090       /* Weed out anything too late */
4091       if (test->type == TIMER_TYPE_LOST &&
4092           (test_timeout == -1 || test_timeout <= now)) {
4093         GST_DEBUG_OBJECT (jitterbuffer, "Weeding out late entry");
4094         do_lost_timeout (jitterbuffer, test, now);
4095         if (!priv->timer_running)
4096           break;
4097         /* We don't move the iterator forward since we just removed the current entry,
4098          * but we update the termination condition */
4099         len = priv->timers->len;
4100       } else {
4101         /* find the smallest timeout */
4102         if (timer == NULL) {
4103           save_best = TRUE;
4104         } else if (timer_timeout == -1) {
4105           /* we already have an immediate timeout, the new timer must be an
4106            * immediate timer with smaller seqnum to become the best */
4107           if (test_timeout == -1
4108               && (gst_rtp_buffer_compare_seqnum (test->seqnum,
4109                       timer->seqnum) > 0))
4110             save_best = TRUE;
4111         } else if (test_timeout == -1) {
4112           /* first immediate timer */
4113           save_best = TRUE;
4114         } else if (test_timeout < timer_timeout) {
4115           /* earlier timer */
4116           save_best = TRUE;
4117         } else if (test_timeout == timer_timeout
4118             && (gst_rtp_buffer_compare_seqnum (test->seqnum,
4119                     timer->seqnum) > 0)) {
4120           /* same timer, smaller seqnum */
4121           save_best = TRUE;
4122         }
4123
4124         if (save_best) {
4125           GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
4126           timer = test;
4127           timer_timeout = test_timeout;
4128         }
4129         i++;
4130       }
4131     }
4132     if (timer && !priv->blocked) {
4133       GstClock *clock;
4134       GstClockTime sync_time;
4135       GstClockID id;
4136       GstClockReturn ret;
4137       GstClockTimeDiff clock_jitter;
4138
4139       if (timer_timeout == -1 || timer_timeout <= now || priv->eos) {
4140         /* We have normally removed all lost timers in the loop above */
4141         g_assert (timer->type != TIMER_TYPE_LOST);
4142
4143         do_timeout (jitterbuffer, timer, now);
4144         /* check here, do_timeout could have released the lock */
4145         if (!priv->timer_running)
4146           break;
4147         continue;
4148       }
4149
4150       GST_OBJECT_LOCK (jitterbuffer);
4151       clock = GST_ELEMENT_CLOCK (jitterbuffer);
4152       if (!clock) {
4153         GST_OBJECT_UNLOCK (jitterbuffer);
4154         /* let's just push if there is no clock */
4155         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
4156         now = timer_timeout;
4157         continue;
4158       }
4159
4160       /* prepare for sync against clock */
4161       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
4162       /* add latency of peer to get input time */
4163       sync_time += priv->peer_latency;
4164
4165       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
4166           " with sync time %" GST_TIME_FORMAT,
4167           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
4168
4169       /* create an entry for the clock */
4170       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
4171       priv->timer_timeout = timer_timeout;
4172       priv->timer_seqnum = timer->seqnum;
4173       GST_OBJECT_UNLOCK (jitterbuffer);
4174
4175       /* release the lock so that the other end can push stuff or unlock */
4176       JBUF_UNLOCK (priv);
4177
4178       ret = gst_clock_id_wait (id, &clock_jitter);
4179
4180       JBUF_LOCK (priv);
4181       if (!priv->timer_running) {
4182         gst_clock_id_unref (id);
4183         priv->clock_id = NULL;
4184         break;
4185       }
4186
4187       if (ret != GST_CLOCK_UNSCHEDULED) {
4188         now = timer_timeout + MAX (clock_jitter, 0);
4189         GST_DEBUG_OBJECT (jitterbuffer,
4190             "sync done, %d, #%d, %" GST_STIME_FORMAT, ret, priv->timer_seqnum,
4191             GST_STIME_ARGS (clock_jitter));
4192       } else {
4193         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
4194       }
4195       /* and free the entry */
4196       gst_clock_id_unref (id);
4197       priv->clock_id = NULL;
4198     } else {
4199       /* no timers, wait for activity */
4200       JBUF_WAIT_TIMER (priv);
4201     }
4202   }
4203   JBUF_UNLOCK (priv);
4204
4205   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
4206   return;
4207 }
4208
4209 /*
4210  * This funcion implements the main pushing loop on the source pad.
4211  *
4212  * It first tries to push as many buffers as possible. If there is a seqnum
4213  * mismatch, we wait for the next timeouts.
4214  */
4215 static void
4216 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
4217 {
4218   GstRtpJitterBufferPrivate *priv;
4219   GstFlowReturn result = GST_FLOW_OK;
4220
4221   priv = jitterbuffer->priv;
4222
4223   JBUF_LOCK_CHECK (priv, flushing);
4224   do {
4225     result = handle_next_buffer (jitterbuffer);
4226     JBUF_SIGNAL_QUEUE (priv);
4227     if (G_LIKELY (result == GST_FLOW_WAIT)) {
4228       /* now wait for the next event */
4229       JBUF_WAIT_EVENT (priv, flushing);
4230       result = GST_FLOW_OK;
4231     }
4232   } while (result == GST_FLOW_OK);
4233   /* store result for upstream */
4234   priv->srcresult = result;
4235   /* if we get here we need to pause */
4236   goto pause;
4237
4238   /* ERRORS */
4239 flushing:
4240   {
4241     result = priv->srcresult;
4242     goto pause;
4243   }
4244 pause:
4245   {
4246     GstEvent *event;
4247
4248     JBUF_SIGNAL_QUERY (priv, FALSE);
4249     JBUF_UNLOCK (priv);
4250
4251     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
4252         gst_flow_get_name (result));
4253     gst_pad_pause_task (priv->srcpad);
4254     if (result == GST_FLOW_EOS) {
4255       event = gst_event_new_eos ();
4256       if (priv->segment_seqnum != GST_SEQNUM_INVALID)
4257         gst_event_set_seqnum (event, priv->segment_seqnum);
4258       gst_pad_push_event (priv->srcpad, event);
4259     }
4260     return;
4261   }
4262 }
4263
4264 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
4265  * some sanity checks and then emit the handle-sync signal with the parameters.
4266  * This function must be called with the LOCK */
4267 static void
4268 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
4269 {
4270   GstRtpJitterBufferPrivate *priv;
4271   guint64 base_rtptime, base_time;
4272   guint32 clock_rate;
4273   guint64 last_rtptime;
4274   guint64 clock_base;
4275   guint64 ext_rtptime, diff;
4276   gboolean valid = TRUE, keep = FALSE;
4277
4278   priv = jitterbuffer->priv;
4279
4280   /* get the last values from the jitterbuffer */
4281   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
4282       &clock_rate, &last_rtptime);
4283
4284   clock_base = priv->clock_base;
4285   ext_rtptime = priv->ext_rtptime;
4286
4287   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
4288       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
4289       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
4290       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
4291
4292   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
4293     /* we keep this SR packet for later. When we get a valid RTP packet the
4294      * above values will be set and we can try to use the SR packet */
4295     GST_DEBUG_OBJECT (jitterbuffer, "keeping for later, no RTP values");
4296     keep = TRUE;
4297   } else {
4298     /* we can't accept anything that happened before we did the last resync */
4299     if (base_rtptime > ext_rtptime) {
4300       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
4301       valid = FALSE;
4302     } else {
4303       /* the SR RTP timestamp must be something close to what we last observed
4304        * in the jitterbuffer */
4305       if (ext_rtptime > last_rtptime) {
4306         /* check how far ahead it is to our RTP timestamps */
4307         diff = ext_rtptime - last_rtptime;
4308         /* if bigger than 1 second, we drop it */
4309         if (jitterbuffer->priv->max_rtcp_rtp_time_diff != -1 &&
4310             diff >
4311             gst_util_uint64_scale (jitterbuffer->priv->max_rtcp_rtp_time_diff,
4312                 clock_rate, 1000)) {
4313           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
4314           /* should drop this, but some RTSP servers end up with bogus
4315            * way too ahead RTCP packet when repeated PAUSE/PLAY,
4316            * so still trigger rptbin sync but invalidate RTCP data
4317            * (sync might use other methods) */
4318           ext_rtptime = -1;
4319         }
4320         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
4321             G_GUINT64_FORMAT, last_rtptime, diff);
4322       }
4323     }
4324   }
4325
4326   if (keep) {
4327     GST_DEBUG_OBJECT (jitterbuffer, "keeping RTCP packet for later");
4328   } else if (valid) {
4329     GstStructure *s;
4330
4331     s = gst_structure_new ("application/x-rtp-sync",
4332         "base-rtptime", G_TYPE_UINT64, base_rtptime,
4333         "base-time", G_TYPE_UINT64, base_time,
4334         "clock-rate", G_TYPE_UINT, clock_rate,
4335         "clock-base", G_TYPE_UINT64, clock_base,
4336         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
4337         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
4338
4339     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
4340     gst_buffer_replace (&priv->last_sr, NULL);
4341     JBUF_UNLOCK (priv);
4342     g_signal_emit (jitterbuffer,
4343         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
4344     JBUF_LOCK (priv);
4345     gst_structure_free (s);
4346   } else {
4347     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
4348     gst_buffer_replace (&priv->last_sr, NULL);
4349   }
4350 }
4351
4352 static GstFlowReturn
4353 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
4354     GstBuffer * buffer)
4355 {
4356   GstRtpJitterBuffer *jitterbuffer;
4357   GstRtpJitterBufferPrivate *priv;
4358   GstFlowReturn ret = GST_FLOW_OK;
4359   guint32 ssrc;
4360   GstRTCPPacket packet;
4361   guint64 ext_rtptime;
4362   guint32 rtptime;
4363   GstRTCPBuffer rtcp = { NULL, };
4364
4365   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4366
4367   if (G_UNLIKELY (!gst_rtcp_buffer_validate_reduced (buffer)))
4368     goto invalid_buffer;
4369
4370   priv = jitterbuffer->priv;
4371
4372   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
4373
4374   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
4375     goto empty_buffer;
4376
4377   /* first packet must be SR or RR or else the validate would have failed */
4378   switch (gst_rtcp_packet_get_type (&packet)) {
4379     case GST_RTCP_TYPE_SR:
4380       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
4381           NULL, NULL);
4382       break;
4383     default:
4384       goto ignore_buffer;
4385   }
4386   gst_rtcp_buffer_unmap (&rtcp);
4387
4388   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
4389
4390   JBUF_LOCK (priv);
4391   /* convert the RTP timestamp to our extended timestamp, using the same offset
4392    * we used in the jitterbuffer */
4393   ext_rtptime = priv->jbuf->ext_rtptime;
4394   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
4395
4396   priv->ext_rtptime = ext_rtptime;
4397   gst_buffer_replace (&priv->last_sr, buffer);
4398
4399   do_handle_sync (jitterbuffer);
4400   JBUF_UNLOCK (priv);
4401
4402 done:
4403   gst_buffer_unref (buffer);
4404
4405   return ret;
4406
4407 invalid_buffer:
4408   {
4409     /* this is not fatal but should be filtered earlier */
4410     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
4411         ("Received invalid RTCP payload, dropping"));
4412     ret = GST_FLOW_OK;
4413     goto done;
4414   }
4415 empty_buffer:
4416   {
4417     /* this is not fatal but should be filtered earlier */
4418     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
4419         ("Received empty RTCP payload, dropping"));
4420     gst_rtcp_buffer_unmap (&rtcp);
4421     ret = GST_FLOW_OK;
4422     goto done;
4423   }
4424 ignore_buffer:
4425   {
4426     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
4427     gst_rtcp_buffer_unmap (&rtcp);
4428     ret = GST_FLOW_OK;
4429     goto done;
4430   }
4431 }
4432
4433 static gboolean
4434 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
4435     GstQuery * query)
4436 {
4437   gboolean res = FALSE;
4438   GstRtpJitterBuffer *jitterbuffer;
4439   GstRtpJitterBufferPrivate *priv;
4440
4441   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4442   priv = jitterbuffer->priv;
4443
4444   switch (GST_QUERY_TYPE (query)) {
4445     case GST_QUERY_CAPS:
4446     {
4447       GstCaps *filter, *caps;
4448
4449       gst_query_parse_caps (query, &filter);
4450       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
4451       gst_query_set_caps_result (query, caps);
4452       gst_caps_unref (caps);
4453       res = TRUE;
4454       break;
4455     }
4456     default:
4457       if (GST_QUERY_IS_SERIALIZED (query)) {
4458         RTPJitterBufferItem *item;
4459         gboolean head;
4460
4461         JBUF_LOCK_CHECK (priv, out_flushing);
4462         if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
4463             RTP_JITTER_BUFFER_MODE_BUFFER) {
4464           GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
4465           item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
4466           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
4467           if (head)
4468             JBUF_SIGNAL_EVENT (priv);
4469           JBUF_WAIT_QUERY (priv, out_flushing);
4470           res = priv->last_query;
4471         } else {
4472           GST_DEBUG_OBJECT (jitterbuffer, "refusing query, we are buffering");
4473           res = FALSE;
4474         }
4475         JBUF_UNLOCK (priv);
4476       } else {
4477         res = gst_pad_query_default (pad, parent, query);
4478       }
4479       break;
4480   }
4481   return res;
4482   /* ERRORS */
4483 out_flushing:
4484   {
4485     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
4486     JBUF_UNLOCK (priv);
4487     return FALSE;
4488   }
4489
4490 }
4491
4492 static gboolean
4493 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
4494     GstQuery * query)
4495 {
4496   GstRtpJitterBuffer *jitterbuffer;
4497   GstRtpJitterBufferPrivate *priv;
4498   gboolean res = FALSE;
4499
4500   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4501   priv = jitterbuffer->priv;
4502
4503   switch (GST_QUERY_TYPE (query)) {
4504     case GST_QUERY_LATENCY:
4505     {
4506       /* We need to send the query upstream and add the returned latency to our
4507        * own */
4508       GstClockTime min_latency, max_latency;
4509       gboolean us_live;
4510       GstClockTime our_latency;
4511
4512       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
4513         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
4514
4515         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
4516             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
4517             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
4518
4519         /* store this so that we can safely sync on the peer buffers. */
4520         JBUF_LOCK (priv);
4521         priv->peer_latency = min_latency;
4522         our_latency = priv->latency_ns;
4523         JBUF_UNLOCK (priv);
4524
4525         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
4526             GST_TIME_ARGS (our_latency));
4527
4528         /* we add some latency but can buffer an infinite amount of time */
4529         min_latency += our_latency;
4530         max_latency = -1;
4531
4532         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
4533             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
4534             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
4535
4536         gst_query_set_latency (query, TRUE, min_latency, max_latency);
4537       }
4538       break;
4539     }
4540     case GST_QUERY_POSITION:
4541     {
4542       GstClockTime start, last_out;
4543       GstFormat fmt;
4544
4545       gst_query_parse_position (query, &fmt, NULL);
4546       if (fmt != GST_FORMAT_TIME) {
4547         res = gst_pad_query_default (pad, parent, query);
4548         break;
4549       }
4550
4551       JBUF_LOCK (priv);
4552       start = priv->npt_start;
4553       last_out = priv->last_out_time;
4554       JBUF_UNLOCK (priv);
4555
4556       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
4557           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
4558           GST_TIME_ARGS (last_out));
4559
4560       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
4561         /* bring 0-based outgoing time to stream time */
4562         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
4563         res = TRUE;
4564       } else {
4565         res = gst_pad_query_default (pad, parent, query);
4566       }
4567       break;
4568     }
4569     case GST_QUERY_CAPS:
4570     {
4571       GstCaps *filter, *caps;
4572
4573       gst_query_parse_caps (query, &filter);
4574       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
4575       gst_query_set_caps_result (query, caps);
4576       gst_caps_unref (caps);
4577       res = TRUE;
4578       break;
4579     }
4580     default:
4581       res = gst_pad_query_default (pad, parent, query);
4582       break;
4583   }
4584
4585   return res;
4586 }
4587
4588 static void
4589 gst_rtp_jitter_buffer_set_property (GObject * object,
4590     guint prop_id, const GValue * value, GParamSpec * pspec)
4591 {
4592   GstRtpJitterBuffer *jitterbuffer;
4593   GstRtpJitterBufferPrivate *priv;
4594
4595   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
4596   priv = jitterbuffer->priv;
4597
4598   switch (prop_id) {
4599     case PROP_LATENCY:
4600     {
4601       guint new_latency, old_latency;
4602
4603       new_latency = g_value_get_uint (value);
4604
4605       JBUF_LOCK (priv);
4606       old_latency = priv->latency_ms;
4607       priv->latency_ms = new_latency;
4608       priv->latency_ns = priv->latency_ms * GST_MSECOND;
4609       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
4610       JBUF_UNLOCK (priv);
4611
4612       /* post message if latency changed, this will inform the parent pipeline
4613        * that a latency reconfiguration is possible/needed. */
4614       if (new_latency != old_latency) {
4615         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
4616             GST_TIME_ARGS (new_latency * GST_MSECOND));
4617
4618         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
4619             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
4620       }
4621       break;
4622     }
4623     case PROP_DROP_ON_LATENCY:
4624       JBUF_LOCK (priv);
4625       priv->drop_on_latency = g_value_get_boolean (value);
4626       JBUF_UNLOCK (priv);
4627       break;
4628     case PROP_TS_OFFSET:
4629       JBUF_LOCK (priv);
4630       if (priv->max_ts_offset_adjustment != 0) {
4631         gint64 new_offset = g_value_get_int64 (value);
4632
4633         if (new_offset > priv->ts_offset) {
4634           priv->ts_offset_remainder = new_offset - priv->ts_offset;
4635         } else {
4636           priv->ts_offset_remainder = -(priv->ts_offset - new_offset);
4637         }
4638       } else {
4639         priv->ts_offset = g_value_get_int64 (value);
4640         priv->ts_offset_remainder = 0;
4641       }
4642       priv->ts_discont = TRUE;
4643       JBUF_UNLOCK (priv);
4644       break;
4645     case PROP_MAX_TS_OFFSET_ADJUSTMENT:
4646       JBUF_LOCK (priv);
4647       priv->max_ts_offset_adjustment = g_value_get_uint64 (value);
4648       JBUF_UNLOCK (priv);
4649       break;
4650     case PROP_DO_LOST:
4651       JBUF_LOCK (priv);
4652       priv->do_lost = g_value_get_boolean (value);
4653       JBUF_UNLOCK (priv);
4654       break;
4655     case PROP_MODE:
4656       JBUF_LOCK (priv);
4657       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
4658       JBUF_UNLOCK (priv);
4659       break;
4660     case PROP_DO_RETRANSMISSION:
4661       JBUF_LOCK (priv);
4662       priv->do_retransmission = g_value_get_boolean (value);
4663       JBUF_UNLOCK (priv);
4664       break;
4665     case PROP_RTX_NEXT_SEQNUM:
4666       JBUF_LOCK (priv);
4667       priv->rtx_next_seqnum = g_value_get_boolean (value);
4668       JBUF_UNLOCK (priv);
4669       break;
4670     case PROP_RTX_DELAY:
4671       JBUF_LOCK (priv);
4672       priv->rtx_delay = g_value_get_int (value);
4673       JBUF_UNLOCK (priv);
4674       break;
4675     case PROP_RTX_MIN_DELAY:
4676       JBUF_LOCK (priv);
4677       priv->rtx_min_delay = g_value_get_uint (value);
4678       JBUF_UNLOCK (priv);
4679       break;
4680     case PROP_RTX_DELAY_REORDER:
4681       JBUF_LOCK (priv);
4682       priv->rtx_delay_reorder = g_value_get_int (value);
4683       JBUF_UNLOCK (priv);
4684       break;
4685     case PROP_RTX_RETRY_TIMEOUT:
4686       JBUF_LOCK (priv);
4687       priv->rtx_retry_timeout = g_value_get_int (value);
4688       JBUF_UNLOCK (priv);
4689       break;
4690     case PROP_RTX_MIN_RETRY_TIMEOUT:
4691       JBUF_LOCK (priv);
4692       priv->rtx_min_retry_timeout = g_value_get_int (value);
4693       JBUF_UNLOCK (priv);
4694       break;
4695     case PROP_RTX_RETRY_PERIOD:
4696       JBUF_LOCK (priv);
4697       priv->rtx_retry_period = g_value_get_int (value);
4698       JBUF_UNLOCK (priv);
4699       break;
4700     case PROP_RTX_MAX_RETRIES:
4701       JBUF_LOCK (priv);
4702       priv->rtx_max_retries = g_value_get_int (value);
4703       JBUF_UNLOCK (priv);
4704       break;
4705     case PROP_RTX_DEADLINE:
4706       JBUF_LOCK (priv);
4707       priv->rtx_deadline_ms = g_value_get_int (value);
4708       JBUF_UNLOCK (priv);
4709       break;
4710     case PROP_RTX_STATS_TIMEOUT:
4711       JBUF_LOCK (priv);
4712       priv->rtx_stats_timeout = g_value_get_uint (value);
4713       JBUF_UNLOCK (priv);
4714       break;
4715     case PROP_MAX_RTCP_RTP_TIME_DIFF:
4716       JBUF_LOCK (priv);
4717       priv->max_rtcp_rtp_time_diff = g_value_get_int (value);
4718       JBUF_UNLOCK (priv);
4719       break;
4720     case PROP_MAX_DROPOUT_TIME:
4721       JBUF_LOCK (priv);
4722       priv->max_dropout_time = g_value_get_uint (value);
4723       JBUF_UNLOCK (priv);
4724       break;
4725     case PROP_MAX_MISORDER_TIME:
4726       JBUF_LOCK (priv);
4727       priv->max_misorder_time = g_value_get_uint (value);
4728       JBUF_UNLOCK (priv);
4729       break;
4730     case PROP_RFC7273_SYNC:
4731       JBUF_LOCK (priv);
4732       rtp_jitter_buffer_set_rfc7273_sync (priv->jbuf,
4733           g_value_get_boolean (value));
4734       JBUF_UNLOCK (priv);
4735       break;
4736     case PROP_FASTSTART_MIN_PACKETS:
4737       JBUF_LOCK (priv);
4738       priv->faststart_min_packets = g_value_get_uint (value);
4739       JBUF_UNLOCK (priv);
4740       break;
4741     default:
4742       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4743       break;
4744   }
4745 }
4746
4747 static void
4748 gst_rtp_jitter_buffer_get_property (GObject * object,
4749     guint prop_id, GValue * value, GParamSpec * pspec)
4750 {
4751   GstRtpJitterBuffer *jitterbuffer;
4752   GstRtpJitterBufferPrivate *priv;
4753
4754   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
4755   priv = jitterbuffer->priv;
4756
4757   switch (prop_id) {
4758     case PROP_LATENCY:
4759       JBUF_LOCK (priv);
4760       g_value_set_uint (value, priv->latency_ms);
4761       JBUF_UNLOCK (priv);
4762       break;
4763     case PROP_DROP_ON_LATENCY:
4764       JBUF_LOCK (priv);
4765       g_value_set_boolean (value, priv->drop_on_latency);
4766       JBUF_UNLOCK (priv);
4767       break;
4768     case PROP_TS_OFFSET:
4769       JBUF_LOCK (priv);
4770       g_value_set_int64 (value, priv->ts_offset);
4771       JBUF_UNLOCK (priv);
4772       break;
4773     case PROP_MAX_TS_OFFSET_ADJUSTMENT:
4774       JBUF_LOCK (priv);
4775       g_value_set_uint64 (value, priv->max_ts_offset_adjustment);
4776       JBUF_UNLOCK (priv);
4777       break;
4778     case PROP_DO_LOST:
4779       JBUF_LOCK (priv);
4780       g_value_set_boolean (value, priv->do_lost);
4781       JBUF_UNLOCK (priv);
4782       break;
4783     case PROP_MODE:
4784       JBUF_LOCK (priv);
4785       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
4786       JBUF_UNLOCK (priv);
4787       break;
4788     case PROP_PERCENT:
4789     {
4790       gint percent;
4791
4792       JBUF_LOCK (priv);
4793       if (priv->srcresult != GST_FLOW_OK)
4794         percent = 100;
4795       else
4796         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
4797
4798       g_value_set_int (value, percent);
4799       JBUF_UNLOCK (priv);
4800       break;
4801     }
4802     case PROP_DO_RETRANSMISSION:
4803       JBUF_LOCK (priv);
4804       g_value_set_boolean (value, priv->do_retransmission);
4805       JBUF_UNLOCK (priv);
4806       break;
4807     case PROP_RTX_NEXT_SEQNUM:
4808       JBUF_LOCK (priv);
4809       g_value_set_boolean (value, priv->rtx_next_seqnum);
4810       JBUF_UNLOCK (priv);
4811       break;
4812     case PROP_RTX_DELAY:
4813       JBUF_LOCK (priv);
4814       g_value_set_int (value, priv->rtx_delay);
4815       JBUF_UNLOCK (priv);
4816       break;
4817     case PROP_RTX_MIN_DELAY:
4818       JBUF_LOCK (priv);
4819       g_value_set_uint (value, priv->rtx_min_delay);
4820       JBUF_UNLOCK (priv);
4821       break;
4822     case PROP_RTX_DELAY_REORDER:
4823       JBUF_LOCK (priv);
4824       g_value_set_int (value, priv->rtx_delay_reorder);
4825       JBUF_UNLOCK (priv);
4826       break;
4827     case PROP_RTX_RETRY_TIMEOUT:
4828       JBUF_LOCK (priv);
4829       g_value_set_int (value, priv->rtx_retry_timeout);
4830       JBUF_UNLOCK (priv);
4831       break;
4832     case PROP_RTX_MIN_RETRY_TIMEOUT:
4833       JBUF_LOCK (priv);
4834       g_value_set_int (value, priv->rtx_min_retry_timeout);
4835       JBUF_UNLOCK (priv);
4836       break;
4837     case PROP_RTX_RETRY_PERIOD:
4838       JBUF_LOCK (priv);
4839       g_value_set_int (value, priv->rtx_retry_period);
4840       JBUF_UNLOCK (priv);
4841       break;
4842     case PROP_RTX_MAX_RETRIES:
4843       JBUF_LOCK (priv);
4844       g_value_set_int (value, priv->rtx_max_retries);
4845       JBUF_UNLOCK (priv);
4846       break;
4847     case PROP_RTX_DEADLINE:
4848       JBUF_LOCK (priv);
4849       g_value_set_int (value, priv->rtx_deadline_ms);
4850       JBUF_UNLOCK (priv);
4851       break;
4852     case PROP_RTX_STATS_TIMEOUT:
4853       JBUF_LOCK (priv);
4854       g_value_set_uint (value, priv->rtx_stats_timeout);
4855       JBUF_UNLOCK (priv);
4856       break;
4857     case PROP_STATS:
4858       g_value_take_boxed (value,
4859           gst_rtp_jitter_buffer_create_stats (jitterbuffer));
4860       break;
4861     case PROP_MAX_RTCP_RTP_TIME_DIFF:
4862       JBUF_LOCK (priv);
4863       g_value_set_int (value, priv->max_rtcp_rtp_time_diff);
4864       JBUF_UNLOCK (priv);
4865       break;
4866     case PROP_MAX_DROPOUT_TIME:
4867       JBUF_LOCK (priv);
4868       g_value_set_uint (value, priv->max_dropout_time);
4869       JBUF_UNLOCK (priv);
4870       break;
4871     case PROP_MAX_MISORDER_TIME:
4872       JBUF_LOCK (priv);
4873       g_value_set_uint (value, priv->max_misorder_time);
4874       JBUF_UNLOCK (priv);
4875       break;
4876     case PROP_RFC7273_SYNC:
4877       JBUF_LOCK (priv);
4878       g_value_set_boolean (value,
4879           rtp_jitter_buffer_get_rfc7273_sync (priv->jbuf));
4880       JBUF_UNLOCK (priv);
4881       break;
4882     case PROP_FASTSTART_MIN_PACKETS:
4883       JBUF_LOCK (priv);
4884       g_value_set_uint (value, priv->faststart_min_packets);
4885       JBUF_UNLOCK (priv);
4886       break;
4887     default:
4888       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4889       break;
4890   }
4891 }
4892
4893 static GstStructure *
4894 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
4895 {
4896   GstRtpJitterBufferPrivate *priv = jbuf->priv;
4897   GstStructure *s;
4898
4899   JBUF_LOCK (priv);
4900   s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
4901       "num-pushed", G_TYPE_UINT64, priv->num_pushed,
4902       "num-lost", G_TYPE_UINT64, priv->num_lost,
4903       "num-late", G_TYPE_UINT64, priv->num_late,
4904       "num-duplicates", G_TYPE_UINT64, priv->num_duplicates,
4905       "avg-jitter", G_TYPE_UINT64, priv->avg_jitter,
4906       "rtx-count", G_TYPE_UINT64, priv->num_rtx_requests,
4907       "rtx-success-count", G_TYPE_UINT64, priv->num_rtx_success,
4908       "rtx-per-packet", G_TYPE_DOUBLE, priv->avg_rtx_num,
4909       "rtx-rtt", G_TYPE_UINT64, priv->avg_rtx_rtt, NULL);
4910   JBUF_UNLOCK (priv);
4911
4912   return s;
4913 }